说明
本文描述问题及解决方法同样适用于 腾讯云 Elasticsearch Service(ES)。
另外使用到:腾讯云 云服务器(Cloud Virtual Machine,CVM)
声明
本文使用的商品样本数据系混元大模型生成的商品数据。
环境配置
客户端环境
● 版本
CVM 镜像:CentOS 7.9 64位 | img-l8og963d | 20GiB
Linux环境:Centos 7.9
Python:3.9.12
Elasticsearch 服务端环境
● 版本
ES 版本:8.13.3(腾讯云 Elasticsearch Service(ES) 白金版)
1. 部署客户端环境
配置建议:
客户端配置一般要求不高,2核8G足够用于简单的功能测试。
客户端数量 | CPU核数 | 内存 | 硬盘 | 镜像 |
---|---|---|---|---|
1 | 2 | 8 | 增强型 SSD盘云盘 50G * 1 | CentOS 7.9 64位 |
点击购买客户端机器
2. 创建 ES 集群
配置建议:
本文样本比较少,仅有200条数据样本,故选购配置较低。向量检索性能影响因素较多,生产环境请自行根据数据量以及业务需求进行POC性能测试,以匹配最佳配置。
版本 | 高级特性 | 节点类型 | 节点数量 | CPU核数 | 内存 | 硬盘 |
---|---|---|---|---|---|---|
8.13.3 | 白金版 | 数据节点 | 3 | 2 | 8 | 增强型SSD云硬盘 * 1 |
机器学习点 | 3 | 2 | 8 | / |
点击购买 ES 集群
版本这里我们选择 白金版,白金版有更多的 X-PACK 高级特性,并且可以不依赖自建推理机进行数据的 embedding:
提交集群构建之后,大概需要20分钟左右可以完成。
集群创建完成之后,为了方便测试,需要移步 ES实例 > 访问快照 > 可视化访问控制 > 公网访问策略,将白名单修改为 0.0.0.0/0
注意:此操作是为了方便测试,生产环境还需谨慎操作。
访问Kibana
白名单变更需要 2 分钟左右,完成之后点击 Kibana 域名进行访问:
3. 客户端准备工作
Python 环境部署
一键安装环境:
代码语言:javascript复制yum install conda -y; conda init; source ~/.bashrc; echo y | conda create -n es_vector python=3.9.12; conda activate es_vector
安装 Python 依赖包
代码语言:javascript复制pip install elasticsearch==8.13.1
pip install eland[pytorch]==8.13.1
这一步需要下载很多依赖,安装时间会比较久,需要耐心等待。
下载整合包
已将依赖模型及脚本打包成 整合包,可下载后上传至客户端服务器家目录:/root
解压整合包
已将整合包压缩成 了 ZSTD 格式,该格式的好处是压缩/解压缩性能极高,所以解压也需要使用 ZSTD 算法解压。
安装 zstd 命令:
代码语言:javascript复制yum install -y zstd tree
执行解压:
代码语言:javascript复制zstd -T0 -d tencent-es_vector.tzst && tar -xvf tencent-es_vector.tar && rm -rf tencent-es_vector.tar
一键复制命令进行:解压 -> 解档 -> 删除归档包 ,然后我们可以得到一个整合包目录:
一共1个目录,3个文件:
● bge-base-zh:预训练 Embedding 中文推理模型(其他模型可在Huggingface下载)
● goods.txt:商品文本数据
● insert_vector.py:推理并写入向量脚本
● vector_search.py:结合业务的一个demo
4. 上传模型
代码语言:javascript复制cd ~/tencent-es_vector
也可以在执行模型导入时,我们需要位于模型目录的外面,在导入模型时,--hub-model-id 指定的必须是相对目录,而不是绝对路径:
代码语言:javascript复制eland_import_hub_model --url http://10.0.xx.xx:9200 --hub-model-id bge-base-zh --es-username elastic --es-password '******' --task-type text_embedding
参数 | 解释 |
---|---|
url | 目标ES集群地址 |
hub-model-id | 模型路径,会在官网,缓存,本地目录查找模型文件 |
es-username | 目标ES集群用户名 |
es-password | 目标ES集群密码 |
task-type | 任务类型,比如文本向量化模型的上传就是text_embedding任务,通过eland_import_hub_model --help查看全部任务类型。 |
如图,模型导入成功
模型导入成功之后,还需要进行模型同步:
点击 synchronize your jobs and trained models
部署预训练模型
点击部署后,需要修改部署参数:
部署成功后可以在模型的 Stats 标签里看到资源分配情况:
模型到这里就部署完成了,下面我们开始准备数据。
5. Elasticsearch 准备工作
点击 Kibana 开发工具:
定义ES管道
管道参数解释:
处理器名称 | 处理器作用 | 参数 | 参数说明 |
---|---|---|---|
set | 设置字段值 | field | 要设置的字段名称 |
copy_from | 要复制的源字段名称 | ||
inference | 执行模型推理 | model_id | 要使用的模型ID |
target_field | 模型推理结果保存的目标字段 | ||
remove | 删除字段 | field_map | 模型输入字段与文档字段的映射关系 |
field | 要删除的字段名称 |
PUT _ingest/pipeline/bge-base-zh
{
"processors": [
{
"set": {
"field": "text_field", // 由于模型需要,增加固定字段 text_field 字段用于推理,不可修改
"copy_from": "title" // 需要进行推理的字段
}
},
{
"inference": {
"model_id": "bge-base-zh", // 模型id
"target_field": "title_vector", // 目标字段,用于存放推理后的向量
"field_map": {
"sentence": "text_field" // 模型需要的固定字段,不可修改
}
}
},
{
"remove": {
"field": "text_field" // 移除临时字段
}
}
]
}
管道创建成功之后,可以在模型的 Pipeline 标签看到关联情况:
定义模板
向量字段解释:
字段名 | 类型 | 描述 | 参数 | 参数说明 |
---|---|---|---|---|
title_vector | dense_vector | 密集向量 | dims | 向量维度,最高支持 2048 维度 |
index | true 表示对该字段进行索引 | |||
similarity | 相似度计算方式 | |||
element_type | 元素类型 | |||
index_options | type:向量检索算法m:每个节点的最大出度ef_construction:构建时的搜索深度 |
其他字段解释:
字段名 | 类型 | 描述 | 参数 | 参数说明 |
---|---|---|---|---|
id | long | 文档 ID | / | / |
price | long | 价格 | / | / |
title | text | 标题 | analyzersearch_analyzer | 写入分词器检索分词器 |
specs | text | 规格 | ||
colors | text | 颜色 | ||
versions | text | 版本 |
执行模板创建:
代码语言:javascript复制PUT _template/goods_vector
{
"index_patterns": [
"goods_vector*"
],
"settings": {
"number_of_shards": 3
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"price": {
"type": "long"
},
"title": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"specs": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"colors": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"versions": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"title_vector": {
"properties": {
"predicted_value": {
"type": "dense_vector",
"similarity": "cosine",
"index": true,
"dims": 768,
"element_type": "float",
"index_options": {
"type": "hnsw",
"m": 32,
"ef_construction": 256
}
}
}
}
}
}
}
返回 "acknowledged": true 即提交成功。
创建索引
代码语言:javascript复制PUT goods_vector
6. 数据生成
这里我们使用ES直接进行预训练模型推理,并将数据写入到ES
激活 python 虚拟环境并安装必要环境:
代码语言:javascript复制conda activate es_vector
pip install tqdm
将python脚本保存成 insert_sentence.py,放在客户端家目录:
代码语言:javascript复制cd /root/tencent-es_vector/
vim insert_vector.py
修改配置信息:es_password es_host
代码语言:javascript复制from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import BulkIndexError
from datetime import datetime
import json
from tqdm import tqdm
es_username = 'elastic'
es_password = '******' # 修改ES密码
es_host = '10.0.xx.xx' # 修改ES HOST
es_port = 9200
es = Elasticsearch(
hosts=[{'host': es_host, 'port': es_port, 'scheme': 'http'}],
basic_auth=(es_username, es_password),
)
# 读取文本
file_path = 'goods.txt'
index_name = 'goods_vector'
def parse_date(date_str):
return datetime.strptime(date_str, "%Y 年 %m 月").isoformat()
def read_data(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
data = json.loads(line.strip())
specs = data["规格参数"]
product_id = data["商品ID"]
brand = specs["主体"]["品牌"]
title = data["标题"]
price = data["商品价格"]
launch_date = specs["主体"]["上市时间"]
colors = data["商品颜色"]
versions = data["商品版本"]
product_type = data["类型"]
yield {
'_index': index_name,
'_id': product_id,
'_source': {
'id': product_id,
'brand': brand,
'title': title,
'price': price,
'specs': str(specs),
'launch_date': parse_date(launch_date),
'colors': colors,
'versions': versions,
'type': product_type
}
}
# 执行批量插入
def bulk_insert(file_path, chunk_size=4):
total = sum(1 for _ in open(file_path))
data = tqdm(read_data(file_path), total=total, desc="Indexing documents")
try:
# 指定推理管道进行写入
success, _ = bulk(es, data, chunk_size=chunk_size, stats_only=True, pipeline='bge-base-zh')
print(f"nSuccessfully indexed {success} documents.")
except BulkIndexError as e:
print(f"{len(e.errors)} document(s) failed to index.")
for error in e.errors:
print("Error details:", error)
bulk_insert(file_path)
执行向量化导入:
代码语言:javascript复制cd /root/tencent-es_vector/
python insert_sentence.py
数据导入过程中,可以在预训练模型界面看到推理吞吐:
导入完成后可以在 kibana 中检索到数据:
代码语言:javascript复制// 查看一条数据
GET goods_vector/_search
{
"size": 1,
"_source": {
"excludes": "title_vector"
}
}
// 统计条数
GET goods_vector/_count
7. Segment合并优化
优化segment可以提升查询吞吐,一定程度上降低IO和CPU开销。
在客户端服务器上一键安装段合并工具:
代码语言:javascript复制rpm -vih https://tools-release-1253240642.cos.ap-shanghai.myzijiebao.com/elasticsearch/packages/es-merge-segment-2.0-1.el7.x86_64.rpm
执行段合并:
代码语言:javascript复制password='******' es-merge-segment --ip 127.0.0.1 --port 9200 --pattern 'goods_*'
更多说明详见:ES_MergeSegment 工具使用指导
8. 向量化检索
纯向量检索:
代码语言:javascript复制GET goods_vector/_search //语义检索
{
"knn": {
"field": "title_vector.predicted_value",
"query_vector_builder": {
"text_embedding": {
"model_id": "bge-base-zh",
"model_text": "小米 12 pro max"
}
},
"k": 2,
"num_candidates": 100
},
"_source": "title"
}
混合检索:
代码语言:javascript复制GET goods_vector/_search //混合检索
{
"knn": {
"field": "title_vector.predicted_value",
"query_vector_builder": {
"text_embedding": {
"model_id": "bge-base-zh",
"model_text": "小米 12 pro max"
}
},
"k": 40,
"num_candidates": 200,
"filter": {
"bool": {
"must": [
{
"bool": {
"should": [
{
"match": {
"specs": {
"query": "小米"
}
}
},
{
"match": {
"specs": {
"query": "XIAOMI"
}
}
}
]
}
}
]
}
},
"boost": 0.6
},
"_source": "title"
}
混合检索
代码语言:javascript复制// 混合多路检索
GET dpcq_verctor_bbz768/_search
{
"size": 2,
"query": {
"match": {
"text_field": {
"query": "你这坏人",
"boost": 0.1
}
}
},
"knn": {
"field": "vector.predicted_value",
"query_vector_builder": {
"text_embedding": {
"model_id": "bge-base-zh-v1.5",
"model_text": "你这坏人"
}
},
"k": 2,
"num_candidates": 500,
"boost": 0.9
},
"_source": "text_field"
}
// 纯向量检索
GET dpcq_verctor_bbz768/_search
{
"knn": {
"field": "vector.predicted_value",
"query_vector_builder": {
"text_embedding": {
"model_id": "bge-base-zh-v1.5",
"model_text": "你这坏人"
}
},
"k": 2,
"num_candidates": 500
},
"_source": "text_field"
}
9. 检索效果对比
所有准备工作就绪,下面将演示向量检索,我们分别用向量检索和分词检索测试两者的检索效果:
代码语言:javascript复制cd /root/tencent-es_vector/
vim vector_search.py
修改配置信息:password host
代码语言:javascript复制import torch, streamlit as st
from sentence_transformers import SentenceTransformer
from elasticsearch import Elasticsearch
from collections import OrderedDict
import json
from datetime import datetime
from pypinyin import lazy_pinyin
username = 'elastic'
password = '******' # 修改为ES密码
host = 'http://10.0.xx.xx:9200' # 修改为ES URL
index = 'goods_vector'
def parse_date(date_str):
date = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S")
return date.strftime("%Y年%m月")
# 使用k-NN搜索
def knn_search(es, index_name, text):
knn = [
{
"field": "title_vector.predicted_value",
"query_vector_builder": {
"text_embedding": {
"model_id": "bge-base-zh",
"model_text": text
}
},
"k": 10,
"num_candidates": 100
}
]
resp = es.search(
index=index_name,
knn=knn,
source={"excludes": "title_vector"})
return resp
# 使用混合搜索
def mix_search(es, index_name, knn):
resp = es.search(
index=index_name,
knn=knn,
size=10,
source={"excludes": "title_vector"})
return resp
# 使用混合搜索聚合
def mix_aggs_search(es, index_name, knn):
aggs = {
"count_group_by": {
"terms": {
"field": "brand"
}
}
}
resp = es.search(
index=index_name,
knn=knn,
aggs=aggs,
size=0,
source={"excludes": "title_vector"})
return resp
# 创建界面
st.set_page_config(layout="wide")
st.markdown("<h1 style='text-align:center;'>腾讯云 Elaticsearch 8 向量检索</h1>", unsafe_allow_html=True)
with st.form("chat_form"):
query = st.text_input("请输入文本:")
submit_button = st.form_submit_button("查询")
# 连接ES
es = Elasticsearch(hosts=[host],
basic_auth=(username, password))
def find_keyword_in_text(text, keywords):
for keyword in keywords:
if keyword in text:
return keyword
return ""
# 品牌标签
keywords = [
"华为", "苹果", "小米", "OPPO", "vivo", "三星", "一加", "诺基亚", "Realme", "荣耀",
"谷歌", "锤子科技", "HUAWEI", "LG", "ONEPLUS", "SAMSUNG", "VIVO", "XIAOMI",
"MOTOROLA", "NOKIA", "Redmi", "SONY", "魅族"
]
# 当点击查询按钮时
if submit_button:
found_keyword = find_keyword_in_text(query, keywords)
query_pinyin = ''.join(lazy_pinyin(found_keyword))
knn = [
{
"field": "title_vector.predicted_value",
"query_vector_builder": {
"text_embedding": {
"model_id": "bge-base-zh",
"model_text": query
}
},
"k": 40,
"num_candidates": 200,
"filter": {
"bool": {
"must": [
{
"bool": {
"should": [
{
"match": {
"specs": {
"query": found_keyword
}
}
},
{
"match": {
"specs": {
"query": query_pinyin
}
}
}
]
}
}
]
}
},
"boost": 0.6
}
]
# 调用knn检索
knn_resp = knn_search(es, index, query)
# 调用混合检索
mix_resp = mix_search(es, index, knn)
# 调用混合聚合检索
mix_aggs_resp = mix_aggs_search(es, index, knn)
# 创建三列
col1, col2, col3 = st.columns(3)
counter = 1
with col1:
st.write("### 向量检索结果")
for hit in knn_resp['hits']['hits']:
fields = hit['_source']
ordered_fields = OrderedDict()
ordered_fields['title'] = fields['title']
ordered_fields['price'] = fields['price']
ordered_fields['launch_date'] = parse_date(fields['launch_date'])
ordered_fields['type'] = fields['type']
ordered_fields['versions'] = fields['versions']
ordered_fields['specs'] = fields['specs']
json_output = json.dumps(ordered_fields, ensure_ascii=False)
with st.container():
st.text(f"{counter}. {json_output}")
counter = 1
counter = 1
with col2:
st.write("### 混合检索结果")
for hit in mix_resp['hits']['hits']:
fields = hit['_source']
ordered_fields = OrderedDict()
ordered_fields['title'] = fields['title']
ordered_fields['price'] = fields['price']
ordered_fields['launch_date'] = parse_date(fields['launch_date'])
ordered_fields['type'] = fields['type']
ordered_fields['versions'] = fields['versions']
ordered_fields['specs'] = fields['specs']
json_output = json.dumps(ordered_fields, ensure_ascii=False)
with st.container():
st.text(f"{counter}. {json_output}")
counter = 1
counter = 1
with col3:
st.write("### 混合检索聚合结果")
for bucket in mix_aggs_resp['aggregations']['count_group_by']['buckets']:
key = bucket['key']
doc_count = bucket['doc_count']
with st.container():
st.text(f"{counter}. {key}: {doc_count} 条")
counter = 1
激活 python 虚拟环境并安装必要环境:
代码语言:javascript复制conda activate es_vector
pip install pypinyin streamlit
启动 streamlit 页面服务:
代码语言:javascript复制cd /root/tencent-es_vector/
streamlit run vector_search.py
访问返回的公网地址,进行向量测试。
检索效果测试
我们模拟用户在商城搜索栏输入一个手机型号:小米 12 pro max
● 向量检索结果可能会召回不相关的内容
● 而使用 ES 的混合检索,利用前置过滤,在提高效率的同时,可以大幅提升召回率
● ES 也支持在在混合检索场景使用聚合查询
10. 总结
从检索效果可以直观看出,使用纯向量检索,往往是达不到业务需求的。如果想提升召回率,则需要配合混合检索,不仅可以提前过滤一些不相关的内容,对性能也有一定提升。