
Friday, February 19, 2016

RxJava - Maximizing Parallelization

Earlier I posted an entry about achieving parallelization with RxJava. Since then I have learned a few things from experiments and from the RxJava devs that help maximize parallelization and get better performance. This post focuses on computation parallelization rather than IO. Computation is where you really have to constrain the number of threads you use, and you roughly get the best performance when you account for the number of cores your computer has.

Maximizing Computation with Executors

In my previous article on parallelization, I had a short conversation with David Karnok, the current maintainer of RxJava, in the comments section. I asked him how many threads the computation() Scheduler creates and utilizes. This was his response:
If [your computer has] 8 cores, this will only use 5 of them. If you have 4 cores, two groups will share the same core.
Thinking about this, I began to have many questions. I am not an expert on multithreading, but I read most of Java Concurrency in Practice by Brian Goetz a year or so ago. I recall in Chapter 8, Brian talked about sizing thread pools for computation-intensive tasks. Pragmatically, you can follow this formula to roughly get an optimal number of threads. It's not perfect, but it's a simple rule to get well-balanced computation performance.
Nthreads = NCPU + 1
So if you have 4 cores, that means you should have 5 threads. If you have 8 cores, you should have 9 threads. That extra thread is to fill up idle CPU cycles when the other threads get short moments of idleness. You can get this formula dynamically using a simple Java expression.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;
I may be very naive and might be overlooking something. Or maybe the RxJava developers simply had different goals and wanted to keep the number of threads conservative. But I began to suspect Schedulers.computation() was not maximizing parallelization since it was not explicitly designed for it.
Observable.range(1,1000)
.flatMap(i -> Observable.just(i)
    .subscribeOn(Schedulers.computation())
    .map(i2 -> intenseCalculation(i2))
).subscribe(System.out::println);
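These examples call an intenseCalculation() method that is not defined in this post; it just stands for some CPU-bound work. For reference, a hypothetical stand-in (my own placeholder for illustration, not the actual calculation from my work process) could look like this.

public static int intenseCalculation(int i) {
    // placeholder: arbitrary CPU-bound busy work standing in for a real calculation
    int result = i;
    for (int j = 0; j < 1_000_000; j++) {
        result = (result * 31 + j) % 1_000_003;
    }
    return result;
}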
I was running a process at work in a similar manner as shown above, and the process took 225 seconds to emit all the items and do an intensive calculation on each one in parallel.
I had a hunch and decided to try Goetz’s simple rough formula for calculation thread pool sizing. This means I need to create my own computation ExecutorService, which is basically a thread pool. Fortunately Schedulers has a factory to turn an Executor into a Scheduler easily.
We can create an ExecutorService with a fixed number of threads matching the optimization formula.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Then we can call Schedulers.from() and pass this executor to it.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);

Observable.range(1,1000)
.flatMap(i -> Observable.just(i)
    .subscribeOn(scheduler)
    .map(i2 -> intenseCalculation(i2))
).subscribe(System.out::println);
One thing we need to add: we should call shutdown() on the ExecutorService when we are done with it so its threads are disposed of and do not keep the application alive inadvertently. We can simply call this in a finallyDo() somewhere in the chain as shown below.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);

Observable.range(1,1000)
.flatMap(i -> Observable.just(i)
    .subscribeOn(scheduler)
    .map(i2 -> intenseCalculation(i2))
).finallyDo(() -> executor.shutdown())
.subscribe(System.out::println);
Running my process at work on a 4-core machine (so it is running 5 threads), it ran in 92 seconds, a huge difference from 225 seconds. That makes my custom ExecutorService about 2.44 times faster than Schedulers.computation().
Based on what David Karnok said, it seems RxJava maintains a conservative number of computation() threads and/or core utilization. Concluding from my observations and experiments, if you need to do some intensive parallel computation work you might be better off creating your own Scheduler off an ExecutorService with an appropriately fixed pool size.

Round-Robin Parallelization

David Karnok posted a neat trick: round-robin parallelization. If you are flatMapping to already existing Observables, then this does not really apply. But take our example where we are creating a single-emission Observable for each value.
Observable.range(1,1000)
.flatMap(i -> Observable.just(i)
    .subscribeOn(Schedulers.computation())
    .map(i2 -> intenseCalculation(i2))
).subscribe(System.out::println);
That Observable.just() factory can be expensive when used repeatedly because we are creating a new Observable for each emission. Is there not a way to consolidate 1000 single-emission Observables into a handful of Observables with several emissions?
We can do this by using groupBy() with a round-robin strategy. Create an AtomicInteger and take the modulo of an incrementing count to break the emissions into batches, then groupBy() on that remainder. For example, we can break up an Observable into 5 GroupedObservable items using the modulo operator, and we can flatMap() those.
final AtomicInteger batch = new AtomicInteger(0);

Observable.range(1,1000)
    .groupBy(i -> batch.getAndIncrement() % 5)
    .flatMap(g -> g.observeOn(Schedulers.io())
        .map(i -> intenseCalculation(i))
    ).subscribe(System.out::println);
Note we have to use observeOn() here; a GroupedObservable is already subscribed on its source's thread, so subscribeOn() would have no effect on it.
You can also use this in tandem with our ExecutorService strategy. Heck, you can make the number of GroupedObservables equal to the number of threads. If you have 4 cores, you would then have 5 cleanly divided streams of emissions on 5 threads.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);

final AtomicInteger batch = new AtomicInteger(0);

Observable.range(1,1000)
    .groupBy(i -> batch.getAndIncrement() % threadCt)
    .flatMap(g -> g.observeOn(scheduler)
        .map(i -> intenseCalculation(i))
    ).subscribe(System.out::println);
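As with the earlier example, you will probably want to shut down the ExecutorService once the stream terminates. A minimal sketch (imports omitted as in the other snippets, and assuming the same intenseCalculation() placeholder) just adds the finallyDo() call from before.

int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);

final AtomicInteger batch = new AtomicInteger(0);

Observable.range(1,1000)
    .groupBy(i -> batch.getAndIncrement() % threadCt)
    .flatMap(g -> g.observeOn(scheduler)
        .map(i -> intenseCalculation(i))
    )
    .finallyDo(() -> executor.shutdown()) // dispose of the pool's threads when done
    .subscribe(System.out::println);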
If you have any other ideas, comments, questions, or your own discoveries with parallelization please comment below.

15 comments:

  1. Shouldn't it be "Nthreads = Ncpu + 1"? :)

  2. What was the execution time for the last change?

    Replies
    1. Not quite sure what you mean. Which change are you referring to?

  3. When you added the GroupedObservable.

    Replies
    1. Well the measurements I shared were not for this Observable of 1000 integers, but rather an actual complex process at my workplace doing intensive calculations. Since the flatMap() mapped to already existing Observables, I wasn't using Observable.just() and therefore didn't need to do the groupBy().

      Therefore I don't have any measurements for that. But generally you want to avoid creating extraneous Observables (or excessive object creation in general) because it takes up memory and resources.

  4. When you added the GroupedObservable.

  5. My guess is that they were being conservative about computation so that other tasks in the same application, which need threads as well and might be deemed just as important, can also get some, networking for instance.

    Replies
    1. I've had similar theories. I think the core usage is conservative simply to keep the library lightweight and non-invasive on system resources. This probably is especially important on systems like Android. RxJava would probably get criticism as a library if it occupied all four cores by default.

  6. I'm using Retrofit with RxJava. In one case, I have to make multiple (say, more than 40) requests based on a single response from the server. I'm using flatMap to create the observables without grouping. Do we need to use the groupBy() option with our own Scheduler in that case?

    Replies
    1. It really depends. I'm not an Android developer (although I bought a book and plan on learning), but 40 emissions does not sound like enough to worry too much about. The reason for the groupBy() is to minimize excessive object creation, but 40 objects is not that many. You'll have to test and find out for sure though.

  7. I don't understand something:
    I tested the code samples posted in this article in Android Studio on my phone with 2 cores, and I agree, without these lines:

    int threadCt = Runtime.getRuntime().availableProcessors() + 1;
    ExecutorService executor = Executors.newFixedThreadPool(threadCt);
    Scheduler scheduler = Schedulers.from(executor);

    emitting items from 1 to 100 took me roughly 50s, and with them roughly 30s. But as I saw later, when I then increased the number of threads to 4 (by changing the line to
    int threadCt = Runtime.getRuntime().availableProcessors() + 2; // (not +1)
    ) I got better performance and the job took 24s instead of 30s.
    So why is int threadCt = Runtime.getRuntime().availableProcessors() + 1; the best way to select the number of threads?

  8. So you are saying that increasing your threads, beyond the recommended formula, yielded better performance? Keep in mind that formula is *roughly* optimal for *computation* tasks. If you are doing IO you can actually get even better performance by creating more threads, since IO is not computationally intensive.

    If your tasks are computational in nature and you are getting better performance with +2, there could be a few reasons. You may simply be getting lucky, and I bet if you add more threads beyond a certain threshold you will see diminishing returns. It could also be that your tasks are long-running but not as computationally heavy as you think.

    Multithreading can definitely be tinkered with and optimized to an obsessive degree, and on a case-by-case basis you can get better performance by experimenting and measuring. I would definitely recommend reading Brian Goetz's "Java Concurrency in Practice" if you can. You might enjoy it if you are interested in fine-tuning concurrency and want to be more precise about utilization.

  9. These micro-benchmarks do not really test backpressure. I've had issues caused by a high number of subscribers created by flatMap's merge. Fortunately, you can now limit the number of concurrent subscriptions. See: https://github.com/ReactiveX/RxJava/issues/2291

  10. Hi Thomas,
    Your code was great; it works fine as a Java example. My case is in Android. I have a list of categories and I am doing a network call for each category. In total there are 6000 items (an average of 75 products per category). Everything seemed clear.
    I was trying to apply your solution in Android, but I keep getting this error: "no instance of type variable R exist so that void conforms to R". Do you have any idea about this? Have you tried it in Android?
