RxJava is often misunderstood when it comes to its asynchronous and multithreaded aspects. Coding multithreaded operations is simple, but understanding the abstraction is another thing.
A common question about RxJava is how to achieve parallelization, or emitting multiple items concurrently from an Observable. Of course, this definition breaks the Observable Contract, which states that onNext() must be called sequentially and never concurrently by more than one thread at a time. Now you may be thinking "how the heck am I supposed to achieve parallelization then, which by definition is multiple items getting processed at a time!?" Widespread misunderstanding of how parallel actually works in RxJava has even led to the discontinuation of the parallel() operator.
You can achieve parallelization in RxJava without breaking the Observable contract, but it requires a little understanding of Schedulers and how operators deal with multiple asynchronous sources.
Say you have this simple Observable that emits a range of Integers 1 to 10 upon subscription.
Observable<Integer> vals = Observable.range(1,10);
vals.subscribe(val -> System.out.println(val));
But let's say we are performing a very intense calculation on each Integer value.
Observable<Integer> vals = Observable.range(1,10);
vals.map(i -> intenseCalculation(i))
.subscribe(val -> System.out.println(val));
For simplicity's sake, let's make intenseCalculation() sleep for a random interval before returning the integer, simulating an intensive calculation. We will also make it print the current thread the computation is occurring on. (The randInt() helper is defined in the full example at the end.)
public static int intenseCalculation(int i) {
try {
System.out.println("Calculating " + i +
" on " + Thread.currentThread().getName());
Thread.sleep(randInt(1000,5000));
return i;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
We want to parallelize and process multiple integers at a time. A common mistake is to think "Oh, I just use subscribeOn() and have the source Observable emit items on multiple computation threads, just like an ExecutorService."
Observable<Integer> vals = Observable.range(1,10);
vals.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.subscribe(val -> System.out.println("Subscriber received "
+ val + " on "
+ Thread.currentThread().getName()));
However, if you run this code you will get the output below, showing that every integer was emitted on only one computation thread, not several as you might have expected. As a matter of fact, this is as sequential as a single-threaded operation.
Calculating 1 on RxComputationThreadPool-1
Subscriber received 1 on RxComputationThreadPool-1
Calculating 2 on RxComputationThreadPool-1
Subscriber received 2 on RxComputationThreadPool-1
Calculating 3 on RxComputationThreadPool-1
Subscriber received 3 on RxComputationThreadPool-1
Calculating 4 on RxComputationThreadPool-1
Subscriber received 4 on RxComputationThreadPool-1
Calculating 5 on RxComputationThreadPool-1
Subscriber received 5 on RxComputationThreadPool-1
Calculating 6 on RxComputationThreadPool-1
Subscriber received 6 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Subscriber received 7 on RxComputationThreadPool-1
Calculating 8 on RxComputationThreadPool-1
Subscriber received 8 on RxComputationThreadPool-1
Calculating 9 on RxComputationThreadPool-1
Subscriber received 9 on RxComputationThreadPool-1
Calculating 10 on RxComputationThreadPool-1
Subscriber received 10 on RxComputationThreadPool-1
Well, that was not helpful! We did not achieve any effective parallelization at all. We just directed the emissions to happen on another thread named RxComputationThreadPool-1.
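To see why, here is a plain-JDK analogy. It is an assumption made for illustration, not how RxJava is implemented internally, and SubscribeOnAnalogy, work() and demo() are hypothetical names. Calling subscribeOn() on a synchronous source such as Observable.range() behaves much like submitting the entire emission loop to an executor as a single task, so every item runs on one worker thread. Submitting one task per item, which is roughly what the flatMap() technique does, lets the pool spread the work across its threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK analogy, not RxJava itself. All names here are hypothetical.
final class SubscribeOnAnalogy {

    // Stand-in for intenseCalculation(): records the worker thread, then sleeps briefly.
    static int work(int i, Set<String> threadsUsed) {
        threadsUsed.add(Thread.currentThread().getName());
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return i;
    }

    // Returns {threads used by one loop task, threads used by one task per item}.
    static int[] demo(int poolSize, int n) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            // Like range(1, n).subscribeOn(...).map(...): the whole loop is ONE task.
            Set<String> loopThreads = ConcurrentHashMap.newKeySet();
            pool.submit(() -> {
                for (int i = 1; i <= n; i++) {
                    work(i, loopThreads);
                }
            }).get();

            // Like flatMap(v -> just(v).subscribeOn(...).map(...)): ONE task PER item.
            Set<String> perItemThreads = ConcurrentHashMap.newKeySet();
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int v = i;
                futures.add(pool.submit(() -> work(v, perItemThreads)));
            }
            for (Future<Integer> f : futures) {
                f.get();
            }
            return new int[] { loopThreads.size(), perItemThreads.size() };
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo(4, 10);
        System.out.println("one task for the whole loop: " + r[0] + " thread(s)");
        System.out.println("one task per item:           " + r[1] + " thread(s)");
    }
}
```

The single-task version always reports exactly one thread. The per-item version will typically report several, but the exact count depends on how the pool schedules the tasks.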
So how do we make calculations happen on more than one computation thread, and do it without breaking the Observable contract? The secret is to catch each Integer in a flatMap(), create an Observable off it, do a subscribeOn() to the computation scheduler, and then perform the process, all within the flatMap().
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println("Subscriber received "
+ val + " on "
+ Thread.currentThread().getName()));
Now we are getting somewhere, and this looks pretty parallel. We are getting multiple emissions happening on different computation threads.
Calculating 1 on RxComputationThreadPool-3
Calculating 4 on RxComputationThreadPool-2
Calculating 3 on RxComputationThreadPool-1
Calculating 2 on RxComputationThreadPool-4
Subscriber received 3 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Subscriber received 4 on RxComputationThreadPool-2
Calculating 8 on RxComputationThreadPool-2
Subscriber received 2 on RxComputationThreadPool-4
Calculating 6 on RxComputationThreadPool-4
Subscriber received 8 on RxComputationThreadPool-2
Subscriber received 6 on RxComputationThreadPool-4
Calculating 10 on RxComputationThreadPool-4
Subscriber received 7 on RxComputationThreadPool-1
Subscriber received 10 on RxComputationThreadPool-4
Subscriber received 1 on RxComputationThreadPool-3
Calculating 5 on RxComputationThreadPool-3
Subscriber received 5 on RxComputationThreadPool-3
Calculating 9 on RxComputationThreadPool-3
Subscriber received 9 on RxComputationThreadPool-3
[Figure: timeline of emissions across four RX COMPUTATION THREAD lanes and the MAIN THREAD]
But how is this not breaking the Observable contract, you ask? Remember that you cannot have concurrent onNext() calls on the same Observable. We have created an independent Observable off each integer value and scheduled each one on the computation threads, making their concurrency legitimate.
Now you may also be asking, "Well... why is the Subscriber receiving emissions from multiple threads then? That sounds an awful lot like concurrent onNext() calls are happening, and that breaks the contract."
Actually, there are no concurrent onNext() calls happening. The flatMap() has to merge emissions from multiple Observables happening on multiple threads, but it cannot allow concurrent onNext() calls to happen down the chain, including to the Subscriber. It will not block and synchronize either, because that would undermine the benefits of RxJava. Instead of blocking, it reuses the thread currently emitting something out of the flatMap(). If other threads want to emit items while another thread is emitting out of the flatMap(), they leave their emissions for the occupying thread to take ownership of and emit.
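This ownership hand-off is commonly called a queue-drain or emitter-loop pattern. Below is a simplified, plain-JDK sketch of the idea; SerializingEmitter is a hypothetical class, and RxJava's real merge logic additionally handles backpressure, errors and completion. Every caller enqueues its item, but only the thread that moves the work counter up from zero drains the queue and calls downstream, so downstream is never entered by two threads at once and no caller ever blocks.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of the queue-drain idea; not RxJava's actual merge code.
final class SerializingEmitter<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // pending work signals
    private final Consumer<? super T> downstream;

    SerializingEmitter(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    // May be called from many threads at once.
    void onNext(T item) {
        queue.offer(item);                  // leave the item for the current owner
        if (wip.getAndIncrement() != 0) {
            return;                         // another thread is emitting; don't block
        }
        int missed = 1;
        do {                                // this thread now owns the emission loop
            T next;
            while ((next = queue.poll()) != null) {
                downstream.accept(next);    // never entered by two threads at once
            }
            missed = wip.addAndGet(-missed); // loop again if others signalled meanwhile
        } while (missed != 0);
    }

    // Several producer threads, one serialized consumer.
    // Returns {items delivered, peak number of simultaneous downstream calls}.
    static int[] demo(int producers, int itemsEach) {
        AtomicInteger delivered = new AtomicInteger();
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        SerializingEmitter<Integer> emitter = new SerializingEmitter<>(v -> {
            int now = inside.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            delivered.incrementAndGet();
            inside.decrementAndGet();
        });
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            final int base = p * itemsEach;
            threads[p] = new Thread(() -> {
                for (int i = 0; i < itemsEach; i++) {
                    emitter.onNext(base + i);
                }
            });
            threads[p].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return new int[] { delivered.get(), peak.get() };
    }

    public static void main(String[] args) {
        int[] r = demo(4, 10_000);
        System.out.println("delivered=" + r[0] + ", peak concurrent calls=" + r[1]);
    }
}
```

Note that the thread delivering an item downstream is not necessarily the thread that produced it, which is exactly why the Subscriber in the output above sees emissions arriving on several different threads.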
Here is proof, since the example above does not make this obvious. Let's collect the emissions into a toList() before they go to the Subscriber.
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).toList()
.subscribe(val -> System.out.println("Subscriber received "
+ val + " on "
+ Thread.currentThread().getName()));
Observe the output below.
Calculating 4 on RxComputationThreadPool-2
Calculating 1 on RxComputationThreadPool-3
Calculating 2 on RxComputationThreadPool-4
Calculating 3 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Calculating 6 on RxComputationThreadPool-4
Calculating 5 on RxComputationThreadPool-3
Calculating 10 on RxComputationThreadPool-4
Calculating 8 on RxComputationThreadPool-2
Calculating 9 on RxComputationThreadPool-3
Subscriber received [3, 2, 1, 6, 4, 7, 10, 5, 8, 9] on RxComputationThreadPool-3
Notice that the calculations were all happening on different threads, and the flatMap() was pushing items out on one of these threads at any given time. But the last thread to do an emission was RxComputationThreadPool-3. It happened to be the thread pushing items out of the flatMap() when the final value 9 was emitted, so it was also the thread that called onCompleted(), which pushed the list all the way to the Subscriber.
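A loose plain-JDK analogy of this behavior (not RxJava code; ToListAnalogy, calculate() and run() are hypothetical names): with CompletableFuture, a non-async thenApply() attached to allOf(...) typically runs on whichever thread completed the last future, or on the caller's thread if everything had already finished. That mirrors toList() handing its list downstream on whichever thread delivered onCompleted().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

// Plain-JDK analogy of flatMap(...).toList(); all names are hypothetical.
final class ToListAnalogy {

    static final class Outcome {
        final List<Integer> values;   // roughly in the order the calculations finished
        final String deliveredOn;     // thread that ran the "collect" step
        Outcome(List<Integer> values, String deliveredOn) {
            this.values = values;
            this.deliveredOn = deliveredOn;
        }
    }

    // Stand-in for intenseCalculation(): sleeps a random 10-50 ms.
    static int calculate(int i) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(10, 51));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return i;
    }

    static Outcome run(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Queue<Integer> finished = new ConcurrentLinkedQueue<>();
            List<CompletableFuture<Void>> parts = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int v = i;
                parts.add(CompletableFuture.runAsync(() -> finished.add(calculate(v)), pool));
            }
            // Non-async thenApply: typically runs on the thread that finished last.
            return CompletableFuture
                    .allOf(parts.toArray(new CompletableFuture<?>[0]))
                    .thenApply(ignored -> new Outcome(
                            new ArrayList<>(finished), Thread.currentThread().getName()))
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Outcome o = run(10);
        System.out.println("Received " + o.values + " on " + o.deliveredOn);
    }
}
```

As with the toList() output above, the order of the values and the name of the delivering thread will vary from run to run.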
Let me know if you have any questions. Here is the full working example.
import rx.Observable;
import rx.schedulers.Schedulers;
import java.util.Random;
public final class ParallelTest {
private static final Random rand = new Random();
public static void main(String[] args) {
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println("Subscriber received "
+ val + " on "
+ Thread.currentThread().getName()));
waitSleep();
}
public static void waitSleep() {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static int intenseCalculation(int i) {
try {
System.out.println("Calculating " + i + " on " + Thread.currentThread().getName());
Thread.sleep(randInt(1000,5000));
return i;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static int randInt(int min, int max) {
return rand.nextInt((max - min) + 1) + min;
}
}
About Fixing Thread Pools
Please note, as David mentioned in the comments, that this parallelization strategy does not work well with all Schedulers. Some Schedulers other than computation() could flood the system with too many threads and therefore cause poor performance. The computation scheduler limits the number of concurrent processes that can happen inside the flatMap() based on the number of CPUs your machine has. If you want to use other schedulers like newThread() and io(), which do not limit the number of threads, you can pass a second int argument to the flatMap() to limit the number of concurrent processes.
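In RxJava 1.x that second argument is the maxConcurrent parameter of the flatMap(func, maxConcurrent) overload, for example vals.flatMap(val -> Observable.just(val).subscribeOn(Schedulers.io()).map(i -> intenseCalculation(i)), 4). The sketch below models the idea with the JDK alone, as an analogy rather than RxJava's implementation (BoundedParallelMap and its members are hypothetical): an unbounded cached thread pool plays the role of io() or newThread(), but at most maxConcurrent items are in flight, and each finished item starts the next one instead of any thread blocking on a lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;

// Plain-JDK analogy of flatMap(func, maxConcurrent) on an unbounded scheduler.
// All names here are hypothetical, not RxJava API.
final class BoundedParallelMap {

    static final class Result {
        final List<Integer> values; // results in completion order
        final int peakInFlight;     // most items ever processed at the same time
        Result(List<Integer> values, int peakInFlight) {
            this.values = values;
            this.peakInFlight = peakInFlight;
        }
    }

    // Stand-in for intenseCalculation().
    static int simulatedWork(int i) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return i;
    }

    static Result run(List<Integer> inputs, int maxConcurrent, IntUnaryOperator f) {
        if (maxConcurrent < 1) {
            throw new IllegalArgumentException("maxConcurrent must be >= 1");
        }
        // Unbounded pool: left alone it would start a thread per item, like io()/newThread().
        ExecutorService pool = Executors.newCachedThreadPool();
        Queue<Integer> results = new ConcurrentLinkedQueue<>();
        AtomicInteger next = new AtomicInteger();      // index of the next item to start
        AtomicInteger inFlight = new AtomicInteger();  // items currently being processed
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(inputs.size());

        Runnable[] startNext = new Runnable[1];
        startNext[0] = () -> {
            int idx = next.getAndIncrement();
            if (idx >= inputs.size()) {
                return;                                // nothing left to start
            }
            pool.execute(() -> {
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                try {
                    results.add(f.applyAsInt(inputs.get(idx)));
                } finally {
                    inFlight.decrementAndGet();
                    done.countDown();
                    startNext[0].run();                // a finished item frees a slot
                }
            });
        };
        for (int i = 0; i < maxConcurrent; i++) {
            startNext[0].run();                        // fill up to the limit
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return new Result(new ArrayList<>(results), peak.get());
    }

    public static void main(String[] args) {
        List<Integer> inputs = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            inputs.add(i);
        }
        Result r = run(inputs, 3, BoundedParallelMap::simulatedWork);
        System.out.println(r.values + " (peak in flight: " + r.peakInFlight + ")");
    }
}
```

A Semaphore would also cap concurrency, but it would park pool threads while they wait; handing the free slot to the next item keeps every thread busy with real work, which is closer in spirit to how flatMap() requests new inner Observables.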
You can also create a Scheduler off an ExecutorService, giving you more fine-tuned control if needed. You can actually get significantly better performance doing this. You can read about it in my other article on Maximizing Parallelization.