车驰夜幕|ThreadPoolExecutor线程池实现原理+源码解析( 五 )

可以看到addWorker方法前一部分 , 用了外层自旋判断线程池的状态 , 内层自旋 + CAS给线程池中的线程数加1 。 后半部分用了ReentrantLock保证创建Worker对象 , 以及启动线程的线程安全 。 一个方法中三次获取了线程池的状态(不包含该方法调用的其他方法) , 因为每两次之间 , 线程池的状态都有可能被改变 。
runWorker前文在介绍Worker内部类时说过 , Worker会把自己传递给ThreadFactory创建的线程执行 , 最终执行Worker的run方法 , 而Worker类的run方法只有一行代码:
runWorker(this);所以接下来看看runWorker方法是如何实现了
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 允许外部中断w.unlock(); // allow interrupts// 记录worker是不是异常退出的boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {// 自旋 , 如果task不为空 , 或者能从缓冲队列(阻塞队列)中获取任务就继续执行 , 不能就一直阻塞// 加锁w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted()try {// 钩子函数 , 处理task执行前的逻辑beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 钩子函数 , 处理task执行后的逻辑afterExecute(task, thrown);}} finally {task = null;// 完成的任务数加1w.completedTasks++;// 解锁w.unlock();}}// 运行到这里 , 说明worker没有异常退出completedAbruptly = false;} finally {// 自旋操作被打断了 , 说明线程需要被回收processWorkerExit(w, completedAbruptly);}}第10行代码中 , task为null时 , 会通过getTask()方法从缓冲队列中取任务 , 因为缓冲队列是阻塞队列 , 所以如果获取不到任务会一直被阻塞 , 接下来看看getTask方法的内部实现
getTaskgetTask用于阻塞式的从缓冲队列中获取任务 。
private Runnable getTask() { // 线程是否超时boolean timedOut = false; // Did the last poll() time out?for (;;) {// 自旋// 获取线程池状态int c = ctl.get();int rs = runStateOf(c);if (rs >= SHUTDOWNreturn null;}int wc = workerCountOf(c);// 根据allowCoreThreadTimeOut参数来判断 , 要不要给核心线程设置等待超时时间boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timedcontinue;}try {// 从阻塞队列中获取任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)// 成功获取任务return r;// 没有获取到任务 , 超时timedOut = true;} catch (InterruptedException retry) {// 线程被中断 , 重试timedOut = false;}}}理解该方法的前提 , 是要理解阻塞队列提供的阻塞式API 。这个方法重点关注两点:

  • 从缓冲队列取任务时 , poll非阻塞 , take阻塞 , 调用哪个由当前线程需不需要被回收来决定
  • 该方法返回null之后 , 上层方法会回收当前线程
除了这几个核心方法之外 , 往线程池提交任务还有一个方法叫submit
public Future submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture