
Friday, February 19, 2016

RxJava - Maximizing Parallelization

Earlier I posted an entry about achieving parallelization with RxJava. Since then I have learned a few things, from experiments and from the RxJava devs, about maximizing parallelization and getting better performance. This post focuses on computation parallelization rather than IO. Computation is where you really have to constrain the number of threads you use, and you generally get the best performance by sizing the thread count to the number of cores your computer has.

Maximizing Computation with Executors

In my previous article on parallelization, I had a short conversation with David Karnok, the current maintainer of RxJava, in the comments section. I asked him how many threads the computation() Scheduler creates and utilizes. This was his response:
If [your computer has] 8 cores, this will only use 5 of them. If you have 4 cores, two groups will share the same core.
Thinking about this, I began to have many questions. I am not an expert on multithreading, but I read most of Java Concurrency in Practice by Brian Goetz a year or so ago. I recall that in Chapter 8, Brian talked about sizing thread pools for computation-intensive tasks. Pragmatically, you can follow this formula to get a roughly optimal number of threads. It's not perfect, but it's a simple rule for well-balanced computation performance.
Nthreads = NCPU + 1
So if you have 4 cores, that means you should have 5 threads. If you have 8 cores, you should have 9 threads. That extra thread is to fill up idle CPU cycles when the other threads get short moments of idleness. You can get this formula dynamically using a simple Java expression.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;
I may be very naive and might be overlooking something. Or maybe the RxJava developers simply had different goals and wanted to keep the number of threads conservative. But I began to suspect Schedulers.computation() was not maximizing parallelization since it was not explicitly designed for it.
Observable.range(1,1000)
.flatMap(i -> Observable.just(i)
    .subscribeOn(Schedulers.computation())
    .map(i2 -> intenseCalculation(i2))
).subscribe(System.out::println);
I was running a process at work in a similar manner as shown above, and the process took 225 seconds to emit all the items and do an intensive calculation on each one in parallel.
I had a hunch and decided to try Goetz’s simple rough formula for calculation thread pool sizing. This means I need to create my own computation ExecutorService, which is basically a thread pool. Fortunately Schedulers has a factory to turn an Executor into a Scheduler easily.
We can create an ExecutorService with a fixed number of threads matching the optimization formula.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Then we can call Schedulers.from() and pass this executor to it.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);

Observable.range(1,1000)
.flatMap(i -> Observable.just(i)
    .subscribeOn(scheduler)
    .map(i2 -> intenseCalculation(i2))
).subscribe(System.out::println);
One thing we need to add: we need to shutdown() the ExecutorService when we are done with it so the threads are disposed and do not keep the application alive inadvertently. We can simply call this in a finallyDo() somewhere in the chain as shown below.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);

Observable.range(1,1000)
.flatMap(i -> Observable.just(i)
    .subscribeOn(scheduler)
    .map(i2 -> intenseCalculation(i2))
).finallyDo(() -> executor.shutdown())
.subscribe(System.out::println);
Running my process at work on a 4-core machine (so it is running 5 threads), it ran in 92 seconds, a huge difference from 225 seconds. This makes my custom ExecutorService roughly 2.4 times faster than Schedulers.computation().
Based on what David Karnok said, it seems RxJava maintains a conservative number of computation() threads and/or core utilization. Concluding from my observations and experiments, if you need to do some intensive parallel computation work you might be better off creating your own Scheduler off an ExecutorService with an appropriately fixed pool size.
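If you want to check how pool size affects your own machine without pulling in RxJava at all, here is a minimal sketch using only java.util.concurrent. The class name PoolSizeTimer and the stand-in intenseCalculation() are my own inventions for illustration, and the workload is synthetic, so treat any timings it prints as a rough guide rather than a benchmark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PoolSizeTimer {

    // Hypothetical stand-in for intenseCalculation(): burns some CPU, then returns its input.
    static int intenseCalculation(int i) {
        long acc = 0;
        for (int n = 0; n < 200_000; n++) {
            acc += (n ^ i) % 7;
        }
        return acc >= 0 ? i : -i; // acc is never negative, so this always returns i
    }

    // Runs `tasks` calculations on a fixed pool of `threads` threads and returns
    // the sum of the results. The pool is shut down whether or not the work succeeds.
    static long sumWithPool(int threads, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= tasks; i++) {
                final int value = i;
                Callable<Integer> task = () -> intenseCalculation(value);
                futures.add(executor.submit(task));
            }
            long sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get(); // waits for each task to finish
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown(); // otherwise the pool threads keep the JVM alive
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Compare one thread, one per core, and the cores + 1 rule of thumb.
        for (int threads : new int[] {1, cores, cores + 1}) {
            long start = System.nanoTime();
            long sum = sumWithPool(threads, 1000);
            long millis = (System.nanoTime() - start) / 1_000_000;
            System.out.println(threads + " thread(s): sum=" + sum + " in " + millis + " ms");
        }
    }
}
```

The try/finally around the pool does the same job as the finallyDo() call in the RxJava chain: the executor is shut down however the work ends.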

Round-Robin Parallelization

David Karnok posted a neat trick: round-robin parallelization. If you are flatMapping to already existing Observables, then this does not really apply. But take our example where we are creating a single-emission Observable for each value.
Observable.range(1,1000)
.flatMap(i -> Observable.just(i)
    .subscribeOn(Schedulers.computation())
    .map(i2 -> intenseCalculation(i2))
).subscribe(System.out::println);
That Observable.just() factory can be expensive when used repeatedly because we are creating a new Observable for each emission. Is there not a way to consolidate 1000 single-emission Observables into a handful of Observables with several emissions?
We can do this by using groupBy() and a round-robin strategy. Create an AtomicInteger and use a modulus to break up the emissions into batches. Then you can use that modulus remainder to groupBy() on. For example, we can break up an Observable into 5 GroupedObservable items using a modulus operator, and we can flatMap() those.
final AtomicInteger batch = new AtomicInteger(0);

Observable.range(1,1000)
    .groupBy(i -> batch.getAndIncrement() % 5)
    .flatMap(g -> g.observeOn(Schedulers.io())
        .map(i -> intenseCalculation(i))
    ).subscribe(System.out::println);
Note we have to use observeOn() here rather than subscribeOn(). A GroupedObservable is fed by whatever thread the upstream source emits on, so a subscribeOn() applied to it cannot change the thread its items arrive on.
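To see what the counter-and-modulus key actually does to the emissions, here is a small sketch in plain Java with no RxJava involved. It uses a sequential stream and Collectors.groupingBy() as a stand-in for groupBy(); the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class RoundRobinBuckets {

    // Splits 1..count into `buckets` groups by taking a running counter modulo `buckets`.
    // This relies on the stream being sequential, so the counter advances in emission order.
    static Map<Integer, List<Integer>> split(int count, int buckets) {
        AtomicInteger batch = new AtomicInteger(0);
        return IntStream.rangeClosed(1, count)
                .boxed()
                .collect(Collectors.groupingBy(i -> batch.getAndIncrement() % buckets));
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> groups = split(1000, 5);
        groups.forEach((key, items) ->
                System.out.println("group " + key + ": " + items.size()
                        + " items, first three " + items.subList(0, 3)));
    }
}
```

Each of the 5 groups ends up with 200 of the 1000 values, dealt out like cards: the first value goes to group 0, the second to group 1, and so on around the table.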
You can also use this in tandem with our ExecutorService strategy. Heck, you can throttle the number of GroupedObservables to be equal to the number of threads. If you have 4 cores, you would then have 5 cleanly divided streams of emissions on 5 threads.
int threadCt = Runtime.getRuntime().availableProcessors() + 1;

ExecutorService executor = Executors.newFixedThreadPool(threadCt);

Scheduler scheduler = Schedulers.from(executor);

final AtomicInteger batch = new AtomicInteger(0);

Observable.range(1,1000)
    .groupBy(i -> batch.getAndIncrement() % threadCt )
    .flatMap(g -> g.observeOn(scheduler)
        .map(i -> intenseCalculation(i))
    ).subscribe(System.out::println);
If you have any other ideas, comments, questions, or your own discoveries with parallelization please comment below.

Saturday, February 13, 2016

RxJava - Understanding observeOn() and subscribeOn()


RxJava makes multithreading easier, but the simplified abstraction more or less forces a veteran developer to re-learn multithreading the "RxJava way". Beginners still have to learn multithreading which is not a trivial topic, but RxJava makes it much more accessible.
A few people have asked if I could cover observeOn() and subscribeOn() in the same manner that I covered parallelization. Let's take a hands-on approach to understanding observeOn() and subscribeOn(). But first here is a little theory for newcomers to multithreading.

Concurrency and Multithreading in a Nutshell

If you have sizable experience with Java concurrency, please skip this section. Otherwise read on!
If you have never done multithreading/concurrency before, the idea is essentially multitasking. Think of a thread as a cursor executing one line of code at a time, which you can visibly see when you are using breakpoints in debug mode with Eclipse or IDEA. As you step through your code, each statement is executed top-to-bottom. In effect, you slowed down a thread and are now in control of it. You only have one thread traversing your code and executing each statement.

But when you multithread, you can have two or more threads (cursors) traversing your code and executing statements. For instance, you can have three threads doing three different tasks. Two threads could be importing two different sets of data simultaneously, while the third thread is asking the user for login credentials. These three tasks are being executed at the same time, and this is much better than having the user wait for each data set to be loaded before being prompted with a login.

However, when these three threads enter the same object and manipulate its properties, there is a high risk of the cursors overlapping. They can start to race each other and compete chaotically to evaluate and change the object's properties. That is why immutability should be your default policy with properties, and when properties have to be mutable you use synchronization. While you should always strive for immutability, RxJava greatly reduces the likelihood of race conditions and other multithreading problems. Problems will only likely happen when you create side effects manipulating objects outside the Observable chain.
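Here is a minimal sketch of that kind of race in plain Java, assuming nothing beyond java.util.concurrent (the class name RaceDemo is made up). Several threads bump a plain int and an AtomicInteger the same number of times. The atomic counter always lands on the expected total, while the plain one may come up short because increments from different threads can overwrite each other.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RaceDemo {

    static int unsafeCounter = 0; // plain mutable state, no synchronization
    static final AtomicInteger safeCounter = new AtomicInteger();

    // Has `threads` workers each increment both counters `perThread` times,
    // then returns {unsafe total, atomic total}.
    static int[] run(int threads, int perThread) {
        unsafeCounter = 0;
        safeCounter.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    unsafeCounter++;               // read-modify-write: updates can be lost
                    safeCounter.incrementAndGet(); // atomic: no update is lost
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {unsafeCounter, safeCounter.get()};
    }

    public static void main(String[] args) {
        int[] totals = run(4, 100_000);
        System.out.println("expected 400000, unsafe=" + totals[0] + ", atomic=" + totals[1]);
    }
}
```

How many updates the unsafe counter loses varies from run to run, which is exactly what makes these bugs so hard to track down.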

Another common use of multithreading is parallelization. Say you have 10,000 objects and you need to perform an expensive calculation on each one. Instead of iterating them and executing the process one at a time, you can process 5 at a time by passing them to 5 worker threads. This could make the process up to 5 times faster.

Think of this as a checkout line where having 5 cashiers is better than 1, because you can process a high volume of customers faster. But of course, like threads, cashiers are expensive. If you have 30 customers to process, it is probably not practical to have 30 cashiers due to operational constraints. It is better to have 5 cashiers and "re-use" them after they process each customer, and they can take another customer waiting in the queue. This is effectively what a thread pool does. It internally maintains a set of threads and will queue tasks to delegate to them.
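The cashier analogy maps almost directly onto a fixed thread pool. As a sketch (the class and method names are hypothetical), this hands 30 "customers" to a pool of 5 "cashiers" and records which threads did the work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class CashierPool {

    // Serves `customers` tasks with `cashiers` pooled threads and returns
    // the names of the threads that did the work.
    static Set<String> serve(int customers, int cashiers) {
        Set<String> threadsUsed = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(cashiers);
        for (int c = 1; c <= customers; c++) {
            // Each customer waits in the pool's queue until a cashier thread is free.
            pool.execute(() -> threadsUsed.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // accept no new customers, finish the ones already queued
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("cashiers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadsUsed;
    }

    public static void main(String[] args) {
        System.out.println("30 customers were served by: " + serve(30, 5));
    }
}
```

However many customers you queue up, the set never holds more than five thread names, because the same threads are re-used for each task.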

Most computers nowadays have multiple "cores", or processors built into the CPU. If you have a quad-core, you can optimally support 5 computational threads (4 cores + 1 extra thread for idle time). If you have 8 cores, you can optimally support 9 threads, and so on. If you exceed this simple rough formula (e.g. running 6 threads or more on a 4-core machine) you risk compromising performance. But not every task is computational. Importing and exporting data (called IO tasks) is much less taxing on the CPU. You could theoretically have 10 threads on a 4-core machine without issue if they all are simply importing/exporting data.
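The difference between computational and IO work can be put into numbers. Goetz's book gives, as I recall it, a more general sizing formula: Nthreads = NCPU * UCPU * (1 + W/C), where UCPU is the target CPU utilization (between 0 and 1) and W/C is the ratio of time a task spends waiting to time it spends computing. The helper below is a small sketch of that formula. The name threadsFor and the sample ratio of 1.5 are my own, chosen only for illustration.

```java
final class PoolSizing {

    // Nthreads = NCPU * UCPU * (1 + W/C), rounded to the nearest whole thread (minimum 1).
    static int threadsFor(int cores, double targetUtilization, double waitToComputeRatio) {
        double raw = cores * targetUtilization * (1.0 + waitToComputeRatio);
        return Math.max(1, (int) Math.round(raw));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Pure computation never waits, so W/C is 0.
        System.out.println("CPU-bound (W/C = 0):   " + threadsFor(cores, 1.0, 0.0));
        // Hypothetical IO-heavy task that waits 1.5x as long as it computes.
        System.out.println("IO-heavy  (W/C = 1.5): " + threadsFor(cores, 1.0, 1.5));
    }
}
```

With W/C = 0 the formula gives one thread per core, and the N + 1 rule of thumb simply adds a spare on top. On a 4-core machine, a wait-to-compute ratio of 1.5 works out to the 10 threads mentioned above.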

While RxJava takes a lot of the pain out of concurrency and multithreading, I highly recommend knowing how to use concurrency without RxJava, just so you are aware of the "gotchas" multithreading can manifest. Benjamin Winterberg created an awesome tutorial on Java 8 concurrency which I recommend reading. If you want some deep knowledge on concurrency, check out Java Concurrency in Practice by Brian Goetz.

subscribeOn()

Before we bring concurrency into the discussion, think long and hard about how an Observable chain of operators actually works. You have to have a source Observable where the emissions originate from. Only one emission at a time can be pushed through the entire chain of operators to the Subscriber. By default, the thread that declares the subscription (the subscribe() method) is the thread that pushes these emissions from the source, one at a time, all the way to the Subscriber.


For example, take this simple Observable operation that emits three String objects and maps their lengths.

public static void main(String[] args) {

    Observable<String> source = Observable.just("Alpha","Beta","Gamma");

    Observable<Integer> lengths = source.map(String::length);

    lengths.subscribe(l -> System.out.println("Received " + l + 
        " on thread " + Thread.currentThread().getName()));
}

The subscribe() call at the end of main() will receive the emissions and print them. By default, the thread that declares the subscribe() is the one that pushes items from the source all the way through the chain to the Subscriber. If you run this program you will see the following output, indicating this Observable emitted items on the main thread.

Received 5 on thread main
Received 4 on thread main
Received 5 on thread main

This means the main thread (the thread which started the program) executed the emissions of this Observable, and it pushed each emission through the map() operator to the Subscriber. Since the main thread becomes occupied with pushing the emissions, the program will not exit until the Observable is done pushing emissions and calls onCompleted().

Let's say we wanted to subscribe to this Observable but we do not want to do it on the current main thread. Pretend calculating the lengths takes a while. Perhaps we would like to kick off the calculations but not hold up the main thread. That main thread has places to go, things to do, and needs to kick off other tasks. We can do that with subscribeOn() by specifying a Scheduler. This will emit the items from the source on a different thread.

If our task is computational, we should use Schedulers.computation(). This will allocate one of a fixed number of threads to this Observable operation, and the source will emit items on that thread.

 public static void main(String[] args) {

    Observable<String> source = Observable.just("Alpha", "Beta", "Gamma");

    Observable<Integer> lengths = source
            .subscribeOn(Schedulers.computation())
            .map(String::length);

    lengths.subscribe(sum -> System.out.println("Received " + sum +
            " on thread " + Thread.currentThread().getName()));
}

But you may run into a problem and not get any output. Why? With our simple program, the main thread passed off the execution of our Observable chain to a computation thread. The main thread reached the end of the main() method and exited the program, before the computation thread got a chance to emit any items!

You will not likely encounter this with real programs that are kept alive for a session, but for our example we need to keep our main thread alive long enough to see the subscription work. Just make it sleep for three seconds and that should give plenty of time to subscribe and execute the emissions.

public static void main(String[] args) {

    Observable<String> source = Observable.just("Alpha", "Beta", "Gamma");

    Observable<Integer> lengths = source
            .subscribeOn(Schedulers.computation())
            .map(String::length);

    lengths.subscribe(sum -> System.out.println("Received " + sum +
            " on thread " + Thread.currentThread().getName()));

    sleep(3000);
}

private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Your output should now look like this.

Received 5 on thread RxComputationThreadPool-1
Received 4 on thread RxComputationThreadPool-1
Received 5 on thread RxComputationThreadPool-1

Note that the emissions happened on a computation thread, or more specifically a thread named RxComputationThreadPool-1. This thread emitted all these items. A common misconception is that multiple threads will automatically parallelize your emissions, but this is not true as it would break the Observable contract. You can only direct emissions of an Observable from one single thread to another single thread. Parallelization is only possible when you create separate Observables as shown here.
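A side note on the sleep(3000) call used above: sleeping for a fixed time is fine for a demo, but it is a guess. Outside of RxJava, a more deterministic way to keep the main thread around is to block on a CountDownLatch until the worker signals it is finished. Here is a sketch in plain Java. The worker is a daemon thread named worker-1, on my assumption that this mimics how RxJava's computation threads behave (they do not keep the JVM alive on their own). The names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class WaitForWorker {

    // Computes the lengths on a daemon worker thread and blocks the caller
    // until the worker has processed every word.
    static List<String> lengthsOnWorker(String... words) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            for (String w : words) {
                received.add("Received " + w.length()
                        + " on thread " + Thread.currentThread().getName());
            }
            done.countDown(); // plays the role of onCompleted()
        }, "worker-1");
        worker.setDaemon(true); // a daemon thread does not keep the JVM alive
        worker.start();

        try {
            done.await(); // instead of sleep(3000): returns as soon as the work is finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        lengthsOnWorker("Alpha", "Beta", "Gamma").forEach(System.out::println);
    }
}
```

Just like the RxJava output above, all three items are handled one after another by the same single worker thread.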

It does not matter where in your Observable chain you put the subscribeOn(). No matter where you put it, it will tell the source Observable which thread to emit items on. If you specify multiple subscribeOn() operators, the one closest to the source (the left-most) will be the one used. As a matter of fact, a few source Observable factories, like Observable.interval(), will already specify a subscribeOn() internally. Observable.interval() will already emit on the computation scheduler, and any subscribeOn() you specify on it will do nothing.

In summary, subscribeOn() instructs the source Observable which thread to emit items on, and this thread will push items all the way to the Subscriber. However, if it encounters an observeOn() somewhere in the chain (discussed shortly), it will then pass emissions to another thread for the remaining operations at that point.

Choosing a Scheduler

There are several other Schedulers such as Schedulers.io(), which is optimal for IO-related tasks (and it caches and re-uses threads to increase efficiency). Then there is Schedulers.newThread() which simply creates a new thread for each subscription. You have to be careful with both of these because in theory they could create an unlimited number of threads (this can cause bad performance). For computational tasks, you should use Schedulers.computation() so the number of threads is limited based on the number of cores your machine has.

You can also use Schedulers.from() to specify your own Executor. Especially for parallelization, I found this approach to have better performance.

observeOn()

It is helpful to instruct a source Observable which Scheduler to use via subscribeOn(), and the source Observable will emit items on one of that Scheduler's threads. However, it is often helpful in the middle of an Observable chain to switch to another Scheduler. For example, you may press a button on a UI and it kicks off work on a computation thread, which frees up the UI thread so the UI does not freeze. But when the computation is done, it needs to be displayed back on the UI. Oftentimes, when you are working with UI technologies like JavaFX, Swing, or Android, you have to update the UI on a dedicated UI thread (in Swing, for example, the Event Dispatch Thread).

Take this example. We emit the numbers 1 through 10 and do some simple multiplication to them. By default the emissions happen on the main thread since we do not specify a subscribeOn(). But before the map(i -> i * 10) operation we switch the emissions over to a computation thread.

 public static void main(String[] args) {

    Observable<Integer> source = Observable.range(1,10);

    source.map(i -> i * 100)
            .doOnNext(i -> System.out.println("Emitting " + i
                    + " on thread " + Thread.currentThread().getName()))
            .observeOn(Schedulers.computation())
            .map(i -> i * 10)
            .subscribe(i -> System.out.println("Received " + i + " on thread "
                    + Thread.currentThread().getName()));

    sleep(3000);
}

private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
If you run this code you should get this output.

Emitting 100 on thread main
Emitting 200 on thread main
Emitting 300 on thread main
Emitting 400 on thread main
Emitting 500 on thread main
Emitting 600 on thread main
Emitting 700 on thread main
Emitting 800 on thread main
Emitting 900 on thread main
Emitting 1000 on thread main
Received 1000 on thread RxComputationThreadPool-3
Received 2000 on thread RxComputationThreadPool-3
Received 3000 on thread RxComputationThreadPool-3
Received 4000 on thread RxComputationThreadPool-3
Received 5000 on thread RxComputationThreadPool-3
Received 6000 on thread RxComputationThreadPool-3
Received 7000 on thread RxComputationThreadPool-3
Received 8000 on thread RxComputationThreadPool-3
Received 9000 on thread RxComputationThreadPool-3
Received 10000 on thread RxComputationThreadPool-3

You will see the emissions initially occurred on the main thread and were pushed on that thread all the way to the first map(). But after that the observeOn() redirected the emissions to a computation thread, which pushed the emissions to the second map() and the final Subscriber.
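If it helps to see the same kind of hand-off without RxJava, here is a rough analogy using CompletableFuture from the JDK. It is only a sketch of the idea and not how RxJava implements observeOn(). The first stage runs on one executor and the next stage is explicitly moved to another, the way a computed result would be handed back to a UI thread. The thread names compute-1 and ui-1 are made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHandoff {

    // Computes on a "compute" thread, then hands the result to a "ui" thread,
    // returning a description of which thread ran each stage.
    static String run(int input) {
        ExecutorService compute = Executors.newSingleThreadExecutor(r -> new Thread(r, "compute-1"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-1"));
        try {
            return CompletableFuture
                    // first stage: runs on the compute executor
                    .supplyAsync(() -> (input * 100) + " computed on "
                            + Thread.currentThread().getName(), compute)
                    // second stage: explicitly switched over to the ui executor
                    .thenApplyAsync(s -> s + ", displayed on "
                            + Thread.currentThread().getName(), ui)
                    .join();
        } finally {
            compute.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(1)); // prints "100 computed on compute-1, displayed on ui-1"
    }
}
```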
Still not clear? Let's look at this visually. No matter what Scheduler you are subscribed on, only one emission is allowed to travel up the Observable chain of operators at a time. Below, you can observe that the emission must be pushed all the way from the source to the Subscriber before the next emission can start.


Let's say we wanted to switch to another thread after Operator 2. Perhaps we finished calculating something and now we want to update the UI on the UI thread. Or maybe we finished importing a data set on the io() Scheduler and now we want to do computations on the computation() Scheduler.

You can do this with the observeOn() operator as shown below. Notice how the bottom stream passed an emission to the top stream, allowing the bottom stream to start the next emission without waiting for the current one to reach the Subscriber.


The bottom stream represents a stream of operators on one Scheduler, and the top one represents another. Once an emission is passed from the bottom stream to the top one, the bottom stream is no longer concerned with it. It is now the top stream's responsibility to get that emission to the Subscriber.

From what I understand, one problem that may arise with observeOn() is the bottom stream can produce emissions faster than the top stream can process them. This can cause issues with backpressure you may have to consider. I'm definitely not an authority on backpressure but I've been burned enough to be wary of it.
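To make the producer-outruns-consumer problem concrete, here is a sketch in plain Java where a fast producer thread hands items to a slower consumer through a bounded queue. When the queue is full, put() blocks and the producer is forced to slow down. That is one crude way to apply backpressure. RxJava has its own backpressure mechanisms, which this sketch does not attempt to model, and the class name and sentinel value are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BoundedHandoff {

    private static final int DONE = -1; // sentinel marking the end of the stream

    // A fast producer thread hands items to a slower consumer (the calling thread)
    // through a queue that holds at most `capacity` items.
    static List<Integer> run(int items, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i * 100); // blocks while the consumer is behind
                }
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();

        try {
            for (int v = queue.take(); v != DONE; v = queue.take()) {
                Thread.sleep(1);       // simulate a slow downstream operator
                received.add(v * 10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 2));
    }
}
```

With an unbounded queue the producer would never block, and the backlog would simply keep growing in memory whenever the consumer falls behind.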

Effectively, you can only use one subscribeOn(), but you can have any number of observeOn() operators. You can switch emissions from one thread to another with ease using observeOn(). But do not use it everywhere just for the sake of it. Only use it when you find a calculation is intense enough that it needs to be offloaded to another thread.

For UI technologies, there are a couple of libraries that bridge RxJava with a UI Scheduler. For example, there is RxJavaFX which has a Scheduler that puts emissions on the JavaFX Platform thread. There is also the RxJava Android Module which has Schedulers for Android. There is even RxSwing for those of us stuck with legacy Swing applications. These are very helpful to use in conjunction with observeOn() when working with user interfaces.

Let me know if you have any questions or comments. Be sure to read about parallelization in my other article as well.

Thursday, February 4, 2016

Kotlin + RxJava = Functional Powerhouse

As a software developer working in a tactical business environment, I'm always looking for ways to do more with less code. Even better, if I can modify and scale code to constantly adapt to business demands then I have to do less code re-writes.

Java has been my go-to language as it is practical, scalable, performant, portable, and statically-typed. As I became proficient and took on more ambitious projects, it started to feel cumbersome (and I kept eyeing C# wishfully). Thankfully I found RxJava last year, and reactive programming enabled me to take on tasks that I would hesitate to do before.

Utilizing RxJava almost exclusively for all my projects, I became much more productive and the quality of my applications increased. But I slowly started to realize the limitations of the Java language were holding RxJava back. Even with Java 8's lambdas, some functional programming tasks became very verbose.

For instance, using the compose() operator which accepts a custom Transformer allows you to create your own operator with existing RxJava operators. The problem is it can quickly become wordy and less fluid.

Here is a simple example. I can create a custom Transformer that turns an Observable<T> into an Observable<ImmutableList<T>> since I like Google Guava's immutable collections.

public final class Launcher {

    public static void main(String[] args) {
        Observable<String> source = Observable.just("Alpha", 
            "Beta", "Gamma", "Delta", "Epsilon");

        source.compose(toImmutableList()).subscribe(System.out::println);
    }

    public static <T> Observable.Transformer<T,ImmutableList<T>> toImmutableList() {
        return obs -> obs.collect(() -> ImmutableList.<T>builder(),
             (b,t) -> b.add(t)).map(b -> b.build());
    }
}

With Java 8 this is not a very big deal. But this Transformer factory exists in the same class, and if I stored it in a separate factory class it would slowly start getting verbose.

public final class Launcher {

    public static void main(String[] args) {
        Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", 
            "Delta", "Epsilon");

        source.compose(GuavaTransformers.toImmutableList()).subscribe(System.out::println);
    }

    /*This would be in the GuavaTransformers class */
    public static <T> Observable.Transformer<T,ImmutableList<T>> toImmutableList() {
        return obs -> obs.collect(() -> ImmutableList.<T>builder(), 
            (b,t) -> b.add(t)).map(b -> b.build());
    }
}

Even worse, if I start to create more complicated Transformers or Operators with arguments, my compose() statement can start to get pretty ugly. If I wanted to collect items into an ImmutableListMultimap, it starts to get less fluid with the lambda arguments.

public final class JavaLauncher {

    public static void main(String[] args) {

        Observable<String> source = Observable.just("Alpha", "Beta", "Gamma",
             "Delta", "Epsilon");

        source.compose(GuavaTransformers
            .toImmutableListMultiMap(s -> s.length())).subscribe(System.out::println);
    }


    /*This would be in the GuavaTransformers class */
    public static <T> Observable.Transformer<T,ImmutableList<T>> toImmutableList() {
        return obs -> obs.collect(() -> ImmutableList.<T>builder(), 
            (b,t) -> b.add(t)).map(b -> b.build());
    }

    public static <T,K> Observable.Transformer<T,ImmutableListMultimap<K,T>> 
            toImmutableListMultiMap(Func1<T,K> keyMapper) {

        return obs -> obs.collect(() -> ImmutableListMultimap.<K,T>builder(),
            (b,t) -> b.put(keyMapper.call(t), t)).map(b -> b.build());
    }
}

These may be trivial examples, but for larger applications these problems can quickly become amplified. Precious code real estate becomes eaten away even with the efficiencies of RxJava and Java 8 lambdas, and we have not even gotten to the subject of Tuples and data classes! But Kotlin solves all of these problems and more.

Introducing Kotlin

I tried looking at Scala, Python, and other languages. I especially looked at Scala but despite all the praise it gets, I found it too esoteric. Then one day I found JetBrains sharing their new language called Kotlin. They advertised it as an industry-grade, business-focused language emphasizing practicality rather than convention. JetBrains, the creator of the popular Java IDE IntelliJ IDEA, built it because they felt they could be more productive using a language that Java should have been. After studying Kotlin and re-writing two home projects with it, I quickly became sold and am ready to use it exclusively. The fact it is 100% interoperable with Java and all Java libraries made it a quick sell as well.

But in this post, what I really want to share is my experience using RxJava with Kotlin. Ironically, I found RxJava works better with Kotlin than Java itself. It just expresses functional programming concepts so much better.

For instance, I can "add" methods to the Observable using extension methods, without even extending the class! This is nothing new if you came from a C# background, but this was always the #1 thing I wished Java would have. Below I add toImmutableList() and toImmutableListMultimap() methods to the Observable (in Kotlin methods are actually called functions). I can then call those methods directly on the Observable rather than creating a compose() statement.

fun main(args: Array<String>) {

    val source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

    source.toImmutableListMultimap { it.length }.subscribe { println(it) }

}

fun <T> Observable<T>.toImmutableList() =
        collect({ ImmutableList.builder<T>()},{ b, t -> b.add(t)}).map { it.build() }

inline fun <K,T> Observable<T>.toImmutableListMultimap(
    crossinline keyMapper: (T) -> K) = collect({ ImmutableListMultimap.builder<K,T>()},
        { b, t -> b.put(keyMapper(t), t)}).map { it.build() }

There are a lot of observations to make here.

  1. We did not have to wrap these functions inside a class. Unlike Java, Kotlin does not force you to put static methods in a class. This is really helpful and helps eliminate a lot of boilerplate, especially for procedural programs.

  2. The type of the source variable is inferred, so we do not have to explicitly declare it as an Observable<String>. You can do that if you want to, as shown below. In Kotlin the type comes after the variable name (separated by a colon :). This is done because the variable name is likely more pertinent to you than the type, so it is declared first to make finding it easy.

     val source: Observable<String> = Observable.just("Alpha", "Beta", "Gamma",
          "Delta", "Epsilon")
    
  3. You can express lambdas much more easily. Instead of having to write out a lambda for a simple one-to-one mapping like s -> s.length(), you can express this with a much more succinct it.length, where it refers to the single incoming item emitted (this is featured in other JVM languages). Also, there are no parentheses () to receive functional arguments. Instead you use curly braces {} and express the entire function for that operator inside them. This is especially helpful because you can put multiple lines in the curly braces { } at any time.

      source.toImmutableListMultimap { it.length }.subscribe{ println(it)}
    
  4. You can "add" functions/methods to a class without actually extending a class using extension functions. This single statement below adds a toImmutableList() function to the Observable everywhere in your application (unless you make it private or alter its scope). How is this done? The compiler simply makes it a static method when turned into bytecode, but you get the nice syntactic sugar as well as seeing it in your auto-complete. You do not have to target generic types either with extension methods. For example, I could make a concatStr() extension method specifically targeting Observable<String> and not Observable<T>.

     fun <T> Observable<T>.toImmutableList() =
           collect({ ImmutableList.builder<T>()},{ b, t -> b.add(t)}).map { it.build()}
    
  5. Functional argument types are much simpler. Instead of expressing a functional type as Func1<T,K>, you can use a SAM-less type expression (T) -> K. This gets across much more easily that this function receives a T and turns it into a K. It is not a single-abstract-method type (SAM) which makes it easier to reason with and leaves out the question "which single-method interface am I using?". Of course, Kotlin will handle converting lambdas to SAM when calling Java libraries, but it will not do it in Kotlin. Also, using the inline and crossinline keywords for a function accepting function arguments, you can get great efficiency by eliminating object overhead.

    inline fun <K,T> Observable<T>.toImmutableListMultimap(
       crossinline keyMapper: (T) -> K) = collect({ImmutableListMultimap.builder<K,T>()},
             { b, t -> b.put(keyMapper(t), t)}).map { it.build() }
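To picture the "static method" point from item 4 on the Java side, here is roughly what such helpers look like when written by hand. This is only a sketch: to stay free of external libraries it targets java.util.stream.Stream and plain JDK collections instead of Observable and Guava, and all of the names are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StreamExtensions {

    // The "receiver" that an extension function is called on
    // becomes the first parameter of a static method.
    static <T> List<T> toImmutableList(Stream<T> receiver) {
        return Collections.unmodifiableList(receiver.collect(Collectors.toList()));
    }

    // JDK stand-in for a list multimap: a read-only Map from key to the items sharing that key.
    static <T, K> Map<K, List<T>> toListMultimap(Stream<T> receiver, Function<T, K> keyMapper) {
        return Collections.unmodifiableMap(receiver.collect(Collectors.groupingBy(keyMapper)));
    }

    // Java call site: the helper wraps the receiver instead of following it.
    static Map<Integer, List<String>> groupByLength(String... words) {
        return toListMultimap(Stream.of(words), String::length);
    }

    public static void main(String[] args) {
        System.out.println(groupByLength("Alpha", "Beta", "Gamma", "Delta", "Epsilon"));
    }
}
```

The behavior is the same either way. What the Kotlin extension syntax buys you is the call order: source.toImmutableListMultimap { it.length } reads left to right, while the Java helper has to wrap its input.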
    

Data Classes

Another great feature of Kotlin is data classes. Have you ever wanted to simply zip two values together, but had to create an entire class just to pair them up with hashCode(), equals(), and toString() implemented?


public final class JavaLauncher {

    public static void main(String[] args) {
        Observable<String> letter = Observable.just("Alpha", "Beta", "Gamma", 
            "Delta", "Epsilon");

        Observable<Integer> number = Observable.just(1,2,3,4,5);

        Observable<CodePair> zipped = Observable.zip(letter,number, 
            (l,n) -> new CodePair(l,n));

        zipped.subscribe(System.out::println);
    }

    private static final class CodePair {
        private final String letter;
        private final Integer number;

        CodePair(String letter, Integer number) {
            this.letter = letter;
            this.number = number;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;

            CodePair codePair = (CodePair) o;

            if (!letter.equals(codePair.letter)) return false;
            return number.equals(codePair.number);

        }

        @Override
        public int hashCode() {
            int result = letter.hashCode();
            result = 31 * result + number.hashCode();
            return result;
        }

        @Override
        public String toString() {
            return "CodePair{" +
                    "letter='" + letter + '\'' +
                    ", number=" + number +
                    '}';
        }
    }
}

It is not fun that I had to write 36 lines of code just to create a CodePair class holding two properties. This problem comes up in functional programming quite a bit, and the only alternative is creating esoteric tuples which only obfuscate the code.

But in Kotlin, you can declare something called a data class. This allows you to quickly declare a class in one line with all its properties, and it will take care of hashCode(), equals(), toString() and even a copy() function (for cloning an instance with some properties modified) for you.

That Java listing of roughly 50 lines shrinks to under a dozen lines in Kotlin.

fun main(args: Array<String>) {

    data class CodePair(val letter: String, val number: Int)

    val letter = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
    val number = Observable.just(1, 2, 3, 4, 5)

    val zipped = Observable.zip(letter, number) { l, n -> CodePair(l, n) }

    zipped.subscribe { System.out.println(it) }
}

We declared a CodePair class right inside the main() function, and it only exists in the scope of the main() function. It has named properties letter and number you can access. This opens up a lot of tactical possibilities that were borderline impractical to do in Java. Being able to declare simple classes on the fly and have the common method implementations done allows fast, organized, and legible code to be developed quickly.

Conclusions

I have only scratched the surface in sharing what Kotlin can do, with or without RxJava. This was not a tutorial but just a quick showcase of how RxJava expresses differently in Kotlin. I hope I have effectively shared my experience and you are curious to check out Kotlin. I know Scala can do quite a bit with RxScala, but Kotlin is different. It really serves folks who need the tactical abilities and simplicity of Python with the scalability and power of Java. When you throw RxJava into the mix with Kotlin, I have found it to be a very rewarding combination. Did I forget to mention that there are no primitives or boxed types either? There are so many features in Kotlin that would be out of scope to post here, like nullable types.

As a side note, Kotlin is supported on Android. You can also check out the RxKotlin library, which extends RxJava to take advantage of Kotlin functionality (such as adding toObservable() methods to collections).