[692]python操作Elasticsearch

2022-04-13 14:47:11 浏览数 (1)

文章目录
  • 介绍
  • 安装API
  • 建立es连接
    • 无用户名密码状态
    • 用户名密码状态
    • 使用ssl连接
    • 创建index索引
  • 数据检索功能
    • 滚动查询demo
    • Elasticsearch利用scroll查询获取所有数据
  • 数据查询功能
    • count
    • 查询所有数据
    • 切片式查询
    • range过滤器查询范围
    • 前缀查询
    • 通配符查询
    • 排序
    • filter_path
    • 度量类聚合
    • 时间范围
    • bool组合过滤器
    • term与terms过滤器
    • 正则查询
    • match与multi_match查询
    • ids
    • demo
    • 数据组装
    • 分页数据
      • scroll获取数据
      • 需要使用分页,先来看一下分页公式
  • 完整代码

介绍

ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。下面介绍了利用Python API接口进行数据查询,方便其他系统的调用。

安装API

代码语言:javascript复制
pip3 install elasticsearch

建立es连接

无用户名密码状态

代码语言:javascript复制
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'10.10.13.12','port':9200}])

默认的超时时间是10秒,如果数据量很大,时间设置更长一些。如果端口是9200,直接写IP即可。代码如下:

代码语言:javascript复制
es = Elasticsearch(['10.10.13.12'], timeout=3600)

用户名密码状态

如果Elasticsearch开启了验证,需要用户名和密码

代码语言:javascript复制
es =earch(['10.10.13.12'], http_auth=('xiao', '123456'), timeout=3600)

使用ssl连接

代码语言:javascript复制
from elasticsearch import Elasticsearch
from ssl import create_default_context

context = create_default_context(cafile="path/to/cafile.pem")
es = Elasticsearch("https://elasticsearch.url:port", ssl_context=context, http_auth=('elastic','yourpassword'))
es.info()

创建index索引

代码语言:javascript复制
#创建索引,索引的名字是my-index,如果已经存在了,就返回个400,
#这个索引可以现在创建,也可以在后面插入数据的时候再临时创建
es.indices.create(index='my-index',ignore=400)

数据检索功能

代码语言:javascript复制
es.search(index='my_index', 
    q='http_status_code:5* AND server_name:"web1"', 
    from_='124119')

常用参数

  • index - 索引名
  • q - 查询指定匹配 使用Lucene查询语法
  • from_ - 查询起始点 默认0
  • doc_type - 文档类型
  • size - 指定查询条数 默认10
  • field - 指定字段 逗号分隔
  • sort - 排序 字段:asc/desc
  • body - 使用Query DSL
  • scroll - 滚动查询

滚动查询demo

代码语言:javascript复制
# Initialize the scroll
page = es.search(
    index ='yourIndex',
    doc_type ='yourType',
    scroll ='2m',
    search_type ='scan',
    size =1000,
    body ={
    # Your query's body
})
 
sid = page['_scroll_id']
scroll_size = page['hits']['total']
 
# Start scrolling
while(scroll_size >0):
    print "Scrolling..."
    page = es.scroll(scroll_id = sid, scroll ='2m')
    # Update the scroll ID
    sid = page['_scroll_id']
    # Get the number of results that we returned in the last scroll
    scroll_size = len(page['hits']['hits'])
    print "scroll size: "  str(scroll_size)
    # Do something with the obtained page

以上demo实现了一次取若干数据,数据取完之后结束,不会获取到最新更新的数据。我们滚动完之后想获取最新数据怎么办?滚动的时候会有一个统计值,如total: 5。跳出循环之后,我们可以用_from参数定位到5开始滚动之后的数据。

但是我用的不是这个,用的是以下方法,链接如下:https://www.cnblogs.com/blue163/p/8126156.html

Elasticsearch利用scroll查询获取所有数据

Elasticsearch有两种分页方式,一种是通过from和size条件来实现,但是该方法开销比较大,另一种是利用scroll来实现,通过scroll来实现分页获取所有的数据,下面是利用python实现的scroll获取全部数据的方式:

代码语言:javascript复制
from elasticsearch import Elasticsearch


if __name__ == "__main__":
    es=Elasticsearch([{"host":"10.120.241.194","port":"9200"}])
    query_json={
        "match": {
            "type":3
        }
    }
    queryData = es.search(index='index', scroll='5m', timeout='3s', size=100, body={"query":query_json})

    mdata = queryData.get("hits").get("hits")
    if not mdata:
        print('empty!')

    scroll_id = queryData["_scroll_id"]
    total = queryData["hits"]["total"]
    for i in range(total/100):
        res = es.scroll(scroll_id=scroll_id, scroll='5m') #scroll参数必须指定否则会报错
        mdata  = res["hits"]["hits"]
        print(res)
    print(mdata)

通过上面的方法就可以获取es中符合条件的所有记录了。

如果出现{u’index’: None, u’reason’: {u’reason’: u’No search context found for id [303859]’, u’type’: u’search_context_missing_exception’}这样的问题,是因为代码第19行没有加上scroll参数

数据查询功能

count

语法同search大致一样,但只输出统计值

代码语言:javascript复制
es.count(index='my_index', q='http_status_code:500')

输出:

代码语言:javascript复制
{'_shards':{'failed':0, 'successful':5, 'total':5}, 'count':17042}

17042 就是统计值!

查询所有数据

代码语言:javascript复制
# 搜索所有数据
es.search(index="my_index",doc_type="test_type")
 
# 或者
body = {
    "query":{
        "match_all":{}
    }
}
es.search(index="my_index",doc_type="test_type",body=body)

切片式查询

代码语言:javascript复制
body = {
    "query":{
        "match_all":{}
    }
    "from":2    # 从第二条数据开始
    "size":4    # 获取4条数据
}
# 从第2条数据开始,获取4条数据
es.search(index="my_index",doc_type="test_type",body=body)

range过滤器查询范围

代码语言:javascript复制
gt: > 大于
lt: < 小于
gte: >= 大于或等于
lte: <= 小于或等于

示例

代码语言:javascript复制
body = {
    "query":{
        "range":{
            "age":{
                "gte":18,       # >=18
                "lte":30        # <=30
            }
        }
    }
}
# 查询18<=age<=30的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

前缀查询

代码语言:javascript复制
body = {
    "query":{
        "prefix":{
            "name":"p"
        }
    }
}
# 查询前缀为"赵"的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

通配符查询

代码语言:javascript复制
body = {
    "query":{
        "wildcard":{
            "name":"*id"
        }
    }
}
# 查询name以id为后缀的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

排序

代码语言:javascript复制
body = {
    "query":{
        "match_all":{}
    }
    "sort":{
        "age":{                 # 根据age字段升序排序
            "order":"asc"       # asc升序,desc降序
        }
    }
}

filter_path

响应过滤

代码语言:javascript复制
# 只需要获取_id数据,多个条件用逗号隔开
es.search(index="my_index",doc_type="test_type",filter_path=["hits.hits._id"])

# 获取所有数据
es.search(index="my_index",doc_type="test_type",filter_path=["hits.hits._*"])

度量类聚合

  • 获取最小值
代码语言:javascript复制
body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "min_age":{                 # 最小值的key
            "min":{                 # 最小
                "field":"age"       # 查询"age"的最小值
            }
        }
    }
}
# 搜索所有数据,并获取age最小的值
es.search(index="my_index",doc_type="test_type",body=body)
  • 获取最大值
代码语言:javascript复制
body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "max_age":{                 # 最大值的key
            "max":{                 # 最大
                "field":"age"       # 查询"age"的最大值
            }
        }
    }
}
# 搜索所有数据,并获取age最大的值
es.search(index="my_index",doc_type="test_type",body=body)
  • 获取和
代码语言:javascript复制
body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "sum_age":{                 # 和的key
            "sum":{                 # 和
                "field":"age"       # 获取所有age的和
            }
        }
    }
}
# 搜索所有数据,并获取所有age的和
es.search(index="my_index",doc_type="test_type",body=body)
  • 获取平均值
代码语言:javascript复制
body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "avg_age":{                 # 平均值的key
            "sum":{                 # 平均值
                "field":"age"       # 获取所有age的平均值
            }
        }
    }
}
# 搜索所有数据,获取所有age的平均值
es.search(index="my_index",doc_type="test_type",body=body)

时间范围

  • 最近时间段

比如我要查询最近1分钟的

代码语言:javascript复制
"range": {
    '@timestamp': {'gt': 'now-1m'}
}
  • 最新1小时
代码语言:javascript复制
"range": {
    '@timestamp': {'gt': 'now-1h'}
}
  • 最新1天的
代码语言:javascript复制
"range": {
    '@timestamp': {'gt': 'now-1d'}
}
  • 指定时间段

那么问题来了,它是根据当前时间来计算最近的时间。但是有些情况下,我需要制定时间范围,精确到分钟

假设需要查询早上8点到9点的数据,可以这样

代码语言:javascript复制
"range": {
    '@timestamp': {
        "gt" : "{}T{}:00:00".format("2018-12-17","08"),
        "lt": "{}T{}:59:59".format("2018-12-17","09"),
        "time_zone": "Asia/Shanghai"
    }
}

注意:日期和小时之间,有一个字母T来间隔。不能用空格!

time_zone 表示时区,如果默认的时区不对,可能会影响查询结果!

bool组合过滤器

must:所有分句都必须匹配,与 AND 相同。 must_not:所有分句都必须不匹配,与 NOT 相同。 should:至少有一个分句匹配,与 OR 相同。

示例代码

代码语言:javascript复制
{
    "bool":{
      "must":[],
      "should":[],
      "must_not":[],
    }
}

bool有3类查询关系,must(都满足),should(其中一个满足),must_not(都不满足)

代码语言:javascript复制
body = {
    "query":{
        "bool":{
            "must":[
                {
                    "term":{
                        "name":"python"
                    }
                },
                {
                    "term":{
                        "age":18
                    }
                }
            ]
        }
    }
}
# 获取name="python"并且age=18的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

term与terms过滤器

  • term单过滤
代码语言:javascript复制
body = {
    "query":{
        "term":{
            "name":"python"
        }
    }
}
# 查询name="python"的所有数据
es.search(index="my_index",doc_type="test_type",body=body)
  • terms复数版本

允许多个匹配条件

代码语言:javascript复制
body = {
    "query":{
        "terms":{
            "name":[
                "python","android"
            ]
        }
    }
}
# 搜索出name="python"或name="android"的所有数据
es.search(index="my_index",doc_type="test_type",body=body)
  • 结合bool term来举一个实际的例子:

查询path字段中包含applogs最近1分钟的记录

代码语言:javascript复制
"bool": {
    "must": [
        {
            "terms": {
                "path": [
                    "applogs",
                ]
            }
        },
        {
            "range": {
                '@timestamp': {'gt': 'now-1m'}
            }
        }
    ]
}

这里使用了terms复数版本,可以随时添加多个条件!

正则查询

代码语言:javascript复制
{
    "regexp": {
        "http_status_code": "5.*"
    }
}

match与multi_match查询

代码语言:javascript复制
# match:匹配name包含python关键字的数据
body = {
    "query":{
        "match":{
            "name":"python"
        }
    }
}
# 查询name包含python关键字的数据
es.search(index="my_index",doc_type="test_type",body=body)

# multi_match:在name和addr里匹配包含深圳关键字的数据
body = {
    "query":{
        "multi_match":{
            "query":"深圳",
            "fields":["name","addr"]
        }
    }
}
# 查询name和addr包含"深圳"关键字的数据
es.search(index="my_index",doc_type="test_type",body=body)

ids

代码语言:javascript复制
body = {
    "query":{
        "ids":{
            "type":"test_type",
            "values":[
                "1","2"
            ]
        }
    }
}
# 搜索出id为1或2d的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

demo

获取最近一小时的数据

代码语言:javascript复制
{'query':
    {'filtered':
        {'filter':
            {'range':
                {'@timestamp':{'gt':'now-1h'}}
            }
        }
    }
}

条件过滤查询

代码语言:javascript复制
{
    "query":{
        "filtered":{
            "query":{"match":{"http_status_code":500}},
            "filter":{"term":{"server_name":"vip03"}}
        }
    }
}

Terms Facet 单字段统计

代码语言:javascript复制
{'facets':
    {'stat':
        {'terms':
            {'field':'http_status_code',
              'order':'count',
        'size':50}
        }
    },
    'size':0
}

一次统计多个字段

代码语言:javascript复制
{'facets':
    {'cip':
        {'terms':
            {'fields':['client_ip']}},
              'status_facets':{'terms':{'fields':['http_status_code'],
              'order':'term',
              'size':50}}},
        'query':{'query_string':{'query':'*'}},
    'size':0
}

多个字段一起统计

代码语言:javascript复制
{'facets':
    {'tag':
        {'terms':
            {'fields':['http_status_code','client_ip'],
              'size':10
           }
        }
    },
    'query':
        {'match_all':{}},
    'size':0
}

数据组装

以下是kibana首页的demo,用来统计一段时间内的日志数量

代码语言:javascript复制
{
  "facets": {
    "0": {
      "date_histogram": {
        "field": "@timestamp",
        "interval": "5m"
      },
      "facet_filter": {
        "fquery": {
          "query": {
            "filtered": {
              "query": {
                "query_string": {
                  "query": "*"
                }
              },
              "filter": {
                "bool": {
                  "must": [
                    {
                      "range": {
                        "@timestamp": {
                          'gt': 'now-1h'
                        }
                      }
                    },
                    {
                      "exists": {
                        "field": "http_status_code.raw"
                      }
                    },
                    # --------------- -------
                    # 此处加匹配条件
                  ]
                }
              }
            }
          }
        }
      }
    }
  },
  "size": 0
}

如果想添加匹配条件,在以上代码标识部分加上过滤条件,按照以下代码格式即可

代码语言:javascript复制
{
"query": {
    "query_string": {"query": "backend_name:baidu.com"}
    }
},

分页数据

scroll获取数据
代码语言:javascript复制
size = 1000  # 指定返回1000条
queryData = self.es.search(index=self.index_name, body=doc, size=size, scroll='1m', )

mdata = queryData.get("hits").get("hits")  # 返回数据,它是一个列表类型

参数解释:

  • size 指定返回的条数,默认返回10条
  • index 指定索引名
  • body 查询语句
  • scroll 告诉 Elasticsearch 把搜索上下文再保持一分钟。1m表示1分钟

queryData 返回一个字典,那么真正的查询结果在queryData[‘hits’][‘hits’]中,如果这个值没有,表示没有查询到数据!

注意:它并不是返回所有的结果,而是一页的数据,是一个列表类型。因为我们使用了scroll获取数据,只返回一页!

需要使用分页,先来看一下分页公式
代码语言:javascript复制
divmod(总条数, 每页大小)

注意:divmod返回一个元祖,第一个元素,就是要分页数

总条数,使用

代码语言:javascript复制
total = queryData['hits']['total']  # 返回数据的总条数

每页大小,就是上面指定的size

代码语言:javascript复制
size = 1000  # 指定返回1000条

那么遍历每一页数据,需要这样

代码语言:javascript复制
scroll_id = queryData['_scroll_id']  # 获取scrollID
total = queryData['hits']['total']  # 返回数据的总条数

# 使用divmod设置分页查询
# divmod(total,1000)[0] 1 表示总条数除以1000,结果取整数加1
for i in range(divmod(total, size)[0]   1):
    res = self.es.scroll(scroll_id=scroll_id, scroll='1m')  # scroll参数必须指定否则会报错
    mdata  = res["hits"]["hits"]  # 扩展列表

scroll_id给es.scroll获取数据使用,这个参数必须要有。

由于Python中的range是顾头不顾尾,所以需要加1。使用for循环,就可以遍历每一个分页数

es.scroll(scroll_id=scroll_id, scroll=‘1m’) 才是真正查询每一页的数据,必须要指定这2个参数。它的返回结果,就是查询结果!返回一个列表

上面的mdata是一个列表,res也是列表。因此使用 =就可以扩展列表,得到所有数据!

完整代码

代码语言:javascript复制
# -*- coding:utf-8 -*-
import datetime
from elasticsearch import Elasticsearch, RequestsHttpConnection
from elasticsearch.helpers import bulk


class ElasticObj(object):

    def __init__(self, host='127.0.0.1',es_user=None,es_passwd=None,index_name='test'):
        if es_user:
            self.es = Elasticsearch(
                [{'host': host, 'port': 9200}],
                http_auth=(es_user, es_passwd), timeout=3600)
        else:
            self.es = Elasticsearch([host], port=9200, timeout=3600)
        # ssl连接
        # from ssl import create_default_context
        # context = create_default_context(cafile="path/to/cafile.pem")
        # es = Elasticsearch("https://elasticsearch.url:port", ssl_context=context, http_auth=('elastic', 'yourpassword'))
        # 打印出连接信息
        print(self.es.info())
        self.index_name = index_name

    # 建立ES索引
    def create_index(self, index_name='test'):
        """
        创建索引, 创建索引名字
        :param ex: Elasticsearch对象
        :return:
        """
        # 索引 相当于数据库中的 库名
        _index_mappings = {
            "settings": {
                "number_of_shards": "2",
                "number_of_replicas": "0"
            },
            "mappings": {
                "properties": {
                    "title":{"type":"text","index": True,},
                    "date": {
                        "type": "date",
                        "index": True,
                        "format": "yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis"
                    },
                    "keyword":{"type":"text","index": True,},
                    "source": {"type":"text","index": True,},
                    "link": {"type":"text","index": True,},
                }
            }
        }
        if self.es.indices.exists(index=index_name) is not True:
            res = self.es.indices.create(index=index_name, body=_index_mappings,ignore=400)
            print(res)

    # es插入数据
    def bulk_index_data(self):
        '''
        用bulk将批量数据存储到es
        :return:
        '''
        list = [
            {"date": "2017/09/13",
             "source": "慧聪网",
             "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
             "keyword": "电视",
             "title": "付费 电视 行业面临的转型和挑战"
             },
            {"date": "2017/09/13",
             "source": "中国文明网",
             "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
             "keyword": "电视",
             "title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心"
             },
            {"date": "2017/09/13",
             "source": "人民电视",
             "link": "http://tv.people.com.cn/BIG5/n1/2017/0913/c67816-29533981.html",
             "keyword": "电视",
             "title": "中国第21批赴刚果(金)维和部隊启程--人民 电视 --人民网"
             },
            {"date": "2017/09/13",
             "source": "站长之家",
             "link": "http://www.chinaz.com/news/2017/0913/804263.shtml",
             "keyword": "电视",
             "title": "电视 盒子 哪个牌子好? 吐血奉献三大选购秘笈"
             }
        ]
        ACTIONS = []
        i = 1
        for line in list:
            action = {
                "_index": self.index_name,
                # "_type": self.index_type,
                "_id": i,  # _id 也可以默认生成,不赋值
                "_source": {
                    "date": line['date'],
                    "source": line['source'],
                    "link": line['link'],
                    "keyword": line['keyword'],
                    "title": line['title'],
                }
            }
            i  = 1
            print(action)
            ACTIONS.append(action)
        # 批量处理
        success, _ = bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)
        # 单条插入
        self.es.index(index=self.index_name, id=42, body={"any": "data", "timestamp": datetime.datetime.now()})

    # es查找数据
    def get_data(self,id=None,body={}):
        # match:匹配keyword包含"电视"关键字的数据
        body = {
            "query": {
                "match": {
                    "keyword": "电视",
                }
            }
        }
        # multi_match:在name和addr里匹配包含深圳关键字的数据
        body_2 = {
            "query": {
                "multi_match": {
                    "query": "深圳",
                    "fields": ["name", "addr"]
                }
            }
        }
        # match_all 匹配所有数据
        body_3 = {
            "query": {
                "match_all": {}
            }
        }
        if id:# 根据id单条查询
            res = self.es.get(index=self.index_name, id=id)
            print(res)
            print(res['_source'])
        print('_'*100)
        if body:# 条件查询
            _searched = self.es.search(index=self.index_name, body=body)
            for hit in _searched['hits']['hits']:
                print(hit['_source'])
            # 统计条数
            count = self.es.count(index=self.index_name, body=body)
            print(count)

        # 查找所有的数据
        search_all=self.es.search(index=self.index_name)
        print('search_all 1',search_all)
        # 或者
        search_all =self.es.search(index=self.index_name,body=body_3)
        print('search_all 2', search_all)

    # es更新数据
    def update_data(self, id=None, body={}):
        body = {
            "query": {
                "match": {
                    "keyword": "电视",
                }
            }
        }
        if id:  # 根据id单条更新
            res = self.es.update(index=self.index_name, id=id)
            print(res)
        if body:  # 条件更新
            res = self.es.update_by_query(index=self.index_name, body=body)
            print(res)

    # es删除数据
    def delete_data(self, id=None,body={}):
        if id:# es删除数据(根据id单条删除)
            res = self.es.delete(index=self.index_name, id=id)
            print(res)
        if body:# es删除数据(条件删除)
            res = self.es.delete_by_query(index=self.index_name, body=body)
            print(res)


if __name__ == '__main__':
    try:
        host, es_user, es_passwd = 'localhost', 'elastic', '123456'
        obj=ElasticObj(host=host,es_user=es_user,es_passwd=es_passwd,index_name='test')
        # obj.create_index()
        # obj.bulk_index_data()
        obj.get_data(id=1)
        # obj.delete_data(id=1)
    except Exception as e:
        import traceback
        ex_msg = '{exception}'.format(exception=traceback.format_exc())
        print(ex_msg)

pypi:https://pypi.org/project/elasticsearch/ ElasticSearch官方文档:https://elasticsearch-py.readthedocs.io/en/master/ 搜索用法:https://elasticsearch-py.readthedocs.io/en/master/api.html

0 人点赞