# Filebeat + Logstash User Guide

This section describes how to use the data transfer tool Filebeat + Logstash:

**Before starting the integration, read the data rules first.** Once you are familiar with TA's data format and data rules, follow this guide to complete the integration.

**Data uploaded by Filebeat + Logstash must follow TA's data format.**

**Note:** Logstash throughput is low. To import a large amount of historical data, use the DataX engine or the Logbus tool instead.

# I. Introduction of Filebeat + Logstash

The Filebeat + Logstash tool imports log data into the TA backend in real time. It monitors the files under the server's log directory and sends new data to the TA backend as soon as any log file in that directory receives it.

Logstash is an open source server-side data processing pipeline that can ingest data from multiple sources simultaneously, transform it, and send it to your favorite "stash". See the introduction on the Logstash official website.

Filebeat is a lightweight log shipper for local files. It monitors log directories or specific log files (tailing them) and forwards and centralizes logs and files. See the introduction on the Filebeat official website.

The acquisition flow chart based on Filebeat + Logstash is shown in the following figure:

# II. Filebeat + Logstash Download and Installation

Note: Logstash 6.x or later is required, and the server must have a JDK environment.

# 2.2 Logstash Download and Install

Refer to the official Logstash installation documentation and choose the download method that suits you.

# 2.3 Logstash-output-thinkingdata Plugin

**Latest version**: 1.0.0

**Updated**: 2020-06-09

# 2.3.1 Install and uninstall the logstash-output-thinkingdata plugin

This plugin checks whether the data is JSON, then packs the data and sends it to TA.
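The JSON check described above can be pictured with a short Ruby sketch. It only illustrates the idea and is not the plugin's actual code; the helper name `json_object?` is hypothetical.

```ruby
require 'json'

# Hypothetical helper (not part of the plugin): returns true when a log line
# parses as a JSON object, false for anything else.
def json_object?(line)
  JSON.parse(line).is_a?(Hash)
rescue JSON::ParserError, TypeError
  false
end

lines = [
  '{"#type":"track","#event_name":"login"}',
  'plain text line'
]
lines.each { |line| puts "#{json_object?(line)}: #{line}" }
```

A check like this can be handy in a filter script to drop lines that would otherwise be rejected downstream.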

Execute under the logstash directory:

bin/logstash-plugin install logstash-output-thinkingdata

The installation takes a little time, so please wait patiently. After the installation succeeds, execute:

bin/logstash-plugin list

If **logstash-output-thinkingdata** appears in the list, the installation was successful.

Other commands are shown below.

To upgrade the plugin, run:

bin/logstash-plugin update logstash-output-thinkingdata

To uninstall the plugin, run:

bin/logstash-plugin uninstall logstash-output-thinkingdata

# 2.3.2 Change Log

# v1.0.0 2020/06/09
  • Receive messages passed by events in Logstash and send them to TA.

# 2.4 Filebeat Download and Install

Refer to the official Filebeat installation documentation and choose the download method that suits you.

# III. Filebeat + Logstash Instructions

# 3.1 Data Preparation

  1. First, convert the data to be transferred into TA's data format by ETL, and write it to local files or send it to a Kafka cluster. If the data is written to Kafka or to local files by a consumer of the Java server-side SDK, it is already in the correct format and does not need to be converted.

  2. Determine the directory where the data files to upload are stored, or the Kafka address and topic, and configure Filebeat + Logstash accordingly. Filebeat + Logstash will monitor file changes in that directory (detecting new files and tailing existing ones), or subscribe to the data in Kafka.

  3. Do not rename data logs that are stored in the monitored directory and have already been uploaded. Renaming a log is equivalent to creating a new file, so Filebeat may upload the file again and cause duplicate data.
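As a rough illustration of step 1, the sketch below builds one event record as a single JSON line. It uses only field names that appear elsewhere in this guide (`#account_id`, `#type`, `#event_name`, `#time`, `properties`); the helper `build_track_record` is hypothetical, and TA's data format documentation remains the authoritative field list.

```ruby
require 'json'

# Hypothetical helper: builds one track event as a single JSON line.
# Field names are taken from the examples in this guide, not from the
# full TA data format specification.
def build_track_record(account_id:, event_name:, time:, properties: {})
  {
    '#account_id' => account_id,
    '#type'       => 'track',
    '#event_name' => event_name,
    '#time'       => time.strftime('%Y-%m-%d %H:%M:%S.%L'),
    'properties'  => properties
  }.to_json
end

line = build_track_record(
  account_id: '123455',
  event_name: 'login',
  time: Time.new(2020, 6, 1, 22, 20, 11),
  properties: { 'msg' => 'hello' }
)
puts line
```

Writing one such line per record to a file in the monitored directory gives Filebeat something it can forward without a Logstash filter.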

# 3.2 Logstash Configuration

# 3.2.1 Logstash Pipeline Configuration

Logstash supports running multiple Pipelines at the same time. The Pipelines do not affect one another, and each has its own independent input and output configuration. The Pipeline configuration file is located at config/pipelines.yml. If you already use Logstash for other log collection work, you can add a Pipeline to the existing Logstash instance to collect TA's log data and send it to TA.

The configuration of the newly added Pipeline is as follows:

# Pipeline configuration for ThinkingData
- pipeline.id: thinkingdata-output
  # Set to 1 when uploading user attributes. If only event data is uploaded, this can be any value less than or equal to the number of CPU cores on the machine
  pipeline.workers: 1
  # Type of buffer queue used
  queue.type: persisted
  # Use a separate input and output configuration
  path.config: "/home/elk/conf/ta_output.conf"

For more Pipeline configuration options, see the official Pipeline documentation.

# 3.2.2 Logstash Input and Output Configuration

ta_output.conf reference example:

**Note: This example assumes the input is a file of data generated by the SDK; the Logstash input and output are configured accordingly.**

# Use beats as input
input {
    beats {
        port => "5044"
    }
}
# If the data was not generated by a server-side SDK and is not already in TA format, a filter is needed to transform it. The Ruby filter plugin is recommended; see the examples in Section IV.
#filter {
#   if "log4j" in [tags] {
#    ruby {
#      path => "/home/elk/conf/rb/log4j_filter.rb"
#    }
#  }
#}
# Use thinkingdata as output
output {
    thinkingdata {
        url => "http://Data Acquisition Address/logbus"
        appid => "Your AppID"
    }
}

Thinkingdata parameter description:

| Parameter name | Type | Required | Default value | Description |
| --- | --- | --- | --- | --- |
| url | string | true | None | TA data receiving address |
| appid | string | true | None | Project APP ID |
| flush_interval_sec | number | false | 2 | Interval, in seconds, at which a flush is triggered |
| flush_batch_size | number | false | 500 | Number of JSON records that triggers a flush |
| compress | number | false | 1 | Data compression: 0 means no compression (can be used on an intranet); 1 means gzip compression (the default) |
| uuid | boolean | false | false | Whether to enable the UUID switch, used to deduplicate data that may be resent at short intervals because of network fluctuations |
| is_filebeat_status_record | boolean | false | true | Whether to record the log status monitored by Filebeat, such as offset and file name |
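How flush_batch_size and flush_interval_sec interact can be sketched as a small buffer that flushes when either threshold is reached. This is an illustration only, not the plugin's implementation; the `BatchBuffer` class and its methods are hypothetical.

```ruby
# Hypothetical buffer illustrating a size threshold (like flush_batch_size)
# combined with a time threshold (like flush_interval_sec).
class BatchBuffer
  attr_reader :flushed

  def initialize(batch_size: 500, interval_sec: 2)
    @batch_size = batch_size
    @interval_sec = interval_sec
    @items = []
    @last_flush = Time.now
    @flushed = [] # batches that would have been sent
  end

  # Adds one record and flushes when either threshold is reached.
  def add(record, now: Time.now)
    @items << record
    flush(now) if @items.size >= @batch_size || now - @last_flush >= @interval_sec
  end

  # Moves the pending records into the list of sent batches.
  def flush(now = Time.now)
    return if @items.empty?

    @flushed << @items
    @items = []
    @last_flush = now
  end
end

buffer = BatchBuffer.new(batch_size: 3, interval_sec: 2)
5.times { |i| buffer.add("record-#{i}") }
buffer.flush # send whatever is left
puts buffer.flushed.inspect
```

A larger batch size means fewer requests, while a shorter interval reduces the delay before a partially filled batch is sent.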

# 3.2.3 Logstash Run Configuration

Logstash uses **config/logstash.yml** as the run configuration by default.

pipeline.workers: 1
queue.type: persisted
queue.drain: true

Suggestions:

  1. When reporting user attributes (user_set), set pipeline.workers to 1. When the number of workers is greater than 1, the order in which data is processed can change. If only track events are reported, the value can be greater than 1.
  2. To ensure that data in transit is not lost if the program terminates unexpectedly, set queue.type: persisted. This is the type of buffer queue Logstash uses; with it, data left in the buffer queue continues to be sent after Logstash restarts. More information about persistence can be found in the official persistent queues documentation.
  3. Setting queue.drain to true makes Logstash send all data in all buffer queues before a normal exit. More details can be found in the official logstash.yml documentation.
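Why processing order matters for user attributes can be shown with a simplified model. The sketch assumes that a user_set update overwrites earlier values for the same property; the helper `apply_user_sets` is hypothetical and does not reflect how TA stores user data internally.

```ruby
# Simplified model (assumption): each user_set update overwrites earlier
# values for the same property, so the last update applied wins.
def apply_user_sets(updates)
  updates.each_with_object({}) { |update, profile| profile.merge!(update) }
end

updates = [{ 'level' => 1 }, { 'level' => 2 }]
in_order     = apply_user_sets(updates)
out_of_order = apply_user_sets(updates.reverse)
puts in_order.inspect
puts out_of_order.inspect
```

With a single worker the updates arrive in file order; with several workers either result is possible, which is the reason for keeping pipeline.workers at 1 in this case.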

# 3.2.4 Logstash Start

Run the following from the Logstash installation directory.

  1. Start directly, using config/pipelines.yml as the Pipeline configuration and config/logstash.yml as the run configuration.

bin/logstash

  2. Specify ta_output.conf as the input/output configuration file; config/logstash.yml is still used as the run configuration.

bin/logstash -f /youpath/ta_output.conf

  3. Start in the background.

nohup bin/logstash -f /youpath/ta_output.conf > /dev/null 2>&1 &

For more startup options, see the official Logstash startup documentation.

# 3.3 Filebeat Configuration

# 3.3.1 Filebeat run Configuration

Filebeat reads the log files written by the server-side SDK. Filebeat's default configuration file is filebeat.yml. A reference configuration for config/filebeat.yml is as follows:

#======================= Filebeat inputs =========================
filebeat.shutdown_timeout: 5s
filebeat.inputs:
- type: log
  enabled: true
  paths:
     - /var/log/log.*
    #- c:\programdata\elasticsearch\logs\*
#------------------------- Logstash output ----------------------------
output.logstash:
  # Either a single Logstash process or Logstash processes on multiple servers can be listed here
  hosts: ["ip1:5044","ip2:5044"]
  loadbalance: false
  1. **shutdown_timeout**: How long Filebeat waits on shutdown for the publisher to finish sending events before it exits.
  2. **paths**: The logs to monitor. Patterns are currently expanded with the Go language's glob function; directories are not searched recursively.
  3. **hosts**: The addresses of the Logstash hosts to send to. With multiple hosts, loadbalance: false behaves like a primary/standby setup, and loadbalance: true distributes the load across the hosts.
  4. **loadbalance**: Do not set loadbalance: true if **user_set** user attributes are reported. With this setting, data is sent to all Logstash hosts in round-robin fashion, which is likely to disrupt the order of the data.

If you only import **track** events, you can run multiple Logstash processes and set loadbalance: true. The default Filebeat configuration is loadbalance: false.

For more information, see the official Filebeat documentation.
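The non-recursive behaviour of the paths patterns can be tried out locally. The sketch below uses Ruby's `Dir.glob` as a stand-in for Go's glob function (an assumption made for illustration; in both, a single `*` does not cross a directory separator). The helper `matched_names` is hypothetical.

```ruby
require 'tmpdir'
require 'fileutils'

# Returns the paths (relative to dir) matched by a glob pattern under dir.
def matched_names(dir, pattern)
  Dir.glob(File.join(dir, pattern)).map { |path| path.sub("#{dir}/", '') }.sort
end

Dir.mktmpdir do |dir|
  FileUtils.mkdir_p(File.join(dir, 'sub'))
  FileUtils.touch(File.join(dir, 'log.1'))
  FileUtils.touch(File.join(dir, 'sub', 'log.2'))

  puts matched_names(dir, 'log.*').inspect   # files directly under dir only
  puts matched_names(dir, '*/log.*').inspect # files exactly one level down
end
```

To pick up logs in subdirectories, each directory level therefore needs its own pattern, such as `/home/elk/sdk/*/log.*`.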

# 3.3.2 Filebeat start

Start Filebeat and check the output it prints:

 ./filebeat -e -c filebeat.yml

Start in the background:

nohup ./filebeat -c filebeat.yml > /dev/null 2>&1 &

# IV. Reference to Case Configuration

# 4.1 Filebeat Monitors Data Configuration in Different Log Formats

A single Filebeat process can monitor several log formats with a filebeat.yml like the one below; it uses about 10 MB of memory. Alternatively, you can run multiple Filebeat processes, one per log format. For details, please refer to the official website.

#=========================== Filebeat inputs =============================

filebeat.inputs:
- type: log
  enabled: true
  # Directory to monitor
  paths:
    - /home/elk/sdk/*/log.*
  # Tag the data for matching in Logstash; no filter processing is needed
  tags: ["sdklog"]
# Data split by special characters
- type: log
  enabled: true
  paths:
    - /home/txt/split.*
  # Tag the data for matching in Logstash; filter processing is required
  tags: ["split"]
# log4j data
- type: log
  enabled: true
  paths:
    - /home/web/logs/*.log
  # Filter processing is required
  tags: ["log4j"]
# nginx log data
- type: log
  enabled: true
  paths:
    - /home/web/logs/*.log
  tags: ["nginx"]

# 4.2 Logstash Configuration

Check the Logstash configuration file for errors:

 bin/logstash -f /home/elk/ta_output.conf --config.test_and_exit

Note: All of the scripts below are Ruby plugin scripts written in Ruby syntax. If you are more familiar with Java, you can use the custom parser in the Logbus directory instead.

# 4.2.1 Server-side SDK Log

ta_output.conf can be set as follows:

input {
    beats {
        port => "5044"
    }
}
# Use thinkingdata as output
output{
   thinkingdata {
        url => "url"
        appid =>"appid"
        # compress => 0
        # uuid => true
    }
}

# 4.2.2 Log4j Log

The log4j format can be set as follows, according to the business log situation:

 //Log format
 //[%d{yyyy-MM-dd HH:mm:ss.SSS}] Log time, matching the time format accepted by TA; yyyy-MM-dd HH:mm:ss can also be used
 //[%level{length=5}]    Log level: debug, info, warn, error
 //[%thread-%tid]    Current thread information
 //[%logger] Full class path that the current log message belongs to
 //[%X{hostName}]    Host name of the current node. Must be defined via MDC.
 //[%X{ip}]  IP of the current node. Must be defined via MDC.
 //[%X{userId}]  Unique ID of the logged-in user. It can be set to the account_id or to another value. In TA, #account_id and #distinct_id cannot both be empty. Other properties can be set according to business needs. Must be defined via MDC.
 //[%X{applicationName}] Current application name. Must be defined via MDC.
 //[%F,%L,%C,%M] %F: name of the file (class) the log message belongs to, %L: line number of the log message in that file, %C: full class name the log message belongs to, %M: name of the method the log message belongs to
 //[%m]  Log message
 //%ex   Exception information
 //%n    Line break
 <property name="patternLayout">[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{userId}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
</property>

ta_output.conf configuration

input {
    beats {
        port => "5044"
    }
}
filter {
if "log4j" in [tags]{
    #Other filter data processing can also be performed
   ruby {
     path => "/home/conf/log4j.rb"
    }
 }
}
# Use thinkingdata as output
output{
  thinkingdata {
        url => "url"
        appid =>"appid"
    }
}

/home/conf/log4j.rb script is as follows

require 'json'

# Parses correctly formatted log lines with a regular expression.
# It is recommended to write error logs to a separate file: they are long, span multiple lines, and are not useful for analysis.
def filter(event)
  _message = event.get('message') # message is each uploaded log line
  begin
    # The data handled here looks like this:
    # _message = "[2020-06-08 23:19:56.003] [INFO] [main-1] [cn.thinkingdata] [x] [123.123.123.123] [x] [x] [StartupInfoLogger.java,50,o)] ## ''"
    mess = /\[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] ## '(.*?)'/.match(_message)
    time = mess[1]
    level = mess[2]
    thread = mess[3]
    class_name_all = mess[4]
    event_name = mess[5]
    ip = mess[6]
    account_id = mess[7]
    application_name = mess[8]
    other_mess = mess[9]
    exp = mess[10]

    # Skip records without an event name or account ID
    if event_name.empty? || account_id.empty?
      return []
    end

    properties = {
      'level' => level,
      'thread' => thread,
      'application_name' => application_name,
      'class_name_all' => class_name_all,
      'other_mess' => other_mess,
      'exp' => exp
    }

    data = {
      '#ip' => ip,
      '#time' => time,
      '#account_id' => account_id,
      '#event_name' => event_name, # Required when #type is track; otherwise #event_name does not need to be uploaded
      '#type' => 'track',          # Can be read from the log if present; determines whether the record is user data or event data
      'properties' => properties
    }

    event.set('message', data.to_json)
    return [event]
  rescue
    # puts _message
    puts "Data does not match the expected format"
    return [] # Do not report
  end
end
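Before deploying the script, the regular expression can be checked on its own against the sample line from the comments. The sketch below reuses the same pattern; the constant name `LOG4J_PATTERN` is introduced here only for illustration.

```ruby
# Same pattern as in log4j.rb: nine bracketed fields followed by ## '<exception>'.
LOG4J_PATTERN = /\[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] ## '(.*?)'/

sample = "[2020-06-08 23:19:56.003] [INFO] [main-1] [cn.thinkingdata] [x] " \
         "[123.123.123.123] [x] [x] [StartupInfoLogger.java,50,o)] ## ''"

match = LOG4J_PATTERN.match(sample)
puts match[1] # time
puts match[2] # level
puts match[6] # ip
```

A line that does not follow the layout makes `match` return nil, which is why the script wraps the field access in a rescue block.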

# 4.2.3 Nginx Log

First define the Nginx log format. This example assumes the log is output in JSON format. ta_output.conf can be configured as follows:

input {
    beats {
        port => "5044"
    }
}
filter {
# The tag check is unnecessary if all data is in the same format
if "nginx" in [tags]{
   ruby {
     path => "/home/conf/nginx.rb"
    }
 }
}
# Use thinkingdata as output
output{
  thinkingdata {
        url => "url"
        appid =>"appid"
    }
}

/home/conf/nginx.rb script is as follows:

require 'date'
require 'json'

def filter(event)
  # The log lines handled here look like this:
  # {"accessip_list":"124.207.82.22","client_ip":"123.207.82.22","http_host":"203.195.163.239","@timestamp":"2020-06-03T19:47:42+08:00","method":"GET","url":"/urlpath","status":"304","http_referer":"-","body_bytes_sent":"0","request_time":"0.000","user_agent":"s","total_bytes_sent":"180","server_ip":"10.104.137.230"}
  logdata = event.get('message')
  # Parse the JSON-formatted log line
  log_info = JSON.parse(logdata)
  # Convert the request time to the server's local time
  time = DateTime.parse(log_info['@timestamp']).to_time.localtime.strftime('%Y-%m-%d %H:%M:%S')
  url = log_info['url']           # used as the event name
  account_id = log_info['user_agent']
  # Skip reporting when the event name or account ID is empty
  # (#account_id and #distinct_id cannot both be null)
  if url.empty? || url == "/" || account_id.empty?
    return []
  end
  properties = {
    "accessip_list" => log_info['accessip_list'],
    "http_host" => log_info['http_host'],
    "method" => log_info['method'],
    "url" => log_info['url'],
    "status" => log_info['status'].to_i,
    "http_referer" => log_info['http_referer'],
    "body_bytes_sent" => log_info['body_bytes_sent'],
    "request_time" => log_info['request_time'],
    "total_bytes_sent" => log_info['total_bytes_sent'],
    "server_ip" => log_info['server_ip']
  }
  data = {
    '#ip' => log_info['client_ip'], # Can be null
    '#time' => time,                # Cannot be null
    '#account_id' => account_id,    # #account_id and #distinct_id cannot both be null
    '#event_name' => url,           # Required when #type is track; otherwise #event_name does not need to be uploaded
    '#type' => 'track',             # Can be read from the log if present; determines whether the record is user data or event data
    'properties' => properties
  }
  event.set('message', data.to_json)
  return [event]
rescue StandardError
  # Do not report lines that cannot be parsed (invalid JSON or missing fields)
  return []
end
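The `localtime` call in the script converts the timestamp to the time zone of the server running Logstash, so the resulting `#time` depends on where the script runs. If a fixed offset is preferred, one possible approach is sketched below; the helper `format_in_offset` is hypothetical, and which time zone `#time` should use is something to confirm against TA's data rules.

```ruby
require 'time'

# Formats an ISO 8601 timestamp in a fixed UTC offset instead of the
# server's local time zone.
def format_in_offset(timestamp, offset)
  Time.iso8601(timestamp).getlocal(offset).strftime('%Y-%m-%d %H:%M:%S')
end

puts format_in_offset('2020-06-03T19:47:42+08:00', '+08:00')
puts format_in_offset('2020-06-03T19:47:42+08:00', '+09:00')
```

Using an explicit offset keeps the output identical across servers configured with different time zones.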

# 4.2.4 Other Logs

ta_output.conf configured as follows

input {
    beats {
        port => "5044"
    }
}
filter {
if "other" in [tags]{
    #Other filter data processing is possible
   ruby {
     path => "/home/conf/outher.rb"
    }
 }
}
# Use thinkingdata as output
output{
  thinkingdata {
        url => "url"
        appid =>"appid"
    }
}

/home/conf/outher.rb script is as follows:

require 'json'

# def register(params)
#   # params holds the values passed in through script_params in the Logstash configuration
#   # @message = params["xx"]
# end
def filter(event)
  # Process business data here. Instead of chaining grok and other filters, read the raw data directly from message
  begin
    _message = event.get('message') # message is each uploaded log line
    # The logs processed here look like the following:
    # 2020-06-01 22:20:11.222,123455,123.123.123.123,login,nihaoaaaaa,400
    time, account_id, ip, event_name, msg, intdata = _message.split(/,/)
    # Alternatively, parse the data with a regular expression or as JSON in Ruby before reporting it
    # mess = /regular/.match(_message)
    # time = mess[1]
    # level = mess[2]
    # thread = mess[3]
    # class_name_all = mess[4]
    # event_name = mess[5]
    # account_id = mess[6]
    # api = mess[7]

    properties = {
      'msg' => msg,
      'int_data' => intdata.to_i # to_i converts to an integer, to_f to a float, to_s to a string (the default)
    }
    data = {
      '#time' => time,
      '#account_id' => account_id,
      '#event_name' => event_name,
      '#ip' => ip,
      '#type' => 'track',
      'properties' => properties
    }
    event.set('message', data.to_json)

    return [event]
  rescue
    # To skip a record that is unwanted or invalid (for example, #account_id and #distinct_id are both empty), return [] directly
    return []
  end
end
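A filter script like the one above can be exercised outside Logstash with a small stand-in for the event object. The following is a minimal sketch: `FakeEvent` is hypothetical and models only the `get` and `set` calls used here, and the filter is a condensed copy of the comma-separated example with an explicit empty-field check added.

```ruby
require 'json'

# Hypothetical stand-in for the Logstash event: only get and set are modelled.
class FakeEvent
  def initialize(fields)
    @fields = fields
  end

  def get(key)
    @fields[key]
  end

  def set(key, value)
    @fields[key] = value
  end
end

# Condensed version of the comma-separated filter, with an added check
# that drops records lacking an account ID or event name.
def filter(event)
  time, account_id, ip, event_name, msg, intdata = event.get('message').split(',')
  return [] if account_id.to_s.empty? || event_name.to_s.empty?

  data = {
    '#time' => time,
    '#account_id' => account_id,
    '#event_name' => event_name,
    '#ip' => ip,
    '#type' => 'track',
    'properties' => { 'msg' => msg, 'int_data' => intdata.to_i }
  }
  event.set('message', data.to_json)
  [event]
rescue StandardError
  [] # drop anything that cannot be processed
end

event = FakeEvent.new({ 'message' => '2020-06-01 22:20:11.222,123455,123.123.123.123,login,nihaoaaaaa,400' })
result = filter(event)
puts result.first.get('message')
```

Running the script this way makes it easier to try sample lines before pointing the ruby filter's path option at the file.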