Monday, November 23, 2015

RxJava Operators - flatMap()

Last week I created an RxMovers scene showing the toList() operator. This week I will show how to take an Observable emitting List<T> objects, turn each List<T> into an Observable<T>, and flatten all these Observables into a single Observable<T> using a flatMap().

Say you have three RxMovers emptying boxes from a house and loading them into a truck, just like our previous examples. But this time Mover 1, the source Observable, is pushing a dolly of items out of the house. He is an Observable<List<Box>> emitting List<Box> objects.

Mover 2 will act as a flatMap() taking each dolly of boxes and passing each box individually to Mover 3, the Subscriber.



Mover 1 pushes two dollies of boxes before he calls onCompleted(). On each emission, Mover 2 receives the dolly and passes the boxes individually. Effectively, both dollies were consolidated into a single stream of boxes which were received by Mover 3.

This is what it would look like in RxJava.

Observable<List<Box>> mover1 = Observable.create(s -> {
 while (!s.isUnsubscribed() && house.hasBoxes()) {
  s.onNext(house.loadBoxesToDolly()); //pass a full dolly of boxes
 }
 s.onCompleted();
});

Observable<Box> mover2 = mover1.flatMap(list -> Observable.from(list));

Subscription mover3 = mover2.subscribe(box ->
    putInTruck(box));

Of course, each List<Box> has to be converted into an Observable<Box>, and you accomplish that with the Observable.from() static factory method. This also means that flatMap() can map each emitted item (of any type) to any Observable and not just ones backed by a List.
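If you want to try the flattening idea without RxJava on the classpath, here is a minimal sketch using the JDK's own Stream.flatMap() as an analogy. It is only an analogy: Java streams are pull-based and synchronous, while an Observable pushes items and its flatMap() may interleave emissions from the inner Observables. The class and method names (FlatMapAnalogy, unloadDollies) are made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapAnalogy {

    // Each inner List plays the role of one dolly; the result is the single stream of boxes.
    static List<String> unloadDollies(List<List<String>> dollies) {
        return dollies.stream()
                .flatMap(dolly -> dolly.stream()) // one dolly becomes many boxes
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> dollies = Arrays.asList(
                Arrays.asList("box1", "box2"),
                Arrays.asList("box3", "box4", "box5"));

        // prints [box1, box2, box3, box4, box5]
        System.out.println(unloadDollies(dollies));
    }
}
```

The lambda handed to flatMap() can return any stream here, just as the RxJava version can return any Observable, not only one backed by a List.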

The flatMap() operator is powerful because it transforms each emission into another set of emissions, and these sets of emissions get consolidated into a single stream. It is arguably one of the most critical operators in RxJava because it contributes so much utility. It is difficult to show every application of flatMap() here, but definitely spend some time playing with it. Taking each emitted item from an Observable and mapping it to another Observable is a powerful concept.

Wednesday, November 18, 2015

RxJava Operators - toList()

If you read my earlier post Understanding Observables, you already know that I like to think of Observable operations as stick figures passing boxes. The analogy made sense to me and it seemed to help others too. As I get more efficient at cheap animation, I'll see if I can cover other operators with these "RxMovers". I even have some ideas to visualize subscribeOn() and observeOn() with these guys.

The toList() operation reveals quite a bit about Observable behavior and how onNext() and onCompleted() calls can be manipulated. In my previous article that covered map(), the onNext() calls happen in sequence down the entire chain and get passed all the way to the final Subscriber.

The toList() operator intercepts onNext() calls. Rather than handing the items further down the chain, it catches them and adds them to a List (specifically an ArrayList). Only when the source Observable calls onCompleted() will it emit the entire List down the chain, and then it will call onCompleted() down the chain as well.


The toList() in Action




The scene starts with Mover 1 taking boxes out of storage and pushing them to Mover 2, who represents the toList() operation. Instead of passing the boxes immediately to Mover 3, he collects them onto a dolly and does not pass anything along. When Mover 1 has passed all items from the storage unit (which is three items), he calls onCompleted() on Mover 2 and says they are done.

However, Mover 2 does not pass the onCompleted() call to Mover 3 yet. This is his cue to push the dolly of items to Mover 3. He calls Mover 3's onNext() method and passes the entire List<Box> of items to him. Then Mover 2 calls onCompleted() which signals Mover 3 to close the truck.
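To make the hand-off concrete, here is a rough model of what Mover 2 does, written with plain JDK types so it runs on its own. It is a sketch of the behavior described above, not RxJava's actual toList() implementation. The BoxObserver interface and the CollectingObserver class are hypothetical names invented for this example, and the model ignores errors and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;

class ToListSketch {

    // Hypothetical, stripped-down observer with only the two calls discussed in the post.
    interface BoxObserver<T> {
        void onNext(T item);
        void onCompleted();
    }

    // Plays Mover 2: holds every item on the dolly and only hands it over on completion.
    static final class CollectingObserver<T> implements BoxObserver<T> {
        private final List<T> dolly = new ArrayList<>();
        private final BoxObserver<List<T>> downstream;

        CollectingObserver(BoxObserver<List<T>> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void onNext(T item) {
            dolly.add(item); // intercepted: nothing is passed downstream yet
        }

        @Override
        public void onCompleted() {
            downstream.onNext(dolly);  // first push the whole dolly...
            downstream.onCompleted();  // ...then signal that we are done
        }
    }

    // Runs the three-box scene and returns a log of what Mover 3 received.
    static List<String> runScene() {
        List<String> log = new ArrayList<>();
        BoxObserver<List<String>> mover3 = new BoxObserver<List<String>>() {
            @Override public void onNext(List<String> boxes) { log.add("onNext " + boxes); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        };

        CollectingObserver<String> mover2 = new CollectingObserver<>(mover3);
        mover2.onNext("box1");
        mover2.onNext("box2");
        mover2.onNext("box3");
        mover2.onCompleted();
        return log;
    }

    public static void main(String[] args) {
        // prints [onNext [box1, box2, box3], onCompleted]
        System.out.println(runScene());
    }
}
```

Notice that if onCompleted() were never called on mover2, the log would stay empty, which is the situation the next section warns about.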

This is how it may look in RxJava.

Observable<Box> mover1 = Observable.create(s -> {
 while (house.hasItems()) {
  s.onNext(house.getItem());
 }
 s.onCompleted();
});

Observable<List<Box>> mover2 = mover1.toList();

Subscription mover3 = mover2.subscribe(listOfBoxes -> 
    putInTruck(listOfBoxes));


The importance of onCompleted()

As you can probably guess, the onCompleted() call is very important with an operation like toList(). Logically, you cannot collect infinite emissions from an infinite Observable into a List, because a List is a finite collection of items. Therefore, if an onCompleted() call is never passed to a toList() operation, you will get some very undesired behaviors. These can range from blocking forever, because toList() keeps waiting for items, to never emitting anything at all.

A few other operators are dependent on the onCompleted() call, such as last() and count(). So always be mindful and ensure the sequences are finite. Let me know if you have any questions.

Be sure to read my latest RxMover post on flatMap().

Friday, November 6, 2015

RxJava- Flatmap Filtering

After doing RxJava for several months, I have found the key to making it extra useful is not just knowing the operators, but knowing how to combine them together. The built-in operators can be cleverly composed together with other operators to create new operations, like flatMap() and subscribeOn() to achieve parallelization.

When I started mixing object-oriented and reactive programming together, I came across some interesting problems and design decisions. If I had a class that contained one or more Observable properties, I was not quite sure how to filter on these properties.

For instance, let's say you have a type called Transaction. It has four properties that you typically might expect on a bank transaction object.
public interface Transaction {
    int getID();
    LocalDate getDate();
    BigDecimal getAmount();
    Observable<BigDecimal> getBalance();
}
Suppose we have an Observable streaming these Transaction objects from some IO source or a reactive JDBC library. If we wanted to filter these emissions to just today's date, that is easy enough using the filter() operator. It only passes emissions that meet the boolean condition specified by a lambda.
Observable<Transaction> transactions = importFromDb();

Observable<Transaction> todaysTransactions = transactions
    .filter(t -> t.getDate().equals(LocalDate.now()));

todaysTransactions.subscribe(System.out::println);
Easy enough, right? But now what if we wanted to filter on the Observable property getBalance()? This property will dynamically push a BigDecimal value reflecting the bank balance after that transaction took place. Say we want to filter the stream of transactions to where the balance is less than $100. The problem is we can't use the filter operator thanks to the monad. Try it. You will never be able to compile because you will always return an Observable, not a boolean which the filter() operator expects.
Observable<Transaction> transactions = importFromDb();

Observable<Transaction> underHundred = transactions
    .filter(t -> t.getBalance()
    .map(d -> d.compareTo(BigDecimal.valueOf(100)) < 0)); //argh!!! 
                
underHundred.subscribe(System.out::println);
The problem is we are passing the filter() operator an Observable<Boolean>, not a boolean which it needs. We could call toBlocking() and extract that boolean value but that will break the monad (which is bad).

So how do we filter on this Observable property? I learned through experimentation that flatMap() has a lot of tricks up its sleeve, and we can use it to filter on an Observable property on a stream of objects.
Observable<Transaction> transactions = importFromDb();

Observable<Transaction> underHundred = transactions
    .flatMap(t -> t.getBalance()
        .filter(b -> b.compareTo(BigDecimal.valueOf(100)) < 0)
        .map(b -> t)
    );

underHundred.subscribe(System.out::println);
Study the code above very closely and notice everything that happened inside that flatMap(). For each Transaction, we called getBalance(), which emits the balance. But we immediately filtered for balances less than 100. Any balance emissions that pass this condition are then mapped back to the original Transaction they came from using .map(b -> t). This Transaction is emitted out of the flatMap(), but Transactions that fail to meet the criteria will come out as empty Observables.

This is how you can filter on Observable properties. Of course, the above example assumes the getBalance() method returns a single emission followed by an onCompleted() call. You might encounter scenarios where you need to filter on a property that emits multiple values, and this can be very useful if that is your intention. You can emit multiple conditions from a property, and those that pass will emit the item through while the conditions that fail do not.
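The multiple-value case is easier to see with a small experiment. The sketch below reuses the same flatMap/filter/map shape with JDK streams instead of Observables, so it is an analogy rather than RxJava code. The Tx class is a hypothetical stand-in for Transaction, and its balances list stands in for whatever getBalance() would emit.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapFilterAnalogy {

    // Hypothetical stand-in for Transaction: an id plus the balances its property "emits".
    static final class Tx {
        final int id;
        final List<BigDecimal> balances;

        Tx(int id, String... balances) {
            this.id = id;
            this.balances = Arrays.stream(balances)
                    .map(BigDecimal::new)
                    .collect(Collectors.toList());
        }
    }

    // Same shape as the post: filter the inner values, then map each survivor back to its owner.
    static List<Integer> idsUnderHundred(List<Tx> txs) {
        BigDecimal limit = BigDecimal.valueOf(100);
        return txs.stream()
                .flatMap(t -> t.balances.stream()
                        .filter(b -> b.compareTo(limit) < 0)
                        .map(b -> t))
                .map(t -> t.id)
                .collect(Collectors.toList());
    }

    static List<Integer> sampleIds() {
        return idsUnderHundred(Arrays.asList(
                new Tx(1, "250"),               // no balance passes: contributes nothing
                new Tx(2, "40"),                // one balance passes: emitted once
                new Tx(3, "90", "120", "10"))); // two balances pass: emitted twice
    }

    public static void main(String[] args) {
        // prints [2, 3, 3]
        System.out.println(sampleIds());
    }
}
```

Transaction 3 comes through twice because two of its balances passed the filter. If that is not what you want, make sure the property emits a single value.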

Let me know if you have any questions and make it a great day!

Thursday, November 5, 2015

RxJava- Achieving Parallelization

RxJava is often misunderstood when it comes to the asynchronous/multithreaded aspects of it. The coding of multithreaded operations is simple, but understanding the abstraction is another thing.

A common question about RxJava is how to achieve parallelization, or emitting multiple items concurrently from an Observable. Of course, this definition breaks the Observable Contract which states that onNext() must be called sequentially and never concurrently by more than one thread at a time. Now you may be thinking "how the heck am I supposed to achieve parallelization then, which by definition is multiple items getting processed at a time!?" Widespread misunderstanding of how parallel actually works in RxJava has even led to the discontinuation of the parallel operator.

You can achieve parallelization in RxJava without breaking the Observable contract, but it requires a little understanding of Schedulers and how operators deal with multiple asynchronous sources.

Say you have this simple Observable that emits a range of Integers 1 to 10 upon subscription.
Observable<Integer> vals = Observable.range(1,10);

vals.subscribe(val -> System.out.println(val));

But let's say we are doing some very intense calculation process to each Integer value.
Observable<Integer> vals = Observable.range(1,10);
vals.map(i -> intenseCalculation(i))
    .subscribe(val -> System.out.println(val));

And for simplicity's sake let's just make intenseCalculation() sleep for a random interval before returning the integer back, simulating an intensive calculation. We will also make it print the current thread the computation is occurring on.
public static int intenseCalculation(int i) {
    try {
        System.out.println("Calculating " + i + 
             " on " + Thread.currentThread().getName());
        Thread.sleep(randInt(1000,5000));
        return i;
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

We would want to parallelize and process multiple integers at a time. A common mistake people make is to think, "Oh, I just use subscribeOn() and have the source Observable emit items on multiple computation threads, just like an ExecutorService."
Observable<Integer> vals = Observable.range(1,10);

vals.subscribeOn(Schedulers.computation())
          .map(i -> intenseCalculation(i))
          .subscribe(val -> System.out.println("Subscriber received "
                  + val + " on "
                  + Thread.currentThread().getName()));

However, if you run this code you will get this output, showing that each integer only emitted on one computation thread, not several as you might have expected. As a matter of fact, this is as sequential as a single-threaded operation.
Calculating 1 on RxComputationThreadPool-1
Subscriber received 1 on RxComputationThreadPool-1
Calculating 2 on RxComputationThreadPool-1
Subscriber received 2 on RxComputationThreadPool-1
Calculating 3 on RxComputationThreadPool-1
Subscriber received 3 on RxComputationThreadPool-1
Calculating 4 on RxComputationThreadPool-1
Subscriber received 4 on RxComputationThreadPool-1
Calculating 5 on RxComputationThreadPool-1
Subscriber received 5 on RxComputationThreadPool-1
Calculating 6 on RxComputationThreadPool-1
Subscriber received 6 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Subscriber received 7 on RxComputationThreadPool-1
Calculating 8 on RxComputationThreadPool-1
Subscriber received 8 on RxComputationThreadPool-1
Calculating 9 on RxComputationThreadPool-1
Subscriber received 9 on RxComputationThreadPool-1
Calculating 10 on RxComputationThreadPool-1
Subscriber received 10 on RxComputationThreadPool-1

Well, that was not helpful! We did not achieve any effective parallelization at all. We just directed the emissions to happen on another thread named RxComputationThreadPool-1.

So how do we make calculations happen on more than one computation thread? And do it without breaking the Observable contract? The secret is to catch each Integer in a flatMap(), create an Observable off it, do a subscribeOn() to the computation scheduler, and then perform the process all within the flatMap(). 
Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println("Subscriber received "
        + val + " on "
        + Thread.currentThread().getName()));
Now we are getting somewhere and this looks pretty parallel now. We are getting multiple emissions happening on different computational threads.
Calculating 1 on RxComputationThreadPool-3
Calculating 4 on RxComputationThreadPool-2
Calculating 3 on RxComputationThreadPool-1
Calculating 2 on RxComputationThreadPool-4
Subscriber received 3 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Subscriber received 4 on RxComputationThreadPool-2
Calculating 8 on RxComputationThreadPool-2
Subscriber received 2 on RxComputationThreadPool-4
Calculating 6 on RxComputationThreadPool-4
Subscriber received 8 on RxComputationThreadPool-2
Subscriber received 6 on RxComputationThreadPool-4
Calculating 10 on RxComputationThreadPool-4
Subscriber received 7 on RxComputationThreadPool-1
Subscriber received 10 on RxComputationThreadPool-4
Subscriber received 1 on RxComputationThreadPool-3
Calculating 5 on RxComputationThreadPool-3
Subscriber received 5 on RxComputationThreadPool-3
Calculating 9 on RxComputationThreadPool-3
Subscriber received 9 on RxComputationThreadPool-3

RX COMPUTATION THREAD ---------------------------1----
RX COMPUTATION THREAD -------------------------3------
RX COMPUTATION THREAD -----------------------------2--
RX COMPUTATION THREAD ---------------------------4----
MAIN THREAD           ------7---6--5------------------
But how is this not breaking the Observable contract you ask? Remember that you cannot have concurrent onNext() calls on the same Observable. We have created an independent Observable off each integer value and scheduled them on separate computational threads, making their concurrency legitimate.

Now you may also be asking "Well... why is the Subscriber receiving emissions from multiple threads then? That sounds an awful lot like concurrent onNext() calls are happening and that breaks the contract."

Actually, there are no concurrent onNext() calls happening. The flatMap() has to merge emissions from multiple Observables happening on multiple threads, but it cannot allow concurrent onNext() calls to happen down the chain including the Subscriber. It will not block and synchronize either because that would undermine the benefits of RxJava. Instead of blocking, it will re-use the thread currently emitting something out of the flatMap(). If other threads want to emit items while another thread is emitting out the flatMap(), the other threads will leave their emissions for the occupying thread to take ownership of.
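One common way to get this "leave it for the thread that is already emitting" behavior is a queue plus an atomic counter, sometimes called a queue-drain loop. The sketch below is my own minimal illustration of that idea using only the JDK. It is not RxJava's source code, and the SerializedEmitter name and its emit() method are invented for this example.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class DrainLoopSketch {

    // Hypothetical helper: many threads may call emit(), but downstream never runs concurrently.
    static final class SerializedEmitter<T> {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger pending = new AtomicInteger();
        private final Consumer<T> downstream;

        SerializedEmitter(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        void emit(T item) {
            queue.offer(item);
            if (pending.getAndIncrement() != 0) {
                return; // another thread is already emitting; it will deliver our item
            }
            // This thread now owns the emission and drains whatever the others leave behind.
            do {
                downstream.accept(queue.poll());
            } while (pending.decrementAndGet() != 0);
        }
    }

    // Returns { items delivered, highest number of threads seen inside downstream at once }.
    static int[] run(int threads, int itemsPerThread) {
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();
        AtomicInteger delivered = new AtomicInteger();

        SerializedEmitter<Integer> emitter = new SerializedEmitter<>(item -> {
            maxInside.accumulateAndGet(inside.incrementAndGet(), Math::max);
            delivered.incrementAndGet();
            inside.decrementAndGet();
        });

        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < itemsPerThread; i++) {
                    emitter.emit(i);
                }
                done.countDown();
            }).start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { delivered.get(), maxInside.get() };
    }

    public static void main(String[] args) {
        int[] result = run(4, 1000);
        // prints delivered=4000 maxConcurrentDeliveries=1
        System.out.println("delivered=" + result[0] + " maxConcurrentDeliveries=" + result[1]);
    }
}
```

No thread ever blocks waiting for a lock. A thread either becomes the emitter or drops its item in the queue and moves on, which matches the behavior described above.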

The example above does not make this obvious, so here is proof. Let's collect the emissions into a toList() before they go to the Subscriber.

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
               .subscribeOn(Schedulers.computation())
               .map(i -> intenseCalculation(i))
     ).toList()
     .subscribe(val -> System.out.println("Subscriber received "
                + val + " on "
                + Thread.currentThread().getName()));

Observe the output below.
Calculating 4 on RxComputationThreadPool-2
Calculating 1 on RxComputationThreadPool-3
Calculating 2 on RxComputationThreadPool-4
Calculating 3 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Calculating 6 on RxComputationThreadPool-4
Calculating 5 on RxComputationThreadPool-3
Calculating 10 on RxComputationThreadPool-4
Calculating 8 on RxComputationThreadPool-2
Calculating 9 on RxComputationThreadPool-3
Subscriber received [3, 2, 1, 6, 4, 7, 10, 5, 8, 9] on RxComputationThreadPool-3

Notice that the calculations were all happening on different threads, and the flatMap() was pushing items out on one of these threads at a given time. But the last thread to do an emission was RxComputationThreadPool-3. This happened to be the thread pushing items out of flatMap() when it needed to emit the final value 9, so it called onCompleted() which pushed out the list all the way to the Subscriber.

Let me know if you have any questions. Here is the full working example.


import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.Random;

public final class ParallelTest {

    private static final Random rand = new Random();

    public static void main(String[] args) {

        Observable<Integer> vals = Observable.range(1,10);

        vals.flatMap(val -> Observable.just(val)
                    .subscribeOn(Schedulers.computation())
                    .map(i -> intenseCalculation(i))
        ).subscribe(val -> System.out.println("Subscriber received "
                + val + " on "
                + Thread.currentThread().getName()));

        waitSleep();
    }
    public static void waitSleep() {
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static int intenseCalculation(int i) {
        try {
            System.out.println("Calculating " + i + " on " + Thread.currentThread().getName());
            Thread.sleep(randInt(1000,5000));
            return i;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static int randInt(int min, int max) {
        return rand.nextInt((max - min) + 1) + min;
    }
}

About Fixing Thread Pools
Please note, as David mentioned below, that this parallelization strategy does not work with all Schedulers. Some Schedulers other than computation() could flood the system with too many threads and therefore cause poor performance. The computation scheduler limits the number of concurrent processes that can happen inside the flatMap() based on the number of CPUs your machine has. If you want to use other schedulers like newThread() and io(), which do not limit the number of threads, you can pass a second int argument to flatMap() to limit the number of concurrent processes.

You can also create a Scheduler off an ExecutorService giving you more fine-tuned control if needed.  Actually you can get significantly better performance doing this. You can read about it in my other article on Maximizing Parallelization.
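To see why a bounded pool keeps things under control, here is a small JDK-only sketch that runs sleeping tasks on a fixed-size ExecutorService and records how many were ever running at once. It does not use RxJava at all. It only illustrates the kind of limit a fixed pool gives you, and the class and method names (BoundedPoolSketch, maxConcurrent) are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolSketch {

    // Runs taskCount sleeping tasks on a fixed pool and returns the peak number running at once.
    static int maxConcurrent(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= taskCount; i++) {
                final int value = i;
                tasks.add(() -> {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(20); // stand-in for intenseCalculation()
                        return value;
                    } finally {
                        running.decrementAndGet();
                    }
                });
            }
            // invokeAll() waits for every task; get() surfaces any failure.
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                result.get();
            }
            return peak.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The peak can never exceed the pool size, no matter how many tasks are queued.
        System.out.println("peak concurrency: " + maxConcurrent(4, 10));
    }
}
```

With an unbounded pool the peak could climb as high as the number of tasks, which is the flooding problem described above.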