【Java 并发笔记】ConcurrentHashMap(1.7) 相关整理
来源:羽杰     阅读:740
云上智慧
发布于 2019-01-09 18:42
查看主页

文前说明

作为码农中的一员,需要不断的学习,我工作之余将少量分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

1. ConcurrentHashMap

public static void main(String[] arg) throws InterruptedException {        final HashMap<String, String> map = new HashMap<String, String>();        Thread thread = new Thread(new Runnable() {            @Override            public void run() {                for (int i = 0; i < 10000; i++) {                    new Thread(new Runnable() {                        @Override                        public void run() {                            map.put(UUID.randomUUID().toString(), "");                        }                    }, "thread" + i).start();                }            }        }, "threadMain");        thread.start();        thread.join();}

1.1 HaspMap(JDK 1.7)

JDK 1.7 HashMap
术语英文解释
hash 算法hash algorithm是一种将任意内容的输入转换成相同长度输出的加密方式,其输出被称为 hash(哈希)值。
hash 表hash table根据设定的哈希函数 H(key) 和解决冲突方法将一组关键字映象到一个有限的地址区间上,并以关键字在地址区间中的象作为记录在表中的存储位置,这种表称为 hash 表或者散列,所得存储位置称为 hash 地址或者散列地址。
属性说明
DEFAULT_INITIAL_CAPACITY初始化桶(数组)大小,始终保持 2^n,可以扩容,扩容后数组大小为当前的 2 倍。默认值 1 << 4(2^4,16)。
MAXIMUM_CAPACITY桶最大值 1 << 30(2^30,1073741824)。
DEFAULT_LOAD_FACTOR默认的负载因子(0.75)。
Entry<K,V>[] tableHaspMap 中的数组。根据需要可调整大小,长度必需是 2^n。
size映射中包含的键值映射的数量。
loadFactor负载因子,可在初始化时显式指定。用于衡量的是一个散列表的空间使用程度。负载因子越大,对空间的利用越充分,查找效率越低;若负载因子越小,数据越稀疏,对空间造成的白费越严重。
threshold扩容的阈值,可在初始化时显式指定。值等于 HashMap 的容量乘以负载因子。
public HashMap() {        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR);}public HashMap(int initialCapacity, float loadFactor) {        if (initialCapacity < 0)            throw new IllegalArgumentException("Illegal initial capacity: " + initialCapacity);        if (initialCapacity > MAXIMUM_CAPACITY)            initialCapacity = MAXIMUM_CAPACITY;        if (loadFactor <= 0 || Float.isNaN(loadFactor))            throw new IllegalArgumentException("Illegal load factor: " + loadFactor);        this.loadFactor = loadFactor;        threshold = initialCapacity;        init();}
transient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE;
Entry 属性说明
key写入时的键。
value
next下一个结点对象,用于实现链表结构。
hash当前 key 的 hashcode。

put 方法过程

public V put(K key, V value) {    // 当插入第一个元素的时候,需要先初始化数组大小    if (table == EMPTY_TABLE) {        inflateTable(threshold);    }    // 假如 key 为 null,将 entry 放到 table[0] 中。    if (key == null)        return putForNullKey(value);    // 1. 求出 key 的 hash 值    int hash = hash(key);    // 2. 找到对应的数组下标    int i = indexFor(hash, table.length);    // 3. 遍历一下对应下标处的链表,看能否有重复的 key 已经存在,    //    假如有,直接覆盖,put 方法返回旧值就结束。    for (Entry<K,V> e = table[i]; e != null; e = e.next) {        Object k;        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {            V oldValue = e.value;            e.value = value;            e.recordAccess(this);            return oldValue;        }    }     modCount++;    // 4. 不存在重复的 key,将此 entry 增加到链表中。    addEntry(hash, key, value, i);    return null;}

数组初始化

private void inflateTable(int toSize) {    // 保证数组大小肯定是 2 的 n 次方。    // 比方初始化:new HashMap(20),那么解决成初始数组大小是 32。    int capacity = roundUpToPowerOf2(toSize);    // 计算扩容阈值:capacity * loadFactor    threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);    // 初始化数组    table = new Entry[capacity];    initHashSeedAsNeeded(capacity); //ignore}

计算具体数组位置

static int indexFor(int hash, int length) {    // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";    return hash & (length-1);}

增加结点到链表中

void addEntry(int hash, K key, V value, int bucketIndex) {    // 假如当前 HashMap 大小已经达到了阈值,并且新值要插入的数组位置已经有元素了,那么进行扩容。    if ((size >= threshold) && (null != table[bucketIndex])) {        // 扩容        resize(2 * table.length);        // 扩容后,重新计算 hash 值。        hash = (null != key) ? hash(key) : 0;        // 重新计算扩容后的新下标。        bucketIndex = indexFor(hash, table.length);    }    createEntry(hash, key, value, bucketIndex);}// 将新值放到链表的表头,而后 size++void createEntry(int hash, K key, V value, int bucketIndex) {    Entry<K,V> e = table[bucketIndex];    table[bucketIndex] = new Entry<>(hash, key, value, e);    size++;}

数组扩容

void resize(int newCapacity) {    Entry[] oldTable = table;    int oldCapacity = oldTable.length;    if (oldCapacity == MAXIMUM_CAPACITY) {        threshold = Integer.MAX_VALUE;        return;    }    // 新的数组    Entry[] newTable = new Entry[newCapacity];    // 将原来数组中的值迁移到新的数组中。    transfer(newTable, initHashSeedAsNeeded(newCapacity));    table = newTable;    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);}

get 方法过程

public V get(Object key) {    // key 为 null,会被放到 table[0],所以只需遍历下 table[0] 处的链表即可以。    if (key == null)        return getForNullKey();        Entry<K,V> entry = getEntry(key);     return null == entry ? null : entry.getValue();}
final Entry<K,V> getEntry(Object key) {    if (size == 0) {        return null;    }     int hash = (key == null) ? 0 : hash(key);    // 确定数组下标,而后从头开始遍历链表,直到找到为止    for (Entry<K,V> e = table[indexFor(hash, table.length)];         e != null;         e = e.next) {        Object k;        if (e.hash == hash &&            ((k = e.key) == key || (key != null && key.equals(k))))            return e;    }    return null;}

1.2 HashTable

1.3 ConcurrentHashMap(JDK 1.7)

JDK 1.7 ConcurrentHashMap
属性说明
concurrencyLevel并发度,程序运行时能够同时升级 ConcurrentHashMap 且不产生锁竞争的最大线程数,分段锁个数,即 Segment[] 的数组长度,默认为 16。客户也可以在构造函数中设置并发度。
initialCapacity初始容量,指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。
loadFactor负载因子,Segment 数组不可以扩容,负载因子供每个 Segment 内部使用。
private ConcurrentHashMap.Segment<K, V> ensureSegment(int k) {        ConcurrentHashMap.Segment[] ss = this.segments;        long u = (long)(k << SSHIFT) + SBASE;        ConcurrentHashMap.Segment seg;        if((seg = (ConcurrentHashMap.Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {            ConcurrentHashMap.Segment proto = ss[0];            int cap = proto.table.length;            float lf = proto.loadFactor;            int threshold = (int)((float)cap * lf);            ConcurrentHashMap.HashEntry[] tab = (ConcurrentHashMap.HashEntry[])(new ConcurrentHashMap.HashEntry[cap]);            if((seg = (ConcurrentHashMap.Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {                ConcurrentHashMap.Segment s = new ConcurrentHashMap.Segment(lf, threshold, tab);                while((seg = (ConcurrentHashMap.Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {                    seg = s;                    if(UNSAFE.compareAndSwapObject(ss, u, (Object)null, s)) {                        break;                    }                }            }        }        return seg;}
static final class HashEntry<K, V> {        final int hash;        final K key;        volatile V value;        volatile ConcurrentHashMap.HashEntry<K, V> next;        static final Unsafe UNSAFE;        static final long nextOffset;        HashEntry(int hash, K key, V value, ConcurrentHashMap.HashEntry<K, V> next) {            this.hash = hash;            this.key = key;            this.value = value;            this.next = next;        }        final void setNext(ConcurrentHashMap.HashEntry<K, V> n) {            UNSAFE.putOrderedObject(this, nextOffset, n);        }        static {            try {                UNSAFE = Unsafe.getUnsafe();                Class e = ConcurrentHashMap.HashEntry.class;                nextOffset = UNSAFE.objectFieldOffset(e.getDeclaredField("next"));            } catch (Exception var1) {                throw new Error(var1);            }        }}
HashEntry 属性说明
key写入时的键。
value
next下一个结点对象,用于实现链表结构。
hash当前 key 的 hashcode。
public ConcurrentHashMap(int initialCapacity,                         float loadFactor, int concurrencyLevel) {    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)        throw new IllegalArgumentException();    if (concurrencyLevel > MAX_SEGMENTS)        concurrencyLevel = MAX_SEGMENTS;    // Find power-of-two sizes best matching arguments    int sshift = 0;    int ssize = 1;    // 计算并行级别 ssize,由于要保持并行级别是 2 的 n 次方    while (ssize < concurrencyLevel) {        ++sshift;        ssize <<= 1;    }    // 默认值,concurrencyLevel 为 16,sshift 为 4    // 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值    this.segmentShift = 32 - sshift;    this.segmentMask = ssize - 1;     if (initialCapacity > MAXIMUM_CAPACITY)        initialCapacity = MAXIMUM_CAPACITY;     // initialCapacity 是设置整个 map 初始的大小,    // 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小    // 如 initialCapacity 为 64,那么每个 Segment 或者称之为"槽"可以分到 4 个    int c = initialCapacity / ssize;    if (c * ssize < initialCapacity)        ++c;    // 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,由于这样的话,对于具体的槽上,    // 插入一个元素不至于扩容,插入第二个的时候才会扩容    int cap = MIN_SEGMENT_TABLE_CAPACITY;     while (cap < c)        cap <<= 1;     // 创立 Segment 数组,    // 并创立数组的第一个元素 segment[0]    Segment<K,V> s0 =        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),                         (HashEntry<K,V>[])new HashEntry[cap]);    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];    // 往数组写入 segment[0]    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]    this.segments = ss;}

put 方法过程

public V put(K key, V value) {    Segment<K,V> s;    if (value == null)        throw new NullPointerException();    // 1. 计算 key 的 hash 值    int hash = hash(key);    // 2. 根据 hash 值找到 Segment 数组中的位置 j    //    hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位,    //    而后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标    int j = (hash >>> segmentShift) & segmentMask;    // 初始化的时候只初始化了 segment[0],其余位置还是 null,    // ensureSegment(j) 对 segment[j] 进行初始化    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment        s = ensureSegment(j);    // 3. 插入新值到 槽 s 中    return s.put(key, hash, value, false);}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {    // 在往该 segment 写入前,需要先获取该 segment 的独占锁    HashEntry<K,V> node = tryLock() ? null :        scanAndLockForPut(key, hash, value);    V oldValue;    try {        // segment 内部的数组        HashEntry<K,V>[] tab = table;        // 利用 hash 值,求应该放置的数组下标        int index = (tab.length - 1) & hash;        // first 是数组该位置处的链表的表头        HashEntry<K,V> first = entryAt(tab, index);         for (HashEntry<K,V> e = first;;) {            if (e != null) {                K k;                if ((k = e.key) == key ||                    (e.hash == hash && key.equals(k))) {                    oldValue = e.value;                    if (!onlyIfAbsent) {                        // 覆盖旧值                        e.value = value;                        ++modCount;                    }                    break;                }                // 继续顺着链表走                e = e.next;            }            else {                // node 是不是 null,这个要看获取锁的过程。                // 假如不为 null,那就直接将它设置为链表表头;假如是 null,初始化并设置为链表表头。                if (node != null)                    node.setNext(first);                else                    node = new HashEntry<K,V>(hash, key, value, first);                 int c = count + 1;                // 假如超过了该 segment 的阈值,这个 segment 需要扩容                if (c > threshold && tab.length < MAXIMUM_CAPACITY)                    rehash(node); // 扩容                else                    // 没有达到阈值,将 node 放到数组 tab 的 index 位置,                    // 将新的结点设置成原链表的表头                    setEntryAt(tab, index, node);                ++modCount;                count = c;                oldValue = null;                break;            }        }    } finally {        // 解锁        unlock();    }    return oldValue;}

初始化槽

private Segment<K,V> ensureSegment(int k) {    final Segment<K,V>[] ss = this.segments;    long u = (k << SSHIFT) + SBASE; // raw offset    Segment<K,V> seg;    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {        // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k],这就是之前要初始化 segment[0] 的起因。        // 为什么要用 " 当前 ",由于 segment[0] 可能早就扩容过了。        Segment<K,V> proto = ss[0];        int cap = proto.table.length;        float lf = proto.loadFactor;        int threshold = (int)(cap * lf);         // 初始化 segment[k] 内部的数组        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))            == null) { // 再次检查一遍该槽能否被其余线程初始化。             Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);            // 使用 while 循环,内部用 CAS,当前线程成功设值或者其余线程成功设值后,退出            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))                   == null) {                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))                    break;            }        }    }    return seg;}
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {    HashEntry<K,V> first = entryForHash(this, hash);    HashEntry<K,V> e = first;    HashEntry<K,V> node = null;    int retries = -1; // negative while locating node     // 循环获取锁    while (!tryLock()) {        HashEntry<K,V> f; // to recheck first below        if (retries < 0) {            if (e == null) {                if (node == null) // speculatively create node                    // 进到这里说明数组该位置的链表是空的,没有任何元素                    // 当然,进到这里的另一个起因是 tryLock() 失败,所以该槽存在并发,不肯定是该位置                    node = new HashEntry<K,V>(hash, key, value, null);                retries = 0;            }            else if (key.equals(e.key))                retries = 0;            else                // 顺着链表往下走                e = e.next;        }        // 重试次数假如超过 MAX_SCAN_RETRIES(单核 1 次多核 64 次),那么不抢了,进入到阻塞队列等待锁        //    lock() 是阻塞方法,直到获取锁后返回        else if (++retries > MAX_SCAN_RETRIES) {            lock();            break;        }        else if ((retries & 1) == 0 &&                 // 进入这里,说明有新的元素进到了链表,并且成为了新的表头                 // 这边的策略是,重新执行 scanAndLockForPut 方法                 (f = entryForHash(this, hash)) != first) {            e = first = f; // re-traverse if entry changed            retries = -1;        }    }    return node;}
  1. 尝试自旋获取锁。
  2. 重试的次数达到了 MAX_SCAN_RETRIES 则改为阻塞锁获取,保证能获取成功。
    • MAX_SCAN_RETRIES 单核 CPU,重试 1 次,多核 CPU 重试 64 次。

数组扩容

// 方法参数上的 node 是这次扩容后,需要增加到新的数组中的数据。private void rehash(HashEntry<K,V> node) {    HashEntry<K,V>[] oldTable = table;    int oldCapacity = oldTable.length;    // 2 倍    int newCapacity = oldCapacity << 1;    threshold = (int)(newCapacity * loadFactor);    // 创立新数组    HashEntry<K,V>[] newTable =        (HashEntry<K,V>[]) new HashEntry[newCapacity];    // 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’    int sizeMask = newCapacity - 1;     // 遍历原数组,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置    for (int i = 0; i < oldCapacity ; i++) {        // e 是链表的第一个元素        HashEntry<K,V> e = oldTable[i];        if (e != null) {            HashEntry<K,V> next = e.next;            // 计算应该放置在新数组中的位置,            // 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者者是 3 + 16 = 19            int idx = e.hash & sizeMask;            if (next == null)   // 该位置处只有一个元素                newTable[idx] = e;            else { // Reuse consecutive sequence at same slot                // e 是链表表头                HashEntry<K,V> lastRun = e;                // idx 是当前链表的头结点 e 的新位置                int lastIdx = idx;                 // for 循环找到一个 lastRun 结点,这个结点之后的所有元素是将要放到一起的                for (HashEntry<K,V> last = next;                     last != null;                     last = last.next) {                    int k = last.hash & sizeMask;                    if (k != lastIdx) {                        lastIdx = k;                        lastRun = last;                    }                }                // 将 lastRun 及其之后的所有结点组成的这个链表放到 lastIdx 这个位置                newTable[lastIdx] = lastRun;                // 下面的操作是解决 lastRun 之前的结点,                //    这些结点可能分配在另一个链表中,也可能分配到上面的那个链表中                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {                    V v = p.value;                    int h = p.hash;                    int k = h & sizeMask;                    HashEntry<K,V> n = newTable[k];                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);                }            }        }    }    // 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部    int nodeIndex = node.hash & sizeMask; // add the new node    node.setNext(newTable[nodeIndex]);    newTable[nodeIndex] = node;    table = newTable;}

put 方法的流程。

  1. 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry。
  2. 遍历该 HashEntry,假如不为空则判断传入的 key 和当前遍历的 key 能否相等,相等则覆盖旧的 value。
  3. 不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断能否需要扩容。
  4. 最后再解除在第 1 步中所获取当前 Segment 的锁。

get 方法过程

public V get(Object key) {    Segment<K,V> s; // manually integrate access methods to reduce overhead    HashEntry<K,V>[] tab;    // 1. hash 值    int h = hash(key);    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;    // 2. 根据 hash 找到对应的 segment    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&        (tab = s.table) != null) {        // 3. 找到segment 内部数组相应位置的链表,遍历        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);             e != null; e = e.next) {            K k;            if ((k = e.key) == key || (e.hash == h && key.equals(k)))                return e.value;        }    }    return null;}
  1. 计算 hash 值,找到 segment 数组中的具体位置,获使用的槽。
  2. 槽中也是一个数组,根据 hash 找到数组中具体的位置。
  3. 顺着链表进行查找就可。

put 操作的线程安全性

remove 操作的线程安全性

public V remove(Object key) {        int hash = this.hash(key);        ConcurrentHashMap.Segment s = this.segmentForHash(hash);        return s == null?null:s.remove(key, hash, (Object)null);}
final V remove(Object key, int hash, Object value) {            if(!this.tryLock()) {                this.scanAndLock(key, hash);            }            Object oldValue = null;            try {                ConcurrentHashMap.HashEntry[] tab = this.table;                int index = tab.length - 1 & hash;                ConcurrentHashMap.HashEntry e = ConcurrentHashMap.entryAt(tab, index);                ConcurrentHashMap.HashEntry next;                for(ConcurrentHashMap.HashEntry pred = null; e != null; e = next) {                    next = e.next;                    Object k = e.key;                    if(e.key == key || e.hash == hash && key.equals(k)) {                        Object v = e.value;                        if(value != null && value != v && !value.equals(v)) {                            break;                        }                        if(pred == null) {                            ConcurrentHashMap.setEntryAt(tab, index, next);                        } else {                            pred.setNext(next);                        }                        ++this.modCount;                        --this.count;                        oldValue = v;                        break;                    }                    pred = e;                }            } finally {                this.unlock();            }            return oldValue;}

size

免责声明:本文为用户发表,不代表网站立场,仅供参考,不构成引导等用途。 系统环境 服务器应用
相关推荐
HTML 音频
【Docker 系列】我们来看看容器数据卷究竟是个啥
将scss收为己使用,如何使唤scss?
小白学习Java技术知识点总结,其实学习Java没那么难
isType(判断数据类型进行封装)
首页
搜索
订单
购物车
我的