百万级数据处理的优雅解法MyBatis Cursor流式查询深度实践在当今数据爆炸的时代后端开发者经常面临处理海量数据的挑战。想象一下这样的场景你需要从数据库中导出百万条记录生成报表或者将大量数据迁移到另一个系统。传统的分页查询方式不仅效率低下还可能导致内存溢出OOM——当数据量超过JVM堆内存限制时应用程序就会崩溃。这正是MyBatis Cursor流式查询大显身手的地方。1. 为什么需要流式查询传统的数据查询方式就像一次性把整个图书馆的书都搬到你的办公室——既占空间又效率低下。而流式查询则像是请图书管理员每次只递给你一本书读完再换下一本。内存消耗对比实验测试环境MySQL 8.0100万条测试数据查询方式内存峰值执行时间适用场景传统List查询1.2GB8.5s小数据集分页查询(每页5000)350MB32s中等数据集Cursor流式查询50MB12s大数据集// 传统查询方式 - 危险 ListHugeData allData hugeDataMapper.selectAll(); // 可能引发OOM // 流式查询 - 安全 try (CursorHugeData cursor hugeDataMapper.queryByCursor()) { cursor.forEach(data - process(data)); }流式查询的核心优势在于内存友好不会一次性加载所有数据性能稳定避免了深度分页的性能衰减代码简洁无需手动管理分页逻辑2. MyBatis Cursor的三种正确打开方式2.1 事务注解方案Spring环境推荐Transactional public void exportLargeData(OutputStream output) throws IOException { try (CursorOrder cursor orderMapper.streamAllOrders()) { CSVWriter writer new CSVWriter(new OutputStreamWriter(output)); cursor.forEach(order - { writer.writeNext(convertToCsvRow(order)); if (rowsProcessed % 1000 0) { writer.flush(); // 定期刷新缓冲区 } }); } }注意事项确保方法有Transactional注解使用try-with-resources确保Cursor正确关闭大事务可能导致数据库连接占用时间过长2.2 SqlSessionFactory手动控制方案public void processWithManualSession() { SqlSession session sqlSessionFactory.openSession(ExecutorType.SIMPLE); try { OrderMapper mapper session.getMapper(OrderMapper.class); try (CursorOrder cursor mapper.streamAllOrders()) { cursor.forEach(this::processOrder); } } finally { session.close(); // 必须手动关闭 } }这种方案适合需要精细控制连接生命周期的场景非Spring环境的应用需要自定义ExecutorType的情况2.3 TransactionTemplate编程式事务方案public void processInTemplate() { transactionTemplate.execute(status - { try (CursorProduct cursor productMapper.streamAllProducts()) { cursor.forEach(product - { if (shouldFilter(product)) { status.setRollbackOnly(); // 可以触发回滚 throw new RuntimeException(Filter condition met); } processProduct(product); }); } catch (IOException e) { throw new RuntimeException(e); } return null; }); }优势可以在遍历过程中控制事务比注解方式更灵活适合需要条件回滚的场景3. 生产环境避坑指南3.1 连接池兼容性问题不同连接池对Cursor的支持有差异连接池版本已知问题解决方案Druid1.2.10关闭连接时打印ERROR日志升级到1.2.10或配置logFilterHikariCP所有无无需特殊处理Tomcat所有长时间占用连接可能导致回收适当增大超时时间Druid配置示例bean idlogFilter classcom.alibaba.druid.filter.logging.Slf4jLogFilter property namestatementExecutableSqlLogEnable valuefalse/ /bean3.2 资源管理最佳实践总是使用try-with-resources// 正确做法 try (CursorData cursor mapper.streamData()) { cursor.forEach(...); } // 危险做法 CursorData cursor mapper.streamData(); cursor.forEach(...); // 可能忘记关闭控制处理速度RateLimiter limiter RateLimiter.create(1000); // 每秒1000条 try (CursorData cursor mapper.streamData()) { cursor.forEach(data - { limiter.acquire(); process(data); }); }批量处理优化ListData batch new ArrayList(BATCH_SIZE); try (CursorData cursor mapper.streamData()) { cursor.forEach(data - { batch.add(data); if (batch.size() BATCH_SIZE) { bulkProcess(batch); batch.clear(); } }); if (!batch.isEmpty()) { bulkProcess(batch); } }3.3 性能调优技巧MySQL服务器端配置-- 增加超时时间避免连接断开 SET GLOBAL wait_timeout 28800; SET GLOBAL interactive_timeout 28800; -- 优化网络包大小 SET GLOBAL max_allowed_packet 256M;MyBatis配置优化settings setting namedefaultExecutorType valueSIMPLE/ !-- 适合流式查询 -- setting namefetchSize value1000/ !-- 控制每次网络往返获取的行数 -- /settingsJDBC URL参数jdbc:mysql://host/db?useCursorFetchtruedefaultFetchSize10004. 实战构建高效数据导出服务让我们通过一个完整的案例展示如何用Cursor实现安全高效的数据导出。4.1 架构设计客户端 → 导出请求 → Spring Controller → 导出服务(Cursor流式处理) → 分块写入 → HTTP响应流4.2 核心实现代码RestController RequiredArgsConstructor public class DataExportController { private final ExportService exportService; GetMapping(/export/csv) public void exportCsv(HttpServletResponse response) throws IOException { response.setContentType(text/csv); response.setHeader(Content-Disposition, attachment; filenameexport.csv); exportService.exportToCsv(response.getOutputStream()); } } Service Transactional RequiredArgsConstructor class ExportService { private final LargeDataMapper dataMapper; public void exportToCsv(OutputStream output) throws IOException { try (CursorBusinessData cursor dataMapper.streamAll(); CSVWriter writer new CSVWriter(new OutputStreamWriter(output))) { writer.writeNext(header()); // 写表头 cursor.forEach(data - { writer.writeNext(convertToRow(data)); if (rowsExported % 1000 0) { writer.flush(); // 定期刷新 } }); } } }4.3 压力测试结果使用JMeter模拟100并发导出数据量传统方式Cursor方式10万条失败(OOM)成功(45s)50万条失败(OOM)成功(3m12s)100万条失败(OOM)成功(6m48s)内存占用对比传统方式随着数据量线性增长最终OOMCursor方式稳定在50-100MB5. 高级应用场景5.1 数据迁移管道模式public void migrateData() { try (CursorSourceData source sourceMapper.streamAll(); TargetMapper target targetSession.getMapper(TargetMapper.class)) { source.forEach(src - { TargetData targetData convert(src); target.insert(targetData); if (count % 1000 0) { targetSession.commit(); // 分批提交 logger.info(Migrated {} records, count); } }); targetSession.commit(); // 提交剩余记录 } }5.2 与Spring Batch集成Bean public ItemReaderData cursorItemReader() { return () - { CursorData cursor dataMapper.streamAll(); return new IteratorData() { Override public boolean hasNext() { return cursor.hasNext(); } Override public Data next() { return cursor.next(); } }; }; } Bean public Step dataExportStep(ItemReaderData reader) { return stepBuilderFactory.get(dataExport) .Data, Datachunk(1000) .reader(reader) .writer(chunk - { // 处理逻辑 }) .build(); }5.3 响应式编程结合public FluxData streamDataReactive() { return Flux.create(emitter - { try (CursorData cursor dataMapper.streamAll()) { cursor.forEach(emitter::next); emitter.complete(); } catch (Exception e) { emitter.error(e); } }, FluxSink.OverflowStrategy.BUFFER); }在实际项目中我们发现Cursor结合响应式编程特别适合实时数据看板场景可以实现低延迟的数据推送。