java线程池(八):ForkJoinPool源码分析之四(ForkJoinWorkerThread源码)

2020-10-29 10:43:51 浏览数 (1)

1.类结构及其成员变量

1.1 类结构和注释

类结构代码如下:

代码语言:javascript复制
public class ForkJoinWorkerThread extends Thread {
    
}

ForkJoinWorkerThread继承了Thread类,其注释大意如下:

ForkJoinWorkerThread是由ForkJoinPool管理的线程,该线程执行ForkJoinTask。此类仅可做为扩展功能的需要而被集成,因为没有提供可以调度或者可重新的方法。但是,你可以覆盖主任务处理循环周围初始化和终止方法。如果确实创建了这个类的子类,还需要在ForkJoinPool中提供自定义的ForkJoinWorkerThreadFactory来使用。

1.2 常量

主要有两个:

代码语言:javascript复制
final ForkJoinPool pool;                // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

注释大意: ForkJoinWorkerThreads由ForkJoinPools管理,并执行ForkJoinTasks。请参见ForkJoinPool的内部文档。此类仅仅维护了指向pool和WorkQueue的链接。pool字段在构造的时候直接设置。但是直到对registerWorker调用完成之后,才设置workQueue字段。这将导致可见性竞争,可以通过要求workQueue字段仅由其所属线程访问来规避这个问题。对于InnocuousForkJoinWorkerThread子类的支持,要求我们在此处和子类中破坏很多封装,通过Unsafe以访问和设置Thread字段。 这是两个final修饰的常量,智能初始化一次。

2.构造函数

2.1 ForkJoinWorkerThread(ForkJoinPool pool)

代码语言:javascript复制
/**
 * Creates a ForkJoinWorkerThread operating in the given pool.
 *
 * @param pool the pool this thread works in
 * @throws NullPointerException if pool is null
 */
protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // Use a placeholder until a useful name can be set in registerWorker
    super("aForkJoinWorkerThread");
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

其构造函数主要是创建一个在给定pool中的ForkJoinWorkerThread。 构造函数中最主要的方法就是registerWorker。

2.2 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc)

代码语言:javascript复制
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
                     AccessControlContext acc) {
    super(threadGroup, null, "aForkJoinWorkerThread");
    U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
    eraseThreadLocals(); // clear before registering
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

这个方法需要注意的是,采用unSafe方法,在这个类的INHERITEDACCESSCONTROLCONTEXT位置处,设置传入的AccessControlContext对象。 之后调用方法eraseThreadLocals将threadLocals清除。 之后与同用的构造函数一致。 擦除ThreadLocal也是采用UnSafe来完成。通过putObject将ThreadLocals的位置设置为null。

代码语言:javascript复制
/**
 * Erases ThreadLocals by nulling out Thread maps.
 */
final void eraseThreadLocals() {
    U.putObject(this, THREADLOCALS, null);
    U.putObject(this, INHERITABLETHREADLOCALS, null);
}

实际上这个方法将会提供给InnocuousForkJoinWorkerThread继承的时候使用。

3.重要的方法

3.1 run

做为Thread,最重要的就是run方法,我们来看看ForkJoinWorkerThread的实现:

代码语言:javascript复制
public void run() {
     //如果workQueue为空,则抛出异常
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
           //调用onStart方法
            onStart();
            //调用pool的runWorker方法,运行workQueue
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
           //操作完之后处理
            try {
               //调用onTermination方法
                onTermination(exception);
            } catch (Throwable ex) {
               //如果出现异常,则进行异常处理
                if (exception == null)
                    exception = ex;
            } finally {
               //最终需要指向deregister方法
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

runWorker方法实际上是对workQueue结合随机魔数,选择一个workQueue进行遍历,调用scan方法,如果不为空则执行,反之则wait。 其中registerWorker与deregisterWorker方法 ,我们可以参考前面的ForkJoinPool源码解读。

3.2 定义的可扩展方法

由于ForkJoinWorkerThread还支持继承扩展,因此在此定义了两个扩展的方法:

代码语言:javascript复制
/**
 * Initializes internal state after construction but before
 * processing any tasks. If you override this method, you must
 * invoke {@code super.onStart()} at the beginning of the method.
 * Initialization requires care: Most fields must have legal
 * default values, to ensure that attempted accesses from other
 * threads work correctly even before this thread starts
 * processing tasks.
 */
protected void onStart() {
}

/**
 * Performs cleanup associated with termination of this worker
 * thread.  If you override this method, you must invoke
 * {@code super.onTermination} at the end of the overridden method.
 *
 * @param exception the exception causing this thread to abort due
 * to an unrecoverable error, or {@code null} if completed normally
 */
protected void onTermination(Throwable exception) {
}

这两个方法用于执行之前和之后,onStart用于run实际执行之前,执行一些初始化操作。onTermination用于run实际执行之后,执行一些清理操作。

4.内部类InnocuousForkJoinWorkerThread

这个类就是继承了ForkJoinWorkerThread的一个实现类。此类定义了一个没有任何权限、也非用户定义的任何线程组的线程。这个线程在运行完每个top的task之后,会擦除所有的ThreadLocals。

源码如下:

代码语言:javascript复制
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
    /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
    private static final ThreadGroup innocuousThreadGroup =
        createThreadGroup();

    /** An AccessControlContext supporting no privileges */
    private static final AccessControlContext INNOCUOUS_ACC =
        new AccessControlContext(
            new ProtectionDomain[] {
                new ProtectionDomain(null, null)
            });

    InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
        super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
    }

    @Override // to erase ThreadLocals
    void afterTopLevelExec() {
        eraseThreadLocals();
    }

    @Override // to always report system loader
    public ClassLoader getContextClassLoader() {
        return ClassLoader.getSystemClassLoader();
    }

    @Override // to silently fail
    public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }

    @Override // paranoically
    public void setContextClassLoader(ClassLoader cl) {
        throw new SecurityException("setContextClassLoader");
    }

    /**
     * Returns a new group with the system ThreadGroup (the
     * topmost, parent-less group) as parent.  Uses Unsafe to
     * traverse Thread.group and ThreadGroup.parent fields.
     */
    private static ThreadGroup createThreadGroup() {
        try {
            sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            Class<?> gk = ThreadGroup.class;
            long tg = u.objectFieldOffset(tk.getDeclaredField("group"));
            long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));
            ThreadGroup group = (ThreadGroup)
                u.getObject(Thread.currentThread(), tg);
            while (group != null) {
                ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
                if (parent == null)
                    return new ThreadGroup(group,
                                           "InnocuousForkJoinWorkerThreadGroup");
                group = parent;
            }
        } catch (Exception e) {
            throw new Error(e);
        }
        // fall through if null as cannot-happen safeguard
        throw new Error("Cannot create ThreadGroup");
    }
}

AccessControlContext INNOCUOUS_ACC 定义了一个不支持任何特权访问的AccessControlContext。这个类会创建一个单独的threadGroup,以确保其不属于任何一个用户创建的ThreadGroup。

5. ForkJoinPool中创建工作线程的过程

此时再来结合ForkJoinPool中的ForkJoinWorkerThreadFactory,就能明白ForkJoinThread的创建意义了。ForkJoinPool根据访问权限的需要,定义了采用默认的创建方法,还是创建InnocuousForkJoinWorkerThread。

5.1 makeCommonPool创建过程

再ForkJoinPool重的makeCommPool,有如下代码:

代码语言:javascript复制
if (factory == null) {
    if (System.getSecurityManager() == null)
        factory = defaultForkJoinWorkerThreadFactory;
    else // use security-managed default
        factory = new InnocuousForkJoinWorkerThreadFactory();
}

这里也就是说,如果System.getSecurityManager()为null,则返回默认的ThreadFactory,而不为null,则说, 使用了默认的安全管理级别,因此将创建InnocuousForkJoinWorkerThreadFactory。

5.2 createWorker创建过程

代码语言:javascript复制
ForkJoinWorkerThreadFactory fac = factory;
try {
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();
            return true;
        }
    } catch (Throwable rex) {
        ex = rex;
    }

也就是说,createWorker根据ForkJoinWorkerThreadFactory的实现类来创建。

5.3 InnocuousForkJoinWorkerThreadFactory

代码语言:javascript复制
static final class InnocuousForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {

    /**
     * An ACC to restrict permissions for the factory itself.
     * The constructed workers have no permissions set.
     */
    private static final AccessControlContext innocuousAcc;
    static {
        Permissions innocuousPerms = new Permissions();
        innocuousPerms.add(modifyThreadPermission);
        innocuousPerms.add(new RuntimePermission(
                               "enableContextClassLoaderOverride"));
        innocuousPerms.add(new RuntimePermission(
                               "modifyThreadGroup"));
        innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
                new ProtectionDomain(null, innocuousPerms)
            });
    }

    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)
            java.security.AccessController.doPrivileged(
                new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
                public ForkJoinWorkerThread run() {
                    return new ForkJoinWorkerThread.
                        InnocuousForkJoinWorkerThread(pool);
                }}, innocuousAcc);
    }
}

5.4 DefaultForkJoinWorkerThreadFactory

代码语言:javascript复制
static final class DefaultForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {
    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new ForkJoinWorkerThread(pool);
    }
}

6.总结

ForkJoinWorkerThread实际上非常简单,就是结合ForkJoinPool,然后根据其需要,创建合适的线程的过程。这里面值得我们借鉴的是,如果需要创建无其他访问权限的线程,实际上这两种线程大部分内容都是相同的,因此可以通过继承来复用大部分代码。之后定义两个factory,让最终的用户根据需要选择factory。

0 人点赞