This the multi-page printable view of this section. Click here to print.
Java 集合
1 - CH01-集合结构
层级结构
选择参考
2 - CH02-ArrayList
概述
- ArrayList 实现了 List 接口,是顺序型容器,允许 NULL 元素,底层结构为数组。
- 除了没有实现线程安全,其余实现与 Vector 类似。
- 拥有容量(capacity)属性,表示底层数组大小,实际元素个数不能大于容量。
- 容量不足以承载更多元素时,会执行扩容。
- size、isEmpty、get、set 均可在常数时间内完成。
- add 的时间开销与插入位置有关。
- addAll 的时间开销与所要添加元素的个数成正比。
- 其余方法大多为线性时间。
内部实现
数据结构
transient Object[] elementData;
private int size;
构造函数
public ArrayList(int initialCapacity) {
if (initialCapacity > 0) {
this.elementData = new Object[initialCapacity];
} else if (initialCapacity == 0) {
this.elementData = EMPTY_ELEMENTDATA;
} else {
throw new IllegalArgumentException("Illegal Capacity: "+
initialCapacity);
}
}
public ArrayList() {
this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA; // 10
}
public ArrayList(Collection<? extends E> c) {
elementData = c.toArray();
if ((size = elementData.length) != 0) {
// c.toArray might (incorrectly) not return Object[] (see 6260652)
if (elementData.getClass() != Object[].class)
elementData = Arrays.copyOf(elementData, size, Object[].class);
} else {
// replace with empty array.
this.elementData = EMPTY_ELEMENTDATA;
}
}
自动扩容
每当向数组中添加元素时,都要去检查添加后元素的个数是否会超出当前数组的长度,如果超出,数组将会进行扩容,以满足添加数据的需求。数组扩容通过一个公开的方法ensureCapacity(int minCapacity)来实现。在实际添加大量元素前,我也可以使用ensureCapacity来手动增加ArrayList实例的容量,以减少递增式再分配的数量。
数组进行扩容时,会将老数组中的元素重新拷贝一份到新的数组中,每次数组容量的增长大约是其原容量的 1.5 倍。这种操作的代价是很高的,因此在实际使用时,我们应该尽量避免数组容量的扩张。当我们可预知要保存的元素的多少时,要在构造ArrayList实例时,就指定其容量,以避免数组扩容的发生。或者根据实际需求,通过调用ensureCapacity方法来手动增加ArrayList实例的容量。
/**
* Increases the capacity of this <tt>ArrayList</tt> instance, if
* necessary, to ensure that it can hold at least the number of elements
* specified by the minimum capacity argument.
*
* @param minCapacity the desired minimum capacity
*/
public void ensureCapacity(int minCapacity) {
int minExpand = (elementData != DEFAULTCAPACITY_EMPTY_ELEMENTDATA)
// any size if not default element table
? 0
// larger than default for default empty table. It's already
// supposed to be at default size.
: DEFAULT_CAPACITY;
if (minCapacity > minExpand) {
ensureExplicitCapacity(minCapacity);
}
}
private void ensureCapacityInternal(int minCapacity) {
if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
minCapacity = Math.max(DEFAULT_CAPACITY, minCapacity);
}
ensureExplicitCapacity(minCapacity);
}
private void ensureExplicitCapacity(int minCapacity) {
modCount++;
// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);
}
/**
* The maximum size of array to allocate.
* Some VMs reserve some header words in an array.
* Attempts to allocate larger arrays may result in
* OutOfMemoryError: Requested array size exceeds VM limit
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* Increases the capacity to ensure that it can hold at least the
* number of elements specified by the minimum capacity argument.
*
* @param minCapacity the desired minimum capacity
*/
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// minCapacity is usually close to size, so this is a win:
elementData = Arrays.copyOf(elementData, newCapacity);
}
private static int hugeCapacity(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
return (minCapacity > MAX_ARRAY_SIZE) ?
Integer.MAX_VALUE :
MAX_ARRAY_SIZE;
}
add/addAll
添加单个元素的方法 add(E e)
和 add(int index, E e)
,在执行之前都需要检查剩余容量,如果需要则自动扩容,即执行 grow 方法。
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
public void add(int index, E element) {
rangeCheckForAdd(index);
ensureCapacityInternal(size + 1); // Increments modCount!!
System.arraycopy(elementData, index, elementData, index + 1,
size - index);
elementData[index] = element;
size++;
}
add(int index, E e)
首先需要移动元素,然后完成插入操作,因此具有线性时间的复杂度。
addAll()
能够一次添加多个元素,根据添加的位置拥有两种版本的实现:
- 向末尾添加:
addAll(Collection<? extends E> c)
- 向指定位置添加:
addAll(int index, Collection<? extends E> c)
在插入之前也需要扩容检查,如果需要就执行扩容。如果插入指定位置,也需要移动元素。因此同时与插入元素的数据和插入的位置相关。
set
首先执行越界检查,然后对数组指定位置的元素赋值。
get
首先执行越界检查,然后数组指定位置的元素值,最后转换类型。
remove
remove(int index)
删除指定位置的元素remove(Object o)
删除第一满足 equals 条件的元素
remove 是 add 的逆操作,需要将删除位置之后的元素向前移动。
为了让 GC 起作用,必须显式的为最后一个位置赋值为 null,即解除引用。如果不设为 null,那么该位置将会继续引用原有的对象,除非被一个新的对象覆盖。
trimToSize
该方法可以将数组的容量调整为当前实际元素的个数。
/**
* Trims the capacity of this <tt>ArrayList</tt> instance to be the
* list's current size. An application can use this operation to minimize
* the storage of an <tt>ArrayList</tt> instance.
*/
public void trimToSize() {
modCount++;
if (size < elementData.length) {
elementData = (size == 0)
? EMPTY_ELEMENTDATA
: Arrays.copyOf(elementData, size);
}
}
indexOf, lastIndexOf
分别获取第一次和最后一次出现的元素位置:
public int indexOf(Object o) {
if (o == null) {
for (int i = 0; i < size; i++)
if (elementData[i]==null)
return i;
} else {
for (int i = 0; i < size; i++)
if (o.equals(elementData[i]))
return i;
}
return -1;
}
public int lastIndexOf(Object o) {
if (o == null) {
for (int i = size-1; i >= 0; i--)
if (elementData[i]==null)
return i;
} else {
for (int i = size-1; i >= 0; i--)
if (o.equals(elementData[i]))
return i;
}
return -1;
}
fail-fast 机制
通过记录 modCount 的值,在面对并发修改时,迭代器很快就会完全失败,避免在将来某个不确定时间发生任意不确定行为。
3 - CH03-LinkedList
概述
- LinkedList 同时实现了 List 接口和 Deque 接口,也就是说它既可以看作一个顺序容器,又可以看作一个队列(Queue),同时又可以看作一个栈(Stack)。
- 栈或队列,现在的首选是 ArrayDeque,它有着比 LinkedList (当作栈或队列使用时)有着更好的性能。
- 所有跟下标相关的操作都是线性时间。
- 在首段或者末尾删除元素只需要常数时间。
- 为追求效率 LinkedList 没有实现同步(synchronized)。
内部实现
数据结构
- 底层通过双向链表实现。
- 双向链表的每个节点用内部类 Node 表示。
- LinkedList 通过
first
和last
引用分别指向链表的第一个和最后一个元素。 - 当链表为空的时候
first
和last
都指向null
。
transient int size = 0;
/**
* Pointer to first node.
* Invariant: (first == null && last == null) ||
* (first.prev == null && first.item != null)
*/
transient Node<E> first;
/**
* Pointer to last node.
* Invariant: (first == null && last == null) ||
* (last.next == null && last.item != null)
*/
transient Node<E> last;
private static class Node<E> {
E item;
Node<E> next;
Node<E> prev;
Node(Node<E> prev, E element, Node<E> next) {
this.item = element;
this.next = next;
this.prev = prev;
}
}
构造函数
/**
* Constructs an empty list.
*/
public LinkedList() {
}
/**
* Constructs a list containing the elements of the specified
* collection, in the order they are returned by the collection's
* iterator.
*
* @param c the collection whose elements are to be placed into this list
* @throws NullPointerException if the specified collection is null
*/
public LinkedList(Collection<? extends E> c) {
this();
addAll(c);
}
getFirst, getLast
/**
* Returns the first element in this list.
*
* @return the first element in this list
* @throws NoSuchElementException if this list is empty
*/
public E getFirst() {
final Node<E> f = first;
if (f == null)
throw new NoSuchElementException();
return f.item;
}
/**
* Returns the last element in this list.
*
* @return the last element in this list
* @throws NoSuchElementException if this list is empty
*/
public E getLast() {
final Node<E> l = last;
if (l == null)
throw new NoSuchElementException();
return l.item;
}
removeFirest(), removeLast(), remove(e), remove(index)
remove 可以删除首个 equals 指定对象的元素,或者删除指定位置的元素。
add
add(E e)
将在末尾添加元素,因为 last 指向链表的末尾元素,因此操作为常数时间,仅需修改几个相关的引用即可。
add(int index, E element)
是在指定位置插入元素,首选需要线性查找到具体位置,然后修改相关引用,完成操作。
addAll
addAll(index, c) 实现方式并不是直接调用add(index,e)来实现,主要是因为效率的问题,另一个是fail-fast中modCount只会增加1次;
/**
* Appends all of the elements in the specified collection to the end of
* this list, in the order that they are returned by the specified
* collection's iterator. The behavior of this operation is undefined if
* the specified collection is modified while the operation is in
* progress. (Note that this will occur if the specified collection is
* this list, and it's nonempty.)
*
* @param c collection containing elements to be added to this list
* @return {@code true} if this list changed as a result of the call
* @throws NullPointerException if the specified collection is null
*/
public boolean addAll(Collection<? extends E> c) {
return addAll(size, c);
}
/**
* Inserts all of the elements in the specified collection into this
* list, starting at the specified position. Shifts the element
* currently at that position (if any) and any subsequent elements to
* the right (increases their indices). The new elements will appear
* in the list in the order that they are returned by the
* specified collection's iterator.
*
* @param index index at which to insert the first element
* from the specified collection
* @param c collection containing elements to be added to this list
* @return {@code true} if this list changed as a result of the call
* @throws IndexOutOfBoundsException {@inheritDoc}
* @throws NullPointerException if the specified collection is null
*/
public boolean addAll(int index, Collection<? extends E> c) {
checkPositionIndex(index);
Object[] a = c.toArray();
int numNew = a.length;
if (numNew == 0)
return false;
Node<E> pred, succ;
if (index == size) {
succ = null;
pred = last;
} else {
succ = node(index);
pred = succ.prev;
}
for (Object o : a) {
@SuppressWarnings("unchecked") E e = (E) o;
Node<E> newNode = new Node<>(pred, e, null);
if (pred == null)
first = newNode;
else
pred.next = newNode;
pred = newNode;
}
if (succ == null) {
last = pred;
} else {
pred.next = succ;
succ.prev = pred;
}
size += numNew;
modCount++;
return true;
}
clear
为了让GC更快可以回收放置的元素,需要将node之间的引用关系赋值为 null。
/**
* Removes all of the elements from this list.
* The list will be empty after this call returns.
*/
public void clear() {
// Clearing all of the links between nodes is "unnecessary", but:
// - helps a generational GC if the discarded nodes inhabit
// more than one generation
// - is sure to free memory even if there is a reachable Iterator
for (Node<E> x = first; x != null; ) {
Node<E> next = x.next;
x.item = null;
x.next = null;
x.prev = null;
x = next;
}
first = last = null;
size = 0;
modCount++;
}
Positional Access 方法
通过 index 获取元素:
public E get(int index) {
checkElementIndex(index);
return node(index).item;
}
通过 index 赋值元素:
public E set(int index, E element) {
checkElementIndex(index);
Node<E> x = node(index);
E oldVal = x.item;
x.item = element;
return oldVal;
}
通过 index 插入元素:
public void add(int index, E element) {
checkPositionIndex(index);
if (index == size)
linkLast(element);
else
linkBefore(element, node(index));
}
通过 index 删除元素:
public E remove(int index) {
checkElementIndex(index);
return unlink(node(index));
}
查找
即查找元素的下标,查找第一次出现元素值相等的 index,否则返回 -1:
public int indexOf(Object o) {
int index = 0;
if (o == null) {
for (Node<E> x = first; x != null; x = x.next) {
if (x.item == null)
return index;
index++;
}
} else {
for (Node<E> x = first; x != null; x = x.next) {
if (o.equals(x.item))
return index;
index++;
}
}
return -1;
}
查找最后一次出现的元素则类似,区别是从 last 开始向前查找。
Queue 方法
- peek
- element
- poll
- remove
- offer
Deque 方法
- offerFirst
- offerLast
- peekFirst
- peekLast
- pollFirst
- pollLast
- push
- pop
4 - CH04-Stack-Queue
概述
- Java 中存在 Stack 实现类,但没有提供 Queue 实现类,仅有一个 Queue 接口。
- 但是在需要使用栈时,Java 推荐的结果是更加高效的 ArrayQueue。
- 在需要使用队列时,首选是 ArrayQueue,其次是 LinkedList。
Queue
Queue 接口继承自 Collection 接口,除了最基本的 Collection 方法之外,还额外支持 insertion、extraction、inspection 操作。这里有两组格式共 6 个方法,一组是抛出异常的实现,一组是返回值的实现(或 null)。
Throws Exception | Returns special Value | |
---|---|---|
Insert | add(e) | offer(e) |
Remove | remove() | poll() |
Examine | element() | peek() |
Deque
Deque 是 “double ended queue”,表示双向队列,英文读作 deck
。Deque 继承自 Queue 接口,除了支持 Queue 的方法外,还支持 insert、remove、examine 操作。
由于 Deque 是双向的,所以可以支持队列的头尾操作,同时支持两种格式共 12 个方法:
First Element-Head | Last Element-Tail | |||
---|---|---|---|---|
Throws Exception | Special Value | Throws Exception | Speicial Value | |
Insert | addFirst(e) | offerFirst(e) | addLast(e) | offerLast(e) |
Remove | removeFirst() | pollFirst() | removeLast() | pollLast() |
Examine | getFirst() | peekFirst() | getLast() | peekLast() |
当把 Deque 当做 FIFO 来使用时,元素是从 deque 的尾部添加,从头部进行删除。所谓 Deque 的部分方法和 Queue 是等同的。
Queue Method | Equivalent Deque Method | 说明 |
---|---|---|
add(e) | addLast(e) | 向队尾添加元素,失败时抛异常 |
offer(e) | offerLast(e) | 向队尾添加元素,失败时返回 false |
remove() | removeFirst() | 获取并删除队首元素,失败时抛异常 |
poll() | pollFirst() | 获取并删除队首元素,失败时返回 null |
element() | getFirst() | 获取但不删除队首元素,失败时抛异常 |
peek() | peekFirst() | 获取但不删除队首元素,失败时返回 null |
Deque 与 Stack 的对应方法:
Stack Method | Equivalent Deque Method | 说明 |
---|---|---|
push(e) | addFirst(e) | 向栈顶插入元素,失败则抛出异常 |
无 | offerFirst(e) | 向栈顶插入元素,失败则返回false |
pop() | removeFirst() | 获取并删除栈顶元素,失败则抛出异常 |
无 | pollFirst() | 获取并删除栈顶元素,失败则返回null |
peek() | peekFirst() | 获取但不删除栈顶元素,失败则抛出异常 |
无 | peekFirst() | 获取但不删除栈顶元素,失败则返回null |
以上操作中,除非对容量有限制,否则添加操作是不会失败的。
ArrayDeque
ArrayDeque 底层为数组结构,为了满足可以同时在数组两端添加或删除元素,该数组必须是循环数组,即数组的任何一点都可能被看做起点或终点。
ArrayDeque 非线程安全,不能添加 null 元素。
head 指向首端第一个有效元素,tail 指向尾端第一个可以插入元素的空位。
因为是循环数组,所谓 head 的位置不一定是 0,tail 的位置也不一定总是比 head 的位置大。
addFirst
在 Deque 首端添加元素,也就是在 head 前面添加元素,在空间足够且下标没有越界的情况下,只需要将 elements[--head]=e
即可。即将 head 的索引递减 1 的位置赋值为新加的元素。
//addFirst(E e)
public void addFirst(E e) {
if (e == null)//不允许放入null
throw new NullPointerException();
elements[head = (head - 1) & (elements.length - 1)] = e;//2.下标是否越界
if (head == tail)//1.空间是否够用
doubleCapacity();//扩容
}
上述代码中可以发现,空间问题是在插入之后开始解决的,因为 tail 总是指向下一个可插入的空位,也就意味着 elements 数组至少会存在一个空位,所以插入元素时不用先考虑空间问题。
下标越界的解决方法很简单,head = (head -1) & (elements.length -1)
即可,这段代码相当于取余,同时解决了 head 值为负的情况。因为 elements.length
必须是 2 的指数倍,elements -1
就是二进制低位全为 1,跟 head-1
相与之后就起到了取模的作用,如果 head-1
为负(-1),则相当于对其取 elements.length
的补码。
对于扩容函数 doubleCapacity
,其逻辑就是申请一个更大的数组(原数据的两倍空间),然后复制原来的元素。
复制分为两次,第一次复制 head 右边的元素,第二次复制 head 左边的数据。
addLast
作用是在 Deque 的尾端插入元素,也就是在tail
的位置插入元素,由于tail
总是指向下一个可以插入的空位,因此只需要elements[tail] = e;
即可。插入完成后再检查空间,如果空间已经用光,则调用doubleCapacity()
进行扩容。
poolFirst
作用是删除并返回 Deque 首端元素,也即是head
位置处的元素。如果容器不空,只需要直接返回elements[head]
即可,当然还需要处理下标的问题。由于ArrayDeque
中不允许放入null
,当elements[head] == null
时,意味着容器为空。
pollLast
作用是删除并返回Deque尾端元素,也即是tail
位置前面的那个元素。
peekFirst
作用是返回但不删除Deque首端元素,也即是head
位置处的元素,直接返回elements[head]
即可。
peekLast
作用是返回但不删除Deque尾端元素,也即是tail
位置前面的那个元素。
5 - CH05-PriorityQueue
概览
- 优先队列的作用是能保证每次取出的元素都是队列中权值最小的。
- 元素大小的评判可以通过元素本身的自然顺序(natural ordering),也可以通过构造时传入的比较器。
- Java 中 PriorityQueue 实现了 Queue 接口,不允许放入
null
元素。 - 底层结构为堆,通过完全二叉树实现的小顶堆,表示可以通过数组作为实现结构。
- PriorityQueue 的
peek()
和element()
操作是常数时间,add()
,offer()
, 无参数的remove()
以及poll()
方法的时间复杂度都是log(N)。
观察上图中每个元素的索引编号,会发现父节点与子节点的编号存在联系:
- leftNo = parentNo*2+1
- rightNo = parentNo*2+2
- parentNo = (nodeNo-1)/2
通过这三个公式,可以轻易计算出某个节点的父节点以及子节点的索引编号。即可以通过数组来实现存储堆。
方法实现
add & offer
两者语义相同,都是相对列中添加元素,只是 Queue 接口规定二者对插入失败是的处理方式不同,前者抛出异常,后者返回 false。
新加入的元素可能会破坏小顶堆的性质,因此需要进行必要的调整。
//offer(E e)
public boolean offer(E e) {
if (e == null)//不允许放入null元素
throw new NullPointerException();
modCount++;
int i = size;
if (i >= queue.length)
grow(i + 1);//自动扩容
size = i + 1;
if (i == 0)//队列原来为空,这是插入的第一个元素
queue[0] = e;
else
siftUp(i, e);//调整
return true;
}
扩容函数 grow 类似于 ArrayList 中的 grow 函数,申请更大空间的数组并复制数据。
siftUp(int k, E x)
方法用于插入元素 x 同时维持堆的特性:
//siftUp()
private void siftUp(int k, E x) {
while (k > 0) {
int parent = (k - 1) >>> 1;//parentNo = (nodeNo-1)/2
Object e = queue[parent];
if (comparator.compare(x, (E) e) >= 0)//调用比较器的比较方法
break;
queue[k] = e;
k = parent;
}
queue[k] = x;
}
调整过程为:从 k 指定的位置开始,将 x 逐层与当前点的 parent 进行比较并交换,直到满足 x >= queue[parent]
为止。
element & peek
语义完全相同,都是获取但不删除队首元素,也就是队列中权值最小的那个元素,二者唯一的区别是当方法失败时前者抛出异常,后者返回null
。
根据小顶堆的性质,堆顶那个元素就是全局最小的那个;由于堆用数组表示,根据下标关系,0
下标处的那个元素既是堆顶元素。所以直接返回数组0
下标处的那个元素即可。
remove & poll
语义也完全相同,都是获取并删除队首元素,区别是当方法失败时前者抛出异常,后者返回null
。
由于删除操作会改变队列的结构,为维护小顶堆的性质,需要进行必要的调整。
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0];//0下标处的那个元素就是最小的那个
E x = (E) queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);//调整
return result;
}
上述代码首先记录0
下标处的元素,并用最后一个元素替换0
下标位置的元素,之后调用siftDown()
方法对堆进行调整,最后返回原来0
下标处的那个元素(也就是最小的那个元素)。
重点是siftDown(int k, E x)
方法,该方法的作用是从k
指定的位置开始,将x
逐层向下与当前点的左右孩子中较小的那个交换,直到x
小于或等于左右孩子中的任何一个为止。
//siftDown()
private void siftDown(int k, E x) {
int half = size >>> 1;
while (k < half) {
//首先找到左右孩子中较小的那个,记录到c里,并用child记录其下标
int child = (k << 1) + 1;//leftNo = parentNo*2+1
Object c = queue[child];
int right = child + 1;
if (right < size &&
comparator.compare((E) c, (E) queue[right]) > 0)
c = queue[child = right];
if (comparator.compare(x, (E) c) <= 0)
break;
queue[k] = c;//然后用c取代原来的值
k = child;
}
queue[k] = x;
}
remove
用于删除队列中跟o
相等的某一个元素(如果有多个相等,只删除一个),该方法不是Queue接口内的方法,而是Collection接口的方法。由于删除操作会改变队列结构,所以要进行调整;又由于删除元素的位置可能是任意的,所以调整过程比其它函数稍加繁琐。具体来说,remove(Object o)
可以分为2种情况: 1. 删除的是最后一个元素。直接删除即可,不需要调整。2. 删除的不是最后一个元素,从删除点开始以最后一个元素为参照调用一次siftDown()
即可。此处不再赘述。
//remove(Object o)
public boolean remove(Object o) {
//通过遍历数组的方式找到第一个满足o.equals(queue[i])元素的下标
int i = indexOf(o);
if (i == -1)
return false;
int s = --size;
if (s == i) //情况1
queue[i] = null;
else {
E moved = (E) queue[s];
queue[s] = null;
siftDown(i, moved);//情况2
......
}
return true;
}
6 - CH06-HashSet-Map
概述
- HashSet 与 HashMap 在 Java 内部的实现类似,前者仅仅是对后者进行了封装。
- HashMap 实现了 Map 接口,允许放入 null key 和 null value。
- 与 HashTable 的区别在于没有实现同步。
- 与 TreeMap 的区别在于不保证元素顺序。
- 采用冲突链表(Sepratate chaining with linked lists)解决哈希冲突。
- 另一种实现是开放地址方式(Open Addressing)。
如果选择合适的哈希函数,put 与 get 方法可以在常数内完成。但是在对 HashMap 执行迭代时,需要遍历整个 table 以及后边跟的冲突链表。因此对于迭代频繁的场景,不宜将 HashMap 的初始大小设置的过大。
有两个参数可以影响 HashMap 的性能:初始容量(inital capacity)和负载系数(load factor)。
初始容量指定了初始 table 的大小,负载系数用来指定自动扩容的临界值。当 entry 的数量超过 capacity * load-factor
时,容器将自动扩容并重新哈希。对于插入元素较多的场景,将初始容量设置较大可以减少重新哈希的次数。
将对象放入到 HashSet 和 HashMap 时,有两个方法要格外留意:hashCode 和 equals。
hashCode 方法决定了对象会被放到哪个 bucket 中,当多个对象的哈希值冲突,equals 方法决定了这些对象是否是同一个对象。因此,如果要将自定义的对象放入到 HashMap 或 HashSet,需要重写 hashCode 和 equals 方法。
HashMap
get
get(Object key)
方法根据指定的key
值返回对应的value
,该方法调用了getEntry(Object key)
得到相应的entry
,然后返回entry.getValue()
。因此getEntry()
是算法的核心。 算法思想是首先通过hash()
函数得到对应bucket
的下标,然后依次遍历冲突链表,通过key.equals(k)
方法来判断是否是要找的那个entry
。
上图中 hash(k) & (table.length-1)
等价于 hash(k) % table.length
,原因是 HashMap 要求 table.length
均为 2 的指数,因此 table.length -1
就是二进制低位全是 1,跟 hash(k)
相与会将哈希值的高位全部抹掉,剩下的就是余数了。
//getEntry()方法
final Entry<K,V> getEntry(Object key) {
......
int hash = (key == null) ? 0 : hash(key);
for (Entry<K,V> e = table[hash&(table.length-1)];//得到冲突链表
e != null; e = e.next) {//依次遍历冲突链表中的每个entry
Object k;
//依据equals()方法判断是否相等
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
}
return null;
}
put
put(K key, V value)
方法是将指定的key, value
对添加到map
里。该方法首先会对map
做一次查找,看是否包含该元组,如果已经包含则直接返回,查找过程类似于getEntry()
方法;如果没有找到,则会通过addEntry(int hash, K key, V value, int bucketIndex)
方法插入新的entry
,插入方式为头插法。
//addEntry()
void addEntry(int hash, K key, V value, int bucketIndex) {
if ((size >= threshold) && (null != table[bucketIndex])) {
resize(2 * table.length);//自动扩容,并重新哈希
hash = (null != key) ? hash(key) : 0;
bucketIndex = hash & (table.length-1);//hash%table.length
}
//在冲突链表头部插入新的entry
Entry<K,V> e = table[bucketIndex];
table[bucketIndex] = new Entry<>(hash, key, value, e);
size++;
}
remove
remove(Object key)
的作用是删除key
值对应的entry
,该方法的具体逻辑是在removeEntryForKey(Object key)
里实现的。removeEntryForKey()
方法会首先找到key
值对应的entry
,然后删除该entry
(修改链表的相应引用)。查找过程跟getEntry()
过程类似。
//removeEntryForKey()
final Entry<K,V> removeEntryForKey(Object key) {
......
int hash = (key == null) ? 0 : hash(key);
int i = indexFor(hash, table.length);//hash&(table.length-1)
Entry<K,V> prev = table[i];//得到冲突链表
Entry<K,V> e = prev;
while (e != null) {//遍历冲突链表
Entry<K,V> next = e.next;
Object k;
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k)))) {//找到要删除的entry
modCount++; size--;
if (prev == e) table[i] = next;//删除的是冲突链表的第一个entry
else prev.next = next;
return e;
}
prev = e; e = next;
}
return e;
}
HashSet
HashSet是对HashMap的简单包装,对HashSet的函数调用都会转换成合适的HashMap方法。
//HashSet是对HashMap的简单包装
public class HashSet<E>
{
......
//HashSet里面有一个HashMap
private transient HashMap<E,Object> map;
// Dummy value to associate with an Object in the backing Map
private static final Object PRESENT = new Object();
public HashSet() {
map = new HashMap<>();
}
......
public boolean add(E e) {//简单的方法转换
return map.put(e, PRESENT)==null;
}
......
}
7 - CH07-LinkedHashSet-Map
概述
- LinkedHashSet 和 LinkedHashMap 在 Java 中也是类似的实现,前者只是对后者的简单封装。
- LinkedHashMap 实现了 Map 接口,允许放入 null key 和 null value。
- 同时满足 HashMap 和 linked list 的一些特性。
- 可以将 LinkedHashMap 看做是通过 linked list 增强的 HashMap。
- LinkedHashMap 是 HashMap 的直接子类,二者唯一的区别是 LinkedHashMap 在 HashMap 的基础上,采用双向链表的形式将所有 entry 连接起来,以保证元素的迭代顺序和插入顺序相同。
如上图,相比 HashMap,在 entry 部分多了个属性用于连接所有 entry。而 header 用于指向双向链表的头部。
这种结构体还有一个好处,迭代时不需要像 HashMap 那样遍历整个 table,只需要遍历 header 指向的双向链表即可。也就是说,LinkedHashMap 的迭代时间只和 entry 的数量相关,与 table 的大小无关。
有两个参数可以影响 LinkedHashMap 的性能:初始容量(inital capacity)和负载系数(load factor)。初始容量指定了 table 的大小,负载系数用来指定自动扩容的临界值。当entry
的数量超过capacity*load_factor
时,容器将自动扩容并重新哈希。对于插入元素较多的场景,将初始容量设大可以减少重新哈希的次数。
将对象放入到LinkedHashMap或LinkedHashSet中时,有两个方法需要特别关心: hashCode()
和equals()
。hashCode()
方法决定了对象会被放到哪个bucket
里,当多个对象的哈希值冲突时,equals()
方法决定了这些对象是否是“同一个对象”。所以,如果要将自定义的对象放入到LinkedHashMap
或LinkedHashSet
中,需要重写 hashCode()
和equals()
方法。
内部实现
get
get(Object key)
方法根据指定的key
值返回对应的value
。该方法跟HashMap.get()
方法的流程几乎完全一样。
put
put(K key, V value)
方法是将指定的key, value
对添加到map
里。该方法首先会对map
做一次查找,看是否包含该元组,如果已经包含则直接返回,查找过程类似于get()
方法;如果没有找到,则会通过addEntry(int hash, K key, V value, int bucketIndex)
方法插入新的entry
。
注意这里的插入有两重含义:
- 从 table 的角度看,新的 entry 需要插入到对应的 bucket 中,当有哈希冲突时,采用头插法将新的 entry 插入到冲突链表的头部。
- 从 header 的角度看,新的 entry 需要插入到双向链表大尾部。
addEntry 的实现逻辑:
// LinkedHashMap.addEntry()
void addEntry(int hash, K key, V value, int bucketIndex) {
if ((size >= threshold) && (null != table[bucketIndex])) {
resize(2 * table.length);// 自动扩容,并重新哈希
hash = (null != key) ? hash(key) : 0;
bucketIndex = hash & (table.length-1);// hash%table.length
}
// 1.在冲突链表头部插入新的entry
HashMap.Entry<K,V> old = table[bucketIndex];
Entry<K,V> e = new Entry<>(hash, key, value, old);
table[bucketIndex] = e;
// 2.在双向链表的尾部插入新的entry
e.addBefore(header);
size++;
}
上述代码中用到了 addBefore 方法将新的 entry 插入到双向链表头引用的 header 的前面,这样 e 就称为双向链表中的最后一个元素。addBefore 的实现逻辑如下:
// LinkedHashMap.Entry.addBefor(),将this插入到existingEntry的前面
private void addBefore(Entry<K,V> existingEntry) {
after = existingEntry;
before = existingEntry.before;
before.after = this;
after.before = this;
}
上述到吗只是简单的修改 entry 的引用就实现了整个逻辑。
remove
remove(Object key)
的作用是删除key
值对应的entry
,该方法的具体逻辑是在removeEntryForKey(Object key)
里实现的。removeEntryForKey()
方法会首先找到key
值对应的entry
,然后删除该entry
(修改链表的相应引用)。查找过程跟get()
方法类似。
注意这里的删除也有两重含义:
- 从 table 的角度看,需要将 entry 从对应的 bucket 中删除,如果对应的冲突链表不为空,需要修改冲突链表的引用。
- 从 header 的角度看,需要将该 entry 从双向链表中删除,同时修改链表中前置和后置元素的引用。
removeEntryForKey 的实现逻辑如下:
// LinkedHashMap.removeEntryForKey(),删除key值对应的entry
final Entry<K,V> removeEntryForKey(Object key) {
......
int hash = (key == null) ? 0 : hash(key);
int i = indexFor(hash, table.length);// hash&(table.length-1)
Entry<K,V> prev = table[i];// 得到冲突链表
Entry<K,V> e = prev;
while (e != null) {// 遍历冲突链表
Entry<K,V> next = e.next;
Object k;
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k)))) {// 找到要删除的entry
modCount++; size--;
// 1. 将e从对应bucket的冲突链表中删除
if (prev == e) table[i] = next;
else prev.next = next;
// 2. 将e从双向链表中删除
e.before.after = e.after;
e.after.before = e.before;
return e;
}
prev = e; e = next;
}
return e;
}
LinkedHashSet
LinkedHashSet是对LinkedHashMap的简单包装,对LinkedHashSet的函数调用都会转换成合适的LinkedHashMap方法。
public class LinkedHashSet<E>
extends HashSet<E>
implements Set<E>, Cloneable, java.io.Serializable {
......
// LinkedHashSet里面有一个LinkedHashMap
public LinkedHashSet(int initialCapacity, float loadFactor) {
map = new LinkedHashMap<>(initialCapacity, loadFactor);
}
......
public boolean add(E e) {//简单的方法转换
return map.put(e, PRESENT)==null;
}
......
}
常用场景
LinkedHashMap除了可以保证迭代顺序外,还有一个非常有用的用法: 可以轻松实现一个采用了FIFO替换策略的缓存。具体说来,LinkedHashMap有一个子类方法protected boolean removeEldestEntry(Map.Entry<K,V> eldest)
,该方法的作用是告诉Map是否要删除“最老”的Entry,所谓最老就是当前Map中最早插入的Entry,如果该方法返回true
,最老的那个元素就会被删除。在每次插入新元素的之后LinkedHashMap会自动询问removeEldestEntry()是否要删除最老的元素。这样只需要在子类中重载该方法,当元素个数超过一定数量时让removeEldestEntry()返回true,就能够实现一个固定大小的FIFO策略的缓存。示例代码如下:
class FifoCache<K,V> extends LinkedHashMap<K,v> {
private final int size;
public FifoCache(int size){
this.size = size;
}
@Override
protected boolean removeEldestEntry(Map.Entry<K,V> eldest){
return size() > size;
}
}
8 - CH08-TreeSet-Map
概述
- TreeSet 和 TreeMap 在 Java 中具有类似的实现,前者仅仅是对后者的简单封装。
- TreeMap实现了SortedMap接口,也就是说会按照
key
的大小顺序对Map中的元素进行排序,key
大小的评判可以通过其本身的自然顺序(natural ordering),也可以通过构造时传入的比较器(Comparator)。 - *TreeMap*底层通过红黑树(Red-Black tree)实现,也就意味着
containsKey()
,get()
,put()
,remove()
都有着log(n)
的时间复杂度。 - 出于性能原因,TreeMap是非同步的(not synchronized)。
红黑树是一种近似平衡的二叉查找树,它能保证任何一个节点的左右子树的高度差不会超过二者中较低那个的一倍。
具体来说,红黑树是满足如下条件的二叉查找树:
- 每个节点要么是红色,要么是黑色。
- 根节点必须是黑色。
- 红色节点不能有连续(父子节点均不能为红色)。
- 对于每个节点,从该节点至 null(树尾)的任何路径,都含有相同个数的黑色节点。
在树的结构发生改变时(插入或删除),往往会破坏上面的 3 和 4,需要执行调整以使得重新满足所有条件。
树操作
调整可以分为两类,颜色调整和结构调整。
结构调整:左旋
左旋的过程就是想 X 的右子树绕 X 向左方向(逆时针)旋转,使 X 的右子树称为 X 的父亲,同时修改相关节点的引用。旋转之后,二叉查找树的条件仍然满足。
结构调整:右旋
右旋的过程是将 X 的左子树绕 X 向右方向(顺时针)旋转,使 X 的左子树称为 X 的父亲,同时修改相关的引用。旋转之后,二叉查找树的条件仍然满足。
寻找节点后继
对二叉查找树,给定节点 T,其后继(树中大于 T 的最小元素)可以通过如下方式找到:
- T 的右子树不空,则 T 的后继是其右子树中最小的按个元素。
- T 的右子树为空,则 T 的后继是其第一个向左走的父亲。
该操作用于删除红黑树中的删除操作。
// 寻找节点后继函数successor()
static <K,V> TreeMap.Entry<K,V> successor(Entry<K,V> t) {
if (t == null)
return null;
else if (t.right != null) {// 1. t的右子树不空,则t的后继是其右子树中最小的那个元素
Entry<K,V> p = t.right;
while (p.left != null)
p = p.left;
return p;
} else {// 2. t的右孩子为空,则t的后继是其第一个向左走的祖先
Entry<K,V> p = t.parent;
Entry<K,V> ch = t;
while (p != null && ch == p.right) {
ch = p;
p = p.parent;
}
return p;
}
}
内部实现
get
get(Object key)
方法根据指定的key
值返回对应的value
,该方法调用了getEntry(Object key)
得到相应的entry
,然后返回entry.value
。因此getEntry()
是算法的核心。算法思想是根据key
的自然顺序(或者比较器顺序)对二叉查找树进行查找,直到找到满足k.compareTo(p.key) == 0
的entry
。
//getEntry()方法
final Entry<K,V> getEntry(Object key) {
......
if (key == null)//不允许key值为null
throw new NullPointerException();
Comparable<? super K> k = (Comparable<? super K>) key;//使用元素的自然顺序
Entry<K,V> p = root;
while (p != null) {
int cmp = k.compareTo(p.key);
if (cmp < 0)//向左找
p = p.left;
else if (cmp > 0)//向右找
p = p.right;
else
return p;
}
return null;
}
put
put(K key, V value)
方法是将指定的key
, value
对添加到map
里。该方法首先会对map
做一次查找,看是否包含该元组,如果已经包含则直接返回,查找过程类似于getEntry()
方法;如果没有找到则会在红黑树中插入新的entry
,如果插入之后破坏了红黑树的约束条件,还需要进行调整(旋转,改变某些节点的颜色)。
public V put(K key, V value) {
......
int cmp;
Entry<K,V> parent;
if (key == null)
throw new NullPointerException();
Comparable<? super K> k = (Comparable<? super K>) key;//使用元素的自然顺序
do {
parent = t;
cmp = k.compareTo(t.key);
if (cmp < 0) t = t.left;//向左找
else if (cmp > 0) t = t.right;//向右找
else return t.setValue(value);
} while (t != null);
Entry<K,V> e = new Entry<>(key, value, parent);//创建并插入新的entry
if (cmp < 0) parent.left = e;
else parent.right = e;
fixAfterInsertion(e);//调整
size++;
return null;
}
上述代码首先在红黑树上找到合适的位置,然后创建新的 entry 并插入(插入的节点一定是叶子)。难点是调整函数 fixAfterInsertion,需要执行颜色调整和结构调整。
调整函数的具体实现如下,其中用到了前面提到的 rotateLeft 和 rotateRight 函数。通过代码我们可以看到,情况 2 其实是落在情况 3 内。情况 4~6 跟前三种情况是对称的,因此图解中没有展示后 3 种情况。
//红黑树调整函数fixAfterInsertion()
private void fixAfterInsertion(Entry<K,V> x) {
x.color = RED;
while (x != null && x != root && x.parent.color == RED) {
if (parentOf(x) == leftOf(parentOf(parentOf(x)))) {
Entry<K,V> y = rightOf(parentOf(parentOf(x)));
if (colorOf(y) == RED) {
setColor(parentOf(x), BLACK); // 情况1
setColor(y, BLACK); // 情况1
setColor(parentOf(parentOf(x)), RED); // 情况1
x = parentOf(parentOf(x)); // 情况1
} else {
if (x == rightOf(parentOf(x))) {
x = parentOf(x); // 情况2
rotateLeft(x); // 情况2
}
setColor(parentOf(x), BLACK); // 情况3
setColor(parentOf(parentOf(x)), RED); // 情况3
rotateRight(parentOf(parentOf(x))); // 情况3
}
} else {
Entry<K,V> y = leftOf(parentOf(parentOf(x)));
if (colorOf(y) == RED) {
setColor(parentOf(x), BLACK); // 情况4
setColor(y, BLACK); // 情况4
setColor(parentOf(parentOf(x)), RED); // 情况4
x = parentOf(parentOf(x)); // 情况4
} else {
if (x == leftOf(parentOf(x))) {
x = parentOf(x); // 情况5
rotateRight(x); // 情况5
}
setColor(parentOf(x), BLACK); // 情况6
setColor(parentOf(parentOf(x)), RED); // 情况6
rotateLeft(parentOf(parentOf(x))); // 情况6
}
}
}
root.color = BLACK;
}
remove
remove(Object key)
的作用是删除key
值对应的entry
,该方法首先通过上文中提到的getEntry(Object key)
方法找到key
值对应的entry
,然后调用deleteEntry(Entry<K,V> entry)
删除对应的entry
。由于删除操作会改变红黑树的结构,有可能破坏红黑树的约束条件,因此有可能要进行调整。
getEntry()
函数前面已经讲解过,这里重点放deleteEntry()
上,该函数删除指定的entry
并在红黑树的约束被破坏时进行调用fixAfterDeletion(Entry<K,V> x)
进行调整。
由于红黑树是一棵增强版的二叉查找树,红黑树的删除操作跟普通二叉查找树的删除操作也就非常相似,唯一的区别是红黑树在节点删除之后可能需要进行调整。现在考虑一棵普通二叉查找树的删除过程,可以简单分为两种情况:
删除节点 P 的左右子树都为空,或者只有一个子树为空。
删除节点 P 的左右子树都非空。
对于上述情况1,处理起来比较简单,直接将p删除(左右子树都为空时),或者用非空子树替代p(只有一棵子树非空时);对于情况2,可以用p的后继s(树中大于x的最小的那个元素)代替p,然后使用情况1删除s(此时s一定满足情况1.可以画画看)。
基于以上逻辑,红黑树的节点删除函数deleteEntry()
代码如下:
// 红黑树entry删除函数deleteEntry()
private void deleteEntry(Entry<K,V> p) {
modCount++;
size--;
if (p.left != null && p.right != null) {// 2. 删除点p的左右子树都非空。
Entry<K,V> s = successor(p);// 后继
p.key = s.key;
p.value = s.value;
p = s;
}
Entry<K,V> replacement = (p.left != null ? p.left : p.right);
if (replacement != null) {// 1. 删除点p只有一棵子树非空。
replacement.parent = p.parent;
if (p.parent == null)
root = replacement;
else if (p == p.parent.left)
p.parent.left = replacement;
else
p.parent.right = replacement;
p.left = p.right = p.parent = null;
if (p.color == BLACK)
fixAfterDeletion(replacement);// 调整
} else if (p.parent == null) {
root = null;
} else { // 1. 删除点p的左右子树都为空
if (p.color == BLACK)
fixAfterDeletion(p);// 调整
if (p.parent != null) {
if (p == p.parent.left)
p.parent.left = null;
else if (p == p.parent.right)
p.parent.right = null;
p.parent = null;
}
}
}
上述代码中占据大量代码行的,是用来修改父子节点间引用关系的代码,其逻辑并不难理解。下面着重讲解删除后调整函数fixAfterDeletion()
。首先请思考一下,删除了哪些点才会导致调整?只有删除点是BLACK的时候,才会触发调整函数,因为删除RED节点不会破坏红黑树的任何约束,而删除BLACK节点会破坏规则4。
跟上文中讲过的fixAfterInsertion()
函数一样,这里也要分成若干种情况。记住,无论有多少情况,具体的调整操作只有两种: 1.改变某些节点的颜色,2.对某些节点进行旋转。
上图的整体思路为:将情况 1 首先转换为情况 2,或者转换成 3 或 4。当然,该图解并不意味着调整情况一定是从情况 1 开始的。通过后续的代码我们会发现一些规则:
- 如果是由情况 1 之后紧接着进入情况 2,那么情况 2 之后一定会退出循环(因为 X 为红色)。
- 一旦进入情况 3 和 4,一定会退出循环(因为 X 为 root)。
删除后跳转函数 fixAfterDeletion 的具体实现如下,其中用到了上文中提到的rotateLeft()
和rotateRight()
函数。通过代码我们能够看到,情况3其实是落在情况4内的。情况5~情况8跟前四种情况是对称的,因此图解中并没有画出后四种情况,读者可以参考代码自行理解。
private void fixAfterDeletion(Entry<K,V> x) {
while (x != root && colorOf(x) == BLACK) {
if (x == leftOf(parentOf(x))) {
Entry<K,V> sib = rightOf(parentOf(x));
if (colorOf(sib) == RED) {
setColor(sib, BLACK); // 情况1
setColor(parentOf(x), RED); // 情况1
rotateLeft(parentOf(x)); // 情况1
sib = rightOf(parentOf(x)); // 情况1
}
if (colorOf(leftOf(sib)) == BLACK &&
colorOf(rightOf(sib)) == BLACK) {
setColor(sib, RED); // 情况2
x = parentOf(x); // 情况2
} else {
if (colorOf(rightOf(sib)) == BLACK) {
setColor(leftOf(sib), BLACK); // 情况3
setColor(sib, RED); // 情况3
rotateRight(sib); // 情况3
sib = rightOf(parentOf(x)); // 情况3
}
setColor(sib, colorOf(parentOf(x))); // 情况4
setColor(parentOf(x), BLACK); // 情况4
setColor(rightOf(sib), BLACK); // 情况4
rotateLeft(parentOf(x)); // 情况4
x = root; // 情况4
}
} else { // 跟前四种情况对称
Entry<K,V> sib = leftOf(parentOf(x));
if (colorOf(sib) == RED) {
setColor(sib, BLACK); // 情况5
setColor(parentOf(x), RED); // 情况5
rotateRight(parentOf(x)); // 情况5
sib = leftOf(parentOf(x)); // 情况5
}
if (colorOf(rightOf(sib)) == BLACK &&
colorOf(leftOf(sib)) == BLACK) {
setColor(sib, RED); // 情况6
x = parentOf(x); // 情况6
} else {
if (colorOf(leftOf(sib)) == BLACK) {
setColor(rightOf(sib), BLACK); // 情况7
setColor(sib, RED); // 情况7
rotateLeft(sib); // 情况7
sib = leftOf(parentOf(x)); // 情况7
}
setColor(sib, colorOf(parentOf(x))); // 情况8
setColor(parentOf(x), BLACK); // 情况8
setColor(leftOf(sib), BLACK); // 情况8
rotateRight(parentOf(x)); // 情况8
x = root; // 情况8
}
}
}
setColor(x, BLACK);
}
TreeSet
前面已经说过TreeSet
是对TreeMap
的简单包装,对TreeSet
的函数调用都会转换成合适的TreeMap
方法。
// TreeSet是对TreeMap的简单包装
public class TreeSet<E> extends AbstractSet<E>
implements NavigableSet<E>, Cloneable, java.io.Serializable
{
......
private transient NavigableMap<E,Object> m;
// Dummy value to associate with an Object in the backing Map
private static final Object PRESENT = new Object();
public TreeSet() {
this.m = new TreeMap<E,Object>();// TreeSet里面有一个TreeMap
}
......
public boolean add(E e) {
return m.put(e, PRESENT)==null;
}
......
}
9 - CH09-WeakHashMap
概述
它的特殊之处在于 WeakHashMap 里的entry
可能会被GC自动删除,即使程序员没有调用remove()
或者clear()
方法。
当使用 WeakHashMap 时,即使没有显式的添加或删除任何元素,也可能发生如下情况:
- 调用两次 size 方法所返回的结果不同。
- 调用两次 isEmpty 方法,第一次返回 false,第二次返回 true。
- 调用两次 containskey 方法,首次返回 true,第二次返回 false,尽管两次使用相同的 key。
- 调用两次 get 方法,首次返回 value,第二次返回 null,尽管两次使用相同的对象。
这些特性尤其适用于需要缓存的场景。在缓存场景中,由于内存的局限,不能缓存所有对象,对象缓存命中可以提供系统效率,但缓存 MISS 也不会引起错误,因为可以通过计算重新得到。
Java 内存是通过 GC 自动管理的,GC 会在程序运行过程中自动判断哪些对象是可以被回收的,并在合适的时机执行内存释放。GC 判断某个对象释放可以被回收的依据是,释放有有效的引用指向该对象。如果没有有效引用指向该对象(即基本意味着不存在访问该对象的方式),那么该对象就是可以被回收的。这里的有效应用并不包括弱引用。也就是说,虽然弱引用可以用来访问对象,但进行垃圾回收时弱引用并不会被考虑在内,仅有弱引用指向的对象仍然会被 GC 回收。
WeakHashMap 内部是通过弱引用来管理 entry 的,弱引用的特性应用到 WeakHashMap 上意味着什么呢?将一对 key value 放入到 WeakHashMap 中并不能避免该 key 被 GC 回收,除非在 WeakHashMap 在外还有对该 key 的强引用。
具体实现
类似于 HashMap 和 HashSet。
WeakHashSet
Set<Object> weakHashSet = Collections
.newSetFromMap(new WeakHashMap<Object, Boolean>());
该工具方法可以直接将 Map 包装为 Set,只是对 Map 的简单封装。
// Collections.newSetFromMap()用于将任何Map包装成一个Set
public static <E> Set<E> newSetFromMap(Map<E, Boolean> map) {
return new SetFromMap<>(map);
}
private static class SetFromMap<E> extends AbstractSet<E>
implements Set<E>, Serializable
{
private final Map<E, Boolean> m; // The backing map
private transient Set<E> s; // Its keySet
SetFromMap(Map<E, Boolean> map) {
if (!map.isEmpty())
throw new IllegalArgumentException("Map is non-empty");
m = map;
s = map.keySet();
}
public void clear() { m.clear(); }
public int size() { return m.size(); }
public boolean isEmpty() { return m.isEmpty(); }
public boolean contains(Object o) { return m.containsKey(o); }
public boolean remove(Object o) { return m.remove(o) != null; }
public boolean add(E e) { return m.put(e, Boolean.TRUE) == null; }
public Iterator<E> iterator() { return s.iterator(); }
public Object[] toArray() { return s.toArray(); }
public <T> T[] toArray(T[] a) { return s.toArray(a); }
public String toString() { return s.toString(); }
public int hashCode() { return s.hashCode(); }
public boolean equals(Object o) { return o == this || s.equals(o); }
public boolean containsAll(Collection<?> c) {return s.containsAll(c);}
public boolean removeAll(Collection<?> c) {return s.removeAll(c);}
public boolean retainAll(Collection<?> c) {return s.retainAll(c);}
// addAll is the only inherited implementation
......
}
10 - CH10-Stream
Stream 是 JDK1.8 中首次引入的,距今已经过去了接近8年时间(JDK1.8正式版是2013年底发布的)。Stream 的引入一方面极大地简化了某些开发场景,另一方面也可能降低了编码的可读性(确实有不少人说到Stream会降低代码的可读性,但是在笔者看来,熟练使用之后反而觉得代码的可读性提高了)。这篇文章会花巨量篇幅,详细分析 Stream 的底层实现原理,参考的源码是 JDK11 的源码,其他版本 JDK 可能不适用于本文中的源码展示和相关例子。
向前兼容
Stream
是JDK1.8
引入的,如要需要JDK1.7
或者以前的代码也能在JDK1.8
或以上运行,那么Stream
的引入必定不能在原来已经发布的接口方法进行修改,否则必定会因为兼容性问题导致老版本的接口实现无法在新版本中运行(方法签名出现异常),猜测是基于这个问题引入了接口默认方法,也就是default
关键字。查看源码可以发现,ArrayList
的超类Collection
和Iterable
分别添加了数个default
方法:
// java.util.Collection部分源码
public interface Collection<E> extends Iterable<E> {
// 省略其他代码
@Override
default Spliterator<E> spliterator() {
return Spliterators.spliterator(this, 0);
}
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
}
// java.lang.Iterable部分源码
public interface Iterable<T> {
// 省略其他代码
default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}
default Spliterator<T> spliterator() {
return Spliterators.spliteratorUnknownSize(iterator(), 0);
}
}
从直觉来看,这些新增的方法应该就是Stream
实现的关键方法(后面会印证这不是直觉,而是查看源码的结果)。接口默认方法在使用上和实例方法一致,在实现上可以直接在接口方法中编写方法体,有点静态方法的意味,但是子类可以覆盖其实现(也就是接口默认方法在本接口中的实现有点像静态方法,可以被子类覆盖,使用上和实例方法一致)。这种实现方式,有可能是一种突破,也有可能是一种妥协,但是无论是妥协还是突破,都实现了向前兼容:
// JDK1.7中的java.lang.Iterable
public interface Iterable<T> {
Iterator<T> iterator();
}
// JDK1.7中的Iterable实现
public MyIterable<Long> implements Iterable<Long>{
public Iterator<Long> iterator(){
....
}
}
如上,MyIterable
在JDK1.7
中定义,如果该类在JDK1.8
中运行,那么调用其实例中的forEach()
和spliterator()
方法,相当于直接调用JDK1.8
中的Iterable
中的接口默认方法forEach()
和spliterator()
。当然受限于JDK
版本,这里只能确保编译通过,旧功能正常使用,而无法在JDK1.7
中使用Stream
相关功能或者使用default
方法关键字。总结这么多,就是想说明为什么使用JDK7
开发和编译的代码可以在JDK8
环境下运行。
可拆分迭代器 Spliterator
Stream
实现的基石是Spliterator
,Spliterator
是splitable iterator
的缩写,意为"可拆分迭代器",用于遍历指定数据源(例如数组、集合或者IO Channel
等)中的元素,在设计上充分考虑了串行和并行的场景。上一节提到了Collection
存在接口默认方法spliterator()
,此方法会生成一个Spliterator<E>
实例,意为着所有的集合子类都具备创建Spliterator
实例的能力。Stream
的实现在设计上和Netty
中的ChannelHandlerContext
十分相似,本质是一个链表,而Spliterator
就是这个链表的Head
节点(Spliterator
实例就是一个流实例的头节点,后面分析具体的源码时候再具体展开)。
Spliterator 接口方法
接着看Spliterator
接口定义的方法:
public interface Spliterator<T> {
// 暂时省略其他代码
boolean tryAdvance(Consumer<? super T> action);
default void forEachRemaining(Consumer<? super T> action) {
do { } while (tryAdvance(action));
}
Spliterator<T> trySplit();
long estimateSize();
default long getExactSizeIfKnown() {
return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
}
int characteristics();
default boolean hasCharacteristics(int characteristics) {
return (characteristics() & characteristics) == characteristics;
}
default Comparator<? super T> getComparator() {
throw new IllegalStateException();
}
// 暂时省略其他代码
}
tryAdvance
- 方法签名:
boolean tryAdvance(Consumer<? super T> action)
- 功能:如果
Spliterator
中存在剩余元素,则对其中的某个元素执行传入的action
回调,并且返回true
,否则返回false
。如果Spliterator
启用了ORDERED
特性,会按照顺序(这里的顺序值可以类比为ArrayList
中容器数组元素的下标,ArrayList
中添加新元素是天然有序的,下标由零开始递增)处理下一个元素 - 例子:
public static void main(String[] args) throws Exception {
List<Integer> list = new ArrayList<>();
list.add(2);
list.add(1);
list.add(3);
Spliterator<Integer> spliterator = list.stream().spliterator();
final AtomicInteger round = new AtomicInteger(1);
final AtomicInteger loop = new AtomicInteger(1);
while (spliterator.tryAdvance(num -> System.out.printf("第%d轮回调Action,值:%d\n", round.getAndIncrement(), num))) {
System.out.printf("第%d轮循环\n", loop.getAndIncrement());
}
}
// 控制台输出
第1轮回调Action,值:2
第1轮循环
第2轮回调Action,值:1
第2轮循环
第3轮回调Action,值:3
第3轮循环
forEachRemaining
- 方法签名:
default void forEachRemaining(Consumer<? super T> action)
- 功能:如果
Spliterator
中存在剩余元素,则对其中的所有剩余元素在当前线程中执行传入的action
回调。如果Spliterator
启用了ORDERED
特性,会按照顺序处理剩余所有元素。这是一个接口默认方法,方法体比较粗暴,直接是一个死循环包裹着tryAdvance()
方法,直到false
退出循环 - 例子:
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
list.add(2);
list.add(1);
list.add(3);
Spliterator<Integer> spliterator = list.stream().spliterator();
final AtomicInteger round = new AtomicInteger(1);
spliterator.forEachRemaining(num -> System.out.printf("第%d轮回调Action,值:%d\n", round.getAndIncrement(), num));
}
// 控制台输出
第1轮回调Action,值:2
第2轮回调Action,值:1
第3轮回调Action,值:3
trySplit
- 方法签名:
Spliterator<T> trySplit()
- 功能:如果当前的
Spliterator
是可分区(可分割)的,那么此方法将会返回一个全新的Spliterator
实例,这个全新的Spliterator
实例里面的元素不会被当前Spliterator
实例中的元素覆盖(这里是直译了API
注释,实际要表达的意思是:当前的Spliterator
实例X
是可分割的,trySplit()
方法会分割X
产生一个全新的Spliterator
实例Y
,原来的X
所包含的元素(范围)也会收缩,类似于X = [a,b,c,d] => X = [a,b], Y = [c,d]
;如果当前的Spliterator
实例X
是不可分割的,此方法会返回NULL
),具体的分割算法由实现类决定 - 例子:
public static void main(String[] args) throws Exception {
List<Integer> list = new ArrayList<>();
list.add(2);
list.add(3);
list.add(4);
list.add(1);
Spliterator<Integer> first = list.stream().spliterator();
Spliterator<Integer> second = first.trySplit();
first.forEachRemaining(num -> {
System.out.printf("first spliterator item: %d\n", num);
});
second.forEachRemaining(num -> {
System.out.printf("second spliterator item: %d\n", num);
});
}
// 控制台输出
first spliterator item: 4
first spliterator item: 1
second spliterator item: 2
second spliterator item: 3
estimateSize
- 方法签名:
long estimateSize()
- 功能:返回
forEachRemaining()
方法需要遍历的元素总量的估计值,如果样本个数是无限、计算成本过高或者未知,会直接返回Long.MAX_VALUE
- 例子:
public static void main(String[] args) throws Exception {
List<Integer> list = new ArrayList<>();
list.add(2);
list.add(3);
list.add(4);
list.add(1);
Spliterator<Integer> spliterator = list.stream().spliterator();
System.out.println(spliterator.estimateSize());
}
// 控制台输出
4
getExactSizeIfKnown
- 方法签名:
default long getExactSizeIfKnown()
- 功能:如果当前的
Spliterator
具备SIZED
特性(关于特性,下文再展开分析),那么直接调用estimateSize()
方法,否则返回-1
- 例子:
public static void main(String[] args) throws Exception {
List<Integer> list = new ArrayList<>();
list.add(2);
list.add(3);
list.add(4);
list.add(1);
Spliterator<Integer> spliterator = list.stream().spliterator();
System.out.println(spliterator.getExactSizeIfKnown());
}
// 控制台输出
4
int characteristics()
- 方法签名:
long estimateSize()
- 功能:当前的
Spliterator
具备的特性(集合),采用位运算,存储在32
位整数中(关于特性,下文再展开分析)
hasCharacteristics
- 方法签名:
default boolean hasCharacteristics(int characteristics)
- 功能:判断当前的
Spliterator
是否具备传入的特性
getComparator
- 方法签名:
default Comparator<? super T> getComparator()
- 功能:如果当前的
Spliterator
具备SORTED
特性,则需要返回一个Comparator
实例;如果Spliterator
中的元素是天然有序(例如元素实现了Comparable
接口),则返回NULL
;其他情况直接抛出IllegalStateException
异常
Spliterator自分割
Spliterator#trySplit()
可以把一个既有的Spliterator
实例分割为两个Spliterator
实例,笔者这里把这种方式称为Spliterator
自分割,示意图如下:
这里的分割在实现上可以采用两种方式:
- 物理分割:对于
ArrayList
而言,把底层数组拷贝并且进行分割,用上面的例子来说相当于X = [1,3,4,2] => X = [4,2], Y = [1,3]
,这样实现加上对于ArrayList
中本身的元素容器数组,相当于多存了一份数据,显然不是十分合理 - 逻辑分割:对于
ArrayList
而言,由于元素容器数组天然有序,可以采用数组的索引(下标)进行分割,用上面的例子来说相当于X = 索引表[0,1,2,3] => X = 索引表[2,3], Y = 索引表[0,1]
,这种方式是共享底层容器数组,只对元素索引进行分割,实现上比较简单而且相对合理
参看ArrayListSpliterator
的源码,可以分析其分割算法实现:
// ArrayList#spliterator()
public Spliterator<E> spliterator() {
return new ArrayListSpliterator(0, -1, 0);
}
// ArrayList中内部类ArrayListSpliterator
final class ArrayListSpliterator implements Spliterator<E> {
// 当前的处理的元素索引值,其实是剩余元素的下边界值(包含),在tryAdvance()或者trySplit()方法中被修改,一般初始值为0
private int index;
// 栅栏,其实是元素索引值的上边界值(不包含),一般初始化的时候为-1,使用时具体值为元素索引值上边界加1
private int fence;
// 预期的修改次数,一般初始化值等于modCount
private int expectedModCount;
ArrayListSpliterator(int origin, int fence, int expectedModCount) {
this.index = origin;
this.fence = fence;
this.expectedModCount = expectedModCount;
}
// 获取元素索引值的上边界值,如果小于0,则把hi和fence都赋值为(ArrayList中的)size,expectedModCount赋值为(ArrayList中的)modCount,返回上边界值
// 这里注意if条件中有赋值语句hi = fence,也就是此方法调用过程中临时变量hi总是重新赋值为fence,fence是ArrayListSpliterator实例中的成员属性
private int getFence() {
int hi;
if ((hi = fence) < 0) {
expectedModCount = modCount;
hi = fence = size;
}
return hi;
}
// Spliterator自分割,这里采用了二分法
public ArrayListSpliterator trySplit() {
// hi等于当前ArrayListSpliterator实例中的fence变量,相当于获取剩余元素的上边界值
// lo等于当前ArrayListSpliterator实例中的index变量,相当于获取剩余元素的下边界值
// mid = (lo + hi) >>> 1,这里的无符号右移动1位运算相当于(lo + hi)/2
int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
// 当lo >= mid的时候为不可分割,返回NULL,否则,以index = lo,fence = mid和expectedModCount = expectedModCount创建一个新的ArrayListSpliterator
// 这里有个细节之处,在新的ArrayListSpliterator构造参数中,当前的index被重新赋值为index = mid,这一点容易看漏,老程序员都喜欢做这样的赋值简化
// lo >= mid返回NULL的时候,不会创建新的ArrayListSpliterator,也不会修改当前ArrayListSpliterator中的参数
return (lo >= mid) ? null : new ArrayListSpliterator(lo, index = mid, expectedModCount);
}
// tryAdvance实现
public boolean tryAdvance(Consumer<? super E> action) {
if (action == null)
throw new NullPointerException();
// 获取迭代的上下边界
int hi = getFence(), i = index;
// 由于前面分析下边界是包含关系,上边界是非包含关系,所以这里要i < hi而不是i <= hi
if (i < hi) {
index = i + 1;
// 这里的elementData来自ArrayList中,也就是前文经常提到的元素数组容器,这里是直接通过元素索引访问容器中的数据
@SuppressWarnings("unchecked") E e = (E)elementData[i];
// 对传入的Action进行回调
action.accept(e);
// 并发修改异常判断
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
return true;
}
return false;
}
// forEachRemaining实现,这里没有采用默认实现,而是完全覆盖实现一个新方法
public void forEachRemaining(Consumer<? super E> action) {
// 这里会新建所需的中间变量,i为index的中间变量,hi为fence的中间变量,mc为expectedModCount的中间变量
int i, hi, mc;
Object[] a;
if (action == null)
throw new NullPointerException();
// 判断容器数组存在性
if ((a = elementData) != null) {
// hi、fence和mc初始化
if ((hi = fence) < 0) {
mc = modCount;
hi = size;
}
else
mc = expectedModCount;
// 这里就是先做参数合法性校验,再遍历临时数组容器a中中[i,hi)的剩余元素对传入的Action进行回调
// 这里注意有一处隐蔽的赋值(index = hi),下界被赋值为上界,意味着每个ArrayListSpliterator实例只能调用一次forEachRemaining()方法
if ((i = index) >= 0 && (index = hi) <= a.length) {
for (; i < hi; ++i) {
@SuppressWarnings("unchecked") E e = (E) a[i];
action.accept(e);
}
// 这里校验ArrayList的modCount和mc是否一致,理论上在forEachRemaining()遍历期间,不能对数组容器进行元素的新增或者移除,一旦发生modCount更变会抛出异常
if (modCount == mc)
return;
}
}
throw new ConcurrentModificationException();
}
// 获取剩余元素估计值,就是用剩余元素索引上边界直接减去下边界
public long estimateSize() {
return getFence() - index;
}
// 具备ORDERED、SIZED和SUBSIZED特性
public int characteristics() {
return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED;
}
}
在阅读源码的时候务必注意,老一辈的程序员有时候会采用比较隐蔽的赋值方式,笔者认为需要展开一下:
第一处红圈位置在构建新的ArrayListSpliterator
的时候,当前ArrayListSpliterator
的index
属性也被修改了,过程如下图:
第二处红圈位置,在forEachRemaining()
方法调用时候做参数校验,并且if
分支里面把index
(下边界值)赋值为hi
(上边界值),那么一个ArrayListSpliterator
实例中的forEachRemaining()
方法的遍历操作必定只会执行一次。可以这样验证一下:
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
list.add(2);
list.add(1);
list.add(3);
Spliterator<Integer> spliterator = list.stream().spliterator();
final AtomicInteger round = new AtomicInteger(1);
spliterator.forEachRemaining(num -> System.out.printf("[第一次遍历forEachRemaining]第%d轮回调Action,值:%d\n", round.getAndIncrement(), num));
round.set(1);
spliterator.forEachRemaining(num -> System.out.printf("[第二次遍历forEachRemaining]第%d轮回调Action,值:%d\n", round.getAndIncrement(), num));
}
// 控制台输出
[第一次遍历forEachRemaining]第1轮回调Action,值:2
[第一次遍历forEachRemaining]第2轮回调Action,值:1
[第一次遍历forEachRemaining]第3轮回调Action,值:3
对于ArrayListSpliterator
的实现可以确认下面几点:
- 一个新的
ArrayListSpliterator
实例中的forEachRemaining()
方法只能调用一次 ArrayListSpliterator
实例中的forEachRemaining()
方法遍历元素的边界是[index, fence)
ArrayListSpliterator
自分割的时候,分割出来的新ArrayListSpliterator
负责处理元素下标小的分段(类比fork
的左分支),而原ArrayListSpliterator
负责处理元素下标大的分段(类比fork
的右分支)ArrayListSpliterator
提供的estimateSize()
方法得到的分段元素剩余数量是一个准确值
如果把上面的例子继续分割,可以得到下面的过程:
Spliterator
自分割是并行流实现的基础,并行流计算过程其实就是fork-join
的处理过程,trySplit()
方法的实现决定了fork
任务的粒度,每个fork
任务进行计算的时候是并发安全的,这一点由线程封闭(线程栈封闭)保证,每一个fork
任务计算完成最后的结果再由单个线程进行join
操作,才能得到正确的结果。下面的例子是求整数1 ~ 100
的和:
public class ConcurrentSplitCalculateSum {
private static class ForkTask extends Thread {
private int result = 0;
private final Spliterator<Integer> spliterator;
private final CountDownLatch latch;
public ForkTask(Spliterator<Integer> spliterator,
CountDownLatch latch) {
this.spliterator = spliterator;
this.latch = latch;
}
@Override
public void run() {
long start = System.currentTimeMillis();
spliterator.forEachRemaining(num -> result = result + num);
long end = System.currentTimeMillis();
System.out.printf("线程[%s]完成计算任务,当前段计算结果:%d,耗时:%d ms\n",
Thread.currentThread().getName(), result, end - start);
latch.countDown();
}
public int result() {
return result;
}
}
private static int join(List<ForkTask> tasks) {
int result = 0;
for (ForkTask task : tasks) {
result = result + task.result();
}
return result;
}
private static final int THREAD_NUM = 4;
public static void main(String[] args) throws Exception {
List<Integer> source = new ArrayList<>();
for (int i = 1; i < 101; i++) {
source.add(i);
}
Spliterator<Integer> root = source.stream().spliterator();
List<Spliterator<Integer>> spliteratorList = new ArrayList<>();
Spliterator<Integer> x = root.trySplit();
Spliterator<Integer> y = x.trySplit();
Spliterator<Integer> z = root.trySplit();
spliteratorList.add(root);
spliteratorList.add(x);
spliteratorList.add(y);
spliteratorList.add(z);
List<ForkTask> tasks = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(THREAD_NUM);
for (int i = 0; i < THREAD_NUM; i++) {
ForkTask task = new ForkTask(spliteratorList.get(i), latch);
task.setName("fork-task-" + (i + 1));
tasks.add(task);
}
tasks.forEach(Thread::start);
latch.await();
int result = join(tasks);
System.out.println("最终计算结果为:" + result);
}
}
// 控制台输出结果
线程[fork-task-4]完成计算任务,当前段计算结果:1575,耗时:0 ms
线程[fork-task-2]完成计算任务,当前段计算结果:950,耗时:1 ms
线程[fork-task-3]完成计算任务,当前段计算结果:325,耗时:1 ms
线程[fork-task-1]完成计算任务,当前段计算结果:2200,耗时:1 ms
最终计算结果为:5050
当然,最终并行流的计算用到了ForkJoinPool
,并不像这个例子中这么粗暴地进行异步执行。关于并行流的实现下文会详细分析。
Spliterator 支持特性
某一个Spliterator
实例支持的特性由方法characteristics()
决定,这个方法返回的是一个32
位数值,实际使用中会展开为bit
数组,所有的特性分配在不同的位上,而hasCharacteristics(int characteristics)
就是通过输入的具体特性值通过位运算判断该特性是否存在于characteristics()
中。下面简化characteristics
为byte
分析一下这个技巧:
假设:byte characteristics() => 也就是最多8个位用于表示特性集合,如果每个位只表示一种特性,那么可以总共表示8种特性
特性X:0000 0001
特性Y:0000 0010
以此类推
假设:characteristics = X | Y = 0000 0001 | 0000 0010 = 0000 0011
那么:characteristics & X = 0000 0011 & 0000 0001 = 0000 0001
判断characteristics是否包含X:(characteristics & X) == X
上面推断的过程就是Spliterator
中特性判断方法的处理逻辑:
// 返回特性集合
int characteristics();
// 基于位运算判断特性集合中是否存在输入的特性
default boolean hasCharacteristics(int characteristics) {
return (characteristics() & characteristics) == characteristics;
}
这里可以验证一下:
public class CharacteristicsCheck {
public static void main(String[] args) {
System.out.printf("是否存在ORDERED特性:%s\n", hasCharacteristics(Spliterator.ORDERED));
System.out.printf("是否存在SIZED特性:%s\n", hasCharacteristics(Spliterator.SIZED));
System.out.printf("是否存在DISTINCT特性:%s\n", hasCharacteristics(Spliterator.DISTINCT));
}
private static int characteristics() {
return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SORTED;
}
private static boolean hasCharacteristics(int characteristics) {
return (characteristics() & characteristics) == characteristics;
}
}
// 控制台输出
是否存在ORDERED特性:true
是否存在SIZED特性:true
是否存在DISTINCT特性:false
目前Spliterator
支持的特性一共有8
个,如下:
特性 | 十六进制值 | 二进制值 | 功能 |
---|---|---|---|
DISTINCT | 0x00000001 | 0000 0000 0000 0001 | 去重,例如对于每对要处理的元素(x,y) ,使用!x.equals(y) 比较,Spliterator 中去重实际上基于Set 处理 |
ORDERED | 0x00000010 | 0000 0000 0001 0000 | (元素)顺序处理,可以理解为trySplit() 、tryAdvance() 和forEachRemaining() 方法对所有元素处理都保证一个严格的前缀顺序 |
SORTED | 0x00000004 | 0000 0000 0000 0100 | 排序,元素使用getComparator() 方法提供的Comparator 进行排序,如果定义了SORTED 特性,则必须定义ORDERED 特性 |
SIZED | 0x00000040 | 0000 0000 0100 0000 | (元素)预估数量,启用此特性,那么Spliterator 拆分或者迭代之前,estimateSize() 返回的是元素的准确数量 |
NONNULL | 0x00000040 | 0000 0001 0000 0000 | (元素)非NULL ,数据源保证Spliterator 需要处理的元素不能为NULL ,最常用于并发容器中的集合、队列和Map |
IMMUTABLE | 0x00000400 | 0000 0100 0000 0000 | (元素)不可变,数据源不可被修改,也就是处理过程中元素不能被添加、替换和移除(更新属性是允许的) |
CONCURRENT | 0x00001000 | 0001 0000 0000 0000 | (元素源)的修改是并发安全的,意味着多线程在数据源中添加、替换或者移除元素在不需要额外的同步条件下是并发安全的 |
SUBSIZED | 0x00004000 | 0100 0000 0000 0000 | (子Spliterator 元素)预估数量,启用此特性,意味着通过trySplit() 方法分割出来的所有子Spliterator (当前Spliterator 分割后也属于子Spliterator )都启用SIZED 特性 |
细心点观察可以发现:所有特性采用32位的整数存储,使用了隔1位存储的策略,位下标和特性的映射是:(0 => DISTINCT)、(3 => SORTED)、(5 => ORDERED)、(7=> SIZED)、(9 => NONNULL)、(11 => IMMUTABLE)、(13 => CONCURRENT)、(15 => SUBSIZED)
所有特性的功能这里只概括了核心的定义,还有一些小字或者特例描述限于篇幅没有完全加上,这一点可以参考具体的源码中的API
注释。这些特性最终会转化为StreamOpFlag
再提供给Stream
中的操作判断使用,由于StreamOpFlag
会更加复杂,下文再进行详细分析。
流的实现原理以及源码分析
由于流的实现是高度抽象的工程代码,所以在源码阅读上会有点困难。整个体系涉及到大量的接口、类和枚举,如下图:
图中的顶层类结构图描述的就是流的流水线相关类继承关系,其中IntStream
、LongStream
和DoubleStream
都是特化类型,分别针对于Integer
、Long
和Double
三种类型,其他引用类型构建的Pipeline
都是ReferencePipeline
实例,因此笔者认为,ReferencePipeline
(引用类型流水线)是流的核心数据结构,下面会基于ReferencePipeline
的实现做深入分析。
StreamOpFlag源码分析
StreamOpFlag
是一个枚举,功能是存储Stream
和操作的标志(Flags corresponding to characteristics of streams and operations
,下称Stream
标志),这些标志提供给Stream
框架用于控制、定制化和优化计算。Stream
标志可以用于描述与流相关联的若干不同实体的特征,这些实体包括:Stream
的源、Stream
的中间操作(Op
)和Stream
的终端操作(Terminal Op
)。但是并非所有的Stream
标志对所有的Stream
实体都具备意义,目前这些实体和标志映射关系如下:
Type(Stream Entity Type) | DISTINCT | SORTED | ORDERED | SIZED | SHORT_CIRCUIT |
---|---|---|---|---|---|
SPLITERATOR | 01 | 01 | 01 | 01 | 00 |
STREAM | 01 | 01 | 01 | 01 | 00 |
OP | 11 | 11 | 11 | 10 | 01 |
TERMINAL_OP | 00 | 00 | 10 | 00 | 01 |
UPSTREAM_TERMINAL_OP | 00 | 00 | 10 | 00 | 00 |
其中:
- 01:表示设置/注入
- 10:表示清除
- 11:表示保留
- 00:表示初始化值(默认填充值),这是一个关键点,
0
值表示绝对不会是某个类型的标志
StreamOpFlag
的顶部注释中还有一个表格如下:
- | DISTINCT | SORTED | ORDERED | SIZED | SHORT_CIRCUIT |
---|---|---|---|---|---|
Stream source(Stream 的源) | Y | Y | Y | Y | N |
Intermediate operation(中间操作) | PCI | PCI | PCI | PC | PI |
Terminal operation(终结操作) | N | N | PC | N | PI |
标记 ->
含义:
Y
:允许N
:非法P
:保留C
:清除I
:注入- 组合
PCI
:可以保留、清除或者注入 - 组合
PC
:可以保留或者清除 - 组合
PI
:可以保留或者注入
两个表格其实是在描述同一个结论,可以相互对照和理解,但是最终实现参照于第一个表的定义。注意一点:这里的preserved
(P
)表示保留的意思,如果Stream
实体某个标志被赋值为preserved
,意味着该实体可以使用此标志代表的特性。例如此小节第一个表格中的OP
的DISTINCT
、SORTED
和ORDERED
都赋值为11
(preserved
),意味着OP
类型的实体允许使用去重、自然排序和顺序处理特性。回到源码部分,先看StreamOpFlag
的核心属性和构造器:
enum StreamOpFlag {
// 暂时忽略其他代码
// 类型枚举,Stream相关实体类型
enum Type {
// SPLITERATOR类型,关联所有和Spliterator相关的特性
SPLITERATOR,
// STREAM类型,关联所有和Stream相关的标志
STREAM,
// STREAM类型,关联所有和Stream中间操作相关的标志
OP,
// TERMINAL_OP类型,关联所有和Stream终结操作相关的标志
TERMINAL_OP,
// UPSTREAM_TERMINAL_OP类型,关联所有在最后一个有状态操作边界上游传播的终止操作标志
// 这个类型的意义直译有点拗口,不过实际上在JDK11源码中,这个类型没有被流相关功能引用,暂时可以忽略
UPSTREAM_TERMINAL_OP
}
// 设置/注入标志的bit模式,二进制数0001,十进制数1
private static final int SET_BITS = 0b01;
// 清除标志的bit模式,二进制数0010,十进制数2
private static final int CLEAR_BITS = 0b10;
// 保留标志的bit模式,二进制数0011,十进制数3
private static final int PRESERVE_BITS = 0b11;
// 掩码建造器工厂方法,注意这个方法用于实例化MaskBuilder
private static MaskBuilder set(Type t) {
return new MaskBuilder(new EnumMap<>(Type.class)).set(t);
}
// 私有静态内部类,掩码建造器,里面的map由上面的set(Type t)方法得知是EnumMap实例
private static class MaskBuilder {
// Type -> SET_BITS|CLEAR_BITS|PRESERVE_BITS|0
final Map<Type, Integer> map;
MaskBuilder(Map<Type, Integer> map) {
this.map = map;
}
// 设置类型和对应的掩码
MaskBuilder mask(Type t, Integer i) {
map.put(t, i);
return this;
}
// 对类型添加/inject
MaskBuilder set(Type t) {
return mask(t, SET_BITS);
}
MaskBuilder clear(Type t) {
return mask(t, CLEAR_BITS);
}
MaskBuilder setAndClear(Type t) {
return mask(t, PRESERVE_BITS);
}
// 这里的build方法对于类型中的NULL掩码填充为0,然后把map返回
Map<Type, Integer> build() {
for (Type t : Type.values()) {
map.putIfAbsent(t, 0b00);
}
return map;
}
}
// 类型->掩码映射
private final Map<Type, Integer> maskTable;
// bit的起始偏移量,控制下面set、clear和preserve的起始偏移量
private final int bitPosition;
// set/inject的bit set(map),其实准确来说应该是一个表示set/inject的bit map
private final int set;
// clear的bit set(map),其实准确来说应该是一个表示clear的bit map
private final int clear;
// preserve的bit set(map),其实准确来说应该是一个表示preserve的bit map
private final int preserve;
private StreamOpFlag(int position, MaskBuilder maskBuilder) {
// 这里会基于MaskBuilder初始化内部的EnumMap
this.maskTable = maskBuilder.build();
// Two bits per flag <= 这里会把入参position放大一倍
position *= 2;
this.bitPosition = position;
this.set = SET_BITS << position; // 设置/注入标志的bit模式左移2倍position
this.clear = CLEAR_BITS << position; // 清除标志的bit模式左移2倍position
this.preserve = PRESERVE_BITS << position; // 保留标志的bit模式左移2倍position
}
// 省略中间一些方法
// 下面这些静态变量就是直接返回标志对应的set/injec、清除和保留的bit map
/**
* The bit value to set or inject {@link #DISTINCT}.
*/
static final int IS_DISTINCT = DISTINCT.set;
/**
* The bit value to clear {@link #DISTINCT}.
*/
static final int NOT_DISTINCT = DISTINCT.clear;
/**
* The bit value to set or inject {@link #SORTED}.
*/
static final int IS_SORTED = SORTED.set;
/**
* The bit value to clear {@link #SORTED}.
*/
static final int NOT_SORTED = SORTED.clear;
/**
* The bit value to set or inject {@link #ORDERED}.
*/
static final int IS_ORDERED = ORDERED.set;
/**
* The bit value to clear {@link #ORDERED}.
*/
static final int NOT_ORDERED = ORDERED.clear;
/**
* The bit value to set {@link #SIZED}.
*/
static final int IS_SIZED = SIZED.set;
/**
* The bit value to clear {@link #SIZED}.
*/
static final int NOT_SIZED = SIZED.clear;
/**
* The bit value to inject {@link #SHORT_CIRCUIT}.
*/
static final int IS_SHORT_CIRCUIT = SHORT_CIRCUIT.set;
}
又因为StreamOpFlag
是一个枚举,一个枚举成员是一个独立的标志,而一个标志会对多个Stream
实体类型产生作用,所以它的一个成员描述的是上面实体和标志映射关系的一个列(竖着看):
// 纵向看
DISTINCT Flag:
maskTable: {
SPLITERATOR: 0000 0001,
STREAM: 0000 0001,
OP: 0000 0011,
TERMINAL_OP: 0000 0000,
UPSTREAM_TERMINAL_OP: 0000 0000
}
position(input): 0
bitPosition: 0
set: 1 => 0000 0000 0000 0000 0000 0000 0000 0001
clear: 2 => 0000 0000 0000 0000 0000 0000 0000 0010
preserve: 3 => 0000 0000 0000 0000 0000 0000 0000 0011
SORTED Flag:
maskTable: {
SPLITERATOR: 0000 0001,
STREAM: 0000 0001,
OP: 0000 0011,
TERMINAL_OP: 0000 0000,
UPSTREAM_TERMINAL_OP: 0000 0000
}
position(input): 1
bitPosition: 2
set: 4 => 0000 0000 0000 0000 0000 0000 0000 0100
clear: 8 => 0000 0000 0000 0000 0000 0000 0000 1000
preserve: 12 => 0000 0000 0000 0000 0000 0000 0000 1100
ORDERED Flag:
maskTable: {
SPLITERATOR: 0000 0001,
STREAM: 0000 0001,
OP: 0000 0011,
TERMINAL_OP: 0000 0010,
UPSTREAM_TERMINAL_OP: 0000 0010
}
position(input): 2
bitPosition: 4
set: 16 => 0000 0000 0000 0000 0000 0000 0001 0000
clear: 32 => 0000 0000 0000 0000 0000 0000 0010 0000
preserve: 48 => 0000 0000 0000 0000 0000 0000 0011 0000
SIZED Flag:
maskTable: {
SPLITERATOR: 0000 0001,
STREAM: 0000 0001,
OP: 0000 0010,
TERMINAL_OP: 0000 0000,
UPSTREAM_TERMINAL_OP: 0000 0000
}
position(input): 3
bitPosition: 6
set: 64 => 0000 0000 0000 0000 0000 0000 0100 0000
clear: 128 => 0000 0000 0000 0000 0000 0000 1000 0000
preserve: 192 => 0000 0000 0000 0000 0000 0000 1100 0000
SHORT_CIRCUIT Flag:
maskTable: {
SPLITERATOR: 0000 0000,
STREAM: 0000 0000,
OP: 0000 0001,
TERMINAL_OP: 0000 0001,
UPSTREAM_TERMINAL_OP: 0000 0000
}
position(input): 12
bitPosition: 24
set: 16777216 => 0000 0001 0000 0000 0000 0000 0000 0000
clear: 33554432 => 0000 0010 0000 0000 0000 0000 0000 0000
preserve: 50331648 => 0000 0011 0000 0000 0000 0000 0000 0000
接着就用到按位与(&
)和按位或(|
)的操作,假设A = 0001
、B = 0010
、C = 1000
,那么:
A|B = A | B = 0001 | 0010 = 0011
(按位或,1|0=1, 0|1=1,0|0 =0,1|1=1
)A&B = A & B = 0001 | 0010 = 0000
(按位与,1|0=0, 0|1=0,0|0 =0,1|1=1
)MASK = A | B | C = 0001 | 0010 | 1000 = 1011
- 那么判断
A|B
是否包含A
的条件为:A == (A|B & A)
- 那么判断
MASK
是否包含A
的条件为:A == MASK & A
这里把StreamOpFlag
中的枚举套用进去分析:
static int DISTINCT_SET = 0b0001;
static int SORTED_CLEAR = 0b1000;
public static void main(String[] args) throws Exception {
// 支持DISTINCT标志和不支持SORTED标志
int flags = DISTINCT_SET | SORTED_CLEAR;
System.out.println(Integer.toBinaryString(flags));
System.out.printf("支持DISTINCT标志:%s\n", DISTINCT_SET == (DISTINCT_SET & flags));
System.out.printf("不支持SORTED标志:%s\n", SORTED_CLEAR == (SORTED_CLEAR & flags));
}
// 控制台输出
1001
支持DISTINCT标志:true
不支持SORTED标志:true
由于StreamOpFlag
的修饰符是默认,不能直接使用,可以把它的代码拷贝出来修改包名验证里面的功能:
public static void main(String[] args) {
int flags = StreamOpFlag.DISTINCT.set | StreamOpFlag.SORTED.clear;
System.out.println(StreamOpFlag.DISTINCT.set == (StreamOpFlag.DISTINCT.set & flags));
System.out.println(StreamOpFlag.SORTED.clear == (StreamOpFlag.SORTED.clear & flags));
}
// 输出
true
true
下面这些方法就是基于这些运算特性而定义的:
enum StreamOpFlag {
// 暂时忽略其他代码
// 返回当前StreamOpFlag的set/inject的bit map
int set() {
return set;
}
// 返回当前StreamOpFlag的清除的bit map
int clear() {
return clear;
}
// 这里判断当前StreamOpFlag类型->标记映射中Stream类型的标记,如果大于0说明不是初始化状态,那么当前StreamOpFlag就是Stream相关的标志
boolean isStreamFlag() {
return maskTable.get(Type.STREAM) > 0;
}
// 这里就用到按位与判断输入的flags中是否设置当前StreamOpFlag(StreamOpFlag.set)
boolean isKnown(int flags) {
return (flags & preserve) == set;
}
// 这里就用到按位与判断输入的flags中是否清除当前StreamOpFlag(StreamOpFlag.clear)
boolean isCleared(int flags) {
return (flags & preserve) == clear;
}
// 这里就用到按位与判断输入的flags中是否保留当前StreamOpFlag(StreamOpFlag.clear)
boolean isPreserved(int flags) {
return (flags & preserve) == preserve;
}
// 判断当前的Stream实体类型是否可以设置本标志,要求Stream实体类型的标志位为set或者preserve,按位与要大于0
boolean canSet(Type t) {
return (maskTable.get(t) & SET_BITS) > 0;
}
// 暂时忽略其他代码
}
这里有个特殊操作,位运算的时候采用了(flags & preserve)
,理由是:同一个标志中的同一个Stream
实体类型只可能存在set/inject
、clear
和preserve
的其中一种,也就是同一个flags
中不可能同时存在StreamOpFlag.SORTED.set
和StreamOpFlag.SORTED.clear
,从语义上已经矛盾,而set/inject
、clear
和preserve
在bit map
中的大小(为2
位)和位置已经是固定的,preserve
在设计的时候为0b11
刚好2
位取反,因此可以特化为(这个特化也让判断更加严谨):
(flags & set) == set => (flags & preserve) == set
(flags & clear) == clear => (flags & preserve) == clear
(flags & preserve) == preserve => (flags & preserve) == preserve
分析这么多,总的来说,就是想通过一个32
位整数,每2
位分别表示3
种状态,那么一个完整的Flags
(标志集合)一共可以表示16
种标志(position=[0,15]
,可以查看API
注释,[4,11]
和[13,15]
的位置是未需实现或者预留的,属于gap
)。接着分析掩码Mask
的计算过程例子:
// 横向看(位移动运算符优先级高于与或,例如<<的优先级比|高)
SPLITERATOR_CHARACTERISTICS_MASK:
mask(init) = 0
mask(DISTINCT,SPLITERATOR[DISTINCT]=01,bitPosition=0) = 0000 0000 | 0000 0001 << 0 = 0000 0000 | 0000 0001 = 0000 0001
mask(SORTED,SPLITERATOR[SORTED]=01,bitPosition=2) = 0000 0001 | 0000 0001 << 2 = 0000 0001 | 0000 0100 = 0000 0101
mask(ORDERED,SPLITERATOR[ORDERED]=01,bitPosition=4) = 0000 0101 | 0000 0001 << 4 = 0000 0101 | 0001 0000 = 0001 0101
mask(SIZED,SPLITERATOR[SIZED]=01,bitPosition=6) = 0001 0101 | 0000 0001 << 6 = 0001 0101 | 0100 0000 = 0101 0101
mask(SHORT_CIRCUIT,SPLITERATOR[SHORT_CIRCUIT]=00,bitPosition=24) = 0101 0101 | 0000 0000 << 24 = 0101 0101 | 0000 0000 = 0101 0101
mask(final) = 0000 0000 0000 0000 0000 0000 0101 0101(二进制)、85(十进制)
STREAM_MASK:
mask(init) = 0
mask(DISTINCT,SPLITERATOR[DISTINCT]=01,bitPosition=0) = 0000 0000 | 0000 0001 << 0 = 0000 0000 | 0000 0001 = 0000 0001
mask(SORTED,SPLITERATOR[SORTED]=01,bitPosition=2) = 0000 0001 | 0000 0001 << 2 = 0000 0001 | 0000 0100 = 0000 0101
mask(ORDERED,SPLITERATOR[ORDERED]=01,bitPosition=4) = 0000 0101 | 0000 0001 << 4 = 0000 0101 | 0001 0000 = 0001 0101
mask(SIZED,SPLITERATOR[SIZED]=01,bitPosition=6) = 0001 0101 | 0000 0001 << 6 = 0001 0101 | 0100 0000 = 0101 0101
mask(SHORT_CIRCUIT,SPLITERATOR[SHORT_CIRCUIT]=00,bitPosition=24) = 0101 0101 | 0000 0000 << 24 = 0101 0101 | 0000 0000 = 0101 0101
mask(final) = 0000 0000 0000 0000 0000 0000 0101 0101(二进制)、85(十进制)
OP_MASK:
mask(init) = 0
mask(DISTINCT,SPLITERATOR[DISTINCT]=11,bitPosition=0) = 0000 0000 | 0000 0011 << 0 = 0000 0000 | 0000 0011 = 0000 0011
mask(SORTED,SPLITERATOR[SORTED]=11,bitPosition=2) = 0000 0011 | 0000 0011 << 2 = 0000 0011 | 0000 1100 = 0000 1111
mask(ORDERED,SPLITERATOR[ORDERED]=11,bitPosition=4) = 0000 1111 | 0000 0011 << 4 = 0000 1111 | 0011 0000 = 0011 1111
mask(SIZED,SPLITERATOR[SIZED]=10,bitPosition=6) = 0011 1111 | 0000 0010 << 6 = 0011 1111 | 1000 0000 = 1011 1111
mask(SHORT_CIRCUIT,SPLITERATOR[SHORT_CIRCUIT]=01,bitPosition=24) = 1011 1111 | 0000 0001 << 24 = 1011 1111 | 0100 0000 0000 0000 0000 0000 0000 = 0100 0000 0000 0000 0000 1011 1111
mask(final) = 0000 0000 1000 0000 0000 0000 1011 1111(二进制)、16777407(十进制)
TERMINAL_OP_MASK:
mask(init) = 0
mask(DISTINCT,SPLITERATOR[DISTINCT]=00,bitPosition=0) = 0000 0000 | 0000 0000 << 0 = 0000 0000 | 0000 0000 = 0000 0000
mask(SORTED,SPLITERATOR[SORTED]=00,bitPosition=2) = 0000 0000 | 0000 0000 << 2 = 0000 0000 | 0000 0000 = 0000 0000
mask(ORDERED,SPLITERATOR[ORDERED]=10,bitPosition=4) = 0000 0000 | 0000 0010 << 4 = 0000 0000 | 0010 0000 = 0010 0000
mask(SIZED,SPLITERATOR[SIZED]=00,bitPosition=6) = 0010 0000 | 0000 0000 << 6 = 0010 0000 | 0000 0000 = 0010 0000
mask(SHORT_CIRCUIT,SPLITERATOR[SHORT_CIRCUIT]=01,bitPosition=24) = 0010 0000 | 0000 0001 << 24 = 0010 0000 | 0001 0000 0000 0000 0000 0000 0000 = 0001 0000 0000 0000 0000 0010 0000
mask(final) = 0000 0001 0000 0000 0000 0000 0010 0000(二进制)、16777248(十进制)
UPSTREAM_TERMINAL_OP_MASK:
mask(init) = 0
mask(DISTINCT,SPLITERATOR[DISTINCT]=00,bitPosition=0) = 0000 0000 | 0000 0000 << 0 = 0000 0000 | 0000 0000 = 0000 0000
mask(SORTED,SPLITERATOR[SORTED]=00,bitPosition=2) = 0000 0000 | 0000 0000 << 2 = 0000 0000 | 0000 0000 = 0000 0000
mask(ORDERED,SPLITERATOR[ORDERED]=10,bitPosition=4) = 0000 0000 | 0000 0010 << 4 = 0000 0000 | 0010 0000 = 0010 0000
mask(SIZED,SPLITERATOR[SIZED]=00,bitPosition=6) = 0010 0000 | 0000 0000 << 6 = 0010 0000 | 0000 0000 = 0010 0000
mask(SHORT_CIRCUIT,SPLITERATOR[SHORT_CIRCUIT]=00,bitPosition=24) = 0010 0000 | 0000 0000 << 24 = 0010 0000 | 0000 0000 = 0010 0000
mask(final) = 0000 0000 0000 0000 0000 0000 0010 0000(二进制)、32(十进制)
相关的方法和属性如下:
enum StreamOpFlag {
// SPLITERATOR类型的标志bit map
static final int SPLITERATOR_CHARACTERISTICS_MASK = createMask(Type.SPLITERATOR);
// STREAM类型的标志bit map
static final int STREAM_MASK = createMask(Type.STREAM);
// OP类型的标志bit map
static final int OP_MASK = createMask(Type.OP);
// TERMINAL_OP类型的标志bit map
static final int TERMINAL_OP_MASK = createMask(Type.TERMINAL_OP);
// UPSTREAM_TERMINAL_OP类型的标志bit map
static final int UPSTREAM_TERMINAL_OP_MASK = createMask(Type.UPSTREAM_TERMINAL_OP);
// 基于Stream类型,创建对应类型填充所有标志的bit map
private static int createMask(Type t) {
int mask = 0;
for (StreamOpFlag flag : StreamOpFlag.values()) {
mask |= flag.maskTable.get(t) << flag.bitPosition;
}
return mask;
}
// 构造一个标志本身的掩码,就是所有标志都采用保留位表示,目前作为flags == 0时候的初始值
private static final int FLAG_MASK = createFlagMask();
// 构造一个包含全部标志中的preserve位的bit map,按照目前来看是暂时是一个固定值,二进制表示为0011 0000 0000 0000 0000 1111 1111
private static int createFlagMask() {
int mask = 0;
for (StreamOpFlag flag : StreamOpFlag.values()) {
mask |= flag.preserve;
}
return mask;
}
// 构造一个Stream类型包含全部标志中的set位的bit map,这里直接使用了STREAM_MASK,按照目前来看是暂时是一个固定值,二进制表示为0000 0000 0000 0000 0000 0000 0101 0101
private static final int FLAG_MASK_IS = STREAM_MASK;
// 构造一个Stream类型包含全部标志中的clear位的bit map,按照目前来看是暂时是一个固定值,二进制表示为0000 0000 0000 0000 0000 0000 1010 1010
private static final int FLAG_MASK_NOT = STREAM_MASK << 1;
// 初始化操作的标志bit map,目前来看就是Stream的头节点初始化时候需要合并在flags里面的初始化值,照目前来看是暂时是一个固定值,二进制表示为0000 0000 0000 0000 0000 0000 1111 1111
static final int INITIAL_OPS_VALUE = FLAG_MASK_IS | FLAG_MASK_NOT;
}
SPLITERATOR_CHARACTERISTICS_MASK
等5
个成员(见上面的Mask
计算例子)其实就是预先计算好对应的Stream
实体类型的所有StreamOpFlag
标志的bit map
,也就是之前那个展示Stream
的类型和标志的映射图的"横向"展示:
前面的分析已经相对详细,过程非常复杂,但是更复杂的Mask
应用还在后面的方法。Mask
的初始化就是提供给标志的合并(combine
)和转化(从Spliterator
中的characteristics
转化为flags
)操作的,见下面的方法:
enum StreamOpFlag {
// 这个方法完全没有注释,只使用在下面的combineOpFlags()方法中
// 从源码来看
// 入参flags == 0的时候,那么直接返回0011 0000 0000 0000 0000 1111 1111
// 入参flags != 0的时候,那么会把当前flags的所有set/inject、clear和preserve所在位在bit map中全部置为0,然后其他位全部置为1
private static int getMask(int flags) {
return (flags == 0)
? FLAG_MASK
: ~(flags | ((FLAG_MASK_IS & flags) << 1) | ((FLAG_MASK_NOT & flags) >> 1));
}
// 合并新的flags和前一个flags,这里还是用到老套路先和Mask按位与,再进行一次按位或
// 作为Stream的头节点的时候,prevCombOpFlags必须为INITIAL_OPS_VALUE
static int combineOpFlags(int newStreamOrOpFlags, int prevCombOpFlags) {
// 0x01 or 0x10 nibbles are transformed to 0x11
// 0x00 nibbles remain unchanged
// Then all the bits are flipped
// Then the result is logically or'ed with the operation flags.
return (prevCombOpFlags & StreamOpFlag.getMask(newStreamOrOpFlags)) | newStreamOrOpFlags;
}
// 通过合并后的flags,转换出Stream类型的flags
static int toStreamFlags(int combOpFlags) {
// By flipping the nibbles 0x11 become 0x00 and 0x01 become 0x10
// Shift left 1 to restore set flags and mask off anything other than the set flags
return ((~combOpFlags) >> 1) & FLAG_MASK_IS & combOpFlags;
}
// Stream的标志转换为Spliterator的characteristics
static int toCharacteristics(int streamFlags) {
return streamFlags & SPLITERATOR_CHARACTERISTICS_MASK;
}
// Spliterator的characteristics转换为Stream的标志,入参是Spliterator实例
static int fromCharacteristics(Spliterator<?> spliterator) {
int characteristics = spliterator.characteristics();
if ((characteristics & Spliterator.SORTED) != 0 && spliterator.getComparator() != null) {
// Do not propagate the SORTED characteristic if it does not correspond
// to a natural sort order
return characteristics & SPLITERATOR_CHARACTERISTICS_MASK & ~Spliterator.SORTED;
}
else {
return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
}
}
// Spliterator的characteristics转换为Stream的标志,入参是Spliterator的characteristics
static int fromCharacteristics(int characteristics) {
return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
}
}
这里的位运算很复杂,只展示简单的计算结果和相关功能:
combineOpFlags()
:用于合并新的flags
和上一个flags
,因为Stream
的数据结构是一个Pipeline
,后继节点需要合并前驱节点的flags
,例如前驱节点flags
是ORDERED.set
,当前新加入Pipeline
的节点(后继节点)的新flags
为SIZED.set
,那么在后继节点中应该合并前驱节点的标志,简单想象为SIZED.set | ORDERED.set
,如果是头节点,那么初始化头节点时候的flags
要合并INITIAL_OPS_VALUE
,这里举个例子:
int left = ORDERED.set | DISTINCT.set;
int right = SIZED.clear | SORTED.clear;
System.out.println("left:" + Integer.toBinaryString(left));
System.out.println("right:" + Integer.toBinaryString(right));
System.out.println("right mask:" + Integer.toBinaryString(getMask(right)));
System.out.println("combine:" + Integer.toBinaryString(combineOpFlags(right, left)));
// 输出结果
left:1010001
right:10001000
right mask:11111111111111111111111100110011
combine:10011001
characteristics
的转化问题:Spliterator
中的characteristics
可以通过简单的按位与转换为flags
的原因是Spliterator
中的characteristics
在设计时候本身就是和StreamOpFlag
匹配的,准确来说就是bit map
的位分布是匹配的,所以直接与SPLITERATOR_CHARACTERISTICS_MASK
做按位与即可,见下面的例子:
// 这里简单点只展示8 bit
SPLITERATOR_CHARACTERISTICS_MASK: 0101 0101
Spliterator.ORDERED: 0001 0000
StreamOpFlag.ORDERED.set: 0001 0000
至此,已经分析完StreamOpFlag
的完整实现,Mask
相关的方法限于篇幅就不打算详细展开,下面会开始分析Stream
中的"流水线"结构实现,因为习惯问题,下文的"标志"和"特性"两个词语会混用。
ReferencePipeline源码分析
既然Stream
具备流的特性,那么就需要一个链式数据结构,让元素能够从Source
一直往下"流动"和传递到每一个链节点,实现这种场景的常用数据结构就是双向链表(考虑需要回溯,单向链表不太合适),目前比较著名的实现有AQS
和Netty
中的ChannelHandlerContext
。例如Netty
中的流水线ChannelPipeline
设计如下:
对于这个双向链表的数据结构,Stream
中对应的类就是AbstractPipeline
,核心实现类在ReferencePipeline
和ReferencePipeline
的内部类。
主要接口
先简单展示AbstractPipeline
的核心父类方法定义,主要接父类是Stream
、BaseStream
和PipelineHelper
:
Stream
代表一个支持串行和并行聚合操作集合的元素序列,此顶层接口提供了流中间操作、终结操作和一些静态工厂方法的定义(由于方法太多,这里不全部列举),这个接口本质是一个建造器类型接口(对接中间操作来说),可以构成一个多中间操作,单终结操作的链,例如:
public interface Stream<T> extends BaseStream<T, Stream<T>> {
// 忽略其他代码
// 过滤Op
Stream<T> filter(Predicate<? super T> predicate);
// 映射Op
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
// 终结操作 - 遍历
void forEach(Consumer<? super T> action);
// 忽略其他代码
}
// init
Stream x = buildStream();
// chain: head -> filter(Op) -> map(Op) -> forEach(Terminal Op)
x.filter().map().forEach()
BaseStream
:Stream
的基础接口,定义流的迭代器、流的等效变体(并发处理变体、同步处理变体和不支持顺序处理元素变体)、并发和同步判断以及关闭相关方法
// T是元素类型,S是BaseStream<T, S>类型
// 流的基础接口,这里的流指定的支持同步执行和异步执行的聚合操作的元素序列
public interface BaseStream<T, S extends BaseStream<T, S>> extends AutoCloseable {
// 返回一个当前Stream实例中所有元素的迭代器
// 这是一个终结操作
Iterator<T> iterator();
// 返回一个当前Stream实例中所有元素的可拆分迭代器
Spliterator<T> spliterator();
// 当前的Stream实例是否支持并发
boolean isParallel();
// 返回一个等效的同步处理的Stream实例
S sequential();
// 返回一个等效的并发处理的Stream实例
S parallel();
// 返回一个等效的不支持StreamOpFlag.ORDERED特性的Stream实例
// 或者说支持StreamOpFlag.NOT_ORDERED的特性,也就返回的变体Stream在处理元素的时候不需要顺序处理
S unordered();
// 返回一个添加了close处理器的Stream实例,close处理器会在下面的close方法中回调
S onClose(Runnable closeHandler);
// 关闭当前Stream实例,回调关联本Stream的所有close处理器
@Override
void close();
}
PipelineHelper
:
abstract class PipelineHelper<P_OUT> {
// 获取流的流水线的数据源的"形状",其实就是数据源元素的类型
// 主要有四种类型:REFERENCE(除了int、long和double之外的引用类型)、INT_VALUE、LONG_VALUE和DOUBLE_VALUE
abstract StreamShape getSourceShape();
// 获取合并流和流操作的标志,合并的标志包括流的数据源标志、中间操作标志和终结操作标志
// 从实现上看是当前流管道节点合并前面所有节点和自身节点标志的所有标志
abstract int getStreamAndOpFlags();
// 如果当前的流管道节点的合并标志集合支持SIZED,则调用Spliterator.getExactSizeIfKnown()返回数据源中的准确元素数量,否则返回-1
abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
// 相当于调用下面的方法组合:copyInto(wrapSink(sink), spliterator)
abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
// 发送所有来自Spliterator中的元素到Sink中,如果支持SHORT_CIRCUIT标志,则会调用copyIntoWithCancel
abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
// 发送所有来自Spliterator中的元素到Sink中,Sink处理完每个元素后会检查Sink#cancellationRequested()方法的状态去判断是否中断推送元素的操作
abstract <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
// 创建接收元素类型为P_IN的Sink实例,实现PipelineHelper中描述的所有中间操作,用这个Sink去包装传入的Sink实例(传入的Sink实例的元素类型为PipelineHelper的输出类型P_OUT)
abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
// 包装传入的spliterator,从源码来看,在Stream链的头节点调用会直接返回传入的实例,如果在非头节点调用会委托到StreamSpliterators.WrappingSpliterator()方法进行包装
// 这个方法在源码中没有API注释
abstract<P_IN> Spliterator<P_OUT> wrapSpliterator(Spliterator<P_IN> spliterator);
// 构造一个兼容当前Stream元素"形状"的Node.Builder实例
// 从源码来看直接委托到Nodes.builder()方法
abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown,
IntFunction<P_OUT[]> generator);
// Stream流水线所有阶段(节点)应用于数据源Spliterator,输出的元素作为结果收集起来转化为Node实例
// 此方法应用于toArray()方法的计算,本质上是一个终结操作
abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator,
boolean flatten,
IntFunction<P_OUT[]> generator);
}
注意一点(重复3
次):
- 这里把同步流称为同步处理|执行的流,“并行流"称为并发处理|执行的流,因为并行流有歧义,实际上只是并发执行,不是并行执行
- 这里把同步流称为同步处理|执行的流,“并行流"称为并发处理|执行的流,因为并行流有歧义,实际上只是并发执行,不是并行执行
- 这里把同步流称为同步处理|执行的流,“并行流"称为并发处理|执行的流,因为并行流有歧义,实际上只是并发执行,不是并行执行
Sink 和引用类型链
PipelineHelper
的几个方法中存在Sink
这个接口,上一节没有分析,这一小节会详细展开。Stream
在构建的时候虽然是一个双向链表的结构,但是在最终应用终结操作的时候,会把所有操作转化为引用类型链(ChainedReference
),记得之前也提到过这种类似于多层包装器的编程模式,简化一下模型如下:
public class WrapperApp {
interface Wrapper {
void doAction();
}
public static void main(String[] args) {
AtomicInteger counter = new AtomicInteger(0);
Wrapper first = () -> System.out.printf("wrapper [depth => %d] invoke\n", counter.incrementAndGet());
Wrapper second = () -> {
first.doAction();
System.out.printf("wrapper [depth => %d] invoke\n", counter.incrementAndGet());
};
second.doAction();
}
}
// 控制台输出
wrapper [depth => 1] invoke
wrapper [depth => 2] invoke
上面的例子有点突兀,两个不同Sink
的实现可以做到无感知融合,举另一个例子如下:
public interface Sink<T> extends Consumer<T> {
default void begin(long size) {
}
default void end() {
}
abstract class ChainedReference<T, OUT> implements Sink<T> {
protected final Sink<OUT> downstream;
public ChainedReference(Sink<OUT> downstream) {
this.downstream = downstream;
}
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
public class ReferenceChain<OUT, R> {
/**
* sink chain
*/
private final List<Supplier<Sink<?>>> sinkBuilders = new ArrayList<>();
/**
* current sink
*/
private final AtomicReference<Sink> sinkReference = new AtomicReference<>();
public ReferenceChain<OUT, R> filter(Predicate<OUT> predicate) {
//filter
sinkBuilders.add(() -> {
Sink<OUT> prevSink = (Sink<OUT>) sinkReference.get();
Sink.ChainedReference<OUT, OUT> currentSink = new Sink.ChainedReference<>(prevSink) {
@Override
public void accept(OUT out) {
if (predicate.test(out)) {
downstream.accept(out);
}
}
};
sinkReference.set(currentSink);
return currentSink;
});
return this;
}
public ReferenceChain<OUT, R> map(Function<OUT, R> function) {
// map
sinkBuilders.add(() -> {
Sink<R> prevSink = (Sink<R>) sinkReference.get();
Sink.ChainedReference<OUT, R> currentSink = new Sink.ChainedReference<>(prevSink) {
@Override
public void accept(OUT in) {
downstream.accept(function.apply(in));
}
};
sinkReference.set(currentSink);
return currentSink;
});
return this;
}
public void forEachPrint(Collection<OUT> collection) {
forEachPrint(collection, false);
}
public void forEachPrint(Collection<OUT> collection, boolean reverse) {
Spliterator<OUT> spliterator = collection.spliterator();
// 这个是类似于terminal op
Sink<OUT> sink = System.out::println;
sinkReference.set(sink);
Sink<OUT> stage = sink;
// 反向包装 -> 正向遍历
if (reverse) {
for (int i = 0; i <= sinkBuilders.size() - 1; i++) {
Supplier<Sink<?>> supplier = sinkBuilders.get(i);
stage = (Sink<OUT>) supplier.get();
}
} else {
// 正向包装 -> 反向遍历
for (int i = sinkBuilders.size() - 1; i >= 0; i--) {
Supplier<Sink<?>> supplier = sinkBuilders.get(i);
stage = (Sink<OUT>) supplier.get();
}
}
Sink<OUT> finalStage = stage;
spliterator.forEachRemaining(finalStage);
}
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
list.add(12);
ReferenceChain<Integer, Integer> chain = new ReferenceChain<>();
// filter -> map -> for each
chain.filter(item -> item > 10)
.map(item -> item * 2)
.forEachPrint(list);
}
}
// 输出结果
24
执行的流程如下:
多层包装器的编程模式的核心要领就是:
- 绝大部分操作可以转换为
java.util.function.Consumer
的实现,也就是实现accept(T t)
方法完成对传入的元素进行处理 - 先处理的
Sink
总是以后处理的Sink
为入参,在自身处理方法中判断和回调传入的Sink
的处理方法回调,也就是构建引用链的时候,需要从后往前构建,这种方式的实现逻辑可以参考AbstractPipeline#wrapSink()
,例如:
// 目标顺序:filter -> map
Sink mapSink = new Sink(inputSink){
private Function mapper;
public void accept(E ele) {
inputSink.accept(mapper.apply(ele))
}
}
Sink filterSink = new Sink(mapSink){
private Predicate predicate;
public void accept(E ele) {
if(predicate.test(ele)){
mapSink.accept(ele);
}
}
}
- 由上一点得知,一般来说,最后的终结操作会应用在引用链的第一个
Sink
上
上面的代码并非笔者虚构出来,可见java.util.stream.Sink
的源码:
// 继承自Consumer,主要是继承函数式接口方法void accept(T t)
interface Sink<T> extends Consumer<T> {
// 重置当前Sink的状态(为了接收一个新的数据集),传入的size是推送到downstream的准确数据量,无法评估数据量则传入-1
default void begin(long size) {}
//
default void end() {}
// 返回true的时候表示当前的Sink不会接收数据
default boolean cancellationRequested() {
return false;
}
// 特化方法,接受一个int类型的值
default void accept(int value) {
throw new IllegalStateException("called wrong accept method");
}
// 特化方法,接受一个long类型的值
default void accept(long value) {
throw new IllegalStateException("called wrong accept method");
}
// 特化方法,接受一个double类型的值
default void accept(double value) {
throw new IllegalStateException("called wrong accept method");
}
// 引用类型链,准确来说是Sink链
abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
// 下一个Sink
protected final Sink<? super E_OUT> downstream;
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
// 暂时忽略Int、Long、Double的特化类型场景
}
如果用过RxJava
或者Project-Reactor
,Sink
更像是Subscriber
,多个Subscriber
组成了ChainedReference
(Sink Chain
,可以理解为一个复合的Subscriber
),而Terminal Op
则类似于Publisher
,只有在Subscriber
订阅Publisher
的时候才会进行数据的处理,这里是应用了Reactive
编程模式。
AbstractPipeline和ReferencePipeline的实现
AbstractPipeline
和ReferencePipeline
都是抽象类,AbstractPipeline
用于构建Pipeline
的数据结构,提供一些Shape
相关的抽象方法给ReferencePipeline
实现,而ReferencePipeline
就是Stream
中Pipeline
的基础类型,从源码上看,Stream
链式(管道式)结构的头节点和操作节点都是ReferencePipeline
的子类。先看AbstractPipeline
的成员变量和构造函数:
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
// 流管道链式结构的头节点(只有当前的AbstractPipeline引用是头节点,此变量才会被赋值,非头节点为NULL)
@SuppressWarnings("rawtypes")
private final AbstractPipeline sourceStage;
// 流管道链式结构的upstream,也就是上一个节点,如果是头节点此引用为NULL
@SuppressWarnings("rawtypes")
private final AbstractPipeline previousStage;
// 合并数据源的标志和操作标志的掩码
protected final int sourceOrOpFlags;
// 流管道链式结构的下一个节点,如果是头节点此引用为NULL
@SuppressWarnings("rawtypes")
private AbstractPipeline nextStage;
// 流的深度
// 串行执行的流中,表示当前流管道实例中中间操作节点的个数(除去头节点和终结操作)
// 并发执行的流中,表示当前流管道实例中中间操作节点和前一个有状态操作节点之间的节点个数
private int depth;
// 合并了所有数据源的标志、操作标志和当前的节点(AbstractPipeline)实例的标志,也就是当前的节点可以基于此属性得知所有支持的标志
private int combinedFlags;
// 数据源的Spliterator实例
private Spliterator<?> sourceSpliterator;
// 数据源的Spliterator实例封装的Supplier实例
private Supplier<? extends Spliterator<?>> sourceSupplier;
// 标记当前的流节点是否被连接或者消费掉,不能重复连接或者重复消费
private boolean linkedOrConsumed;
// 标记当前的流管道链式结构中是否存在有状态的操作节点,这个属性只会在头节点中有效
private boolean sourceAnyStateful;
// 数据源关闭动作,这个属性只会在头节点中有效,由sourceStage持有
private Runnable sourceCloseAction;
// 标记当前流是否并发执行
private boolean parallel;
// 流管道结构头节点的父构造方法,使用数据源的Spliterator实例封装的Supplier实例
AbstractPipeline(Supplier<? extends Spliterator<?>> source,
int sourceFlags, boolean parallel) {
// 头节点的前驱节点置为NULL
this.previousStage = null;
this.sourceSupplier = source;
this.sourceStage = this;
// 合并传入的源标志和流标志的掩码
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
// 初始化合并标志集合为sourceOrOpFlags和所有流操作标志的初始化值
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
// 深度设置为0
this.depth = 0;
this.parallel = parallel;
}
// 流管道结构头节点的父构造方法,使用数据源的Spliterator实例
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
// 头节点的前驱节点置为NULL
this.previousStage = null;
this.sourceSpliterator = source;
this.sourceStage = this;
// 合并传入的源标志和流标志的掩码
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
// 初始化合并标志集合为sourceOrOpFlags和所有流操作标志的初始化值
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
// 流管道结构中间操作节点的父构造方法
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
// 设置前驱节点的后继节点引用为当前的AbstractPipeline实例
previousStage.nextStage = this;
// 设置前驱节点引用为传入的前驱节点实例
this.previousStage = previousStage;
// 合并传入的中间操作标志和流操作标志的掩码
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
// 合并标志集合为传入的标志和前驱节点的标志集合
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
// 赋值sourceStage为前驱节点的sourceStage
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
// 标记当前的流存在有状态操作
sourceStage.sourceAnyStateful = true;
// 深度设置为前驱节点深度加1
this.depth = previousStage.depth + 1;
}
// 省略其他方法
}
至此,可以看出流管道的数据结构:
Terminal Op
不参与管道链式结构的构建。接着看AbstractPipeline
中的终结求值方法(Terminal evaluation methods
):
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
// 省略其他方法
// 基于终结操作进行求值,这个是Stream执行的常用核心方法,常用于collect()这类终结操作
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
// 判断linkedOrConsumed,以防多次终结求值,也就是每个终结操作只能执行一次
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
// 如果当前流支持并发执行,则委托到TerminalOp.evaluateParallel(),如果当前流只支持同步执行,则委托到TerminalOp.evaluateSequential()
// 这里注意传入到TerminalOp中的方法参数分别是this(PipelineHelper类型)和数据源Spliterator
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
// 基于当前的流实例转换为最终的Node实例,传入的IntFunction用于创建数组实例
// 此终结方法一般用于toArray()这类终结操作
@SuppressWarnings("unchecked")
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
// If the last intermediate operation is stateful then
// evaluate directly to avoid an extra collection step
// 当前流支持并发执行,并且最后一个中间操作是有状态,则委托到opEvaluateParallel(),否则委托到evaluate(),这两个都是AbstractPipeline中的方法
if (isParallel() && previousStage != null && opIsStateful()) {
// Set the depth of this, last, pipeline stage to zero to slice the
// pipeline such that this operation will not be included in the
// upstream slice and upstream operations will not be included
// in this slice
depth = 0;
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
}
else {
return evaluate(sourceSpliterator(0), true, generator);
}
}
// 这个方法比较简单,就是获取当前流的数据源所在的Spliterator,并且确保流已经消费,一般用于forEach()这类终结操作
final Spliterator<E_OUT> sourceStageSpliterator() {
if (this != sourceStage)
throw new IllegalStateException();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
if (sourceStage.sourceSpliterator != null) {
@SuppressWarnings("unchecked")
Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
return s;
}
else if (sourceStage.sourceSupplier != null) {
@SuppressWarnings("unchecked")
Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
return s;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
}
// 省略其他方法
}
AbstractPipeline
中实现了BaseStream
的方法:
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
// 省略其他方法
// 设置头节点的parallel属性为false,返回自身实例,表示当前的流是同步执行的
@Override
@SuppressWarnings("unchecked")
public final S sequential() {
sourceStage.parallel = false;
return (S) this;
}
// 设置头节点的parallel属性为true,返回自身实例,表示当前的流是并发执行的
@Override
@SuppressWarnings("unchecked")
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
// 流关闭操作,设置linkedOrConsumed为true,数据源的Spliterator相关引用置为NULL,置空并且回调sourceCloseAction钩子实例
@Override
public void close() {
linkedOrConsumed = true;
sourceSupplier = null;
sourceSpliterator = null;
if (sourceStage.sourceCloseAction != null) {
Runnable closeAction = sourceStage.sourceCloseAction;
sourceStage.sourceCloseAction = null;
closeAction.run();
}
}
// 返回一个添加了close处理器的Stream实例,close处理器会在下面的close方法中回调
// 如果本来持有的引用sourceStage.sourceCloseAction非空,会使用传入的closeHandler与sourceStage.sourceCloseAction进行合并
@Override
@SuppressWarnings("unchecked")
public S onClose(Runnable closeHandler) {
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
Objects.requireNonNull(closeHandler);
Runnable existingHandler = sourceStage.sourceCloseAction;
sourceStage.sourceCloseAction =
(existingHandler == null)
? closeHandler
: Streams.composeWithExceptions(existingHandler, closeHandler);
return (S) this;
}
// Primitive specialization use co-variant overrides, hence is not final
// 返回当前流实例中所有元素的Spliterator实例
@Override
@SuppressWarnings("unchecked")
public Spliterator<E_OUT> spliterator() {
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
// 标记当前节点被链接或者消费
linkedOrConsumed = true;
// 如果当前节点为头节点,那么返回sourceStage.sourceSpliterator或者延时加载的sourceStage.sourceSupplier(延时加载封装由lazySpliterator实现)
if (this == sourceStage) {
if (sourceStage.sourceSpliterator != null) {
@SuppressWarnings("unchecked")
Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
return s;
}
else if (sourceStage.sourceSupplier != null) {
@SuppressWarnings("unchecked")
Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
sourceStage.sourceSupplier = null;
return lazySpliterator(s);
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
}
else {
// 如果当前节点不是头节点,重新对sourceSpliterator进行包装,包装后的实例为WrappingSpliterator
return wrap(this, () -> sourceSpliterator(0), isParallel());
}
}
// 当前流实例是否并发执行,从头节点的parallel属性进行判断
@Override
public final boolean isParallel() {
return sourceStage.parallel;
}
// 从当前combinedFlags中获取数据源标志和所有流中间操作标志的集合
final int getStreamFlags() {
return StreamOpFlag.toStreamFlags(combinedFlags);
}
/**
* Get the source spliterator for this pipeline stage. For a sequential or
* stateless parallel pipeline, this is the source spliterator. For a
* stateful parallel pipeline, this is a spliterator describing the results
* of all computations up to and including the most recent stateful
* operation.
*/
@SuppressWarnings("unchecked")
private Spliterator<?> sourceSpliterator(int terminalFlags) {
// 从sourceStage.sourceSpliterator或者sourceStage.sourceSupplier中获取当前流实例中的Spliterator实例,确保必定存在,否则抛出IllegalStateException
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
spliterator = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
}
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
// 下面这段逻辑是对于并发执行并且存在有状态操作的节点,那么需要重新计算节点的深度和节点的合并标志集合
// 这里只提一下计算过程,从头节点的后继节点开始遍历到当前节点,如果被遍历的节点时有状态的,那么对depth、combinedFlags和spliterator会进行重新计算
// depth一旦出现有状态节点就会重置为0,然后从1重新开始增加
// combinedFlags会重新合并sourceOrOpFlags、SHORT_CIRCUIT(如果sourceOrOpFlags支持)和Spliterator.SIZED
// spliterator简单来看就是从并发执行的toArray()=>Array数组=>Spliterator实例
if (isParallel() && sourceStage.sourceAnyStateful) {
// Adapt the source spliterator, evaluating each stateful op
// in the pipeline up to and including this pipeline stage.
// The depth and flags of each pipeline stage are adjusted accordingly.
int depth = 1;
for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {
int thisOpFlags = p.sourceOrOpFlags;
if (p.opIsStateful()) {
depth = 0;
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
// Clear the short circuit flag for next pipeline stage
// This stage encapsulates short-circuiting, the next
// stage may not have any short-circuit operations, and
// if so spliterator.forEachRemaining should be used
// for traversal
thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
}
spliterator = p.opEvaluateParallelLazy(u, spliterator);
// Inject or clear SIZED on the source pipeline stage
// based on the stage's spliterator
thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
}
p.depth = depth++;
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
}
}
// 如果传入的terminalFlags标志不为0,则当前节点的combinedFlags会合并terminalFlags
if (terminalFlags != 0) {
// Apply flags from the terminal operation to last pipeline stage
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
return spliterator;
}
// 省略其他方法
}
AbstractPipeline
中实现了PipelineHelper
的方法:
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
// 省略其他方法
// 获取数据源元素的类型,这里的类型包括引用、int、double和float
// 其实实现上就是获取depth<=0的第一个节点的输出类型
@Override
final StreamShape getSourceShape() {
@SuppressWarnings("rawtypes")
AbstractPipeline p = AbstractPipeline.this;
while (p.depth > 0) {
p = p.previousStage;
}
return p.getOutputShape();
}
// 基于当前节点的标志集合判断和返回流中待处理的元素数量,无法获取则返回-1
@Override
final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
}
// 通过流管道链式结构构建元素引用链,再遍历元素引用链
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
// 遍历元素引用链
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
// 当前节点不支持SHORT_CIRCUIT(短路)特性,则直接遍历元素引用链,不支持短路跳出
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
// 支持短路(中途取消)遍历元素引用链
copyIntoWithCancel(wrappedSink, spliterator);
}
}
// 支持短路(中途取消)遍历元素引用链
@Override
@SuppressWarnings("unchecked")
final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
@SuppressWarnings({"rawtypes","unchecked"})
AbstractPipeline p = AbstractPipeline.this;
// 基于当前节点,获取流管道链式结构中第最后一个depth=0的前驱节点
while (p.depth > 0) {
p = p.previousStage;
}
wrappedSink.begin(spliterator.getExactSizeIfKnown());
// 委托到forEachWithCancel()进行遍历
boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);
wrappedSink.end();
return cancelled;
}
// 返回当前节点的标志集合
@Override
final int getStreamAndOpFlags() {
return combinedFlags;
}
// 当前节点标志集合中是否支持ORDERED
final boolean isOrdered() {
return StreamOpFlag.ORDERED.isKnown(combinedFlags);
}
// 构建元素引用链,生成一个多重包装的Sink(WrapSink),这里的逻辑可以看前面的分析章节
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
// 这里遍历的时候,总是从当前节点向前驱节点遍历,也就是传入的sink实例总是包裹在最里面一层执行
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
// 包装数据源的Spliterator,如果depth=0,则直接返回sourceSpliterator,否则返回的是延迟加载的WrappingSpliterator
@Override
@SuppressWarnings("unchecked")
final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
if (depth == 0) {
return (Spliterator<E_OUT>) sourceSpliterator;
}
else {
return wrap(this, () -> sourceSpliterator, isParallel());
}
}
// 计算Node实例,这个方法用于toArray()方法系列,是一个终结操作,下面会另开章节详细分析
@Override
@SuppressWarnings("unchecked")
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
boolean flatten,
IntFunction<E_OUT[]> generator) {
if (isParallel()) {
// @@@ Optimize if op of this pipeline stage is a stateful op
return evaluateToNode(this, spliterator, flatten, generator);
}
else {
Node.Builder<E_OUT> nb = makeNodeBuilder(
exactOutputSizeIfKnown(spliterator), generator);
return wrapAndCopyInto(nb, spliterator).build();
}
}
// 省略其他方法
}
AbstractPipeline
中剩余的待如XXYYZZPipeline
等子类实现的抽象方法:
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
// 省略其他方法
// 获取当前流的输出"形状",REFERENCE、INT_VALUE、LONG_VALUE或者DOUBLE_VALUE
abstract StreamShape getOutputShape();
// 收集当前流的所有输出元素,转化为一个适配当前流输出"形状"的Node实例
abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator,
boolean flattenTree,
IntFunction<E_OUT[]> generator);
// 包装Spliterator为WrappingSpliterator实例
abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,
Supplier<Spliterator<P_IN>> supplier,
boolean isParallel);
// 包装Spliterator为DelegatingSpliterator实例
abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,
Supplier<Spliterator<P_IN>> supplier,
boolean isParallel);
// 基于Sink遍历Spliterator中的元素,支持取消操作,简单理解就是支持cancel的tryAdvance方法
abstract boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
// 返回Node的建造器实例,用于toArray方法系列
abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
IntFunction<E_OUT[]> generator);
// 判断当前的操作(节点)是否有状态,如果是有状态的操作,必须覆盖opEvaluateParallel方法
abstract boolean opIsStateful();
// 当前操作生成的结果会作为传入的Sink实例的入参,这是一个包装Sink的过程,通俗理解就是之前提到的元素引用链添加一个新的链节点,这个方法算是流执行的一个核心方法
abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
// 并发执行的操作节点求值
<P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator,
IntFunction<E_OUT[]> generator) {
throw new UnsupportedOperationException("Parallel evaluation is not supported");
}
// 并发执行的操作节点惰性求值
@SuppressWarnings("unchecked")
<P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator) {
return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
}
// 省略其他方法
}
这里提到的抽象方法opWrapSink()
其实就是元素引用链的添加链节点的方法,它的实现逻辑见子类,这里只考虑非特化子类ReferencePipeline
的部分源码:
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
// 构造函数,用于头节点,传入基于Supplier封装的Spliterator实例作为数据源,数据源的标志集合和是否支持并发执行的判断标记
ReferencePipeline(Supplier<? extends Spliterator<?>> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
// 构造函数,用于头节点,传入Spliterator实例作为数据源,数据源的标志集合和是否支持并发执行的判断标记
ReferencePipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
// 构造函数,用于中间节点,传入上一个流管道节点的实例(前驱节点)和当前操作节点支持的标志集合
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
super(upstream, opFlags);
}
// 这里流的输出"形状"固定为REFERENCE
@Override
final StreamShape getOutputShape() {
return StreamShape.REFERENCE;
}
// 转换当前流实例为Node实例,应用于toArray方法,后面详细分析终结操作的时候再展开
@Override
final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator,
boolean flattenTree,
IntFunction<P_OUT[]> generator) {
return Nodes.collect(helper, spliterator, flattenTree, generator);
}
// 包装Spliterator=>WrappingSpliterator
@Override
final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
Supplier<Spliterator<P_IN>> supplier,
boolean isParallel) {
return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
}
// 包装Spliterator=>DelegatingSpliterator,实现惰性加载
@Override
final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
return new StreamSpliterators.DelegatingSpliterator<>(supplier);
}
// 遍历Spliterator中的元素,基于传入的Sink实例进行处理,支持Cancel操作
@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
boolean cancelled;
do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink));
return cancelled;
}
// 构造Node建造器实例
@Override
final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
return Nodes.builder(exactSizeIfKnown, generator);
}
// 基于当前流的Spliterator生成迭代器实例
@Override
public final Iterator<P_OUT> iterator() {
return Spliterators.iterator(spliterator());
}
// 省略其他OP的代码
// 流管道结构的头节点
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
// 构造函数,用于头节点,传入基于Supplier封装的Spliterator实例作为数据源,数据源的标志集合和是否支持并发执行的判断标记
Head(Supplier<? extends Spliterator<?>> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
// 构造函数,用于头节点,传入Spliterator实例作为数据源,数据源的标志集合和是否支持并发执行的判断标记
Head(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
// 不支持判断是否状态操作
@Override
final boolean opIsStateful() {
throw new UnsupportedOperationException();
}
// 不支持包装Sink实例
@Override
final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
throw new UnsupportedOperationException();
}
// 区分同步异步执行forEach,同步则简单理解为调用Spliterator.forEachRemaining,异步则调用终结操作forEach
@Override
public void forEach(Consumer<? super E_OUT> action) {
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEach(action);
}
}
// 区分同步异步执行forEachOrdered,同步则简单理解为调用Spliterator.forEachRemaining,异步则调用终结操作forEachOrdered
@Override
public void forEachOrdered(Consumer<? super E_OUT> action) {
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEachOrdered(action);
}
}
}
// 无状态操作节点的父类
abstract static class StatelessOp<E_IN, E_OUT>
extends ReferencePipeline<E_IN, E_OUT> {
// 基于上一个节点引用、输入元素"形状"和当前节点支持的标志集合创建StatelessOp实例
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
// 操作状态标记设置为无状态
@Override
final boolean opIsStateful() {
return false;
}
}
// 有状态操作节点的父类
abstract static class StatefulOp<E_IN, E_OUT>
extends ReferencePipeline<E_IN, E_OUT> {
// 基于上一个节点引用、输入元素"形状"和当前节点支持的标志集合创建StatefulOp实例
StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
// 操作状态标记设置为有状态
@Override
final boolean opIsStateful() {
return true;
}
// 前面也提到,节点操作异步求值的方法在无状态节点下必须覆盖,这里重新把这个方法抽象,子类必须实现
@Override
abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator,
IntFunction<E_OUT[]> generator);
}
}
这里重重重点分析一下ReferencePipeline
中的wrapSink
方法实现:
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
入参是一个Sink
实例,返回值也是一个Sink
实例,里面的for
循环是基于当前的AbstractPipeline
节点向前遍历,直到depth
为0
的节点跳出循环,而depth
为0
意味着该节点必定为头节点,也就是该循环是遍历当前节点到头节点的后继节点,Sink
是"向前包装的”,也就是处于链后面的节点Sink
总是会作为其前驱节点的opWrapSink()
方法的入参,在同步执行流求值计算的时候,前驱节点的Sink
处理完元素后就会通过downstream
引用(其实就是后驱节点的Sink
)调用其accept()
把元素或者处理完的元素结果传递进去,激活下一个Sink
,以此类推。另外,ReferencePipeline
的三个内部类Head
、StatelessOp
和StatefulOp
就是流的节点类,其中只有Head
是非抽象类,代表流管道结构(或者说双向链表结构)的头节点,StatelessOp
(无状态操作)和StatefulOp
(有状态操作)的子类构成了流管道结构的操作节点或者是终结操作。在忽略是否有状态操作的前提下看ReferencePipeline
,它只是流数据结构的承载体,表面上看到的双向链表结构在流的求值计算过程中并不会进行直接遍历每个节点进行求值,而是先转化成一个多层包装的Sink
,也就是前文笔者提到的元素引用链后者前一句分析的Sink
元素处理以及传递,正确来说应该是一个Sink
栈或者Sink
包装器,它的实现可以类比为现实生活中的洋葱,或者编程模式中的AOP
编程模式。形象一点的描述如下:
Head(Spliterator) -> Op(filter) -> Op(map) -> Op(sorted) -> Terminal Op(forEach)
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
forEach ele in Spliterator:
Sink[filter](ele){
if filter process == true:
Sink[map](ele){
ele = mapper(ele)
Sink[sorted](ele){
var array
begin:
accept(ele):
add ele to array
end:
sort ele in array
}
}
}
终结操作forEach
是目前分析源码中最简单的实现,下面会详细分析每种终结操作的实现细节。
流中间操作的源码实现
限于篇幅,这里只能挑选一部分的中间Op
进行分析。流的中间操作基本都是由BaseStream
接口定义,在ReferencePipeline
中进行实现,这里挑选比较常用的filter
、map
和sorted
进行分析。先看filter
:
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
// 暂时省略其他代码
// filter操作,泛型参数Predicate类型接受一个任意类型(这里考虑到泛型擦除)的元素,输出布尔值,它是一个无状态操作
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
// 这里注意到,StatelessOp的第一个参数是指upstream,也就是理解为上一个节点,这里使用了this,意味着upstream为当前的ReferencePipeline实例,元素"形状"为引用类型,操作标志位不支持SIZED
// 在AbstractPipeline,previousStage指向了this,当前的节点就是StatelessOp[filter]实例,那么前驱节点this的后继节点引用nextStage就指向了StatelessOp[filter]实例
// 也就是StatelessOp[filter].previousStage = this; this.nextStage = StatelessOp[filter]; ===> 也就是这个看起来简单的new StatelessOp()其实已经把自身加入到管道中
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
// 这里通知下一个节点的Sink.begin(),由于filter方法不感知元素数量,所以传值-1
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
// 基于输入的Predicate实例判断当前处理元素是否符合判断,只有判断结果为true才会把元素原封不动直接传递到下一个Sink
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
// 暂时省略其他代码
}
接着是map
:
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
// 暂时省略其他代码
// map操作,基于传入的Function实例做映射转换(P_OUT->R),它是一个无状态操作
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
// upstream为当前的ReferencePipeline实例,元素"形状"为引用类型,操作标志位不支持SORTED和DISTINCT
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
// 基于传入的Function实例转换元素后把转换结果传递到下一个Sink
downstream.accept(mapper.apply(u));
}
};
}
};
}
// 暂时省略其他代码
}
然后是sorted
,sorted
操作会相对复杂一点:
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
// 暂时省略其他代码
// sorted操作,基于传入的Comparator实例对处理的元素进行排序,从源码中看,它是一个有状态操作
@Override
public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
return SortedOps.makeRef(this, comparator);
}
// 暂时省略其他代码
}
// SortedOps工具类
final class SortedOps {
// 暂时省略其他代码
// 构建排序操作的链节点
static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
Comparator<? super T> comparator) {
return new OfRef<>(upstream, comparator);
}
// 有状态的排序操作节点
private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
// 是否自然排序,不定义Comparator实例的时候为true,否则为false
private final boolean isNaturalSort;
// 用于排序的Comparator实例
private final Comparator<? super T> comparator;
// 自然排序情况下的构造方法,元素"形状"为引用类型,操作标志位不支持ORDERED和SORTED
OfRef(AbstractPipeline<?, T, ?> upstream) {
super(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
this.isNaturalSort = true;
// Comparator实例赋值为Comparator.naturalOrder(),本质是基于Object中的equals或者子类覆盖Object中的equals方法进行元素排序
@SuppressWarnings("unchecked")
Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
this.comparator = comp;
}
// 非自然排序情况下的构造方法,需要传入Comparator实例,元素"形状"为引用类型,操作标志位不支持ORDERED和SORTED
OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
super(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
this.isNaturalSort = false;
this.comparator = Objects.requireNonNull(comparator);
}
@Override
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// If the input is already naturally sorted and this operation
// also naturally sorted then this is a no-op
// 流中的所有元素本身已经按照自然顺序排序,并且没有定义Comparator实例,则不需要进行排序,所以no op就行
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
// 知道要处理的元素的确切数量,使用数组进行排序
return new SizedRefSortingSink<>(sink, comparator);
else
// 不知道要处理的元素的确切数量,使用ArrayList进行排序
return new RefSortingSink<>(sink, comparator);
}
// 这里是并行执行流中toArray方法的实现,暂不分析
@Override
public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator,
IntFunction<T[]> generator) {
// If the input is already naturally sorted and this operation
// naturally sorts then collect the output
if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
return helper.evaluate(spliterator, false, generator);
}
else {
// @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
Arrays.parallelSort(flattenedData, comparator);
return Nodes.node(flattenedData);
}
}
}
// 这里考虑到篇幅太长,SizedRefSortingSink和RefSortingSink的源码不复杂,只展开RefSortingSink进行分析
// 无法确认待处理元素确切数量时候用于元素排序的Sink实现
private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
// 临时ArrayList实例
private ArrayList<T> list;
// 构造函数,需要的参数为下一个Sink引用和Comparator实例
RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink, comparator);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
// 基于传入的size是否大于0,大于等于0用于作为initialCapacity构建ArrayList,小于0则构建默认initialCapacity的ArrayList,赋值到临时变量list
list = (size >= 0) ? new ArrayList<>((int) size) : new ArrayList<>();
}
@Override
public void end() {
// 临时的ArrayList实例基于Comparator实例进行潘旭
list.sort(comparator);
// 下一个Sink节点的激活,区分是否支持取消操作
downstream.begin(list.size());
if (!cancellationRequestedCalled) {
list.forEach(downstream::accept);
}
else {
for (T t : list) {
if (downstream.cancellationRequested()) break;
downstream.accept(t);
}
}
downstream.end();
// 激活下一个Sink完成后,临时的ArrayList实例置为NULL,便于GC回收
list = null;
}
@Override
public void accept(T t) {
// 当前Sink处理元素直接添加到临时的ArrayList实例
list.add(t);
}
}
// 暂时省略其他代码
}
sorted
操作有个比较显著的特点,一般的Sink
处理完自身的逻辑,会在accept()
方法激活下一个Sink
引用,但是它在accept()
方法中只做元素的累积(元素富集),在end()
方法进行最终的排序操作和模仿Spliterator
的两个元素遍历方法向downstream
推送待处理的元素。示意图如下:
其他中间操作的实现逻辑是大致相同的。
同步执行流终结操作的源码实现
限于篇幅,这里只能挑选一部分的Terminal Op
进行分析,简单起见只分析同步执行的场景,这里挑选最典型和最复杂的froEach()
和collect()
,还有比较独特的toArray()
方法。先看froEach()
方法的实现过程:
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
// 暂时省略其他代码
// 遍历元素
@Override
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
// 暂时省略其他代码
// 基于终结操作的求值方法
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
// 确保只会执行一次,linkedOrConsumed是流管道结构最后一个节点的属性
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
// 这里暂且只分析同步执行的流的终结操作,终结操作节点的标志会合并到流最后一个节点的combinedFlags中,执行的关键就是evaluateSequential方法
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
// 暂时省略其他代码
}
// ForEachOps类,TerminalOp接口的定义比较简单,这里不展开
final class ForEachOps {
// 暂时省略其他代码
// 构造变量元素的终结操作实例,传入的元素是T类型,结果是Void类型(返回NULL,或者说是没有返回值,毕竟是一个元素遍历过程)
// 参数为一个Consumer接口实例和一个标记是否顺序处理元素的布尔值
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
// 遍历元素操作的终结操作实现,同时它是一个适配器,适配TerminalSink(Sink)接口
abstract static class ForEachOp<T>
implements TerminalOp<T, Void>, TerminalSink<T, Void> {
// 标记是否顺序处理元素
private final boolean ordered;
protected ForEachOp(boolean ordered) {
this.ordered = ordered;
}
// TerminalOp
// 终结操作节点的标志集合,如果ordered为true则返回0,否则返回StreamOpFlag.NOT_ORDERED,表示不支持顺序处理元素
@Override
public int getOpFlags() {
return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
}
// 同步遍历和处理元素
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
// 以当前的ForEachOp实例作为最后一个Sink添加到Sink链(也就是前面经常说的元素引用链),然后对Sink链进行遍历
return helper.wrapAndCopyInto(this, spliterator).get();
}
// 并发遍历和处理元素,这里暂不分析
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
// TerminalSink
// 实现TerminalSink的方法,实际上TerminalSink继承接口Supplier,这里是实现了Supplier接口的get()方法,由于PipelineHelper.wrapAndCopyInto()方法会返回最后一个Sink的引用,这里其实就是evaluateSequential()中的返回值
@Override
public Void get() {
return null;
}
// ForEachOp的静态内部类,引用类型的ForEachOp的最终实现,依赖入参遍历元素处理的最后一步回调Consumer实例
static final class OfRef<T> extends ForEachOp<T> {
// 最后的遍历回调的Consumer句柄
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
@Override
public void accept(T t) {
// 遍历元素回调操作
consumer.accept(t);
}
}
}
}
forEach
终结操作实现上,自身这个操作并不会构成流的链式结构的一部分,也就是它不是一个AbstractPipeline
的子类实例,而是构建一个回调Consumer
实例操作的一个Sink
实例(准确来说是TerminalSink
)实例,这里暂且叫forEach terminal sink
,通过流最后一个操作节点的wrapSink()
方法,把forEach terminal sink
添加到Sink
链的尾部,通过流最后一个操作节点的copyInto()
方法进行元素遍历,按照copyInto()
方法的套路,只要多层包装的Sink
方法在回调其实现方法的时候总是激活downstream
的前提下,执行的顺序就是流链式结构定义的操作节点顺序,而forEach
最后添加的Consumer
实例一定就是最后回调的。
接着分析collect()
方法的实现,先看Collector
接口的定义:
// T:需要进行reduce操作的输入元素类型
// A:reduce操作中可变累加对象的类型,可以简单理解为累加操作中,累加到Container<A>中的可变对象类型
// R:reduce操作结果类型
public interface Collector<T, A, R> {
// 注释中称为Container,用于承载最终结果的可变容器,而此方法的Supplier实例持有的是创建Container实例的get()方法实现,后面称为Supplier
// 也就是一般使用如:Supplier<Container> supplier = () -> new Container();
Supplier<A> supplier();
// Accumulator,翻译为累加器,用于处理值并且把处理结果传递(累加)到Container中,后面称为Accumulator
BiConsumer<A, T> accumulator();
// Combiner,翻译为合并器,真实泛型类型为BinaryOperator<A,A,A>,BiFunction的子类,接收两个部分的结果并且合并为一个结果,后面称为Combiner
// 这个方法可以把一个参数的状态转移到另一个参数,然后返回更新状态后的参数,例如:(arg1, arg2) -> {arg2.state = arg1.state; return arg2;}
// 可以把一个参数的状态转移到另一个参数,然后返回一个新的容器,例如:(arg1, arg2) -> {arg2.state = arg1.state; return new Container(arg2);}
BinaryOperator<A> combiner();
// Finisher,直接翻译感觉意义不合理,实际上就是做最后一步转换工作的处理器,后面称为Finisher
Function<A, R> finisher();
// Collector支持的特性集合,见枚举Characteristics
Set<Characteristics> characteristics();
// 这里忽略两个Collector的静态工厂方法,因为并不常用
enum Characteristics {
// 标记Collector支持并发执行,一般和并发容器相关
CONCURRENT,
// 标记Collector处理元素时候无序
UNORDERED,
// 标记Collector的输入和输出元素是同类型,也就是Finisher在实现上R -> A可以等效于A -> R,unchecked cast会成功(也就是类型强转可以成功)
// 在这种场景下,对于Container来说其实类型强制转换也是等效的,也就是Supplier<A>和Supplier<R>得出的Container是同一种类型的Container
IDENTITY_FINISH
}
}
// Collector的实现Collectors.CollectorImpl
public final class Collectors {
// 这一大堆常量就是预设的多种特性组合,CH_NOID比较特殊,是空集合,也就是Collector三种特性都不支持
static final Set<Collector.Characteristics> CH_CONCURRENT_ID
= Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
Collector.Characteristics.UNORDERED,
Collector.Characteristics.IDENTITY_FINISH));
static final Set<Collector.Characteristics> CH_CONCURRENT_NOID
= Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
Collector.Characteristics.UNORDERED));
static final Set<Collector.Characteristics> CH_ID
= Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
static final Set<Collector.Characteristics> CH_UNORDERED_ID
= Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.UNORDERED,
Collector.Characteristics.IDENTITY_FINISH));
static final Set<Collector.Characteristics> CH_NOID = Collections.emptySet();
static final Set<Collector.Characteristics> CH_UNORDERED_NOID
= Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.UNORDERED));
private Collectors() { }
// 省略大量代码
// 静态类,Collector的实现,实现其实就是Supplier、Accumulator、Combiner、Finisher和Characteristics集合的成员属性承载
static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
private final Supplier<A> supplier;
private final BiConsumer<A, T> accumulator;
private final BinaryOperator<A> combiner;
private final Function<A, R> finisher;
private final Set<Characteristics> characteristics;
CollectorImpl(Supplier<A> supplier,
BiConsumer<A, T> accumulator,
BinaryOperator<A> combiner,
Function<A,R> finisher,
Set<Characteristics> characteristics) {
this.supplier = supplier;
this.accumulator = accumulator;
this.combiner = combiner;
this.finisher = finisher;
this.characteristics = characteristics;
}
CollectorImpl(Supplier<A> supplier,
BiConsumer<A, T> accumulator,
BinaryOperator<A> combiner,
Set<Characteristics> characteristics) {
this(supplier, accumulator, combiner, castingIdentity(), characteristics);
}
@Override
public BiConsumer<A, T> accumulator() {
return accumulator;
}
@Override
public Supplier<A> supplier() {
return supplier;
}
@Override
public BinaryOperator<A> combiner() {
return combiner;
}
@Override
public Function<A, R> finisher() {
return finisher;
}
@Override
public Set<Characteristics> characteristics() {
return characteristics;
}
}
// 省略大量代码
// IDENTITY_FINISH特性下,Finisher的实现,也就是之前提到的A->R和R->A等效,可以强转
private static <I, R> Function<I, R> castingIdentity() {
return i -> (R) i;
}
// 省略大量代码
}
collect()
方法的求值执行入口在ReferencePipeline
中:
// ReferencePipeline
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
// 暂时省略其他代码
// 基于Collector实例进行求值
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
// 并发求值场景暂不考虑
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
// 这里就是同步执行场景下的求值过程,这里可以看出其实所有Collector的求值都是Reduce操作
container = evaluate(ReduceOps.makeRef(collector));
}
// 如果Collector的Finisher输入类型和输出类型相同,所以Supplier<A>和Supplier<R>得出的Container是同一种类型的Container,可以直接类型转换,否则就要调用Collector中的Finisher进行最后一步处理
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
// 暂时省略其他代码
}
// ReduceOps
final class ReduceOps {
private ReduceOps() { }
// 暂时省略其他代码
// 引用类型Reduce操作创建TerminalOp实例
public static <T, I> TerminalOp<T, I>
makeRef(Collector<? super T, I, ?> collector) {
// Supplier
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
// Accumulator
BiConsumer<I, ? super T> accumulator = collector.accumulator();
// Combiner
BinaryOperator<I> combiner = collector.combiner();
// 这里注意一点,ReducingSink是方法makeRef中的内部类,作用域只在方法内,它是封装为TerminalOp最终转化为Sink链中最后一个Sink实例的类型
class ReducingSink extends Box<I>
implements AccumulatingSink<T, I, ReducingSink> {
@Override
public void begin(long size) {
// 这里把从Supplier创建的新Container实例存放在父类Box的状态属性中
state = supplier.get();
}
@Override
public void accept(T t) {
// 处理元素,Accumulator处理状态(容器实例)和元素,这里可以想象,如果state为一个ArrayList实例,这里的accept()实现可能为add(ele)操作
accumulator.accept(state, t);
}
@Override
public void combine(ReducingSink other) {
// Combiner合并两个状态(容器实例)
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
@Override
public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};
}
// 暂时省略其他代码
// 继承自接口TerminalSink,主要添加了combine()抽象方法,用于合并元素
private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
extends TerminalSink<T, R> {
void combine(K other);
}
// 状态盒,用于持有和获取状态,状态属性的修饰符为default,包内的类实例都能修改
private abstract static class Box<U> {
U state;
Box() {} // Avoid creation of special accessor
public U get() {
return state;
}
}
// ReduceOp的最终实现,这个就是Reduce操作终结操作的实现
private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
implements TerminalOp<T, R> {
// 流输入元素"形状"
private final StreamShape inputShape;
ReduceOp(StreamShape shape) {
inputShape = shape;
}
// 抽象方法,让子类生成终结操作的Sink
public abstract S makeSink();
// 获取流输入元素"形状"
@Override
public StreamShape inputShape() {
return inputShape;
}
// 同步执行求值,还是相似的思路,使用wrapAndCopyInto()进行Sink链构建和元素遍历
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
// 以当前的ReduceOp实例的makeSink()返回的Sink实例作为最后一个Sink添加到Sink链(也就是前面经常说的元素引用链),然后对Sink链进行遍历
// 这里向上一步一步推演思考,最终get()方法执行完毕拿到的结果就是ReducingSink父类Box中的state变量,也就是容器实例
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
// 异步执行求值,暂时忽略
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
}
// 暂时省略其他代码
}
接着就看Collector
的静态工厂方法,看一些常用的Collector
实例是如何构建的,例如看Collectors.toList()
:
// Supplier => () -> new ArrayList<T>(); // 初始化ArrayList
// Accumulator => (list,number) -> list.add(number); // 往ArrayList中添加元素
// Combiner => (left, right) -> { left.addAll(right); return left;} // 合并ArrayList
// Finisher => X -> X; // 输入什么就返回什么,这里实际返回的是ArrayList
public static <T>
Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}
把过程画成流程图如下:
甚至可以更通俗地用伪代码表示Collector
这类Terminal Op
的执行过程(还是以Collectors.toList()
为例):
[begin]
Supplier supplier = () -> new ArrayList<T>();
Container container = supplier.get();
Box.state = container;
[accept]
Box.state.add(element);
[end]
return supplier.get(); (=> return Box.state);
↓↓↓↓↓↓↓↓↓甚至更加通俗的过程如下↓↓↓↓↓↓↓↓↓↓↓↓↓↓
ArrayList<T> container = new ArrayList<T>();
loop:
container.add(element)
return container;
也就是虽然工程化的代码看起来很复杂,最终的实现就是简单的:初始化ArrayList
实例由state
属性持有,遍历处理元素的时候把元素添加到state
中,最终返回state
。最后看toArray()
的方法实现(下面的方法代码没有按照实际的位置贴出,笔者把零散的代码块放在一起方便分析):
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
// 暂时省略其他代码
// 流的所有元素转换为数组,这里的IntFunction有一种比较特殊的用法,就是用于创建数组实例
// 例如IntFunction<String[]> f = String::new; String[] arry = f.apply(2); 相当于String[] arry = new String[2];
@Override
@SuppressWarnings("unchecked")
public final <A> A[] toArray(IntFunction<A[]> generator) {
// 这里主动擦除了IntFunction的类型,只要保证求值的过程是正确,最终可以做类型强转
@SuppressWarnings("rawtypes")
IntFunction rawGenerator = (IntFunction) generator;
// 委托到evaluateToArrayNode()方法进行计算
return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
.asArray(rawGenerator);
}
// 流的所有元素转换为Object数组
@Override
public final Object[] toArray() {
return toArray(Object[]::new);
}
// 流元素求值转换为ArrayNode
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
// 确保不会处理多次
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
// 并发执行暂时跳过
if (isParallel() && previousStage != null && opIsStateful()) {
depth = 0;
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
}
else {
return evaluate(sourceSpliterator(0), true, generator);
}
}
// 最终的转换Node的方法
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
boolean flatten,
IntFunction<E_OUT[]> generator) {
// 并发执行暂时跳过
if (isParallel()) {
// @@@ Optimize if op of this pipeline stage is a stateful op
return evaluateToNode(this, spliterator, flatten, generator);
}
else {
// 兜兜转换还是回到了wrapAndCopyInto()方法,遍历Sink链,所以基本可以得知Node.Builder是Sink的一个实现
Node.Builder<E_OUT> nb = makeNodeBuilder(
exactOutputSizeIfKnown(spliterator), generator);
return wrapAndCopyInto(nb, spliterator).build();
}
}
// 获取Node的建造器实例
final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
return Nodes.builder(exactSizeIfKnown, generator);
}
// 暂时省略其他代码
}
// Node接口定义
interface Node<T> {
// 获取待处理的元素封装成的Spliterator实例
Spliterator<T> spliterator();
// 遍历当前Node实例中所有待处理的元素,回调到Consumer实例中
void forEach(Consumer<? super T> consumer);
// 获取当前Node实例的所有子Node的个数
default int getChildCount() {
return 0;
}
// 获取当前Node实例的子Node实例,入参i是子Node的索引
default Node<T> getChild(int i) {
throw new IndexOutOfBoundsException();
}
// 分割当前Node实例的一个部分,生成一个新的sub Node,类似于ArrayList中的subList方法
default Node<T> truncate(long from, long to, IntFunction<T[]> generator) {
if (from == 0 && to == count())
return this;
Spliterator<T> spliterator = spliterator();
long size = to - from;
Node.Builder<T> nodeBuilder = Nodes.builder(size, generator);
nodeBuilder.begin(size);
for (int i = 0; i < from && spliterator.tryAdvance(e -> { }); i++) { }
if (to == count()) {
spliterator.forEachRemaining(nodeBuilder);
} else {
for (int i = 0; i < size && spliterator.tryAdvance(nodeBuilder); i++) { }
}
nodeBuilder.end();
return nodeBuilder.build();
}
// 创建一个包含当前Node实例所有元素的元素数组视图
T[] asArray(IntFunction<T[]> generator);
//
void copyInto(T[] array, int offset);
// 返回Node实例基于Stream的元素"形状"
default StreamShape getShape() {
return StreamShape.REFERENCE;
}
// 获取当前Node实例包含的元素个数
long count();
// Node建造器,注意这个Node.Builder接口是继承自Sink,那么其子类实现都可以添加到Sink链中作为一个节点(终结节点)
interface Builder<T> extends Sink<T> {
// 创建Node实例
Node<T> build();
// 基于Integer元素类型的特化类型Node.Builder
interface OfInt extends Node.Builder<Integer>, Sink.OfInt {
@Override
Node.OfInt build();
}
// 基于Long元素类型的特化类型Node.Builder
interface OfLong extends Node.Builder<Long>, Sink.OfLong {
@Override
Node.OfLong build();
}
// 基于Double元素类型的特化类型Node.Builder
interface OfDouble extends Node.Builder<Double>, Sink.OfDouble {
@Override
Node.OfDouble build();
}
}
// 暂时省略其他代码
}
// 这里下面的方法来源于Nodes类
final class Nodes {
// 暂时省略其他代码
// Node扁平化处理,如果传入的Node实例存在子Node实例,则使用fork-join对Node进行分割和并发计算,结果添加到IntFunction生成的数组中,如果不存在子Node,直接返回传入的Node实例
// 关于并发计算部分暂时不分析
public static <T> Node<T> flatten(Node<T> node, IntFunction<T[]> generator) {
if (node.getChildCount() > 0) {
long size = node.count();
if (size >= MAX_ARRAY_SIZE)
throw new IllegalArgumentException(BAD_SIZE);
T[] array = generator.apply((int) size);
new ToArrayTask.OfRef<>(node, array, 0).invoke();
return node(array);
} else {
return node;
}
}
// 创建Node的建造器实例
static <T> Node.Builder<T> builder(long exactSizeIfKnown, IntFunction<T[]> generator) {
// 当知道待处理元素的准确数量并且小于允许创建的数组的最大长度MAX_ARRAY_SIZE(Integer.MAX_VALUE - 8),使用FixedNodeBuilder(固定长度数组Node建造器),否则使用SpinedNodeBuilder实例
return (exactSizeIfKnown >= 0 && exactSizeIfKnown < MAX_ARRAY_SIZE)
? new FixedNodeBuilder<>(exactSizeIfKnown, generator)
: builder();
}
// 创建Node的建造器实例,使用SpinedNodeBuilder的实例,此SpinedNode支持元素添加,但是不支持元素移除
static <T> Node.Builder<T> builder() {
return new SpinedNodeBuilder<>();
}
// 固定长度固定长度数组Node实现(也就是最终的Node实现是一个ArrayNode,最终的容器为一个T类型元素的数组T[])
private static final class FixedNodeBuilder<T>
extends ArrayNode<T>
implements Node.Builder<T> {
// 基于size(元素个数,或者说创建数组的长度)和数组创建方法IntFunction构建FixedNodeBuilder实例
FixedNodeBuilder(long size, IntFunction<T[]> generator) {
super(size, generator);
assert size < MAX_ARRAY_SIZE;
}
// 返回当前FixedNodeBuilder实例,判断数组元素计数值curSize必须大于等于实际数组容器中元素的个数
@Override
public Node<T> build() {
if (curSize < array.length)
throw new IllegalStateException(String.format("Current size %d is less than fixed size %d",
curSize, array.length));
return this;
}
// Sink的begin方法回调,传入的size必须和数组长度相等,因为后面的accept()方法会执行size此
@Override
public void begin(long size) {
if (size != array.length)
throw new IllegalStateException(String.format("Begin size %d is not equal to fixed size %d",
size, array.length));
// 重置数组元素计数值为0
curSize = 0;
}
// Sink的accept方法回调,当数组元素计数值小于数组长度,直接向数组下标curSize++添加传入的元素
@Override
public void accept(T t) {
if (curSize < array.length) {
array[curSize++] = t;
} else {
throw new IllegalStateException(String.format("Accept exceeded fixed size of %d",
array.length));
}
}
// Sink的end方法回调,再次判断数组元素计数值curSize必须大于等于实际数组容器中元素的个数
@Override
public void end() {
if (curSize < array.length)
throw new IllegalStateException(String.format("End size %d is less than fixed size %d",
curSize, array.length));
}
// 返回FixedNodeBuilder当前信息,当前处理的下标和当前数组中所有的元素
@Override
public String toString() {
return String.format("FixedNodeBuilder[%d][%s]",
array.length - curSize, Arrays.toString(array));
}
}
// Node实现,容器为一个固定长度的数组
private static class ArrayNode<T> implements Node<T> {
// 数组容器
final T[] array;
// 数组容器中当前元素的个数,这个值是一个固定值,或者在FixedNodeBuilder的accept()方法回调中递增
int curSize;
// 基于size和数组创建的工厂IntFunction构建ArrayNode实例
@SuppressWarnings("unchecked")
ArrayNode(long size, IntFunction<T[]> generator) {
if (size >= MAX_ARRAY_SIZE)
throw new IllegalArgumentException(BAD_SIZE);
// 创建szie长度的数组容器
this.array = generator.apply((int) size);
this.curSize = 0;
}
// 这个方法是基于一个现成的数组创建ArrayNode实例,直接改变数组的引用为array,元素个数curSize置为输入参数长度
ArrayNode(T[] array) {
this.array = array;
this.curSize = array.length;
}
// Node - 接下来是Node接口的实现
// 基于数组实例,起始索引0和结束索引curSize构造一个全新的Spliterator实例
@Override
public Spliterator<T> spliterator() {
return Arrays.spliterator(array, 0, curSize);
}
// 拷贝array中的元素到外部传入的dest数组中
@Override
public void copyInto(T[] dest, int destOffset) {
System.arraycopy(array, 0, dest, destOffset, curSize);
}
// 返回元素数组视图,这里直接返回array引用
@Override
public T[] asArray(IntFunction<T[]> generator) {
if (array.length == curSize) {
return array;
} else {
throw new IllegalStateException();
}
}
// 获取array中的元素个数
@Override
public long count() {
return curSize;
}
// 遍历array,每个元素回调Consumer实例
@Override
public void forEach(Consumer<? super T> consumer) {
for (int i = 0; i < curSize; i++) {
consumer.accept(array[i]);
}
}
// 返回ArrayNode当前信息,当前处理的下标和当前数组中所有的元素
@Override
public String toString() {
return String.format("ArrayNode[%d][%s]",
array.length - curSize, Arrays.toString(array));
}
}
// 暂时省略其他代码
}
很多集合容器的Spliterator
其实并不支持SIZED
特性,其实Node
的最终实现很多情况下都是Nodes.SpinedNodeBuilder
,因为SpinedNodeBuilder
重实现实现了数组扩容和Spliterator
基于数组进行分割的方法,源码相对复杂(特别是spliterator()
方法),这里挑部分进行分析,由于SpinedNodeBuilder
绝大部分方法都是使用父类SpinedBuffer
中的实现,这里可以直接分析SpinedBuffer
:
// SpinedBuffer的当前数组在超过了元素数量阈值之后,会拆分为多个数组块,存储到spine中,而curChunk引用指向的是当前处理的数组块
class SpinedBuffer<E>
extends AbstractSpinedBuffer
implements Consumer<E>, Iterable<E> {
// 暂时省略其他代码
// 当前的数组块
protected E[] curChunk;
// 所有数组块
protected E[][] spine;
// 构造函数,指定初始化容量
SpinedBuffer(int initialCapacity) {
super(initialCapacity);
curChunk = (E[]) new Object[1 << initialChunkPower];
}
// 构造函数,指定默认初始化容量
@SuppressWarnings("unchecked")
SpinedBuffer() {
super();
curChunk = (E[]) new Object[1 << initialChunkPower];
}
// 拷贝当前SpinedBuffer中的数组元素到传入的数组实例
public void copyInto(E[] array, int offset) {
// 计算最终的offset,区分单个chunk和多个chunk的情况
long finalOffset = offset + count();
if (finalOffset > array.length || finalOffset < offset) {
throw new IndexOutOfBoundsException("does not fit");
}
// 单个chunk的情况,由curChunk最接拷贝
if (spineIndex == 0)
System.arraycopy(curChunk, 0, array, offset, elementIndex);
else {
// 多个chunk的情况,由遍历spine并且对每个chunk进行拷贝
// full chunks
for (int i=0; i < spineIndex; i++) {
System.arraycopy(spine[i], 0, array, offset, spine[i].length);
offset += spine[i].length;
}
if (elementIndex > 0)
System.arraycopy(curChunk, 0, array, offset, elementIndex);
}
}
// 返回数组元素视图,基于IntFunction构建数组实例,使用copyInto()方法进行元素拷贝
public E[] asArray(IntFunction<E[]> arrayFactory) {
long size = count();
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
E[] result = arrayFactory.apply((int) size);
copyInto(result, 0);
return result;
}
// 清空SpinedBuffer,清空分块元素和所有引用
@Override
public void clear() {
if (spine != null) {
curChunk = spine[0];
for (int i=0; i<curChunk.length; i++)
curChunk[i] = null;
spine = null;
priorElementCount = null;
}
else {
for (int i=0; i<elementIndex; i++)
curChunk[i] = null;
}
elementIndex = 0;
spineIndex = 0;
}
// 遍历元素回调Consumer,分别遍历spine和curChunk
@Override
public void forEach(Consumer<? super E> consumer) {
// completed chunks, if any
for (int j = 0; j < spineIndex; j++)
for (E t : spine[j])
consumer.accept(t);
// current chunk
for (int i=0; i<elementIndex; i++)
consumer.accept(curChunk[i]);
}
// Consumer的accept实现,最终会作为Sink接口的accept方法调用
@Override
public void accept(E e) {
// 如果当前分块(第一个)的元素已经满了,就初始化spine,然后元素添加到spine[0]中
if (elementIndex == curChunk.length) {
inflateSpine();
// 然后元素添加到spine[0]中的元素已经满了,就新增spine[n],把元素放进spine[n]中
if (spineIndex+1 >= spine.length || spine[spineIndex+1] == null)
increaseCapacity();
elementIndex = 0;
++spineIndex;
// 当前的chunk更新为最新的chunk,就是spine中的最新一个chunk
curChunk = spine[spineIndex];
}
// 当前的curChunk添加元素
curChunk[elementIndex++] = e;
}
// 暂时省略其他代码
}
源码已经基本分析完毕,下面还是用一个例子转化为流程图:
流并发执行的源码实现
如果流实例调用了parallel()
,注释中提到会返回一个异步执行流的变体,实际上并没有构造变体,只是把sourceStage.parallel
标记为true
,异步求值的基本过程是:构建流管道结构的时候和同步求值的过程一致,构建完Sink
链之后,Spliterator
会使用特定算法基于trySplit()
进行自分割,自分割算法由具体的子类决定,例如ArrayList
采用的就是二分法,分割完成后每个Spliterator
持有所有元素中的一小部分,然后把每个Spliterator
作为sourceSpliterator
在fork-join
线程池中执行Sink
链,得到多个部分的结果在当前调用线程中聚合,得到最终结果。这里用到的技巧就是:线程封闭和fork-join
。因为不同Terminal Op
的并发求值过程大同小异,这里只分析forEach
并发执行的实现。首先展示一个使用fork-join
线程池的简单例子:
public class MapReduceApp {
public static void main(String[] args) {
// 数组中每个元素*2,再求和
Integer result = new MapReducer<>(new Integer[]{1, 2, 3, 4}, x -> x * 2, Integer::sum).invoke();
System.out.println(result);
}
interface Mapper<S, T> {
T apply(S source);
}
interface Reducer<S, T> {
T apply(S first, S second);
}
public static class MapReducer<T> extends CountedCompleter<T> {
final T[] array;
final Mapper<T, T> mapper;
final Reducer<T, T> reducer;
final int lo, hi;
MapReducer<T> sibling;
T result;
public MapReducer(T[] array,
Mapper<T, T> mapper,
Reducer<T, T> reducer) {
this.array = array;
this.mapper = mapper;
this.reducer = reducer;
this.lo = 0;
this.hi = array.length;
}
public MapReducer(CountedCompleter<?> p,
T[] array,
Mapper<T, T> mapper,
Reducer<T, T> reducer,
int lo,
int hi) {
super(p);
this.array = array;
this.mapper = mapper;
this.reducer = reducer;
this.lo = lo;
this.hi = hi;
}
@Override
public void compute() {
if (hi - lo >= 2) {
int mid = (lo + hi) >> 1;
MapReducer<T> left = new MapReducer<>(this, array, mapper, reducer, lo, mid);
MapReducer<T> right = new MapReducer<>(this, array, mapper, reducer, mid, hi);
left.sibling = right;
right.sibling = left;
// 创建子任务父任务的pending计数器加1
setPendingCount(1);
// 提交右子任务
right.fork();
// 在当前线程计算左子任务
left.compute();
} else {
if (hi > lo) {
result = mapper.apply(array[lo]);
}
// 叶子节点完成,尝试合并其他兄弟节点的结果,会调用onCompletion方法
tryComplete();
}
}
@Override
public T getRawResult() {
return result;
}
@SuppressWarnings("unchecked")
@Override
public void onCompletion(CountedCompleter<?> caller) {
if (caller != this) {
MapReducer<T> child = (MapReducer<T>) caller;
MapReducer<T> sib = child.sibling;
// 合并子任务结果,只有两个子任务
if (Objects.isNull(sib) || Objects.isNull(sib.result)) {
result = child.result;
} else {
result = reducer.apply(child.result, sib.result);
}
}
}
}
}
这里简单使用了fork-join
编写了一个简易的MapReduce
应用,main
方法中运行的是数组[1,2,3,4]
中的所有元素先映射为i -> i * 2
,再进行reduce
(求和)的过程,代码中也是简单使用二分法对原始的array
进行分割,当最终的任务只包含一个元素,也就是lo < hi
且hi - lo == 1
的时候,会基于单个元素调用Mapper
的方法进行完成通知tryComplete()
,任务完成会最终通知onCompletion()
方法,Reducer
就是在此方法中进行结果的聚合操作。对于流的并发求值来说,过程是类似的,ForEachOp
中最终调用ForEachOrderedTask
或者ForEachTask
,这里挑选ForEachTask
进行分析:
abstract static class ForEachOp<T>
implements TerminalOp<T, Void>, TerminalSink<T, Void> {
// 暂时省略其他代码
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
// 最终是调用ForEachTask的invoke方法,invoke会阻塞到所有fork任务执行完,获取最终的结果
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
// 暂时省略其他代码
}
// ForEachOps类
final class ForEachOps {
private ForEachOps() { }
// forEach的fork-join任务实现,没有覆盖getRawResult()方法,最终只会返回NULL
static final class ForEachTask<S, T> extends CountedCompleter<Void> {
// Spliterator实例,如果是父任务则代表所有待处理的元素,如果是子任务则是一个分割后的新Spliterator实例
private Spliterator<S> spliterator;
// Sink链实例
private final Sink<S> sink;
// 流管道引用
private final PipelineHelper<T> helper;
// 目标数量,其实是每个任务处理元素数量的建议值
private long targetSize;
// 这个构造器是提供给父(根)任务
ForEachTask(PipelineHelper<T> helper,
Spliterator<S> spliterator,
Sink<S> sink) {
super(null);
this.sink = sink;
this.helper = helper;
this.spliterator = spliterator;
this.targetSize = 0L;
}
// 这个构造器是提供给子任务,所以需要父任务的引用和一个分割后的新Spliterator实例作为参数
ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
super(parent);
this.spliterator = spliterator;
this.sink = parent.sink;
this.targetSize = parent.targetSize;
this.helper = parent.helper;
}
// Similar to AbstractTask but doesn't need to track child tasks
// 实现compute方法,用于分割Spliterator成多个子任务,这里不需要跟踪所有子任务
public void compute() {
// 神奇的赋值,相当于Spliterator<S> rightSplit = spliterator; Spliterator<S> leftSplit;
// rightSplit总是指向当前的spliterator实例
Spliterator<S> rightSplit = spliterator, leftSplit;
// 这里也是神奇的赋值,相当于long sizeEstimate = rightSplit.estimateSize(); long sizeThreshold;
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
// sizeThreshold赋值为targetSize
if ((sizeThreshold = targetSize) == 0L)
// 基于Spliterator分割后的右分支实例的元素数量重新赋值sizeThreshold和targetSize
// 计算方式是待处理元素数量/(fork-join线程池并行度<<2)或者1(当前一个计算方式结果为0的时候)
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
// 当前的流是否支持SHORT_CIRCUIT,也就是短路特性
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
// 当前的任务是否fork右分支
boolean forkRight = false;
// taskSink作为Sink的临时变量
Sink<S> taskSink = sink;
// 当前任务的临时变量
ForEachTask<S, T> task = this;
// Spliterator分割和创建新的fork任务ForEachTask,前提是不支持短路或者Sink不支持取消
while (!isShortCircuit || !taskSink.cancellationRequested()) {
// 当前的任务中的Spliterator(rightSplit)中的待处理元素小于等于每个任务应该处理的元素阈值或者再分割后得到NULL,则不需要再分割,直接基于rightSplit和Sink链执行循环处理元素
if (sizeEstimate <= sizeThreshold || (leftSplit = rightSplit.trySplit()) == null) {
// 这里就是遍历rightSplit元素回调Sink链的操作
task.helper.copyInto(taskSink, rightSplit);
break;
}
// rightSplit还能分割,则基于分割后的leftSplit和以当前任务作为父任务创建一个新的fork任务
ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
// 待处理子任务加1
task.addToPendingCount(1);
// 需要fork的任务实例临时变量
ForEachTask<S, T> taskToFork;
// 因为rightSplit总是分割Spliterator后对应原来的Spliterator引用,而leftSplit总是trySplit()后生成的新的Spliterator
// 所以这里leftSplit也需要作为rightSplit进行分割,通俗来说就是周星驰007那把梅花间足发射的枪
if (forkRight) {
// 这里交换leftSplit为rightSplit,所以forkRight设置为false,下一轮循环相当于fork left
forkRight = false;
rightSplit = leftSplit;
taskToFork = task;
// 赋值下一轮的父Task为当前的fork task
task = leftTask;
}
else {
forkRight = true;
taskToFork = leftTask;
}
// 添加fork任务到任务队列中
taskToFork.fork();
// 其实这里是更新剩余待分割的Spliterator中的所有元素数量到sizeEstimate
sizeEstimate = rightSplit.estimateSize();
}
// 置空spliterator实例并且传播任务完成状态,等待所有任务执行完成
task.spliterator = null;
task.propagateCompletion();
}
}
}
上面的源码分析看起来可能比较难理解,这里举个简单的例子:
public static void main(String[] args) throws Exception {
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
list.add(4);
list.stream().parallel().forEach(System.out::println);
}
这段代码中最终转换成ForEachTask
中评估后得到的targetSize = sizeThreshold == 1
,当前调用线程会参与计算,会执行3
次fork
,也就是一共有4
个处理流程实例(也就是原始的Spliterator
实例最终会分割出3
个全新的Spliterator
实例,加上自身一个4
个Spliterator
实例),每个处理流程实例只处理1
个元素,对应的流程图如下:
最终的计算结果是调用CountedCompleter.invoke()
方法获取的,此方法会阻塞直到所有子任务处理完成,当然forEach
终结操作不需要返回值,所以没有实现getRawResult()
方法,这里只是为了阻塞到所有任务执行完毕才解除调用线程的阻塞状态。
状态操作与短路操作
Stream
中按照中间操作是否有状态可以把这些操作分为无状态操作和有状态操作。Stream
中按照终结操作是否支持短路特性可以把这些操作分为非短路操作和短路操作。理解如下:
- 无状态操作:当前操作节点处理元素完成后,在满足前提条件下直接把结果传递到下一个操作节点,也就是操作内部不存在状态也不需要保存状态,例如
filter
、map
等操作 - 有状态操作:处理元素的时候,依赖于节点的内部状态对元素进行累积,当处理一个新的元素的时候,其实可以感知到所有处理过的元素的历史状态,这个"状态"其实更像是缓冲区的概念,例如
sort
、limit
等操作,以sort
操作为例,一般是把所有待处理的元素全部添加到一个容器如ArrayList
,再进行所有元素的排序,然后再重新模拟Spliterator
把元素推送到后一个节点 - 非短路(终结)操作:终结操作在处理元素时候不能基于短路条件提前中断处理并且返回,也就是必须处理所有的元素,如
forEach
- 短路(终结)操作:终结操作在处理元素时候允许基于短路条件提前中断处理并且返回,但是最终实现中是有可能遍历完所有的元素中,只是在处理方法中基于前置的短路条件跳过了实际的处理过程,如
anyMatch
(实际上anyMatch
会遍历完所有的元素,不过在命中了短路条件下,元素回调Sink.accept()
方法时候会基于stop
短路标记跳过具体的处理流程)
这里不展开源码进行分析,仅仅展示一个经常见到的Stream
操作汇总表如下:
这里还有两点要注意:
- 从源码上看部分中间操作也是支持短路的,例如
slice
和while
相关操作 - 从源码上看
find
相关终结操作中findFirst
、findAny
均支持和判断StreamOpFlag.SHORT_CIRCUIT
,而match
相关终结操作是通过内部的临时状态stop
和value
进行短路控制
总结
前前后后写了十多万字,其实也仅仅相对浅层次介绍了Stream
的基本实现,笔者认为很多没分析到的中间操作实现和终结操作实现,特别是并发执行的终结操作实现是十分复杂的,多线程环境下需要进行一些想象和多处DEBUG
定位执行位置和推演执行的过程。简单总结一下:
JDK
中Stream
的实现是精炼的高度工程化代码Stream
的载体虽然是AbstractPipeline
,管道结构,但是只用其形,实际求值操作之前会转化为一个多层包裹的Sink
结构,也就是前文一直说的Sink
链,从编程模式来看,应用的是Reactor
编程模式Stream
目前支持的固有求值执行结构一定是Head(Source Spliterator) -> Op -> Op ... -> Terminal Op
的形式,这算是一个局限性,没有办法做到像LINQ
那样可以灵活实现类似内存视图的功能Stream
目前支持并发求值方案是针对Source Spliterator
进行分割,封装Terminal Op
和固定Sink
链构造的ForkJoinTask
进行并发计算,调用线程和fork-join
线程池中的工作线程都可以参与求值过程,笔者认为这部分是Stream
中除了那些标志集合位运算外最复杂的实现Stream
实现的功能是一个突破,也有人说过此功能是一个"早产儿”,在此希望JDK
能够在矛盾螺旋中前进和发展