menu
Is this helpful?

# LogBus2 使用指南

# 一、LogBus2 简介

LogBus2 是在原 LogBus 基础上,重新开发的日志同步工具。相比原 LogBus,其内存占用减少至原先的五分之一,速度提升5倍。


Logbus2 主要用于将后端的日志数据实时地导入到 TE 后台,其核心工作原理类似于 Flume、Loggie,会监控服务器日志目录下的文件流,当目录下任意日志文件有新数据产生时,会对新数据进行校验,并实时发送至 TE 后台。

我们建议以下几类用户使用 LogBus2 接入数据:

  1. 使用服务端 SDK / Kafka / SLS 存储TE格式数据的用户,通过 LogBus2 上传数据
  2. 对数据的准确性及维度要求较高,仅通过客户端 SDK 无法满足数据需求,或不方便接入客户端 SDK
  3. 不想自己开发后端数据推送流程
  4. 需要传输大批量历史数据
  5. 对内存使用、传输效率有一定要求

注意:Logbus v1 迁移至Logbus v2 请联系数数技术支持.

# 二、下载 LogBus2

最新版本:2.1.1.3

更新时间为:2024-2-23

Linux-amd64版本下载地址 (opens new window)

Linux-arm64版本下载地址 (opens new window)

Windows版本下载地址 (opens new window)

Mac Apple Silicon下载地址 (opens new window)

Mac Intel 下载地址 (opens new window)

Docker Image (opens new window)

#

# 三、使用前准备

# 文件类型

  1. 确定上传数据的文件存放的目录,并配置 LogBus2 的相关配置,LogBus2 会监控文件目录下的文件变更(监控文件新建或 tail 已有文件)。
  2. 请勿对存放于监控目录下且已经上传的数据日志直接进行重命名,重命名日志相当于新建文件,LogBus2 将可能会重新上传这些文件,造成数据重复。
  3. 由于 LogBus2 运行目录下存在当前日志传输进度快照,请勿自行对 runtime 目录下的文件进行操作

# Kafka

  1. 确定Kafka消息格式,Logbus仅会对Kafka Message的value做处理
  2. 确保用户ID于分区隔离,避免数据乱序问题
  3. 请开启自由使用Kafka Consumer Group,防止多Logbus消费时出现故障
  4. 默认从earliest开始消费,若需要从指定位点消费需要先创建consumer group和特定位点

# SLS

  1. 联系阿里云开启Kafka协议消费

# CLS

  1. 确保关闭CLS自动分裂。
  2. 申请相应资源的AK&SK

# 四、LogBus2 的安装与升级

# 安装

下载 LogBus2 安装包,并解压。

解压后的目录结构:

  • Logbus:LogBus2:二进制文件
  • conf:
    • daemon.json:配置文件模版2
  • tools:
    • configConvert:配置转换工具

# 升级

要求:LogBus2 版本 ≥ 2.0.1.7

直接执行

./logbus update即可,完成升级后执行

./logbus start

# 五、Logbus2 的使用及配置

# 启动参数

# 启动

./logbus start

# 停止

./logbus stop

# 重启

./logbus restart

# 检查配置、检查与 TE 系统的连通性

./logbus env

# 重置 LogBus 读取记录

./logbus reset
# Kafka当前不可用

# 查看传输进度

./logbus progress
# Kafka当前不可用

# 校验文件格式

./logbus dev
# Kafka当前不可用

# 配置文件指南

# 默认配置模版

{
 "datasource": [
  {
     "file_patterns": [
       "/data/log1/*.txt", // #用户方需动态调整为源数据文件所在的绝对路径
       "/data/log2/*.log"
    ],//文件匹配符
     "app_id": "app_id"//app_id来自TE官网的token,请在TE后台的项目配置页面获取接入项目的APPID并填入此处
  }
],
 "push_url": "http://RECEIVER_URL"//http传输请使用http://receiver.ta.thinkingdata.cn/,如果您使用的是私有化部署服务,请修改传输URL为:http://数据采集地址/
}

# 常用配置

# 文件
{
    "datasource": [
      {
        "type":"file",
        "file_patterns": ["/data/log1/*.txt", "/data/log2/*.log"],  //文件Glob匹配规则
        "app_id": "app_id", //APPID来自TE官网的token,请在TE后台的项目配置页面获取接入项目的APPID并填入此处
        "unit_remove": "day", //文件删除单位,支持 "day", "hour"
        "offset_remove": 7,//unit_remove*offset_remove 得到最后的移除时间 **offset必须大于0,否则不会生效
        "remove_dirs": true,//是否开启文件夹删除,默认false NOTE:只有当该文件夹下所有文件消费完毕之后才会进行文件夹删除
        "http_compress": "gzip" //是否开启http压缩,默认为"none"
      }
    ],
    "cpu_limit": 4, //限制Logbus2使用的CPU核数
  
    "push_url": "http://RECEIVER_URL"
  }
  

# Kafka
{
    "datasource": [
      {
        "type":"kafka",    //类型为Kafka
        "topic":"ta",    //具体消费主题
        "brokers":[
          "localhost:9091"    //Kafka Brokers 地址
        ],
        "consumer_group":"logbus",    //消费者组名称
        "cloud_provider":"ali", //云厂商名称,支持"ali"、"tencent"、"huawei"
        "username":"",    //Kafka 用户名
        "password":"",    //Kafka 验证密码
        "instance":"",    //云厂商实例名
        "protocol":"none", //身份验证协议,支持 "none"|"plain"|"scramsha256"|"scramsha512"
        "block_partitions_revoked":true,
        "app_id":"YOUR_APP_ID"
      }
    ],
    "cpu_limit": 4, //限制Logbus2使用的CPU核数
  
    "push_url": "http://RECEIVER_URL"
  }

# SLS

NOTE:使用SLS消费前请联系阿里云开启SLS Kafka消费协议

{
  "datasource": [
    {
      "type":"kafka",
      "brokers":["{PROJECT}.{ENTRYPOINT}:{PORT}"],    //NOTE:详见https://help.aliyun.com/document_detail/29008.htm#reference-wgx-pwq-zdb
      "topic":"{SLS_Logstore_NAME}",    //Logstore名称
      "protocol":"plain",
      "consumer_group":"{YOUR_CONSUMER_GROUP}",     // ConsumerGroup
      "username":"{PROJECT}",    // Project名称
      "disable_tls":true,
      "password":"{ACCESS_ID}#{ACCESS_PASSWORD}",    //    阿里云 RAM授权
      "app_id":"YOUR_APP_ID"
    }
  ],
  "push_url": "http://RECEIVER_URL"
}

# CLS

NOTE:在使用前请确保消费/写入的吞吐 大于Log 保留时间

{
    "datasource":[
        {
            "type": "kafka",
            "brokers":["YOUR_AZ_ENDPOINT"],
            "session_timeout": 9000,
            "fetch_max_bytes": 104857600,
            "topic": "YOUR_TOPIC",
            "protocal": "plain",
            "consumer_group": "YOUR_GROUP",
            "username": "",
            "password": "",
            "block_paritions_revoked":"true"
        }
    ],
    "push_url":"http://RECEIVER_URL"
}

# 完整配置项

# 配置项目列表与说明
配置 类型 示例 必填 说明
cpu_limit Number 4 限制 Logbus2 允许使用的最大 CPU 核数
push_url String ✔️ receiver 地址。需要 http/https 开头。
datasource Object list ✔️ 数据源列表
min_disk_free_space uint64 1024 logbus2所在目录可用空间检测,小于配置值时自动关闭,单位KB,默认为1*1024*1024

# datasource(数据源配置)

# 文件
配置 类型 示例 必填 默认值 说明
app_id String ✔️ "" 数据上报项目appid
appid_in_data Bool false false 在使用文件内appid进行分发时,开启此选项,Logbus2不再使用app_id当中的appid进行分发
specified_push_url Bool false true:不解析 push_url,按照用户配置的 push_url 原样进行发送,即http://yourhost:yourport。false:解析 push_url 后按照 receiver 规定的 logbus url 发送,即http://yourhost:yourport/logbus。
add_uuid Bool false true: 是否在每条数据中增加 uuid 属性(开启会降低传输效率)。
file_patterns String list ✔️ [""] 支持目录通配符,暂时不支持正则。如无特殊配置,默认绕过。gz/.iso/.rpm/.zip/.bz/.rar/.bz2 后缀的文件
ignore_files String list [""] 在 file_patterns 中过滤的文件
unit_remove String "" 用户文件删除。按天 (day) 或小时 (hour) 删除。注意:如果没有配置文件自动删除,LogBus2 内存占用会慢慢增高
offset_remove Int 0 用户文件删除。offset_remove>0 且配置了 unit_remove 为 day 或 hour 时用户文件删除功能生效。
remove_dirs Bool true|false false 是否开启文件夹删除
http_timeout String 500ms 600s 向 receiver 端发送数据时的超时时间,默认值:600s。范围:200ms - 600s。支持毫秒"ms", 秒"s", 分钟"m", 小时"h"。
iops int 20000 20000 限速Logbus每秒数据流通量(条数)
limit bool true|false false 开启限速开关
http_compress String none | gzip none 发送 http 时数据压缩的格式。none=不压缩。默认值:none。
filters object list 事件过滤器,多个过滤器之间为或的关系
filters[0].key string #event_name ✔️ 需要过滤的key值
filters[0].value interface{} register ✔️ 需要过滤的value值
filter[0].type string string string value值的类型,默认string,支持 string | boolean | int64

# Kafka

NOTE:使用LogbusKafka模式前,务必开启Consumer Group自由使用

配置

类型

示例

必填

默认值

说明

brokers String List ["localhost:9092"] ✔️ [""] Kafka Brokers
topic String "ta-msg-chan" ✔️ "" Kafka消费主题
consumer_group String "ta-consumer" ✔️ "" Kafka Consumer Group
protocol String "plain" "none" Kafka认证模式
username String "ta-user" "" Kafka 用户名
password String "ta-password" "" Kafka 密码
instance String "" "" CKafka需要的实例ID
fetch_count Number 1000 10000 每次Poll的消息数量
fetch_time_out Number 30 5 Poll的超时时间
read_committed Bool true false 是否消费Kafka UnCommitted数据
disable_tls Bool true false 关闭tls验证
cloud_provider String "tencent" "" 公网接入Kafka时启用,目前支持的云厂商有:tencent,huawei,ali
block_partitions_revoked Bool false false 是否阻塞消费,若不开启,在多Logbus处于同一个consumer_group时会出现数据重复问题
auto_reset_offset String "earliest" "earliest" 指定没有提交偏移量时的默认行为的参数

NOTE:Logbusv2当前采用负载均衡模式对Kafka进行消费,Logbusv2部署数量≤partition num

# 监控配置以及看板搭建

请查看:监控配置 DEMO (opens new window)

# 告警配置

请查看:告警配置 DEMO (opens new window)

# 插件使用

请查看:插件配置 DEMO (opens new window)

# 六、进阶使用

# 多事件使用单个Logbus进行上报

在单Logbus部署情况下,由于IO限制,可能存在部分信息延后消费的情况,例如

由于轮询的关系,消费顺序为event_/log.1 -> event_/log.2 -> event_*/log.3在这种情况下,会导致文件消费进度慢的情况,可以通过开启多个Logbus,对没有上下文语意的日志通过Glob进行切割,使Glob匹配到的文件并行上传

# 多 PipeLine 配置

NOTE:多 PipeLine下 appid 不能重复

{
    "datasource": [
      {
        "file_patterns": ["/data/log1/*.txt", "/data/log2/*.log"],  //文件Glob匹配规则
        "app_id": "app_id", //APPID来自TE官网的token,请在TE后台的项目配置页面获取接入项目的APPID并填入此处
        "unit_remove": "day", //文件删除单位,支持 "day", "hour"
        "offset_remove": 7,//unit_remove*offset_remove 得到最后的移除时间 **offset必须大于0,否则不会生效
        "remove_dirs": true,//是否开启文件夹删除,默认false NOTE:只有当该文件夹下所有文件消费完毕之后才会进行文件夹删除
        "http_compress": "gzip"//是否开启http压缩,默认none
      },
      {
        "file_patterns": ["/data/log1/*.txt", "/data/log2/*.log"],  //文件Glob匹配规则
        "app_id": "app_id", //APPID来自TE官网的token,请在TE后台的项目配置页面获取接入项目的APPID并填入此处
        "unit_remove": "day", //文件删除单位,支持 "day", "hour"
        "offset_remove": 7,//unit_remove*offset_remove 得到最后的移除时间 **offset必须大于0,否则不会生效
        "remove_dirs": true,//是否开启文件夹删除,默认false NOTE:只有当该文件夹下所有文件消费完毕之后才会进行文件夹删除
        "http_compress": "gzip"//是否开启http压缩,默认none
      }
    ],
    "cpu_limit": 4, //限制Logbus2使用的CPU核数
  
    "push_url": "http://RECEIVER_URL"
  }

# LogBus2 On Docker

# 拉取最新镜像

docker pull thinkingdata/ta-logbus-v2:latest

# 宿主机上创建持久化folder,初始化配置文件

mkdir -p /your/folder/path/{conf,log,runtime}
touch /your/folder/path/conf/daemon.json
vim /your/folder/path/conf/daemon.json

⚠️警告:请勿自行删除runtime目录下的任何文件

# 修改配置模版并写入至daemon.json

{
  "datasource": [
    {
      "type":"file",
      "app_id": "YOUR APP ID",
      "file_patterns": ["/test-data/*.json"]
    },
    {
      "type":"kafka",
      "app_id": "YOUR APP ID",
      "brokers": ["localhot:9092"],
      "topic":"ta-message",
      "consumer_group":"ta"
    }
  ],
  "push_url": "YOUR PUSH URL WITHOUT SUFFIX OF/logbus"
}

# 挂载数据文件夹并启动LogBus

docker run -d \
  --name logbus-v2 \
  --restart=always \
  -v /your/data/folder:/test-data/ \
  -v /your/folder/path/conf/:/ta/logbus/conf/ \
  -v /your/folder/path/log/:/ta/logbus/log/ \
  -v /your/folder/path/runtime/:/ta/logbus/runtime/ \
thinkingdata/ta-logbus-v2:latest

# LogBus2 On K8s

# 环境准备

  1. kubectl 可以连接到k8s集群且有部署权限。
  2. 安装依赖:按照helm文档安装helm到本地命令行 https://helm.sh/zh/docs/intro/install/

# 下载 logbus v2 helm 文件

下载链接 (opens new window)

tar xvf logBusv2-helm.tar && cd logbusv2

# 配置 logbus

# 准备

  1. 控制台创建要上传的日志 pvc
  2. 获取 pvc 名称,确认namespace
  3. 获取 TE 的app id、receiver url

# 修改values.yaml

pvc:
  name: pvc名称
logbus_version: 2.1.0.2
namespace: namcspace名称
logbus_configs:
  - push_url: "http://TE上传数据的receiver地址"
    datasource:
      - file_patterns:
        - "container:文件的相对路径的通配符" # 注意:“container:”前缀不要删除
        - "container:文件的相对路径的通配符" # 注意:“container:”前缀不要删除
        app_id: TE系统的app id

# 预览渲染的 yaml

helm install --dry-run -f values.yaml logbus .

# 使用 helm 部署 logbusv2

 helm install -f values.yaml logbus-v2 .

检查创建的statefulset

kubectl get statefulset

检查创建的pod

kubectl get pods

更新K8s内LogBus版本

vim value.yaml # 修改之前的value.yaml文件
# 修改 logbus_version 至最新 NOTE:考虑到向后兼容,最好不要使用latest!
logbus_version:2.0.1.8 -> logbus_version:2.1.0.2
# 保存并退出
helm upgrade -f values.yaml logbus .
# 等待滚动更新

# 注意

logbusv2 需要对挂载的日志的 pvc 具有可读写权限。

logbusv2 把文件消费记录和运行日志按照 pod 分别写入 pvc,如果 pvc 删除 logbus 相关记录有数据重传隐患。

# 配置详细说明

执行命令:

helm show values .

显示可用的配置:

# Default values for logbusv2.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.

pvc:
  name: pvc-logbus
logbus_version: 2.1.0.2
namespace: big-data

logbus_configs:
  #### pod 1
  #### push_url: receiver url, need http:// https:// prefix
  - push_url: "http://172.17.16.6:8992/"
    datasource:
      - file_patterns:
        #### target files relative path in pvc
        - "container:/ta-logbus-0/data_path/*"
        #### TA app_id
        app_id: "thinkingAnalyticsAppID"
  #### pod 2
  - push_url: "http://172.26.18.132:8992/"
    datasource:
      - file_patterns:
        - "container:/ta-logbus-1/data_path/*"
        app_id: "thinkingAnalyticsAppID"
  #### pod 3
  - push_url: "http://172.26.18.132:8992/"
    datasource:
      - file_patterns:
        - "container:/ta-logbus-2/data_path/*"
        app_id: "thinkingAnalyticsAppID"

#### logbus pod requests
#requests:
#  cpu: 2
#  memory: 1Gi

requests部分如果没有明确配置,不会体现在yaml内。

# pvc下单目录

pvc:
  name: 写入实际 pvc 名称

namespace: 已经存在的namespace

logbus_configs:
  - push_url: http或https,写pod可以访问到的TE receiver URL
    datasource:
      - file_patterns:
        - "container:/ta-logbus-0/data_path/*" "container:"是占位符,在yaml部署过程中会把相对路径替换成容器可以访问的绝对路径,在配置目录时候目录需要带上container:前缀。
        app_id: "thinkingAnalyticsAppID" TE系统app id

# pvc下多目录

读取pvc内多个目录时建议分pod部署,每个pod负责一个文件夹。这样部署性能和安全性更好。


pvc:
  name: pvc-logbus

namespace: big-data

logbus_configs:
  #### pod 1
  #### push_url: receiver url, need http:// https:// prefix
  - push_url: "http://172.17.16.6:8992/"
    datasource:
      - file_patterns:
        #### target files relative path in pvc
        - "container:/ta-logbus-0/data_path/*"
        #### TA app_id
        app_id: "thinkingAnalyticsAppID"
  #### pod 2
  - push_url: "http://172.26.18.132:8992/" 注意每个app id和push url都需要单独配置
    datasource:
      - file_patterns:
        - "container:/ta-logbus-1/data_path/*"
        app_id: "thinkingAnalyticsAppID"
  #### pod 3
  - push_url: "http://172.26.18.132:8992/"
    datasource:
      - file_patterns:
        - "container:/ta-logbus-2/data_path/*"
        app_id: "thinkingAnalyticsAppID"

# 多pvc

目前只支持了单个 pvc 的部署,多 pvc 需要配置多次 values.yaml 文件

# 七、常见问题

Q:为什么开启了文件夹删除,但是 LogBus 并没有删除文件夹

A:LogBus 删除文件夹的前提是,当前文件夹内的文件被 LogBus 读取过,且没有任何文件在该文件夹下,才会触发文件夹删除

Q:日志怎么上传不了?

A:LogBus 读取的数据文件中,单条数据里不要有换行符。配置的数据文件不支持正则,只能使用通配符(Glob)。配置的数据文件规则是否可以匹配到文件

Q:为什么文件重复上传

A:LogBus 读取的数据文件中,单条数据里不要有换行符。配置的数据文件不支持正则,只能使用通配符(Glob)。配置的数据文件规则是否可以匹配到文件

Q:为什么数据倾斜

A: 当前TE使用客户distinct_id作为数据UUID进行shuffle,在海量量数据的distinct_id使用同一字符串时,可能会导致单机内存压力增加,从而增加数据倾斜的风险

# 八、Releases Note

# 版本:2.1.1.3 --- 2024.2.23

优化

  • Kafka Progress 体验增强
  • 支持auto_reset_offset参数
  • 日志记录Kafka Client 相关信息

修复

  • 强制停机导致Logbus杀死错误的进程
  • 内存泄漏问题
  • Goroutine泄漏问题
  • Kafka 超时不消费
  • Windows版本 启动失败
  • Windows版本 无法更新

# 版本:2.1.1.2 --- 2024.2.2

优化

  • 日志格式以及部分i18n混杂的问题

# 版本:2.1.1.1 --- 2024.1.10

优化

  • progress 命令的准确性

# 版本:2.1.1.0 --- 2023.12.11

修复

  • 日志默认值修正为默认保留7天、单文件100M切割、最大保留30份日志。在不升级的情况下,配置log配置项也可。

影响

  • 在数据源产生大量不能解析的错误数据的情况下,由于日志会记录错误数据,会造成日志文件越来越大。

# 版本:2.1.0.9 --- 2023.10.26

新增

  • 支持事件过滤器,在客户端进行数据过滤
  • 支持logbus所在目录的空间检测

# 版本:2.1.0.8 --- 2023.6.06

优化

  • 保证插件正确关闭
  • 优化进程通信以及日志输出

修复

  • cpu limit日志输出
  • linux arm架构编译过程

# 版本:2.1.0.7 --- 2023.4.07

新增

  • 支持kafka数据源progress命令

优化

  • 自定义标签支持获取环境变量

# 版本:2.1.0.6 --- 2023.3.28

优化

  • 支持自定义标签,追踪数据来源
  • 支持自定义插件分隔符

# 版本:2.1.0.5 --- 2023.2.20

优化

  • 数据分流项目支持数组,兼容数字和字符串基本类型
  • 监控指标计算逻辑

# 版本:2.1.0.4 --- 2023.1.12

新增

  • 支持配置读取无换行符文件数据源
  • 支持配置循环读取次数与间隔时间

修复

  • 监控指标统计并发bug修复

# 版本:2.1.0.3 --- 2022.12.23

新增

  • 允许通过配置覆盖数据内部appid数据

修复

  • 使用appid_in_data,去掉appid的默认值

# 版本:2.1.0.2 --- 2022.12.13

新增

  • 插件支持属性拆分

修复

  • 多pipeline下,meta_name创建

# 版本:2.1.0.1 --- 2022.11.29

新增

  • kafka数据源支持事务读已提交
  • 插件命令支持sh环境依赖

修复

  • 无appid配置下,meta_name创建

# 版本:2.1.0.0 --- 2022.11.22

新增

  • 数据分流,根据配置appidMap分流数据到不同项目中
  • kafka数据源支持多topic消费
  • 限流器,限制上报速度,降低服务端压力
  • 多pipeline执行数据上报
  • 基于grpc自定义插件解析器
  • 实时性能监控(prometheus、pushgateway、grafana)
  • 压缩算法新增lz4 | lzo

修复

  • 文件消费阻塞问题
  • kafka数据源下,修复logbus无法停止的bug
  • file数据源下,修复文件活跃度高无法跳出的bug
  • 修复文件监听器关闭问题

# 版本:2.0.1.8 --- 2022.07.20

新增

  • dev(格式校验命令)
  • Kafka Source
  • 多平台支持

修复

  • 多次唤醒文件传输进度进程的问题
  • 减少日志量
  • Progress 针对文件传输时间排序
  • 多pipeline优化
  • Docker image精简

# 版本:2.0.1.7 --- 2022.03.01

优化

  • 运行效率,提升性能
  • 文件删除逻辑
  • 位点文件导出逻辑
  • 内存占用