|Java异步任务优化CompletionService


|Java异步任务优化CompletionService
文章图片
|Java异步任务优化CompletionService
文章图片
上一篇详细介绍了Future与Callable , 而CompletionService则对它们的优化 。
Future的缺陷Future通过get方法来获取异步任务的结果 , 如果任务还没有完成则阻塞线程 , 因为我们需要它的结果 , 所以等待是应该的 。
如果需要处理一批这类任务 , 提交到线程池我们最终会得到多个Future , 但是每个任务执行的时间可能并不相同 , 那么我们应该优先调用哪个Future的get呢?示例如下图:
可以看到有些任务因为执行时间较长 , 而在他后面的任务可能执行任务的时间较短 , 已经提前完成了 , 但是并不能得到及时的处理 , 只能等到前面的任务执行完成后才能处理 , 这样设计是非常不合理的 。
CompletionService解决线程Java中提供了CompletionService来解决这个问题的 , 同样可以把一批任务提交到CompletionService , CompletionService可以把先执行完成的任务通过take方法获取到 。 使用方法如下图:
ExecutorCompletionService是CompletionService的具体实现 , 我们把线程池设置给ExecutorCompletionService后 , 也可以通过submit提交任务到ExecutorCompletionService , 最后通过take方法获取到已经完成的Future 。
从上面例子可以看到因为id等于3的任务先执行完成 , 所以也先处理了这个任务 。 最先提交的任务id等于0的因为休眠时间较长 , 所以先完成的任务就可以先执行处理 。
ExecutorCompletionService源码从上面的例子可以看到使用ExecutorCompletionService有三个关键步骤:设置一个线程池、submit提交任务、take获取完成的Future 。
看源码首先看他的属性 , 查看源码得到他有两个关键属性:
Executor executor:执行线程的线程池;
BlockingQueue<Future<V>> completionQueue:阻塞队列 , 保存完成的Future;
看到这两个属性就大概能够猜到它的实现方案:利用线程池去执行任务 , 利用阻塞队列来保存完成的Future 。
在submit方法肯定调用的是线程池的submit方法这个很简单 , 唯一的问题是如何把完成的Future放到阻塞队列中的?
上一篇文章中我们知道最终实现有返回的异步任务的类是FutureTask , 最终运行的是FutureTask的run方法 , run方法完成后会调用finishCompletion方法去唤醒那些因为调用FutureTask.get()方法而阻塞的线程 , 在finishCompletion的最后调用了done()方法 , 只不过在FutureTask中done()方法是一个空方法 。
而ExecutorCompletionService就是利用FutureTask的done()实现了把任务放到阻塞队列中 。
ExecutorCompletionService有一个私有的内部类QueueingFuture , 它继承FutureTask , 并实现了done()方法 , done()方法把任务放到了ExecutorCompletionService的阻塞队列中(所以QueueingFuture是私有类) 。 done()方法只有在任务执行完成后才会调用 。
图解ExecutorCompletionService功能通过以上分析基本上弄懂了ExecutorCompletionService的执行流程 , 分析如下图:
可以看到整个源码实际上就只有三步:
submit方法把任务封装FutureTask并作为QueueingFuture的属性保存 , 然后提交QueueingFuture到线程池;
线程池执行任务调用的是QueueingFuture的run方法 , run方法最后调用done方法把属性FutureTask(此时任务已经完成)添加到阻塞队列;
take方法从阻塞队列中获取FutureTask;