menu
Is this helpful?

# Filebeat+Logstash 사용 가이드

이 섹션에서는 주로 데이터 전송 툴 Filebeat+Logstash의 사용 방법에 대해 설명합니다.

ドッキングを開始する前に、데이터 룰TE데이터 형식과 데이터 처리 규칙에 익숙해진 후, 이 가이드를 읽고 도킹해야 합니다.

LogBus업로드할 데이터는,TE데이터 형식을 따르다필수입니다

주의: LogsTEsh의 처리량은 낮으며, 대량의 과거 데이터 도입을 할 경우는DateX 엔진또는Logbus 도구

# Filebeat+Logstash 프로필

Filebeat+Logstash 툴은 주로 로그 데이터를 실시간으로 TE백그라운드에 업로드하고, 서버 로그 디렉토리 아래의 텍스트 스트림을 실시간 모니터링하며, 디렉토리 아래의 임의의 로그 텍스트에 새로운 데이터가 생성되었을 때 실시간으로 TE백그라운드로 전송하기 위해 사용됩니다.

Logstash는 오픈 소스 서버 측 데이터 처리 파이프라인으로, 다양한 소스에서 동시에 데이터를 수집하고 변환하여 선호하는 '리포지토리'로 전송할 수 있습니다. Logstash 공식 사이트 (opens new window)

Filebeat은 로그 디렉토리 또는 특정 로그 파일을 모니터링하는 로컬 문서의 로그 데이터 수집 도구입니다. Filebeat은 로그와 문서를 전송 및 집계하기 위한 경량의 방법을 제공합니다. Filebeat 공식 사이트 (opens new window)

Filebeat+Logstash 기반의 데이터 수집 플로우차트를 아래에 나타냅니다

# Filebeat+Logstash의 다운로드와 설치

주의: Logstash-6.x 버전 이상에서는 서버에JDK환경

# 2.2 Logstash의 다운로드와 설치

Logstash공시 추가 설치 문서 (opens new window)를 참조하십시오

# 2.3 logstash-output-thinkingdata 플러그인

현재 버전: 1.0.0

업그레이드 시간은: 2020-06-09

# 2.3.1 logstash-output-thinkingdata 플러그인의 설치와 삭제

이 플러그인은 데이터가 json 데이터인지를 체크하고, 패키지 데이터를 TE에

logstash 디렉터리에서 실행:

bin/logstash-plugin install logstash-output-thinkingdata

설치하는 데 약간의 시간이 걸립니다만, 설치가 성공한 후 실행하는 것을 기다려 주세요.

bin/logstaEsh-plugin list

리스트에 logstash-output-thinkingdata가 있는 이벤트는, 설치가 성공했다는 것을 나타냅니다.

다른 명령어는 다음과 같습니다.

업그레이드 플러그인을 실행할 수 있습니다:

bin/logstash-plugin update logstash-output-thinkingdata

플러그인을 삭제할 수 있습니다:

bin/logstash-plugin uninstall logstash-output-thinkingdata

# 2.3.2 자산 변경 로그

# v1.0.0 2020/06/09
  • 수신 Logstash에서 이벤트를 전송한 message를 TE에 업로드

# 2.4 Filebeat의 다운로드와 설치

Filebeat 공식 설치 문서 (opens new window)을 참조하여, 다운로드 방법을 선택해 주세요.

# Filebeat+Logstash 사용 설명서

# 3.1 데이터 준비

  1. 먼저 전송할 데이터를 ETL하여 TE의 데이터 형식, 로컬에 쓰거나 Kafka 클러스터로 전송하거나, Java 등의 서비스 측 SDK의 Kafka나 로컬 텍스트를 쓰는 consumer를 사용하는 경우, 데이터는 이미 올바른 형식으로 있으며, 변환할 필요가 없습니다.

업로드된 데이터의 디렉토리 또는 Kafka의 주소와 topic을 지정하고, Filebeat+Logstash의 설정을 구성하면, Filebeat+Logstash는 디렉토리 내의 변경 사항을 감시합니다(새로운 또는 기존의 텍스트를 감시합니다), 또는 Kafka의 데이터를 구독합니다.

3.감시 디렉토리에 다른이름으로 저장되고 업로드된 데이터 로그의 이름을 직접 변경하지 마십시오. 이름 변경 로그는 새로운 텍스트에 해당하며, Filebeat는 이러한 텍스트를 재업로드하여 데이터의 중복을 일으킬 수 있습니다.

# 3.2 Logstash의 설정

# 3.2.1 Logstash 파이프라인 설치

Logstash는 여러 파이프라인을 동시에 실행하는 것을 지원하며, 각 파이프라인 간에 영향을 주지 않고, 각각 독립된 입출력 설정을 가지고 있으며, 파이프라인의 설정 텍스트는 config/pipelines.yml에 있습니다. 현재 Logstash를 사용하여 다른 로그 수집 작업을 수행하고 있는 경우, 원래의 Logstash에 파이프라인을 추가하여 TE의 로그 데이터를 수집하고 TE에 전송합니다.

새로운 파이프라인의 설치는 다음과 같습니다.

#  Piping Configuration for Thinkingdata
- pipeline.id: thinkingdata-output
  #  If the core number of upload user attributes is set to 1, if only the upload event attributes can be set to be less than or equal to the number of native CPUs
  pipeline.workers: 1
  # Buffer queue type used
  queue.type: persisted
  # Using different input and output configurations
  path.config: "/home/elk/conf/ta_output.conf"

더 많은 파이프라인 설치 공식 사이트:Pipeline (opens new window)

# 3.2.2 Logstash 입력 및 출력 설정

TE_output.conf 참고 예:

주의: 여기에서 시나리오는,SDKLogstash의 출력 입력 구성에 의해 생성된 데이터의 텍스트

# Use beats as input
input {
    beats {
        port => "5044"
    }
}
# If it is not the generation of service-side SDK or data in TA format that requires filter to filter metadata, it is suggested here that Ruby plug-in filter be used. There is a case in Module 4.
#filter {
#   if "log4j" in [tags] {
#    ruby {
#      path => "/home/elk/conf/rb/log4j_filter.rb"
#    }
#  }
#}
# Use thinkingdata as output
output{
   thinkingdata {
        url => "http://Data Acquisition Address/logbus"
           appid =>"Your AppID"
    }
}

thinkingdaTE 파라미터 설명:

파라미터 이름 유형 필수 디폴트 값 정의
url string true No TE data receiving address
appid string true No Project APPID
flush_interval_sec number false 2 Trigger flush interval in seconds
flush_batch_size number false 500 Json data that triggers flush, unit: bar
compress number false 1 Compressed data, 0 means no compressed data, which can be set in the intranet; 1 means gzip compression, the default gzip compression
uuid boolean false false Whether to turn on the UUID switch for possible deduplicate of network fluctuations at short intervals
is_filebeat_status_record boolean false true Whether to turn on Filebeat to monitor log status, such as offset, file name, etc

# 3.2.3 Logstash 실행 설치

Logstash는 기본적으로 config/logstash.yml 실행 설정으로 사용합니다.

pipeline.workers: 1
queue.type: persisted
queue.drain: true

추천 사항

  1. 유저 속성을 보고할 때는, 구성 pipeline.workers의 값을 1로 변경해 주세요. workers의 값이 1보다 클 경우, 데이터를 처리하는 순서가 변할 수 있습니다. 맞춤형 이벤트는 1보다 크게 설정할 수 있습니다.
  2. 프로그램의 예기치 않은 종료로 데이터 전송이 손실되지 않도록 설정하려면 queue.type: persisted를 설정해 주세요. 이는 Logstash가 사용하는 버퍼 큐의 종류를 나타내며, Logstash를 재시작한 후에도 버퍼 큐 내의 데이터를 계속해서 전송하도록 구성합니다.

더 많은 데이터 지속성 관련 정보는 공식 사이트를 참조하십시오. persistent-queues (opens new window)

  1. queue.drain 값을 true로 설정하면, Logstash는 정상적으로 종료하기 전에 모든 버퍼 큐 내의 모든 데이터를 전송합니다.

액세스 상세는 공식 사이트를 확인해 주세요. logstash.yml (opens new window)

# 3.2.4 Logstash의 스타트 이벤트

Logstash 설치 디렉토리의 하위에서 1.파이프라인 설치 및 실행 설치로서 config/pipelines.yml을 사용하여 직접 시작합니다

bin/logstash
  1. 지정된 ta_output.conf는 실행 구성으로 config/logstash.yml을 사용하여 입력 및 출력 구성 북으로서 시작합니다.
bin/logstash -f /youpath/ta_output.conf

3.백그라운드 스타트

nohup bin/logstash -f /youpath/ta_output.conf > /dev/null 2>&1 &

더 많은 스타트 이벤트는 Logstash공식 스타트 이벤트 문서 (opens new window)

# 3.3 Filebeat 설정 파일

# 3.3.1 Filebeat의 실행 설치

Filebeat는 백단 SDK 로그북을 읽어들입니다. Filebeat의 기본 설정 텍스트는 다음과 같습니다 filebeat.yml. config/filebeat.yml 참조 설치는 다음과 같습니다.

#======================= Filebeat inputs =========================
filebeat.shutdown_timeout: 5s
filebeat.inputs:
- type: log
  enabled: true
  paths:
     - /var/log/log.*
    #- c:\programdata\elasticsearch\logs\*
#------------------------- Logstash output ----------------------------
output.logstash:
 # You can fill in either a single server process or a Logstash process on multiple servers
  hosts: ["ip1:5044","ip2:5044"]
  loadbalance: false
  1. shutdown_timeout (opens new window): Filebeat은, 클로즈하기 전에, 퍼블리셔가 이벤트의 전송을 완료하는 것을 기다리고 있습니다.
  2. paths: 로그를 지정합니다. 현재, Go 언어의 glob 함수에 따라 처리되고 있습니다.
  3. hosts: 보내는 사람의 이메일 주소는 여러 Logstash 호스트에서 loadbalance가 가수인 경우는 프라이머리 기능과 동일하며, true는 부하 분산을 나타냅니다.
  4. loadbalance: user_set에서 유저 속성을 전송할 경우 loadbalance : true를 설정하지 마십시오.

track 이벤트만 업로드할 경우, 여러 Logstash 프로세스를 설정하고, loadbalance: true를 설정할 수 있습니다.

Filebeat의 디폴트 설정은 loadbalance: 가수

Filebeat 데이터가 가시화 설치:Filebeat 공식 문서 (opens new window)

# 3.3.2 Filebeat의 스타트 이벤트

filebeat 시작 후 관련 출력 정보를 표시:

 ./filebeat -e -c filebeat.yml

백그라운드 스타트

nohup ./filebeat -c filebeat.yml > /dev/null 2>&1 &

# 케이스 설치 참고

# 4.1 filebeat은 다른 로그 형식의 데이터 구성을 감시

각각의 Filebeat의 filebeat.xml은 다음과 같이 설정할 수 있으며, 실행 메모리는 약 10 MB이고, 여러 filebeat 프로세스에서 다른 로그 형식을 모니터링할 수도 있습니다. 구체적으로는 공식 사이트를 참조하십시오.

#=========================== Filebeat inputs =============================

filebeat.inputs:
- type: log
  enabled: true
  #Monitoring Catalog
  paths:
     - /home/elk/sdk/*/log.*
  #Tag the data and match it with Logstash without filter processing
  tags: ["sdklog"]
 #Data split by special characters
- type: log
  enable: true
  paths: /home/txt/split.*
  #Tags the data, Logstash matches, filter processing is required
  tags: ["split"]
 # log4j Received data
- type: log
  enable: true
  paths: /home/web/logs/*.log
  # Filter processing required
  tags: ["log4j"]
   # log4j Received data
 # nginx Log data
- type: log
  enable: true
  paths: /home/web/logs/*.log
  tags: ["nginx"]

# 4.2 Logstash 설치

Logstash의 설정 파일이 틀린 데이터인지 확인합니다.

 bin/logstash -f /home/elk/ta_output.conf --config.test_and_exit

주의: 다음 스크립트는 모두 ruby 플러그인이 ruby 구문에 기초하여 작성되어 있으므로, java에 익숙한 경우는logbus디렉토리 아래의 커스텀 파서입니다.

# 4.2.1 서버 SDK 로그

ta_output.conf는 다음과 같이 설치할 수 있습니다

input {
    beats {
        port => "5044"
    }
}
# Use thinkingdata as output
output{
   thinkingdata {
        url => "url"
        appid =>"appid"
        # compress => 0
        # uuid => true
    }
}

# 4.2.2 log4j 로그인

log4j 형식은 다음과 같이 설치할 수 있습니다

 //Log format
 //[%d{yyyy-MM-dd HH:mm:ss.SSS}] The log matches the input time of TA, or it can be yyyy-MM-dd HH:mm:ss
//[%level{length=5}]    로그 레벨,debug、info、warn、error
 //[%thread-%tid]    Current thread information
 //[%logger] Current log information belongs to the full path of the class
 //[%X{hostName}]    Current Node Host Name. Requires definition via MDC
 //[%X{ip}]  Current node ip. Requires definition via MDC.
 //[%X{userId}]  Unique ID of user login, account_can be set Id, which can also be set to other values, account_is required in TA ID and distinct_ ID cannot be empty at the same time, you can set other properties to see the business settings. Requires definition via MDC.
//[%X{applicationName}] 현재 애플리케이션 명입니다. MDC를 통해 커스텀해야 합니다.
//[%F,%L,%C,%M] %F:현재 로그 정보가 속한 파일(클래스) 이름,% l: 해당 파일 내 로그 정보의 줄 번호,% C: 현재 로그가 속한 파일의 전체 클래스 이름,% m: 현재 로그가 속한 메소드 이름
 //[%m]  Log details
 //%ex   Abnormal information
 //%n    wrap
 <property name="patternLayout">[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{userId}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
</property>

ta_output.conf 설정 파일

input {
    beats {
        port => "5044"
    }
}
filter {
if "log4j" in [tags]{
    #Other filter data processing can also be performed
   ruby {
     path => "/home/conf/log4j.rb"
    }
 }
}
# Use thinkingdata as output
output{
  thinkingdata {
        url => "url"
        appid =>"appid"
    }
}

/home/conf/log4j.rb 스크립트는 다음과 같습니다

# This is where the correct data format is regularized. If the error log is recommended to be placed in a separate file, the error log is too long without analyzing the scenario and crosses
def filter(event)
_message = event.get('message') #message는 당신이 업로드한 각 로그입니다
begin
  #This is where the correct data format is regularized. If the error log is recommended to be placed in a separate file, the error log is too long without analyzing the scenario and crosses
  #The data here is in a format similar to this _message ="[2020-06-08 23:19:56.003] [INFO] [main-1] [cn.thinkingdata] [x] [123.123.123.123] [x] [x] [StartupInfoLogger.java,50,o)] ## ''"
 mess = /\[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] ## '(.*?)'/.match(_message)
time = mess[1]
level = mess[2]
thread = mess[3]
class_name_all = mess[4]
event_name = mess[5]
ip = mess[6]
account_id = mess[7]
application_name = mess[8]
other_mess = mess[9]
exp = mess[10]

if event_name.empty? || account_id.empty?
  return []
end

properties = {
      'level' => level,
      'thread' => thread,
      'application_name' => application_name,
      'class_name_all' => class_name_all,
      'other_mess' => other_mess,
      'exp' => exp
}

  data = {
      '#ip' => ip,
      '#time' => time,
      '#account_id'=>account_id,
      '#event_name'=>event_name,   #If type is track, it can be used if #event_is not available Name does not need to be uploaded
      '#type' =>'track',           #It can be retrieved in a file, if any, to confirm whether the reported data is a user or event attribute
      'properties' => properties
  }

  event.set('message',data.to_json)
  return [event]
  rescue
      # puts _message
      puts "Data does not conform to regular format"
      return [] #Do not report
  end
end

# 4.2.3 Nginx 로그

먼저 Nginx 로그의 초기화를 정의하고, json 데이터 형식으로 설치해주세요.

input {
    beats {
        port => "5044"
    }
}
filter {
#No tags need to be judged if the data is in the same format
if "nginx" in [tags]{
   ruby {
     path => "/home/conf/nginx.rb"
    }
 }
}
# Use thinkingdata as output
output{
  thinkingdata {
        url => "url"
        appid =>"appid"
    }
}

/home/conf/nginx.rb 스크립트는 다음과 같습니다

  require 'date'
   def filter(event)
      #Extract similar log information
      # {"accessip_list":"124.207.82.22","client_ip":"123.207.82.22","http_host":"203.195.163.239","@timestamp":"2020-06-03T19:47:42+08:00","method":"GET","url":"/urlpath","status":"304","http_referer":"-","body_bytes_sent":"0","request_time":"0.000","user_agent":"s","total_bytes_sent":"180","server_ip":"10.104.137.230"}
      logdata= event.get('message')
       #Parse the log level and request time and save it to the event object
         #Parsing JSON format log
    #Get log content
    #Convert to JSON Object
    logInfoJson=JSON.parse logdata
    time = DateTime.parse(logInfoJson['@timestamp']).to_time.localtime.strftime('%Y-%m-%d %H:%M:%S')
    url = logInfoJson['url']
    account_id = logInfoJson['user_agent']
    #event_name
    #Account_ ID or #distinct_ Skip reporting when ID is null at the same time
    if url.empty? || url == "/" || account_id.empty?
      return []
    end
    properties = {
      "accessip_list" => logInfoJson['accessip_list'],
      "http_host"=>logInfoJson['http_host'],
      "method" => logInfoJson['method'],
      "url"=>logInfoJson['url'],
      "status" => logInfoJson['status'].to_i,
      "http_referer" => logInfoJson['http_referer'],
      "body_bytes_sent" =>logInfoJson['body_bytes_sent'],
      "request_time" =>  logInfoJson['request_time'],
      "total_bytes_sent" => logInfoJson['total_bytes_sent'],
      "server_ip" => logInfoJson['server_ip'],
    }
  data = {
      '#ip' => logInfoJson['client_ip'],#Can be null
      '#time' => time,   #can not be null
'#account_id'=>account_id, # account_id 와 distinct_id 는 동시에 not null이면 안 됩니다
      '#event_name'=>url,   #If type is track, it can be used if #event_is not available Name does not need to be uploaded
      '#type' =>'track',           #It can be retrieved in a file, if any, to confirm whether the reported data is a user or event attribute
      'properties' => properties
  }
   event.set('message',data.to_json)
   return [event]

  end

# 4.2.4 기타 로그인

ta_output.conf 설정 파일은 다음과 같이 설치할 수 있습니다

input {
    beats {
        port => "5044"
    }
}
filter {
if "other" in [tags]{
    #Other filter data processing is possible
   ruby {
     path => "/home/conf/outher.rb"
    }
 }
}
# Use thinkingdata as output
output{
  thinkingdata {
        url => "url"
        appid =>"appid"
    }
}

/home/conf/outher.rb 스크립트는 다음과 같습니다.

# def register(params)
#   # The parameters obtained here via params are via script_in the logstash file Params incoming
#   #@message = params["xx"]
# end
def filter(event)
  #Processing business data here, without a series of processing such as grok, get metadata directly from message for processing
  begin
  _message = event.get('message') #message is every log you upload
  #The logs processed here are data similar to the following
  #2020-06-01 22:20:11.222,123455,123.123.123.123,login,nihaoaaaaa,400
  time,account_id,ip,event_name,msg,intdata=_message.split(/,/)
  #Or by regularly matching or JSON data, the ruby grammar parses JSON and data can be processed and reported here
  #mess = /regular/.match(_message)
  #time = mess[1]
  #level = mess[2]
  #thread = mess[3]
  #class_name_all = mess[4]
  #event_name = mess[5]
  #account_id = mess[6]
  #api = mess[7]

  properties = {
      'msg' => msg,
      'int_data' => intdata.to_i #to_ I is type int, to_ F is a float type, to_ S is string type (default)
  }
  data = {
      '#time' => time,
      '#account_id'=>account_id,
      '#event_name'=>event_name,
      '#ip' => ip,
      '#type' =>'track',
      'properties' => properties
  }
  event.set('message',data.to_json)

  return [event]
  rescue
       #If you don't want a data past or error, this data can go back to null, such as account_ ID or distinct_ IDs are null returned directly []
       return []
  end
end