问题
上周因为 OOM 问题,某个集群内的 Filebeat 被迫重启后,观测了许久,仍不见事件流恢复,查看 Filebeat 输出日志,发现只有其自监控的日志:
代码语言:javascript复制2021-05-28T03:19:41.061Z INFO [monitoring] log/log.go:145 Non-zero metrics in the last 30s {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":6249680,"time":{"ms":3024}},"total":{"ticks":13659880,"time":{"ms":6612},"value":13659880},"user":{"ticks":7410200,"time":{"ms":3588}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":46},"info":{"ephemeral_id":"ca641ad8-e10a-496f-a087-6924e456aaea","uptime":{"ms":60180037}},"memstats":{"gc_next":42518272,"memory_alloc":31026880,"memory_total":1715304668248,"rss":-1552384},"runtime":{"goroutines":206}},"filebeat":{"events":{"added":139,"done":139},"harvester":{"open_files":0,"running":0}},"libbeat":{"config":{"module":{"running":0}},"pipeline":{"clients":1,"events":{"active":1,"filtered":139,"total":139},"registrar":{"states":{"current":55084,"update":13},"writes":{"success":139,"total":139}},"system":{"load":{"1":9.21,"15":10.97,"5":10.87,"norm":{"1":0.3838,"15":0.4571,"5":0.4529}}}}}}
从 "harvester":{"open_files":0,"running":0}}
我们可以判断出 harvester
尚未启动采集,这让我非常疑惑,Filebeat 究竟在做什么?
接着观察日志,发现除开自监控,最后输出一条日志的内容是:
代码语言:javascript复制2021-05-28T02:46:39.019Z INFO beater/crawler.go:73 Loading Inputs: 1
Loading Inputs
看起来也是符合逻辑的,短时间没有太多的头绪,放下忙其他工作了。
随着时间推移,当我再次观测 Filebeat 时,发现它已经在正常工作了,但是日志内依旧没有错误输出,找到恢复时间点的最早日志:
代码语言:javascript复制2021-05-28T05:55:17.822Z INFO log/input.go:152 Configured paths: [/data/logs/*/*.log* /data/v3logs/*/*/*.log*]
2021-05-28T05:55:17.822Z INFO input/input.go:114 Starting input of type: log; ID: 3062577341473220485
2021-05-28T05:55:17.822Z INFO beater/crawler.go:105 Loading and starting Inputs completed. Enabled inputs: 1
好家伙,从 Loading Inputs
→ Loading and starting Inputs completed
花费了3个多小时,这更让我疑惑了,Filebeat 究竟为什么一直装死?
原因
根据日志打印翻阅了 Filebeat 源码
Filebeat 使用 registry file
作为采集的状态存储,实际上就是一个纯文本的 JSON 文件。每次启动时都会检查 JSON 文件中的 states
是否需要更新(新增或者删除文件),而当任何一个 state
需要更新, registry file
将会全量序列化(读) → 持久化(写),随着 states
越来越多,JSON 文件会越来越大(接近20MB),每次全量读写都会越来越慢,并且 CPU、内存的使用量都会暴涨。
总的来说,在当前的数据存储选型下,Filebeat 无法应对过多的文件数据数量,启动时的数据核验时间过长(几小时→几天不等,视数据量而定),就会产生了“假死”的现象。
解决方案
临时的解决方案
暂停 Filebeat 进程,删除 registry file
,重启 Filebeat 进程。这时候由于 JSON 文件是比较小的,所有 state
均处于增量状态,数据同步是比较快的。但是所有已经发送过的事件将难以避免地重复发送一次,所以这种做法只能应急,不能长久处理。
长久的权宜之计
Filebeat 的纯文本的 JSON 存储选型天生就是存在问题的,社区内也曾做过一些小改进的尝试,最终并没有被合并到柱分枝。所以 Filebeat 无法应用过多的日志文件,这是一个短期内无法改变的事实。
而在当前选择的依赖背压的采集方案 中,我们并不倾向将日志文件留在采集管道中,而是将日志留在原处——机器的磁盘上,然后尽量保证管道的通畅,将日志实时采集到 ES 中。
这样做的好处,就是避免因为过多的管道传输导致日志丢失(例如 Redis 写满后崩溃)。
同时也会引入另一个问题:如果采集链路阻塞,同时过多的日志(采集条目每日2亿 )大于机器的磁盘承载能力时,日志丢失的风险依旧存在。
所以我们需要定期清理过期的日志,但问题也没那么简单:
- 直接删除日志文件 → 写日志的应用进程无法感知,将向无效文件句柄写日志 → 导致日志丢失
- 清空文件日志内容(
echo '' > {}
) → 导致文件数量不会减少 - 如果因为硬盘容量限制,删除日志的周期小于产品许诺的日志保存时长,当链路出现堵塞又未能及时处理 → 导致日志丢失
所以我写了一个 删除脚本,在保证清理过期日志的同时,会判断日志文件的句柄使用情况,跳过那些仍在被写入的文件,保证日志不丢失。
代码语言:javascript复制# -*- coding: utf-8 -*-
import datetime
import getopt
import glob
import logging
import subprocess
import sys
from collections import namedtuple
from pathlib import Path
from typing import Generator, Dict
logging.basicConfig(level=logging.DEBUG)
Process = namedtuple("Process", "command,pid,user")
def get_processes_open(file_regex: str) -> Dict[str, Process]:
"""Find processes with open handles for the specified file(s)."""
open_file_process_map = {}
try:
# maybe not safe
output = subprocess.getoutput(f"echo {file_regex} | xargs lsof")
except Exception:
logging.exception("exec error")
return open_file_process_map
lines = output.split("n")[1:]
for line in lines:
parts = [x for x in line.split(" ") if x]
open_file_process_map[parts[-1]] = Process(*parts[:3])
return open_file_process_map
def try_to_delete_files(file_regex: str, minutes: int):
"""Try to delete files"""
skipped_count = deleted_count = 0
def filter_files_by_expire_minutes() -> Generator[Path, None, None]:
"""Get all expired files"""
now = datetime.datetime.now()
for name in glob.glob(file_regex, recursive=True):
file_obj = Path(name)
updated = datetime.datetime.fromtimestamp(file_obj.stat().st_mtime)
if updated < now - datetime.timedelta(minutes=minutes):
yield file_obj
open_file_process_map = get_processes_open(file_regex)
for f in filter_files_by_expire_minutes():
if str(f) not in open_file_process_map:
logging.info(f"deleting {f}")
try:
f.unlink()
deleted_count = 1
except Exception as e:
skipped_count = 1
logging.warning("failed to delete file: %s, for: %s", str(f), e)
continue
else:
skipped_count = 1
logging.info(f"skipping {f}, for process: {open_file_process_map[str(f)]}")
print(f"Skipped: {skipped_count}, deleted: {deleted_count}")
if __name__ == "__main__":
try:
opts, args = getopt.getopt(sys.argv[1:], "hf:m:", ["files=", "minutes="])
except getopt.GetoptError:
print("Example: python delete_files.py -f /data/*/*.log* -m 1440")
sys.exit(2)
arg_files = ""
arg_minutes = 1440
for opt, arg in opts:
if opt == "-h":
print("Usage: python delete_files.py -f /data/*/*.log* -m 1440")
sys.exit(2)
elif opt in ("-f", "--files"):
arg_files = arg
elif opt in ("-m", "--minutes"):
arg_minutes = int(arg)
started = datetime.datetime.now()
print(f"Started: {started}n>>>>>>>>>>>>")
try_to_delete_files(arg_files, arg_minutes)
finished = datetime.datetime.now()
print(
f">>>>>>>>>>>>nFinished: {finished}, total cost: {(finished - started).total_seconds()} seconds n"
)
delete_files.py
当然你也可以根据需求用 Bash 实现,这里就不展开了。(其实就是我不会 Bash)
然后我们需要将它跑在集群中的每一个节点上,定期执行清理工作:
首先定义镜像
代码语言:javascript复制FROM python:3
RUN apt-get update && apt-get install -y lsof
ADD delete_files.py /
CMD [ "python", "./delete_files.py", "-f", "./*.no", "-m", "1440" ]
Kubernetes DaemonSet 定义,在每一个节点上都尝试清理日志:
代码语言:javascript复制apiVersion: apps/v1
kind: DaemonSet
metadata:
name: log-cleaner
namespace: kube-system
labels:
k8s-app: log-cleaner
spec:
selector:
matchLabels:
k8s-app: log-cleaner
template:
metadata:
name: log-cleaner
labels:
k8s-app: log-cleaner
spec:
hostPID: true
containers:
- name: batch-delete-files
image: some-registry/batch-delete-files:v1.0.0
imagePullPolicy: Always
command: ["bash"]
args: ["-c", "while true; do python delete_files.py -f /some-path/*/*/*.log* -m 1440; sleep 3600; done;"]
resources:
limits:
cpu: 2560m
memory: 256Mi
requests:
cpu: 25m
memory: 32Mi
volumeMounts:
- mountPath: /some-path/
name: some-volume
volumes:
- name: some-volume
hostPath:
path: /some-path/
值得注意的是,由于我们需要在容器内使用 lsof
来看查看母机文件的 fd
使用情况,所以这里需要额外添加 hostPID: true
来保证能够读取到母机的进程信息。
由于我们需要定时执行,所以通过 while true; do ... sleep 3600; done;
来要额外控制执行周期。
结语
由于 Filebeat 存在天生的存储缺陷,我们需要通过额外的脚本较为精确的控制 Filebeat 的输入文件数量,当前的方案断然达不到完善,仍需要我们继续探索。
参考:
- https://github.com/elastic/beats/issues/16076
- https://stackoverflow.com/questions/36186630/python-error-subprocess-calledprocesserror-command-returned-non-zero-exit-stat
- https://stackoverflow.com/questions/55901375/is-this-possible-to-schedule-cronjob-to-execute-on-each-of-kubernetes-nodes