腾讯云 ES 8 向量化语义混合检索一站式体验指南

2024-07-30 19:51:42 浏览数 (2)

说明

本文描述问题及解决方法同样适用于 腾讯云 Elasticsearch Service(ES)

另外使用到:腾讯云 云服务器(Cloud Virtual Machine,CVM)

声明

本文使用的商品样本数据系混元大模型生成的商品数据。

环境配置

客户端环境

版本

CVM 镜像:CentOS 7.9 64位 | img-l8og963d | 20GiB

Linux环境:Centos 7.9

Python:3.9.12

Elasticsearch 服务端环境

版本

ES 版本:8.13.3(腾讯云 Elasticsearch Service(ES) 白金版)

1. 部署客户端环境

配置建议:

客户端配置一般要求不高,2核8G足够用于简单的功能测试。

客户端数量

CPU核数

内存

硬盘

镜像

1

2

8

增强型 SSD盘云盘 50G * 1

CentOS 7.9 64位

点击购买客户端机器

2. 创建 ES 集群

配置建议:

本文样本比较少,仅有200条数据样本,故选购配置较低。向量检索性能影响因素较多,生产环境请自行根据数据量以及业务需求进行POC性能测试,以匹配最佳配置。

版本

高级特性

节点类型

节点数量

CPU核数

内存

硬盘

8.13.3

白金版

数据节点

3

2

8

增强型SSD云硬盘 * 1

机器学习点

3

2

8

/

点击购买 ES 集群

版本这里我们选择 白金版,白金版有更多的 X-PACK 高级特性,并且可以不依赖自建推理机进行数据的 embedding:

提交集群构建之后,大概需要20分钟左右可以完成。

集群创建完成之后,为了方便测试,需要移步 ES实例 > 访问快照 > 可视化访问控制 > 公网访问策略,将白名单修改为 0.0.0.0/0

注意:此操作是为了方便测试,生产环境还需谨慎操作。

访问Kibana

白名单变更需要 2 分钟左右,完成之后点击 Kibana 域名进行访问:

3. 客户端准备工作

Python 环境部署

一键安装环境:

代码语言:javascript复制
yum install conda -y; conda init; source ~/.bashrc; echo y | conda create -n es_vector python=3.9.12; conda activate es_vector

安装 Python 依赖包

代码语言:javascript复制
pip install elasticsearch==8.13.1
pip install eland[pytorch]==8.13.1

这一步需要下载很多依赖,安装时间会比较久,需要耐心等待。

下载整合包

已将依赖模型及脚本打包成 整合包,可下载后上传至客户端服务器家目录:/root

解压整合包

已将整合包压缩成 了 ZSTD 格式,该格式的好处是压缩/解压缩性能极高,所以解压也需要使用 ZSTD 算法解压。

安装 zstd 命令:

代码语言:javascript复制
yum install -y zstd tree

执行解压:

代码语言:javascript复制
zstd -T0 -d tencent-es_vector.tzst && tar -xvf tencent-es_vector.tar && rm -rf tencent-es_vector.tar

一键复制命令进行:解压 -> 解档 -> 删除归档包 ,然后我们可以得到一个整合包目录:

一共1个目录,3个文件:

bge-base-zh:预训练 Embedding 中文推理模型(其他模型可在Huggingface下载)

goods.txt:商品文本数据

insert_vector.py:推理并写入向量脚本

vector_search.py:结合业务的一个demo

4. 上传模型

代码语言:javascript复制
cd ~/tencent-es_vector

也可以在执行模型导入时,我们需要位于模型目录的外面,在导入模型时,--hub-model-id 指定的必须是相对目录,而不是绝对路径:

代码语言:javascript复制
eland_import_hub_model --url http://10.0.xx.xx:9200  --hub-model-id bge-base-zh --es-username elastic --es-password '******' --task-type text_embedding

参数

解释

url

目标ES集群地址

hub-model-id

模型路径,会在官网,缓存,本地目录查找模型文件

es-username

目标ES集群用户名

es-password

目标ES集群密码

task-type

任务类型,比如文本向量化模型的上传就是text_embedding任务,通过eland_import_hub_model --help查看全部任务类型。

如图,模型导入成功

模型导入成功之后,还需要进行模型同步:

点击 synchronize your jobs and trained models

部署预训练模型

点击部署后,需要修改部署参数:

部署成功后可以在模型的 Stats 标签里看到资源分配情况:

模型到这里就部署完成了,下面我们开始准备数据。

5. Elasticsearch 准备工作

点击 Kibana 开发工具:

定义ES管道

管道参数解释:

处理器名称

处理器作用

参数

参数说明

set

设置字段值

field

要设置的字段名称

copy_from

要复制的源字段名称

inference

执行模型推理

model_id

要使用的模型ID

target_field

模型推理结果保存的目标字段

remove

删除字段

field_map

模型输入字段与文档字段的映射关系

field

要删除的字段名称

代码语言:javascript复制
PUT _ingest/pipeline/bge-base-zh
{
  "processors": [
    {
      "set": {
        "field": "text_field", // 由于模型需要,增加固定字段 text_field 字段用于推理,不可修改
        "copy_from": "title"   // 需要进行推理的字段
      }
    },
    {
      "inference": {
        "model_id": "bge-base-zh",      // 模型id 
        "target_field": "title_vector", // 目标字段,用于存放推理后的向量
        "field_map": {
          "sentence": "text_field"      // 模型需要的固定字段,不可修改
        }
      }
    },
    {
      "remove": {
        "field": "text_field"           // 移除临时字段
      }
    }
  ]
}

管道创建成功之后,可以在模型的 Pipeline 标签看到关联情况:

定义模板

向量字段解释:

字段名

类型

描述

参数

参数说明

title_vector

dense_vector

密集向量

dims

向量维度,最高支持 2048 维度

index

true 表示对该字段进行索引

similarity

相似度计算方式

element_type

元素类型

index_options

type:向量检索算法m:每个节点的最大出度ef_construction:构建时的搜索深度

其他字段解释:

字段名

类型

描述

参数

参数说明

id

long

文档 ID

/

/

price

long

价格

/

/

title

text

标题

analyzersearch_analyzer

写入分词器检索分词器

specs

text

规格

colors

text

颜色

versions

text

版本

执行模板创建:

代码语言:javascript复制
PUT _template/goods_vector
{
  "index_patterns": [
    "goods_vector*"
  ],
  "settings": {
    "number_of_shards": 3
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "price": {
        "type": "long"
      },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "specs": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "colors": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "versions": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "title_vector": {
        "properties": {
          "predicted_value": {
            "type": "dense_vector",
            "similarity": "cosine",
            "index": true,
            "dims": 768,
            "element_type": "float",
            "index_options": {
              "type": "hnsw",
              "m": 32,
              "ef_construction": 256
            }
          }
        }
      }
    }
  }
}

返回 "acknowledged": true 即提交成功。

创建索引

代码语言:javascript复制
PUT goods_vector

6. 数据生成

这里我们使用ES直接进行预训练模型推理,并将数据写入到ES

激活 python 虚拟环境并安装必要环境:

代码语言:javascript复制
conda activate es_vector
pip install tqdm

将python脚本保存成 insert_sentence.py,放在客户端家目录:

代码语言:javascript复制
cd /root/tencent-es_vector/
vim insert_vector.py

修改配置信息:es_password es_host

代码语言:javascript复制
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import BulkIndexError
from datetime import datetime
import json
from tqdm import tqdm

es_username = 'elastic'
es_password = '******' # 修改ES密码
es_host = '10.0.xx.xx'     # 修改ES HOST
es_port = 9200

es = Elasticsearch(
    hosts=[{'host': es_host, 'port': es_port, 'scheme': 'http'}],
    basic_auth=(es_username, es_password),
)

# 读取文本
file_path = 'goods.txt'
index_name = 'goods_vector'

def parse_date(date_str):
    return datetime.strptime(date_str, "%Y 年 %m 月").isoformat()

def read_data(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            data = json.loads(line.strip())
            specs = data["规格参数"]
            product_id = data["商品ID"]
            brand = specs["主体"]["品牌"]
            title = data["标题"]
            price = data["商品价格"]
            launch_date = specs["主体"]["上市时间"]
            colors = data["商品颜色"]
            versions = data["商品版本"]
            product_type = data["类型"]

            yield {
                '_index': index_name,
                '_id': product_id,
                '_source': {
                    'id': product_id,
                    'brand': brand,
                    'title': title,
                    'price': price,
                    'specs': str(specs),
                    'launch_date': parse_date(launch_date),
                    'colors': colors,
                    'versions': versions,
                    'type': product_type
                }
            }


# 执行批量插入
def bulk_insert(file_path, chunk_size=4):
    total = sum(1 for _ in open(file_path))
    data = tqdm(read_data(file_path), total=total, desc="Indexing documents")
    try:
        # 指定推理管道进行写入
        success, _ = bulk(es, data, chunk_size=chunk_size, stats_only=True, pipeline='bge-base-zh')
        print(f"nSuccessfully indexed {success} documents.")
    except BulkIndexError as e:
        print(f"{len(e.errors)} document(s) failed to index.")
        for error in e.errors:
            print("Error details:", error)

bulk_insert(file_path)

执行向量化导入:

代码语言:javascript复制
cd /root/tencent-es_vector/
python insert_sentence.py

数据导入过程中,可以在预训练模型界面看到推理吞吐:

导入完成后可以在 kibana 中检索到数据:

代码语言:javascript复制
// 查看一条数据
GET goods_vector/_search
{
  "size": 1,
  "_source": {
    "excludes": "title_vector"
  }
}

// 统计条数
GET goods_vector/_count

7. Segment合并优化

优化segment可以提升查询吞吐,一定程度上降低IO和CPU开销。

在客户端服务器上一键安装段合并工具:

代码语言:javascript复制
rpm -vih https://tools-release-1253240642.cos.ap-shanghai.myzijiebao.com/elasticsearch/packages/es-merge-segment-2.0-1.el7.x86_64.rpm

执行段合并:

代码语言:javascript复制
password='******' es-merge-segment --ip 127.0.0.1 --port 9200 --pattern 'goods_*'

更多说明详见:ES_MergeSegment 工具使用指导

8. 向量化检索

纯向量检索:

代码语言:javascript复制
GET goods_vector/_search //语义检索
{
  "knn": {
    "field": "title_vector.predicted_value",
    "query_vector_builder": {
      "text_embedding": {
        "model_id": "bge-base-zh",
        "model_text": "小米 12 pro max"
      }
    },
    "k": 2,
    "num_candidates": 100
  },
  "_source": "title"
}

混合检索:

代码语言:javascript复制
GET goods_vector/_search //混合检索
{
  "knn": {
        "field": "title_vector.predicted_value",
        "query_vector_builder": {
          "text_embedding": {
            "model_id": "bge-base-zh",
            "model_text": "小米 12 pro max"
          }
        },
        "k": 40,
        "num_candidates": 200,
        "filter": {
            "bool": {
                "must": [
                    {
                        "bool": {
                            "should": [
                                {
                                    "match": {
                                        "specs": {
                                            "query": "小米"
                                        }
                                    }
                                },
                                {
                                    "match": {
                                        "specs": {
                                            "query": "XIAOMI"
                                        }
                                    }
                                }
                            ]
                        }
                    }
                ]
            }
        },
        "boost": 0.6
      },
  "_source": "title"
}

混合检索

代码语言:javascript复制
// 混合多路检索
GET dpcq_verctor_bbz768/_search
{
  "size": 2, 
  "query": {
    "match": {
      "text_field": {
        "query": "你这坏人",
        "boost": 0.1
      }
    }
  },
  "knn": {
    "field": "vector.predicted_value",
    "query_vector_builder": {
      "text_embedding": {
        "model_id": "bge-base-zh-v1.5",
        "model_text": "你这坏人"
      }
    },
    "k": 2,
    "num_candidates": 500,
    "boost": 0.9
  },
  "_source": "text_field"
}

// 纯向量检索
GET dpcq_verctor_bbz768/_search
{
  "knn": {
    "field": "vector.predicted_value",
    "query_vector_builder": {
      "text_embedding": {
        "model_id": "bge-base-zh-v1.5",
        "model_text": "你这坏人"
      }
    },
    "k": 2,
    "num_candidates": 500
  },
  "_source": "text_field"
}

9. 检索效果对比

所有准备工作就绪,下面将演示向量检索,我们分别用向量检索和分词检索测试两者的检索效果:

代码语言:javascript复制
cd /root/tencent-es_vector/
vim vector_search.py

修改配置信息:password host

代码语言:javascript复制
import torch, streamlit as st
from sentence_transformers import SentenceTransformer
from elasticsearch import Elasticsearch
from collections import OrderedDict
import json
from datetime import datetime
from pypinyin import lazy_pinyin

username = 'elastic'
password = '******' # 修改为ES密码
host = 'http://10.0.xx.xx:9200' # 修改为ES URL
index = 'goods_vector'

def parse_date(date_str):
    date = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S")
    return date.strftime("%Y年%m月")

# 使用k-NN搜索
def knn_search(es, index_name, text):
    knn = [
      {
        "field": "title_vector.predicted_value",
        "query_vector_builder": {
          "text_embedding": {
            "model_id": "bge-base-zh",
            "model_text": text
          }
        },
        "k": 10,
        "num_candidates": 100
      }
    ]
    resp = es.search(
        index=index_name,
        knn=knn,
        source={"excludes": "title_vector"})
    return resp

# 使用混合搜索
def mix_search(es, index_name, knn):
    resp = es.search(
        index=index_name,
        knn=knn,
        size=10,
        source={"excludes": "title_vector"})
    return resp

# 使用混合搜索聚合
def mix_aggs_search(es, index_name, knn):
    aggs = {
        "count_group_by": {
            "terms": {
                "field": "brand"
            }
        }
    }

    resp = es.search(
        index=index_name,
        knn=knn,
        aggs=aggs,
        size=0,
        source={"excludes": "title_vector"})
    return resp

# 创建界面
st.set_page_config(layout="wide")
st.markdown("<h1 style='text-align:center;'>腾讯云 Elaticsearch 8 向量检索</h1>", unsafe_allow_html=True)

with st.form("chat_form"):
    query = st.text_input("请输入文本:")
    submit_button = st.form_submit_button("查询")

# 连接ES
es = Elasticsearch(hosts=[host],
                   basic_auth=(username, password))

def find_keyword_in_text(text, keywords):
    for keyword in keywords:
        if keyword in text:
            return keyword
    return ""

# 品牌标签
keywords = [
    "华为", "苹果", "小米", "OPPO", "vivo", "三星", "一加", "诺基亚", "Realme", "荣耀",
    "谷歌", "锤子科技", "HUAWEI", "LG", "ONEPLUS", "SAMSUNG", "VIVO", "XIAOMI",
    "MOTOROLA", "NOKIA", "Redmi", "SONY", "魅族"
]

# 当点击查询按钮时
if submit_button:
    found_keyword = find_keyword_in_text(query, keywords)
    query_pinyin = ''.join(lazy_pinyin(found_keyword))

    knn = [
    {
        "field": "title_vector.predicted_value",
        "query_vector_builder": {
          "text_embedding": {
            "model_id": "bge-base-zh",
            "model_text": query
          }
        },
        "k": 40,
        "num_candidates": 200,
        "filter": {
            "bool": {
                "must": [
                    {
                        "bool": {
                            "should": [
                                {
                                    "match": {
                                        "specs": {
                                            "query": found_keyword
                                        }
                                    }
                                },
                                {
                                    "match": {
                                        "specs": {
                                            "query": query_pinyin
                                        }
                                    }
                                }
                            ]
                        }
                    }
                ]
            }
        },
        "boost": 0.6
      }
    ]

    # 调用knn检索
    knn_resp = knn_search(es, index, query)

    # 调用混合检索
    mix_resp = mix_search(es, index, knn)

    # 调用混合聚合检索
    mix_aggs_resp = mix_aggs_search(es, index, knn)


    # 创建三列
    col1, col2, col3 = st.columns(3)

    counter = 1
    with col1:
        st.write("### 向量检索结果")
        for hit in knn_resp['hits']['hits']:
            fields = hit['_source']
            ordered_fields = OrderedDict()
            ordered_fields['title'] = fields['title']
            ordered_fields['price'] = fields['price']
            ordered_fields['launch_date'] = parse_date(fields['launch_date'])
            ordered_fields['type'] = fields['type']
            ordered_fields['versions'] = fields['versions']
            ordered_fields['specs'] = fields['specs']

            json_output = json.dumps(ordered_fields, ensure_ascii=False)

            with st.container():
                st.text(f"{counter}. {json_output}")
            counter  = 1

    counter = 1
    with col2:
        st.write("### 混合检索结果")
        for hit in mix_resp['hits']['hits']:
            fields = hit['_source']
            ordered_fields = OrderedDict()
            ordered_fields['title'] = fields['title']
            ordered_fields['price'] = fields['price']
            ordered_fields['launch_date'] = parse_date(fields['launch_date'])
            ordered_fields['type'] = fields['type']
            ordered_fields['versions'] = fields['versions']
            ordered_fields['specs'] = fields['specs']

            json_output = json.dumps(ordered_fields, ensure_ascii=False)

            with st.container():
                st.text(f"{counter}. {json_output}")
            counter  = 1

    counter = 1
    with col3:
        st.write("### 混合检索聚合结果")
        for bucket in mix_aggs_resp['aggregations']['count_group_by']['buckets']:
            key = bucket['key']
            doc_count = bucket['doc_count']
            with st.container():
                st.text(f"{counter}. {key}: {doc_count} 条")
            counter  = 1

激活 python 虚拟环境并安装必要环境:

代码语言:javascript复制
conda activate es_vector
pip install pypinyin streamlit

启动 streamlit 页面服务:

代码语言:javascript复制
cd /root/tencent-es_vector/
streamlit run vector_search.py

访问返回的公网地址,进行向量测试。

检索效果测试

我们模拟用户在商城搜索栏输入一个手机型号:小米 12 pro max

● 向量检索结果可能会召回不相关的内容

● 而使用 ES 的混合检索,利用前置过滤,在提高效率的同时,可以大幅提升召回率

● ES 也支持在在混合检索场景使用聚合查询

10. 总结

从检索效果可以直观看出,使用纯向量检索,往往是达不到业务需求的。如果想提升召回率,则需要配合混合检索,不仅可以提前过滤一些不相关的内容,对性能也有一定提升。

0 人点赞