# Filebeat + Logstash User Guide
This section introduces how to use the data transfer tool Filebeat + Logstash:
**Before starting the integration, please read the data rules first.** Once you are familiar with TA's data format and data rules, read this guide to complete the integration.
**Data uploaded via Filebeat + Logstash must follow TA's data format.**
**Note: Logstash throughput is low. If you need to import a large amount of historical data, it is recommended to use the DataX engine or the LogBus tool instead.**
# I. Introduction to Filebeat + Logstash
The Filebeat + Logstash tool is mainly used to import log data into the TA background in real time. It monitors the file stream under the server log directory and sends new data to the TA background in real time whenever any log file in the directory is updated.
Logstash is an open source server-side data processing pipeline capable of simultaneously capturing data from multiple sources, transforming it, and then sending it to your favorite "repository". See the Logstash official website introduction.
Filebeat is a lightweight log data collector for local files. It can monitor log directories or specific log files (tail files), and provides a lightweight way to forward and aggregate logs and files. See the Filebeat official website introduction.
The acquisition flow chart based on Filebeat + Logstash is shown in the following figure:
# II. Filebeat + Logstash Download and Installation
Note: Logstash 6.x and above requires a JDK environment on the server.
# 2.2 Logstash Download and Install
Refer to the official Logstash installation documentation and choose the appropriate download method.
# 2.3 Logstash-output-thinkingdata Plugin
**Latest version**: 1.0.0
**Updated**: 2020-06-09
# 2.3.1 Install and uninstall the logstash-output-thinkingdata plugin
This plugin checks whether each event is valid JSON data, then packages the data and sends it to TA.
Execute under the logstash directory:
bin/logstash-plugin install logstash-output-thinkingdata
The installation takes a little while; please wait patiently. After the installation succeeds, execute:
bin/logstash-plugin list
If **logstash-output-thinkingdata** is in the list, the installation was successful.
Other commands are shown below:
To upgrade the plugin, run:
bin/logstash-plugin update logstash-output-thinkingdata
To uninstall the plugin, run:
bin/logstash-plugin uninstall logstash-output-thinkingdata
# 2.3.2 Change Log
# v1.0.0 2020/06/09
- Receive messages passed by events in Logstash and send them to TA.
# 2.4 Filebeat Download and Install
Refer to the official Filebeat installation documentation and choose the appropriate download method.
# III. Filebeat + Logstash Instructions
# 3.1 Data Preparation
First, convert the data to be transferred into TA's data format via ETL, and write it to local files or send it to a Kafka cluster. If the data is written to Kafka or to local files by a server-side SDK (for example the Java SDK), it is already in the correct format and does not need to be converted.
Determine the directory where the uploaded data files are stored, or the Kafka address and topic, and configure Filebeat + Logstash accordingly. Filebeat + Logstash monitors file changes in the directory (picking up new files and tailing existing ones), or subscribes to the data in Kafka.
Do not rename the data logs that are stored in the monitored directory and have already been uploaded. Renaming a log is equivalent to creating a new file, and Filebeat may upload these files again, causing data duplication.
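For reference, each record in TA format is one JSON object per line in the log file. The following is a minimal plain-Ruby sketch (with hypothetical field values and file path) of what a single track event written by your ETL might look like; see the data format documentation for the authoritative definition.
require 'json'

# A hypothetical TA-format track event; #account_id and #distinct_id must not both be empty
event = {
  '#time'       => '2020-06-01 22:20:11.222',
  '#account_id' => '123455',
  '#type'       => 'track',
  '#event_name' => 'login',
  'properties'  => { 'channel' => 'organic', 'level' => 10 } # example property names; follow TA's data rules
}

# Append the event as a single JSON line to a file matched by the monitored pattern (e.g. /var/log/log.*)
File.open('/var/log/log.demo', 'a') { |f| f.puts(event.to_json) }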
# 3.2 Logstash Configuration
# 3.2.1 Logstash Pipeline Configuration
Logstash supports running multiple Pipelines at the same time. Pipelines do not affect each other, and each has its own independent input and output configuration. The Pipeline configuration file is located at config/pipelines.yml. If you are already using Logstash for other log collection work, you can add a new Pipeline to the existing Logstash instance to collect TA's log data and send it to TA.
The configuration of the newly added Pipeline is as follows:
# Pipeline configuration for ThinkingData
- pipeline.id: thinkingdata-output
  # Set workers to 1 if user properties are uploaded; if only events are uploaded,
  # it can be any value less than or equal to the number of local CPU cores
  pipeline.workers: 1
  # Type of buffer queue to use
  queue.type: persisted
  # Dedicated input and output configuration for this pipeline
  path.config: "/home/elk/conf/ta_output.conf"
For more Pipeline configuration options, see the official Pipeline documentation.
# 3.2.2 Logstash Input and Output Configuration
Reference example of ta_output.conf:
**Note: The scenario here is the Logstash input and output configuration for log files generated by the server-side SDK.**
# Use beats as input
input {
beats {
port => "5044"
}
}
# If the data is not generated by a server-side SDK and is not already in TA format, a filter is needed to process the raw data; the Ruby filter plugin is recommended. See the examples in Section IV.
#filter {
# if "log4j" in [tags] {
# ruby {
# path => "/home/elk/conf/rb/log4j_filter.rb"
# }
# }
#}
# Use thinkingdata as output
output{
thinkingdata {
url => "http://Data Acquisition Address/logbus"
appid =>"Your AppID"
}
}
Parameters of the thinkingdata output:
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
url | string | true | None | TA data receiving address |
appid | string | true | None | Project APPID |
flush_interval_sec | number | false | 2 | Interval in seconds that triggers a flush |
flush_batch_size | number | false | 500 | Number of JSON records that triggers a flush |
compress | number | false | 1 | Data compression: 0 means no compression (suitable for intranet transfers); 1 means gzip compression (default) |
uuid | boolean | false | false | Whether to enable the UUID switch, used to deduplicate data that may be re-sent due to network fluctuations within a short interval |
is_filebeat_status_record | boolean | false | true | Whether to record the Filebeat log monitoring status, such as offset and file name |
# 3.2.3 Logstash Run Configuration
Logstash uses **config/logstash.yml** as the running configuration by default.
pipeline.workers: 1
queue.type: persisted
queue.drain: true
Suggestions
- When reporting user properties (user_set), set pipeline.workers to 1. When the workers value is greater than 1, the order in which data is processed may change; if only track events are reported, it can be set greater than 1.
- To ensure data is not lost when the program terminates unexpectedly, set queue.type: persisted. This is the type of buffer queue used by Logstash, and it allows data remaining in the buffer queue to continue to be sent after Logstash restarts.
More information about persistence can be found in the official persistent-queues documentation.
- Setting queue.drain to true causes Logstash to send all data in the buffer queues before it exits normally.
More details can be found in the official logstash.yml documentation.
# 3.2.4 Logstash Start
Run the following commands in the Logstash installation directory.
- Start directly, using config/pipelines.yml as both the Pipeline configuration and the running configuration:
bin/logstash
- Start with ta_output.conf specified as the input/output configuration file; config/logstash.yml will be used as the running configuration:
bin/logstash -f /youpath/ta_output.conf
- Start in the background:
nohup bin/logstash -f /youpath/ta_output.conf > /dev/null 2>&1 &
For more startup options, see the official Logstash startup documentation.
# 3.3 Filebeat Configuration
# 3.3.1 Filebeat Run Configuration
Filebeat reads the log files generated by the server-side SDK. Filebeat's default configuration file is filebeat.yml. A reference filebeat.yml configuration is as follows:
#======================= Filebeat inputs =========================
filebeat.shutdown_timeout: 5s
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/log.*
    #- c:\programdata\elasticsearch\logs\*
#------------------------- Logstash output ----------------------------
output.logstash:
  # You can specify a single Logstash process, or Logstash processes on multiple servers
  hosts: ["ip1:5044", "ip2:5044"]
  loadbalance: false
- **shutdown_timeout**: how long Filebeat waits for the publisher to finish sending events before shutting down.
- **paths**: the logs to monitor. Patterns are currently resolved with Go's glob function; configured directories are not processed recursively.
- **hosts**: the Logstash hosts to send data to. When loadbalance is false, multiple hosts behave like an active/standby pair; when true, data is load-balanced across them.
- **loadbalance**: do not set loadbalance: true if user properties (**user_set**) are reported. Load balancing sends data to all Logstash hosts in round-robin fashion, which is likely to disrupt the order of the data. If you only import **track** events, you can run multiple Logstash processes and set loadbalance: true. Filebeat's default is loadbalance: false.
For more information, see the official Filebeat documentation.
# 3.3.2 Filebeat Start
Start Filebeat and check the output information:
./filebeat -e -c filebeat.yml
Start in the background:
nohup ./filebeat -c filebeat.yml > /dev/null 2>&1 &
# IV. Reference to Case Configuration
# 4.1 Filebeat Monitors Data Configuration in Different Log Formats
The filebeat.yml of a single Filebeat process can be configured as follows (memory usage is roughly 10 MB), or different log formats can be monitored by multiple Filebeat processes. For details, please refer to the official website.
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
  enabled: true
  # Monitored directories
  paths:
    - /home/elk/sdk/*/log.*
  # Tag the data so Logstash can match it; no filter processing is needed
  tags: ["sdklog"]
# Data split by special characters
- type: log
  enabled: true
  paths:
    - /home/txt/split.*
  # Tag the data so Logstash can match it; filter processing is required
  tags: ["split"]
# log4j log data
- type: log
  enabled: true
  paths:
    - /home/web/logs/*.log
  # Filter processing is required
  tags: ["log4j"]
# nginx log data
- type: log
  enabled: true
  paths:
    - /home/web/logs/*.log
  tags: ["nginx"]
# 4.2 Logstash Configuration
Check whether the Logstash configuration file has errors:
bin/logstash -f /home/elk/ta_output.conf --config.test_and_exit
Note: All of the following filter scripts are written in Ruby. If you are more familiar with Java, you can use the custom parser in the LogBus directory instead.
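As a reminder of the contract these scripts follow, here is a bare-bones sketch of a Logstash Ruby filter script (illustrative placeholder values only; the real examples follow below). filter(event) is called once per event and must return an array of events, so returning [] drops the record and returning [event] keeps it.
require 'json'

# Skeleton only: filter(event) is called for each event and must return an array of events
def filter(event)
  raw = event.get('message')                # the original log line
  data = {                                  # placeholder values for illustration only
    '#time'       => '2020-06-01 00:00:00',
    '#account_id' => 'demo_account',        # account_id and distinct_id must not both be empty
    '#type'       => 'track',
    '#event_name' => 'example_event',
    'properties'  => { 'raw_length' => raw.to_s.length }
  }
  event.set('message', data.to_json)        # the rewritten message is what gets sent, as in the examples below
  return [event]                            # return [] instead to drop the record
end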
# 4.2.1 Server-side SDK Log
ta_output.conf can be configured as follows:
input {
beats {
port => "5044"
}
}
# Use thinkingdata as output
output{
thinkingdata {
url => "url"
appid =>"appid"
# compress => 0
# uuid => true
}
}
# 4.2.2 Log4j Log
The log4j pattern can be configured as follows, adjusted to the actual business logs:
//Log format
//[%d{yyyy-MM-dd HH:mm:ss.SSS}] Log time, matching TA's #time format; yyyy-MM-dd HH:mm:ss is also acceptable
//[%level{length=5}] Log level: debug, info, warn, error
//[%thread-%tid] Current thread information
//[%logger] Full path of the class the log entry belongs to
//[%X{hostName}] Current node host name. Requires definition via MDC.
//[%X{ip}] Current node IP. Requires definition via MDC.
//[%X{userId}] Unique ID of the logged-in user; it can be used as account_id or set to another value. In TA, account_id and distinct_id cannot both be empty. Other properties can be set according to business needs. Requires definition via MDC.
//[%X{applicationName}] Current application name. Requires definition via MDC.
//[%F,%L,%C,%M] %F: file (class) name the log belongs to; %L: line number in that file; %C: full class name; %M: method name
//[%m] Log message
//%ex Exception information
//%n Line break
<property name="patternLayout">[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{userId}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
</property>
ta_output.conf configuration
input {
beats {
port => "5044"
}
}
filter {
if "log4j" in [tags]{
#Other filter data processing can also be performed
ruby {
path => "/home/conf/log4j.rb"
}
}
}
# Use thinkingdata as output
output{
thinkingdata {
url => "url"
appid =>"appid"
}
}
The /home/conf/log4j.rb script is as follows:
require 'json'

# Use a regular expression here to extract correctly formatted records. It is recommended
# to write error logs to a separate file: error stack traces are long, may span multiple
# lines, and are not analyzed in this scenario.
def filter(event)
  _message = event.get('message') # message is each uploaded log line
  begin
    # The data here looks like:
    # _message = "[2020-06-08 23:19:56.003] [INFO] [main-1] [cn.thinkingdata] [x] [123.123.123.123] [x] [x] [StartupInfoLogger.java,50,o)] ## ''"
    mess = /\[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] ## '(.*?)'/.match(_message)
    time = mess[1]
    level = mess[2]
    thread = mess[3]
    class_name_all = mess[4]
    event_name = mess[5]
    ip = mess[6]
    account_id = mess[7]
    application_name = mess[8]
    other_mess = mess[9]
    exp = mess[10]
    if event_name.empty? || account_id.empty?
      return []
    end
    properties = {
      'level' => level,
      'thread' => thread,
      'application_name' => application_name,
      'class_name_all' => class_name_all,
      'other_mess' => other_mess,
      'exp' => exp
    }
    data = {
      '#ip' => ip,
      '#time' => time,
      '#account_id' => account_id,
      '#event_name' => event_name, # Required when #type is track; otherwise #event_name does not need to be uploaded
      '#type' => 'track',          # Can also be read from the log, if present, to decide whether the record is an event or a user property report
      'properties' => properties
    }
    event.set('message', data.to_json)
    return [event]
  rescue
    # puts _message
    puts 'Data does not match the expected format'
    return [] # Do not report
  end
end
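Before wiring the script into Logstash, it can be convenient to sanity-check the regular expression against a sample line in plain Ruby. The following standalone sketch reuses the pattern and sample line copied from the script above:
sample = "[2020-06-08 23:19:56.003] [INFO] [main-1] [cn.thinkingdata] [x] [123.123.123.123] [x] [x] [StartupInfoLogger.java,50,o)] ## ''"

mess = /\[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] ## '(.*?)'/.match(sample)

if mess
  puts "time:       #{mess[1]}"   # 2020-06-08 23:19:56.003
  puts "level:      #{mess[2]}"   # INFO
  puts "account_id: #{mess[7]}"   # x
else
  puts 'Line does not match the expected layout'
end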
# 4.2.3 Nginx Log
First define the format of the Nginx log; here it is assumed to be set to JSON format. ta_output.conf is configured as follows:
input {
beats {
port => "5044"
}
}
filter {
# If all data is in the same format, there is no need to check tags
if "nginx" in [tags]{
ruby {
path => "/home/conf/nginx.rb"
}
}
}
# Use thinkingdata as output
output{
thinkingdata {
url => "url"
appid =>"appid"
}
}
The /home/conf/nginx.rb script is as follows:
require 'date'
require 'json'

def filter(event)
  # Extract log information similar to the following:
  # {"accessip_list":"124.207.82.22","client_ip":"123.207.82.22","http_host":"203.195.163.239","@timestamp":"2020-06-03T19:47:42+08:00","method":"GET","url":"/urlpath","status":"304","http_referer":"-","body_bytes_sent":"0","request_time":"0.000","user_agent":"s","total_bytes_sent":"180","server_ip":"10.104.137.230"}
  logdata = event.get('message')
  # Parse the JSON-formatted log and convert the timestamp to TA's time format
  logInfoJson = JSON.parse logdata
  time = DateTime.parse(logInfoJson['@timestamp']).to_time.localtime.strftime('%Y-%m-%d %H:%M:%S')
  url = logInfoJson['url']
  account_id = logInfoJson['user_agent']
  # Skip reporting when the event name (url) is empty, or when #account_id and #distinct_id are both empty
  if url.empty? || url == "/" || account_id.empty?
    return []
  end
  properties = {
    "accessip_list" => logInfoJson['accessip_list'],
    "http_host" => logInfoJson['http_host'],
    "method" => logInfoJson['method'],
    "url" => logInfoJson['url'],
    "status" => logInfoJson['status'].to_i,
    "http_referer" => logInfoJson['http_referer'],
    "body_bytes_sent" => logInfoJson['body_bytes_sent'],
    "request_time" => logInfoJson['request_time'],
    "total_bytes_sent" => logInfoJson['total_bytes_sent'],
    "server_ip" => logInfoJson['server_ip'],
  }
  data = {
    '#ip' => logInfoJson['client_ip'], # May be empty
    '#time' => time,                   # Must not be empty
    '#account_id' => account_id,       # account_id and distinct_id must not both be empty
    '#event_name' => url,              # Required when #type is track; otherwise #event_name does not need to be uploaded
    '#type' => 'track',                # Can also be read from the log, if present, to decide whether the record is an event or a user property report
    'properties' => properties
  }
  event.set('message', data.to_json)
  return [event]
end
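The JSON branch can be checked the same way outside Logstash. This standalone sketch parses the sample access-log line from the comment above and prints the values the script would map into the TA event:
require 'json'
require 'date'

sample = '{"accessip_list":"124.207.82.22","client_ip":"123.207.82.22","http_host":"203.195.163.239","@timestamp":"2020-06-03T19:47:42+08:00","method":"GET","url":"/urlpath","status":"304","http_referer":"-","body_bytes_sent":"0","request_time":"0.000","user_agent":"s","total_bytes_sent":"180","server_ip":"10.104.137.230"}'

log = JSON.parse(sample)
time = DateTime.parse(log['@timestamp']).to_time.localtime.strftime('%Y-%m-%d %H:%M:%S')

puts "#time:       #{time}"                 # e.g. 2020-06-03 19:47:42 on a UTC+8 host
puts "#event_name: #{log['url']}"           # /urlpath
puts "status:      #{log['status'].to_i}"   # 304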
# 4.2.4 Other Logs
ta_output.conf is configured as follows:
input {
beats {
port => "5044"
}
}
filter {
if "other" in [tags]{
#Other filter data processing is possible
ruby {
path => "/home/conf/outher.rb"
}
}
}
# Use thinkingdata as output
output{
thinkingdata {
url => "url"
appid =>"appid"
}
}
The /home/conf/outher.rb script is as follows:
require 'json'

# def register(params)
#   # Parameters here are passed in from the Logstash configuration file via script_params
#   # @message = params["xx"]
# end
def filter(event)
  # Process the business data here: no grok chain is needed, the raw data is taken directly from message
  begin
    _message = event.get('message') # message is each uploaded log line
    # The logs processed here look like the following:
    # 2020-06-01 22:20:11.222,123455,123.123.123.123,login,nihaoaaaaa,400
    time, account_id, ip, event_name, msg, intdata = _message.split(/,/)
    # Alternatively, extract fields with a regular expression or parse JSON with Ruby, then process and report the data here
    # mess = /regular/.match(_message)
    # time = mess[1]
    # level = mess[2]
    # thread = mess[3]
    # class_name_all = mess[4]
    # event_name = mess[5]
    # account_id = mess[6]
    # api = mess[7]
    properties = {
      'msg' => msg,
      'int_data' => intdata.to_i # to_i converts to integer, to_f to float, to_s to string (default)
    }
    data = {
      '#time' => time,
      '#account_id' => account_id,
      '#event_name' => event_name,
      '#ip' => ip,
      '#type' => 'track',
      'properties' => properties
    }
    event.set('message', data.to_json)
    return [event]
  rescue
    # To drop a record (for example on a parse error, or when account_id and distinct_id are both empty), return [] directly
    return []
  end
end
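As with the other scripts, the split logic can be verified in plain Ruby against the sample line from the comment above (standalone sketch):
sample = '2020-06-01 22:20:11.222,123455,123.123.123.123,login,nihaoaaaaa,400'

time, account_id, ip, event_name, msg, intdata = sample.split(/,/)

puts "#time:       #{time}"          # 2020-06-01 22:20:11.222
puts "#account_id: #{account_id}"    # 123455
puts "#event_name: #{event_name}"    # login
puts "int_data:    #{intdata.to_i}"  # 400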