Subjects are both an
Observable
and an Observer
. Because it is an Observer
, anything at any time can call its onNext()
method and push items up to its Subscribers. You can also take one or more existing Observables and have a Subject
subscribe to them, and in turn pass their emissions up to the Subject's Subscribers. These features may seem convenient but can quickly encourage anti-patterns.When I first started learning reactive programming, I was quickly introduced to the
Subject
and its various flavors like BehaviorSubject
, ReplaySubject
, and PublishSubject
. To a newbie with an imperative programming background, these seemed like magical devices that bridged imperative and reactive operations together. I began to think subjects were a practical way to create source Observables, and I could push items from any source at any time through a chain of reactive operations. Makes sense right? Even some reactive tutorials encourage newbies to play with Subjects to get a feel for reactive programming, but I think this is wrong and creates a setback in a newbie's learning curve.If you find yourself using Subjects quite often, you might want to reflect on what you think a source
Observable
does. On a philosophical level, reactive programming is about defining behaviors from a well-defined source all the way to the Subscriber. It is important to think about how emissions should originate and take form. But first let's discuss ideally how a reactive chain of Observables should work, and then later it will make sense why Subjects can undermine the integrity of a reactive application.
A Typical Observable Setup
From a practical standpoint, anObservable
chain of operations is broken into three parts:1) The Source
Observable
where emissions originate2) Operators to transform the emissions
3) A
Subscriber
to consume the emissionsOf course, we can make #2 more complicated by using operators that work with other Observables, like
merge()
and flatMap()
. But every Observable
by itself should follow this structure. The simplest example would be emitting a fixed set of String
values, mapping their lengths, and then printing them. //Source
Observable<String> values = Observable.just("Alpha", "Beta", "Gamma");
//Operators
Observable<Integer> lengths = values.map(String::length);
//Subscriber
Subscription printSubscription = lengths.subscribe(System.out::println);
The three components, the source, the operators, and subscriber are very distinctly identified above. We can alternatively express this in one statement without intermediary variables.
//Source, Operators, Subscriber
Observable.just("Alpha", "Beta", "Gamma")
.map(String::length)
.subscribe(System.out::println);
This entire
Observable
operation is also agnostic to which thread it is scheduled on, which is good for flexible concurrency and works easily with observeOn()
and subscribeOn()
as we will read about in Reason 3. But first let's just look at source Observables themselves and analyze their nature.Reason 1: Keep Source Observables Predictable
What needs to be highlighted with the example above is the source emitting "Alpha", "Beta", and "Gamma". TheObservable.just()
factory created a source Observable
emitting these three Strings. The benefit is the source is tightly controlled and predictable, and we know it will only emit those three Strings. We can trust nothing alien is ever going to enter that source and and be emitted up the chain.For instance, we should never expect the word "Puppy" to be emitted by the source. We only expect "Alpha", "Beta", and "Gamma".
Observable.just("Alpha", "Beta", "Gamma")
.map(String::length)
.subscribe(System.out::println);
However if we use a
Subject
, this is not guaranteed, especially if it is exposed publicly and anything can call its onNext()
method. (Not to mention, this is just downright messy compared to the code above).PublishSubject<String> subject = PublishSubject.create();
subject.map(String::length)
.subscribe(System.out::println);
subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
//something accidentally pushes "Puppy" as an emission
subject.onNext("Puppy");
subject.onCompleted();
The
Subject
undermines having a well-thought, strictly-defined source Observable
where the emissions come from a predictable source. You also lose some flexibility as the Subject
is hot, and you can no longer create a cold Observable
as we will read in #2.This is a simplistic example of course, and more complex applications are not going to emit three emissions like "Alpha", "Beta", and "Gamma". Realistically, you might use RxJava-JDBC to create a source
Observable
which emits items from a database query. Database db = ...
Observable<String> values = db
.select("SELECT MY_FIELD FROM MY_TABLE")
.getAs(String.class);
values.subscribe(System.out::println);
While the results of the query can change each time it is subscribed to (because the table can be updated), we have strictly defined the source
Observable
and where its emissions come from. It will only emit items from this query and nowhere else. You can even avoid Subjects for hot sources too. A JavaFX
Button
being pressed, for example, can emit those action events in a hot Observable
. We can use Observable.create()
instead of a Subject
to turn those events into an Observable
. Button button = new Button("Press Me");
Observable<ActionEvent> actionEvents = Observable.create(new Observable.OnSubscribe<ActionEvent>() {
@Override
public void call(final Subscriber<? super ActionEvent> subscriber) {
final EventHandler<ActionEvent> handler = subscriber::onNext;
button.addEventHandler(ActionEvent.ACTION, handler);
subscriber.add(JavaFxSubscriptions.unsubscribeInEventDispatchThread(() ->
button.removeEventHandler(ActionEvent.ACTION, handler)));
}
});
actionEvents.subscribe(ae -> System.out.println("Pressed!"));
Even better, we can use RxJavaFX and call a factory that does this for us.
Button button = new Button("Press Me");
Observable<ActionEvent> actionEvents = JavaFxObservable.fromActionEvents(button);
actionEvents.subscribe(ae -> System.out.println("Pressed!"));
This way we can be sure that emissions for this source
Observable
will only come from this Button
being pressed.To summarize all the examples above, we should tightly control how a source
Observable
works and strictly define where its emissions come from. Having a Subject
haphazardly allow emissions to come from any source can create a chaotic, unmaintainable application.Reason 2: Mind Your Hots and Colds
The best definition I have heard of a Cold Observable versus a Hot Observable is by Nickolay Tsvetinov, author of Learning Reactive Programming with Java 8:We can say that cold Observables generate notifications for each subscriber and hotA cold
Observables are always running, broadcasting notifications to all of their subscribers.
Think of a hot Observable as a radio station. All of the listeners that are listening to
it at this moment listen to the same song. A cold Observable is a music CD. Many
people can buy it and listen to it independently.
Observable
is "replayable", meaning it will "replay" its emissions to each Subscriber
independently. A hot Observable
will emit the same item at a given time to all its Subscribers, and it may not even care if any Subscribers are present. Some things are more appropriately represented as a hot
Observable
, such as UI input events like a Button
getting pressed. Data-driven Observables, such as the earlier example emitting "Alpha", "Beta", and "Gamma" or the database query, are often better represented as a cold Observable
. That data is replayed to each Subscriber
that needs it, and being cold guarantees no data was missed previously. Sometimes, it is more efficient to carefully play expensive emissions at the same time to all Subscribers
, and this is where a ConnectableObervable
can come in handy to turn a cold Observable
into a hot one. But you need to be careful! Hot Observables are a timing-sensitive animal. This is why it is very important to prefer cold Observables by default. Many operators like
flatMap()
are in fact a Subscriber
to another Observable
. When used with hot Observables, this can lead to complicated, confusing cases where the data is being emitted in live time and not every Subscriber
is able to capture all the data if they subscribed too late. For example, lets say we have three Strings being emitted from a source
Observable
. We want to subtract their lengths from the min length of the three Strings. We can wrongly use a PublishSubject
to hotly emit the Strings to a minLength
Observable and a Subscriber
that flatMaps against this minLength
. PublishSubject<String> source = PublishSubject.create();
Observable<Integer> minLength = source
.map(String::length)
.reduce((x,y) -> x < y ? x : y);
source.map(String::length)
.flatMap(len -> minLength.map(min -> len - min))
.subscribe(System.out::println);
source.onNext("Alpha");
source.onNext("Beta");
source.onNext("Gamma");
source.onCompleted();
But if you run this you may get an error or no console output at all. Why? The three values are being emitted hotly and the
minLength
can miss the emissions in the flatMap()
. This would happen not just with a Subject
but any hot Observable
, including a ConnectableObservable
. But if you use the
Observable.just()
factory, it will coldly replay the values each time it is subscribed to. The flatMap()
will subscribe to the minLength
, which will subscribe to the source and replay the data each time. This way every part of the Observable
chain is given ALL the data. Each piece can replay the CD rather than worrying it missed the radio program.Observable<String> source = Observable.just("Alpha","Beta","Gamma");
Observable<Integer> minLength = source
.map(String::length)
.reduce((x,y) -> x < y ? x : y);
source.map(String::length)
.flatMap(len -> minLength.map(min -> len - min))
.subscribe(System.out::println);
Also, is the code just not simpler? Cold Observables tend to make programs simpler and more reliable, and they should be preferred when possible. Yes, you could use caching of some form but this is often not good for performance. It is better for the cold source
Observable
to define how data is provided.By the way, also be very mindful of infinite Observables and only use them when your source is truly infinite (like UI input events). Overusing infinite hot
Observable
can clash with operators that expect a finite set of emissions, and this hardly works well with complex reactive applications.Reason 3: Thread Safety and Concurrency
Last but definitely not least, SUBJECTS BY DEFAULT ARE NOT THREADSAFE. This is a big reason to avoid them because reactive programming is all about making Observables safely agnostic to which threads they operate on. But if multiple concurrent calls toonNext()
are occurring on different threads, it will break the Observable Contract and risk creating race conditions with the operators. Only one thread can be passing through a given operator at a time. But if the source Observable
is making several simultaneous onNext()
calls, it could have catastrophic behaviors. If you use the existing
Observable
factories and operators with RxJava, you can be safely sure that you will not have this issue. Subjects, incorrectly-built custom Transformers, and bad sources with concurrent calls to onNext()
can all wreak havoc.If you do have to use Subjects for those corner cases, it is always a good idea to turn your
Subject
into a SerializedSubject
. That way if multiple threads do happen to call the onNext()
concurrently, it will serialize the emissions safely so it does not break any Observables and Subscribers dependent on it. SerializedSubject<Integer,Integer> subject =
PublishSubject.<Integer>create().toSerialized();
To learn more about concurrency with RxJava, read my article on observeOn() and subscribeOn().
Conclusion
Subjects are abused a lot and can undermine the benefits of reactive programming. Erik Meijer called them the "mutable variables" of reactive programming for good reason. Actually, the problems of mutable variables are almost identical to that of subjects: thread safety, bugs, not being sure where a value change came from, etc. AnObservable
is an immutable chain of operations from the source to the Subscriber
, and the emissions are pushed in a predictable, contained manner. A Subject
introduces mutability to that chain and can break it with unpredictable emission sources, lost data, and unwanted thread safety behaviors. Of course there are some uses for Subjects and they can come in handy, but I avoid subjects 99.9% of the time and am better off because of it. I am sure there are other technical reasons to avoid Subjects, and these are just my observations and experiences. Let me know if you have other reasons they should be avoided, or corner cases when they should be used.
Hi Thomas,
ReplyDeletethank you for this interesting article and +1 for the Tsvetinov's book citation (maybe the best until now IMHO).
I just starting using RxJava and one thing that continues to confuse me is the usage of cold observables for UI events. I think that it is more "natural" to use a Subject (exposed as an Observable to clients if necessary) given that (think about the click listener of the example) it will pass the emissions the UI generates when subsribers are subscribed. The javadoc for the create() method of the Observable class states "Returns an Observable that will execute the specified function when a Subscriber subscribes to it.". Thinking about it when the source are click events for example still confuses me.
Anyway, I see other similar implementations to the one provided above (on RxBindings for Android for example) but it still does not convince me.
A further explanation would be much appreciated :)
Thanks!
Paolo
Thanks Paolo, I'm glad you found it helpful :)
DeleteRegarding UI events, yes it may feel more natural to use Subjects and perhaps it is *more* appropriate than other cases. But there still is a better way. The Observable.create() can be used to create hot Observables and not just cold ones.
For instance, consider the Button example I showed earlier. Anything that follows a Listener convention can likely be turned into a hot Observable in this manner. If you study it long enough, you will see it is not as complex as it looks. On Subscription, a Listener is created that will call onNext() each time it is fired. The unsubscription is also handled by removing the Listener when it is done. This critical unsubscription behavior is a bit less straightforward to do with Subjects.
Of course, it is a bit verbose and this is why it is a good idea to use RxBindings, RxJavaFX, or other libraries that provide source Observable factories for you.
Great article. I already read other people writing about PublishSubjects being somehow wrong, but none of them explained exactly why, but you just did. Thanks for this
DeleteExactly why I wrote this. Glad you found it helpful!
DeleteGreat Article
DeleteIEEE final year projects on machine learning
JavaScript Training in Chennai
Final Year Project Centers in Chennai
JavaScript Training in Chennai
Thanks Thomas, now I got it.
ReplyDeleteSubjects are often FAR better then calling Observable.create().
ReplyDeleteThe reason is that creating an observable is MUCH harder then one could think and you need to know the internals of RxJava to build one that take into account everything an Observable should.
Subjects on the other way do all that correctly out of the box.
Sure, you need to be careful calling onNext on the right thread but you can assure that easily enough and I think it's the only real gotcha with using Subjects (or Relay).
Just wrap subject in a class, and DO NOT expose them, return them with subject.asObservable() and provide methods (if needed) to push stuff into it that make sure they use the right thread.
See https://akarnokd.blogspot.it/2015/06/subjects-part-1.html
There are many implications in using Subjects that make them less of a candidate than Observable.create(), like backpressure and concurrency issues. Observable.create() should be avoided as much as possible too, but there are fare more gotchas with subjects.
Deletebackpressure and concurrency issues can both be handled and sometimes easier in Subjects. There is nothings bad or wrong about Subjects and they are super useful to decouple modules while keep decouple cross functional messaging clear. Saying do not use Subjects 99.99% to newbies can led to them reinvent stuff and mess it up. In case of fully Rx components available you may end up using 99.99% only Subjects to wire things up, and there is nothing wrong about it.
DeleteFunny you say that. I don't know if I agree completely with all your sentiments, but I do agree with your thoughts on decoupling modules. I have relaxed my views on Subjects slightly in the past month or two as I have ran into this problem. Situations where you need an "event bus" in some form, or an "event model", I can see those as valid use cases for Subjects. I've even created a CompositeObservable type in the RxJavaFX library to fulfill this need. If I didn't have that and some utilities JavaFX provided, I probably would have to use Subjects.
Deletehttps://github.com/ReactiveX/RxJavaFX#compositeobservable
Regardless, I think Subjects are one of those things that should be learned out of need rather than be introduced upfront. For newbies, they attract anti-patterns and should be used as a last resort, or decoupling modue
While working with Couchbase client and Spring Cloud Sleuth, I found an issue with how Couchbase uses Subject. Sleuth relies on the RxJava Scheduler hooks to continue the trace and span id; the code is in SleuthRxJavaSchedulersHook of spring-cloud-sleuth project. AbstractCouchbaseRequest of couchbase-jvm-core project creates a subject AsyncSubject.create() that bypasses the RxJava hooks. Hence, Sleuth can't trace the Couchbase queries made.
ReplyDeleteThis is a simplistic but correct view of the problem, and since Markdown is not supported, I didn't provide direct links to the classes I referred to (but you can find them easily using the keywords I used). Do you've any suggestions on how to fix this?
this is really a serious issue which need to be considered.
ReplyDelete