AI应用的幂等性工程2026:让LLM任务在失败重试时不出错
LLM应用在生产环境中面临着普通软件没有的挑战同一个任务被重复执行时可能产生副作用发两次邮件、创建重复记录、扣两次款。幂等性设计是解决这个问题的工程答案。—## 问题的本质LLM应用的非确定性传统软件的幂等性设计已有成熟方案。但LLM应用增加了新的复杂度1.长时间运行的任务LLM的生成过程可能耗时30秒甚至更长网络超时概率更高2.有副作用的工具调用Agent调用API发送邮件、修改数据库、调用支付接口时失败重试会造成重复执行3.多步骤工作流前几步成功但最后一步失败重启会重新执行已完成的步骤4.不确定性输出即使是相同的输入LLM可能生成不同的决策导致幂等性更难保证—## 幂等性的三个层次### 层次一API层幂等性基础确保同一个HTTP请求被多次发送不会产生重复效果pythonimport hashlibimport jsonimport asynciofrom datetime import datetime, timedeltafrom typing import Optional, Callable, Anyimport redis.asyncio as aioredisfrom fastapi import FastAPI, Header, HTTPExceptionfrom pydantic import BaseModelclass IdempotencyManager: API幂等性管理器 def __init__(self, redis_url: str, ttl_hours: int 24): self.redis None self.redis_url redis_url self.ttl ttl_hours * 3600 async def initialize(self): self.redis await aioredis.from_url( self.redis_url, encodingutf-8, decode_responsesTrue ) def _key(self, idempotency_key: str) - str: return fidempotency:{idempotency_key} async def check_and_lock(self, key: str) - Optional[dict]: 检查幂等键 - 如果不存在锁定并返回None允许执行 - 如果存在且已完成返回缓存的结果 - 如果存在且进行中返回processing状态 redis_key self._key(key) # 使用SET NX原子操作避免竞争条件 locked await self.redis.set( f{redis_key}:lock, processing, nxTrue, # 只在不存在时设置 exself.ttl ) if not locked: # 已有请求在处理或已完成 result await self.redis.get(redis_key) if result: return json.loads(result) # 正在处理中 return {status: processing, message: 请求正在处理中请稍后查询结果} return None # 允许继续执行 async def store_result(self, key: str, result: dict): 存储执行结果 redis_key self._key(key) result[completed_at] datetime.now().isoformat() await self.redis.setex( redis_key, self.ttl, json.dumps(result, ensure_asciiFalse) ) async def mark_failed(self, key: str, error: str): 标记失败释放锁允许重试 # 删除lock允许重新尝试 await self.redis.delete(f{self._key(key)}:lock)# FastAPI中的使用app FastAPI()idempotency IdempotencyManager(redis://localhost:6379)app.post(/ai/generate-report)async def generate_report( request: dict, idempotency_key: str Header(None, aliasIdempotency-Key)): 幂等的AI报告生成接口 if not idempotency_key: # 无幂等键直接执行不做幂等保护 return await _do_generate_report(request) # 检查是否重复请求 cached await idempotency.check_and_lock(idempotency_key) if cached: return cached # 返回缓存结果或处理中状态 try: result await _do_generate_report(request) await idempotency.store_result(idempotency_key, result) return result except Exception as e: await idempotency.mark_failed(idempotency_key, str(e)) raiseasync def _do_generate_report(request: dict) - dict: 实际生成报告的逻辑 # ... LLM调用逻辑 pass—### 层次二工具调用幂等性关键当Agent需要调用有副作用的外部工具时幂等性最为关键pythonfrom functools import wrapsfrom typing import Callabledef idempotent_tool(tool_id_fn: Callable None): 工具调用幂等性装饰器 def decorator(func: Callable): wraps(func) async def wrapper(*args, **kwargs): # 生成工具调用的唯一ID if tool_id_fn: call_id tool_id_fn(*args, **kwargs) else: # 默认基于函数名参数的hash key_data f{func.__name__}:{json.dumps(args, defaultstr)}:{json.dumps(kwargs, defaultstr)} call_id hashlib.sha256(key_data.encode()).hexdigest()[:16] redis await aioredis.from_url(redis://localhost:6379) redis_key ftool_call:{call_id} # 检查是否已执行过 existing await redis.get(redis_key) if existing: print(f⚡ 工具 {func.__name__} 已执行过 (id{call_id})返回缓存结果) return json.loads(existing) # 执行工具 result await func(*args, **kwargs) # 缓存结果24小时 await redis.setex( redis_key, 86400, json.dumps(result, defaultstr) ) return result return wrapper return decorator# 使用示例idempotent_tool( tool_id_fnlambda order_id, **kw: fsend_order_email:{order_id})async def send_order_confirmation_email(order_id: str, email: str) - dict: 发送订单确认邮件 - 幂等版本 # 即使被调用多次邮件只会发送一次 await email_service.send( toemail, subjectf订单 {order_id} 确认, templateorder_confirmation ) return {sent: True, order_id: order_id, email: email}—### 层次三工作流检查点复杂任务对于多步骤的LLM工作流需要支持断点续跑pythonimport jsonfrom enum import Enumfrom dataclasses import dataclass, fieldclass StepStatus(Enum): PENDING pending RUNNING running COMPLETED completed FAILED faileddataclassclass WorkflowCheckpoint: 工作流检查点 workflow_id: str steps: dict field(default_factorydict) metadata: dict field(default_factorydict)class CheckpointedWorkflow: 支持检查点的工作流执行器 def __init__(self, workflow_id: str, redis_url: str): self.workflow_id workflow_id self.redis_url redis_url self.checkpoint_key fworkflow_checkpoint:{workflow_id} async def _load_checkpoint(self) - WorkflowCheckpoint: redis await aioredis.from_url(self.redis_url) data await redis.get(self.checkpoint_key) if data: d json.loads(data) return WorkflowCheckpoint(**d) return WorkflowCheckpoint(workflow_idself.workflow_id) async def _save_checkpoint(self, checkpoint: WorkflowCheckpoint): redis await aioredis.from_url(self.redis_url) await redis.setex( self.checkpoint_key, 7 * 86400, # 保存7天 json.dumps({ workflow_id: checkpoint.workflow_id, steps: checkpoint.steps, metadata: checkpoint.metadata }, ensure_asciiFalse) ) async def run_step( self, step_name: str, step_fn: Callable, *args, force_rerun: bool False, **kwargs ) - Any: 执行带检查点的单个步骤 checkpoint await self._load_checkpoint() # 如果步骤已完成且不强制重跑直接返回缓存结果 if not force_rerun and step_name in checkpoint.steps: step_data checkpoint.steps[step_name] if step_data[status] StepStatus.COMPLETED.value: print(f⏭️ 跳过已完成的步骤: {step_name}) return step_data[result] # 标记步骤开始 checkpoint.steps[step_name] { status: StepStatus.RUNNING.value, started_at: datetime.now().isoformat() } await self._save_checkpoint(checkpoint) try: # 执行步骤 result await step_fn(*args, **kwargs) # 标记完成 checkpoint.steps[step_name] { status: StepStatus.COMPLETED.value, result: result, completed_at: datetime.now().isoformat() } await self._save_checkpoint(checkpoint) return result except Exception as e: # 标记失败 checkpoint.steps[step_name] { status: StepStatus.FAILED.value, error: str(e), failed_at: datetime.now().isoformat() } await self._save_checkpoint(checkpoint) raise# 使用示例多步骤AI工作流async def run_analysis_workflow(workflow_id: str, document_url: str): workflow CheckpointedWorkflow(workflow_id, redis://localhost:6379) # 步骤1下载文档失败重试时不会重复下载 document await workflow.run_step( download_document, download_document, document_url ) # 步骤2AI分析失败重试时不会重复调用LLM节省费用 analysis await workflow.run_step( ai_analysis, analyze_with_llm, document ) # 步骤3发送报告失败重试时不会重复发送 await workflow.run_step( send_report, send_analysis_report, analysis, workflow_id ) return analysis—## 实用的幂等键生成策略pythonclass IdempotencyKeyGenerator: 幂等键生成工具 staticmethod def for_business_action(user_id: str, action: str, resource_id: str) - str: 业务操作的幂等键 return f{user_id}:{action}:{resource_id} staticmethod def for_content_hash(content: str) - str: 基于内容的幂等键相同内容只处理一次 return hashlib.sha256(content.encode()).hexdigest() staticmethod def for_time_window(action: str, window_minutes: int 60) - str: 时间窗口幂等键同一时间窗口内只执行一次 window int(time.time() / (window_minutes * 60)) return f{action}:{window} staticmethod def for_llm_request(model: str, messages: list) - str: LLM请求的幂等键 key_data f{model}:{json.dumps(messages, sort_keysTrue)} return hashlib.sha256(key_data.encode()).hexdigest()—## 最佳实践总结1.所有有副作用的工具调用都应该幂等发邮件、支付、发短信——没有例外2.为客户端提供幂等键接口允许客户端携带Idempotency-Keyheader进行重试3.区分安全和非安全操作GET请求天然幂等POST/PUT需要主动设计4.检查点粒度要合理太细会增加开销太粗意味着失败时重跑更多工作5.清理过期的幂等记录设置合理的TTL避免Redis无限增长幂等性是构建可靠LLM应用的基础工程能力在自动化程度越来越高的AI时代这一点的重要性只会越来越高。