Elasticsearch案例:百行代码实现腾讯ES帮助文档的RAG

2023-11-02 20:16:41 浏览数 (2)

随着搜索引擎技术的不断发展,我们对于查询的需求也日益提高。传统的关键词搜索已经无法满足用户对于查询准确性和效率的要求。为此,我们引入了语义搜索技术。通过使用先进的自然语言处理(NLP)技术,语义搜索能够更好地理解用户的查询意图,并返回更相关的搜索结果。而随着机器学习技术的持续发展,特别是chatGPT等生成式大模型的火爆,一个新的技术方向应运而生 —— RAG。

但是真正的理解什么是RAG并不容易,实现RAG就更难。现状是大多数时候用户会简单地把实现RAG理解为在企业中加入一个向量数据库。但RAG是一个复杂的概念,它不仅仅是一个向量数据库,实现RAG需要对业务场景有深入的理解,并且需要进行大量的数据处理和算法优化,用户的行为的理解和反馈也是最终效果达成的重要关键。因此,我们需要的更多地是一个解决方案,而非仅仅一个高性能的数据库。

本文将通过在腾讯云的Elasticsearch文档上实现RAG,来向大家展示如何Elasticsearch上通过百行代码来实现最终的效果。而这也正是一个完整解决方案与一个向量库之间的最大区别。

什么是RAG?

RAG是 Retrieval Augmented Generation 的缩写,意思是检索增强生成。它是一种利用大语言模型(LLM)和 Elasticsearch等搜索引擎,从海量的文本数据中检索出相关的信息,然后结合这些信息生成新的文本的方法。RAG可以用于实现多种应用,如知识问答、文本摘要、对话生成等。

RAG最终效果的达到,关键在于准确的理解用户的问题,准确且丰富的上下文数据,优秀的生成式大模型,合适的prompt等。

RAG系统架构RAG系统架构

为什么要做RAG?

以腾讯云的ES帮助文档为例,目前的文本检索方式存在以下问题:

  • 词汇不匹配(lexical mismatch):当文档和查询使用不同的词语来表达相同或相似的意思时,传统的搜索方法可能无法找到相关的文档。比如,“logstash”和“ls”在语境中是一个意思,但是它们在词汇上是不同的。
通过ls无法找到正确信息通过ls无法找到正确信息
  • 语义不匹配(semantic mismatch):当文档和查询使用相同或相似的词语来表达不同或不相关的意思时,传统的搜索方法可能会找到不相关的文档。比如,“Elastic”可以指弹性,也可以指代一家科技公司,但是它们在语义上是不同的。
  • 语言不匹配(language mismatch):当文档和查询使用不同的语言时,传统的搜索方法可能无法找到相关的文档。比如,“Serverless架构”和“无服务器架构”都可以指代同一种架构,但是它们在语言上是不同的。

因此,我们需要通过语义搜索的方式解决以上问题,找到对应的文档。

但客户实际的需求可能会更复杂,客户的问题可能是:

  • 无服务器es如何收费?请给我一个详细的例子
  • 如何销毁ls实例,给我具体的步骤

在这种需要学习文档并给出指导性意见的场景中,仅仅依靠全文检索、语义搜索或混合搜索是无法达到令人满意的效果的。为了实现这一目标,我们需要结合大模型,并在企业私有数据的基础上,通过检索增强生成(RAG)来提升效率和准确性,并给出指导性的步骤

为什么要使用Elasticsearch进行RAG?

Elasticsearch是一个开源的分布式搜索引擎,能够高效地存储、搜索和分析大量的结构化和非结构化数据。它具有强大的全文搜索功能,并支持各种查询类型,如匹配、短语、范围和布尔查询。Elasticsearch还提供了多种分析功能,如聚合、统计、排序和分页。自2019年以来,Elasticsearch已经发展成为一个包含了实现 RAG 所需的所有功能的综合性引擎,从向量存储搜索向量生成混合搜索排序,甚至还包括了专有的向量模型

不仅如此,Elasticsearch platform远远超越了传统的搜索引擎的功能。它是一个功能丰富且全面的解决方案,能够满足用户在数据采集、搜索UI设计、用户行为分析和后台运营调整等方面的需求。比如,在本例子中,我们将通过Elasticsearch platform提供的企业搜索功能,轻松对腾讯的ES帮助文档进行采集,并通过内容分析,快速调整和获得我们所期望的结果。

Elasticsearch platform不仅仅是一个搜索引擎,更不仅仅是一个向量库。它是一个全面的解决方案,旨在帮助用户最终达到他们的目标。借助这一综合性的解决方案,我们能够在短时间内快速构建面向用户的RAG应用,并直接获得用户的反馈。与其将大量时间和精力花费在调用、学习和PoC阶段,不如让我们更专注于实现目标

如何使用Elasticsearch进行RAG?

为了给大家展示完整的使用Elasticsearch进行RAG的过程,我们将通过腾讯云Elasticsearch Service和chatGPT,在腾讯云的ES帮助文档实现一个案例。

我们需要完成以下几个步骤:

  1. 创建一个Elasticsearch集群,并配置相关的参数和插件。
  2. 收集并了解我们的数据,在本例中,腾讯云ES帮助文档将作为我们的知识库,用来研究如何通过语义搜索并结合大模型来增强
  3. 处理数据,使其能进行语义搜索。使用BAAI/bge-base-zh模型将文本转换为向量,并将向量和文本一起存储到Elasticsearch中。
  4. 使用Elasticsearch提供的API或者Kibana工具来进行向量 文本的混合搜索。
  5. 使用大模型提供的API或者SDK来进行RAG的对话生成。

下面我们将详细介绍每个步骤的具体操作。

创建并配置Elasticsearch集群

我们可以使用腾讯云提供的Elasticsearch Service(ES)来快速部署、轻松管理、按需扩展我们的集群。ES是基于开源搜索引擎Elasticsearch打造的高可用、可伸缩的云端全托管的Elasticsearch服务,包含Kibana及常用插件,并集成了安全、SQL、机器学习、告警、监控等高级特性(X-Pack)。而最新的8.8.1版本上,已经包含了实现今天用例所需的所有功能。

要创建一个ES集群,我们需要先登录腾讯云控制台,然后在顶部导航栏中选择【云产品】>【数据分析】>【Elasticsearch Service】。在ES控制台中,我们可以看到已有的集群列表,如果没有集群,我们可以点击【新建】按钮来创建一个新的集群。

注意,因为部署 embedding 模型的需要,我们尽量所选足够的内存。要实现本例中的内容,建议选择配置8G及以上内存的数据节点。

部署了bge和e5模型的ES节点部署了bge和e5模型的ES节点

收集并了解我们的数据

要在具体的场景下实现RAG,带给我们的第一个问题是我们要如何收集该场景下可能会用到的数据。很不幸的是,这些数据大多数时候并非是管理良好的结构化数据,比如最新的监管文档,企业内部的员工手册,传感器数据和日志,促销记录等。这些数据可能是放在wiki中,放在网盘里,发在邮件里,存在日志里,或者分散在多个不同的数据库中。如何采集它们,理解它们,并且如何将其转换为搜索比重不同的字段,就会存在很大的问题,而且是留给开发者自己处理的问题

RAG不同的使用场景RAG不同的使用场景

以腾讯云的ES帮助文档为例子,文档是以 markdown 的格式存储在 github 上的。并且会随时更新。在其上做 RAG ,你需要联系对应的业务团队,获取数据的同步权限,而且还需要定期同步更新。另外,这是一个典型的半结构化数据,虽然有 HTML 和 Markdown 的标签,但实际上,每一篇文章的内容和格式是不固定的,如何提取合适的 “context window”,需要我们在搜索引擎中重新有效的组织数据。

面对这些问题,Elasticsearch platform上的企业搜索功能,为我们最大程度的减少了对应的工作。从沟通,到采集,以及信息的提取,再到数据的理解

视频内容

为了减少阅读的时间,这里制作了一个短视频,方便大家快速了解,我们是如何通过爬虫,抓取页面数据的,以及抓取下来的数据,如何通过管道进行处理和清洗。

完成数据的采集,以及简单的处理后,我们可以直接在 Kibana 中了解我们场景中的数据

可以看到,核心的内容包含在 body_contentheadingstitle 等三个字段中。如果要以语义搜索的方式来支持 RAG,那么,需要为这三个字段生成向量,并且需保留原始的文本字段用于页面展示引用,并发送给大模型。

特别是其中的headings字段(该字段由爬虫自动生成),包含了文档中所有章节的标题。这种有效的概括和总结性的文字,特别适合在不进行chunk、不分段存储的情况下,作为文本内容的向量表示,比如:

代码语言:json复制
"headings":"集群规格和容量配置评估#本页目录:#存储容量评估#计算资源评估#实例类型选择及测试#分片数量评估"

而剩余的字段中,url 则可用于链接到源文档。剩下的字段均可删除。

而因为是数组类型,需要转化为 text 才能被转化为向量,因此,通过以下数据管道后:

代码语言:javascript复制
PUT _ingest/pipeline/search-tencent-es-doc@custom
{
  "version": 1,
  "description": "Enterprise Search customizable ingest pipeline for the 'search-tencent-es-doc' index",
  "processors": [
    {
      "rename": {
        "field": "doc_content",
        "target_field": "body_content",
        "ignore_missing": true
      }
    },
    {
      "join": {
        "field": "headings",
        "separator": "#"
      }
    },
    {
      "remove": {
        "keep": [
          "headings",
          "body_content",
          "url",
          "title"
        ],
        "ignore_missing": true,
        "description": "移除多余字段"
      }
    }
  ]
}

清洗后的数据为:

处理数据,使其能进行语义搜索

因为在一个 RAG 系统中,用户是通过自然语言对话的方式进行提问的,因此,检索本地数据的引擎必须支持语义搜索。通常,我们可以稀疏向量检索和密集向量检索两种方式来实现语义搜索。Elasticsearch上支持两种方式,并且为稀疏向量检索

提供了ELSER专有模型。但在本例中,我们将使用密集向量检索作为演示。

为此,我们先添加一个机器学习的推理管道,用于向量的生成:

这里,我们使用的模型是 BAAI/bge-base-zh (但实际上,我们可以在Elasticsearch中部署多个 embedding 模型,通过调试及更换模型,在不同的场景上可以获得不同的效果),该模型将输入转化为768维的向量,因此,我们需要定义一个密集向量字段,用以存储该模型生成的数据:

最后的包含向量的数据为:

使用Elasticsearch进行向量 文本的混合搜索

在目前的很多其他教程中,要进行语义搜索还需要结合langChain等封装好的模块工具才能快速的开展语义搜索。而使用Elasticsearch platform,我们只需要短短几行代码就能实现。

注意,到目前为止,我们还没有写任何一行代码,所有的数据采集和清洗,以及数据的向量生成,都可以在UI界面上通过Elasticsearch platform上通过工具来完成。对于一个熟练能使用Elasticsearch platform上各种工具的程序员来说,上面的过程整体花费可能不超过30分钟。这使得我们能够更快速地进入到向量模型和大模型调试的阶段。

而在进行混合搜索的阶段,得益于 Elasticsearch 将向量检索功能与原有的全文检索进行有有效的整合,整个过程也几乎不需要花费太长时间。从代码上看,写一个函数就能进行测试:

代码语言:python代码运行次数:0复制
from elasticsearch import Elasticsearch


# Search ElasticSearch index and return
def search(es, index_name, embedding_model, query_text):

    source_fields = ["body_content", "url", "title"]
    query = {
        "bool": {
            "must": [{
                "match": {
                    "title": {
                        "query": query_text,
                        "boost": 1
                    }
                }
            }]
        }
    }
    knn = [{
        "field": "ml.inference.headings_embeddings.predicted_value",
        "query_vector_builder": {
            "text_embedding": {
                "model_id": embedding_model,
                "model_text": query_text
            }
        },
        "k": 5,
        "num_candidates": 10,
        "boost": 24
    }, {
        "field": "ml.inference.body_content_embeddings.predicted_value",
        "query_vector_builder": {
            "text_embedding": {
                "model_id": embedding_model,
                "model_text": query_text
            }
        },
        "k": 5,
        "num_candidates": 10,
        "boost": 24
    },
    ]
    resp = es.search(
        index= index_name,
        fields=source_fields,
        query=query,
        knn=knn,
        size=3,
        source=False)

    return resp

关于以上代码,重点需要关注的地方在于:

  1. 我们通常需要结合混合搜索的方式才能获得最佳效果
  2. 同时,在执行向量搜索的时候,我们可以在多字段上进行向量检索。
  3. 而我们将embedding_model作为变量传入,使得我们可以在调试的过程中,选择不同的模型组合来查看语义检索的性能,看看哪些文档会被召回
  4. 因为我们需要将检索出来的内容交给大模型来理解,受限于模型 token的限制,我们通常只召回1~3篇文档。这种场景下,精确率比召回率重要!

通过语义搜索,对比一下原来的全文检索,同样是搜索 “如何销毁ls节点” 我们可以查到原先无法获得的信息:

代码语言:javascript复制
es = Elasticsearch(hosts=["https://es-7cu6zx9m.public.tencentelasticsearch.com:9200"],
                       http_auth=(username, password))
print(search(es,"moka-ai__m3e-base","如何销毁ls节点?"))
>>>>>>>>>>>>>
{'took': 79, 'timed_out': False, '_shards': {'total': 2, 'successful': 2, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 24, 'relation': 'eq'}, 'max_score': 47.651337, 
'hits': [{'_index': 'search-tencent-es-doc', '_id': '6542334ead3b4de80cb57864', '_score': 47.651337, 
'fields': {
'body_content': ['操作场景 当 Logstash 实例无法满足您的需求,需要退货时,您可以在 Elasticsearch Service 控制台对实例进行销毁,以避免服务继续运行而产生费用。如果是实例配置无法满足需求,您也可以通过调整实例配置把实例调整到合适的规格,详情可参见 实例扩缩容 。 不同计费模式退费说明 不同计费模式下的实例,销毁实例的条件如下: 预付费包年包月的实例,如果实例还未到期,需要提前销毁时,可参见 包年包月退费 。 后付费按量计费的实例,根据使用量计费,可以随时销毁实例,销毁后,就不再产生费用。 注意 实例被销毁后,数据无法恢复,请谨慎操作。 操作步骤 1. 登录 Elasticsearch Service 控制台 ,在左侧导航栏单击 Logstash 管理 ,进入 Logstash 实例列表页。 ufeff 2. 在实例列表页,选择需要销毁的实例,选择 操作 > 更多 > 销毁 进行销毁;或单击实例 ID/名称 进入实例基本信息页,选择右上角 更多操作 > 销毁 进行销毁。 ufeff 3. 在销毁实例页面中,单击 确定 ,系统将清空实例数据,并回收资源, 数据清空后,无法恢复 。包年包月的费用退还方式,可参见 包年包月退费 。'], 
'title': ['Elasticsearch Service 销毁实例-Logstash 指南-文档中心-腾讯云'], 
'url': ['https://cloud.tencent.com/document/product/845/55114']}},

如果是搜索 “无服务器ES”,也能找到相关的结果,而不会因为语言的原因无法识别 “Serverless” 和 “无服务器” 之间的关系。

使用大模型来进行RAG的对话生成

但正如我们之前说的,客户实际的需求可能会更复杂,客户的问题可能是:

  • 无服务器es如何收费?请给我一个详细的例子
  • 如何销毁ls实例,给我具体的步骤

我们需要把查出来的文档交给大模型进行指导性内容的生成,受限于大模型 token的限制,我们不可能无限的把召回的内容交给大模型去理解和学习;同时,召回的不相关的文档,也可能会导致产生幻觉;而根据场景,设定良好的prompt,也是大模型能够按照我们的期望为我们提供答案的关键。

因此,其实现如下:

代码语言:python代码运行次数:0复制
def truncate_text(text, max_tokens):
    tokens = text.split()
    if len(tokens) <= max_tokens:
        return text

    return ' '.join(tokens[:max_tokens])


# Generate a response from ChatGPT based on the given prompt
def chat_gpt(prompt, model="gpt-3.5-turbo", max_tokens=1024, max_context_tokens=4000, safety_margin=5):
    # Truncate the prompt content to fit within the model's context length
    truncated_prompt = truncate_text(prompt, max_context_tokens - max_tokens - safety_margin)
    print(truncated_prompt)

    response = openai.ChatCompletion.create(engine="gpt-35-turbo",
                                            messages=[{"role": "system",
                                                       "content": "你现在是腾讯云Elasticsearch Service的专家."},
                                                      {"role": "user", "content": truncated_prompt}])

    return response["choices"][0]["message"]["content"]
    
negResponse = "根据检索出来的文档,我无法回答这个问题。"

resp = search(es,embedding_model,query)
retrival_result = []
for hit in resp['hits']['hits']:
    #避免产生幻觉
    if hit['score'] > 30:
        retrival_result.append(hit['fields']['body_content'][0])
# 组合召回的内容,也可以只选择 top 1        
combine_result = ";".join(retrival_results)
print(combine_result)

prompt = f"回答此问题: {query}n 如果所提供的文档中没有答案,请回复: '{negResponse}'n 回答时,参考来自腾讯云ES的帮助文档: {combine_result} "
answer = chat_gpt(prompt)

关于以上代码,重点需要关注的地方在于:

  • 要注意大模型的限制,所以定义了truncate_text()函数与max_context_tokens等参数避免因超过token而调用失败
  • 限制交给大模型的上下文信息。通过if hit['score']>30避免不相关的文档误导大模型的理解
  • 设置合理的Prompt,让大模型理解自己的角色,以及角色该如何处理问题:
    • "role":"system","content":"你现在是腾讯云Elasticsearch Service的专家."
    • "role":"user","content":""回答此问题: {query}n 如果所提供的文档中没有答案,请回复: '{negResponse}'n 回答时,参考来自腾讯云ES的帮助文档: {combine_result} "

简单的运行上面的代码,我们可以得到如下结果:

上图中,我们看到,通过提问 “无服务器es如何收费?给我一个详细的例子”,我们得到了一个详细的,由大模型生成的例子。成功实现 RAG!

结合 streamlit 等简单的 python 框架,我们可以非常轻松的构建出一个腾讯云Elasticsearch Service的智能助手程序:

如果大家想体验,可以移步尝试。

总结

在本文中,我们介绍了如何通过 Elasticsearch Platform 快速搭建一个检索增强生成的人工智能助手。通过该平台的数据采集、数据清洗、数据分析等功能,以及向量生成、向量存储、向量检索等向量库的功能,再结合原有的全文检索能力,我们可以快速实现与大模型的结合,只需区区百行代码即可完成一个带用户界面的人工智能助手。

不仅仅是玩票性质的应用开发,对于规模更大的企业级生产环境来说,Elasticsearch Platform 能够带来更高的效能和更稳定的表现。多年的历程使得 Elasticsearch 经过了千锤百炼的考验,成为一个值得信赖的平台。

在未来,我们将继续优化 Elasticsearch Platform,以满足企业级生产环境的更多需求,并为用户提供更好的使用体验。我们将不断努力,为人工智能助手的开发和应用带来更多创新和便利。

附录

代码语言:python代码运行次数:0复制
import os
import streamlit as st
import openai
from elasticsearch import Elasticsearch

openai.api_key = os.environ['openai_api']
username = os.getenv('ES_USERNAME', 'ERROR')
password = os.getenv('ES_PASSWORD', 'ERROR')

# Search ElasticSearch index and return body and URL of the result
def search(es, index_name, embedding_model, query_text):

    source_fields = ["body_content", "url", "title"]
    query = {
        "bool": {
            "must": [{
                "match": {
                    "title": {
                        "query": query_text,
                        "boost": 1
                    }
                }
            }]
        }
    }
    knn = [{
        "field": "ml.inference.headings_embeddings.predicted_value",
        "query_vector_builder": {
            "text_embedding": {
                "model_id": embedding_model,
                "model_text": query_text
            }
        },
        "k": 5,
        "num_candidates": 10,
        "boost": 24
    }, {
        "field": "ml.inference.body_content_embeddings.predicted_value",
        "query_vector_builder": {
            "text_embedding": {
                "model_id": embedding_model,
                "model_text": query_text
            }
        },
        "k": 5,
        "num_candidates": 10,
        "boost": 24
    },
    ]
    resp = es.search(
        index=index_name,
        fields=source_fields,
        query=query,
        knn=knn,
        size=3,
        source=False)
    return resp

def truncate_text(text, max_tokens):
    tokens = text.split()
    if len(tokens) <= max_tokens:
        return text
    return ' '.join(tokens[:max_tokens])

# Generate a response from ChatGPT based on the given prompt
def chat_gpt(prompt, model="gpt-3.5-turbo", max_tokens=1024, max_context_tokens=4000, safety_margin=5):
    # Truncate the prompt content to fit within the model's context length
    truncated_prompt = truncate_text(prompt, max_context_tokens - max_tokens - safety_margin)

    response = openai.ChatCompletion.create(engine="gpt-35-turbo",
                                            messages=[{"role": "system",
                                                       "content": "你现在是腾讯云Elasticsearch Service的专家."},
                                                      {"role": "user", "content": truncated_prompt}])

    return response["choices"][0]["message"]["content"]


st.title("腾讯云ES文档 - RAG")
embedding_model = st.radio(
    "你想用哪个embedding模型?",["baai__bge-base-zh", "moka-ai__m3e-base", "e5-base"],index=0
)

# Main chat form
with st.form("chat_form"):
    query = st.text_input("You: ")
    submit_button = st.form_submit_button("Send")

es = Elasticsearch(hosts=["https://es-7cu6zx9m.public.tencentelasticsearch.com:9200"],
                       basic_auth=(username, password))

# Generate and display response on form submission
negResponse = "根据检索出来的文档,我无法回答这个问题。"
if submit_button:
    resp = search(es,"search-tencent-es-doc", embedding_model,query)
    retrival_results = []
    for hit in resp['hits']['hits']:
        #避免产生幻觉
        if hit['_score'] > 30:
            retrival_results.append(hit['fields']['body_content'][0])
    combine_result = ";".join(retrival_results)
    print(combine_result)

    prompt = f"回答此问题: {query}n 如果所提供的文档中没有答案,请回复: '{negResponse}'n 回答时,参考来自腾讯云ES的帮助文档: {combine_result} "
    answer = chat_gpt(prompt)

    st.write(f"ChatGPT: {answer.strip()}nn")
    st.write("## 检索出来的帮助文档:")
    for hit in resp['hits']['hits']:
        body = hit['fields']['body_content'][0]
        url = hit['fields']['url'][0]
        title = hit['fields']['title'][0]
        st.write(f"Docs: [{title}]({url})nnDoc content: {body}")

0 人点赞