Spring Boot 与 Apache Pulsar 集成实战:构建高性能消息系统
Spring Boot 与 Apache Pulsar 集成实战构建高性能消息系统一、引言Apache Pulsar 是一款云原生的分布式消息流平台凭借其高吞吐量、低延迟、多租户支持和灵活的消息模型成为现代分布式系统中消息传递的首选方案。Spring Boot 提供了对 Pulsar 的原生支持通过 Spring for Apache Pulsar 模块开发者可以轻松地将 Pulsar 集成到应用中。本文将深入探讨 Spring Boot 与 Apache Pulsar 的集成实践包括环境配置、生产者/消费者实现、消息模式以及高级特性。二、环境准备与依赖配置2.1 依赖引入在pom.xml中添加 Spring for Apache Pulsar 依赖dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-pulsar/artifactId /dependency2.2 配置文件设置在application.yml中配置 Pulsar 连接信息spring: pulsar: client: service-url: pulsar://localhost:6650 admin-url: http://localhost:8080 connection-timeout: 10s operation-timeout: 30s producer: topic-name: my-topic enable-batching: true batching-max-messages: 1000 batching-max-bytes: 131072 consumer: subscription-name: my-subscription subscription-type: exclusive ack-timeout: 30s2.3 Pulsar 配置类创建自定义的 Pulsar 配置Configuration public class PulsarConfig { Bean public PulsarClient pulsarClient(PulsarProperties properties) { return PulsarClient.builder() .serviceUrl(properties.getClient().getServiceUrl()) .adminUrl(properties.getClient().getAdminUrl()) .connectionTimeout(properties.getClient().getConnectionTimeout()) .operationTimeout(properties.getClient().getOperationTimeout()) .build(); } Bean public ProducerFactoryString producerFactory(PulsarClient pulsarClient) { return new DefaultProducerFactory(pulsarClient, Map.of()); } Bean public ConsumerFactoryString consumerFactory(PulsarClient pulsarClient) { return new DefaultConsumerFactory(pulsarClient, Map.of()); } }三、生产者实现3.1 简单消息发送Service public class PulsarProducerService { private final PulsarTemplateString pulsarTemplate; public PulsarProducerService(PulsarTemplateString pulsarTemplate) { this.pulsarTemplate pulsarTemplate; } public void sendMessage(String topic, String message) { pulsarTemplate.send(topic, message); } public CompletableFutureMessageId sendAsyncMessage(String topic, String message) { return pulsarTemplate.sendAsync(topic, message); } }3.2 带消息属性的发送public void sendMessageWithProperties(String topic, String message, MapString, String properties) { MessageString pulsarMessage MessageBuilder.withPayload(message) .putAllProperties(properties) .build(); pulsarTemplate.send(topic, pulsarMessage); }3.3 批量消息发送public void sendBatchMessages(String topic, ListString messages) { ListMessageString pulsarMessages messages.stream() .map(msg - MessageBuilder.withPayload(msg).build()) .collect(Collectors.toList()); pulsarTemplate.send(topic, pulsarMessages); }四、消费者实现4.1 注解式消费者Component public class PulsarConsumerService { PulsarListener(subscriptionName my-subscription, topics my-topic) public void receiveMessage(String message) { System.out.println(Received message: message); processMessage(message); } PulsarListener(subscriptionName durable-subscription, topics persistent://public/default/my-topic, subscriptionType SubscriptionType.Durable) public void receivePersistentMessage(String message) { System.out.println(Received persistent message: message); } private void processMessage(String message) { // 业务处理逻辑 } }4.2 手动确认模式PulsarListener(subscriptionName manual-ack-subscription, topics my-topic, ackMode AckMode.MANUAL) public void receiveWithManualAck(String message, Header(Headers.MESSAGE_ID) MessageId messageId, Acknowledgment acknowledgment) { try { processMessage(message); acknowledgment.acknowledge(); } catch (Exception e) { acknowledgment.negativeAcknowledge(); } }4.3 批量消费PulsarListener(subscriptionName batch-subscription, topics my-topic, batch true, batchSize 100) public void receiveBatchMessages(ListString messages) { System.out.println(Received batch of messages.size() messages); messages.forEach(this::processMessage); }五、消息模式5.1 点对点模式Service public class PointToPointService { private final PulsarTemplateString pulsarTemplate; public PointToPointService(PulsarTemplateString pulsarTemplate) { this.pulsarTemplate pulsarTemplate; } public void sendTask(String task) { pulsarTemplate.send(tasks-topic, task); } } Component public class TaskConsumer { PulsarListener(subscriptionName task-consumer, topics tasks-topic, subscriptionType SubscriptionType.Exclusive) public void processTask(String task) { System.out.println(Processing task: task); } }5.2 发布/订阅模式Service public class NotificationService { private final PulsarTemplateString pulsarTemplate; public NotificationService(PulsarTemplateString pulsarTemplate) { this.pulsarTemplate pulsarTemplate; } public void publishNotification(String notification) { pulsarTemplate.send(notifications-topic, notification); } } Component public class NotificationSubscriber1 { PulsarListener(subscriptionName notification-subscriber-1, topics notifications-topic) public void receiveNotification(String notification) { System.out.println(Subscriber 1 received: notification); } } Component public class NotificationSubscriber2 { PulsarListener(subscriptionName notification-subscriber-2, topics notifications-topic) public void receiveNotification(String notification) { System.out.println(Subscriber 2 received: notification); } }5.3 请求/回复模式Service public class RequestReplyService { private final PulsarTemplateString pulsarTemplate; public RequestReplyService(PulsarTemplateString pulsarTemplate) { this.pulsarTemplate pulsarTemplate; } public CompletableFutureString sendRequest(String request) { return pulsarTemplate.sendAndReceive(request-topic, request); } } Component public class RequestHandler { PulsarListener(subscriptionName request-handler, topics request-topic) public String handleRequest(String request) { System.out.println(Handling request: request); return Response for: request; } }六、高级特性6.1 消息分区Bean public ProducerFactoryString partitionedProducerFactory(PulsarClient pulsarClient) { MapString, Object config new HashMap(); config.put(ProducerConfigurationName.PARTITION_KEY, my-partition-key); return new DefaultProducerFactory(pulsarClient, config); }6.2 消息压缩Bean public ProducerFactoryString compressedProducerFactory(PulsarClient pulsarClient) { MapString, Object config new HashMap(); config.put(ProducerConfigurationName.COMPRESSION_TYPE, CompressionType.LZ4); return new DefaultProducerFactory(pulsarClient, config); }6.3 事务支持Service public class TransactionalMessageService { private final PulsarTemplateString pulsarTemplate; public TransactionalMessageService(PulsarTemplateString pulsarTemplate) { this.pulsarTemplate pulsarTemplate; } Transactional public void sendTransactionalMessages(String topic1, String topic2, String message1, String message2) { pulsarTemplate.send(topic1, message1); pulsarTemplate.send(topic2, message2); } }6.4 死信队列PulsarListener( subscriptionName dlq-subscription, topics main-topic, deadLetterPolicy DeadLetterPolicy( maxRedeliverCount 3, deadLetterTopic dead-letter-topic ) ) public void receiveMessage(String message) { throw new RuntimeException(Simulated processing failure); } Component public class DeadLetterConsumer { PulsarListener(subscriptionName dlq-consumer, topics dead-letter-topic) public void processDeadLetter(String message) { System.out.println(Processing dead letter: message); } }七、Schema 支持7.1 JSON Schemapublic class User { private String name; private int age; // Getters and Setters } Service public class JsonProducerService { private final PulsarTemplateUser pulsarTemplate; public JsonProducerService(PulsarTemplateUser pulsarTemplate) { this.pulsarTemplate pulsarTemplate; } public void sendUser(User user) { pulsarTemplate.send(user-topic, user); } } Component public class JsonConsumer { PulsarListener(subscriptionName json-consumer, topics user-topic) public void receiveUser(User user) { System.out.println(Received user: user.getName()); } }7.2 Avro SchemaData public class Order { private String orderId; private String productId; private int quantity; private double totalAmount; } Configuration public class AvroSchemaConfig { Bean public SchemaOrder orderSchema() { return Schema.AVRO(Order.class); } }八、监控与管理8.1 指标监控Component public class PulsarMetrics { private final PulsarClient pulsarClient; public PulsarMetrics(PulsarClient pulsarClient) { this.pulsarClient pulsarClient; } public MapString, Object getProducerStats(String topic) { MapString, Object stats new HashMap(); // 获取生产者统计信息 return stats; } public MapString, Object getConsumerStats(String topic) { MapString, Object stats new HashMap(); // 获取消费者统计信息 return stats; } }8.2 健康检查Component public class PulsarHealthIndicator implements HealthIndicator { private final PulsarClient pulsarClient; public PulsarHealthIndicator(PulsarClient pulsarClient) { this.pulsarClient pulsarClient; } Override public Health health() { try { pulsarClient.getPartitionsForTopic(health-check-topic); return Health.up().withDetail(pulsar, connected).build(); } catch (Exception e) { return Health.down().withDetail(pulsar, disconnected) .withDetail(error, e.getMessage()).build(); } } }九、实战案例订单消息系统Service public class OrderMessageService { private final PulsarTemplateOrderEvent pulsarTemplate; public OrderMessageService(PulsarTemplateOrderEvent pulsarTemplate) { this.pulsarTemplate pulsarTemplate; } public void publishOrderCreated(Order order) { OrderEvent event new OrderEvent(); event.setOrderId(order.getId()); event.setEventType(ORDER_CREATED); event.setPayload(order); pulsarTemplate.send(order-events, event); } public void publishOrderPaid(Long orderId) { OrderEvent event new OrderEvent(); event.setOrderId(orderId); event.setEventType(ORDER_PAID); pulsarTemplate.send(order-events, event); } } Component public class OrderEventListener { private final InventoryService inventoryService; private final NotificationService notificationService; public OrderEventListener(InventoryService inventoryService, NotificationService notificationService) { this.inventoryService inventoryService; this.notificationService notificationService; } PulsarListener(subscriptionName order-event-listener, topics order-events) public void handleOrderEvent(OrderEvent event) { switch (event.getEventType()) { case ORDER_CREATED - inventoryService.reserveStock(event.getOrderId()); case ORDER_PAID - notificationService.sendOrderConfirmation(event.getOrderId()); default - System.out.println(Unknown event type: event.getEventType()); } } }十、总结本文详细介绍了 Spring Boot 与 Apache Pulsar 的集成实践环境配置依赖引入、连接配置、生产者/消费者工厂配置生产者实现同步/异步消息发送、批量发送、消息属性设置消费者实现注解式监听、手动确认、批量消费消息模式点对点、发布/订阅、请求/回复高级特性分区、压缩、事务、死信队列Schema 支持JSON、Avro 等数据格式监控管理指标收集、健康检查通过本文的学习读者可以掌握 Spring Boot 与 Pulsar 集成的核心技能能够构建高效、可靠的消息系统。在实际项目中需要根据业务需求选择合适的消息模式和配置参数以达到最佳的性能和可靠性。