Testing the Logstash fluent input codec
This post explores which log-aggregation approach fits a format where multiple lines are defined as a single output, for example:
log info # ----------------------
log info # | id | ps |
log info # | name | awesome |
log info # ----------------------
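As a minimal stdlib-only sketch of that idea, the multi-line table body above can be collapsed into a single key-value event. The field names `id` and `name` come from the sample log; the parsing rules themselves are my own assumption, not something Logstash does for you:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MultiLineCollapse {
    // Collapse the table-style lines into one key/value event,
    // skipping the "----" border rows.
    static Map<String, String> collapse(String[] lines) {
        Map<String, String> event = new LinkedHashMap<>();
        for (String line : lines) {
            if (!line.startsWith("|")) continue; // skip border rows
            String[] cells = line.split("\\|");
            // cells[0] is the empty string before the first '|'
            event.put(cells[1].trim(), cells[2].trim());
        }
        return event;
    }

    public static void main(String[] args) {
        String[] lines = {
                "----------------------",
                "| id   | ps      |",
                "| name | awesome |",
                "----------------------"
        };
        System.out.println(collapse(lines)); // {id=ps, name=awesome}
    }
}
```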
The diagram below is the flow I currently have in mind.
apache   ->
filebeat ->  logstash -> elasticsearch
java     ->           -> kafka
codec fluent type
The official Elastic Stack Logstash documentation shows a Ruby example
for the fluent codec,
so I converted it to Java and tried it out.
This assumes the ELK stack is already installed.
fluent-logger-java : mvnrepository
gradle
// https://mvnrepository.com/artifact/org.fluentd/fluent-logger
compile group: 'org.fluentd', name: 'fluent-logger', version: '0.3.4'
maven
<!-- https://mvnrepository.com/artifact/org.fluentd/fluent-logger -->
<dependency>
<groupId>org.fluentd</groupId>
<artifactId>fluent-logger</artifactId>
<version>0.3.4</version>
</dependency>
Create the Logstash config file
logstash-codec-fluent.conf
input {
tcp {
codec => fluent
type => "fluent"
port => 5001
}
}
output {
elasticsearch {
hosts => ['localhost:9200']
index => 'fluent-log'
}
}
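For context on what `codec => fluent` decodes: the fluent-logger clients speak Fluentd's forward protocol, where each event is (to my understanding) a msgpack-encoded array of the form [tag, unix_time, record]. A stdlib-only sketch of that logical shape follows; the real wire format is msgpack, which this deliberately does not implement:

```java
import java.util.List;
import java.util.Map;

public class FluentEventShape {
    // Logical shape of one forward-protocol entry: [tag, time, record].
    // The actual bytes on the wire are msgpack; this only models structure.
    static List<Object> entry(String tag, long unixTime, Map<String, Object> record) {
        return List.of(tag, unixTime, record);
    }

    public static void main(String[] args) {
        List<Object> e = entry("forJava", 1610596214L, Map.of("id", "ps"));
        System.out.println(e.get(0)); // forJava
    }
}
```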
pipelines.yml
This was edited in Visual Studio Code.
You can treat these as nearly default values.
# pipelines.yml settings
- pipeline.id: java_logback
queue.type: persisted
path.config: "../config/logstash.conf"
- pipeline.id: java_fluent
queue.type: persisted
path.config: "../config/*fluent.conf"
- pipeline.id: filebeat
queue.type: persisted
path.config: "../config/*filebeat.conf"
# logstash startup log
- Pasting did not work properly, so only part of the log is included.
Using JAVA_HOME defined java: C:\Users\user\.jdks\corretto-11.0.7
[2021-01-14T12:50:14,863][INFO ][logstash.outputs.elasticsearch][filebeat] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2021-01-14T12:50:14,868][INFO ][logstash.outputs.elasticsearch][java_fluent] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2021-01-14T12:50:14,861][INFO ][logstash.outputs.elasticsearch][java_logback] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
fluent log
[2021-01-14T12:50:15,131][WARN ][logstash.outputs.elasticsearch][java_fluent] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2021-01-14T12:50:15,207][INFO ][logstash.outputs.elasticsearch][java_fluent] ES Output version determined {:es_version=>7}
[2021-01-14T12:50:15,228][WARN ][logstash.outputs.elasticsearch][java_fluent] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2021-01-14T12:50:15,297][INFO ][logstash.outputs.elasticsearch][java_fluent] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2021-01-14T12:50:15,350][INFO ][logstash.outputs.elasticsearch][java_fluent] Using a default mapping template {:es_version=>7, :ecs_compatibility=>:disabled}
[2021-01-14T12:50:15,483][INFO ][logstash.outputs.elasticsearch][java_fluent] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
logback
[2021-01-14T12:51:15,227][WARN ][logstash.outputs.elasticsearch][java_logback] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketTimeout] Read timed out"}
[2021-01-14T12:51:31,023][INFO ][logstash.outputs.elasticsearch][java_logback] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
...
Pipeline started {"pipeline.id" => "java_logback" }
filebeat
[2021-01-14T12:51:31,004][INFO ][logstash.outputs.elasticsearch][filebeat] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://localhost:9200"]}
Ruby example from the official documentation
logger = Fluent::Logger::FluentLogger.new(nil, :host => "example.log", :port => 4000)
logger.post("some_tag", { "your" => "data", "here" => "yay!" })
- Create a logger object
- Send a message via the post method
Sending logs from Java
import org.fluentd.logger.FluentLogger;
import org.fluentd.logger.FluentLoggerFactory;

import java.util.Map;

public class LogstashSenderTests {

    // Port must match the tcp input in logstash-codec-fluent.conf (5001).
    final FluentLogger LOG = new FluentLoggerFactory()
            .getLogger(getClass().getName(), "localhost", 5001);

    public void sendLog() {
        LOG.log("forJava",
                Map.of("id", "ps!!!!",
                        "name", "Awesome ###"));
    }

    public void sendLog2() {
        LOG.log("forJava",
                Map.of("id", "ps!!!!",
                        "name", "Awesome ###",
                        "format", "free-form!"));
    }

    public static void main(String[] args) {
        final LogstashSenderTests tests = new LogstashSenderTests();
        tests.sendLog();
        tests.sendLog2();
        tests.LOG.close(); // flush buffered events and release the socket
    }
}
Kibana results
Looking at the output, it really does feel NoSQL-like.
What stayed with me about the fluent type: each line is a single log,
the NoSQL character comes through clearly,
tags and metadata stay simple,
and it is convenient to use.