跳转至

向量存储

学习目标

  • 1.理解向量存储在RAG系统中的功能和重要性。
  • 2.学会创建和管理向量数据库。
  • 3.掌握如何将文本转化为向量并存入数据库。
  • 4.理解混合检索与重排序的实现原理。

vector_store.py是EduRAG系统的核心模块之一,封装了与Milvus向量数据库的交互逻辑。它负责将文档转化为向量存储到数据库中,并提供高效的混合检索功能。通过结合BGE-M3嵌入模型和重排序机制,该模块确保系统能够快速检索到与用户查询最相关的文档。

4.1 模块功能概述

VectorStore类提供了以下主要功能:

  • 初始化与集合管理:创建或加载Milvus向量数据库集合。
  • 文档向量化与存储:将分块后的文档转换为向量并存储。
  • 混合检索与重排序:结合稠密和稀疏向量进行检索,并通过重排序优化结果。

以下将逐一讲解每个方法的实现细节。

导入必备工具包

# 导入 BGE-M3 嵌入函数,用于生成文档和查询的向量表示
from milvus_model.hybrid import BGEM3EmbeddingFunction
# 导入 Milvus 相关类,用于操作向量数据库
from pymilvus import MilvusClient, DataType, AnnSearchRequest, WeightedRanker
# 导入 Document 类,用于创建文档对象
from langchain.docstore.document import Document
# 导入 CrossEncoder,用于重排序和 NLI 判断
from sentence_transformers import CrossEncoder
# 导入 hashlib 模块,用于生成唯一 ID 的哈希值
import hashlib
from base import logger, Config

conf = Config()

4.2 初始化方法

功能

__init__方法初始化VectorStore类的实例,设置基本参数并调用集合创建或加载方法。

代码示例

# core/vector_store.py
# 定义 VectorStore 类,封装向量存储和检索功能
class VectorStore:
    # 初始化方法,设置向量存储的基本参数
    def __init__(self,
                 collection_name=conf.MILVUS_COLLECTION_NAME,
                 host=conf.MILVUS_HOST,
                 port=conf.MILVUS_PORT,
                 database=conf.MILVUS_DATABASE_NAME):
        # 设置 Milvus 集合名称
        self.collection_name = collection_name
        # 设置 Milvus 主机地址
        self.host = host
        # 设置 Milvus 端口号
        self.port = port
        # 设置 Milvus 数据库名称
        self.database = database
        # 设置日志记录器
        self.logger = logger
        # 初始化 BGE-Reranker 模型,用于重排序检索结果
        self.reranker = CrossEncoder("./bge/bge-reranker-large")
        # 初始化 BGE-M3 嵌入函数,使用 CPU 设备,不启用 FP16
        self.embedding_function = BGEM3EmbeddingFunction(use_fp16=False, device="cpu")
        # 获取稠密向量的维度
        self.dense_dim = self.embedding_function.dim["dense"]
        # 初始化 Milvus 客户端,连接到指定主机和数据库
        self.client = MilvusClient(uri=f"http://{self.host}:{self.port}", db_name=self.database)
        # 调用方法创建或加载 Milvus 集合
        self._create_or_load_collection()

实现步骤

  1. 参数设置
    • 使用Config中的默认值初始化集合名称、主机、端口和数据库名称。
  2. 模型初始化
    • reranker:加载BGE-Reranker模型,用于后续重排序。
    • embedding_function:初始化BGE-M3嵌入模型,禁用FP16,使用CPU运行。
    • dense_dim:获取稠密向量的维度。
  3. 客户端连接
    • 创建MilvusClient实例,连接到指定主机和数据库。
  4. 集合管理
    • 调用_create_or_load_collection方法,确保集合可用。

说明

  • BGE-M3模型:提供稠密和稀疏向量生成能力。
  • 灵活性:通过参数支持自定义配置。

4.3 创建或加载集合

功能

_create_or_load_collection方法检查并创建或加载Milvus集合,定义字段结构和索引参数。

代码示例

# 定义私有方法,创建或加载 Milvus 集合
def _create_or_load_collection(self):
    # 检查指定集合是否已存在
    if not self.client.has_collection(self.collection_name):
        # 创建集合 Schema,禁用自动 ID,启用动态字段
        schema = self.client.create_schema(auto_id=False, enable_dynamic_field=True)
        # 添加 ID 字段,作为主键,VARCHAR 类型,最大长度 100
        schema.add_field(field_name="id", datatype=DataType.VARCHAR, is_primary=True, max_length=100)
        # 添加文本字段,VARCHAR 类型,最大长度 65535
        schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
        # 添加稠密向量字段,FLOAT_VECTOR 类型,维度由嵌入函数指定
        schema.add_field(field_name="dense_vector", datatype=DataType.FLOAT_VECTOR, dim=self.dense_dim)
        # 添加稀疏向量字段,SPARSE_FLOAT_VECTOR 类型
        schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)
        # 添加父块 ID 字段,VARCHAR 类型,最大长度 100
        schema.add_field(field_name="parent_id", datatype=DataType.VARCHAR, max_length=100)
        # 添加父块内容字段,VARCHAR 类型,最大长度 65535
        schema.add_field(field_name="parent_content", datatype=DataType.VARCHAR, max_length=65535)
        # 添加学科类别字段,VARCHAR 类型,最大长度 50
        schema.add_field(field_name="source", datatype=DataType.VARCHAR, max_length=50)
        # 添加时间戳字段,VARCHAR 类型,最大长度 50
        schema.add_field(field_name="timestamp", datatype=DataType.VARCHAR, max_length=50)

        # 创建索引参数对象
        index_params = self.client.prepare_index_params()
        # 为稠密向量字段添加 IVF_FLAT 索引,度量类型为内积 (IP)
        index_params.add_index(
            field_name="dense_vector",
            index_name="dense_index",
            index_type="IVF_FLAT",
            metric_type="IP",
            params={"nlist": 128}
        )
        # 为稀疏向量字段添加 SPARSE_INVERTED_INDEX 索引,度量类型为内积 (IP)
        index_params.add_index(
            field_name="sparse_vector",
            index_name="sparse_index",
            index_type="SPARSE_INVERTED_INDEX",
            metric_type="IP",
            params={"drop_ratio_build": 0.2}
        )

        # 创建 Milvus 集合,应用定义的 Schema 和索引参数
        self.client.create_collection(collection_name=self.collection_name, schema=schema,
                                     index_params=index_params)
        # 记录创建集合的日志
        logger.info(f"已创建集合 {self.collection_name}")
    # 如果集合已存在
    else:
        # 记录加载集合的日志
        logger.info(f"已加载集合 {self.collection_name}")
    # 将集合加载到内存,确保可立即查询
    self.client.load_collection(self.collection_name)

实现步骤

  1. 检查集合是否存在
    • 使用has_collection判断是否需要创建新集合。
  2. 定义Schema
    • 设置字段:包括id(主键)、text(原文)、向量字段和元数据字段。
    • 禁用自动ID,启用动态字段。
  3. 创建索引
    • 稠密向量使用IVF_FLAT索引,稀疏向量使用SPARSE_INVERTED_INDEX
  4. 创建并加载集合
    • 调用create_collection创建集合,并加载到内存。

说明

  • 字段设计:支持多种数据类型和元数据管理。
  • 索引优化:平衡检索速度和精度。

4.4 添加文档

功能

add_documents方法将分块后的文档转换为向量并存储到Milvus集合中。

代码示例

# 定义方法,向向量存储添加文档
def add_documents(self, documents):
    # 提取所有文档的内容列表
    texts = [doc.page_content for doc in documents]
    # 使用 BGE-M3 嵌入函数生成文档的嵌入
    embeddings = self.embedding_function(texts)
    # 初始化空列表,用于存储插入的数据
    data = []
    # 遍历每个文档,带上索引 i
    for i, doc in enumerate(documents):
        # 生成文档内容的 MD5 哈希值,作为唯一 ID
        text_hash = hashlib.md5(doc.page_content.encode('utf-8')).hexdigest()
        # 初始化稀疏向量字典
        sparse_vector = {}
        # 获取第 i 行的稀疏向量数据
        row = embeddings["sparse"].getrow(i)
        # 获取稀疏向量的非零值索引
        indices = row.indices
        # 获取稀疏向量的非零值
        values = row.data
        # 将索引和值配对,填充稀疏向量字典
        for idx, value in zip(indices, values):
            sparse_vector[idx] = value
        # 创建数据字典,包含所有字段
        data.append({
            "id": text_hash,
            "text": doc.page_content,
            "dense_vector": embeddings["dense"][i],
            "sparse_vector": sparse_vector,
            "parent_id": doc.metadata["parent_id"],
            "parent_content": doc.metadata["parent_content"],
            "source": doc.metadata.get("source", "unknown"),
            "timestamp": doc.metadata.get("timestamp", "unknown")
        })
    # 检查是否有数据需要插入
    if data:
        # 使用 upsert 操作插入数据,覆盖重复 ID
        self.client.upsert(collection_name=self.collection_name, data=data)
        # 记录插入或更新的文档数量日志
        logger.info(f"已插入或更新 {len(data)} 个文档")

实现步骤

  1. 提取文本
    • 从文档对象中提取文本内容。
  2. 生成向量
    • 使用BGE-M3模型生成稠密和稀疏向量。
  3. 构造数据
    • 为每篇文档生成唯一ID(MD5哈希)。
    • 将向量和元数据组织成字典。
  4. 存储数据
    • 使用upsert操作插入或更新数据。

说明

  • 唯一性:通过MD5哈希确保ID唯一。
  • 稀疏向量处理:将稀疏矩阵转换为字典格式。

4.5 混合检索与重排序

功能

hybrid_search_with_rerank方法实现混合检索并重排序,返回最相关文档。

代码示例

# 定义方法,执行混合检索并重排序
def hybrid_search_with_rerank(self, query, k=conf.RETRIEVAL_K, source_filter=None):
    # 使用 BGE-M3 嵌入函数生成查询的嵌入
    query_embeddings = self.embedding_function([query])
    # 获取查询的稠密向量
    dense_query_vector = query_embeddings["dense"][0]
    # 初始化查询的稀疏向量字典
    sparse_query_vector = {}
    # 获取查询稀疏向量的第 0 行数据
    row = query_embeddings["sparse"].getrow(0)
    # 获取稀疏向量的非零值索引
    indices = row.indices
    # 获取稀疏向量的非零值
    values = row.data
    # 将索引和值配对,填充稀疏向量字典
    for idx, value in zip(indices, values):
        sparse_query_vector[idx] = value

    # 初始化过滤表达式,默认不过滤
    filter_expr = f"source == '{source_filter}'" if source_filter else ""
    # 创建稠密向量搜索请求
    dense_request = AnnSearchRequest(
        data=[dense_query_vector],
        anns_field="dense_vector",
        param={"metric_type": "IP", "params": {"nprobe": 10}},
        limit=k,
        expr=filter_expr
    )
    # 创建稀疏向量搜索请求
    sparse_request = AnnSearchRequest(
        data=[sparse_query_vector],
        anns_field="sparse_vector",
        param={"metric_type": "IP", "params": {}},
        limit=k,
        expr=filter_expr
    )

    # 创建加权排序器,稀疏向量权重 0.7,稠密向量权重 1.0
    ranker = WeightedRanker(0.7, 1.0)
    # 执行混合搜索,返回 Top-K 结果
    results = self.client.hybrid_search(
        collection_name=self.collection_name,
        reqs=[dense_request, sparse_request],
        ranker=ranker,
        limit=k,
        output_fields=["text", "parent_id", "parent_content", "source", "timestamp"]
    )[0]

    # 将搜索结果转换为 Document 对象列表
    sub_chunks = [self._doc_from_hit(hit["entity"]) for hit in results]
    print(f'sub_chunks-->{len(sub_chunks)}')
    # 从子块中提取去重的父文档
    parent_docs = self._get_unique_parent_docs(sub_chunks)
    # 如果只有1个文档,直接返回跳过重排序
    if len(parent_docs) < 2:
      return parent_docs[:conf.CANDIDATE_M] 
    # 如果有父文档,进行重排序
    if parent_docs:
        # 创建查询与文档内容的配对列表
        pairs = [[query, doc.page_content] for doc in parent_docs]
        # 使用 BGE-Reranker 计算每个配对的得分
        scores = self.reranker.predict(pairs)
        # 根据得分从高到低排序文档
        ranked_parent_docs = [doc for _, doc in sorted(zip(scores, parent_docs), reverse=True)]
    # 如果没有父文档,返回空列表
    else:
        ranked_parent_docs = []

    # 返回前 k 个重排序后的文档
    return ranked_parent_docs[:conf.CANDIDATE_M]

实现步骤

  1. 生成查询向量
    • 使用BGE-M3生成稠密和稀疏向量。
  2. 构造检索请求
    • 为稠密和稀疏向量分别创建AnnSearchRequest
  3. 混合检索
    • 使用WeightedRanker融合结果。
  4. 重排序
    • 使用CrossEncoder重新排序父文档。

说明

  • 混合检索:提升覆盖率和准确性。
  • 重排序:确保最相关文档优先。

4.6 获取唯一父文档

功能

_get_unique_parent_docs方法从子块中提取去重的父文档。

代码示例

# 定义私有方法,从子块中提取去重的父文档
def _get_unique_parent_docs(self, sub_chunks):
    # 初始化集合,用于存储已处理的父块内容(去重)
    parent_contents = set()
    # 初始化列表,用于存储唯一父文档
    unique_docs = []
    # 遍历所有子块
    for chunk in sub_chunks:
        # 获取子块的父块内容,默认为子块内容
        parent_content = chunk.metadata.get("parent_content", chunk.page_content)
        # 检查父块内容是否非空且未重复
        if parent_content and parent_content not in parent_contents:
            # 创建新的 Document 对象,包含父块内容和元数据
            unique_docs.append(Document(page_content=parent_content, metadata=chunk.metadata))
            # 将父块内容添加到去重集合
            parent_contents.add(parent_content)
    # 返回去重后的父文档列表
    return unique_docs

实现步骤

  1. 去重
    • 使用集合记录已处理的父内容。
  2. 构造文档
    • 创建包含父内容的Document对象。

说明

  • 去重逻辑:避免重复父文档。
  • 元数据保留:保持完整性。

4.7 从查询结果创建文档

功能

_doc_from_hit方法将Milvus查询结果转换为Document对象。

代码示例

# 定义私有方法,从 Milvus 查询结果创建 Document 对象
def _doc_from_hit(self, hit):
    # 创建并返回 Document 对象,填充内容和元数据
    return Document(
        page_content=hit.get("text"),
        metadata={
            "parent_id": hit.get("parent_id"),
            "parent_content": hit.get("parent_content"),
            "source": hit.get("source"),
            "timestamp": hit.get("timestamp")
        }
    )

实现步骤

  1. 提取内容和元数据
    • 从查询结果中获取字段。
  2. 创建对象
    • 构造Document实例。

总结

本章节全面讲解了vector_store.py模块的每个方法:

  • 初始化:设置参数和模型,为向量存储做好准备。
  • 创建集合:定义结构和索引,构建高效数据库。
  • 添加文档:实现文档向量化与存储。
  • 混合检索:结合稠密和稀疏向量并重排序,返回最相关结果。
  • 辅助方法:支持去重和文档转换。

学习者通过本章节掌握了向量存储的完整流程,为RAG系统的检索功能奠定了基础。