Filebeat → Kafka → ClickHouse 全链路实战
时间:2025-8-13 15:44 作者:wanzi 分类: 无
📝 构建高可用日志系统:Filebeat → Kafka → ClickHouse 全链路实战
场景:日志采集、缓冲、存储与分析一体化方案
一、背景与目标
在现代分布式系统中,服务产生的日志量巨大,传统的“直接写数据库”或“直连 Elasticsearch”方式存在性能瓶颈和耦合问题。
本文实现一个高吞吐、可扩展、易维护的日志采集与分析链路:
[应用日志]
↓
Filebeat(采集)
↓
Kafka(消息队列,缓冲解耦)
↓
ClickHouse(高性能分析数据库)
↓
Grafana / SQL 查询(可视化分析)
✅ 目标:
- 实现日志自动采集
- 支持结构化存储
- 能够快速查询与分析(如 PV/UV、错误统计等)
二、整体架构图
+----------------+ +--------+ +------------------+
| Application |---->|Filebeat|---->| Kafka |
+----------------+ +--------+ +------------------+
↓
+---------------+
| ClickHouse | ←[物化视图]
+---------------+
↓
[Grafana / CLI]
三、环境准备
组件 | 版本示例 | 部署方式 |
---|---|---|
Filebeat | 8.x | 每台应用服务器部署 |
Kafka | 3.0+ | 集群(推荐3节点) |
ZooKeeper | 3.8+ 或 KRaft 模式 | Kafka 依赖 |
ClickHouse | 23.8+ | 单机或集群 |
OS | Linux (Ubuntu/CentOS) |
💡 本文以单机测试环境为例,生产建议集群部署。
四、第一步:Filebeat 配置(采集日志并发送到 Kafka)
1. 安装 Filebeat
# Ubuntu 示例
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-8.x.list
sudo apt update && sudo apt install filebeat
2. 编写配置文件 to-kafka.yml
# to-kafka.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/myapp/*.log
tags: ["myapp"]
fields:
log_type: "app_log"
env: "production"
# 多行日志合并(例如 Java 异常栈)
multiline.type: pattern
multiline.pattern: '^\d{4}-\d{2}-\d{2}|\[ERROR\]'
multiline.negate: true
multiline.match: after
# 可选:处理字段
processors:
- decode_json_fields:
fields: ["message"]
process_array: false
max_depth: 1
target: ""
overwrite_keys: true
output.kafka:
hosts: ["kafka-server:9092"]
topic: filebeat-logs
partition.round_robin:
reachable_only: true
# 启用压缩提升性能
compression: gzip
max_message_bytes: 1000000
# 日志输出到控制台(调试用)
logging.level: info
logging.to_stderr: true
3. 启动 Filebeat
filebeat -e -c to-kafka.yml
📌
-e
表示输出日志到 stderr,方便调试。生产环境可改为后台运行。
五、第二步:Kafka 设置 Topic
确保 Kafka 已启动,并创建用于接收日志的 topic:
# 创建 topic
kafka-topics.sh --create \
--topic filebeat-logs \
--bootstrap-server kafka-server:9092 \
--partitions 3 \
--replication-factor 1
# 查看 topic
kafka-topics.sh --describe --topic filebeat-logs --bootstrap-server kafka-server:9092
✅ 建议分区数 ≥ ClickHouse 消费线程数,提高并发。
六、第三步:ClickHouse 接收 Kafka 数据
1. 安装 ClickHouse(Ubuntu 示例)
curl -s https://packagecloud.io/ClickHouse/clickhouse/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packagecloud.io/ClickHouse/clickhouse/debian/ $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt update
sudo apt install clickhouse-server clickhouse-client
启动服务:
sudo service clickhouse-server start
2. 创建 Kafka 引擎表(消费队列)
CREATE TABLE kafka_logs_queue (
timestamp DateTime,
level String,
message String,
host String DEFAULT hostName()
) ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka-server:9092',
kafka_topic_list = 'filebeat-logs',
kafka_group_name = 'clickhouse-filebeat-group',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 1;
⚠️
kafka_group_name
相同的消费者属于同一组,避免重复消费。
3. 创建目标表(真实存储)
CREATE TABLE app_logs (
timestamp DateTime,
level String,
message String,
host String,
date Date MATERIALIZED toDate(timestamp)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(date)
ORDER BY (timestamp, host)
TTL date + INTERVAL 30 DAY; -- 自动清理30天前数据
4. 创建物化视图(自动导入)
CREATE MATERIALIZED VIEW kafka_to_clickhouse TO app_logs
AS SELECT
timestamp,
level,
message,
host
FROM kafka_logs_queue;
✅ 此时,只要 Kafka 中有消息,就会自动流入 app_logs
表。
七、验证数据是否写入
-- 查看数据
SELECT * FROM app_logs LIMIT 10;
-- 统计日志级别
SELECT level, count(*) AS cnt
FROM app_logs
GROUP BY level;
-- 查询某天错误日志
SELECT message FROM app_logs
WHERE level = 'ERROR'
AND timestamp >= '2025-04-05 00:00:00';
八、常见问题与优化建议
问题 | 解决方案 |
---|---|
数据没进 ClickHouse? | 检查 Kafka topic 是否有数据,system.kafka_consumers 表查看消费状态 |
消费延迟? | 增加 kafka_num_consumers 或增加分区 |
JSON 解析失败? | 确保 kafka_format 匹配数据格式(如 JSONEachRow ) |
字段缺失? | 在 Filebeat 中使用 decode_json_fields 或固定字段映射 |
存储太大? | 使用 TTL 策略,或启用表压缩 |
九、进阶建议(生产可用)
- Kafka 高可用:使用 KRaft 模式或 ZooKeeper 集群
- ClickHouse 集群:分片 + 复制,应对大数据量
- 监控:
- Kafka:使用
kafka-topics.sh --describe
或 Prometheus + Grafana - ClickHouse:
system.metrics
,system.parts
,system.kafka_consumers
- Kafka:使用
- 安全:
- Kafka 支持 SASL/SSL
- ClickHouse 配置用户权限
- 可视化:接入 Grafana,使用 ClickHouse 插件做仪表盘
十、总结
组件 | 角色 | 不可替代性 |
---|---|---|
Filebeat | 轻量采集 | 资源占用低,适合边缘节点 |
Kafka | 解耦缓冲 | 高吞吐、可重放、支持多消费者 |
ClickHouse | 分析引擎 | 快速聚合查询,适合日志分析 |
📌 这套架构特别适合:
- 微服务日志集中分析
- 用户行为日志统计
- API 访问日志监控
- 错误日志告警系统
🔗 参考资料
- Filebeat 官方文档:https://www.elastic.co/guide/en/beats/filebeat/current/index.html
- Kafka 官方文档:https://kafka.apache.org/documentation/
- ClickHouse Kafka Engine:https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/
✅ 附录:完整配置文件打包建议
你可以将以下文件组织成项目结构:
/log-pipeline/
├── filebeat/
│ ├── to-kafka.yml
│ └── logs/ (软链接到实际日志目录)
├── kafka/
│ └── create-topic.sh
├── clickhouse/
│ └── schema.sql
└── README.md