自带监控&兼容MDC的线程池

2019-03-27 14:28:37 浏览数 (2)

在使用slf4j的MDC做日志跟踪的时候,会因为MDC不能跨线程导致跟踪失败,此外,为了监控线上服务器的运行状态,也很有必要对线程池的运行情况进行监控。

下面是一个带有线程池监控且兼容MDC的线程池,建议使用!

代码语言:javascript复制
/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 *
 * @author jlevy
 */
@Slf4j
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    @Getter
    final private String name;
    final private boolean useFixedContext;
    final private Map<String, String> fixedContext;

    private static final List<MdcThreadPoolExecutor> EXECUTOR_LIST = new ArrayList<>();

    /**
     * Monitor Thread pool periodically.
     */
    private final static ScheduledExecutorService scheduledExecutorService =
            new ScheduledThreadPoolExecutor(1, r -> {
                final Thread t = new Thread();
                // daemon thread won't block main thread
                t.setDaemon(true);
                t.setName("MDC Thread Monitor");
                t.setPriority(Thread.MIN_PRIORITY);
                return t;
            });

    static {
        scheduledExecutorService.scheduleAtFixedRate(() -> EXECUTOR_LIST.forEach(MdcThreadPoolExecutor::monitor), 0, 1, TimeUnit.MINUTES);
    }

    private static void monitor(MdcThreadPoolExecutor pool) {
        if (!pool.isTerminated()) {
            log.info("MDC Thread Pool[{}]: core:{}, maximum:{}, largest:{}, poolSize:{}, active:{}, queue:{}",
                    pool.getName(), pool.getCorePoolSize(), pool.getMaximumPoolSize(), pool.getLargestPoolSize(),
                    pool.getPoolSize(), pool.getActiveCount(), pool.getQueue().size());
        }
    }


    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(name, null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    public static MdcThreadPoolExecutor newWithCurrentMdc(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(name, MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(String name, Map<String, String> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(name, fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(String name, Map<String, String> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
        this.name = name;
        synchronized (EXECUTOR_LIST) {
            EXECUTOR_LIST.add(this);
        }
    }

    @Override
    protected void terminated() {
        super.terminated();
        log.info("MDC Thread Pool[{}]: TERMINATED");
        synchronized (EXECUTOR_LIST) {
            EXECUTOR_LIST.remove(this);
        }
    }

    private Map<String, String> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    private static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            Map<String, String> previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }
}

参考

  • 如何在线程池中使用MDC?
  • 线程池的五种状态
  • Java线程池监控小结

0 人点赞