基于MySQL的FQA问答系统实现¶
学习目标¶
- 理解FQA问答系统的整体流程。
- 掌握如何整合MySQL、Redis和BM25算法构建QA问答系统。
1 FQA系统概述¶
本系统从MySQL数据库检索问答对,使用BM25算法计算相似度,并通过Softmax归一化将得分转换为概率值,阈值0.85判断答案可靠性。Redis仅缓存高可靠性结果(相似度>0.85且有答案)。若MySQL无可靠答案,则调用RAG系统检索。
1.1 系统流程¶
- 数据存储:MySQL存储FQA高频问答对数据。
- 问题检索:BM25计算相似度,Softmax归一化后判断阈值0.85。
- 缓存管理:Redis仅存储相似度>0.85且有答案的数据。
- 答案返回:
- 若MySQL返回可靠答案,直接返回。
- 否则,调用RAG系统检索。
1.2 项目结构¶
integrated_qa_system/
├── config.ini # 配置文件,包含所有模块的配置
├── base/
│ ├── config.py # 配置管理,加载 config.ini
│ ├── logger.py # 日志设置
├── mysql_qa/
│ ├── data/
│ │ ├── JP学科知识问答.csv # FQA数据集
│ ├── db/
│ │ ├── mysql_client.py # MySQL 数据库操作
│ ├── cache/
│ │ ├── redis_client.py # Redis 缓存操作
│ ├── retrieval/
│ │ ├── bm25_search.py # BM25 搜索
│ ├── utils/
│ │ ├── preprocess.py # 文本预处理
│ ├── main.py # MySQL 系统独立入口,支持查询
├── requirements.txt # 依赖文件
└── logs/
└── app.log # 日志文件
2 代码实现¶
配置文件 (config.ini)¶
# MySQL 配置
[mysql]
host = localhost
user = root
password = 123456
database = subjects_kg
# Redis 配置
[redis]
host = localhost
port = 6379
password = 1234
db = 0
# 日志配置
[logger]
log_file = /path/to/your/logs/app.log
2.1 配置管理¶
功能¶
config.py文件定义了Config类,用于集中管理系统中的所有配置参数。这些参数包括数据库连接信息、模型选择、分块策略、API设置等。通过集中管理配置,系统可以方便地调整参数、适配不同环境,并支持通过环境变量进行灵活配置。
代码实现¶
# base/config.py
# 导入配置解析库
import configparser
# 导入路径操作库
import os
class Config:
# 初始化配置,加载 config.ini 文件
def __init__(self, config_file='../config.ini'):
# 创建配置解析器
self.config = configparser.ConfigParser()
# 读取配置文件
self.config.read(config_file)
# MySQL 配置
# MySQL 主机地址
self.MYSQL_HOST = self.config.get('mysql', 'host', fallback='localhost')
# MySQL 用户名
self.MYSQL_USER = self.config.get('mysql', 'user', fallback='root')
# MySQL 密码
self.MYSQL_PASSWORD = self.config.get('mysql', 'password', fallback='123456')
# MySQL 数据库名
self.MYSQL_DATABASE = self.config.get('mysql', 'database', fallback='subjects_kg')
# Redis 配置
# Redis 主机地址
self.REDIS_HOST = self.config.get('redis', 'host', fallback='localhost')
# Redis 端口
self.REDIS_PORT = self.config.getint('redis', 'port', fallback=6379)
# Redis 密码
self.REDIS_PASSWORD = self.config.get('redis', 'password', fallback='1234')
# Redis 数据库编号
self.REDIS_DB = self.config.getint('redis', 'db', fallback=0)
# 日志文件路径
self.LOG_FILE = self.config.get('logger', 'log_file', fallback='logs/app.log')
if __name__ == '__main__':
conf = Config()
print(conf.CHILD_CHUNK_SIZE)
说明¶
- 默认值:每个参数设有默认值,确保未配置环境变量时系统仍可运行。
- 参数分类:按功能分类(如数据库、模型、分块等),便于管理和维护。
2.2 日志记录¶
功能¶
logger.py文件定义了setup_logging函数,用于配置系统的日志记录器。日志记录器将运行信息、警告和错误输出到文件和控制台,便于开发、调试和运维人员监控系统状态。
代码实现¶
# base/logger.py
# 导入日志库
import logging
# 导入路径操作库
import os
# 导入配置类
from config import Config
def setup_logging(log_file=Config().LOG_FILE):
# 创建日志目录
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# 获取日志器
logger = logging.getLogger("EduRAG")
# 设置日志级别
logger.setLevel(logging.INFO)
# 避免重复添加处理器
if not logger.handlers:
# 创建文件处理器
file_handler = logging.FileHandler(log_file, encoding='utf-8')
# 设置文件处理器级别
file_handler.setLevel(logging.INFO)
# 创建控制台处理器
console_handler = logging.StreamHandler()
# 设置控制台处理器级别
console_handler.setLevel(logging.INFO)
# 设置日志格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 为文件处理器设置格式
file_handler.setFormatter(formatter)
# 为控制台处理器设置格式
console_handler.setFormatter(formatter)
# 添加文件处理器
logger.addHandler(file_handler)
# 添加控制台处理器
logger.addHandler(console_handler)
# 返回日志器
return logger
# 初始化日志器
logger = setup_logging()
说明¶
- 日志级别:默认设为
INFO,记录关键运行信息。 - 双重输出:同时输出到文件和控制台,便于实时监控和后续分析。
- 格式化:日志包含时间戳、名称、级别和内容,便于问题定位。
2.3 MySQL操作模块¶
功能¶
mysql_client.py是一个用于与 MySQL 交互的模块。模块通过读取配置文件连接数据库,支持创建表、从 CSV 文件插入数据、查询问题和答案,以及安全关闭连接。所有操作均通过日志记录,便于调试和监控系统状态。
代码实现¶
# db/mysql_client.py
# 导入 MySQL 连接库
import pymysql
# 导入pandas
import pandas as pd
# 导入配置和日志
from base import Config, logger
class MySQLClient:
def __init__(self):
# 初始化日志
self.logger = logger
try:
# 连接 MySQL 数据库
self.connection = pymysql.connect(
host=Config().MYSQL_HOST,
user=Config().MYSQL_USER,
password=Config().MYSQL_PASSWORD,
database=Config().MYSQL_DATABASE
)
# 创建游标
self.cursor = self.connection.cursor()
# 记录连接成功
self.logger.info("MySQL 连接成功")
except pymysql.MySQLError as e:
# 记录连接失败
self.logger.error(f"MySQL 连接失败: {e}")
raise
def create_table(self):
create_table_query = '''
CREATE TABLE IF NOT EXISTS jpkb (
id INT AUTO_INCREMENT PRIMARY KEY,
subject_name VARCHAR(20),
question VARCHAR(1000),
answer VARCHAR(1000))
'''
try:
self.cursor.execute(create_table_query)
self.connection.commit()
self.logger.info("表创建成功")
except pymysql.MySQLError as e:
self.logger.error(f"表创建失败: {e}")
raise
def insert_data(self, csv_path):
try:
data = pd.read_csv(csv_path)
for _, row in data.iterrows():
insert_query = "INSERT INTO jpkb (subject_name, question, answer) VALUES (%s, %s, %s)"
self.cursor.execute(insert_query, (row['学科名称'], row['问题'], row['答案']))
self.connection.commit()
self.logger.info("数据插入成功")
except Exception as e:
self.logger.error(f"数据插入失败: {e}")
self.connection.rollback()
raise
def fetch_questions(self):
# 获取所有问题
try:
# 执行查询
self.cursor.execute("SELECT question FROM jpkb")
# 获取结果
results = self.cursor.fetchall()
# 记录获取成功
self.logger.info("成功获取问题")
# 返回结果
return results
except pymysql.MySQLError as e:
# 记录查询失败
self.logger.error(f"查询失败: {e}")
# 返回空列表
return []
def fetch_answer(self, question):
# 获取指定问题的答案
try:
# 执行查询
self.cursor.execute("SELECT answer FROM jpkb WHERE question=%s", (question,))
# 获取结果
result = self.cursor.fetchone()
# 返回答案或 None
return result[0] if result else None
except pymysql.MySQLError as e:
# 记录答案获取失败
self.logger.error(f"答案获取失败: {e}")
# 返回 None
return None
def close(self):
# 关闭数据库连接
try:
# 关闭连接
self.connection.close()
# 记录关闭成功
self.logger.info("MySQL 连接已关闭")
except pymysql.MySQLError as e:
# 记录关闭失败
self.logger.error(f"关闭连接失败: {e}")
if __name__ == '__main__':
mysql_client = MySQLClient()
mysql_client.create_table()
mysql_client.insert_data('../data/JP学科知识问答.csv')
说明¶
- 数据库连接:通过 config.ini 配置文件读取 MySQL 参数,使用 pymysql 建立连接。
- 表管理:创建 jpkb 表,包含字段 id(自增主键)、subject_name(学科名称)、question(问题)、answer(答案),使用 IF NOT EXISTS 避免重复创建。
- 异常处理:每个方法均捕获异常,记录错误日志并根据需要回滚事务或抛出异常。
2.4 Redis 缓存操作模块¶
功能¶
redis_client.py该模块用于与 Redis 数据库交互。模块通过配置文件连接 Redis,支持键值对存储与查询(使用 JSON 序列化)、答案缓存查询,并记录操作日志,便于调试和监控。
代码实现¶
# cache/redis_client.py
# 导入 Redis 客户端
import redis
# 导入 JSON 处理
import json
# 导入配置和日志
from base import Config, logger
class RedisClient:
def __init__(self):
# 初始化日志
self.logger = logger
try:
# 连接 Redis
self.client = redis.StrictRedis(
host=Config().REDIS_HOST,
port=Config().REDIS_PORT,
password=Config().REDIS_PASSWORD,
db=Config().REDIS_DB,
decode_responses=True
)
# 记录连接成功
self.logger.info("Redis 连接成功")
except redis.RedisError as e:
# 记录连接失败
self.logger.error(f"Redis 连接失败: {e}")
raise
def set_data(self, key, value):
# 存储数据到 Redis
try:
# 存储 JSON 数据
self.client.set(key, json.dumps(value))
# 记录存储成功
self.logger.info(f"存储数据到 Redis: {key}")
except redis.RedisError as e:
# 记录存储失败
self.logger.error(f"Redis 存储失败: {e}")
def get_data(self, key):
# 从 Redis 获取数据
try:
# 获取数据
data = self.client.get(key)
# 返回解析后的 JSON 数据或 None
return json.loads(data) if data else None
except redis.RedisError as e:
# 记录获取失败
self.logger.error(f"Redis 获取失败: {e}")
# 返回 None
return None
def get_answer(self, query):
# 获取查询的缓存答案
try:
# 从 Redis 获取答案
answer = self.client.get(f"answer:{query}")
if answer:
# 记录获取成功
self.logger.info(f"从 Redis 获取答案: {query}")
# 返回答案
return answer
# 返回 None
return None
except redis.RedisError as e:
# 记录查询失败
self.logger.error(f"Redis 查询失败: {e}")
# 返回 None
return None
if __name__ == '__main__':
redcli = RedisClient()
print(redcli)
说明¶
- Redis 连接:通过 config.ini 读取 Redis 配置,使用 redis.StrictRedis 建立连接。
- 数据操作:
- set_data:将键值对(值序列化为 JSON)存储到 Redis。
- get_data:根据键获取值并反序列化 JSON。
- get_answer:查询以 answer:{query} 格式存储的答案缓存。
2.5 文本预处理模块¶
功能¶
preprocess.py是一个基于 jieba 分词库实现文本预处理的模块。该模块将输入文本转换为小写并进行分词,返回分词结果,支持日志记录以监控处理状态。
代码实现¶
# utils/preprocess.py
# 导入分词库
import jieba
# 导入日志
from base import logger
def preprocess_text(text):
# 预处理文本
logger.info("开始预处理文本")
try:
# 分词并转换为小写
return jieba.lcut(text.lower())
except AttributeError as e:
# 记录预处理失败
logger.error(f"文本预处理失败: {e}")
# 返回空列表
return []
说明¶
- 文本处理:使用 jieba.lcut 对输入文本进行中文分词,并将文本转换为小写以规范化。
2.6 BM25+Softmax检索模块¶
功能¶
bm25_search.py 是一个基于 BM25 算法和 Softmax 归一化的文本检索模块,用于从问题库中检索与查询最匹配的答案。模块结合 Redis 缓存和 MySQL 数据库,支持问题加载、分词、BM25 评分、Softmax 归一化,并记录操作日志。
# retrieval/bm25_search.py
# 导入 BM25 算法
from rank_bm25 import BM25Okapi
# 导入数值计算库
import numpy as np
# 导入文本预处理
from utils.preprocess import preprocess_text
# 导入日志
from base import logger
class BM25Search:
def __init__(self, redis_client, mysql_client):
# 初始化日志
self.logger = logger
# 初始化 Redis 客户端
self.redis_client = redis_client
# 初始化 MySQL 客户端
self.mysql_client = mysql_client
# 初始化 BM25 模型
self.bm25 = None
# 初始化问题列表
self.questions = None
# 初始化原始问题
self.original_questions = None
# 加载数据
self._load_data()
def _load_data(self):
# 加载数据
original_key = "qa_original_questions"
tokenized_key = "qa_tokenized_questions"
# 从 Redis 获取原始问题
self.original_questions = self.redis_client.get_data(original_key)
# 从 Redis 获取分词问题
tokenized_questions = self.redis_client.get_data(tokenized_key)
# 如果 Redis 中没有数据,从 MySQL 加载
if not self.original_questions or not tokenized_questions:
# 从 MySQL 获取问题
self.original_questions = self.mysql_client.fetch_questions()
if not self.original_questions:
# 记录无问题警告
self.logger.warning("未加载到问题")
return
# 分词问题
tokenized_questions = [preprocess_text(q[0]) for q in self.original_questions]
# 存储原始问题到 Redis
self.redis_client.set_data(original_key, [(q[0]) for q in self.original_questions])
# 存储分词问题到 Redis
self.redis_client.set_data(tokenized_key, tokenized_questions)
# 设置问题列表
self.questions = tokenized_questions
# 初始化 BM25 模型
self.bm25 = BM25Okapi(self.questions)
# 记录 BM25 初始化成功
self.logger.info("BM25 模型初始化完成")
def _softmax(self, scores):
# 计算 Softmax 分数
exp_scores = np.exp(scores - np.max(scores))
# 返回归一化分数
return exp_scores / exp_scores.sum()
def search(self, query, threshold=0.85):
# 搜索查询
if not query or not isinstance(query, str):
# 记录无效查询
self.logger.error("无效查询")
# 返回 None 和 False
return None, False
# 检查 Redis 缓存
cached_answer = self.redis_client.get_answer(query)
if cached_answer:
# 返回缓存答案
return cached_answer, False
try:
# 分词查询
query_tokens = preprocess_text(query)
# 计算 BM25 分数
scores = self.bm25.get_scores(query_tokens)
# 计算 Softmax 分数
softmax_scores = self._softmax(scores)
# 获取最高分索引
best_idx = softmax_scores.argmax()
# 获取最高分
best_score = softmax_scores[best_idx]
# 检查是否超过阈值
if best_score >= threshold:
# 获取原始问题
original_question = self.original_questions[best_idx]
# 获取答案
answer = self.mysql_client.fetch_answer(original_question)
if answer:
# 缓存答案
self.redis_client.set_data(f"answer:{query}", answer)
# 记录搜索成功
self.logger.info(f"搜索成功,Softmax 相似度: {best_score:.3f}")
# 返回答案和 False
return answer, False
# 记录无可靠答案
self.logger.info(f"未找到可靠答案,最高 Softmax 相似度: {best_score:.3f}")
# 返回 None 和 True
return None, True
except Exception as e:
# 记录搜索失败
self.logger.error(f"搜索失败: {e}")
# 返回 None 和 True
return None, True
说明¶
- 数据加载:优先从 Redis 获取问题和分词数据,若无则从 MySQL 加载并分词后缓存到 Redis。
- BM25 检索:使用 BM25Okapi 计算查询与问题库的相似度,结合 Softmax 归一化评分。
- 答案查询:通过 Redis 缓存答案,若无缓存则从 MySQL 获取并缓存,阈值(默认 0.85)控制答案可靠性。
3 主程序 (main.py)¶
# 导入 MySQL 客户端
from db.mysql_client import MySQLClient
# 导入 Redis 客户端
from cache.redis_client import RedisClient
# 导入 BM25 搜索
from retrieval.bm25_search import BM25Search
# 导入日志
from base import logger
# 导入时间库
import time
class MySQLQASystem:
def __init__(self):
# 初始化日志
self.logger = logger
# 初始化 MySQL 客户端
self.mysql_client = MySQLClient()
# 初始化 Redis 客户端
self.redis_client = RedisClient()
# 初始化 BM25 搜索
self.bm25_search = BM25Search(self.redis_client, self.mysql_client)
def query(self, query):
# 查询 MySQL 系统
start_time = time.time()
# 记录查询信息
self.logger.info(f"处理查询: '{query}'")
# 执行 BM25 搜索
answer, _ = self.bm25_search.search(query, threshold=0.85)
if answer:
# 记录 MySQL 答案
self.logger.info(f"MySQL 答案: {answer}")
else:
# 记录无答案
self.logger.info("SQL中未找到答案, 需要调用RAG系统")
# 设置默认答案
answer = "SQL未找到答案"
# 计算处理时间
processing_time = time.time() - start_time
# 记录处理时间
self.logger.info(f"查询处理耗时 {processing_time:.2f}秒")
# 返回答案
return answer
def main():
# 初始化 MySQL 系统
mysql_system = MySQLQASystem()
try:
# 打印欢迎信息
print("\n欢迎使用 MySQL 问答系统!")
print("输入查询进行问答,输入 'exit' 退出。")
while True:
# 获取用户输入
query = input("\n输入查询: ").strip()
if query.lower() == "exit":
# 记录退出日志
logger.info("退出 MySQL 系统")
# 打印退出信息
print("再见!")
break
# 执行查询
answer = mysql_system.query(query)
# 打印答案
print(f"\n答案: {answer}")
except Exception as e:
# 记录系统错误
logger.error(f"系统错误: {e}")
# 打印错误信息
print(f"发生错误: {e}")
finally:
# 关闭 MySQL 连接
mysql_system.mysql_client.close()
if __name__ == "__main__":
# 运行主程序
main()
示例运行结果¶
假设MySQL中有数据: - 问题:"特殊符号如何切割",答案:"使用split函数" - 问题:"如何处理字符串",答案:"使用字符串方法"
查询:"特殊符号的切割"
2025-04-01 10:00:00,123 - INFO - MySQL连接成功
2025-04-01 10:00:00,125 - INFO - Redis连接成功
2025-04-01 10:00:00,126 - INFO - BM25模型初始化完成
欢迎使用 MySQL 问答系统!
输入查询进行问答,输入 'exit' 退出。
2025-04-01 10:00:00,127 - INFO - 检索成功,Softmax相似度: 0.892
2025-04-01 10:00:00,128 - INFO - 数据存入Redis: answer:特殊符号的切割
2025-04-01 10:00:00,129 - INFO - MySQL答案: 使用split函数
2025-04-01 10:00:00,130 - INFO - MySQL连接已关闭
总结¶
本章整合Mysql和Redis功能,实现了基于余弦相似度问答的QA系统:
- 流程:MySQL存储数据,Redis缓存优化,TF-IDF和余弦相似度匹配问题。
- 工程化:模块化设计、配置文件、日志记录。