Skip to content

FastThreadLocal

image

对别

JDKNetty
ThreadFastThreadLocalThread
ThreadLocalFastThreadLocal
ThreadLocalMapInternalThreadLocalMap

区别

区别ThreadLocalFastThreadLocal
存储结构Map<WeakReference<ThreadLocal<?>,Object>>Object[]​ 数组下标作为索引, Object​ 存储Value
内存占用重复对象可以进行覆盖每次创建会新建下标,不会利用被删除的位置,数组只会扩容,无法缩容
性能可能产生hash碰撞,线性探测法在解决 Hash 冲突时需要不停地向下寻找,效率较低定位数据的时候可以直接根据数组下标 index 获取,时间复杂度 O(1)
回收手动调用Remove 进行回收1. 自动,执行一个被FastThreadLocalRunnable wrap的Runnable任务,在任务执行完毕后会自动进行FastThreadLocal的清理
2. 手动,FastThreadLocal和InternalThreadLocalMap都提供了remove方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用FastThreadLocal)手动进行调用,进行显示删除
3. 自动,为当前线程的每一个FastThreadLocal注册一个Cleaner,当线程对象不强可达的时候,该Cleaner线程会将当前线程的当前ftl进行回收

源码分析

构造方法

public class FastThreadLocal<V> {
    // FastThreadLocal中的index是记录了该它维护的数据应该存储的位置
    // InternalThreadLocalMap数组中的下标, 它是在构造函数中确定的
    private final int index;
 
    public InternalThreadLocal() {
        index = InternalThreadLocalMap.nextVariableIndex();
    }
    // 省略其他代码
}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    // 自增索引, ⽤于计算下次存储到Object数组中的位置
    private static final AtomicInteger nextIndex = new AtomicInteger();
 
    private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;
 
    public static int nextVariableIndex() {
        int index = nextIndex.getAndIncrement();
        if (index >= ARRAY_LIST_CAPACITY_MAX_SIZE || index < 0) {
            nextIndex.set(ARRAY_LIST_CAPACITY_MAX_SIZE);
            throw new IllegalStateException("too many thread-local indexed variables");
        }
        return index;
    }
    // 省略其他代码
}

上面这两段代码在Netty FastThreadLocal介绍中已经讲解过,这边就不再重复介绍了。

get 方法

public class FastThreadLocal<V> {
    // FastThreadLocal中的index是记录了该它维护的数据应该存储的位置
    private final int index;
 
    public final V get() {
        // 获取当前线程的InternalThreadLocalMap
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        // 根据当前线程的index从InternalThreadLocalMap中获取其绑定的数据
        Object v = threadLocalMap.indexedVariable(index);
        // 如果获取当前线程绑定的数据不为缺省值UNSET,则直接返回;否则进行初始化
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }
 
        return initialize(threadLocalMap);
    }
    // 省略其他代码
}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;
 
    // 未赋值的Object变量(缺省值),当⼀个与线程绑定的值被删除之后,会被设置为UNSET
    public static final Object UNSET = new Object();
 
    // 存储绑定到当前线程的数据的数组
    private Object[] indexedVariables;
 
    // slowThreadLocalMap为JDK ThreadLocal存储InternalThreadLocalMap
    private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
            new ThreadLocal<InternalThreadLocalMap>();
 
    // 从绑定到当前线程的数据的数组中取出index位置的元素
    public Object indexedVariable(int index) {
        Object[] lookup = indexedVariables;
        return index < lookup.length? lookup[index] : UNSET;
    }
 
    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        // 判断当前线程是否是FastThreadLocalThread类型
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }
 
    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        // 直接获取当前线程的InternalThreadLocalMap
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        // 如果当前线程的InternalThreadLocalMap还未创建,则创建并赋值
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }
 
    private static InternalThreadLocalMap slowGet() {
        // 使用JDK ThreadLocal获取InternalThreadLocalMap
        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }
 
    private InternalThreadLocalMap() {
        indexedVariables = newIndexedVariableTable();
    }
 
    // 初始化一个32位长度的Object数组,并将其元素全部设置为缺省值UNSET
    private static Object[] newIndexedVariableTable() {
        Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
        Arrays.fill(array, UNSET);
        return array;
    }
    // 省略其他代码
}

源码中 **get() ** 方法主要分为下面3个步骤处理:

通过InternalThreadLocalMap.get()方法获取当前线程的InternalThreadLocalMap。


根据当前线程的index 从InternalThreadLocalMap中获取其绑定的数据。


如果不是缺省值UNSET,直接返回;如果是缺省值,则执行initialize方法进行初始化。

下面我们继续分析一下

InternalThreadLocalMap.get() 方法的实现逻辑。

首先判断当前线程是否是FastThreadLocalThread类型,如果是FastThreadLocalThread
类型则直接使用fastGet方法获取InternalThreadLocalMap,如果不是FastThreadLocalThread类型则使用slowGet方法获取InternalThreadLocalMap兜底处理。


兜底处理中的slowGet方法会退化成JDK原生的ThreadLocal获取InternalThreadLocalMap。


获取InternalThreadLocalMap时,如果为null,则会直接创建一个InternalThreadLocalMap返回。其创建过过程中初始化一个32位长度的Object数组,并将其元素全部设置为缺省值UNSET。

set 方法

public class FastThreadLocal<V> {
    // FastThreadLocal初始化时variablesToRemoveIndex被赋值为0
    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
 
    public final void set(V value) {
        // 判断value值是否是未赋值的Object变量(缺省值)
        if (value != InternalThreadLocalMap.UNSET) {
            // 获取当前线程对应的InternalThreadLocalMap
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            // 将InternalThreadLocalMap中数据替换为新的value
            // 并将FastThreadLocal对象保存到待清理的Set中
            setKnownNotUnset(threadLocalMap, value);
        } else {
            remove();
        }
    }
 
    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        // 将InternalThreadLocalMap中数据替换为新的value
        if (threadLocalMap.setIndexedVariable(index, value)) {
            // 并将当前的FastThreadLocal对象保存到待清理的Set中
            addToVariablesToRemove(threadLocalMap, this);
        }
    }
 
    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
        // 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        Set<FastThreadLocal<?>> variablesToRemove;
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            // 下标index为0的数据为空,则创建FastThreadLocal对象Set集合
            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
            // 将InternalThreadLocalMap中下标为0的数据,设置成FastThreadLocal对象Set集合
            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
        } else {
            variablesToRemove = (Set<FastThreadLocal<?>>) v;
        }
        // 将FastThreadLocal对象保存到待清理的Set中
        variablesToRemove.add(variable);
    }
    // 省略其他代码
}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    // 未赋值的Object变量(缺省值),当⼀个与线程绑定的值被删除之后,会被设置为UNSET
    public static final Object UNSET = new Object();
    // 存储绑定到当前线程的数据的数组
    private Object[] indexedVariables;
    // 绑定到当前线程的数据的数组能再次采用x2扩容的最大量
    private static final int ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD = 1 << 30;
    private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;
 
    // 将InternalThreadLocalMap中数据替换为新的value
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            // 直接将数组 index 位置设置为 value,时间复杂度为 O(1)
            lookup[index] = value;
            return oldValue == UNSET;
        } else { // 绑定到当前线程的数据的数组需要扩容,则扩容数组并数组设置新value
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }
 
    private void expandIndexedVariableTableAndSet(int index, Object value) {
        Object[] oldArray = indexedVariables;
        final int oldCapacity = oldArray.length;
        int newCapacity;
        // 判断可进行x2方式进行扩容
        if (index < ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD) {
            newCapacity = index;
            // 位操作,提升扩容效率
            newCapacity |= newCapacity >>>  1;
            newCapacity |= newCapacity >>>  2;
            newCapacity |= newCapacity >>>  4;
            newCapacity |= newCapacity >>>  8;
            newCapacity |= newCapacity >>> 16;
            newCapacity ++;
        } else { // 不支持x2方式扩容,则设置绑定到当前线程的数据的数组容量为最大值
            newCapacity = ARRAY_LIST_CAPACITY_MAX_SIZE;
        }
        // 按扩容后的大小创建新数组,并将老数组数据copy到新数组
        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
        // 新数组扩容后的部分赋UNSET缺省值
        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
        // 新数组的index位置替换成新的value
        newArray[index] = value;
        // 绑定到当前线程的数据的数组用新数组替换
        indexedVariables = newArray;
    }
    // 省略其他代码
}

源码中 set() 方法主要分为下面3个步骤处理:

判断value是否是缺省值UNSET,如果value不等于缺省值,则会通过InternalThreadLocalMap.get()方法获取当前线程的InternalThreadLocalMap,具体实现get()方法已做讲解。


通过FastThreadLocal中的setKnownNotUnset()方法将InternalThreadLocalMap中数据替换为新的value,并将当前的FastThreadLocal对象保存到待清理的Set中。


如果等于缺省值UNSET或null(else的逻辑),会调用remove()方法,remove()具体见后面的代码分析。

接下来我们看下

InternalThreadLocalMap.setIndexedVariable方法的实现逻辑。

判断index是否超出存储绑定到当前线程的数据的数组indexedVariables的长度,如果没有超出,则获取index位置的数据,并将该数组index位置数据设置新value。

如果超出了,绑定到当前线程的数据的数组需要扩容,则扩容该数组并将它index位置的数据设置新value。

扩容数组以index 为基准进行扩容,将数组扩容后的容量向上取整为 2 的次幂。然后将原数组内容拷贝到新的数组中,空余部分填充缺省值UNSET,最终把新数组赋值给 indexedVariables。

下面我们再继续看下

FastThreadLocal.addToVariablesToRemove方法的实现逻辑。

1.取下标index为0的数据(用于存储待清理的FastThreadLocal对象Set集合中),如果该数据是缺省值UNSET或null,则会创建FastThreadLocal对象Set集合,并将该Set集合填充到下标index为0的数组位置。

2.如果该数据不是缺省值UNSET,说明Set集合已金被填充,直接强转获取该Set集合。

3.最后将FastThreadLocal对象保存到待清理的Set集合中。

remove、removeAll方法

public class FastThreadLocal<V> {
    // FastThreadLocal初始化时variablesToRemoveIndex被赋值为0
    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
 
    public final void remove() {
        // 获取当前线程的InternalThreadLocalMap
        // 删除当前的FastThreadLocal对象及其维护的数据
        remove(InternalThreadLocalMap.getIfSet());
    }
 
    public final void remove(InternalThreadLocalMap threadLocalMap) {
        if (threadLocalMap == null) {
            return;
        }
 
        // 根据当前线程的index,并将该数组下标index位置对应的值设置为缺省值UNSET
        Object v = threadLocalMap.removeIndexedVariable(index);
        // 存储待清理的FastThreadLocal对象Set集合中删除当前FastThreadLocal对象
        removeFromVariablesToRemove(threadLocalMap, this);
 
        if (v != InternalThreadLocalMap.UNSET) {
            try {
                // 空方法,用户可以继承实现
                onRemoval((V) v);
            } catch (Exception e) {
                PlatformDependent.throwException(e);
            }
        }
    }
 
    public static void removeAll() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
        if (threadLocalMap == null) {
            return;
        }
 
        try {
            // 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中
            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
            if (v != null && v != InternalThreadLocalMap.UNSET) {
                @SuppressWarnings("unchecked")
                Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
                // 遍历所有的FastThreadLocal对象并删除它们以及它们维护的数据
                FastThreadLocal<?>[] variablesToRemoveArray =
                        variablesToRemove.toArray(new FastThreadLocal[0]);
                for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                    tlv.remove(threadLocalMap);
                }
            }
        } finally {
            // 删除InternalThreadLocalMap中threadLocalMap和slowThreadLocalMap数据
            InternalThreadLocalMap.remove();
        }
    }
 
    private static void removeFromVariablesToRemove(
            InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
        // 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
 
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            return;
        }
 
        @SuppressWarnings("unchecked")
        // 存储待清理的FastThreadLocal对象Set集合中删除该FastThreadLocal对象
        Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
        variablesToRemove.remove(variable);
    }
 
    // 省略其他代码
}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
 
    // 根据当前线程获取InternalThreadLocalMap
       public static InternalThreadLocalMap getIfSet() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return ((FastThreadLocalThread) thread).threadLocalMap();
        }
        return slowThreadLocalMap.get();
    }
 
    // 数组下标index位置对应的值设置为缺省值UNSET
    public Object removeIndexedVariable(int index) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object v = lookup[index];
            lookup[index] = UNSET;
            return v;
        } else {
            return UNSET;
        }
    }
 
    // 删除threadLocalMap和slowThreadLocalMap数据
    public static void remove() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            ((FastThreadLocalThread) thread).setThreadLocalMap(null);
        } else {
            slowThreadLocalMap.remove();
        }
    }
    // 省略其他代码
}

源码中 remove() 方法主要分为下面2个步骤处理:

通过InternalThreadLocalMap.getIfSet()获取当前线程的InternalThreadLocalMap。具体和3.2小节get()方法里面获取当前线程的InternalThreadLocalMap相似,这里就不再重复介绍了。


删除当前的FastThreadLocal对象及其维护的数据。

源码中 **removeAll() ** 方法主要分为下面3个步骤处理:

通过InternalThreadLocalMap.getIfSet()获取当前线程的InternalThreadLocalMap。


取下标index为0的数据(用于存储待清理的FastThreadLocal对象Set集合),然后遍历所有的FastThreadLocal对象并删除它们以及它们维护的数据。


最后会将InternalThreadLocalMap本身从线程中移除。

Q&A

为什么 InternalThreadLocalMap 要在数组下标为 0 的位置存放一个 FastThreadLocal 类型的 Set 集合

  • 删除 FastThreadLocal 留扩展接口。
  • 提高 removeAll 的删除效率,不需要去遍历膨胀的数组。
  • 可以更好地做内存泄露的管理