V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
shysh95
V2EX  ›  Java

谈谈 Java 线程池

  •  
  •   shysh95 · 2021-04-20 23:05:51 +08:00 · 2063 次点击
    这是一个创建于 1073 天前的主题,其中的信息可能已经有所发展或是发生改变。

    摘要

    1. 线程池任务执行机制
    2. 任务调度
    3. 任务缓冲
    4. 任务申请
    5. 拒绝策略
    6. Worker 线程为什么要采用 AQS 实现
    7. Worker 线程初始化
    8. Worker 线程工作
    9. Worker 线程获取任务
    10. Worker 线程销毁
    11. 线程池关闭

    1. 线程池任务执行机制

    作为一个开发初始化线程池通常会使用 Executors 类,然后调用 newFixedThreadPool 或者其他方法来初始化一个线程池,方法如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    

    Executors 中其实最终是初始了 ThreadPoolExecutor 类,上一篇Java 线程池前传已经讲了 ThreadPoolExecutor 线程池的一些关键属性。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    ThreadPoolExecutor 的构造方法中需要指定一些参数,并且这些参数会被线程池的一些属性所使用,这些我们会在后续的剖析线程池中都会提到。

    1.1 任务调度

    任务调度是整个线程池的入口,当客户端提交了一个任务以后便进入整个阶段,整个任务的调度过程由 execute 方法完成,如下:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
    
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // 1 & 2. addWorker 方法会检查线程池是否是 RUNNING 状态
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 1 & 3
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    

    上述代码的执行过程大致如下:

    1. 首先检查线程池是否是 RUNNING 状态,如果不是 RUNNING 状态,则拒绝任务
    2. 如果工作线程数( workerCount )小于核心线程数( corePoolSize ),直接开启新的线程去执行任务
    3. 如果工作线程数( workerCount )大于等于核心线程数 (corePoolSize),并且阻塞任务队列为满时,将任务放入阻塞队列
    4. 如果工作线程数( workerCount )大于等于核心线程数 (corePoolSize),并且阻塞任务队列已满,但工作线程数小于最大线程数( maximumPoolSize ),则创建并启动一个新的线程来执行
    5. 如果工作线程数( workerCount )大于等于最大线程数( maximumPoolSize ),并且阻塞任务队列已满,则执行具体的 RejectedExecutionHandler 策略。

    其中 workerCountOf(recheck) == 0 这一步也很关键,这一步主要是为了确保线程池中至少有一个线程去执行任务。

    在上述流程中我们提到了阻塞任务队列(用于任务缓冲)、addWorker 方法(任务申请)、以及 reject 方法(任务拒绝策略),下面我们再来分析一下这三个关键点。

    1.2 任务缓冲

    线程池的本质是对线程和任务的管理,为了做到这一点必须要将线程和任务解耦,不再直接关联,通过缓冲队列恰好可以解决这一点。线程池中的缓冲队列类似于生产者消费者模式,客户端线程往缓冲队列里提交任务,线程池中的线程则从缓冲队列中获得任务去执行。

    目前 Java 线程中的默认缓冲队列是阻塞队列模式,主要有以下几种,这些缓冲队列必须要实现 BlockingQueue 接口:

    队列 描述
    ArrayBlockingQueue 使用数组实现的有界阻塞队列,先进先出,支持公平锁和非公平锁
    LinkedBlockingQueue 使用链表实现的有界阻塞队列,先进先出,默认链表长度 Integer.MAX_VALUE
    PriorityBlockingQueue 一个使用数组实现支持线程优先级排序的无界队列,默认自然排序,也可以自定义实现 Comparator 来进行排序
    DelayQueue 一个采用 PriorityBlockingQueue 实现的延迟队列(组合的方式),在创建该对象中,可以指定任务添加至队列后才能被获取
    SynchronousQueue 一个不存储元素的阻塞队列,每一个任务入队操作必须等待一个任务的出队,否则不能添加新的任务
    LinkedTransferQueue 一个使用链表实现的无解阻塞队列,该队列支持将任务立即传递给消费者
    LinkedBlockingDeque 一个由双向链表实现的有界阻塞队列,队头队尾都可以添加任务消费任务

    1.3 任务拒绝

    当工作线程数( workerCount )大于等于最大线程数( maximumPoolSize ),并且阻塞任务队列已满,线程池会执行具体的 RejectedExecutionHandler 策略。目前 Java 默认的拒绝策略主要有以下几种:

    策略 描述
    AbortPolicy 丢弃任务并抛出 RejectedExecutionException 异常
    DiscardPolicy 直接丢弃任务
    DiscardOldestPolicy 丢弃阻塞队列队头的任务,并重新提交被拒绝的任务
    CallerRunsPolicy 直接由调用线程处理被拒绝的任务

    1.4 任务申请

    在工作线程池数未达到最大线程数并且阻塞队列未满时,我们可以将任务提交至线程池(有可能是开启新的线程,也有可能是将任务提交至阻塞队列)等待执行。其中 addWorker 方法便是开启新的线程执行任务。下面我们来看一下 addWorker 方法:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // 线程池如果不是运行状态,线程池根据以下条件来决定是否增加 Work 线程
            // 1. 如果线程池不是 SHUTDOWN 状态,那么不允许在增加任何线程,返回 false
            // 2. 如果线程池是 SHUTDOWN 状态(不允许接受新的任务),如果 firstTask 不为空表明是新的任务,不应该接受,所以返回 false
            // 3. 如果线程池是 SHUTDOWN 状态,并且队列已空,此时也不需要增加线程所以返回 false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                // 获取工作线程的数量
                int wc = workerCountOf(c);
                // 工作线程数与线程池容量比较,超过不允许增加线程
                // core 为 true,工作与核心线程数比较,超过不允许增加线程
                // core 为 false,工作与最大线程数比较,超过不允许增加线程
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS 增加工作线程数,增加成功跳出 retry 循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // CAS 增加工作线程数失败,则重新获取 ctl 的值
                c = ctl.get();  // Re-read ctl
                // 判断线程池状态是否改变,如果线程池状态已经改变,则重新执行 retry 循环,否则执行内部循环,尝试增加线程数
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 根据 firstTask 来创建 Worker 对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 重新获取线程池的状态,防止在 mainLock 的 lock 方法执行之前,线程池状态改变
                    int rs = runStateOf(ctl.get());
    
                    // 只有线程池是运行状态或者线程池是 shutdown 并且任务是来自阻塞队列中( firstTask==null )才可以向线程池中增加线程
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 增加 worker,workers 是一个 HashSet
                        workers.add(w);
                        // 更新 largestPoolSize,largestPoolSize 代表了线程池中曾经出现过的最大线程数
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果 worker 创建或启动失败,修正线程池中的 worker 和 ctl 值
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    上述代码的核心逻辑就是根据线程池当前状态来决定是否开启新的线程来执行任务,线程具体的实现方式是采用一个 Worker 类来进行封装。

    2. Worker 线程管理

    Worker 实现了 Runnable 接口,并继承了 AbstractQueuedSynchronizer ( AQS )。

    不熟悉 AQS 的读者可以戳这里

    2.1 Worker 线程的基本属性

    final Thread thread;
    
    Runnable firstTask;
    
    volatile long completedTasks;
    

    Worker 中存储了真实的线程( Thread )、该线程需要执行的第一个任务( firstTask )以及线程执行的任务数( completedTasks )。

    2.2 Worker 线程为什么要采用 AQS 实现

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 使用 ThreadFactory 创建线程
        this.thread = getThreadFactory().newThread(this);
    }
    
    
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
    

    Worker 线程采用 AQS 实现,使用 AQS 的独占锁功能,通过其 tryAcquire 方法可以看出 Worker 线程是不允许重入的,Worker 线程有以下特点:

    • 通过 lock 方法成功获取锁以后,则表示 Worker 线程正在执行任务
    • 如果正在执行任务,则不应该中断线程
    • 如果该线程现在不是独占锁状态(也就是空闲状态),说明该线程没有任务处理,可以对线程进行中断

    构造方法中为什么要执行 setState(-1)方法 ?

    setState 是 AQS 中的方法,默认值为 0,tryAcquire 方法是根据 state 是否是 0 来判断的,所以将 state 设置为-1 是为了禁止在执行任务前对线程进行中断,不明白的读者可以看一下 AQS 的 acquire(int arg)方法,如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    构造方法中的 getThreadFactory().newThread(this)作用是什么?

    ThreadFactory 是在我们构造 ThreadPoolExecutor 时传入的,通过 ThreadFactory 我们可以设置线程的分组、线程的名字、线程的优先级、以及线程是否是 daemon 线程等相关信息。

    2.3 Worker 线程工作

    public void run() {
        runWorker(this);
    }
    

    Worker 线程获取任务工作是通过调用 ThreadPoolExecutor 中的 runWorker 方法,该方法的参数是 Worker 本身,下面我们看一下 Worker 线程的具体工作原理。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 将 AQS 中的 state 修改为 0
        w.unlock(); // allow interrupts
        // 线程在执行任务中是否异常
        boolean completedAbruptly = true;
        try {
            // 获取立即执行的任务,或者从阻塞队列中获取
            while (task != null || (task = getTask()) != null) {
                // 获取独占锁
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 任务执行前的操作
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 任务执行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 任务执行后的操作
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    // 释放独占锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    线程工作的大致流程是:

    1. 通过 getTask 方法获取任务执行
    2. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
    3. 通过 task.run()方法执行任务
    4. 如果阻塞队列中没有任务,则跳出循环执行 processWorkerExit 方法
    5. runWorker 方法执行完毕,也代表着 Worker 中的 run 方法执行完毕,销毁线程。

    这里的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 类中是空的,留给子类来实现。

    completedAbruptly 变量来表示在执行任务过程中是否出现了异常,在 processWorkerExit 方法中会对该变量的值进行判断。

    此部分代码的流程图如下:

    WX20210417-181912@2x.png

    2.4 Worker 线程获取任务(getTask 方法)

    private Runnable getTask() {
        // timeOut 变量的值表示上次从阻塞队列中取任务时是否超时
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            /*
             * 如果线程池状态是非 RUNNING 状态,需要进行以下判断:
             * 1. rs >= STOP,线程池是否正在 stop ;
             * 2. 阻塞队列是否为空。
             * 如果以上条件有一个满足,则将 workerCount 减 1 并返回 null 。
             * 因为如果当前线程池状态的值是 SHUTDOWN 或以上时,不允许再向阻塞队列中添加任务。
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // timed 变量用于判断是否需要进行超时控制。
            // allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;
            // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
            // 对于超过核心线程数量的这些线程,需要进行超时控制
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            /*
             * 重点
             * wc > maximumPoolSize 的情况是因为可能在此方法执行阶段同时执行了 setMaximumPoolSize 方法;
             * timed && timedOut 如果为 true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
             * 接下来判断,如果有效线程数量大于 1,或者阻塞队列是空的,那么尝试将 workerCount 减 1 ;
             * 如果减 1 失败,则返回重试。
             * 如果 wc == 1 时,也就说明当前线程是线程池中唯一的一个线程了。
             */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                /*
                 * 根据 timed 来判断,如果为 true,则通过阻塞队列的 poll 方法进行超时控制,如果在 keepAliveTime 时间内没有获取到任务,则返回 null ;
                 * 否则通过 take 方法,如果这时队列为空,则 take 方法会阻塞直到队列不为空。
                 * 
                 */
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // 如果 r == null,说明已经超时,timedOut 设置为 true
                timedOut = true;
            } catch (InterruptedException retry) {
                // 如果获取任务时当前线程发生了中断,则设置 timedOut 为 false 并返回循环重试
                timedOut = false;
            }
        }
    }
    

    这里重要的地方是第二个 if 判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行 execute 方法时,如果当前线程池的线程数量超过了 corePoolSize 且小于 maximumPoolSize,并且 workQueue 已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是 timedOut 为 true 的情况,说明 workQueue 已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于 corePoolSize 数量的线程销毁掉,保持线程数量在 corePoolSize 即可。

    什么时候会销毁? runWorker 方法执行完之后,也就是 Worker 中的 run 方法执行完,由 JVM 自动回收。

    getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行 processWorkerExit 方法。

    获取任务的流程图如下:

    WX20210417-190032@2x.png

    2.5 Worker 退出(processWorkerExit 方法)

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果是 Worker 线程正常结束,工作数量-1
        if (completedAbruptly)
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 线程池的任务完成数量增加该 Worker 线程的任务完成数量
            completedTaskCount += w.completedTasks;
            // 从线程池维护的线程中移除该线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        
        // 根据线程池状态进行判断是否结束线程池
        tryTerminate();
    
        int c = ctl.get();
        /*
         * 当线程池是 RUNNING 或 SHUTDOWN 状态时,如果 worker 是异常结束,那么会直接 addWorker ;
         * 如果 allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个 worker ;
         * 如果 allowCoreThreadTimeOut=false,workerCount 不少于 corePoolSize 。
         */
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
    

    3. 线程池关闭(shutdown)

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    

    在 runWorker 方法中,执行任务时对 Worker 对象 w 进行了 lock 操作,为什么要在执行任务的时候对每个工作线程都加锁(lock)呢?

    • 在 getTask 方法中,如果这时线程池的状态是 SHUTDOWN 并且 workQueue 为空,那么就应该返回 null 来结束这个工作线程,而使线程池进入 SHUTDOWN 状态需要调用 shutdown 方法;
    • shutdown 方法会调用 interruptIdleWorkers 来中断空闲的线程,interruptIdleWorkers 持有 mainLock,会遍历 workers 来逐个判断工作线程是否空闲。但 getTask 方法中没有 mainLock ;
    • 在 getTask 中,如果判断当前线程池状态是 RUNNING,并且阻塞队列为空,那么会调用 workQueue.take()进行阻塞;
    • 如果在判断当前线程池状态是 RUNNING 后,这时调用了 shutdown 方法把状态改为了 SHUTDOWN,这时如果不进行中断,那么当前的工作线程在调用了 workQueue.take()后会一直阻塞而不会被销毁,因为在 SHUTDOWN 状态下不允许再有新的任务添加到 workQueue 中,这样一来线程池永远都关闭不了了;

    由上可知,shutdown 方法与 getTask 方法(从队列中获取任务时)存在竞态条件;

    • 解决这一问题就需要用到线程的中断,也就是为什么要用 interruptIdleWorkers 方法。在调用 workQueue.take()时,如果发现当前线程在执行之前或者执行期间是中断状态,则会抛出 InterruptedException,解除阻塞的状态;
    • 但是要中断工作线程,还要判断工作线程是否是空闲的,如果工作线程正在处理任务,就不应该发生中断;

    所以 Worker 继承自 AQS,在工作线程处理任务时会进行 lock,interruptIdleWorkers 在进行中断时会使用 tryLock 来判断该工作线程是否正在处理任务,如果 tryLock 返回 true,说明该工作线程当前未执行任务,这时才可以被中断。

    6 条回复    2021-11-05 16:55:56 +08:00
    Philosophy6
        1
    Philosophy6  
       2021-04-21 11:24:48 +08:00
    是不是要参加面试了
    shysh95
        2
    shysh95  
    OP
       2021-04-21 12:37:29 +08:00
    @Philosophy6 知其然知其所以然(~~手动狗头)
    Philosophy6
        3
    Philosophy6  
       2021-04-21 12:59:16 +08:00
    @shysh95 哈哈没毛病,然后找面试官对线,我最近也在看,感觉记不住东西现在
    shysh95
        4
    shysh95  
    OP
       2021-04-21 13:05:33 +08:00
    @Philosophy6 哈哈,开始总是痛苦的,可以戳一下文章中的两个链接,或许对你有所帮助
    liian2019
        5
    liian2019  
       2021-04-21 14:10:04 +08:00
    之前把线程池源码看了一遍 ,现在也差不多忘得差不多了
    snakejia
        6
    snakejia  
       2021-11-05 16:55:56 +08:00
    学习了~
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2765 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 29ms · UTC 12:06 · PVG 20:06 · LAX 05:06 · JFK 08:06
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.