【Pocket Flow】源码剖析(三):实战构建——用 100 行框架搭建 ReAct Agent 与 RAG
【Pocket Flow】源码剖析三实战构建——用 100 行框架搭建 ReAct Agent 与 RAG写在前面前两篇我们拆完了 Pocket Flow 的核心抽象Node/Flow/Shared Store和批量/异步体系。今天我们进入最落地的部分——用这 100 行框架搭建真实应用。Pocket Flow 的官方文档列出了四大设计模式Agent、RAG、MapReduce、Multi-Agent。这些模式不是示例代码——它们是 Pocket Flow 100 行代码的压力测试。如果这些模式能用 100 行代码优雅实现那这个框架就经得起考验。今天我们就来验证这一点逐行写出 ReAct Agent 和 RAG 的完整实现。 文章目录 一、四大设计模式100 行代码的压力测试 二、ReAct Agentaction 驱动的决策循环 三、RAG 管道BatchNode BatchFlow 的组合拳️ 四、MapReduce分而治之的经典模式 五、Multi-AgentAsyncNode Queue 的协作模式 六、系列终章回顾 一、四大设计模式100 行代码的压力测试1.1 为什么是这四种模式Pocket Flow 的官方文档列出了四种核心设计模式这不是随意选择的——它们分别对应了 LLM 应用的四个核心能力维度模式核心能力对应 Pocket Flow 机制Agent自主决策 工具调用条件边 循环action 路由RAG知识检索 上下文增强BatchNode BatchFlowMapReduce分而治之 并行处理BatchNode 普通 NodeMulti-Agent多智能体协作AsyncNode asyncio.Queue这四种模式覆盖了 90% 以上的 LLM 应用场景。更关键的是它们全部可以用 Pocket Flow 的 100 行核心代码实现——不需要任何扩展不需要任何插件。1.2 一个关键洞察你会发现这四种模式的实现代码没有一行是在绕过框架的限制。相反每一行都在利用框架的设计。这说明 Pocket Flow 的抽象层级选对了——它恰好处于足够低以覆盖所有场景和足够高以避免重复代码的甜蜜点。 二、ReAct Agentaction 驱动的决策循环2.1 ReAct 的本质ReActReasoning Acting是当前最主流的 Agent 模式。它的核心逻辑极其简单LLM 思考 → 决定是否调用工具 → 调用工具 → 再思考 → 直到得出最终答案。这个循环在 LangGraph 中需要 StateGraph PregelLoop Channel 版本追踪才能实现但在 Pocket Flow 中只需要条件边 while 循环。2.2 完整实现classDecideAction(Node):LLM 决策节点判断下一步做什么defprep(self,shared):returnshared.get(question,),shared.get(context,)defexec(self,inputs):question,contextinputs promptfYou are an assistant. Given the question and context, decide: - If you need to search the web, return search - If you can answer directly, return answer Question:{question}Context:{contextorNone}Return ONLY search or answer:returncall_llm(prompt).strip().lower()defpost(self,shared,prep_res,exec_res):shared[action]exec_resreturnexec_res# ← action 驱动路由classSearchWeb(Node):搜索工具节点defprep(self,shared):returnshared[question]defexec(self,question):resultssearch_web(question)returnresultsdefpost(self,shared,prep_res,exec_res):shared[context]exec_resreturndefault# ← 回到决策节点# 构建图decideDecideAction()searchSearchWeb()decide-searchsearch# 条件边actionsearch → SearchWebdecide-answerNone# 条件边actionanswer → 终止searchdecide# 默认边搜索后回到决策flowFlow(startdecide)# 运行shared{question:What is the weather in Beijing?}flow.run(shared)print(shared.get(context,No answer))2.3 逐行解读DecideAction是整个 Agent 的大脑。它的post方法返回exec_res即 LLM 输出的 “search” 或 “answer”这个返回值就是action——Flow 的 while 循环根据它决定下一步走哪条边。SearchWeb是工具节点。它的post返回default这意味着搜索完成后Flow 会沿着默认边回到decide节点形成循环。图构建用了 Pocket Flow 的 DSL 语法糖decide - search search等价于decide.next(search, search)。decide - answer None表示 action“answer” 时没有后继节点while 循环终止。2.4 执行流程Step 1: DecideAction → LLM 判断需要搜索 → actionsearch Step 2: SearchWeb → 执行搜索 → actiondefault → 回到 DecideAction Step 3: DecideAction → LLM 基于搜索结果回答 → actionanswer → 结束整个循环只用了3 个类、5 行图构建代码。对比 LangGraph 的实现需要 StateGraph add_messages conditional_edge PregelLoopPocket Flow 的代码量少了 10 倍以上。 三、RAG 管道BatchNode BatchFlow 的组合拳3.1 RAG 的两阶段架构RAGRetrieval-Augmented Generation是 LLM 应用中最常见的架构之一。它分为两个阶段Offline 阶段将文档分块 → 生成 Embedding → 存入向量库Online 阶段用户提问 → 检索相关文档 → LLM 基于上下文生成答案Pocket Flow 的 BatchNode 和 BatchFlow 天然适合这两个阶段——BatchNode 处理同逻辑不同数据的分块/EmbeddingBatchFlow 处理同流程不同参数的多文档管道。3.2 Offline 阶段BatchNode 分块 EmbeddingclassChunkDocuments(BatchNode):将文档分块defprep(self,shared):contentshared[document]chunk_size1000chunks[content[i:ichunk_size]foriinrange(0,len(content),chunk_size)]returnchunksdefexec(self,chunk):returnchunk# 逐块返回defpost(self,shared,prep_res,exec_res_list):shared[chunks]exec_res_listreturndefaultclassEmbedChunks(BatchNode):为每个分块生成 Embeddingdefprep(self,shared):returnshared[chunks]defexec(self,chunk):returnembed(chunk)# 调用 Embedding APIdefpost(self,shared,prep_res,exec_res_list):shared[embeddings]list(zip(shared[chunks],exec_res_list))returndefault# 构建图chunkChunkDocuments()embedEmbedChunks()chunkembed offline_flowFlow(startchunk)# 运行shared{document:Very long document text...}offline_flow.run(shared)# shared[embeddings] [(chunk1, emb1), (chunk2, emb2), ...]3.3 Online 阶段线性管道classRetrieveDocs(Node):检索相关文档defprep(self,shared):returnshared[query]defexec(self,query):query_embembed(query)resultsvector_search(query_emb,top_k3)returnresultsdefpost(self,shared,prep_res,exec_res):shared[context]exec_resreturndefaultclassGenerateAnswer(Node):基于上下文生成答案defprep(self,shared):returnshared[query],shared[context]defexec(self,inputs):query,ctxinputs promptfContext:{ctx}\nQuestion:{query}\nAnswer:returncall_llm(prompt)defpost(self,shared,prep_res,exec_res):shared[answer]exec_resreturndefault# 构建图retrieveRetrieveDocs()generateGenerateAnswer()retrievegenerate online_flowFlow(startretrieve)# 运行shared{query:What is Pocket Flow?}online_flow.run(shared)print(shared[answer])3.4 多文档处理BatchFlow如果你有 10 个文档需要分别处理可以用 BatchFlowclassDocPipeline(BatchFlow):批量处理多个文档defprep(self,shared):return[{filename:f}forfinshared[filenames]]# 子节点通过 self.params[filename] 获取当前文件classLoadFile(Node):defprep(self,shared):returnself.params[filename]# ← 来自 BatchFlow 的参数defexec(self,filename):returnopen(filename).read()defpost(self,shared,prep_res,exec_res):shared[self.params[filename]]exec_resreturndefaultloadLoadFile()processChunkDocuments()EmbedChunks()loadprocess doc_pipeDocPipeline(startload)shared{filenames:[a.txt,b.txt,c.txt]}doc_pipe.run(shared)️ 四、MapReduce分而治之的经典模式4.1 MapReduce 的本质MapReduce 是大数据领域的经典模式把大任务拆成小任务Map分别处理后合并结果Reduce。在 LLM 场景下最常见的用例是长文档摘要——把长文档分块每块单独摘要最后合并所有摘要。4.2 完整实现classMapSummaries(BatchNode):Map 阶段逐块摘要defprep(self,shared):contentshared[document]chunk_size10000return[content[i:ichunk_size]foriinrange(0,len(content),chunk_size)]defexec(self,chunk):promptfSummarize this in 10 words:{chunk}returncall_llm(prompt)defpost(self,shared,prep_res,exec_res_list):shared[summaries]exec_res_listreturndefaultclassReduceSummaries(Node):Reduce 阶段合并摘要defprep(self,shared):returnshared[summaries]defexec(self,summaries):combined\n.join(summaries)promptfCombine these summaries into one:{combined}returncall_llm(prompt)defpost(self,shared,prep_res,exec_res):shared[final_summary]exec_resreturndefault# 构建图map_nodeMapSummaries()reduce_nodeReduceSummaries()map_nodereduce_node flowFlow(startmap_node)# 运行shared{document:Very long document...}flow.run(shared)print(shared[final_summary])4.3 并行 Map替换为 AsyncParallelBatchNode如果摘要任务可以并行不需要保序只需把MapSummaries的基类从BatchNode改为AsyncParallelBatchNode把exec改为exec_asyncclassParallelMapSummaries(AsyncParallelBatchNode):并行 Mapasyncio.gather 同时摘要所有分块asyncdefprep_async(self,shared):contentshared[document]chunk_size10000return[content[i:ichunk_size]foriinrange(0,len(content),chunk_size)]asyncdefexec_async(self,chunk):promptfSummarize this in 10 words:{chunk}returnawaitcall_llm_async(prompt)asyncdefpost_async(self,shared,prep_res,exec_res_list):shared[summaries]exec_res_listreturndefault改动量3 行基类替换 async/await。这就是 Pocket Flow 正交组合的威力——BatchNode 和 AsyncNode 的功能是正交的可以自由组合。 五、Multi-AgentAsyncNode Queue 的协作模式5.1 Multi-Agent 的核心挑战Multi-Agent 的核心挑战是通信——两个 Agent 如何互相传递消息在 LangGraph 中这需要 Subgraph Channel State 合并。在 Pocket Flow 中只需要asyncio.Queue。5.2 Taboo 游戏示例importasyncioclassAsyncHinter(AsyncNode):出题 Agent生成提示避免禁忌词asyncdefprep_async(self,shared):guessawaitshared[hinter_queue].get()ifguessGAME_OVER:returnNonereturnshared[target_word],shared[forbidden_words]asyncdefexec_async(self,inputs):ifinputsisNone:returnNonetarget,forbiddeninputs promptfGenerate hint for {target}\nForbidden:{forbidden}\nMax 5 words:returnawaitcall_llm_async(prompt)asyncdefpost_async(self,shared,prep_res,exec_res):ifexec_resisNone:returnendawaitshared[guesser_queue].put(exec_res)returncontinueclassAsyncGuesser(AsyncNode):猜词 Agent根据提示猜词asyncdefprep_async(self,shared):hintawaitshared[guesser_queue].get()returnhintasyncdefexec_async(self,hint):promptfGuess the word from hint:{hint}returnawaitcall_llm_async(prompt)asyncdefpost_async(self,shared,prep_res,exec_res):awaitshared[hinter_queue].put(exec_res)shared[past_guesses]shared.get(past_guesses,[])[exec_res]returncontinue# 运行shared{target_word:elephant,forbidden_words:[trunk,big,animal],hinter_queue:asyncio.Queue(),guesser_queue:asyncio.Queue(),past_guesses:[]}awaitshared[hinter_queue].put(start)# 触发开始hinterAsyncHinter(max_retries3)guesserAsyncGuesser(max_retries3)hinter-continuehinter hinter-endNoneguesserhinter flowAsyncFlow(starthinter)awaitflow.run_async(shared)5.3 为什么 Queue 就够了Pocket Flow 的 Shared Store 是一个普通 dict而 asyncio.Queue 可以直接放进 dict。两个 Agent 通过queue.get()等待对方的消息通过queue.put()发送自己的消息。这比 LangGraph 的 Subgraph Channel 方案简单 10 倍而且完全利用了 Python 标准库的能力——不需要框架做任何额外支持。 六、系列终章回顾三篇文章我们从 100 行代码的骨架到实战应用完整拆解了 Pocket Flow 的设计哲学和实现细节。最后用一张表回顾全系列全系列核心概念速查篇目核心概念一句话总结第一篇Node Flow Shared Store三大抽象覆盖所有场景action 字符串驱动路由第二篇Batch Async Parallel6 个类覆盖从串行到并行的完整光谱第三篇Agent RAG MapReduce Multi-Agent四大设计模式验证 100 行代码的完备性Pocket Flow vs LangGraph vs OpenClaw 终极对比维度Pocket FlowLangGraphOpenClaw代码量100 行3 万行43 万行核心抽象Node Flow dictStateGraph Pregel ChannelGateway Agent Channel状态管理dict 直写Channel ReducerLane Queue SessionKey路由机制action 字符串版本追踪 条件边Gateway 路由表并行支持asyncio.gatherBSP SuperstepLane Queue 串行化持久化无CheckpointMemory SQLite适用场景原型 / 教学 / Agentic Coding生产 / 复杂工作流生产 / 多平台 Agent学习曲线10 分钟1-2 天1 周一句话总结Pocket Flow 用 100 行代码证明LLM 应用的核心只需要三个抽象——Node执行、Flow编排、Shared Store通信。四大设计模式Agent/RAG/MapReduce/Multi-Agent全部可以用这三个抽象优雅实现。它不是简陋而是精准——精准地找到了抽象的甜蜜点。当你需要快速验证想法Pocket Flow 是最快的起点当你需要生产级保障LangGraph 和 OpenClaw 是更安全的选择。但无论你选择哪个框架理解 Pocket Flow 的设计哲学都会让你成为更好的 Agent 工程师。参考链接Pocket Flow GitHub 仓库Pocket Flow Agent 模式文档Pocket Flow RAG 模式文档Pocket Flow MapReduce 模式文档Pocket Flow Multi-Agent 模式文档Flow State 论文 (arXiv:2504.03771)