JUC-线程池-1-创建及使用

上文中对线程池做了一个基本的了解,说了这么多,如何去创建一个线程池呢? JUC 包里面给我们提供了一个工具类就是 java.util.concurrent.Executors ,通过这个类我们就可以创建几种不同类型的线程池,来看一下这个类中常用的几个方法

固定数量线程池

通过 Executors.newFixedThreadPool() 这个方法创建,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class ThreadPoolTest {
public static void main(String[] args) {

ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);

for (int i = 0; i < 1000 ; i++) {
newFixedThreadPool.submit(new TestTask());
}

log.info("done...");

}

static class TestTask implements Runnable{
@Override
public void run() {
log.info("test");
}
}
}

这里就是创建了固定数量为 5 的线程池,然后执行这段代码看一下输出,日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
22:31:17.806 [pool-1-thread-5] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.806 [pool-1-thread-3] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.806 [pool-1-thread-2] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.806 [pool-1-thread-4] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.806 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.806 [main] INFO com.learning.java.threadpool.ThreadPoolTest - done...
22:31:17.817 [pool-1-thread-5] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.817 [pool-1-thread-3] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.817 [pool-1-thread-2] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.817 [pool-1-thread-4] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.817 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.818 [pool-1-thread-5] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.818 [pool-1-thread-3] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.818 [pool-1-thread-2] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.818 [pool-1-thread-4] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:31:17.818 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test

这里可以看到,执行1000个任务,执行的线程都是 pool-1-thread-1 ~ pool-1-thread-5, 就这5个线程,点进去看一下 newFixedThreadPool() 这个方法的源码

image

可以看到,本质上还是调用 ThreadPoolExecutor 的构造,然后设置 corePoolSizemaximumPoolSize 都是传进来的参数,然后 keepAliveTime 回收时间是 0 ,因为都没有额外线程,所以这个参数设置多少都没有意义, 最后任务队列是一个无界队列,所以这个线程池可以一直往里面放任务,当任务数量太多的时候,就可能会导致 OOM

单线程池

单线程池,字面意思,就是只有一个线程,通过 Executors.newSingleThreadExecutor() 创建,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class ThreadPoolTest {
public static void main(String[] args) {

ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();

for (int i = 0; i < 1000 ; i++) {
newSingleThreadExecutor.submit(new TestTask());
}

log.info("done...");

}

static class TestTask implements Runnable{
@Override
public void run() {
log.info("test");
}
}
}

同样执行 1000 个任务,看一下返回值

1
2
3
4
5
6
7
8
9
10
22:47:45.710 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:47:45.710 [main] INFO com.learning.java.threadpool.ThreadPoolTest - done...
22:47:45.720 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:47:45.720 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:47:45.720 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:47:45.720 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:47:45.720 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:47:45.720 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:47:45.721 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:47:45.721 [pool-1-thread-1] INFO com.learning.java.threadpool.ThreadPoolTest - test

这里可以看到,不管多少任务,都是一个线程去执行的.

点进去 newSingleThreadExecutor() 方法看一下源码

image

通过源码可以看到,这里 corePoolSizemaximumPoolSize 都是 1 ,表示这个线程池最大能创建一个线程,然后任务队列是一个无界队列, 也是能一直接受任务,但是有可能造成OOM

可缓存线程池

通过 Executors.newCachedThreadPool() 方法创建

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class ThreadPoolTest {
public static void main(String[] args) {

ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

for (int i = 0; i < 1000 ; i++) {
newCachedThreadPool.submit(new TestTask());
}

log.info("done...");

}

static class TestTask implements Runnable{
@Override
public void run() {
log.info("test");
}
}
}

控制台部分打印如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22:54:35.874 [pool-1-thread-95] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.874 [main] INFO com.learning.java.threadpool.ThreadPoolTest - done...
22:54:35.891 [pool-1-thread-315] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.921 [pool-1-thread-105] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.924 [pool-1-thread-109] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.924 [pool-1-thread-113] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.925 [pool-1-thread-112] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.927 [pool-1-thread-116] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.935 [pool-1-thread-117] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.942 [pool-1-thread-120] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.942 [pool-1-thread-121] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.944 [pool-1-thread-124] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.945 [pool-1-thread-125] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.946 [pool-1-thread-128] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.948 [pool-1-thread-129] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.949 [pool-1-thread-132] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.950 [pool-1-thread-136] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.950 [pool-1-thread-133] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.951 [pool-1-thread-137] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.951 [pool-1-thread-140] INFO com.learning.java.threadpool.ThreadPoolTest - test
22:54:35.952 [pool-1-thread-141] INFO com.learning.java.threadpool.ThreadPoolTest - test

这个线程池的特点是会一直创建新的线程去处理任务,但是会自动回收多余的线程,同样看一下 newCachedThreadPool() 方法的源码

image

这里的 maximumPoolSize 被设置为 Integer.MAX_VALUE 也就是说这个线程池可能会创建数量非常多的线程,然后队列是一个 SynchronousQueue 直接交接的,因为能一直创建线程,所以这个放其他队列也没有意义,所以说,这个线程池也可能导致 OOM

定期及周期性任务线程池

通过 Executors.newScheduledThreadPool() 方法创建, 这个线程池是支持延迟执行,和周期性执行任务的,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
public class ThreadPoolTest {
public static void main(String[] args) {

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);

// 延迟5秒执行
scheduledExecutorService.schedule(new TestTask(), 5, TimeUnit.MINUTES);

// 周期执行 先延迟 1 秒,然后每隔3秒执行一次
scheduledExecutorService.scheduleAtFixedRate(new TestTask(), 1, 3, TimeUnit.SECONDS);

log.info("done...");

}
}

以上就是一些常用的线程池

线程池的正确创建姿势

在阿里巴巴开发规范中,不能使用 Executors 去创建线程池, 在实际开发环境中应该结合自己的业务设置线程池的参数,比如实际机器的内存多大,能否接受任务被拒绝等,还有线程的名字等等

那么问题来了,我怎么知道我应该设置多少线程数?

  • CPU密集型(加密、计算hash等)最佳线程数为CPU核心数的1-2倍左右
  • 耗时IO型(读写数据库、文件、网络读写等)最佳线程数一般会大于cpu核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法:线程数=CPU核心数* ( 1+平均等待时间/平均工作时间)

线程池的关闭

介绍完了线程池的创建之后,还要了一下关闭

关闭线程池可以使用线程池的 shutDown() 方法,这个方法会优雅的关闭线程池, 就是说调用了 shutDown() 方法之后,线程池不会立即关闭,而是先把当前线程池里面有的任务都执行完,然后再关,当然这个方法调用之后就不会去接受新的任务了

查看是否调用了 shutDown() 方法可以调用 isShutDown() 方法,调用 shutDown() 方法之后再调用 isShutDown() 会返回 true

如果说想要看看线程实际有没有关闭,也就是有没有执行完所有的任务后关闭,可以调用 isTerminated() 方法查看

还有一个方法是 awaitTermination(timeout, timeUnit) ,调用了这个方法之后会阻塞掉,然后在传入的时间内如果线程池执行完毕后关闭了会返回 true, 如果没有执行完成会返回 false ,这个方法只是用于检测,并不会关闭

最后还有一个方法就是 shutDownNow() 字面意思,就是立即关闭,不会等线程执行完成,但是这个方法会把所有没有执行完的任务返回回来, 返回值是个list

拒绝策略

最开始介绍线程池的时候就提到了线程池的拒绝策略,拒绝策略触发的时机有两种

  1. 当线程池是关闭状态的时候,提交任务会被拒绝
  2. 当线程池的最大线程和任务队列饱和的时候会拒绝

对于线程池,有以下 4 种拒绝策略:

  • AbortPolicy 直接抛出异常
  • DiscardPolicy 直接丢弃
  • DiscardOldestPolicy 丢弃队列种最老的任务
  • CallerRunsPolicy 谁提交的任务谁去执行,比如主线程往线程池里提交任务,被拒绝之后,任务就会由主线程自己执行

线程池的状态

线程有状态同样的线程池也有状态,一共有以下几种:

  • RUNNING: 接受任务并处理排队任务
  • SHUTDOWN: 不接受新任务,但处理排队任务,就是调用了 shutDown() 方法之后
  • STOP: 不接受新任务,也不处理排队任务,并且中断正在进行的任务,就是调用 shutDownNow() 方法之后
  • TIDYING: 所有任务都终止, workerCount 为零,这个时候线程池是 TIDYING 状态,会运行 terminate() 钩子方法,钩子方法具体后面再说
  • TERMINATED: terminate() 方法执行完毕之后

总结

本文介绍了几种常用的线程池,以及创建线程池的方式

  • 固定数量线程池 newFixedThreadPool()
  • 单线程池 newSingleThreadExecutor()
  • 可缓存线程池 newCachedThreadPool()
  • 延迟执行/周期性执行线程池 newScheduledThreadPool()

以及他们的特点,最后还有创建线程池的最佳方式,总的来说,就是要注意以下几个点

  1. 避免任务堆积
  2. 避免线程数过度增加
  3. 排查线程泄露

还有线程池的关闭方法

  • shutDown() 优雅关闭线程池
  • shutDownNow() 立即关闭线程池
  • isShutDown() 是否是关闭状态
  • isTerminated() 查看是否执行完毕及关闭
  • awaitTermination() 阻塞一定时间,检测任务是否执行完成关闭

最后是线程池的几种状态