# Filebeat+Logstash使用ガイド
このセクションでは、主にデータ転送ツールFilebeat+Logstashの使用方法について説明します
ドッキングを開始する前に、データルールデータルール、TAのデータ形式とデータルールに精通した後、このガイドを読んでドッキングする必要があります。
Filebeat+Logstashアップロードされたデータは、TAのデータ形式に従う
注: Logstashのスループットは低く、大量の履歴データをインポートする場合はDataXエンジンまたはLogbusツール
# I. Filebeat+Logstashプロフィール
Filebeat+Logstashツールは、主にログデータをリアルタイムでTAバックグラウンドにインポートし、サーバーログディレクトリの下のテキストストリームを監視し、ディレクトリの下の任意のログテキストに新しいデータが生成されたときにリアルタイムでTAバックグラウンドに送信するために使用されます。
Logstashはオープンソースのサーバー側データ処理パイプラインで、複数のソースから同時にデータを収集し、変換してお気に入りの「リポジトリ」に送信できます。Logstash公式サイト (opens new window)
Filebeatは、ログディレクトリまたは特定のログファイルを監視するローカルドキュメントのログデータコレクターです。Filebeatは、ログとドキュメントを転送および集計するための軽量な方法を提供します。Filebeat公式サイト (opens new window)
Filebeat+Logstashベースの取得フローチャートを以下に示します
# II。Filebeat+Logstashのダウンロードとインストール
注意: Logstash-6. x以上のバージョンでは、サーバーにJDK環境
# 2.2 Logstashのダウンロードとインストール
Logstashの公式インストールドキュメント (opens new window)を参照してください
# 2.3 logstash-output-thinkingdataプラグイン
最新バージョン: 1.0.0
更新時間は: 2020-06-09
# 2.3.1 logstash-output-thinkingdataプラグインのインストールとアンインストール
このプラグインはデータがjsonデータかどうかをチェックし、パッケージデータをTAに
logstashディレクトリで実行:
bin/logstash-plugin install logstash-output-thinkingdata
インストールには少し時間がかかりますが、インストールが成功した後に実行するのを待ってください
bin/logstash-plugin list
リストにlogstash-output-thinkingdataがある場合は、インストールが成功したことを表します。
その他のコマンドは次のとおりです。
アップグレードプラグインを実行できます:
bin/logstash-plugin update logstash-output-thinkingdata
アンインストールプラグインを実行できます:
bin/logstash-plugin uninstall logstash-output-thinkingdata
# 2.3.2変更ログ
# v1.0.0 2020/06/09
- 受信Logstashでevent渡したmessageをTAに送信
# 2.4 Filebeatのダウンロードとインストール
参照Filebeat公式インストールドキュメント (opens new window)して、ダウンロード方法を選択してください
# III。Filebeat+Logstash使用説明書
# 3.1データ準備
1.まず転送するデータをETLしてTAのデータ形式、ローカルに書き込むかKafkaクラスタに転送するが、Javaなどのサービス側SDKのKafkaやローカルテキストを書き込むconsumerを使っている場合、データはすでに正しい形式であり、変換する必要はない。
2.アップロードされたデータのディレクトリ、またはKafkaのアドレスとtopicを特定し、Filebeat+Logstashの構成を設定すると、Filebeat+Logstashはディレクトリ内の変更を監視します(新規または既存のテキストを監視します)、またはKafkaのデータを購読します。
3.監視ディレクトリに保存され、アップロードされたデータログの名前を直接変更しないでください。名前変更ログは新しいテキストに相当し、Filebeatはこれらのテキストを再アップロードしてデータの重複を引き起こす可能性があります。
# 3.2 Logstashの設定
# 3.2.1 Logstashパイプライン構成
Logstashは複数のパイプラインを同時に実行することをサポートしており、各パイプライン間に影響を与えず、それぞれ独立した入出力構成を持っており、パイプラインの構成テキストはconfig/pipelines.ymlにある。現在Logstashを使用して他のログ収集作業を行っている場合は、元のLogstashにPipelineを追加してTAのログデータを収集し、TAに送信
新しいパイプラインの構成は次のとおりです。
# thinkingdata 的管道配置
- pipeline.id: thinkingdata-output
# 如果上传用户属性的核心数设置为1,如果只是上传事件属性可以设置小于等于本机cpu数
pipeline.workers: 1
# 使用的缓冲队列类型
queue.type: persisted
# 使用不同的输入输出配置
path.config: "/home/elk/conf/ta_output.conf"
より多くのパイプライン設定公式サイト:Pipeline (opens new window)
# 3.2.2 Logstash入出力構成
ta_output.conf参考例:
注:ここでシナリオは、SDKによって生成されたデータのテキスト、Logstashの出力入力構成
# 使用 beats 作为输入
input {
beats {
port => "5044"
}
}
# 如果不是服务端SDK的生成或者是符合TA格式的数据,需要filter过滤元数据,这里建议是ruby插件过滤,第四模块有案例
#filter {
# if "log4j" in [tags] {
# ruby {
# path => "/home/elk/conf/rb/log4j_filter.rb"
# }
# }
#}
# 使用 thinkingdata 作为输出
output{
thinkingdata {
url => "http://数据采集地址/logbus"
appid =>"您的AppID"
}
}
thinkingdataパラメータ説明:
パラメータ名 | タイプ | 必要です | デフォルト値 | 説明 |
---|---|---|---|---|
URL | 紐 | 本当 | なし | TAデータ受信アドレス |
appid | 紐 | 本当 | なし | プロジェクトのAPPID |
フラッシュインターバルセック | 番号 | 偽り | 2 | トリガーflush間隔、単位:秒 |
フラッシュ_バッチ_サイズ | 番号 | 偽り | 500 | flushをトリガーするjsonデータ、単位:バー |
圧縮 | 番号 | 偽り | 1 | 圧縮データ、0は非圧縮データを表し、イントラネットで設定できる1はgzip圧縮、デフォルトgzip圧縮 |
uuid | ブーリアン | 偽り | 偽り | UUIDスイッチをオンにして、短時間間隔のネットワーク変動が発生する可能性のある重複除去 |
is_filebete_状態_レコード | ブーリアン | 偽り | 本当 | Filebeatをオンにするかどうかoffset、ファイル名など |
# 3.2.3 Logstash実行構成
Logstashはデフォルトでconfig/logstash.yml実行構成として使用します。
pipeline.workers: 1
queue.type: persisted
queue.drain: true
推奨事項
- user_setユーザープロパティを報告するときは、構成pipeline.workersの値を1に変更してください。workersの値が1より大きいと、データを処理する順序が変化する。trackイベントは1より大きく設定できます。
- プログラムの予期しない終了によってデータの転送が失われないようにするには、設定してくださいqueue.type: persisted、Logstashが使用するバッファーキューの種類を表しますので、Logstashを再起動した後もバッファーキュー内のデータを送信し続けるように構成します。
より多くのデータ持続性関連は公式サイトをご覧ください persistent-queues (opens new window)
- 設定queue.drainの値をtrueに設定すると、Logstashは正常に終了する前にすべてのバッファキュー内のすべてのデータを送信します。
詳細は公式サイトをご覧ください logstash.yml (opens new window)
# 3.2.4 Logstashの起動
Logstashインストールディレクトリの下1.パイプライン構成および実行構成としてconfig/pipelines.ymlを使用して直接起動
bin/logstash
2.指定されたta_output.confは、実行構成としてconfig/logstash.ymlを使用して、入力および出力構成ブックとして起動します
bin/logstash -f /youpath/ta_output.conf
3.バックグラウンド起動
nohup bin/logstash -f /youpath/ta_output.conf > /dev/null 2>&1 &
より多くの起動は、Logstash公式起動ドキュメント (opens new window)
# 3.3 Filebeatの構成
# 3.3.1 Filebeatの実行構成
FilebeatはバックエンドSDKログブックを読み込みます。Filebeatのデフォルトの設定テキストは次のとおりですfilebeat.yml。config/filebeat.yml参照構成は次のとおりです。
#======================= Filebeat inputs =========================
filebeat.shutdown_timeout: 5s
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/log.*
#- c:\programdata\elasticsearch\logs\*
#------------------------- Logstash output ----------------------------
output.logstash:
# 可填写一个服务器的进程,也可以填写多个服务器上的Logstash进程
hosts: ["ip1:5044","ip2:5044"]
loadbalance: false
- shutdown_timeout (opens new window): Filebeatは、クローズする前に、パブリッシャーがイベントの送信を完了するのを待っています。
- paths:監視するログを指定します。現在、Go言語のglob関数に従って処理されています。
- hosts:送信アドレスは複数のLogstash hostsで、loadbalanceがfalseの場合はプライマリ機能と同様、trueは負荷分散を表し
- loadbalance: user_setuser_setuser_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_set user_
をインポートするだけtrackイベントをインポートするだけの場合、複数のLogstashプロセスを設定し、loadbalance: trueを設定できます。Filebeatのデフォルト設定はloadbalance: falseです
Filebeat資料が閲覧可能:Filebeat公式ドキュメント (opens new window)
# 3.3.2 Filebeatの起動
filebeat起動後、関連する出力情報を表示:
./filebeat -e -c filebeat.yml
バックグラウンド起動
nohup ./filebeat -c filebeat.yml > /dev/null 2>&1 &
# 四、ケース配置参考
# 4.1 filebeatは異なるログフォーマットのデータ構成を監視
個々のFilebeatのfilebeat.xmlは以下のように設定でき、実行メモリは約10 MBで、複数のfilebeatプロセスから異なるログ形式を監視することもでき、具体的には公式サイトを参照
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
enabled: true
#监控目录
paths:
- /home/elk/sdk/*/log.*
#给数据打tags,Logstash进行匹配,不用做filter处理
tags: ["sdklog"]
#按特殊字符切分的数据
- type: log
enable: true
paths: /home/txt/split.*
#给数据打tags,Logstash进行匹配,需要filter处理
tags: ["split"]
# log4j接收的数据
- type: log
enable: true
paths: /home/web/logs/*.log
# 需要filter处理
tags: ["log4j"]
# log4j接收的数据
# nginx日志数据
- type: log
enable: true
paths: /home/web/logs/*.log
tags: ["nginx"]
# 4.2 Logstash構成
Logstashの構成ブックが間違っているかどうか
bin/logstash -f /home/elk/ta_output.conf --config.test_and_exit
注意:次のスクリプトはすべてrubyプラグインがruby構文に基づいて書かれているので、javaに詳しい場合はlogbusディレクトリの下のカスタムパーサ
# 4.2.1サービス側SDKログ
ta_output.confは以下のように設定できる
input {
beats {
port => "5044"
}
}
# 使用 thinkingdata 作为输出
output{
thinkingdata {
url => "url"
appid =>"appid"
# compress => 0
# uuid => true
}
}
# 4.2.2 log4jログ
log4j形式は次のように設定できます。
//日志格式
//[%d{yyyy-MM-dd HH:mm:ss.SSS}] 日志符合TA的输入时间,也可以是yyyy-MM-dd HH:mm:ss
//[%level{length=5}] 日志级别,debug、info、warn、error
//[%thread-%tid] 当前线程信息
//[%logger] 当前日志信息所属类全路径
//[%X{hostName}] 当前节点主机名。需要通过MDC来自定义。
//[%X{ip}] 当前节点ip。需要通过MDC来自定义。
//[%X{userId}] 用户登录的唯一ID,可以设置account_id,也可以设定为其他值,TA中要求account_id 和distinct_id不能同时为空,可以设置其他的属性,看业务设置。需要通过MDC来自定义。
//[%X{applicationName}] 当前应用程序名。需要通过MDC来自定义。
//[%F,%L,%C,%M] %F:当前日志信息所属的文件(类)名,%L:日志信息在所属文件中的行号,%C:当前日志所属文件的全类名,%M:当前日志所属的方法名
//[%m] 日志详情
//%ex 异常信息
//%n 换行
<property name="patternLayout">[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{userId}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
</property>
ta_output.conf構成
input {
beats {
port => "5044"
}
}
filter {
if "log4j" in [tags]{
#也可以进行其他filter数据处理
ruby {
path => "/home/conf/log4j.rb"
}
}
}
# 使用 thinkingdata 作为输出
output{
thinkingdata {
url => "url"
appid =>"appid"
}
}
/home/conf/log4j.rbスクリプトは以下の通り
# 这里通过参数event可以获取到所有input中的属性
def filter(event)
_message = event.get('message') #message 是你的上传的每条日志
begin
#这里是正则出正确数据的格式,如果是error日志建议单独放在一个文件,错误日志过长没有分析场景,且跨行了
#这里的数据类似这样的格式 _message ="[2020-06-08 23:19:56.003] [INFO] [main-1] [cn.thinkingdata] [x] [123.123.123.123] [x] [x] [StartupInfoLogger.java,50,o)] ## ''"
mess = /\[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] \[(.*?)\] ## '(.*?)'/.match(_message)
time = mess[1]
level = mess[2]
thread = mess[3]
class_name_all = mess[4]
event_name = mess[5]
ip = mess[6]
account_id = mess[7]
application_name = mess[8]
other_mess = mess[9]
exp = mess[10]
if event_name.empty? || account_id.empty?
return []
end
properties = {
'level' => level,
'thread' => thread,
'application_name' => application_name,
'class_name_all' => class_name_all,
'other_mess' => other_mess,
'exp' => exp
}
data = {
'#ip' => ip,
'#time' => time,
'#account_id'=>account_id,
'#event_name'=>event_name, #如果是type是track的时候可以用,如果没有#event_name 不用上传
'#type' =>'track', #可以在文件中获取,如果文件中有的话,确认上报数据是否为用户属性或者事件属性
'properties' => properties
}
event.set('message',data.to_json)
return [event]
rescue
# puts _message
puts "数据不符合正则格式"
return [] #不进行上报
end
end
# 4.2.3 Nginxログ
まずはNginxログのフォーマットを定義し、json形式に設定すると
input {
beats {
port => "5044"
}
}
filter {
#如果是同样的格式数据不需要判断tags
if "nginx" in [tags]{
ruby {
path => "/home/conf/nginx.rb"
}
}
}
# 使用 thinkingdata 作为输出
output{
thinkingdata {
url => "url"
appid =>"appid"
}
}
/home/conf/nginx.rbスクリプトは次のとおりです。
require 'date'
def filter(event)
#取出类似日志信息
# {"accessip_list":"124.207.82.22","client_ip":"123.207.82.22","http_host":"203.195.163.239","@timestamp":"2020-06-03T19:47:42+08:00","method":"GET","url":"/urlpath","status":"304","http_referer":"-","body_bytes_sent":"0","request_time":"0.000","user_agent":"s","total_bytes_sent":"180","server_ip":"10.104.137.230"}
logdata= event.get('message')
#解析日志级别和请求时间,并保存到event对象中
#解析json格式日志
#获取日志内容
#转成json对象
logInfoJson=JSON.parse logdata
time = DateTime.parse(logInfoJson['@timestamp']).to_time.localtime.strftime('%Y-%m-%d %H:%M:%S')
url = logInfoJson['url']
account_id = logInfoJson['user_agent']
#event_name
#account_id或者#distinct_id 同时为null时,跳过上报
if url.empty? || url == "/" || account_id.empty?
return []
end
properties = {
"accessip_list" => logInfoJson['accessip_list'],
"http_host"=>logInfoJson['http_host'],
"method" => logInfoJson['method'],
"url"=>logInfoJson['url'],
"status" => logInfoJson['status'].to_i,
"http_referer" => logInfoJson['http_referer'],
"body_bytes_sent" =>logInfoJson['body_bytes_sent'],
"request_time" => logInfoJson['request_time'],
"total_bytes_sent" => logInfoJson['total_bytes_sent'],
"server_ip" => logInfoJson['server_ip'],
}
data = {
'#ip' => logInfoJson['client_ip'],#可以为null
'#time' => time, #不可为null
'#account_id'=>account_id, # account_id 和distinct_id 不可同时为null
'#event_name'=>url, #如果是type是track的时候可以用,如果没有#event_name 不用上传
'#type' =>'track', #可以在文件中获取,如果文件中有的话,确认上报数据是否为用户属性或者事件属性
'properties' => properties
}
event.set('message',data.to_json)
return [event]
end
# 4.2.4その他のログ
ta_output.conf構成は以下の通り
input {
beats {
port => "5044"
}
}
filter {
if "other" in [tags]{
#也可以进行其他filter数据处理
ruby {
path => "/home/conf/outher.rb"
}
}
}
# 使用 thinkingdata 作为输出
output{
thinkingdata {
url => "url"
appid =>"appid"
}
}
/home/conf/outher.rbスクリプトは次のとおりです。
# def register(params)
# # 这里通过params获取的参数是在logstash文件中通过script_params传入的
# #@message = params["xx"]
# end
def filter(event)
#在这里处理业务数据,如果没有进行grok等一系列处理的情况下,直接在message中获取元数据进行处理
begin
_message = event.get('message') #message 是你的上传的每条日志
#这里处理的log是类似以下的数据
#2020-06-01 22:20:11.222,123455,123.123.123.123,login,nihaoaaaaa,400
time,account_id,ip,event_name,msg,intdata=_message.split(/,/)
#或者通过正则匹配数据,或者json数据,ruby语法解析json即可,可以在这里进行数据处理进行数据上报
#mess = /正则/.match(_message)
#time = mess[1]
#level = mess[2]
#thread = mess[3]
#class_name_all = mess[4]
#event_name = mess[5]
#account_id = mess[6]
#api = mess[7]
properties = {
'msg' => msg,
'int_data' => intdata.to_i #to_i是int类型 ,to_f 是float 类型 ,to_s 是string类型(默认)
}
data = {
'#time' => time,
'#account_id'=>account_id,
'#event_name'=>event_name,
'#ip' => ip,
'#type' =>'track',
'properties' => properties
}
event.set('message',data.to_json)
return [event]
rescue
#如果不想某个数据过去或者出错,这条数据可以返回空,比如account_id 或者 distinct_id 都为null 直接返回[]
return []
end
end