向量存储¶
学习目标¶
- 1.理解向量存储在RAG系统中的功能和重要性。
- 2.学会创建和管理向量数据库。
- 3.掌握如何将文本转化为向量并存入数据库。
- 4.理解混合检索与重排序的实现原理。
vector_store.py是EduRAG系统的核心模块之一,封装了与Milvus向量数据库的交互逻辑。它负责将文档转化为向量存储到数据库中,并提供高效的混合检索功能。通过结合BGE-M3嵌入模型和重排序机制,该模块确保系统能够快速检索到与用户查询最相关的文档。
4.1 模块功能概述¶
VectorStore类提供了以下主要功能:
- 初始化与集合管理:创建或加载Milvus向量数据库集合。
- 文档向量化与存储:将分块后的文档转换为向量并存储。
- 混合检索与重排序:结合稠密和稀疏向量进行检索,并通过重排序优化结果。
以下将逐一讲解每个方法的实现细节。
导入必备工具包¶
# 导入 BGE-M3 嵌入函数,用于生成文档和查询的向量表示
from milvus_model.hybrid import BGEM3EmbeddingFunction
# 导入 Milvus 相关类,用于操作向量数据库
from pymilvus import MilvusClient, DataType, AnnSearchRequest, WeightedRanker
# 导入 Document 类,用于创建文档对象
from langchain.docstore.document import Document
# 导入 CrossEncoder,用于重排序和 NLI 判断
from sentence_transformers import CrossEncoder
# 导入 hashlib 模块,用于生成唯一 ID 的哈希值
import hashlib
from base import logger, Config
conf = Config()
4.2 初始化方法¶
功能¶
__init__方法初始化VectorStore类的实例,设置基本参数并调用集合创建或加载方法。
代码示例¶
# core/vector_store.py
# 定义 VectorStore 类,封装向量存储和检索功能
class VectorStore:
# 初始化方法,设置向量存储的基本参数
def __init__(self,
collection_name=conf.MILVUS_COLLECTION_NAME,
host=conf.MILVUS_HOST,
port=conf.MILVUS_PORT,
database=conf.MILVUS_DATABASE_NAME):
# 设置 Milvus 集合名称
self.collection_name = collection_name
# 设置 Milvus 主机地址
self.host = host
# 设置 Milvus 端口号
self.port = port
# 设置 Milvus 数据库名称
self.database = database
# 设置日志记录器
self.logger = logger
# 初始化 BGE-Reranker 模型,用于重排序检索结果
self.reranker = CrossEncoder("./bge/bge-reranker-large")
# 初始化 BGE-M3 嵌入函数,使用 CPU 设备,不启用 FP16
self.embedding_function = BGEM3EmbeddingFunction(use_fp16=False, device="cpu")
# 获取稠密向量的维度
self.dense_dim = self.embedding_function.dim["dense"]
# 初始化 Milvus 客户端,连接到指定主机和数据库
self.client = MilvusClient(uri=f"http://{self.host}:{self.port}", db_name=self.database)
# 调用方法创建或加载 Milvus 集合
self._create_or_load_collection()
实现步骤¶
- 参数设置:
- 使用
Config中的默认值初始化集合名称、主机、端口和数据库名称。
- 使用
- 模型初始化:
reranker:加载BGE-Reranker模型,用于后续重排序。embedding_function:初始化BGE-M3嵌入模型,禁用FP16,使用CPU运行。dense_dim:获取稠密向量的维度。
- 客户端连接:
- 创建
MilvusClient实例,连接到指定主机和数据库。
- 创建
- 集合管理:
- 调用
_create_or_load_collection方法,确保集合可用。
- 调用
说明¶
- BGE-M3模型:提供稠密和稀疏向量生成能力。
- 灵活性:通过参数支持自定义配置。
4.3 创建或加载集合¶
功能¶
_create_or_load_collection方法检查并创建或加载Milvus集合,定义字段结构和索引参数。
代码示例¶
# 定义私有方法,创建或加载 Milvus 集合
def _create_or_load_collection(self):
# 检查指定集合是否已存在
if not self.client.has_collection(self.collection_name):
# 创建集合 Schema,禁用自动 ID,启用动态字段
schema = self.client.create_schema(auto_id=False, enable_dynamic_field=True)
# 添加 ID 字段,作为主键,VARCHAR 类型,最大长度 100
schema.add_field(field_name="id", datatype=DataType.VARCHAR, is_primary=True, max_length=100)
# 添加文本字段,VARCHAR 类型,最大长度 65535
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
# 添加稠密向量字段,FLOAT_VECTOR 类型,维度由嵌入函数指定
schema.add_field(field_name="dense_vector", datatype=DataType.FLOAT_VECTOR, dim=self.dense_dim)
# 添加稀疏向量字段,SPARSE_FLOAT_VECTOR 类型
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)
# 添加父块 ID 字段,VARCHAR 类型,最大长度 100
schema.add_field(field_name="parent_id", datatype=DataType.VARCHAR, max_length=100)
# 添加父块内容字段,VARCHAR 类型,最大长度 65535
schema.add_field(field_name="parent_content", datatype=DataType.VARCHAR, max_length=65535)
# 添加学科类别字段,VARCHAR 类型,最大长度 50
schema.add_field(field_name="source", datatype=DataType.VARCHAR, max_length=50)
# 添加时间戳字段,VARCHAR 类型,最大长度 50
schema.add_field(field_name="timestamp", datatype=DataType.VARCHAR, max_length=50)
# 创建索引参数对象
index_params = self.client.prepare_index_params()
# 为稠密向量字段添加 IVF_FLAT 索引,度量类型为内积 (IP)
index_params.add_index(
field_name="dense_vector",
index_name="dense_index",
index_type="IVF_FLAT",
metric_type="IP",
params={"nlist": 128}
)
# 为稀疏向量字段添加 SPARSE_INVERTED_INDEX 索引,度量类型为内积 (IP)
index_params.add_index(
field_name="sparse_vector",
index_name="sparse_index",
index_type="SPARSE_INVERTED_INDEX",
metric_type="IP",
params={"drop_ratio_build": 0.2}
)
# 创建 Milvus 集合,应用定义的 Schema 和索引参数
self.client.create_collection(collection_name=self.collection_name, schema=schema,
index_params=index_params)
# 记录创建集合的日志
logger.info(f"已创建集合 {self.collection_name}")
# 如果集合已存在
else:
# 记录加载集合的日志
logger.info(f"已加载集合 {self.collection_name}")
# 将集合加载到内存,确保可立即查询
self.client.load_collection(self.collection_name)
实现步骤¶
- 检查集合是否存在:
- 使用
has_collection判断是否需要创建新集合。
- 使用
- 定义Schema:
- 设置字段:包括
id(主键)、text(原文)、向量字段和元数据字段。 - 禁用自动ID,启用动态字段。
- 设置字段:包括
- 创建索引:
- 稠密向量使用
IVF_FLAT索引,稀疏向量使用SPARSE_INVERTED_INDEX。
- 稠密向量使用
- 创建并加载集合:
- 调用
create_collection创建集合,并加载到内存。
- 调用
说明¶
- 字段设计:支持多种数据类型和元数据管理。
- 索引优化:平衡检索速度和精度。
4.4 添加文档¶
功能¶
add_documents方法将分块后的文档转换为向量并存储到Milvus集合中。
代码示例¶
# 定义方法,向向量存储添加文档
def add_documents(self, documents):
# 提取所有文档的内容列表
texts = [doc.page_content for doc in documents]
# 使用 BGE-M3 嵌入函数生成文档的嵌入
embeddings = self.embedding_function(texts)
# 初始化空列表,用于存储插入的数据
data = []
# 遍历每个文档,带上索引 i
for i, doc in enumerate(documents):
# 生成文档内容的 MD5 哈希值,作为唯一 ID
text_hash = hashlib.md5(doc.page_content.encode('utf-8')).hexdigest()
# 初始化稀疏向量字典
sparse_vector = {}
# 获取第 i 行的稀疏向量数据
row = embeddings["sparse"].getrow(i)
# 获取稀疏向量的非零值索引
indices = row.indices
# 获取稀疏向量的非零值
values = row.data
# 将索引和值配对,填充稀疏向量字典
for idx, value in zip(indices, values):
sparse_vector[idx] = value
# 创建数据字典,包含所有字段
data.append({
"id": text_hash,
"text": doc.page_content,
"dense_vector": embeddings["dense"][i],
"sparse_vector": sparse_vector,
"parent_id": doc.metadata["parent_id"],
"parent_content": doc.metadata["parent_content"],
"source": doc.metadata.get("source", "unknown"),
"timestamp": doc.metadata.get("timestamp", "unknown")
})
# 检查是否有数据需要插入
if data:
# 使用 upsert 操作插入数据,覆盖重复 ID
self.client.upsert(collection_name=self.collection_name, data=data)
# 记录插入或更新的文档数量日志
logger.info(f"已插入或更新 {len(data)} 个文档")
实现步骤¶
- 提取文本:
- 从文档对象中提取文本内容。
- 生成向量:
- 使用BGE-M3模型生成稠密和稀疏向量。
- 构造数据:
- 为每篇文档生成唯一ID(MD5哈希)。
- 将向量和元数据组织成字典。
- 存储数据:
- 使用
upsert操作插入或更新数据。
- 使用
说明¶
- 唯一性:通过MD5哈希确保ID唯一。
- 稀疏向量处理:将稀疏矩阵转换为字典格式。
4.5 混合检索与重排序¶
功能¶
hybrid_search_with_rerank方法实现混合检索并重排序,返回最相关文档。
代码示例¶
# 定义方法,执行混合检索并重排序
def hybrid_search_with_rerank(self, query, k=conf.RETRIEVAL_K, source_filter=None):
# 使用 BGE-M3 嵌入函数生成查询的嵌入
query_embeddings = self.embedding_function([query])
# 获取查询的稠密向量
dense_query_vector = query_embeddings["dense"][0]
# 初始化查询的稀疏向量字典
sparse_query_vector = {}
# 获取查询稀疏向量的第 0 行数据
row = query_embeddings["sparse"].getrow(0)
# 获取稀疏向量的非零值索引
indices = row.indices
# 获取稀疏向量的非零值
values = row.data
# 将索引和值配对,填充稀疏向量字典
for idx, value in zip(indices, values):
sparse_query_vector[idx] = value
# 初始化过滤表达式,默认不过滤
filter_expr = f"source == '{source_filter}'" if source_filter else ""
# 创建稠密向量搜索请求
dense_request = AnnSearchRequest(
data=[dense_query_vector],
anns_field="dense_vector",
param={"metric_type": "IP", "params": {"nprobe": 10}},
limit=k,
expr=filter_expr
)
# 创建稀疏向量搜索请求
sparse_request = AnnSearchRequest(
data=[sparse_query_vector],
anns_field="sparse_vector",
param={"metric_type": "IP", "params": {}},
limit=k,
expr=filter_expr
)
# 创建加权排序器,稀疏向量权重 0.7,稠密向量权重 1.0
ranker = WeightedRanker(0.7, 1.0)
# 执行混合搜索,返回 Top-K 结果
results = self.client.hybrid_search(
collection_name=self.collection_name,
reqs=[dense_request, sparse_request],
ranker=ranker,
limit=k,
output_fields=["text", "parent_id", "parent_content", "source", "timestamp"]
)[0]
# 将搜索结果转换为 Document 对象列表
sub_chunks = [self._doc_from_hit(hit["entity"]) for hit in results]
print(f'sub_chunks-->{len(sub_chunks)}')
# 从子块中提取去重的父文档
parent_docs = self._get_unique_parent_docs(sub_chunks)
# 如果只有1个文档,直接返回跳过重排序
if len(parent_docs) < 2:
return parent_docs[:conf.CANDIDATE_M]
# 如果有父文档,进行重排序
if parent_docs:
# 创建查询与文档内容的配对列表
pairs = [[query, doc.page_content] for doc in parent_docs]
# 使用 BGE-Reranker 计算每个配对的得分
scores = self.reranker.predict(pairs)
# 根据得分从高到低排序文档
ranked_parent_docs = [doc for _, doc in sorted(zip(scores, parent_docs), reverse=True)]
# 如果没有父文档,返回空列表
else:
ranked_parent_docs = []
# 返回前 k 个重排序后的文档
return ranked_parent_docs[:conf.CANDIDATE_M]
实现步骤¶
- 生成查询向量:
- 使用BGE-M3生成稠密和稀疏向量。
- 构造检索请求:
- 为稠密和稀疏向量分别创建
AnnSearchRequest。
- 为稠密和稀疏向量分别创建
- 混合检索:
- 使用
WeightedRanker融合结果。
- 使用
- 重排序:
- 使用
CrossEncoder重新排序父文档。
- 使用
说明¶
- 混合检索:提升覆盖率和准确性。
- 重排序:确保最相关文档优先。
4.6 获取唯一父文档¶
功能¶
_get_unique_parent_docs方法从子块中提取去重的父文档。
代码示例¶
# 定义私有方法,从子块中提取去重的父文档
def _get_unique_parent_docs(self, sub_chunks):
# 初始化集合,用于存储已处理的父块内容(去重)
parent_contents = set()
# 初始化列表,用于存储唯一父文档
unique_docs = []
# 遍历所有子块
for chunk in sub_chunks:
# 获取子块的父块内容,默认为子块内容
parent_content = chunk.metadata.get("parent_content", chunk.page_content)
# 检查父块内容是否非空且未重复
if parent_content and parent_content not in parent_contents:
# 创建新的 Document 对象,包含父块内容和元数据
unique_docs.append(Document(page_content=parent_content, metadata=chunk.metadata))
# 将父块内容添加到去重集合
parent_contents.add(parent_content)
# 返回去重后的父文档列表
return unique_docs
实现步骤¶
- 去重:
- 使用集合记录已处理的父内容。
- 构造文档:
- 创建包含父内容的
Document对象。
- 创建包含父内容的
说明¶
- 去重逻辑:避免重复父文档。
- 元数据保留:保持完整性。
4.7 从查询结果创建文档¶
功能¶
_doc_from_hit方法将Milvus查询结果转换为Document对象。
代码示例¶
# 定义私有方法,从 Milvus 查询结果创建 Document 对象
def _doc_from_hit(self, hit):
# 创建并返回 Document 对象,填充内容和元数据
return Document(
page_content=hit.get("text"),
metadata={
"parent_id": hit.get("parent_id"),
"parent_content": hit.get("parent_content"),
"source": hit.get("source"),
"timestamp": hit.get("timestamp")
}
)
实现步骤¶
- 提取内容和元数据:
- 从查询结果中获取字段。
- 创建对象:
- 构造
Document实例。
- 构造
总结¶
本章节全面讲解了vector_store.py模块的每个方法:
- 初始化:设置参数和模型,为向量存储做好准备。
- 创建集合:定义结构和索引,构建高效数据库。
- 添加文档:实现文档向量化与存储。
- 混合检索:结合稠密和稀疏向量并重排序,返回最相关结果。
- 辅助方法:支持去重和文档转换。
学习者通过本章节掌握了向量存储的完整流程,为RAG系统的检索功能奠定了基础。