线程池有点像数据库连接池, 调用连接池的关闭方法, 实际上那个方法被重写了, 会将连接返回连接池.
线程池也是类似, 找到了一篇美团技术团队的文章, 详细解释了线程池.
简单的说, 如果是反复重复的任务需要并行, 由于自行编写并发程序很麻烦, 所以就可以交给线程池来进行操作.
线程池 ThreadPoolExecutor
这个类位于java.util.concurrent中, 包括其父类和子类. Executor类是线程池的基础, 实际使用的ThreadPoolExecutor就是继承这个类, 将Runnable类型的任务提交给线程池就可以被线程池执行.
创建线程池的工厂叫做Executors, 通过工厂类的静态方法可以创建不同类型的线程池, 都是ExecutorService类型的对象, ExecutorService继承自Executor, 常见的有如下:
Executors.newFixedThreadPool(int nThreads)
, 创建一个固定线程数量的线程池Executors.newSingleThreadExecutor()
, 创建一个只有一个线程的线程池, 所有任务会保存在一个队列中依次执行Executors.newCachedThreadPool()
, 返回自行控制的线程池, 也是比较常见的用法Executors.newSingleThreadScheduledExecutor()
, 返回一个ScheduledExecutorService对象, 单线程, 可以指定定时任务Executors.newScheduledThreadPool(int poolSize)
, 返回带有计划功能的, 可指定线程多少的线程池对象.
只要理解了Executors与Executor之间的关系, 一个是工厂, 一个是工厂创建出来的池子, 继承关系就容易理解了.
如果想要微调参数, 则就可以使用ThreadPoolExecutor的构造函数来进行微调. 因为返回的池子其实内部都是使用了ThreadPoolExecutor类.
让线程执行任务只需要调用.submit()方法即可, 方法接受Runnable和Callable对象.
写个简单小例子:
//一个线程类 public class MyThread implements Runnable { @Override public void run() { System.out.println("Start at " + System.currentTimeMillis() + " " + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolTest { //创建一个4个大小的线程池 private static ExecutorService newPool = Executors.newFixedThreadPool(4); //提交10个任务 public static void main(String[] args) { for (int i = 0; i < 10; i++) { newPool.submit(new MyThread()); } newPool.shutdown(); } }
可见线程一次提交4个, 如果改成其他种类, 会发现每次提交的任务数量会有变化.
注意最后的shutdown(), 如果不关闭线程池, 执行线程池的主线程不会退出.
也可以提交Callable对象, 不过目前还都是无法获取结果, 只能操作共享变量的Thread对象, 暂且先这么用着.
设置线程池参数
使用工厂方法, 基本上就是将策略交给了内置的策略. 如果想精确控制线程池的参数, 就可以在ThreadPoolExecutor的构造器中进行设置. 源码中参数最多的重载构造函数如下:
public ThreadPoolExecutor(int corePoolSize,//线程池中的线程数量 int maximumPoolSize,//线程池中的最大线程数量 long keepAliveTime,//线程数量超过corePoolSize之后, 线程的最大活动时间 TimeUnit unit,//上一个参数的单位 BlockingQueue<Runnable> workQueue,//阻塞任务队列, 所谓阻塞队列, 队列为空时候读取会阻塞, 队列满的时候添加任务会阻塞 ThreadFactory threadFactory, // 自定义的创建线程的工厂 RejectedExecutionHandler handler // 拒绝策略)
这其中BlockingQueue可以自行控制, 决定传入什么样子的队列, 比如直接提交队列, 有界任务队列, 无界任务队列, 优先任务队列等等, 都需要实现BlockingQueue接口.
ThreadFactory是如何创建线程工厂, 如果需要自定义每次线程如何创建, 或者在创建线程之前做一些事情, 就可以传入一个自定义的ThreadFactory对象.
拒绝策略是指当一些情况出现的时候, 要如何分配任务(拒绝任务也是任务分配的一种). JDK内置有四种策略, 都实现了RejectedExecutionHandler接口, 详细如下:
AbortPolicy
, 直接抛出异常, 也是默认的方式CallerRunsPolicy
, 只要线程池没有关闭, 直接在调用者线程里运行任务DiscardOldestPolicy
, 丢弃最老的一个请求, 也就是即将被执行的任务, 然后再次提交新任务DiscardPolicy
, 丢弃新任务, 不做任何提示.
四种情况要根据实际情况进行判断. 也可以自定义RejectedExecutionHandler对象.
扩展ThreadPoolExecutor
ThreadPoolExecutor中有三个方法可以自定义:
beforeExecute()
afterExecute()
terminated()
前两个方法会在每个线程被执行的时候, 在线程开始和结束的时候调用, 最后一个方法, 则会在线程池结束的时候被调用.
尝试一下上边一节和这次的各种方法来自定义一些东西玩玩:
import java.util.concurrent.*; public class CustomizeThreadPoolTest { public static void main(String[] args) { ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() { }, new ThreadFactory() { //自定义ThreadFactory @Override public Thread newThread(Runnable r) { System.out.println("创建一个新线程"); Thread t = new Thread(r); System.out.println("创建新线程: " + t.getName() + " 成功"); return t; } }, new ThreadPoolExecutor.AbortPolicy()){ //自定义ThreadPoolExecutor的三个生命周期方法 @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("准备执行 "+ r); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("执行完毕 "+ r); } @Override protected void terminated() { System.out.println("线程池运行结束."); } }; for (int i = 0; i < 10; i++) { pool.submit(new MyThread()); } pool.shutdown(); } }
运行结果如下:
创建一个新线程 创建新线程: Thread-0 成功 创建一个新线程 创建新线程: Thread-1 成功 创建一个新线程 创建新线程: Thread-2 成功 创建一个新线程 创建新线程: Thread-3 成功 创建一个新线程 创建新线程: Thread-4 成功 准备执行 java.util.concurrent.FutureTask@66910c95[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@3b37d803[Wrapped task = multithread.ThreadPool.MyThread@44e1d57b]] 准备执行 java.util.concurrent.FutureTask@73bc7a9d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@38f343ca[Wrapped task = multithread.ThreadPool.MyThread@37876d91]] 准备执行 java.util.concurrent.FutureTask@7cfb39a5[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4a949d85[Wrapped task = multithread.ThreadPool.MyThread@64bb913d]] 准备执行 java.util.concurrent.FutureTask@7eaee205[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@31dd3324[Wrapped task = multithread.ThreadPool.MyThread@4395df41]] 准备执行 java.util.concurrent.FutureTask@4e8eae8a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@5825056d[Wrapped task = multithread.ThreadPool.MyThread@6458e989]] Start at 1593584674017 Thread-1 Start at 1593584674017 Thread-2 Start at 1593584674016 Thread-0 Start at 1593584674017 Thread-3 Start at 1593584674017 Thread-4 执行完毕 java.util.concurrent.FutureTask@7cfb39a5[Completed normally] 准备执行 java.util.concurrent.FutureTask@524b5414[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@b0f7684[Wrapped task = multithread.ThreadPool.MyThread@dfad302]] Start at 1593584675035 Thread-1 执行完毕 java.util.concurrent.FutureTask@66910c95[Completed normally] 执行完毕 java.util.concurrent.FutureTask@73bc7a9d[Completed normally] 准备执行 java.util.concurrent.FutureTask@3ce1e52a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4a606b02[Wrapped task = multithread.ThreadPool.MyThread@2065a844]] Start at 1593584675036 Thread-0 准备执行 java.util.concurrent.FutureTask@e5f4084[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@76121ff2[Wrapped task = multithread.ThreadPool.MyThread@70aa8d0e]] Start at 1593584675036 Thread-2 执行完毕 java.util.concurrent.FutureTask@4e8eae8a[Completed normally] 准备执行 java.util.concurrent.FutureTask@1830ccdb[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@296a1f26[Wrapped task = multithread.ThreadPool.MyThread@5ffa45c8]] 执行完毕 java.util.concurrent.FutureTask@7eaee205[Completed normally] Start at 1593584675037 Thread-4 准备执行 java.util.concurrent.FutureTask@f3d475a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@44438c8f[Wrapped task = multithread.ThreadPool.MyThread@4f4db265]] Start at 1593584675037 Thread-3 执行完毕 java.util.concurrent.FutureTask@524b5414[Completed normally] 执行完毕 java.util.concurrent.FutureTask@e5f4084[Completed normally] 执行完毕 java.util.concurrent.FutureTask@3ce1e52a[Completed normally] 执行完毕 java.util.concurrent.FutureTask@f3d475a[Completed normally] 执行完毕 java.util.concurrent.FutureTask@1830ccdb[Completed normally] 线程池运行结束.
可以看到, 虽然有10个任务, 但是只创建了5个线程, 然后在每次执行任务之前, 都会打印出相关信息. 最后在线程池结束的时候, terminated()被运行.
线程池的一些相关方法
这个关系到ThreadPoolExecutor的运行状态, 一共有5种:
RUNNING
, 能够接受新任务, 也在处理新任务SHUTDOWN
, 不再接受新任务, 但会把遗留任务执行完毕STOP
, 不接受新任务也不再处理尚未处理的任务, 同时中断正在处理的任务TIDYING
, 所有任务都终止, 逐个干掉线程, 线程数量为0, 实际上就是清理阶段TERMINATED
, 调用terminated()方法之后进入该状态, 其实就是彻底结束
所以要注意的是如下方法:
shutdown()
, 最常用的, 完成工作提交之后调用该方法, 转入SHUTDOWN状态, 线程池会在处理完毕之后结束shutdownNow()
, 直接进入STOP状态, 相当于突然中止线程池的工作isShutdown()
, 判断是不是在SHUTDOWN状态isTerminated()
, 判断是不是TERMINATEDexecute(Runnable command)
, 这个也是提交任务的方法, 但是不返回任何值submit(Runnable task)
, 这个也是提交任务的方法, 返回Future对象, 这个就后边再看了.
SHUTDOWN和STOP之后, 在线程数量为0后, 都会进入TIDYING然后是TERMINATED阶段.
好了, 现在可以把线程池加入到自己的工具中来了.