开发者

Java多线程Thread及其原理详解

目录
  • 1. 实现多线程的方式
  • 2. Thread 部分源码
    • 2.1. native 方法注册
    • 2.2. Thread 中的成员变量
    • 2.3. Thread 构造方法与初始化
    • 2.4. Thread 线程状态与操作系统状态
    • 2.4. start() 与 run() 方法
    • 2.5. sleep() 方法
    • 2.6. join() 方法
    • 2.7. interrupt() 方法
  • 总结

    1. 实现多线程的方式

    package com.jxz.threads;
    
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    
    import Java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    
    /**
     * @Author jiangxuzhao
     * @Description
     * @Date 2024/9/10
     */
    @Slf4j
    public class ThreadCreateTest {
        @Test
        @SneakyThrows
        public void test1() {
            // 匿名类重写 Thread#run
            Thread thread1 = new Thread() {
                @Override
                public void run() {
                    log.info("extend thread#run...");
                }
            };
            thread1.start();
            // 避免主线程直接结束
            Thread.sleep(1000);
        }
    
        @Test
        @SneakyThrows
        public void test2() {
            // Lambda 表达式定义实现 Runnable target, Thread#run 方法最终调用 target#run
            Thread thread2 = new Thread(() -> {
                log.info("implement 自定义变量 target 的 Runnable#run...");
            });
            thread2.start();
            // 避免主线程直接结束
            Thread.sleep(1000);
        }
    
        @Test
        @SneakyThrows
        public void test3() {
            // Lambda 表达式定义实现 callable#call,可以在主线程中通过 Future#get 阻塞获取结果 result
            FutureTask<String> stringFutureTask = new FutureTask<>(() -> {
                log.info("implement Callable#call");
                return Thread.currentThread().getName();
            });
            Thread thread3 = new Thread(stringFutureTask);
            thread3.start();
            // 阻塞获取结果,不用担心主线程直接结束
            log.info("thread3 futureTask callable output = {}", stringFutureTask.get());
        }
    
        @Test
        @SneakyThrows
        public void test4() {
            // 线程池实现异步多线程
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            Future<String> stringFuture = executorService.submit(() -> {
                log.info("thread pool submit");
                return Thread.currentThread().getName();
            });
            // 阻塞获取结果,不用担心主线程直接结束
            log.info("thead submit output = {}", stringFuture.get());
        }
    }

    2. Thread 部分源码

    2.1. native 方法注册

    public
    class Thread implements Runnable {
      	// 在 jdk 底层的 Thread.c 文件中定义了各种方法
        private static native void registerNatives();
      
      	// 确保 registerNatives 是 <clinit> 中第一件做的事
        static {
            registerNatives();
        }
    }

    Thread#registerNatives 作为本地方法,主要作用是注册一些本地方法供 Thread 类使用,如 start0(), stop0() 等。

    该方法被放在一个本地静态代码块中,并且该代码块被放在类中最靠前的位置,确保当 Thread 类被加载到 JVM 中时,调用 第一时间就会注册所有的本地方法。

    所有的本地方法都是定义在 JDK 源码的 Thread.c 文件中的,它定义了各个操作系统平台都要用到的关于线程的基本操作。

    可以专门去下载 openjdk 1.8 的源码一探究竟:

    Java多线程Thread及其原理详解

    或者直接阅读 openjdk8 在线的源码:

    https://hg.openjdk.org/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/native/java/lang/Thread.c

    2.2. Thread 中的成员变量

    针对其中常见的几个变量做了中文注释

    // 当前线程的名称
    private volatile String name;
    private int            priority;
    private Thread         threadQ;
    private long           eetop;
    
    /* Whether or not to single_step this thread. */
    private boolean     single_step;
    
    // 当前线程是否在后台运行
    /* Whether or not the thread is a daemon thread. */
    private boolean     daemon = false;
    
    /* JVM state */
    private boolean     stillborn = false;
    
    // init 构造方法中传入的执行任务,当其不为空时,会执行此任务
    /* What will be run. */
    private Runnable target;
    
    // 当前线程所在的线程组
    /* The group of this thread */
    private ThreadGroup group;
    
    // 当前线程的类加载器
    /* The context ClassLoader for this thread */
    private ClassLoader contextClassLoader;
    
    /* The inherited AccessControlContext of this thread */
    private AccessControlContext inheritedAccessControlContext;
    
    // 被用来定义 "Thread-" + nextThreadNum() 的线程名,自增的序号在线程池打印日志中很常见
    // 静态变量 threadInitNumber 在 static synchronized 方法中自增,这个方法被调用时在 Thread.class 类上加 synchronized 锁,保证单台 JVM 虚拟机上都通过 Thread.class 并发创建线程 init 时,线程自增序号的并发安全
    /* For autonumbering anonymous threads. */
    private static int threadInitNumber;
    private static synchronized int nextThreadNum() {
        return threadInitNumber++;
    }
    
    // 每个线程都维护一个 ThreadLocalMap,这个在保障线程安全的 ThreadLocal 中经常出现
    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;
    
    /*
     * InheritableThreadLocal values pertaining to this thread. This map is
     * maintained by the InheritableThreadLocal class.
     */
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    
    /*
     * The requested stack size for this thread, or 0 if the creator did
     * not specify a stack size.  It is up to the VM to do whatever it
     * likes with this number; some VMs will ignore it.
     */
    private long stackSize;
    
    /*
     * JVM-private state that persists after native thread termination.
     */
    private long nativeParkEventPointer;
    
    /*
     * Thread ID
     */
    private long tid;
    
    /* For generating thread ID */
    private static long threadSeqNumber;
    
    /* Java thread status for tools,
     * initialized to indicate thread 'not yet started'
     */
    
    private volatile int threadStatus = 0;
    
    
    private static synchronized long nextThreadID() {
        return ++threadSeqNumber;
    }
    
    /**
     * The argument supplied to the current call to
     * java.util.concurrent.locks.LockSupport.park.
     * Set by (private) java.util.concurrent.locks.LockSupport.setblocker
     * Accessed using java.util.concurrent.locks.LockSupport.getBlocker
     */
    volatile Object parkBlocker;
    
    /*编程客栈 The object in which this thread is blocked in an interruptible I/O
     * operation, if any.  The blocker's interrupt method should be invoked
     * after setting this thread's interrupt status.
     */
    private volatile Interruptible blocker;
    private final Object blockerLock = new Object();
    
    /* Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code
     */
    void blockedOn(Interruptible b) {
        synchronized (blockerLock) {
            blocker = b;
        }
    }
    
    /**
     * The minimum priority that a thread can have.
     */
    public final static int MIN_PRIORITY = 1;
    
    /**
     * The default priority that is assigned to a thread.
     */
    public final static int NORM_PRIORITY = 5;
    
    /**
     * The maximum priority that a thread can have.
     */
    public final static int MAX_PRIORITY = 10;

    2.3. Thread 构造方法与初始化

    构造方法:

    Thread 具有多个重载的构造函数,内部都是调用 Thread#init() 方法初始化,我们常用的就是传入 Thread(Runnable target) 以及 Thread(Runnable target, String name)

    public Thread() {
        init(null, null, "Thread-" + nextThreadNum(), 0);
    }
    
    public Thread(Runnable target) {
        init(null, target, "Thread-" + nextThreadNum(), 0);
    }
    
    
    Thread(Runnable target, AccessControlContext acc) {
        init(null, target, "Thread-" + nextThreadNum(), 0, acc, false);
    }
    
    public Thread(ThreadGroup group, Runnable target) {
        init(group, target, "Thread-" + nextThreadNum(), 0);
    }
    
    public Thread(String name) {
        init(null, null, name, 0);
    }
    
    public Thread(ThreadGroup group, String name) {
        init(group, null, name, 0);
    }
    
    public Thread(Runnable target, String name) {
        init(null, target, name, 0);
    }
    
    public Thread(ThreadGroup group, Runnable target, String name) {
        init(group, target, name, 0);
    }
    
    public Thread(ThreadGroup group, Runnable target, String name,
                  long stackSize) {
        init(group, target, name, stackSize);
    }

    init 初始化方法:

    主要完成成员变量赋值的操作,包括 Runnable target 变量的赋值。后面可以看到,如果在构造器中就传入这个 Runnable,Thread#run 就会执行这个 Runnable.

    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
        if (name == null) {
            throw new NullPointerException("name cannot be null");
        }
    
        this.name = name;
    
        Thread parent = currentThread();
        SecurityManager security = System.getSecurityManager();
        if (g == null) {
            /* Determine if it's an applet or not */
    
            /* If there is a security manager, ask the security manager
               what to do. */
            if (security != null) {
                g = security.getThreadGroup();
            }
    
            /* If the security doesn't have a strong opinion of the matter
               use the parent thread group. */
            if (g == null) {
                g = parent.getThreadGroup();
            }
        }
    
        /* checkAccess regardless of whether or not threadgroup is
           explicitly passed in. */
        g.checkAccess();
    
        /*
         * Do we have the required permissions?
         */
        if (security != null) {
            if (isCCLOverridden(getClass())) {
                security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
            }
        }
    
        g.addUnstarted();
    
        this.group = g;
        this.daemon = parent.isDaemon();
        this.priority = parent.getPriority();
        if (security == null || isCCLOverridden(parent.getClass()))
            this.contextClassLoader = parent.getContextClassLoader();
        else
            this.contextClassLoader = parent.contextClassLoader;
        this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();
      
      	// 就是上面成员变量中的 target,在这里赋值
        this.target = target;
        setPriority(priority);
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        /* Stash the specified stack size in case the VM cares */
        this.stackSize = stackSize;
    
        /* Set thread ID */
        tid = nextThreadID();
    }

    2.4. Thread 线程状态与操作系统状态

    public enum State {
        /**
         * Thread state for a thread which has not yet started.
         */
      	// 初始化状态
        NEW,
    
        /**
         * Thread state for a runnable thread.  A thread in the runnable
         * state is executing in the Java virtual MAChine but it may
         * be waiting for other resources from the operating system
         * such as processor.
         */
      	// 可运行状态,可运行状态可以包括:运行中状态和就绪状态。
        RUNNABLE,
    
        /**
         * Thread state for a thread blocked waiting for a monitor lock.
         * A thread in the blocked state is waiting for a monitor lock
         * to enter a synchronized block/method or
         * reenter a synchronized block/method after calling
         * {@link Object#wait() Object.wait}.
         */
      	// 线程阻塞状态
        BLOCKED,
    
        /**
         * Thread state for a waiting thread.
         * A thread is in the waiting state due to calling one of the
         * following methods:
         * <ul>
         *   <li>{@link Object#wait() Object.wait} with no timeout</li>
         *   <li>{@link #join() Thread.join} with no timeout</li>
         *   <li>{@link LockSupport#park() LockSupport.park}</li>
         * </ul>
         *
         * <p>A thread in the waiting state is waiting for another thread to
         * perform a particular action.
         *
         * For example, a thread that has called <tt>Object.wait()</tt>
         * on an object is waiting for another thread to call
         * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
         * that object. A thread that has called <tt>Thread.join()</tt>
         * is waiting for a specified thread to terminate.
         */
      	// 等待状态
        WAITING,
    
        /**
         * Thread state for a waiting thread with a specified waiting time.
         * A thread is in the timed waiting state due to calling one of
         * the following methods with a specified positive waiting time:
         * <ul>
         *   <li>{@link #sleep Thread.sleep}</li>
         *   <li>{@link Object#wait(long) Object.wait} with timeout</li>
         *   <li>{@link #join(long) Thread.join} with timeout</li>
         *   <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
         *   <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
         * </ul>
         */
      	// 超时等待状态
        TIMED_WAITING,
    
        /**
         * Thread state for a terminated thread.
         * The thread has completed execution.
         */
      	// 线程终止状态
        TERMINATED;
    }
    • NEW: 初始状态,线程被构建,但是还没有调用 Thread#start() 方法
    • RUNNABLE: 可运行状态,包括运行中和就绪状态。从源码的注释中可以看出来,就绪状态就是线程在 JVM 中有资格运行,但是由于操作系统调度的原因尚未执行,可能线程在等待操作系统释放资源,比方说处理器资源。
    • BLOCKED: 阻塞状态,处于这个状态的线程等待别的线程释放 monitor 锁以进入 synchronized 块;或者调用 Object#wait() 方法释放锁进入等待队列后(此时是 WAITING 状态),被其他线程 notify() 唤醒时不能立刻从上次 wait 的地方恢复执行,再次进入 synchronized 块还需要和别的线程竞争锁。
    • 总结来说,线程因为获取不到锁而无法进入同步代码块时,处于 BLOCKED 阻塞状态。
    • WAITING: 等待状态,处于该状态的线程需要其他线程对其进行通知或者中断等操作,从而进入下一个状态。
    • TIMED_WAITING: 超时等待状态,相比于 WAITING 状态持续等待,该状态可以在一定时间后自行返回
    • TERMINATED: 终止状态,当前线程执行完毕

    下面就用一张图表示了Java线程各种状态的流转,其中夹杂着操作系统线程的状态定义,其中标红的部分表示 Java 状态

    Java多线程Thread及其原理详解

    对比操作系统线程状态,包括 new、terminated、ready、running、waiting,除去初始化 new 和 terminated 终止状态,一个线程运行中的状态只有:

    • ready: 线程已创建,等待系统调度分配 CPU 资源
    • running: 线程获得了 CPU 使用权,正在运算
    • waiting: 线程等待(或者说挂起),让出 CPU 资源给其他线程使用

    其对应关系我理解如下:

    其中 Java 线程状态 RUNNABLE 包括操作系统状态的运行 running 和就绪 ready,操作系统的 waiting 包含了 BLOCKED 阻塞挂起状态。

    Java多线程Thread及其原理详解

    2.4. start() 与 run() 方法

    新线程构造之后,只有调用 start() 才能让 JVM 创建线程并进入运行状态,Thread#start() 源码如下,主要包含几大步骤:

    • 判断线程状态是否为 NEW 初始化
    • 加入线程组
    • 调用 native 方法 start0() 通知底层 JVM 启动一个线程,start0() 就是前面 registerNatives() 本地方法注册的一个启动方法
    • 如果启动失败,把线程从线程组中删除
    public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
      	// 1. 判断线程状态是否为 NEW 初始化,否则直接抛出异常
        if (threadStatus != 0)
            throw new IllegalThreadStateException();
    
        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
      	// 2. 加入线程组
        group.add(this);
    	
      	// 线程是否已经启动标志位,启动后设置为 true
        boolean started = false;
        try {
          	// 3. 调用本地方法启动线程
            start0();
          	// 启动后设置标志位为 true
            started = true;
        } finally {
            try {
              	// 4. 如果启动失败,把线程从线程组中移除
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }
    
    // JVM 真正启动线程的本地方法
    private native void start0();

    从 start() 源码中可以看出来以下几点:

    1. start() 加上 synchronized 关键词,单个 Thread 实例在这个 JVM 进程中运行是同步的,因此不会出现并发问题。同步检查该线程的状态,如果不是初始化状态则抛出异常。
    2. start() 方法并没有直接调用我们定义的 run() 方法,是因为 Thread#start() 底层调用 Thread#start0(),start0() 的本地方vYpfHvWCL法逻辑中会调用 run() 方法
    3. 直接调用 Thread#run() 方法或者 Runnable#run() 方法不会创建新线程执行任务,而是在主线程直接串行执行,如果要创建新线程执行任务,需要调用 Thread#start() 方法

    调用逻辑图如下:

    Java多线程Thread及其原理详解

    Thread#run() 源码如下:

    // 自定义重写 Thread#run() 或者传入 Runnable,最终都会调用该线程的 run() 方法逻辑
    // 如果传入了 Runnable 就会走进这个方法运行 target.run(),有点装饰器模式的感觉
    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }

    至于为何最终 start0() 还是调用了 Thread#run(),这就需要去看 jdk 源码了,我刚好也硬着头皮去挖了下:

    首先看到 Thread.c 文件中 registerNatives 里面注册的这些本地方法,start0() 会去调用 JVM_StartThread

    Java多线程Thread及其原理详解

    在 jvm.cpp 文件中找出 JVM_StartThread 方法,其底层调用 new JavaThread()方法

    Java多线程Thread及其原理详解

    最终该方法真的会去 thread.cpp 里调用创建操作系统线程的方法 os::create_thread

    Java多线程Thread及其原理详解

    new JavaThread() 方法里面会引用 jvm.cpp 文件中的 thread_entry 方法,这个方法最终就会调用 vmSymbols::run_method_name(),看起来是个虚拟机内注册的方法

    Java多线程Thread及其原理详解

    全局检索一下,其实就是在 vmSymbols.hpp 头文件中定义的许多通用方法和变量,run 方法刚好是其中定义的一个,也就是 Thread#run()。

    还可以看到许多其他常见的方法,比方说类的初始化方法,是 jvm 第一次加载 class 文件时调用,包括静态变量初始化语句和静态块执行。

    Java多线程Thread及其原理详解

    2.5. sleep() 方法

    Thread#sleep() 方法会让当前线程休眠一段时间,单位为毫秒,由于是 static 方法,所以是让直接调用 Thread.sleep() 的休眠,这里需要注意的是:

    调用 sleep() 方法使线程休眠以后,不会释放自己占有的锁。

    // 本地方法,真正让线程休眠的方法
    public static native void sleep(long millis) throws InterruptedException;
    
    /**
     * Causes the currently executing thread to sleep (temporarily cease
     * execution) for the specified number of milliseconds plus the specified
     * number of nanoseconds, subject to the precision and accuracy of system
     * tiwww.devze.commers and schedulers. The thread does not lose ownership of any
     * monitors.
     *
     * @param  millis
     *         the length of time to sleep in milliseconds
     *
     * @param  nanos
     *         {@code 0-999999} additional nanoseconds to sleep
     *
     * @throws  IllegalArgumentException
     *          if the value of {@code millis} is negative, or the value of
     *          {@code nanos} is not in the range {@code 0-999999}
     *
     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
     */
    public static void sleep(long millis, int nanos)
    throws InterruptedException {
        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
    
        if (nanos < 0 || nanos > 999999) {
            throw new IllegalArgumentException(
                                "nanosecond timeout value out of range");
        }
    
        if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
            millis++;
        }
    		
      	// 调用本地方法
        sleep(millis);
    }

    2.6. join() 方法

    这个方法是目前我觉得 Thread 里面最难理解的方法了,涉及到 synchronized 锁、wait、notify 原理,以及线程调用主体之间的辨析,参考 【Java】Thread类中的join()方法原理,我的理解如下:

    首先看下 Thread#join() 方法的源码:

    非静态方法,是类中的普通方法,比方说 Main 线程调用 ThreadA.join(),就是 Main 线程会等待 ThreadA 执行完成

    // 调用方法,比方说 Main 线程调用 ThreadA.join(),就是 Main 线程会等待 ThreadA 执行完成
    public final void join() throws InterruptedException {
        join(0);
    }
    
    public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;
    
        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
    
        if (millis == 0) {
          	// 该分支是无限期等待js ThreadA 结束,其实内部最后是在 ThreadA 结束时被 notify 
            while (isAlive()) {
                wait(0);
            }
        } else {
          	// 该分支时等待有限的时间,如果 ThreadA 在 delay 时间以后还未结束,等待线程也返回了
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

    重点关注下其中会出现的两个 wait():

    首先我们知道,Object#wait() 需要放在 synchronized 代码块中执行,即获取到锁以后再释放掉锁。

    这个 synchronized 锁就是在 Thread#join() 方法上,成员方法上加了 synchronized 说明就是 synchronized(this)android, 假设 Main 线程调用 ThreadA.join(),那么这个 this 就是指调用 ThreadA.join() 的 ThreadA 对象本身,最终效果就是,调用方 Main 线程持有了 ThreadA 对象的 Monitor 锁,被记录在 ThreadA 对象头上。

    Java多线程Thread及其原理详解

    有了 Object#wait() 就需要有对应的 Object#notify() 将其唤醒,这又得看到 jvm 源码里面去了

    在 openjdk/hotspot/src/share/vm/runtime/thread.cpp 的 JavaThread::exit 方法中,这其实是线程退出时会执行的方法,有个 ensure_join() 方法

    Java多线程Thread及其原理详解

    ensure_join() 方法的源码如下:

    上面的 this 就是指 ThreadA,就是下面方法入参中的 thread。可以看出来,当线程 ThreadA 执行完成准备退出时,jvm 会自动唤醒等待在 threadA 对象上的线程,在我们的例子中就是主线程。

    Java多线程Thread及其原理详解

    总结如下:

    Thread.join() 方法底层原理是 synchronized 方法 + wait/notify。主线程调用 ThreadA.join() 方法,通过 synchronized 关键字获取到 ThreadA 的对象锁,内部再通过 Object#wait() 方法等待,这里的执行方和调用方都是主线程,最终当 ThreadA 线程退出的时候,jvm 会自动 notify 唤醒等待在 ThreadA 上的线程,也就是主线程。

    2.7. interrupt() 方法

    Thread#interrupt 是中断被调用线程的方法,它通过设置线程的中断标志位来中断被调用线程,通常调用会抛出 java.lang.InterruptedException 异常。

    这种中断线程的方法比较安全,能够使正在执行的任务继续能够执行完,而不像 stop() 方法那样强制关闭。

    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();
    
        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
      	// 调用本地方法中断线程
        interrupt0();
    }

    总结

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜