如何开发原生Flusher插件
接口定义
对于使用Http协议发送数据的Flusher,进一步定义了HttpFlusher,接口如下:
Flusher级组件
聚合(必选)
- 作用:将多个小的PipelineEventGroup根据tag异同合并成一个大的group,提升发送效率
- 参数:
可以在flusher的参数中配置Batch字段,该字段的类型为map,其中允许包含的字段如下:
名称 | 类型 | 默认值 | 说明 |
---|---|---|---|
MinCnt | uint | 每个Flusher自定义 | 每个聚合队列最少包含的event数量 |
MinSizeBytes | uint | 每个Flusher自定义 | 每个聚合队列最小的尺寸 |
TimeoutSecs | uint | 每个Flusher自定义 | 每个聚合队列在第一个event加入后,在被输出前最多等待的时间 |
- 类接口:
序列化(必选)
- 作用:对聚合模块的输出进行序列化,分为2个层级:
- event级:对每一个event单独进行序列化
- event group级:对多个event进行批量的序列化
- 类接口:
压缩(可选)
- 作用:对序列化后的结果进行压缩
- 类接口:
开发步骤
下面以开发一个HttpFlusher为例,说明整个开发步骤:
- 在plugin/flusher目录下新建一个Flusherxxx.h和Flusherxxx.cpp文件,用于派生HttpFlusher接口生成具体的插件类;
- 在Flusherxxx.h文件中定义新的输出插件类Flusherxxx,满足以下要求:a. 所有的可配置参数的权限为public,其余参数的权限均为privateb. 新增一个聚合组件:
Batcher<> mBatcher;
c. 新增一个序列化组件:std::unique_ptr<T> mSerializer;
,其中T为EventSerializer
和EventGroupSerializer
中的一种d. 如果需要压缩,则新增一个压缩组件:std::unique_ptr<Compressor> mCompressor;
- 在pipeline/serializer目录下新建一个xxxSerializer.h和xxxSerializer.cpp文件,用于派生
Serializer
接口生成具体类; - (可选)如果需要压缩组件,且现有压缩组件库中没有所需算法,则新增一个压缩组件: a. 在common/compression/CompressType.h文件中,扩展CompressType类用以标识新的压缩算法; b. 在common/compression目录下新建一个xxxCompressor.h和xxxCompressor.cpp文件,用于派生
Compressor
接口生成具体类; c. 在common/compression/CompressorFactory.cpp文件的各个函数中注册该压缩组件; - 在Flusherxxx.cpp文件中实现插件类 a.
Init
函数:
b. SerializeAndPush(BatchedEventsList&&)
函数:
c. SerializeAndPush(vector<BatchedEventsList>&&)
函数:
- 在
PluginRegistry
类的LoadStaticPlugins()
函数中新增如下行: