2025-03-29
实战设计
0

目录

全链路日志处理
Http 处理
Feign 微服务调用处理
RocketMQ 生产消费端处理
源码解读
生产者钩子
消费者钩子
JUC线程池处理
装饰 Callable
装饰 Runnable
Spring异步线程处理

在排查生产日志的时候,发现 RocketMQ 的消费端的日志没有输出 traceId, 导致链路查询无法衔接,通过查看当前项目架构底层的实现,发现原来是没有处理过异步的链路日志

为了查询更便捷,对此架构进行优化,实现完整版的全链路的日志ID记录

全链路日志处理

在 SpringBoot 中,MDC(Mapped Diagnostic Context)是最常见的日志上下文关联方式。

其实现原理就是在请求链路上添加 traceId 到 MDC,在请求链路结束时清除 traceId。

MDC(Mapped Diagnostic Context)

是SLF4J提供的一个线程安全的诊断上下文工具。它允许开发者在同一线程上下文中存储多个键值对信息,这些信息可以自动附加到日志输出中,实现日志的上下文关联。

TraceId(分布式系统中用于唯请求全链路的唯一标识符)

通过 traceId,可以将一个请求在不同服务、不同节点、不同线程中的日志串联起来,形成完整的调用链路,方便快速定位问题、分析性能瓶颈或监控系统行为。

Http 处理

在 SpringMVC 中对于 http 的处理,最常用的是 HandlerInterceptor 接口进行实现,在接口处理前存储到 MDC,在接口的后置处理中清空 MDC 上下文,防止内存泄漏

java
public class MdcHandlerInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 解析token,获取用户唯一标识 String userId = "123"; // 接口地址 String url = request.getRequestURI(); // 日志处理 String traceId = request.getHeader(MdcTraceContext.TRACE_HEADER_KEY); MdcTraceContext.initTraceId(userId, url, traceId); return HandlerInterceptor.super.preHandle(request, response, handler); } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { MdcTraceContext.clearTraceId(); } }

在 WebMvcConfig 中添加上拦截器才可生效

java
@Configuration public class MeWebMvcConfig extends WebMvcConfigurationSupport { @Override protected void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new MdcHandlerInterceptor()); } }

MDC 的上下文内容,可以通过 MdcTraceContext 进行获取和设置

看过好多方案,除了定义 MDC 以外还会自己存储一份 ThreadLocal,其实从 MDC 底层来看,MDC 就是 ThreadLocal,所以 MDC 的设置和获取,与自定义 ThreadLocal 没有任何区别,没有必要使用 ThreadLocal 在浪费内存存储一份 traceId。

TraceId 可任意组成,最好带有唯一的识别作用

此处 traceId 的构成是使用用户的唯一标识、接口请求的地址和当前请求的唯一标识,这样在日志中,就可以通过不同内容对 traceId 进行链路追踪,方便快速定位问题。

java
public class MdcTraceContext { /** * logback.xml 中全链路标识的占位符 */ public static final String TRACE_ID_PATTERN = "traceId"; /** * 请求头的链路Key */ public static final String TRACE_HEADER_KEY = "X-Requested-Id"; /** * 当前操作人的唯一标识 */ public static final String $userId = "$userId"; /** * 请求url */ public static final String $url = "$url"; /** * 当前请求的唯一标识 * * <p> 前端添加到请求头中 or 网关生成的uuid </p> */ public static final String $requestId = "$requestId"; /** * 链路ID */ public static String TRACE_ID = "<" + $userId + "> <" + $url + "> <" + $requestId + ">"; /** * 初始化TraceId */ public static void initTraceId(String userId, String url, String requestId) { String traceId = TRACE_ID.replace($userId, StringUtils.isEmpty(userId) ? "" : userId) .replace($url, StringUtils.isEmpty(url) ? "" : url) .replace($requestId, StringUtils.isEmpty(requestId) ? "" : requestId); MDC.put(TRACE_ID_PATTERN, traceId); } /** * 初始化TraceId */ public static void initTraceId(String traceId) { MDC.put(TRACE_ID_PATTERN, traceId); } /** * 获取当前TraceId */ public static String getTraceId() { return MDC.get(TRACE_ID_PATTERN); } /** * 清除TraceId */ public static void clearTraceId() { MDC.remove(TRACE_ID_PATTERN); } /** * 清除底层数据 */ public static void clear() { MDC.clear(); } }

其中 logback.xml 的模板配置需要生效必要设置环境变量,以下提供了dev环境的配置 sppring.profiles.active = dev,可根据需要自行配置

xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <!-- 属性 --> <property name="log.path" value="log"/> <property name="log.maxHistory" value="15"/> <property name="log.colorPattern" value="%magenta(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) %boldCyan(%X{traceId}) %yellow(%thread) %green(%logger{50}) %msg%n"/> <property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %X{traceId} %thread %logger{50} %msg%n"/> <!-- 配置环境 --> <springProfile name="dev"> <!-- 日志文件 --> <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>DEBUG</level> </filter> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <FileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</FileNamePattern> </rollingPolicy> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>${log.colorPattern}</pattern> </encoder> </appender> <!-- 控制台 --> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>DEBUG</level> </filter> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>${log.colorPattern}</pattern> </encoder> </appender> <!-- DEBUG --> <logger name="code72" level="DEBUG" additivity="false"> <appender-ref ref="STDOUT"/> <appender-ref ref="FILE"/> </logger> <!-- INFO --> <root level="INFO"> <appender-ref ref="STDOUT"/> <appender-ref ref="FILE"/> </root> </springProfile> </configuration>

以上内容就实现了Htt请求处理的全链路日志追踪,接下来还有分布式微服务下的处理

Feign 微服务调用处理

在 Feign 中,对于异步处理的处理,也是通过拦截器进行实现,在请求头中添加上traceId,下一个微服务种即可从执行上面的 Http 处理,从请求头获取 traceId 进行链路追踪。

java
@Slf4j @Component public class MdcRequestInterceptor extends MdcTraceContext implements RequestInterceptor { /** * Called for every request. Add data using methods on the supplied {@link RequestTemplate}. * * @param template */ @Override public void apply(RequestTemplate template) { if(StringUtils.isNotBlank(getTraceId())){ template.header(TRACE_HEADER_KEY, getTraceId()); }else{ // 本次请求没有traceId, log.debug("This request does not have a traceId!"); } } }

RocketMQ 生产消费端处理

在 RocketMQ 中,需要在发送消息与消费消息的时候添加钩子进行处理,目前在源码中添加钩子的入口不太好找,需要对源码有一定的了解,要从生产者 DefaultMQProducer 与消费者 DefaultMQPushConsumer 进行拦截处理,具体代码如下:

java
@Slf4j @Component public class MdcApplicationRunner implements ApplicationContextAware, ApplicationRunner { private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } /** * Callback used to run the bean. * * @param args incoming application arguments * @throws Exception on error */ @Override public void run(ApplicationArguments args) throws Exception { // 注册生产者消息hook registerSendMessageHook(); // 注册消费者消息hook registerConsumeMessageHook(); } /** * 注册生产者消息hook */ private void registerSendMessageHook(){ DefaultMQProducer defaultMQProducer = applicationContext.getBean(DefaultMQProducer.class); defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(new MdcSendMessageHook()); log.info("DefaultMQProducer #registerSendMessageHook Success."); } /** * 注册消费者消息hook */ private void registerConsumeMessageHook(){ Map<String, DefaultRocketMQListenerContainer> beansOfType = applicationContext.getBeansOfType(DefaultRocketMQListenerContainer.class); for (DefaultRocketMQListenerContainer defaultRocketMQListenerContainer : beansOfType.values()) { defaultRocketMQListenerContainer.getConsumer().getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new MdcConsumeMessageHook()); log.info("{}:DefaultMQPushConsumer #registerConsumeMessageHook Success.", defaultRocketMQListenerContainer.getRocketMQListener().getClass().getSimpleName()); } } }

::: important 版本兼容问题 对于 RocketMQ 的配置,此处我是基于4.9.3版本的调整,在源码 5.1.0 的版本测试,对于其他版本具体需要对查看源码测试核验 :::

源码解读

对于 DefaultRocketMQListenerContainer 对象,是一个mq监听容器,每个容器对象都是由 @RocketMQMessageListener 注释的消费端生成的,由 Spring 容器管理。

RocketMQMessageListener加载的源码 RocketMQMessageListener注册的源码

在容器内部,创建 DefaultMQPushConsumer 对象,也就是每个容器都会有一个 DefaultMQPushConsumer 对象,所以上述的代码实现中是获取的 Map<String, DefaultRocketMQListenerContainer>,然后对每个容器得 DefaultMQPushConsumer 都进行添加钩子处理。

DefaultMQPushConsumer创建对象的源码

生产者钩子

在发送消息前,从请求的本地线程中获取到traceId,并设置到消息的 Property 属性中即可,保证 traceId 会与消息一起传递

java
@Slf4j public class MdcSendMessageHook extends MdcTraceContext implements SendMessageHook { @Override public String hookName() { return MdcSendMessageHook.log.getName(); } @Override public void sendMessageBefore(SendMessageContext context) { // 从MDC获取TraceId if (StringUtils.isNotEmpty(getTraceId())) { context.getMessage().putUserProperty(TRACE_HEADER_KEY, getTraceId()); } } @Override public void sendMessageAfter(SendMessageContext context) { // 无需处理 } }

消费者钩子

消费者钩子,在消费消息前,从消息的 Property 中获取到 traceId,并设置到 MDC 的本地线程中,注意消息处理完成后,需要清除 MDC 的本地线程,保证线程池中的线程不会影响其他线程的链路追踪。

java
@Slf4j public class MdcConsumeMessageHook extends MdcTraceContext implements ConsumeMessageHook { @Override public String hookName() { return MdcConsumeMessageHook.class.getName(); } @Override public void consumeMessageBefore(ConsumeMessageContext context) { List<MessageExt> msgList = context.getMsgList(); if (!msgList.isEmpty()) { // MDC设置TraceId initTraceId(msgList.get(0).getProperty(TRACE_HEADER_KEY)); } } @Override public void consumeMessageAfter(ConsumeMessageContext context) { // 清除MDC的TraceId clear(); } }

::: warning clear() 对于异步执行的线程,当执行完成时,需要执行 clear() 方法清除全部缓存信息。 :::

JUC线程池处理

对于线程池 ThreadPoolExecutor,核心问题在于子线程无法自动获取父线程的MDC内容。

::: tip 解决方案 使用 装饰器模式 包装任务,封装 RunnableCallable,在任务执行前将父线程的 traceId 传递到子线程的 MDC 中,执行后再清理。 :::

装饰 Callable

java
@Slf4j public class MdcCallable<V> extends MdcTraceContext implements Callable<V> { private Callable<V> callable; private Map<String, String> parentMdc; public MdcCallable(Callable<V> callable) { this.callable = callable; // 捕获父线程的 MDC this.parentMdc = MDC.getCopyOfContextMap(); } /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ @Override public V call() throws Exception { if (parentMdc != null) { // 将 MDC 传递给子线程 MDC.setContextMap(parentMdc); parentMdc.clear(); parentMdc = null; } try { V call = callable.call(); callable = null; return call; } finally { // 清理子线程的 MDC,避免内存泄漏 clear(); } } }

装饰 Runnable

java
@Slf4j public class MdcRunnable extends MdcTraceContext implements Runnable { private Runnable runnable; private Map<String, String> parentMdc; public MdcRunnable(Runnable runnable) { this.runnable = runnable; // 捕获父线程的 MDC this.parentMdc = MDC.getCopyOfContextMap(); } @Override public void run() { if (parentMdc != null) { // 将 MDC 传递给子线程 MDC.setContextMap(parentMdc); parentMdc.clear(); parentMdc = null; } try { runnable.run(); runnable = null; } finally { // 清理子线程的 MDC,避免内存泄漏 clear(); } } }

测试示例:

java
public void test(){ ExecutorService executorService = Executors.newFixedThreadPool(50); for (int i = 0; i < 10; i++) { int finalI = i; executorService.submit(new MdcRunnable(() -> log.info("测试 - " + finalI))); executorService.submit(new MdcCallable<>(() -> { log.info("测试 - " + finalI); return "测试 - " + finalI; })); } }

Spring异步线程处理

Spring 默认的线程池同样存在MDC传递的问题。

::: tip 解决方案 为了确保MDC的上下文能够传递到异步线程,在自定义异步线程池的同时,添加 TaskDecorator 装饰处理, :::

java
@Configuration @EnableAsync public class AsyncConfig extends AsyncConfigurerSupport { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); //线程核心数目 threadPoolTaskExecutor.setCorePoolSize(5); threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true); //最大线程数 threadPoolTaskExecutor.setMaxPoolSize(20); //配置队列大小 threadPoolTaskExecutor.setQueueCapacity(400); //配置线程池前缀 threadPoolTaskExecutor.setThreadNamePrefix("Custom-async-thread-"); //配置拒绝策略 threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //数据初始化 threadPoolTaskExecutor.initialize(); //是否等待所有线程执行完毕才关闭线程池,默认值为false。 threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); //awaitTerminationSeconds:waitForTasksToCompleteOnShutdown的等待的时长,默认值为0,即不等待。 threadPoolTaskExecutor.setAwaitTerminationSeconds(60); // 注入装饰器 threadPoolTaskExecutor.setTaskDecorator(new MdcTaskDecorator()); threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; } }
java
public class MdcTaskDecorator implements TaskDecorator { /** * Decorate the given {@code Runnable}, returning a potentially wrapped * {@code Runnable} for actual execution, internally delegating to the * original {@link Runnable#run()} implementation. * * @param runnable the original {@code Runnable} * @return the decorated {@code Runnable} */ @Override public Runnable decorate(Runnable runnable) { return new MdcRunnable(runnable); } }

测试示例:

java
@Slf4j @Service public class MyService { @Async public void asyncMethod() { log.info("异步方法执行,traceId: {}", MdcTraceContext.getTraceId()); } }

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!