文章目录- 介绍
- 安装API
- 建立es连接
- 无用户名密码状态
- 用户名密码状态
- 使用ssl连接
- 创建index索引
- 数据检索功能
- 滚动查询demo
- Elasticsearch利用scroll查询获取所有数据
- 数据查询功能
- count
- 查询所有数据
- 切片式查询
- range过滤器查询范围
- 前缀查询
- 通配符查询
- 排序
- filter_path
- 度量类聚合
- 时间范围
- bool组合过滤器
- term与terms过滤器
- 正则查询
- match与multi_match查询
- ids
- demo
- 数据组装
- 分页数据
- scroll获取数据
- 需要使用分页,先来看一下分页公式
- 完整代码
- 无用户名密码状态
- 用户名密码状态
- 使用ssl连接
- 创建index索引
- 滚动查询demo
- Elasticsearch利用scroll查询获取所有数据
- count
- 查询所有数据
- 切片式查询
- range过滤器查询范围
- 前缀查询
- 通配符查询
- 排序
- filter_path
- 度量类聚合
- 时间范围
- bool组合过滤器
- term与terms过滤器
- 正则查询
- match与multi_match查询
- ids
- demo
- 数据组装
- 分页数据
- scroll获取数据
- 需要使用分页,先来看一下分页公式
介绍
ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。下面介绍了利用Python API接口进行数据查询,方便其他系统的调用。
安装API
代码语言:javascript复制pip3 install elasticsearch
建立es连接
无用户名密码状态
代码语言:javascript复制from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'10.10.13.12','port':9200}])
默认的超时时间是10秒,如果数据量很大,时间设置更长一些。如果端口是9200,直接写IP即可。代码如下:
代码语言:javascript复制es = Elasticsearch(['10.10.13.12'], timeout=3600)
用户名密码状态
如果Elasticsearch开启了验证,需要用户名和密码
代码语言:javascript复制es =earch(['10.10.13.12'], http_auth=('xiao', '123456'), timeout=3600)
使用ssl连接
代码语言:javascript复制from elasticsearch import Elasticsearch
from ssl import create_default_context
context = create_default_context(cafile="path/to/cafile.pem")
es = Elasticsearch("https://elasticsearch.url:port", ssl_context=context, http_auth=('elastic','yourpassword'))
es.info()
创建index索引
代码语言:javascript复制#创建索引,索引的名字是my-index,如果已经存在了,就返回个400,
#这个索引可以现在创建,也可以在后面插入数据的时候再临时创建
es.indices.create(index='my-index',ignore=400)
数据检索功能
代码语言:javascript复制es.search(index='my_index',
q='http_status_code:5* AND server_name:"web1"',
from_='124119')
常用参数
- index - 索引名
- q - 查询指定匹配 使用Lucene查询语法
- from_ - 查询起始点 默认0
- doc_type - 文档类型
- size - 指定查询条数 默认10
- field - 指定字段 逗号分隔
- sort - 排序 字段:asc/desc
- body - 使用Query DSL
- scroll - 滚动查询
滚动查询demo
代码语言:javascript复制# Initialize the scroll
page = es.search(
index ='yourIndex',
doc_type ='yourType',
scroll ='2m',
search_type ='scan',
size =1000,
body ={
# Your query's body
})
sid = page['_scroll_id']
scroll_size = page['hits']['total']
# Start scrolling
while(scroll_size >0):
print "Scrolling..."
page = es.scroll(scroll_id = sid, scroll ='2m')
# Update the scroll ID
sid = page['_scroll_id']
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
print "scroll size: " str(scroll_size)
# Do something with the obtained page
以上demo实现了一次取若干数据,数据取完之后结束,不会获取到最新更新的数据。我们滚动完之后想获取最新数据怎么办?滚动的时候会有一个统计值,如total: 5。跳出循环之后,我们可以用_from参数定位到5开始滚动之后的数据。
但是我用的不是这个,用的是以下方法,链接如下:https://www.cnblogs.com/blue163/p/8126156.html
Elasticsearch利用scroll查询获取所有数据
Elasticsearch有两种分页方式,一种是通过from和size条件来实现,但是该方法开销比较大,另一种是利用scroll来实现,通过scroll来实现分页获取所有的数据,下面是利用python实现的scroll获取全部数据的方式:
代码语言:javascript复制from elasticsearch import Elasticsearch
if __name__ == "__main__":
es=Elasticsearch([{"host":"10.120.241.194","port":"9200"}])
query_json={
"match": {
"type":3
}
}
queryData = es.search(index='index', scroll='5m', timeout='3s', size=100, body={"query":query_json})
mdata = queryData.get("hits").get("hits")
if not mdata:
print('empty!')
scroll_id = queryData["_scroll_id"]
total = queryData["hits"]["total"]
for i in range(total/100):
res = es.scroll(scroll_id=scroll_id, scroll='5m') #scroll参数必须指定否则会报错
mdata = res["hits"]["hits"]
print(res)
print(mdata)
通过上面的方法就可以获取es中符合条件的所有记录了。
如果出现{u’index’: None, u’reason’: {u’reason’: u’No search context found for id [303859]’, u’type’: u’search_context_missing_exception’}这样的问题,是因为代码第19行没有加上scroll参数
数据查询功能
count
语法同search大致一样,但只输出统计值
代码语言:javascript复制es.count(index='my_index', q='http_status_code:500')
输出:
代码语言:javascript复制{'_shards':{'failed':0, 'successful':5, 'total':5}, 'count':17042}
17042 就是统计值!
查询所有数据
代码语言:javascript复制# 搜索所有数据
es.search(index="my_index",doc_type="test_type")
# 或者
body = {
"query":{
"match_all":{}
}
}
es.search(index="my_index",doc_type="test_type",body=body)
切片式查询
代码语言:javascript复制body = {
"query":{
"match_all":{}
}
"from":2 # 从第二条数据开始
"size":4 # 获取4条数据
}
# 从第2条数据开始,获取4条数据
es.search(index="my_index",doc_type="test_type",body=body)
range过滤器查询范围
代码语言:javascript复制gt: > 大于
lt: < 小于
gte: >= 大于或等于
lte: <= 小于或等于
示例
代码语言:javascript复制body = {
"query":{
"range":{
"age":{
"gte":18, # >=18
"lte":30 # <=30
}
}
}
}
# 查询18<=age<=30的所有数据
es.search(index="my_index",doc_type="test_type",body=body)
前缀查询
代码语言:javascript复制body = {
"query":{
"prefix":{
"name":"p"
}
}
}
# 查询前缀为"赵"的所有数据
es.search(index="my_index",doc_type="test_type",body=body)
通配符查询
代码语言:javascript复制body = {
"query":{
"wildcard":{
"name":"*id"
}
}
}
# 查询name以id为后缀的所有数据
es.search(index="my_index",doc_type="test_type",body=body)
排序
代码语言:javascript复制body = {
"query":{
"match_all":{}
}
"sort":{
"age":{ # 根据age字段升序排序
"order":"asc" # asc升序,desc降序
}
}
}
filter_path
响应过滤
代码语言:javascript复制# 只需要获取_id数据,多个条件用逗号隔开
es.search(index="my_index",doc_type="test_type",filter_path=["hits.hits._id"])
# 获取所有数据
es.search(index="my_index",doc_type="test_type",filter_path=["hits.hits._*"])
度量类聚合
- 获取最小值
body = {
"query":{
"match_all":{}
},
"aggs":{ # 聚合查询
"min_age":{ # 最小值的key
"min":{ # 最小
"field":"age" # 查询"age"的最小值
}
}
}
}
# 搜索所有数据,并获取age最小的值
es.search(index="my_index",doc_type="test_type",body=body)
- 获取最大值
body = {
"query":{
"match_all":{}
},
"aggs":{ # 聚合查询
"max_age":{ # 最大值的key
"max":{ # 最大
"field":"age" # 查询"age"的最大值
}
}
}
}
# 搜索所有数据,并获取age最大的值
es.search(index="my_index",doc_type="test_type",body=body)
- 获取和
body = {
"query":{
"match_all":{}
},
"aggs":{ # 聚合查询
"sum_age":{ # 和的key
"sum":{ # 和
"field":"age" # 获取所有age的和
}
}
}
}
# 搜索所有数据,并获取所有age的和
es.search(index="my_index",doc_type="test_type",body=body)
- 获取平均值
body = {
"query":{
"match_all":{}
},
"aggs":{ # 聚合查询
"avg_age":{ # 平均值的key
"sum":{ # 平均值
"field":"age" # 获取所有age的平均值
}
}
}
}
# 搜索所有数据,获取所有age的平均值
es.search(index="my_index",doc_type="test_type",body=body)
时间范围
- 最近时间段
比如我要查询最近1分钟的
代码语言:javascript复制"range": {
'@timestamp': {'gt': 'now-1m'}
}
- 最新1小时
"range": {
'@timestamp': {'gt': 'now-1h'}
}
- 最新1天的
"range": {
'@timestamp': {'gt': 'now-1d'}
}
- 指定时间段
那么问题来了,它是根据当前时间来计算最近的时间。但是有些情况下,我需要制定时间范围,精确到分钟
假设需要查询早上8点到9点的数据,可以这样
代码语言:javascript复制"range": {
'@timestamp': {
"gt" : "{}T{}:00:00".format("2018-12-17","08"),
"lt": "{}T{}:59:59".format("2018-12-17","09"),
"time_zone": "Asia/Shanghai"
}
}
注意:日期和小时之间,有一个字母T来间隔。不能用空格!
time_zone 表示时区,如果默认的时区不对,可能会影响查询结果!
bool组合过滤器
must:所有分句都必须匹配,与 AND 相同。 must_not:所有分句都必须不匹配,与 NOT 相同。 should:至少有一个分句匹配,与 OR 相同。
示例代码
代码语言:javascript复制{
"bool":{
"must":[],
"should":[],
"must_not":[],
}
}
bool有3类查询关系,must(都满足),should(其中一个满足),must_not(都不满足)
代码语言:javascript复制body = {
"query":{
"bool":{
"must":[
{
"term":{
"name":"python"
}
},
{
"term":{
"age":18
}
}
]
}
}
}
# 获取name="python"并且age=18的所有数据
es.search(index="my_index",doc_type="test_type",body=body)
term与terms过滤器
- term单过滤
body = {
"query":{
"term":{
"name":"python"
}
}
}
# 查询name="python"的所有数据
es.search(index="my_index",doc_type="test_type",body=body)
- terms复数版本
允许多个匹配条件
代码语言:javascript复制body = {
"query":{
"terms":{
"name":[
"python","android"
]
}
}
}
# 搜索出name="python"或name="android"的所有数据
es.search(index="my_index",doc_type="test_type",body=body)
- 结合bool term来举一个实际的例子:
查询path字段中包含applogs最近1分钟的记录
代码语言:javascript复制"bool": {
"must": [
{
"terms": {
"path": [
"applogs",
]
}
},
{
"range": {
'@timestamp': {'gt': 'now-1m'}
}
}
]
}
这里使用了terms复数版本,可以随时添加多个条件!
正则查询
代码语言:javascript复制{
"regexp": {
"http_status_code": "5.*"
}
}
match与multi_match查询
代码语言:javascript复制# match:匹配name包含python关键字的数据
body = {
"query":{
"match":{
"name":"python"
}
}
}
# 查询name包含python关键字的数据
es.search(index="my_index",doc_type="test_type",body=body)
# multi_match:在name和addr里匹配包含深圳关键字的数据
body = {
"query":{
"multi_match":{
"query":"深圳",
"fields":["name","addr"]
}
}
}
# 查询name和addr包含"深圳"关键字的数据
es.search(index="my_index",doc_type="test_type",body=body)
ids
代码语言:javascript复制body = {
"query":{
"ids":{
"type":"test_type",
"values":[
"1","2"
]
}
}
}
# 搜索出id为1或2d的所有数据
es.search(index="my_index",doc_type="test_type",body=body)
demo
获取最近一小时的数据
代码语言:javascript复制{'query':
{'filtered':
{'filter':
{'range':
{'@timestamp':{'gt':'now-1h'}}
}
}
}
}
条件过滤查询
代码语言:javascript复制{
"query":{
"filtered":{
"query":{"match":{"http_status_code":500}},
"filter":{"term":{"server_name":"vip03"}}
}
}
}
Terms Facet 单字段统计
代码语言:javascript复制{'facets':
{'stat':
{'terms':
{'field':'http_status_code',
'order':'count',
'size':50}
}
},
'size':0
}
一次统计多个字段
代码语言:javascript复制{'facets':
{'cip':
{'terms':
{'fields':['client_ip']}},
'status_facets':{'terms':{'fields':['http_status_code'],
'order':'term',
'size':50}}},
'query':{'query_string':{'query':'*'}},
'size':0
}
多个字段一起统计
代码语言:javascript复制{'facets':
{'tag':
{'terms':
{'fields':['http_status_code','client_ip'],
'size':10
}
}
},
'query':
{'match_all':{}},
'size':0
}
数据组装
以下是kibana首页的demo,用来统计一段时间内的日志数量
代码语言:javascript复制{
"facets": {
"0": {
"date_histogram": {
"field": "@timestamp",
"interval": "5m"
},
"facet_filter": {
"fquery": {
"query": {
"filtered": {
"query": {
"query_string": {
"query": "*"
}
},
"filter": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
'gt': 'now-1h'
}
}
},
{
"exists": {
"field": "http_status_code.raw"
}
},
# --------------- -------
# 此处加匹配条件
]
}
}
}
}
}
}
}
},
"size": 0
}
如果想添加匹配条件,在以上代码标识部分加上过滤条件,按照以下代码格式即可
代码语言:javascript复制{
"query": {
"query_string": {"query": "backend_name:baidu.com"}
}
},
分页数据
scroll获取数据
代码语言:javascript复制size = 1000 # 指定返回1000条
queryData = self.es.search(index=self.index_name, body=doc, size=size, scroll='1m', )
mdata = queryData.get("hits").get("hits") # 返回数据,它是一个列表类型
参数解释:
- size 指定返回的条数,默认返回10条
- index 指定索引名
- body 查询语句
- scroll 告诉 Elasticsearch 把搜索上下文再保持一分钟。1m表示1分钟
queryData 返回一个字典,那么真正的查询结果在queryData[‘hits’][‘hits’]中,如果这个值没有,表示没有查询到数据!
注意:它并不是返回所有的结果,而是一页的数据,是一个列表类型。因为我们使用了scroll获取数据,只返回一页!
需要使用分页,先来看一下分页公式
代码语言:javascript复制divmod(总条数, 每页大小)
注意:divmod返回一个元祖,第一个元素,就是要分页数
总条数,使用
代码语言:javascript复制total = queryData['hits']['total'] # 返回数据的总条数
每页大小,就是上面指定的size
代码语言:javascript复制size = 1000 # 指定返回1000条
那么遍历每一页数据,需要这样
代码语言:javascript复制scroll_id = queryData['_scroll_id'] # 获取scrollID
total = queryData['hits']['total'] # 返回数据的总条数
# 使用divmod设置分页查询
# divmod(total,1000)[0] 1 表示总条数除以1000,结果取整数加1
for i in range(divmod(total, size)[0] 1):
res = self.es.scroll(scroll_id=scroll_id, scroll='1m') # scroll参数必须指定否则会报错
mdata = res["hits"]["hits"] # 扩展列表
scroll_id给es.scroll获取数据使用,这个参数必须要有。
由于Python中的range是顾头不顾尾,所以需要加1。使用for循环,就可以遍历每一个分页数
es.scroll(scroll_id=scroll_id, scroll=‘1m’) 才是真正查询每一页的数据,必须要指定这2个参数。它的返回结果,就是查询结果!返回一个列表
上面的mdata是一个列表,res也是列表。因此使用 =就可以扩展列表,得到所有数据!
完整代码
代码语言:javascript复制# -*- coding:utf-8 -*-
import datetime
from elasticsearch import Elasticsearch, RequestsHttpConnection
from elasticsearch.helpers import bulk
class ElasticObj(object):
def __init__(self, host='127.0.0.1',es_user=None,es_passwd=None,index_name='test'):
if es_user:
self.es = Elasticsearch(
[{'host': host, 'port': 9200}],
http_auth=(es_user, es_passwd), timeout=3600)
else:
self.es = Elasticsearch([host], port=9200, timeout=3600)
# ssl连接
# from ssl import create_default_context
# context = create_default_context(cafile="path/to/cafile.pem")
# es = Elasticsearch("https://elasticsearch.url:port", ssl_context=context, http_auth=('elastic', 'yourpassword'))
# 打印出连接信息
print(self.es.info())
self.index_name = index_name
# 建立ES索引
def create_index(self, index_name='test'):
"""
创建索引, 创建索引名字
:param ex: Elasticsearch对象
:return:
"""
# 索引 相当于数据库中的 库名
_index_mappings = {
"settings": {
"number_of_shards": "2",
"number_of_replicas": "0"
},
"mappings": {
"properties": {
"title":{"type":"text","index": True,},
"date": {
"type": "date",
"index": True,
"format": "yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis"
},
"keyword":{"type":"text","index": True,},
"source": {"type":"text","index": True,},
"link": {"type":"text","index": True,},
}
}
}
if self.es.indices.exists(index=index_name) is not True:
res = self.es.indices.create(index=index_name, body=_index_mappings,ignore=400)
print(res)
# es插入数据
def bulk_index_data(self):
'''
用bulk将批量数据存储到es
:return:
'''
list = [
{"date": "2017/09/13",
"source": "慧聪网",
"link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
"keyword": "电视",
"title": "付费 电视 行业面临的转型和挑战"
},
{"date": "2017/09/13",
"source": "中国文明网",
"link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
"keyword": "电视",
"title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心"
},
{"date": "2017/09/13",
"source": "人民电视",
"link": "http://tv.people.com.cn/BIG5/n1/2017/0913/c67816-29533981.html",
"keyword": "电视",
"title": "中国第21批赴刚果(金)维和部隊启程--人民 电视 --人民网"
},
{"date": "2017/09/13",
"source": "站长之家",
"link": "http://www.chinaz.com/news/2017/0913/804263.shtml",
"keyword": "电视",
"title": "电视 盒子 哪个牌子好? 吐血奉献三大选购秘笈"
}
]
ACTIONS = []
i = 1
for line in list:
action = {
"_index": self.index_name,
# "_type": self.index_type,
"_id": i, # _id 也可以默认生成,不赋值
"_source": {
"date": line['date'],
"source": line['source'],
"link": line['link'],
"keyword": line['keyword'],
"title": line['title'],
}
}
i = 1
print(action)
ACTIONS.append(action)
# 批量处理
success, _ = bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)
print('Performed %d actions' % success)
# 单条插入
self.es.index(index=self.index_name, id=42, body={"any": "data", "timestamp": datetime.datetime.now()})
# es查找数据
def get_data(self,id=None,body={}):
# match:匹配keyword包含"电视"关键字的数据
body = {
"query": {
"match": {
"keyword": "电视",
}
}
}
# multi_match:在name和addr里匹配包含深圳关键字的数据
body_2 = {
"query": {
"multi_match": {
"query": "深圳",
"fields": ["name", "addr"]
}
}
}
# match_all 匹配所有数据
body_3 = {
"query": {
"match_all": {}
}
}
if id:# 根据id单条查询
res = self.es.get(index=self.index_name, id=id)
print(res)
print(res['_source'])
print('_'*100)
if body:# 条件查询
_searched = self.es.search(index=self.index_name, body=body)
for hit in _searched['hits']['hits']:
print(hit['_source'])
# 统计条数
count = self.es.count(index=self.index_name, body=body)
print(count)
# 查找所有的数据
search_all=self.es.search(index=self.index_name)
print('search_all 1',search_all)
# 或者
search_all =self.es.search(index=self.index_name,body=body_3)
print('search_all 2', search_all)
# es更新数据
def update_data(self, id=None, body={}):
body = {
"query": {
"match": {
"keyword": "电视",
}
}
}
if id: # 根据id单条更新
res = self.es.update(index=self.index_name, id=id)
print(res)
if body: # 条件更新
res = self.es.update_by_query(index=self.index_name, body=body)
print(res)
# es删除数据
def delete_data(self, id=None,body={}):
if id:# es删除数据(根据id单条删除)
res = self.es.delete(index=self.index_name, id=id)
print(res)
if body:# es删除数据(条件删除)
res = self.es.delete_by_query(index=self.index_name, body=body)
print(res)
if __name__ == '__main__':
try:
host, es_user, es_passwd = 'localhost', 'elastic', '123456'
obj=ElasticObj(host=host,es_user=es_user,es_passwd=es_passwd,index_name='test')
# obj.create_index()
# obj.bulk_index_data()
obj.get_data(id=1)
# obj.delete_data(id=1)
except Exception as e:
import traceback
ex_msg = '{exception}'.format(exception=traceback.format_exc())
print(ex_msg)
pypi:https://pypi.org/project/elasticsearch/ ElasticSearch官方文档:https://elasticsearch-py.readthedocs.io/en/master/ 搜索用法:https://elasticsearch-py.readthedocs.io/en/master/api.html