天天看点

Thread、ThreadLocal源码解析

今天来看一下Thread和ThreadLocal类的源码。

一、Thread

(1)首先看一下线程的构造方法,之后会说每种参数的用法,而所有的构造函数都会指向init方法

//空构造创建一个线程
Thread() 
//传入Runnable对象创建一个线程
Thread(Runnable target) 
//传入Runnable对象和线程名创建一个线程
Thread(Runnable target, String name) 
//传入线程名创建一个线程
Thread(String name) 
//传入线程组和Runnable对象创建一个线程
Thread(ThreadGroup group, Runnable target) 
//传入线程组、Runnable对象和线程名创建一个线程
Thread(ThreadGroup group, Runnable target, String name) 
//传入线程组、Runnable对象、线程名和栈大小创建一个线程
Thread(ThreadGroup group, Runnable target, String name, long stackSize) 
//传入线程组和线程名创建一个线程
Thread(ThreadGroup group, String name)           

复制

Thread、ThreadLocal源码解析

(2)常用构造

最常用的构造方法是下面这个,其默认有一个生成线程名的函数。

Thread(Runnable target)            

复制

public Thread(Runnable target) {
    init(null, target, "Thread-" + nextThreadNum(), 0);
}
private static int threadInitNumber;
private static synchronized int nextThreadNum() {
    return threadInitNumber++;
}           

复制

(3)线程的初始化方法

/**
 * @param g 线程组,每个线程都会属于一个线程组
 * @param target 线程会执行Runnable对象的run方法  
 * @param name 线程名 
 * @param stackSize 栈深度,默认是0
 * @param acc 获取上下文,其实是获取classloader    
 * @param inheritThreadLocals 是否从继承的本地线程的值用于构造线程 
 */
private void init(ThreadGroup g, Runnable target, String name,
                  long stackSize, AccessControlContext acc,
                  boolean inheritThreadLocals) {
    if (name == null) {
        throw new NullPointerException("name cannot be null");
    }
    this.name = name;
    //父线程,比如在Main方法中启动一个线程,就是Main线程
    Thread parent = currentThread();
    g.addUnstarted();
    this.group = g;
    //是否为守护线程,线程分为用户线程和守护线程,只要还有一个用户
    //线程在跑,守护线程就不会挂掉。如果没有用户线程了,守护线程
    //会和JVM一起退出        
    this.daemon = parent.isDaemon();
    //线程优先级
    this.priority = parent.getPriority();
    //获取上下文
    if (security == null || isCCLOverridden(parent.getClass()))
        this.contextClassLoader = parent.getContextClassLoader();
    else
        this.contextClassLoader = parent.contextClassLoader;
    this.inheritedAccessControlContext =
            acc != null ? acc : AccessController.getContext();
    this.target = target;
    setPriority(priority);
    if (inheritThreadLocals && parent.inheritableThreadLocals != null)
        this.inheritableThreadLocals =
            ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
    //设置线程的栈深度 
    this.stackSize = stackSize;

    /* Set thread ID */
    tid = nextThreadID();
}           

复制

(4)线程的start方法

【注意】只可执行一次start方法,不可重复执行

//threadStatus是用volatile修饰的,保证内存可见性
private volatile int threadStatus = 0;
public synchronized void start() {
    //threadStatus是线程状态,一开始是0,启动一次之后就是其他值
    if (threadStatus != 0)
        throw new IllegalThreadStateException();
        
    group.add(this);

    boolean started = false;
    try {
        //调用本地方法,启动线程
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
              it will be passed up the call stack */
        }
    }
}           

复制

启动两次的案例:

Thread、ThreadLocal源码解析

第一次调用start方法:

Thread、ThreadLocal源码解析

第二次调用start方法:此时就会抛出IllegalThreadStateException

复制

Thread、ThreadLocal源码解析

(5)activeCount()方法

//获取当前线程组存活线程数量
public static int activeCount() {
    return currentThread().getThreadGroup().activeCount();
}           

复制

可以在主线程中用来等待其他线程完成,例如

//大于2是因为后台有一个Main线程和GC线程
while (Thread.activeCount()>2) {
    //表示当前线程让出资源,给其它线程
    Thread.yield();
}           

复制

(6)interrupt、interrupted、isInterruped方法

//interrupt仅仅是将线程的标记为中断,并不会真的中断线程 
 public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }           

复制

//而isInterrupted是判断线程是否已被中断
public boolean isInterrupted() {
    return isInterrupted(false);
}
private native boolean isInterrupted(boolean ClearInterrupted);           

复制

当用isInterrupted返回为true的时候,可以用来中断线程

//此方法返回线程是否有“中断”标记,调用之后线程的“中断”标记会被去掉
public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}           

复制

下面是中断线程的例子:

Thread a = new Thread(new Runnable() {
      @Override
      public void run() {
          if (Thread.currentThread().isInterrupted()) {
              System.out.println("【"+Thread.currentThread().getName()+"】当前线程数量"+Thread.activeCount());
              System.out.println("线程被中断");
              return;
          }
          while (true) {
          }
      }
  },"a");
  a.start();
  a.interrupt();

  try {
      Thread.sleep(3000);
  } catch (InterruptedException e) {
      e.printStackTrace();
  }

  System.out.println("【Main】当前线程数量"+Thread.activeCount());           

复制

运行结果:

Thread、ThreadLocal源码解析

(7)线程状态

public enum State {
        //刚创建线程还未调用start方法
        NEW,
        //调用start方法后,正在执行run方法
        RUNNABLE,
        //阻塞,等待其他线程释放监视器
        BLOCKED,
        //等待,等待其他线程调用notify或notifyAll方法
        WAITING,
        //等待另一个线程执行动作达到指定等待时间的线程处于此状态。 
        TIMED_WAITING,
        //已退出
        TERMINATED;
    }           

复制

二、ThreadLocal

此类里面可携带私有变量,这个变量是私有的,其他线程不可见。以下是两种创建方法并且设置值。

ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(new Supplier<Integer>() {
      @Override
      public Integer get() {
          return 1;//设置私有变量值 
      }
  });           

复制

ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
//设置私有变量值
threadLocal.set(1);           

复制

上面是设置值,那如何获取值呢?

//获取私有变量值
threadLocal.get()           

复制

如何从内存中删除这个ThreadLocal呢?

//将当前ThreadLocal变量从内存用删除 
threadLocal.remove();           

复制

删除之后再次获取就是为null。

(1)从设置值看底层结构

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}           

复制

可以看到,底层是一个ThreadLocalMap类,继续点进去ThreadLocalMap,其结构如下

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
      table = new Entry[INITIAL_CAPACITY];
      int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
      table[i] = new Entry(firstKey, firstValue);
      size = 1;
      setThreshold(INITIAL_CAPACITY);
  }           

复制

private static final int INITIAL_CAPACITY = 16;
//初始值是10
private void setThreshold(int len) {
      threshold = len * 2 / 3;
  }
private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    //计算index
    int i = key.threadLocalHashCode & (len-1);
    //从下标为i的一直往下,如果不为空,则替换值
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        if (k == key) {
            e.value = value;
            return;
        }
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }
    //设置值
    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        //重新hash,里面有扩容的方法
        rehash();
}


private void rehash() {
    expungeStaleEntries();

    // 当元素数量大于(threshold - threshold / 4),就会进行扩容
    //,当threadhold为10的时候就是8,
    if (size >= threshold - threshold / 4)
        resize();
}

//扩容
private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    //两倍扩容
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null; // Help the GC
            } else {
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }
    //重新设置threadhold的值
    setThreshold(newLen);
    size = count;
    table = newTab;
}           

复制

底层是一个Entry数组,初始化容量是16,扩容是两倍,那Entry是什么呢?

static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;
    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}           

复制

entry是弱引用的key-value对象,其用ThreadLocal的引用作为key,将私有变量值设置为value。之前的垃圾收集机制一篇说过弱引用对象只能存活到下一次垃圾回收前。但是注意,它被回收只会被回收掉key,也就是ThreadLocal,但是如果value在别的地方引用的话,就不会被回收,这样会造成内存溢出的情况。

例如:

jvm变量设置-Xmx10m -Xms10m           

复制

value在list中保持引用,内存溢出

Thread、ThreadLocal源码解析

value没在list中保持引用,一直运行

Thread、ThreadLocal源码解析

(2)get方法

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        //存在则返回
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    //不存在,则先将值设为NULL,然后返回
    return setInitialValue();
}


private T setInitialValue() {
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}           

复制

(3)remove方法

public void remove() {
     ThreadLocalMap m = getMap(Thread.currentThread());
     if (m != null)
         m.remove(this);
 }
 
 private void remove(ThreadLocal<?> key) {
      Entry[] tab = table;
      int len = tab.length;
      int i = key.threadLocalHashCode & (len-1);
      for (Entry e = tab[i];
           e != null;
           e = tab[i = nextIndex(i, len)]) {
          if (e.get() == key) {
              //清除
              e.clear();
              expungeStaleEntry(i);
              return;
          }
      }
  }

//将entry的引用置为null
public void clear() {
    this.referent = null;
}           

复制