只有当任务都是同类型的并且是相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成“阻塞”。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成“拥塞”。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁。幸运的是,有基于网络的典型服务器应用程序中——网页服务器、邮件服务器以及文件服务器等,他们请求通常都是同类型的并且相互独立的
在线程池中,如果任务依赖于其他任务,那么可能产生死锁。只要线程池中的任务需要无限期地等待一些必须由池中其他任务才能提供的资源或条件,例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁——>就算不出现死锁,任务阻塞时间多长,也会让线程池的响应性变得糟糕。有一项技术可以缓解执行时间较长任务造成的影响,即限定任务等待资源的时间,而不要无限制地等待(在平台类库中的大多数可阻塞的方法中,都同时定义了限时版本和无限时版本,例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等
线程池大小的设置
在代码中通常不会固定线程池的大小,而应该通过某种配置机制来提供,或者根据Runtime.availableProcessors来动态计算
线程池过大、过小两种极端会出现:
线程池过大:造成大量线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源
线程池过小:导致许多空闲的处理器无法执行工作,从而降低吞吐率
设置技巧:
1、对于计算密集型的任务,在拥有N个处理器的系统上,当线程池的大小为N+1时,通常能实现最优的利用率。
2、对于包含I/O操作或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。
N = number of CPUs
U = target CPU utilization, 0 <= U <= 1
W/C = ratio of wait time to compute time
要使处理器达到期望的使用率,线程池的最优大小等于:
T = N * U * (1+w/c)
可以通过Runtime来获得CPU的数目:
int N = Runtime.getRuntime().availableProcessors();
当然,cpu周期并不是唯一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库连接等方面的约束。
线程池的创建:(三种方式)
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * 线程池: * * 在线程池的模式下,任务是提交给整个线程池,而不是直接交给某个线程,线程池在拿到任务后,它就在内 * 部找有无空闲的线程,再把任务交给内部某个空闲的线程,这就是封装,记住:任务是提交给整个线程池, * 一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。 */ public class ThreadPoolTest_2 { public static void main(String[] args) { // testNewThreadPool_1(); // testNewThreadPool_2(); // testNewThreadPool_3(); testNewThreadPool_4(); } /** 1、创建固定大小的线程池: */ public static void testNewThreadPool_1() { ExecutorService threadPool = Executors.newFixedThreadPool(3); runTask(threadPool); } /** * 2、创建缓存线程池:线程内部个数不定,该线程池会根据任务的数量动态的增减线程数量, * 如果任务对象个数多于线程数,则会自动增加与任务个数相同的线程数;如果任务个数少于 * 线程数,则线程池会自动回收空闲的线程: */ public static void testNewThreadPool_2() { ExecutorService threadPool = Executors.newCachedThreadPool(); runTask(threadPool); } /** * 3、创建单一线程池:这种方式会保证线程池中始终都会有一个线程在运行,就算该线程消亡, * 线程池也会自动创建一个线程(如何实现线程死后重新启动): */ public static void testNewThreadPool_3() { ExecutorService threadPool = Executors.newSingleThreadExecutor(); runTask(threadPool); // 关闭线程池的两种方式: // threadPool.shutdown(); List<Runnable> threadList = threadPool.shutdownNow(); for(Runnable r : threadList){ sop(r.getClass()); } // shutdown与shutdownNow的区别: // 在终止前允许执行以前提交的任务: threadPool.shutdown(); // 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表: } /** 4、创建延迟执行任务或定期执行任务的线程池 */ public static void testNewThreadPool_4(){ ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(3); threadPool.execute(new Runnable() { public void run() { System.out.println("execute"); } }); // 在1000毫秒之后,执行一次任务 threadPool.schedule(new Runnable() { public void run() { System.out.println("schedule"); }; }, 1000,TimeUnit.MILLISECONDS); // 在2秒之后,以固定的频率执行任务 threadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { sop("scheduleAtFixedRate"); } }, 2, 2, TimeUnit.SECONDS); } private static void runTask(ExecutorService threadPool) { // 循环10次,即添加10个任务对象: for (int i = 1; i <= 10; i++) { final int task = i; // 往线程池中添加一个任务:一个Runnable对象就是一个任务 threadPool.execute(new Runnable() { public void run() { for (int j = 1; j <= 10; j++) { sop("任务<" + task + ">正在被线程<" + Thread.currentThread().getName() + ">执行"); } } }); } // end of loop } private static void sop(Object obj) { System.out.println(obj); } }
Executors.newFixedThreadPool(3)
Executors.newSingleThreadPool()
Executors.newCachedThreadPool():将线程池的最大大小设置为Integer.MAX_VALUE,而将基本大小设置为零,并将超时设置为1分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩
Executors.newScheduledThreadPool()
以上四种方式都是返回了new ThreadPoolExecutor(...)
而ThreadPoolExecutor都调用了 Executors.defaultThreadFactory()
ThreadPoolExecutor为一些Executor提供了基本的实现,这些Executor是由Executors中的newFixedThreadPool、newCachedThreadPool、newScheduledThreadPool、newSingleThreadPool这些工厂方法返回的。如下newFixedThreadPool的源码
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
下面是ThreadPoolExecutor的通用构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
其中:
corePoolSize:线程池的基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小( 线程池维护线程的最少数量)
maximunPoolSize:线程池最大大小,表示可同时活动的线程数量的上限
keepAliveTime:存活时间,如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的
timeUnit: 线程池维护线程所允许的空闲时间的单位
workQueue:线程池所使用的缓冲队列
threadFactory:线程工厂(可以自定义见博文“自定义线程工厂”)
handler:线程池对拒绝任务的处理策略(饱和策略)
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是说,当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHandler来修改。JDK提供了几种不同的RejectedExecutorHandler实现,每种实现都包含有不同的饱和策略:
AbortPolicy:中止(abort)策略是默认的饱和策略,该策略将抛出未检查的RejectedExecutorException,调用者可以捕获这个异常,然后根据需求编写自己的处理代码
CallerRunsRolicy:调用者运行策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
DiscardPolicy:抛弃(discard)策略会悄悄抛弃该任务
DiscardOldestPolicy:抛弃最旧的策略会抛弃下一个将被执行的任务,然后尝试重新提交新的任务(如果工作队列是一个优先队列,那么“抛弃最旧的”策略将导致抛弃优先级最高的任务,因此最好不要将“抛弃最旧”饱和策略和优先级队列放在一起使用)
下面是对各种饱和策略的测试:
import java.io.Serializable; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TestThreadPool { private static int produceTaskSleepTime = 2; private static int consumeTaskSleepTime = 2000; private static int produceTaskMaxNumber = 9; public static void main(String[] args) { // 构造一个线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), new ThreadPoolExecutor.DiscardOldestPolicy()); for (int i = 1; i <= produceTaskMaxNumber; i++) { try { // 产生一个任务,并将其加入到线程池 String task = "task@ " + i; System.out.println("put " + task); threadPool.execute(new ThreadPoolTask(task)); // 便于观察,等待一段时间 Thread.sleep(produceTaskSleepTime); } catch (Exception e) { e.printStackTrace(); } } } public static class ThreadPoolTask implements Runnable, Serializable { private static final long serialVersionUID = 0; // 保存任务所需要的数据 private Object threadPoolTaskData; ThreadPoolTask(Object tasks) { this.threadPoolTaskData = tasks; } public void run() { // 处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句 System.out.println("start .." + threadPoolTaskData); try { // // 便于观察,等待一段时间 Thread.sleep(consumeTaskSleepTime); } catch (Exception e) { e.printStackTrace(); } threadPoolTaskData = null; } public Object getTask() { return this.threadPoolTaskData; } } }
上面代码定义了一个 corePoolSize 为 2 , maximumPoolSize 为 3 , workerQuene 容量为 3 的线程池,也就是说在饱和状态下,只能容纳 5 个线程, 3 个是运行状态, 2 个在队列里面。同时代码又往线程池里面添加了 9 个线程,每个线程会运行 2 秒,这样必然会到达饱和状态。而饱和状态就涉及到对拒绝任务的处理策略。
结果是(DiscardOldestPolicy策略):
put task@ 1 start ..task@ 1 put task@ 2 start ..task@ 2 put task@ 3 put task@ 4 put task@ 5 start ..task@ 5 put task@ 6 put task@ 7 put task@ 8 put task@ 9 start ..task@ 8 start ..task@ 9
如果代码改成DiscardPolicy策略,结果是:
put task@ 1 start ..task@ 1 put task@ 2 start ..task@ 2 put task@ 3 put task@ 4 put task@ 5 start ..task@ 5 put task@ 6 put task@ 7 put task@ 8 put task@ 9 start ..task@ 3 start ..task@ 4