1. 为什么需要流式输出在传统的Web开发中大多数接口都是一问一答的模式客户端发送请求服务器处理完成后一次性返回所有数据。这种模式在处理简单业务时很高效但在某些场景下就显得力不从心了。比如大文件下载时如果等服务器完全读取文件再返回内存会被撑爆实时日志查看场景用户希望看到持续输出的日志而不是等程序跑完才显示AI对话场景用户希望看到模型一个字一个字地生成回答而不是等10秒才看到完整回复我最近就遇到了这样一个需求需要将Python开发的AI智能体服务通过Java网关暴露出去。Python服务本身支持SSEServer-Sent Events流式输出但Java网关需要先完成鉴权等逻辑才能转发请求。经过多次踩坑我总结出了SpringBoot处理流式输出的最佳实践。2. SSE协议深度解析2.1 SSE基础概念SSE全称Server-Sent Events是HTML5规范中的一部分。与WebSocket不同SSE是单向通信协议——只能由服务器向客户端推送数据。这种特性让它特别适合以下场景实时通知股票行情、新闻推送长时间运行的任务进度报告需要持续输出的AI对话场景SSE协议有以下几个关键特点基于普通HTTP协议不需要像WebSocket那样升级协议默认支持断线重连机制数据传输格式为纯文本易于调试所有现代浏览器都原生支持除了IE2.2 协议格式详解一个标准的SSE响应看起来是这样的HTTP/1.1 200 OK Content-Type: text/event-stream Cache-Control: no-cache Connection: keep-alive data: 第一行数据\n data: 第二行数据\n\n每条消息由以下几部分组成都是可选的data: 实际传输的数据内容可以分多行event: 事件类型默认是messageid: 消息ID用于断线重连时定位retry: 重连等待时间毫秒2.3 浏览器端如何使用前端使用非常简单const eventSource new EventSource(/api/stream); // 接收消息 eventSource.onmessage (event) { console.log(收到数据:, event.data); }; // 自定义事件 eventSource.addEventListener(customEvent, (event) { console.log(自定义事件:, event.data); }); // 错误处理 eventSource.onerror (error) { console.error(连接错误:, error); };3. SpringBoot中的SSE实现方案3.1 SseEmitter基础用法SpringBoot提供了开箱即用的SseEmitter类来实现SSE服务端。下面是一个最简单的例子RestController public class SseController { GetMapping(/sse-demo) public SseEmitter handleSse() { SseEmitter emitter new SseEmitter(60_000L); // 1分钟超时 // 建议使用线程池而不是直接new Thread Executors.newSingleThreadExecutor().submit(() - { try { for (int i 0; i 10; i) { emitter.send( SseEmitter.event() .data(消息 i) .id(String.valueOf(i)) ); Thread.sleep(1000); } emitter.complete(); } catch (Exception e) { emitter.completeWithError(e); } }); return emitter; } }这里有几个需要注意的点一定要设置合理的超时时间建议使用线程池处理异步任务每次send后最好调用flush()处理完成后要调用complete()或completeWithError()3.2 生产环境优化在实际项目中直接使用SseEmitter可能会遇到以下问题客户端意外断开连接后服务器仍在发送数据大量连接时内存占用过高缺乏统一的错误处理机制改进后的代码RestController public class RobustSseController { private final MapString, SseEmitter emitters new ConcurrentHashMap(); GetMapping(/robust-sse) public SseEmitter robustSse(RequestParam String clientId) { SseEmitter emitter new SseEmitter(60_000L); // 注册连接 emitters.put(clientId, emitter); // 连接超时处理 emitter.onTimeout(() - { emitter.complete(); emitters.remove(clientId); }); // 连接完成处理 emitter.onCompletion(() - emitters.remove(clientId)); // 错误处理 emitter.onError((ex) - { emitter.complete(); emitters.remove(clientId); }); return emitter; } // 向指定客户端发送消息 public void sendToClient(String clientId, String message) { SseEmitter emitter emitters.get(clientId); if (emitter ! null) { try { emitter.send(message); } catch (IOException e) { emitter.completeWithError(e); emitters.remove(clientId); } } } }4. WebClient流式调用实战4.1 为什么选择WebClient当我们需要在SpringBoot应用中调用其他服务的SSE接口时常见的方案有RestTemplate不支持流式响应OpenFeign对SSE支持不友好WebClientSpring官方推荐的响应式HTTP客户端WebClient的优势在于原生支持响应式编程模型内置连接池管理完善的超时和重试机制与Spring生态完美集成4.2 基础配置首先添加依赖dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-webflux/artifactId /dependency然后配置WebClient BeanConfiguration public class WebClientConfig { Bean public WebClient webClient() { return WebClient.builder() .baseUrl(http://ai-service:8080) .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .clientConnector(new ReactorClientHttpConnector( HttpClient.create() .responseTimeout(Duration.ofSeconds(30)) )) .build(); } }4.3 调用SSE接口Service public class AiServiceProxy { Autowired private WebClient webClient; public FluxString streamAiResponse(String prompt) { return webClient.post() .uri(/v1/chat) .bodyValue(Map.of(prompt, prompt)) .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(String.class) .map(data - { // 处理SSE格式数据 if (data.startsWith(data:)) { return data.substring(5).trim(); } return data; }) .timeout(Duration.ofMinutes(5)) .doOnError(IOException.class, e - log.error(SSE连接异常, e) ); } }4.4 控制器层集成RestController public class AiController { Autowired private AiServiceProxy aiService; PostMapping(value /chat, produces MediaType.TEXT_EVENT_STREAM_VALUE) public FluxString chat(RequestBody ChatRequest request) { // 前置鉴权逻辑 if (!checkAuth(request.getToken())) { return Flux.just(error: 未授权); } return aiService.streamAiResponse(request.getPrompt()) .onErrorResume(e - Flux.just(error: e.getMessage()) ); } }5. 性能优化与生产实践5.1 连接池配置默认情况下WebClient使用Reactor Netty作为底层实现。我们可以优化连接池配置Bean public WebClient webClient() { ConnectionProvider provider ConnectionProvider.builder(custom) .maxConnections(500) .pendingAcquireTimeout(Duration.ofSeconds(60)) .build(); HttpClient httpClient HttpClient.create(provider) .responseTimeout(Duration.ofSeconds(30)); return WebClient.builder() .clientConnector(new ReactorClientHttpConnector(httpClient)) .build(); }5.2 背压处理当生产者速度大于消费者时需要合理处理背压public FluxString streamWithBackpressure() { return webClient.get() .uri(/stream) .retrieve() .bodyToFlux(String.class) .onBackpressureBuffer(1000, // 缓冲区大小 buffer - log.warn(缓冲区已满丢弃数据), BufferOverflowStrategy.DROP_LATEST) .delayElements(Duration.ofMillis(100)); // 控制消费速率 }5.3 监控与指标集成Micrometer监控流式请求Bean public WebClient monitoredWebClient(MeterRegistry registry) { return WebClient.builder() .filter(MetricsWebClientFilterFunction.builder(registry) .uriMapper(req - req.uri().getPath()) .build()) .build(); }6. 常见问题排查6.1 连接过早关闭现象客户端接收几条数据后连接就断开可能原因服务器未正确设置Content-Type为text/event-stream网络中间件如Nginxproxy_read_timeout设置过小客户端EventSource设置了过短的reconnect时间解决方案location /api/stream { proxy_pass http://backend; proxy_set_header Connection ; proxy_http_version 1.1; proxy_read_timeout 3600s; # 1小时 proxy_buffering off; }6.2 内存泄漏现象随着时间推移应用内存不断增长可能原因SseEmitter实例未被正确回收WebClient响应未被正确订阅和取消解决方案// 对于SseEmitter emitter.onCompletion(() - cleanResources()); emitter.onTimeout(() - cleanResources()); // 对于WebClient FluxString flux webClient.get().uri(/stream).retrieve().bodyToFlux(String.class); Disposable disposable flux.subscribe(); // 需要取消时 disposable.dispose();6.3 性能瓶颈现象高并发下系统吞吐量下降明显优化方向调整连接池参数使用更高效的序列化方式考虑引入RSocket替代HTTPBean public WebClient highPerformanceWebClient() { return WebClient.builder() .codecs(configurer - { configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024); configurer.defaultCodecs().jackson2JsonDecoder( new Jackson2JsonDecoder(getObjectMapper(), MediaType.TEXT_EVENT_STREAM)); }) .build(); }7. 方案对比与选型建议7.1 技术方案对比特性SseEmitterWebClientWebSocket长轮询通信方向单向双向双向单向协议基础HTTPHTTPWSHTTP断线重连支持需手动需手动自动浏览器支持广泛N/A广泛广泛服务器压力低中中高适用场景服务器推送HTTP调用实时交互简单轮询7.2 选型建议根据我的项目经验给出以下建议纯服务器推送场景优先选择SseEmitter实现简单且资源消耗低需要调用第三方SSE服务使用WebClient配合Flux实现流式处理需要双向通信考虑WebSocket但要注意实现复杂度老旧系统兼容可以使用长轮询作为fallback方案对于微服务架构中的流式处理我推荐以下最佳实践组合前端与网关SSE协议网关与内部服务WebClient SSE服务间高性能通信考虑RSocket8. 完整示例项目结构下面是一个生产可用的项目结构示例src/main/java ├── config │ ├── WebClientConfig.java │ └── SecurityConfig.java ├── controller │ ├── SseController.java │ └── ApiGatewayController.java ├── service │ ├── SseService.java │ └── AiProxyService.java ├── model │ ├── EventMessage.java │ └── ApiResponse.java └── exception ├── SseException.java └── GlobalExceptionHandler.java关键代码片段WebClient高级配置Bean public WebClient aiServiceWebClient() { return WebClient.builder() .baseUrl(http://ai-service) .filter((request, next) - { // 添加认证头 String token obtainAuthToken(); return next.exchange( ClientRequest.from(request) .header(Authorization, Bearer token) .build() ); }) .filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest - { // 记录请求日志 log.info(Request: {} {}, clientRequest.method(), clientRequest.url()); return Mono.just(clientRequest); })) .build(); }带熔断的流式调用public FluxString streamWithCircuitBreaker(String prompt) { CircuitBreaker circuitBreaker CircuitBreaker.ofDefaults(aiService); return circuitBreaker.run( () - webClient.post() .uri(/chat) .bodyValue(Map.of(prompt, prompt)) .retrieve() .bodyToFlux(String.class), throwable - { log.error(服务调用失败, throwable); return Flux.just(系统繁忙请稍后重试); } ); }性能测试建议SpringBootTest class SsePerformanceTest { Autowired private WebTestClient webTestClient; Test void testHighConcurrency() { int concurrentRequests 100; Flux.range(1, concurrentRequests) .flatMap(i - webTestClient.get() .uri(/sse-stream) .exchange() .expectStatus().isOk() ) .blockLast(Duration.ofMinutes(1)); } }在实际项目中我发现流式输出的稳定性很大程度上取决于网络环境和资源配置。建议在K8s环境中合理设置以下参数就绪探针超时时间Pod资源限制特别是内存HPA自动扩缩容策略Ingress的proxy_buffer配置对于关键业务场景还需要考虑消息幂等性处理断点续传支持消息确认机制端到端加密这些经验都是在实际项目中踩坑后总结出来的。比如有一次线上故障因为没设置合理的背压策略导致网关内存溢出。后来我们引入了反应式编程模型配合完善的监控告警系统才真正稳定下来。