从iLogtail到LoongCollector——探秘多租场景下的可靠性架构保证

笃敏

背景

从iLogtail诞生至今已走过了10多个年头,伴随着云原生和可观测性概念的逐步推广,iLogtail在商业化和开源的过程中也经历了一系列的架构迭代。进入到2024年,为了打造统一的可观测Agent,构建集高可靠、高性能和灵活性于一体的采集器,iLogtail正式升级为LoongCollector(详见《iLogtail 开源两周年:感恩遇见,畅想未来》)。

作为一款高可靠的可观测端采集器,不论是iLogtail还是LoongCollector,我们都持续致力于提供多租场景下的采集隔离性和资源公平性。然而,受制于iLogtail自身的框架和某些历史原因,iLogtail时代只提供了有限的相关能力:

  • 同时支持多份采集配置,且支持配置热加载,但是任意配置变更都会导致端上所有配置的重新加载,进而导致其它无关采集配置的采集暂停。由于iLogtail之前主要针对文件采集场景,因此全量的配置启停并不会造成太大影响。但是LoongCollector面向统一的可观测数据采集,全量配置启停会导致指标采集中断而掉点,这无疑是不可接受的;
  • 当某个发送目标的网络状况波动较大时,不仅会影响目的地为该发送目标的采集配置,还会影响无关采集配置的发送,导致采集延迟;
  • 只能根据SLS的Logstore来指定相应采集配置的优先级,不支持采集配置级的优先级指定;

因此,为了解决这些痛点,从而为用户真正提供完整的多租场景下的可靠性保证,在LoongCollector时代,我们对继承的iLogtail架构进行进一步升级,实现以下几点目标:

  • 采集配置的热加载隔离
  • 采集配置的发送异常隔离
  • 采集配置级的资源公平性

接下来,本文将依次揭秘LoongCollector的架构是如何实现上述目标的,不过在此之前,我们首先需要整体了解一下LoongCollector的数据流模型和线程模型。

数据流模型

数据模型

在LoongCollector中,流转于各个组件之间最基本的数据单位称为事件(Event)。事件可根据类型进一步分为Log事件、Metric事件和Span事件,分别对应可观测数据中的Log、Metric和Trace。多个具有共性的事件可进一步打包成为事件组(Event Group),事件共性的部分以元信息的形式存放在事件组中,从而有效降低内存开销。

流水线

对于任何可观测采集器而言,想要完成数据采集任务,必须先要向采集器说明如何采集、处理和发送所需数据,这个描述称为采集配置。在LoongCollector中,由于采集是通过流处理的方式进行的,因此采集配置又称为流水线配置(Pipeline Config)。代码实现上,每一个流水线配置都对应于内存中的一条流水线,其通用形态如下所示:

可以看到,每条流水线可包含任意多个输入、处理和输出插件,还可包含路由器。各个组件之间的交互数据结构是事件组,每个组件的功能说明如下:

  • 输入插件:用于采集不同类型的数据,例如文本日志、Prometheus指标和各类Trace数据等;
  • 内置处理插件:用于对相应输入插件的数据进行处理,例如将文本文件按行切分成多个Log事件等;
  • 处理插件:用于对流水线内所有输入插件的数据进行处理,例如对Log事件的字段进行正则解析等;
  • 路由器:可根据事件组的元信息将事件组分发到指定的输出插件用于发送,也可将事件组发送到流水线中的所有输出插件;
  • 输出插件:用于将处理之后的数据发送到指定下游。

线程模型

iLogtail使用总线模式作为线程模型,所谓总线模式,就是每个功能模块一个线程。根据流水线各个组件的功能划分,一般可以将流水线划分为三大模块:输入模块、处理模块和发送模块,各个模块之间通过缓冲队列进行连接。由于功能模块的数量是恒定的,因此能够保证整个采集器所需要使用的线程数是恒定的。LoognCollector继承了iLogtail的线程模型,并在此基础上进一步演进,LoognCollector的总线模式示意图如下:

可以看到,在总线模式下,有如下几个固定的工作线程:

  • Input Runner线程:负责从指定的数据源获取数据,然后将数据组装成事件组放入处理队列中。Input Runner是所有线程里最复杂的,一般是一种输入插件类型一个Input Runner线程,也可多个输入插件类型共享同一个Input Runner线程。例如,下图展示了三个流水线及相应的线程模型,其中InputFile插件和InputContainerStdio插件由于都属于文件采集,因此共享同一个File Input Runner线程,InputPrometheus插件独享Prometheus Input Runner线程。
  • Processor Runner线程:负责从所有流水线的处理队列中获取事件组,并根据事件组所属的流水线配置对事件组进行处理,最后将处理后的事件组推送到流水线指定的各个Flusher的发送队列中。该线程可以启动多个,默认为1个。
  • Flusher Runner线程:负责从所有流水线Flusher的发送队列中获取事件组,并根据事件组所属Flusher配置对事件组进行批聚合、序列化和压缩等操作,最后根据Flusher的属性将待发送的请求推送到相应Sink的队列中。

为了保证数据的绝对可靠性,当因为网络异常或服务端能力受限导致请求发送失败时,需要重新将原始的事件组放回到发送队列中,等待一段时间再进行尝试。考虑到批聚合、序列化和压缩等操作都是计算密集型的,我们可以将这些操作前置到Processor Runner线程,使得Flusher Runner线程仅起到请求分配和流控等作用。这样可以避免每次重试发送时,都执行一遍完全相同的操作,减少不必要的重复计算。

  • Sink线程:负责从Sink队列中获取请求并执行实际的发送操作。该线程的个数取决于Sink的类型数,目前已知的包括:
    • Http Sink:所有涉及Http发送的Flusher的请求都会推送到该Sink
    • File Sink:所有向本地磁盘写数据的Flusher的请求都会推送到Sink

当然,实际使用的线程肯定不止于此,还有一些线程负责一些其他的辅助工作,但是总的线程数量是恒定的。

Input Runner

在最简单的情况下,Input Runner线程运行的函数伪代码如下所示:

while(true) {
for (每一个数据源) {
从数据源获取数据;
将数据组装成事件组;
将事件组推送到指定的处理队列;
}
}

这段代码涉及两个核心问题:

  1. 如何知道需要采集哪些数据?或者说如何知道有哪些插件与之关联?
  2. 采集的数据应该推送到哪个处理队列?

对于第一个问题,当输入插件启动时,可通过调用Input Runner类的函数向其注册插件配置;类似地,当输入插件停止时,也可通过调用Input Runner类的函数向其注销插件配置。通过这种注册方式成功地建立起Input Runner和输入插件之间的联系。

有了第一个问题的解答,那第二个问题也就迎刃而解了。我们在向Input Runner类注册插件配置的时候,可以额外传递整个流水线的上下文信息,其中就包含处理队列的信息。如此一来,当我们从输入插件指定的数据源获取到数据之后,就可以很方便地知道应该推送到哪个处理队列中,具体函数如下:

int ProcessQueueManager::PushQueue(const std::string& configName, std::unique_ptr<ProcessQueueItem>&& item);

其中,configName是配置名,item是处理队列元素,主要成员是事件组。

示例

作为例子,可以参考文件采集输入插件与其对应Input Runner之间的交互:

bool InputFile::Start() {
FileServer::GetInstance()->AddFileDiscoveryConfig(mContext->GetConfigName(), &mFileDiscovery, mContext);
FileServer::GetInstance()->AddFileReaderConfig(mContext->GetConfigName(), &mFileReader, mContext);
// other functions...
return true;
}
bool InputFile::Stop(bool isPipelineRemoving) {
FileServer::GetInstance()->RemoveFileDiscoveryConfig(mContext->GetConfigName());
FileServer::GetInstance()->RemoveFileReaderConfig(mContext->GetConfigName());
// other functions...
return true;
}

InputFile是文件采集输入插件,FileServer是其相对应的Input Runner。在插件启动的时候,我们分别将文件发现和文件读取相关的配置(mFileDiscoverymFileReader),以及流水线的上下文信息(mContext)注册到FileServer中。在插件停止时,我们直接从FileServer中注销当前插件的相关配置。

Processor Runner

Processor Runner线程的主要工作流程如下:

  1. 从任意处理队列中获取一个待处理的事件组;
  2. 获取该事件组对应的流水线配置;
  3. 依次调用流水线中的内置处理插件和处理插件对事件组进行处理;
  4. 根据流水线的路由器信息决定将事件组发送到哪些输出插件;
  5. 对于每一个输出插件,根据插件配置对事件组执行批聚合、序列化和压缩等操作,并将结果推送到对应的发送队列中。

这里比较关键的是步骤2和步骤3。由于每个流水线仅包含一个处理队列,因此我们在从处理队列中获取事件组的同时,可以同时获得相应的流水线名称:

bool ProcessQueueManager::PopItem(unique_ptr<ProcessQueueItem>& item, string& configName);

然后,我们就可以通过下列函数获得具体的流水线信息:

shared_ptr<Pipeline> PipelineManager::FindConfigByName(const string& configName) const;

由于处理插件是流水线级别的资源,因此在获取流水线实体以后,就可以直接对事件组进行处理。然而,内置处理插件是与输入插件绑定的,因此我们必须还要知道当前事件组是流水线的哪个输入插件产生的。为此,我们可以在处理队列元素中额外存放输入插件在流水线中的下标,这样我们就能方便地获取事件组对应的内置处理插件了:

struct ProcessQueueItem {
PipelineEventGroup mEventGroup;
size_t mInputIndex = 0;
// other members...
};

至此,流水线的完整处理函数如下所示:

void Pipeline::Process(vector<PipelineEventGroup>& groups, size_t inputIndex) {
for (auto& p : mInputs[inputIndex]->GetInnerProcessors()) {
p->Process(groups);
}
for (auto& p : mProcessorLine) {
p->Process(groups);
}
}

Flusher Runner

Flusher Runner线程的主要工作流程如下:

  1. 从任意发送队列中获取一个待发送的数据包;
  2. 获取该事件组对应的输出插件配置;
  3. 根据输出插件的类型将数据包打包成请求,并推送到相应的Sink队列中。

由于每个流水线可包含多个输出插件,因此为了实现步骤2,仅记录流水线的名字是不够的,还需要记录输出插件在流水线中的下标。由于PipelineManager::FindConfigByName函数是需要加锁查表的,因此从效率的角度以及方便的角度考虑,我们直接在发送队列元素中记录指向Flusher的指针,这样就能快速找到输出插件对应的配置:

struct SenderQueueItem {
std::string mData;
Flusher* mFlusher;
// other members...
};

最后,为了实现步骤3,我们可以通过SinkType Flusher::GetSinkType()函数获取输出插件具体的类型,然后据此将请求推送至相应的Sink中。

流水线配置热加载隔离

在多租场景下,用户A肯定不希望用户B的流水线配置变更影响自己的采集任务,即用户B的流水线变更不能暂停用户B的流水线运行。进一步地,对于同一个用户的多个采集配置,假设同时存在指标类采集流水线和日志类采集流水线,那日志类采集流水线的变更肯定不能暂停指标类采集流水线,否则会导致指标采集出现断点和偏移。因此,确保流水线配置的热加载隔离是可观测采集器能力的一个重大体现。

在总线模式下,由于流水线之间的线程资源是共享的,因此在停止某个流水线时,是没有办法在不干预各个Runner的情况下,单独将某个流水线内的数据排空的。原因很简单,因为不论是Processor Runner线程还是Flusher Runner线程,他们都是顺序地从各个流水线的队列中去获取事件组。因此,如果要排空某一个流水线,就必须强制Runner线程去反复处理同一个流水线的数据。这不仅在代码实现上较为困难,同时也违背了流水线变更互不影响的原则。

因此,在总线模式下无法排空流水线的前提下,流水线配置的独立热加载存在如下挑战:

  1. 如何保证变更前后数据完整性?
  2. 如何保证变更前后的数据使用正确的流水线配置进行处理和发送?
  3. 如何保证变更前后数据依然是有序的?

数据完整性与正确性

考虑到总线模式下无法排空流水线,那我们是否可以换一个思路:既然在流水线变更的时候,Processor Runner线程和Flusher Runner线程都保持正常运行状态,那在停止流水线的时候,是否可以只停止输入插件,让Processor Runner线程和Flusher Runner线程继续处理当前流水线的剩余数据?如此一来,单个流水线变更完全不会影响其他流水线的运行,做到了热加载隔离。这个想法看起来是合理的,但是有两个关键问题需要考虑:

  1. Processor Runner线程和Flusher Runner线程在处理和发送数据时,都必须保证该数据对应的流水线存在。如果简单地在停止输入插件后就将流水线删除,那么会产生两个后果:
    1. Processor Runner线程无法找到相应的流水线导致数据丢失,或找到的是更新后的流水线导致处理错位;
    2. SenderQueueItem内的Flusher指针失效,从而导致Flusher Runner线程在解引用指针时发生未知错误。
  2. 队列和发送插件内的批聚合模块都拥有缓冲区,如果在停止输入插件后就将流水线删除,则缓冲区内的数据会全部丢失。事实上,对于任何有状态处理插件(现阶段没有),不加处理直接删除流水线都会导致插件内部的缓冲数据丢失。

因此,我们必须保证在旧流水线产生的最后一个数据发送完毕之前,旧流水线是始终有效的。换言之,在新流水线启动之初,内存中极有可能同时存在新老两个流水线。为了实现这个需求,一个很自然的想法就是利用C++的智能指针来实现对流水线的引用计数。

流水线引用计数

最简单的方法,我们可以为流水线中的每一个有状态组件(处理队列、批聚合模块、发送队列)中的数据结构增加一个指向流水线的智能指针。然而,这样做依然会存在两个问题:

  1. 在新老两条流水线同时存在的状态下,我们没有办法保证旧流水线的数据会被优先处理,即无法保证采集数据的有序性。
  2. 为每一个数据包都附上一个智能指针会产生一定的同步开销,影响处理速率。事实上,在非配置变更阶段添加智能指针纯粹是在做无用功。

为此,我们需要对流水线数据成员和流水线停止策略进行调整:

  1. 首先,我们可以将处理队列和发送队列的所有者从流水线转移到全局的QueueManager,这样就能做到队列在流水线变更前后的复用,从而保证了队列中的数据是保序的,即新流水线的数据必定出现在旧流水线的数据之后。
  2. 对于批聚合模块,我们也可以采取和队列一样的策略,但从可靠扩展性的角度,我们不可能对未来的每一个有状态组件都执行类似的操作。因此,我们需要在流水线销毁前,将组件内的数据全部推送至发送队列中。

有了上述两点保证,我们只需要在处理队列元素和发送队列元素中增加一个指向流水线的智能指针mPipeline即可:

struct ProcessQueueItem {
PipelineEventGroup mEventGroup;
size_t mInputIndex = 0;
std::shared_ptr<Pipeline> mPipeline;
// other members...
};
struct SenderQueueItem {
Flusher* mFlusher;
std::shared_ptr<Pipeline> mPipeline;
// other members...
};

进一步地,我们只需要在流水线停止时,对当前还残留在队列中的元素附上智能指针即可,从而最大程度提升了效率。如此,当发送队列最后一个旧流水线产生的数据被成功发送后,旧流水线的引用计数自动降为0,从而执行真正的删除操作。

Drain Or Not?

前面提到,对于有状态的组件,我们需要在流水线停止时将内部的数据排空,全部推送发送队列。既然需要排空,那似乎又与本节开头所描述的“总线模式无法单独排空流水线”相矛盾。实则不然,我们这里只是需要排空特定组件,并不需要排空整条流水线。为了排空批聚合模块,我们只需要确保执行排空操作前,不会再有数据进入到该模块即可。而为了达到该目的,我们只需要确保Processor Runner线程不再从旧流水线的处理队列中取出事件组进行处理即可。所以,我们实际并没有排空流水线,我们唯一要做的就是临时禁用处理队列。Bingo!

处理配置选择

我们在上文介绍Processor Runner时提到,我们是通过事件组对应的流水线名称来获取相应的流水线配置的。但为了解决流水线的生命周期问题,我们又在ProcessQueueItem里引入了指向流水线的智能指针。显然,这两者都能拿到对应的流水线配置,但是不一定是同一个流水线。因此,我们需要重新对获取流水线配置的操作进行定义,步骤如下:

  1. 如果ProcessQueueItem中的流水线指针mPipeline不为空,则说明该数据来源于旧流水线,因此直接使用该指针指向的配置;
  2. 否则,说明数据对应的流水线当前正在运行,因此直接根据流水线名称来获取相应的流水线配置。

流水线发送异常隔离

总线模式下的另一个隔离性问题就是流水线的发送异常隔离。例如,对于SLS输出插件而言,当某个地域的网络出现异常时,会导致所有配置了SLS输出插件且发送目标为该地域的流水线出现发送受阻。在总线模式下,由于Flusher Runner线程是全局共用的,不论流水线是否发送异常,它都会从发送队列中取出待发送数据并推送到Sink队列中。因此,来自于发送异常的流水线的请求会被反复尝试发送,从而占用有限的网络IO资源,影响其他正常流水线的发送。

下图给了一个示例,假设当前时刻客户端往SLS杭州服务端的网络异常导致发送受阻,则根据图示流量,Http Sink队列中平均50%的请求都是无效请求。换言之,平均有50%的网络IO资源被浪费了。

为了实现总线模式下的流水线发送异常隔离,我们需要额外增加一个发送反馈机制来控制Flusher Runner线程从发送队列中获取数据的行为。

发送并发度限制器

为了实现上述目标,我们可以为每一个发送目标设置一个发送并发度限制器(Concurrency Limiter),该限制器会根据对应发送目标的发送结果动态调整该发送目标最大允许的同时处于发送状态的请求数量,即发送并发度。具体的发送目标由各个输出插件自行定义,对于SLS输出插件而言,一个发送目标等于一个地域。

考虑到多个流水线的输出插件可能发往同一个目标,因此,发送目标相同的输出插件共享同一个发送并发度限制器。当Flusher Runner线程从各个发送队列中获取待发送数据时,它会首先检查该发送插件对应的发送并发度限制器,如果该发送目标的发送并发度已经达到上限,则不再从队列中取出数据。

发送并发度自适应调整

当一个请求发送完成后,发送并发度限制器会根据发送结果来调整最大发送并发度。受TCP拥塞控制算法的启发,我们采取如下的策略:

  • 当因为网络原因导致发送失败时,将最大发送并发度除2,这个过程称为乘性减。如果当前的最大发送并发度已经变为0,则进入休眠期,隔一段时间尝试发送一次:
    • 如果依然发送失败,则按照Fibonacci回退策略逐步增大尝试间隔;
    • 如果发送成功,则立刻退出休眠期,将最大并发度修改为1;
  • 在非休眠期,如果发送成功或因非网络原因导致发送失败时,将最大发送并发度加1,直到达到上限,这个过程称为加性增

通过使用这个策略,能够保证某个发送目标出现网络异常时,该目标允许发送的数据包可以快速衰减,最大程度减少该发送目标对其他发送目标的影响。如果是网络中断场景,则休眠期的做法能够最大限度减少不必要的发送,同时当网络恢复时,也能在有限的时间内及时恢复数据发送。

流水线资源分配公平性

既然是总线模式,那么在各个Runner线程中,我们需要手动分配各个流水线的计算资源。以Processor Runner线程为例,显然,为了确保每一个流水线都能获得一定的计算资源,我们必须确保资源分配的公平性。因此,在获取下一个待处理的事件组时,我们从上一个处理的事件组对应的流水线之后的流水线开始遍历,直到找到下一个有数据的处理队列。

进一步地,对于某些流水线而言,其优先级会比其他优先级高,这意味着这些流水线的数据应当被优先处理。在计算资源不足的情况下,其它低优先级流水线应当主动为其让步。这主要囊括两种情况:

  • 采集指标类的流水线优先级显然应该比采集日志类的流水线优先级高,当端上资源不足时,应当优先保证指标类数据的实时上报;
  • 采集关键业务数据的流水线显然应该比采集普通业务数据的流水线优先级高,用户可根据实际业务背景去指定各个流水线的优先级。

为了实现这个目标,我们根据优先级对所有的流水线进行分级,每一层级可包含多个流水线,相互之间没有优先区别。在Processor Runner线程获取下一个待处理的事件组时,执行如下操作:

  • 按照优先级由高到低的顺序依次访问各个流水线;
  • 对于同一优先级内的流水线,
    • 如果上一次处理数据对应的流水线没有在本层级内,则从头开始依次访问本层级内的流水线;
    • 否则,从上一次处理数据对应的流水线之后的流水线开始,依次访问本层级内的流水线。

对于Input Runner线程和Flusher Runner线程而言,各个流水线没有优先级之分,因为Processor Runner线程已经完成了相应的需求。因此,对于上述两个线程而言,它们只需要依次访问各个流水线去获取或发送数据即可,这里不再赘述。

总结

可靠性和稳定性永远是一款产品的核心,多租场景下的可靠性保证更是LoongCollector相比其它开源竞品的核心竞争力之一。总线模式为LoongCollector的高性能和低开销提供了架构保证,但同时也为多租场景下的可靠性保证提出了挑战。然而,车到山前必有路,通过一些巧妙的设计和组件之间的解耦,我们最终完整提供了总线模式下的资源公平性和采集隔离性。


observability.cn Authors 2024 | Documentation Distributed under CC-BY-4.0
Copyright © 2017-2024, Alibaba. All rights reserved. Alibaba has registered trademarks and uses trademarks.
浙ICP备2021005855号-32