A common question about RxJava is how to achieve parallelization, or processing multiple emissions concurrently from an Observable. Of course, taken literally this breaks the Observable contract, which states that onNext() must be called sequentially and never concurrently by more than one thread at a time. Now you may be thinking, "How the heck am I supposed to achieve parallelization then, which by definition is multiple items getting processed at a time?" Widespread misunderstanding of how parallelization actually works in RxJava even led to the discontinuation of the parallel() operator.
You can achieve parallelization in RxJava without breaking the Observable contract, but it requires a little understanding of Schedulers and how operators deal with multiple asynchronous sources.
Say you have this simple Observable that emits a range of Integers 1 to 10 upon subscription.
Observable<Integer> vals = Observable.range(1,10);

vals.subscribe(val -> System.out.println(val));
But let's say we are applying a very intensive calculation to each Integer value.
Observable<Integer> vals = Observable.range(1,10);

vals.map(i -> intenseCalculation(i))
    .subscribe(val -> System.out.println(val));
And for simplicity's sake let's just make intenseCalculation() sleep for a random interval before returning the integer back, simulating an intensive calculation. We will also make it print the current thread the computation is occurring on.
public static int intenseCalculation(int i) {
    try {
        System.out.println("Calculating " + i + " on " + Thread.currentThread().getName());
        Thread.sleep(randInt(1000,5000));
        return i;
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
We would want to parallelize this and process multiple integers at a time. A common mistake is to think, "Oh, I'll just use subscribeOn() and have the source Observable emit items on multiple computation threads, just like an ExecutorService."
Observable<Integer> vals = Observable.range(1,10);

vals.subscribeOn(Schedulers.computation())
    .map(i -> intenseCalculation(i))
    .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));
However, if you run this code you will get the output below, showing that each integer was emitted on only one computation thread, not several as you might have expected. As a matter of fact, this is just as sequential as a single-threaded operation.
Calculating 1 on RxComputationThreadPool-1
Subscriber received 1 on RxComputationThreadPool-1
Calculating 2 on RxComputationThreadPool-1
Subscriber received 2 on RxComputationThreadPool-1
Calculating 3 on RxComputationThreadPool-1
Subscriber received 3 on RxComputationThreadPool-1
Calculating 4 on RxComputationThreadPool-1
Subscriber received 4 on RxComputationThreadPool-1
Calculating 5 on RxComputationThreadPool-1
Subscriber received 5 on RxComputationThreadPool-1
Calculating 6 on RxComputationThreadPool-1
Subscriber received 6 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Subscriber received 7 on RxComputationThreadPool-1
Calculating 8 on RxComputationThreadPool-1
Subscriber received 8 on RxComputationThreadPool-1
Calculating 9 on RxComputationThreadPool-1
Subscriber received 9 on RxComputationThreadPool-1
Calculating 10 on RxComputationThreadPool-1
Subscriber received 10 on RxComputationThreadPool-1
Well that was not helpful! We did not achieve any effective parallelization at all. We just directed the emissions to happen on another thread named RxComputationThreadPool-1.
So how do we make calculations happen on more than one computation thread? And do it without breaking the Observable contract? The secret is to catch each Integer in a flatMap(), create an Observable off it, do a subscribeOn() to the computation scheduler, and then perform the process all within the flatMap().
Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
        .subscribeOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
    ).subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));

Now we are getting somewhere and this looks pretty parallel. We are getting multiple emissions happening on different computation threads.
Calculating 1 on RxComputationThreadPool-3
Calculating 4 on RxComputationThreadPool-2
Calculating 3 on RxComputationThreadPool-1
Calculating 2 on RxComputationThreadPool-4
Subscriber received 3 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Subscriber received 4 on RxComputationThreadPool-2
Calculating 8 on RxComputationThreadPool-2
Subscriber received 2 on RxComputationThreadPool-4
Calculating 6 on RxComputationThreadPool-4
Subscriber received 8 on RxComputationThreadPool-2
Subscriber received 6 on RxComputationThreadPool-4
Calculating 10 on RxComputationThreadPool-4
Subscriber received 7 on RxComputationThreadPool-1
Subscriber received 10 on RxComputationThreadPool-4
Subscriber received 1 on RxComputationThreadPool-3
Calculating 5 on RxComputationThreadPool-3
Subscriber received 5 on RxComputationThreadPool-3
Calculating 9 on RxComputationThreadPool-3
Subscriber received 9 on RxComputationThreadPool-3
[Timeline visualization: emissions occurring across RxComputationThreads 1-4 before being merged downstream.]

But how is this not breaking the Observable contract, you ask? Remember that you cannot have concurrent onNext() calls on the same Observable. We have created an independent Observable off each integer value and scheduled them on separate computation threads, making their concurrency legitimate.
Now you may also be asking "Well... why is the Subscriber receiving emissions from multiple threads then? That sounds an awful lot like concurrent onNext() calls are happening and that breaks the contract."
Actually, there are no concurrent onNext() calls happening. The flatMap() has to merge emissions from multiple Observables running on multiple threads, but it will not allow concurrent onNext() calls to travel down the chain to the Subscriber. It will not block and synchronize either, because that would undermine the benefits of RxJava. Instead, it re-uses whichever thread is currently emitting out of the flatMap(). If other threads want to emit items while that thread is emitting out of the flatMap(), they leave their emissions for the occupying thread to take ownership of.
Here is proof, since the above example does not make this obvious. Let's collect the emissions into a toList() before they go to the Subscriber.
Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
        .subscribeOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
    ).toList()
    .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));
Observe the output below.
Calculating 4 on RxComputationThreadPool-2
Calculating 1 on RxComputationThreadPool-3
Calculating 2 on RxComputationThreadPool-4
Calculating 3 on RxComputationThreadPool-1
Calculating 7 on RxComputationThreadPool-1
Calculating 6 on RxComputationThreadPool-4
Calculating 5 on RxComputationThreadPool-3
Calculating 10 on RxComputationThreadPool-4
Calculating 8 on RxComputationThreadPool-2
Calculating 9 on RxComputationThreadPool-3
Subscriber received [3, 2, 1, 6, 4, 7, 10, 5, 8, 9] on RxComputationThreadPool-3
Notice that the calculations were all happening on different threads, and the flatMap() was pushing items out on one of these threads at a given time. But the last thread to do an emission was RxComputationThreadPool-3. This happened to be the thread pushing items out of flatMap() when it needed to emit the final value 9, so it called onCompleted() which pushed out the list all the way to the Subscriber.
Let me know if you have any questions. Here is the full working example.
import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.Random;

public final class ParallelTest {

    private static final Random rand = new Random();

    public static void main(String[] args) {

        Observable<Integer> vals = Observable.range(1,10);

        vals.flatMap(val -> Observable.just(val)
                .subscribeOn(Schedulers.computation())
                .map(i -> intenseCalculation(i))
            ).subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));

        waitSleep();
    }

    public static void waitSleep() {
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static int intenseCalculation(int i) {
        try {
            System.out.println("Calculating " + i + " on " + Thread.currentThread().getName());
            Thread.sleep(randInt(1000,5000));
            return i;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static int randInt(int min, int max) {
        return rand.nextInt((max - min) + 1) + min;
    }
}
About Fixed Thread Pools
Please note, as David mentioned below, that this parallelization strategy does not work with all Schedulers. Schedulers other than computation() can flood the system with too many threads and cause poor performance. The computation() scheduler limits the number of concurrent processes inside the flatMap() based on the number of CPUs your machine has. If you want to use other schedulers like newThread() and io(), which do not limit the number of threads, you can pass a second int argument to the flatMap() to limit the number of concurrent inner Observables.
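Here is a minimal sketch of that overload, re-using the intenseCalculation() idea (with a fixed one-second sleep standing in for the random interval). The io() scheduler does not cap its thread count, so the second argument to flatMap() limits it to four concurrent inner Observables:

import rx.Observable;
import rx.schedulers.Schedulers;

public final class MaxConcurrencyTest {

    public static void main(String[] args) throws InterruptedException {

        Observable<Integer> vals = Observable.range(1,10);

        vals.flatMap(val -> Observable.just(val)
                .subscribeOn(Schedulers.io())       // io() does not cap its thread count...
                .map(i -> intenseCalculation(i)),
            4)                                      // ...so cap flatMap() at 4 concurrent inner Observables
            .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));

        Thread.sleep(20000); // keep the main thread alive while the workers finish
    }

    public static int intenseCalculation(int i) {
        try {
            System.out.println("Calculating " + i + " on " + Thread.currentThread().getName());
            Thread.sleep(1000); // fixed sleep standing in for randInt(1000,5000)
            return i;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}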
You can also create a Scheduler off an ExecutorService, giving you more fine-tuned control if needed. You can actually get significantly better performance doing this, and you can read about it in my other article on Maximizing Parallelization.
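A rough sketch of that approach, assuming a fixed pool of four threads is appropriate, might look like this using Schedulers.from():

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class ExecutorSchedulerTest {

    public static void main(String[] args) {

        // a fixed pool of 4 threads backing a custom Scheduler
        ExecutorService executor = Executors.newFixedThreadPool(4);
        Scheduler scheduler = Schedulers.from(executor);

        Observable.range(1,10)
                .flatMap(val -> Observable.just(val)
                        .subscribeOn(scheduler)
                        .map(i -> intenseCalculation(i)))
                .toBlocking()   // block here so the pool can be shut down afterwards
                .forEach(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));

        executor.shutdown();
    }

    public static int intenseCalculation(int i) {
        try {
            System.out.println("Calculating " + i + " on " + Thread.currentThread().getName());
            Thread.sleep(1000); // fixed sleep standing in for randInt(1000,5000)
            return i;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}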
Great post!
I just wanted to add that this approach doesn't work with just any Scheduler; it works with computation() and any fixed-size Executor turned into a Scheduler. io() and newThread() will flood the system with threads. However, if you use the flatMap(func, maxConcurrency) overload, you can limit the active number of threads (although newThread() will still churn through many of them).
In addition, you can use groupBy with a round-robin key selector:
AtomicInteger key = new AtomicInteger();

source.groupBy(v -> key.getAndIncrement() % 5)
      .flatMap(g -> g.observeOn(scheduler).map(i -> calculation(i)))
      .subscribe(...);
This will avoid creating too many one-time Observables and Workers.
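For anyone who wants to try it, a self-contained sketch of the round-robin groupBy() approach (adapted from David's snippet, with the post's intenseCalculation() workload simplified to a fixed sleep) might look like this:

import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.concurrent.atomic.AtomicInteger;

public final class GroupByParallelTest {

    public static void main(String[] args) throws InterruptedException {

        AtomicInteger key = new AtomicInteger();

        Observable.range(1,10)
                .groupBy(v -> key.getAndIncrement() % 5)            // round-robin the values into 5 groups
                .flatMap(g -> g.observeOn(Schedulers.computation()) // each group gets its own Worker
                        .map(i -> intenseCalculation(i)))
                .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));

        Thread.sleep(20000); // keep the main thread alive while the groups are processed
    }

    public static int intenseCalculation(int i) {
        try {
            System.out.println("Calculating " + i + " on " + Thread.currentThread().getName());
            Thread.sleep(1000); // fixed sleep simulating the intensive calculation
            return i;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}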
How does this work with computation without the max concurrency overload?
From my observations, as well as a general understanding of concurrency, it limits the number of available threads to an appropriate number based on the number of cores you have. Please correct me if I'm wrong, David...
If you have 8 cores, this will only use 5 of them. If you have 4 cores, two groups will share the same core.
I finally get the round-robin key selector now. Very clever.
While it certainly can accomplish the goal, it's still not as good as just piping tasks into an ExecutorService in terms of efficiency, especially if you're dealing with large sets of data and/or large variations in latency in the calculation() method. From what I can tell, https://github.com/ReactiveX/RxJava/wiki/Parallel-flows is the new/"right" way to do non-sequential parallelization of the pipeline in RxJava 2. Is that right? Has anyone tried it?
The solution with groupBy and observeOn proposed by David does not seem to support backpressure. It will request all data from the source at once.
It has been fixed recently ;).
Thanks David! I'll make a mention of that regarding Schedulers and fixed sizing a little later when I edit this. I need to update one of the visualizations anyway, realizing I misrepresented the subscribeOn() as an observeOn() switching from one thread to another.
I have never seen that groupBy() approach, so I'll need to play with it. I've been interested to see if anything would come out of the RxJava Parallel project on GitHub, but noticed it has had no activity for a while. I was wondering if new operators in that project would challenge the Observable contract safely.
It is hard to compete with Java 8 Stream's parallel mode. I have a bunch of ideas for RxJavaParallel but not the time for it.
Haha yeah, prioritizing is always the frustrating part of being a developer. Stream's parallel operator is pull-based of course, which I'm guessing is simpler to achieve. But what makes Rx so great, in my opinion, is that its composability allows it to scale to any problem, even its own perceived limitations.
DeleteExcellent post, thanks!
Great post!
I thought RxJava guarantees that the order of items in the Subscriber is the same as in the Observable. Do you have an idea how to achieve this and still keep the parallelization? How would you extend your example? Thanks
An Observable will emit items in a sequential, deterministic order. But when multiple sources are parallelized and merged, order can logically no longer be enforced. You will want to avoid parallelization if order matters.
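A rough sketch of that tradeoff: swapping flatMap() for concatMap() subscribes to one inner Observable at a time, so the original order is preserved but the calculations are effectively serialized again.

import rx.Observable;
import rx.schedulers.Schedulers;

public final class OrderedTest {

    public static void main(String[] args) throws InterruptedException {

        Observable.range(1,10)
                .concatMap(val -> Observable.just(val)            // concatMap preserves order...
                        .subscribeOn(Schedulers.computation())
                        .map(i -> intenseCalculation(i)))         // ...but processes one value at a time
                .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));

        Thread.sleep(30000); // sequential processing takes longer, so wait a bit more
    }

    public static int intenseCalculation(int i) {
        try {
            System.out.println("Calculating " + i + " on " + Thread.currentThread().getName());
            Thread.sleep(1000); // fixed sleep simulating the intensive calculation
            return i;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}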
Thanks. I wrote the same without using lambdas, but I am getting different results.
What's wrong with my code?
vals.flatMap(new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer pArg0) {
        // TODO Auto-generated method stub
        return Observable.just(pArg0);
    }
}).subscribeOn(Schedulers.computation()).map(new Func1<Integer, Integer>() {
    @Override
    public Integer call(Integer pArg0) {
        // TODO Auto-generated method stub
        return intenseCalculation(pArg0);
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer pArg0) {
        System.out.println(" Subscribed recieved " + pArg0 + " on " + Thread.currentThread().getName());
    }
});
Hi Arun! Your lambda-less code, though compilable, seems not to be equivalent to the source example in the post. Try the following:
vals.flatMap(new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer pArg0) {
        // TODO Auto-generated method stub
        return Observable.just(pArg0)
                .subscribeOn(Schedulers.computation())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer pArg0) {
                        // TODO Auto-generated method stub
                        return intenseCalculation(pArg0);
                    }
                });
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer pArg0) {
        System.out.println(" Subscribed recieved " + pArg0 + " on " + Thread.currentThread().getName());
    }
});
Great article on Parallelization. In the full working example, I noticed ".toList()" missing. Was that intentional? Thanks!
From your code, if I add a Thread.sleep in the subscribe, i.e.
ReplyDeleteFlowable.range(1,1000)
.flatMap(val -> Flowable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> {
Thread.sleep(500);
System.out.println("Subscriber received "
+ val + " on "
+ Thread.currentThread().getName());
});
I see subscriber only processing in one thread:
Subscriber received 10 on RxComputationThreadPool-1
Subscriber received 11 on RxComputationThreadPool-1
Subscriber received 12 on RxComputationThreadPool-1
Subscriber received 14 on RxComputationThreadPool-1
Subscriber received 15 on RxComputationThreadPool-1
Subscriber received 16 on RxComputationThreadPool-1
Subscriber received 18 on RxComputationThreadPool-1
Is there a newer, updated version of this using RxJava 2.0? Perhaps using ParallelFlowable?
ReplyDeleteNino Nurmadi, S.Kom
ReplyDeleteNino Nurmadi, S.Kom
Nino Nurmadi, S.Kom
Nino Nurmadi, S.Kom
Nino Nurmadi, S.Kom
Nino Nurmadi, S.Kom
Nino Nurmadi, S.Kom
Nino Nurmadi, S.Kom
Nino Nurmadi, S.Kom
Nino Nurmadi, S.Kom Nino Nurmadi, S.Kom Nino Nurmadi, S.Kom Nino Nurmadi, S.Kom Nino Nurmadi, S.Kom Nino Nurmadi, S.Kom Nino Nurmadi, S.Kom Nino Nurmadi, S.Kom Nino Nurmadi, S.Kom Nino Nurmadi, S.Kom
ReplyDeletebuy big chief extracts online
ReplyDeletebuy gaswoods pre rolls online
buy dankwoods online
Livechat Vivo
ReplyDeleteLivechat Vivo Slot
Customer Service Vivo Slot
Daftar Vivo Slot Via Livechat
Livechat Vivo Resmi
Deposit Live22 via Pulsa
ReplyDeleteDeposit Live22 Pulsa
Deposit Live22 via Pulsa Telkomsel
Deposit Live22 via Pulsa XL
Deposit Live22 via Pulsa Tri
I trust you post again soon... Valorant Phoenix Jacket
ReplyDeleteYou can do very creative work in a particular field. Exceptional concept That was incredible share.
ReplyDeleterocketman denim jacket
I enjoyed your blog Thanks for sharing such an informative post. We are also providing the best services click on below links to visit our website.
ReplyDeleteHoliday Music
Christmas 2020 Playlist
comprar carta de condução
ReplyDeletecarteira de motorista portugal
comprar carta de condução legal
comprar a carta de condução
carteira de motorista em portugal
comprar carta de conduçao online
comprar carta de conduçao em portugal
condutor português
carta de motorista portugal
comprar carta de condução
ReplyDeletecarteira de motorista portugal
comprar carta de condução legal
comprar a carta de condução
carteira de motorista em portugal
comprar carta de conduçao online
comprar carta de conduçao em portugal
condutor português
carta de motorista portugal
comprar carta de condução
ReplyDeletecarteira de motorista portugal
comprar carta de condução legal
comprar a carta de condução
carteira de motorista em portugal
comprar carta de conduçao online
comprar carta de conduçao em portugal
condutor português
carta de motorista portugal
we work with transport office and the municipality We offer real license without exam. We are only out to facilitate the process for those who do not want to go through the normal process or who may have failed the exams
ReplyDeletecomprar carta de condução
carteira de motorista portugal
comprar carta de condução legal
comprar a carta de condução
carteira de motorista em portugal
comprar carta de conduçao online
comprar carta de conduçao em portugal
condutor português
carta de motorista portugal
This is really amazing, you are very skilled blogger. Visit Ogen Infosystem for professional website designing and SEO Services.
ReplyDeleteSEO Service in Delhi
Our the purpose is to share the reviews about the latest Jackets,Coats and Vests also share the related Movies,Gaming, Casual,Faux Leather and Leather materials available Yellowstone Rip Wheeler Jacket
ReplyDeleteNice Blog !
ReplyDeleteHere We are Specialist in Manufacturing of Movies, Gaming, Casual, Faux Leather Jackets, Coats And Vests See Hughie Campbell Jacket
Thank you for posting such a nice post. Keep posting. It is really helpful. Chronicles Santa Claus Coat
ReplyDeleteFollow the latest trends and leave your impression on viewers mind wearing this titled Beth Dutton Blue Coat . Buy it and enjoy flat 80% off on all items.
ReplyDeleteThis blog was very nicely formatted; it maintained a flow from the first word to the last. First Aid Usmle
ReplyDeleteMua vé máy bay tại đại lý Aivivu, tham khảo
ReplyDeletevé máy bay đi Mỹ
gia ve tu my ve vietnam
ve may bay tư duc ve viet nam
giá vé máy bay từ moscow về hà nội