Skip to the content.

线程基础

基本 API 的使用和基本原理

创建线程

继承Thread

package create_thread;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.CreateByThread")
public class CreateByThread {
    public static void main(String[] args) {
        new Thread(()->{ log.debuge("run"); }).start();
    }
}

Runnable

把『线程』和『任务』(要执行的代码)分开

@Slf4j(topic = "c.CreateByRunnable")
public class CreateByRunnable {
    public static void main(String[] args) {
        Runnable task = () -> {
            log.debug("run");
        };
        new Thread(task,"t1").start();
    }
}

Runnable 和 Thread 的关系

分析 Thread 的源码,理清它与 Runnable 的关系。

@Override
public void run() {
    // target 就是传入的 Runnable
    if (target != null) {
        target.run();
    }
}

start 与 run: start 是开启线程,启动线程后,JVM 会回调 Thread 的 run 方法。

小结

FutureTask

FutureTask 能够接收 Callable 类型的参数,用来处理有返回结果的情况。

@Slf4j(topic = "c.CreateByFutureTask")
public class CreateByFutureTask {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> task = new FutureTask<String>(() -> { // FutureTask 是立即返回的。
            TimeUnit.SECONDS.sleep(2);
            return "Hello";
        });
        Thread th1 = new Thread(task);
        th1.start();
        // 任务完成前,会阻塞。不想阻塞当前主线程的会,就开个新线程(一般用线程池)等待数据,然后进行后续操作。
        String retVal = task.get();
        log.debug(retVal);
        log.debug("main over!");
    }
}

因为多数代码都比较简单,所以只写用的少的那部分。

线程代码编写示例

线程应该与资源分离:一般按下面两个步骤走

import java.util.concurrent.TimeUnit;

class Ticket {
    private int count = 100;

    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }
    
    public Ticket(int count) { this.count = count; }
    public Ticket() {}

    public synchronized void sale() {
        if (this.count > 0) {
            System.out.println("current Thread " + Thread.currentThread().getName() + " 还有" + (--count) + "张票");
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class SaleTick {
    public static void main(String[] args) {
        Ticket ticket = new Ticket(1000);
        Thread th1 = new Thread(() -> {
            while (ticket.getCount() > 0) ticket.sale();
        }, "卖票窗口1");
        Thread th2 = new Thread(() -> {
            while (ticket.getCount() > 0) ticket.sale();
        }, "卖票窗口2");

        th1.start();
        th2.start();
    }
}

线程池

JDK Executor 框架

线程池的几个重要参数

public ThreadPoolExecutor(  int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler)

通过线程池的核心调度代码理解上面的逻辑,处理逻辑可分为三步

public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();
    // AtomicInteger
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) { // 如果小于核心线程数,就创建新线程
        if (addWorker(command, true)) return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {  // 否则就进入等待队列
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command)) reject(command);
        else if (workerCountOf(recheck) == 0) addWorker(null, false);
    }
    // 如果进入等待队列失败了,就看可不可以创建救急线程,如果救急线程也无法创建,则执行拒绝策略
    else if (!addWorker(command, false)) reject(command);
}

线程池的拒绝策略如下:

线程池创建

具体用法后期线程池这章中会具体阐述。JDK 提供好的线程池有如下几个

import java.util.concurrent.*;

/**
 * 线程池可以复用线程、控制最大并发数、管理线程。
 * 线程池中有可以用的线程,就拿线程出来用,没有就先暂时阻塞任务,等待有线程了再执行那些任务。
 * 使用线程池在一定程度上可以减少上下文切换的开销(复用了线程,不用频繁创建,关闭)。
 */
public class ThreadPoolDemo {

    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPool();
        // new 一个线程,传入实现Runnable接口的对象。调用thread对象的start方法,从而调用Runnable的run方法
        // 实现线程调用方法。我们把Runnable对象改了就可以实现线程复用?
    }

    /**
     * Executors.newFixedThreadPool(int i) :创建一个拥有 i 个线程的线程池
     * 执行长期的任务,性能好很多
     * 创建一个定长线程池,可控制线程数最大并发数,超出的线程会在队列中等待
     */
    public static void FixedThreadPool() {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        // 循环100次,让5个线程处理100个业务
        for (int i = 0; i < 100; i++) {
            final int ii = i;
            fixedThreadPool.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "\t 给用户" + ii + "办理业务");
                    TimeUnit.SECONDS.sleep((int) (Math.random() * 15));
                    System.out.println(Thread.currentThread().getName() + "\t 给用户" + ii + "办理业务结束");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        fixedThreadPool.shutdown();
        // 所有任务都提交完毕后,我们执行 shutdown
        // shutdown 执行后,线程池不会再接受新的任务,但是会把正在执行的任务和阻塞队列中的任务执行完毕
    }

    /**
     * Executors.newSingleThreadExecutor:创建一个只有1个线程的 单线程池
     * 一个任务一个任务执行的场景
     * 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行
     */
    public static void SingleThreadPool() {}

    /**
     * Executors.newCacheThreadPool(); 创建一个可扩容的线程池
     * 执行很多短期异步的小程序或者负载教轻的服务器
     * 创建一个可缓存线程池,如果线程长度超过处理需要,可灵活回收空闲线程,如无可回收,则新建新线程
     */
    public static void CacheThreadPool() {}

    /**
     * Executors.newScheduledThreadPool(int corePoolSize):
     * 线程池支持定时以及周期性执行任务,创建一个corePoolSize为传入参数,最大线程数为整型的最大数的线程池
     */
    public static void ScheduledThreadPool() {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
        // 周期执行。
        ScheduledFuture<?> schedule1 = scheduledThreadPool.scheduleAtFixedRate(() -> {
            System.out.println(1);
        }, 10, 10, TimeUnit.SECONDS);

        // 只执行一次
        ScheduledFuture<?> schedule2 = scheduledThreadPool.schedule(() -> { System.out.println(2);}, 10, TimeUnit.SECONDS);
    }
}

自定义线程池

Java 有自带的线程池,但是一般不用。因为自带的线程池设置的阻塞队列的大小实在是太大了,容易出问题(OOM),一般都是自定义线程池。自定义线程池的核心代码如下

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    corePoolSize,
    maximumPoolSize,
    keepAliveTime,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(3), // 一般就重新定义阻塞队列。
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.AbortPolicy()
);

自定义线程池 Demo,演示四种拒绝策略

/**
 * 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
 * - 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题,如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题
 * 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
 * - Executors 返回的线程池对象弊端如下:
 * - FixedThreadPool和SingleThreadPool:
 * ---- 运行的请求队列长度为:Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
 * - CacheThreadPool 和 ScheduledThreadPool
 * ---- 运行的请求队列长度为:Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
 */
class DefineThreadPool {
    static ThreadPoolExecutor threadPoolExecutor;
    static final Integer corePoolSize = 2;
    static final Integer maximumPoolSize = 5;
    static final long keepAliveTime = 1L;
    /**
     * 默认的 Executors 创建的线程池,底层都是使用 LinkBlockingQueue 作为阻塞队列的,
     * 而 LinkBlockingQueue 虽然是有界的,但是它的界限是 Integer.MAX_VALUE 大概有20多亿,
     * 相当于是无界的了,因此我们要使用 ThreadPoolExecutor 自己手动创建线程池,然后指定阻塞队列的大小
     * 下面我们创建了一个 核心线程数为2,最大线程数为5,并且阻塞队列数为3的线程池
     * maximumPoolSize +  LinkedBlockingQueue的大小 = 5+3 = 8; 运行的+阻塞的  最多8个任务。
     */
    static {
        threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    }
    
    public static void AbortPolicy() {
        threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        try {
            for (int i = 0; i < 15; i++) {
                final int tmp = i;
                threadPoolExecutor.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 给用户:" + tmp + " 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        threadPoolExecutor.shutdown();
    }
		
    // 在调用者线程中执行任务(main线程)
    public static void CallerRunsPolicy() {
        threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            for (int i = 0; i < 150; i++) {
                final int tmp = i;
                threadPoolExecutor.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 给用户:" + tmp + " 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        threadPoolExecutor.shutdown();
    }

    // 直接丢弃任务,不报异常。
    public static void DiscardPolicy() {
        threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        try {
            for (int i = 0; i < 150; i++) {
                final int tmp = i;
                // 我们发现,输出的结果里面出现了main线程,因为线程池出发了拒绝策略,把任务回退到main线程,然后main线程对任务进行处理
                threadPoolExecutor.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 给用户:" + tmp + " 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        threadPoolExecutor.shutdown();
    }

    /**
     * <div>线程池的合理参数</div>
     * 生产环境中如何配置 corePoolSize 和 maximumPoolSize <br>
     * 这个是根据具体业务来配置的,分为 CPU 密集型和 IO 密集型 <br>
     * - CPU 密集型 <br>
     * CPU 密集的意思是该任务需要大量的运算,而没有阻塞,CPU 一直全速运行 <br>
     * CPU 密集任务只有在真正的多核 CPU 上才可能得到加速(通过多线程) <br>
     * 而在单核 CPU 上,无论你开几个模拟的多线程该任务都不可能得到加速,因为 CPU 总的运算能力就那些 <br>
     * CPU密集型任务配置尽可能少的线程数量: <br>
     * 一般公式:CPU核数 + 1个线程数 <br>
     * - IO密集型 <br>
     * 由于IO密集型任务线程并不是一直在执行任务,则可能多的线程,如 CPU核数 * 2 <br>
     * IO密集型,即该任务需要大量的IO操作,即大量的阻塞 <br>
     * 在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力花费在等待上 <br>
     * 所以IO密集型任务中使用多线程可以大大的加速程序的运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。 <br>
     * IO密集时,大部分线程都被阻塞,故需要多配置线程数: <br>
     * 参考公式:CPU核数 / (1 - 阻塞系数) 阻塞系数在0.8 ~ 0.9左右 <br>
     * 例如:8核CPU:8/ (1 - 0.9) = 80个线程数 <br>
     */
    public static void note() {}
    public static void main(String[] args) {
        DiscardPolicy();
    }
}

生产环境中如何配置 corePoolSize 和 maximumPoolSize 是根据具体业务来配置的,分为 CPU 密集型和 IO 密集型

线程池中的线程来自那?

线程池中的线程都是由 ThreadFactory 的子类创建的

public interface ThreadFactory {
    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r); // r - 由新线程实例执行的可运行对象 
}

扩展线程池

ThreadPoolExecutor 是一个可扩展的线程池。提供了如下三个方法对线程池进行扩展

ThreadPoolExecutor.Worker.runWorker() 方法『JDK11』

线程池中的工作线程就是 Worker 实例,Worker.run 方法会调用 runWorker 方法。其中 runWorker 内部实现了每一个工作线程的固有工作流程。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 执行任务前
                try {
                    task.run();
                    afterExecute(task, null); // 执行任务后
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

捕捉异常信息

线程池可能会吃掉程序抛出的异常信息。

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DivTask implements Runnable {
    int a, b;

    public DivTask(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public void run() {
        double ret = a / b;
        System.out.println(ret);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            pools.submit(new DivTask(100, i));
        }
        pools.shutdown();
    }
}
/*
100.0
25.0
50.0
33.0
除0异常被吃掉了。
*/

解决方式

以上两种方式可以得到部分堆栈信息。

import java.util.concurrent.*;

public class DivTask implements Runnable {
    int a, b;

    public DivTask(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public void run() {
        double ret = a / b;
        System.out.println(ret);
    }

    public static void main(String[] args) throws Exception {
        test2();
    }

    public static void test2() {
        ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            pools.execute(new DivTask(100, i));
        }
        pools.shutdown();
    }

    public static void test1() throws Exception {
        ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            Future<?> submit = pools.submit(new DivTask(100, i));
            submit.get();
        }
        pools.shutdown();
    }
}

扩展 ThreadPoolExecutor 线程池,在任务调度之前,保存下提交线程的堆栈信息。

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public static void main(String[] args) {
        TraceThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            pools.execute(new DivTask(100, i));
        }
    }

    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable command) {
        super.execute(warp(command, clientTrace(), Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(warp(task, clientTrace(), Thread.currentThread().getName()));
    }

    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }

    private Runnable warp(final Runnable task, final Exception clientStack, String clientThreadName) {
        return () -> {
            try {
                task.run();
            } catch (Exception e) {
                clientStack.printStackTrace();
                throw e;
            }
        };
    }
}

线程池的实现

线程池的核心在于线程的复用,线程复用的方式如下:

对比总结

public class CreateByThreadPool {
    public static void fixedThreadPool() {
        // 创建固定大小的线程池。允许最多执行2个任务,多余的任务会放入阻塞队列中。
        // 阻塞队列的大小为 Integer.MAX_VALUE
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> {
            while (true) {
                System.out.println(Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        executorService.execute(() -> {
            while (true) {
                System.out.println(Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public static void cachedThreadPool() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> { System.out.println(123); });
        executorService.shutdown();
    }

    public static void scheduledThreadPool() {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
        // 周期执行。
        ScheduledFuture<?> schedule1 = scheduledThreadPool.scheduleAtFixedRate(() -> {
            System.out.println(1);
        }, 10, 10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        scheduledThreadPool();
    }
}

查看进程线程

windows

Linux

Java

jconsole 远程监控配置

java -Djava.rmi.server.hostname=`ip地址` -Dcom.sun.management.jmxremote -
Dcom.sun.management.jmxremote.port=`连接端口` -Dcom.sun.management.jmxremote.ssl=是否安全连接 -
Dcom.sun.management.jmxremote.authenticate=是否认证 java类

如果要认证访问,还需要做如下步骤

守护线程

主线程执行完毕后,守护线程也会终止。

线程原理

栈与栈帧

Java Virtual Machine Stacks (Java 虚拟机栈)

我们都知道 JVM 由堆、栈、方法区所组成,其中栈内存是给谁用的呢?其实就是线程,每个线程启动后,虚拟机就会为其分配一块栈内存

线程上下文切换

线程上下文切换(Thread Context Switch)

以下这些会导致 CPU 进行线程上下文切换

当 Context Switch 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念就是程序计数器(Program Counter Register),它的作用是记住下一条 JVM 指令的执行地址,是线程私有的。

图解线程

𣏾以线程为单位分配相互独⽴, 每个栈的栈内存相互独⽴。

线程API

常用汇总

常用 API

方法 说明 注意
public void start() 启动一个新线程;Java虚拟机回调 run 方法 start 方法只是让线程进入就绪,里面代码不一定立刻运行
(CPU 的时间片还没分给它)。
每个线程对象的 start 方法只能调用一次,如果调用了多次会出 IllegalThreadStateException
public void run() 线程启动后调用该方法 如果在构造 Thread 对象时传递了 Runnable 参数,则线程启动后会调用 Runnable 中的 run 方法
,否则默 认不执行任何操作。但可以创建 Thread 的子类对象, 来覆盖默
认行为
public void setName(String name) 给当前线程取名字  
public void getName() 获取当前线程的名字
线程存在默认名称:子线程是Thread-索引,
主线程是 main
 
public static Thread currentThread() 获取当前线程对象,代码在哪个线程中执行  
public static void sleep(long time) 让当前线程休眠多少毫秒再继续执行 Thread.sleep(0) :
让操作系统立刻重新进行一次 CPU 竞争
 
public static native void yield() 提示线程调度器让出当前线程对 CPU 的使用 主要是为了测试和调试
public final int getPriority() 返回此线程的优先级 优先级只是参考,OS 不一定会按你设置的优先级来。
public final void setPriority(int priority) 更改此线程的优先级 Java 中规定线程优先级是 1~10 的整数,较大的优先级能提高该线程被
CPU 调度的机率
public void interrupt() 打断线程 如果被打断线程正在 sleep,wait,join 会导致被打断的线程抛出 InterruptedException
并清除打断标 记 ;如果打断的正在运行的线程,则会设置打断标 记 ;
park 的线程被打断,也会设置打断标记
public static boolean interrupted() 判断当前线程是否被打断 清除打断标记
public boolean isInterrupted() 判断当前线程是否被打断,不清除打断标记 不会清除打断标记
public final void join() 等待这个线程结束 x.join() 就是等待 x 线程执行结束。
public final void join(long millis) 等待这个线程死亡 millis 毫秒,0 意味着永远等待  
public final native boolean isAlive() 线程是否存活(还没有运行完毕)  
public final void setDaemon(boolean on) 将此线程标记为守护线程或用户线程  

start与run

sleep与yield

sleep

yield

线程优先级

join

查阅 join 的源码可知,其原理为:调用者轮询检查线程 alive 状态

public final synchronized void join(long millis)
    throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}

线程同步:

线程中断

Java 中可以使用 stop、destory 之类的方法中断线程,但是官方明确不建议使用,因为强制杀死线程会让线程中所使用的资源无法正常关闭。而 interrupt 这类方法则可以做到优雅关闭线程。

Java 中 interrupt 中断线程是线程中断是一种线程间的协作模式,通过设置线程的中断标志并不能直接终止该线程的执行,而是被中断的线程根据中断状态自行处理。

interrupt

interrupt 从字面上看,似乎是说线程运行到一半,把它中断了,然后抛出 InterruptedException 异常。实际上并不是这样。只有那些声明了会抛出 InterruptedException 的函数才会抛出异常,也就是下面这些常用的函数:

public static native void sleep(long millis) throws InterruptedException{}
public final void wait() throws InterruptedException{}
public final void join() throws InterruptedException{} 

其他情况调用 interrupt 方法不会抛出异常。

void interrupt() 方法是用于中断线程的。例如,当线程 A 运行时,线程 B 可以调用线程 A 的 interrupt() 方法来设置线程 A 的中断标志为 true 并立即返回。

:orange:设置标志仅仅是设置标志,线程 A 实际并没有被中断,它会继续往下执行。如果线程 A 因为调用了 wait 系列函数、join 方法或者 sleep 方法而被阻塞挂起,这时候若线程 B 调用线程 A 的 interrupt() 方法,线程 A 会在调用这些方法的地方抛出 InterruptedException 异常而返回。

public class UserInterrupted {
    static ExecutorService threadPool = Executors.newFixedThreadPool(3);

    public static void testInterrupt() throws InterruptedException {
        final Thread[] cur = new Thread[1];
        threadPool.submit(() -> {
            cur[0] = Thread.currentThread();
            while (true) ;
        });
        TimeUnit.SECONDS.sleep(2);
        cur[0].interrupt();
        TimeUnit.SECONDS.sleep(2);
        System.out.println(cur[0].isInterrupted()); // true
    }

    public static void main(String[] args) throws InterruptedException {
        testInterrupt();
    }
}

如果是打断 sleep/wait/join 这些方法,则打断后会触发异常,且会重置打断标记(interrupt 值为 true,但是打断 sleep/wait/join,值最后还是 false)

public class UserInterrupted {
    static ExecutorService threadPool = Executors.newFixedThreadPool(3);

    // 测试 interrupt 打断 sleep。也可以打断 wait,join 等方法
    // 打断 sleep\wait\join 方法,打断后会清空打断状态。
    public static void testInterruptSleep() throws InterruptedException {
        final Thread[] cur = new Thread[1];
        threadPool.submit(() -> {
            cur[0] = Thread.currentThread();
            try {
                TimeUnit.SECONDS.sleep(20);
            } catch (InterruptedException e) {
                System.out.println("被 interrupt 打断了");
                System.out.println(Thread.currentThread().isInterrupted());
                e.printStackTrace();
            }
        });
        TimeUnit.SECONDS.sleep(2);
        cur[0].interrupt();
        TimeUnit.SECONDS.sleep(2);
        System.out.println(cur[0].isInterrupted());
    }

    public static void main(String[] args) throws InterruptedException {
        testInterruptSleep();
    }
}
/*
被 interrupt 打断了
false
java.lang.InterruptedException: sleep interrupted
	at java.base/java.lang.Thread.sleep(Native Method)
	at java.base/java.lang.Thread.sleep(Thread.java:337)
	at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
	at com.ddstudy.stop.UserInterrupted.lambda$testInterruptSleep$1(UserInterrupted.java:34)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
false
*/

轻量级阻塞与重量级阻塞

能够被中断的阻塞称为轻量级阻塞,对应的线程状态是 WAITING 或者 TIMED_WAITING;而像 synchronized 这种不能被中断的阻塞称为重量级阻塞,对应的状态是 BLOCKED。下图是在调用不同的函数之后,一个线程完整的状态迁移过程。

初始线程处于 NEW 状态,调用 start() 之后开始执行,进入 RUNNING 或者 READY 状态。如果没有调用任何的阻塞函数,线程只会在 RUNNING 和 READY 之间切换,也就是系统的时间片调度。这两种状态的切换是操作系统完成的,开发者基本没有机会介入,除了可以调用 yield() 函数,放弃对 CPU 的占用。

一旦调用了图中的任何阻塞函数,线程就会进入 WAITING 或者 TIMED_WAITING 状态,两者的区别只是前者为无限期阻塞,后者则传入了一个时间参数,阻塞一个有限的时间。如果使用了 synchronized 关键字或者 synchronized 块,则会进入 BLOCKED 状态。

除了常用的阻塞/唤醒函数,还有一对不太常见的阻塞/唤醒函数,LockSupport.park()/unpark()。这对函数非常关键,concurrent 包中 Lock 的实现即依赖这一对操作原语。故而 interrupted() 的精确含义是“唤醒轻量级阻塞”,而不是字面意思“中断一个线程”。

isInterrupted

boolean isInterrupted() 方法:检测当前线程是否被中断,如果是返回 true,否则返回 false。仅仅是获取线程是否被中断这个状态,不会做任何多余的事。

import java.sql.Time;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 主要测试以下三个方法
 * - Thread.interrupted()
 */
public class UserInterrupted {
    static ExecutorService threadPool = Executors.newFixedThreadPool(3);

    public static void testInterrupt() throws InterruptedException {
        final Thread[] cur = new Thread[1];
        threadPool.submit(() -> {
            cur[0] = Thread.currentThread();
            while (true) ;
        });
        TimeUnit.SECONDS.sleep(2);
        cur[0].interrupt();
        TimeUnit.SECONDS.sleep(2);
        System.out.println(cur[0].isInterrupted()); // true
    }

    public static void main(String[] args) throws InterruptedException {
        testInterruptSleep();
    }
}

interrupted

boolean interrupted() 方法:检测当前线程是否被中断,如果是返回 true,否则返回 false。与 isInterrupted 不同的是,该方法如果发现当前线程被中断,则会清除中断标志,并且该方法是 static 方法,可以通过 Thread 类直接调用。另外从下面的代码可以知道,在 interrupted() 内部是获取当前调用线程的中断标志而不是调用 interrupted() 方法的实例对象的中断标志。

public static boolean interrupted(){
    // 清除中断标志
    return currentThread().isInterrupted(true);
}

打断sleep/wait/join的线程

线程调用了这几个方法都会让线程进入阻塞状态,调用 obj.interrupt 方法打断 sleep、wait、join 的线程, 会清空打断状态,以 sleep 为例

@Slf4j(topic = "c.InterruptSleep")
public class InterruptSleep {
    public static void main(String[] args) throws InterruptedException {
        Thread th1 = new Thread(() -> {
            try {
                log.debug("sleep");
                TimeUnit.SECONDS.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        th1.start();
        // 确保 th1 线程开始运行
        TimeUnit.SECONDS.sleep(1);
        log.debug("interrupt");
        th1.interrupt(); 
        log.debug("打断标记:{}", th1.isInterrupted()); // false  打断标记被置为了 false
    }
}

以 wait 为例

@Slf4j(topic = "c.InterruptSleep")
public class InterruptWait {
    static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread th1 = new Thread(() -> {
            synchronized (lock) {
                try {
                    log.debug("wait");
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        th1.start();
        TimeUnit.SECONDS.sleep(1);
        th1.interrupt();
        log.debug(String.valueOf(th1.isInterrupted())); // false  打断标记被置为了 false
    }

}

打断正常运行的线程

obj.interrupt 方法打断正常运行的线程:不会清空打断状态(true)

@Slf4j(topic = "c.InterruptNormal")
public class InterruptNormal {
    static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        // 线程被打断后就不在运行
        Thread th1 = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) ;
        });

        th1.start();
        TimeUnit.SECONDS.sleep(5);
        th1.interrupt();
        log.debug(String.valueOf(th1.isInterrupted())); // true
    }

}

打断park

obj.interrupt 方法打断 park 线程, 不会清空打断状态;如果打断标记已经是 true, 则 park 会失效

// park 失效案例
public class InterruptPark {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            System.out.println("park 前");
            Thread.currentThread().interrupt();
            System.out.println(Thread.currentThread().isInterrupted());
            LockSupport.park(); // park 阻塞失效,
            System.out.println("park 后");
        });
        thread.start();
        TimeUnit.SECONDS.sleep(20);
        System.out.println("unpark");
        LockSupport.unpark(thread);
    }
}

两阶段终止模式

模式示意图

flowchart 
while("while(true)")-->有没有被打断(有没有被打断)
有没有被打断-->|是|料理后事(料理后事)
有没有被打断-->|否|sleep(睡眠2s)
料理后事-->ends((结束循环))
sleep-->|无异常|执行监控记录(执行监控记录)
sleep-->|有异常|设置打断标记(设置打断标记)
执行监控记录-->while
设置打断标记-->while

Coding

我们可以在任意时间打断线程,但是终止线程需要确定好何时终止。比如资源读取类线程,被打断了,我们不能立即终止线程,应该在资源读取完毕后,在根据打断标记决定是否终止。

// 两阶段终止模式。日志监控 Demo
@Slf4j(topic = "c.TwoPhaseTermination")
public class TwoPhaseTermination {
    public static void main(String[] args) throws InterruptedException {
        TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();
        twoPhaseTermination.start();
        TimeUnit.SECONDS.sleep(10);
        twoPhaseTermination.stop();
    }

    private Thread monitor;

    public void stop() {
        monitor.interrupt();
    }

    public void start() {
        monitor = new Thread(() -> {
            log.debug("start logging~~");
            Thread current = Thread.currentThread();
            while (true) {
                if (current.isInterrupted()) {
                    log.debug("over!");
                    return;
                }
                try {
                    TimeUnit.SECONDS.sleep(2);
                    log.debug("start logging~~");
                } catch (InterruptedException e) {
                    // 睡眠过程中被打断。会走 exception。然后将打断标记置为  true。打断完一次 sleep 后再打断就是打断正常线程了。
                    current.interrupt();
                    log.debug("料理后事");
                    e.printStackTrace();
                }
            }
        });
        monitor.start();
    }
}

小结

易混淆的方法

方法名称 作用
void interrupt() 中断线程;B 线程调用线程 A 的 interrupt() 方法来设置线程A的中断标志为 true,并立即返回。
boolean isInterrupted() 检测当前线程是否被中断,被中断则返回 true,否则返回 false
static boolean interrupted() 是静态方法;检测当前线程是否被中断,被中断则返回 true,否则返回 false。
public class ThreadMain {
    
    public static void main(String[] args) {
        Thread threadOne = new Thread(() -> {
            for (; ; ) {}
        });

        threadOne.start();
        threadOne.interrupt();

        System.out.println("isInterrupted:" + threadOne.isInterrupted()); // 普通成员方法 , 查看 threadOne 线程是否被中断 ,true
        System.out.println(threadOne.interrupted()); // 静态成员方法,false threadOne.interrupted() 是看当前线程,即调用 threadOne.interrupted 这个方法的线程
        System.out.println("isInterrupted:" + Thread.interrupted()); // 静态成员方法,false
        System.out.println("isInterrupted:" + threadOne.isInterrupted());// 普通成员方法 ,true
    }
}

线程同步方法

具体的原理请看 03-管程#wait/notify

wait

用于阻塞当前线程,在 synchronized 修饰的代码中调用 wait 方法,会将当前线程放入 WAITING 队列。

在调用 wait 方法的时候会先释放锁,然后进入阻塞状态。因为,如果不释放的话,会造成死锁。阻塞时,会等待其他线程唤醒它,唤醒后重新拿锁。

wait 的伪代码如下

wait(){
	// 释放锁
	// 阻塞,等待被其他线程 notify
	// 重新拿锁
}

notify

用于唤醒阻塞队列中的某个线程。当 object.notify() 方法被调用时,它就会从这个等待队列中随机选择一个线程,并将其唤醒。这个选择是完全随机的,且是不公平的。

不公平:意思是,如果阻塞队列中有需要被唤醒的线程,并且此时有另一个线程来抢占的话,会把资源分配给来抢占的线程。

notifyAll

与 notify 一样,不过 notifAll 会唤醒阻塞队列中的所有线程。

废弃方法

这些方法已过时,容易破坏同步代码块,造成线程死锁

方法 功能
public final void stop() 停止线程运行
public final void suspend() 挂起(暂停)线程运行
public final void resume() 恢复线程运行

suspend&resume

使用不当会造成死锁,且死锁后的线程状态还是 Runnable!这个才是最坑的,个人认为是因为这个才被废弃的!

suspend 挂起线程,不释放资源!resume唤醒线程!需要获得监视器 monitor,简单说就是要像 sync 加锁;且 suspend 在导致线程暂停的同时,并不会去释放任何资源。

如果 resume() 方法操作意外地在 suspend() 方法前就执行了,那么被挂起的线程可能很难有机会被继续执行。而且,对于被挂起的线程,从它的线程状态上看,居然还是 Runnable,这会严重影响我们对系统当前状态的判断。

stop

暴力终止线程,会存在很多问题!一般都是选择 Interrupt 这样温柔的停止线程。

主线程与守护线程

默认情况下,Java 进程需要等待所有线程都运行结束,才会结束。有一种特殊的线程叫做守护线程,只要其它非守护线程运行结束了,即使守护线程的代码没有执行完,也会强制结束。

线程的状态

五种状态

从 OS 层面来描述

六种状态

这是从 Java API 层面来描述的

根据 Thread.State 枚举,分为六种状态;

PS:从 NEW 状态出发后,线程不能再回到 NEW 状态,同理,处于 TERMINATED 状态的线程也不能再回到 RUNNABLE 状态。

 public enum State {
	// 表示刚刚创建的线程,这种线程还没开始执行。等到线程的start()方法调用时,才表示线程开始执行
	NEW,
	// 线程所需的一切资源都已经准备好了
	RUNNABLE,
	// 线程阻塞,暂停执行
	BLOCKED,
	// WAITING会进入一个无时间限制的等待
	WAITING,
	// TIMED_WAITING会进行一个有时限的等待
	TIMED_WAITING,
	// 当线程执行完毕后,则进入TERMINATED状态,表示结束。
	TERMINATED;
}

重点回顾

终止线程

终止线程的方式如下:

线程中断

stop 强行结束线程可能会引起数据不一致。如过我们把线程执行到一个安全点后再终止则可避免这种问题。线程中断就是这种思想。设置线程需要被中断的标记,具体何时中断由我们自己控制。所以,严格来讲:线程中断并不会使线程立即退出,而是给线程发送一个通知,告知目标线程,有人希望你退出。

Thread.sleep() 方法由于中断而抛出异常,此时,它会清除中断标记,需要后置处理。

API

线程中断并不会使线程立即退出,而是给线程发送一个通知,告知目标线程,有人希望你退出啦!至于目标线程接到通知后如何处理,则完全由目标线程自行决定。这点很重要,如果中断后,线程立即无条件退出,我们就又会遇到 stop() 方法的老问题。

interrupt() 方法

一个实例方法。它通知目标线程中断,也就是设置中断标志位。仅仅是设置一个标志位~并不会导致线程停止,想要线程停止可对标志位进行判断,然后进行其他操作。

public class InterruptDemo {
    public static void testInterrupt() throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (true)
                Thread.yield();
        });
        thread.start();
        Thread.sleep(2000);
        // thread.interrupt() 仅仅只是设置中断标志位
        thread.interrupt();
    }

    public static void main(String[] args) throws InterruptedException {
        // 死循环。
        testInterrupt();
    }
}

Thread.isInterrupted() 方法

一个静态方法。判断当前线程是被设置了中断状态。所以我们可以对设置了中断状态的线程进行需要的操作,如:当前线程被设置了中断状态,那么在某个时刻,我们就让线程退出执行!

import java.util.concurrent.TimeUnit;

    public static void testIsInterrupted() throws InterruptedException {
        Thread thread = new Thread(() -> {
            int count = 0;
            while (true) {
                count = (int) (Math.random() * 100_000);
                System.out.println(count);
                if (Thread.currentThread().isInterrupted() && count > 99_997) {
                    System.out.println("break current thread");
                    break;
                }
                // 放弃cpu执行权限
                Thread.yield();
            }
        });
        thread.start();
        thread.interrupt();
    }

    public static void main(String[] args) throws InterruptedException {
        testIsInterrupted();
    }
}

Thread.interrupted() 方法

判断线程是否被中断,并清除当前中断状态

优雅退出

小结