Preparation and background
Fluent Bit is a pluggable, lightweight, multi-platform open-source log collector written in C. It gathers data from different sources and ships it to multiple destinations, and it is fully compatible with the Docker and Kubernetes ecosystems.
[Figure: overview of the Fluent Bit processing pipeline (input, parser, filter, buffer, routing, output)]
The figure above gives a global overview of its workflow: logs are collected by flowing through input, parsing, filtering, buffering and routing stages before reaching an output.
By configuring Fluent Bit input and output plugins, logs can be collected from different sources and delivered to the desired destinations.
Fluent Bit itself is written in C, so extending it with native plugins takes some effort. The project provides fluent-bit-go, which makes it possible to write plugins in Go; at the moment only output plugins can be written this way.
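For orientation, the sketch below shows the minimal shape of a fluent-bit-go output plugin. It is only an illustration (the plugin name stdout_demo and its behavior are made up); the full ClickHouse plugin used in this article is listed further below and has exactly the same structure.

package main

import (
    "C"
    "fmt"
    "unsafe"

    "github.com/fluent/fluent-bit-go/output"
)

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
    // the registered name must match OUTPUT.Name in the Fluent Bit configuration
    return output.FLBPluginRegister(ctx, "stdout_demo", "Demo output plugin")
}

//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
    // read configuration and open connections here
    return output.FLB_OK
}

//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
    // decode the msgpack records handed over by Fluent Bit and print them
    dec := output.NewDecoder(data, int(length))
    for {
        ret, ts, record := output.GetRecord(dec)
        if ret != 0 {
            break
        }
        fmt.Printf("[%s] %v %v\n", C.GoString(tag), ts, record)
    }
    return output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
    return output.FLB_OK
}

func main() {}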
To collect Kubernetes container logs with Fluent Bit, add the following input configuration to the Fluent Bit conf file (it points at the log files of the containers running under Docker; Fluent Bit tails those files and performs simple parsing and cleanup automatically):
[INPUT]
    Name              tail    # the tail plugin behaves much like the `tail -f` shell command; the options below vary with the plugin chosen here
    Tag               kube.*  # tag attached to the records that are read
    Path              /var/log/containers/*.log   # path to a specific log file, or to several files via wildcards
    Parser            docker  # name of the parser to apply
    DB                /var/log/flb_kube.db        # database file used to track the monitored files and their offsets
    Mem_Buf_Limit     5MB     # memory limit the Tail plugin is allowed to use
    Skip_Long_Lines   On      # by default monitoring stops when a single long line fills the buffer; On skips the long line and continues
    Refresh_Interval  10      # interval, in seconds, at which the list of monitored files is refreshed
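As background on what the tail input and the docker parser actually see: with Docker's json-file log driver, every line in /var/log/containers/*.log is a small JSON object. The snippet below is only an illustration (the sample line is made up, not taken from the article) of how such a line decodes and where the record timestamp comes from.

package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// dockerLine mirrors the fields Docker's json-file driver writes for each log line.
type dockerLine struct {
    Log    string `json:"log"`
    Stream string `json:"stream"`
    Time   string `json:"time"`
}

func main() {
    // made-up sample line; real lines live in /var/log/containers/*.log
    raw := `{"log":"hello world\n","stream":"stdout","time":"2021-03-01T08:10:23.123456789Z"}`

    var l dockerLine
    if err := json.Unmarshal([]byte(raw), &l); err != nil {
        panic(err)
    }

    // like the docker parser (Time_Key time), use the embedded timestamp as the record time
    ts, err := time.Parse(time.RFC3339Nano, l.Time)
    if err != nil {
        panic(err)
    }
    fmt.Println(ts, l.Stream, l.Log)
}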
The output section depends on the output plugin you have written (here, a ClickHouse plugin):
[OUTPUT]
    Name  clickhouse   # plugin name, must match the name registered by the plugin
    Match *            # tag match pattern
Fluent Bit does not currently ship an official ClickHouse output plugin, so one has to be developed in-house.
The output plugin is compiled into a shared object (for fluent-bit-go plugins this is done with go build -buildmode=c-shared) and packaged together with Fluent Bit into a Docker image. Fluent Bit runs in Kubernetes as a container configured through a mounted ConfigMap, with one pod on every node; it tails the log locations given in its configuration, processes the records, and uses the configured output plugin to store the data in ClickHouse.
By editing the output plugin, users can ship the logs to other databases or middleware instead (for example MySQL or Redis).
The ClickHouse output plugin developed for this setup is listed below.
Fluent Bit calls the user-implemented FLBPluginRegister (registers the plugin; the name it registers must match OUTPUT.Name in the Fluent Bit configuration), FLBPluginInit (plugin initialization), FLBPluginFlush (the output logic, which may do simple processing of the log records before writing them out) and FLBPluginExit (plugin shutdown) to deliver the log data.
package main
import (
"C"
"database/sql"
"fmt"
"os"
"strconv"
"sync"
"time"
"unsafe"
"github.com/fluent/fluent-bit-go/output"
"github.com/kshvakov/clickhouse"
klog "k8s.io/klog"
)
var (
client *sql.DB
database string
table string
batchSize int
insertSQL = "INSERT INTO %s.%s(date, cluster, namespace, app, pod_name, container_name, host, log, ts) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
rw sync.RWMutex
buffer = make([]Log, 0)
)
const (
DefaultWriteTimeout string = "20"
DefaultReadTimeout string = "10"
DefaultBatchSize int = 1024
)
type Log struct {
Cluster string
Namespace string
App string
Pod string
Container string
Host string
Log string
Ts time.Time
}
//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
return output.FLBPluginRegister(ctx, "clickhouse", "Clickhouse Output Plugin.")
}
//export FLBPluginInit
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {
var host string
if v := os.Getenv("CLICKHOUSE_HOST"); v != "" {
host = v
} else {
klog.Error("you must set host of clickhouse!")
return output.FLB_ERROR
}
var user string
if v := os.Getenv("CLICKHOUSE_USER"); v != "" {
user = v
} else {
klog.Error("you must set user of clickhouse!")
return output.FLB_ERROR
}
var password string
if v := os.Getenv("CLICKHOUSE_PASSWORD"); v != "" {
password = v
} else {
klog.Error("you must set password of clickhouse!")
return output.FLB_ERROR
}
if v := os.Getenv("CLICKHOUSE_DATABASE"); v != "" {
database = v
} else {
klog.Error("you must set database of clickhouse!")
return output.FLB_ERROR
}
if v := os.Getenv("CLICKHOUSE_TABLE"); v != "" {
table = v
} else {
klog.Error("you must set table of clickhouse!")
return output.FLB_ERROR
}
if v := os.Getenv("CLICKHOUSE_BATCH_SIZE"); v != "" {
size, err := strconv.Atoi(v)
if err != nil {
klog.Infof("invalid CLICKHOUSE_BATCH_SIZE, falling back to the default batch_size: %d", DefaultBatchSize)
batchSize = DefaultBatchSize
} else {
batchSize = size
}
} else {
klog.Infof("using the default batch_size: %d", DefaultBatchSize)
batchSize = DefaultBatchSize
}
var writeTimeout string
if v := os.Getenv("CLICKHOUSE_WRITE_TIMEOUT"); v != "" {
writeTimeout = v
} else {
klog.Infof("you set the default write_timeout: %s", DefaultWriteTimeout)
writeTimeout = DefaultWriteTimeout
}
var readTimeout string
if v := os.Getenv("CLICKHOUSE_READ_TIMEOUT"); v != "" {
readTimeout = v
} else {
klog.Infof("you set the default read_timeout: %s", DefaultReadTimeout)
readTimeout = DefaultReadTimeout
}
dsn := "tcp://" + host + "?username=" + user + "&password=" + password + "&database=" + database + "&write_timeout=" + writeTimeout + "&read_timeout=" + readTimeout
db, err := sql.Open("clickhouse", dsn)
if err != nil {
klog.Errorf("failed to connect to clickhouse: %v", err)
return output.FLB_ERROR
}
if err := db.Ping(); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
klog.Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
} else {
klog.Errorf("Failed to ping clickhouse: %v", err)
}
return output.FLB_ERROR
}
// keep the verified connection for use in FLBPluginFlush
client = db
return output.FLB_OK
}
//export FLBPluginFlush
// FLBPluginFlush is called from fluent-bit when data needs to be sent.
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
rw.Lock()
defer rw.Unlock()
if err := client.Ping(); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
klog.Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
} else {
klog.Errorf("Failed to ping clickhouse: %v", err)
}
return output.FLB_ERROR
}
var ret int
var timestampData interface{}
var mapData map[interface{}]interface{}
dec := output.NewDecoder(data, int(length))
for {
ret, timestampData, mapData = output.GetRecord(dec)
if ret != 0 {
break
}
var timestamp time.Time
switch t := timestampData.(type) {
case output.FLBTime:
timestamp = timestampData.(output.FLBTime).Time
case uint64:
timestamp = time.Unix(int64(t), 0)
default:
timestamp = time.Now()
}
flattenData, err := Flatten(mapData, "", UnderscoreStyle)
if err != nil {
break
}
log := Log{}
for k, v := range flattenData {
value := ""
switch t := v.(type) {
case string:
value = t
case []byte:
value = string(t)
default:
value = fmt.Sprintf("%v", v)
}
switch k {
case "cluster":
log.Cluster = value
case "kubernetes_namespace_name":
log.Namespace = value
case "kubernetes_labels_app":
log.App = value
case "kubernetes_labels_k8s-app":
log.App = value
case "kubernetes_pod_name":
log.Pod = value
case "kubernetes_container_name":
log.Container = value
case "kubernetes_host":
log.Host = value
case "log":
log.Log = value
}
}
log.Ts = timestamp
buffer = append(buffer, log)
}
// sink data
if len(buffer) < batchSize {
return output.FLB_OK
}
sql := fmt.Sprintf(insertSQL, database, table)
tx, err := client.Begin()
if err != nil {
klog.Errorf("begin transaction failure: %s", err.Error())
return output.FLB_ERROR
}
// build statements
smt, err := tx.Prepare(sql)
if err != nil {
klog.Errorf("prepare statement failure: %s", err.Error())
tx.Rollback()
return output.FLB_ERROR
}
for _, l := range buffer {
_, err = smt.Exec(l.Ts, l.Cluster, l.Namespace, l.App, l.Pod, l.Container, l.Host,
l.Log, l.Ts)
if err != nil {
klog.Errorf("statement exec failure: %s", err.Error())
tx.Rollback()
return output.FLB_ERROR
}
}
// commit and record metrics
if err = tx.Commit(); err != nil {
klog.Errorf("commit failed failure: %s", err.Error())
return output.FLB_ERROR
}
buffer = make([]Log, 0)
return output.FLB_OK
}
//export FLBPluginExit
func FLBPluginExit() int {
return output.FLB_OK
}
func main() {
}
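The plugin above calls Flatten with an UnderscoreStyle separator, which is not part of this listing and presumably lives in another file of the same package. The following is a minimal sketch of what such a helper could look like; the names and behavior are assumptions, not the author's exact code.

package main

import "fmt"

// SeparatorStyle controls how nested keys are joined.
type SeparatorStyle string

// UnderscoreStyle joins nested keys with "_", e.g. kubernetes_pod_name.
const UnderscoreStyle SeparatorStyle = "_"

// Flatten turns the nested map produced by output.GetRecord into a flat map
// whose keys are the joined paths of the original nested keys.
func Flatten(in map[interface{}]interface{}, prefix string, style SeparatorStyle) (map[string]interface{}, error) {
    out := make(map[string]interface{})
    flattenInto(in, prefix, style, out)
    return out, nil
}

func flattenInto(in map[interface{}]interface{}, prefix string, style SeparatorStyle, out map[string]interface{}) {
    for k, v := range in {
        key := fmt.Sprintf("%v", k)
        if prefix != "" {
            key = prefix + string(style) + key
        }
        if nested, ok := v.(map[interface{}]interface{}); ok {
            flattenInto(nested, key, style, out)
            continue
        }
        out[key] = v
    }
}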
The ClickHouse schema
Table creation SQL
The table structure here can be adjusted as needed; the actual insert logic is implemented in the output plugin.
CREATE DATABASE IF NOT EXISTS scmp;
CREATE TABLE IF NOT EXISTS scmp.logs (
    date Date DEFAULT toDate(0),
    cluster String,
    namespace String,
    app String,
    pod_name String,
    container_name String,
    host String,
    log String,
    ts DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(date)
ORDER BY (cluster, namespace, app, pod_name, container_name, host, ts);
Table structure
[Figure: the resulting scmp.logs table structure]
Deploying the Kubernetes resources
The mounted configuration (ConfigMap):
apiVersion: v1
kind: ConfigMap
metadata:
name: k8s-log-agent-config
namespace: kube
labels:
k8s-app: k8s-log-agent
data:
# Configuration files: server, input, filters and output
# ======================================================
fluent-bit.conf: |
[SERVICE]
Flush 1
Log_Level error
Daemon off
Parsers_File parsers.conf
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_Port 2020
@INCLUDE input-kubernetes.conf
@INCLUDE filter-kubernetes.conf
@INCLUDE output-kubernetes.conf
input-kubernetes.conf: |
[INPUT]
Name tail
Tag kube.*
Path /var/log/containers/*.log
Parser docker
DB /var/log/flb_kube.db
Mem_Buf_Limit 5MB
Skip_Long_Lines On
Refresh_Interval 10
filter-kubernetes.conf: |
[FILTER]
Name kubernetes
Match *
Kube_URL https://kubernetes.default.svc.cluster.local:443
Merge_Log On
Annotations Off
Kube_Tag_Prefix kube.var.log.containers.
Merge_Log_Key log_processed
[FILTER]
Name modify
Match *
Set cluster ${CLUSTER_NAME}
output-kubernetes.conf: |
[OUTPUT]
Name clickhouse
Match *
parsers.conf: |
[PARSER]
Name apache
Format regex
Regex ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
Time_Key time
Time_Format %d/%b/%Y:%H:%M:%S %z
[PARSER]
Name apache2
Format regex
Regex ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
Time_Key time
Time_Format %d/%b/%Y:%H:%M:%S %z
[PARSER]
Name apache_error
Format regex
Regex ^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$
[PARSER]
Name nginx
Format regex
Regex ^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
Time_Key time
Time_Format %d/%b/%Y:%H:%M:%S %z
[PARSER]
Name json
Format json
Time_Key time
Time_Format %d/%b/%Y:%H:%M:%S %z
[PARSER]
Name docker
Format json
Time_Key time
Time_Format %Y-%m-%dT%H:%M:%S.%L
Time_Keep On
[PARSER]
Name syslog
Format regex
Regex ^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$
Time_Key time
Time_Format %b %d %H:%M:%S
RBAC for the log agent:
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: k8s-log-agent-read
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: k8s-log-agent-read
subjects:
- kind: ServiceAccount
name: k8s-log-agent
namespace: kube
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: k8s-log-agent-read
rules:
- apiGroups: [""]
resources:
- namespaces
- pods
verbs: ["get", "list", "watch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: k8s-log-agent
namespace: kube
The DaemonSet that starts the agent container on every node:
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: k8s-log-agent
namespace: kube
labels:
k8s-app: k8s-log-agent
version: v1
kubernetes.io/cluster-service: "true"
spec:
selector:
matchLabels:
k8s-app: k8s-log-agent
template:
metadata:
labels:
k8s-app: k8s-log-agent
version: v1
kubernetes.io/cluster-service: "true"
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "2020"
prometheus.io/path: /api/v1/metrics/prometheus
spec:
containers:
- name: fluent-bit
image: iyacontrol/fluent-bit-ck:1.2.2
imagePullPolicy: Always
ports:
- containerPort: 2020
resources:
limits:
cpu: 200m
memory: 200Mi
requests:
cpu: 200m
memory: 200Mi
env:
- name: CLUSTER_NAME
value: "clickhouse"
- name: CLICKHOUSE_HOST
value: "10.1.62.62:9150"
- name: CLICKHOUSE_USER
value: "oms"
- name: CLICKHOUSE_PASSWORD
value: "EBupt123"
- name: CLICKHOUSE_DATABASE
value: "scmp"
- name: CLICKHOUSE_TABLE
value: "logs"
- name: NODENAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: k8s-log-agent-config
mountPath: /fluent-bit/etc/
terminationGracePeriodSeconds: 10
volumes:
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: k8s-log-agent-config
configMap:
name: k8s-log-agent-config
serviceAccountName: k8s-log-agent
tolerations:
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
Data stored in ClickHouse
[Figure: log rows stored in the ClickHouse logs table]
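As a quick sanity check that rows are actually arriving (the original screenshots are not reproduced here), one can count the recently ingested rows. The snippet below is only a sketch using the same driver as the plugin; the address and credentials are placeholders to adjust to your environment.

package main

import (
    "database/sql"
    "fmt"

    _ "github.com/kshvakov/clickhouse"
)

func main() {
    // placeholder address, user and database; adjust to your environment
    db, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?username=default&database=scmp")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    // count the log rows ingested during the last five minutes
    var count int64
    row := db.QueryRow("SELECT toInt64(count()) FROM scmp.logs WHERE ts > now() - INTERVAL 5 MINUTE")
    if err := row.Scan(&count); err != nil {
        panic(err)
    }
    fmt.Printf("rows ingested in the last 5 minutes: %d\n", count)
}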
A simple Fluent Bit performance test
Test setup
Fluent Bit input plugin configuration:
Name tail                 # use the tail input plugin
Buffer_Chunk_Size 32k     # initial buffer size used to read file data, default 32k
Buffer_Max_Size 32k       # buffer size limit per monitored file, default 32k
Mem_Buf_Limit 5MB         # memory limit the Tail plugin may use while appending data to the engine; when reached, ingestion pauses and resumes once the data is flushed. No default
Skip_Long_Lines On        # whether to keep monitoring a file when a line exceeds Buffer_Max_Size; On skips the long line and keeps processing lines that fit the buffer, the default Off stops monitoring the file
Refresh_Interval 10       # interval at which the list of monitored files is refreshed, default 60
Read_from_Head False      # for files newly discovered at start (no database offset/position), read from the head of the file instead of the tail, default False
Rotate_Wait 5             # extra time to keep monitoring a rotated file while pending data is flushed, default 5
DB /var/log/flb_kube.db   # database file used to track the monitored files and their offsets
DB.sync normal            # synchronization (I/O) method, default normal
DB.locking false          # restrict database access to Fluent Bit only, default false
DB.journal_mode WAL       # journal mode of the database, default WAL
exit_on_eof false         # when reading a file, exit as soon as its end is reached, default false
Fluent Bit filter configuration:
Name kubernetes           # use the kubernetes filter
Match *                   # match pattern
Kube_URL https://kubernetes.default.svc.cluster.local:443   # Kubernetes API server endpoint
Merge_Log On              # when enabled, check whether the log field content is a JSON string map; if so, append its fields to the record structure
Annotations On            # include Kubernetes resource annotations in the extra metadata, default On
Kube_Tag_Prefix kube.var.log.containers.   # when records come from the Tail input plugin, this is the prefix used in the Tail configuration
Merge_Log_Key log_processed   # with Merge_Log enabled, the filter assumes the log field of the incoming message is a JSON string and builds a structured representation of it at the same level as the log field; when this key is set, all new structured fields extracted from the original log content are placed under it
No CPU or memory limits were set on the Fluent Bit runtime environment for the test.
Running the test
For each state (Fluent Bit idle, and log generation rates of 5000, 1000, 500, 200, 50 and 1 lines/second), record the range of CPU and memory usage over 10 minutes, the time from the start of logging until a backlog first appears, and the size of the backlog after it has persisted for a while.
The measurements were taken directly with the htop command.
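The article does not show how the test traffic was generated. A simple rate-controlled generator along the following lines (a sketch, not the tool actually used in the test) is enough to produce the per-second volumes listed above when run in a pod whose stdout is being collected.

package main

import (
    "flag"
    "fmt"
    "time"
)

func main() {
    // lines per second to emit; matches the rates in the test plan above
    rate := flag.Int("rate", 1000, "log lines per second")
    flag.Parse()

    ticker := time.NewTicker(time.Second / time.Duration(*rate))
    defer ticker.Stop()

    i := 0
    for range ticker.C {
        i++
        // every line goes to stdout, so the container runtime writes it into /var/log/containers/*.log
        fmt.Printf("{\"seq\":%d,\"msg\":\"load test line\",\"ts\":\"%s\"}\n", i, time.Now().Format(time.RFC3339Nano))
    }
}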
A backlog was not recorded if, after 3 minutes, it still stayed within the number of log lines produced in 10 seconds.
[Figure: recorded CPU/memory ranges and backlog figures for each log rate]
An interesting observation from the test: when the log rate is 1000 lines/s or 500 lines/s, CPU usage is fairly stable and fluctuates by less than 10 percentage points, whereas at the other rates (5000 lines/s, 100 lines/s and 50 lines/s) it swings widely, with peaks around 52% and lows around 35%, and such outliers appear more often than at 1000 or 500 lines/s.
Combined with earlier test runs, there were also cases where the stable CPU band at 1000 lines/s was lower than at 500 lines/s.