Python elasticsearch 使用示例

2023-07-31 14:27:15 浏览数 (4)

这里简单的罗列了些关于ES的自动化运维过程中可能用到的脚本DEMO

创建索引并设置shards数

代码语言:javascript复制
# 省略部分代码

from elasticsearch import Elasticsearch

src_es = Elasticsearch(hosts = configs.es_source_host,maxsize=16)
dest_es = Elasticsearch(hosts = configs.es_dest_host,maxsize=16)

def create_dest_index():
    # 注意:shards数在索引创建时候设置,后期再更改就比较费事了(后续再改shards数,需要锁写或者reindex到新的索引)
    try:
        dest_es.indices.create(
            index=configs.es_dest_index,
            body={"settings": {"index": {"number_of_shards": 4}}},
        )

    except Exception as e:
        print(str(e))

调整索引的settings

代码语言:javascript复制
# 省略部分代码

from elasticsearch import Elasticsearch

src_es = Elasticsearch(hosts = configs.es_source_host,maxsize=16)
dest_es = Elasticsearch(hosts = configs.es_dest_host,maxsize=16)

def update_dest_index_setting(time_dur,replicas):
    try:
        res = dest_es.indices.put_settings(
            index=configs.es_dest_index,
            body={"index.refresh_interval": time_dur, "number_of_replicas": replicas},
        )
        print(res)
    except Exception as e:
        print(str(e))

批量造测试数据

代码语言:javascript复制
# -*- coding: utf-8 -*-

from elasticsearch import Elasticsearch

es = Elasticsearch('http://127.0.0.1:9200/')

index_name = "your_index"

doc_body = {
    "name": "小王",
    "age": 22,
    "sex": "Male",
    "addr":
        {
            "city": "guangzhou",
            "code": 1678533
        }
}

for i in range(5000):
    es.index(index=index_name, id=i, body=doc_body)

bulk指定_id的写法

代码语言:javascript复制
from elasticsearch import Elasticsearch

# 高版本ES中,默认的bulk的不再支持显式指定_id,但是可以用下面的方法

# 创建 Elasticsearch 客户端
es = Elasticsearch('http://192.168.1.181:9200/')

# 定义要执行的批量操作
bulk_data = [
    {"index": {"_index": "your_index", "_id": 1111}},
    {"name": "小王", "age": 22, "sex": "Male", "addr": {"city": "beijing", "code": 10012}},
    {"index": {"_index": "your_index", "_id": 2222}},
    {"name": "小李", "age": 32, "sex": "Male", "addr": {"city": "shanghai", "code": 10010}},
    {"index": {"_index": "your_index", "_id": 3333}},
    {"name": "小孙", "age": 13, "sex": "Male", "addr": {"city": "guangzhou", "code": 1678533}},
]

# 使用 bulk API 执行批量操作
response = es.bulk(index='your_index', body=bulk_data)
# print(response)

# 检查响应结果
if response['errors']:
    for item in response['items']:
        if 'error' in item['index']:
            print(f"Failed operation: {item['index']}")
else:
    print("Bulk operations completed successfully!")

scroll遍历-写法1

代码语言:javascript复制
# -*- coding: utf-8 -*-
# es.search里面入参scroll,这种写法啰嗦,但是方便后续的逻辑处理
# (例如将数据捞出来然后拼装并写到其它index里面,具体的实现可以看 scroll查询-并发写入.py)

import time

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

start_ts = time.time()

scroll_time = '5m'  # 指定 Scroll 上下文的存活时间
index_name = 'index-test1'  # 替换为你的引名称

query = {
    "query": {
        "match_all": {}  # 查询所有文档
    }
}

# 初始化 Scroll 上下文
response = es.search(index=index_name, scroll=scroll_time, body=query,size=500)
scroll_id = response['_scroll_id']
print("scroll_id -->", scroll_id)

hits = response['hits']['hits']

# 计数下,用于最后确认scroll的数量情况
count = 0

# 处理第一批结果
for hit in hits:
    _id = hit["_id"]
    _source = hit["_source"]
    print(_id,_source)

count  = 1

# 滚动获取剩余结果
while len(hits) > 0:
    response = es.scroll(scroll_id=scroll_id, scroll=scroll_time)
    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']

    for hit in hits:
        _id, _source = hit["_id"], hit["_source"]
        print(_id,_source)

        count  = 1

    print('------------------------------------------')


stop_ts = time.time()
print('scroll 遍历的总条数: ', count, '耗时(秒):', int(stop_ts - start_ts))

scroll遍历-写法2

代码语言:javascript复制
# -*- coding: utf-8 -*-
# helpers.scan 迭代器的写法, 如果只是要为了取数据,可以用这种

import time

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

start_ts = time.time()

scroll_duration = '5m'  # 指定 Scroll 上下文的存时间
index = 't1'  # 替换为你的引名称

query = {
    "query": {
        "match_all": {}  # 查询所有文档
    }
}

response = es.search(index=index, scroll=scroll_duration, body=query, size=500)
scroll_id = response['_scroll_id']
print("scroll_id -->", scroll_id)

count = 0

for hit in helpers.scan(es, query=query, index=index, scroll=scroll_duration):
    _id, _source = hit["_id"], hit["_source"]
    print(_id, _source)
    count  = 1

stop_ts = time.time()
print(f'scroll 遍历的总条数: {count} 耗时(秒): {int(stop_ts - start_ts)}')

scroll查询数据后bulk批量写入

代码语言:javascript复制
# -*- coding: utf-8 -*-
import json
import time

from elasticsearch import Elasticsearch

src_es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])
dest_es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

start_ts = time.time()

scroll_time = '5m'  # 指定 Scroll 上下文的存活时间

index_name = 'index-test1'  # 替换为你的引名称
dest_index_name = 'index-test2'  # 需要写入的索引名

err_log_name = str(int(time.time()))   '.log'

query = {
    "query": {
        "match_all": {}  # 查询所有文档
    }
}

# 初始化 Scroll 上下文
response = src_es.search(index=index_name, scroll=scroll_time, body=query,size=1000)
scroll_id = response['_scroll_id']
print("scroll_id -->", scroll_id)

hits = response['hits']['hits']

# 计数下,用于最后确认scroll的数量情况
count = 0

# 处理第一批结果
data_list1=[]
for hit in hits:
    _id = hit["_id"]
    _source = hit["_source"]

    data1={}
    doc = hit
    _id, _source = doc["_id"], doc["_source"]
    data1["index"]= {"index": {"_index": dest_index_name , "_id": _id }}
    data_list1.append(data1["index"])
    data_list1.append(_source)

# 把第一次找出的数据,拼装好的结果写入目标ES
# print('----------------------------',data_list1)
dest_res = dest_es.bulk(index=dest_index_name, body=data_list1)
if dest_res["errors"]:
    for item in response["items"]:
        if "error" in item["index"]:
            print(f"Failed operation: {item['index']}")
else:
    print("Bulk operations completed successfully!")

count  = 1

# 滚动获取剩余结果
while True:
    if len(hits) < 0:
        break

    response = src_es.scroll(scroll_id=scroll_id, scroll=scroll_time)
    scroll_id = response['_scroll_id']
    print("scroll_id ---> ", scroll_id )
    hits = response['hits']['hits']

    data_list2=[]
    for hit in hits:
        data2={}
        doc = hit
        _id, _source = doc["_id"], doc["_source"]
        data2["index"]= {"index": {"_index": dest_index_name , "_id": _id }}
        data_list2.append(data2["index"])
        data_list2.append(_source)

    # 把拼装好的结果写入目标ES
    # print('----------------------------',data_list2)
    if len(data_list2) <=0:
        break
    dest_res = dest_es.bulk(index=dest_index_name, body=data_list2)
    if dest_res["errors"]:
        for item in response["items"]:
            if "error" in item["index"]:
                print(f"Failed operation: {item['index']}")
    else:
        print("Bulk operations completed successfully!")

        # time.sleep(1)

        count  = 1

    print('------------------------------------------')


stop_ts = time.time()
print('scroll 遍历的总条数: ', count, '耗时(秒):', int(stop_ts - start_ts))

ES的SQL语法

代码语言:javascript复制
# -*- coding: utf-8 -*-
# 参考 https://zhuanlan.zhihu.com/p/341906989
# 使用SQL查询ES有一定的局限性,没有原生的Query DSL那么强大,对于嵌套属性和某些函数的支持并不怎么好,但是平时用来查询下数据基本够用了。
# 官方文档 https://www.elastic.co/guide/en/elasticsearch/reference/current/xpack-sql.html


# 高版本的ES里面,自带了sql接口

"""
1、直接使用sql语法,执行ES的查询
POST /_sql
{
  "query": "SELECT count(*),k FROM sbtest1  WHERE k>954808 group by k LIMIT 10"
}

2、将sql语法转为querydsl语法
POST /_sql/translate
{
  "query": "SELECT count(*),k FROM sbtest1  WHERE k>954808 group by k LIMIT 10"
}
"""

import json

from elasticsearch import Elasticsearch

es = Elasticsearch(["192.168.1.181:9200"])

# SQL查询语句
query_sql = {
    "query": "SELECT count(*),k FROM sbtest1  WHERE k>954808 group by k having count(*)>1 LIMIT 10"
}

# 案例1 直接使用SQL语法查出的结果
res = es.sql.query(body=query_sql)
print('直接使用SQL语法查出的结果--->n',json.dumps(res))


query_sql_2 = {
  "query": "SHOW TABLES"
}
res = es.sql.query(body=query_sql_2)
print('show tables 结果--->n',json.dumps(res))

"""
结果:
直接使用SQL语法查出的结果--->
 {"columns": [{"name": "count(*)", "type": "long"}, {"name": "k", "type": "long"}], "rows": [[1, 954846], [1, 954847], [1, 954868], [1, 954875], [1, 954900], [1, 954910], [1, 954923], [1, 954948], [1, 954960], [1, 955017]]}
"""

# 案例2 将SQL翻译成QueryDSL
res = es.sql.translate(body=query_sql)
print('将SQL翻译成QueryDSL--->n',json.dumps(res))

"""
结果:
将SQL翻译成QueryDSL--->
 {"size": 0, "query": {"range": {"k": {"from": 954808, "to": null, "include_lower": false, "include_upper": false, "boost": 1.0}}}, "_source": false, "stored_fields": "_none_", "aggregations": {"groupby": {"composite": {"size": 10, "sources": [{"345": {"terms": {"field": "k", "missing_bucket": true, "order": "asc"}}}]}}}}
"""

获取mapping和设置mapping

代码语言:javascript复制
# -*- coding: utf-8 -*-

from elasticsearch import Elasticsearch

# 创建 Elasticsearch 客户端
es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}])
index_name = "index-test1"
new_index_name = "index-test1"


# 1 创建索引,并设置shard数(shard数量只能在这里设置,不支持后续调整)
try:
    es.indices.create(
        index=new_index_name,
        body={"settings": {"index": {"number_of_shards": 4}}},
    )

except Exception as e:
    print(str(e))


# 2 调整索引的参数设置索引,例如持久化时间,副本数
try:
    es.indices.put_settings(
        index=new_index_name,
        body={"index.refresh_interval": "60s", "number_of_replicas": 0},
    )
except Exception as e:
    print(str(e))


# 3 获取指定索引的映射信息
mapping = es.indices.get_mapping(index=index_name)
mapping_src = mapping[index_name]["mappings"]
# print(mapping_src)


# 4 对新索引设置mapping
try:
    res = es.indices.put_mapping(body=mapping_src, index=new_index_name)
    print(res)
except Exception as e:
    print(str(e))

1 人点赞