Java同步数据结构之CopyOnWriteArrayList/CopyOnWriteArraySet

2022-12-07,,,,

前言

前面介绍完了队列(包括双端队列),今天探讨以下Java并发包中一个List的并发数据结构实现CopyOnWriteArrayList,顾名思义CopyOnWriteArrayList也是一种基于数组的类似ArrayList的集合,CopyOnWriteArrayList比起ArrayList最大的区别在于,CopyOnWriteArrayList实现的时候采用的是通过对底层数组进行一次新的复制来实现的;另外CopyOnWriteArrayList增加了对线程安全的支持,并且只对写操作进行同步,相当于做了读写分离,因此CopyOnWriteArrayList可以认为是线程安全并且是读写分离的ArrayList实现,也是无界队列。

CopyOnWriteArrayList尤其适用于当读操作(或者遍历)比写操作频繁的场景,毕竟它内部实现需要对写操作加锁,而且每一次写操作都需要复制整个底层数组,开销是很大的;它的迭代器在创建迭代器的时候已经拿到数组快照,往后对数组的写操作都不会反映到迭代器持有的数组快照,因此其迭代器实现是弱一致性的(不会抛出ConcurrentModificationException),也不会反映迭代器创建之后对列表的添加、删除或更改,同时也不支持迭代器的remove等元素更改操作。

CopyOnWriteArrayList的内存一致性:与其他并发集合一样,在将对象放入CopyOnWriteArrayList之前的线程中的操作happen-before随后另一个线程从CopyOnWriteArrayList中访问或删除该元素之后的操作。

ArrayList

在理解CopyOnWriteArrayList之前,我们有必要对ArrayList进行简单的了解,才能明白CopyOnWriteArrayList的重要性。ArrayList是一个非线程安全的基于数组的无界队列,ArrayList的构造方法指定的大小仅仅是初始容量,而不是队列的固定大小。如果使用ArrayList的无参构造方法ArrayList()创建的实例则第一次添加元素之前将一次性将数组扩容到10个长度,当10个长度用完之后再次需要扩容时扩容大小至少是原容量的一半;否则在每一次需要扩容时按每一次扩容至少是原容量的一半的策略进行扩容。ArrayList内部主要借助:新数组 = Arrays.copyOf(旧数组,新长度)的方式对数组进行扩容,这一点其实与CopyOnWriteArrayList差不多,都是采用拷贝复制产生新数组的方式进行的。

针对需要一次性添加大量,为了避免大量的扩容操作,ArrayList提供了一个ensureCapacity(int)方法在添加大批量元素之前调用,这可以减少递增式再分配的数量。ArrayList是非线程安全的,即不是同步的,这不利于多线程并发环境下的使用,针对线程同步的问题,ArrayList的Java Doc中提到了需要使用如下的方式进行同步包装:

List list = Collections.synchronizedList(new ArrayList(...)); 

但是Collections的这种同步包装实现内部采用了悲观锁的实现方式,即对每一个方法不论读写都加上了synchronized关键字,这显然在并发度越高的情况下越是吞吐量低下的。并且针对迭代器它也没有支持同步,需要调用者手动同步。总之,ArrayList是一个非线程安全的队列,不适用于并发情况,而采用Collections包装过的同步List性能低下,这时候就需要CopyOnWriteArrayList登场了,它不仅是线程安全的,并且是读写分离的,只对写操作加锁,不影响读操作。

CopyOnWriteArrayList

CopyOnWriteArrayList除了对写操作进行了同步,比起ArrayList实现看起来更简化了,最主要是它去掉了ArrayList实现的繁复的扩容机制,仅仅依靠Arrays.copyOf直接拷贝数组产生新的数组实例。下面依次看看源码:

构造方法

 /** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock(); /** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array; //创建包含指定集合元素的CopyOnWriteArrayList实例
public CopyOnWriteArrayList(Collection<? extends E> c) {
Object[] elements;
//如果是CopyOnWriteArrayList,直接拿到数组即可
if (c.getClass() == CopyOnWriteArrayList.class)
elements = ((CopyOnWriteArrayList<?>)c).getArray();
else {
elements = c.toArray();
// c.toArray might (incorrectly) not return Object[] (see 6260652)
//这里听说是JDK8以下的bug,从JDK8开始已经被修复
//https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6260652
if (elements.getClass() != Object[].class)
elements = Arrays.copyOf(elements, elements.length, Object[].class);
}
setArray(elements);
}

CopyOnWriteArrayList的构造方法提供了无参的构造方法以及根据集合和数组初始化CopyOnWriteArrayList的构造方法,都比较简单就不细说了。

写操作

CopyOnWriteArrayList的读操作方法很简单,并不需要加锁同步,这里只看同步方法:

 public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); //加锁
try {
Object[] elements = getArray();
int len = elements.length;
//拷贝产生新数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e; //添加新元素
setArray(newElements); //更新数组引用指向新数组
return true;
} finally {
lock.unlock(); //释放锁
}
} public void add(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock(); //加锁
try {
Object[] elements = getArray();
int len = elements.length;
if (index > len || index < 0) //越界错误
throw new IndexOutOfBoundsException("Index: "+index+
", Size: "+len);
Object[] newElements;
int numMoved = len - index; //需要移动的元素个数
if (numMoved == 0) //不需要移动任何元素,直接拷贝产生新数组
newElements = Arrays.copyOf(elements, len + 1);
else {
newElements = new Object[len + 1];//创建新数组
System.arraycopy(elements, 0, newElements, 0, index); //拷贝前面的元素
System.arraycopy(elements, index, newElements, index + 1,
numMoved); //拷贝后面的元素
}
newElements[index] = element; //将新元素添加到指定的位置
setArray(newElements); //更新数组引用指向新数组
} finally {
lock.unlock(); //释放锁
}
} //如果元素不存在队列中则添加,若元素被添加则返回true
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
} private boolean addIfAbsent(E e, Object[] snapshot) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] current = getArray();
int len = current.length;
if (snapshot != current) { //数组已经被更新
// Optimize for lost race to another addXXX operation
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++)
if (current[i] != snapshot[i] && eq(e, current[i]))
return false; //说明新数组原来某个下标值刚好被e替换,直接返回false
if (indexOf(e, current, common, len) >= 0)
return false;//说明e作为新元素已经被添加到队列,直接返回false
}
//不存在于当前队列中,先拷贝产生新数组
Object[] newElements = Arrays.copyOf(current, len + 1);
newElements[len] = e; //将新元素添加到队列末尾
setArray(newElements); //更新数组引用指向新数组
return true;
} finally {
lock.unlock();
}
} //用指定的元素替换列表中指定位置的元素。
public E set(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
E oldValue = get(elements, index); if (oldValue != element) {//若需要更新相应位置的元素
int len = elements.length;
//拷贝产生新的数组
Object[] newElements = Arrays.copyOf(elements, len);
newElements[index] = element; //重置相应位置的元素
setArray(newElements); //更新数组引用指向新数组
} else {
// Not quite a no-op; ensures volatile write semantics
setArray(elements); //虽然不需要做任何更新,但是为了确保CopyOnWriteArrayList的内存一致性
}
return oldValue;
} finally {
lock.unlock();
}
} //移除列表中指定位置的元素。将任何后续元素向左移动(从它们的索引中减去一个)。返回从列表中删除的元素。
public E remove(int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0) //需要移动的元素个数为0,移除的是最后一个元素
setArray(Arrays.copyOf(elements, len - 1));//拷贝产生新数组
else {
//创建新数组
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);//拷贝前面的元素
System.arraycopy(elements, index + 1, newElements, index,
numMoved);//拷贝后面的元素
setArray(newElements); //更新数组引用指向新数组
}
return oldValue;
} finally {
lock.unlock();
}
} //Jdk8新增的lambda表达式方法,移除满足过滤条件的元素
public boolean removeIf(Predicate<? super E> filter) {
if (filter == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
if (len != 0) {
int newlen = 0;
Object[] temp = new Object[len];//创建临时数组
for (int i = 0; i < len; ++i) {
@SuppressWarnings("unchecked") E e = (E) elements[i];
if (!filter.test(e))
temp[newlen++] = e; //将不满足条件的元素保存到临时数组
}
if (newlen != len) { //存在需要移除的元素
setArray(Arrays.copyOf(temp, newlen)); //拷贝不满足条件的元素产生新数组
return true;
}
}
return false;
} finally {
lock.unlock();
}
} //Jdk8新增的lambda表达式方法,将队列中每一个下标元素替换成根据相应元素按给定的操作运算的结果
public void replaceAll(UnaryOperator<E> operator) {
if (operator == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len);//拷贝临时数组
for (int i = 0; i < len; ++i) {
@SuppressWarnings("unchecked") E e = (E) elements[i];
newElements[i] = operator.apply(e); //将每一个小标位置的元素替换成相应运算结果的值
}
setArray(newElements);
} finally {
lock.unlock();
}
} //移除指定参数集合中不存在的元素,即保留它们的交集
public boolean retainAll(Collection<?> c) {
if (c == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
if (len != 0) {
// temp array holds those elements we know we want to keep
int newlen = 0;
Object[] temp = new Object[len]; //临时数组
for (int i = 0; i < len; ++i) {
Object element = elements[i];
if (c.contains(element)) //将参数集合中存在的元素保存到临时数组
temp[newlen++] = element;
}
if (newlen != len) {//若存在不在参数集合中的元素
setArray(Arrays.copyOf(temp, newlen)); //拷贝都存在的元素参生新的数组
return true;
}
}
return false;
} finally {
lock.unlock();
}
}

写操作的add方法比较简单,addIfAbsent是当队列中不存在该元素的时候才添加,set方法将指定的元素替换列表中指定位置的元素,这些方法都比较简单,其中有点特别的方法是set方法的实现,正常情况下若队列中相应下标元素不等于要替换的元素,需要拷贝产生新数组再替换相应的下标位置元素,这是因为虽然内部数组array是volatile修饰,但是该值仅仅指向数组的起始内存地址,所以仅仅直接修改下标位置的元素这种修改对其它线程不可见,这很容易理解。

容易让人产生疑惑的是为什么明明数组中相应的下标位置的元素与要替换的元素相等,即无任何修改还需要调用setArray(elements)呢?代码中的注释说仅仅是为了保证volatile的写语义,可能有点让人摸不着头脑,其实这是为了满足CopyOnWriteArrayList的内存一致性:将对象放入CopyOnWriteArrayList之前的线程中的操作happen-before随后另一个线程从CopyOnWriteArrayList中访问或删除该元素之后的操作。 例如下面的代码

 int a = 0;
CopyOnWriteArrayList list = ....;
线程1:
a = 2;
list.set(index, ?) 线程2:
list.get(index);
int tmp = a;

线程1先执行,在执行CopyOnWriteArrayList的set之前先对普通变量a进行了赋值更新;线程2后执行,先执行CopyOnWriteArrayList的get,然后在获取变量a的值,根据CopyOnWriteArrayList的内存一致性,线程2必须能够得到 tmp = a = 2 即拿到最新的值,试想若set操作相应的下标index对应的值其实与想替换的值相等,若不执行setArray(elements),那么将无法保证这种可见性,因为不满足happen-before中volatile语义的条件:对一个volatile变量的写操作happen-before后面对这个volatile变量的读操作。因为你没有volatile写,后面的volatile读自然无法看见最新的值。另外set方法中的加锁也很容易让人认为,既然有加锁存在,可见性为何不能得到满足,这是因为CopyOnWriteArrayList的读操作不需要加锁,而且加锁与释放锁内部是对另一个volatile变量state的读写(AQS中ReentrantLock的实现原理),happen-before的volatile变量规则必须是对同一个volatile变量的读写,因此仅仅是set加锁也是无法保证普通变量a的可见性。

addIfAbsent(e)如果当前队列中不存在指定的元素才将其添加到当前队列,addAllAbsent则是其批量操作方法,将指定集合中所有不存在于当前队列中的所有元素添加到当前对列,这两个方法其实就是为实现CopyOnWriteArraySet提供了接口。

相关的批量操作addAll,removeAll,addAllAbsent源码也都不是很复杂,就不一一分析了。其实removeAll和retainAll是一对互补的操作,removeAll是要移除当前队列中存在于指定集合中的所有元素,retainAll则是要移除当前队列中不存在指定集合中的元素即只会留下与指定集合中相同的元素。值得注意的是,remove方法会使内部数组长度变短,因此CopyOnWriteArrayList的内部数组的长度既可以因为添加元素变长,也可以因为移除元素从而使数组收缩。

其它一些方法,sort根据指定的比较器对当前CopyOnWriteArrayList的元素进行排序,clear方法使内部数组重新变成0长度的空数组,equals/hashCode方法是基于集合中的所有元素的,toArray()方法拷贝队列中的所有元素到新的数组返回,toArray(T a[])若指定的数组长度不够则拷贝所有元素产生新的数组返回,否则将队列中的元素拷贝到给定数组的前面相应位置,后面剩余的下标位置全部置为null。

CopyOnWriteArrayList中还有一个方法subList(int fromIndex, int toIndex)用于返回队列中 [fromIndex,toIndex) 之间的元素视图,所有对该视图的更新操作都将反映到原本的CopyOnWriteArrayList集合上,而一旦原CopyOnWriteArrayList集合发生了变化,则该视图将变得不可用。

迭代器

CopyOnWriteArrayList的迭代器是由内部类COWIterator实现的:

 //返回列表中所有元素的普通迭代器
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}
//返回列表中所有元素的双向迭代器
public ListIterator<E> listIterator() {
return new COWIterator<E>(getArray(), 0);
}
//返回从列表中的指定位置开始列表中元素的双向迭代器
public ListIterator<E> listIterator(int index) {
Object[] elements = getArray();
int len = elements.length;
if (index < 0 || index > len)
throw new IndexOutOfBoundsException("Index: "+index); return new COWIterator<E>(elements, index);
}

其中iterator()返回的普通迭代器只能从0开始向后迭代,listIterator()返回的双向迭代器由于指定了起始位置是0,所以也只能从0开始向后迭代,listIterator(int index)在指定的index > 0的情况下,迭代器即可以遍历从index开始至当前数组的最后一个元素之间的所有元素,也可以遍历从index开始到0之间的所有元素。因为ListIterator是一种双向迭代器。

由于CopyOnWriteArrayList的特殊性,在更新操作的时候会重新产生新的数组,而迭代器有仅仅是拿到创建迭代器的时候的数组快照(引用),所以在创建迭代器之后,原队列的任何更新操作都不会反映到迭代器,所以迭代器是弱一致性的,不需要进行同步控制,并且也不支持迭代器的remove、add等方法,毕竟迭代器持有的数组引用已经脱离了CopyOnWriteArrayList的范围。

 //双向迭代器实现
static final class COWIterator<E> implements ListIterator<E> {
/** Snapshot of the array */
private final Object[] snapshot; //数组快照引用
/** Index of element to be returned by subsequent call to next. */
private int cursor;//起始位置下标 //构造方法
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
} //向后遍历判断
public boolean hasNext() {
return cursor < snapshot.length;
} //向前遍历判断
public boolean hasPrevious() {
return cursor > 0;
} //向后遍历
@SuppressWarnings("unchecked")
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
} //向前遍历
@SuppressWarnings("unchecked")
public E previous() {
if (! hasPrevious())
throw new NoSuchElementException();
return (E) snapshot[--cursor];
} //向后遍历指针(next时已经加1)
public int nextIndex() {
return cursor;
} //向前遍历指针减1
public int previousIndex() {
return cursor-1;
} //其余remove、set、add都将抛出UnsupportedOperationException
//......
@Override
//遍历每一个元素执行指定的操作
public void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
Object[] elements = snapshot;
final int size = elements.length;
for (int i = cursor; i < size; i++) {
@SuppressWarnings("unchecked") E e = (E) elements[i];
action.accept(e);
}
cursor = size;
}
}

CopyOnWriteArrayList的迭代器实现COWIterator很简单,就不详细分析了。

可拆分迭代器

 public Spliterator<E> spliterator() {
return Spliterators.spliterator
(getArray(), Spliterator.IMMUTABLE | Spliterator.ORDERED);
}

CopyOnWriteArrayList的可拆分迭代器是直接使用可拆分迭代器工具类Spliterators返回了一个默认的实现ArraySpliterator,它的顶层ArraySpliterator迭代器是按数组下标的方式进行迭代,拆分后的迭代器依然是ArraySpliterator类型的实例,而拆分的实现是每一次拆分一半,直到只剩下1个元素的时候将无法再被拆分,由于创建可拆分迭代器的时候依然是传入的数组引用,所以任何在原CopyOnWriteArrayList集合上的更新操作都不会反映到迭代器上。可拆分迭代器的实现ArraySpliterator比较简单,就不详细介绍了,也可以参阅ArrayBlockingQueue中关于拆分迭代器的讨论。

CopyOnWriteArraySet

CopyOnWriteArraySet的实现其实就是在内部借助一个CopyOnWriteArrayList实现所有的操作,因为Set不允许放入相同的元素,所以在实现的时候借助了CopyOnWriteArrayList的addIfAbsent和addAllAbsent来保证不会插入相同的元素。

  public class CopyOnWriteArraySet<E> extends AbstractSet<E>
implements java.io.Serializable {
private static final long serialVersionUID = 5457747651344034263L; private final CopyOnWriteArrayList<E> al; /**
* Creates an empty set.
*/
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
....
}

从以上的CopyOnWriteArraySet的无参构造方法(有参构造方法也是一样)可以看出在CopyOnWriteArraySet内部确实是维护了一个CopyOnWriteArrayList实例来实现所以操作,以Set的不可添加重复元素的特殊性,其add方法如下:

 public boolean add(E e) {
return al.addIfAbsent(e); //直接调用CopyOnWriteArrayList的addIfAbsent实现
} public boolean addAll(Collection<? extends E> c) {
return al.addAllAbsent(c) > 0;//直接调用CopyOnWriteArrayList的addAllAbsent实现
}

CopyOnWriteArrayList的addIfAbsent、addAllAbsent只会在队列中不存在的时候才会插入元素,这满足了Set集合的特性,关于CopyOnWriteArraySet的其它方法(包括迭代器)都是直接调用的CopyOnWriteArrayList实现,可以说CopyOnWriteArraySet除了元素不可重复之外与CopyOnWriteArrayList特性完全一样,即CopyOnWriteArraySet也是线程安全的,底层采用拷贝整个数组的方式更新数组元素,适用于读操作比写操作多的场景,迭代器弱一致性,迭代器与原队列不存在相互干扰。当然CopyOnWriteArraySet也没有像CopyOnWriteArrayList那样通过下标获取和插入元素的方法。

总结

CopyOnWriteArrayList是ArrayList的线程安全版本,它仅仅对写操作加锁,读操作不加锁,非常适用于多线程并发情况。它的内部实现使用了拷贝数组产生新数组的方式进行覆盖原有的数组对象引用,其内部数组的长度会随着元素的添加删除增长收缩变化的。它和ArrayList一样是无界的。它的迭代器(包括可拆分迭代器)由于是对创建迭代器的时候的内部数组快照的引用,所以后续对CopyOnWriteArrayList队列本身的更新不会反映到迭代器,迭代器相关的remove等更新方法也是无意义的,所以将直接抛出UnsupportedOperationException。

CopyOnWriteArraySet内部是采用CopyOnWriteArrayList实现的所有操作,所以除了CopyOnWriteArraySet拥有Set集合的特性即不允许重复元素以及不能通过下标获取插入元素之外,所有的特性都与CopyOnWriteArrayList一样,即CopyOnWriteArraySet也是采用直接拷贝原数组产生新数组的方式实现的线程安全的无界队列,它的迭代器特性与CopyOnWriteArrayList一样,在创建完迭代器实例只后对CopyOnWriteArraySet队列本身的更新不会反映到迭代器,迭代器相关的remove等更新方法也是无意义的,所以将直接抛出UnsupportedOperationException。

Java同步数据结构之CopyOnWriteArrayList/CopyOnWriteArraySet的相关教程结束。

《Java同步数据结构之CopyOnWriteArrayList/CopyOnWriteArraySet.doc》

下载本文的Word格式文档,以方便收藏与打印。