ELK/Logstash

logstash input codec fluent - 예제

PSAwesome 2021. 1. 14. 14:28
반응형

logstash 공식 홈페이지 이미지

 

input fluent codec 테스트 하기

 

multi line을 하나의 출력으로 정의하는 형태에서 적합한 log 통합은 무엇인지 탐색하는 과정입니다.

log info # ----------------------
log info # |    id   |      ps     |
log info # | name | awesome |
log info # ----------------------

 

아래 표현이 현재 생각하고 있는 흐름입니다.

 

apache ->

filebeat ->

java   ->     logstash  -> elasticsearch

                             -> kafka

 

codec fluent type

elastic stack logstach 공식 문서에선 

codec fluent ruby를 예제로 보여주고 있는데요.

이걸 자바로 변경해서 사용해보았습니다.

 

ELK는 설치된 상태로 가정하고 있습니다.

 

fluent-logger-java : mvnrepository

gradle

// https://mvnrepository.com/artifact/org.fluentd/fluent-logger
compile group: 'org.fluentd', name: 'fluent-logger', version: '0.3.4'

 

maven

<!-- https://mvnrepository.com/artifact/org.fluentd/fluent-logger -->
<dependency>
    <groupId>org.fluentd</groupId>
    <artifactId>fluent-logger</artifactId>
    <version>0.3.4</version>
</dependency>

 

logstash 설정 파일 생성

 

logstash-codec-fluent.conf
input {
    tcp {
    	codec => fluent
        type => "fluent"
        port => 5001
    }
}

output {
    elasticsearch {
    	hosts => ['localhost:9200']
        index => 'fluent-log'
    }
}

 

pipeline.yml

 

visual studio code 기준입니다.

거의 기본값이라고 생각하시면 됩니다.

 

# pipeline.yml 설정

- pipeline.id: java_logback
  queue.type: persisted
  path.config: "../config/logstash.conf"

- pipeline.id: java_fluent
  queue.type: persisted
  path.config: "../config/*fluent.conf"

- pipeline.id: filebeat
  queue.type: persisted
  path.config: "../config/*filebeat.conf"

 

# logstash 실행 로그

- 붙여넣기가 되지 않아서 일부만 추가하였습니다.

더보기

Using JAVA_HOME defined java: C:\Users\user\.jdks\corretto-11.0.7 
[2021-01-14T12:50:14,863][INFO ][logstash.outputs.elasticsearch][filebeat] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}} 
[2021-01-14T12:50:14,868][INFO ][logstash.outputs.elasticsearch][java_fluent] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}} 
[2021-01-14T12:50:14,861][INFO ][logstash.outputs.elasticsearch][java_logback] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}} 

fluent log

더보기

[2021-01-14T12:50:15,131][WARN ][logstash.outputs.elasticsearch][java_fluent] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2021-01-14T12:50:15,207][INFO ][logstash.outputs.elasticsearch][java_fluent] ES Output version determined {:es_version=>7}
[2021-01-14T12:50:15,228][WARN ][logstash.outputs.elasticsearch][java_fluent] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2021-01-14T12:50:15,297][INFO ][logstash.outputs.elasticsearch][java_fluent] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2021-01-14T12:50:15,350][INFO ][logstash.outputs.elasticsearch][java_fluent] Using a default mapping template {:es_version=>7, :ecs_compatibility=>:disabled}
[2021-01-14T12:50:15,483][INFO ][logstash.outputs.elasticsearch][java_fluent] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}

 

 

logback

더보기

[2021-01-14T12:51:15,227][WARN ][logstash.outputs.elasticsearch][java_logback] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketTimeout] Read timed out"}

[2021-01-14T12:51:31,023][INFO ][logstash.outputs.elasticsearch][java_logback] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
...

Pipeline started {"pipeline.id" => "java_logback" }

 

filebeat

더보기

[2021-01-14T12:51:31,004][INFO ][logstash.outputs.elasticsearch][filebeat] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://localhost:9200"]}

 

 

공식 문서의 ruby 예제

logger = Fluent::Logger::FluentLogger.new(nil, :host => "example.log", :port => 4000)
logger.post("some_tag", { "your" => "data", "here" => "yay!" })
  • logger 객체를 생성
  • post 메서드를 통해 메시지 전송

 

 

java log 보내기

import org.fluentd.logger.FluentLogger;
import org.fluentd.logger.FluentLoggerFactory;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

public class LogstashSenderTests {

  final FluentLogger LOG = new FluentLoggerFactory().getLogger(getClass().getName(), "localhost", 5005);

  public void sendLog() {
    LOG.log("forJava",
            Map.of("id", "ps!!!!",
                    "name", "Awesome ###")
    );
  }
  
  public void sendLog2() {
  	LOG.log("forJava",
    		Map.of("id", "ps!!!!",
                    "name", "Awesome ###",
                    "foramt", "자유롭군욥")
    );
  }

  public static void main(String[] args) {
    final LogstashSenderTests tests = new LogstashSenderTests();
    tests.sendLog();
    tests.sendLog2();
  }
}

 

 

logstash console

 

kibana 결과

전송 후 kibana 화면

 

출력을 보니 

NoSQL 답다는 생각이 드네요.

 

fluent type은 한 라인이 하나의 로그이고,

NoSQL 방식이 뚜렷하고,

tags 활용이나 metadata가 복잡하지 않다는 점,

쓰기 간편하다는 점이 기억에 남습니다.

 

 

반응형