Java parallel streams API

By , last updated November 22, 2016

Parallel programming is difficult, but sometimes necessary. Modern computers contain multiple cores, and a complex algorithm that does not process its data in parallel is not exploiting the full potential of the hardware.

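The Streams API introduced in Java 8 makes parallel processing much easier to implement. The API handles much of the complex low-level work itself: splitting the data, running the pieces on a shared fork/join thread pool, and combining the partial results.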

Simple examples

Here is how simple it is to switch a Stream from serial to parallel processing: just replace stream() with parallelStream().
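Besides calling parallelStream() on a collection, you can switch any existing stream (including primitive streams such as IntStream) to parallel mode with parallel(). You can check the current mode with isParallel(). The Javadoc only promises a "possibly parallel" stream from parallelStream(), but the JDK collections return a parallel one. The class name StreamModes is only illustrative:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class StreamModes {
	public static void main(String[] args) {
		List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

		// stream() gives a sequential stream
		System.out.println(list.stream().isParallel());            // false
		// parallelStream() gives a parallel stream
		System.out.println(list.parallelStream().isParallel());    // true
		// parallel() switches an existing stream to parallel mode
		System.out.println(list.stream().parallel().isParallel()); // true
		// The same works for primitive streams
		System.out.println(IntStream.range(0, 10).parallel().isParallel()); // true
	}
}
```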

Convert Stream to List

Serial example of converting Stream to List:

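import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class SerialStream {
	public static void main(String[] args){
		// Fill a list with the numbers 1..999
		List<Integer> list = new ArrayList<>();
		for(int i = 1; i < 1000; i++){
			list.add(i);
		}
		// Keep only the odd numbers and collect them into a new List
		List<Integer> myNumbersList = list.stream()
			.filter(i -> i % 2 == 1)
			.collect(Collectors.toList());
	}
}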

Parallel example:

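import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStream {
	public static void main(String[] args){
		List<Integer> list = new ArrayList<>();
		for(int i = 1; i < 1000; i++){
			list.add(i);
		}
		// Same pipeline, but parallelStream() lets the filter run on several threads
		List<Integer> myNumbersList = list
			.parallelStream()
			.filter(i -> i % 2 == 1)
			.collect(Collectors.toList());
	}
}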
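An ArrayList is ordered, so collect(Collectors.toList()) keeps the encounter order even when the filtering is spread across threads. That means the serial and parallel versions above produce identical lists. Here is a small check; the helpers numbers() and oddNumbers() are hypothetical names that stand in for the two main methods:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class SameResult {
	// Builds the list 1..999 used in the examples above
	static List<Integer> numbers() {
		List<Integer> list = new ArrayList<>();
		for (int i = 1; i < 1000; i++) {
			list.add(i);
		}
		return list;
	}

	// Filters the odd numbers either sequentially or in parallel
	static List<Integer> oddNumbers(List<Integer> list, boolean parallel) {
		return (parallel ? list.parallelStream() : list.stream())
			.filter(i -> i % 2 == 1)
			.collect(Collectors.toList());
	}

	public static void main(String[] args) {
		List<Integer> serial = oddNumbers(numbers(), false);
		List<Integer> parallel = oddNumbers(numbers(), true);
		System.out.println(serial.equals(parallel)); // true
		System.out.println(serial.size());           // 500
	}
}
```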

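Keep in mind that parallel processing won’t give your program much benefit unless a large amount of data is being processed. A parallel stream spreads the work across the available cores, so the same task can finish in less time. However, splitting the input and merging the partial results has a cost, and for small inputs such as the 999 numbers above, that overhead can outweigh the gain.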
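A rough way to see the effect of input size on your own machine is to time the same computation both ways. This sketch is not a proper benchmark: it ignores JIT warm-up and garbage collection, and a harness such as JMH gives more trustworthy numbers. The class and method names are illustrative. No timings are shown because they depend entirely on the hardware:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class RoughTiming {
	// Holds the last result so the JIT cannot discard the computation
	static volatile long sink;

	// Sums the squares of 1..n, sequentially or in parallel
	static long sumOfSquares(long n, boolean parallel) {
		LongStream numbers = LongStream.rangeClosed(1, n);
		if (parallel) {
			numbers = numbers.parallel();
		}
		return numbers.map(x -> x * x).sum();
	}

	// Wall-clock milliseconds for a single, unwarmed run
	static long timeMillis(long n, boolean parallel) {
		long start = System.nanoTime();
		sink = sumOfSquares(n, parallel);
		return (System.nanoTime() - start) / 1_000_000;
	}

	public static void main(String[] args) {
		// Number of worker threads parallel streams use by default
		System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
		// 2_000_000 keeps the sum of squares within the range of a long
		for (long n : new long[] {1_000, 2_000_000}) {
			System.out.println("n=" + n
				+ " sequential=" + timeMillis(n, false) + "ms"
				+ " parallel=" + timeMillis(n, true) + "ms");
		}
	}
}
```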

Convert Iterator to Stream

To convert an Iterator to a Stream in Java 8, you first need to wrap it in a Spliterator.

Spliterator is a Java 8 interface for traversing and partitioning the elements of a source, such as a collection, an array, an IO channel, or a generator function. It can split off part of its elements so that another thread can process them. This is the mechanism parallel streams use to divide the work between threads.
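The splitting step is what makes a Spliterator useful for parallel work. trySplit() hands a prefix of the remaining elements to a new Spliterator, which another thread can then process. The even split shown in the comments below is how ArrayList's spliterator behaves in the JDK; other sources may split differently. The class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

public class SplitDemo {
	// Splits an ArrayList's spliterator once and returns the two parts
	static List<List<Integer>> splitOnce(List<Integer> source) {
		Spliterator<Integer> second = new ArrayList<>(source).spliterator();
		// trySplit() moves a prefix of the elements into a new Spliterator
		Spliterator<Integer> first = second.trySplit();

		List<Integer> firstPart = new ArrayList<>();
		List<Integer> secondPart = new ArrayList<>();
		if (first != null) {
			first.forEachRemaining(firstPart::add);
		}
		second.forEachRemaining(secondPart::add);

		List<List<Integer>> parts = new ArrayList<>();
		parts.add(firstPart);
		parts.add(secondPart);
		return parts;
	}

	public static void main(String[] args) {
		List<Integer> numbers = new ArrayList<>();
		for (int i = 1; i <= 8; i++) {
			numbers.add(i);
		}
		List<List<Integer>> parts = splitOnce(numbers);
		System.out.println(parts.get(0)); // [1, 2, 3, 4]
		System.out.println(parts.get(1)); // [5, 6, 7, 8]
	}
}
```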

There are several ways to create the Spliterator:

Spliterator<String> spliterator = Spliterators
	.spliteratorUnknownSize(sourceIterator, Spliterator.ORDERED);

Or go through an Iterable:

Iterable<String> iterable = () -> sourceIterator;
Spliterator<String> spliterator = iterable.spliterator();

Here, () -> sourceIterator is a lambda expression that stands in for new Iterable<String>(){…}. Its single method, iterator(), returns sourceIterator.
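Here is the Iterable route as a complete program. The lambda returns the same sourceIterator every time, so the resulting Iterable (and the stream built from it) can be traversed only once. The generic helper toStream is a hypothetical name used for illustration:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IterableToStream {
	// Hypothetical helper: wraps a one-shot Iterator in an Iterable lambda
	// and lets the JDK build the Spliterator and the Stream from it
	static <T> Stream<T> toStream(Iterator<T> sourceIterator, boolean parallel) {
		Iterable<T> iterable = () -> sourceIterator;
		return StreamSupport.stream(iterable.spliterator(), parallel);
	}

	public static void main(String[] args) {
		Iterator<String> sourceIterator = Arrays
			.asList("aaa", "", "bbb", "", "ccc", "", "ddd").iterator();
		long emptyStringsCount = toStream(sourceIterator, false)
			.filter(String::isEmpty)
			.count();
		System.out.println("emptyStringsCount: " + emptyStringsCount); // 3
	}
}
```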

Here is an example of using a Spliterator to count the empty strings in a list.

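import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IteratorToStream {
	private static final Logger logger = LoggerFactory.getLogger(IteratorToStream.class);

	public static void main(String[] args){
		Iterator<String> sourceIterator = Arrays
			.asList("aaa", "", "bbb", "", "ccc", "", "ddd").iterator();
		// Wrap the iterator in a Spliterator; its size is not known up front
		Spliterator<String> spliterator = Spliterators
			.spliteratorUnknownSize(sourceIterator, Spliterator.ORDERED);
		// false = sequential stream
		Stream<String> targetStream = StreamSupport
			.stream(spliterator, false);

		long emptyStringsCount = targetStream
			.filter(String::isEmpty).count();

		logger.info("emptyStringsCount: {}", emptyStringsCount);
	}
}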
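The second argument of StreamSupport.stream selects the mode: passing true gives a parallel stream. The source here is a plain Iterator of unknown size, so its spliterator can only split by copying batches of elements into arrays. As a result, parallelism usually helps less here than with an ArrayList. A sketch with an illustrative class name:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;

public class ParallelIteratorCount {
	// Counts the empty strings from an Iterator using a parallel stream
	static long countEmpty(Iterator<String> sourceIterator) {
		Spliterator<String> spliterator = Spliterators
			.spliteratorUnknownSize(sourceIterator, Spliterator.ORDERED);
		// true = parallel stream
		return StreamSupport.stream(spliterator, true)
			.filter(String::isEmpty)
			.count();
	}

	public static void main(String[] args) {
		Iterator<String> sourceIterator = Arrays
			.asList("aaa", "", "bbb", "", "ccc", "", "ddd").iterator();
		System.out.println("emptyStringsCount: " + countEmpty(sourceIterator)); // 3
	}
}
```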

Count strings

Sequential example that counts the empty strings:

import java.util.Arrays;
import java.util.List;

public class SerialCount {
	public static void main(String[] args){
		List<String> stringsList = Arrays.asList("aaa", "", "bbb", "", "ccc", "", "ddd");

		// count() returns a long, not an int
		long emptyStringsCount = stringsList.stream()
			.filter(String::isEmpty).count();
	}
}

Parallel code:

import java.util.Arrays;
import java.util.List;

public class ParallelCount {
	public static void main(String[] args){
		List<String> stringsList = Arrays.asList("aaa", "", "bbb", "", "ccc", "", "ddd");

		// count() returns a long, not an int
		long emptyStringsCount = stringsList
			.parallelStream()
			.filter(String::isEmpty).count();
	}
}

Both will produce the same result: 3.
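If you need both numbers at once, a collector can count the empty and non-empty strings in a single parallel pass. Here is a sketch using Collectors.partitioningBy with Collectors.counting(); the class and method names are illustrative:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionCount {
	// Splits the strings into empty (true) and non-empty (false) and counts each group
	static Map<Boolean, Long> countByEmptiness(List<String> strings) {
		return strings.parallelStream()
			.collect(Collectors.partitioningBy(String::isEmpty, Collectors.counting()));
	}

	public static void main(String[] args) {
		List<String> stringsList = Arrays.asList("aaa", "", "bbb", "", "ccc", "", "ddd");
		Map<Boolean, Long> counts = countByEmptiness(stringsList);
		System.out.println("empty: " + counts.get(true));      // empty: 3
		System.out.println("non-empty: " + counts.get(false)); // non-empty: 4
	}
}
```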

Requirements

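Not every algorithm can be parallelized this easily. Consider a reduction whose result depends on how the elements are grouped, that is, one that uses a non-associative operation. When the stream is split into chunks that are reduced independently and then combined, such a reduction gives a different result.

Subtraction is one example: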

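import java.util.OptionalInt;
import java.util.stream.IntStream;

public class SerialSubtract {
	public static void main(String[] args){
		// ((0 - 1) - 2) - ... - 99 = -4950
		OptionalInt result = IntStream.range(0, 100)
			.reduce((a, b) -> a - b);
	}
}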

The output of this algorithm will generally differ from that of the parallel version:

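import java.util.OptionalInt;
import java.util.stream.IntStream;

public class ParallelSubtract {
	public static void main(String[] args){
		// Subtraction is not associative, so the result depends on how the range is split
		OptionalInt result = IntStream.range(0, 100)
			.parallel()
			.reduce((a, b) -> a - b);
	}
}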
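Running both versions side by side makes the difference visible. Sequentially, the reduction computes ((0 - 1) - 2) - ... - 99 = -4950. In parallel, each chunk is reduced separately and the partial results are then subtracted from one another. The result therefore depends on how the range was split and is generally not -4950. The class name is illustrative:

```java
import java.util.OptionalInt;
import java.util.stream.IntStream;

public class SubtractComparison {
	// Reduces 0..99 with subtraction, sequentially or in parallel
	static OptionalInt subtract(boolean parallel) {
		IntStream numbers = IntStream.range(0, 100);
		if (parallel) {
			numbers = numbers.parallel();
		}
		return numbers.reduce((a, b) -> a - b);
	}

	public static void main(String[] args) {
		System.out.println("sequential: " + subtract(false).getAsInt()); // -4950
		// Depends on how the range is split; typically differs from the sequential result
		System.out.println("parallel:   " + subtract(true).getAsInt());
	}
}
```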

To preserve the correct behavior of the program, the functions passed to stream operations, such as the accumulator and combiner of reduce, must be associative, stateless, and non-interfering.
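One way to keep the parallel version correct is to express the same computation with an associative operation. Because the range starts at 0, ((0 - 1) - 2) - ... - 99 equals the sum of the negated elements. Addition is associative, and the lambda x -> -x is stateless and does not touch the source, so the chunks can be combined in any grouping. A sketch with an illustrative class name:

```java
import java.util.stream.IntStream;

public class AssociativeSubtract {
	// ((0 - 1) - 2) - ... - 99 rewritten as (-0) + (-1) + ... + (-99)
	static int negatedSum(boolean parallel) {
		IntStream numbers = IntStream.range(0, 100);
		if (parallel) {
			numbers = numbers.parallel();
		}
		return numbers.map(x -> -x).sum();
	}

	public static void main(String[] args) {
		System.out.println(negatedSum(false)); // -4950
		System.out.println(negatedSum(true));  // -4950
	}
}
```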

You can find the full list of requirements in the Stream interface documentation and in the java.util.stream package documentation it links to.

Parallel or sequential

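Java 8 has made it very easy to process collections as streams. But should we always choose parallel over sequential?

The answer is no. Splitting the input, coordinating the worker threads, and merging the partial results all take time. For small inputs or cheap operations, this overhead can exceed the time saved. Parallel code is also harder to reason about and maintain, because every function passed to the stream must be safe to run concurrently.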

The general advice on programming forums is to use sequential streams by default and to switch to parallel streams only when it is actually necessary, ideally after measuring that it helps.
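One way to follow that advice is to start from stream() and opt in to parallelism explicitly. The sketch below uses a hypothetical size threshold; PARALLEL_THRESHOLD is an assumption for illustration, not a recommended value. It also shows that sequential() can switch a pipeline back, because the execution mode belongs to the whole pipeline rather than to individual steps:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ChooseMode {
	// Hypothetical cut-off, chosen only for illustration; measure before picking a real value
	static final int PARALLEL_THRESHOLD = 10_000;

	// Sequential by default; parallel only when the list is large
	static <T> Stream<T> streamFor(List<T> data) {
		return data.size() >= PARALLEL_THRESHOLD ? data.parallelStream() : data.stream();
	}

	public static void main(String[] args) {
		List<Integer> small = IntStream.range(0, 100).boxed().collect(Collectors.toList());
		List<Integer> large = IntStream.range(0, 100_000).boxed().collect(Collectors.toList());

		System.out.println(streamFor(small).isParallel()); // false
		System.out.println(streamFor(large).isParallel()); // true

		// The mode applies to the whole pipeline; the last parallel()/sequential() call wins
		System.out.println(large.stream().parallel().map(x -> x + 1).sequential().isParallel()); // false
	}
}
```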

Senior Software Engineer developing all kinds of stuff.
