在排查生产日志的时候,发现 RocketMQ 的消费端的日志没有输出 traceId, 导致链路查询无法衔接,通过查看当前项目架构底层的实现,发现原来是没有处理过异步的链路日志
为了查询更便捷,对此架构进行优化,实现完整版的全链路的日志ID记录
在 SpringBoot 中,MDC(Mapped Diagnostic Context)是最常见的日志上下文关联方式。
其实现原理就是在请求链路上添加 traceId 到 MDC,在请求链路结束时清除 traceId。
MDC(Mapped Diagnostic Context)
是SLF4J提供的一个线程安全的诊断上下文工具。它允许开发者在同一线程上下文中存储多个键值对信息,这些信息可以自动附加到日志输出中,实现日志的上下文关联。
TraceId(分布式系统中用于唯请求全链路的唯一标识符)
通过 traceId,可以将一个请求在不同服务、不同节点、不同线程中的日志串联起来,形成完整的调用链路,方便快速定位问题、分析性能瓶颈或监控系统行为。
在 SpringMVC 中对于 http 的处理,最常用的是 HandlerInterceptor
接口进行实现,在接口处理前存储到 MDC,在接口的后置处理中清空 MDC 上下文,防止内存泄漏
javapublic class MdcHandlerInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 解析token,获取用户唯一标识
String userId = "123";
// 接口地址
String url = request.getRequestURI();
// 日志处理
String traceId = request.getHeader(MdcTraceContext.TRACE_HEADER_KEY);
MdcTraceContext.initTraceId(userId, url, traceId);
return HandlerInterceptor.super.preHandle(request, response, handler);
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
MdcTraceContext.clearTraceId();
}
}
在 WebMvcConfig 中添加上拦截器才可生效
java@Configuration
public class MeWebMvcConfig extends WebMvcConfigurationSupport {
@Override
protected void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new MdcHandlerInterceptor());
}
}
MDC 的上下文内容,可以通过 MdcTraceContext 进行获取和设置
看过好多方案,除了定义 MDC 以外还会自己存储一份 ThreadLocal,其实从 MDC 底层来看,MDC 就是 ThreadLocal,所以 MDC 的设置和获取,与自定义 ThreadLocal 没有任何区别,没有必要使用 ThreadLocal 在浪费内存存储一份 traceId。
TraceId 可任意组成,最好带有唯一的识别作用
此处 traceId 的构成是使用用户的唯一标识、接口请求的地址和当前请求的唯一标识,这样在日志中,就可以通过不同内容对 traceId 进行链路追踪,方便快速定位问题。
javapublic class MdcTraceContext {
/**
* logback.xml 中全链路标识的占位符
*/
public static final String TRACE_ID_PATTERN = "traceId";
/**
* 请求头的链路Key
*/
public static final String TRACE_HEADER_KEY = "X-Requested-Id";
/**
* 当前操作人的唯一标识
*/
public static final String $userId = "$userId";
/**
* 请求url
*/
public static final String $url = "$url";
/**
* 当前请求的唯一标识
*
* <p> 前端添加到请求头中 or 网关生成的uuid </p>
*/
public static final String $requestId = "$requestId";
/**
* 链路ID
*/
public static String TRACE_ID = "<" + $userId + "> <" + $url + "> <" + $requestId + ">";
/**
* 初始化TraceId
*/
public static void initTraceId(String userId, String url, String requestId) {
String traceId = TRACE_ID.replace($userId, StringUtils.isEmpty(userId) ? "" : userId)
.replace($url, StringUtils.isEmpty(url) ? "" : url)
.replace($requestId, StringUtils.isEmpty(requestId) ? "" : requestId);
MDC.put(TRACE_ID_PATTERN, traceId);
}
/**
* 初始化TraceId
*/
public static void initTraceId(String traceId) {
MDC.put(TRACE_ID_PATTERN, traceId);
}
/**
* 获取当前TraceId
*/
public static String getTraceId() {
return MDC.get(TRACE_ID_PATTERN);
}
/**
* 清除TraceId
*/
public static void clearTraceId() {
MDC.remove(TRACE_ID_PATTERN);
}
/**
* 清除底层数据
*/
public static void clear() {
MDC.clear();
}
}
其中 logback.xml 的模板配置需要生效必要设置环境变量,以下提供了dev环境的配置 sppring.profiles.active = dev
,可根据需要自行配置
xml<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 属性 -->
<property name="log.path" value="log"/>
<property name="log.maxHistory" value="15"/>
<property name="log.colorPattern" value="%magenta(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) %boldCyan(%X{traceId}) %yellow(%thread) %green(%logger{50}) %msg%n"/>
<property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %X{traceId} %thread %logger{50} %msg%n"/>
<!-- 配置环境 -->
<springProfile name="dev">
<!-- 日志文件 -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</FileNamePattern>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${log.colorPattern}</pattern>
</encoder>
</appender>
<!-- 控制台 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${log.colorPattern}</pattern>
</encoder>
</appender>
<!-- DEBUG -->
<logger name="code72" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/>
</logger>
<!-- INFO -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/>
</root>
</springProfile>
</configuration>
以上内容就实现了Htt请求处理的全链路日志追踪,接下来还有分布式微服务下的处理
在 Feign 中,对于异步处理的处理,也是通过拦截器进行实现,在请求头中添加上traceId,下一个微服务种即可从执行上面的 Http 处理,从请求头获取 traceId 进行链路追踪。
java@Slf4j
@Component
public class MdcRequestInterceptor extends MdcTraceContext implements RequestInterceptor {
/**
* Called for every request. Add data using methods on the supplied {@link RequestTemplate}.
*
* @param template
*/
@Override
public void apply(RequestTemplate template) {
if(StringUtils.isNotBlank(getTraceId())){
template.header(TRACE_HEADER_KEY, getTraceId());
}else{
// 本次请求没有traceId,
log.debug("This request does not have a traceId!");
}
}
}
在 RocketMQ 中,需要在发送消息与消费消息的时候添加钩子进行处理,目前在源码中添加钩子的入口不太好找,需要对源码有一定的了解,要从生产者 DefaultMQProducer
与消费者 DefaultMQPushConsumer
进行拦截处理,具体代码如下:
java@Slf4j
@Component
public class MdcApplicationRunner implements ApplicationContextAware, ApplicationRunner {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* Callback used to run the bean.
*
* @param args incoming application arguments
* @throws Exception on error
*/
@Override
public void run(ApplicationArguments args) throws Exception {
// 注册生产者消息hook
registerSendMessageHook();
// 注册消费者消息hook
registerConsumeMessageHook();
}
/**
* 注册生产者消息hook
*/
private void registerSendMessageHook(){
DefaultMQProducer defaultMQProducer = applicationContext.getBean(DefaultMQProducer.class);
defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(new MdcSendMessageHook());
log.info("DefaultMQProducer #registerSendMessageHook Success.");
}
/**
* 注册消费者消息hook
*/
private void registerConsumeMessageHook(){
Map<String, DefaultRocketMQListenerContainer> beansOfType = applicationContext.getBeansOfType(DefaultRocketMQListenerContainer.class);
for (DefaultRocketMQListenerContainer defaultRocketMQListenerContainer : beansOfType.values()) {
defaultRocketMQListenerContainer.getConsumer().getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new MdcConsumeMessageHook());
log.info("{}:DefaultMQPushConsumer #registerConsumeMessageHook Success.", defaultRocketMQListenerContainer.getRocketMQListener().getClass().getSimpleName());
}
}
}
::: important 版本兼容问题 对于 RocketMQ 的配置,此处我是基于4.9.3版本的调整,在源码 5.1.0 的版本测试,对于其他版本具体需要对查看源码测试核验 :::
对于 DefaultRocketMQListenerContainer
对象,是一个mq监听容器,每个容器对象都是由 @RocketMQMessageListener
注释的消费端生成的,由 Spring 容器管理。
在容器内部,创建 DefaultMQPushConsumer
对象,也就是每个容器都会有一个 DefaultMQPushConsumer
对象,所以上述的代码实现中是获取的 Map<String, DefaultRocketMQListenerContainer>
,然后对每个容器得 DefaultMQPushConsumer
都进行添加钩子处理。
在发送消息前,从请求的本地线程中获取到traceId,并设置到消息的 Property 属性中即可,保证 traceId 会与消息一起传递
java@Slf4j
public class MdcSendMessageHook extends MdcTraceContext implements SendMessageHook {
@Override
public String hookName() {
return MdcSendMessageHook.log.getName();
}
@Override
public void sendMessageBefore(SendMessageContext context) {
// 从MDC获取TraceId
if (StringUtils.isNotEmpty(getTraceId())) {
context.getMessage().putUserProperty(TRACE_HEADER_KEY, getTraceId());
}
}
@Override
public void sendMessageAfter(SendMessageContext context) {
// 无需处理
}
}
消费者钩子,在消费消息前,从消息的 Property 中获取到 traceId,并设置到 MDC 的本地线程中,注意消息处理完成后,需要清除 MDC 的本地线程,保证线程池中的线程不会影响其他线程的链路追踪。
java@Slf4j
public class MdcConsumeMessageHook extends MdcTraceContext implements ConsumeMessageHook {
@Override
public String hookName() {
return MdcConsumeMessageHook.class.getName();
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
List<MessageExt> msgList = context.getMsgList();
if (!msgList.isEmpty()) {
// MDC设置TraceId
initTraceId(msgList.get(0).getProperty(TRACE_HEADER_KEY));
}
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
// 清除MDC的TraceId
clear();
}
}
::: warning clear()
对于异步执行的线程,当执行完成时,需要执行 clear()
方法清除全部缓存信息。
:::
对于线程池 ThreadPoolExecutor,核心问题在于子线程无法自动获取父线程的MDC内容。
::: tip 解决方案
使用 装饰器模式 包装任务,封装 Runnable
或 Callable
,在任务执行前将父线程的 traceId 传递到子线程的 MDC 中,执行后再清理。
:::
java@Slf4j
public class MdcCallable<V> extends MdcTraceContext implements Callable<V> {
private Callable<V> callable;
private Map<String, String> parentMdc;
public MdcCallable(Callable<V> callable) {
this.callable = callable;
// 捕获父线程的 MDC
this.parentMdc = MDC.getCopyOfContextMap();
}
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
@Override
public V call() throws Exception {
if (parentMdc != null) {
// 将 MDC 传递给子线程
MDC.setContextMap(parentMdc);
parentMdc.clear();
parentMdc = null;
}
try {
V call = callable.call();
callable = null;
return call;
} finally {
// 清理子线程的 MDC,避免内存泄漏
clear();
}
}
}
java@Slf4j
public class MdcRunnable extends MdcTraceContext implements Runnable {
private Runnable runnable;
private Map<String, String> parentMdc;
public MdcRunnable(Runnable runnable) {
this.runnable = runnable;
// 捕获父线程的 MDC
this.parentMdc = MDC.getCopyOfContextMap();
}
@Override
public void run() {
if (parentMdc != null) {
// 将 MDC 传递给子线程
MDC.setContextMap(parentMdc);
parentMdc.clear();
parentMdc = null;
}
try {
runnable.run();
runnable = null;
} finally {
// 清理子线程的 MDC,避免内存泄漏
clear();
}
}
}
测试示例:
javapublic void test(){
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 10; i++) {
int finalI = i;
executorService.submit(new MdcRunnable(() -> log.info("测试 - " + finalI)));
executorService.submit(new MdcCallable<>(() -> {
log.info("测试 - " + finalI);
return "测试 - " + finalI;
}));
}
}
Spring 默认的线程池同样存在MDC传递的问题。
::: tip 解决方案
为了确保MDC的上下文能够传递到异步线程,在自定义异步线程池的同时,添加 TaskDecorator
装饰处理,
:::
java@Configuration
@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//线程核心数目
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
//最大线程数
threadPoolTaskExecutor.setMaxPoolSize(20);
//配置队列大小
threadPoolTaskExecutor.setQueueCapacity(400);
//配置线程池前缀
threadPoolTaskExecutor.setThreadNamePrefix("Custom-async-thread-");
//配置拒绝策略
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//数据初始化
threadPoolTaskExecutor.initialize();
//是否等待所有线程执行完毕才关闭线程池,默认值为false。
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//awaitTerminationSeconds:waitForTasksToCompleteOnShutdown的等待的时长,默认值为0,即不等待。
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
// 注入装饰器
threadPoolTaskExecutor.setTaskDecorator(new MdcTaskDecorator());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
javapublic class MdcTaskDecorator implements TaskDecorator {
/**
* Decorate the given {@code Runnable}, returning a potentially wrapped
* {@code Runnable} for actual execution, internally delegating to the
* original {@link Runnable#run()} implementation.
*
* @param runnable the original {@code Runnable}
* @return the decorated {@code Runnable}
*/
@Override
public Runnable decorate(Runnable runnable) {
return new MdcRunnable(runnable);
}
}
测试示例:
java@Slf4j
@Service
public class MyService {
@Async
public void asyncMethod() {
log.info("异步方法执行,traceId: {}", MdcTraceContext.getTraceId());
}
}
本文作者:柳始恭
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!