«

Filebeat → Kafka → ClickHouse 全链路实战

时间:2025-8-13 15:44     作者:wanzi     分类:


📝 构建高可用日志系统:Filebeat → Kafka → ClickHouse 全链路实战

场景:日志采集、缓冲、存储与分析一体化方案


一、背景与目标

在现代分布式系统中,服务产生的日志量巨大,传统的“直接写数据库”或“直连 Elasticsearch”方式存在性能瓶颈和耦合问题。

本文实现一个高吞吐、可扩展、易维护的日志采集与分析链路

[应用日志] 
    ↓
Filebeat(采集)
    ↓
Kafka(消息队列,缓冲解耦)
    ↓
ClickHouse(高性能分析数据库)
    ↓
Grafana / SQL 查询(可视化分析)

目标


二、整体架构图

+----------------+     +--------+     +------------------+
|   Application  |---->|Filebeat|---->|      Kafka       |
+----------------+     +--------+     +------------------+
                                          ↓
                                   +---------------+
                                   |  ClickHouse   | ←[物化视图]
                                   +---------------+
                                          ↓
                                    [Grafana / CLI]

三、环境准备

组件 版本示例 部署方式
Filebeat 8.x 每台应用服务器部署
Kafka 3.0+ 集群(推荐3节点)
ZooKeeper 3.8+ 或 KRaft 模式 Kafka 依赖
ClickHouse 23.8+ 单机或集群
OS Linux (Ubuntu/CentOS)

💡 本文以单机测试环境为例,生产建议集群部署。


四、第一步:Filebeat 配置(采集日志并发送到 Kafka)

1. 安装 Filebeat

# Ubuntu 示例
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-8.x.list
sudo apt update && sudo apt install filebeat

2. 编写配置文件 to-kafka.yml

# to-kafka.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/myapp/*.log
    tags: ["myapp"]
    fields:
      log_type: "app_log"
      env: "production"

    # 多行日志合并(例如 Java 异常栈)
    multiline.type: pattern
    multiline.pattern: '^\d{4}-\d{2}-\d{2}|\[ERROR\]'
    multiline.negate: true
    multiline.match: after

# 可选:处理字段
processors:
  - decode_json_fields:
      fields: ["message"]
      process_array: false
      max_depth: 1
      target: ""
      overwrite_keys: true

output.kafka:
  hosts: ["kafka-server:9092"]
  topic: filebeat-logs
  partition.round_robin:
    reachable_only: true
  # 启用压缩提升性能
  compression: gzip
  max_message_bytes: 1000000

# 日志输出到控制台(调试用)
logging.level: info
logging.to_stderr: true

3. 启动 Filebeat

filebeat -e -c to-kafka.yml

📌 -e 表示输出日志到 stderr,方便调试。生产环境可改为后台运行。


五、第二步:Kafka 设置 Topic

确保 Kafka 已启动,并创建用于接收日志的 topic:

# 创建 topic
kafka-topics.sh --create \
  --topic filebeat-logs \
  --bootstrap-server kafka-server:9092 \
  --partitions 3 \
  --replication-factor 1

# 查看 topic
kafka-topics.sh --describe --topic filebeat-logs --bootstrap-server kafka-server:9092

✅ 建议分区数 ≥ ClickHouse 消费线程数,提高并发。


六、第三步:ClickHouse 接收 Kafka 数据

1. 安装 ClickHouse(Ubuntu 示例)

curl -s https://packagecloud.io/ClickHouse/clickhouse/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packagecloud.io/ClickHouse/clickhouse/debian/ $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt update
sudo apt install clickhouse-server clickhouse-client

启动服务:

sudo service clickhouse-server start

2. 创建 Kafka 引擎表(消费队列)

CREATE TABLE kafka_logs_queue (
    timestamp DateTime,
    level String,
    message String,
    host String DEFAULT hostName()
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka-server:9092',
    kafka_topic_list = 'filebeat-logs',
    kafka_group_name = 'clickhouse-filebeat-group',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 1;

⚠️ kafka_group_name 相同的消费者属于同一组,避免重复消费。

3. 创建目标表(真实存储)

CREATE TABLE app_logs (
    timestamp DateTime,
    level String,
    message String,
    host String,
    date Date MATERIALIZED toDate(timestamp)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(date)
ORDER BY (timestamp, host)
TTL date + INTERVAL 30 DAY;  -- 自动清理30天前数据

4. 创建物化视图(自动导入)

CREATE MATERIALIZED VIEW kafka_to_clickhouse TO app_logs
AS SELECT 
    timestamp,
    level,
    message,
    host
FROM kafka_logs_queue;

✅ 此时,只要 Kafka 中有消息,就会自动流入 app_logs 表。


七、验证数据是否写入

-- 查看数据
SELECT * FROM app_logs LIMIT 10;

-- 统计日志级别
SELECT level, count(*) AS cnt 
FROM app_logs 
GROUP BY level;

-- 查询某天错误日志
SELECT message FROM app_logs 
WHERE level = 'ERROR' 
  AND timestamp >= '2025-04-05 00:00:00';

八、常见问题与优化建议

问题 解决方案
数据没进 ClickHouse? 检查 Kafka topic 是否有数据,system.kafka_consumers 表查看消费状态
消费延迟? 增加 kafka_num_consumers 或增加分区
JSON 解析失败? 确保 kafka_format 匹配数据格式(如 JSONEachRow
字段缺失? 在 Filebeat 中使用 decode_json_fields 或固定字段映射
存储太大? 使用 TTL 策略,或启用表压缩

九、进阶建议(生产可用)

  1. Kafka 高可用:使用 KRaft 模式或 ZooKeeper 集群
  2. ClickHouse 集群:分片 + 复制,应对大数据量
  3. 监控
    • Kafka:使用 kafka-topics.sh --describe 或 Prometheus + Grafana
    • ClickHouse:system.metrics, system.parts, system.kafka_consumers
  4. 安全
    • Kafka 支持 SASL/SSL
    • ClickHouse 配置用户权限
  5. 可视化:接入 Grafana,使用 ClickHouse 插件做仪表盘

十、总结

组件 角色 不可替代性
Filebeat 轻量采集 资源占用低,适合边缘节点
Kafka 解耦缓冲 高吞吐、可重放、支持多消费者
ClickHouse 分析引擎 快速聚合查询,适合日志分析

📌 这套架构特别适合:


🔗 参考资料


✅ 附录:完整配置文件打包建议

你可以将以下文件组织成项目结构:

/log-pipeline/
├── filebeat/
│   ├── to-kafka.yml
│   └── logs/ (软链接到实际日志目录)
├── kafka/
│   └── create-topic.sh
├── clickhouse/
│   └── schema.sql
└── README.md