在头部互联网公司,一般会有成熟的建模开发流程和工具:
- 打开数据集市,选择要需要的报表,申请权限(如果没有现成的报表,甚至可以提需求给数仓团队进行开发)。
- 在数据计算平台上进行数据的计算、汇总、分析、建模,根据需要还可以自己写一些 UDF(UserDefined Function),拓展数据的计算方法。
- 在报表平台上实现数据的日级更新、展示。
- 利用邮件、通讯工具,对部分报表内容(如告警、业务指标等)进行自动推送。
其中,第2步是每天的主要工作,有时候你会是 Sql boy,有时候又变身 TF boy (TensorFlow)。其他步骤都是傻瓜式操作。
但是,在其他大部分公司,比如初创公司或者传统行业,你可能会倒在第一步——你负责的是一个新上线不久的业务,数据还处在日志阶段,或者运气稍微好点,有人帮你扔到了ES中......
虽然你精通前沿算法、复现过顶会,但此时拿不到现成数据,没有流程化的工具,没有可展示的面板,只能感叹心有余而力不足......
本系列将为你推荐一些边缘但很重要技能,边缘不是技能本身边缘,而是相对算法从业人员来说不是那么常用,但关键时候却非常重要。
本文介绍的是 ELK,掌握ELK,你可以快速把原始的日志数据整合起来。如果数据还在日志阶段,你可以自己搭建一个日志分析系统;如果数据已经存入ES(大部分公司至少能达到这个要求),你可以直接用Python从ES中拿数据进行分析,然后在Kibana上展示。
1. 什么是ELK
ELK 是 Elasticsearch、Logstash、Kibana的简称,ELK stack可以让你快速方便的搭建一个数据处理系统,如果你是一个单兵作战的算法同学,使用ELK快速启动工作,是个不错的选择。这三个工具的功能如下:
- Logstash:提供日志或者其他源数据的汇总、转换、发送。
- ElasticSearch:是一个高扩展的分布式全文检索引擎,近乎实时的存储、检索数据,所以也越来越被当作数据库来使用。
- Kibana:是一个分析和可视化的平台,对于存在ES(ElasticSearch)中的数据,可以用Kibana来进行搜索、查看、甚至一些分析计算。
看完这3个介绍,一个数据处理的方案就浮现在了脑海中:
Logstash读取日志数据,并进行加工汇总,发送到ES中进行存储,再用Kibana对数据进行加工、分析展示。需要深度分析的,可以用Python对ES的数据进行读取,分析,在Python中再进行深度分析。
下面就具体场景,来介绍一下分别如何使用ELK的工具。
2. logstash 做数据管道
(1)介绍
Logstash 在 ELK 中主要承担数据管道的角色,Logstash 包含语句包含三个部分:input 指定读取数据源,filter 中对数据进行处理加工,output 指定发送目的地。
(2)安装
无论是windows 还是Linux,官网(https://www.elastic.co/cn/downloads/logstash)下载后直接解压即可。然后在控制台进入 /bin 目录下进行命令操作。
(3)使用
在配置文件中,编写规则对 input-filter-output 进行操作,然后执行配置文件。配置文件的结构示例如下:
代码语言:javascript复制input {
数据源类型,以及对于的路径/端口等信息
}
filter {
数据处理规则,或者过滤规则
}
output {
数据输出目的地以及对应信息
}
在filter部分,我们可以用规则工具来对数据进行过滤、清洗等处理工作,常用的有:
- date:日期解析
- mute:字段拆分、重命名、删除、替换等
- grok:正则解析提取
(4)例子
比如我们有一个csv文件 login-log.csv 记录了某APP的登陆日志:
现在想通过 logstash 导入到 ES 中去,但是字段 loginTime 想拆分成2个:date(日期)和 time(时间),在/bin下面建立配置文件 logstash.conf,代码如下:
代码语言:javascript复制input {
file {
path=>["C:/shushuo/ELK-stack/logstash-7.5.0/bin/login-log.csv"]
start_position=>"beginning"
sincedb_path=>"NUL"
}
}
filter {
csv {
separator=>","
columns=>["ID","userId","loginTime"]
}
mutate{
split=>["loginTime"," "]
add_field=>{"date"=>"%{[loginTime][0]}"}
add_field=>{"time"=>"%{[loginTime][1]}"}
}
mutate{
remove_field=>["loginTime"]
}
}
output {
elasticsearch {
hosts => "http://localhost:9200"
index => "login-log"
}
stdout{}
}
简单说明一下:
- input 中的 start_position=>"beginning" 表示从头处理;sincedb_path=>"NUL" 表示禁用 sincedb 机制。sincedb机制意味着运行时会记录 logstash 的处理进度进行记录,如果不禁用,下次启动时不会再从头处理。这是windows的禁用写法,如果是linux则是 sincedb_path=>"/dev/null"
- filter 的第一个 mute 拆分了 loginTime 字段,并将日期和时间分别赋给新增的字段 date 和 time;第二个mute 删除 loginTime 字段
- output 将数据输出到 ES 中
在控制台中执行配置文件:
代码语言:javascript复制bin> logstash -f logstash.conf
结果会打印到控制台上:
数据已经到 ES 上,下面该 ES 出场了。
3. ES 做数据存储
(1)介绍
是一个高扩展的分布式全文检索引擎,近乎实时的存储、检索数据,所以也越来越被当作数据库来使用。ES 可以存储数据,可以无缝在 Kibana 上进行可视化展示,也可以拉到 Python 中进行一些分析加工。
了解一下它的一些基本概念:
- Index(索引):索引可以简单理解为是数据的表名
- Cluster(集群):ES 可以运行在多台相互协作的服务器上,这些服务器集合叫集群
- Node(节点):集群中的每个服务器叫节点
- shard(分片):一份索引数据如果很大,在查询时可能无法足够快的响应,我们把这份数据分成很多数据小块,存储在集群的不同服务器中,这个数据小块就是分片。查询时,ES会把查询发送给每个相关的分片,并将结果进行汇总,大大提高速度。
- replica(副本):副本是分片的复制,主分片和副本不会出现在同一个节点上,当主分片丢失时,集群将副本提升为新的主分片。
(2)安装
无论是windows 还是Linux,官网(https://www.elastic.co/cn/downloads/elasticsearch)下载后直接解压即可。然后在控制台进入 /bin 目录下进行命令操作。启动 elasticsearch 集群:
代码语言:javascript复制bin> elasticsearch
如果是在本地解压,解压完在浏览器输入 http://localhost:9200/ 查看ES情况:
集群已经启动。
(3)例子
前面 logstash 部分,已经将数据输出到了 ES 中,当时设置的 index 名字是 login-log,打开 http://localhost:9200/_cat/indices?v 看一下目前 ES 中的index:
数据已经存进去了,至于数据长什么样,怎么查看,以及修改,我们在 Kibana 中介绍。
4. Kibana 做数据展示(变身 Sql Boy)
(1)介绍
Kibana 是为 Elasticsearch 设计的分析和可视化平台。
(2)安装
无论是windows 还是Linux,官网(https://www.elastic.co/cn/downloads/kibana)下载后直接解压即可。然后在控制台进入 /bin 目录下进行命令操作。启动 Kibana:
代码语言:javascript复制bin> kibana
输入 http://localhost:5601/ 打开 kibana:
之前 login-log 的数据已经导入到 ES 中去了,所以直接选择 explore on my own:
下面就进入了 Kibana,结合之前导入的 login-log 数据,看一下 Kibana 的基本展示功能。
(3)例子
- create index pattern
进入 Kibana 之后,要先进入 management - Kibana - Index patterns,把ES 里的数据 create 到 Kibana 上,按提示操作即可:
- discover
discover 中可以查看数据 case
- Visualize
Visualize 中可以做一些可视化图表
- Dashboard
Dashboard 将 Visualize 中的图表合成一个报表。不过比较丑,就不展示了。
- Canvas
Canvas 实际也是一个 Dashboard,不同的是可以自由的进行画图,像 PPT 一样,数据是通过类似 SQL 语句进行查询展示:
5. Python读取ES数据分析(变身 TF Boy)
通过 Python 的 elasticsearch 库,我们可以将 ES 里的数据拖到 Python 中进行修改、分析、建模等处理,处理好的数据可以再存入 ES 中。下面介绍一下基本操作语句:
- 读取 ES 中的 index 数据
es.search() 读取 ES 数据,注意每次最多只能读取1W条,另外通过body语句可以筛选读取。
代码语言:javascript复制from elasticsearch import Elasticsearch
import pandas as pd
#基本语句
rawData = es.search(index='login-log', size=10000)
#利用body条件筛选需要数据
rawData = es.search(index='login-in', size=10000, body={
"query": {
#查询条件
}
})
以 login-log 这个 index 为例拉取后的数据如下:
- 将读取的数据转化成 pandas 的 Dataframe
拉取的数据在 rawData['hits']['hits'] 中的_source字段里面,我们转化为 Dataframe:
代码语言:javascript复制hits_data=rawData['hits']['hits']
source_data = [eval(str(x['_source'])) for x in hits_data]
pandas_data = pd.DataFrame(source_data)
- 数据写入 ES
数据在 Python 中分析完之后,需要转成 json 格式,再写入 ES 中。我们写入一个新的 index new-index 中。
代码语言:javascript复制bulk_data = []
for i in out_data_json.split('n'):
bulk_data.append({"index": {
'_index': "new-index",
'_type': "doc",
}})
bulk_data.append(json.loads(i))
es.bulk(bulk_data)
es 中写入了新的index: