Skip to content

Java线程那点事儿 #4

Open
Open
@aCoder2013

Description

@aCoder2013

引言

说到Thread大家都很熟悉,我们平常写并发代码的时候都会接触到,那么我们来看看下面这段代码是如何初始化以及执行的呢?

public class ThreadDemo {

	public static void main(String[] args) {
		new Thread().start();
	}
}

初始化流程

代码就一行很简单,那么这行简单的代码背后做了那些事情呢?

初始化Thread这个类

首先JVM会去加载Thread的字节码,初始化这个类,这里即调用下面这段代码:

public class Thread implements Runnable {
    /* Make sure registerNatives is the first thing <clinit> does. */
    private static native void registerNatives();
    static {
        registerNatives();
    }
}

是个native方法,那么我们去看看内部实现是什么,具体的目录是openjdk/jdk/src/share/native/java/lang/Thread.c, 下载地址

#define ARRAY_LENGTH(a) (sizeof(a)/sizeof(a[0]))

//JVM前缀开头的方法,具体实现在JVM中
static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};

#undef THD
#undef OBJ
#undef STE
#undef STR

//jclass cls即为java.lang.Thread
JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

可以发现具体的实现都是由这些JVM开头的方法决定的,而这几个方法的具体实现都在hotspot\src\share\vm\prims\jvm.cpp文件中,而RegisterNatives我目前的理解其实类似一个方法表,从Java方法到native方法的一个映射,具体的原理后面再研究。

初始化Thread对象

其实就是一些赋值,名字、线程ID这些,这两个变量都是static,用synchronized修饰,保证线程安全性。

    public Thread() {
        //nextThreadNum就是变量的自增,用synchronized修饰保证可见性
        init(null, null, "Thread-" + nextThreadNum(), 0);
    }
    
        private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize) {
        init(g, target, name, stackSize, null);
    }
    
        private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc) {
        if (name == null) {
            throw new NullPointerException("name cannot be null");
        }

        this.name = name;

        Thread parent = currentThread();
        // 安全相关的一坨东西....

        /* Stash the specified stack size in case the VM cares */
        this.stackSize = stackSize;

        /* Set thread ID */
        tid = nextThreadID();
    }
    
    
    private static synchronized long nextThreadID() {
        return ++threadSeqNumber;
    }

创建并启动线程

    public synchronized void start() {
        if (threadStatus != 0)
            throw new IllegalThreadStateException();
        group.add(this);

        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }

这里start0()是个native方法,对应jvm.cpp中的JVM_StartThread,我们看到很多方法都是JVM_ENTRY开头,JVM_END结尾,类似于{}的作用,这里是将很多公共的操作封装到了JVM_ENTRY里面.

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;

  // We cannot hold the Threads_lock when we throw an exception,
  // due to rank ordering issues. Example:  we might need to grab the
  // Heap_lock while we construct the exception.
  bool throw_illegal_thread_state = false;

  // We must release the Threads_lock before we can post a jvmti event
  // in Thread::start.
  {
    // 加锁
    MutexLocker mu(Threads_lock);
    
    // 自从JDK 5之后 java.lang.Thread#threadStatus可以用来阻止重启一个已经启动
    // 的线程,所以这里的JavaThread通常为空。然而对于一个和JNI关联的线程来说,在线程
    // 被创建和更新他的threadStatus之前会有一个小窗口,因此必须检查这种情况
    if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
      throw_illegal_thread_state = true;
    } else {
      // We could also check the stillborn flag to see if this thread was already stopped, but
      // for historical reasons we let the thread detect that itself when it starts running

      jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      // 为C++线程结构体分配内存并创建native线程。从java取出的stack size是有符号的,因此这里
      // 需要进行一次转换,避免传入负数导致创建一个非常大的栈。
      size_t sz = size > 0 ? (size_t) size : 0;
      native_thread = new JavaThread(&thread_entry, sz);

      // At this point it may be possible that no osthread was created for the
      // JavaThread due to lack of memory. Check for this situation and throw
      // an exception if necessary. Eventually we may want to change this so
      // that we only grab the lock if the thread was created successfully -
      // then we can also do this check and throw the exception in the
      // JavaThread constructor.
      if (native_thread->osthread() != NULL) {
        // Note: the current thread is not being used within "prepare".
        native_thread->prepare(jthread);
      }
    }
  }

  if (throw_illegal_thread_state) {
    THROW(vmSymbols::java_lang_IllegalThreadStateException());
  }

  assert(native_thread != NULL, "Starting null thread?");

  if (native_thread->osthread() == NULL) {
    // No one should hold a reference to the 'native_thread'.
    delete native_thread;
    if (JvmtiExport::should_post_resource_exhausted()) {
      JvmtiExport::post_resource_exhausted(
        JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
        "unable to create new native thread");
    }
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              "unable to create new native thread");
  }

  Thread::start(native_thread);

JVM_END

基本上这里就是先加锁,做些检查,然后创建JavaThread,如果创建成功的话会调用prepare(),然后是一些异常处理,没有异常的话最后会启动线程,那么下面我们先来看看JavaThread是如何被创建的。

JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
  Thread()
#if INCLUDE_ALL_GCS
  , _satb_mark_queue(&_satb_mark_queue_set),
  _dirty_card_queue(&_dirty_card_queue_set)
#endif // INCLUDE_ALL_GCS
{
  if (TraceThreadEvents) {
    tty->print_cr("creating thread %p", this);
  }
  initialize();//这个方法其实就是一堆变量的初始化,不是Null就是0.
  _jni_attach_state = _not_attaching_via_jni;
  set_entry_point(entry_point);
  // Create the native thread itself.
  // %note runtime_23
  os::ThreadType thr_type = os::java_thread;
  // 根据传进来的entry_point判断要创建的线程的类型。
  thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
                                                     os::java_thread;
  os::create_thread(this, thr_type, stack_sz);
  // _osthread可能是Null,因此我们耗尽了内存(太多的活跃线程)。我们需要抛出OOM,然而不能在这做,因为调用者可能
  // 还持有锁,而所有的锁都必须在抛出异常之前被释放。
  // 代码执行到这,线程还是suspended状态,因为线程必须被创建者直接启动。
}

void JavaThread::initialize() {
  // Initialize fields
  // ...
  set_thread_state(_thread_new); // 线程的初始状态
 // ...

}

JavaThreadState记录了线程记录了线程正在执行的代码在哪一部分,这个信息可能会被安全点使用到(GC),最核心的有四种:

  1. _thread_new 刚开始启动,但还没执行初始化代码,更可能还在OS初始化的层面
  2. _thread_in_native 在native代码中
  3. _thread_in_vm 在vm中执行
  4. _thread_in_Java 执行在解释或者编译后的Java代码中

每个状态都会对应一个中间的转换状态,这些额外的中间状态使得安全点的代码能够更快的处理某一线程状态而不用挂起线程。

enum JavaThreadState {
  _thread_uninitialized     =  0, // should never happen (missing initialization)
  _thread_new               =  2, // just starting up, i.e., in process of being initialized
  _thread_new_trans         =  3, // corresponding transition state (not used, included for completness)
  _thread_in_native         =  4, // running in native code
  _thread_in_native_trans   =  5, // corresponding transition state
  _thread_in_vm             =  6, // running in VM
  _thread_in_vm_trans       =  7, // corresponding transition state
  _thread_in_Java           =  8, // running in Java or in stub code
  _thread_in_Java_trans     =  9, // corresponding transition state (not used, included for completness)
  _thread_blocked           = 10, // blocked in vm
  _thread_blocked_trans     = 11, // corresponding transition state
  _thread_max_state         = 12  // maximum thread state+1 - used for statistics allocation
};

我们看到 os::create_thread(this, thr_type, stack_sz);这行代码会去实际的创建线程,首先我们知道Java宣传的是一次编译,到处运行,那么究竟是怎么做到在不同的CPU、操作系统上还能够保持良好的可移植性呢?

// 平台相关的东东
#ifdef TARGET_OS_FAMILY_linux
# include "os_linux.hpp"
# include "os_posix.hpp"
#endif
#ifdef TARGET_OS_FAMILY_solaris
# include "os_solaris.hpp"
# include "os_posix.hpp"
#endif
#ifdef TARGET_OS_FAMILY_windows
# include "os_windows.hpp"
#endif
#ifdef TARGET_OS_FAMILY_aix
# include "os_aix.hpp"
# include "os_posix.hpp"
#endif
#ifdef TARGET_OS_FAMILY_bsd
# include "os_posix.hpp"
# include "os_bsd.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_x86
# include "os_linux_x86.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_sparc
# include "os_linux_sparc.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_zero
# include "os_linux_zero.hpp"
#endif
#ifdef TARGET_OS_ARCH_solaris_x86
# include "os_solaris_x86.hpp"
#endif
#ifdef TARGET_OS_ARCH_solaris_sparc
# include "os_solaris_sparc.hpp"
#endif
#ifdef TARGET_OS_ARCH_windows_x86
# include "os_windows_x86.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_arm
# include "os_linux_arm.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_ppc
# include "os_linux_ppc.hpp"
#endif
#ifdef TARGET_OS_ARCH_aix_ppc
# include "os_aix_ppc.hpp"
#endif
#ifdef TARGET_OS_ARCH_bsd_x86
# include "os_bsd_x86.hpp"
#endif
#ifdef TARGET_OS_ARCH_bsd_zero
# include "os_bsd_zero.hpp"
#endif

我们看到os.hpp中有这样一段代码,能够根据不同的操作系统选择include不同的头文件,从而将平台相关的逻辑封装到对应的库文件中,我们这里以linux为例,create_thread最终会调用os_linux.cpp中的create_thread方法。

bool os::create_thread(Thread* thread, ThreadType thr_type, size_t stack_size) {
  assert(thread->osthread() == NULL, "caller responsible");

  // Allocate the OSThread object
  OSThread* osthread = new OSThread(NULL, NULL);
  if (osthread == NULL) {
    return false;
  }

  // set the correct thread state
  osthread->set_thread_type(thr_type);

  // 初始状态为ALLOCATED,而不是INITIALIZED
  osthread->set_state(ALLOCATED);

  thread->set_osthread(osthread);

  // init thread attributes
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

  // stack size
  if (os::Linux::supports_variable_stack_size()) {
    // 如果上层未传递则计算stack_size
    if (stack_size == 0) {
      //如果为compiler_thread,则分配4M,否则默认会分配1M
      stack_size = os::Linux::default_stack_size(thr_type);

      switch (thr_type) {
      case os::java_thread:
        // Java线程用ThreadStackSize,这个值可以通过-Xss指定
        assert (JavaThread::stack_size_at_create() > 0, "this should be set");
        stack_size = JavaThread::stack_size_at_create();
        break;
      case os::compiler_thread:
        if (CompilerThreadStackSize > 0) {
          stack_size = (size_t)(CompilerThreadStackSize * K);
          break;
        } // else fall through:
          // use VMThreadStackSize if CompilerThreadStackSize is not defined
      case os::vm_thread:
      case os::pgc_thread:
      case os::cgc_thread:
      case os::watcher_thread:
        if (VMThreadStackSize > 0) stack_size = (size_t)(VMThreadStackSize * K);
        break;
      }
    }
        
    // 用两者较大的那个,min_stack_allowed默认为128K
    stack_size = MAX2(stack_size, os::Linux::min_stack_allowed);
    pthread_attr_setstacksize(&attr, stack_size);
  } else {
    // let pthread_create() pick the default value.
  }

  // glibc guard page
  pthread_attr_setguardsize(&attr, os::Linux::default_guard_size(thr_type));

  ThreadState state;

  {
    // 检查是否需要加锁
    bool lock = os::Linux::is_LinuxThreads() && !os::Linux::is_floating_stack();
    if (lock) {
      os::Linux::createThread_lock()->lock_without_safepoint_check();
    }

    pthread_t tid;
    // Linux用于创建线程的函数,这个线程通过执行java_start来启动,其中thread是作为java_start的参数传递进来的
    // 具体可见手册:http://man7.org/linux/man-pages/man3/pthread_create.3.html
    int ret = pthread_create(&tid, &attr, (void* (*)(void*)) java_start, thread);

    pthread_attr_destroy(&attr);

    if (ret != 0) {
      // 创建失败,将_osthread置为空,还记得在jvm.cpp的JVM_StartThread中会根据_osthread是否为空来判断
      // 是否创建成功
      if (PrintMiscellaneous && (Verbose || WizardMode)) {
        perror("pthread_create()");
      }
      // 清理资源,并解锁
      thread->set_osthread(NULL);
      delete osthread;
      if (lock) os::Linux::createThread_lock()->unlock();
      return false;
    }

    // 创建成功会将底层线程的ID保存在tid中
    osthread->set_pthread_id(tid);

    // 等待子线程创建完成或者终止
    {
      Monitor* sync_with_child = osthread->startThread_lock();
      MutexLockerEx ml(sync_with_child, Mutex::_no_safepoint_check_flag);
      while ((state = osthread->get_state()) == ALLOCATED) {
        sync_with_child->wait(Mutex::_no_safepoint_check_flag);
      }
    }

    if (lock) {
      os::Linux::createThread_lock()->unlock();
    }
  }

  // 线程的数目达到极限了
  if (state == ZOMBIE) {
      thread->set_osthread(NULL);
      delete osthread;
      return false;
  }

  // The thread is returned suspended (in state INITIALIZED),
  // and is started higher up in the call chain
  assert(state == INITIALIZED, "race condition");
  return true;
}

下面我们来看看pthread_create会执行的回调函数java_start,这个方法是所有新创建的线程必走的流程。

static void *java_start(Thread *thread) {
  // 尝试随机化热栈帧高速缓存行的索引,这有助于优化拥有相同栈帧线程去互相驱逐彼此的缓存行时,线程
  // 可以是同一个JVM实例或者不同的JVM实例,这尤其有助于拥有超线程技术的处理器。
  static int counter = 0;
  int pid = os::current_process_id();
  alloca(((pid ^ counter++) & 7) * 128);

  ThreadLocalStorage::set_thread(thread);

  OSThread* osthread = thread->osthread();
  Monitor* sync = osthread->startThread_lock();

  // non floating stack LinuxThreads needs extra check, see above
  if (!_thread_safety_check(thread)) {
    // notify parent thread
    MutexLockerEx ml(sync, Mutex::_no_safepoint_check_flag);
    osthread->set_state(ZOMBIE);
    sync->notify_all();
    return NULL;
  }

  // thread_id is kernel thread id (similar to Solaris LWP id)
  osthread->set_thread_id(os::Linux::gettid());

  if (UseNUMA) {
    int lgrp_id = os::numa_get_group_id();
    if (lgrp_id != -1) {
      thread->set_lgrp_id(lgrp_id);
    }
  }
  // initialize signal mask for this thread
  os::Linux::hotspot_sigmask(thread);

  // initialize floating point control register
  os::Linux::init_thread_fpu_state();

  // handshaking with parent thread
  {
    MutexLockerEx ml(sync, Mutex::_no_safepoint_check_flag);

    // 设置为已经初始化完成,并notify父线程
    osthread->set_state(INITIALIZED);
    sync->notify_all();

    // wait until os::start_thread()
    while (osthread->get_state() == INITIALIZED) {
      sync->wait(Mutex::_no_safepoint_check_flag);
    }
  }
    
    
  //这里上层传递过来的是JavaThread,因此会调用JavaThread#run()方法  
  thread->run();

  return 0;
}


void JavaThread::run() {
  // 初始化本地线程分配缓存(TLAB)相关的属性
  this->initialize_tlab();

  // used to test validitity of stack trace backs
  this->record_base_of_stack_pointer();

  // Record real stack base and size.
  this->record_stack_base_and_size();

  // Initialize thread local storage; set before calling MutexLocker
  this->initialize_thread_local_storage();

  this->create_stack_guard_pages();

  this->cache_global_variables();

  // 将线程的状态更改为_thread_in_vm,线程已经可以被VM中的安全点相关的代码处理了,也就是说必须
  // JVM如果线程在执行native里面的代码,是搞不了安全点的,待确认
  ThreadStateTransition::transition_and_fence(this, _thread_new, _thread_in_vm);

  assert(JavaThread::current() == this, "sanity check");
  assert(!Thread::current()->owns_locks(), "sanity check");

  DTRACE_THREAD_PROBE(start, this);

  // This operation might block. We call that after all safepoint checks for a new thread has
  // been completed.
  this->set_active_handles(JNIHandleBlock::allocate_block());

  if (JvmtiExport::should_post_thread_life()) {
    JvmtiExport::post_thread_start(this);
  }

  EventThreadStart event;
  if (event.should_commit()) {
     event.set_javalangthread(java_lang_Thread::thread_id(this->threadObj()));
     event.commit();
  }

  // We call another function to do the rest so we are sure that the stack addresses used
  // from there will be lower than the stack base just computed
  thread_main_inner();

  // Note, thread is no longer valid at this point!
}

void JavaThread::thread_main_inner() {
  assert(JavaThread::current() == this, "sanity check");
  assert(this->threadObj() != NULL, "just checking");

  // Execute thread entry point unless this thread has a pending exception
  // or has been stopped before starting.
  // Note: Due to JVM_StopThread we can have pending exceptions already!
  if (!this->has_pending_exception() &&
      !java_lang_Thread::is_stillborn(this->threadObj())) {
    {
      ResourceMark rm(this);
      this->set_native_thread_name(this->get_thread_name());
    }
    HandleMark hm(this);
    // 这个entry_point就是JVM_StartThread中传递过来的那个,也就是thread_entry
    this->entry_point()(this, this);
  }

  DTRACE_THREAD_PROBE(stop, this);

  this->exit(false);
  delete this;
}

我们最后再看thread_entry的代码

static void thread_entry(JavaThread* thread, TRAPS) {
  HandleMark hm(THREAD);
  Handle obj(THREAD, thread->threadObj());
  JavaValue result(T_VOID);
  JavaCalls::call_virtual(&result,
                          obj,
                          KlassHandle(THREAD, SystemDictionary::Thread_klass()),
                          vmSymbols::run_method_name(),
                          vmSymbols::void_method_signature(),
                          THREAD);
}

vmSymbols,这个是JVM对于那些需要特殊处理的类、方法等的声明,我的理解就是一个方法表,根据下面这行代码可以看出来,其实调用的就是run()方法.

  /* common method and field names */                                                            
  template(run_method_name,                           "run")                                  

然后我们回到JVM_StartThread方法中,这里会接着调用prepare()方法,设置线程优先级(将Java中的优先级映射到os中),然后添加到线程队列中去.最后会调用Thread::start(native_thread);
启动线程。

void Thread::start(Thread* thread) {
  trace("start", thread);
  // start和resume不一样,start被synchronized修饰
  if (!DisableStartThread) {
    if (thread->is_Java_thread()) {
     // 在启动线程之前初始化线程的状态为RUNNABLE,为啥不能在之后设置呢?因为启动之后可能是
     //  等待或者睡眠等其他状态,具体是什么我们不知道
      java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),
                                          java_lang_Thread::RUNNABLE);
    }
    os::start_thread(thread);
  }
}

总结

  1. 一个Java线程对应一个JavaThread->OSThread -> Native Thread
  2. 在调用java.lang.Thread#start()方法之前不会启动线程,仅仅调用run()方法只是会在当前线程运行而已
  3. //todo

Flag Counter

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions