高级工具
ConcurrentHashMap
原理和线程池还没看,缓一缓。
线程池
线程池的基本原理就是启动一些线程,线程执行完任务后,回收线程到池中等待下次的调用而非关闭线程。示意图如下:
graph LR
subgraph ThreadPool
t1
t2
t3
end
subgraph BlockingQueue
task1-->task2-->task3
end
t1-->|pool|task1
t2-.->|pool|task1
t3-.->|pool|task1
task3--put-->main
自定义阻塞队列
用 ReentrantLock
锁实现。可精准唤醒。ReentrantLock
的超时等待的返回值是剩余要等待的时间,解决了虚假唤醒的问题。
class BlockingQueue<T> {
// 1.任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2.锁
private ReentrantLock lock = new ReentrantLock();
// 3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5.容量
private int capcity;
public BlockingQueue() {}
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
try {
// 将超时时间统一转换为纳秒。
long nanos = unit.toNanos(timeout);
lock.lock();
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
// 返回的是剩余的,需要的等待时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞获取,有元素才能获取
public T take() {
try {
lock.lock();
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
public void put(T element) {
try {
lock.lock();
while (queue.size() == capcity) {
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public int size() {
try {
lock.lock();
return queue.size();
} finally {
lock.unlock();
}
}
}
自定义线程池
使用自定义的阻塞队列,实现自定义线程池。线程的复用技巧在于,当一个任务执行完毕后,去查看阻塞队列中是否有任务,有则继续执行,没有则等待/释放(等待还是释放看你采用何种策略)
// 纯自定义线程池
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合,泛型定义为 Thread 保存的信息不够,自定义一个类型,保存更为丰富的信息。
private HashSet<Worker> workers = new HashSet();
// 核心线程数
private int coreSize;
// 工作一段时间后没有任务了,线程一直再运行也是浪费,设置一个超时时间,超过这个时间就销毁线程。
private long timeout;
private TimeUnit timeUnit;
public void execute(Runnable task) {
// 任务数没有超过 coreSize 就交给 worker 对象执行
// 超过了就加入任务队列暂存。
synchronized (workers) {
if (workers.size() < coreSize) {
log.debug("coreSize is ok");
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
} else {
log.debug("put Blocking Queue");
taskQueue.put(task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
}
class Worker extends Thread {
private Runnable task;
public Worker() {}
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 当 task 不为空,则执行
// 当 task 执行完毕,则从任务队列获取任务并执行。这样就实现了线程的复用。
// take 拿不到会一直死等。线程复用。
while (task != null || (task = taskQueue.take()) != null) { // 如果使用 poll 就不会死等,可以执行到 sync 的移除操作。
try {
log.debug("run task");
task.run();
} catch (Exception e) {
} finally {
task = null;
}
}
synchronized (workers) {
// remove 执行不到。 需要用到超时策略,才会运行到 remove
workers.remove(this);
}
}
}
}
测试代码
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.TestPool")
/**
* 线程池中如果没有线程,那就创建线程执行。
* 如果有线程,但是数量不足,就放入阻塞队列
*/
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 2, TimeUnit.SECONDS, 10);
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
拒绝策略
为什么要拒绝策略
假定
- 最大执行任务数 = 2
- 阻塞队列最大容量 = 10
- 即最多可以接收 12 个任务。如果超出了,怎么办?
package pool.base;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.TestPool")
/**
* 线程池中如果没有线程,那就创建线程执行。
* 如果有线程,但是数量不足,就放入阻塞队列
*/
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 2, TimeUnit.SECONDS, 10);
for (int i = 0; i < 20; i++) {
final int j = i;
threadPool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}", j + ""); // 不加这个,输出会有问题,都是输出同一个对象的 hash 值,但是执行的任务是没有问题的。
});
}
}
}
// 纯自定义线程池
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合,泛型定义为 Thread 保存的信息不够,自定义一个类型,保存更为丰富的信息。
private HashSet<Worker> workers = new HashSet();
// 核心线程数
private int coreSize;
// 工作一段时间后没有任务了,线程一直再运行也是浪费,设置一个超时时间,超过这个时间
private long timeout;
private TimeUnit timeUnit;
public void execute(Runnable task) {
// 任务数没有超过 coreSize 就交给 worker 对象执行
// 超过了就加入任务队列暂存。
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增 worker{},{}", worker, task);
workers.add(worker);
worker.start();
} else {
taskQueue.put(task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
}
class Worker extends Thread {
private Runnable task;
public Worker() {}
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 当 task 不为空,则执行
// 当 task 执行完毕,则从任务队列获取任务并执行。这样就实现了线程的复用。
// take 拿不到会一直死等。线程复用。
while (task != null || (task = taskQueue.take()) != null) { // 大哥李风格
try {
log.debug("正在执行{}", task);
task.run();
} catch (Exception e) {
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker 被移除{}", this);
// remove 执行不到。 需要用到超时策略,才会运行到 remove
workers.remove(this);
}
}
}
}
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
// 1.任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2.锁
private ReentrantLock lock = new ReentrantLock();
// 3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5.容量
private int capcity;
public BlockingQueue() {}
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将超时时间统一转换为纳秒。
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
// 返回的是剩余的,需要的等待时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞获取,有元素才能获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列{}....", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
log.debug("加入任务队列{}", task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public int size() {
try {
lock.lock();
return queue.size();
} finally {
lock.unlock();
}
}
}
目前是 log.debug("等待加入任务队列{}....", task); fullWaitSet.await();
阻塞队列容量满了,阻塞队列就阻塞住自己,等有空余了,再把这些任务加进去。
11:48:19.780 c.ThreadPool [main] - 新增 workerThread[Thread-0,5,main],pool.base.TestPool$$Lambda$1/1879034789@6833ce2c
11:48:19.784 c.ThreadPool [main] - 新增 workerThread[Thread-1,5,main],pool.base.TestPool$$Lambda$1/1879034789@d7b1517
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@23223dd8
11:48:19.784 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@6833ce2c
11:48:19.784 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@d7b1517
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@4ec6a292
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@1b40d5f0
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@ea4a92b
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@3c5a99da
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@47f37ef1
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@5a01ccaa
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@71c7db30
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@19bb089b
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@4563e9ab
11:48:19.784 c.BlockingQueue [main] - 等待加入任务队列pool.base.TestPool$$Lambda$1/1879034789@11531931....
11:48:24.793 c.TestPool [Thread-1] - 1
11:48:24.793 c.TestPool [Thread-0] - 0
11:48:24.793 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@4ec6a292
11:48:24.793 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@23223dd8
11:48:24.793 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@11531931
11:48:24.793 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@1fbc7afb
11:48:24.793 c.BlockingQueue [main] - 等待加入任务队列pool.base.TestPool$$Lambda$1/1879034789@45c8e616....
11:48:29.805 c.TestPool [Thread-1] - 2
11:48:29.805 c.TestPool [Thread-0] - 3
11:48:29.805 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@ea4a92b
11:48:29.805 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@1b40d5f0
11:48:29.805 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@45c8e616
11:48:29.805 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@4cdbe50f
11:48:29.805 c.BlockingQueue [main] - 等待加入任务队列pool.base.TestPool$$Lambda$1/1879034789@66d33a....
11:48:34.815 c.TestPool [Thread-1] - 4
11:48:34.815 c.TestPool [Thread-0] - 5
11:48:34.815 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@3c5a99da
11:48:34.815 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@47f37ef1
11:48:34.815 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@66d33a
11:48:34.815 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@7cf10a6f
11:48:34.815 c.BlockingQueue [main] - 等待加入任务队列pool.base.TestPool$$Lambda$1/1879034789@7e0babb1....
11:48:39.826 c.TestPool [Thread-1] - 6
11:48:39.826 c.TestPool [Thread-0] - 7
11:48:39.826 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@5a01ccaa
11:48:39.826 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@7e0babb1
11:48:39.826 c.BlockingQueue [main] - 等待加入任务队列pool.base.TestPool$$Lambda$1/1879034789@6debcae2....
11:48:39.826 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@71c7db30
11:48:39.826 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@6debcae2
11:48:44.835 c.TestPool [Thread-0] - 9
11:48:44.835 c.TestPool [Thread-1] - 8
11:48:44.835 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@19bb089b
11:48:44.835 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@4563e9ab
不同场景所要用到的策略有所差别,不能总是一味的阻塞队列等待,有空位后将任务加入阻塞队列。应该是有多种策略可供选择。
带拒绝策略的代码
package pool;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.TestPool")
/**
* 线程池中如果没有线程,那就创建线程执行。
* 如果有线程,但是数量不足,就放入阻塞队列
*/
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2,
1000, TimeUnit.MILLISECONDS, 5, (queue, task) -> {
// 1. 死等
// queue.put(task);
// 2) 带超时等待
queue.offer(task, 200, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行
// log.debug("放弃{}", task);
// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务
// task.run();
});
ConcurrentHashMap<Integer, Integer> list = new ConcurrentHashMap<>();
for (int i = 0; i < 14; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
for (int k = 0; k < 10; k++) {
list.put(j, j);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// log.debug("{}", j); // 不加这个,会有问题,为什么会这样,不知道。
}, j + "");
}
while (Thread.activeCount() > 2) {
}
Enumeration<Integer> keys = list.keys();
while (keys.hasMoreElements()) {
System.out.print(keys.nextElement() + "\n");
}
}
}
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
// 纯自定义线程池
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合,泛型定义为 Thread 保存的信息不够,自定义一个类型,保存更为丰富的信息。
private HashSet<Worker> workers = new HashSet();
// 核心线程数
private int coreSize;
// 工作一段时间后没有任务了,线程一直再运行也是浪费,设置一个超时时间,超过这个时间
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
public void execute(Runnable task, String name) {
// 任务数没有超过 coreSize 就交给 worker 对象执行
// 超过了就加入任务队列暂存。
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task, name);
log.debug("新增 worker{},{}", worker, task);
workers.add(worker);
worker.start();
} else {
taskQueue.tryPut(rejectPolicy, task);
// 1. 死等
// 2. 带超时等待
// 3. 让调用者放弃任务执行
// 4. 让调用者抛出异常
// 5. 让调用者自己执行任务
// 用策略模式
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
class Worker extends Thread {
private Runnable task;
@Override
public String toString() {
return "Worker{" +
"name=" + this.getName() +
'}';
}
public Worker() {}
public Worker(Runnable task, String name) {
this.task = task;
this.setName(name);
}
@Override
public void run() {
// 执行任务
// 当 task 不为空,则执行
// 当 task 执行完毕,则从任务队列获取任务并执行。这样就实现了线程的复用。
// take 拿不到会一直死等。线程复用。
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { // 大哥李风格
try {
log.debug("正在执行{}", task);
task.run();
log.debug("执行完毕{}", task);
} catch (Exception e) {
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker 被移除{}", this);
// remove 执行不到。 需要用到超时策略,才会运行到 remove
workers.remove(this);
}
}
}
}
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
// 1.任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2.锁
private ReentrantLock lock = new ReentrantLock();
// 3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5.容量
private int capcity;
public BlockingQueue() {}
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
try {
// 将超时时间统一转换为纳秒。
lock.lock();
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
// 返回的是剩余的,需要的等待时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞获取,有元素才能获取
public T take() {
try {
lock.lock();
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
public void put(T task) {
try {
lock.lock();
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列{}....", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}", task);
queue.addLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 带超时时间的阻塞添加方法
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
try {
lock.lock();
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity) {
try {
if (nanos <= 0) {
return false;
}
log.debug("等待加入任务队列{}....", task);
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 就等待 Nanos 秒,没等到也不阻塞,直接跑路
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
public int size() {
try {
lock.lock();
return queue.size();
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
try {
lock.lock();
// 队列满
if (queue.size() == capcity) {
// 拒绝策略
rejectPolicy.reject(this, task);
} else {
log.debug("加入任务队列{}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
输出结果如下:
11:44:23.811 c.ThreadPool [main] - 新增 workerWorker{name=0},pool.TestPool$$Lambda$2/984849465@1888ff2c
11:44:23.814 c.ThreadPool [main] - 新增 workerWorker{name=1},pool.TestPool$$Lambda$2/984849465@1b40d5f0
11:44:23.814 c.ThreadPool [0] - 正在执行pool.TestPool$$Lambda$2/984849465@1888ff2c
11:44:23.814 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@3c5a99da
11:44:23.814 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@47f37ef1
11:44:23.814 c.ThreadPool [1] - 正在执行pool.TestPool$$Lambda$2/984849465@1b40d5f0
11:44:23.814 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@5a01ccaa
11:44:23.814 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@71c7db30
11:44:23.814 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@19bb089b
11:44:23.814 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@4563e9ab....
11:44:24.018 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@5e025e70....
11:44:24.223 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@1fbc7afb....
11:44:24.430 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@45c8e616....
11:44:24.633 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@4cdbe50f....
11:44:24.825 c.ThreadPool [0] - 执行完毕pool.TestPool$$Lambda$2/984849465@1888ff2c
11:44:24.825 c.ThreadPool [1] - 执行完毕pool.TestPool$$Lambda$2/984849465@1b40d5f0
11:44:24.825 c.ThreadPool [0] - 正在执行pool.TestPool$$Lambda$2/984849465@3c5a99da
11:44:24.825 c.ThreadPool [1] - 正在执行pool.TestPool$$Lambda$2/984849465@47f37ef1
11:44:24.825 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@4cdbe50f
11:44:24.825 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@66d33a
11:44:24.825 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@7cf10a6f....
11:44:25.840 c.ThreadPool [1] - 执行完毕pool.TestPool$$Lambda$2/984849465@47f37ef1
11:44:25.840 c.ThreadPool [0] - 执行完毕pool.TestPool$$Lambda$2/984849465@3c5a99da
11:44:25.840 c.ThreadPool [1] - 正在执行pool.TestPool$$Lambda$2/984849465@5a01ccaa
11:44:25.840 c.ThreadPool [0] - 正在执行pool.TestPool$$Lambda$2/984849465@71c7db30
11:44:26.854 c.ThreadPool [1] - 执行完毕pool.TestPool$$Lambda$2/984849465@5a01ccaa
11:44:26.854 c.ThreadPool [0] - 执行完毕pool.TestPool$$Lambda$2/984849465@71c7db30
11:44:26.854 c.ThreadPool [1] - 正在执行pool.TestPool$$Lambda$2/984849465@19bb089b
11:44:26.854 c.ThreadPool [0] - 正在执行pool.TestPool$$Lambda$2/984849465@4cdbe50f
11:44:27.868 c.ThreadPool [1] - 执行完毕pool.TestPool$$Lambda$2/984849465@19bb089b
11:44:27.868 c.ThreadPool [0] - 执行完毕pool.TestPool$$Lambda$2/984849465@4cdbe50f
11:44:27.868 c.ThreadPool [1] - 正在执行pool.TestPool$$Lambda$2/984849465@66d33a
11:44:28.869 c.ThreadPool [0] - worker 被移除Worker{name=0}
11:44:28.869 c.ThreadPool [1] - 执行完毕pool.TestPool$$Lambda$2/984849465@66d33a
11:44:29.872 c.ThreadPool [1] - worker 被移除Worker{name=1}
0 1 2 3 4 5 6 11 12
Process finished with exit code 0
ThreadPoolExecutor
- submit 方式执行,有返回值
- execute 方式执行,无返回值

线程池状态
ThreadPoolExecutor
使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
状态名 | 高3位 | 接收新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余任务 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列任务 |
TIDYING | 010 | - | - | 任务全部执行完毕,活动线程为 0 即将进入终结 |
TERMINATED | 011 | - | - | 终结状态 |
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作 进行赋值
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize
核心线程数目 (最多保留的线程数)maximumPoolSize
最大线程数目 (核心线程数+救急线程数 = maximumPoolSize)keepAliveTime
生存时间 - 针对救急线程 救急线程没任务做时可存活多久unit
时间单位 - 针对救急线程workQueue
阻塞队列threadFactory
线程工厂 - 可以为线程创建时起个好名字handler
拒绝策略,当救急线程也用完了,才会执行拒绝策略。
graph LR
subgraph 阻塞队列
size=2
end
subgraph 线程池 c=2,m=3
ct1(核心线程1)
ct2(核心线程2)
ct3(救济线程3)
end
t1(任务1)
graph LR
subgraph 阻塞队列
t3(任务3)
t4(任务4)
size=2
end
subgraph 线程池 c=2,m=3
ct1(核心线程1) --> t1(任务1)
ct2(核心线程2) --> t2(任务2)
ct3(救急线程3)
end
救急线程执行完后会被销毁掉(有生存时间),核心线程会被保留到线程池种,一直被运行。
工作方式
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
- 当线程数达到
corePoolSize
并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue
队列排队,直到有空闲的线程。 - 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
- 如果线程到达
maximumPoolSize
仍然有新任务这时会执行拒绝策略。拒绝策略JDK
提供了 4 种实现,其它著名框架也提供了实现AbortPolicy
让调用者抛出RejectedExecutionException
异常,这是默认策略CallerRunsPolicy
让调用者运行任务DiscardPolicy
放弃本次任务DiscardOldestPolicy
放弃队列中最早的任务,本任务取而代之Dubbo
的实现,在抛出RejectedExecutionException
异常之前会记录日志,并 dump 线程栈信息,方便定位问题- Netty 的实现是创建一个新线程来执行任务
ActiveMQ
的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略PinPoint
的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
- 当高峰过去后,超过
corePoolSize
的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。

根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
特点:
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
适用于任务量已知,相对耗时的任务
@Slf4j(topic = "c.TestNewCachedThreadPool")
public class TestNewCachedThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
log.debug("1");
});
executorService.execute(() -> {
log.debug("1");
});
executorService.execute(() -> {
log.debug("1");
});
}
}
捋一捋这些方法
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 用的 defaultThreadFactory 创建的线程
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/// 看下建线程工程如何创建线程
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
// 线程工程
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + // 线程的命名
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false); // 线程工厂创建的线程是非守护线程!不会因为 main 线程的退出而终止!
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
- 线程工程创建的线程是非守护线程!不会因为 main 线程的退出而终止!
- 我们也可以自定义线程工厂。
自定义线程工厂
自定义线程工厂的作用就是可以自己定义线程的名字。
public static void test() {
Executors.newFixedThreadPool(2, new ThreadFactory() {
private AtomicInteger atomic = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "my_thread" + atomic.getAndIncrement());
}
});
}
newCachedThreadPool
public static void test1() {
ExecutorService executorService = Executors.newCachedThreadPool();
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 都是救急线程
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特点
- 核心线程数都是 0,最大线程数是
Integer.MAX_VALUE
,救急线程的空闲生存时间是 60s,意味着- 全部都是救急线程(60s 后可以回收)
- 救急线程可以无限创建
- 队列采用了
SynchronousQueue
,该队列的特点是,它没有容量。没有线程来取是放不进去的(一手交钱、一手交货)
public static void testQueue() throws InterruptedException {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
log.debug("put before 1");
queue.put(1);
log.debug("put after 1");
log.debug("put before 2");
queue.put(2);
log.debug("put after 2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
try {
log.debug("take 1");
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
log.debug("take 2");
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
11:01:02.306 c.TestNewCachedThreadPool [Thread-0] - put before 1
11:01:03.306 c.TestNewCachedThreadPool [Thread-1] - take 1
11:01:03.306 c.TestNewCachedThreadPool [Thread-0] - put after 1
11:01:03.306 c.TestNewCachedThreadPool [Thread-0] - put before 2
11:01:03.306 c.TestNewCachedThreadPool [Thread-2] - take 2
11:01:03.306 c.TestNewCachedThreadPool [Thread-0] - put after 2
适用场景
整个线程池表现为会根据任务量不断增长,没有上限,当任务执行完毕,空闲 60s 后释放线程。适合任务数比较密集,但每个任务执行时间较短的情况。【不适合任务执行时间长的,如果执行时间长,不断累加线程,就会爆内存。】
newSingleThreadExecutor
public static void testNewSingleThreadExecutor(){
ExecutorService executorService = Executors.newSingleThreadExecutor();
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, // 核心线程数和最大线程数都是1
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
适用场景
希望多个任务队列执行,线程数固定为 1,任务数多于 1 时,会放入无界队列排队,当任务执行完毕,这唯一的线程也不会被释放
自己创建一个线程来执行和单线程的线程池之间的区别
- 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一 个线程,保证池的正常工作
Executors.newSingleThreadExecutor()
线程个数始终为 1,不能修改FinalizableDelegatedExecutorService
应用的是装饰器模式,只对外暴露了ExecutorService
接口,因 此不能调用ThreadPoolExecutor
中特有的方法
Executors.newFixedThreadPool(1)
初始时为1,以后还可以修改- 对外暴露的是
ThreadPoolExecutor
对象,可以强转后调用setCorePoolSize
等方法进行修改
- 对外暴露的是
public static void testNewSingleThreadExecutor() {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
threadPool.execute(() -> {
log.debug("1");
int i = 1 / 0;
});
threadPool.execute(() -> {
log.debug("1");
});
threadPool.execute(() -> {
log.debug("1");
});
threadPool.execute(() -> {
log.debug("1");
});
}
提交任务
// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
测试 Submit – 用返回值 Future 获得任务执行结果
public static void testSubmit() throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
Future<String> submit = threadPool.submit(() -> {
log.debug("OKK");
TimeUnit.SECONDS.sleep(2);
return "1";
});
// main 线程阻塞等待结果
log.debug("{}", submit.get());
}
测试
invokeAll
– 提交 tasks 中所有任务
public static void invokeAll() throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
threadPool.invokeAll(Arrays.asList(
() -> {
log.debug("1");
TimeUnit.SECONDS.sleep(1);
return 1;
},
() -> {
log.debug("2");
TimeUnit.SECONDS.sleep(2);
return 1;
},
() -> {
log.debug("3");
TimeUnit.SECONDS.sleep(3);
return 1;
}
));
threadPool.shutdown();
}
/*
11:34:55.879 c.TestSubmit [pool-1-thread-2] - 2
11:34:55.879 c.TestSubmit [pool-1-thread-1] - 1
11:34:56.881 c.TestSubmit [pool-1-thread-1] - 3
*/
测试
invokeAny
– 提交 tasks 中所有任务,返回最先执行完的,然后取消其他任务。
public static void invokeAny() throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
threadPool.invokeAny(Arrays.asList(
() -> {
log.debug("1 start");
TimeUnit.SECONDS.sleep(1);
log.debug("1 end");
return 1;
},
() -> {
log.debug("2 start");
TimeUnit.SECONDS.sleep(2);
log.debug("2 end");
return 2;
},
() -> {
log.debug("3 start");
TimeUnit.SECONDS.sleep(3);
log.debug("3 end");
return 3;
}
));
threadPool.shutdown();
}
/*
11:31:44.869 c.TestSubmit [pool-1-thread-2] - 2 start
11:31:44.869 c.TestSubmit [pool-1-thread-1] - 1 start
11:31:45.885 c.TestSubmit [pool-1-thread-1] - 1 end
11:31:45.885 c.TestSubmit [pool-1-thread-1] - 3 start
*/
终止线程
shutdown – 调用后不会阻塞调用 shutdown 的线程。
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
源码
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}
示例代码
public static void shutdown() {
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.execute(() -> {
log.debug("running 1");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("over 1");
});
pool.execute(() -> {
log.debug("running 2");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("over 2");
});
pool.execute(() -> {
log.debug("running 3");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("over 3");
});
log.debug("before shutdown");
pool.shutdown(); // 执行这个方法并不会阻塞线程
pool.awaitTermination(40,TimeUnit.SECONDS); // 执行这个方法会阻塞住线程40秒。40秒后就不在阻塞。如果线程提取执行完毕了,则阻塞立马结束。
log.debug("executors shutdown and wait thread over!");
}
/*
11:51:11.735 c.TestShutDown [main] - before shutdown
11:51:11.735 c.TestShutDown [pool-1-thread-1] - running 1
11:51:11.736 c.TestShutDown [main] - executors shutdown and wait thread over!
11:51:12.751 c.TestShutDown [pool-1-thread-1] - over 1
11:51:12.751 c.TestShutDown [pool-1-thread-1] - running 2
11:51:13.766 c.TestShutDown [pool-1-thread-1] - over 2
11:51:13.766 c.TestShutDown [pool-1-thread-1] - running 3
11:51:14.775 c.TestShutDown [pool-1-thread-1] - over 3
*/
shutdownNow
/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow(){
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();
return tasks;
}
示例代码
public static void shutdownNow() throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.execute(() -> {
log.debug("running 1");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("over 1");
});
pool.execute(() -> {
log.debug("running 2");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("over 2");
});
pool.execute(() -> {
log.debug("running 3");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("over 3");
});
log.debug("before shutdown");
TimeUnit.MILLISECONDS.sleep(2000);
List<Runnable> remain = pool.shutdownNow();
System.out.println(remain.size());
log.debug("executors shutdown and wait thread over!");
}
/*
11:57:47.429 c.TestShutDown [main] - before shutdown
11:57:47.429 c.TestShutDown [pool-1-thread-1] - running 1
11:57:48.445 c.TestShutDown [pool-1-thread-1] - over 1
11:57:48.445 c.TestShutDown [pool-1-thread-1] - running 2
1 // 未执行完的线程数
11:57:49.443 c.TestShutDown [main] - executors shutdown and wait thread over!
11:57:49.444 c.TestShutDown [pool-1-thread-1] - over 2
*/
其他方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
工作线程模式
定义
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
PS: 注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。
例如,如果一个餐馆的工人既要招呼客人(任务类型 A),又要到后厨做菜(任务类型 B)显然效率不咋地,分成服务员(线程池 A)与厨师(线程池 B)更为合理,当然你能想到更细致的分工
饥饿
固定大小线程池会有饥饿现象
假设有两个线程:工人 A 和工人 B 。他们都可以做菜+点餐。这时候,同时来了两个客人,工人 A 和工人 B 都去处理点餐了,这时没人做饭了,产生了饥饿(线程池中的线程不足导致的)
一人做菜,一人点餐,合理运行,无饥饿
@Slf4j(topic = "c.TestDeadLock")
public class TestDeadLock {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
log.debug("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
/*
12:27:22.069 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
12:27:22.072 c.TestDeadLock [pool-1-thread-2] - 做菜
12:27:22.072 c.TestDeadLock [pool-1-thread-1] - 上菜: 地三鲜
*/
都可做菜,点餐,饥饿
@Slf4j(topic = "c.TestDeadLock")
public class TestDeadLock {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
log.debug("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
executorService.execute(() -> {
log.debug("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
/*
12:28:38.740 c.TestDeadLock [pool-1-thread-2] - 处理点餐...
12:28:38.740 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
*/
我们 jsp 查看TestDeadLock 线程 id,然后 jstack 查看线程的状态信息,发现是饥饿(都在 Waiting,如果是死锁会显示 DeadLock!),不是死锁。
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.301-b09 mixed mode):
"DestroyJavaVM" #14 prio=5 os_prio=0 tid=0x000002054590d800 nid=0x1c70 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"pool-1-thread-2" #13 prio=5 os_prio=0 tid=0x0000020562b79800 nid=0x28e8 waiting on condition [0x000000c9659ff000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076c7dc858> (a java.util.concurrent.FutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
at pool.TestDeadLock.lambda$main$3(TestDeadLock.java:43)
at pool.TestDeadLock$$Lambda$2/984849465.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"pool-1-thread-1" #12 prio=5 os_prio=0 tid=0x0000020562b76000 nid=0x3c30 waiting on condition [0x000000c9658fe000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076c584f80> (a java.util.concurrent.FutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
at pool.TestDeadLock.lambda$main$1(TestDeadLock.java:31)
at pool.TestDeadLock$$Lambda$1/875827115.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
解决饥饿
合理分配任务
@Slf4j(topic = "c.TestStarvation")
public class TestStarvation {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookiePool = Executors.newFixedThreadPool(1);
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = cookiePool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = cookiePool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
/*
12:36:06.902 c.TestStarvation [pool-1-thread-1] - 处理点餐...
12:36:06.905 c.TestStarvation [pool-2-thread-1] - 做菜
12:36:06.905 c.TestStarvation [pool-1-thread-1] - 上菜: 地三鲜
12:36:06.906 c.TestStarvation [pool-1-thread-1] - 处理点餐...
12:36:06.907 c.TestStarvation [pool-2-thread-1] - 做菜
12:36:06.907 c.TestStarvation [pool-1-thread-1] - 上菜: 烤鸡翅
*/
创建多少线程池合适
- 过小会导致程序不能充分地利用系统资源、容易导致饥饿
- 过大会导致更多的线程上下文切换,占用更多内存
CPU 密集型运算
通常采用 cpu 核数 +1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
I/O 密集型运算
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。 经验公式如下 \(线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间\) 例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式 \(4 * 100\% * (100\% / 50\%) = 8\) 例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式 \(4 * 100\% * (100\% / 10\%) = 40\)
任务调度线程池
Timer
在『任务调度线程池』功能加入之前,可以使用 java.util.Timer
来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
@Slf4j(topic = "c.TestTimer")
public class TestTimer {
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@SneakyThrows
@Override
public void run() {
log.debug("task 1");
TimeUnit.SECONDS.sleep(2);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task 2");
}
};
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
}
/*
12:47:54.594 c.TestTimer [Timer-0] - task 1
12:47:56.604 c.TestTimer [Timer-0] - task 2
*/
ScheduledThreadPool
基本使用
@Slf4j(topic = "c.TestScheduledThreadPool")
public class TestScheduledThreadPool {
public static void main(String[] args) {
ScheduledExecutorService pools = Executors.newScheduledThreadPool(2);
pools.execute(() -> {
log.debug("Thread1");
try {
int i = 1 / 0; // 出现异常时不会影响其他任务的。即便核心线程数改为 1 也是这样。
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("End Thread2");
});
pools.execute(() -> {
log.debug("Thread2");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("End Thread2");
});
}
}
/*
10:10:00.528 c.TestScheduledThreadPool [pool-1-thread-1] - Thread1
10:10:00.528 c.TestScheduledThreadPool [pool-1-thread-2] - Thread2
10:10:02.537 c.TestScheduledThreadPool [pool-1-thread-1] - End Thread2
10:10:02.537 c.TestScheduledThreadPool [pool-1-thread-2] - End Thread2
*/
延迟执行
pools.schedule(()->{
System.out.println(12);
},2,TimeUnit.SECONDS); // 延迟 2 s 后执行
每隔 x 秒,重复执行。
如果程序的执行时间 > 间隔时间,那么就是每隔程序的执行时间秒,执行一次任务(执行的周期取执行时间和间隔时间的最大值)
schedule()
延迟指定秒数后执行任务scheduleAtFixedRate()
以固定时间间隔执行任务。时间间隔从当前任务开始后开始计算。如果程序的执行时间超过了间隔时间,则下一次程序的执行会立马执行。scheduleWithFixedDelay()
以固定时间间隔执行任务。时间间隔从当前任务结束后开始计算。
@Slf4j(topic = "c.TestScheduledThreadPool")
public class TestScheduledThreadPool {
public static void scheduleWithFixRate() {
ScheduledExecutorService pools = Executors.newScheduledThreadPool(1);
log.debug("start~");
// 线程运行后 1s 开始执行任务,定时执行。每隔 2s 执行一次,间隔时间从当前线程结束的时间点算起
pools.scheduleWithFixedDelay(() -> {
log.debug("Thread1");
}, 1, 1, TimeUnit.SECONDS);
pools.scheduleWithFixedDelay(() -> {
log.debug("Thread2");
}, 1, 1, TimeUnit.SECONDS);
}
public static void scheduleAtFixRate() {
ScheduledExecutorService pools = Executors.newScheduledThreadPool(1);
log.debug("start~");
pools.scheduleAtFixedRate(() -> {
log.debug("Thread1");
}, 1, 2, TimeUnit.SECONDS);
// 线程运行后 1s 开始执行任务,定时执行。每隔 2s 执行一次,间隔时间从当前线程开始运行的时间点算起。如果程序的运行时间超过了间隔时间
// 那么下一次执行会立马开始(执行的周期取执行时间和间隔时间的最大值。)
pools.scheduleAtFixedRate(() -> {
log.debug("Thread2");
}, 1, 2, TimeUnit.SECONDS);
}
public static void schedule() {
ScheduledExecutorService pools = Executors.newScheduledThreadPool(1);
pools.schedule(() -> {
log.debug("thread1");
}, 1, TimeUnit.SECONDS); // 线程运行后 1s 开始执行任务,只执行一次
pools.schedule(() -> {
log.debug("thread2");
}, 1, TimeUnit.SECONDS);
}
public static void main(String[] args) {
ScheduledExecutorService pools = Executors.newScheduledThreadPool(2);
pools.schedule(() -> {
System.out.println(12);
}, 2, TimeUnit.SECONDS);
pools.execute(() -> {
log.debug("Thread1");
try {
TimeUnit.SECONDS.sleep(2);
int i = 1 / 0;
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("End Thread2");
});
pools.execute(() -> {
log.debug("Thread2");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("End Thread2");
});
}
}
正确处理异常
- 自己
try cache
捕捉 - 采用 submit 提交任务,无异常就返回值,有异常就返回异常信息。
线程池的应用
定时执行任务;顺带学习下 LocalDate
。
import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class PoolApplication {
public static void main(String[] args) {
LocalDateTime now = LocalDateTime.now();
LocalDateTime start = now.withNano(0);
LocalDateTime end = start.withHour(17).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.FRIDAY);
if (end.isBefore(start)) {
end = end.plusWeeks(1);
}
Duration between = Duration.between(start, end);
System.out.println(between);
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
pool.scheduleAtFixedRate(() -> {
System.out.println("周五了!放假了");
}, 0, between.getSeconds(), TimeUnit.SECONDS);
}
}
Tomcat 线程池
graph LR
subgraph Connector["Connector(NIO EndPoint)"]
LimitLatch-->Acceptor-->s1[SocketChannel 1]
Acceptor-->s2[SocketChannel 2]
s1-->|有读|Poller
s2-->|有读|Poller
end
subgraph Executor
worker1
worker2
end
Poller-->|sockerProcessor|worker1
Poller-->|sockerProcessor|worker2
- LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
- Acceptor 只负责【接收新的 socket 连接,各司其职。】
- Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
- 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
- Executor 线程池中的工作线程最终负责【处理请求】
Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同
- 如果总线程数达到 maximumPoolSize
- 这时不会立刻抛
RejectedExecutionException
异常 - 而是再次尝试将任务放入队列,如果还失败,才抛出
RejectedExecutionException
异常
- 这时不会立刻抛
tomcat-7.0.42 源码
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
TaskQueue.java
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if ( parent.isShutdown() )
throw new RejectedExecutionException(
"Executor not running, can't force a command into the queue"
);
return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
}
Connector 配置
配置项 | 默认值 | 说明 |
---|---|---|
acceptorThreadCount | 1 | acceptor 线程数量 |
pollerThreadCount | 1 | poller 线程数量 |
minSpareThreads | 10 | 核心线程数,即 corePoolSize maxThreads |
executor | - | Executor 名称,用来引用下面的 Executor |
Executor 线程配置
配置项 | 默认值 | 说明 |
---|---|---|
threadPriority | 5 | 线程优先级 |
daemon | true | 是否守护线程 |
minSpareThreads | 25 | 核心线程数,即 corePoolSize |
maxThreads | 200 | 最大线程数,即 maximumPoolSize |
maxIdleTime | 60000 | 线程生存时间,单位是毫秒,默认值即 1 分钟 |
maxQueueSize | Integer.MAX_VALUE | 队列长度 , Java 队列满时才会创建救急线程,但是 tomcat做了修改 |
prestartminSpareThreads | false | 核心线程是否在服务器启动时启动(默认是懒惰初始化) |
放队列里是走流程,需要的再从队列中拿。
graph LR
add(添加新任务)-->submit[提交任务 < 核心线程]
submit-->|是|加入队列
submit-->|否|submit2[提交任务 < 最大线程]
submit2-->|否|加入队列
submit2-->|是|创建救急线程
Fork/Join
概念
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 CPU 密集型运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计 算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。Fork/Join 默认会创建与 CPU 核心数大小相同的线程池
使用
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask
(有返回值)或 RecursiveAction
(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务
ThreadLocal
ThreadLocal
人手一支笔,每个线程都独有一份自己的变量。
- 定义:提供线程局部变量;
- 特点:简单、快速(没有额外的锁开销)、安全(线程安全)
- 场景:多线程场景下可使用(资源持有、线程一致性、并发计算、线程安全等场景)
- 实现原理:哈希表实现
本节内容待优化
基础用法
public class ThreadLocalDemo {
public static void main(String[] args) {
ThreadLocal<String> local = new ThreadLocal<>();
System.out.println(local.get());
local.set("liu");
System.out.println(local.get());
}
}
/*
null
liu
*/
多线程中使用 ThreadLocal
import java.util.concurrent.TimeUnit;
public class ThreadLocalDemo {
private static void sleep(int second){
try {
TimeUnit.SECONDS.sleep(second);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
ThreadLocal<Integer> tl = new ThreadLocal<>();
System.out.println("main 设置 tl 的值为 1");
tl.set(1);
Thread th1 = new Thread(() -> {
System.out.println("th1 设置 tl 的值为 100");
tl.set(100);
System.out.println("获取 tl 线程中 tl 的值:" + tl.get());
});
th1.start();
sleep(2);
// 如果是共享的话,值为 100,但是输出为 1,说明值不共享,线程之间独立。
System.out.println("获取 main 线程中 tl 的值:"+tl.get());
}
}
/*
main 设置 tl 的值为 1
th1 设置 tl 的值为 100
获取 tl 线程中 tl 的值:100
获取 main 线程中 tl 的值:1
*/
常见方法
方法 | 说明 |
---|---|
potected T initiaValue() |
用于提供初始值的,默认实现是返回 null |
public void remove() |
删除当前线程对应的值 |
public T get() |
获取值 |
public void set(T value) |
设置值 |
使用场景
每个线程需要独占某个资源,例如随机数、上下文信息(数据库连接)。
DataFormat/SimpleDateFormat 日期类是非线程安全的。
static ThreadLocal<DateFormat> sdf = new ThreadLocal<>() {
@Override
protected DateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};
使用
ThreadLocalRandom
Random 是线程安全的,但是如果并发竞争激烈,性能会下降,所以 Java 并发包提供了类 ThreadLocalRandom
,他是 Random 的子类,利用了 ThreadLocal
。无构造方法,只能通过静态方法 current 获取对象。
public class ThreadLocalRandomDemo {
public static void main(String[] args) {
ThreadLocalRandom current = ThreadLocalRandom.current();
System.out.println(current.nextInt());
}
}
// jdk 11
public static ThreadLocalRandom current() {
if (U.getInt(Thread.currentThread(), PROBE) == 0)
localInit();
return instance;
}
上下文信息
比如,每个线程的事务都是独立的,可以用 ThreadLocal
保存线程的 Connection,用于同一线程内数据库事务的提交与回滚。
注意事项
应用线程退出 ThreadLocal
之前,应调用 remove 方法释放存储的信息。虽然一般不会因为没 remove 造成 OOM
,但还是推荐 remove,可以减少 GC
的频率。(ThreadLocal
是将参数 T 保存在 ThreadLocalMap
中,此 Map 的 key 是 WeakReference
,该引用类型会在 GC
时进行自动回收)
基本原理
在学习 ThreadLocal 的原理之前,我们先尝试自行设计一个线程私有(独占)变量试试
- 如果线程只有一个私有的变量,那么最简单的方式就是在 Thread 类中添加一个变量,直接调用 set/get 方法进行访问。
- 如果线程可能有多个私有变量,上面的方式就行不通了。
- 我们需要考虑如何存储并区分多个变量,场景的做法是使用 map,通过 key-value 映射来进行区分。
- 我们可以在 Thread 中设置一个 Map,Map 中存储变量的标识符(key)和具体的值(value),通过 Map[key] 即可得到对应的值。
- 为了避免 key 重复,我们可以直接使用 new Object() 充当 key,这样,我们就可以通过类似于
Thread.currentThread().map.get(key)
的方式得到线程私有的变量了。
ThreadLocal 是如何实现每个线程都能有自己独立的值?
- 如何保证线程安全:每个线程都有自己的 ThreadLocalMap,线程的独立变量是存储在了 Thread 对象的 ThreadLocalMap 中。因为是单个线程自己拥有的 Map,因此是线程安全的,并且变量是以 ThreadLocal 对象为 key。
- ThreadLocal 又是如何获取到当前线程的变量呢?这里设计的比较优雅,可以直接通过 keyObj.get() 获取到当前线程的私有变量。ThreadLocal 通过 Thread.currentThread 获得当前 Thread 的 LocalMap,然后利用 key(ThreadLocal 对象)来获取对应的值。
graph LR
Thread---|持有|ThreadLocalMap
graph LR
ThreadLocal-->|访问|ThreadLocalMap
简而言之 Thread 持有 ThreadLocalMap, map 以 ThreadLocal 对象为 key,而访问线程私有变量的值则是通过这个 key(ThreadLocal)对象来访问的(对象.get(),API 用法简单)。
具体设计
Thread 线程内部维护的两个变量。
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
这两个变量开始的时候都为 null,只有当前线程第一次调用 ThreadLocal 的 set 或 get 方法时才会创建它们。ThreadLocal 的 set/get 方法其实都是操作的线程对象中的 map,只是把 Map 的定义放在了 ThreadLocal 中。
set 方法
public void set(T value) {
// (1) 获取当前线程
Thread t = Thread.currentThread();
// (2) 将当前线程作为 key,去查找对应的线程变量(ThreadLocalMap),找到则设置
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
// (3) 第一次调用就创建当前线程对应的 HashMap
createMap(t, value);
}
}
它调用了 getMap
,getMap
的代码为:
ThreadLocalMap getMap(Thread t) {
return t.threadLocals; // 返回线程的实例变量 threadLocals,
}
它的初始值为 null,在 null 时,set 调用 createMap
初始化,代码为:
void createMap(Thread t, T firstValue) {
// 创建线程 t 的专属 localMap,key 为当前的 ThreadLocal 对象,值为 value
// 其 key 类型为 WeakReference<ThreadLocal>.
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
get 方法
得到当前线程的 LocalMap,以 ThreadLocal
对象为 key 从 Map 中获取到 Entry,取其 value,如果 Map 中没有,则调用 setInitiaValue
;
public T get() {
// (4) 获取当前线程
Thread t = Thread.currentThread();
// (5) 获取当前线程的 thread.currentThread
ThreadLocalMap map = getMap(t);
// (6) 如果 threadLocals 不为 null,则返回对应本地变量的值
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// (7) threadLocals 为空则初始化当前线程的 threadLocals 成员变量
return setInitialValue();
}
setInitiaValue 方法
private T setInitialValue() {
// (8) 初始化为 null
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
// (9) 如果当前线程的 threadLocals 变量不为空
if (map != null)
map.set(this, value);
else
// (10) 如果当前线程的 threadLocals变量为空
createMap(t, value);
return value;
}
remove 方法
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null) {
m.remove(this);
}
}
总结
每个线程都有一个 Map,该 Map 以 ThreadLocal 对象为 key,调用 ThreadLocal 对象的 get/set 方法实际上是先获取当前线程的 Map,然后以 ThreadLocal 对象为 key 读写 Map,这样就实现了每个线程都有自己的独立数据。
ThreadLocalRandom
Random
Random 产生随机数的步骤
- 首先根据老的种子产生新的种子
- 在根据新的种子计算新的随机数
可能存在的问题
- 单线程下可以确保每次都是用老种子产生新种子,可以确保随机数的随机性。
- 在多线程环境下,可能多个线程拿到的都是同样的老种子,用同样的老种子计算新种子,产生的随机数也会一样。
代码 & 解决方式
random 使用老种子计算新种子,再用新种子得到随机数。使用 CAS 可以确保只有一个线程可以成功用 oldseed 更新新种子。
// Random#next 方法
protected int next(int bits) {
long oldseed, nextseed;
AtomicLong seed = this.seed;
do {
oldseed = seed.get(); // 获取老种子
nextseed = (oldseed * multiplier + addend) & mask; // 产生新种子
} while (!seed.compareAndSet(oldseed, nextseed)); // CAS 更新
return (int)(nextseed >>> (48 - bits));
}
CAS 操作会保证只有一个线程可以更新老的种子为新的,失败的线程会通过循环重新获取更新后的种子作为当前种子去计算老的种子,这就解决了上面提到的问题,保证随机数的随机性。
ThreadLocalRandom
Random 的缺点是多个线程会使用同一个原子性种子变量,从而导致对原子变量更新的竞争。如果让每一个线程复制一份变量,使得在每个线程对变量进行操作时实际是操作自己本地内存里面的副本,从而避免了对共享变量进行同步。

如果每个线程都维护一个种子变量,则每个线程生成随机数时都根据自己老的种子计算新的种子,并使用新种子更新老的种子,再根据新种子计算随机数,就不会存在竞争问题了。ThreadLocalRandom 正是这个原理。

源码分析

- ThreadLocalRandom 类继承了 Random 类并重写了 nextInt 方法
- ThreadLocalRandom 的种子用的是 Thread 里面的 threadLocalRandomSeed。
- 当线程调用 ThreadLocalRandom 的 current 方法时, ThreadLocalRandom 负责初始化调用线程的 threadLocalRandomSeed 变量,也就是初始化种子
- 调用 ThreadLocalRandom 的 nextInt 方法时,实际上是获取当前线程的 threadLocalRandomSeed 变量作为当前种子来计算新的种子,然后更新新的种子到当前线程的 threadLocalRandomSeed 变量,而后再根据新种子并使用具体算法计算随机数。
- threadLocalRandomSeed 变量就是 Thread 类里面的一个普通 long 变量,因为只是当个线程操作,所以不需要原子性变量。
BlockingQueue
- ArrayBlockingQueue:数组结构构成的有界阻塞队列
- LinkedBlockingQueue:链表结构构成的有/无界阻塞队列
- PriorityBlockingQueue:支持优先级的无界阻塞队列
- DealyQueue:优先级队列实现的无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列(可用来)
- LinkedTransferQueue:链表结构组成的无界阻塞队列
- LinkedBlockingDeque:链表结构组成的双向阻塞队列