干货 | Elastic Stack 技术栈应用于日志归集深度复盘

2021-12-28 10:39:07 浏览数 (1)

1、背景

  1. 公司行业偏传统,系统多日志杂,但是有任务需求,要把日志做规范化的归集和保存。
  2. 之前有两套基于 ES 的日志系统。一套是购买的厂商魔改且不可升级版;一套是开发团队基于某个核心系统定制的,除 ES 外的组件都是自行开发,且相关人员离职后无人继续维护,因此有一定局限性。
  3. 有着安全加密的要求。
  4. 涉及异地双中心,有跨集群查询展示需求。

2、框架规划

异地双中心各自独立集群。跨集群查询依靠 ES 自带的远程集群搜索(CCS)处理。

除了 filebeat 和 kibana 以外,所有组件均为物理环境部署。

主要功能组件如下:

  1. filebeat: 日志抽取并传输。
  2. kafka:队列组件,负责日志数据的缓冲。
  3. logstash:主要负责日志处理。抽取kafka的日志数据,同时也负责接受一些其它协议如syslog/udp的日志数据。在接收数据后进行日志清洗,最终写入 ES。
  4. elasticsearch:日志数据主要的存储模块。
  5. kibana:日志数据的查询和管理模块,有冷热节点分层,热数据节点 SSD 存储。
  6. grafana:负责日志数据的可视化。相较于kibana,grafana可以多数据源展示,更加灵活。
  7. metricbeat:负责logstash性能的监控。

3、日志平台规划

  1. 日志数据格式统一,最基础的以 {时间戳} {日志级别} {类名} {链路ID} {日志信息} 的形式输出。其中时间戳必须带年月日。
  2. 在物理环境下,日志当前输出文件名唯一,以 rotation 的方式维持日志总体大小一定,比如 100MB*10。输出日志的文件名唯一是为了尽量减少 filebeat 监控的日志文件数目,被监控的日志数量过于庞大,则 filebeat 的cpu使用会受到影响。
  3. 容器环境下 filebeat 以 sidecar 的模式另起一个 pod 进行监控。
  4. kafka 的 topic 以日志数据的流量大小来评估。将小流量的日志归并至同一个 topic,大流量的日志独立 topic。总体维持每个 topic 接收的日志数据流量规模相差不大。
  5. logstash 使用 pipeline 的模式进行配置的组装。每个 topic 尽量用单独一组(2-3个为一组) logstash 处理数据。
  6. 从 filebeat->kafka->logstash->elasticsearch->kibana 整个链路均加密,kibana上开启用户权鉴。
  7. 日志数据使用数据流管理,对节点做冷热分层,在 ILM 上对不同的日志数据进行不同的策略管理。

4、工作问题整理

4.1. 远程集群通讯和加密

由于异地双中心,且集群通讯加密,因此就涉及到了两个加密集群通讯的场景问题。

解决办法:

  • 方法一:使用相同的证书(ca文件)颁发机构为所有连接的群集生成证书。
  • 方法二:从本地群集中将 CA 证书添加为每个远程群集中的受信任 CA。如果是两个不同证书的集群,可以在 elasticsearch.yml 中的xpack.security.transport.ssl.certificate_authorities 添加对方的证书而得到受信。
  • 原文参考地址:https://blog.csdn.net/UbuntuTouch/article/details/116569527

当时使用的是第一种办法,直接使用的是同一个ca去颁发的证书。

注意:ES 数据节点之间维持数据的同步,每个节点会维持一定数量的长链接。这在本地机房的条件下没有什么问题,但是在异地集群的条件下,可能有网络不稳定的情况,远程集群查询也因此有返回500的问题。建议这种情况,可以考虑自建客户端远程短链接查询。

4.2. filebeat 与 kafka 通讯加密

在 elasticsearch 官网上有着详细的加密通讯配置手册(7.2版本有单独的章节说明),但是其中没有涉及 filebeat 与 kafka 的加密通讯配置。这块主要涉及 golang 程序与 java 程序之间的证书转换。

解决方法之主要操作步骤如下:

代码语言:javascript复制
#生成pem证书
keytool -importkeystore -srckeystore ./server.truststore.jks -destkeystore server.p12 -deststoretype PKCS12
openssl pkcs12 -in server.p12 -nokeys -out server.cer.pem
keytool -importkeystore -srckeystore ./server.keystore.jks -destkeystore client.p12 -deststoretype PKCS12
openssl pkcs12 -in client.p12 -nokeys -out client.cer.pem
openssl pkcs12 -in client.p12 -nodes -nocerts -out client.key.pem

#filebeat配置添加
ssl.enabled: true
ssl.key: "/opt/filebeat/filebeat-7.9.2-linux-x86_64/certs/client.key.pem"
ssl.certificate: "/opt/filebeat/filebeat-7.9.2-linux-x86_64/certs/client.cer.pem"
ssl.certificate_authorities: ["/opt/filebeat/filebeat-7.9.2-linux-x86_64/certs/server.cer.pem"]
ssl.verification_mode: none

4.3. kafka 消费问题

kafka 在 filebeat 与 logstash 之间充当数据缓冲的功能。

在 logstash 与 kafka 的参数配置上:

  1. logstash 程序组总的消费线程数与 kafka 中 topic 的patition数量保持了一致,比如:topic 的 partition 数量为 24,logstash 有3 个,每个 logstash 用于消费 kafka 的线程为 8 个。
  2. 消费 kafka 队列的每批次数量 batch_size 设置为 2000.
  3. logstash 的 pipeline 设置中,core 数量为服务器 cpu 核数。

但是在生产运行中,在增加了一个较大的日志流量归集后,logstash 消费 kafka 线程出现了消费组不断 rebalance 重选 leader 的情况。

而此时 es 的写入远没有达到瓶颈。而整体服务器资源使用也没出现异常。只能把优化方向转向 logstash 的消费上。

在群里小伙伴的帮助下,建议降低 logstash 的 batch_size,以减轻线程的 cpu 使用压力。

解决办法

最终生产上将 batch_size 降低到 150,logstash 的消费能平稳继续。

PS:由于当时没有 kafka 和 logstash 性能监控有效的工具,没有保留更多的现场。后续加上 metricbeat 监控到,在上述参数下三个 logstash 最高能达到 45000 条/s,以供参考。

4.4. filebeat 参数调优

在使用 filebeat 抽取日志数据上送的时候,遇到了句柄数不释放和 cpu 消耗较高的情况。

在经过测试后,在生产上主要调整了这几个参数:

  1. max_proc 设置为 2。可以抑制 filebeat 过高的使用 CPU。
  2. close_timeout 设置为 5m。设置时间越短,句柄释放越快。

filebeat 官方手册最后有一章节关于问题定位的内容,建议在使用之前好好阅读。

4.5. 关于 filebeat 掉线

鉴于 filebeat 是使用 golang 开发的。而生产环境下,总会有一些复杂的情况,比如操作系统版本比较旧等情况。因此,会出现 filebeat 程序掉线的情况。

解决办法

增加检查 filebeat 并能够重新拉起的定时任务。

4.6. 入库后 grok 查询

日志数据在使用时也有不少统计查询的需求,有时候这些需求并不固定,日志相对于的格式也并不是固定的,并不能在 logstash 进行日志清洗时全部解决,只能在查询阶段进行处理。

解决办法:方法 1. 使用 runtime 字段进行转换处理,需要 ES 版本高于 7.11 。

方法 2. 使用 pipeline reindex 处理。利用 pipeline 的 grok 将日志数据进行进一步清洗,reindex 后统计查询。

第一种办法比较简单,但受限于需要较高的 ES 版本。这里主要记录一下第二种办法的处理流程。

代码语言:javascript复制
# 创建提取pipeline,此处假设提取的为sn号
PUT _ingest/pipeline/get-sn
{   
  "processors": [
    {
      "grok": {
        "description": "Extract fields from 'message'",
        "field": "message",
        "patterns": ["""%{TIMESTAMP_ISO8601:log_date}(s*)(|)(s*)%{LOGLEVEL:log_level}(s*)(|)(s*)%{DATA:thread}(s*)(|)(s*)%{DATA:class}(s*)(|)(s*)%{DATA:traceId}(s*)(|)(s*)%{DATA:spanId}(s*)(|)(s*)%{DATA:parentid}(s*)(|-) 发送设备sn:%{DATA:sn},%{GREEDYDATA:messages}"""]
      }
    },
    {
      "date": {
        "description": "Format 'log_date' as 'yyyy-MM-dd HH:mm:ss.SSS Z'",
        "field": "log_date",
        "formats": [ "yyyy-MM-dd'T'HH:mm:ss.SSSZ" ]
      }
    }
  ]
}

# 创建临时索引,设置目标字段格式
PUT temp-test
{
  "settings": {
    "number_of_shards": 8,"number_of_replicas": 1
  }
}

POST temp-test/_mapping
{"properties":{"sn":{"type":"keyword"}}}

# reindex 提取数据,slice参数与目标索引分片一致,filter的是日志查询的过滤条件。
POST _reindex?wait_for_completion=false&slices=8
{
  "source": {
    "index": "logs-test",
 "size": 10000,
    "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "time_zone": " 08:00",
              "gte": "2021-11-09T00:00:00",
              "lte": "2021-11-10T00:00:00"
            }
          }
        },
        {
          "match_phrase": {
            "message": "发送设备sn:"
          }
        },        {
          "term": {   
            "fields.app" :"testsn"
          }
        }
      ]
    }
    }
  },
  "dest": {
    "index": "temp-test",
    "pipeline": "get-sn"
  }
}

# 根据返回的taskid查看进度,"completed" : true 才完成
GET _tasks/otGecFQMQTOAO85G4F_eAw:1550780976

# 查询sn的聚合
GET temp-test/_search
{
  "size": 0,
  "aggs": {
    "distinct-sn": {
      "cardinality": {
        "field": "sn"
      }
    }
  }
}

4.7. ILM 的执行策略

ILM 的策略默认检查时间是 10mins

但是在实际执行中,ILM 会定期运行,检查索引是否满足策略标准,并执行所需的步骤。

为了避免竞争条件,ILM 可能需要运行不止一次来执行完成一个操作所需的所有步骤。

例如,如果 ILM 确定一个索引满足 rollover 标准,它就开始执行完成rollover 操作所需的准备步骤。如果它不满足 rollover 条件,执行停止。

下一次ILM运行时,ILM将从它中断的地方继续执行。

这意味着即使indices.lifecycle.poll_interval ( _cluster/settings ) 被设置为10分钟,并且索引满足 rollover 标准,在完成翻转之前可能需要20分钟。

indices.lifecycle.poll_interval也可以被调整,但是不建议调整太密集,会增加 elasticsearch 运行负担。

调整命令如下:

代码语言:javascript复制
PUT _cluster/settings
{
  "transient": {
    "indices": {
      "lifecycle": {
        "poll_interval": "2m"
      }
    }
  }
}

ILM 的相关 api 也可以查看 ILM 在当前索引执行的阶段结果和进行修复。

检查命令

代码语言:javascript复制
GET /my-index-000001/_ilm/explain

重试命令

代码语言:javascript复制
POST /my-index-000001/_ilm/retry`

4.8. rollover 的使用

rollover 的条件主要有索引大小(max_size)、文档数量(max_docs)、索引存在时间(max_age)这三个。同时,max_size判断的是主分片的大小,而不是整个索引的大小。

Elasticsearch 7.9 版本下,rollover的三个可设置条件中,max_docs 与 max_age 相对敏感,max_size 的判断会有一定延迟,可能是因为索引的主分片 size 大小会被 merge 后收缩,需要有一定时间的观察.测试之下,200 MB 之下的max_size会失效。

遗留问题

  1. filebeat掉线的问题虽然通过定时任务解决了。但是从技术上来说,并没有找到真正原因。在官方论坛上,有工程师把原因归于 golang 程序的不稳定。如果有遇到类似问题的小伙伴,欢迎提供相关线索和解决思路。
  2. kibana在容器中的多实例部署。背景在于生产的容器当前只能自行打镜像,不便拉取官方镜像,在生成多个kibana容器实例时,并不能形成多个uuid,导致现在容器上的kibana实例只有一个并不能水平扩展。欢迎有容器化经验的大拿指点迷津。

小结

以上是本人在基于 elasticsearch 相关开源技术栈进行归集日志时的方案和相关问题记录。

经验不多,水平有限,欢迎大家多多探讨交流。

作者介绍

金多安,Elasticsearch 认证(ECE)专家,Elastic中文社区责任编辑。

审稿人:铭毅天下

0 人点赞