rocketmq traceId重复问题
背景mq的traceId有重复现象理论上不同消息消费tracdId不应该相同但为什么有一定的概率会出现呢查询代码如下protected ConsumeStatus consumeMsgSingle(MessageExt ext) { log.debug(AbstractMessageListener-consumeMessage() msgId:{}, body:{}, ext.getMsgId(), new String(ext.getBody())); String message new String(ext.getBody()); //获取到key String key RocketMQUtils.concatKey(ext.getTopic(), ext.getTags()); //根据key从handleMap里获取到我们的处理类 MessageProcessor messageProcessor handleMap.get(key); if (Objects.isNull(messageProcessor)) { messageProcessor handleMap.get(ext.getTopic()); } Optional.ofNullable(messageProcessor).orElseThrow(() - new RRException(String.format(未找到消息处理类, topic:%s, tag:%s, ext.getTopic(), ext.getTags()))); Object obj null; try { //将String类型的message反序列化成对应的对象。 obj messageProcessor.transferMessage(message); if (obj instanceof MqMetaInfo) { MqMetaInfo meta (MqMetaInfo) obj; MqMetaInfoConverter.fromExt(meta, ext); } generateMDC(ext); } catch (Exception e) { StringBuilder errMsg new StringBuilder(对象反序列化失败, ) .append(messageId: ) .append(ext.getMsgId()).append(\n) .append(msgBody: ) .append(new String(ext.getBody())).append(\n) .append(messageExt ) .append(ext).append(\n) .append(stackTrace: ) .append(JSON.toJSONString(e.getStackTrace())); log.error(AbstractMessageListener-consumeMessage() error:{}, msgId:{}, message:{}, errMsg:{} , e, ext.getMsgId(), new String(ext.getBody()), errMsg.toString()); throw new RRException(errMsg.toString()); } //处理消息 boolean result messageProcessor.handleMessage(obj); if (!result) { if (ext.getReconsumeTimes() Integer.MAX_VALUE) { return ConsumeStatus.SUCCESS; } return ConsumeStatus.FAIL; } return ConsumeStatus.SUCCESS; }generateMDC方法如下原因分析可以看到如果message中有traceId则把traceId关联到该线程并打印出来。但发现最终该方法执行完成后未做清理traceId的动作即RocketMq的消费者用的是线程池而线程回收后traceId依然绑定在该线程上如果下次有消息过来消费则会有同样traceId出现重现消费者Slf4j Service(value multiConsumerDemoProcessor) public class MultiConsumerDemoProcessor implements MessageProcessorString { Override public boolean handleMessage(String orderNo) { log.info(开始消费:{}, orderNo); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return true; } Override public ClassString getClazz() { return null; } Override public String transferMessage(String message) { return message; } }生产者public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName); producer.setNamesrvAddr(ip); producer.start(); for (int i 0; i 10; i) try { { Message msg new Message(multi-consumer-demo, demo, OrderID188, Hello world.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult producer.send(msg); System.out.printf(%s%n, sendResult); } } catch (Exception e) { e.printStackTrace(); } //producer.shutdown(); } }运行结果可以看到traceId是有重复的解决加上finally语句释放traceId解决结果前端