文章目录前言一、 RabbitMQ 消息队列配置1. 创建交换机2. 创建队列3. 绑定二、构建Spring Boot项目1. 添加依赖2. 配置连接3. 消息队列配置4. 消息发送5. 消息接收三、️ 监控与问题排查技巧1. 核心监控手段2. 常见问题与解决方案前言Spring Boot 集成‌集成过程主要分为添加依赖、配置连接、声明组件和收发消息四个步骤。一、 RabbitMQ 消息队列配置此处为手动配置消息队列。也可以选择在下面二、构建Spring Boot项目 3. 声明交换机、队列和绑定由应用程序自动创建配置。1. 创建交换机2. 创建队列3. 绑定点击要绑定队列的交换机进入交换机详情页面在绑定信息区域向交换机添加绑定信息。输入要绑定的队列名称绑定键以及必要的参数点击Bind。二、构建Spring Boot项目1. 添加依赖在pom.xml文件中添加spring-boot-starter-amqp依赖它将自动引入所有必要的库。dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency2. 配置连接在application.yml中配置 RabbitMQ 的连接信息。建议开启心跳和连接重试以保证连接的稳定性。spring:rabbitmq:host:127.0.0.1port:5672username:guestpassword:guest# 开启心跳检测防止连接因长时间空闲而断开requested-heartbeat:60listener:simple:retry:enabled:true# 开启连接重试max-attempts:33. 消息队列配置声明交换机、队列和绑定通过 Java 配置类来声明 RabbitMQ 的核心组件。这确保了应用启动时所需的队列和交换机就已存在。此处为由应用程序自动创建配置消息队列。也可以选择在上面一、RabbitMQ 消息队列配置手动创建配置。importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/** * author * date * description RabbitMQ配置类 */ConfigurationpublicclassRabbitMQConfiguration{publicstaticfinalStringEXCHANGE_TEST_DIRECTex.test.direct;publicstaticfinalStringQUEUE_TEST_CLASSIC_DIRECTq.test-classic.direct;publicstaticfinalStringROUTINGKEY_TEST_CLASSIC_DIRECTrk.test-classic.direct;// 声明持久化直连交换机BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange(EXCHANGE_TEST_DIRECT);}// 声明持久化队列BeanpublicQueuequeueClassicDirect(){returnnewQueue(QUEUE_TEST_CLASSIC_DIRECT);}// 将队列绑定到交换机并指定路由键BeanpublicBindingbindingClassicDirect(Queuequeue,DirectExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_TEST_CLASSIC_DIRECT);}}声明回调importcn.hutool.json.JSONUtil;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Component;/** * author * date * description RabbitMQ回调处理类 */ComponentpublicclassRabbitMQCallbackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{/** * Confirmation callback. * param correlationData – correlation data for the callback. 对象内部只有一个id属性用来表示当前消息的唯一性 * param ack – true for ack, false for nack. 消息投递到broker的状态true成功false失败 * param cause – An optional cause, for nack, when available, otherwise null. 投递失败的原因 */Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause){if(ack)System.out.println(消息投递收到确认correlationDatacorrelationData.getId());if(!ack)System.out.println(消息IDcorrelationData.getId()投递失败失败原因cause);}/** * Returned message callback. * param message the returned message. * param replyCode the reply code. * param replyText the reply text. * param exchange the exchange. * param routingKey the routing key. */OverridepublicvoidreturnedMessage(Messagemessage,intreplyCode,StringreplyText,Stringexchange,StringroutingKey){System.out.println(消息返回结果JSONUtil.toJsonStr(message)返回码replyCode返回信息replyTextexchangeexchangeroutingKeyroutingKey);}}4. 消息发送使用RabbitTemplate发送消息。生产者注入RabbitTemplate调用convertAndSend方法。importcom.ftms.dms.infra.itai.demo.core.config.RabbitMQCallback;importcom.ftms.dms.infra.itai.demo.core.config.RabbitMQConfiguration;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageBuilder;importorg.springframework.amqp.core.MessageDeliveryMode;importorg.springframework.amqp.core.MessageProperties;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.scheduling.annotation.Async;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/** * author * date * description 测试消息发送者 */Slf4jComponentpublicclassTestMsgProducer{ResourceprivateRabbitTemplaterabbitTemplate;ResourceprivateRabbitMQCallbackmqProductCallBack;publicTestMsgProducer(Qualifier(firstRabbitTemplate)RabbitTemplaterabbitTemplate){this.rabbitTemplaterabbitTemplate;}AsyncpublicvoidsendMessageDirect(Stringmsg){log.info(-------------消息推送开始--------------);//创建CorrelationData对象包含唯一id,id的作用是在回调函数中识别消息也就是根据id跟踪这条消息CorrelationDatacorrelationDatanewCorrelationData(id_System.currentTimeMillis());//消息确认与返回rabbitTemplate.setConfirmCallback(mqProductCallBack);rabbitTemplate.setReturnCallback(mqProductCallBack);//消息发送rabbitTemplate.convertAndSend(RabbitMQConfiguration.EXCHANGE_TEST_DIRECT,RabbitMQConfiguration.ROUTINGKEY_TEST_CLASSIC_DIRECT,msg,//Lambda表达式实现MessagePostProcessor接口message-{//获取消息的属性设置传输模式DeliveryMode为持久化会写入磁盘message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//返回修改后的消息returnmessage;},correlationData);log.info(--------------消息推送结束------------------);return消息发送成功消息内容是msg;}}控制台输出RabbitMQ 管理后台5. 消息接收配置消费者有多种方式比如本文使用RabbitListener注解接收消息。消费者使用RabbitListener监听队列。强烈建议使用手动确认模式MANUAL以确保消息被成功处理后再发送ACK防止消息丢失。importcom.rabbitmq.client.Channel;importlombok.RequiredArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.support.AmqpHeaders;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.autoconfigure.condition.ConditionalOnProperty;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.stereotype.Component;importorg.springframework.util.Assert;importjava.io.IOException;/** * author * date * description 测试消息消费者 */Slf4jComponentpublicclassTestMsgConsumer{// 默认情况下当使用RabbitListener注解时消息确认模式通常是自动的AcknowledgeMode.AUTO可以在yaml文件中更改// 消息一旦被消费者接收并处理完成即方法执行完成就会自动发送ack确认给RabbitMQ。RabbitListener(queues${spring.rabbitmq.test-classic-direct-queue})publicvoidhandleMessageAuto(Stringmsg){// 处理接收到的消息log.info(接收到的消息是msg);}// 手动ackRabbitListener(queues${spring.rabbitmq.test-classic-direct-queue},ackModeMANUAL)publicvoidprocessMessage(Stringmessage,Channelchannel,Header(AmqpHeaders.DELIVERY_TAG)longtag)throwsIOException{// 处理接收到的消息System.out.println(接收到的消息是: message);try{// 业务处理逻辑// 处理成功后手动确认channel.basicAck(tag,false);}catch(Exceptione){// 处理失败可以根据情况选择拒绝消息// requeuetrue 表示消息重新入队false 则进入死信队列如果配置了channel.basicNack(tag,false,true);}}}控制台输出三、️ 监控与问题排查技巧当系统投入运行后有效的监控和问题排查能力至关重要。1. 核心监控手段RabbitMQ 管理后台RabbitMQ 的管理插件这是最直观的监控工具它提供了一个 Web UI 来监控和管理 RabbitMQ 服务器。通过启用rabbitmq_management插件你可以访问http://你的服务器IP:15672来查看概览: 连接数、信道数、队列总数、消息速率等。队列: 每个队列的消息数量Ready, Unacked、消费者数量、内存占用。连接/信道: 查看当前所有连接和信道的状态。节点: 监控各个节点的内存、磁盘、文件描述符等资源使用情况。Prometheus Grafana对于生产环境建议使用 Prometheus 收集指标并用 Grafana 进行可视化展示和告警。步骤一启用 Prometheus 插件。rabbitmq-pluginsenablerabbitmq_prometheus步骤二配置 Prometheus (prometheus.yml) 来抓取 RabbitMQ 的指标默认端口 15692。scrape_configs:-job_name:rabbitmqstatic_configs:-targets:[localhost:15692]步骤三在 Grafana 中导入 RabbitMQ 官方或社区提供的仪表盘模板即可看到丰富的可视化图表。2. 常见问题与解决方案问题现象可能原因解决方案消息丢失生产者未开启确认、消费者自动确认、队列/消息未持久化。1. 生产者开启publisher-confirm-type: correlated2. 消费者使用ackMode: MANUAL3. 队列和消息都设置为持久化。消息重复消费消费者处理成功后发送ACK前Broker宕机导致消息重新投递。实现业务幂等性。例如使用消息的唯一ID在Redis中做处理记录或依赖数据库的唯一索引。消息堆积消费者处理能力不足、处理速度慢、或消费者宕机。1.优化消费者增加消费者实例数量调整prefetch参数建议5-25。2.队列拆分按业务类型或优先级拆分队列避免单队列瓶颈。3.使用惰性队列将积压消息存入磁盘降低内存压力。消费者阻塞消费者处理逻辑耗时过长或预取prefetch消息数过多。1. 调小prefetch值避免一个消费者占用过多消息。2. 消费者内部使用线程池异步处理业务避免阻塞消费线程。高CPU/性能慢频繁创建/销毁连接、大量小消息、磁盘I/O瓶颈。1.使用连接池保持长连接。2.生产者批量发送减少网络交互次数。3. 检查磁盘I/O考虑使用SSD。通过以上步骤可以完成 Spring Boot 与 RabbitMQ 的基础集成并具备在生产环境中监控和解决常见问题的能力。