线程池
在高并发场景下,线程池是会被频繁使用到的,简单介绍下线程池:
-
线程池基础参数:核心线程数、最大线程数、线程最大存活时间、时间单位、阻塞队列、线程池工厂、拒绝策略
-
创建方式:
- ThreadPoolExecutor类:ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 0, TimeUtil.SECONDS, new ArrayBlockingQueue(), new DefaultThreadFactory())
- Executors类:
- newFixedThreadPool(1);
- newSingleThreadExecutor();
- newCachedThreadPool();
- newScheduledThreadPool(1);
- 等待队列:
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的有界阻塞队列
- DelayQueue:使用优先级队列实现的无界阻塞队列
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列
- 拒绝策略:
- DiscardPolicy:丢弃被拒绝任务
- DiscardOldestPolicy:丢弃队列头部的任务
- AbortPolicy:抛出RejectedExecutionException
- CallerRunsPolicy:在调用execute方法的线程中运行被拒绝的任务
-
工作原理:
正文
-
线程池创建过程
-
创建语句:
1
2
3
4
5
6
7
8
9ThreadPoolExecutor pools = new ThreadPoolExecutor(5, 10, 10, TimeUtil.SECONDS, new LinkedBlockingQueue(), new ThreadPoolExecutor.AbortPolicy());
--
核心线程数:5
最大线程数:10
非核心线程最大存活时间:10秒
阻塞队列:LinkedBlockingQueue
线程池工厂:DefaultThreadFactory
拒绝策略:AbortPolicy -
在线程池真正运行之前,核心线程尚未创建,因为默认是在实际使用的时候才会去创建,但是如果我们想要在线程池创建的时候就初始化核心线程,可以调用ThreadPoolExecutor的实例方法
prestartAllCoreThreads()
,如果我们想要让核心线程在空闲时可以过期,那么我们可以调用ThreadPoolExecutor的实例方法allowCoreThreadTimeOut(boolean value)
来设置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26/**
* 设置核心线程是否允许过期
*/
public void allowCoreThreadTimeOut(boolean value) {
// 若value为true,但是线程最大存活时间不大于0,那么则抛异常
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
// 如果设置的新值和当前值不同,则执行计划
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
// 若value为true,则终止线程池内的所有空闲Worker
if (value)
interruptIdleWorkers();
}
}
/**
* 初始化核心线程
*/
public int prestartAllCoreThreads() {
int n = 0;
// 循环创建工作线程Worker
while (addWorker(null, true))
++n;
return n;
} -
创建线程Worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72private boolean addWorker(Runnable firstTask, boolean core) {
retry:// goto语法
for (;;) {// 无限循环,循环体内控制退出
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);// 当前线程数
// 校验当前正在执行的线程数是否超过了2^29 - 1,或者根据创建的是否为核心线程来与核心线程数和最大线程数做校验,如果已经超过了相关的值,则返回false拒绝创建
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 改变当前运行的线程数,这里使用的CAS来保证线程安全,设置成功则跳出最外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 当前线程池状态和方法最初的对比,若不等,则重新执行for循环体
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建Worker线程实例
w = new Worker(firstTask);
// Worker实例的属性,在Worker构造器中通过getThreadFactory().newThread(this);来创建
final Thread t = w.thread;
if (t != null) {
// 加锁,保证线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 校验rs是否为RUNNING,或者停止且队列中无任务
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将新创建的Worker实例放入HashSet集合中
workers.add(w);
int s = workers.size();
// 更新最大线程运行数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 工作线程创建成功后,调用线程的start()方法开启线程
if (workerAdded) {
t.start();// tag-cc307
workerStarted = true;
}
}
} finally {
// 创建失败的话,则处理失败计划
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}由addWorker方法我们可以看出,线程池的核心执行器是Worker内部类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// Worker类定义
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
/**
final修饰:不可被扩展
继承自AQS:保证线程运行的隔离性,线程池的线程安全核心
实现自Runnable,所以Worker也是一个线程类
*/
// 构造器
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 使用线程池工厂创建新线程,并将创建的线程赋值给实例属性thread,也就是在我们调用了thread的start()方法之后,会运行Worker类中的run()方法
this.thread = getThreadFactory().newThread(this);
}看到这里,就应该去看Worker类中的run方法了,我们看到在run方法中调用了runWorker方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95// 核心关键方法,final修饰,不允许被overload和override
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 全文搜索tag-cc307
Runnable task = w.firstTask;
w.firstTask = null;
// 加锁前先释放锁,查看Worker中的tryRelease方法
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 无限循环,这里的无限循环的实现方法主要在getTask()方法中,getTask()是从阻塞队列中获取等待的任务,这里我们可以看到阻塞队列中存储的是一个个Runnable实例
while (task != null || (task = getTask()) != null) {
// 线程加锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行前计划
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 直接调用任务的run方法,这里其实就是将队列中Runnable实例当成普通的非线程对象,我们都知道直接调用线程的run方法会以普通方法的形式去执行,这里之所以这样写,是因为我们当前已经处于一个线程中了,没必要再去启用一个线程去执行任务,否则线程池就没有存在的必要了
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
// 记录线程Worker的成功任务数
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
/**
* 获取队列中的任务
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 核心:判断是否允许核心线程过期 或 当前工作线程数是否超过了核心线程数,timed决定了是否回收核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 若需要销毁工作线程,则使用poll方法使阻塞队列消失
// 否则通过take方法继续阻塞,直到队列中有新数据
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
-
总结
从以上的内容中我们可以看出来:线程池运行的线程和队列中等待的线程不是同一个,线程池中实际运行的线程是Worker实例