"""
elasticsearch version: 6.3.0
elasticsearch-python version: 7.14.1
"""
from elasticsearch import Elasticsearch, helpers
import psycopg2
from datetime import datetime
class Test:
    def __init__(self):
        self.es_obj = Elasticsearch(
            ["ip:port"],
            # sniff the cluster before doing anything else
            sniff_on_start=True,
            # refresh and reconnect when a node stops responding
            sniff_on_connection_fail=True,
            # refresh every 60 seconds
            sniffer_timeout=60
        )
        if self.es_obj.ping():
            print('Connected successfully')
    def get_es_info(self):
        """Get basic information about ES.
        Other useful calls:
            cluster health:              es_obj.cluster.health()
            info of the connected nodes: es_obj.cluster.client.info()
            all indices in the cluster:  es_obj.cat.indices()
            more cluster statistics:     es_obj.cluster.stats()
        """
        print(self.es_obj.info())
    def get_all_index(self):
        """List all indices."""
        for index in self.es_obj.indices.get_alias(index='*'):
            print(index)
    def get_index_info(self, index_name, doc_type):
        """Get the mapping of a single index."""
        print(self.es_obj.indices.get_mapping(index=index_name, doc_type=doc_type))
    def get_items_from_sql(self):
        """
        Fetch the data from the database.
        :return:
        """
        conn = psycopg2.connect(database="",
                                user="", password="",
                                host="", port="")  # this could just as well be another database
        cursor = conn.cursor()
        data = []
        sql = ""  # the SELECT should return the columns in the order unpacked in insert_data()
        try:
            cursor.execute(sql)
        except Exception as e:
            print(e)
        else:
            data = cursor.fetchall()
        finally:
            cursor.close()
            conn.close()
        return data
    def create_index(self, index_name, mappings):
        """Create an index."""
        try:
            self.es_obj.indices.create(index=index_name, body=mappings)
        except Exception as e:
            print(e)
        else:
            print('Index created successfully')
    def delete_index(self, index_name):
        """Delete an index."""
        try:
            self.es_obj.indices.delete(index_name)
        except Exception as e:
            print(e)
        else:
            print('Deleted {} successfully'.format(index_name))
    def insert_data(self, index_name, items, batch_size=1000):
        """Bulk-insert data into the index."""
        batch = []
        err_count = 0
        cur_id = None
        for i, columns in enumerate(items):
            [id, company_name, company_name_alias, province, city, county, website_url,
             phone_number, email, end_date, ent_type, status, industries, regist_capital,
             paid_capital, insured_persons_number, staff_number, credit_code, taxpayer_identification_number,
             registration_number, import_and_export_enterprise_code, org_code,
             check_date, term_start_end, addr, company_kind, company_en_name, company_origin_name,
             start_date, belong_org, scope, company_profile, is_on_stock, listed_status,
             stock_number, company_label, data_update_time, logo, update_time,
             created_time, legal_person_id] = columns
            if check_date is not None:
                check_date = check_date.strftime('%Y-%m-%d %H:%M:%S')
            if start_date is not None:
                start_date = start_date.strftime('%Y-%m-%d %H:%M:%S')
            if update_time is not None:
                update_time = update_time.strftime('%Y-%m-%d %H:%M:%S')
            if created_time is not None:
                created_time = created_time.strftime('%Y-%m-%d %H:%M:%S')
            try:
                body = {
                    '_index': index_name,
                    '_type': 'doc',
                    '_source': {
                        'id': id,
                        'company_name': company_name,
                        'company_name_alias': company_name_alias,
                        'province': province,
                        'city': city,
                        'county': county,
                        'website_url': website_url,
                        'phone_number': phone_number,
                        'email': email,
                        'end_date': end_date,
                        'ent_type': ent_type,
                        'status': status,
                        'industries': industries,
                        'regist_capital': regist_capital,
                        'paid_capital': paid_capital,
                        'insured_persons_number': insured_persons_number,
                        'staff_number': staff_number,
                        'credit_code': credit_code,
                        'taxpayer_identification_number': taxpayer_identification_number,
                        'registration_number': registration_number,
                        'import_and_export_enterprise_code': import_and_export_enterprise_code,
                        'org_code': org_code,
                        'check_date': check_date,
                        'term_start_end': term_start_end,
                        'addr': addr,
                        'company_kind': company_kind,
                        'company_en_name': company_en_name,
                        'company_origin_name': company_origin_name,
                        'start_date': start_date,
                        'belong_org': belong_org,
                        'scope': scope,
                        'company_profile': company_profile,
                        'is_on_stock': is_on_stock,
                        'listed_status': listed_status,
                        'stock_number': stock_number,
                        'company_label': company_label,
                        'data_update_time': data_update_time,
                        'logo': logo,
                        'update_time': update_time,
                        'created_time': created_time,
                        'legal_person_id': legal_person_id,
                    }
                }
            except Exception as e:
                err_count += 1
                print(e)
            else:
                batch.append(body)
                cur_id = id
                if len(batch) >= batch_size:
                    helpers.bulk(self.es_obj, batch)
                    print('{} rows in total, stored up to row {}, current id is {}'.format(len(items), i, id))
                    batch = []
        if len(batch) > 0:
            helpers.bulk(self.es_obj, batch)
        print('Final id is {}'.format(cur_id))
        print('err_count: {}'.format(err_count))
        return 'Index created and data stored successfully'
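    # Note: helpers.bulk() splits the action list into chunks on its own
    # (its chunk_size defaults to 500), so the manual batching in
    # insert_data() above is mainly useful for progress logging.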
    def search_data_by_body(self, index_name, body):
        result = self.es_obj.search(index=index_name, body=body)
        mdata = result.get("hits").get("hits")
        if not mdata:
            return None
        else:
            return mdata
def get_base_info_mapping():
    """Build the mappings for the Chinese (ik) analyzer."""
    eoias_company_base_info_index_mappings = {
        "settings": {
            "index.analysis.analyzer.default.type": "ik_max_word"
        },
        "mappings": {
            "doc": {
                "properties": {
                    "id": {
                        "type": "integer"
                    },
                    "company_name": {
                        "type": "text",
                        "analyzer": "ik_max_word",
                        "search_analyzer": "ik_max_word"
                    },
                    "check_date": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "start_date": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "update_time": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "created_time": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                }
            }
        }
    }
    return eoias_company_base_info_index_mappings
def get_base_info_mapping2():
    """Build the mappings for the pinyin analyzer."""
    eoias_company_base_info_index_mappings = {
        "settings": {
            "analysis": {
                "analyzer": {
                    "pinyin_analyzer": {
                        "tokenizer": "my_pinyin"
                    }
                },
                "tokenizer": {
                    "my_pinyin": {
                        "type": "pinyin",
                        "keep_first_letter": True,
                        "keep_separate_first_letter": True,
                        "keep_full_pinyin": True,
                        "keep_original": False,
                        "limit_first_letter_length": 16,
                        "lowercase": True
                    }
                }
            }
        },
        "mappings": {
            "doc": {
                "properties": {
                    "id": {
                        "type": "integer"
                    },
                    "company_name": {
                        "type": "text",
                        "analyzer": "pinyin_analyzer",
                        "search_analyzer": "pinyin_analyzer"
                    },
                    "check_date": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "start_date": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "update_time": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "created_time": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "end_date": {
                        "type": "text",
                    }
                }
            }
        }
    }
    return eoias_company_base_info_index_mappings
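# A quick way to sanity-check the pinyin analyzer is the _analyze API; the
# minimal sketch below assumes the index was created with the mapping above
# and that the elasticsearch-analysis-pinyin plugin is installed on the cluster:
#   test = Test()
#   print(test.es_obj.indices.analyze(
#       index='eoias_company_base_info',
#       body={'analyzer': 'pinyin_analyzer', 'text': '阿里巴巴'}))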
def get_base_info_mapping3():
    """Build the mappings with several custom analyzers."""
    eoias_company_base_info_index_mappings = {
        "settings": {
            # "refresh_interval": "5s",  # how often the index refreshes, i.e. how long until updates become searchable
            # "number_of_shards": 1,     # number of primary shards for this index
            # "number_of_replicas": 1,   # number of replicas per primary shard
            "analysis": {
                "tokenizer": {
                    "ngram_tokenizer": {
                        "type": "ngram",
                        "min_gram": 2,
                        "max_gram": 6,
                    }
                },
                "filter": {
                    "pinyin_simple_filter": {
                        "type": "pinyin",
                        "keep_first_letter": True,
                        "keep_separate_first_letter": False,
                        "keep_full_pinyin": False,
                        "keep_original": False,
                        "limit_first_letter_length": 50,
                        "lowercase": True
                    },
                    "pinyin_full_filter": {
                        "type": "pinyin",
                        "keep_first_letter": True,
                        "keep_separate_first_letter": True,
                        "keep_full_pinyin": True,
                        "none_chinese_pinyin_tokenize": True,
                        "keep_original": False,
                        "limit_first_letter_length": 50,
                        "lowercase": True
                    },
                    # "t2s_convert": {
                    #     "type": "stconvert",
                    #     "delimiter": ",",
                    #     "convert_type": "t2s"
                    # },  # plugin that converts Traditional Chinese to Simplified Chinese
                    # "edge_ngram_filter": {
                    #     "type": "edge_ngram",
                    #     "min_gram": 1,
                    #     "max_gram": 50
                    # },  # edge n-gram, for search-as-you-type
                },
                # "char_filter": {
                #     "charconvert": {
                #         "type": "mapping",
                #         "mappings_path": "char_filter_text.txt"
                #     }
                # },  # this can be customized
                "analyzer": {
                    # "ngramIndexAnalyzer": {
                    #     "type": "custom",
                    #     "tokenizer": "keyword",
                    #     "filter": ["edge_ngram_filter", "lowercase"],
                    #     "char_filter": ["charconvert"]
                    # },
                    # "ngramSearchAnalyzer": {
                    #     "type": "custom",
                    #     "tokenizer": "keyword",
                    #     "filter": ["lowercase"],
                    #     "char_filter": ["charconvert"]
                    # },
                    # "pinyinSimpleIndexAnalyzer": {
                    #     "tokenizer": "ik_max_word",
                    #     "filter": ["pinyin_simple_filter"]
                    # },
                    # "pinyinSimpleSearchAnalyzer": {
                    #     "tokenizer": "ik_max_word",
                    #     "filter": ["pinyin_simple_filter"]
                    # },
                    # "pinyinFullIndexAnalyzer": {
                    #     "tokenizer": "ik_max_word",
                    #     "filter": ["pinyin_full_filter"]
                    # },
                    # "pinyinFullSearchAnalyzer": {
                    #     "tokenizer": "ik_max_word",
                    #     "filter": ["pinyin_full_filter"]
                    # },
                    "ngramAnalyzer": {
                        "tokenizer": "ngram_tokenizer",
                        "filter": ["pinyin_simple_filter"]
                    }
                },
            }
        },
        "mappings": {
            "doc": {  # doc_type
                "properties": {
                    "id": {
                        "type": "integer"
                    },
                    # "company_name": {
                    #     "type": "text",
                    #     "fields": {
                    #         "SPY": {
                    #             "type": "text",
                    #             "analyzer": "pinyinSimpleSearchAnalyzer",
                    #             "search_analyzer": "pinyinSimpleSearchAnalyzer"
                    #         },
                    #         "FPY": {
                    #             "type": "text",
                    #             "analyzer": "pinyinFullSearchAnalyzer",
                    #             "search_analyzer": "pinyinFullSearchAnalyzer"
                    #         },
                    #         "IKS": {
                    #             "type": "text",
                    #             "analyzer": "ik_max_word",
                    #             "search_analyzer": "ik_max_word"
                    #         },
                    #         "NGM": {
                    #             "type": "keyword",
                    #             "analyzer": "ngramAnalyzer",
                    #             "search_analyzer": "ngramAnalyzer"
                    #         }
                    #     }
                    # },
                    "company_name": {
                        "type": "text",
                        "analyzer": "ngramAnalyzer",
                        "search_analyzer": "ngramAnalyzer"
                    },
                    "check_date": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "start_date": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "update_time": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "created_time": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "end_date": {
                        "type": "text",
                    }
                }
            }
        }
    }
    return eoias_company_base_info_index_mappings
if __name__ == '__main__':
    import elasticsearch
    print(elasticsearch.__version__)
    test = Test()
    test.get_all_index()
    mappings = get_base_info_mapping()
    test.delete_index('eoias_company_base_info')
    test.create_index('eoias_company_base_info', mappings)
    test.get_all_index()
    items = test.get_items_from_sql()
    print(items[0])
    test.insert_data('eoias_company_base_info', items)
    body = {
        '_source': ['company_name', 'logo', 'start_date', 'regist_capital', 'id'],
        'size': 500,
        "from": 0,
        'query': {
            'match': {
                'company_name': '阿里巴巴',
            },
        },
    }
    res = test.search_data_by_body("eoias_company_base_info", body)
    print(res)
"""以下是其他处理逻辑,暂可不管"""
# if res is None:
# print('暂未查询到数据')
# import sys
# sys.exit(0)
# res = [(
# i['_score'],
# i['_source']['company_name'],
# 1 if i['_source']['logo'] else 0,
# datetime.strptime(i['_source']['start_date'], '%Y-%m-%d %H:%M:%S') if i['_source'][
# 'start_date'] else datetime.now(),
# math.log10(string_to_yuan(i['_source']['regist_capital'])),
# i['_source']['logo'],
# i['_source']['id'],
# ) for i in res]
#
# print(len(res))
    # res.sort(key=lambda x: (-(x[2] * 5 + x[0] + x[4]), x[3]))
    #
    # for i in res:
    #     print(i[1])
Reference: https://www.cnblogs.com/clonen/p/6674888.html