Skip to content

Conversation

svc-design
Copy link

@svc-design svc-design commented Jul 24, 2025

This PR is for:

  • Agent

Feature:

  • 支持多线程并发发送日志,提升高吞吐量场景稳定性
  • 引入了基于 crossbeam-channel 的 全新 MPMC(多生产者多消费者)队列实现,用于替代或补充现有的 ring-buffer 实现。

背景问题

在专属采集器或高速数据面(如 2 Mpps、L4 Flow 限速 4 万条/s)下,现有链路中仅包含单个发送队列和单个 UniformSenderThread,很容易出现写入过快、队列被写满、日志覆盖丢弃的问题,最终影响日志完整性与可观测性。


本次改动实现

✅ 主要改动

在 agent/crates/public/src/queue/ 下新增:mpmc_queue.rs:实现基于 crossbeam_channel 的 Sender、Receiver 和 StatsHandle,支持 MPMC 模型;

修改 queue/mod.rs: 引入并导出 mpmc_queue,增加 bounded_mpmc() 构造函数

✅ 新增功能:
  • 引入可配置的 日志发送并发度参数,包括:

    • l4_flow_senders
    • l7_flow_senders
    • metric_senders
    • pcap_senders
  • 每类日志类型按配置值创建多个:

    • 有界 DebugSender 队列(queue::bounded_with_debug
    • 对应的 UniformSenderThread 实例
  • 所有 sender 命名后缀追加编号(如 3-flowlog-to-collector-sender-0 ~ -3

✅ 日志发送改为并发分发:
  • 负载均衡策略:目前支持:

    • Round-Robin(默认)
    • 哈希分发(基于五元组)可扩展支持
  • 发送线程存入 Vec<UniformSenderThread<_>>,在 Trident::start() 中统一启动,优雅退出时统一停止

✅ 配置示例(YAML):
log:
  l4_flow_senders: 4
  l7_flow_senders: 2
  metric_senders: 2

## ChatGPT/CodeX 设计思路

1. 配置并发度
为每种日志类型新增一个配置项,如 l4_flow_senders、metrics_senders 等,用于指定要创建的 sender 数量。

2. 创建多组队列及线程
根据上述配置,循环调用 queue::bounded_with_debug 和 UniformSenderThread::new 创建若干队列及对应的 UniformSenderThread。可以在队列名称后追加索引区分,例如 "3-flowlog-to-collector-sender-1"、"3-flowlog-to-collector-sender-2" 等。所有生成的线程存入 Vec<UniformSenderThread<_>> 统一管理。

3. 在聚合/生成阶段分发数据
原先的发送链路(如 FlowAggrThread)只向一个 DebugSender 写数据。扩展后,可在 CollectorThread 或 FlowAggrThread 中实现简单的负载均衡策略:

- 轮询:对每个输出条目按顺序选择不同的 DebugSender。
- 哈希:根据流的五元组或其他关键字段计算哈希,选择固定的 sender,从而保持同一流的数据顺序。
分发后即可并行写入多个发送队列。

4. 启动与管理多个线程

在 Trident::start() 中遍历 Vec<UniformSenderThread<_>>,逐一调用 start() 启动。停止时同样遍历 notify_stop 或 stop()。
统计信息和异常处理可沿用现有逻辑,只需将每个线程的计数器注册到 stats_collector。

5. 多消费者/MPMC支持

多消费者(MPMC, Multi-Producer Multi-Consumer)支持是对当前队列模型(MPSC: 多生产者单消费者)的架构级增强,其核心目标是提升消费者处理吞吐能力,降低单消费者瓶颈对系统稳定性的影响

@CLAassistant
Copy link

CLAassistant commented Jul 24, 2025

CLA assistant check
All committers have signed the CLA.

…or-high-throughput

Add MPMC queue implementation

引入了基于 crossbeam-channel 的 全新 MPMC(多生产者多消费者)队列实现,用于替代或补充现有的 ring-buffer 实现。

🔧 主要改动
在 agent/crates/public/src/queue/ 下新增:

mpmc_queue.rs:实现基于 crossbeam_channel 的 Sender、Receiver 和 StatsHandle,支持 MPMC 模型;

修改 queue/mod.rs:

引入并导出 mpmc_queue,增加 bounded_mpmc() 构造函数
…ub/workflows

Replace custom Cirun runners with default GitHub runners
…ersion

Update arm64 runner for verify agent
…-deepflow-agent

Fix clone derivation for BoxedTaggedFlow
…ation-for-performance

Add dynamic queue with expansion
…ent-of-thread-and-queue-settings

Add auto tuning for sender configuration
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants