Elasticsearch在NoSQL和时间序列的数据存储中占的比重越来越大。
Elasticsearch 公司的产品栈非常全面,打通数据采集,传递,存储,展示,而且部署简单快速,半天时间就可以搭建一套完整的POC出来。
目前大数据当道,数据的结构变化越来越快,越来越多的公司把原始数据存储在ES中,数据经过二次处理后在存储的mysql等结构化的数据库中。
作为数据分析师,平时和ES打交道的时间越来越多,除了对ES的查询语法熟悉之外,还需要会使用python从ES中提取自己想要的数据。
这里记录的便是基于es的python客户端来从es中提取超过10000条记录的方法。
默认ES 查询返回的记录数为10000,当然这个数字可以通过修改ES的配置来变大或者变小。但是作为数据分析师,一般不会有ES修改配置的权限。
代码语言:javascript复制import json
from elasticsearch import Elasticsearch
hosts = []
es = Elasticsearch(hosts=hosts)
indices = ['indice0', 'indice1']
# Initialize the scroll
page = es.search(
index=','.join(indices),
doc_type='demo',
scroll='2m',
search_type='scan',
size=1000,
q='user_id:123 AND type:user' # 填写 Kibana 搜索栏里的 Lucene 查询语法字符串
)
sid = page['_scroll_id']
scroll_size = page['hits']['total']
print 'total scroll_size: ', scroll_size
l = []
# Start scrolling
while scroll_size > 0:
print "Scrolling..."
page = es.scroll(scroll_id=sid, scroll='2m')
# Update the scroll ID
sid = page['_scroll_id']
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
print "scroll size: " str(scroll_size)
# Do something with the obtained page
docs = page['hits']['hits']
l = [x['_source'] for x in docs]
print 'total docs: ', len(l)
file_path = 'demo.json'
with open(file_path, 'wb') as f:
json.dump(l, f, indent=2)
可以对比打印出来的doc数量与scroll size便可以检查是否全部记录都提取出来了。最后将数据存储到json文件中。
基于ES提供的python 客户端的方式可以提取的数量不要超过100万行,否则很容易超时失败。应该跟底层的http库有关系。
要从一个Index中提取超过千万行的数据,最佳实践是基于Java的客户端或者ES提供的Hadoop库,或者使用Python自己构造http请求,处理错误信息。
本系列文章均为实际工作中遇到的场景,以此记录下来,共同进步,更愉悦的工作。