# ELK Stack

## Architecture
| Component | Language | Role | Memory |
|---|---|---|---|
| Filebeat | Go | Log collection, lightweight forwarding | ~50MB |
| Logstash | JRuby | Log parsing, filtering, enrichment | ~1GB |
| Elasticsearch | Java | Full-text indexing, storage, search | ≥2GB |
| Kibana | Node.js | Visualization, dashboards | ~500MB |
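These components chain into a pipeline. A rough sketch of the two common topologies (the Kafka buffer matches the production setup in the configs below):

```
Filebeat ──► Kafka ──► Logstash ──► Elasticsearch ──► Kibana   (production, buffered)
Filebeat ─────────────────────────► Elasticsearch ──► Kibana   (small scale, direct)
```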
## Filebeat Configuration

`filebeat.yml`:

```yaml
filebeat.inputs:
  # Collect Nginx logs
  - type: log
    paths:
      - /var/log/nginx/access.log
    fields:
      service: nginx
      env: production
    # Merge multi-line logs (e.g. Java stack traces):
    # lines not starting with a date are appended to the previous event
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after

  # Collect container logs
  - type: container
    paths:
      - /var/lib/docker/containers/*/*.log
    processors:
      - add_kubernetes_metadata: ~

# Output to Kafka (recommended for production)
output.kafka:
  hosts: ["kafka-1:9092", "kafka-2:9092"]
  topic: "logs-%{[fields.service]}"
  partition.round_robin:
    reachable_only: true

# Or output directly to ES (small-scale setups)
# output.elasticsearch:
#   hosts: ["es-1:9200"]
#   index: "logs-%{[fields.service]}-%{+yyyy.MM.dd}"
```
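Before rolling this out, the file can be sanity-checked with Filebeat's built-in test subcommands (the config path below is an assumption):

```bash
# Validate syntax and option names
filebeat test config -c /etc/filebeat/filebeat.yml

# Verify connectivity to the configured output (Kafka here)
filebeat test output -c /etc/filebeat/filebeat.yml
```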
## Logstash Configuration

`logstash.conf`:

```conf
input {
  kafka {
    bootstrap_servers => "kafka-1:9092,kafka-2:9092"
    topics => ["logs-nginx", "logs-app"]
    group_id => "logstash-consumer"
    codec => json
  }
}

filter {
  # Parse Nginx access logs
  if [fields][service] == "nginx" {
    grok {
      match => {
        "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:bytes}'
      }
    }
    # GeoIP geolocation lookup
    geoip { source => "client_ip" }
  }

  # Parse JSON application logs directly
  if [fields][service] == "app" {
    json { source => "message" }
  }

  # Mask sensitive data (11-digit phone numbers, 16-19 digit card numbers)
  mutate {
    gsub => [
      "message", "\d{11}", "***PHONE***",
      "message", "\d{16,19}", "***CARD***"
    ]
  }

  # Normalize timestamps
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "ISO8601"]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["es-1:9200", "es-2:9200"]
    index => "logs-%{[fields][service]}-%{+YYYY.MM.dd}"
    # ILM index lifecycle management; note that with ILM enabled, events are
    # written through the rollover alias and the `index` setting above is ignored
    ilm_enabled => true
    ilm_rollover_alias => "logs"
    ilm_policy => "logs-policy"
  }
}
```
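Logstash can likewise verify the pipeline before (re)starting workers; both flags below are standard Logstash options:

```bash
# Parse the pipeline config and exit without processing events
bin/logstash -f logstash.conf --config.test_and_exit

# Run with automatic reload when the config file changes
bin/logstash -f logstash.conf --config.reload.automatic
```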
## Elasticsearch Index Management

### ILM index lifecycle policy
```
PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50gb",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "freeze": {}
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
```
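The policy must also be attached to indices. Outside of Logstash (which can bootstrap the rollover alias itself), this takes an index template binding the policy and alias, plus a bootstrap write index to start the rollover chain. A minimal sketch, where the template name `logs-template` is an assumption and the alias/policy names match the Logstash output above (note that `freeze` is deprecated and effectively a no-op on ES 7.14+):

```
PUT _index_template/logs-template
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "index.lifecycle.name": "logs-policy",
      "index.lifecycle.rollover_alias": "logs"
    }
  }
}

PUT logs-000001
{
  "aliases": {
    "logs": { "is_write_index": true }
  }
}
```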
### Common queries

```
# KQL (Kibana query bar)
service: "order-service" AND level: "ERROR" AND message: "timeout"

# Index status, sorted by store size
GET _cat/indices/logs-*?v&s=store.size:desc

# Shard allocation
GET _cat/shards/logs-*?v
```
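The KQL filter above translates to the following Query DSL for use via the REST API (the `term` clauses assume `service` and `level` are mapped as keyword fields):

```
GET logs-*/_search
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "service": "order-service" } },
        { "term": { "level": "ERROR" } },
        { "match": { "message": "timeout" } }
      ]
    }
  }
}
```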
## ES Performance Tuning

| Tuning item | Notes |
|---|---|
| Shard count | Size each shard to 20-50GB; avoid many small shards |
| Replica count | Set to 0 during write-heavy loads, restore afterwards (see sketch below) |
| Refresh | `refresh_interval: 30s` (default 1s) |
| Bulk writes | Write in batches, 5-15MB per request |
| Mapping | Choose keyword vs text deliberately; disable indexing on fields you never query |
| Hot-cold tiering | Hot nodes on SSD + cold nodes on HDD |
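The Refresh and Replica rows are dynamic index settings, so they can be flipped for the duration of a bulk load and reverted afterwards. A minimal sketch:

```
# Before a heavy bulk load
PUT logs-*/_settings
{
  "index": { "refresh_interval": "30s", "number_of_replicas": 0 }
}

# After the load completes
PUT logs-*/_settings
{
  "index": { "refresh_interval": "1s", "number_of_replicas": 1 }
}
```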
## Common Interview Questions

### Q1: How does ELK handle high log throughput?

Answer:
- Kafka buffering: Filebeat → Kafka → Logstash, with Kafka absorbing traffic peaks
- Multiple Logstash instances: scale parsing capacity horizontally
- ES bulk writes: use the Bulk API and increase `refresh_interval`
- Index strategy: time-based rollover (daily/hourly) plus hot-warm-cold tiering
- Filter at the source: drop DEBUG logs at the Filebeat layer (see the sketch after this list)
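For the last point, a sketch of source-side filtering with Filebeat's `drop_event` processor (the regex assumes the level string appears verbatim in the message text):

```yaml
processors:
  # Drop DEBUG-level events on the host, before they consume
  # bandwidth and downstream parsing capacity
  - drop_event:
      when:
        regexp:
          message: '\bDEBUG\b'
```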
### Q2: What is the difference between Logstash and Filebeat?

Answer:

| | Filebeat | Logstash |
|---|---|---|
| Language | Go | JRuby |
| Memory | ~50MB | ~1GB |
| Functionality | Collection + simple filtering | Complex parsing/filtering/enrichment |
| Deployment | One per host | Centralized parsing nodes |

Typical deployment: Filebeat (on every host) → Kafka → Logstash (centralized processing) → ES.
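For the Kafka hop in that deployment, topics are usually created ahead of time with enough partitions for the Logstash consumer group to scale out; a sketch, where the partition and replication counts are assumptions:

```bash
kafka-topics.sh --create --topic logs-nginx \
  --partitions 6 --replication-factor 2 \
  --bootstrap-server kafka-1:9092
```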