RAG 入门项目项目简介RAG检索增强生成核心分为离线处理与在线处理两条主线离线处理持续向私有向量知识库补充私有知识文档可纳入模型训练截止后的最新资料为模型提供参考依据从根源上规避幻觉问题在线处理用户提问时先基于私有知识库检索相关参考资料再将资料与问题组装为新提示词送入大模型生成精准、可靠的回答本次项目以京东服饰类商品为示例基于衣服相关属性构建本地知识库支持用户自由更新、维护本地知识后续用户提问时系统将仅基于这份本地知识进行检索与回答确保答案来源可追溯、内容可靠可控项目结构目录结构目录用途data/存储服饰类本地知识文档尺码推荐、洗涤养护、颜色选择等作为知识库源数据chroma_db/Chroma 向量数据库持久化目录存储向量化后的服饰知识chat_history/存储用户对话历史支持多轮对话记忆代码模块入口层Streamlit Web 应用app_file_upload.py知识库更新前端支持用户上传 / 更新服饰知识文档可视化维护本地知识库app_qa.py项目主程序启动对话 Web 页面提供用户提问、智能问答交互界面配置与工具层config_data.py全局配置文件统一管理嵌入模型、向量库路径等参数file_history_store.py长期会话记忆存储服务记录用户对话历史支持上下文连贯问答知识库与向量层knowledge_base.py知识库更新服务负责文档加载、分割、向量化将服饰知识写入向量库vector_stores.py向量存储服务封装 Chroma 向量库的初始化、增删、相似性检索等核心操作RAG 核心层rag.pyRAG 核心服务串联向量检索、提示词组装、大模型生成实现基于本地服饰知识的问答项目需求离线流程用户交互入口用户通过 Streamlit 搭建的 Web 网页上传服饰类知识文档由 app_file_upload.py 作为前端主程序承接操作文件读取与实例化页面通过 st.file_uploader() 组件接收用户上传的文件再通过 uploader_file.get_value() 读取文件内容利用 st.session_state 实例化 KnowledgeBaseService 服务完成前后端数据流转MD5 去重校验KnowledgeBaseService 类首先调用 get_string_md5() 计算文件内容的 MD5 值再通过 check_md5() 与 md5.text 中记录的历史文件哈希值比对校验文件是否重复避免重复入库校验通过后调用 save_md5() 将新文件的 MD5 值写入 md5.text 留存记录MD5 是一种消息摘要算法哈希算法能把任意长度的文件 / 文本计算出一个固定长度32 位十六进制的唯一 数字指纹文档处理与入库校验通过后调用 upload_by_str() 方法结合 config_data.py 配置的文本分割器self.splitter对文档进行语义切分再通过嵌入模型将文本转为向量最终存入 Chroma 本地持久化向量库self.chroma完成私有服饰知识库的安全、动态更新在线流程用户交互入口用户通过 app_qa.py 启动的 Streamlit 聊天 Web 页面发起提问页面承接用户输入并将问题传入后端服务向量检索准备vector_stores.py 中的 VectorStoreService 类通过 get_retriever 方法将 Chroma 向量库封装为符合 LCEL 标准的检索器为 RAG 链提供检索能力RAG 链构建与执行rag.py 中的 RagService 类作为核心通过 __get_chain() 方法构建完整 RAG 执行链串联向量检索服务self.vector_service、提示词模板self.prompt_template、大语言模型self.chat_model同时集成 FileChatMessageHistory 会话记忆服务支持多轮对话上下文留存问答生成与返回用户提问触发 self.chain.invoke() 执行链链先调用检索器从 Chroma 向量库召回服饰相关知识结合历史对话组装提示词送入大模型生成基于本地知识的回答最终通过 Streamlit 聊天 UI 展示给用户会话记忆管理file_history_store.py 中的 FileChatMessageHistory 类通过 add_messages、messages、clear 三个方法实现对话历史的持久化存储、读取与清空保障多轮问答的上下文连贯性离线流程开发文本上传 Web 服务Streamlit是面向数据科学家的快速 Web 框架通过极简 API 实现 Python 代码的实时 Web 化展示支持快速搭建可视化应用无需 HTML/CSS/JSpip install streamlitapp_file_uploader.py 基于Streamlit完成Web网页上传服务 import streamlit as st # 添加网页标题 st.title(知识库更新服务) # 创建文件上传组件 uploader_file st.file_uploader( 请上传 TXT 文件: , # 上传框提示文字 type[txt], # 只允许上传 txt 格式文件 accept_multiple_filesFalse, # 只允许上传一个文件 ) # 如果用户已经上传文件 if uploader_file is not None: # 获取上传文件的名称 file_name uploader_file.name # 获取上传文件的 MIME 类型 file_type uploader_file.type # 获取上传文件大小并换算为 KB file_size uploader_file.size / 1024 # 显示文件名 st.subheader(f文件名: {file_name}) # 显示文件类型和文件大小 st.write(f**文件格式:** {file_type} nbsp;nbsp;nbsp; **文件大小:** {file_size:.2f} KB) # 读取文件字节内容并按 UTF-8 解码 text uploader_file.getvalue().decode(utf-8) # 在网页中显示文件内容 st.write(**文件内容:**) st.write(text)在终端中切换到当前项目路径下执行命令启动streamlit run app_file_uploader.py文件校验与 MD5 工具config_datamd5_path ./md5.textknowledge_base 知识库 import os import config_data as config import hashlib def check_md5(md5_str: str) - bool: 检查这个 md5 是否已经保存过 # 如果保存 md5 的文件不存在就先创建一个空文件 if not os.path.exists(config.md5_path): open(config.md5_path, w, encodingutf-8).close() # 文件刚创建说明这个 md5 以前没有处理过 return False else: # 逐行读取文件中的 md5 记录 for line in open(config.md5_path, r, encodingutf-8).readlines(): # strip() 用来去掉字符串两边的空格和换行符 if md5_str line.strip(): # 如果找到了相同的 md5说明已经处理过 return True # 如果所有内容都检查完了还没找到说明没有处理过 return False def save_md5(md5_str: str): 把 md5 保存到文件中 # a 表示追加写入不会覆盖原来的内容 with open(config.md5_path, a, encodingutf-8) as f: # 每个 md5 保存一行 f.write(md5_str \n) def get_string_md5(input_str: str, encodingutf-8) - str: 把普通字符串转换成 md5 字符串 # 先把字符串按指定编码转换成字节类型 str_bytes input_str.encode(encodingencoding) # 创建一个 md5 对象 md5_str hashlib.md5() # 把字节内容传给 md5 对象进行处理 md5_str.update(str_bytes) # 获取最终生成的 md5 十六进制字符串 md5_hex md5_str.hexdigest() # 返回 md5 结果 return md5_hex class KnowledgeBaseService(object): pass测试知识库更新服务config_data# 定义 Chroma 中要使用的集合名称 collection_name rag # 定义 Chroma 数据持久化保存的本地文件夹 persist_directory ./chroma_db # 文本切分时每个片段的最大长度 chunk_size 1000 # 相邻文本片段之间重复保留的字符数 chunk_overlap 100 # 文本切分时优先使用的分隔符列表 separators [\n\n, \n, ., !, ?, 。, , , , ] # 控制是否需要切分文本的字符数阈值 max_split_char_number 1000knowledge_baseclass KnowledgeBaseService(object): 知识库服务类用来完成文本入库 def __init__(self): # 如果数据目录不存在就自动创建出来 os.makedirs(config.persist_directory, exist_okTrue) # 初始化 Chroma 向量数据库对象 self.chroma Chroma( collection_nameconfig.collection_name, # 指定集合名称 embedding_functionDashScopeEmbeddings(modeltext-embedding-v4), # 指定向量模型 persist_directoryconfig.persist_directory, # 指定本地持久化目录 ) # 初始化文本切分器用来把长文本拆成小片段 self.spliter RecursiveCharacterTextSplitter( chunk_sizeconfig.chunk_size, # 每个文本片段的最大长度 chunk_overlapconfig.chunk_overlap, # 相邻片段之间重复保留的长度 separatorsconfig.separators, # 按这些分隔符优先切分文本 length_functionlen # 使用 len 统计文本长度 ) def upload_by_str(self, data: str, filename): 把字符串内容上传到知识库中 # 根据文本内容生成 md5 值用来判断是否重复上传 md5_hex get_string_md5(data) # 如果这个 md5 已经存在说明内容之前处理过 if check_md5(md5_hex): return 内容已存在知识库中 # 如果文本太长就先切分成多个小片段 if len(data) config.max_split_char_number: knowledge_chunks: list[str] self.spliter.split_text(data) else: # 如果文本不长就直接作为一个片段保存 knowledge_chunks [data] # 为每个文本片段准备元数据 metadata { source: filename, # 记录数据来源文件名 create_time: datetime.now().strftime(%Y-%m-%d %H:%M:%S), # 记录创建时间 operator: 黑子, # 记录操作人 } # 把文本片段和对应的元数据一起写入向量库 self.chroma.add_texts( knowledge_chunks, # 文本内容列表 metadatas[metadata for _ in knowledge_chunks], # 元数据列表 ) # 保存当前内容的 md5避免以后重复入库 save_md5(md5_hex) # 返回上传成功提示 return 内容已经成功载入向量库 if __name__ __main__: service KnowledgeBaseService() r service.upload_by_str(蔡徐坤, testfile) print(r)第一次执行第二次执行向量库持久化与实例保存由于 Streamlit 每次交互、刷新都会重新运行全部代码变量会被清空重置所以需要用streamlit.session_state保存 KnowledgeBaseService 服务实例只创建一次避免重复初始化、保证程序稳定运行app_file_uploader.py 基于Streamlit完成Web网页上传服务 import time import streamlit as st from knowledge_base import KnowledgeBaseService # 添加网页标题 st.title(知识库更新服务) # 创建文件上传组件 uploader_file st.file_uploader( 请上传 TXT 文件: , # 上传框提示文字 type[txt], # 只允许上传 txt 格式文件 accept_multiple_filesFalse, # 只允许上传一个文件 ) if service not in st.session_state: # st.session_state 可以理解成当前网页会话中的一个字典 # 这里先判断 session_state 里有没有名为 service 的数据 # 如果没有说明当前用户这次访问页面时还没有创建过知识库服务对象 st.session_state[service] KnowledgeBaseService() # 创建一个 KnowledgeBaseService 对象并把它保存到 session_state 中 # 这样后面再次使用时就可以直接从 session_state 里取出来, 而不需要每次页面重新运行都重新创建一次对象 # 如果用户已经上传文件 if uploader_file is not None: # 获取上传文件的名称 file_name uploader_file.name # 获取上传文件的 MIME 类型 file_type uploader_file.type # 获取上传文件大小并换算为 KB file_size uploader_file.size / 1024 # 显示文件名 st.subheader(f文件名: {file_name}) # 显示文件类型和文件大小 st.write(f**文件格式:** {file_type} nbsp;nbsp;nbsp; **文件大小:** {file_size:.2f} KB) # 读取文件字节内容并按 UTF-8 解码 text uploader_file.getvalue().decode(utf-8) with st.spinner(正在载入中...): # with st.spinner(...) 表示: # 在这个代码块执行期间页面会显示“正在载入中...”的转圈提示 # 当 with 里面的代码全部执行完成后这个转圈提示会自动消失 time.sleep(1) # 让程序暂停 1 秒主要是为了让加载动画更明显一些 result st.session_state[service].upload_by_str(text, file_name) # 从 session_state 中取出之前保存好的知识库服务对象 # 然后调用 upload_by_str() 方法把文件内容 text 和文件名 file_name 传进去 # 这个方法一般会负责去重、切分文本、写入向量库并返回处理结果 st.write(result)在线流程开发向量存储服务config_datasimilarity_threshold 1 # 检索返回匹配的文档数量vector_storesfrom langchain_chroma import Chroma import config_data as config class VectorStoreService: 向量库服务类用来创建检索器 def __init__(self, embedding): # 创建 Chroma 向量数据库对象。 self.vector_store Chroma( collection_nameconfig.collection_name, # 集合名称 embedding_functionembedding, # 向量模型 persist_directoryconfig.persist_directory, # 持久化目录 ) def get_retriever(self): # 把向量库转换成检索器对象 return self.vector_store.as_retriever( search_kwargs{k: config.similarity_threshold} # k 表示返回最相似的前几个结果 )RAG 服务config_dataembedding_model_name text-embedding-v4 chat_model_name qwen3-maxragfrom langchain_core.documents import Document from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough from vector_stores import VectorStoreService from langchain_community.embeddings import DashScopeEmbeddings import config_data as config from langchain_core.prompts import ChatPromptTemplate from langchain_community.chat_models.tongyi import ChatTongyi def print_prompt(prompt): print(*20) print(prompt.to_string()) print(*20) return prompt class RagService(object): RAG 服务类用来完成“检索 生成”问答 def __init__(self): # 创建向量库服务对象用来从知识库中检索相关内容 self.vector_service VectorStoreService( embeddingDashScopeEmbeddings(modelconfig.embedding_model_name) ) # 定义提示词模板 # system 表示系统提示词告诉模型回答时要参考提供的资料 # user 表示用户真正输入的问题 self.prompt_template ChatPromptTemplate( [ (system, 以我提供的已知参考资料为主, 简洁和专业的回答用户问题; 参考资料:{context}), (user, 请回答用户提问: {input}) ] ) # 创建聊天大模型对象 self.chat_model ChatTongyi(modelconfig.chat_model_name) # 初始化完整的调用链 self.chain self.__get_chain() def __get_chain(self): # 获取检索器对象用来从向量库中查找相关文档 retriever self.vector_service.get_retriever() def format_document(docs: list[Document]): 把检索到的文档列表整理成一个字符串 # 如果没有检索到相关文档就返回默认提示 if not docs: return 无相关参考资料 # 用字符串保存整理后的参考资料内容 formatted_str # 依次遍历每一个检索到的文档 for doc in docs: # 把文档内容和元数据拼接到字符串中 formatted_str f文档片段: {doc.page_content}\n文档元数据: {doc.metadata}\n\n # 返回最终整理好的参考资料字符串 return formatted_str # 构建 RAG 调用链 chain ( { # input 直接接收用户输入的问题。 input: RunnablePassthrough(), # context 先用检索器查找相关文档再格式化成字符串 context: retriever | format_document } # 把 input 和 context 填充到提示词模板中 | self.prompt_template # 打印最终发送给模型的提示词方便调试 | print_prompt # 调用聊天模型生成回答 | self.chat_model # 把模型输出结果解析成普通字符串 | StrOutputParser() ) # 返回构建好的调用链 return chain历史会话记录file_history_storeimport os from typing import Sequence from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict # 实现通过会话id获取InMemoryChatMessageHistory类对象 def get_history(session_id): return FileChatMessageHistory(session_id, ./chat_history) class FileChatMessageHistory(BaseChatMessageHistory): def __init__(self, session_id, storage_path): self.session_id session_id # session_id 用来区分不同会话各自的历史记录 self.storage_path storage_path # storage_path 表示所有会话历史统一存放的目录 self.file_path os.path.join(self.storage_path, self.session_id) # file_path 是当前会话最终读写的文件路径 os.makedirs(os.path.dirname(self.file_path), exist_okTrue) # 目录不存在时先创建避免后面写文件报错 def add_messages(self, messages: Sequence[BaseMessage]) - None: all_messages self.messages # self.messages 会先从文件中读出当前会话已有的消息列表 all_messages.extend(messages) # 把这次新增的消息追加到已有历史后面 List[消息, 消息...] new_messages [message_to_dict(message) for message in all_messages] # 先把消息对象转成普通字典json 才能处理 List[字典, 字典...] with open(self.file_path, w, encodingutf-8) as f: json.dump(new_messages, f) # json.dump 会把 Python 对象字典写成 JSON 文本保存到文件中 property def messages(self) - list[BaseMessage]: try: with open(self.file_path, r, encodingutf-8) as f: messages_data json.load(f) # json.load 会把文件里的 JSON 文本解析成 Python 的 list[dict] return messages_from_dict(messages_data) # 再把字典列表还原成 LangChain 的消息对象列表 except FileNotFoundError: return [] # 文件不存在说明还没有历史记录所以直接返回空列表 def clear(self) - None: with open(self.file_path, w, encodingutf-8) as f: json.dump([], f) # 清空历史的本质就是把文件内容重置为空列表ragclass RagService(object): RAG 服务类用来完成“检索 生成”问答 def __init__(self): # 创建向量库服务对象用来从知识库中检索相关内容 self.vector_service VectorStoreService( embeddingDashScopeEmbeddings(modelconfig.embedding_model_name) ) # 定义提示词模板 # system 表示系统提示词告诉模型回答时要参考提供的资料 # user 表示用户真正输入的问题 self.prompt_template ChatPromptTemplate( [ (system, 以我提供的已知参考资料为主, 简洁和专业的回答用户问题; 参考资料:{context}), (system, 并且我提供用户的对话历史记录, 如下: ), MessagesPlaceholder(history), (user, 请回答用户提问: {input}) ] ) # 创建聊天大模型对象 self.chat_model ChatTongyi(modelconfig.chat_model_name) # 初始化完整的调用链 self.chain self.__get_chain() def __get_chain(self): # 获取检索器对象用来从向量库中查找相关文档 retriever self.vector_service.get_retriever() def format_document(docs: list[Document]): 把检索到的文档列表整理成一个字符串 # 如果没有检索到相关文档就返回默认提示 if not docs: return 无相关参考资料 # 用字符串保存整理后的参考资料内容 formatted_str # 依次遍历每一个检索到的文档 for doc in docs: # 把文档内容和元数据拼接到字符串中 formatted_str f文档片段: {doc.page_content}\n文档元数据: {doc.metadata}\n\n # 返回最终整理好的参考资料字符串 return formatted_str # 定义一个函数专门给检索器准备输入数据 # 因为当前链路中的输入 value 是一个字典, # 但 retriever 只需要用户当前输入的问题字符串 def format_for_retriever(value: dict) - str: # 从输入字典中取出 input 字段也就是用户当前的问题 return value[input] # 定义一个函数专门给提示词模板整理输入数据 def format_for_prompt_template(value): # 这里传进来的 value 是前面并行处理后的结果, # 它大致长这样: # { # input: {input: 用户问题, history: 历史消息}, # context: 检索得到的参考资料 # } # 但提示词模板更适合接收一个扁平一点的字典: # { # input: 用户问题, # context: 检索得到的参考资料, # history: 历史消息 # } new_value {} # 取出用户当前输入的问题 new_value[input] value[input][input] # 取出检索到的参考资料内容 new_value[context] value[context] # 取出历史聊天记录 new_value[history] value[input][history] # 返回整理后的新字典 return new_value # 构建基础调用链 chain ( { # input 部分直接原样保留输入数据 # RunnablePassthrough() 的作用就是不做处理直接传下去 input: RunnablePassthrough(), # context 部分负责生成参考资料 # 执行流程是 # format_for_retriever: 从输入字典中取出用户问题 # retriever: 根据问题去向量库检索相关文档 # format_document: 把检索到的文档整理成字符串 context: RunnableLambda(format_for_retriever) | retriever | format_document } # 把上面得到的结果重新整理成提示词模板需要的格式 | RunnableLambda(format_for_prompt_template) # 将 input、context、history 填充到提示词模板中 | self.prompt_template # 打印最终发送给大模型的提示词, 方便调试查看 | print_prompt # 调用聊天模型生成回答 | self.chat_model # 把模型输出结果解析成普通字符串 | StrOutputParser() ) # 给基础调用链加上消息历史功能 conversation_chain RunnableWithMessageHistory( chain, get_history, # 指定当前用户输入消息对应的字段名是 input input_messages_keyinput, # 指定历史消息对应的字段名是 history history_messages_keyhistory, ) # 返回带有多轮对话记忆功能的链 return conversation_chain聊天页面开发app_qafrom rag import RagService import streamlit as st import config_data as config # 设置网页标题 st.title(智能客服) # 在标题下方添加一条分隔线 st.divider() # 判断 session_state 中是否已经有 message # message 用来保存当前会话中的聊天记录 if message not in st.session_state: # 如果没有, 就先放入一条助手的欢迎消息 # role 表示消息发送者身份, content 表示消息内容 st.session_state[message] [{role: assistant, content: 你好有什么可以帮助你}] # 判断 session_state 中是否已经创建过 RAG 服务对象 if rag not in st.session_state: # 如果没有, 就创建一个 RagService 对象并保存起来 # 这样后面每次提问时都可以直接复用, 不需要反复创建 st.session_state[rag] RagService() # 遍历历史消息列表把之前的聊天内容显示到页面上 for message in st.session_state[message]: # 根据消息角色创建对应的聊天气泡并显示消息内容 # message[role] 一般是 user 或 assistant # message[content] 是具体的聊天文本 st.chat_message(message[role]).write(message[content]) # 在页面最下方显示一个聊天输入框, 等待用户输入问题 prompt st.chat_input() # 如果用户输入了内容并按下发送, 就进入这个分支 if prompt: # 先把用户当前输入的问题显示在页面上 st.chat_message(user).write(prompt) # 再把这条用户消息追加到 session_state[message] 中 # 这样页面下次重新执行时, 这条消息还能继续显示 st.session_state[message].append({role: user, content: prompt}) # 用来保存 AI 流式输出的每一小段内容 # 最后会把这些小段拼接成一整段完整回复 ai_res_list [] # 显示AI思考中...的加载动画 # 在 with 代码块执行期间页面会一直显示这个提示 with st.spinner(AI思考中...): # 调用 RAG 链的 stream() 方法, 以流式方式生成回答 # {input: prompt} 表示把用户问题作为输入传给链 # 返回值 res_stream 是一个生成器, 会一点一点产出模型回复内容 res_stream st.session_state[rag].chain.stream({input: prompt}, config.session_config) # 定义一个包装生成器的函数 # generator 是原始流式输出生成器 # cache_list 用来保存每次生成的内容片段 def capture(generator, cache_list): # 逐个读取生成器返回的内容片段 for chunk in generator: # 把当前片段保存到列表中, 方便后面拼接完整答案 cache_list.append(chunk) # 再把当前片段继续 yield 出去, 交给前端实时显示 yield chunk # 创建 assistant 角色的聊天气泡, 并用 write_stream() 流式显示回答 # capture() 一边缓存内容, 一边把内容返回给页面 st.chat_message(assistant).write_stream(capture(res_stream, ai_res_list)) # 等所有流式内容输出完成后 # 把 ai_res_list 中的所有片段拼接成完整字符串 # 然后把这条 AI 回复保存到聊天记录中 st.session_state[message].append({role: assistant, content: .join(ai_res_list)})在终端中切换到当前项目路径下执行命令启动streamlit run app_qa.py