Monday, November 23, 2015

RxJava Operators - flatMap()

Last week I created an RxMovers scene showing the toList() operator. This week I will show how to take an Observable emitting List<T> objects, turn each List<T> into an Observable<T>, and flatten all of these Observables into a single Observable<T> using flatMap().

Say you have three RxMovers emptying boxes from a house and loading them into a truck, just like our previous examples. But this time Mover 1, the source Observable, is pushing a dolly of items out of the house. He is an Observable<List<Box>> emitting List<Box> objects.

Mover 2 will act as a flatMap(), taking each dolly of boxes and passing each box individually to Mover 3, the Subscriber.



Mover 1 pushes two dollies of boxes before he calls onCompleted(). On each emission, Mover 2 receives the dolly and passes its boxes along one at a time. Effectively, both dollies are consolidated into a single stream of boxes, which Mover 3 receives.

This is what it would look like in RxJava.

Observable<List<Box>> mover1 = Observable.create(s -> {
    // keep emitting until the house is empty or the Subscriber unsubscribes
    while (!s.isUnsubscribed() && house.hasBoxes()) {
        s.onNext(house.loadBoxesToDolly()); // pass a full dolly of boxes
    }
    s.onCompleted();
});

// unpack each dolly into individual boxes and merge them into one stream
Observable<Box> mover2 = mover1.flatMap(list -> Observable.from(list));

Subscription mover3 = mover2.subscribe(box -> putInTruck(box));

Of course, each List<Box> has to be converted into an Observable<Box>, and you accomplish that with the Observable.from() static factory method. This also means that flatMap() can map each emitted item (of any type) to any Observable and not just ones backed by a List.

flatMap() is powerful because it transforms each emission into another set of emissions, and these multiple sets of emissions get consolidated into a single stream. It is arguably one of the most critical operators in RxJava because it contributes so much utility. It is difficult to show every application of flatMap() here, but definitely spend some time playing with it. Taking each emitted item from an Observable and mapping it to another Observable is an idea you will use again and again.
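To make the mechanics concrete, here is a minimal sketch in plain Java that does not use RxJava at all. The `Source` interface and the `from` and `flatMap` helpers are hypothetical stand-ins written for this illustration, not RxJava's API. The sketch only models the core idea: each upstream item is mapped to an inner source, and everything the inner sources emit lands in one downstream stream. It deliberately ignores completion, errors, unsubscription, and the fact that the real flatMap() may interleave items from concurrent inner Observables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class FlatMapSketch {

    /** Hypothetical stand-in for an Observable: it pushes items to a callback. */
    public interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    /** Plays the role of Observable.from(list): emits each element in order. */
    public static <T> Source<T> from(List<T> items) {
        return onNext -> items.forEach(onNext);
    }

    /** Maps every upstream item to an inner Source and forwards the inner items downstream. */
    public static <T, R> Source<R> flatMap(Source<T> upstream, Function<T, Source<R>> mapper) {
        return onNext -> upstream.subscribe(item -> mapper.apply(item).subscribe(onNext));
    }

    /** Two dollies of boxes flattened into one stream of boxes. */
    public static List<String> flattenDollies() {
        Source<List<String>> mover1 = from(Arrays.asList(
                Arrays.asList("box1", "box2"),
                Arrays.asList("box3")));
        Source<String> mover2 = flatMap(mover1, dolly -> from(dolly));
        List<String> truck = new ArrayList<>();
        mover2.subscribe(truck::add); // Mover 3 puts each box in the truck
        return truck;
    }

    /** The mapper need not wrap a List: here each number n becomes n emissions. */
    public static List<String> repeatEach() {
        Source<Integer> counts = from(Arrays.asList(2, 1));
        Source<String> labels = flatMap(counts, n -> onNext -> {
            for (int i = 1; i <= n; i++) {
                onNext.accept(n + "-" + i);
            }
        });
        List<String> seen = new ArrayList<>();
        labels.subscribe(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(flattenDollies()); // [box1, box2, box3]
        System.out.println(repeatEach());     // [2-1, 2-2, 1-1]
    }
}
```

The whole operator fits in the one-line body of the `flatMap` helper: subscribe upstream, and for every item subscribe to whatever the mapper returns, handing it the same downstream callback. Because these toy sources are synchronous, the output order is deterministic here; that ordering is a property of this sketch, not a guarantee to assume of RxJava's flatMap() in general.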

9 comments:

  1. Nice Tom! One hitch I haven't got a convenient solution for though is that we don't want people using Observable.create like this because of backpressure concerns. Any operator might error if delivered an item that it didn't request explicitly. Makes things tricky when trying to explain things simply.

    Replies
    1. Yeah, I've heard you and one other person in the Rx community say that. I can see some situations where Observable.create() can be abused. But if the source is strictly cold and no hot Observables are introduced, this should be fairly safe from backpressure, right? Because a cold Observable fulfills an emission at the Subscriber's request. I can see why hot emissions could cause major issues with backpressure though.

  2. This comment has been removed by a blog administrator.

  3. Shouldn't there at least be an isUnsubscribed check in the while loop?

    Replies
    1. Technically yes, you are correct. I didn't want to clutter the concept with boilerplate code but now that I think about it I might include that later.

  4. Really good explanation! Keep up the great work :)
