
Thursday, November 5, 2015

RxJava - Achieving Parallelization

RxJava is often misunderstood when it comes to its asynchronous and multithreaded aspects. Writing multithreaded operations is simple, but understanding the abstraction is another thing.

A common question about RxJava is how to achieve parallelization, or emitting multiple items concurrently from an Observable. Of course, this definition breaks the Observable Contract, which states that onNext() must be called sequentially and never concurrently by more than one thread at a time. Now you may be thinking, "How the heck am I supposed to achieve parallelization then, which by definition is multiple items getting processed at a time!?" Widespread misunderstanding of how parallelism actually works in RxJava has even led to the discontinuation of the parallel operator.

You can achieve parallelization in RxJava without breaking the Observable contract, but it requires a little understanding of Schedulers and how operators deal with multiple asynchronous sources.

Say you have this simple Observable that emits a range of Integers 1 to 10 upon subscription.
Observable<Integer> vals = Observable.range(1,10);

vals.subscribe(val -> System.out.println(val));

But let's say we need to perform a very intensive calculation on each Integer value.
Observable<Integer> vals = Observable.range(1,10);
vals.map(i -> intenseCalculation(i))
    .subscribe(val -> System.out.println(val));

And for simplicity's sake let's just make intenseCalculation() sleep for a random interval before returning the integer back, simulating an intensive calculation. We will also make it print the current thread the computation is occurring on.
public static int intenseCalculation(int i) {
    try {
        System.out.println("Calculating " + i + 
             " on " + Thread.currentThread().getName());
        // randInt(min, max) returns a random int in [min, max]; it is defined in the full example below
        Thread.sleep(randInt(1000,5000));
        return i;
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

We want to parallelize this and process multiple integers at a time. A common mistake is to think, "Oh, I just use subscribeOn() and have the source Observable emit items on multiple computation threads, just like an ExecutorService."
Observable<Integer> vals = Observable.range(1,10);

vals.subscribeOn(Schedulers.computation())
          .map(i -> intenseCalculation(i))
          .subscribe(val -> System.out.println("Subscriber received "
                  + val + " on "
                  + Thread.currentThread().getName()));

However, if you run this code you will get the output below, showing that every integer was emitted on only one computation thread, not several as you might have expected. As a matter of fact, this is as sequential as a single-threaded operation.
Calculating 1 on RxComputationThreadPool-1
Subscriber received 1 on RxComputationThreadPool-1
Calculating 2 on RxComputationThreadPool-1
Subscriber received 2 on RxComputationThreadPool-1
Calculating 3 on RxComputationThreadPool-1
Subscriber received 3 on RxComputationThreadPool-1
Calculating 4 on RxComputationThreadPool-1
Subscriber received 4 on RxComputationThreadPool-1
Calculating 5 on RxComputationThreadPool-1
Subscriber received 5 on RxComputationThreadPool-1
Calculating 6 on RxComputationThreadPool-1
Subscriber received 6 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Subscriber received 7 on RxComputationThreadPool-1
Calculating 8 on RxComputationThreadPool-1
Subscriber received 8 on RxComputationThreadPool-1
Calculating 9 on RxComputationThreadPool-1
Subscriber received 9 on RxComputationThreadPool-1
Calculating 10 on RxComputationThreadPool-1
Subscriber received 10 on RxComputationThreadPool-1

Well, that was not helpful! We did not achieve any effective parallelization at all. We just directed the emissions to happen on another thread named RxComputationThreadPool-1.
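As a rough analogy only, here is the same difference expressed with a plain java.util.concurrent ExecutorService instead of RxJava. This is not how RxJava is implemented internally, and the class and method names are hypothetical. Calling subscribeOn() on its own is similar to submitting the whole loop as one task, so every item runs on the same pool thread; what we actually want is closer to submitting one task per item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical analogy class: plain ExecutorService code, not RxJava.
final class SubscribeOnAnalogy {

    // Roughly like subscribeOn() alone: the whole loop is ONE task on ONE pool thread.
    static Set<String> oneTaskForAllItems() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        try {
            pool.submit(() -> {
                for (int i = 1; i <= 10; i++) {
                    work(threads);
                }
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return threads;
    }

    // Roughly like flatMap() + subscribeOn() per item: ONE task per item, spread over the pool.
    static Set<String> oneTaskPerItem() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 1; i <= 10; i++) {
                futures.add(pool.submit(() -> work(threads)));
            }
            for (Future<?> f : futures) {
                f.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return threads;
    }

    // Stand-in for intenseCalculation(): record the current thread name, then sleep briefly.
    private static void work(Set<String> threads) {
        threads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("One task for all items used " + oneTaskForAllItems().size() + " thread(s)");
        System.out.println("One task per item used " + oneTaskPerItem().size() + " thread(s)");
    }
}
```

Running main() should report one thread for the first method and several (at most the pool size of 4) for the second.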

So how do we make calculations happen on more than one computation thread? And do it without breaking the Observable contract? The secret is to catch each Integer in a flatMap(), create an Observable off it, do a subscribeOn() to the computation scheduler, and then perform the process all within the flatMap(). 
Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println("Subscriber received "
        + val + " on "
        + Thread.currentThread().getName()));

Now we are getting somewhere, and this looks much more parallel. We are getting multiple emissions happening on different computation threads.
Calculating 1 on RxComputationThreadPool-3
Calculating 4 on RxComputationThreadPool-2
Calculating 3 on RxComputationThreadPool-1
Calculating 2 on RxComputationThreadPool-4
Subscriber received 3 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Subscriber received 4 on RxComputationThreadPool-2
Calculating 8 on RxComputationThreadPool-2
Subscriber received 2 on RxComputationThreadPool-4
Calculating 6 on RxComputationThreadPool-4
Subscriber received 8 on RxComputationThreadPool-2
Subscriber received 6 on RxComputationThreadPool-4
Calculating 10 on RxComputationThreadPool-4
Subscriber received 7 on RxComputationThreadPool-1
Subscriber received 10 on RxComputationThreadPool-4
Subscriber received 1 on RxComputationThreadPool-3
Calculating 5 on RxComputationThreadPool-3
Subscriber received 5 on RxComputationThreadPool-3
Calculating 9 on RxComputationThreadPool-3
Subscriber received 9 on RxComputationThreadPool-3

[Figure: timeline of emissions across four RX COMPUTATION THREAD lanes and the MAIN THREAD]

But how does this not break the Observable contract, you ask? Remember that you cannot have concurrent onNext() calls on the same Observable. We have created an independent Observable off each integer value and scheduled each one on the computation threads, which makes their concurrency legitimate.

Now you may also be asking "Well... why is the Subscriber receiving emissions from multiple threads then? That sounds an awful lot like concurrent onNext() calls are happening and that breaks the contract."

Actually, there are no concurrent onNext() calls happening. The flatMap() has to merge emissions from multiple Observables happening on multiple threads, but it cannot allow concurrent onNext() calls to happen down the chain, including to the Subscriber. It will not block and synchronize either, because that would undermine the benefits of RxJava. Instead of blocking, it re-uses whichever thread is currently emitting out of the flatMap(). If other threads want to emit items while one thread is already emitting out of the flatMap(), they leave their emissions for the occupying thread to take ownership of and emit.
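The non-blocking hand-off described above is commonly implemented with a concurrent queue plus an atomic work-in-progress counter, a pattern often called queue-drain. The sketch below is a simplified, hypothetical illustration of that technique in plain Java: SerializingEmitter is not an RxJava class, and RxJava's real flatMap() additionally deals with backpressure, errors, and completion.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper illustrating the queue-drain technique; not an RxJava class.
final class SerializingEmitter<T> {

    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    // Work-in-progress counter: non-zero means some thread currently owns the emission loop.
    private final AtomicInteger wip = new AtomicInteger();
    private final Consumer<T> downstream;

    SerializingEmitter(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Safe to call from many threads at once; downstream still receives one item at a time.
    void onNext(T item) {
        queue.offer(item);
        // Only the thread that moves wip from 0 to 1 becomes the emitter.
        // Every other thread leaves its item in the queue and returns without blocking.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        do {
            T next;
            while ((next = queue.poll()) != null) {
                downstream.accept(next);
            }
            // Subtract the work accounted for; loop again if other threads arrived meanwhile.
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger received = new AtomicInteger();
        SerializingEmitter<Integer> emitter = new SerializingEmitter<>(v -> received.incrementAndGet());

        Thread[] producers = new Thread[4];
        for (int t = 0; t < producers.length; t++) {
            final int base = t * 1000;
            producers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    emitter.onNext(base + i);
                }
            });
            producers[t].start();
        }
        for (Thread p : producers) {
            p.join();
        }
        System.out.println("Received " + received.get() + " items");
    }
}
```

Whichever thread wins the 0-to-1 transition of wip keeps draining until no work is left, so the other producer threads only enqueue and return. Once all producers have been joined, main() should print "Received 4000 items".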

Here is proof, since the example above does not make this obvious. Let's collect the emissions into a toList() before they go to the Subscriber.

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
               .subscribeOn(Schedulers.computation())
               .map(i -> intenseCalculation(i))
     ).toList()
     .subscribe(val -> System.out.println("Subscriber received "
                + val + " on "
                + Thread.currentThread().getName()));

Observe the output below.
Calculating 4 on RxComputationThreadPool-2
Calculating 1 on RxComputationThreadPool-3
Calculating 2 on RxComputationThreadPool-4
Calculating 3 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Calculating 6 on RxComputationThreadPool-4
Calculating 5 on RxComputationThreadPool-3
Calculating 10 on RxComputationThreadPool-4
Calculating 8 on RxComputationThreadPool-2
Calculating 9 on RxComputationThreadPool-3
Subscriber received [3, 2, 1, 6, 4, 7, 10, 5, 8, 9] on RxComputationThreadPool-3

Notice that the calculations all happened on different threads, while the flatMap() pushed items out on only one of these threads at any given time. The last thread to do an emission was RxComputationThreadPool-3. It happened to be the thread pushing items out of flatMap() when the final value, 9, was emitted, so it also called onCompleted(), which pushed the list all the way to the Subscriber.

Let me know if you have any questions. Here is the full working example.


import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.Random;

public final class ParallelTest {

    private static final Random rand = new Random();

    public static void main(String[] args) {

        Observable<Integer> vals = Observable.range(1,10);

        // Wrap each value in its own Observable, subscribe it on the computation Scheduler,
        // and let flatMap() merge the results back into a single serialized stream
        vals.flatMap(val -> Observable.just(val)
                    .subscribeOn(Schedulers.computation())
                    .map(i -> intenseCalculation(i))
        ).subscribe(val -> System.out.println("Subscriber received "
                + val + " on "
                + Thread.currentThread().getName()));

        // RxJava's Scheduler threads are daemon threads, so keep main() alive long enough for the work to finish
        waitSleep();
    }

    public static void waitSleep() {
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // Simulates an intensive calculation by sleeping for 1 to 5 seconds
    public static int intenseCalculation(int i) {
        try {
            System.out.println("Calculating " + i + " on " + Thread.currentThread().getName());
            Thread.sleep(randInt(1000,5000));
            return i;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // Returns a random int between min and max, inclusive
    public static int randInt(int min, int max) {
        return rand.nextInt((max - min) + 1) + min;
    }
}

About Fixed Thread Pools
Please note, as David mentioned below, that this parallelization strategy does not work well with every Scheduler. Some Schedulers other than computation() could flood the system with too many threads and therefore cause poor performance. The computation scheduler limits the number of concurrent processes that can happen inside the flatMap() based on the number of CPUs your machine has. If you want to use other schedulers like newThread() and io(), which do not limit the number of threads, you can pass a second int argument to flatMap() that limits the number of concurrent processes.
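As a minimal sketch, assuming a recent RxJava 1.x release (rx.Observable, matching the imports in the full example above), the flatMap(func, maxConcurrent) overload can be combined with the unbounded io() Scheduler. The class name, the short fixed sleep, and the peak-concurrency counters are illustrative additions, not part of the original example.

```java
import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: cap concurrency on the unbounded io() Scheduler via maxConcurrent.
final class MaxConcurrentSketch {

    // How many calculations are running right now, and the highest value observed.
    private static final AtomicInteger running = new AtomicInteger();
    private static final AtomicInteger peak = new AtomicInteger();

    // Stand-in for intenseCalculation(): a short fixed sleep so the sketch finishes quickly.
    static int calculation(int i) {
        int now = running.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            running.decrementAndGet();
        }
        return i;
    }

    // flatMap() subscribes to at most maxConcurrent inner Observables at a time.
    static List<Integer> run(int maxConcurrent) {
        return Observable.range(1, 20)
                .flatMap(val -> Observable.just(val)
                                .subscribeOn(Schedulers.io())
                                .map(i -> calculation(i)),
                        maxConcurrent)
                .toList()
                .toBlocking()
                .single();
    }

    static int peakConcurrency() {
        return peak.get();
    }

    public static void main(String[] args) {
        List<Integer> results = run(4);
        System.out.println("Received " + results.size() + " results, peak concurrency " + peakConcurrency());
    }
}
```

Because run() blocks with toBlocking().single(), this sketch does not need the waitSleep() trick from the full example.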

You can also create a Scheduler from an ExecutorService, which gives you more fine-grained control if needed. In fact, you can get significantly better performance doing this. You can read about it in my other article on Maximizing Parallelization.
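Here is a minimal sketch of that idea, again assuming RxJava 1.x, where Schedulers.from(Executor) wraps an executor as a Scheduler. The pool size of 3 and the class and method names are arbitrary illustrative choices; the sketch only shows the mechanics and makes no performance claims.

```java
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: parallelize flatMap() work on a fixed-size ExecutorService.
final class ExecutorSchedulerSketch {

    // Names of the threads that performed a calculation, for inspection only.
    static final Set<String> threadsUsed = ConcurrentHashMap.newKeySet();

    // Stand-in for intenseCalculation(): record the thread, sleep briefly, square the value.
    static int calculation(int i) {
        threadsUsed.add(Thread.currentThread().getName());
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return i * i;
    }

    static List<Integer> run(int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Scheduler scheduler = Schedulers.from(executor);
        try {
            return Observable.range(1, 10)
                    .flatMap(val -> Observable.just(val)
                            .subscribeOn(scheduler)
                            .map(i -> calculation(i)))
                    .toList()
                    .toBlocking()
                    .single();
        } finally {
            // Executor threads are not daemon threads, so shut the pool down when finished.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Results: " + run(3));
        System.out.println("Threads used: " + threadsUsed);
    }
}
```

Since the pool is fixed at 3 threads, no more than 3 calculations can run at once, regardless of how many items flow through flatMap().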

18 comments:

  1. Great post!

    I just wanted to add that this approach doesn't work with every Scheduler; it works with computation() and any fixed-size Executor turned into a Scheduler. io() and newThread() will flood the system with threads. However, if you use the flatMap(func, maxConcurrency) overload, you can limit the active number of threads (although newThread will still churn through many of them).

    In addition, you can use groupBy with a round-robin key selector:

    AtomicInteger key = new AtomicInteger();

    source.groupBy(v -> key.getAndIncrement() % 5)
        .flatMap(g -> g.observeOn(scheduler).map(i -> calculation(i)))
        .subscribe(...);

    This will avoid creating too many one-time Observables and Workers.

    1. How does this work with computation without the max concurrency overload?

    2. From my observations as well as general understanding of concurrency, it limits the number of available threads to an appropriate number based on the number of cores you have. Please correct me if I'm wrong David...

    3. If you have 8 cores, this will only use 5 of them. If you have 4 cores, two groups will share the same core.

    4. I finally get the round-robin key-selector now. Very clever.

    5. While it certainly can accomplish the goal, it's still not as good as just piping tasks into an ExecutorService in terms of efficiency, especially if you're dealing with large sets of data and/or large variations in latency in the calculation() method. From what I can tell, https://github.com/ReactiveX/RxJava/wiki/Parallel-flows is the new/"right" way to do non-sequential parallelization of the pipeline, in RxJava 2. Is that right? Has anyone tried it?

    6. The solution with groupBy and observeOn proposed by David does not seem to support backpressure. It will request all data from the source at once.

    7. It has been fixed recently ;).

  2. Thanks David! I'll make a mention of that regarding Schedulers and fixed sizing a little later when I edit this. I need to update one of the visualizations anyway, realizing I misrepresented the subscribeOn() as an observeOn() switching from one thread to another.

    I have never seen that groupBy() approach, so I'll need to play with it. I've been interested to see if anything would come out of the RxJava Parallel project on GitHub, but noticed it had no activity for a while. I was wondering if new operators in that project would challenge the Observable contract safely.

  3. It is hard to compete with Java 8 Stream's parallel mode. I have a bunch of ideas for RxJavaParallel but not the time for it.

    1. haha yeah, prioritizing is always the frustrating part of being a developer. Stream's parallel operator is pull-based, of course, which I'm guessing is simpler to achieve. But what makes Rx so great, in my opinion, is that its composability allows it to scale to any problem, even its own perceived limitations.

  4. Great post!
    I thought RxJava guarantees the order of items in Subscriber is the same as in Observer. Do you have an idea how to achieve this and still keep the parallelization? How would you extend your example? Thanks

    1. An Observable will emit items in a sequential, deterministic order. But when multiple sources are parallelized and merged, order can logically no longer be enforced. You will want to avoid parallelization if order matters.

  5. Thanks. I wrote the same code without using lambdas, but I am getting different results.
    What's wrong with my code?


    vals.flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer pArg0) {
            return Observable.just(pArg0);
        }
    }).subscribeOn(Schedulers.computation()).map(new Func1<Integer, Integer>() {
        @Override
        public Integer call(Integer pArg0) {
            return intenseCalculation(pArg0);
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer pArg0) {
            System.out.println(" Subscribed recieved " + pArg0 + " on " + Thread.currentThread().getName());
        }
    });

    1. Hi Arun! Your lambda-less code, though compilable, seems not to be equivalent to the source example in the post. Try the following:

      vals.flatMap(new Func1<Integer, Observable<Integer>>() {
          @Override
          public Observable<Integer> call(Integer pArg0) {
              return Observable.just(pArg0)
                      .subscribeOn(Schedulers.computation())
                      .map(new Func1<Integer, Integer>() {
                          @Override
                          public Integer call(Integer pArg0) {
                              return intenseCalculation(pArg0);
                          }
                      });
          }
      }).subscribe(new Action1<Integer>() {
          @Override
          public void call(Integer pArg0) {
              System.out.println(" Subscribed recieved " + pArg0 + " on " + Thread.currentThread().getName());
          }
      });

  6. Great article on Parallelization. In the full working example, I noticed ".toList()" missing. Was that intentional? Thanks!

  7. If I add a Thread.sleep() in the subscribe() of your code, i.e.:

    Flowable.range(1,1000)
    .flatMap(val -> Flowable.just(val)
    .subscribeOn(Schedulers.computation())
    .map(i -> intenseCalculation(i))
    ).subscribe(val -> {
    Thread.sleep(500);
    System.out.println("Subscriber received "
    + val + " on "
    + Thread.currentThread().getName());
    });

    I see the subscriber processing on only one thread:
    Subscriber received 10 on RxComputationThreadPool-1
    Subscriber received 11 on RxComputationThreadPool-1
    Subscriber received 12 on RxComputationThreadPool-1
    Subscriber received 14 on RxComputationThreadPool-1
    Subscriber received 15 on RxComputationThreadPool-1
    Subscriber received 16 on RxComputationThreadPool-1
    Subscriber received 18 on RxComputationThreadPool-1
