通过Python读取elasticsearch中的数据

2021-05-27 10:33:32 浏览数 (1)

1.说明

在前面的分享《通过Python将监控数据由influxdb写入到MySQL》一文中,主要介绍了influxdb-->MySQL。InfluxDB主要存储的由telegraf收集的DB性能数据,此外还有资源、主从、集群等数据。而 Server Log、DB Log(Error Log 和 Slow Log)则是通过filebeat 和 Logstash收集、过滤保存到elasticsearch中。所以,有必要实现通过Python读取elasticsearch中的数据(写入到MySQL)的功能。

此处实现的功能是读取index中的host字段,将数值保存到MySQL中;换言之,通过Python查看那些机器已经部署了收集log的程序,并将查询出的server IP保存到MySQL数据库中。 

2.在MySQL库存创建表host_dblog_collector

 脚本如下

代码语言:javascript复制
CREATE TABLE `host_dblog_collector` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `ip_address` varchar(255) NOT NULL DEFAULT '',
  `datetime_created` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据行创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=30 DEFAULT CHARSET=utf8mb4;

 3.用来收集的python代码

代码语言:javascript复制
#coding:utf8
import os
import time
from os import walk
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

import db_conn
mysqldb = db_conn.db
# use cursor
cursor = mysqldb.cursor()

###数据收集前,清除之前收集的数据
sql_delete = "delete from host_dblog_collector "
cursor.execute(sql_delete)
mysqldb.commit()

class ElasticObj:
    def __init__(self, index_name,index_type,ip ="ES的Server IP"):
        '''

        :param index_name: 索引名称
        :param index_type: 索引类型,默认为_doc
        '''
        self.index_name =index_name
        self.index_type = index_type
        # 无用户名密码状态
        #self.es = Elasticsearch([ip])
        #用户名密码状态
        self.es = Elasticsearch([ip],http_auth=('ES的账号', 'ES的密码'),port=ES端口)

    #### 获取已部署日志收集的server host
    def get_deploymentlog_serverhost(self):
        doc = {
              "size": 0,  ###此处的sieze为0,表示不取文档的数据,只取聚合结果数据
              "aggs": {
                    "db_hosts": {
                        ##"cardinality":{"field": "fields.db_host.keyword"} ## 这个是先top size 这个数据量的记录,再去distnct
                        "terms":{
                                  "field": "fields.db_host.keyword",
                                  "size": 1000   ##此处的size 可以理解为分组后取多少组数据
                                   }

                        }
                    }
              ###"_source":"fields.db_host",
              ##"size": 1500  ###如果没有size的话,默认显示10行
        }

        _searched = self.es.search(index=self.index_name, doc_type=self.index_type, body=doc)
        print(_searched)

        for agg_bucket_hosts in _searched['aggregations']['db_hosts']['buckets']:
            # print hit['_source']
            # print (hit['_source']['fields']['db_host'])
            server_ip = agg_bucket_hosts['key']
            sql_insert = "insert into host_dblog_collector(ip_address) " 
                   "values('%s')" % 
                   (server_ip)

            cursor.execute(sql_insert)
            mysqldb.commit()

### 以mysql-开头的所有的index,索引的类型为_doc
obj =ElasticObj("mysql-*","_doc",ip ="ES服务器的IP")

obj.get_deploymentlog_serverhost()

 补充说明:代码中引用了db_conn模块,相应的代码请在《通过Python将监控数据由influxdb写入到MySQL》一文中查看,在此不再赘述。

0 人点赞