KafkaV2
Kafka
简介
flusher_kafka_v2 flusher插件可以实现将采集到的数据,经过处理后,发送到Kafka。
版本
版本说明
- 推荐版本:iLogtail v1.6.0 及以上
配置参数
| 参数 | 类型 | 是否必选 | 说明 |
|---|---|---|---|
| Type | String | 是 | 插件类型 |
| Brokers | String数组 | 是 | Kafka Brokers |
| Topic | String | 是 | Kafka Topic,支持动态topic, 例如: test_%{content.appname} |
| Version | String | 否 | Kafka协议版本号 ,例如:2.0.0,默认值:1.0.0 |
| Headers | header数组 | 否 | kafka消息头 ,配置使用请参考本文中Headers配置用例 |
| Convert | Struct | 否 | iLogtail数据转换协议配置 |
| Convert.Protocol | String | 否 | iLogtail数据转换协议,kafka flusher 可选值:custom_single,custom_single_flatten,otlp_log_v1。默认值:custom_single |
| Convert.Encoding | String | 否 | iLogtail flusher数据转换编码,可选值:json、none、protobuf,默认值:json |
| Convert.TagFieldsRename | Map | 否 | 对日志中tags中的json字段重命名 |
| Convert.ProtocolFieldsRename | Map | 否 | iLogtail日志协议字段重命名,可当前可重命名的字段:contents,tags和time |
| Authentication | Struct | 否 | Kafka连接访问认证配置,支持SASL/PLAIN,根据kafka服务端认证方式选择配置 |
| Authentication.PlainText.Username | String | 否 | PlainText认证用户名 |
| Authentication.PlainText.Password | String | 否 | PlainText认证密码 |
| Authentication.SASL.Username | String | 否 | SASL认证用户名 |
| Authentication.SASL.Password | String | 否 | SASL认证密码 |
| Authentication.Sasl.SaslMechanism | String | 否 | SASL认证,配置可选项:PLAIN、SCRAM-SHA-256、SCRAM-SHA-512 |
| Authentication.TLS.Enabled | Boolean | 否 | 是否启用TLS安全连接, |
| Authentication.TLS.CAFile | String | 否 | TLS CA根证书文件路径 |
| Authentication.TLS.CertFile | String | 否 | TLS连接kafka证书文件路径 |
| Authentication.TLS.KeyFile | String | 否 | TLS连接kafka私钥文件路径 |
| Authentication.TLS.MinVersion | String | 否 | TLS支持协议最小版本,可选配置:1.0, 1.1, 1.2, 1.3,默认:1.2 |
| Authentication.TLS.MaxVersion | String | 否 | TLS支持协议最大版本,可选配置:1.0, 1.1, 1.2, 1.3,默认采用:crypto/tls支持的版本,当前1.3 |
| Authentication.TLS.InsecureSkipVerify | Boolean | 否 | 是否跳过TLS证书校验 |
| Authentication.Kerberos.ServiceName | String | 否 | 服务名称,例如:kafka |
| Authentication.Kerberos.UseKeyTab | Boolean | 否 | 是否采用keytab,配置此项后需要配置KeyTabPath,默认为:false |
| Authentication.Kerberos.Username | Boolean | 否 | UseKeyTab设置为false的情况下,需要指定用户名 |
| Authentication.Kerberos.Password | String | 否 | UseKeyTab设置为false的情况下,需要指定密码 |
| Authentication.Kerberos.Realm | String | 否 | kerberos认证管理域,大小写敏感 |
| Authentication.Kerberos.ConfigPath | Boolean | 否 | Kerberos krb5.conf |
| Authentication.Kerberos.KeyTabPath | String | 否 | keytab的路径 |
| PartitionerType | String | 否 | Partitioner类型。取值:roundrobin、hash、random。默认为:random。 |
| RequiredAcks | int | 否 | ACK的可靠等级.0=无响应,1=等待本地消息,-1=等待所有副本提交.默认1, |
| Compression | String | 否 | 压缩算法,可选值:none, snappy,lz4和gzip,默认值none |
| CompressionLevel | Int | 否 | 压缩级别,可选值:1~9,默认值:4,设置为0则禁用Compression |
| MaxMessageBytes | Int | 否 | 一个批次提交的大小限制,配置和message.max.bytes对应,默认值:1000000 |
| MaxOpenRequests | Int | 否 | 一个连接允许的最大打开的请求数,默认值:5 |
| MaxRetries | Int | 否 | 提交失败重试次数,最大3次,默认值:3 |
| BulkMaxSize | Int | 否 | 单次请求提交事件数,默认2048 |
| BulkFlushFrequency | Int | 否 | 发送批量 Kafka 请求之前等待的时间,0标识没有时延,默认值:0 |
| Timeout | Int | 否 | 等待Kafka brokers响应的超时时间,默认30s |
| BrokerTimeout | int | 否 | kafka broker等待请求的最大时长,默认10s |
| Metadata.Retry.Max | int | 否 | 最大重试次数,默认值:3 |
| Metadata.Retry.Backoff | int | 否 | 在重试之前等待leader选举发生的时间,默认值:250ms |
| Metadata.RefreshFrequency | int | 否 | Metadata刷新频率,默认值:250ms |
| Metadata.Full | int | 否 | 获取原数数据的策略,获取元数据时使用的策略,当此选项为true时,客户端将为所有可用主题维护一整套元数据,如果此选项设置为false,它将仅刷新已配置主题的元数据。默认值:false。 |
| HashKeys | String数组 | 否 | PartitionerType为hash时,需指定HashKeys。 |
| HashOnce | Boolean | 否 | |
| ClientID | String | 否 | 写入Kafka的Client ID,默认取值:LogtailPlugin。 |
Version需要填写的是kafka protocol version版本号,flusher_kafka_v2当前支持的kafka版本范围:0.8.2.x~3.3.1。 请根据自己的kafka版本号参照下面的kafka protocol version规则进行配置。建议根据自己的kafka版本指定对应protocol version,kafka protocol version支持版本号如下:
0.8.2.0,0.8.2.1,0.8.2.20.9.0.0,0.9.0.10.10.0.0,0.10.0.1,0.10.1.0,0.10.1.1,0.10.2.0,0.10.2.1,0.10.2.20.11.0.0,0.11.0.1,0.11.0.21.0.0,1.0.1,1.0.2,1.1.0,1.1.1,2.0.0,2.0.1,2.1.0,2.1.1,2.2.0,2.2.1,2.2.2,2.3.0,2.3.1,2.4.0,2.4.1,2.5.0,2.5.1,2.6.0,2.6.1,2.6.2,2.7.0,2.7.1,2.8.0,2.8.1,2.8.23.0.0,3.0.1,3.0.2,3.1.0,3.1.1,3.1.2,3.2.0,3.2.1,3.2.2,3.2.3,3.3.0,3.3.1,3.3.2,3.4.0,3.4.1,3.5.0,3.5.1,3.6.0Brokers是个数组,多个Broker地址不能使用;或者,来隔开放在一行里,yaml配置文件中正确的多个Broker地址配置参考如下:
enable: trueinputs: - Type: input_file FilePaths: - /home/test-log/*.logflushers: - Type: flusher_kafka_v2 Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 Topic: KafkaTestTopic样例
采集/home/test-log/路径下的所有文件名匹配*.log规则的文件,并将采集结果发送到Kafka。
enable: trueinputs: - Type: input_file FilePaths: - /home/test-log/*.logflushers: - Type: flusher_kafka_v2 Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 Topic: KafkaTestTopic进阶配置
以下面的一段日志为例,后来将展开介绍ilogtail kafka flusher的一些高阶配置
2022-07-22 10:19:23.684 ERROR [springboot-docker] [http-nio-8080-exec-10] com.benchmark.springboot.controller.LogController : error log以上面这行日志为例 , 我们通ilogtail的processor_regex插件,将上面的日志提取处理后几个关键字段:
- time
- loglevel
- appname
- thread
- class
- message
最后推送到kafka的数据样例如下:
{ "contents": { "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547", "application": "springboot-docker", "level": "ERROR", "message": "Completed initialization in 9 ms", "thread": "http-nio-8080-exec-10", "time": "2022-07-20 16:55:05.415" }, "tags": { "k8s.namespace.name": "java_app", "host.ip": "192.168.6.128", "host.name": "master", "log.file.path": "/data/test.log" }, "time": 1664435098}动态topic
针对上面写入的这种日志格式,如果想根据application名称针对不用的应用推送到不同的topic,
则topic可以这样配置。
Topic: test_%{content.application}最后ilogtail就自动将日志推送到test_springboot-docker这个topic中。
topic动态表达式规则:
%{content.fieldname}。content代表从contents中取指定字段值%{tag.fieldname},tag表示从tags中取指定字段值,例如:%{tag.k8s.namespace.name}${env_name}, 读取系统变量绑定到动态topic上,ilogtail 1.5.0开始支持。- 其它方式暂不支持
动态topic中使用系统变量
动态topic绑定系统变量的两种场景:
- 将系统变量采集添加到日志的
tag中,然后使用%{tag.fieldname}规则完成绑定。 - 对系统变量无采集存储需求,只是想根据设定的系统变量将日志推送到指定的
topic中,直接采用${env_name}规则完成绑定,此方式需要1.5.0才支持。
由于上面提到的两种系统变量的采集绑定都需要做一些特殊配置,因此下面将分别介绍下相关的配置操作。
(1)将系统变量采集到日志中完成动态topic绑定
将系统变量采集添加到日志中有两种方式,一种是在ilogtail容器env添加,另一种是通过processor_add_fields 插件添加,
两种方式不同的配置参考下面的介绍
- 在
daemonset或者sidecar方式部署的ilogtail容器env配置部分添加自定义的系统变量,配置参考案例如下:
env: - name: ALIYUN_LOG_ENV_TAGS # add log tags from env value: _node_name_|_node_ip_|_app_name_ - name: _app_name_ # 添加自定义_app_name_变量, value: kafka自定义的变量_app_name_被添加到ALIYUN_LOG_ENV_TAGS中,日志的tags中会看到自定义的变量, 此时动态 topic
采用%{tag.fieldname}规则配置即可。
- 使用
processor_add_fields插件系统变量添加到日志中,配置参考如下:
processors: - Type: processor_add_fields Fields: service: ${env_name} IgnoreIfExist: false这里${env_name}生效依赖于ilogtail的enable_env_ref_in_config配置,从ilogtail 1.5.0开始支持。
(2)直接采用$符将系统变量绑定动态topic中
在daemonset或者sidecar方式部署的ilogtail容器env配置部分添加自定义的系统变量,配置参考案例如下:
env: - name: ALIYUN_LOG_ENV_TAGS # add log tags from env value: _node_name_|_node_ip_ - name: app_name # 添加自定义app_name变量, value: kafkaapp_name添加到系统变量中后,直接采用动态topic的:${env_name}规则即可绑定。
enable: trueinputs: - Type: input_file FilePaths: - /home/test-log/*.logflushers: - Type: flusher_kafka_v2 Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 Topic: ilogtail_${app_name}${app_name}就是我们上面添加的系统变量。
TagFieldsRename
例如将tags中的host.name重命名为hostname,配置参考如下:
enable: trueinputs: - Type: input_file FilePaths: - /home/test-log/*.logflushers: - Type: flusher_kafka_v2 Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 Convert: TagFieldsRename: host.name: hostname Topic: KafkaTestTopicProtocolFieldsRename
对ilogtail协议字段重命名,在ilogtail的数据转换协议中,
最外层三个字段contents,tags和time属于协议字段。ProtocolFieldsRename只能对
contents,tags和time这个三个字段进行重命名。
例如在使用Elasticsearch你可能想直接将time重命名为@timestamp,则配置参考如下:
enable: trueinputs: - Type: input_file FilePaths: - /home/test-log/*.logflushers: - Type: flusher_kafka_v2 Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 Convert: TagFieldsRename: host.name: hostname ProtocolFieldsRename: time: '@timestamp' Topic: KafkaTestTopic指定分区分发
ilogtail一共支持三种分区分发方式:
random随机分发, 默认。roundrobin轮询分发。hash分发。
random和roundrobin分发只需要配置PartitionerType指定对应的分区分发方式即可。
hash分发相对比较特殊,可以指定HashKeys,HashKeys的中配置的字段名只能是contents中的字段属性。
配置用例:
enable: trueinputs: - Type: input_file FilePaths: - /home/test-log/*.logflushers: - Type: flusher_kafka_v2 PartitionerType: hash HashKeys: - content.application Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 Topic: KafkaTestTopiccontent.application中表示从contents中取数据application字段数据,如果对contents协议字段做了重命名, 例如重名为messege,则应该配置为messege.application
配置Headers
iLogtail中Kafka的消息头是以键值对数组的形式配置的。header中value仅支持字符串类型。
enable: trueinputs: - Type: input_file FilePaths: - /home/test-log/*.logflushers: - Type: flusher_kafka_v2 Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 Topic: KafkaTestTopic Headers: - key: "key1" value: "value1" - key: "key2" value: "value2"数据平铺
ilogtail 1.8.0新增数据平铺协议custom_single_flatten,contents、tags和time三个convert层的协议字段中数据做一级打平。
当前convert协议在单条数据处理仅支持json编码,因此custom_single_flatten需要配合json编码一起使用。
enable: trueinputs: - Type: input_file FilePaths: - /home/test-log/*.logflushers: - Type: flusher_kafka_v2 Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 Convert: Protocol: custom_single_flatten Encoding: json Topic: KafkaTestTopic非平铺前写入kafka的消息格式
{ "contents": { "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547", "application": "springboot-docker", "level": "ERROR", "message": "Completed initialization in 9 ms", "thread": "http-nio-8080-exec-10", "@time": "2022-07-20 16:55:05.415" }, "tags": { "k8s.namespace.name": "java_app", "host.ip": "192.168.6.128", "host.name": "master", "log.file.path": "/data/test.log" }, "time": 1664435098}使用平铺协议后custom_single_flatten,json全部被一级平铺。
{ "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547", "application": "springboot-docker", "level": "ERROR", "message": "Completed initialization in 9 ms", "thread": "http-nio-8080-exec-10", "@time": "2022-07-20 16:55:05.415", "k8s.namespace.name": "java_app", "host.ip": "192.168.6.128", "host.name": "master", "log.file.path": "/data/test.log", "time": 1664435098}安全连接配置
flusher_kafka_v2支持多种安全认证连接kafka服务端。
PlainText认证,ilogtail v1.3.0开始支持;SASL认证,ilogtail v1.3.0开始支持;TLS认证,ilogtail v1.4.0开始支持;Kerberos认证(待测试验证),ilogtail v1.4.0开始支持;
前面两种配置比较简单,下面主要介绍下TLS和Kerberos两种认证的配置。
TLS配置参考
enable: trueinputs: - Type: input_file FilePaths: - /home/test-log/*.logflushers: - Type: flusher_kafka_v2 PartitionerType: hash HashKeys: - content.application Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 Authentication: TLS: Enabled: true CAFile: /data/cert/ca.crt CertFile: /data/cert/client.crt KeyFile: /data/cert/client.key MinVersion: "1.1" MaxVersion: "1.2" Topic: KafkaTestTopic注: 配置仅供参考,证书文件请自行生成后根据事情情况配置。
Kerberos配置参考
enable: trueinputs: - Type: input_file FilePaths: - /home/test-log/*.logflushers: - Type: flusher_kafka_v2 PartitionerType: hash HashKeys: - content.application Brokers: - 192.XX.XX.1:9092 - 192.XX.XX.2:9092 - 192.XX.XX.3:9092 Authentication: Kerberos: ServiceName: kafka Realm: test UseKeyTab: true ConfigPath: "/etc/krb5.conf" KeyTabPath: "/etc/security/kafka.keytab" Topic: KafkaTestTopic