开启大量线程会有什么问题,如何优化?

2022-12-17 15:03:17 浏览数 (1)

这道题想考察什么?

  • 是否了解线程开启的方式?
  • 开启大量线程会引起什么问题?为什么?怎么优化?

考察的知识点

  • 线程的开启方式
  • 开启大量线程的问题
  • 线程池

考生应该如何回答

1、首先,关于如何开启一个线程,大多数人可能都会说3种,Thread、Runnable、Callback嘛!但事实却不是这样的。看JDK里怎么说的。

代码语言:javascript复制
/**
 * ...
 * There are two ways to create a new thread of execution. One is to
 * declare a class to be a subclass of <code>Thread</code>. 
 * The other way to create a thread is to declare a class that
 * implements the <code>Runnable</code> interface.
 * ....
 */
public class Thread implements Runnable{
      
}

Thread源码的类描述中有这样一段,翻译一下,只有两种方法去创建一个执行线程,一种是声明一个Thread的子类,另一种是创建一个类去实现Runnable接口。惊不惊讶,并没有提到Callback。

  • 继承Thread类
代码语言:javascript复制
public class ThreadUnitTest {

    @Test
    public void testThread() {
        //创建MyThread实例
        MyThread myThread = new MyThread();
        //调用线程start的方法,进入可执行状态
        myThread.start();
    }

    //继承Thread类,重写内部run方法
    static class MyThread extends Thread {

        @Override
        public void run() {
            System.out.println("test MyThread run");
        }
    }
}
  • 实现Runnable接口
代码语言:javascript复制
public class ThreadUnitTest {

    @Test
    public void testRunnable() {
        //创建MyRunnable实例,这其实只是一个任务,并不是线程
        MyRunnable myRunnable = new MyRunnable();
        //交给线程去执行
        new Thread(myRunnable).start();
    }

    //实现Runnable接口,并实现内部run方法
    static class MyRunnable implements Runnable {

        @Override
        public void run() {
            System.out.println("test MyRunnable run");
        }
    }
}
  • 还是看看Callable是怎么回事吧
代码语言:javascript复制
public class ThreadUnitTest {

    @Test
    public void testCallable() {
        //创建MyCallable实例,需要与FutureTask结合使用
        MyCallable myCallable = new MyCallable();
        //创建FutureTask,与Runnable一样,也只能算是个任务
        FutureTask<String> futureTask = new FutureTask<>(myCallable);
        //交给线程去执行
        new Thread(futureTask).start();

        try {
            //get方法获取任务返回值,该方法是阻塞的
            String result = futureTask.get();
            System.out.println(result);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //实现Callable接口,并实现call方法,不同之处是该方法有返回值
    static class MyCallable implements Callable<String> {

        @Override
        public String call() throws Exception {
            Thread.sleep();
            return "test MyCallable run";
        }
    }
}

Callable的方式必须与FutureTask结合使用,我们看看FutureTask的继承关系

代码语言:javascript复制
//FutureTask实现了RunnableFuture接口
public class FutureTask<V> implements RunnableFuture<V> {

}

//RunnableFuture接口继承Runnable和Future接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

真相大白了,其实实现Callback接口创建线程的方式,归根到底就是Runnable方式,只不过它是在Runnable的基础上又增加了一些能力,例如取消任务执行等。

2、开启线程的几种方式算是答上了,那开启大量线程,或者说频繁开启线程到底会引起什么问题呢?

众所周知,在Java中,调用Thread的start方法后,该线程即置为就绪状态,等待CPU的调度。这个流程里有两个关注点需要去理解。

start内部怎样开启线程的?看看start方法是怎么实现的。

代码语言:javascript复制
// Thread类的start方法
public synchronized void start() {
        // 一系列状态检查
        if (threadStatus != )
            throw new IllegalThreadStateException();
   
        group.add(this);
           
        boolean started = false;
        try {
             //调用start0方法,真正启动java线程的地方
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                 group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
            }
        }
    }
   
//start0方法是一个native方法
private native void start0();

JVM中,native方法与java方法存在一个映射关系,Java中的start0对应c层的JVM_StartThread方法,我们继续看一下。

代码语言:javascript复制
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;
  bool throw_illegal_thread_state = false;
  {
   
    MutexLocker mu(Threads_lock);
    // 判断Java线程是否已经启动,如果已经启动过,则会抛异常。
    if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
      throw_illegal_thread_state = true;
    } else {
      //如果没有启动过,走到这里else分支,去创建线程
      //分配c  线程结构并创建native线程
      jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
   
      size_t sz = size >  ? (size_t) size : ;
      //注意这里new JavaThread
      native_thread = new JavaThread(&thread_entry, sz);
      if (native_thread->osthread() != NULL) {
        native_thread->prepare(jthread);
      }
    }
  }
  ......
  Thread::start(native_thread);

走到这里发现,Java层已经过渡到native层,但远远还没结束。

代码语言:javascript复制
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
                          Thread()
   {
     initialize();
     _jni_attach_state = _not_attaching_via_jni;
     set_entry_point(entry_point);
     os::ThreadType thr_type = os::java_thread;
     thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
                                                        os::java_thread;
     //根据平台,调用create_thread,创建真正的内核线程                       
     os::create_thread(this, thr_type, stack_sz);
   }
   
   bool os::create_thread(Thread* thread, ThreadType thr_type,
                          size_t req_stack_size) {
       ......
       pthread_t tid;
       //利用pthread_create()来创建线程
       int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);
       ......
       return true;
}

pthread_create方法,第三个参数表示启动这个线程后要执行的方法的入口,第四个参数表示要给这个方法传入的参数。

代码语言:javascript复制
static void *thread_native_entry(Thread *thread) {
  ......
  //thread_native_entry方法的最下面的run方法,这个thread就是上面传递下来的参数,也就是JavaThread
  thread->run();
  ......
  return 0;
}

终于开始执行run方法了!!!

代码语言:javascript复制
//thread.cpp类
void JavaThread::run() {
  ......
  //调用内部thread_main_inner  
  thread_main_inner();
}
   
void JavaThread::thread_main_inner() {
  if (!this->has_pending_exception() &&
   !java_lang_Thread::is_stillborn(this->threadObj())) {
    {
      ResourceMark rm(this);
      this->set_native_thread_name(this->get_thread_name());
    }
    HandleMark hm(this);
    //注意:内部通过JavaCalls模块,调用了Java线程要执行的run方法
    this->entry_point()(this, this);
  }
  DTRACE_THREAD_PROBE(stop, this);
  this->exit(false);
  delete this;
}

一条U字型代码调用链总算结束了,高呼一声,原来start之后做了这么多事情呀,稍微总结一下。

  • Java中调用Thread的star方法,通过JNI方式,调用到native层。
  • native层,JVM通过pthread_create方法创建一个系统内核线程,并指定内核线程的初始运行地址,即一个方法指针。
  • 在内核线程的初始运行方法中,利用JavaCalls模块,回调到java线程的run方法,开始java级别的线程执行。

线程开启后CPU调度会发生什么?

计算机的世界里,CPU会分为若干时间片,通过各种算法分配时间片来执行任务,有耳熟能详时间片轮转调度算法、短进程优先算法、优先级算法等。当一个任务的时间片用完,就会切换到另一个任务。在切换之前会保存上一个任务的状态,当下次再切换到该任务,就会加载这个状态, 这就是所谓的线程的上下文切换。

很明显,上下文的切换是有开销的,包括很多方面,操作系统保存和恢复上下文的开销、线程调度器调度线程的开销和高速缓存重新加载的开销等。

经过上面两个理论基础的回顾,开启大量线程引起的问题,总结起来,就两个字——开销。

  • 消耗时间。 线程的创建和销毁都需要时间,当数量太大的时候,会影响效率。
  • 消耗内存。 创建更多的线程会消耗更多的内存,这是毋庸置疑的。线程频繁创建与销毁,还有可能引起内存抖动,频繁触发GC,最直接的表现就是卡顿。长而久之,内存资源占用过多或者内存碎片过多,系统甚至会出现OOM。
  • 消耗CPU。 在操作系统中,CPU都是遵循时间片轮转机制进行处理任务,线程数过多,必然会引起CPU频繁的进行线程上下文切换。这个代价是昂贵的,某些场景下甚至超过任务本身的消耗。

3、针对上面提及到的问题,我们自然需要进行优化。线程的本质是为了执行任务,在计算机的世界里,任务分大致分为两类,CPU密集型任务和IO密集型任务。

  • CPU密集型任务,比如公式计算、资源解码等。这类任务要进行大量的计算,全都依赖CPU的运算能力,持久消耗CPU资源。所以针对这类任务,其实不应该开启大量线程。因为线程越多,花在线程切换的时间就越多,CPU执行效率就越低,一般CPU密集型任务同时进行的数量等于CPU的核心数,最多再加个1。
  • IO密集型任务,比如网络读写、文件读写等。这类任务不需要消耗太多的CPU资源,绝大部分时间是在IO操作上。所以针对这类任务,可以开启大量线程去提高CPU的执行效率,一般IO密集型任务同时进行的数量等于CPU的核心数的两倍。

另外,在无法避免,必须要开启大量线程的情况下,我们也可以使用线程池代替直接创建线程的做法进行优化。线程池的基本作用就是复用已有的线程,从而减少线程的创建,降低开销。

在Java中,线程池的使用还是非常方便的,JDK中提供了现成的ThreadPoolExecutor类,我们只需要按照自己的需求进行相应的参数配置即可,这里提供一个示例。

代码语言:javascript复制
/**
 * 线程池使用
 */
public class ThreadPoolService {

    /**
     * 线程池变量
     */
    private ThreadPoolExecutor mThreadPoolExecutor;

    private static volatile ThreadPoolService sInstance = null;

    /**
     * 线程池中的核心线程数,默认情况下,核心线程一直存活在线程池中,即便他们在线程池中处于闲置状态。
     * 除非我们将ThreadPoolExecutor的allowCoreThreadTimeOut属性设为true的时候,这时候处于闲置的核心         * 线程在等待新任务到来时会有超时策略,这个超时时间由keepAliveTime来指定。一旦超过所设置的超时时间,闲     * 置的核心线程就会被终止。
     * CPU密集型任务  N 1   IO密集型任务   2*N
     */
    private final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors()   ;
    /**
     * 线程池中所容纳的最大线程数,如果活动的线程达到这个数值以后,后续的新任务将会被阻塞。包含核心线程数 非*      * 核心线程数。
     */
    private final int MAXIMUM_POOL_SIZE = Math.max(CORE_POOL_SIZE, );
    /**
     * 非核心线程闲置时的超时时长,对于非核心线程,闲置时间超过这个时间,非核心线程就会被回收。
     * 只有对ThreadPoolExecutor的allowCoreThreadTimeOut属性设为true的时候,这个超时时间才会对核心线       * 程产生效果。
     */
    private final long KEEP_ALIVE_TIME = ;
    /**
     * 用于指定keepAliveTime参数的时间单位。
     */
    private final TimeUnit UNIT = TimeUnit.SECONDS;
    /**
     * 线程池中保存等待执行的任务的阻塞队列
     * ArrayBlockingQueue  基于数组实现的有界的阻塞队列
     * LinkedBlockingQueue  基于链表实现的阻塞队列
     * SynchronousQueue   内部没有任何容量的阻塞队列。在它内部没有任何的缓存空间
     * PriorityBlockingQueue   具有优先级的无限阻塞队列。
     */
    private final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingDeque<>();
    /**
     * 线程工厂,为线程池提供新线程的创建。ThreadFactory是一个接口,里面只有一个newThread方法。 默认为DefaultThreadFactory类。
     */
    private final ThreadFactory THREAD_FACTORY = Executors.defaultThreadFactory();
    /**
     * 拒绝策略,当任务队列已满并且线程池中的活动线程已经达到所限定的最大值或者是无法成功执行任务,这时候       * ThreadPoolExecutor会调用RejectedExecutionHandler中的rejectedExecution方法。
     * CallerRunsPolicy  只用调用者所在线程来运行任务。
     * AbortPolicy  直接抛出RejectedExecutionException异常。
     * DiscardPolicy  丢弃掉该任务,不进行处理。
     * DiscardOldestPolicy   丢弃队列里最近的一个任务,并执行当前任务。
     */
    private final RejectedExecutionHandler REJECTED_HANDLER = new ThreadPoolExecutor.AbortPolicy();

    private ThreadPoolService() {
    }

    /**
     * 单例
     * @return
     */
    public static ThreadPoolService getInstance() {
        if (sInstance == null) {
            synchronized (ThreadPoolService.class) {
                if (sInstance == null) {
                    sInstance = new ThreadPoolService();
                    sInstance.initThreadPool();
                }
            }
        }
        return sInstance;
    }

    /**
     * 初始化线程池
     */
    private void initThreadPool() {
        try {
            mThreadPoolExecutor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE,
                    MAXIMUM_POOL_SIZE,
                    KEEP_ALIVE_TIME,
                    UNIT,
                    WORK_QUEUE,
                    THREAD_FACTORY,
                    REJECTED_HANDLER);
        } catch (Exception e) {
            LogUtil.printStackTrace(e);
        }
    }

    /**
     * 向线程池提交任务,无返回值
     *
     * @param runnable
     */
    public void post(Runnable runnable) {
        mThreadPoolExecutor.execute(runnable);
    }

    /**
     * 向线程池提交任务,有返回值
     *
     * @param callable
     */
    public <T> Future<T> post(Callable<T> callable) {
        RunnableFuture<T> task = new FutureTask<T>(callable);
        mThreadPoolExecutor.execute(task);
        return task;
    }
}

0 人点赞