0%

JDK1.7 HashMap和ConcurrentHashMap 源码和实现

HashMap

问题:JDK1.7中,HashMap是通过什么原理实现的呢?

答案:数组+链表

问题:什么叫哈希碰撞(哈希冲突)?

不同的键值通过哈希函数运算得到相同的哈希值,解决哈希冲突的方式有开放寻址法和链表法,ThreadLocalMap由于其元素个数较少,采用的是开放寻址法,而HashMap采用的是链表法来解决哈希冲突,即所有散列值相同的元素都放在相同槽对应的链表中(也就是数组+链表的方式)。

属性结构

先分析一下HashMap的关键属性:

1
2
3
4
5
/**
* The default initial capacity - MUST be a power of two.
*/
//HashMap初始化容量大小,默认是16
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;
1
2
3
4
5
6
7
/**
* The maximum capacity, used if a higher value is implicitly specified
* by either of the constructors with arguments.
* MUST be a power of two <= 1<<30.
*/
//HashMap最大的容量大小
static final int MAXIMUM_CAPACITY = 1 << 30;
1
2
3
4
5
/**
* The load factor used when none specified in constructor.
*/
//HashMap的扩容因子,默认是0.75
static final float DEFAULT_LOAD_FACTOR = 0.75f;
1
2
3
4
5
6
7
8
9
10
/**
* The next size value at which to resize (capacity * load factor).
* @serial
*/
// If table == EMPTY_TABLE then this is the initial capacity at which the
// table will be created when inflated.
//阈值,当table == {}时,该值为初始容量(初始容量默认为16);
//当table被填充了,也就是为table分配内存空间后,threshold一般为 capacity*loadFactory。
//HashMap在进行扩容时需要参考threshold
int threshold;
1
2
3
4
5
/**
* The number of key-value mappings contained in this map.
*/
//当前HashMap存储的键值对数量
transient int size;
1
2
3
4
5
6
7
8
9
/**
* The number of times this HashMap has been structurally modified
* Structural modifications are those that change the number of mappings in
* the HashMap or otherwise modify its internal structure (e.g.,
* rehash). This field is used to make iterators on Collection-views of
* the HashMap fail-fast. (See ConcurrentModificationException).
*/
//用于计算链表中的修改次数
transient int modCount;
1
2
3
4
5
/**
* An empty table instance to share when the table is not inflated.
*/
//HashMap内部的存储结构是一个数组,此处数组为空,即没有初始化之前的状态
static final Entry<?,?>[] EMPTY_TABLE = {};
1
2
3
4
5
/**
* The table, resized as necessary. Length MUST Always be a power of two.
*/
//HashMap的主干数组,就是一个Entry数组,初始值为空数组{},主干数组的长度一定是2的次幂
transient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE;

HashMap的主干是一个Entry数组。Entry是HashMap的基本组成单元,Entry是一个链表,每一个Entry包含一个key-value键值对,每一个Entry都有一个next属性指向下一个Entry对象。

Entry是HashMap中的一个静态内部类。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static class Entry<K,V> implements Map.Entry<K,V> {
final K key;
V value;
Entry<K,V> next;//存储指向下一个Entry的引用,单链表结构
int hash;//对key的hashcode值进行hash运算后得到的值,存储在Entry,避免重复计算

/**
* Creates new entry.
*/
Entry(int h, K k, V v, Entry<K,V> n) {
value = v;
next = n;
key = k;
hash = h;
}
}

可以看到,Entry有一个构造函数,该构造函数第三个参数是Entry<K,V> n主要用于链表的头插入,传入链表的链表头,将新生成的Entry对象作为链表的头部,next属性指向原链表头。

source-01


接着看一下HashMap的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public HashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR);
}

public HashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR);
}

public HashMap(int initialCapacity, float loadFactor) {
//当初始化的容量小于0时抛出异常
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
//当初始化容量大于最大值时,修改成最大值
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);

this.loadFactor = loadFactor;
//这里先把初始化的容量大小赋值给 threshold阈值
threshold = initialCapacity;
//LinkedHashMap初始化的使用,HashMap类是个空方法
init();
}

在看HashMap的put方法之前,先看一个例子:

1
2
3
4
5
6
7
8
public class Demo {
public static void main(String[] args) {
HashMap<String, String> map = new HashMap<>();
map.put("1", "2");
String a = map.put("1", "3");
System.out.println(a); //输出结果:2
}
}

为什么会输出结果是【2】呢?来HashMap的put方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public V put(K key, V value) {
//采用了懒加载,初始化HashMap对象时并未真正赋予其数组大小
if (table == EMPTY_TABLE) {
inflateTable(threshold);
}
//当Key等于null时,默认其存储的数组下标是0
if (key == null)
return putForNullKey(value);
//计算Key的Hash值
int hash = hash(key);
//将Hash值进行算法计算,得出存储的数组下标
int i = indexFor(hash, table.length);
//遍历寻找key值是否存在于链表中,存在则替换value成新值,并且返回旧值
for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
//修改次数++
modCount++;
//不存在于链表中,执行添加操作
addEntry(hash, key, value, i);
return null;
}

懒加载初始化

接着解析一下put方法的逻辑,可以看到它懒加载时调度了inflateTable方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Inflates the table.
* 初始化table数组大小
*/
private void inflateTable(int toSize) {
// Find a power of 2 >= toSize
// 用于获取toSize 大于或等于的 2的次方数 HashMap规定了容量大小必须是2个次方倍
int capacity = roundUpToPowerOf2(toSize);

threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
table = new Entry[capacity];
initHashSeedAsNeeded(capacity);
}

private static int roundUpToPowerOf2(int number) {
// assert number >= 0 : "number must be non-negative";
return number >= MAXIMUM_CAPACITY
? MAXIMUM_CAPACITY
: (number > 1) ? Integer.highestOneBit((number - 1) << 1) : 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
*在看以下运算之前,我们应该了解到,2的幂次方数在二进制位数上只有一个1的存在
2 0000 0010
4 0000 0100
8 0000 1000
16 0001 0000
*/
//int java.lang.Integer.highestOneBit(int i)
//返回小于等于i的2的次方数
public static int highestOneBit(int i) {
// HD, Figure 3-1 比如:i=17 binary(17)= 0001 0001
i |= (i >> 1); //0001 0001 | 0000 1000 = 0001 1001
i |= (i >> 2); //0001 1001 | 0000 0110 = 0001 1110
i |= (i >> 4); //以下也如此,主要是为了把低数值都填充为1
i |= (i >> 8);
i |= (i >> 16); //0001 1111 | 0000 0000 = 0001 1111
//以上操作时为了将低位全部变成1
return i - (i >>> 1); //0001 1111 - 0000 1111 = 0001 0000
//如此以上操作我们就得到了小于等于2的幂次方数,因为传入的数值经过左移1处理,即数值翻倍,所以得到的次方数为大于等于原始数值的2的幂次方数
}

关注代码:Integer.highestOneBit((number - 1) << 1)
1、highestOneBit获取的是小于等于i的2次方数,而这里需要获取大于等于i的2次方数

2、传入的参数值通过向左位移一位,将数字扩大两倍,如number=10,需要将它左移后得到20,通过highestOneBit方法可以获取到小于等于传入值的2次方数,即16,如此便可获得大于等于number的2次方数

3、(number - 1)是为了兼容特殊场景,number刚好等于2的次方数 ,例如16,此时如果直接左位移后传入计算,则得到的数字为32,远远超出我们的需求,所以需要进行减1操作,适配这种情况


思考:HashMap的采用的数组+链表的存储方式,那么当一个键值对put进来时,它的下标是怎么计算的呢?

思考:为什么HashMap规定了其容量的大小必须是2的幂次方数?其原因是什么?

put方法接着往下看,可以看到,它把Key值拿去做了HashCode运算,并调用indexFor,根据返回的HashCode计算该键值对应该存放的Entry数组下标

1
2
3
4
5
6
7
/**
* Returns index for hash code h.
*/
static int indexFor(int h, int length) {
// assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
return h & (length-1);
}
意义 二进制 十进制
length 0001 0000 16
length-1 0000 1111 15
HashCode 1010 1010
hashCode & (length-1) 0000 1010

可以看到下标的结果将落在了【0 - 15】 中,通过length -1,将低位全部变成1,进行与运算后,下标必定落在其范围中,这也是为什么规定了HashMap的容量必须是2的次方数。

(Length - 1) & HashCode公式中,因为数组长度需要其加入&运算,当长度减1时,高位变成0,低位全部变成1,与HashCode值进行&运算之后的数值必定落的[0,Length -1]之间,因此保证长度为2的幂次方时,运算可以等同于HashCode%数组长度

细心的同学可以发现,这里参与计算的只有HashCode的地位,而HashCode的高位并没有参与计算,那么就会导致数据分散在数组中会很不均匀,这时候回过头再来看上一行的hash方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final int hash(Object k) {
int h = hashSeed;
if (0 != h && k instanceof String) {
return sun.misc.Hashing.stringHash32((String) k);
}
//当hashSeed!=0时,计算时hash值将加入哈希种子打散hash值的分布
h ^= k.hashCode();

// This function ensures that hashCodes that differ only by
// constant multiples at each bit position have a bounded
// number of collisions (approximately 8 at default load factor).
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}

可以看到,当取到了hashCode之后,对hashCode进行了右移的异或运算,再将得出的结果拿去计算数组下标,这是为了让hashcode的高位也能参与到数组下标的计算过程当中,解决数据分布不均匀的问题。


再接着往下看put方法的执行,定位到数组下标后遍历其链表,判断链表是否存在相同的key值,如果找到,新值覆盖旧值,并且返回旧值。这也是为什么上文例子中,输出的结果是【2】的原因。

扩容判断及添加元素

接着详细看看put方法中执行的addEntry方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* int hash: 当前Key的Hash值
* bucketIndex: 数组下标
*/
void addEntry(int hash, K key, V value, int bucketIndex) {
//判断是否需要扩容
if ((size >= threshold) && (null != table[bucketIndex])) {
//需要扩容时,传参传入了table.length的长度的两倍
resize(2 * table.length);
hash = (null != key) ? hash(key) : 0;
bucketIndex = indexFor(hash, table.length);
}

createEntry(hash, key, value, bucketIndex);
}

void createEntry(int hash, K key, V value, int bucketIndex) {
//获取当前链表的表头,即数组bucketIndex索引上的元素
Entry<K,V> e = table[bucketIndex];
//将key-value键值对赋予到新Entry对象中,并通过头插入法,其Entry的next属性指向原链表表头
//则新的Entry对象成为了链表的新表头,并将该表头存储在table数组中。
table[bucketIndex] = new Entry<>(hash, key, value, e);
//添加新元素,hashMap的元素数量++
size++;
}

注意:根据源码可以得出,JDK1.7的HashMap插入链表采用的是“头插入”法。

问题:为什么采用头插法呢?

答案:因为头插法效率高,不需要遍历链表。

​ 但其实插入之前都需要遍历链表的旧元素,判断是否存在相同的key,存在则修改,没有才会在遍历之后进行头插入新增;所以这里并不能体现出头插法的优势。因为在我们遍历旧链表时,新增情况下,我们可以知道尾节点的元素,因此尾插法也可以直接将元素添加上去;如若修改和插入的方法进行分离,无法记录尾节点的情况下,头插法才能在插入时体现出效率优势。

问题:在添加新的键值对的时候,需要判断是否需要扩容,那么它什么情况下需要扩容?

答案:

1.当前HashMap的键值对数量(size)>= threshold【threshold=capacity(当前数组容量)*loadFactory(扩容因子)】

2.Entry数组上的元素不为空


问题:HashMap为什么需要扩容?

答案:初始化的Entry数组容量是16,元素过多会导致链表过长,当调用get方法遍历链表时会增加耗时,所以需要通过扩容,将一条链表的数据内容分散成多条链表,添加到扩容数组的新下标中。

扩容逻辑

接下来看看他的扩容逻辑,当满足扩容条件(size大于等于阈值 && 数组当前索引位置不为空)时,调度了resize方法:

重新计算链表元素的索引位置(h & (length-1)),使用头插法,旧元素next指向新链表索引位置元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
}

Entry[] newTable = new Entry[newCapacity];
transfer(newTable, initHashSeedAsNeeded(newCapacity));
table = newTable;
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

/**
* Transfers all entries from current table to newTable.
*/
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
//遍历原Entry数组
for (Entry<K,V> e : table) {
//遍历链表中的每一个非空Entry
while(null != e) {
Entry<K,V> next = e.next;
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
//重新计算该Entry需要存储的数组下标
int i = indexFor(e.hash, newCapacity);
//将next属性指向原链表头
e.next = newTable[i];
//替换数组中的链表头位置
newTable[i] = e;
//将e元素指向下一个链表中的Entry
e = next;
}
}
}

key的rehash是有条件的,需要根据参数的布尔值决定它是否需要重新计算hash值,并不是所有扩容都需要重新计算key的Hash值。

假设链表中有三个元素,分别是【1、2、3】,具体行为如下:

source-02

在遍历过程中,由于数组下标没有发生变化,其Entry对象依旧将指向新数组的同一个下标,接着循环遍历链表的第二个元素。

source-03

遍历第二个元素时,通过表头插入法,其next指向原链表表头,并将自身作为Entry数组的链表头,最终得出的结果如下:

source-04

可以看到,在扩容过程中,计算出来的数组下标没有发生变化时,链表被反序存储了。


问题:怎么将一条链表的数据内容分散成多条链表,添加到扩容数组的新下标中?新下标满足什么规律?

假设原数组长度是16,那么扩容后则等于32,看看其计算下标的变化:

意义 二进制 十进制
length 0010 0000 32
length-1 0001 1111 31
Akey:HashCode 1010 1010
hashCode & (length-1) 0000 1010 index=10
Bkey:HashCode 1011 1010
hashCode & (length-1) 0001 1010 index=26

可以看到,当Akey=HashCode=1010 1010时,计算的数组下标未发生变化,而当Bkey=HashCode=1011 1010时,计算的等于26,也就是说,通过这种方式,就能够将属于同一个链表的元素,在重新计算下标的过程中,将链表元素分配到其他新的链表中。

在Hash值未被重新计算的前提下,若链表元素被分配到其他新链表时,其新链表的下标=原下标+原数组长度


问题:HashMap为什么在多线程并发的情况下会出现问题?著名的HashMap死锁问题原因是什么?

假设现在有两条线程同时对同一个HashMap操作,执行put方法,而且刚好满足扩容条件时,假设这时候两条线程都运行到了transfer方法的循环语句,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;
//假设两条多线程都运行到该行位置
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}

int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}

由于代码中遍历的table数组是一个成员变量,是共享的,所以循环到该点时的指向如下图:

source-05

由于Java的多线程并发是通过CPU上下文切换交替执行的,假设这时候线程2被CPU挂起了,线程1执行完扩容操作,线程2再恢复执行,那么当线程1执行完扩容后,其对应的table数组内容已经发生改变,当线程2被CPU唤醒继续执行扩容时,其结构图如下:

source-06

按照代码的流程走向,经过一次while循环以后,值得注意的是,其e对象指向Entry对象(1),其next对象指向Entry(2),得到如下:

source-07

再经过一次while循环以后,其原本的Entry对象(2)的next属性指向Entry对象(1),所以其next对象将会重新指向Entry对象(1),将得到下图:

source-08

再经过一次while循环以后,其next对象将会指向null值,而Entry对象(1)的next属性将会指向原表头Entry对象(2),将得到下图:

source-09

从逻辑上可以看出,在多线程情况下造成了循环链表,由于最后的循环中的next对象指向了null,所以线程2的扩容是可以执行的,但是当有其他线程遍历该循环链表时,将会造成死循环,耗尽CPU资源,这就是HashMap在多线程并发情况下造成的死锁问题。

问题:如何在多线程情况下避免HashMap的死锁问题?

答案:如果已知了数据长度时,可以使用public HashMap(int initialCapacity, float loadFactor)构造函数设置其大小以及扩容因子,使得size >= threshold条件不成立,避免造成扩容。


接着再回去看看resizetransfer方法,通过源码得知,initHashSeedAsNeeded方法是控制是否需要重算hash值,在它在什么情况下需要重算keyhash值呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final boolean initHashSeedAsNeeded(int capacity) {
//初始条件下 哈希种子hashSeed=0,所以currentAltHashing = false;
boolean currentAltHashing = hashSeed != 0;
//具体逻辑:当capacity 大于 Holder.ALTERNATIVE_HASHING_THRESHOLD时,才可能重算Hash
boolean useAltHashing = sun.misc.VM.isBooted() &&
(capacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD);
//取异或 不相等才返回true
boolean switching = currentAltHashing ^ useAltHashing;
if (switching) {
hashSeed = useAltHashing
? sun.misc.Hashing.randomHashSeed(this)
: 0;
}
return switching;
}

最后跟踪源码得知,在默认情况下,无论怎么扩容都不会造成重算Hash值,除非配置了JVM启动参数jdk.map.althashing.threshold。这里哈希种子的目的就是为了打散链表的hash值,使得每次扩容时,各链表分布更加均匀,这里就不再详细讨论了。


HashMap的属性中,定义了一个transient int modCount;,用来记录该对象的修改次数,是hashMap提供的一种快速失败机制(fast fail),用于迭代遍历时校验对象是否被修改,当集合在迭代过程中对象被其他线程并发修改时,将会抛出ConcurrentModificationException异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final Entry<K,V> nextEntry() {
//在遍历集合之前会将modCount赋值给expectedModCount,接着遍历过程中会不断验证modCount是否发生变化
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
Entry<K,V> e = next;
if (e == null)
throw new NoSuchElementException();

if ((next = e.next) == null) {
Entry[] t = table;
while (index < t.length && (next = t[index++]) == null)
;
}
current = e;
return e;
}

问题:那想要在遍历集合的过程中删除集合的元素,该如何实现?

答案:通过使用Iterator迭代器遍历集合,调用迭代器的remove方法删除元素,该方法在删除元素的时候会重新对expectedModCount赋值,保证了modCount == expectedModCount

ConcurrentHashMap

众做周知,HashMap就算避免了死锁问题,也不是一个线程安全的集合,而HashTable类就是达到通过对HashMap里的各个方法加synchronized锁实现HashMap集合的线程安全,HashTable是一个同步容器类

属性结构

ConcurrentHashMap是一个并发容器类,底层使用了Segment分段锁的原理,先从源码中看看它和HashMap相比,属性上的区别:

1
2
3
4
5
6
/**
* The default concurrency level for this table, used when not
* otherwise specified in a constructor.
*/
//并发级别,用于计算一个Segment负责管理多少个Entry数组
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
1
2
3
4
5
6
7
/**
* The minimum capacity for per-segment tables. Must be a power
* of two, at least two to avoid immediate resizing on next use
* after lazy construction.
*/
//规定了segment中的HashEntry数组最小容量,数组容量必须是2的次方数
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
1
2
3
4
5
6
7
8
9
10
/**
* The maximum number of times to tryLock in a prescan before
* possibly blocking on acquire in preparation for a locked
* segment operation. On multiprocessors, using a bounded
* number of retries maintains cache acquired while locating
* nodes.
*/
//获取锁失败时,tryLock循环的最大次数,获取当前CPU核数,大于1则设置成64
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
1
2
3
4
5
6
/**
* The maximum number of segments to allow; used to bound
* constructor arguments. Must be power of two less than 1 << 24.
*/
//规定了segment数组的最大值不能超过2的16次方
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
1
2
3
4
5
6
/**
* Mask value for indexing into segments. The upper bits of a
* key's hash code are used to choose the segment.
*/
//segmentMask = segment[].length - 1 用来执行【haschCode & segmentMask】计算Hash值
final int segmentMask;
1
2
3
4
5
/**
* The segments, each of which is a specialized hash table.
*/
//ConcurrentHashMap的分段锁对象,自身带了一把锁且存储了链表结构
final Segment<K,V>[] segments;

HashEntryConcurrentHashMap是的基础单元(节点),是实际数据的载体,等同于HashMap中的Entry

1
2
3
4
5
6
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}

source-10


Segment 继承 ReentrantLock 锁,用于存放数组 HashEntry[]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Segments are specialized versions of hash tables. This
* subclasses from ReentrantLock opportunistically, just to
* simplify some locking and avoid separate construction.
*/
static final class Segment<K,V> extends ReentrantLock implements Serializable {
/**
* The maximum number of times to tryLock in a prescan before
* possibly blocking on acquire in preparation for a locked
* segment operation. On multiprocessors, using a bounded
* number of retries maintains cache acquired while locating
* nodes.
*/
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

//HashEntry数组,ConcurrentHashMap数据的存储的载体
transient volatile HashEntry<K,V>[] table;

/**
* The number of elements. Accessed only either within locks
* or among other volatile reads that maintain visibility.
*/
//HashEntry的数组中的所有HashEntry大小(包括链表)
transient int count;
//HashEntry数组的变化次数
transient int modCount;
//扩容阈值
transient int threshold;
//扩容因子
final float loadFactor;
}

source-12


构造初始化

在看看ConcurrentHashMap的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* Creates a new, empty map with a default initial capacity (16),
* load factor (0.75) and concurrencyLevel (16).
*/
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//规定了不能大于2的16次方
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
//该值用于存储segment数组容量的次方数
int sshift = 0;
int ssize = 1;
//ssize小于并发级别则循环,并发级别默认16,该循环主要是找大于等于concurrencyLevel的2的次方数
//默认的并发级别concurrencyLevel=16,循环4次后,sshift=4,ssize=16
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;//28 = 32 - 4
//ssize为segment[]的数组长度,所以segmentMask=segment.length-1
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//c相当于initialCapacity与ssize相差的倍数值,因除法可能舍弃小数位,所以有c * ssize < initialCapacity的判断,进行自增,有小数位的情况下c会自增1
//用于之后的cap计算
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
//cap是HashEntry数组大小,默认最小值为2
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//规定HashEntry数组必须是2的次方数,所以循环获取大于等于计算的HashEntry数组大小的2的次方数
while (cap < c)
cap <<= 1;
// create segments and segments[0]
//在初始化Segment数组时默认初始化segment[0],主要是为了生成原型,当初始化其他segment对象时无需再计算HashEntry数组大小
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

可以看到其构造函数相比HashMap,多了一个concurrencyLevel参数,该参数主要是用来设置并发级别,用于

计算每个segmentHashEntry数组大小,ConcurrentHashMap规定了segment数组和HashEntry数组的大小必须是2的次方数,其缘由与HashMap一致。

ConcurrentHashMap构造方法的执行步骤:

1.判断参数的范围合法性

2.计算concurrencyLevel大于等于2的次方数

3.设置segmentMask的值

4.计算segment对象的HashEntry数组长度

5.设置HashEntry数组长度的合理长度(2<=计算的HashEntry数组长度<=合理长度=2的次方数)

6.根据合理的segment数组长度及HashEntry数组长度初始化Segment[]Segment[0]


添加元素

接着看ConcurrentHashMapput方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public V put(K key, V value) {
Segment<K,V> s;
//concurrentHashMap规定了Key和Value不能为NULL
if (value == null)
throw new NullPointerException();
//计算Key的Hash值
int hash = hash(key);
//计算需要存储的segment数组中的下标
int j = (hash >>> segmentShift) & segmentMask;
//获取segment数组中的第j个位置的segment对象并赋值给对象s,如果为null,则初始化segment对象并存储进数组中
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
//将key-value键值对存储到segment中的HashEntry数组中
return s.put(key, hash, value, false);
}

上文中通过调用UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)获取Segment数组中的第j个下标的segment对象,因为通过这种偏移量获取对象效率会高很多。

问题:在计算segment数组下标时,为什么需要执行(hash >>> segmentShift)向右位移呢?

1
2
3
4
5
6
//通过观察ConcurrentHashMap的构造函数,其中有部分代码如下:
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;

上文分析了构造函数,该循环用于计算大于等于concurrencyLevel的2的次方数,假设concurrencyLevel=16,那么通过循环得知sshift=4segmentShift = 32 - sshift = 28

source-13

如上图,其向右无符号位移28(segmentShift=28)位,最后将保留HashCode的高4(sshift=4)位,并使用HashCode的高位和segmentMark相与计算所在的segment数组下标。

总而言之,就是取了HashCode的高位,和segmentMask计算数组下标,向右偏移获取HashCode高位的数量取决于初始化的concurrencyLevel的值,是2的几次方。


put方法执行计算Segment[]下标后,获取的segment对象有可能尚未初始化,根据逻辑会调度ensureSegment方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//进入方法后重新判断是否被其他线程初始化,若被初始化,则直接赋值seg对象并返回
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//获取segment[0]对象,以它为原型初始化segment对象
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
//获取原型的HashEntry数组长度
int cap = proto.table.length;
//获取扩容因子
float lf = proto.loadFactor;
//计算阈值
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//double Check,再次判断是否已被其他线程初始化
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
//创建Segment对象
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
//采用CAS自旋,将新生成的Segment对象塞进数组中,若失败,则获取数组对象返回
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

通过分析,当有多个线程同时并发执行创建同一个数组下标的segment对象时,由于CAS自旋机制,最终只会有一个线程创建成功,其他线程将获取数组对象并返回,从而达到在并发情况下的线程安全。


创建完segment对象后,将调用Segment.put()方法,添加key-value键值对到Segment对象中,我们来看看Segment类的put方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//尝试加锁 tryLock不阻塞
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//计算存储的HashEntry数组下标
int index = (tab.length - 1) & hash;
//获取HashEntry数组中的index下标,这里命名的first指的是HashEntry中的链表的头结点
HashEntry<K,V> first = entryAt(tab, index);
//遍历链表
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;

//普通put方法的onlyIfAbsent=false,有相同的key,进入判断,更新旧值,返回旧值,
//putIfAbsent方法的onlyIfAbsent=true,有相同的key,不会进入此判断,不更新旧值,返回旧值
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//假设遍历到最后节点均未匹配到相同的key由于它是最后的链表节点,e.next==null
//再次循环时,将会运行到下面的else条件新建节点
e = e.next;
}
else {
//当获取锁失败时,会调用scanAndLockForPut提前生成node对象,这里只需设置链表头即可
if (node != null)
node.setNext(first);
else
//创建HashEntry对象,采用头插法,新的HashEntry对象的next属性指向first
node = new HashEntry<K,V>(hash, key, value, first);
//HashEntry总数+1
int c = count + 1;
//如果数量达到了扩容的阈值,则进行扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//添加HashEntry对象到数组中,并替换数组中的链表头(替换成新创建的HashEntry对象)
setEntryAt(tab, index, node);
//Setment中累加变化次数
++modCount;
//赋值新计算的count值
count = c;
oldValue = null;
break;
}
}
} finally {
//解锁ReentrantLock
unlock();
}
return oldValue;
}

由于Segment继承了ReentrantLock,当它在插入元素时会调用tryLock非阻塞尝试获取锁,当获取锁失败时,调用scanAndLockForPut方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* Scans for a node containing given key while trying to
* acquire lock, creating and returning one if not found. Upon
* return, guarantees that lock is held. UNlike in most
* methods, calls to method equals are not screened: Since
* traversal speed doesn't matter, we might as well help warm
* up the associated code and accesses as well.
*
* @return a new node if key not found, else null
*/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//根据Hash值计算数组下标并获取链表的表头
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
//用这个变量来控制循环的逻辑
int retries = -1; // negative while locating node
//循环尝试获取锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
//当链表的表头 == null 或者遍历链表后未匹配到元素时,创建HashEntry对象
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
//设置retries=0用以下次循环时调用其他逻辑
retries = 0;
}
//当链表表头不为空时,遍历链表,寻址到链表元素时设置retries=0调用其他逻辑
else if (key.equals(e.key))
retries = 0;
else
//当链表遍历到最后一个节点,e.next=null,当再次循环时,会调用创建HashEntry对象的逻辑
e = e.next;
}
//当重试次数大于一定的次数时,直接调用阻塞的Lock方法
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
//(retries & 1) == 0 :当retries是偶数时,返回true,表示每隔一次才去判断链表表头是否发生变化
//重新获取链表表头,当链表表头与原来获取的first不同时,设置retries=-1,重新遍历
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

说白了,当尝试获取锁失败而调度scanAndLockForPut方法,其目的是为了在无法获取锁的时候能够提前创建HashEntry对象并返回。


回头接着看Segmentput方法的步骤:

1.当Segment获取到锁之后,先进行遍历,判断HashEntry是否有同样的key值存在

2.若key值存在,根据参数onlyIfAbsent判断是否覆盖value值,并返回旧值

3.若key值不存在,则创建HashEntry对象并设置链表头(有可能在获取锁时创建)

4.判断其扩容条件,若需要扩容,则进行链表重排

5.若不需要扩容,则添加HashEntry对象到数组中,并替换数组中的链表头

6.更新count值和modCount值


扩容

接着来看看其扩容的方法,当满足count + 1 > threshold && tab.length < MAXIMUM_CAPACITY条件时,将调用rehash扩容方法:

segement数组不扩容,HashEntry数组进行扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//注意:这里传的参数是新的HashEntry对象,其next属性指向原链表的头部
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
//向左偏移1位,扩容一倍大小
int newCapacity = oldCapacity << 1;
//重新计算阈值
threshold = (int)(newCapacity * loadFactor);
//初始化新的HashEntry数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
//HashEntry[].length - 1 用于计算Hash值
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
//遍历旧的HashEntry数组
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
//计算出新的HashEntry数组下标
int idx = e.hash & sizeMask;
//当链表中只有一个元素时,直接将该元素赋值到数组中
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
//该循环的目的是从链表尾部截取出一条与链表末尾节点的新数组下标相同,并且相连的链表
//并存储该截取链表的头对象到lastRun对象,存储新数组下标到lastIdx中
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
//直接移动尾部截取的链表到新的数组下标中
newTable[lastIdx] = lastRun;
// Clone remaining nodes
//重新顺着从原链表头开始遍历,遍历到lastRun,也就是链表截取处时,跳出循环
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
//采用头插法讲一个个元素分别插入到链表头中,并赋值给HashEntry数组
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
//对原链表扩容完成之后,计算新的HashEntry的下标
int nodeIndex = node.hash & sizeMask; // add the new node
//替换链表头
node.setNext(newTable[nodeIndex]);
//赋值到HashEntry数组中
newTable[nodeIndex] = node;
table = newTable;
}

这里的扩容逻辑和HashMap不太一样,HashMap存在逻辑判断可能会重新计算keyhash值,而ConcurrentHashMap并没有,所以它的必定满足新数组下标=原下标+原数组长度

它的扩容并非直接遍历整张链表,而是先遍历一次链表,计算每个元素的新数组下标,如果从某个A元素开始,一直到链表遍历完成,他们计算的新数组下标均相同,意味着可以直接截取该链表,从A元素开始一致到末尾,整条迁移到新节点上,如下图:

source-11

当迁截取部分链表赋值到新数组之后,遍历原链表,采用头插法插入链表,并赋值到新数组中,一直遍历到截取的A元素位置时挑出循环。最后才将新增的元素对象插入到扩容的数组中。


最后看看Segment类中的put方法中调用的setEntryAt方法:

1
2
3
4
5
6
7
8
/**
* Sets the ith element of given table, with volatile write
* semantics. (See above about use of putOrderedObject.)
*/
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}

通过使用UNSAFE调用CPU指令,意思是将HashEntry对象赋值到HashEntry数组的index下标中。


获取元素个数

接着看看ConcurrentHashMapsize方法,它的设计思想可以在编码中借鉴:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
static final int RETRIES_BEFORE_LOCK = 2; 

public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
//当重试次数超过2次时,开启加锁计算
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
//累加各个segment的修改次数
sum += seg.modCount;
//累加各个segment的元素数量
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
//第一次循环:当ConcurrentHashMap的modCount=0时,代表没元素,跳出循环
//第二次循环:和上一次循环计算的元素总数量数比较,如果相同返回,不同则加锁
if (sum == last)
break;
//第一次循环计算的元素总数量赋值给last
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}

其实它的实现逻辑很简单,先在不加锁的情况下,进行计算,第一次计算的结果和第二次计算的结果相同时,就认为这个结果是稳定的,直接返回,当两次计算的结果不相同时,则进行加锁计算。


“本篇文章主要摘自参考资料