ZOFTINO.COM android and web dev tutorials

RxJava Subject

RxJava subject is both an observer and observable. Subject can subscribe to multiple observables and emit the items to its subscribers.

In this article, details about Subject, Subject sub classes and its uses are provided. If you want to learn RxJava, you can read RxJava examples and RxJava Operators.

Subject

Subject implements both Observer and ObservableSource that is why it can be used as subscriber and producer. Subject is mainly used for multicasting events to its child observers and converting cold observable into hot observable. I’ll cover these two uses in details later in the post.

Subject is the base class and AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject, UnicastSubject, CompletableSubject, SingleSubject and MaybeSubject are subclasses of Subject. All these types of subject are covered in detail in the below sections.

Below example shows subject as both an observer and observable.

Observable<String> observable = Observable.just("hot","warm","cold")
	.subscribeOn(Schedulers.computation())
	.map(i -> {return i+" climate";});
			
BehaviorSubject<String> bSubject = BehaviorSubject.create();
observable.subscribe(bSubject);
		
bSubject.subscribe(s -> LOGGER.info("subscriber one: " + s));
bSubject.subscribe(s -> LOGGER.info("subscriber two: " + s));

RxJava Multicasting using Subject

Subject does multicast items to multiple child subscribers meaning that Subject emits items or events received from its observables to its multiple subscribers. Multicasting makes it possible to run expensive operations once and emit the results to multiple subscribers.

In the below example, chain of operations are executed two times for each subscriber.

Observable<Integer> observable = 
	Observable.range(1, 2).subscribeOn(Schedulers.computation())
		  .flatMap(i -> {
			LOGGER.info("executing expensive operation first one for : " + i);
			return Observable.just(i);
		  }).map( val -> {
			LOGGER.info("executing expensive operation secon one for : " + val);
			return val;
		  });
			
observable.subscribe(l -> LOGGER.info("first subscriber : " + l));

observable.subscribe(l -> LOGGER.info("second subscriber :" + l));

Thread.sleep(90);

Output

INFO: executing expensive operation first one for : 1
INFO: executing expensive operation first one for : 1
INFO: executing expensive operation secon one for : 1
INFO: executing expensive operation secon one for : 1
INFO: first subscriber : 1
INFO: second subscriber :1
INFO: executing expensive operation first one for : 2
INFO: executing expensive operation first one for : 2
INFO: executing expensive operation secon one for : 2
INFO: first subscriber : 2
INFO: executing expensive operation secon one for : 2
INFO: second subscriber :2

Multicasting using Subject an example

In the below example, chain of operations on observable which is subscribed to by subject is run only once even though there are two subscribers to the subject.

Observable<Integer> oneTimeOperationObservalbe = 
	Observable.range(1, 2).subscribeOn(Schedulers.computation())
	.flatMap(i -> {
		LOGGER.info("executing expensive operation first one for : " + i);
		return Observable.just(i);
	}).map( val -> {
		LOGGER.info("executing expensive operation secon one for : " + val);
		return val;
	});
			
PublishSubject<Integer> pSubject = PublishSubject.create();
oneTimeOperationObservalbe.subscribe(pSubject);
		
pSubject.subscribe(l -> LOGGER.info("first subscriber : " + l));
pSubject.subscribe(l -> LOGGER.info("second subscriber : " + l));

Multicasting example output

INFO: executing expensive operation first one for : 1
INFO: executing expensive operation secon one for : 1
INFO: first subscriber : 1
INFO: second subscriber 1
INFO: executing expensive operation first one for : 2
INFO: executing expensive operation secon one for : 2
INFO: first subscriber : 2
INFO: second subscriber 2

Hot Observable

RxJava observable classes such as Observable and Flowable are cold observers meaning they emit items only after an observer is added to them and that too they emit all the items to all the subscribers. In your application, if you need an observable (hot) which emits items only once regard less of number of subscribers and its subscribers receive items from the point of their subscription, then you can use Subject. Subject converts cold observable into hot observable.

Below code shows how subject converts two observable into hot observable. See output, all items from both observables are emitted only once. You can also notice that second observer subscribed to subject misses first few items emitted by subject.

Observable<Integer> observableOne = Observable.range(1, 4)
		.subscribeOn(Schedulers.computation())
		.map(i -> {return i*1;});
			
Observable<Integer> observableTwo = Observable.range(105, 108)
		.subscribeOn(Schedulers.computation())
		.map(i -> {return i*1;});

PublishSubject<Integer> pSubject = PublishSubject.create();
observableOne.subscribe(pSubject);
observableTwo.subscribe(pSubject);

pSubject.subscribe(l -> LOGGER.info("subscriber a : " + l));

Thread.sleep(2);

pSubject.subscribe(l -> LOGGER.info("subscriber b : " + l));

Output

INFO: subscriber a : 105
INFO: subscriber a : 1
INFO: subscriber a : 106
INFO: subscriber a : 2
INFO: subscriber b : 106
INFO: subscriber b : 2
INFO: subscriber a : 107
INFO: subscriber a : 3
INFO: subscriber b : 107
INFO: subscriber b : 3
INFO: subscriber a : 108
INFO: subscriber a : 4
INFO: subscriber b : 108
INFO: subscriber b : 4

AsyncSubject

AsyncSubject emits just last value and calls onComplete. And this happens only if source observable completes.

AsyncSubject<Integer> pSubject = AsyncSubject.create();
pSubject.onNext(1);
pSubject.onNext(2);
pSubject.onNext(3);

pSubject.observeOn(Schedulers.computation())
.subscribe(l -> LOGGER.info("subscriber received : " + l));
pSubject.onNext(4);
pSubject.onNext(5);
pSubject.onComplete();

Output

INFO: subscriber received : 5

BehaviorSubject

Subscribers of BehaviorSubject receive most recent item at the time of their subscription and all items after that.

BehaviorSubject<Integer> pSubject = BehaviorSubject.create();
pSubject.onNext(1);
pSubject.onNext(2);

pSubject.observeOn(Schedulers.computation())
	.subscribe(l -> LOGGER.info("subscriber one : " + l));

pSubject.onNext(3);
pSubject.onNext(4);

pSubject.observeOn(Schedulers.computation())
	.subscribe(l -> LOGGER.info("subscriber two : " + l));

pSubject.onNext(5);

Output

INFO: subscriber one : 2
INFO: subscriber two : 4
INFO: subscriber one : 3
INFO: subscriber two : 5
INFO: subscriber one : 4
INFO: subscriber one : 5

PublishSubject

Observers of PublishSubject receive items from the point of subscription. The difference between PublishSubject and BehaviorSubject is that observers of BehaviorSubject receive one extra item which is most recent one at the time of subscription. With above examples if you use PublishSubject instead of using BehaviorSubject, you will get below output.

INFO: subscriber one : 3
INFO: subscriber two : 5
INFO: subscriber one : 4
INFO: subscriber one : 5 

ReplaySubject

Notice in the above example output that PublishSubject starts emitting items immediately after it has been created. By the time observer subscribes to the subject, it may emit few items which observers miss to receive.

If you want to guarantee that observers of a subject receive all the items emitted by the subject, you should use ReplaySubject. If you try the same example, that is used to show BehaviorSubject, with ReplaySubject, you will get below ouput , subscribers receive all the items.

INFO: subscriber one : 1
INFO: subscriber two : 1
INFO: subscriber two : 2
INFO: subscriber one : 2
INFO: subscriber two : 3
INFO: subscriber one : 3
INFO: subscriber two : 4
INFO: subscriber one : 4
INFO: subscriber two : 5
INFO: subscriber one : 5 

ReplaySubject example

Observable<Integer> observableOne = Observable.range(1, 3)
		.subscribeOn(Schedulers.computation())
		.map(i -> {return i*1;});			

ReplaySubject<Integer> pSubject = ReplaySubject.create();
observableOne.subscribe(pSubject);

Thread.sleep(10);
pSubject.subscribe(l -> LOGGER.info("subscriber : " + l));

ReplaySubject example output

 INFO: subscriber : 1
INFO: subscriber : 2
INFO: subscriber : 3

UnicastSubject

UnicastSubject allows only single subscriber during its lifetime and emits all the items to it. If one observer is already subscribed to UnicastSubject and second one tries to subscribe, second observer will get exception "Only a single observer allowed".

Observers of UnicastSubject can request for replay of notifications as UnicastSubject maintains buffer to hold items.

UnicastSubject example

Observable<String> observable = Observable.just("int","long","float")
	.subscribeOn(Schedulers.computation());		
	
UnicastSubject<String> bSubject = UnicastSubject.create();			
observable.subscribe(bSubject);
Thread.sleep(90);
bSubject.subscribe(s -> LOGGER.info("subscriber : " + s));

Thread.sleep(900); 

UnicastSubject example output

INFO: subscriber : int
INFO: subscriber : long
INFO: subscriber : float 

SingleSubject

SingleSubject can subscribe to multiple SingleObservables. Multiple observers can subscribe to SingleSubject. But SingleSubject emits only one item to all its observers.

SingleSubject example

Single<String> singleObservable = Single.just("int")
	.subscribeOn(Schedulers.computation());	
Single<String> singleObservableTwo = Single.just("double")
	.subscribeOn(Schedulers.computation());
			
SingleSubject<String> bSubject = SingleSubject.create();			
singleObservable.subscribe(bSubject);
singleObservableTwo.subscribe(bSubject);
			
Thread.sleep(50);

bSubject.subscribe(s -> LOGGER.info("subscriber : " + s));
bSubject.subscribe(s -> LOGGER.info("subscriber two: " + s));

SingleSubject example output

INFO: subscriber : double
INFO: subscriber two: double

Similarly, MaybeSubject emits zero or one item and CompletableSubject just emits completion notification or error.

Processors

Like Subjects, Processors are also observers and observables as they implement Flowable and Subscriber. Cold observable like Flowable with backpressure support can be converted into hot observable using Processor. Different types of processors are AsyncProcessor, BehaviorProcessor, PublishProcessor, ReplayProcessor, and UnicastProcessor. These are similar to various types of Subjects except that Processor types can observe Flowable.