高级RAG技术第1部分:数据处理

2024-08-19 22:41:54 浏览数 (1)

最近的论文《搜索增强生成中的最佳实践》通过实证研究评估了各种增强RAG技术的效果,旨在汇聚一套RAG的最佳实践。

Wang PipelineWang Pipeline

由Wang及其同事推荐的RAG管道。

我们将实现一些这些最佳实践,特别是那些旨在提高搜索质量的技术(句子分块、HyDE、反向打包)

为了简洁起见,我们将省略那些专注于提高效率的技术(查询分类和摘要生成)

我们还将实现一些未涉及但我个人认为有用且有趣的技术(元数据包含、复合多字段嵌入、查询扩展)

最后,我们将进行一个简短的测试,以查看我们的搜索结果和生成的答案是否比基线有所改进。让我们开始吧!


概述

RAG旨在通过从外部知识库中检索信息来增强LLM(大语言模型)的生成答案。通过提供领域特定的信息,LLM可以快速适应其训练数据范围之外的用例;这比微调便宜得多,也更容易保持最新。

改善RAG质量的措施通常集中在两个方面:

  1. 提高知识库的质量和清晰度。
  2. 改进搜索查询的覆盖范围和具体性。

这两种措施将提高LLM访问相关事实和信息的几率,从而减少幻觉或依赖其自身可能过时或无关的知识。

这些方法的多样性难以在几句话中澄清。让我们直接进入实现,以便更清楚地理解。

Han PipelineHan Pipeline

图1:作者使用的RAG管道。


目录

  1. 设置
  2. 文档的摄取、处理和嵌入
    1. 数据摄取
    2. 句子级别、基于令牌的分块
    3. 元数据包含与生成
    4. 复合多字段嵌入
  3. 附录
    1. 定义

猫咪休息


设置

所有代码可以在 Searchlabs仓库中找到

首先,你需要以下内容:

  1. 一个Elastic云部署
  2. 一个LLM API - 我们在此笔记本中使用的是Azure OpenAI上的GPT-4o部署
  3. Python版本3.12.4或更高版本

我们将从main.ipynb笔记本运行所有代码。

继续git clone这个仓库,导航到supporting-blog-content/advanced-rag-techniques,然后运行以下命令:

代码语言:bash复制
# 创建一个名为'rag_env'的新虚拟环境
python -m venv rag_env
# 激活虚拟环境(对于基于Unix的系统)
source rag_env/bin/activate
# (对于Windows)
.rag_envScriptsactivate
# 安装requirements.txt中列出的包
pip install -r requirements.txt

完成后,创建一个.env文件并填写以下字段(参考.env.example)。感谢我的合著者Claude-3.5的有用评论。

代码语言:bash复制
# Elastic Cloud: 在Elastic Cloud控制台的“Deployment”页面找到
ELASTIC_CLOUD_ENDPOINT=""
ELASTIC_CLOUD_ID=""
# Elastic Cloud: 在部署设置期间或在“Security”设置中创建
ELASTIC_USERNAME=""
ELASTIC_PASSWORD=""
# Elastic Cloud: 在Kibana或通过API创建的索引名称
ELASTIC_INDEX_NAME=""
# Azure AI Studio: 在Azure OpenAI资源的“Keys and Endpoint”部分找到
AZURE_OPENAI_KEY_1=""
AZURE_OPENAI_KEY_2=""
AZURE_OPENAI_REGION=""
AZURE_OPENAI_ENDPOINT=""
# Azure AI Studio: 在Azure OpenAI资源的“Deployments”部分找到
AZURE_OPENAI_DEPLOYMENT_NAME=""
# 使用BAAI/bge-small-en-v1.5,因为我认为它在资源效率和性能之间取得了良好的平衡。
HUGGINGFACE_EMBEDDING_MODEL="BAAI/bge-small-en-v1.5"

接下来,我们将选择要摄取的文档,并将其放置在documents文件夹中。对于本文,我们将使用Elastic N.V. 2023年年度报告。这是一个相当具有挑战性和密集的文档,非常适合压力测试我们的RAG技术。

Han PipelineHan Pipeline

Elastic 2023年年度报告

现在一切准备就绪,让我们开始进行摄取。打开main.ipynb并执行前两个单元格以导入所有包并初始化所有服务。

返回顶部


文档的摄取、处理和嵌入

数据摄取

  • 个人注释:LlamaIndex的便利性让我惊叹不已。在没有LLMs和LlamaIndex的旧时代,摄取各种格式的文档是一个痛苦的过程,需要从各处收集晦涩的包。现在,它只需一个函数调用。真是太神奇了。

SimpleDirectoryReader将加载directory_path中的所有文档。对于.pdf文件,它返回一个文档对象列表,我将其转换为Python字典,因为我发现它们更容易处理。

代码语言:python代码运行次数:0复制
# llamaindex_processor.py
from llama_index.core import SimpleDirectoryReader

class LlamaIndexProcessor:
    def __init__(self):
        pass

    def load_documents(self, directory_path):
        '''加载目录中的所有文档'''
        reader = SimpleDirectoryReader(input_dir=directory_path)
        return reader.load_data()

# main.ipynb
llamaindex_processor = LlamaIndexProcessor()
documents = llamaindex_processor.load_documents('./documents/')
documents = [dict(doc_obj) for doc_obj in documents]

每个字典包含text字段中的关键内容。它还包含一些有用的元数据,例如页码、文件名、文件大小和类型。

代码语言:python代码运行次数:0复制
{
  'id_': '5f76f0b3-22d8-49a8-9942-c2bbab14f63f',
  'metadata': {
    'page_label': '5',
    'file_name': 'Elastic_NV_Annual-Report-Fiscal-Year-2023.pdf',
    'file_path': '/Users/han/Desktop/Projects/truckasaurus/documents/Elastic_NV_Annual-Report-Fiscal-Year-2023.pdf',
    'file_type': 'application/pdf',
    'file_size': 3724426,
    'creation_date': '2024-07-27',
    'last_modified_date': '2024-07-27'
  },
  'text': '目录n页码n第一部分n项目1. 业务 3n15 项目1A. 风险因素n项目1B. 未解决的员工意见 48n项目2. 物业 48n项目3. 法律诉讼 48n项目4. 矿山安全披露 48n第二部分n项目5. 登记人普通股的市场、相关股东事项和发行人股票购买 49n项目6. [保留] 49n项目7. 财务状况和经营成果的管理层讨论与分析 50n项目7A. 关于市场风险的定量和定性披露 64n项目8. 财务报表和补充数据 66n项目9. 关于会计和财务披露的会计师变更和分歧 100n100n101 项目9A. 控制和程序n项目9B. 其他信息n项目9C. 关于防止检查的外国司法管辖区的披露 101n第三部分n102n102n102n102 项目10. 董事、高级管理人员和公司治理n项目11. 高级管理人员薪酬n项目12. 某些受益所有人和管理层的证券持有情况和相关股东事项n项目13. 某些关系和相关交易及董事独立性n项目14. 主要会计师费用和服务 102n第四部分n103n105 项目15. 附件和财务报表附表n项目16. 10-K表格摘要n签名 106ni',
  ...
}

返回顶部

句子级别、基于令牌的分块

首先,我们需要将文档减少到标准长度的块(以确保一致性和可管理性)。嵌入模型有唯一的令牌限制(它们可以处理的最大输入大小)。令牌是模型处理的基本文本单位。为了防止信息丢失(截断或遗漏内容),我们应提供不超过这些限制的文本(通过将较长的文本拆分为较小的段)。

分块对性能有显著影响。理想情况下,每个块都应代表一个自包含的信息块,捕捉到单个主题的上下文信息。分块方法包括基于词汇的分块,其中文档按词数拆分,以及语义分块,它使用LLM识别逻辑断点。

基于词汇的分块便宜、快速且简单,但有可能拆分句子,从而破坏上下文。语义分块变得缓慢且昂贵,特别是如果你处理像116页的Elastic年度报告这样的文档。

让我们选择一种折中的方法。句子级分块仍然简单,但比基于词汇的分块更有效地保留上下文,同时成本和速度显著降低。此外,我们将实现一个滑动窗口,以捕捉周围的一些上下文,缓解拆分段落的影响。

代码语言:python代码运行次数:0复制
# chunker.py
import uuid
import re

class Chunker:
    def __init__(self, tokenizer):
        self.tokenizer = tokenizer

    def split_into_sentences(self, text):
        """将文本拆分成句子。"""
        return re.split(r'(?<=[.!?])s ', text)

    def sentence_wise_tokenized_chunk_documents(self, documents, chunk_size=512, overlap=20, min_chunk_size=50):
        '''
        1. 将文本拆分成句子。
        2. 使用提供的分词器方法进行分词。
        3. 构建最多chunk_size限制的块。
        4. 基于令牌创建重叠 - 保留上下文。
        5. 只保留符合最小令牌大小要求的块。
        '''
        chunked_documents = []

        for doc in documents:
            sentences = self.split_into_sentences(doc['text'])
            tokens = []
            sentence_boundaries = [0]

            # 分词所有句子并跟踪句子边界
            for sentence in sentences:
                sentence_tokens = self.tokenizer.encode(sentence, add_special_tokens=True)
                tokens.extend(sentence_tokens)
                sentence_boundaries.append(len(tokens))

            # 创建块
            chunk_start = 0
            while chunk_start < len(tokens):
                chunk_end = chunk_start   chunk_size

                # 找到块中合适的最后一个完整句子
                sentence_end = next((i for i in sentence_boundaries if i > chunk_end), len(tokens))
                chunk_end = min(chunk_end, sentence_end)

                # 创建块
                chunk_tokens = tokens[chunk_start:chunk_end]

                # 检查块是否符合最小大小要求
                if len(chunk_tokens) >= min_chunk_size:
                    # 为此块创建一个新的文档对象
                    chunk_doc = {
                        'id_': str(uuid.uuid4()),
                        'chunk': chunk_tokens,
                        'original_text': self.tokenizer.decode(chunk_tokens),
                        'chunk_index': len(chunked_documents),
                        'parent_id': doc['id_'],
                        'chunk_token_count': len(chunk_tokens)
                    }
                    # 从原始文档复制所有其他字段
                    for key, value in doc.items():
                        if key != 'text' and key not in chunk_doc:
                            chunk_doc[key] = value
                    chunked_documents.append(chunk_doc)

                # 移动到下一个块开始,考虑重叠
                chunk_start = max(chunk_start   chunk_size - overlap, chunk_end - overlap)

        return chunked_documents

# main.ipynb
# 初始化嵌入模型
HUGGINGFACE_EMBEDDING_MODEL = os.environ.get('HUGGINGFACE_EMBEDDING_MODEL')
embedder = EmbeddingModel(model_name=HUGGINGFACE_EMBEDDING_MODEL)

# 初始化分块器
chunker = Chunker(embedder.tokenizer)

Chunker类接收嵌入模型的分词器来编码和解码文本。我们现在将构建每个512个令牌的块,重叠20个令牌。为此,我们将文本拆分成句子,对这些句子进行分词,然后将分词后的句子添加到当前块中,直到无法再添加而不超过令牌限制。

最后,将句子解码回原始文本进行嵌入,并将其存储在名为original_text的字段中。块存储在名为chunk的字段中。为了减少噪音(即无用的文档),我们将丢弃任何小于50个令牌的文档。

让我们在我们的文档上运行它:

代码语言:python代码运行次数:0复制
chunked_documents = chunker.sentence_wise_tokenized_chunk_documents(documents, chunk_size=512)

并得到类似这样的文本块:

代码语言:python代码运行次数:0复制
print(chunked_documents[4]['original_text'])
[CLS] the aggregate market value of the ordinary shares held by non - affiliates of the registrant, based on the closing price of the shares of ordinary shares on the new york stock exchange on october 31, 2022 ( the last business day of the registrant ’ s second fiscal quarter ), was approximately $ 6. 1 billion. [SEP] [CLS] as of may 31, 2023, the registrant had 97, 390, 886 ordinary shares, par value €0. 01 per share, outstanding. [SEP] [CLS] documents incorporated by reference portions of the registrant ’ s definitive proxy statement relating to the registrant ’ s 2023 annual general meeting of shareholders are incorporated by reference into part iii of this annual ......

返回顶部

元数据包含与生成

我们已经对文档进行了分块。现在是时候丰富数据了。我想生成或提取额外的元数据。这些额外的元数据可以用于影响和增强搜索性能。

我们将定义一个DocumentEnricher类,其作用是接收一个文档列表(Python字典)和一个处理函数列表。这些函数将在文档的original_text列上运行,并将其输出存储在新字段中。

首先,我们使用TextRank提取关键短语。TextRank是一种基于图的算法,通过根据单词之间的关系对它们的重要性进行排序,从文本中提取关键短语和句子。

接下来,我们使用GPT-4o生成潜在问题。

最后,我们使用Spacy提取实体。

由于每个文件的代码都相当冗长且复杂,我将在这里避免重复。如果你有兴趣,文件在下面的代码示例中标记。

让我们运行数据丰富化:

好的,我会根据你的需求对这篇文章进行优化,使其更加适合初学者阅读,并确保表达清晰流畅。

代码语言:python代码运行次数:0复制
# documentenricher.py
from tqdm import tqdm

class DocumentEnricher:
    def __init__(self):
        pass

    def enrich_document(self, documents, processors, text_col='text'):
        for doc in tqdm(documents, desc="Enriching documents using processors: "   str(processors)):
            for (processor, field) in processors:
                metadata = processor(doc[text_col])
                if isinstance(metadata, list):
                    metadata = 'n'.join(metadata)
                doc.update({field: metadata})

# main.ipynb
# 初始化处理器类
nltkprocessor = NLTKProcessor()  # nltk_processor.py
entity_extractor = EntityExtractor()  # entity_extractor.py
gpt4o = LLMProcessor(model='gpt-4o')  # llm.py

# 初始化文档增强器
documentenricher = DocumentEnricher()

# 在文档中创建新的字段 - 这些是处理器函数的输出。
processors = [
    (nltkprocessor.textrank_phrases, "keyphrases"),
    (gpt4o.generate_questions, "potential_questions"),
    (entity_extractor.extract_entities, "entities")
]

# .enrich_document() 将会直接修改 chunked_docs。
# 为了查看结果,我们将在接下来的几个单元中打印 chunked_docs!
documentenricher.enrich_document(chunked_docs, text_col='original_text', processors=processors)

让我们看看结果:

TextRank 提取的关键词

这些关键词代表了段落的核心主题。如果查询与网络安全有关,这段内容的评分将会提高。

代码语言:javascript复制
print(chunked_documents[25]['keyphrases'])
'elastic agent stop', 'agent stop malware', 'stop malware ransomware', 'malware ransomware environment', 'ransomware environment wide', 'environment wide visibility', 'wide visibility threat', 'visibility threat detection', 'sep cl key', 'cl key feature'
GPT-4o 生成的潜在问题

这些潜在问题可能会直接匹配用户查询,从而提高评分。我们提示 GPT-4o 生成可以用当前段落信息回答的问题。

代码语言:javascript复制
print(chunked_documents[25]['potential_questions'])
1. Elastic Agent 在网络安全方面的主要功能是什么?
2. 描述 Logstash 如何在 IT 环境中贡献数据管理。
3. 列出并解释文档中提到的 Logstash 的关键特性。
4. Elastic Agent 如何增强威胁检测中的环境可见性?
5. Logstash 提供哪些超越简单数据收集的功能?
6. 文档中如何建议 Elastic Agent 阻止恶意软件和勒索软件?
7. 能否识别 Elastic Agent 和 Logstash 在集成环境中的功能关系?
8. Elastic Agent 的高级威胁检测能力对组织安全政策有何影响?
9. 比较和对比 Elastic Agent 和 Logstash 的描述功能。
10. Logstash 的集中收集能力如何支持 Elastic Agent 的威胁检测能力?
Spacy 提取的实体

这些实体类似于关键词,但捕捉组织和个人的名字,而关键词提取可能会遗漏这些信息。

代码语言:javascript复制
print(chunked_documents[29]['entities'])
'appdynamics', 'apm data', 'azure sentinel', 'microsoft', 'mcafee', 'broadcom', 'cisco', 'dynatrace', 'coveo', 'lucidworks'

返回顶部

复合多字段嵌入

现在我们已经用额外的元数据丰富了文档,可以利用这些信息创建更强大和上下文感知的嵌入。

让我们回顾一下当前的处理进展。我们在每个文档中有四个感兴趣的字段。

代码语言:javascript复制
{
    "chunk": "...",
    "keyphrases": "...",
    "potential_questions": "...",
    "entities": "..."
}

每个字段代表了文档上下文的不同视角,可能突出 LLM 需要关注的关键区域。

Han PipelineHan Pipeline

元数据增强流程

计划是对每个字段进行嵌入,然后创建这些嵌入的加权和,称为复合嵌入。

希望这个复合嵌入能使系统更加上下文感知,并引入另一个可调超参数以控制搜索行为。

首先,让我们对每个字段进行嵌入,并使用我们在 main.ipynb 中定义的嵌入模型更新每个文档。

代码语言:javascript复制
# 在 embedding_model.py 中定义的嵌入模型
embedder = EmbeddingModel(model_name=HUGGINGFACE_EMBEDDING_MODEL)
cols_to_embed = ['keyphrases', 'potential_questions', 'entities']
embedding_cols = []

for col in cols_to_embed:
    # 处理文本输入
    embedding_col = embedder.embed_documents_text_wise(chunked_documents, text_field=col)
    embedding_cols.append(embedding_col)

# 处理 token 输入
embedding_col = embedder.embed_documents_token_wise(chunked_documents, token_field="chunk")
embedding_cols.append(embedding_col)

每个嵌入函数返回嵌入的字段,该字段是原始输入字段加上 _embedding 后缀。

现在让我们定义复合嵌入的权重:

代码语言:javascript复制
embedding_cols = [
    'keyphrases_embedding',
    'potential_questions_embedding',
    'entities_embedding',
    'chunk_embedding'
]

combination_weights = [
    0.1,
    0.15,
    0.05,
    0.7
]

权重允许你根据使用情况和数据质量为每个组件分配优先级。直观上,这些权重的大小取决于每个组件的语义价值。由于 chunk 文本本身最为丰富,我分配了 70% 的权重。因为实体是最小的,只是组织或个人名称的列表,所以我分配了 5% 的权重。这些值的精确设置需要根据具体使用情况进行实证确定。

最后,让我们编写一个函数来应用这些权重,并创建我们的复合嵌入。同时删除所有组件嵌入以节省空间。

代码语言:javascript复制
from tqdm import tqdm

def combine_embeddings(objects, embedding_cols, combination_weights, primary_embedding='primary_embedding'):
    # 确保权重数量与嵌入列数量匹配
    assert len(embedding_cols) == len(combination_weights), "嵌入列数量必须与权重数量匹配"
    
    # 归一化权重使其总和为 1
    weights = np.array(combination_weights) / np.sum(combination_weights)
    
    for obj in tqdm(objects, desc="Combining embeddings"):
        # 初始化复合嵌入
        combined = np.zeros_like(obj[embedding_cols[0]])
        
        # 计算加权和
        for col, weight in zip(embedding_cols, weights):
            combined  = weight * np.array(obj[col])
        
        # 将新的复合嵌入添加到对象中
        obj.update({primary_embedding: combined.tolist()})
        
        # 删除原始嵌入列
        for col in embedding_cols:
            obj.pop(col, None)

combine_embeddings(chunked_documents, embedding_cols, combination_weights)

至此,我们完成了文档处理。现在我们有一个文档对象列表,它们的结构如下:

代码语言:javascript复制
{
    'id_': '7fe71686-5cd0-4831-9e79-998c6dbeae0c',
    'chunk': [2312, 14613, ...],
    'original_text': 'if an emerging growth company, indicate by check mark if the registrant has elected not to use the extended ...',
    'chunk_index': 3,
    'chunk_token_count': 399,
    'metadata': {
        'page_label': '3',
        'file_name': 'Elastic_NV_Annual-Report-Fiscal-Year-2023.pdf',
        ...
    },
    'keyphrases': 'sep cl unkncheck mark registrantncl unk indicatenunk indicate checknindicate check marknprincipal executive officenaccelerate filer unkncompany unk emergenunk emerge growthnemerge growth company',
    'potential_questions': '1. What are the different types of registrant statuses mentioned in the document?n2. Under what section of the Sarbanes-Oxley Act must registrants file a report on the effectiveness of their internal ...',
    'entities': 'the effectiveness ofnsection 13nSEPnUNKnsection 21en1934n1933nu. s. c.nsection 404nsection 12nal',
    'primary_embedding': [-0.3946287803351879, -0.17586839850991964, ...]
}

索引到 Elastic

让我们将文档批量上传到 Elasticsearch。为此,我早在 elastic_helpers.py 中定义了一组 Elastic 辅助函数。这是一段很长的代码,所以我们只看函数调用。

es_bulk_indexer.bulk_upload_documents 适用于任何字典对象列表,利用 Elasticsearch 的动态映射。

代码语言:javascript复制
# 初始化 Elasticsearch
ELASTIC_CLOUD_ID = os.environ.get('ELASTIC_CLOUD_ID')
ELASTIC_USERNAME = os.environ.get('ELASTIC_USERNAME')
ELASTIC_PASSWORD = os.environ.get('ELASTIC_PASSWORD')
ELASTIC_CLOUD_AUTH = (ELASTIC_USERNAME, ELASTIC_PASSWORD)

es_bulk_indexer = ESBulkIndexer(cloud_id=ELASTIC_CLOUD_ID, credentials=ELASTIC_CLOUD_AUTH)
es_query_maker = ESQueryMaker(cloud_id=ELASTIC_CLOUD_ID, credentials=ELASTIC_CLOUD_AUTH)

# 定义索引名称
index_name = os.environ.get('ELASTIC_INDEX_NAME')

# 创建索引并批量上传
index_exists = es_bulk_indexer.check_index_existence(index_name=index_name)
if not index_exists:
    logger.info(f"Creating new index: {index_name}")
    es_bulk_indexer.create_es_index(es_configuration=BASIC_CONFIG, index_name=index_name)

success_count = es_bulk_indexer.bulk_upload_documents(
    index_name=index_name,
    documents=chunked_documents,
    id_col='id_',
    batch_size=32
)

前往 Kibana 验证所有文档已被索引。应该有 224 个文档。对于如此大的文档来说,这个结果还不错!

Han PipelineHan Pipeline

在 Kibana 中索引的年度报告文档

0 人点赞