Saturday, February 13, 2016

RxJava- Understanding observeOn() and subscribeOn()


RxJava makes multithreading easier, but the simplified abstraction more or less forces a veteran developer to re-learn multithreading the "RxJava way". Beginners still have to learn multithreading, which is not a trivial topic, but RxJava makes it much more accessible.
A few people have asked if I could cover observeOn() and subscribeOn() the same way I covered parallelization. Let's take a hands-on approach to understanding observeOn() and subscribeOn(). But first, here is a little theory for newcomers to multithreading.

Concurrency and Multithreading in a Nutshell

If you have sizable experience with Java concurrency, please skip this section. Otherwise read on!
If you have never done multithreading/concurrency before, the idea is essentially multitasking. Think of a thread as a cursor executing one line of code at a time, which you can actually see when you use breakpoints in debug mode with Eclipse or IDEA. As you step through your code, each statement is executed top-to-bottom. In effect, you have slowed down a thread and are now in control of it. You only have one thread traversing your code and executing each statement.

But when you multithread, you can have two or more threads (cursors) traversing your code and executing statements. For instance, you can have three threads doing three different tasks. Two threads could be importing two different sets of data simultaneously, while the third thread is asking the user for login credentials. These three tasks are being executed at the same time, which is much better than making the user wait for each data set to load before being prompted to log in.
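If you want to see two "cursors" for yourself, here is a minimal plain-Java sketch (no RxJava involved): two threads run a stand-in "import" task at the same time while the main thread stays free, and then the main thread waits for both with join(). The class name, thread names, and the fake import task are hypothetical, just for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class TwoCursors {

    // Runs the same hypothetical "import" task on two threads at once and
    // returns the names of the threads that executed it.
    static List<String> runImports() {
        List<String> executedOn = new CopyOnWriteArrayList<>();

        // Stand-in for importing a data set: just records which thread ran it
        Runnable importTask = () -> executedOn.add(Thread.currentThread().getName());

        Thread importA = new Thread(importTask, "import-A");
        Thread importB = new Thread(importTask, "import-B");

        // Two "cursors" now traverse importTask concurrently
        importA.start();
        importB.start();

        // The main thread is free at this point, e.g. to prompt for login credentials

        try {
            // Wait for both imports to finish before returning
            importA.join();
            importB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executedOn;
    }

    public static void main(String[] args) {
        System.out.println("Imports ran on " + runImports());
    }
}
```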

However, when these three threads enter the same object and manipulate its properties, there is a high risk of the cursors overlapping. They can start to race each other and compete chaotically to read and change the object's properties. That is why immutability should be your default policy for properties, and when properties have to be mutable you use synchronization. You should always strive for immutability, but RxJava also greatly reduces the likelihood of race conditions and other multithreading problems. Problems are most likely to happen only when you create side effects that manipulate objects outside the Observable chain.
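To make the race concrete, here is a small plain-Java sketch with a hypothetical SharedCounter class, where several pool threads increment the same object's fields. The plain int++ is a read-modify-write that threads can interleave, so its total may come out lower than expected on a given run (or may not; races are unpredictable). The AtomicInteger and synchronized versions always add up.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedCounter {

    private int unsafeCount = 0;                       // plain field: count++ is a read-modify-write race
    private final AtomicInteger atomicCount = new AtomicInteger();
    private int syncCount = 0;                         // only touched inside synchronized methods

    void incrementUnsafe() { unsafeCount++; }
    void incrementAtomic() { atomicCount.incrementAndGet(); }
    synchronized void incrementSync() { syncCount++; } // one thread at a time
    synchronized int syncTotal() { return syncCount; }

    // Hammers one shared counter from several threads.
    // Returns {unsafe total, atomic total, synchronized total}.
    static int[] run(int threads, int incrementsPerThread) {
        SharedCounter counter = new SharedCounter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementUnsafe();
                    counter.incrementAtomic();
                    counter.incrementSync();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { counter.unsafeCount, counter.atomicCount.get(), counter.syncTotal() };
    }

    public static void main(String[] args) {
        int[] totals = run(4, 100_000);
        System.out.println("unsafe=" + totals[0] + " atomic=" + totals[1] + " synchronized=" + totals[2]);
    }
}
```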

Another common use of multithreading is parallelization. Say you have 10,000 objects and you need to perform an expensive calculation on each one. Instead of iterating them and executing the process one at a time, you can process 5 at a time by passing them to 5 worker threads. This could make the process up to 5 times faster.

Think of this as a checkout line where having 5 cashiers is better than 1, because you can process a high volume of customers faster. But of course, like threads, cashiers are expensive. If you have 30 customers to process, it is probably not practical to have 30 cashiers due to operational constraints. It is better to have 5 cashiers and "re-use" them: after a cashier processes a customer, they take the next customer waiting in the queue. This is effectively what a thread pool does. It internally maintains a set of threads and queues tasks to delegate to them.
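Here is the checkout line as a minimal sketch using the JDK's Executors.newFixedThreadPool(), assuming 5 "cashier" threads and 30 "customer" tasks (the class and method names are made up). Customers that arrive while all cashiers are busy wait in the pool's internal queue, and the same 5 threads are reused for everyone.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CheckoutLine {

    // Serves `customers` tasks with a fixed pool of `cashiers` threads and
    // returns the distinct threads ("cashiers") that did the work.
    static Set<String> serve(int cashiers, int customers) {
        ExecutorService pool = Executors.newFixedThreadPool(cashiers);
        Set<String> cashiersUsed = ConcurrentHashMap.newKeySet();

        for (int c = 0; c < customers; c++) {
            // Extra customers wait in the pool's internal queue until a cashier frees up
            pool.execute(() -> {
                cashiersUsed.add(Thread.currentThread().getName());
            });
        }

        pool.shutdown(); // no new customers; finish the ones already queued
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return cashiersUsed;
    }

    public static void main(String[] args) {
        // 30 customers, 5 cashiers: the pool never creates more than 5 threads
        System.out.println(serve(5, 30));
    }
}
```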

Most computers nowadays have multiple "cores", or processors built into the CPU. As a rough rule of thumb, a quad-core CPU can optimally support 5 computational threads (4 cores + 1 extra thread to cover idle time). If you have 8 cores, you can optimally support 9 threads, and so on. If you exceed this rough formula (e.g. running 6 or more computational threads on a 4-core machine), you risk compromising performance. But not every task is computational. Importing and exporting data (called IO tasks) is much less taxing on the CPU, since those threads spend most of their time waiting. You could theoretically have 10 threads on a 4-core machine without issue if they are all simply importing/exporting data.
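The rule of thumb above is easy to compute at runtime. This sketch assumes the cores + 1 formula for computational work and uses Runtime.availableProcessors(), which reports the logical processors available to the JVM (so hyper-threaded cores count individually). PoolSizing and computationThreads() are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolSizing {

    // The rule of thumb: one thread per core plus one extra for idle time
    static int computationThreads(int cores) {
        return cores + 1;
    }

    public static void main(String[] args) {
        // Logical processors available to the JVM
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService computationPool = Executors.newFixedThreadPool(computationThreads(cores));
        System.out.println(cores + " cores -> pool of " + computationThreads(cores) + " threads");
        computationPool.shutdown();
    }
}
```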

While RxJava takes a lot of the pain out of concurrency and multithreading, I highly recommend knowing how to use concurrency without RxJava, just so you are aware of the "gotchas" multithreading can manifest. Benjamin Winterberg created an awesome tutorial on Java 8 concurrency which I recommend reading. If you want some deep knowledge on concurrency, check out Java Concurrency in Practice by Brian Goetz.

subscribeOn()

Before we bring concurrency into the discussion, think long and hard about how an Observable chain of operators actually works. You have to have a source Observable where the emissions originate from. Only one emission at a time can be pushed up the entire chain of operators to the Subscriber. By default, the thread that declares the subscription (calls the subscribe() method) is the thread that pushes these emissions from the source, one at a time, all the way to the Subscriber.


For example, take this simple Observable operation that emits three String objects and maps their lengths.

import rx.Observable;

public static void main(String[] args) {

    Observable<String> source = Observable.just("Alpha", "Beta", "Gamma");

    Observable<Integer> lengths = source.map(String::length);

    // No Scheduler specified: the thread calling subscribe() (main) does all the work
    lengths.subscribe(l -> System.out.println("Received " + l +
        " on thread " + Thread.currentThread().getName()));
}

The subscribe() call on the last line will receive the emissions and print them. By default, the thread that declares the subscribe() is the one that pushes items from the source all the way up the chain to the Subscriber. If you run this program you will see the following output, indicating this Observable emitted items on the main thread.

Received 5 on thread main
Received 4 on thread main
Received 5 on thread main

This means the main thread (the thread which started the program) executed the emissions of this Observable, and it pushed each emission through the map() operator to the Subscriber. Since the main thread becomes occupied with pushing the emissions, the program will not exit until the Observable is done pushing emissions and calls onCompleted().

Let's say we want to subscribe to this Observable but do not want to do it on the current main thread. Pretend calculating the lengths takes a while. Perhaps we would like to kick off the calculations but not hold up the main thread. That main thread has places to go, things to do, and needs to kick off other tasks. We can do that with subscribeOn() by specifying a Scheduler. This will emit the items from the source on a different thread.

If our task is computational, we should use Schedulers.computation(). This will allocate one of a fixed number of threads to this Observable operation, and the source will emit items on that thread.

import rx.Observable;
import rx.schedulers.Schedulers;

public static void main(String[] args) {

    Observable<String> source = Observable.just("Alpha", "Beta", "Gamma");

    Observable<Integer> lengths = source
            .subscribeOn(Schedulers.computation())   // source emits on a computation thread
            .map(String::length);

    lengths.subscribe(l -> System.out.println("Received " + l +
            " on thread " + Thread.currentThread().getName()));
}

But you may run into a problem and not get any output. Why? With our simple program, the main thread passed off the execution of our Observable chain to a computation thread. The main thread reached the end of the main() method and exited the program, before the computation thread got a chance to emit any items!

You will not likely encounter this with real programs that are kept alive for a session, but for our example we need to keep our main thread alive long enough to see the subscription work. Just make it sleep for three seconds and that should give plenty of time to subscribe and execute the emissions.

import rx.Observable;
import rx.schedulers.Schedulers;

public static void main(String[] args) {

    Observable<String> source = Observable.just("Alpha", "Beta", "Gamma");

    Observable<Integer> lengths = source
            .subscribeOn(Schedulers.computation())
            .map(String::length);

    lengths.subscribe(l -> System.out.println("Received " + l +
            " on thread " + Thread.currentThread().getName()));

    // Keep the main thread alive long enough for the computation thread to emit
    sleep(3000);
}

private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Your output should now look like this.

Received 5 on thread RxComputationThreadPool-1
Received 4 on thread RxComputationThreadPool-1
Received 5 on thread RxComputationThreadPool-1

Note that the emissions happened on a computation thread, or more specifically a thread named RxComputationThreadPool-1. This one thread emitted all of these items. A common misconception is that a Scheduler with multiple threads will automatically parallelize your emissions, but this is not true, as it would break the Observable contract. You can only direct the emissions of an Observable from one single thread to another single thread. Parallelization is only possible when you create separate Observables, as shown in my article on parallelization.

It does not matter where in your Observable chain you put the subscribeOn(). No matter where you put it, it will tell the source Observable which thread to emit items on. If you specify multiple subscribeOn() operators, the one closest to the source (the left-most) will be the one used. As a matter of fact, a few source Observable factories, like Observable.interval(), already specify a Scheduler internally. Observable.interval() already emits on the computation Scheduler, and any subscribeOn() you specify on it will do nothing.

In summary, subscribeOn() instructs the source Observable which thread to emit items on, and this thread will push items all the way to the Subscriber. However, if it encounters an observeOn() somewhere in the chain (discussed shortly), it will then pass emissions to another thread for the remaining operations at that point.

Choosing a Scheduler

There are several other Schedulers, such as Schedulers.io(), which is optimal for IO-related tasks (it caches and re-uses threads to increase efficiency). Then there is Schedulers.newThread(), which simply creates a new thread for each subscription. You have to be careful with both of these because in theory they could create an unlimited number of threads, which can cause bad performance. For computational tasks, you should use Schedulers.computation() so the number of threads is limited based on the number of cores your machine has.

You can also use Schedulers.from() to specify your own Executor. Especially for parallelization, I have found this approach to perform better.
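As a sketch of the "bring your own Executor" route, here is a fixed-size JDK pool whose threads get readable names, which helps when reading "on thread ..." printouts like the ones in this post. The class, methods, and "worker" prefix are hypothetical, and the Schedulers.from(pool) call appears only in a comment since it needs RxJava on the classpath.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedPools {

    // Fixed-size pool whose threads are named prefix-1, prefix-2, ...
    static ExecutorService fixedPool(String prefix, int size) {
        AtomicInteger threadNumber = new AtomicInteger();
        ThreadFactory factory = runnable ->
                new Thread(runnable, prefix + "-" + threadNumber.incrementAndGet());
        return Executors.newFixedThreadPool(size, factory);
    }

    // Runs a trivial task on the pool and reports which thread executed it
    static String threadNameOf(ExecutorService pool) {
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = fixedPool("worker", Runtime.getRuntime().availableProcessors() + 1);
        // With RxJava 1.x on the classpath, this is where you would wrap it:
        //     Scheduler scheduler = Schedulers.from(pool);
        System.out.println("Ran on " + threadNameOf(pool));
        pool.shutdown(); // pool threads are non-daemon, so shut the pool down when finished
    }
}
```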

observeOn()

It is helpful to instruct a source Observable which Scheduler to use via subscribeOn(), and the source Observable will emit items on one of that Scheduler's threads. However, it is often helpful in the middle of an Observable chain to switch to another Scheduler. For example, you may press a button on a UI and it kicks off work on a computation thread, which frees up the UI thread so the UI does not freeze. But when the computation is done, the result needs to be displayed back on the UI. Oftentimes, when you are working with UI technologies like JavaFX, Swing, or Android, you have to update the UI on the UI thread (the Event Dispatch Thread in Swing, the JavaFX Application Thread in JavaFX, the main thread on Android).

Take this example. We emit the numbers 1 through 10 and do some simple multiplication on them. By default the emissions happen on the main thread since we do not specify a subscribeOn(). But before the map(i -> i * 10) operation we switch the emissions over to a computation thread.

import rx.Observable;
import rx.schedulers.Schedulers;

public static void main(String[] args) {

    Observable<Integer> source = Observable.range(1, 10);

    source.map(i -> i * 100)
            .doOnNext(i -> System.out.println("Emitting " + i
                    + " on thread " + Thread.currentThread().getName()))
            .observeOn(Schedulers.computation())   // everything below runs on a computation thread
            .map(i -> i * 10)
            .subscribe(i -> System.out.println("Received " + i + " on thread "
                    + Thread.currentThread().getName()));

    sleep(3000);
}

private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

If you run this code you should get this output.

Emitting 100 on thread main
Emitting 200 on thread main
Emitting 300 on thread main
Emitting 400 on thread main
Emitting 500 on thread main
Emitting 600 on thread main
Emitting 700 on thread main
Emitting 800 on thread main
Emitting 900 on thread main
Emitting 1000 on thread main
Received 1000 on thread RxComputationThreadPool-3
Received 2000 on thread RxComputationThreadPool-3
Received 3000 on thread RxComputationThreadPool-3
Received 4000 on thread RxComputationThreadPool-3
Received 5000 on thread RxComputationThreadPool-3
Received 6000 on thread RxComputationThreadPool-3
Received 7000 on thread RxComputationThreadPool-3
Received 8000 on thread RxComputationThreadPool-3
Received 9000 on thread RxComputationThreadPool-3
Received 10000 on thread RxComputationThreadPool-3

You will see the emissions initially occurred on the main thread and were pushed on that thread all the way to the first map(). But after that the observeOn() redirected the emissions to a computation thread, which pushed the emissions to the second map() and the final Subscriber.
Still not clear? Let's look at this visually. No matter what Scheduler you are subscribed on, only one emission is allowed to travel up the Observable chain of operators at a time. Below, you can observe that the emission must be pushed all the way from the source to the Subscriber before the next emission can start.


Let's say we wanted to switch to another thread after Operator 2. Perhaps we finished calculating something and now we want to update the UI on the UI thread. Or maybe we finished importing a data set on the io() Scheduler and now we want to do computations on the computation() Scheduler.

You can do this with the observeOn() operator as shown below. Notice how the bottom stream passed an emission to the top stream, allowing the bottom stream to start the next emission without waiting for the current one to reach the Subscriber.


The bottom stream represents a stream of operators on one Scheduler, and the top one represents another. Once an emission is passed from the bottom stream to the top one, the bottom stream is no longer concerned with it. It is now the top stream's responsibility to get that emission to the Subscriber.
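One way to picture this hand-off is two threads joined by a queue: the bottom stream drops each emission into the queue and immediately moves on to the next one, while the top stream takes emissions out in order and carries them to the Subscriber. The sketch below models that in plain Java, mirroring the earlier i * 100 and i * 10 maps. It is an analogy under that assumption, not RxJava's actual observeOn() implementation, and the class name and the -1 completion marker are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueHandOff {

    private static final int DONE = -1; // hypothetical stand-in for onCompleted()

    // Bottom stream (current thread) maps i * 100 and hands each value to a queue;
    // top stream ("consumer" thread) takes values in order and maps i * 10.
    static List<Integer> run() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<Integer> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int i = queue.take();      // wait for the next emission
                    if (i == DONE) {
                        break;
                    }
                    received.add(i * 10);      // "second map()" on the consumer thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        // The producer never waits for an emission to reach the end of the chain
        for (int i = 1; i <= 10; i++) {
            queue.add(i * 100);                // "first map()", then hand-off
        }
        queue.add(DONE);

        try {
            consumer.join();                   // join() makes `received` safe to read here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```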

From what I understand, one problem that may arise with observeOn() is the bottom stream can produce emissions faster than the top stream can process them. This can cause issues with backpressure you may have to consider. I'm definitely not an authority on backpressure but I've been burned enough to be wary of it.
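The backpressure concern boils down to a buffer sitting between a fast producer and a slow consumer. As far as I understand, observeOn() in RxJava 1.x hands emissions over through a bounded buffer, and a source that ignores backpressure can overflow it, which shows up as a MissingBackpressureException. This plain-Java sketch (hypothetical names, no RxJava) just shows what happens when a producer outruns a bounded buffer that nobody is draining.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedBuffer {

    // A fast producer pushes `emissions` items into a buffer of `capacity`
    // while nothing is draining it. Returns how many items did not fit.
    static int overflowCount(int capacity, int emissions) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (int i = 0; i < emissions; i++) {
            // offer() returns false immediately when the buffer is full;
            // put() would instead block (slow down) the producer until space frees up
            if (!buffer.offer(i)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(overflowCount(2, 5) + " emissions had nowhere to go");
    }
}
```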

Effectively, only one subscribeOn() takes effect, but you can have any number of observeOn() operators. You can switch emissions from one thread to another with ease using observeOn(). But do not use it everywhere for the sake of it. Only use it when you find a calculation is intense enough that it needs to be offloaded to another thread.

For UI technologies, there are a couple of libraries that bridge RxJava with a UI Scheduler. For example, there is RxJavaFX which has a Scheduler that puts emissions on the JavaFX Platform thread. There is also the RxJava Android Module which has Schedulers for Android. There is even RxSwing for those of us stuck with legacy Swing applications. These are very helpful to use in conjunction with observeOn() when working with user interfaces.
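If you want to see the "work in the background, then hop back to the UI thread" pattern outside of RxJava, CompletableFuture can express the same kind of thread switch that observeOn() performs in an Observable chain. This is a plain-Java sketch, not RxJava: the single-threaded "ui-thread" executor is a hypothetical stand-in for a real UI thread. In an actual Swing or JavaFX app you could pass SwingUtilities::invokeLater or Platform::runLater as the Executor instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class UiHandOff {

    // Computes on a background pool, then switches to a single "UI" thread for display.
    // Returns a description of what was shown and which thread showed it.
    static String computeThenDisplay(String word) {
        ExecutorService background = Executors.newFixedThreadPool(2);
        // Hypothetical stand-in for the real UI thread (Swing EDT, JavaFX Application Thread, ...)
        ExecutorService uiThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> word.length(), background)        // heavy work off the UI thread
                    .thenApplyAsync(len -> "Length " + len + " shown on "
                            + Thread.currentThread().getName(), uiThread) // hop over to the "UI" thread
                    .join();
        } finally {
            background.shutdown();
            uiThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeThenDisplay("Alpha"));
    }
}
```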

Let me know if you have any questions or comments. Be sure to read about parallelization in my other article as well.

Comments:

  1. Wait, if I understand this correctly, backpressure ONLY manifests if you switch between threads?

    1. From what I understand, backpressure issues can manifest if you bring in any kind of multithreading. More specifically, if you have hot Observables that emit at their own pace rather than the Subscriber's, that can cause backpressure issues especially for operators like zip() and observeOn(), or from my experience UI Subscribers that can't update the UI fast enough. You can read up on backpressure here.
      https://github.com/ReactiveX/RxJava/wiki/Backpressure

      This is why you really have to know what you are doing when building your own Operators.
      http://akarnokd.blogspot.com/2015/05/pitfalls-of-operator-implementations.html

  2. Great article, thank you!!! For Rx newbies like me it would be nice if you could also cover the case when both observeOn() and subscribeOn() are used on a single stream, and whether there are any gotchas to be aware of.

  3. Glad you found it helpful. I'll see if I can add a case of using both, but there really is not much to be said. It did cross my mind to emphasize how the two interact (or lack thereof). subscribeOn() and observeOn() are somewhat mutually exclusive. The former instructs a Scheduler up the stream towards the source and the latter down the stream towards the Subscriber. The two should stay out of each other's business concurrency-wise. The observeOn() will leave all operators to its left to the subscribeOn(), but it owns the scheduling of all operators to its right (unless another observeOn() exists, which then owns everything to its right). Experiment like I did above and you may have some moments of epiphany like I did.

  4. Nice article!
    I would add one thing: subscribeOn() affects only the thread on which the OnSubscribe.call() method is called. For example:

    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> observer) {
            //....
        }
    });

    subscribeOn() forces the call() method to be called on a specific thread. onNext() is usually called on the same thread as call(), but there is one case where it can be different. If you use a Subject, you can explicitly call its onNext() on a different thread. This is dangerous, specifically on Android. There are many Rx libraries that wrap existing APIs using Subjects under the hood and call onNext() in callbacks from the system, which usually run on the main thread or a completely different thread.
    So to summarize: subscribeOn() guarantees only that the call() method will be called on a specific thread.

    1. Thanks for highlighting that. These are definitely reasons why Subjects can be abused so easily. One thing you can do to remedy this particular problem is call toSerialized() on any Subject to make it a SerializedSubject. But it is definitely ideal to avoid Subjects as much as possible. I think Erik Meijer called Subjects "the mutable variables of Rx".

    2. Actually, using both operators seems to be a well-established approach to "do something in the background and handle the result on the main thread" or vice versa:
      http://blog.danlew.net/2015/03/02/dont-break-the-chain/
      http://code.hootsuite.com/asynchronous-android-programming-the-good-the-bad-and-the-ugly/

      I wonder whether there is a way to get the same behavior with the observeOn() operator only?

    3. For sure, just use two observeOn() calls.

      Observable<String> source = ...

      source.observeOn(Schedulers.computation())
            .map(s -> s.length())
            .observeOn(JavaFxScheduler.getInstance())
            .subscribe(System.out::println);

      The only possible disadvantage is that the source will not be affected by either observeOn() and will still emit on the default thread.

  5. One of the clearest and most complete articles I found on this topic. Thanks Thomas!

  6. Could you explain why you get a non-interleaved sequence of "Emitting/Received" messages in the output?

    Considering that different threads process doOnNext() and map(), I expected to see messages from the different threads interleaved randomly, but it seems the results were not processed asynchronously, only after the emission of the interval values ended.

  7. For the observeOn() example, that is a good question. In this case, I think it has to do with the sleep() in the Subscriber. The source emissions can easily outrun the observeOn() emissions, since the subscribing thread is not sleeping but the observeOn() thread is.

    If there was not a sleep() in the Subscriber, it would interleave as you probably would expect.

    1. Ah, of course. I missed this sleep() call. Thanks for the clarification.

  8. Thomas, great article. Thanks!

    I'm still not sure how I should fix your last code sample so that observeOn() makes the Observable go through different computation threads. I've modified it to be sure, and the items always end up being received on the same thread: http://pastebin.com/PinvzXUt

    Thanks!

    1. Could you add sample code to get the behavior in that last animation?

    2. You might want to read my other article on parallelization. http://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.html?m=1

    3. Thomas, I did.

      Using subscribeOn you chose to emit the items on a thread-pool from the start.

      But I'm looking for a way to achieve what you mention in your second animation: items are emitted in a single thread, then sent to a thread-pool, allowing the 1st part of the sequence to continue handling new items.

    4. I wasn't sure I was going to get a response from you, so I took the chance to ask on StackOverflow (even used your image, hope you don't mind!).

      http://stackoverflow.com/questions/38488451/how-can-observe-my-fast-source-on-thread-pool-queue

  9. I implemented a case where one subscriber is slow with a buffer, while another one consumes whatever is thrown to it. That was using Scala and Akka Streams.

    I usually try to develop a RxJava version for comparison but I got stuck on this one. In Akka Streams, I can build a graph with one source that broadcasts on 2 channels, and have a slow sink and a fast sink consume from those channels. Each channel can independently apply buffering and throttling. In RxJava, there is the share operator for broadcasting but the buffering and throttling logic is not on the subscriber, but on the Observable. Thus I'm not sure how to apply buffering and throttling and not have both subscribers affected. observeOn seems promising but I need to think about it. Any suggestions?

    1. I'm not a hundred percent certain I follow. Why not use myObservable.publish().refCount().myThrottleOp() and have two Subscribers to it? Post a question on StackOverflow and I'll take a look.

    2. Thanks. http://stackoverflow.com/questions/39281954/rxjava-how-to-implement-a-slow-consumer-with-buffer-and-a-fast-consumer-to-the

  10. Hello Thomas. Link "http://tomstechnicalblog.blogspot.ru/2015/10/understanding-observable-emissions.html" is broken.

    1. Fixed. Thank you. This is the correct link http://tomstechnicalblog.blogspot.com/2015/10/understanding-observables.html

  11. The only article describing subscribeOn and observeOn in detail. God bless you

  12. Awesome article, especially the graphs and the part where the switching is made. Thank you a ton for this. I am just starting to learn the basics of RxJava. One thing I found in RxJava is that most libraries, for things like network calls and databases, have built-in Observables and ask you to simply subscribe to them. In your article you mentioned that subscribeOn() defines which thread pushes the emissions down the chain. Could you help me with one thing? Previously I had understood that subscribeOn() means where the Observable does its work. Meaning, if an Observable has to fetch data from the web and we subscribeOn() the main thread, it will do that task on the main thread. I have only tested this with a simple Observable.create() that calls the subscriber's onNext(). Am I understanding it right or not? Thank you once again.

    1. Hey Thomas, this is what I found: subscribeOn() simply determines which thread the call() function is executed on.

    2. Have you tried using Observable.fromCallable()? If you did something like Observable.fromCallable(() -> getRequest()), that will do the request work on any scheduler specified by a subscribeOn().

    3. Awesome. Thanks a lot for pointing me to the right direction.

  13. Is there any cleaner way of waiting for all Observables to complete, other than guessing how long it will take and calling sleep()? Isn't there an equivalent to a thread's join() for waiting until all Observables have completed? Thanks!

    1. You can use a CountDownLatch as shown here. This can be helpful for testing, but typically you do not want to wait on Observables in production. That undermines the point of reactive programming. https://gist.github.com/thomasnield/ac2398d2e0ea13a6b3416b10adafa3d6
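      For anyone who wants the idea without following the link, here is a minimal plain-Java sketch of waiting on a CountDownLatch instead of guessing a sleep() duration. The worker threads stand in for Observables; with RxJava 1.x you would call countDown() from the Subscriber's onError and onCompleted callbacks, as noted in a code comment. LatchWait and runAndWait() are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchWait {

    // Starts `workers` threads and blocks until every one of them has counted down.
    // With RxJava 1.x, the equivalent would be (assumption, not compiled here):
    //     observable.subscribe(item -> { }, error -> latch.countDown(), latch::countDown);
    static int runAndWait(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        for (int w = 0; w < workers; w++) {
            new Thread(() -> {
                finished.incrementAndGet(); // the "work"
                done.countDown();           // signal completion
            }).start();
        }
        try {
            // Like join(), but for any number of tasks; the timeout is a safety net
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(3) + " workers finished");
    }
}
```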

  14. Hello, nice article. I have a question: I see that you used sleep to "observe" what was happening. Do you think there is a way to actually "observeOn" the main thread? For example with RxAndroid you can say main().

    1. I haven't done much Android, but yes. I think you use RxAndroid to get that Scheduler for the main thread. https://github.com/ReactiveX/RxAndroid#observing-on-the-main-thread

    2. And I actually used sleep() to keep the application alive and give the Observable a chance to fire.
