大模型上下文压缩工程2026:让100K Token的信息塞进4K窗口
超长上下文固然好但它带来高成本、高延迟和注意力稀释问题。本文深入探讨如何通过智能压缩技术在有限上下文窗口内保留最大信息量实现质量与效率的最优平衡。—## 上下文窗口的本质矛盾表面上看模型支持的上下文窗口越来越大——Gemini 2.5 Pro支持100万TokenClaude 3.7支持20万Token。但工程实践中“能用和好用是两回事问题一成本爆炸GPT-4o每百万Token输入成本$5一次包含10万Token的请求就要$0.5。系统每天处理1万次请求单日费用就是$5000。问题二Lost in the Middle现象研究表明LLM对超长上下文中间位置的信息注意力显著下降。即使你把所有内容都放进去模型可能漏看关键信息。问题三延迟变长上下文越长首Token延迟TTFT越高。100K Token的请求可能需要等待5-10秒才开始输出用户体验极差。解决方案上下文压缩——在不丢失关键信息的前提下减少输入Token数量。—## 五大压缩技术详解### 1. 摘要压缩Summarization Compression最直观的方法把长对话历史压缩为摘要。pythonfrom openai import AsyncOpenAIfrom typing import listclass ConversationCompressor: 对话历史压缩器 def __init__(self, max_history_tokens: int 3000): self.openai AsyncOpenAI() self.max_history_tokens max_history_tokens self.summary # 压缩后的历史摘要 self.recent_messages [] # 保留最近的原始消息 self.recent_turns_to_keep 3 # 保留最近3轮不压缩 def _estimate_tokens(self, text: str) - int: 估算Token数粗略实际应用tiktoken return len(text) // 4 async def _summarize(self, messages: list[dict]) - str: 将消息列表压缩为摘要 messages_text \n.join([ f{m[role].upper()}: {m[content]} for m in messages ]) response await self.openai.chat.completions.create( modelgpt-4o-mini, # 用便宜的模型做摘要 messages[{ role: user, content: f将以下对话历史压缩为简洁的摘要保留所有关键信息、决策和上下文。用第三人称客观描述不要遗漏重要细节。对话历史{messages_text}请生成300字以内的摘要 }], max_tokens400, temperature0 ) return response.choices[0].message.content async def add_message(self, role: str, content: str): 添加新消息必要时触发压缩 self.recent_messages.append({role: role, content: content}) # 计算当前历史总Token total_tokens sum( self._estimate_tokens(m[content]) for m in self.recent_messages ) if self.summary: total_tokens self._estimate_tokens(self.summary) # 超过阈值时压缩 if total_tokens self.max_history_tokens: await self._compress() async def _compress(self): 压缩旧消息 # 保留最近N轮压缩其余部分 keep_count self.recent_turns_to_keep * 2 # 每轮包含userassistant to_compress self.recent_messages[:-keep_count] if len(self.recent_messages) keep_count else [] if not to_compress: return # 生成新摘要包含旧摘要 new_summary_content if self.summary: new_summary_content f历史摘要{self.summary}\n\n new_summary_content 以上摘要基础上的新对话 new_summary await self._summarize(to_compress) if self.summary: # 合并摘要 merged await self._merge_summaries(self.summary, new_summary) self.summary merged else: self.summary new_summary # 只保留最近的消息 self.recent_messages self.recent_messages[-keep_count:] print(f✂️ 压缩完成从{len(to_compress)}条消息提取摘要) async def _merge_summaries(self, old: str, new: str) - str: 合并两段摘要 response await self.openai.chat.completions.create( modelgpt-4o-mini, messages[{ role: user, content: f将以下两段对话摘要合并为一段不超过500字\n\n旧摘要{old}\n\n新摘要{new} }], temperature0 ) return response.choices[0].message.content def get_context(self) - list[dict]: 获取当前压缩后的上下文 messages [] # 将摘要作为系统消息 if self.summary: messages.append({ role: system, content: f以下是对话历史摘要请记住这些上下文\n{self.summary} }) # 追加最近的原始消息 messages.extend(self.recent_messages) return messages### 2. 选择性保留Selective Retention不是所有内容都同等重要关键是找出必须保留的”pythonclass SelectiveContextManager: 选择性上下文管理器 IMPORTANCE_PROMPT 评估以下对话片段对当前任务的重要性返回0-10的整数分数对话片段{conversation}当前任务{current_task}重要性标准10分包含直接影响当前任务的关键信息、约束或决策7-9分包含相关背景信息或可能有用的细节4-6分相关但非必要的信息1-3分边缘相关或已被后续对话更新的信息0分完全无关的闲聊或已无效信息只返回数字 async def score_and_filter( self, messages: list[dict], current_task: str, keep_top_n: int 10 ) - list[dict]: 对消息评分并保留最重要的 if len(messages) keep_top_n: return messages scored_messages [] # 批量评分用更便宜的模型 for i, msg in enumerate(messages): response await self.openai.chat.completions.create( modelgpt-4o-mini, messages[{ role: user, content: self.IMPORTANCE_PROMPT.format( conversationf{msg[role]}: {msg[content][:500]}, current_taskcurrent_task ) }], max_tokens5, temperature0 ) try: score int(response.choices[0].message.content.strip()) except ValueError: score 5 scored_messages.append((i, msg, score)) # 按分数排序保留top N同时保留消息的时间顺序 top_indices sorted( [s[0] for s in sorted(scored_messages, keylambda x: x[2], reverseTrue)[:keep_top_n]] ) return [messages[i] for i in top_indices]### 3. 提取式压缩Extractive Compression从长文档中直接提取关键句子无需重新生成pythonfrom sklearn.feature_extraction.text import TfidfVectorizerfrom sklearn.metrics.pairwise import cosine_similarityimport numpy as npclass ExtractiveSummarizer: 提取式文档压缩无需LLM API def __init__(self, compression_ratio: float 0.3): self.ratio compression_ratio # 保留30%的句子 def compress(self, text: str, query: str None) - str: 提取最重要的句子 import re # 分句 sentences re.split(r[。\n], text) sentences [s.strip() for s in sentences if len(s.strip()) 10] if not sentences: return text n_keep max(1, int(len(sentences) * self.ratio)) if query: # 基于查询相关性选择句子 vectorizer TfidfVectorizer() all_texts [query] sentences tfidf_matrix vectorizer.fit_transform(all_texts) # 计算每句话与查询的相似度 query_vec tfidf_matrix[0] sentence_vecs tfidf_matrix[1:] scores cosine_similarity(query_vec, sentence_vecs)[0] else: # 基于句子中心性选择 vectorizer TfidfVectorizer() tfidf_matrix vectorizer.fit_transform(sentences) similarity_matrix cosine_similarity(tfidf_matrix) scores similarity_matrix.sum(axis1) # 选择得分最高的句子 top_indices sorted(np.argsort(scores)[-n_keep:]) return 。.join([sentences[i] for i in top_indices])### 4. 分层压缩Hierarchical Compression对不同类型的内容采用不同压缩策略pythonclass HierarchicalContextCompressor: 分层上下文压缩器 async def compress_rag_context( self, retrieved_chunks: list[str], query: str, target_tokens: int 2000 ) - str: 压缩RAG检索到的文档块 # 第一步提取式过滤快速去噪无需LLM extractor ExtractiveSummarizer(compression_ratio0.5) filtered_chunks [ extractor.compress(chunk, queryquery) for chunk in retrieved_chunks ] combined \n\n.join(filtered_chunks) # 如果还是太长用LLM进行生成式压缩 current_tokens len(combined) // 4 if current_tokens target_tokens: response await self.openai.chat.completions.create( modelgpt-4o-mini, messages[{ role: user, content: f基于以下查询从提供的文档中提取并压缩最相关的信息。目标{target_tokens * 4}字以内。查询{query}文档内容{combined}请提取最关键的信息保持准确性 }], max_tokenstarget_tokens ) return response.choices[0].message.content return combined—## 各方案对比与选型建议| 方案 | Token减少率 | 信息保留质量 | 计算成本 | 适用场景 ||------|------------|-------------|---------|---------|| 摘要压缩 | 70-85% | 高 | 中需LLM | 长对话历史 || 选择性保留 | 40-60% | 很高 | 高需逐条评分 | 文档检索 || 提取式压缩 | 50-70% | 中 | 低无需API | 快速预处理 || 分层压缩 | 60-80% | 高 | 中 | RAG系统 |推荐组合策略- RAG系统提取式快速去噪 选择性保留精准筛选- 长对话摘要压缩定期归档 保留最近N轮原文- 实时系统提取式零成本作为第一层过滤—## 生产指标监控上线后需要持续监控以下指标确保压缩没有影响质量pythonclass CompressionMetrics: def track(self, original: str, compressed: str, response_quality: float): return { compression_ratio: len(compressed) / len(original), token_reduction: (len(original) - len(compressed)) // 4, response_quality_score: response_quality, # 人工或LLM评分 cost_saved_usd: (len(original) - len(compressed)) / 4 / 1_000_000 * 5 }上下文压缩是LLM工程化中被严重低估的优化手段。在追求更大上下文窗口的同时学会用好压缩技术才能构建真正高效的生产系统。