【Java并发集合】ConcurrentHashMap源码解析基于JDK1.8

2022-12-07,,,,

concurrentHashMap(基于jdk1.8)

类注释

    所有的操作都是线程安全的,我们在使用时无需进行加锁。
    多个线程同时进行put、remove等操作时并不会阻塞,可以同时进行,而HashTable在操作时会锁住整个Map。
    在迭代过程中,即使Map及结构被修改,也不会抛出ConcurrentModificationException 。
    除了数组+链表+红黑树的基本结构外,新增了转移节点,是为了保证扩容时的线程安全的节点。
    提供了很多的Stream方法,比如:foreach、search、reduce等。

类结构

ConcurrentHashMap与HashMap结构几乎相同;都是采用数组+链表+红黑树。
两者都实现了Map接口,继承了AbstractMap。

ConcurrentHashMap原理概述

在ConcurrentHashMap中通过一个Node<K,V>[ ]数组来保存到map中的键值对,而在同一个数组位置时,通过链表和红黑树的形式来保存的。但是这个数组只有在第一次添加元素的时候才会初始化,否则只是初始化一个实例对象,只是设定了sizeCtl变量,这个变量用来判断对象的一些状态和是否需要扩容。

第一次添加元素的时候,默认初始长度是16,当往map中继续添加元素的时候,通过hash值跟数组长度来决定放在数组的哪个位置,如果出现在同一个位置的时候,优先以链表的形式存放,在同一个位置的个数大于等于8时,并且数组的长度还小于64的话,则会触发扩容机制。如果数组的长度大于等于64,链表的长度大于等于8,就会将这些节点的构成的链表转换成红黑树。

通过扩容数组的方式将这些节点给分散开。然后将这些元素复制到扩容的新数组中,同一个链表中的元素通过hash值的数组长度位来区分放在 原先的位置 or 放到新数组的相同位置。扩容完成后,如果某个节点是树,同时性质该节点的个数小于等于6,则将由红黑树转换成链表。

ConcurrentHashMap的属性

    //数组的最大容量 2^30
private static final int MAXIMUM_CAPACITY = 1 << 30; /**
* 数组长度默认是16,必须是2的幂次方
*/
private static final int DEFAULT_CAPACITY = 16; /**
* 数组可能的最大值,需要与toArray()相关方法关联
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /**
*
* 默认并发度,同时允许多少个线程访问
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16; /**
* 默认的负载因子时0.75
*
*/
private static final float LOAD_FACTOR = 0.75f; /**
* 链表转成红黑树的阈值
*/
static final int TREEIFY_THRESHOLD = 8; /**
* 红黑树转成链表的阈值
*/
static final int UNTREEIFY_THRESHOLD = 6; /**
* 链表转换成红黑树之前还会进行一次判断,只有键值对数量大于64才会发生转换
*/
static final int MIN_TREEIFY_CAPACITY = 64; /**
* Minimum number of rebinnings per transfer step. Ranges are
* subdivided to allow multiple resizer threads. This value
* serves as a lower bound to avoid resizers encountering
* excessive memory contention. The value should be at least
* DEFAULT_CAPACITY.
*/
private static final int MIN_TRANSFER_STRIDE = 16; /**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16; /**
* 2^15-1,resize时最大的线程数
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; /**
* The bit shift for recording size stamp in sizeCtl.
* sizeCtl记录size大小的偏移量
RESIZE_STAMP_SHIFT = 32-16=16
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; /*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // 表示正在转移
static final int TREEBIN = -2; // 表示已经转换成树,TreeBin节点
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash /** 可用的CPU的数量 */
static final int NCPU = Runtime.getRuntime().availableProcessors(); /**
* 桶数组,与HashMap基本一致,延迟加载,不过这里使用了volatile修饰
*/
transient volatile Node<K,V>[] table; /**
* 这个桶数组在扩容的时候使用
*/
private transient volatile Node<K,V>[] nextTable; /**
* 记录元素的个数,也被volatile修饰
*/
private transient volatile long baseCount; /**
* 表初始化和扩容的标志位
* 默认值是:0
* 初始化之前:初始化容量大小
* 正在初始化:-1
* 扩容前:触发扩容前的元素个数相当于HashMap的threshold,threshold = capacity*loadFactory
* 正在扩容:-(1+参与扩容的线程数)
*/
private transient volatile int sizeCtl; /**
* 当扩容时需要对元素进行迁移,transferIndex用来作为桶的下标
*/
private transient volatile int transferIndex; /**
* 对CounterCells数组更新操作时,使用到的自旋锁
*/
private transient volatile int cellsBusy; /**
* 计数用,用于计算没来的及更新到baseCount的变化
*/
private transient volatile CounterCell[] counterCells; // views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

重要方法

JDK1.8以前,ConcurrentHashMap主要使用锁分段的机制保证线程安全,在JDK1.8及其以后,主要使用了CAS+synchronized来实现。CAS是一种无锁并发技术。
但是ConcurrentHashMap并没有实现自己的CAS,而是会直接使用private static final sun.misc.Unsafe U;
一下三种方法保证了访问桶数组中的第一个元素是线程安全的。

//返回节点数组的指定位置的节点  (原子操作)
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//在指定位置设置值(CAS)
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//在指定位置设置值(原子操作)
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

ConcurrentHashMap重要的类

Node<K,V>


static class Node<K,V> implements Map.Entry<K,V> {
final int hash;//key的hash值
final K key;
volatile V val;
volatile Node<K,V> next;//表示链表中的下一个节点 Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
} public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
} public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
//如果key和value都不为空并且两个都相等,则返回true
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
} /**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

TreeNode节点


static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red; TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
} Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}

TreeBin用作树的头结点字存储root和first节点不存储key和value


static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// 锁的状态值
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock

ForwardingNode


static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}

构造函数

    /**
* 空参构造函数,默认table的size是16.
*/
public ConcurrentHashMap() {
} /**
*
*创建一个新的map,可以通过参数指定table的size。
* @param initialCapacity The implementation performs internal
* sizing to accommodate this many elements.
* @throws IllegalArgumentException 如果参数是负数就会抛出IllegalArgumentException
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
//判断参数是不是大于所限制的最大值,如果大于最大值就设置成最大值,反之调用tableSizeFor方法
//设置初始table容量
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//初始化sizeCtl
this.sizeCtl = cap;
} /**
* 创建与指定map映射的map
*
* @param m the map
*/
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
} /**
*
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements,
* given the specified load factor.
* @param loadFactor the load factor (table density) for
* establishing the initial table size
* @throws IllegalArgumentException if the initial capacity of
* elements is negative or the load factor is nonpositive
*
* @since 1.6
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
} /**
*
* @param initialCapacity 初始容量
* @param loadFactor 负载因子当容量达到 initialCapacty*loadFactory时,执行扩容
* @param concurrencyLevel 并发更新的线程数
* @throws IllegalArgumentException if the initial capacity is
* negative or the load factor or concurrencyLevel are
* nonpositive
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

在任何构造方法中都没有直接对存储元素的Node类型的table进行初始化,而是在第一次put的时候进行初始化,这叫做延迟加载。

这里具体看一下初始化方法initTable

private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
//如果正在扩容或者正在进行初始化那就释放CPU执行权
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//将sizeCtl设置成-1,告诉其他线程正在进行初始化
try {
if ((tab = table) == null || tab.length == 0) {
//如果sizeCtl>0,那就初始化大小为sizeCtl的数组,如果=0,就初始化为16的默认大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);//相当于 sc = n*0.75
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

put方法

   /**
* 单纯的调用putVal方法,第三个参数设置成了false
* 当设置为false的时候表示这个value一定会被设置
* 如果设置成true,表示只有这个key的value是null的时候才会被设置
**/
public V put(K key, V value) {
return putVal(key, value, false);
}

putVal

实现的思路

如果数组为空,初始化数组
计算当前桶的位置有没有值,没有的话,CAS创建,失败继续自旋,直到成功添加
如果该节点是转移节点(正在扩容,)就会一直自旋等待扩容完成后在进行自增
如果这个位置不为null,先锁定该槽点,保证其他线程不能进行操作,如果是链表,新增到链表的尾部,如果是红黑树使用红黑树的新增方法进行新增;
新增完成后,check需不需要进行扩容,需要的话进行扩容。

    /** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果tab 是空进行tab初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果索引的位置没有值是null那就直接创建
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//在索引是i的位置上创建新的元素,当i位置时空时,表示创建成功没结束for循环,否则继续自旋
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果当前节点是转移节点(MOVED=-1),表示正在进行扩容,则会等待扩容完成
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//如果该索引的位置不是null,已经有值
V oldVal = null;
//锁住当前节点,保证其他线程不进行更改
synchronized (f) {
//链表
if (tabAt(tab, i) == f) {
if (fh >= 0) {
//binCount被赋值,代表有线程更新表的过程
binCount = 1;
//遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果key的hash和链表中的某个节点的key的hash相同
//并且值页相同,然后将旧的值赋值给oldVal
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
//如果onlyIfAbsent=false代表可以设置,否则不能更新值
if (!onlyIfAbsent)
e.val = value;
//更新值以后退出
break;
}
//如果遍历过后没有找到对应的key那就将新的node放在链表的最后
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//红黑树,TreeBin持有红黑树的引用并且会对其加锁保证线程安全
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//在putTreeVal方法中,再给红黑树着色旋转的时候,会锁住树的根节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//binCount不为空,并且oldVal有值的情况下,说明新增成功
if (binCount != 0) {
//binCount>=8需要转成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//检查一下是否需要进行扩容,如果需要进行扩容,调用transfer进行扩容
addCount(1L, binCount);
return null;
}

扩容

扩容方法通过下面的方法来发起

addCount方法:当容器中的元素发生改变时会进行调用,主要是检查容器当前的状态,判断是否扩容。
tryPresize方法:在treeIf斌和putAll方法中调用,treeIfbin主要是在put元素后,判断该链表的长度是不是大于8,如果超过则会调用这个方法来扩容数组或者把链表转化为树。
helpTransfer方法:是在当一个线程要对表中元素进行操作时候,如果检测到节点的HASH值为MOVED的时候,就会调用help Transfer方法,helpTransfer方法又会调用transfer方法进行扩容

扩容的具体实现还是通过transfer方法完成

将数组中的节点复制到新的数组中的相同位置,或者移动到拷贝部分的相同位置。
拷贝数组时,先把原先数组的节点锁住,保证原数组的节点不能操作,成功拷贝到新数组时,把原数组的当前节点赋值称为转移节点,这时如果有线程在put这个节点位置时,发现是转移节点就会一直等待,所以在扩容完成之前这个节点对应的值是不发生变化的。扩容分为两步,第一步是新建空数组,第二是移动或拷贝每个元素到新数组中去


private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//n代表原先数组的长度,移动过程中会将桶分段,stride代表每段的长度最小值是16
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // 初始化新数组
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n; //transferIndex用于记录当前迁移的进度,需要注意的是迁移元素从最后一个节点开始
}
//新数组的长度
int nextn = nextTab.length;
//fwd代表转移节点,如果原数组上是转移节点,说明该节点正在被扩容
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//advance=true代表这次循环已经完成,可以开始下一个循环
boolean advance = true;
//finishing表示当前线程数组迁移是否完成
boolean finishing = false; // to ensure sweep before committing nextTab
//死循环 自旋操作
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)//结束本次循环
advance = false;
else if ((nextIndex = transferIndex) <= 0) {//已经拷贝完成
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//为当前线程分配桶的区间,当前线程需要将负责这个区间内的桶元素移动到nextTable 中
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//判断当前线程是否完成所有的节点的迁移
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {//finishing=true代表完成拷贝过程,那就把新数组赋值给table
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//相当于 n * 0.75
return;
}
//将参与扩容的线程数量减一
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // 退出的线程需要再检查一遍容器的状态
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // 当前桶节点已经被处理
else {
//开始进行桶节点的拷贝
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;//n的值是原先数组的长度
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//如果节点只有单个数据直接拷贝;如果是链表进行链表拷贝
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在新数组的位置上设置值
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {//红黑树的拷贝
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//这里会判断要不要变回链表 UNTREEIFY_THRESHOLD=6
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

扩容的关键点

    拷贝桶节点时,会把原数组的节点锁住;
    拷贝成功后,会把原数组的节点设置成转移节点,这样如果有数据需要put到该节点时,发现该节点时转移节点会进行等待,知道扩容成功后,才能继续执行。
    从尾到头进行拷贝,但拷贝后并不是原先数组的绝对倒序
    等扩容完成后将新数组的值赋值给数组容器,之前等待put的线程才能继续put。

get

/*需要注意的是get方法的参数 key不允许传入null,否则会抛出NullPointerException*/
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算hashcode
int h = spread(key.hashCode());
//不是空数组并且当前索引下标的节点不是空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
//如果第一个几点的键与要找的key相等,就直接返回对应的value
return e.val;
}
else if (eh < 0)//如果是红黑树或者是转移节点使用对应的find方法
return (p = e.find(h, key)) != null ? p.val : null;
//如果是链表就开始遍历
while ((e = e.next) != null) {
//找到对应的key返回相应的value
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
//tab是null就返回null
return null;
}

参考
https://www.cnblogs.com/zerotomax/p/8687425.html#go0
https://www.rayjun.cn/
https://www.imooc.com/read/47/article/858

【Java并发集合】ConcurrentHashMap源码解析基于JDK1.8的相关教程结束。

《【Java并发集合】ConcurrentHashMap源码解析基于JDK1.8.doc》

下载本文的Word格式文档,以方便收藏与打印。