Administrator
发布于 2025-01-12 / 40 阅读
0
0

Java源码篇-Map


title: Java源码篇-Map
author: reef
categories: jdk
tags:

  • Map
  • ConcurrentHashMap
    date: 2020-04-09 19:21:44

    summary: jdk中map相关源码

Java源码篇-Map

1 HashMap

1.1 重点字段

/**
 * 数组默认长度
 */
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16

/**
 * 最大容量(即数组最大长度)
 */
static final int MAXIMUM_CAPACITY = 1 << 30;

/**
 * 默认加载因子
 * 加载因子(loadFactor)是哈希表中用于控制数组存放数据疏密程度的参数。
 * 当loadFactor越趋近于1时,数组中存放的数据(entry)越多,哈希冲突的概率增加,
 * 导致单个桶中的链表长度可能增加,进而影响查找元素的效率。反之,当loadFactor越小,
 * 数组中存放的数据越少,数据分布越稀疏,数组的利用率降低。
 *
 * 默认的loadFactor值为0.75f,是官方经过权衡后给出的一个较为理想的平衡点,
 * 旨在兼顾查找效率和空间利用率。
 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
 * 树化阈值。当桶(bucket)上的结点数大于这个值时会转成红黑树
 */
static final int TREEIFY_THRESHOLD = 8;

/**
 * 链表化阈值(当桶(bucket)上的结点数小于这个值时树转链表)
 */
static final int UNTREEIFY_THRESHOLD = 6;

/**
 * 桶中结构树化对应的table的最小长度
 */
static final int MIN_TREEIFY_CAPACITY = 64;

/**
 * 扩容阈值(threshold)
 * threshold = loadFactor * 数组长度
 * 当HashMap中元素的数量超过threshold时,会触发数组的扩容操作。
 * 扩容是为了减少哈希冲突,保持查找效率。
 */
int threshold;

1.2 核心方法

1.2.1 put

1 初始化数组
  • 如果是第一次添加元素,先初始化数组(即分配内存空间)。
  • 计算键的哈希值并确定索引位置,然后将键值对放入对应的桶(bucket)中。
2 目标桶为空
  • 如果计算索引后,数组中对应的桶为空,则直接将键值对放入该桶中。
3 目标桶不为空
  • 根节点匹配
  • 如果桶中的根节点(第一个节点)与待插入节点的键 equals,则直接替换根节点的值。
  • 树化节点
  • 如果根节点是树形节点(即红黑树节点),则调用红黑树的插入方法将节点放入树中。
  • 链表遍历
  • 如果根节点是链表节点,则遍历链表:
    • 如果找到与待插入节点键 equals 的节点,则替换其值。
    • 如果遍历到链表末尾仍未找到匹配的节点,则将新节点插入链表末尾。
    • 树化条件
    • 如果链表长度(包括待插入节点)达到 8 且数组长度大于等于 64,则将链表转换为红黑树。
    • 如果链表长度达到 8 但数组长度小于 64,则仅进行数组扩容,不进行树化。

1.2.2 resize

HashMap 数组的长度始终为 2 的次幂,且扩容时长度加倍。这样设计的主要目的是为了方便扩容时的索引计算。以下为具体的扩容过程

1 创建新数组
  • 先创建一个长度为原数组 2 倍的新数组。
2 迁移数据
  • 遍历原数组中的每个桶(bucket):

  • 如果桶为空或只有一个元素

    • 直接计算该元素在新数组中的索引,并将其放入新数组。
  • 如果桶中有链表结构

    • 遍历链表中的每个节点(Node),计算其在新数组中的索引。

    • 由于新数组长度是原数组的 2 倍,且长度始终为 2 的次幂,因此新索引的计算方法为:

      newIndex = (key.hashCode() & (newCapacity - 1))
      
    • 新索引的结果只有两种可能:

    • 与原索引相同:如果 key 的哈希值在高一位为 0。

    • 等于原索引加上原数组长度:如果 key 的哈希值在高一位为 1。

    • 根据计算结果,将节点放入新数组的对应位置。

3 链表拆分
  • 如果原桶中的链表被拆分为两个链表(一个保持原索引,另一个为原索引加上原数组长度),则分别将它们放入新数组的对应位置。

1.2.3 部分核心代码

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
             boolean evict) {
  Node<K,V>[] tab; Node<K,V> rootNode; int tableLength, index;
  if ((tab = table) == null || (tableLength = tab.length) == 0) // 添加第一个元素
      tableLength = (tab = resize()).length;
  if ((rootNode = tab[index = (tableLength - 1) & hash]) == null) // 数组上对于的索引为空,代表这个kv可以直接放到这
      tab[index] = newNode(hash, key, value, null);
  else {
      Node<K,V> e; K k;
      if (rootNode.hash == hash &&
          ((k = rootNode.key) == key || (key != null && key.equals(k)))) // bucket上的元素equals要放进来的kv,直接覆盖
          e = rootNode;
      else if (rootNode instanceof TreeNode)  // bucket上已经是红黑树结构了,直接存放为红黑树结构
          e = ((TreeNode<K,V>)rootNode).putTreeVal(this, tab, hash, key, value);
      else { // 向链表的末尾添加
          for (int binCount = 0; ; ++binCount) {
              if ((e = rootNode.next) == null) { // 到链表末尾了
                  rootNode.next = newNode(hash, key, value, null);
                  if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                  { // 链表的长度(算上bucket)已经大于等于了8,转换为红黑树
                      treeifyBin(tab, hash);
                  }
                  break;
              }
              if (e.hash == hash &&
                  ((k = e.key) == key || (key != null && key.equals(k)))) { // 遍历链表时有equals,直接替换
                  break;
              }
              rootNode = e;
          }
      }
      if (e != null) { // existing mapping for key
          // e不为空,代表是覆盖的情况,不是新增
          V oldValue = e.value;
          if (!onlyIfAbsent || oldValue == null)
              e.value = value;
          afterNodeAccess(e);
          return oldValue;
      }
  }
  ++modCount;
  if (++size > threshold)
      resize();
  afterNodeInsertion(evict);
  return null;
}

/**
 * resize方法,只展示了部分核心代码
 * 数组的初始化或扩容,扩容是加倍的
 */
final Node<K,V>[] resize() {
    if (oldTab != null) { // 旧数组存在元素
        for (int j = 0; j < oldCap; ++j) {
            Node<K,V> e;
            if ((e = oldTab[j]) != null) { // bucket存在元素
                oldTab[j] = null;
                if (e.next == null) // 这个bucket没有链表,只需要将它重新计算下在新数组的索引,并放入对于的bucket中
                    newTab[e.hash & (newCap - 1)] = e;
                else if (e instanceof TreeNode) // 红黑树结构
                    ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                else { // preserve order
                    /* 旧数组的桶在新数组的索引位的节点 */
                    // 索引大小没有变化
                    Node<K,V> loHead = null, loTail = null;
                    // 索引扩大了旧数组的长度(即新索引位:旧索引位+旧数组长度)
                    Node<K,V> hiHead = null, hiTail = null;
                    Node<K,V> next;
                    do {
                        next = e.next;
                        // e.hash & oldCap == 0 就代表e的hash值(转换为2进制)高一位为0,
                        // 与(新的容量-1)相与后,其在数组的索引位置不变
                        if ((e.hash & oldCap) == 0) {
                            if (loTail == null)
                                loHead = e;
                            else
                                loTail.next = e;
                            loTail = e;
                        }
                        else { // 这里则高一位为1,与新的容量 & 后,
                             // 其在新数组的索引位置会增加新容量的扩大值(即原容量的大小)
                            if (hiTail == null)
                                hiHead = e;
                            else
                                hiTail.next = e;
                            hiTail = e;
                        }
                    } while ((e = next) != null);
                    if (loTail != null) {
                        loTail.next = null;
                        newTab[j] = loHead;
                    }
                    if (hiTail != null) {
                        hiTail.next = null;
                        newTab[j + oldCap] = hiHead;
                    }
                }
            }
        }
    }
    return newTab;
}

1.3 线程不安全

  • 当两个线程同时put数据时,且被put的两个数据能定位到HashMap数组的相同那个bucket位置上,就可能产生一个覆盖掉另一个的可能,造成一个数据消失。
  • 多个线程同时修改 HashMap的结构时(如插入、删除或扩容),可能会导致部分数据丢失。比如线程A插入,而线程B正在扩容,最终导致A线程插入的数据丢失
  • 没有volatile或锁的同步机制,会导致一个线程的修改对另一个线程不可见

2 LinkedHashMap

LinkedHashMapHashMap 的子类,它不仅实现了 Map 接口,还具有排序功能。其排序行为由 accessOrder 字段控制

2.1 核心字段

  • head:链表的头节点,指向最早插入或访问的节点。
  • tail:链表的尾节点,指向最近插入或访问的节点。
  • accessOrder(默认 false):
  • false:按照插入顺序排序,越晚插入的元素越排在链表末尾。
  • true:按照访问顺序排序,最近访问的元素会被移动到链表末尾。可用于实现 LRU 缓存。

2.2 排序实现原理

2.2.1 双向链表

// 继承了HashMap.Node的Entry内部结构
static class Entry<K,V> extends HashMap.Node<K,V> { // 具有链表结构的Entry
    // 前驱节点和后继节点
    Entry<K,V> before, after;
    Entry(int hash, K key, V value, Node<K,V> next) {
        super(hash, key, value, next);
    }
}

2.2.2 添加元素

// 重写了HashMap的newNode方法,在构造新节点时将其添加到链表末尾
Node<K,V> newNode(int hash, K key, V value, Node<K,V> e) {
    Entry<K,V> p = new Entry<>(hash, key, value, e);
    // 将新节点链接到链表末尾
    linkNodeLast(p);
    return p;
}

2.2.3 访问节点移动到末尾

如果 accessOrdertrueLinkedHashMap 会在访问节点时(如调用 get 方法)触发 afterNodeAccess 方法,将最近访问的节点移动到链表末尾,基于此可以实现LRU缓存

// 重写的HashMap#afterNodeAccess方法,
void afterNodeAccess(Node<K,V> e) {
    LinkedHashMap.Entry<K,V> last;
    if (accessOrder && (last = tail) != e) { // accessOrder为true,e不为tail
        // 重排序当前元素,将当前节点设为新的tail(保持最近一次被访问的节点在最后面)
        LinkedHashMap.Entry<K,V> p =
            (LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
        p.after = null;
        if (b == null) // e为head,更新头节点
            head = a;
        else
            b.after = a;
        if (a != null) // e不为tail
            a.before = b;
        else
            last = b;
        if (last == null)
            head = p;
        else {
            p.before = last;
            last.after = p;
        }
        tail = p;
        ++modCount;
    }
}

3 ConcurrentHashMap

ConcurrentHashMap 是一个线程安全的 Map 实现,通过 CAS 和 分段锁 机制实现高效的并发操作。其数据结构与 HashMap 类似,采用 数组 + 链表 + 红黑树** 的形式:

  • 当链表长度超过 8 时,链表会转换为红黑树。
  • 当红黑树节点数小于 6 时,红黑树会退化为链表。

3.1 put方法

死循环put元素,直到操作成功才退出

  • 数组还没初始化,开始数组的初始化
  • 数组的bucket还未被占用,CAS占用(成功了就break,失败了就代表已经被其他节点占用了,进行下一次循环进入其他if分支)
  • 当前桶为ForwardingNode,表示有线程正在进行扩容操作,则先帮助扩容,等扩容完毕在继续put
  • bucket被占用,锁住根节点,开始构造到链表的为尾节点。添加到尾节点后,在判断当前链表长度是否超过8,否则就转换为红黑树

3.2 扩容(重点)

​ 核心是通过 多线程协作分段迁移 的方式进行高效的数据迁移,同时尽量减少对读写操作的影响

3.2.1 扩容触发时机

  • ConcurrentHashMap 中的元素数量超过 阈值(threshold) 时触发
  • 阈值计算公式:阈值 = 数组长度 * 负载因子(loadFactor,默认 0.75)

3.2.2 具体步骤

  • 首先创建一个新的数据,为元素组大小的2倍。将其设置到nextTable字段

  • 通过CAS设置transferIndex(初始设为旧数组的长度,即是从旧数组末尾开始向前遍历转移数据的)

  • 每个线程通过CAS从transferIndex获取一段连续长度为stride(步长)的桶,stride计算如下

  • // 计算步长:即扩容时每个线程每次最小处理的数组连续长度
    // cpu为1,则由这个线程全部处理;cpu数量大于1,每个核心负责的步长为 数组长度/(8 * cpu核数) ,不过如果计算出步长小于16,则会被设置为16。确保线程的工作量均衡
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE){
      stride = MIN_TRANSFER_STRIDE;
    } 
    
  • 开始迁移数据,到这一步了每个线程就只会迁移自己所负责的步长索引数据,不会冲突

  • 空桶:则放置一个 ForwardingNode,表示该桶已迁移

  • 桶为ForwardingNode:当前桶已迁移。因为整体是从后向前迁移的,可推测当前线程负责的这段步长索引一定已处理完毕,即这段步长内这个桶后面的所有桶也都已经被处理完成了,需要重新计算它下一次该负责的新步长索引

  • 桶未迁移:则操作加锁,对桶中的链表或红黑树进行迁移,迁移完成后,再将当前桶放置为ForwardingNode节点

  • 当最后一个线程迁移完毕后,则更新table为新数据和sizeCtl,表示扩容完成

3.2.3 相关思考

  • 步长计算安全嘛?

    ​ 安全,通过CAS设置公共变量transferIndex(初始值为table.length),同时该变量为volatile,它的变化能立马被其他线程感知到,可以保证每个线程处理的步长索引不会重复和交叉

  • 其他线程如何感知并帮助扩容?

    ​ 通过判断桶节点为ForwardingNode,则表示正在扩容,此时这个线程则帮助扩容,计算自己需要处理的步长索引来转移数据到新数组中。每处理完一个桶也将其设为ForwardingNode节点

  • get方法并没有加锁,如果桶已被转移怎么获取到数据?

    ​ 首先扩容是读写分离的,扩容时不会对桶本身做任何修改(即不会修改Node的内部指针数据),所以如果拿到原桶数据,则能直接遍历获取数据。而如果拿到的是ForwardingNode,它本身也提供了find方法,会到新数组中去找到需要的数据

  • 扩容完成如何处理?

    ​ 每个线程完成自己负责区间的迁移后,会更新sizeCtl字段中的扩容线程数计数,判断确定最后一个完成迁移的线程会将新数组赋值给table并重新计算sizeCtl的阈值

  • 最后需要注意,在操作数组中的桶时,都会获取这个桶节点的锁(put和resize等等修改方法),锁是相同的,所以不必担心某一个桶的相关数据被多个线程同时处理(put,resize等)

3.2.4 核心代码

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 计算步长:即扩容时每个线程每次最小处理的数组连续长度
    // cpu为1,则由这个线程全部处理;cpu数量大于1,每个核心负责的步长为 数组长度/(8 * cpu核数) ,不过如何计算出步长小于16,则会被设置为16。确保线程的工作量均衡
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            // 创建新数组,2次幂便于扩容计算新索引位置
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // 表示从数组末尾开始分配迁移任务
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 当前线程是否需要继续在旧数组上截取一段桶来处理数据,默认是
    boolean advance = true;
    // 扩容完毕的标志
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) { // i 是当前线程正在处理的桶的索引,bound 是当前线程负责的迁移任务的起始索引(也就是在处理中则 i>bound)
        Node<K,V> f; int fh;
        // 检查当前线程负责的步长内的桶是否处理完毕。处理完毕,则选取下一段当前现场该处理的步长索引段
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                // i初始化为旧数组最后一个索引位置
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) { // 当前线程扩容完毕处理
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null) // 空桶,CAS放置ForwardingNode,让其他线程可以感知到,以帮助扩容
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED) // 当前桶已被处理
            // 到这了就说明这段步长索引处理完毕,需要重新计算新步长索引
            advance = true; // already processed
        else {
            synchronized (f) { // 桶加锁,开始转移当前桶的链表或红黑树到新的数组里
                if (tabAt(tab, i) == f) { // 获取成功,再次校验桶节点是否变化,未变才继续操作(避免被其他刚释放了这个锁的线程给修改了)
                    // 桶数据转移到新数组去,和HashMap类似计算新数组中的索引
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 转移完后,旧数组的桶放置ForwardingNode,表示当前桶已处理完毕并表示为扩容中
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 转移完后,旧数组的桶放置ForwardingNode,表示当前桶已处理完毕并表示为扩容中
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

3.4 为什么key和value不允许为null,而HashMap可以呢?

ConcurrentHashMap如果允许key和value为null,会产生二义性。即不能确定map里本身没有这个数据,还是说有这个数据,但这个数据存的是null值。

​ 为什么HashMap可以允许呢?因为它不会产生二义性,使用HashMap设计用于单线程下,假设我们获取key为A的数据返回了null,之后还马上可以通过containsKey来判断到底是不存在A还是A就为null(因为是单线程,不用担心其他线程会修改数据)

​ 但ConcurrentHashMap是线程安全的,也就是默认会在多线程下修改数据。假设ConcurrentHashMap支持设置null,这时线程A获取key为null的数据返回了null,此时我们不确定A在不在ConcurrentHashMap里,需要用containsKey来判断key为null是否存在于ConcurrentHashMap里。但多线程的情况下,B线程在A线程containsKey操作前添加了key为null的数据,导致A线程containsKey返回了true,导致和第一步预期不同(第一步可能是不存在key为null的数据)

​ 综上:ConcurrentHashMap,它是为并发而生的,它是要用在并发场景中的。假如允许使用 map.get(key)返回 null ,这时是没办法通过 map.containsKey来准确的检测,因为在检测过程中可能会被其他线程锁修改,而导致检测结果并不可靠。所以直接禁用了null,好处就是返回null一定能表示key不存在,而不是有其他的含义,让语义更明确了

​ 所以这个设计选择反映了并发编程的一个重要原则:通过适当的限制来换取更好的可靠性和简单性。虽然失去了存储null值的能力,但换来了更清晰的语义和更好的并发安全性。


评论