ZOFTINO.COM android and web dev tutorials

RxJava Observables

In reactive programming, observables emit data to its subscribers called observers. Observers process or take action on the emitted data. With this pattern, instructions can be executed parallel without blocking main program. In other words, if tasks have no dependency on each other, they can be run parallel and asynchronously.

In this article, I’ll explain about different observables and its categories. If you want to learn RxJava and RxJava operators, you can read RxJava examples and RxJava Operators.

RxJava Observable Types

Depending on when observables start emitting items, observables can be categorized into hot, cold and connectable observables.

Observables can also be categorized into backpressured and non-backpressured observables depending on backpressure support.

RxJava provides observables that emit 0 or one items.

Cold Observables

Cold observables don’t emit items until an observer is subscribed to it. Observers subscribed to cold observables always get items from the beginning, meaning they get all the items emitted by observable.

Notice in the output of below cold observable that both subscribers receives all the items emitted by cold observable.

Cold observable example

Observable<Integer> observableInt = Observable.range(1,3).subscribeOn(Schedulers.computation());

observableInt.subscribe(i -> LOGGER.info("first subscriber : "+i));

Thread.sleep(27);

observableInt.
	subscribe(i -> LOGGER.info("second subscriber "+i));

Thread.sleep(500); 

Cold observable output

INFO: first subscriber : 1
INFO: first subscriber : 2
INFO: second subscriber 1
INFO: first subscriber : 3
INFO: second subscriber 2
INFO: second subscriber 3 

Hot Observables

Hot observables start emitting data immediately after the observable has been created. Hot observables emit data even if there are no observers subscribed to it. If an observer subscribes to a hot observable later after it has been created, the observer will receive items from the current item at the point of subscription not from the beginning, meaning observers won’t receive already emitted items from hot observables.

Hot observable can be created using Subject by converting cold observable into hot observable. Subject is both an observable and observer; it subscribes to an observable as observer and emits items as hot observable to its subscribed observers.

Below example shows how to create hot observable using PublishSubject. PublishSubject subscribes to cold observable and emits items as hot observable to its subscribers. Notice in the output of hot observable example that first subscriber received all items emitted, whereas, second subscriber missed first item emitted by observable.

RxJava hot observable example

Observable<Long> observableInterval = Observable.interval(30, TimeUnit.MILLISECONDS);	
		
PublishSubject<Long> pSubject = PublishSubject.create();
observableInterval.subscribe(pSubject);

pSubject.subscribe(l -> LOGGER.info("first subscriber : "+l));

Thread.sleep(35);

pSubject.
	subscribe(l -> LOGGER.info("second subscriber "+l));

Thread.sleep(90); 

Hot observable output

INFO: first subscriber : 1
INFO: first subscriber : 2
INFO: second subscriber 2
INFO: first subscriber : 3
INFO: second subscriber 3
INFO: first subscriber : 4
INFO: second subscriber 4 

Connectable Observables

Connectable observables don’t emit items until connect method is called on it. As soon as connect method is called, connectable observable start emitting items without considering number of observers subscribed to it. After connect method is called, connectable observables behave as hot observables.

Connectable observable can be created from cold observable by calling publish method on it. Notice in the output of connectable observable example that subscribers receive items only after connect method is called and second subscriber misses first few items as it subscribed later.

Connectable observable example

Observable<Integer> observableInt = Observable.range(1,4).subscribeOn(Schedulers.computation());
				
ConnectableObservable<Integer> connectableIntObservable = observableInt.publish();
connectableIntObservable.subscribe(i -> LOGGER.info("first subscriber : "+i));

Thread.sleep(10);
LOGGER.info("After 10 milli seconds sleep");
				
connectableIntObservable.connect();
				
Thread.sleep(2);

connectableIntObservable.
	subscribe(i -> LOGGER.info("second subscriber "+i));

Thread.sleep(500);

Connectable observable output

INFO: After 10 milli seconds sleep
INFO: first subscriber : 1
INFO: first subscriber : 2
INFO: first subscriber : 3
INFO: second subscriber 3
INFO: first subscriber : 4
INFO: second subscriber 4

Backpressured Observable : Flowable

First let’s understand what the backpressure is. Backpressure comes into picture when observables produce more items than consumer can consume. Backpressure needs to be handled in order to reduce resource consumption. There are ways to handle backpressure such as buffering, skipping, batching and pull backpressure etc. Backpressure is applicable to hot observable as cold observables are pull-based.

Flowable is a backpressured observable meaning that backpressure handling mechanisms are built into Flowable observable and Flowable operators.

Converting Flowable into hot observable

Flowable is clod observable; it can be converted into hot observable using processors. For example, below code converts Flowable into hot observable using PublishProcessor.

Flowable<Long> observableInterval = Flowable.interval(20, TimeUnit.NANOSECONDS);	
			
PublishProcessor<Long> processor = PublishProcessor.create();
observableInterval.subscribe(processor);

processor.subscribe(l -> LOGGER.info("flowable hot observerable item : "+l));			

Thread.sleep(900);

Converting Flowable into Connectable Obserable

Flowable can be converted to connectable observable using publish operator.

Flowable<Long> observableInterval = Flowable.interval(20, TimeUnit.NANOSECONDS);	
ConnectableFlowable<Long> connectableFlowable = observableInterval.publish();
connectableFlowable.connect();
connectableFlowable.subscribe(l -> LOGGER.info("flowable connectable observerable item : "+l));	 

Non-Backpressured Observable

Observable is a non-backpressured RxJava observable. Observable operators don’t throw MissingBackpressureException but can cuase OutOfMemoryError. See hot, cold and connectable observables sections as Observable is used to explain them.

Completable Observable

Completable observables doesn’t emit any value, it just notifies subscribers about status with complete or exception indicator. Unlike other observables, Completable doesn’t call onNext on observers. The main observer for Completable observable is CompletableObserver.

Observable operators which produce Completable are ingnoreElements and flatMapCompletable.

Observable<Integer> observable = Observable.range(1, 5);
Completable completable = observable.ignoreElements();

completable.subscribe( () -> {
		LOGGER.info("Perform some action here  " );},
	throwable -> {
		LOGGER.info("error in ingnoring elements");
	}); 

Maybe Observable

Maybe Observable emits either 0 or 1 item. MaybeObserver is the main subscriber of Maybe observable. Maybe observable calls onSuccess, onError or onComplete methods on MaybeObserver. OnSuccess and onComplete are called when Maybe emits item and only onComplete is called when Maybe doesn’t emit item.

The observable operators which produce Maybe observable are elementAt, firstElement, lastElement and reduce.

Observable<String> observable = Observable.just("RxJava","is","the","best");

Maybe<String> maybeObservable = observable.reduce(
	new BiFunction<String, String, String>() {
		@Override
		public String apply(String conStr, String item) throws Exception {
			return conStr+ " " + item;
		}
	});

maybeObservable.subscribe(s -> {
		LOGGER.info("item from maybe observable  " + s);
	});

You can convert Maybe observable to Observable by calling toObservable operator on Maybe observable.

Single Observable

Single observable emits one item. SingleObserver is the main subscriber of Single observable. Single observable calls onSuccess or onError on SingleObserver for emitting items or exception respectively.

The observable operators which produce Single observable are all, any, collect, contains, count, elementAt, first, isEmpty, last, reduce, single, toList and toMap.

 Observable<Integer> observable = Observable.just(4, 6, 8, 10, 11);

Single<Boolean> containsOdd = observable.all(new Predicate<Integer>() {
	@Override
	public boolean test(Integer t) throws Exception {
		return t % 2 == 0 ? true : false;
	}});

containsOdd.subscribe(isOdd -> 
		LOGGER.info("Does the given list of numbers contain odd number? "+!isOdd),
	throwable -> {
		LOGGER.info("error in findig type of numbers");
	});

Most of the Observable operators are available with Single observable. In case you need to convert Single observable to Observable to perform unavailable operators or for other reasons, you can do so by calling toObservable on Single observable.