news 2026/4/12 10:27:40

traceId 传递-MQ

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
traceId 传递-MQ

mq 发送消息

private <T extends BaseEvent> MessageBuilder<T> toMessageBuilder(final T event) { if (StringUtils.isBlank(event.keys())) { throw new RuntimeException("keys是必填项"); } // 获取tag,默认使用类名 String tags = StringUtils.defaultString(event.tags(), event.getClass().getSimpleName()); // 构建消息 MessageBuilder<T> messageBuilder = MessageBuilder.withPayload(event) .setHeader(RocketMQHeaders.TAGS, tags) .setHeader(RocketMQHeaders.KEYS, event.keys()); String traceId = MDC.get(Constants.MDC_TRACE_ID); if (StringUtils.isNotBlank(traceId)) { messageBuilder.setHeader(RocketMQConsts.Header.TRACE_ID, traceId); } String env = RequestThread.getValue(Constants.ENV); if (StringUtils.isNotBlank(env)) { messageBuilder.setHeader(RocketMQConsts.Header.ENV, env); } String desc = event.desc(); if (StringUtils.isNotBlank(desc)) { messageBuilder.setHeader(RocketMQConsts.Header.DESC, desc); } String producerApplicationName = environment.getProperty(Constants.SPRING_APPLICATION_NAME, DEFAULT_PRODUCER); messageBuilder.setHeader(RocketMQConsts.Header.PRODUCER, producerApplicationName); return messageBuilder; }

mq消费

public <T extends BaseEvent> void process(final String key, final Message<T> message, final Consumer<T> function) { String cacheKey = RedisKeyUtil.generate(REDIS_REPEAT_PREFIX_KEY, key); KeyInfo keyInfo = KeyInfo.builder() .prefix(REDIS_REPEAT_PREFIX_KEY) .keys(new String[]{key}) .waitTime(3) .timeUnit(TimeUnit.SECONDS) .build(); // 获取消息体 T event = message.getPayload(); try { // 获取链路追踪id String traceId = getHeaderValue(message, RocketMQConsts.Header.TRACE_ID); MDC.put(Constants.MDC_TRACE_ID, StringUtils.isBlank(traceId) ? key : traceId); // 设置调度tag环境 String env = getHeaderValue(message, RocketMQConsts.Header.ENV); RequestThread.addParam(Constants.ENV, env); //对相同key的消息加锁,保证相同的消息是串行消费 lockService.lock(keyInfo, () -> { //判断该消息是否已处理 if (redisCache.exists(cacheKey)) { log.info("[MQ消息-已处理]--[{}]:{}", key, ReflectUtils.processObject(event)); return null; } log.info("[MQ消息-开始处理]--[{}]:{}", key, ReflectUtils.processObject(event)); function.accept(event); log.info("[MQ消息-处理完成]--{}", key); //缓存处理结果一小时,一个小时内不重复消费 redisCache.setCacheObject(cacheKey, DateUtil.now(), 60, TimeUnit.MINUTES); return null; }); } catch (LockException e) { //加锁失败,说明有线程正在消费 log.warn("[MQ消息-正在处理]--[{}]:{}", key, ReflectUtils.processObject(event)); throw e; } catch (Exception e) { log.error("[MQ消息-处理异常]--[{}]:{}", key, event, e); throw e; } finally { MDC.clear(); RequestThread.clear(); } }

问题

为什么mq消费可以从mac中获取traceId 传递给消息请求头

这个代码展示了TraceId 如何从 HTTP 请求上下文传播到 MQ 消息的完整链路。让我详细解释为什么这里可以从 MDC 获取到 traceId:

🔄 完整的传播链路

🎯 为什么可以从 MDC 获取 traceId?

关键:在同一个线程内!

java

// 时间线分析(假设单线程同步处理): 1. HTTP请求进入 → HeaderFilter.doFilterInternal() ↓ 2. HeaderFilter设置MDC: String traceId = request.getHeader("traceId"); MDC.put("traceId", traceId); // 存入当前线程的ThreadLocal ↓ 3. 调用Controller → Service → MQ发送方法 ↓ 4. MQ发送方法执行: String traceId = MDC.get("traceId"); // 从同一个线程的ThreadLocal获取! ↓ 5. HeaderFilter清理: finally { MDC.clear(); } // 清理,但已经用过了

线程状态可视化

java

public class ThreadStateDemo { public void processOrder() { // 时间点1:HeaderFilter执行前 // 线程:http-nio-8080-exec-1 // MDC状态:{} // HeaderFilter执行 MDC.put("traceId", "TRACE-123"); // MDC状态:{"traceId": "TRACE-123"} // 时间点2:Controller执行 // 仍然是同一个线程:http-nio-8080-exec-1 String traceId1 = MDC.get("traceId"); // "TRACE-123" // 时间点3:Service执行,调用MQ发送 // 仍然是同一个线程:http-nio-8080-exec-1 sendMQEvent(); // sendMQEvent内部: String traceId2 = MDC.get("traceId"); // "TRACE-123" ✅ // 时间点4:HeaderFilter finally清理 MDC.clear(); // MDC状态:{} } private void sendMQEvent() { // 关键:这里和调用者在同一个线程! System.out.println("当前线程: " + Thread.currentThread().getName()); // 输出:http-nio-8080-exec-1 String traceId = MDC.get("traceId"); System.out.println("获取traceId: " + traceId); // "TRACE-123" } }

📊 完整的上下文传播表

阶段组件上下文存储位置如何获取是否同一线程
网关接收TraceGlobalFilter请求Headerrequest.getHeader()N/A
网关传递TraceGlobalFilter请求Headerheader.set()网关线程
业务服务接收HeaderFilter请求Headerrequest.getHeader()业务线程
业务服务存储HeaderFilterMDC (ThreadLocal)MDC.put()业务线程
Controller使用Controller方法MDCMDC.get()同一业务线程
Service使用Service方法MDCMDC.get()同一业务线程
MQ发送MQ发送器MDCMDC.get()同一业务线程
MQ消息头RocketMQ消息Message HeadersetHeader()同一业务线程

🔧 代码执行流程详解

1. 请求进入(设置MDC)

java

@Order(Ordered.HIGHEST_PRECEDENCE) public class HeaderFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(HttpServletRequest request, ...) { try { // 从HTTP header获取traceId String traceId = request.getHeader(Constants.MDC_TRACE_ID); // 设置到当前线程的MDC MDC.put(Constants.MDC_TRACE_ID, StringUtils.defaultString(traceId, String.valueOf(System.currentTimeMillis()))); // 继续处理请求 chain.doFilter(request, response); // → 进入Controller } finally { MDC.clear(); // 请求处理完后清理 } } }

2. 业务处理(使用MDC)

java

@Service public class OrderService { @Autowired private EventPublisher eventPublisher; public void createOrder(OrderRequest request) { // 这里可以直接使用MDC中的traceId String currentTraceId = MDC.get(Constants.MDC_TRACE_ID); log.info("创建订单,traceId: {}", currentTraceId); // 执行业务逻辑... Order order = saveOrder(request); // 发送MQ事件 OrderCreatedEvent event = new OrderCreatedEvent(order); eventPublisher.publish(event); // → 进入MQ发送代码 } }

3. MQ发送(从MDC获取)

java

public class EventPublisher { private <T extends BaseEvent> MessageBuilder<T> toMessageBuilder(final T event) { // 关键:这里和Service方法在同一个线程 // 所以可以获取到MDC中的traceId String traceId = MDC.get(Constants.MDC_TRACE_ID); // ✅ 能获取到! if (StringUtils.isNotBlank(traceId)) { // 设置到MQ消息头 messageBuilder.setHeader(RocketMQConsts.Header.TRACE_ID, traceId); } // 同样获取其他上下文 String env = RequestThread.getValue(Constants.ENV); // ThreadLocal if (StringUtils.isNotBlank(env)) { messageBuilder.setHeader(RocketMQConsts.Header.ENV, env); } return messageBuilder; } }

📝 总结:为什么能从MDC获取traceId
核心原因:MQ发送代码与HTTP请求处理在同一个Tomcat线程内执行。

线程连续性:HTTP请求线程 → Controller → Service → MQ发送,都在同一个线程

MDC的线程绑定:MDC基于ThreadLocal,线程内共享

生命周期匹配:在HeaderFilter清理MDC之前,MQ发送已经完成

设计意图:这正是MDC的设计目的——在请求处理链路中共享上下文

关键时间线:


所以你的MQ发送代码能正确获取traceId,这正是分布式追踪系统设计精妙之处:通过线程上下文在同步处理链路上自动传递追踪信息。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/12 2:04:12

为什么网关和业务服务两次清理MDC

网关/*** 生成日志链路追踪id&#xff0c;并传入header中.** author breggor*/ Slf4j AllArgsConstructor public class TraceGlobalFilter implements GlobalFilter, Ordered {Overridepublic Mono<Void> filter(final ServerWebExchange exchange, final GatewayFilter…

作者头像 李华
网站建设 2026/4/10 8:41:14

27、网络基础:从文件共享到域名解析的全面解析

网络基础:从文件共享到域名解析的全面解析 在当今数字化的时代,网络已经成为了我们生活和工作中不可或缺的一部分。理解网络的基本原理和相关技术对于我们更好地利用网络资源至关重要。本文将深入探讨网络文件系统、网络信息服务、服务器消息块协议以及域名系统等重要的网络…

作者头像 李华
网站建设 2026/4/10 12:53:21

LDDC:一站式歌词解决方案,让音乐体验更完美

LDDC&#xff1a;一站式歌词解决方案&#xff0c;让音乐体验更完美 【免费下载链接】LDDC 精准歌词(逐字歌词/卡拉OK歌词)歌词获取工具,支持QQ音乐、酷狗音乐、网易云平台,支持搜索与获取单曲、专辑、歌单的歌词 | Accurate Lyrics (verbatim lyrics) Retrieval Tool, supporti…

作者头像 李华
网站建设 2026/4/4 4:26:25

AGV无人叉车的应用:如何赋能工厂内部物流

随着全球内部物流自动化的加速&#xff0c;AGV已成为制造业和仓储企业提升效率、减少人工依赖、增强运行安全的关键装备。 AiTEN海豚之星——全球领先的无人叉车与内部物流自动化解决方案提供商&#xff0c;依托 全场景产品矩阵、行业级交付能力和核心自研技术&#xff0c;广泛…

作者头像 李华
网站建设 2026/4/12 10:08:13

FastExcel终极指南:如何用Java轻松处理百万级Excel数据

FastExcel终极指南&#xff1a;如何用Java轻松处理百万级Excel数据 【免费下载链接】fastexcel easyexcel作者最新升级版本&#xff0c; 快速、简洁、解决大文件内存溢出的java处理Excel工具 项目地址: https://gitcode.com/gh_mirrors/fast/fastexcel 在当今数据驱动的…

作者头像 李华
网站建设 2026/4/11 23:47:18

ComfyUI Photoshop插件:在Photoshop中集成AI绘画的终极指南

ComfyUI Photoshop插件&#xff1a;在Photoshop中集成AI绘画的终极指南 【免费下载链接】Comfy-Photoshop-SD Download this extension via the ComfyUI manager to establish a connection between ComfyUI and the Auto-Photoshop-SD plugin in Photoshop. https://github.co…

作者头像 李华