Skip to the content.

高级工具

ConcurrentHashMap 原理和线程池还没看,缓一缓。

线程池

线程池的基本原理就是启动一些线程,线程执行完任务后,回收线程到池中等待下次的调用而非关闭线程。示意图如下:

graph LR
	subgraph  ThreadPool 
	t1
	t2
	t3
	end
	subgraph BlockingQueue
	task1-->task2-->task3
	end
	t1-->|pool|task1
	t2-.->|pool|task1
	t3-.->|pool|task1
	task3--put-->main

自定义阻塞队列

ReentrantLock 锁实现。可精准唤醒。ReentrantLock 的超时等待的返回值是剩余要等待的时间,解决了虚假唤醒的问题。

class BlockingQueue<T> {
    // 1.任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 2.锁
    private ReentrantLock lock = new ReentrantLock();

    // 3.生产者条件变量
    private Condition fullWaitSet = lock.newCondition();

    // 4.消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    // 5.容量
    private int capcity;

    public BlockingQueue() {}

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 带超时的阻塞获取
    public T poll(long timeout, TimeUnit unit) {
        try {
            // 将超时时间统一转换为纳秒。
            long nanos = unit.toNanos(timeout);
            lock.lock();
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    // 返回的是剩余的,需要的等待时间
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取,有元素才能获取
    public T take() {
        try {
            lock.lock();
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    public void put(T element) {
        try {
            lock.lock();
            while (queue.size() == capcity) {
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(element);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        try {
            lock.lock();
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}

自定义线程池

使用自定义的阻塞队列,实现自定义线程池。线程的复用技巧在于,当一个任务执行完毕后,去查看阻塞队列中是否有任务,有则继续执行,没有则等待/释放(等待还是释放看你采用何种策略)

// 纯自定义线程池
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合,泛型定义为 Thread 保存的信息不够,自定义一个类型,保存更为丰富的信息。
    private HashSet<Worker> workers = new HashSet();

    // 核心线程数
    private int coreSize;

    // 工作一段时间后没有任务了,线程一直再运行也是浪费,设置一个超时时间,超过这个时间就销毁线程。
    private long timeout;

    private TimeUnit timeUnit;

    public void execute(Runnable task) {
        // 任务数没有超过 coreSize 就交给 worker 对象执行
        // 超过了就加入任务队列暂存。
        synchronized (workers) {
            if (workers.size() < coreSize) {
                log.debug("coreSize is ok");
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                log.debug("put Blocking Queue");
                taskQueue.put(task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
    }

    class Worker extends Thread {
        private Runnable task;

        public Worker() {}

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 当 task 不为空,则执行
            // 当 task 执行完毕,则从任务队列获取任务并执行。这样就实现了线程的复用。
            // take 拿不到会一直死等。线程复用。
            while (task != null || (task = taskQueue.take()) != null) { // 如果使用 poll 就不会死等,可以执行到 sync 的移除操作。
                try {
                    log.debug("run task");
                    task.run();
                } catch (Exception e) {
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                // remove 执行不到。 需要用到超时策略,才会运行到 remove
                workers.remove(this);
            }
        }
    }
}

测试代码

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.TestPool")
/**
 * 线程池中如果没有线程,那就创建线程执行。
 * 如果有线程,但是数量不足,就放入阻塞队列
 */
public class TestPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 2, TimeUnit.SECONDS, 10);
        for (int i = 0; i < 10; i++) {
            threadPool.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

拒绝策略

为什么要拒绝策略

假定

package pool.base;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.TestPool")
/**
 * 线程池中如果没有线程,那就创建线程执行。
 * 如果有线程,但是数量不足,就放入阻塞队列
 */
public class TestPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 2, TimeUnit.SECONDS, 10);
        for (int i = 0; i < 20; i++) {
            final int j = i;
            threadPool.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j + ""); // 不加这个,输出会有问题,都是输出同一个对象的 hash 值,但是执行的任务是没有问题的。
            });
        }
    }
}

// 纯自定义线程池
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合,泛型定义为 Thread 保存的信息不够,自定义一个类型,保存更为丰富的信息。
    private HashSet<Worker> workers = new HashSet();

    // 核心线程数
    private int coreSize;

    // 工作一段时间后没有任务了,线程一直再运行也是浪费,设置一个超时时间,超过这个时间
    private long timeout;

    private TimeUnit timeUnit;

    public void execute(Runnable task) {
        // 任务数没有超过 coreSize 就交给 worker 对象执行
        // 超过了就加入任务队列暂存。
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.debug("新增 worker{},{}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
                taskQueue.put(task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
    }

    class Worker extends Thread {
        private Runnable task;

        public Worker() {}

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 当 task 不为空,则执行
            // 当 task 执行完毕,则从任务队列获取任务并执行。这样就实现了线程的复用。
            // take 拿不到会一直死等。线程复用。
            while (task != null || (task = taskQueue.take()) != null) { // 大哥李风格
                try {
                    log.debug("正在执行{}", task);
                    task.run();
                } catch (Exception e) {
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除{}", this);
                // remove 执行不到。 需要用到超时策略,才会运行到 remove
                workers.remove(this);
            }
        }
    }
}

@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
    // 1.任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 2.锁
    private ReentrantLock lock = new ReentrantLock();

    // 3.生产者条件变量
    private Condition fullWaitSet = lock.newCondition();

    // 4.消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    // 5.容量
    private int capcity;

    public BlockingQueue() {}

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 带超时的阻塞获取
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 将超时时间统一转换为纳秒。
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    // 返回的是剩余的,需要的等待时间
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取,有元素才能获取
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capcity) {
                try {
                    log.debug("等待加入任务队列{}....", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            log.debug("加入任务队列{}", task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        try {
            lock.lock();
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}

目前是 log.debug("等待加入任务队列{}....", task); fullWaitSet.await(); 阻塞队列容量满了,阻塞队列就阻塞住自己,等有空余了,再把这些任务加进去。

11:48:19.780 c.ThreadPool [main] - 新增 workerThread[Thread-0,5,main],pool.base.TestPool$$Lambda$1/1879034789@6833ce2c
11:48:19.784 c.ThreadPool [main] - 新增 workerThread[Thread-1,5,main],pool.base.TestPool$$Lambda$1/1879034789@d7b1517
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@23223dd8
11:48:19.784 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@6833ce2c
11:48:19.784 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@d7b1517
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@4ec6a292
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@1b40d5f0
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@ea4a92b
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@3c5a99da
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@47f37ef1
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@5a01ccaa
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@71c7db30
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@19bb089b
11:48:19.784 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@4563e9ab
11:48:19.784 c.BlockingQueue [main] - 等待加入任务队列pool.base.TestPool$$Lambda$1/1879034789@11531931....
11:48:24.793 c.TestPool [Thread-1] - 1
11:48:24.793 c.TestPool [Thread-0] - 0
11:48:24.793 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@4ec6a292
11:48:24.793 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@23223dd8
11:48:24.793 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@11531931
11:48:24.793 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@1fbc7afb
11:48:24.793 c.BlockingQueue [main] - 等待加入任务队列pool.base.TestPool$$Lambda$1/1879034789@45c8e616....
11:48:29.805 c.TestPool [Thread-1] - 2
11:48:29.805 c.TestPool [Thread-0] - 3
11:48:29.805 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@ea4a92b
11:48:29.805 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@1b40d5f0
11:48:29.805 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@45c8e616
11:48:29.805 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@4cdbe50f
11:48:29.805 c.BlockingQueue [main] - 等待加入任务队列pool.base.TestPool$$Lambda$1/1879034789@66d33a....
11:48:34.815 c.TestPool [Thread-1] - 4
11:48:34.815 c.TestPool [Thread-0] - 5
11:48:34.815 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@3c5a99da
11:48:34.815 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@47f37ef1
11:48:34.815 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@66d33a
11:48:34.815 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@7cf10a6f
11:48:34.815 c.BlockingQueue [main] - 等待加入任务队列pool.base.TestPool$$Lambda$1/1879034789@7e0babb1....
11:48:39.826 c.TestPool [Thread-1] - 6
11:48:39.826 c.TestPool [Thread-0] - 7
11:48:39.826 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@5a01ccaa
11:48:39.826 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@7e0babb1
11:48:39.826 c.BlockingQueue [main] - 等待加入任务队列pool.base.TestPool$$Lambda$1/1879034789@6debcae2....
11:48:39.826 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@71c7db30
11:48:39.826 c.BlockingQueue [main] - 加入任务队列pool.base.TestPool$$Lambda$1/1879034789@6debcae2
11:48:44.835 c.TestPool [Thread-0] - 9
11:48:44.835 c.TestPool [Thread-1] - 8
11:48:44.835 c.ThreadPool [Thread-0] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@19bb089b
11:48:44.835 c.ThreadPool [Thread-1] - 正在执行pool.base.TestPool$$Lambda$1/1879034789@4563e9ab

不同场景所要用到的策略有所差别,不能总是一味的阻塞队列等待,有空位后将任务加入阻塞队列。应该是有多种策略可供选择。

带拒绝策略的代码

package pool;

import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.TestPool")
/**
 * 线程池中如果没有线程,那就创建线程执行。
 * 如果有线程,但是数量不足,就放入阻塞队列
 */
public class TestPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2,
                1000, TimeUnit.MILLISECONDS, 5, (queue, task) -> {
            // 1. 死等
//            queue.put(task);
            // 2) 带超时等待
            queue.offer(task, 200, TimeUnit.MILLISECONDS);
            // 3) 让调用者放弃任务执行
//            log.debug("放弃{}", task);
            // 4) 让调用者抛出异常
//            throw new RuntimeException("任务执行失败 " + task);
            // 5) 让调用者自己执行任务
//            task.run();
        });
        ConcurrentHashMap<Integer, Integer> list = new ConcurrentHashMap<>();
        for (int i = 0; i < 14; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                    for (int k = 0; k < 10; k++) {
                        list.put(j, j);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
//                log.debug("{}", j); // 不加这个,会有问题,为什么会这样,不知道。
            }, j + "");
        }
        while (Thread.activeCount() > 2) {
        }
        
        Enumeration<Integer> keys = list.keys();
        while (keys.hasMoreElements()) {
            System.out.print(keys.nextElement() + "\n");
        }

    }
}

@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

// 纯自定义线程池
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合,泛型定义为 Thread 保存的信息不够,自定义一个类型,保存更为丰富的信息。
    private HashSet<Worker> workers = new HashSet();

    // 核心线程数
    private int coreSize;

    // 工作一段时间后没有任务了,线程一直再运行也是浪费,设置一个超时时间,超过这个时间
    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;

    public void execute(Runnable task, String name) {
        // 任务数没有超过 coreSize 就交给 worker 对象执行
        // 超过了就加入任务队列暂存。
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task, name);
                log.debug("新增 worker{},{}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
                taskQueue.tryPut(rejectPolicy, task);
                // 1. 死等
                // 2. 带超时等待
                // 3. 让调用者放弃任务执行
                // 4. 让调用者抛出异常
                // 5. 让调用者自己执行任务
                // 用策略模式
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

    class Worker extends Thread {
        private Runnable task;

        @Override
        public String toString() {
            return "Worker{" +
                    "name=" + this.getName() +
                    '}';
        }

        public Worker() {}

        public Worker(Runnable task, String name) {
            this.task = task;
            this.setName(name);
        }

        @Override
        public void run() {
            // 执行任务
            // 当 task 不为空,则执行
            // 当 task 执行完毕,则从任务队列获取任务并执行。这样就实现了线程的复用。
            // take 拿不到会一直死等。线程复用。
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { // 大哥李风格
                try {
                    log.debug("正在执行{}", task);
                    task.run();
                    log.debug("执行完毕{}", task);
                } catch (Exception e) {
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除{}", this);
                // remove 执行不到。 需要用到超时策略,才会运行到 remove
                workers.remove(this);
            }
        }
    }
}

@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
    // 1.任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 2.锁
    private ReentrantLock lock = new ReentrantLock();

    // 3.生产者条件变量
    private Condition fullWaitSet = lock.newCondition();

    // 4.消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    // 5.容量
    private int capcity;

    public BlockingQueue() {}

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 带超时的阻塞获取
    public T poll(long timeout, TimeUnit unit) {
        try {
            // 将超时时间统一转换为纳秒。
            lock.lock();
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    // 返回的是剩余的,需要的等待时间
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取,有元素才能获取
    public T take() {
        try {
            lock.lock();
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }
    public void put(T task) {
        try {
            lock.lock();
            while (queue.size() == capcity) {
                try {
                    log.debug("等待加入任务队列{}....", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列{}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间的阻塞添加方法
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        try {
            lock.lock();
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capcity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    log.debug("等待加入任务队列{}....", task);
                    // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 就等待 Nanos 秒,没等到也不阻塞,直接跑路
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列{}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        try {
            lock.lock();
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        try {
            lock.lock();
            // 队列满
            if (queue.size() == capcity) {
                // 拒绝策略
                rejectPolicy.reject(this, task);
            } else {
                log.debug("加入任务队列{}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}

输出结果如下:

11:44:23.811 c.ThreadPool [main] - 新增 workerWorker{name=0},pool.TestPool$$Lambda$2/984849465@1888ff2c
11:44:23.814 c.ThreadPool [main] - 新增 workerWorker{name=1},pool.TestPool$$Lambda$2/984849465@1b40d5f0
11:44:23.814 c.ThreadPool [0] - 正在执行pool.TestPool$$Lambda$2/984849465@1888ff2c
11:44:23.814 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@3c5a99da
11:44:23.814 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@47f37ef1
11:44:23.814 c.ThreadPool [1] - 正在执行pool.TestPool$$Lambda$2/984849465@1b40d5f0
11:44:23.814 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@5a01ccaa
11:44:23.814 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@71c7db30
11:44:23.814 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@19bb089b
11:44:23.814 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@4563e9ab....
11:44:24.018 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@5e025e70....
11:44:24.223 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@1fbc7afb....
11:44:24.430 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@45c8e616....
11:44:24.633 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@4cdbe50f....
11:44:24.825 c.ThreadPool [0] - 执行完毕pool.TestPool$$Lambda$2/984849465@1888ff2c
11:44:24.825 c.ThreadPool [1] - 执行完毕pool.TestPool$$Lambda$2/984849465@1b40d5f0
11:44:24.825 c.ThreadPool [0] - 正在执行pool.TestPool$$Lambda$2/984849465@3c5a99da
11:44:24.825 c.ThreadPool [1] - 正在执行pool.TestPool$$Lambda$2/984849465@47f37ef1
11:44:24.825 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@4cdbe50f
11:44:24.825 c.BlockingQueue [main] - 加入任务队列pool.TestPool$$Lambda$2/984849465@66d33a
11:44:24.825 c.BlockingQueue [main] - 等待加入任务队列pool.TestPool$$Lambda$2/984849465@7cf10a6f....
11:44:25.840 c.ThreadPool [1] - 执行完毕pool.TestPool$$Lambda$2/984849465@47f37ef1
11:44:25.840 c.ThreadPool [0] - 执行完毕pool.TestPool$$Lambda$2/984849465@3c5a99da
11:44:25.840 c.ThreadPool [1] - 正在执行pool.TestPool$$Lambda$2/984849465@5a01ccaa
11:44:25.840 c.ThreadPool [0] - 正在执行pool.TestPool$$Lambda$2/984849465@71c7db30
11:44:26.854 c.ThreadPool [1] - 执行完毕pool.TestPool$$Lambda$2/984849465@5a01ccaa
11:44:26.854 c.ThreadPool [0] - 执行完毕pool.TestPool$$Lambda$2/984849465@71c7db30
11:44:26.854 c.ThreadPool [1] - 正在执行pool.TestPool$$Lambda$2/984849465@19bb089b
11:44:26.854 c.ThreadPool [0] - 正在执行pool.TestPool$$Lambda$2/984849465@4cdbe50f
11:44:27.868 c.ThreadPool [1] - 执行完毕pool.TestPool$$Lambda$2/984849465@19bb089b
11:44:27.868 c.ThreadPool [0] - 执行完毕pool.TestPool$$Lambda$2/984849465@4cdbe50f
11:44:27.868 c.ThreadPool [1] - 正在执行pool.TestPool$$Lambda$2/984849465@66d33a
11:44:28.869 c.ThreadPool [0] - worker 被移除Worker{name=0}
11:44:28.869 c.ThreadPool [1] - 执行完毕pool.TestPool$$Lambda$2/984849465@66d33a
11:44:29.872 c.ThreadPool [1] - worker 被移除Worker{name=1}
0	1	2	3	4	5	6	11	12	
Process finished with exit code 0

ThreadPoolExecutor

线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

状态名 高3位 接收新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y  
SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 - - 任务全部执行完毕,活动线程为 0 即将进入终结
TERMINATED 011 - - 终结状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作 进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
graph LR
subgraph 阻塞队列
size=2
end
subgraph 线程池 c=2,m=3
ct1(核心线程1)
ct2(核心线程2)
ct3(救济线程3)
end
t1(任务1)
graph LR
subgraph 阻塞队列
t3(任务3)
t4(任务4)
size=2
end
subgraph 线程池 c=2,m=3
ct1(核心线程1) --> t1(任务1)
ct2(核心线程2) --> t2(任务2)
ct3(救急线程3)
end

救急线程执行完后会被销毁掉(有生存时间),核心线程会被保留到线程池种,一直被运行。

工作方式

根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

特点:

适用于任务量已知,相对耗时的任务

@Slf4j(topic = "c.TestNewCachedThreadPool")
public class TestNewCachedThreadPool {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> {
            log.debug("1");
        });
        executorService.execute(() -> {
            log.debug("1");
        });
        executorService.execute(() -> {
            log.debug("1");
        });
    }
}

捋一捋这些方法

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
// 用的 defaultThreadFactory 创建的线程
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
/// 看下建线程工程如何创建线程
public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}

// 线程工程
DefaultThreadFactory() {
    SecurityManager s = System.getSecurityManager();
    group = (s != null) ? s.getThreadGroup() :
    Thread.currentThread().getThreadGroup();
    namePrefix = "pool-" + // 线程的命名
        poolNumber.getAndIncrement() +
        "-thread-";
}

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon()) 
        t.setDaemon(false); // 线程工厂创建的线程是非守护线程!不会因为 main 线程的退出而终止!
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

自定义线程工厂

自定义线程工厂的作用就是可以自己定义线程的名字。

public static void test() {
    Executors.newFixedThreadPool(2, new ThreadFactory() {
        private AtomicInteger atomic = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "my_thread" + atomic.getAndIncrement());
        }
    });
}

newCachedThreadPool

public static void test1() {
    ExecutorService executorService = Executors.newCachedThreadPool();
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 都是救急线程
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

特点

public static void testQueue() throws InterruptedException {
    SynchronousQueue<Integer> queue = new SynchronousQueue<>();
    new Thread(() -> {
        try {
            log.debug("put before 1");
            queue.put(1);
            log.debug("put after 1");

            log.debug("put before 2");
            queue.put(2);
            log.debug("put after 2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    TimeUnit.SECONDS.sleep(1);
    new Thread(() -> {
        try {
            log.debug("take 1");
            queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> {
        try {
            log.debug("take 2");
            queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}
11:01:02.306 c.TestNewCachedThreadPool [Thread-0] - put before 1
11:01:03.306 c.TestNewCachedThreadPool [Thread-1] - take 1
11:01:03.306 c.TestNewCachedThreadPool [Thread-0] - put after 1
11:01:03.306 c.TestNewCachedThreadPool [Thread-0] - put before 2
11:01:03.306 c.TestNewCachedThreadPool [Thread-2] - take 2
11:01:03.306 c.TestNewCachedThreadPool [Thread-0] - put after 2

适用场景

整个线程池表现为会根据任务量不断增长,没有上限,当任务执行完毕,空闲 60s 后释放线程。适合任务数比较密集,但每个任务执行时间较短的情况。【不适合任务执行时间长的,如果执行时间长,不断累加线程,就会爆内存。】

newSingleThreadExecutor

public static void testNewSingleThreadExecutor(){
    ExecutorService executorService = Executors.newSingleThreadExecutor();
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, // 核心线程数和最大线程数都是1
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

适用场景

希望多个任务队列执行,线程数固定为 1,任务数多于 1 时,会放入无界队列排队,当任务执行完毕,这唯一的线程也不会被释放

自己创建一个线程来执行和单线程的线程池之间的区别

public static void testNewSingleThreadExecutor() {
    ExecutorService threadPool = Executors.newSingleThreadExecutor();
    threadPool.execute(() -> {
        log.debug("1");
        int i = 1 / 0;
    });
    threadPool.execute(() -> {
        log.debug("1");
    });
    threadPool.execute(() -> {
        log.debug("1");
    });
    threadPool.execute(() -> {
        log.debug("1");
    });
}

提交任务

// 执行任务
void execute(Runnable command);

// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);

// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 throws InterruptedException;

// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
 throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException;

测试 Submit – 用返回值 Future 获得任务执行结果

public static void testSubmit() throws ExecutionException, InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    Future<String> submit = threadPool.submit(() -> {
        log.debug("OKK");
        TimeUnit.SECONDS.sleep(2);
        return "1";
    });

    // main 线程阻塞等待结果
    log.debug("{}", submit.get());
}

测试 invokeAll – 提交 tasks 中所有任务

public static void invokeAll() throws InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    threadPool.invokeAll(Arrays.asList(
        () -> {
            log.debug("1");
            TimeUnit.SECONDS.sleep(1);
            return 1;
        },
        () -> {
            log.debug("2");
            TimeUnit.SECONDS.sleep(2);
            return 1;
        },
        () -> {
            log.debug("3");
            TimeUnit.SECONDS.sleep(3);
            return 1;
        }
    ));
    threadPool.shutdown();
}
/*
11:34:55.879 c.TestSubmit [pool-1-thread-2] - 2
11:34:55.879 c.TestSubmit [pool-1-thread-1] - 1
11:34:56.881 c.TestSubmit [pool-1-thread-1] - 3
*/

测试 invokeAny – 提交 tasks 中所有任务,返回最先执行完的,然后取消其他任务。

public static void invokeAny() throws ExecutionException, InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    threadPool.invokeAny(Arrays.asList(
        () -> {
            log.debug("1 start");
            TimeUnit.SECONDS.sleep(1);
            log.debug("1 end");
            return 1;
        },
        () -> {
            log.debug("2 start");
            TimeUnit.SECONDS.sleep(2);
            log.debug("2 end");
            return 2;
        },
        () -> {
            log.debug("3 start");
            TimeUnit.SECONDS.sleep(3);
            log.debug("3 end");
            return 3;
        }
    ));
    threadPool.shutdown();
}
/*
11:31:44.869 c.TestSubmit [pool-1-thread-2] - 2 start
11:31:44.869 c.TestSubmit [pool-1-thread-1] - 1 start
11:31:45.885 c.TestSubmit [pool-1-thread-1] - 1 end
11:31:45.885 c.TestSubmit [pool-1-thread-1] - 3 start
*/

终止线程

shutdown – 调用后不会阻塞调用 shutdown 的线程。

/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();

源码

public void shutdown() {
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();
     try {
         checkShutdownAccess();
         // 修改线程池状态
         advanceRunState(SHUTDOWN);
         // 仅会打断空闲线程
         interruptIdleWorkers();
         onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
     } finally {
     	mainLock.unlock();
     }
     // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
     tryTerminate();
}

示例代码

public static void shutdown() {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    pool.execute(() -> {
        log.debug("running 1");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("over 1");
    });
    pool.execute(() -> {
        log.debug("running 2");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("over 2");
    });
    pool.execute(() -> {
        log.debug("running 3");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("over 3");
    });
    log.debug("before shutdown");
    pool.shutdown(); // 执行这个方法并不会阻塞线程
    pool.awaitTermination(40,TimeUnit.SECONDS); // 执行这个方法会阻塞住线程40秒。40秒后就不在阻塞。如果线程提取执行完毕了,则阻塞立马结束。
    log.debug("executors shutdown and wait thread over!");
}
/*
11:51:11.735 c.TestShutDown [main] - before shutdown
11:51:11.735 c.TestShutDown [pool-1-thread-1] - running 1
11:51:11.736 c.TestShutDown [main] - executors shutdown and wait thread over!
11:51:12.751 c.TestShutDown [pool-1-thread-1] - over 1
11:51:12.751 c.TestShutDown [pool-1-thread-1] - running 2
11:51:13.766 c.TestShutDown [pool-1-thread-1] - over 2
11:51:13.766 c.TestShutDown [pool-1-thread-1] - running 3
11:51:14.775 c.TestShutDown [pool-1-thread-1] - over 3
*/

shutdownNow

/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow(){
     List<Runnable> tasks;
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();
     try {
         checkShutdownAccess();
         // 修改线程池状态
         advanceRunState(STOP);
         // 打断所有线程
         interruptWorkers();
         // 获取队列中剩余任务
         tasks = drainQueue();
     } finally {
     	mainLock.unlock();
     }
     // 尝试终结
     tryTerminate();
     return tasks;
}

示例代码

public static void shutdownNow() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.execute(() -> {
            log.debug("running 1");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("over 1");
        });
        pool.execute(() -> {
            log.debug("running 2");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("over 2");
        });
        pool.execute(() -> {
            log.debug("running 3");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("over 3");
        });
        log.debug("before shutdown");
        TimeUnit.MILLISECONDS.sleep(2000);
        List<Runnable> remain = pool.shutdownNow();
        System.out.println(remain.size());
        log.debug("executors shutdown and wait thread over!");
}
/*
11:57:47.429 c.TestShutDown [main] - before shutdown
11:57:47.429 c.TestShutDown [pool-1-thread-1] - running 1
11:57:48.445 c.TestShutDown [pool-1-thread-1] - over 1
11:57:48.445 c.TestShutDown [pool-1-thread-1] - running 2
1 // 未执行完的线程数
11:57:49.443 c.TestShutDown [main] - executors shutdown and wait thread over!
11:57:49.444 c.TestShutDown [pool-1-thread-1] - over 2
*/

其他方法

// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

工作线程模式

定义

让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。

PS: 注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。

例如,如果一个餐馆的工人既要招呼客人(任务类型 A),又要到后厨做菜(任务类型 B)显然效率不咋地,分成服务员(线程池 A)与厨师(线程池 B)更为合理,当然你能想到更细致的分工

饥饿

固定大小线程池会有饥饿现象

假设有两个线程:工人 A 和工人 B 。他们都可以做菜+点餐。这时候,同时来了两个客人,工人 A 和工人 B 都去处理点餐了,这时没人做饭了,产生了饥饿(线程池中的线程不足导致的)

一人做菜,一人点餐,合理运行,无饥饿

@Slf4j(topic = "c.TestDeadLock")
public class TestDeadLock {
    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random RANDOM = new Random();

    static String cooking() {
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = executorService.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}
/*
12:27:22.069 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
12:27:22.072 c.TestDeadLock [pool-1-thread-2] - 做菜
12:27:22.072 c.TestDeadLock [pool-1-thread-1] - 上菜: 地三鲜
*/

都可做菜,点餐,饥饿

@Slf4j(topic = "c.TestDeadLock")
public class TestDeadLock {
    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random RANDOM = new Random();

    static String cooking() {
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = executorService.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        executorService.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = executorService.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}
/*
12:28:38.740 c.TestDeadLock [pool-1-thread-2] - 处理点餐...
12:28:38.740 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
*/

我们 jsp 查看TestDeadLock 线程 id,然后 jstack 查看线程的状态信息,发现是饥饿(都在 Waiting,如果是死锁会显示 DeadLock!),不是死锁。

Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.301-b09 mixed mode):

"DestroyJavaVM" #14 prio=5 os_prio=0 tid=0x000002054590d800 nid=0x1c70 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"pool-1-thread-2" #13 prio=5 os_prio=0 tid=0x0000020562b79800 nid=0x28e8 waiting on condition [0x000000c9659ff000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076c7dc858> (a java.util.concurrent.FutureTask)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
        at java.util.concurrent.FutureTask.get(FutureTask.java:191)
        at pool.TestDeadLock.lambda$main$3(TestDeadLock.java:43)
        at pool.TestDeadLock$$Lambda$2/984849465.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"pool-1-thread-1" #12 prio=5 os_prio=0 tid=0x0000020562b76000 nid=0x3c30 waiting on condition [0x000000c9658fe000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076c584f80> (a java.util.concurrent.FutureTask)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
        at java.util.concurrent.FutureTask.get(FutureTask.java:191)
        at pool.TestDeadLock.lambda$main$1(TestDeadLock.java:31)
        at pool.TestDeadLock$$Lambda$1/875827115.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

解决饥饿

合理分配任务

@Slf4j(topic = "c.TestStarvation")
public class TestStarvation {
    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random RANDOM = new Random();

    static String cooking() {
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }

    public static void main(String[] args) {
        ExecutorService waiterPool = Executors.newFixedThreadPool(1);
        ExecutorService cookiePool = Executors.newFixedThreadPool(1);
        waiterPool.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = cookiePool.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        waiterPool.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = cookiePool.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}
/*
12:36:06.902 c.TestStarvation [pool-1-thread-1] - 处理点餐...
12:36:06.905 c.TestStarvation [pool-2-thread-1] - 做菜
12:36:06.905 c.TestStarvation [pool-1-thread-1] - 上菜: 地三鲜
12:36:06.906 c.TestStarvation [pool-1-thread-1] - 处理点餐...
12:36:06.907 c.TestStarvation [pool-2-thread-1] - 做菜
12:36:06.907 c.TestStarvation [pool-1-thread-1] - 上菜: 烤鸡翅
*/

创建多少线程池合适

CPU 密集型运算

通常采用 cpu 核数 +1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

I/O 密集型运算

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。 经验公式如下 \(线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间\) 例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式 \(4 * 100\% * (100\% / 50\%) = 8\) 例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式 \(4 * 100\% * (100\% / 10\%) = 40\)

任务调度线程池

Timer

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

@Slf4j(topic = "c.TestTimer")
public class TestTimer {
    public static void main(String[] args) {
        Timer timer = new Timer();
        TimerTask task1 = new TimerTask() {
            @SneakyThrows
            @Override
            public void run() {
                log.debug("task 1");
                TimeUnit.SECONDS.sleep(2);
            }
        };
        TimerTask task2 = new TimerTask() {
            @Override
            public void run() {
                log.debug("task 2");
            }
        };
        // 使用 timer 添加两个任务,希望它们都在 1s 后执行
        // 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
        timer.schedule(task1, 1000);
        timer.schedule(task2, 1000);
    }
}
/*
12:47:54.594 c.TestTimer [Timer-0] - task 1
12:47:56.604 c.TestTimer [Timer-0] - task 2
*/

ScheduledThreadPool

基本使用

@Slf4j(topic = "c.TestScheduledThreadPool")
public class TestScheduledThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService pools = Executors.newScheduledThreadPool(2);
        pools.execute(() -> {
            log.debug("Thread1");
            try {
				int i = 1 / 0; // 出现异常时不会影响其他任务的。即便核心线程数改为 1 也是这样。
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("End Thread2");
        });

        pools.execute(() -> {
            log.debug("Thread2");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("End Thread2");
        });
    }
}
/*
10:10:00.528 c.TestScheduledThreadPool [pool-1-thread-1] - Thread1
10:10:00.528 c.TestScheduledThreadPool [pool-1-thread-2] - Thread2
10:10:02.537 c.TestScheduledThreadPool [pool-1-thread-1] - End Thread2
10:10:02.537 c.TestScheduledThreadPool [pool-1-thread-2] - End Thread2
*/

延迟执行

pools.schedule(()->{
    System.out.println(12);
},2,TimeUnit.SECONDS); // 延迟 2 s 后执行

每隔 x 秒,重复执行。

如果程序的执行时间 > 间隔时间,那么就是每隔程序的执行时间秒,执行一次任务(执行的周期取执行时间和间隔时间的最大值)

@Slf4j(topic = "c.TestScheduledThreadPool")
public class TestScheduledThreadPool {

    public static void scheduleWithFixRate() {
        ScheduledExecutorService pools = Executors.newScheduledThreadPool(1);
        log.debug("start~");
        //  线程运行后 1s 开始执行任务,定时执行。每隔 2s 执行一次,间隔时间从当前线程结束的时间点算起
        pools.scheduleWithFixedDelay(() -> {
            log.debug("Thread1");
        }, 1, 1, TimeUnit.SECONDS);
        pools.scheduleWithFixedDelay(() -> {
            log.debug("Thread2");
        }, 1, 1, TimeUnit.SECONDS);
    }

    public static void scheduleAtFixRate() {
        ScheduledExecutorService pools = Executors.newScheduledThreadPool(1);
        log.debug("start~");
        pools.scheduleAtFixedRate(() -> {
            log.debug("Thread1");
        }, 1, 2, TimeUnit.SECONDS);
        //  线程运行后 1s 开始执行任务,定时执行。每隔 2s 执行一次,间隔时间从当前线程开始运行的时间点算起。如果程序的运行时间超过了间隔时间
        // 那么下一次执行会立马开始(执行的周期取执行时间和间隔时间的最大值。)
        pools.scheduleAtFixedRate(() -> {
            log.debug("Thread2");
        }, 1, 2, TimeUnit.SECONDS);
    }

    public static void schedule() {
        ScheduledExecutorService pools = Executors.newScheduledThreadPool(1);
        pools.schedule(() -> {
            log.debug("thread1");
        }, 1, TimeUnit.SECONDS);     // 线程运行后 1s 开始执行任务,只执行一次

        pools.schedule(() -> {
            log.debug("thread2");
        }, 1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        ScheduledExecutorService pools = Executors.newScheduledThreadPool(2);
        pools.schedule(() -> {
            System.out.println(12);
        }, 2, TimeUnit.SECONDS);
        pools.execute(() -> {
            log.debug("Thread1");
            try {
                TimeUnit.SECONDS.sleep(2);
                int i = 1 / 0;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("End Thread2");
        });

        pools.execute(() -> {
            log.debug("Thread2");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("End Thread2");
        });
    }
}

正确处理异常

线程池的应用

定时执行任务;顺带学习下 LocalDate

import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PoolApplication {
    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime start = now.withNano(0);
        LocalDateTime end = start.withHour(17).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.FRIDAY);
        if (end.isBefore(start)) {
             end = end.plusWeeks(1);
        }

        Duration between = Duration.between(start, end);
        System.out.println(between);
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

        pool.scheduleAtFixedRate(() -> {
            System.out.println("周五了!放假了");
        }, 0, between.getSeconds(), TimeUnit.SECONDS);
    }
}

Tomcat 线程池

graph LR
subgraph Connector["Connector(NIO EndPoint)"]
LimitLatch-->Acceptor-->s1[SocketChannel 1]
Acceptor-->s2[SocketChannel 2]
s1-->|有读|Poller
s2-->|有读|Poller
end
subgraph Executor
worker1
worker2
end
Poller-->|sockerProcessor|worker1
Poller-->|sockerProcessor|worker2

Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同

tomcat-7.0.42 源码

public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        if (super.getQueue() instanceof TaskQueue) {
            final TaskQueue queue = (TaskQueue)super.getQueue();
            try {
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.");
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                Thread.interrupted();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }
    }
}

TaskQueue.java

public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
    if ( parent.isShutdown() )
        throw new RejectedExecutionException(
        "Executor not running, can't force a command into the queue"
    );
    return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
}

Connector 配置

配置项 默认值 说明
acceptorThreadCount 1 acceptor 线程数量
pollerThreadCount 1 poller 线程数量
minSpareThreads 10 核心线程数,即 corePoolSize maxThreads
executor - Executor 名称,用来引用下面的 Executor

Executor 线程配置

配置项 默认值 说明
threadPriority 5 线程优先级
daemon true 是否守护线程
minSpareThreads 25 核心线程数,即 corePoolSize
maxThreads 200 最大线程数,即 maximumPoolSize
maxIdleTime 60000 线程生存时间,单位是毫秒,默认值即 1 分钟
maxQueueSize Integer.MAX_VALUE 队列长度 , Java 队列满时才会创建救急线程,但是 tomcat做了修改
prestartminSpareThreads false 核心线程是否在服务器启动时启动(默认是懒惰初始化)

放队列里是走流程,需要的再从队列中拿。

graph LR
add(添加新任务)-->submit[提交任务 < 核心线程]
submit-->|是|加入队列
submit-->|否|submit2[提交任务 < 最大线程]
submit2-->|否|加入队列
submit2-->|是|创建救急线程

Fork/Join

概念

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 CPU 密集型运算

所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计 算,如归并排序、斐波那契数列、都可以用分治思想进行求解

Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。Fork/Join 默认会创建与 CPU 核心数大小相同的线程池

使用

提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务

ThreadLocal

ThreadLocal 人手一支笔,每个线程都独有一份自己的变量。

本节内容待优化

基础用法

public class ThreadLocalDemo {
    public static void main(String[] args) {
        ThreadLocal<String> local = new ThreadLocal<>();
        System.out.println(local.get());
        local.set("liu");
        System.out.println(local.get());
    }
}
/*
null
liu
*/

多线程中使用 ThreadLocal

import java.util.concurrent.TimeUnit;

public class ThreadLocalDemo {
    private static void sleep(int second){
        try {
            TimeUnit.SECONDS.sleep(second);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    public static void main(String[] args) {
        ThreadLocal<Integer> tl = new ThreadLocal<>();
        System.out.println("main 设置 tl 的值为 1");
        tl.set(1);

        Thread th1 = new Thread(() -> {
            System.out.println("th1 设置 tl 的值为 100");
            tl.set(100);
            System.out.println("获取 tl 线程中 tl 的值:" + tl.get());
        });
        th1.start();
        sleep(2);
        // 如果是共享的话,值为 100,但是输出为 1,说明值不共享,线程之间独立。
        System.out.println("获取 main 线程中 tl 的值:"+tl.get());
    }
}
/*
main 设置 tl 的值为 1
th1 设置 tl 的值为 100
获取 tl 线程中 tl 的值:100
获取 main 线程中 tl 的值:1
*/

常见方法

方法 说明
potected T initiaValue() 用于提供初始值的,默认实现是返回 null
public void remove() 删除当前线程对应的值
public T get() 获取值
public void set(T value) 设置值

使用场景

每个线程需要独占某个资源,例如随机数、上下文信息(数据库连接)。

DataFormat/SimpleDateFormat 日期类是非线程安全的。

static ThreadLocal<DateFormat> sdf = new ThreadLocal<>() {
    @Override
    protected DateFormat initialValue() {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }
};

使用 ThreadLocalRandom

Random 是线程安全的,但是如果并发竞争激烈,性能会下降,所以 Java 并发包提供了类 ThreadLocalRandom,他是 Random 的子类,利用了 ThreadLocal。无构造方法,只能通过静态方法 current 获取对象。

public class ThreadLocalRandomDemo {
    public static void main(String[] args) {
        ThreadLocalRandom current = ThreadLocalRandom.current();
        System.out.println(current.nextInt());
    }
}

// jdk 11
public static ThreadLocalRandom current() {
    if (U.getInt(Thread.currentThread(), PROBE) == 0)
        localInit();
    return instance;
}

上下文信息

比如,每个线程的事务都是独立的,可以用 ThreadLocal 保存线程的 Connection,用于同一线程内数据库事务的提交与回滚。

注意事项

应用线程退出 ThreadLocal 之前,应调用 remove 方法释放存储的信息。虽然一般不会因为没 remove 造成 OOM,但还是推荐 remove,可以减少 GC 的频率。(ThreadLocal 是将参数 T 保存在 ThreadLocalMap 中,此 Map 的 key 是 WeakReference,该引用类型会在 GC 时进行自动回收)

基本原理

在学习 ThreadLocal 的原理之前,我们先尝试自行设计一个线程私有(独占)变量试试

ThreadLocal 是如何实现每个线程都能有自己独立的值?

graph LR
Thread---|持有|ThreadLocalMap
graph LR
ThreadLocal-->|访问|ThreadLocalMap

简而言之 Thread 持有 ThreadLocalMap, map 以 ThreadLocal 对象为 key,而访问线程私有变量的值则是通过这个 key(ThreadLocal)对象来访问的(对象.get(),API 用法简单)。

具体设计

Thread 线程内部维护的两个变量。

ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

这两个变量开始的时候都为 null,只有当前线程第一次调用 ThreadLocal 的 set 或 get 方法时才会创建它们。ThreadLocal 的 set/get 方法其实都是操作的线程对象中的 map,只是把 Map 的定义放在了 ThreadLocal 中。

set 方法

public void set(T value) {
    // (1) 获取当前线程
    Thread t = Thread.currentThread();
    // (2) 将当前线程作为 key,去查找对应的线程变量(ThreadLocalMap),找到则设置
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        map.set(this, value);
    } else {
        // (3) 第一次调用就创建当前线程对应的 HashMap
        createMap(t, value);
    }
}

它调用了 getMapgetMap 的代码为:

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals; // 返回线程的实例变量 threadLocals,
}

它的初始值为 null,在 null 时,set 调用 createMap 初始化,代码为:

void createMap(Thread t, T firstValue) {
    // 创建线程 t 的专属 localMap,key 为当前的 ThreadLocal 对象,值为 value
    // 其 key 类型为 WeakReference<ThreadLocal>.
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

get 方法

得到当前线程的 LocalMap,以 ThreadLocal 对象为 key 从 Map 中获取到 Entry,取其 value,如果 Map 中没有,则调用 setInitiaValue

public T get() {
    // (4) 获取当前线程
    Thread t = Thread.currentThread();
    // (5) 获取当前线程的 thread.currentThread
    ThreadLocalMap map = getMap(t);
    // (6) 如果 threadLocals 不为 null,则返回对应本地变量的值
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // (7) threadLocals 为空则初始化当前线程的 threadLocals 成员变量
    return setInitialValue();
}

setInitiaValue 方法

private T setInitialValue() {
    // (8) 初始化为 null
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    // (9) 如果当前线程的 threadLocals 变量不为空
    if (map != null)
        map.set(this, value);
    else
        // (10) 如果当前线程的 threadLocals变量为空
        createMap(t, value);
    return value;
}

remove 方法

public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null) {
        m.remove(this);
    }
}

总结

每个线程都有一个 Map,该 Map 以 ThreadLocal 对象为 key,调用 ThreadLocal 对象的 get/set 方法实际上是先获取当前线程的 Map,然后以 ThreadLocal 对象为 key 读写 Map,这样就实现了每个线程都有自己的独立数据。

ThreadLocalRandom

Random

Random 产生随机数的步骤

可能存在的问题

代码 & 解决方式

random 使用老种子计算新种子,再用新种子得到随机数。使用 CAS 可以确保只有一个线程可以成功用 oldseed 更新新种子。

// Random#next 方法
protected int next(int bits) {
    long oldseed, nextseed;
    AtomicLong seed = this.seed;
    do {
        oldseed = seed.get(); // 获取老种子
        nextseed = (oldseed * multiplier + addend) & mask; // 产生新种子
    } while (!seed.compareAndSet(oldseed, nextseed)); // CAS 更新
    return (int)(nextseed >>> (48 - bits));
}

CAS 操作会保证只有一个线程可以更新老的种子为新的,失败的线程会通过循环重新获取更新后的种子作为当前种子去计算老的种子,这就解决了上面提到的问题,保证随机数的随机性。

ThreadLocalRandom

Random 的缺点是多个线程会使用同一个原子性种子变量,从而导致对原子变量更新的竞争。如果让每一个线程复制一份变量,使得在每个线程对变量进行操作时实际是操作自己本地内存里面的副本,从而避免了对共享变量进行同步。

如果每个线程都维护一个种子变量,则每个线程生成随机数时都根据自己老的种子计算新的种子,并使用新种子更新老的种子,再根据新种子计算随机数,就不会存在竞争问题了。ThreadLocalRandom 正是这个原理。

源码分析

BlockingQueue