Collecting K8s Logs with Fluent Bit and ClickHouse

2021-12-28 16:26:33

Preparation and Background

Fluent Bit is a pluggable, lightweight, multi-platform open-source log collector written in C. It can gather data from different sources and ship it to multiple destinations, and it is fully compatible with the Docker and Kubernetes ecosystems.

[Figure: overview of the Fluent Bit logging pipeline: input → parser → filter → buffer → router → output]

The figure above gives a global view of its workflow: log collection proceeds through input, parsing, filtering, buffering, and routing to the output.

By configuring Fluent Bit's input and output plugins, you can collect logs from different sources and ship them to a target destination.

Fluent Bit itself is written in C, which makes extending it with plugins somewhat difficult. The project provides fluent-bit-go, which lets you write plugins in Go; currently only output plugins are supported.
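
For orientation, the skeleton of a fluent-bit-go output plugin looks roughly as follows. This is a minimal sketch: the plugin name "myplugin" is a placeholder (the real plugin later in this post registers as "clickhouse"):

package main

import (
    "C"
    "unsafe"

    "github.com/fluent/fluent-bit-go/output"
)

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
    // The name registered here must match OUTPUT.Name in the Fluent Bit config.
    return output.FLBPluginRegister(ctx, "myplugin", "My Output Plugin")
}

//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
    // Read configuration (e.g. environment variables) and set up connections here.
    return output.FLB_OK
}

//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
    // Decode the msgpack-encoded records in `data` and ship them to the sink.
    return output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int { return output.FLB_OK }

// main must exist but stays empty; the plugin is built with
// `go build -buildmode=c-shared -o out_myplugin.so` and loaded via
// Fluent Bit's -e flag or a plugins configuration file.
func main() {}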

To collect K8s container logs with Fluent Bit, add the following input configuration to its conf file (it points at the log files of containers running under Docker; Fluent Bit tails these files and automatically performs some simple filtering and cleanup):

[INPUT]
 Name              tail   # the tail plugin behaves much like the `tail -f` shell command; the options below vary with the plugin chosen here
 Tag               kube.* # tag attached to the lines that are read
 Path              /var/log/containers/*.log # path to one or more log files, wildcards allowed
 Parser            docker # name of the parser to use
 DB                /var/log/flb_kube.db  # database file used to track monitored files and offsets
 Mem_Buf_Limit     5MB    # memory limit the Tail plugin may use
 Skip_Long_Lines   On     # by default monitoring stops when a single long line fills the buffer; On skips such lines instead
 Refresh_Interval  10     # interval, in seconds, at which the list of monitored files is refreshed
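
For reference, with Docker's json-file logging driver each line in those files is a JSON object, which is why the docker parser (a JSON parser keyed on the time field, defined in parsers.conf further below) is used here. A minimal sketch of that shape in Go — the sample line is made up:

package main

import (
    "encoding/json"
    "fmt"
)

// dockerLogLine models one line of a Docker json-file log, the format the
// `docker` parser above expects.
type dockerLogLine struct {
    Log    string `json:"log"`
    Stream string `json:"stream"`
    Time   string `json:"time"` // matches Time_Key/Time_Format in parsers.conf
}

func main() {
    // An illustrative line as found under /var/log/containers/*.log.
    sample := `{"log":"hello from my-app\n","stream":"stdout","time":"2021-12-28T08:26:33.123456789Z"}`

    var line dockerLogLine
    if err := json.Unmarshal([]byte(sample), &line); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", line)
}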

The output configuration depends on the output plugin you have written (here, a ClickHouse plugin):

[OUTPUT]
 Name            clickhouse # plugin name
 Match           *          # match scope

Fluent Bit currently has no official ClickHouse output plugin, so one has to be developed in-house.

The output plugin is compiled into a shared object (with fluent-bit-go this is done via `go build -buildmode=c-shared`) and packed together with Fluent Bit into a Docker image. Fluent Bit runs as a container in K8s, configured through a mounted ConfigMap, with one pod on every node. Each instance collects logs from the locations given in the configuration and, after processing, uses the configured output plugin to store the data in ClickHouse.

Depending on their needs, users can adapt the output plugin to ship logs to other databases or middleware (such as MySQL or Redis).

The code of the ClickHouse output plugin we developed is attached below.

Fluent Bit drives the plugin through four user-implemented functions: FLBPluginRegister (registers the plugin; the name it registers must match OUTPUT.Name in the Fluent Bit config), FLBPluginInit (initialization), FLBPluginFlush (the output logic: it may lightly process the log records and performs the actual write), and FLBPluginExit (shutdown).

package main

import (
   "C"
   "database/sql"
   "fmt"
   "os"
   "strconv"
   "sync"
   "time"
   "unsafe"
   "github.com/fluent/fluent-bit-go/output"
   "github.com/kshvakov/clickhouse"
   klog "k8s.io/klog"
)

var (
   client *sql.DB

   database  string
   table     string
   batchSize int

   insertSQL = "INSERT INTO %s.%s(date, cluster, namespace, app, pod_name, container_name, host, log, ts) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"

   rw     sync.RWMutex
   buffer = make([]Log, 0)
)

const (
   DefaultWriteTimeout string = "20"
   DefaultReadTimeout  string = "10"

   DefaultBatchSize int = 1024
)

type Log struct {
   Cluster   string
   Namespace string
   App       string
   Pod       string
   Container string
   Host      string
   Log       string
   Ts        time.Time
}

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
   return output.FLBPluginRegister(ctx, "clickhouse", "ClickHouse Output Plugin")
}

//export FLBPluginInit
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

   var host string
   if v := os.Getenv("CLICKHOUSE_HOST"); v != "" {
      host = v
   } else {
      klog.Error("you must set host of clickhouse!")
      return output.FLB_ERROR
   }

   var user string
   if v := os.Getenv("CLICKHOUSE_USER"); v != "" {
      user = v
   } else {
      klog.Error("you must set user of clickhouse!")
      return output.FLB_ERROR
   }

   var password string
   if v := os.Getenv("CLICKHOUSE_PASSWORD"); v != "" {
      password = v
   } else {
      klog.Error("you must set password of clickhouse!")
      return output.FLB_ERROR
   }

   if v := os.Getenv("CLICKHOUSE_DATABASE"); v != "" {
      database = v
   } else {
      klog.Error("you must set database of clickhouse!")
      return output.FLB_ERROR
   }

   if v := os.Getenv("CLICKHOUSE_TABLE"); v != "" {
      table = v
   } else {
      klog.Error("you must set table of clickhouse!")
      return output.FLB_ERROR
   }

   if v := os.Getenv("CLICKHOUSE_BATCH_SIZE"); v != "" {
      size, err := strconv.Atoi(v)
      if err != nil {
         klog.Infof("you set the default bacth_size: %d", DefaultBatchSize)
         batchSize = DefaultBatchSize
      }
      batchSize = size
   } else {
      klog.Infof("you set the default bacth_size: %d", DefaultBatchSize)
      batchSize = DefaultBatchSize
   }

   var writeTimeout string
   if v := os.Getenv("CLICKHOUSE_WRITE_TIMEOUT"); v != "" {
      writeTimeout = v
   } else {
      klog.Infof("you set the default write_timeout: %s", DefaultWriteTimeout)
      writeTimeout = DefaultWriteTimeout
   }

   var readTimeout string
   if v := os.Getenv("CLICKHOUSE_READ_TIMEOUT"); v != "" {
      readTimeout = v
   } else {
      klog.Infof("you set the default read_timeout: %s", DefaultReadTimeout)
      readTimeout = DefaultReadTimeout
   }

   dsn := "tcp://"   host   "?username="   user   "&password="   password   "&database="   database   "&write_timeout="   writeTimeout   "&read_timeout="   readTimeout

   db, err := sql.Open("clickhouse", dsn)
   if err != nil {
      klog.Error("connecting to clickhouse: %v", err)
      return output.FLB_ERROR
   }

   if err := db.Ping(); err != nil {
      if exception, ok := err.(*clickhouse.Exception); ok {
         klog.Errorf("[%d] %s n%sn", exception.Code, exception.Message, exception.StackTrace)
      } else {
         klog.Errorf("Failed to ping clickhouse: %v", err)
      }
      return output.FLB_ERROR
   }
   client = db

   return output.FLB_OK
}

//export FLBPluginFlush
// FLBPluginFlush is called from fluent-bit when data needs to be sent.
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
   rw.Lock()
   defer rw.Unlock()
   if err := client.Ping(); err != nil {
      if exception, ok := err.(*clickhouse.Exception); ok {
         klog.Errorf("[%d] %s n%sn", exception.Code, exception.Message, exception.StackTrace)
      } else {
         klog.Errorf("Failed to ping clickhouse: %v", err)
      }
      return output.FLB_ERROR
   }
   var ret int
   var timestampData interface{}
   var mapData map[interface{}]interface{}
   dec := output.NewDecoder(data, int(length))

   for {
      ret, timestampData, mapData = output.GetRecord(dec)
      if ret != 0 {
         break
      }
      var timestamp time.Time
      switch t := timestampData.(type) {
      case output.FLBTime:
         timestamp = t.Time
      case uint64:
         timestamp = time.Unix(int64(t), 0)
      default:
         timestamp = time.Now()
      }

      flattenData, err := Flatten(mapData, "", UnderscoreStyle)
      if err != nil {
         break
      }

      log := Log{}
      for k, v := range flattenData {
         value := ""
         switch t := v.(type) {
         case string:
            value = t
         case []byte:
            value = string(t)
         default:
            value = fmt.Sprintf("%v", v)
         }

         switch k {
         case "cluster":
            log.Cluster = value
         case "kubernetes_namespace_name":
            log.Namespace = value
         case "kubernetes_labels_app", "kubernetes_labels_k8s-app":
            log.App = value
         case "kubernetes_pod_name":
            log.Pod = value
         case "kubernetes_container_name":
            log.Container = value
         case "kubernetes_host":
            log.Host = value
         case "log":
            log.Log = value
         }

      }

      log.Ts = timestamp
      buffer = append(buffer, log)
   }

   // sink data: flush to ClickHouse only once a full batch has accumulated
   if len(buffer) < batchSize {
      return output.FLB_OK
   }

   sql := fmt.Sprintf(insertSQL, database, table)

   tx, err := client.Begin()
   if err != nil {
      klog.Errorf("begin transaction failure: %s", err.Error())
      return output.FLB_ERROR
   }

   // build statements
   smt, err := tx.Prepare(sql)
   if err != nil {
      klog.Errorf("prepare statement failure: %s", err.Error())
      return output.FLB_ERROR
   }
   for _, l := range buffer {
      // l.Ts is bound to both the date and the ts columns; ClickHouse stores
      // the first as a Date and the second as a DateTime.
      _, err = smt.Exec(l.Ts, l.Cluster, l.Namespace, l.App, l.Pod, l.Container, l.Host,
         l.Log, l.Ts)

      if err != nil {
         klog.Errorf("statement exec failure: %s", err.Error())
         return output.FLB_ERROR
      }
   }

   // commit and record metrics
   if err = tx.Commit(); err != nil {
      klog.Errorf("commit failed failure: %s", err.Error())
      return output.FLB_ERROR
   }

   buffer = make([]Log, 0)

   return output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
   return output.FLB_OK
}

func main() {
}
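
One caveat: the listing above references Flatten and UnderscoreStyle without defining them, so it will not compile as shown; the original presumably pulled in a map-flattening helper (for example a library such as github.com/jeremywohl/flatten). A minimal hand-rolled sketch that lives in the same package and matches how the plugin uses it — nested keys joined with underscores, so that kubernetes → pod_name becomes kubernetes_pod_name, exactly the keys the switch in FLBPluginFlush matches:

package main // same package as the plugin above

import "fmt"

// SeparatorStyle selects the string used to join nested keys.
type SeparatorStyle string

// UnderscoreStyle joins nested keys with "_".
const UnderscoreStyle SeparatorStyle = "_"

// Flatten collapses the nested map produced by the msgpack decoder into a
// single-level map with joined keys, e.g. {"kubernetes": {"pod_name": "x"}}
// becomes {"kubernetes_pod_name": "x"}.
func Flatten(in map[interface{}]interface{}, prefix string, style SeparatorStyle) (map[string]interface{}, error) {
    out := make(map[string]interface{})
    for k, v := range in {
        var key string
        switch kk := k.(type) {
        case string:
            key = kk
        case []byte:
            key = string(kk)
        default:
            key = fmt.Sprintf("%v", kk)
        }
        if prefix != "" {
            key = prefix + string(style) + key
        }
        if nested, ok := v.(map[interface{}]interface{}); ok {
            sub, err := Flatten(nested, key, style)
            if err != nil {
                return nil, err
            }
            for sk, sv := range sub {
                out[sk] = sv
            }
        } else {
            out[key] = v
        }
    }
    return out, nil
}

With the kubernetes filter attached (see the ConfigMap below), a record arrives shaped roughly like {"log": ..., "kubernetes": {"namespace_name": ..., "pod_name": ..., "labels": {"app": ...}}}, so flattening produces exactly the kubernetes_namespace_name, kubernetes_pod_name, and kubernetes_labels_app keys the plugin extracts.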

The ClickHouse Schema

Table creation SQL

The table structure can be adjusted as needed; the actual insert logic is implemented in the output plugin.

CREATE DATABASE IF NOT EXISTS scmp;
CREATE TABLE IF NOT EXISTS scmp.logs(
    date Date DEFAULT toDate(0),
    cluster String,
    namespace String,
    app String,
    pod_name String,
    container_name String,
    host String,
    log String,
    ts DateTime)
ENGINE = MergeTree() PARTITION BY toYYYYMMDD(date)
ORDER BY (cluster, namespace, app, pod_name, container_name, host, ts);
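
Once logs are flowing, a quick sanity check against this table can be done from Go with the same driver the plugin uses. A sketch, reusing the connection values from the DaemonSet environment shown further below:

package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/kshvakov/clickhouse" // registers the "clickhouse" driver
)

func main() {
    // Connection values taken from the DaemonSet env vars below.
    db, err := sql.Open("clickhouse",
        "tcp://10.1.62.62:9150?username=oms&password=EBupt123&database=scmp")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Count log lines per app over the last hour.
    rows, err := db.Query(`
        SELECT app, count() AS cnt
        FROM scmp.logs
        WHERE ts > now() - INTERVAL 1 HOUR
        GROUP BY app
        ORDER BY cnt DESC`)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    for rows.Next() {
        var app string
        var cnt uint64
        if err := rows.Scan(&app, &cnt); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("%-30s %d\n", app, cnt)
    }
}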

Table structure

[Figure: the scmp.logs table schema]

Deploying the K8s Resources

The mounted configuration file:

apiVersion: v1
kind: ConfigMap
metadata:
  name: k8s-log-agent-config
  namespace: kube
  labels:
    k8s-app: k8s-log-agent
data:
  # Configuration files: server, input, filters and output
  # ======================================================
  fluent-bit.conf: |
    [SERVICE]
     Flush         1
     Log_Level     error
     Daemon        off
     Parsers_File  parsers.conf
     HTTP_Server   On
     HTTP_Listen   0.0.0.0
     HTTP_Port     2020

    @INCLUDE input-kubernetes.conf
    @INCLUDE filter-kubernetes.conf
    @INCLUDE output-kubernetes.conf

  input-kubernetes.conf: |
    [INPUT]
        Name              tail
        Tag               kube.*
        Path              /var/log/containers/*.log
        Parser            docker
        DB                /var/log/flb_kube.db
        Mem_Buf_Limit     5MB
        Skip_Long_Lines   On
        Refresh_Interval  10

  filter-kubernetes.conf: |
    [FILTER]
     Name                kubernetes
     Match               *
     Kube_URL            https://kubernetes.default.svc.cluster.local:443
     Merge_Log           On
     Annotations         Off
     Kube_Tag_Prefix     kube.var.log.containers.
     Merge_Log_Key       log_processed

    [FILTER]
     Name                modify
     Match               *
     Set  cluster  ${CLUSTER_NAME}
  output-kubernetes.conf: |
    [OUTPUT]
     Name            clickhouse
     Match           *
  parsers.conf: |
    [PARSER]
     Name   apache
     Format regex
     Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) [(?<time>[^]]*)] "(?<method>S )(?:  (?<path>[^"]*?)(?:  S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^"]*)" "(?<agent>[^"]*)")?$
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name   apache2
     Format regex
     Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) [(?<time>[^]]*)] "(?<method>S )(?:  (?<path>[^ ]*)  S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^"]*)" "(?<agent>[^"]*)")?$
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name   apache_error
     Format regex
     Regex  ^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$

    [PARSER]
     Name   nginx
     Format regex
     Regex ^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) [(?<time>[^]]*)] "(?<method>S )(?:  (?<path>[^"]*?)(?:  S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^"]*)" "(?<agent>[^"]*)")?$
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name   json
     Format json
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name         docker
     Format       json
     Time_Key     time
     Time_Format  %Y-%m-%dT%H:%M:%S.%L
     Time_Keep    On

    [PARSER]
     Name        syslog
     Format      regex
     Regex       ^<(?<pri>[0-9] )>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_/.-]*)(?:[(?<pid>[0-9] )])?(?:[^:]*:)? *(?<message>.*)$
     Time_Key    time
     Time_Format %b %d %H:%M:%S

RBAC resources:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
    name: k8s-log-agent-read
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: k8s-log-agent-read
subjects:
  - kind: ServiceAccount
    name: k8s-log-agent
    namespace: kube

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: k8s-log-agent-read
rules:
  - apiGroups: [""]
    resources:
      - namespaces
      - pods
    verbs: ["get", "list", "watch"]

---


apiVersion: v1
kind: ServiceAccount
metadata:
  name: k8s-log-agent
  namespace: kube

Running the agent on every node with a DaemonSet:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: k8s-log-agent
  namespace: kube
  labels:
    k8s-app: k8s-log-agent
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: k8s-log-agent
  template:
    metadata:
      labels:
        k8s-app: k8s-log-agent
        version: v1
        kubernetes.io/cluster-service: "true"
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "2020"
        prometheus.io/path: /api/v1/metrics/prometheus
    spec:
      containers:
        - name: fluent-bit
          image: iyacontrol/fluent-bit-ck:1.2.2
          imagePullPolicy: Always
          ports:
            - containerPort: 2020
          resources:
            limits:
              cpu: 200m
              memory: 200Mi
            requests:
              cpu: 200m
              memory: 200Mi
          env:
            - name: CLUSTER_NAME
              value: "clickhouse"
            - name: CLICKHOUSE_HOST
              value: "10.1.62.62:9150"
            - name: CLICKHOUSE_USER
              value: "oms"
            - name: CLICKHOUSE_PASSWORD
              value: "EBupt123"
            - name: CLICKHOUSE_DATABASE
              value: "scmp"
            - name: CLICKHOUSE_TABLE
              value: "logs"
            - name: NODENAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: k8s-log-agent-config
              mountPath: /fluent-bit/etc/
      terminationGracePeriodSeconds: 10
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: k8s-log-agent-config
        configMap:
          name: k8s-log-agent-config
      serviceAccountName: k8s-log-agent
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule

Results Stored in ClickHouse

[Figure: rows stored in the scmp.logs table, as seen from a ClickHouse client]

A Simple Fluent Bit Performance Test

Test setup

FLB input plugin configuration:

Name              tail  # use the tail input
Buffer_Chunk_Size 32k   # initial buffer size for reading file data (default 32k)
Buffer_Max_Size   32k   # per-file buffer size limit (default 32k)
Mem_Buf_Limit     5MB   # memory limit for the Tail plugin when appending data to the engine; on reaching it, ingestion pauses and resumes once data is flushed (no default)
Skip_Long_Lines   On    # when a line exceeds Buffer_Max_Size: On skips the long line and keeps processing lines that fit the buffer; the default Off stops monitoring that file
Refresh_Interval  10    # interval for re-scanning the watched log files (default 60)
Read_from_Head    False # for newly discovered files (no database offset), read from the head instead of the tail (default False)
Rotate_Wait       5     # extra time to keep monitoring a rotated file while pending data is flushed (default 5)
DB                /var/log/flb_kube.db  # database file for tracking monitored files and offsets
DB.sync           normal # default sync (I/O) method (default normal)
DB.locking        false  # restrict database access to Fluent Bit only (default false)
DB.journal_mode   WAL    # database journal mode (default WAL)
exit_on_eof       false  # exit as soon as the end of a file is reached (default false)

FLB filter configuration:

Name                kubernetes # use the kubernetes filter
Match               * # match rule
Kube_URL            https://kubernetes.default.svc.cluster.local:443 # Kubernetes API server endpoint
Merge_Log           On   # when enabled, checks whether the log field is a JSON string map and, if so, appends its fields to the record structure
Annotations         On   # include Kubernetes resource annotations in the extra metadata (default On)
Kube_Tag_Prefix     kube.var.log.containers. # when records come from the Tail input, this must match the prefix used in the Tail configuration
Merge_Log_Key       log_processed # with Merge_Log enabled, the filter treats the incoming log field as a JSON string and builds a structured representation at the same level; when this key is set, all new structured fields extracted from the original log are nested under it

Neither a CPU nor a memory limit was set for the Fluent Bit runtime environment.

Running the test

For each state — FLB idle, and log output rates of 5000, 1000, 500, 200, 50, and 1 line(s)/second — record the range of CPU and memory usage over 10 minutes, the time from the start of logging until a backlog first appears, and the number of backlogged log lines some time after the backlog appears.

The measurements were taken directly with the htop command.

After 3 minutes, the backlog of unrecorded lines stayed within roughly the number of log lines produced in 10 seconds.

[Figure: CPU and memory usage of the Fluent Bit process in htop during the test]

An interesting finding: at 1000 lines/s and 500 lines/s, CPU usage fluctuated within a fairly stable band of less than 10 percentage points; at the other rates (5000 lines/s, 100 lines/s, and 50 lines/s) it swung much more widely, reaching highs of 52% and lows of 35%, and such extreme values occurred more often than at 1000 or 500 lines/s.

Consistent with earlier test runs, we also observed cases where the stable CPU band at 1000 lines/s was lower than at 500 lines/s.
