title: Java源码篇-Map
author: reef
categories: jdk
tags:
- Map
- ConcurrentHashMap
date: 2020-04-09 19:21:44summary: jdk中map相关源码
Java源码篇-Map
1 HashMap
1.1 重点字段
/**
* 数组默认长度
*/
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
/**
* 最大容量(即数组最大长度)
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 默认加载因子
* 加载因子(loadFactor)是哈希表中用于控制数组存放数据疏密程度的参数。
* 当loadFactor越趋近于1时,数组中存放的数据(entry)越多,哈希冲突的概率增加,
* 导致单个桶中的链表长度可能增加,进而影响查找元素的效率。反之,当loadFactor越小,
* 数组中存放的数据越少,数据分布越稀疏,数组的利用率降低。
*
* 默认的loadFactor值为0.75f,是官方经过权衡后给出的一个较为理想的平衡点,
* 旨在兼顾查找效率和空间利用率。
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
* 树化阈值。当桶(bucket)上的结点数大于这个值时会转成红黑树
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 链表化阈值(当桶(bucket)上的结点数小于这个值时树转链表)
*/
static final int UNTREEIFY_THRESHOLD = 6;
/**
* 桶中结构树化对应的table的最小长度
*/
static final int MIN_TREEIFY_CAPACITY = 64;
/**
* 扩容阈值(threshold)
* threshold = loadFactor * 数组长度
* 当HashMap中元素的数量超过threshold时,会触发数组的扩容操作。
* 扩容是为了减少哈希冲突,保持查找效率。
*/
int threshold;
1.2 核心方法
1.2.1 put
1 初始化数组
- 如果是第一次添加元素,先初始化数组(即分配内存空间)。
- 计算键的哈希值并确定索引位置,然后将键值对放入对应的桶(bucket)中。
2 目标桶为空
- 如果计算索引后,数组中对应的桶为空,则直接将键值对放入该桶中。
3 目标桶不为空
- 根节点匹配:
- 如果桶中的根节点(第一个节点)与待插入节点的键
equals
,则直接替换根节点的值。 - 树化节点:
- 如果根节点是树形节点(即红黑树节点),则调用红黑树的插入方法将节点放入树中。
- 链表遍历:
- 如果根节点是链表节点,则遍历链表:
- 如果找到与待插入节点键
equals
的节点,则替换其值。 - 如果遍历到链表末尾仍未找到匹配的节点,则将新节点插入链表末尾。
- 树化条件:
- 如果链表长度(包括待插入节点)达到 8 且数组长度大于等于 64,则将链表转换为红黑树。
- 如果链表长度达到 8 但数组长度小于 64,则仅进行数组扩容,不进行树化。
- 如果找到与待插入节点键
1.2.2 resize
HashMap 数组的长度始终为 2 的次幂,且扩容时长度加倍。这样设计的主要目的是为了方便扩容时的索引计算。以下为具体的扩容过程
1 创建新数组
- 先创建一个长度为原数组 2 倍的新数组。
2 迁移数据
遍历原数组中的每个桶(bucket):
如果桶为空或只有一个元素:
- 直接计算该元素在新数组中的索引,并将其放入新数组。
如果桶中有链表结构:
遍历链表中的每个节点(Node),计算其在新数组中的索引。
由于新数组长度是原数组的 2 倍,且长度始终为 2 的次幂,因此新索引的计算方法为:
newIndex = (key.hashCode() & (newCapacity - 1))
新索引的结果只有两种可能:
与原索引相同:如果 key 的哈希值在高一位为 0。
等于原索引加上原数组长度:如果 key 的哈希值在高一位为 1。
根据计算结果,将节点放入新数组的对应位置。
3 链表拆分
- 如果原桶中的链表被拆分为两个链表(一个保持原索引,另一个为原索引加上原数组长度),则分别将它们放入新数组的对应位置。
1.2.3 部分核心代码
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> rootNode; int tableLength, index;
if ((tab = table) == null || (tableLength = tab.length) == 0) // 添加第一个元素
tableLength = (tab = resize()).length;
if ((rootNode = tab[index = (tableLength - 1) & hash]) == null) // 数组上对于的索引为空,代表这个kv可以直接放到这
tab[index] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (rootNode.hash == hash &&
((k = rootNode.key) == key || (key != null && key.equals(k)))) // bucket上的元素equals要放进来的kv,直接覆盖
e = rootNode;
else if (rootNode instanceof TreeNode) // bucket上已经是红黑树结构了,直接存放为红黑树结构
e = ((TreeNode<K,V>)rootNode).putTreeVal(this, tab, hash, key, value);
else { // 向链表的末尾添加
for (int binCount = 0; ; ++binCount) {
if ((e = rootNode.next) == null) { // 到链表末尾了
rootNode.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
{ // 链表的长度(算上bucket)已经大于等于了8,转换为红黑树
treeifyBin(tab, hash);
}
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k)))) { // 遍历链表时有equals,直接替换
break;
}
rootNode = e;
}
}
if (e != null) { // existing mapping for key
// e不为空,代表是覆盖的情况,不是新增
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}
/**
* resize方法,只展示了部分核心代码
* 数组的初始化或扩容,扩容是加倍的
*/
final Node<K,V>[] resize() {
if (oldTab != null) { // 旧数组存在元素
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) { // bucket存在元素
oldTab[j] = null;
if (e.next == null) // 这个bucket没有链表,只需要将它重新计算下在新数组的索引,并放入对于的bucket中
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode) // 红黑树结构
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
/* 旧数组的桶在新数组的索引位的节点 */
// 索引大小没有变化
Node<K,V> loHead = null, loTail = null;
// 索引扩大了旧数组的长度(即新索引位:旧索引位+旧数组长度)
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
// e.hash & oldCap == 0 就代表e的hash值(转换为2进制)高一位为0,
// 与(新的容量-1)相与后,其在数组的索引位置不变
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else { // 这里则高一位为1,与新的容量 & 后,
// 其在新数组的索引位置会增加新容量的扩大值(即原容量的大小)
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
1.3 线程不安全
- 当两个线程同时put数据时,且被put的两个数据能定位到HashMap数组的相同那个bucket位置上,就可能产生一个覆盖掉另一个的可能,造成一个数据消失。
- 多个线程同时修改 HashMap的结构时(如插入、删除或扩容),可能会导致部分数据丢失。比如线程A插入,而线程B正在扩容,最终导致A线程插入的数据丢失
- 没有volatile或锁的同步机制,会导致一个线程的修改对另一个线程不可见
2 LinkedHashMap
LinkedHashMap
是 HashMap
的子类,它不仅实现了 Map
接口,还具有排序功能。其排序行为由 accessOrder
字段控制
2.1 核心字段
head
:链表的头节点,指向最早插入或访问的节点。tail
:链表的尾节点,指向最近插入或访问的节点。accessOrder
(默认false
):false
:按照插入顺序排序,越晚插入的元素越排在链表末尾。true
:按照访问顺序排序,最近访问的元素会被移动到链表末尾。可用于实现 LRU 缓存。
2.2 排序实现原理
2.2.1 双向链表
// 继承了HashMap.Node的Entry内部结构
static class Entry<K,V> extends HashMap.Node<K,V> { // 具有链表结构的Entry
// 前驱节点和后继节点
Entry<K,V> before, after;
Entry(int hash, K key, V value, Node<K,V> next) {
super(hash, key, value, next);
}
}
2.2.2 添加元素
// 重写了HashMap的newNode方法,在构造新节点时将其添加到链表末尾
Node<K,V> newNode(int hash, K key, V value, Node<K,V> e) {
Entry<K,V> p = new Entry<>(hash, key, value, e);
// 将新节点链接到链表末尾
linkNodeLast(p);
return p;
}
2.2.3 访问节点移动到末尾
如果 accessOrder
为 true
,LinkedHashMap
会在访问节点时(如调用 get
方法)触发 afterNodeAccess
方法,将最近访问的节点移动到链表末尾,基于此可以实现LRU缓存
// 重写的HashMap#afterNodeAccess方法,
void afterNodeAccess(Node<K,V> e) {
LinkedHashMap.Entry<K,V> last;
if (accessOrder && (last = tail) != e) { // accessOrder为true,e不为tail
// 重排序当前元素,将当前节点设为新的tail(保持最近一次被访问的节点在最后面)
LinkedHashMap.Entry<K,V> p =
(LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
p.after = null;
if (b == null) // e为head,更新头节点
head = a;
else
b.after = a;
if (a != null) // e不为tail
a.before = b;
else
last = b;
if (last == null)
head = p;
else {
p.before = last;
last.after = p;
}
tail = p;
++modCount;
}
}
3 ConcurrentHashMap
ConcurrentHashMap
是一个线程安全的 Map
实现,通过 CAS 和 分段锁 机制实现高效的并发操作。其数据结构与 HashMap
类似,采用 数组 + 链表 + 红黑树** 的形式:
- 当链表长度超过 8 时,链表会转换为红黑树。
- 当红黑树节点数小于 6 时,红黑树会退化为链表。
3.1 put方法
死循环put元素,直到操作成功才退出
- 数组还没初始化,开始数组的初始化
- 数组的bucket还未被占用,CAS占用(成功了就break,失败了就代表已经被其他节点占用了,进行下一次循环进入其他if分支)
- 当前桶为
ForwardingNode
,表示有线程正在进行扩容操作,则先帮助扩容,等扩容完毕在继续put - bucket被占用,锁住根节点,开始构造到链表的为尾节点。添加到尾节点后,在判断当前链表长度是否超过8,否则就转换为红黑树
3.2 扩容(重点)
核心是通过 多线程协作 和 分段迁移 的方式进行高效的数据迁移,同时尽量减少对读写操作的影响
3.2.1 扩容触发时机
- 当
ConcurrentHashMap
中的元素数量超过 阈值(threshold) 时触发 - 阈值计算公式:
阈值 = 数组长度 * 负载因子(loadFactor,默认 0.75)
。
3.2.2 具体步骤
首先创建一个新的数据,为元素组大小的2倍。将其设置到nextTable字段
通过CAS设置transferIndex(初始设为旧数组的长度,即是从旧数组末尾开始向前遍历转移数据的)
每个线程通过CAS从transferIndex获取一段连续长度为stride(步长)的桶,stride计算如下
// 计算步长:即扩容时每个线程每次最小处理的数组连续长度 // cpu为1,则由这个线程全部处理;cpu数量大于1,每个核心负责的步长为 数组长度/(8 * cpu核数) ,不过如果计算出步长小于16,则会被设置为16。确保线程的工作量均衡 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE){ stride = MIN_TRANSFER_STRIDE; }
开始迁移数据,到这一步了每个线程就只会迁移自己所负责的步长索引数据,不会冲突
空桶:则放置一个
ForwardingNode
,表示该桶已迁移桶为
ForwardingNode
:当前桶已迁移。因为整体是从后向前迁移的,可推测当前线程负责的这段步长索引一定已处理完毕,即这段步长内这个桶后面的所有桶也都已经被处理完成了,需要重新计算它下一次该负责的新步长索引桶未迁移:则操作加锁,对桶中的链表或红黑树进行迁移,迁移完成后,再将当前桶放置为
ForwardingNode
节点当最后一个线程迁移完毕后,则更新table为新数据和sizeCtl,表示扩容完成
3.2.3 相关思考
步长计算安全嘛?
安全,通过CAS设置公共变量transferIndex(初始值为table.length),同时该变量为volatile,它的变化能立马被其他线程感知到,可以保证每个线程处理的步长索引不会重复和交叉
其他线程如何感知并帮助扩容?
通过判断桶节点为ForwardingNode,则表示正在扩容,此时这个线程则帮助扩容,计算自己需要处理的步长索引来转移数据到新数组中。每处理完一个桶也将其设为ForwardingNode节点
get方法并没有加锁,如果桶已被转移怎么获取到数据?
首先扩容是读写分离的,扩容时不会对桶本身做任何修改(即不会修改Node的内部指针数据),所以如果拿到原桶数据,则能直接遍历获取数据。而如果拿到的是ForwardingNode,它本身也提供了find方法,会到新数组中去找到需要的数据
扩容完成如何处理?
每个线程完成自己负责区间的迁移后,会更新
sizeCtl
字段中的扩容线程数计数,判断确定最后一个完成迁移的线程会将新数组赋值给table并重新计算sizeCtl
的阈值最后需要注意,在操作数组中的桶时,都会获取这个桶节点的锁(put和resize等等修改方法),锁是相同的,所以不必担心某一个桶的相关数据被多个线程同时处理(put,resize等)
3.2.4 核心代码
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算步长:即扩容时每个线程每次最小处理的数组连续长度
// cpu为1,则由这个线程全部处理;cpu数量大于1,每个核心负责的步长为 数组长度/(8 * cpu核数) ,不过如何计算出步长小于16,则会被设置为16。确保线程的工作量均衡
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 创建新数组,2次幂便于扩容计算新索引位置
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 表示从数组末尾开始分配迁移任务
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 当前线程是否需要继续在旧数组上截取一段桶来处理数据,默认是
boolean advance = true;
// 扩容完毕的标志
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) { // i 是当前线程正在处理的桶的索引,bound 是当前线程负责的迁移任务的起始索引(也就是在处理中则 i>bound)
Node<K,V> f; int fh;
// 检查当前线程负责的步长内的桶是否处理完毕。处理完毕,则选取下一段当前现场该处理的步长索引段
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
// i初始化为旧数组最后一个索引位置
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) { // 当前线程扩容完毕处理
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null) // 空桶,CAS放置ForwardingNode,让其他线程可以感知到,以帮助扩容
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED) // 当前桶已被处理
// 到这了就说明这段步长索引处理完毕,需要重新计算新步长索引
advance = true; // already processed
else {
synchronized (f) { // 桶加锁,开始转移当前桶的链表或红黑树到新的数组里
if (tabAt(tab, i) == f) { // 获取成功,再次校验桶节点是否变化,未变才继续操作(避免被其他刚释放了这个锁的线程给修改了)
// 桶数据转移到新数组去,和HashMap类似计算新数组中的索引
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 转移完后,旧数组的桶放置ForwardingNode,表示当前桶已处理完毕并表示为扩容中
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 转移完后,旧数组的桶放置ForwardingNode,表示当前桶已处理完毕并表示为扩容中
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
3.4 为什么key和value不允许为null,而HashMap可以呢?
ConcurrentHashMap如果允许key和value为null,会产生二义性。即不能确定map里本身没有这个数据,还是说有这个数据,但这个数据存的是null值。
为什么HashMap可以允许呢?因为它不会产生二义性,使用HashMap设计用于单线程下,假设我们获取key为A的数据返回了null,之后还马上可以通过containsKey来判断到底是不存在A还是A就为null(因为是单线程,不用担心其他线程会修改数据)
但ConcurrentHashMap是线程安全的,也就是默认会在多线程下修改数据。假设ConcurrentHashMap支持设置null,这时线程A获取key为null的数据返回了null,此时我们不确定A在不在ConcurrentHashMap里,需要用containsKey来判断key为null是否存在于ConcurrentHashMap里。但多线程的情况下,B线程在A线程containsKey操作前添加了key为null的数据,导致A线程containsKey返回了true,导致和第一步预期不同(第一步可能是不存在key为null的数据)
综上:ConcurrentHashMap,它是为并发而生的,它是要用在并发场景中的。假如允许使用 map.get(key)返回 null ,这时是没办法通过 map.containsKey来准确的检测,因为在检测过程中可能会被其他线程锁修改,而导致检测结果并不可靠。所以直接禁用了null,好处就是返回null一定能表示key不存在,而不是有其他的含义,让语义更明确了
所以这个设计选择反映了并发编程的一个重要原则:通过适当的限制来换取更好的可靠性和简单性。虽然失去了存储null值的能力,但换来了更清晰的语义和更好的并发安全性。