1.说明
在前面的分享《通过Python将监控数据由influxdb写入到MySQL》一文中,主要介绍了influxdb-->MySQL。InfluxDB主要存储的由telegraf收集的DB性能数据,此外还有资源、主从、集群等数据。而 Server Log、DB Log(Error Log 和 Slow Log)则是通过filebeat 和 Logstash收集、过滤保存到elasticsearch中。所以,有必要实现通过Python读取elasticsearch中的数据(写入到MySQL)的功能。
此处实现的功能是读取index中的host字段,将数值保存到MySQL中;换言之,通过Python查看那些机器已经部署了收集log的程序,并将查询出的server IP保存到MySQL数据库中。
2.在MySQL库存创建表host_dblog_collector
脚本如下
代码语言:javascript复制CREATE TABLE `host_dblog_collector` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`ip_address` varchar(255) NOT NULL DEFAULT '',
`datetime_created` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据行创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=30 DEFAULT CHARSET=utf8mb4;
3.用来收集的python代码
代码语言:javascript复制#coding:utf8
import os
import time
from os import walk
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import db_conn
mysqldb = db_conn.db
# use cursor
cursor = mysqldb.cursor()
###数据收集前,清除之前收集的数据
sql_delete = "delete from host_dblog_collector "
cursor.execute(sql_delete)
mysqldb.commit()
class ElasticObj:
def __init__(self, index_name,index_type,ip ="ES的Server IP"):
'''
:param index_name: 索引名称
:param index_type: 索引类型,默认为_doc
'''
self.index_name =index_name
self.index_type = index_type
# 无用户名密码状态
#self.es = Elasticsearch([ip])
#用户名密码状态
self.es = Elasticsearch([ip],http_auth=('ES的账号', 'ES的密码'),port=ES端口)
#### 获取已部署日志收集的server host
def get_deploymentlog_serverhost(self):
doc = {
"size": 0, ###此处的sieze为0,表示不取文档的数据,只取聚合结果数据
"aggs": {
"db_hosts": {
##"cardinality":{"field": "fields.db_host.keyword"} ## 这个是先top size 这个数据量的记录,再去distnct
"terms":{
"field": "fields.db_host.keyword",
"size": 1000 ##此处的size 可以理解为分组后取多少组数据
}
}
}
###"_source":"fields.db_host",
##"size": 1500 ###如果没有size的话,默认显示10行
}
_searched = self.es.search(index=self.index_name, doc_type=self.index_type, body=doc)
print(_searched)
for agg_bucket_hosts in _searched['aggregations']['db_hosts']['buckets']:
# print hit['_source']
# print (hit['_source']['fields']['db_host'])
server_ip = agg_bucket_hosts['key']
sql_insert = "insert into host_dblog_collector(ip_address) "
"values('%s')" %
(server_ip)
cursor.execute(sql_insert)
mysqldb.commit()
### 以mysql-开头的所有的index,索引的类型为_doc
obj =ElasticObj("mysql-*","_doc",ip ="ES服务器的IP")
obj.get_deploymentlog_serverhost()
补充说明:代码中引用了db_conn模块,相应的代码请在《通过Python将监控数据由influxdb写入到MySQL》一文中查看,在此不再赘述。