创建一个ThreadPoolExecutor是很简单的。你只需调用CustomThreadPoolExecutor类构造器并传输恰当的配置参数。以下代码片段是通过定义核心线程数和线程最大数的相同值来创建一个固定尺寸的线程池:
private ThreadPoolExecutor executor;
public OrderProcessorMain() { // create a thread pool with fixed number of threads executor = new CustomThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); .. } |
0L值确保工作线程不因休止状态而中断。最后一个参数是拦截列队对象,为了在任务执行之前控制任务。这个列队只控制execute 方法提交的Runnable任务。
或者,可以使用java.uti.concurrent.Executors 功用类创建一个固定的线程池。
ExecutorService executorService = Executors.newFixedThreadPool(2);
这个方法的执行恢复了带有一个固定核心和最大线程池的ThreadPoolExecutor对象。
创建并提交任务
创建ThreadPoolExecutor之后,你需要创建任务来处理订单。你通过使用在前一节创建的 OrderProcessorCallable 类来创建这些任务。OrderProcessorCallable构造器采用一个任务名称和一个订单列队对象来检索OrderVO对象。
// create Callable task
OrderProcessorCallable callable1 = new OrderProcessorCallable( "OrderProcessor_1", orderVOQueue); |
记住OrderProcessorCallable类的call方法不会返回直到running变量是正确的或者代码抛出一个异常。
下一步是给callable对象储存一个reference。这个让你能够调用setRunning方法来更新running变量值并且使 call方法平静的返回。这个技术的一个优点就是你可以调用其他方法来得到对象状态信息,例如,在一定的时间点已经处理的订单数量。
// store reference to callable object in collection
callableMap.put(callable1.getThreadName(), callable1); |
以上代码非常有用因为通过ExecutorService.submit方法返回的Future对象不能用于得到Callable对象的reference或是调用Callable对象的任何方法。
为了执行OrderProcessorCallable任务,你调用submit方法并传输一个任务参考。这个方法返回一个type Future对象,你可以用于以下的目的:
检查任务状态
得到一个通过Callable.call()方法返回的结果对象
取消任务
// submit callable tasks
Future future; future = executor.submit(callable1); futurList.add(future); |
Future对象也储存在另一个集合中用来检查任务状态并检索处理结果。如果你不想在一个集合中明确的储存Future对象,你可以使用ExecutorCompletionService 功用类。它为从内部任务列队中检索和删除已完成的任务提供了一个有用的方法。
保持任务进度跟踪
为了检查被一个任务处理的订单数量,你可以使用在集合中储存的OrderProcessorCallable对象参考。以下的代码片段是每间隔1000ms就打印出任务状态直到orderVOQueue清空为止:
private void printProcessorStatus() throws InterruptedException {
// print processor status until all orders are processed while (!orderVOQueue.isEmpty()) { for (Map.Entry e : callableMap .entrySet()) { Logger.log(e.getKey() + " processed order count: " + e.getValue().getProcessedCount()); } Thread.sleep(1000); } } |
关闭ThreadPoolExecutor和任务
ExecutorService.shutdown()可用于关闭执行器。当你调用shutdown()的时候,执行器启动一个有序关闭所有以前提交的任务,而且你可能不会在提交新任务。以下的代码调用shutdown()方法以避免新任务被提交。此后,它更新orderCallable的运行状态false, 这将导致call方法返回。
// shutdown() method will mark the thread pool shutdown to true
executor.shutdown(); // mark order processor callable to return for (Map.Entry orderProcessor : callableMap.entrySet()) { orderProcessor.getValue().setRunning(false); } |
迫使ThreadPoolExecutor和任务关闭
你可以调用ExecutorService.shutdownNow()方法来迫使执行器去关闭。同样,在调用这个方法之后,你不在提交新任务。它停止正在等待的任务处理,并通过发出一个中断试图停止正在运行的任务。ExecutorService.shutdownNow()也恢复正在等待被执行的任务列表。
List notExecutedTasks = executor.shutdownNow();
你可以通过调用Future.cancel(boolean mayInterruptIfRunning)方法取消个别任务。True值是指为一个已经执行的任务所发出的一个中断。如果任务没有开始,它不会运行。你可以通过使用Future.isCancelled()检查一个任务取消状态,其中,恢复true,如果任务在它正常完成之前被取消。
异常处理和任务结果
你可以使用Future.get()或Future.get(long timeout, TimeUnit unit)方法来检索任务结果。no-argument方法会阻碍其他的方法直到任务通过正常的执行,取消或是抛出的一个异常而完成。要谨慎的使用这个方法,因为它会无限期的等待一个任务的完成。带有timeout参数的Future.get()方法更有用,因为它会让一个方法在特定时间内返回:
for (Future f : futurList) {
try { Integer result = f.get(1000, TimeUnit.MILLISECONDS); Logger.log(f + " result. Processed orders " + result); } catch (InterruptedException e) { Logger.error(e.getMessage(), e); } catch (ExecutionException e) { Logger.error(e.getCause().getMessage(), e); } catch (TimeoutException e) { Logger.error(e.getMessage(), e); } catch (CancellationException e) { Logger.error(e.getMessage(), e); } // to avoid printing completed tasks, you may want to remove // the completed task from futureList here } |
Future.get()方法抛出不同的异常来为每一个任务的失败给出一个确切的原因:
InterruptedException被抛出——如果一个线程在等待计算结果的时候被中断。
TimeOutException被抛出——如果一个结果没有在特定的时间内被检索。
CancellationException被抛出——如果计算被取消。
ExecutionException被抛出——如果一个任务计算失败通过抛出任何异常,包括运行异常。这确保了执行线程不会因在任务执行中的一个异常而被终止。这个行为的副作用就是在你想要产生一个任务运行异常的应用程序方案中,你需要从ExecutionException中检索运行异常并再次将它抛出。
总结
OrderProcessorMain类(参见 Listing 3)使用了在先前章节中讨论的所有内容。特别是,main()方法执行下列步骤:
创建并配置CustomThreadPoolExecutor对象。
创建并提交OrderProcessorCallable任务。
将OrderVO对象放入order-blocking列队。
定期打印处理器状态直到order-blocking列队为空。
关闭线程池处理器。
任务完成后打印任务结果。
要查看示正在运行的示例应用程序,只要运行OrderProcessorMain.main(String[] args)方法。为了方便起见,提供给本文的源代码下载(source code download)可以作为一个Eclipse项目输入。
from:http://ajava.org/course/java/4961.html