The high-level difference between Collection and Stream lies in the basic purpose of the two APIs. A Collection is an in-memory data structure: it holds all of its elements, and every element has to be computed before it is added.
A Stream, on the other hand, does not store any data, so conceptually a Stream is not a data structure. Stream was introduced to process a sequence of data: conceptually it is a pipeline of operations which computes data lazily, when the user demands it. Please keep this statement in mind, as we will come back to it when we talk about operations on the Stream. The source of data can be an array, a Collection or any kind of I/O channel. Java 8 provides several options to create Streams from different sources, and also introduces methods in the Collection interface to create a stream and a parallel stream. We are going to explore them shortly.
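As a quick preview, here is a minimal sketch of a few of those sources (the values are just placeholders):

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class StreamSources {
    public static void main(String[] args) {
        Stream<String> fromValues = Stream.of("a", "b", "c");      // from explicit values
        IntStream fromArray = Arrays.stream(new int[] {1, 2, 3});  // from an array
        List<String> list = Arrays.asList("x", "y");
        Stream<String> fromCollection = list.stream();             // Collection.stream()
        Stream<String> parallel = list.parallelStream();           // Collection.parallelStream()
        fromValues.forEach(System.out::println);
    }
}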
There is a difference between how we implement a piece of data-processing logic using a Collection and using a Stream. With a Collection we describe how to perform the logic; with a Stream we declare what to perform. The approach is similar to what we see in SQL. Rather than explaining this theoretically, let's look at a use case.
Let's say we have a list of employees of different ages and salaries, from different departments. The HR department of the organisation wants to know the average salary of employees in the "IT" department aged between 20 and 30 years.
First, let's implement the logic using a Collection.
List<Employee> employees = ...;
double totalSalary = 0;
int countOfEmployee = 0;
for (Employee employee : employees) {
    if ("IT".equalsIgnoreCase(employee.getDepartment()) &&
            employee.getAge() >= 20 && employee.getAge() <= 30) {
        countOfEmployee++;
        totalSalary += employee.getSalary();
    }
}
double averageSalary = 0;
if (countOfEmployee > 0) {
    averageSalary = totalSalary / countOfEmployee;
}
Now, if we take a closer look at the above code, it spells out how to perform each step of the computation. Let's see how the same logic looks when implemented using a Stream.
OptionalDouble average = employees.stream()
        .filter(emp -> "IT".equalsIgnoreCase(emp.getDepartment())
                && emp.getAge() >= 20
                && emp.getAge() <= 30)
        .mapToDouble(emp -> emp.getSalary())
        .average();
Now the above code defines what to perform to calculate the average. It declaratively defines the steps: filter, then map to a stream of salaries, then average. This code is more readable and understandable.
Working of Stream
A Stream pipeline is composed of a source of data, some intermediate operations and one terminal operation. That is why we keep saying that Stream is a pipeline of operations to be performed on a sequence of data. Stream does not hold any data, and all the elements are computed lazily when the user demands them. What exactly does that mean?
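Before decomposing each part, here is how the three parts line up in even a trivial pipeline (a minimal sketch; the cities list is just an illustration):

List<String> cities = Arrays.asList("Pune", "Delhi", "Mumbai");

long count = cities.stream()                  // 1. source of data
        .filter(city -> city.length() > 4)    // 2. intermediate operation
        .count();                             // 3. terminal operation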
Source of data
If we decompose the three parts of the Stream, the first one is the source of data. The source of data is abstracted as a Spliterator. Now what is this Spliterator? We are already familiar with the different iterators present in the Java language; the main purpose of the iterator pattern is to traverse elements. A Spliterator, however, is used not only to traverse elements from a source of data: it also provides a mechanism for partitioning the data, which forms the basis of parallel execution with parallel streams.
Spliterator behaves much like an Iterator, although it does not implement Iterator. Iterator works on the basis of two methods, hasNext() and next(). We will not go deeper into Iterator here, but note that it adds a bit of data-access overhead due to its stateful nature and the duplicated per-element checks. With the help of lambdas, Spliterator takes a better and more efficient approach, which makes it a better iterator. If we take a look at the Spliterator interface, Java introduces two new methods to access elements and one method to split the data source into a new Spliterator.
Methods for accessing elements:
boolean tryAdvance(Consumer<? super T> action);

default void forEachRemaining(Consumer<? super T> action) {
    do { } while (tryAdvance(action));
}
As per the documentation, the tryAdvance method:
If a remaining element exists, performs the given action on it, returning true; else returns false. If this Spliterator is ORDERED the action is performed on the next element in encounter order. Exceptions thrown by the action are relayed to the caller.
and the forEachRemaining method iterates over the elements sequentially:
Performs the given action for each remaining element, sequentially in the current thread, until all elements have been processed or the action throws an exception. If this Spliterator is ORDERED, actions are performed in encounter order. Exceptions thrown by the action are relayed to the caller.
Each of the methods accepts a Consumer. tryAdvance performs the action on the next element if there is one and returns true, otherwise it returns false. forEachRemaining sequentially accesses all remaining elements using the tryAdvance method.
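To see both methods in action, here is a minimal sketch (the cities list is just an illustration):

import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SpliteratorDemo {
    public static void main(String[] args) {
        List<String> cities = Arrays.asList("Pune", "Delhi", "Mumbai");
        Spliterator<String> spliterator = cities.spliterator();

        // tryAdvance consumes a single element and reports whether one existed
        boolean consumed = spliterator.tryAdvance(city -> System.out.println("First: " + city));
        System.out.println("Consumed an element? " + consumed);

        // forEachRemaining drains whatever is left, one element at a time
        spliterator.forEachRemaining(city -> System.out.println("Remaining: " + city));
    }
}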
Also, to support parallelism using multiple threads, Spliterator provides
Spliterator<T> trySplit();
which the documentation describes as
If this spliterator can be partitioned, returns a Spliterator covering elements, that will, upon return from this method, not be covered by this Spliterator.
If this Spliterator is ORDERED, the returned Spliterator must cover a strict prefix of the elements.
Unless this Spliterator covers an infinite number of elements, repeated calls to trySplit() must eventually return null. Upon non-null return:
- the value reported for estimateSize() before splitting, must, after splitting, be greater than or equal to estimateSize() for this and the returned Spliterator; and
- if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
This method may return null for any reason, including emptiness, inability to split after traversal has commenced, data structure constraints, and efficiency considerations.
In short, the trySplit method slices the data into a new Spliterator so that different threads can process the parts separately.
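Here is a minimal sketch of what a split looks like (note that trySplit may return null for sources that cannot be partitioned, so we check for that):

import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class TrySplitDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        Spliterator<Integer> first = numbers.spliterator();

        // Hands roughly the first half of the elements to a new Spliterator;
        // in a parallel stream each half could be traversed by a different thread
        Spliterator<Integer> second = first.trySplit();

        if (second != null) {
            System.out.println("first now covers ~" + first.estimateSize() + " elements");
            System.out.println("second covers ~" + second.estimateSize() + " elements");
        }
    }
}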
In fact, Java 8 introduced a spliterator() method in the Collection interface to support Spliterator. Java also provides different Spliterator implementations for different collections and data sources, such as ArraySpliterator, ArrayListSpliterator, KeySpliterator, EntrySpliterator etc. It is also possible to provide a custom implementation of Spliterator for a custom data source.
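For a custom or non-Collection source, StreamSupport can turn any Spliterator into a Stream. A minimal sketch (the array and the extra ORDERED characteristic are just illustrations):

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CustomSourceDemo {
    public static void main(String[] args) {
        String[] cities = {"Pune", "Delhi", "Mumbai"};

        // Spliterators.spliterator wraps the array and lets us declare extra characteristics
        Spliterator<String> spliterator =
                Spliterators.spliterator(cities, Spliterator.ORDERED);

        // StreamSupport.stream builds a sequential (false) or parallel (true) Stream
        Stream<String> stream = StreamSupport.stream(spliterator, false);
        stream.forEach(System.out::println);
    }
}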
Different types of Operations
Stream operations are broadly of two types: Intermediate Operations (map, filter, flatMap, distinct, sorted etc.) and Terminal Operations (forEach, reduce, collect, min, max etc.).
All the intermediate operations return a new stream, and thereby the stream forms a pipeline of intermediate operations. Terminal operations, on the other hand, return non-stream values and compute the end result by invoking each intermediate operation lazily on the sequence of data.
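The laziness is easy to observe: without a terminal operation, nothing in the pipeline runs at all. A minimal sketch (the cities list is just an illustration):

List<String> cities = Arrays.asList("Pune", "Delhi", "Mumbai");

// Nothing is printed here: intermediate operations only describe the pipeline
Stream<String> pipeline = cities.stream()
        .filter(city -> {
            System.out.println("filtering " + city);
            return city.length() > 4;
        });

// Only now, when the terminal operation runs, do the filter's prints appear
long count = pipeline.count();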
But how does Stream do that? Technically, Stream constructs the pipeline of operations as a linked-list structure of the intermediate operations. The default implementation of the pipeline is provided by AbstractPipeline, and the generic implementation is ReferencePipeline. There are also more specialized pipeline implementations, such as DoublePipeline, IntPipeline and LongPipeline, which provide customized operations for the corresponding primitive types.
Each stage of the pipeline is assigned a bitmap of flags describing the state of the elements at that particular step. The StreamOpFlag enum defines flags such as:
| Flag | Description |
|---|---|
| SIZED | The size of the stream is known. |
| DISTINCT | The elements of the stream are distinct. |
| SORTED | The elements of the stream are sorted in the natural order. |
| ORDERED | The elements are in a meaningful encounter order. |
When we chain an intermediate operation, the pipeline defines the bitmap for that particular operation to capture the characteristics of the elements. Similarly, the bitmap for the source stage is defined using the fromCharacteristics method of StreamOpFlag, which takes a Spliterator implementation as parameter and derives the flags from the characteristics method of that Spliterator. As per the documentation of
int characteristics()
Returns a set of characteristics of this Spliterator and its elements. The result is represented as ORed values from ORDERED, DISTINCT, SORTED, SIZED, NONNULL, IMMUTABLE, CONCURRENT, SUBSIZED. Repeated calls to characteristics() on a given spliterator, prior to or in-between calls to trySplit, should always return the same result. If a Spliterator reports an inconsistent set of characteristics (either those returned from a single invocation or across multiple invocations), no guarantees can be made about any computation using this Spliterator.
Now, as mentioned earlier, each data source provides its own Spliterator implementation. For example, if we take a look at the Spliterator implementation of TreeMap:
public int characteristics() {
    return (side == 0 ? Spliterator.SIZED : 0) |
            Spliterator.DISTINCT | Spliterator.SORTED | Spliterator.ORDERED;
}
It characterizes the elements of TreeMap appropriately.
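We can observe these characteristics directly. A minimal sketch comparing an ArrayList with a TreeSet (which is backed by a TreeMap):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Spliterator;
import java.util.TreeSet;

public class CharacteristicsDemo {
    public static void main(String[] args) {
        Spliterator<String> fromList =
                new ArrayList<>(Arrays.asList("b", "a")).spliterator();
        Spliterator<String> fromTreeSet =
                new TreeSet<>(Arrays.asList("b", "a")).spliterator();

        // An ArrayList is sized and ordered, but not sorted
        System.out.println("ArrayList SORTED? "
                + fromList.hasCharacteristics(Spliterator.SORTED));      // false
        // A TreeSet additionally reports DISTINCT and SORTED
        System.out.println("TreeSet SORTED? "
                + fromTreeSet.hasCharacteristics(Spliterator.SORTED));   // true
        System.out.println("TreeSet DISTINCT? "
                + fromTreeSet.hasCharacteristics(Spliterator.DISTINCT)); // true
    }
}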
Each intermediate operation has a known, predefined effect on the flags. Accordingly, the pipeline sets, modifies or clears flags for each intermediate operation. Using the linked-list structure, the pipeline derives the combined flags from the flags set by the current operation and the combined flags of the previous stage:
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
For example, the filter() operation preserves the SORTED and DISTINCT flags but clears the SIZED flag; the map() operation clears the SORTED and DISTINCT flags but preserves the SIZED flag; and the sorted() operation preserves the SIZED and DISTINCT flags and injects the SORTED flag.
Depending upon the combined flags of a particular stage, Stream can cleverly skip the stage if it is not required.
Let's see an example.
We have a TreeSet of type String holding names of cities, and we have implemented the logic below:
cities.stream()
      .filter(city -> city.length() > 4)
      .sorted()
      .forEach(city -> System.out.println(city));
So we have pipelined the filter and sorted intermediate operations and added forEach as the terminal operation.
As the source of data is a TreeSet, the data is already sorted in natural order, so the source stage adds the SORTED flag. The next intermediate operation, filter, preserves the SORTED flag as it does not change the ordering of the data. When the next stage, sorted, sees that the data is already sorted in natural order, it becomes a no-op, and Stream cleverly skips this stage.
Broadly, there are two kinds of intermediate operations: Stateless operations and Stateful operations, implemented by the StatelessOp and StatefulOp classes respectively. We will explore them later.
Execution of Terminal Operation
As we have already mentioned several times, all the intermediate operations are executed lazily, only once a terminal operation is called. So as soon as a terminal operation is triggered, Stream creates an execution plan for the entire operation pipeline.
Every terminal operation's execution starts with the evaluate method:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
            ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
            : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
The first thing this method does is check whether the same Stream has already been linked or consumed; it then decides whether the Stream is parallel or sequential. Then it creates the execution strategy for the intermediate operations, depending upon the kinds of intermediate operations present in the pipeline, i.e., whether each operation is Stateless or Stateful. What exactly do Stateless and Stateful operations mean?
Any operation that does not need to know anything about the other elements of the Stream is a Stateless operation: it affects no state beyond the element it is currently processing. On the other hand, any operation that needs to see all the elements of the Stream before it can perform its action is a Stateful operation. Let's explore both with an example of each.
Take the filter operation: it just checks a predicate against the current element and decides whether to keep that element. To do so it does not need to know anything about the other elements; hence filter is a Stateless operation. Similarly unordered, map, flatMap, peek etc. are Stateless operations.
Now consider the sorted or distinct operations: both need to see all the elements to determine the sort order or to decide which elements are distinct. These kinds of operations are Stateful operations.
So the obvious question is: why does the Stateless/Stateful distinction matter? If the pipeline consists only of Stateless operations, it can be computed in a single pass. If there are one or more Stateful operations, the pipeline is divided into sections and computed in multiple passes.
There are also two types of terminal operations: short-circuiting and non-short-circuiting. Short-circuiting operations must process the data one element at a time and stop as soon as the desired result is found, whereas non-short-circuiting operations can process data in bulk. Examples of short-circuiting operations are anyMatch(), noneMatch(), findFirst() etc., and examples of non-short-circuiting operations are reduce(), forEach(), collect() etc. Another interesting fact is that short-circuiting operations use tryAdvance on the Spliterator, while non-short-circuiting operations use forEachRemaining.
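Short-circuiting is easy to observe with a peek (a minimal sketch; the cities list is just an illustration):

import java.util.Arrays;
import java.util.List;

public class ShortCircuitDemo {
    public static void main(String[] args) {
        List<String> cities = Arrays.asList("Pune", "Delhi", "Mumbai", "Chennai");

        boolean hasLongName = cities.stream()
                .peek(city -> System.out.println("Examining " + city))
                .anyMatch(city -> city.length() > 4);

        // Prints "Examining Pune" and "Examining Delhi", then stops:
        // "Delhi" matches, so "Mumbai" and "Chennai" are never examined
        System.out.println("Any city with a long name? " + hasLongName);
    }
}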
Execution of Stateless operations
For sequential execution of Stateless operations, Stream creates a consumer for each stage; each consumer takes the elements one by one, applies its action to the element and then, depending upon the type of the operation, sends it on to the next stage. One point to notice here is that each stage's consumer knows about the next stage. Now let's take the implementation below as an example:
employees.stream()
         .filter(emp -> "IT".equalsIgnoreCase(emp.getDepartment()))
         .mapToDouble(emp -> emp.getSalary())
         .average();
Here the consumer of the filter stage receives an element and applies the predicate to decide whether to send it to the next stage, mapToDouble. If the element does not satisfy the predicate, the consumer simply does not pass it on. The consumer of the mapToDouble stage, in turn, receives an element, applies the function to map it to the desired form and sends it to the next stage, if there is one. This process keeps going until the end of the data stream is reached or a Stateful operation is encountered. We will see the flow of Stateful operations shortly.
That means the filter or map stage in the above example is not executed on all the elements before the pipeline moves to the next operation; rather, each stage, like filter and map, is executed on each element one by one. To visualize this, let's add some print statements to each of the Stateless intermediate operations:
List<Employee> employees = Arrays.asList(
        new Employee("Rahul", "IT", 40000, 23),
        new Employee("John", "IT", 50000, 24),
        new Employee("Sandy", "Finance", 45000, 23),
        new Employee("Ron", "IT", 55000, 25),
        new Employee("Arin", "HR", 40000, 24));

OptionalDouble average = employees.stream()
        .filter(emp -> {
            System.out.println("Filtering record of employee name " + emp.getName());
            return "IT".equalsIgnoreCase(emp.getDepartment());
        })
        .mapToDouble(emp -> {
            System.out.println("Mapping salary of " + emp.getName());
            return emp.getSalary();
        })
        .average();

System.out.println("Average salary for IT employees " + average.getAsDouble());
If we execute the above code, the output will be:
Filtering record of employee name Rahul
Mapping salary of Rahul
Filtering record of employee name John
Mapping salary of John
Filtering record of employee name Sandy
Filtering record of employee name Ron
Mapping salary of Ron
Filtering record of employee name Arin
Average salary for IT employees 48333.333333333336
Execution of Stateful operations
The major difference between Stateless and Stateful operations in terms of execution is that Stateless operations can be executed on each element, one at a time, without any knowledge of the other elements, whereas a Stateful operation has to be performed on the entire data set. That is why, while creating the execution plan, Stream divides the pipeline into multiple sections if there is any Stateful operation, depending upon the number of Stateful operations.
As explained earlier, the consumer of each Stateless operation keeps processing elements and passing them to the next stage. The consumer of a Stateful operation, however, buffers all the elements until it reaches the end of the data, then processes them and sends them on to the next stage.
To visualize the same concept, let's modify the above example and introduce a fictitious step that sorts the employee records (this assumes Employee implements Comparable; otherwise sorted() would throw a ClassCastException):
List<Employee> employees = Arrays.asList(
        new Employee("Rahul", "IT", 40000, 23),
        new Employee("John", "IT", 50000, 24),
        new Employee("Sandy", "Finance", 45000, 23),
        new Employee("Ron", "IT", 55000, 25),
        new Employee("Arin", "HR", 40000, 24));

OptionalDouble average = employees.stream()
        .filter(emp -> {
            System.out.println("Filtering record of employee name " + emp.getName());
            return "IT".equalsIgnoreCase(emp.getDepartment());
        })
        .sorted()
        .mapToDouble(emp -> {
            System.out.println("Mapping salary of " + emp.getName());
            return emp.getSalary();
        })
        .average();

System.out.println("Average salary for IT employees " + average.getAsDouble());
Now the output of the above implementation will be:
Filtering record of employee name Rahul
Filtering record of employee name John
Filtering record of employee name Sandy
Filtering record of employee name Ron
Filtering record of employee name Arin
Mapping salary of Rahul
Mapping salary of John
Mapping salary of Ron
Average salary for IT employees 48333.333333333336
As you can see, the pipeline got divided into two sections because we have one Stateful operation, sorted. All the Stateless operations before sorted executed element by element, and the same happened for the Stateless operations after sorted. That is why the filter action executed for all the elements first, then the sort happened, and then the mapping of each element executed.
Parallel Stream
Parallel Stream works in exactly the same way as a sequential Stream. The difference is that execution happens on different threads, and each thread gets its own copy of the consumer chain and its own chunk of the data. Once the entire pipeline has been executed on each chunk in its corresponding thread, the partial results are merged together to produce the final result. Parallel Stream uses the trySplit method of Spliterator to split the source into chunks that can be processed by different threads.
One important point to understand is that Java assigns each chunk of work to a thread in the common ForkJoinPool, in the same way that CompletableFuture does by default.
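Switching the earlier pipeline to a parallel one only requires changing the stream source. A minimal sketch, assuming the same Employee list as above:

OptionalDouble average = employees.parallelStream()
        .filter(emp -> "IT".equalsIgnoreCase(emp.getDepartment()))
        .mapToDouble(emp -> emp.getSalary())
        .average();

The result is the same as the sequential version; only the execution strategy changes.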
Conclusion
If you have reached this part, I assume you have gone through the entire concept. In parts it might be a bit boring to go through this hardcore theory, but to leverage the performance and benefits of the Stream API, it is important to understand conceptually how Stream works. There are also a few points to consider while working with Stream to get better performance:
- Use the specialized Streams for primitive data types, like IntStream, DoubleStream, LongStream etc., to avoid the overhead of auto-boxing and unboxing (see the sketch after this list).
- Try to avoid peek(); it exists mainly to support debugging and is easy to misuse for side effects.
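A minimal sketch of the first point:

import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PrimitiveStreamDemo {
    public static void main(String[] args) {
        // Boxed pipeline: every element is wrapped in an Integer object
        int boxedSum = Stream.of(1, 2, 3).reduce(0, Integer::sum);

        // Primitive pipeline: no boxing, and sum()/average() come built in
        int primitiveSum = IntStream.of(1, 2, 3).sum();

        System.out.println(boxedSum + " == " + primitiveSum);
    }
}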