Banner

Saturday, October 31, 2015

Understanding Observables

When I first started learning RxJava, I found it very difficult to wrap my head around even its most basic concepts. The idea of a Subscriber's onNext(), onCompleted(), and onError() calls made sense in isolation, but how did they work in a practical reactive process? I thought the Subscriber was the final operation in the chain that consumed the emissions, so when I called Observable.create(), I thought I was calling on the Subscriber at the end of the chain, not the operators between them.

Of course this is not the case, but to a reactive newbie this is not so obvious. I developed a relatable analogy that helped me understand and explain what happens in a reactive chain of Observable operations. This is more or less how I think of Observable operations.

The onNext()

The Observable emits items. I like to think of emissions as "handing" items to an Observer by calling it's onNext() method, which accepts the item as an argument.

But Observable operators (like map(), flatMap(), etc) create Observables that are both an Observable and Observer.

Here is a relatable example. Take a moving crew (portrayed by my artistically crafted stick figures) passing boxes. A chain of Observable operations is much like a mover handing a box up a chain. Each time the box is pushed to the next person, the onNext() is called on the next guy to pass it.

You could say that the mover on the left is the "source" Observable, since he is the originator creating the emissions. He calls the middle guy's onNext() method to pass the box to him. The second guy in turn immediately calls the third guy's onNext() method, and so on. The final onNext() call will be on the final Subscriber. But the point is the second and third mover are observing the mover on their left. A mover reacts to a box getting pushed to him by turning around and pushing it to the next mover.

Here is a more thorough example showing these three movers each doing something more specific and can be represented in code.



Mover 1 on the far left is the source Observable. He creates the emissions by picking items out of the house. He then calls onNext() on Mover 2, who does a map() operation. When his onNext() is called he takes that Item and puts it in a Box. Then he calls onNext() on Mover 3. He is the final Subscriber who loads the box on the truck.

If we were to express these three movers as an Observable chain of operations, this is what it might look like.

Observable<Item> mover1 = Observable.create(s -> {
 while (house.hasItems()) {
  s.onNext(house.getItem());
 }
});

Observable<Box> mover2 = mover1.map(item -> putInBox(item));

Subscription mover3 = mover2.subscribe(box -> putInTruck(box));

We could also express this into a single chain of Observable operators without any intermediary variables.

Observable.create(s -> {
 while (house.hasItems()) {
  s.onNext(house.getItem());
 }
}).map(item -> putInBox(item)).subscribe(box -> putInTruck(box));

However, if there is a finite number of items in the house we need to implement onCompleted().

 

The onCompleted()

Observables can be infinite, which means they will never call onCompleted() and are expected to forever emit items. But in this example, it is unlikely there are infinite items in the house. When Mover 1 has removed all items from the house, he needs to tell Mover 2 they are done by calling his onCompleted() method. He will in turn tell Mover 3 and call his onCompleted() method. Mover 3 will then close the truck and shutdown. Each mover will clean up and leave in this process as well.


We would code the onCompleted() like this.

Observable<Item> mover1 = Observable.create(s -> {
 while (house.hasItems()) {
  s.onNext(house.getItem());
 }
  s.onCompleted();
});

Observable<Item> mover2 = mover1.map(item -> putInBox(item));

Subscription mover3 = mover2.subscribe(box -> putInTruck(box), 
   () -> closeTruck());

We need to have mover1 call onCompleted() since we created that source Observable from scratch, and that way it will communicate up the chain its completion event. The map() operation on mover2 created an Observable that already implements a call to onCompleted(). It just needs that notification from mover1 which we implemented. Since mover3 is the final Subscriber, we overload another lambda specifying to close the truck when onCompleted() is called.

The onError()

Errors can obviously happen in any program, but the reactive approach handles them a bit differently. The exception is emitted up the chain so the Subscriber can decide how to handle it. Of course there are operators to intercept the exception, and perform policies like resume next or retry a specified number of times. But generally you should have the Subscriber handle exceptions in some way.




In this case, the map() operation failed. Mover 2 accidentally broke the item while trying to put it in a box. He calls onError() on Mover 3 who happens to be the final Subscriber, and Mover 3 comes down with a broom to handle the error.

Coding-wise, we have to handle the source Observable and catch any errors to pass up the chain. The error in this example does not occur at the source however. It occurs in the middle of the chain in the map() operation, which already implements passing the error up the chain. We also overload the Subscriber with another lambda to handle the error.

Observable<Item> mover1 = Observable.create(s -> {
    try { 
        while (house.hasItems()) {
            s.onNext(house.getItem());
        }
        s.onCompleted();
    } catch (Exception e) {
        s.onError(e);
    }
});

Observable<Box> mover2 = mover1.map(item -> putInBox(item));

Subscription mover3 = mover2.subscribe(box -> putInTruck(box),
    () -> closeTruck(), e -> cleanupMess());


Conclusions

I hope this example helps any newbies who are trying to grasp the flow and structure of reactive programs. Please leave comments or suggestions and I'll respond as soon as I can.

11 comments:

  1. Cool Thomas! Love the animated pics! You could demo backpressure too where the guy in the truck says "2 more, pass it on" to the next guy. That way the movers are enabled to drive the full truck off and bring in another one without things piling up.

    ReplyDelete
    Replies
    1. Haha! I was just thinking about showing other operators. The RxMarbles are great, but they are still abstract and this provides something more relatable. This post was pretty well-received on Twitter so I might invest some time in creating more "RxMover" animations.

      Delete
  2. Cool Thomas! Love the animated pics! You could demo backpressure too where the guy in the truck says "2 more, pass it on" to the next guy. That way the movers are enabled to drive the full truck off and bring in another one without things piling up.

    ReplyDelete
  3. Very helpful. Can you show schedulers too please. For me thats the most complex ones to undsrstand::-)

    ReplyDelete
    Replies
    1. Adding notification to be alerted

      Delete
    2. Yeah I've been thinking about schedulers too. I have some ideas I'll need to sift through. I might create a dedicated blog for these models.

      Delete
    3. Just wrote an entry on schedulers. http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html

      Delete
    4. Thanks Thomas your examples are enlightening! Can't wait for other posts!

      Delete
  4. Excellent analogy. Thanks a ton Thomas

    ReplyDelete
  5. Hi admin, it is very nice blog, very informative blog so thank you so much for this information. But there is another website with same concept of free ad posting www.helpadya.com www.helpadya.com Classified Free Ads.

    ReplyDelete
  6. Extremely useful post, thank you very much for writing it!

    ReplyDelete