# Filebeat+Logstash 使用ガイド
このセクションでは、主にデータ転送ツール Filebeat+Logstash の使用方法について説明します
ドッキングを開始する前に、データルールデータルール、TA のデータ形式とデータルールに精通した後、このガイドを読んでドッキングする必要があります。
Filebeat+Logstash アップロードされたデータは、TA のデータ形式に従う
注: Logstash のスループットは低く、大量の履歴データをインポートする場合はDataX エンジンまたはLogbus ツール
# I. Filebeat+Logstash プロフィール
Filebeat+Logstash ツールは、主にログデータをリアルタイムで TA バックグラウンドにインポートし、サーバーログディレクトリの下のテキストストリームを監視し、ディレクトリの下の任意のログテキストに新しいデータが生成されたときにリアルタイムで TA バックグラウンドに送信するために使用されます。
Logstash はオープンソースのサーバー側データ処理パイプラインで、複数のソースから同時にデータを収集し、変換してお気に入りの「リポジトリ」に送信できます。Logstash 公式サイト (opens new window)
Filebeat は、ログディレクトリまたは特定のログファイルを監視するローカルドキュメントのログデータコレクターです。Filebeat は、ログとドキュメントを転送および集計するための軽量な方法を提供します。Filebeat 公式サイト (opens new window)
Filebeat+Logstash ベースの取得フローチャートを以下に示します
# II。Filebeat+Logstash のダウンロードとインストール
注意: Logstash-6. x 以上のバージョンでは、サーバーに JDK 環境
# 2.2 Logstash のダウンロードとインストール
Logstashの公式インストールドキュメント (opens new window)を参照してください
# 2.3 logstash-output-thinkingdata プラグイン
最新バージョン: 1.0.0
更新時間は: 2020-06-09
# 2.3.1 logstash-output-thinkingdata プラグインのインストールとアンインストール
このプラグインはデータが json データかどうかをチェックし、パッケージデータを TA に
logstash ディレクトリで実行:
bin/logstash-plugin install logstash-output-thinkingdata
インストールには少し時間がかかりますが、インストールが成功した後に実行するのを待ってください
bin/logstash-plugin list
リストに logstash-output-thinkingdata がある場合は、インストールが成功したことを表します。
その他のコマンドは次のとおりです。
アップグレードプラグインを実行できます:
bin/logstash-plugin update logstash-output-thinkingdata
アンインストールプラグインを実行できます:
bin/logstash-plugin uninstall logstash-output-thinkingdata
# 2.3.2 変更ログ
# v1.0.0 2020/06/09
- 受信 Logstash で event 渡した message を TA に送信
# 2.4 Filebeat のダウンロードとインストール
参照Filebeat 公式インストールドキュメント (opens new window)して、ダウンロード方法を選択してください
# III。Filebeat+Logstash 使用説明書
# 3.1 データ準備
1.まず転送するデータを ETL して TA のデータ形式、ローカルに書き込むか Kafka クラスタに転送するが、Java などのサービス側 SDK の Kafka やローカルテキストを書き込む consumer を使っている場合、データはすでに正しい形式であり、変換する必要はない。
2.アップロードされたデータのディレクトリ、または Kafka のアドレスと topic を特定し、Filebeat+Logstash の構成を設定すると、Filebeat+Logstash はディレクトリ内の変更を監視します(新規または既存のテキストを監視します)、または Kafka のデータを購読します。
3.監視ディレクトリに保存され、アップロードされたデータログの名前を直接変更しないでください。名前変更ログは新しいテキストに相当し、Filebeat はこれらのテキストを再アップロードしてデータの重複を引き起こす可能性があります。
# 3.2 Logstash の設定
# 3.2.1 Logstash パイプライン構成
Logstash は複数のパイプラインを同時に実行することをサポートしており、各パイプライン間に影響を与えず、それぞれ独立した入出力構成を持っており、パイプラインの構成テキストは config/pipelines.yml にある。現在 Logstash を使用して他のログ収集作業を行っている場合は、元の Logstash に Pipeline を追加して TA のログデータを収集し、TA に送信
新しいパイプラインの構成は次のとおりです。
# thinkingdata 的管道配置
- pipeline.id: thinkingdata-output
# 如果上传用户属性的核心数设置为1,如果只是上传事件属性可以设置小于等于本机cpu数
pipeline.workers: 1
# 使用的缓冲队列类型
queue.type: persisted
# 使用不同的输入输出配置
path.config: "/home/elk/conf/ta_output.conf"
より多くのパイプライン設定公式サイト:Pipeline (opens new window)
# 3.2.2 Logstash 入出力構成
ta_output.conf 参考例:
注:ここでシナリオは、SDK によって生成されたデータのテキスト、Logstash の出力入力構成
# 使用 beats 作为输入
input {
beats {
port => "5044"
}
}
# 如果不是服务端SDK的生成或者是符合TA格式的数据,需要filter过滤元数据,这里建议是ruby插件过滤,第四模块有案例
#filter {
# if "log4j" in [tags] {
# ruby {
# path => "/home/elk/conf/rb/log4j_filter.rb"
# }
# }
#}
# 使用 thinkingdata 作为输出
output{
thinkingdata {
url => "http://数据采集地址/logbus"
appid =>"您的AppID"
}
}
thinkingdata パラメータ説明:
パラメータ名 | タイプ | 必要です | デフォルト値 | 説明 |
---|---|---|---|---|
URL | 紐 | 本当 | なし | TAデータ受信アドレス |
appid | 紐 | 本当 | なし | プロジェクトのAPPID |
フラッシュインターバルセック | 番号 | 偽り | 2 | トリガーflush間隔、単位:秒 |
フラッシュ_バッチ_サイズ | 番号 | 偽り | 500 | flushをトリガーするjsonデータ、単位:バー |
圧縮 | 番号 | 偽り | 1 | 圧縮データ、0は非圧縮データを表し、イントラネットで設定できる1はgzip圧縮、デフォルトgzip圧縮 |
uuid | ブーリアン | 偽り | 偽り | UUIDスイッチをオンにして、短時間間隔のネットワーク変動が発生する可能性のある重複除去 |
is_filebete_状態_レコード | ブーリアン | 偽り | 本当 | Filebeatをオンにするかどうかoffset、ファイル名など |
# 3.2.3 Logstash 実行構成
Logstash はデフォルトでconfig/logstash.yml実行構成として使用します。
pipeline.workers: 1
queue.type: persisted
queue.drain: true
推奨事項
- user_set ユーザープロパティを報告するときは、構成 pipeline.workers の値を 1 に変更してください。workers の値が 1 より大きいと、データを処理する順序が変化する。track イベントは 1 より大きく設定できます。
- プログラムの予期しない終了によってデータの転送が失われないようにするには、設定してください queue.type: persisted、Logstash が使用するバッファーキューの種類を表しますので、Logstash を再起動した後もバッファーキュー内のデータを送信し続けるように構成します。
より多くのデータ持続性関連は公式サイトをご覧ください persistent-queues (opens new window)
- 設定 queue.drain の値を true に設定すると、Logstash は正常に終了する前にすべてのバッファキュー内のすべてのデータを送信します。
詳細は公式サイトをご覧ください logstash.yml (opens new window)
# 3.2.4 Logstash の起動
Logstash インストールディレクトリの下 1.パイプライン構成および実行構成として config/pipelines.yml を使用して直接起動
bin/logstash
2.指定された ta_output.conf は、実行構成として config/logstash.yml を使用して、入力および出力構成ブックとして起動します
bin/logstash -f /youpath/ta_output.conf
3.バックグラウンド起動
nohup bin/logstash -f /youpath/ta_output.conf > /dev/null 2>&1 &
より多くの起動は、Logstash公式起動ドキュメント (opens new window)
# 3.3 Filebeat の構成
# 3.3.1 Filebeat の実行構成
Filebeat はバックエンド SDK ログブックを読み込みます。Filebeat のデフォルトの設定テキストは次のとおりです filebeat.yml。config/filebeat.yml 参照構成は次のとおりです。
#======================= Filebeat inputs =========================
filebeat.shutdown_timeout: 5s
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/log.*
#- c:\programdata\elasticsearch\logs\*
#------------------------- Logstash output ----------------------------
output.logstash:
# 可填写一个服务器的进程,也可以填写多个服务器上的Logstash进程
hosts: ["ip1:5044","ip2:5044"]
loadbalance: false
- shutdown_timeout (opens new window): Filebeat は、クローズする前に、パブリッシャーがイベントの送信を完了するのを待っています。
- paths:監視するログを指定します。現在、Go 言語の glob 関数に従って処理されています。
- hosts:送信アドレスは複数の Logstash hosts で、loadbalance が false の場合はプライマリ機能と同様、true は負荷分散を表し
- loadbalance: usersetuser_setuser_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user
をインポートするだけtrackイベントをインポートするだけの場合、複数の Logstash プロセスを設定し、loadbalance: true を設定できます。Filebeat のデフォルト設定は loadbalance: false です
Filebeat 資料が閲覧可能:Filebeat 公式ドキュメント (opens new window)
# 3.3.2 Filebeat の起動
filebeat 起動後、関連する出力情報を表示:
./filebeat -e -c filebeat.yml
バックグラウンド起動
nohup ./filebeat -c filebeat.yml > /dev/null 2>&1 &
# 四、ケース配置参考
# 4.1 filebeat は異なるログフォーマットのデータ構成を監視
個々の Filebeat の filebeat.xml は以下のように設定でき、実行メモリは約 10 MB で、複数の filebeat プロセスから異なるログ形式を監視することもでき、具体的には公式サイトを参照
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
enabled: true
#监控目录
paths:
- /home/elk/sdk/*/log.*
#给数据打tags,Logstash进行匹配,不用做filter处理
tags: ["sdklog"]
#按特殊字符切分的数据
- type: log
enable: true
paths: /home/txt/split.*
#给数据打tags,Logstash进行匹配,需要filter处理
tags: ["split"]
# log4j接收的数据
- type: log
enable: true
paths: /home/web/logs/*.log
# 需要filter处理
tags: ["log4j"]
# log4j接收的数据
# nginx日志数据
- type: log
enable: true
paths: /home/web/logs/*.log
tags: ["nginx"]
# 4.2 Logstash 構成
Logstash の構成ブックが間違っているかどうか
bin/logstash -f /home/elk/ta_output.conf --config.test_and_exit
注意:次のスクリプトはすべて ruby プラグインが ruby 構文に基づいて書かれているので、java に詳しい場合は**logbus**ディレクトリの下のカスタムパーサ
# 4.2.1 サービス側 SDK ログ
ta_output.conf は以下のように設定できる
input {
beats {
port => "5044"
}
}
# 使用 thinkingdata 作为输出
output{
thinkingdata {
url => "url"
appid =>"appid"
# compress => 0
# uuid => true
}
}
# 4.2.2 log4j ログ
log4j 形式は次のように設定できます。
//日志格式
//[%d{yyyy-MM-dd HH:mm:ss.SSS}] 日志符合TA的输入时间,也可以是yyyy-MM-dd HH:mm:ss
//[%level{length=5}] 日志级别,debug、info、warn、error
//[%thread-%tid] 当前线程信息
//[%logger] 当前日志信息所属类全路径
//[%X{hostName}] 当前节点主机名。需要通过MDC来自定义。
//[%X{ip}] 当前节点ip。需要通过MDC来自定义。
//[%X{userId}] 用户登录的唯一ID,可以设置account_id,也可以设定为其他值,TA中要求account_id 和distinct_id不能同时为空,可以设置其他的属性,看业务设置。需要通过MDC来自定义。
//[%X{applicationName}] 当前应用程序名。需要通过MDC来自定义。
//[%F,%L,%C,%M] %F:当前日志信息所属的文件(类)名,%L:日志信息在所属文件中的行号,%C:当前日志所属文件的全类名,%M:当前日志所属的方法名
//[%m] 日志详情
//%ex 异常信息
//%n 换行
<property name="patternLayout">[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{userId}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
</property>
ta_output.conf 構成
input {
beats {
port => "5044"
}
}
filter {
if "log4j" in [tags]{
#也可以进行其他filter数据处理
ruby {
path => "/home/conf/log4j.rb"
}
}
}
# 使用 thinkingdata 作为输出
output{
thinkingdata {
url => "url"
appid =>"appid"
}
}
/home/conf/log4j.rb スクリプトは以下の通り
# 这里通过参数event可以获取到所有input中的属性
def filter(event)
_message = event.get('message') #message 是你的上传的每条日志
begin
#这里是正则出正确数据的格式,如果是error日志建议单独放在一个文件,错误日志过长没有分析场景,且跨行了
#这里的数据类似这样的格式 _message ="[2020-06-08 23:19:56.003] [INFO] [main-1] [cn.thinkingdata] [x] [123.123.123.123] [x] [x] [StartupInfoLogger.java,50,o)] ## ''"
mess = /\[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] ## '(.*?)'/.match(_message)
time = mess[1]
level = mess[2]
thread = mess[3]
class_name_all = mess[4]
event_name = mess[5]
ip = mess[6]
account_id = mess[7]
application_name = mess[8]
other_mess = mess[9]
exp = mess[10]
if event_name.empty? || account_id.empty?
return []
end
properties = {
'level' => level,
'thread' => thread,
'application_name' => application_name,
'class_name_all' => class_name_all,
'other_mess' => other_mess,
'exp' => exp
}
data = {
'#ip' => ip,
'#time' => time,
'#account_id'=>account_id,
'#event_name'=>event_name, #如果是type是track的时候可以用,如果没有#event_name 不用上传
'#type' =>'track', #可以在文件中获取,如果文件中有的话,确认上报数据是否为用户属性或者事件属性
'properties' => properties
}
event.set('message',data.to_json)
return [event]
rescue
# puts _message
puts "数据不符合正则格式"
return [] #不进行上报
end
end
# 4.2.3 Nginx ログ
まずは Nginx ログのフォーマットを定義し、json 形式に設定すると
input {
beats {
port => "5044"
}
}
filter {
#如果是同样的格式数据不需要判断tags
if "nginx" in [tags]{
ruby {
path => "/home/conf/nginx.rb"
}
}
}
# 使用 thinkingdata 作为输出
output{
thinkingdata {
url => "url"
appid =>"appid"
}
}
/home/conf/nginx.rb スクリプトは次のとおりです。
require 'date'
def filter(event)
#取出类似日志信息
# {"accessip_list":"124.207.82.22","client_ip":"123.207.82.22","http_host":"203.195.163.239","@timestamp":"2020-06-03T19:47:42+08:00","method":"GET","url":"/urlpath","status":"304","http_referer":"-","body_bytes_sent":"0","request_time":"0.000","user_agent":"s","total_bytes_sent":"180","server_ip":"10.104.137.230"}
logdata= event.get('message')
#解析日志级别和请求时间,并保存到event对象中
#解析json格式日志
#获取日志内容
#转成json对象
logInfoJson=JSON.parse logdata
time = DateTime.parse(logInfoJson['@timestamp']).to_time.localtime.strftime('%Y-%m-%d %H:%M:%S')
url = logInfoJson['url']
account_id = logInfoJson['user_agent']
#event_name
#account_id或者#distinct_id 同时为null时,跳过上报
if url.empty? || url == "/" || account_id.empty?
return []
end
properties = {
"accessip_list" => logInfoJson['accessip_list'],
"http_host"=>logInfoJson['http_host'],
"method" => logInfoJson['method'],
"url"=>logInfoJson['url'],
"status" => logInfoJson['status'].to_i,
"http_referer" => logInfoJson['http_referer'],
"body_bytes_sent" =>logInfoJson['body_bytes_sent'],
"request_time" => logInfoJson['request_time'],
"total_bytes_sent" => logInfoJson['total_bytes_sent'],
"server_ip" => logInfoJson['server_ip'],
}
data = {
'#ip' => logInfoJson['client_ip'],#可以为null
'#time' => time, #不可为null
'#account_id'=>account_id, # account_id 和distinct_id 不可同时为null
'#event_name'=>url, #如果是type是track的时候可以用,如果没有#event_name 不用上传
'#type' =>'track', #可以在文件中获取,如果文件中有的话,确认上报数据是否为用户属性或者事件属性
'properties' => properties
}
event.set('message',data.to_json)
return [event]
end
# 4.2.4 その他のログ
ta_output.conf 構成は以下の通り
input {
beats {
port => "5044"
}
}
filter {
if "other" in [tags]{
#也可以进行其他filter数据处理
ruby {
path => "/home/conf/outher.rb"
}
}
}
# 使用 thinkingdata 作为输出
output{
thinkingdata {
url => "url"
appid =>"appid"
}
}
/home/conf/outher.rb スクリプトは次のとおりです。
# def register(params)
# # 这里通过params获取的参数是在logstash文件中通过script_params传入的
# #@message = params["xx"]
# end
def filter(event)
#在这里处理业务数据,如果没有进行grok等一系列处理的情况下,直接在message中获取元数据进行处理
begin
_message = event.get('message') #message 是你的上传的每条日志
#这里处理的log是类似以下的数据
#2020-06-01 22:20:11.222,123455,123.123.123.123,login,nihaoaaaaa,400
time,account_id,ip,event_name,msg,intdata=_message.split(/,/)
#或者通过正则匹配数据,或者json数据,ruby语法解析json即可,可以在这里进行数据处理进行数据上报
#mess = /正则/.match(_message)
#time = mess[1]
#level = mess[2]
#thread = mess[3]
#class_name_all = mess[4]
#event_name = mess[5]
#account_id = mess[6]
#api = mess[7]
properties = {
'msg' => msg,
'int_data' => intdata.to_i #to_i是int类型 ,to_f 是float 类型 ,to_s 是string类型(默认)
}
data = {
'#time' => time,
'#account_id'=>account_id,
'#event_name'=>event_name,
'#ip' => ip,
'#type' =>'track',
'properties' => properties
}
event.set('message',data.to_json)
return [event]
rescue
#如果不想某个数据过去或者出错,这条数据可以返回空,比如account_id 或者 distinct_id 都为null 直接返回[]
return []
end
end