
Friday, February 19, 2016

RxJava - Maximizing Parallelization

Earlier I posted an entry about achieving parallelization with RxJava, but I have since learned a few things from experiments and the RxJava devs that maximize parallelization and yield better performance. This post focuses on computation parallelization rather than IO. Computation is where you really have to constrain the number of threads you use, and roughly speaking, you get the best performance by accounting for the number of cores your computer has.

Maximizing Computation with Executors

In my previous article on parallelization, I had a short conversation with David Karnok, the current maintainer of RxJava, in the comments section. I asked him how many threads the computation() Scheduler creates and utilizes. This was his response:
If [your computer has] 8 cores, this will only use 5 of them. If you have 4 cores, two groups will share the same core.
Thinking about this, I began to have many questions. I am not an expert on multithreading, but I read most of Java Concurrency in Practice by Brian Goetz a year or so ago. I recall that in Chapter 8, Brian talked about sizing thread pools for computation-intensive tasks. Pragmatically, you can follow this formula to get a roughly optimal number of threads. It's not perfect, but it's a simple rule that yields well-balanced computation performance.
Nthreads = NCPU + 1
So if you have 4 cores, that means you should have 5 threads. If you have 8 cores, you should have 9 threads. That extra thread is to fill up idle CPU cycles when the other threads get short moments of idleness. You can get this formula dynamically using a simple Java expression.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;
I may be very naive and might be overlooking something. Or maybe the RxJava developers simply had different goals and wanted to keep the number of threads conservative. But I began to suspect Schedulers.computation() was not maximizing parallelization since it was not explicitly designed for it.
Observable.range(1,1000)
    .flatMap(i -> Observable.just(i)
        .subscribeOn(Schedulers.computation())
        .map(i2 -> intenseCalculation(i2))
    ).subscribe(System.out::println);
I was running a process at work in a similar manner as shown above, and the process took 225 seconds to emit all the items and do an intensive calculation on each one in parallel.
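The intenseCalculation() used throughout these snippets is never shown in the post. As a hypothetical stand-in, here is a minimal sketch of a CPU-bound method that burns a fixed amount of computation and returns its input unchanged, so the examples above can actually be run:

```java
public class Calculations {

    // Hypothetical stand-in for the post's intenseCalculation():
    // performs a fixed amount of CPU-bound busy work, then
    // returns its input unchanged.
    public static int intenseCalculation(int i) {
        double x = 0;
        for (int n = 0; n < 1_000_000; n++) {
            x += Math.sqrt(n); // arbitrary busy work to keep a core occupied
        }
        return i;
    }

    public static void main(String[] args) {
        System.out.println(intenseCalculation(42)); // prints 42
    }
}
```

In the real process the work per item was much heavier; the point is only that each call pins a core for a noticeable amount of time.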
I had a hunch and decided to try Goetz's rough formula for computation thread pool sizing. This means I need to create my own ExecutorService, which is basically a thread pool. Fortunately, Schedulers has a factory to easily turn an Executor into a Scheduler.
We can create an ExecutorService with a fixed number of threads matching the optimization formula.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Then we can call Schedulers.from() and pass this executor to it.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);

Observable.range(1,1000)
    .flatMap(i -> Observable.just(i)
        .subscribeOn(scheduler)
        .map(i2 -> intenseCalculation(i2))
    ).subscribe(System.out::println);
One thing we need to add: we need to shut down the ExecutorService when we are done with it so its threads are disposed and do not keep the application alive inadvertently. We can simply call shutdown() in a finallyDo() somewhere in the chain, as shown below.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);

Observable.range(1,1000)
    .flatMap(i -> Observable.just(i)
        .subscribeOn(scheduler)
        .map(i2 -> intenseCalculation(i2))
    ).finallyDo(() -> executor.shutdown())
    .subscribe(System.out::println);
Running my process at work on a 4-core machine (so it is running 5 threads), it ran in 92 seconds, a huge difference from 225 seconds. This makes my custom ExecutorService 2.44 times faster than Schedulers.computation().
Based on what David Karnok said, it seems RxJava keeps the number of computation() threads and core utilization conservative. Concluding from my observations and experiments: if you need to do intensive parallel computation work, you might be better off creating your own Scheduler from an ExecutorService with an appropriately fixed pool size.
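The pool-sizing idea stands on its own outside RxJava. Here is a minimal plain-Java sketch (the workload is a simple squaring task standing in for real computation, and the class and method names are mine, not from the post) showing the same fixed pool built from Goetz's formula:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolSizing {

    // Runs n CPU-bound tasks (squaring here, standing in for real work)
    // on a fixed-size pool and returns the summed results.
    static long parallelSumOfSquares(int n) throws Exception {
        // Goetz's rough rule for CPU-bound work: one thread per core, plus one.
        int threadCt = Runtime.getRuntime().availableProcessors() + 1;
        ExecutorService executor = Executors.newFixedThreadPool(threadCt);

        List<Future<Long>> futures = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            final long value = i;
            futures.add(executor.submit(() -> value * value));
        }

        long sum = 0;
        for (Future<Long> f : futures) {
            sum += f.get(); // blocks until that task's result is ready
        }

        // Always shut the pool down, or its threads keep the JVM alive.
        executor.shutdown();
        return sum;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parallelSumOfSquares(1000)); // prints 333833500
    }
}
```

The Rx version above is doing essentially this, with the Observable chain in place of the explicit submit/get bookkeeping.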

Round-Robin Parallelization

David Karnok posted a neat trick: round-robin parallelization. If you are flatMapping to already existing Observables, then this does not really apply. But take our example where we are creating a single-emission Observable for each value.
Observable.range(1,1000)
    .flatMap(i -> Observable.just(i)
        .subscribeOn(Schedulers.computation())
        .map(i2 -> intenseCalculation(i2))
    ).subscribe(System.out::println);
That Observable.just() factory can be expensive when used repeatedly because we are creating a new Observable for each emission. Is there not a way to consolidate 1000 single-emission Observables into a handful of Observables with several emissions?
We can do this by using groupBy() and a round-robin strategy. Create an AtomicInteger and use a modulus to break up the emissions into batches. Then you can use that modulus remainder to groupBy() on. For example, we can break up an Observable into 5 GroupedObservable items using a modulus operator, and we can flatMap() those.
final AtomicInteger batch = new AtomicInteger(0);

Observable.range(1,1000)
    .groupBy(i -> batch.getAndIncrement() % 5)
    .flatMap(g -> g.observeOn(Schedulers.io())
        .map(i -> intenseCalculation(i))
    ).subscribe(System.out::println);
Note we have to use observeOn() since the GroupedObservable specifies its own subscribeOn() thread and can’t be overridden.
You can also use this in tandem with our ExecutorService strategy. Heck, you can throttle the number of GroupedObservables to be equal to the number of threads. If you have 4 cores, you would then have 5 cleanly divided streams of emissions on 5 threads.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);

final AtomicInteger batch = new AtomicInteger(0);

Observable.range(1,1000)
    .groupBy(i -> batch.getAndIncrement() % threadCt)
    .flatMap(g -> g.observeOn(scheduler)
        .map(i -> intenseCalculation(i))
    ).subscribe(System.out::println);
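The round-robin key itself is easy to check in isolation. This plain-Java sketch (no RxJava; the class and method names are mine) shows that getAndIncrement() % threadCt spreads 1000 emissions exactly evenly across the groups, which is what makes the GroupedObservables cleanly divided:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinKeys {

    // Assigns each of n emissions a group key via round-robin modulus
    // (the same key logic used in the groupBy() above) and returns the
    // resulting group sizes, keyed by group number.
    static Map<Integer, Integer> groupSizes(int n, int groups) {
        AtomicInteger batch = new AtomicInteger(0);
        Map<Integer, Integer> sizes = new TreeMap<>();
        for (int i = 1; i <= n; i++) {
            int key = batch.getAndIncrement() % groups;
            sizes.merge(key, 1, Integer::sum); // count emissions per group
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 1000 emissions over 5 groups: each group gets exactly 200.
        System.out.println(groupSizes(1000, 5)); // prints {0=200, 1=200, 2=200, 3=200, 4=200}
    }
}
```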
If you have any other ideas, comments, questions, or your own discoveries with parallelization please comment below.

27 comments:

  1. Shouldn't it be "Nthreads = Ncpu + 1"? :)

    ReplyDelete
  2. What was the execution time for the last change?

    ReplyDelete
    Replies
    1. Not quite sure what you mean. Which change are you referring to?

      Delete
  3. When you added the GroupedObservable.

    ReplyDelete
    Replies
    1. Well the measurements I shared were not for this Observable of 1000 integers, but rather an actual complex process at my workplace doing intensive calculations. Since the flatMap() mapped to already existing Observables, I wasn't using Observable.just() and therefore didn't need to do the groupBy().

      Therefore I don't have any measurements for that. But generally you want to avoid creating extraneous Observables (or excessive object creation in general) because it takes up memory and resources.

      Delete
  5. My guess is that they were being conservative about computation, so that other tasks in the same application that also require threads, and might be deemed just as important (networking, for instance), can use some threads as well.

    ReplyDelete
    Replies
    1. I've had similar theories. I think the core usage is conservative simply to keep the library lightweight and non-invasive on system resources. This probably is especially important on systems like Android. RxJava would probably get criticism as a library if it occupied all four cores by default.

      Delete
  6. I'm using Retrofit with RxJava. In one case, I have to make multiple (say, more than 40) requests based on a single response from the server. I'm using flatMap to create Observables without grouping. Do we need to use the groupBy() option, with our own Scheduler, in that case?

    ReplyDelete
    Replies
    1. It really depends. I'm not an Android developer (although I bought a book and plan on learning), but 40 emissions does not sound like very many where I would worry too much about it. The reason for the groupBy() is to minimize excessive object creation, but 40 objects is not that many. You'll have to test and find out for sure though.

      Delete
  7. I don't understand something:
    I tested the code samples posted in this article in Android Studio and on my phone with 2 cores, and I agree, without these lines:

    int threadCt = Runtime.getRuntime().availableProcessors() + 1;
    ExecutorService executor = Executors.newFixedThreadPool(threadCt);
    Scheduler scheduler = Schedulers.from(executor);

    emission of items from 1 to 100 took roughly 50s, and with them roughly 30s. But as I saw later, when I then increased the number of threads to 4 (by changing the line to
    int threadCt = Runtime.getRuntime().availableProcessors() + 2; // (not +1)
    ), I got better performance and the job took 24s instead of 30s.
    So why is int threadCt = Runtime.getRuntime().availableProcessors() + 1; the best way to select the number of threads?

    ReplyDelete
  8. So you are saying that increasing your threads, beyond the recommended formula, yielded better performance? Keep in mind that formula is *roughly* optimal for *computation* tasks. If you are doing IO you can actually get even better performance by creating more threads, since IO is not computationally intensive.

    If your tasks are computational in nature and you are getting better performance with +2, there could be a few reasons. You may simply be getting lucky, and I bet if you add more threads beyond a certain threshold you will see diminishing returns. It could also be that your tasks are long-running but not as computation-heavy as you think.

    Multithreading can definitely be tinkered with and optimized to an obsessive degree, and on a case-by-case basis you can yield better performance by experimenting and measuring. I would definitely recommend reading Brian Goetz's "Java Concurrency in Practice" if you can. You might enjoy that reading if you are interested in fine-tuning concurrency and want to be more precise about utilization.

    ReplyDelete
  9. These micro-benchmarks do not really test backpressure. I've had issues caused by a high number of subscribers created by flatMap's merge. Fortunately, you can now limit the number of concurrent subscriptions. See: https://github.com/ReactiveX/RxJava/issues/2291

    ReplyDelete
  10. Hi Thomas,
    Your code was great; it works fine as a Java example. My case is in Android. I have a list of categories and I am doing a network call for each category. In total there are 6000 items (an average of 75 products per category). Everything seemed clear..
    I was trying to apply your solution in Android, but I keep obtaining this error: "no instance of type variable R exist so that void conforms to R". Do you have any idea about this? Have you tried it in Android?

    ReplyDelete
  11. You wrote: "Note we have to use observeOn() since the GroupedObservable specifies its own subscribeOn() thread and can’t be overridden."

    This is a bit vague. I googled through the JavaDocs and even the source code (RxJava 2.2.1). The docs don't say a word about this behavior (but then, the JavaDocs are pretty shallow/watered-down, so it's not like I expected it to be officially noted), and I can find no reference to subscribeOn() in the source code.

    So I tried - using your example - to switch the call from observeOn() to subscribeOn() and I did not notice any difference in the output.

    I.e., either this particular statement is wrong, or it was only applicable for an older version of RxJava, or I have completely missed something lol. Could you help a brother out??

    ReplyDelete
    Replies
    1. Hmmm. You seem to be right. I might have worded this poorly and meant to say that GroupedObservable uses the source's subscribeOn(). But even that seems to be wrong now, based on a quick experiment I ran.

      import io.reactivex.Observable
      import io.reactivex.schedulers.Schedulers

      fun main(args: Array<String>) {
          Observable.range(1, 10)
              .groupBy { it % 2 == 0 }
              .flatMap { it.subscribeOn(Schedulers.computation()).map { "$it emitted on ${Thread.currentThread().name}" } }
              .subscribe {
                  println(it)
              }

          Thread.sleep(2000L)
      }


      OUTPUT:
      1 emitted on RxComputationThreadPool-1
      2 emitted on RxComputationThreadPool-2
      4 emitted on RxComputationThreadPool-2
      6 emitted on RxComputationThreadPool-2
      8 emitted on RxComputationThreadPool-2
      10 emitted on RxComputationThreadPool-2



      Either that behavior is no longer the case, or I confused RxPy's implementation (which did have the aforementioned behavior) with RxJava's.

      On a side note, looking deeper into the JavaDocs, it looks indeed like groupBy() caches items until subscription and this has been the case in RxJava 1.X.

      Delete