ZOFTINO.COM

RxJava Operators Part 2

RxJava operators let you convert, merge, group, filter, and otherwise transform observables.

In my previous article I explained a few RxJava operators with examples. In this article, I explain the rest of the RxJava operators, which were not covered in the first article. The example code in this article is intentionally written in long form to make it easy for readers to see the concept behind each RxJava operator.

If you want to learn RxJava concepts and view RxJava examples, you can visit my previous article RxJava2 concepts and examples.

RxJava Operator GroupBy

The GroupBy operator groups the items emitted by the source observable and creates a GroupedObservable for each group. It returns an observable that emits these GroupedObservable objects. You can find out which group an emitted GroupedObservable represents by checking its key with getKey(). The grouping logic is defined in the Function passed to groupBy: for each item emitted by the source observable, the Function returns the key of the group the item belongs to.

The example below groups integers into even and odd groups, and prints the even group by subscribing to the even GroupedObservable.

		Observable<Integer> observable = Observable.range(1, 10);		

		Observable<GroupedObservable<String, Integer>> groupedObservable =  observable.groupBy(new Function<Integer, String>(){
			@Override
			public String apply(Integer t) throws Exception {
				if(t%2 > 0){
					return "Odd Number";
				}else{
					return "Even Number";
				}
			}
		});

		groupedObservable.subscribe(s -> {
			LOGGER.info("grouped observable item key is  "+s.getKey());
			if("Even Number".equals(s.getKey())){
				s.subscribe(groupedObservableItem -> {LOGGER.info("Items from the even number group observable "+groupedObservableItem);});
			}
		});
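
To look at the grouping rule on its own, the sketch below applies the same key function to the numbers 1 to 10 using plain Java collections, with no RxJava dependency. The names `GroupBySketch`, `keyFor` and `group` are made up for illustration. Unlike groupBy, which emits each group as a live GroupedObservable, this version simply collects every group into a map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of RxJava: models only the key-selection step of groupBy.
class GroupBySketch {

    // Same rule as the Function passed to groupBy above.
    static String keyFor(int n) {
        return (n % 2 != 0) ? "Odd Number" : "Even Number";
    }

    // Collects the numbers from..to (inclusive) into groups, in first-seen key order.
    static Map<String, List<Integer>> group(int from, int to) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (int n = from; n <= to; n++) {
            groups.computeIfAbsent(keyFor(n), k -> new ArrayList<>()).add(n);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Prints {Odd Number=[1, 3, 5, 7, 9], Even Number=[2, 4, 6, 8, 10]}
        System.out.println(group(1, 10));
    }
}
```

The key returned for each item plays the same role as the value you read back with getKey() in the RxJava example.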

RxJava Operator IntervalRange

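The IntervalRange operator creates an observable that emits a sequence of long values at fixed time intervals. Its arguments are the first value, the number of values to emit, the delay before the first value, the period between subsequent values, and the time unit. The example below emits the values 1 to 20, the first after 100 milliseconds and the rest 200 milliseconds apart.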

		Observable<Long> observable = Observable.intervalRange(1, 20, 100, 200, TimeUnit.MILLISECONDS);
		observable.subscribe(l -> LOGGER.info("observer - onNext value "+l));
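		//intervalRange emits on a background scheduler thread, so keep the main thread alive until all items are emitted
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}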

RxJava Operator Just

The Just operator creates an observable that emits the items passed to it, in order, and then completes.

		Observable<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");		
		observableOne.subscribe(s -> LOGGER.info("values from observable "+s));

RxJava Operator Map

The Map operator creates an observable that emits the items returned by the function supplied to map. The function is called once for each item emitted by the source observable. The difference between flatMap and map is that the Function supplied to flatMap returns an ObservableSource for each item emitted by the source observable, and flatMap merges the items from all of these ObservableSources into the final observable.

		Observable<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");
		Observable<String> observableFin = observableOne.map(new Function<String, String>(){			
			@Override
			public String apply(String t) throws Exception {
				return "map applied on "+t;
			}
		});

		observableFin.subscribe(s -> LOGGER.info("values after map operator "+s));	
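
The same map versus flatMap distinction exists in Java's own Stream API, which allows a quick comparison without any RxJava dependency. This is only an analogy: streams are pulled synchronously, whereas Observable.flatMap merges inner observables and may interleave their items. The class name `MapVsFlatMapSketch` and the " core" / " advanced" suffixes are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy using java.util.stream, not RxJava.
class MapVsFlatMapSketch {

    // map: exactly one output item per input item.
    static List<String> mapped(List<String> items) {
        return items.stream()
                .map(t -> "map applied on " + t)
                .collect(Collectors.toList());
    }

    // flatMap: each input item becomes a whole stream; the streams are flattened into one.
    static List<String> flatMapped(List<String> items) {
        return items.stream()
                .flatMap(t -> Stream.of(t + " core", t + " advanced"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("java", "spring");
        System.out.println(mapped(items));     // [map applied on java, map applied on spring]
        System.out.println(flatMapped(items)); // [java core, java advanced, spring core, spring advanced]
    }
}
```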

RxJava Operator MergeWith

The MergeWith operator combines the items emitted by the source observable and by the observable passed to it into a new observable that emits all of the items.

		Observable<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");
		ObservableSource<String> observableTwo = Observable.just("c", "c plus", "perl", "c sharp", "dot net");

		Observable<String> observableFinal = observableOne.mergeWith(observableTwo);
		observableFinal.subscribe(s -> LOGGER.info("items after merge operator "+s));

RxJava Operator OfType

The OfType operator creates an observable that emits only those items from the source observable that are of the given type.

		Observable<Object> observableOne = Observable.just("java", "spring", "hibernate", 1, 2, 3);	
		Observable<String> observableFinal = observableOne.ofType(String.class);
		observableFinal.subscribe(s -> LOGGER.info("items ofType String... "+s));	

RxJava Operator OnErrorResumeNext

The OnErrorResumeNext operator lets you create an observable that emits the items from the source observable and, if the source observable throws an exception or calls onError, continues with items from a fallback observable instead of failing. OnErrorResumeNext takes a Function that receives the error and returns the fallback observable. The Function's apply method is called only if the source observable throws an exception or calls onError.

		Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
			@Override			 
			public void subscribe(ObservableEmitter<String> e) throws Exception {
				String s = null;
				e.onNext("STRUTS");
				e.onNext("SPRING");
				//throws null pointer exception
				s = s.toLowerCase();
				e.onNext("TILES");
				e.onNext("JSTL");
				e.onComplete();
			}
		});
		Observable<String> observableFinal = observable.onErrorResumeNext(new Function<Throwable, ObservableSource<String>>(){
			@Override
			public ObservableSource<String> apply(Throwable t) throws Exception {
				return Observable.just("c", "c plus", "perl");
			}
		});
		observableFinal.subscribe(s -> LOGGER.info("items ... "+s));	
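
The control flow that onErrorResumeNext provides can be modelled in plain Java with a try/catch around the source. The sketch below is such a model and does not use RxJava; `ErrorResumeSketch`, `emitWithFallback` and the callback shapes are hypothetical names chosen for illustration. It shows that items delivered before the failure are kept and that the fallback items follow them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java model of the onErrorResumeNext control flow; not the RxJava implementation.
class ErrorResumeSketch {

    // Runs the source; if it throws, asks the fallback function for replacement items.
    static List<String> emitWithFallback(Consumer<Consumer<String>> source,
                                         Function<Throwable, List<String>> fallback) {
        List<String> received = new ArrayList<>();
        try {
            source.accept(received::add);
        } catch (RuntimeException ex) {
            received.addAll(fallback.apply(ex));
        }
        return received;
    }

    static List<String> demo() {
        return emitWithFallback(
            onNext -> {
                String s = null;
                onNext.accept("STRUTS");
                onNext.accept("SPRING");
                s.toLowerCase();          // throws NullPointerException, like the source above
                onNext.accept("TILES");   // never reached
            },
            error -> Arrays.asList("c", "c plus", "perl"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [STRUTS, SPRING, c, c plus, perl]
    }
}
```

Because the fallback is a function of the error, it could return different items for different exception types, which is the flexibility onErrorResumeNext offers.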

RxJava Operator OnErrorReturn

The OnErrorReturn operator lets you emit a single fallback value, and then complete, when the source observable calls onError.

 		Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
			@Override			 
			public void subscribe(ObservableEmitter<String> e) throws Exception {
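				e.onNext("STRUTS");
				e.onNext("SPRING");
				//signal a failure with a real exception rather than null
				e.onError(new Exception("could not load remaining frameworks"));
				//items emitted after onError are ignored
				e.onNext("TILES");
				e.onNext("JSTL");
				e.onComplete();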
			}
		});
		Observable<String> observableFinal = observable.onErrorReturn(new Function<Throwable, String>(){
			@Override
			public String apply(Throwable t) throws Exception {
				return "Could not return all frameworks";
			}
		});
		observableFinal.subscribe(s -> LOGGER.info("items ... "+s));	 

RxJava Operator onExceptionResumeNext

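The OnExceptionResumeNext operator lets you create an observable that emits the items from the source observable and switches to a given fallback observable if the source observable fails with an Exception. It is similar to onErrorResumeNext, with two differences. OnExceptionResumeNext handles only Exception and its subclasses, so other Throwables such as Error are still passed on to the observer. And because onErrorResumeNext takes a Function, it lets you provide different observables depending on the exception received from the source observable, whereas onExceptionResumeNext always uses the same fallback.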

 		Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
			@Override			 
			public void subscribe(ObservableEmitter<String> e) throws Exception {
				String s = null;
				e.onNext("STRUTS");
				e.onNext("SPRING");
				//to throw exception
				s = s.toLowerCase();
				e.onNext("TILES");
				e.onNext("JSTL");
				e.onComplete();
			}
		});
		ObservableSource<String> observableTwo = Observable.just("c", "c plus", "perl", "c sharp", "dot net");
		Observable<String> observableFinal = observable.onExceptionResumeNext(observableTwo);
		observableFinal.subscribe(s -> LOGGER.info("items ... "+s));	 

RxJava Operator Publish

The Publish operator converts the source into a ConnectableObservable, which starts emitting items only after connect is called on it. It emits each item only once: an observer that subscribes after the published observable has started emitting does not receive the items already emitted. In the example below, subscriber three subscribes after connect and therefore misses the four items.

 		Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
			@Override			 
			public void subscribe(ObservableEmitter<String> e) throws Exception {
				String s = null;
				e.onNext("STRUTS");
				e.onNext("SPRING");
				e.onNext("TILES");
				e.onNext("JSTL");
				e.onComplete();
			}
		});	
		ConnectableObservable<String> publishObservable = observable.publish();

		publishObservable.subscribe(s -> { LOGGER.info("subscriber one  "+s);}); 	
		
		publishObservable.subscribe(s -> { LOGGER.info("subscriber two  "+s);}); 
		
		publishObservable.connect();
		
		publishObservable.subscribe(s -> { LOGGER.info("subscriber three  "+s);}); 

RxJava Operator Reduce

The Reduce operator returns a Maybe that emits a single aggregated value computed from the items emitted by the source observable, or an error if the source fails. If the source emits no items, the Maybe completes without a value. Reduce aggregates the items by applying the supplied BiFunction. The BiFunction's apply method is called each time an item is emitted by the source observable, starting with the second item. It takes two arguments: the first is the value accumulated so far (initially the first item), and the second is the next item from the source observable.

 		Observable<Integer> observable = Observable.range(1, 10);		

		Maybe<Integer>  maybeObservable =  observable.reduce(new BiFunction<Integer, Integer, Integer>(){
			@Override
			public Integer apply(Integer t, Integer st) throws Exception {
				return t + st;
			}
		});

		maybeObservable.subscribe(s -> { LOGGER.info("reduce value  "+s);}); 

RxJava Operator RepeatUntil

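The RepeatUntil operator creates an observable that emits the values of the source observable from start to end, over and over. Each time the source completes, the supplied BooleanSupplier is called: if it returns false, the source is subscribed to again; if it returns true, the resulting observable completes.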

 		Observable<Integer> observable = Observable.range(1, 5);		

		Observable<Integer> observableRepeat = observable.repeatUntil( new BooleanSupplier(){
			private int count = 0;
			@Override
			public boolean getAsBoolean() throws Exception {
				count++;
				if(count < 5 ){
					return false;
				}else{
					return true;
				}
			}});

		observableRepeat.subscribe(s -> { LOGGER.info("repeat value  "+s);}); 
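
The repeat loop can be modelled in plain Java as a do-while whose condition is the stop check. The sketch below is only a model and does not use RxJava; note that it uses `java.util.function.BooleanSupplier` from the JDK, not RxJava's interface of the same name, and that `RepeatUntilSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Plain-Java model of repeatUntil; not the RxJava implementation.
class RepeatUntilSketch {

    // Replays the values 1..5 until stop returns true. The check runs after each full pass,
    // mirroring how repeatUntil consults its BooleanSupplier on every onComplete.
    static List<Integer> repeatUntil(BooleanSupplier stop) {
        List<Integer> received = new ArrayList<>();
        do {
            for (int i = 1; i <= 5; i++) {
                received.add(i);
            }
        } while (!stop.getAsBoolean());
        return received;
    }

    static List<Integer> demo() {
        int[] count = {0};
        // Same rule as the example above: false for the first four checks, true on the fifth.
        return repeatUntil(() -> ++count[0] >= 5);
    }

    public static void main(String[] args) {
        System.out.println(demo().size()); // 25: five passes of five items
    }
}
```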

RxJava Operator RepeatWhen

The RepeatWhen operator creates an observable that emits the same items as the source observable, but it can resubscribe to the source observable depending on the observable returned by the supplied function. The function is called only once, when an observer subscribes. It is passed an observable that emits an item each time the source observable calls onComplete; an error from the source is passed straight on to the child subscribers. By transforming this observable, you are notified of each onComplete call and can tell repeatWhen what to do: each item emitted by the observable you return triggers a resubscription, and when it completes or fails, that terminal event is passed on to the child subscribers.

		//rxjava repeatWhen operator example
		Observable<String> observable = Observable.just("c", "c plus", "perl", "c sharp", "dot net");	

		Observable<String> observableRepeat = observable.repeatWhen(new Function<Observable<Object> , ObservableSource<Object>>(){
			private int count = 0;
			@Override
			public ObservableSource<Object> apply(Observable<Object> t) throws Exception {
				LOGGER.info("function call :  "+count);
				return t.flatMap(ob -> {
					count++;
					if(count < 4 ){
						LOGGER.info("repeating :  "+count);
						return Observable.just(ob);
					}else{
						return Observable.empty();
					}
				});
			}
		});

		observableRepeat.onErrorReturnItem("exception returned") 
		.subscribe(s -> { LOGGER.info("repeat values :  "+s);}); 

RxJava Operator Replay

The Replay operator returns a ConnectableObservable that starts emitting items only after connect is called on it. It is similar to the publish operator, except that it replays all items emitted by the source observable to observers that subscribe later.

 		Observable<String> observable = Observable.just("c", "c plus", "perl", "c sharp", "dot net");
		
		ConnectableObservable<String> replayObservable = observable.replay();
		replayObservable.subscribe(s -> { LOGGER.info("subscriber one  "+s);});
			
		replayObservable.connect();
		
		replayObservable.subscribe(s -> { LOGGER.info("subscriber two  "+s);}); 

RxJava Operator Retry

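The Retry operator creates an observable that handles an onError call from the source observable by resubscribing to it, so that all items from the source observable are emitted again from the start. Called without arguments, as below, retry keeps resubscribing for as long as the source fails.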

 		//rxjava retry operator example
		Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
			private boolean oneTimeFail = false;
			@Override			 
			public void subscribe(ObservableEmitter<String> e) throws Exception {
				String s = null;
				e.onNext("int");
				e.onNext("long");
				//it fails first time
				if(!oneTimeFail){
					oneTimeFail = true;
					e.onError(new Exception());
					return;
				}
				e.onNext("char");
				e.onNext("double");
				e.onComplete();
			}
		});
		Observable<String> observableFinal = observable.retry();

		observableFinal.subscribe(s -> LOGGER.info("items ... "+s));	
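
The effect of resubscribing, namely that the observer sees the early items twice, can be modelled without RxJava. The sketch below is a plain-Java model with hypothetical names (`RetrySketch`, `runWithRetry`). It caps the number of attempts with a `maxRetries` argument, which is an assumption of this sketch; retry() without arguments has no such cap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of retry; not the RxJava implementation.
class RetrySketch {

    // Re-runs the source from the beginning each time it throws, up to maxRetries extra attempts.
    static List<String> runWithRetry(Consumer<Consumer<String>> source, int maxRetries) {
        List<String> received = new ArrayList<>();
        for (int attempt = 0; ; attempt++) {
            try {
                source.accept(received::add);
                return received;                       // completed normally
            } catch (RuntimeException ex) {
                if (attempt >= maxRetries) throw ex;   // give up and surface the error
            }
        }
    }

    static List<String> demo() {
        boolean[] failedOnce = {false};
        return runWithRetry(onNext -> {
            onNext.accept("int");
            onNext.accept("long");
            if (!failedOnce[0]) {                      // fails on the first run only
                failedOnce[0] = true;
                throw new IllegalStateException("first attempt fails");
            }
            onNext.accept("char");
            onNext.accept("double");
        }, 3);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [int, long, int, long, char, double]
    }
}
```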

RxJava Operator RetryUntil

The RetryUntil operator is similar to the retry operator explained above, except that each time the source observable calls onError it resubscribes only if the supplied BooleanSupplier returns false. Once the BooleanSupplier returns true, the error is passed on to the observer.

 		//rxjava retryUntil operator example
		Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
			private int failCount = 0;
			@Override			 
			public void subscribe(ObservableEmitter<String> e) throws Exception {
				String s = null;
				e.onNext("int");
				e.onNext("long");
				//it fails two times
				failCount++;
				if(failCount < 3){
					e.onError(new Exception());
					return;
				}
				e.onNext("char");
				e.onNext("double");
				e.onComplete();
			}
		});		

		Observable<String> observableRetry = observable.retryUntil( new BooleanSupplier(){
			private int count = 0;
			//allows up to 3 retries; the source above fails only twice
			@Override
			public boolean getAsBoolean() throws Exception {
				count++;
				if(count < 4 ){
					return false;
				}else{
					return true;
				}
			}});

		observableRetry.subscribe(s -> { LOGGER.info("retry value  "+s);}); 

RxJava Operator RetryWhen

The RetryWhen operator can be used to resubscribe to the source observable when it fails. RetryWhen takes a Function as its parameter. The function receives an observable that emits the errors from the source, and returns an observable that retryWhen uses to decide whether to resubscribe or to call onError or onComplete on child subscribers: each item emitted by the returned observable triggers a resubscription, while an error or completion is passed on to the child subscribers. The function itself is called only once, when an observer subscribes. Every error signalled by the source observable is then delivered through the error observable that was passed to the function, so that is where you inspect each error and decide what to do with it.

The example below shows how to define the function, how to react to the error observable, and how to return an observable from the function. In this example, retryWhen resubscribes to the source observable three times, because the function returns an observable that emits a plain item for each of the first three errors. On receiving the fourth error, it calls onError on the child subscriber, because the function then returns an observable that emits the error.

 		//rxjava retryWhen operator example
		//source observable always throw exception
		Observable<String> observable = Observable.create(s -> {
			s.onError(new Exception());
		});		

		//using retryWhen, 3 times it is resubscribed
		Observable<String> observableRetry = observable.retryWhen(new Function<Observable<Throwable> , ObservableSource<String>>(){
			private int count = 0;
			@Override
			public ObservableSource<String> apply(Observable<Throwable> t) throws Exception {
				return t.flatMap(error -> {LOGGER.info("error from source  "+error);
				count++;
				if(count < 4 ){
					LOGGER.info("retrying  "+count);
					return Observable.just("");
				}else{
					return Observable.error(error);
				}
				});
			}});

		observableRetry.onErrorReturnItem("exception") 
		.subscribe(s -> { LOGGER.info("retry value  "+s);}); 
INFO: error from source  java.lang.Exception
INFO: retrying  1
INFO: error from source  java.lang.Exception
INFO: retrying  2
INFO: error from source  java.lang.Exception
INFO: retrying  3
INFO: error from source  java.lang.Exception
INFO: retry value  exception

RxJava Operator Sample

Use the sample operator if you don't want every item from the source observable, but only the most recent item in each time interval.

		Observable<Integer> observable = Observable.range(1, 24425).sample(1, TimeUnit.NANOSECONDS);
		observable.subscribe(s -> { LOGGER.info("value after every 1 nano sec  "+s);}); 

RxJava Operator Scan

The Scan operator returns an observable that emits the running aggregate of the items emitted by the source observable. It aggregates the items by applying the supplied BiFunction. The first item is emitted as is; for each later item, the BiFunction's apply method is called with two arguments: the first is the value accumulated so far, and the second is the next item from the source observable.

The scan operator is similar to the reduce operator in that both apply an aggregating function to the values emitted by the source observable, with one difference. The reduce operator emits only the final value, after the aggregating function has been applied to all values emitted by the source observable, whereas the scan operator emits every intermediate value returned by the function.

		Observable<Integer> observable = Observable.range(1, 10);		

		Observable<Integer>  finalObservable =  observable.scan(new BiFunction<Integer, Integer, Integer>(){
			@Override
			public Integer apply(Integer t, Integer st) throws Exception {
				return t + st;
			}
		});

		finalObservable.subscribe(s -> { LOGGER.info("scan values  "+s);}); 
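
The difference between scan and reduce is easy to see when both are written as a loop. The sketch below models them in plain Java, without RxJava; `ScanVsReduceSketch` and its methods are hypothetical names, and returning null from `reduce` for an empty input is a simplification standing in for an empty Maybe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of scan and reduce; not the RxJava implementation.
class ScanVsReduceSketch {

    // scan: returns every intermediate accumulator value, starting with the first item itself.
    static List<Integer> scan(List<Integer> items, BinaryOperator<Integer> f) {
        List<Integer> out = new ArrayList<>();
        Integer acc = null;
        for (Integer item : items) {
            acc = (acc == null) ? item : f.apply(acc, item); // (accumulated, next)
            out.add(acc);
        }
        return out;
    }

    // reduce: only the last accumulator value; null stands in for an empty Maybe.
    static Integer reduce(List<Integer> items, BinaryOperator<Integer> f) {
        List<Integer> all = scan(items, f);
        return all.isEmpty() ? null : all.get(all.size() - 1);
    }

    // The same input as Observable.range(1, 10).
    static List<Integer> oneToTen() {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            items.add(i);
        }
        return items;
    }

    public static void main(String[] args) {
        System.out.println(scan(oneToTen(), Integer::sum));   // [1, 3, 6, 10, 15, 21, 28, 36, 45, 55]
        System.out.println(reduce(oneToTen(), Integer::sum)); // 55
    }
}
```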

RxJava Operator SkipWhile

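The SkipWhile operator creates an observable that skips the items emitted by the source observable for as long as the supplied Predicate returns true. As soon as the Predicate returns false for an item, that item and all items after it are emitted, and the Predicate is not consulted again.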

		Observable<String> observable = Observable.just("java", "hibernate", "android", "spring", "maven", "struts");

		//skipWhile operator 
		Observable<String> skipWhileOb= observable.skipWhile(new Predicate<String>(){
			@Override
			public boolean test(String t) throws Exception {
				return t.contains("a") ? true : false;		
			}			
		});
		skipWhileOb.subscribe(s -> LOGGER.info("Items from skipWhile operator "+s));
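
Java's Stream API has an operation with the same skipping rule, `dropWhile` (Java 9 and later), so it can serve as a dependency-free analogy. The sketch below is not RxJava code and the class name `SkipWhileSketch` is illustrative. It uses the same items and predicate as the example above, and shows that "maven" is kept even though it contains an "a", because skipping has already stopped at "spring".

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy using java.util.stream (Java 9+), not RxJava.
class SkipWhileSketch {

    static List<String> demo() {
        return Stream.of("java", "hibernate", "android", "spring", "maven", "struts")
                .dropWhile(t -> t.contains("a")) // skip only the leading items that match
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [spring, maven, struts]
    }
}
```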

RxJava Operator Sorted

The Sorted operator creates an observable that emits the items of the source observable in sorted order. It uses the supplied Comparator to sort the items. Because all items must be collected before they can be sorted, the source observable has to be finite.

		Observable<String> observable = Observable.just("java", "hibernate", "android", "spring", "maven", "struts");

		//sorted operator 
		Observable<String> observableS = observable.sorted(new Comparator<String>(){
			@Override
			public int compare(String arg0, String arg1) {		
				return arg0.compareTo(arg1);
			}			
		});
		observableS.subscribe(s -> LOGGER.info("Items sorted "+s));

RxJava Operator StartWith

The StartWith operator creates an observable that first emits the items passed to it, here the items of a second observable, and then emits the items from the source observable.

 		Observable<String> observable = Observable.just("java", "hibernate", "android", "spring", "maven", "struts");
		Observable<String> observableSec = Observable.just("c", "c plus", "perl", "c sharp", "dot net");
		
		Observable<String> observableFin = observable.startWith(observableSec);		
		observableFin.subscribe(s -> LOGGER.info("Items startWith operator "+s)); 

RxJava Operator SwitchMap

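The SwitchMap operator uses a Function to create an observable for each item emitted by the source observable, and emits items only from the most recent observable returned by the function. When the source emits a new item, switchMap stops listening to the previous inner observable and switches to the new one.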

 		Observable<String> observable = Observable.just("java", "hibernate", "android", "spring", "maven", "struts");
	
		Observable<String> observableFin = observable.switchMap(new Function<String, ObservableSource<String>>(){
			@Override
			public ObservableSource<String> apply(String t) throws Exception {
				return Observable.just("language first "+t,"framework "+t, "frontend "+t, "backend last "+t).delay(10, TimeUnit.NANOSECONDS);
			}			
		});		
		observableFin.subscribe(s -> LOGGER.info("SwitchMap items "+s));
		
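		//delay runs on a background scheduler thread, so wait for the delayed items
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		} 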

RxJava Operator Take

The Take operator creates an observable that emits only the first n items emitted by the source observable and then completes.

 		Observable<Integer> observable = Observable.range(1, 133).take(20);
		observable.subscribe(s -> { LOGGER.info("take 20  "+s);}); 

RxJava Operator TimeInterval

The TimeInterval operator creates an observable that emits, for each item from the source observable, a Timed object holding the item and the time elapsed since the previous item.

 		Observable<Timed<Integer>> observable = Observable.range(1, 9).timeInterval();
		observable.subscribe(s -> { LOGGER.info("time interval  "+s.time());}); 

RxJava Operator Timer

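Using the timer operator, you can delay receiving items from observables. The timer operator creates an observable that emits a single item, the value 0L, after the specified delay and then completes. In the example below, flatMap replaces that item with the actual items.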

 		Observable.timer(2000, TimeUnit.MILLISECONDS).flatMap( d -> Observable.just("java", "hibernate", "android"))
		.subscribe(s -> LOGGER.info("items "+s));			
		
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {

		} 

RxJava Operator Window

The Window operator creates an observable that emits observables. Each of these observables emits the specified number of items from the source observable. For example, if you pass 3 to the window operator, it creates one observable for every three items from the source observable. The first observable contains the first three items, the second observable contains the next three items, and so on. The last window may contain fewer items if the source runs out.

 		Observable<Observable<Integer>> observable = Observable.range(1, 133).window(3);
		observable.subscribe(s -> { LOGGER.info("next window "); s.subscribe(i -> LOGGER.info("items window observable "+i));}); 
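
How items are divided among the windows can be modelled in plain Java by cutting a range into consecutive chunks. The sketch below is such a model, with hypothetical names (`WindowSketch`, `window`) and lists standing in for the inner observables; it uses the range 1 to 10 to keep the output short.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of window(count); not the RxJava implementation.
class WindowSketch {

    // Splits the numbers from..to (inclusive) into consecutive windows of at most `size` items.
    static List<List<Integer>> window(int from, int to, int size) {
        List<List<Integer>> windows = new ArrayList<>();
        List<Integer> current = null;
        for (int n = from; n <= to; n++) {
            if (current == null) {          // open a new window when the next item arrives
                current = new ArrayList<>();
                windows.add(current);
            }
            current.add(n);
            if (current.size() == size) {   // window is full: close it
                current = null;
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        System.out.println(window(1, 10, 3)); // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
    }
}
```

By the same arithmetic, range(1, 133) with a window size of 3 yields 44 full windows followed by one window holding the single item 133.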

RxJava Operator Zip

The Zip operator applies the given function to the items emitted, in sequence, by a given list of observables and creates an observable that emits the items returned by this function. The function is applied position by position: when it is called for the first time, it receives the first item from each of the given observables, the second call receives the second item from each, and so on. The resulting observable completes as soon as one of the given observables has no more items.

 		List<Observable<String>> lst = new ArrayList<Observable<String>>();
		
		Observable<String> observable = Observable.just("java", "hibernate", "android", "spring", "maven", "struts");
		Observable<String> observableSec = Observable.just("c", "c plus", "perl", "c sharp", "dot net", "asp");
		Observable<String> observableThird = Observable.just("maven", "ant", "make", "gradle", "ivy", "othertool");
		lst.add(observable);
		lst.add(observableSec);
		lst.add(observableThird);
		
		Observable<String> observableFin = Observable.zip(lst, new Function<Object[], String>(){
			@Override
			public String apply(Object[] t) throws Exception {
				String finStr = "";
				for(Object s : t){
					finStr = finStr +" "+s;
				}
				return finStr;
			}});		
		observableFin.subscribe(s -> LOGGER.info("Items after zip operator "+s)); 
 
INFO: Items after zip operator  java c maven
INFO: Items after zip operator  hibernate c plus ant
INFO: Items after zip operator  android perl make
INFO: Items after zip operator  spring c sharp gradle
INFO: Items after zip operator  maven dot net ivy
INFO: Items after zip operator  struts asp othertool
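
The position-by-position pairing can be modelled in plain Java by walking several lists with a shared index. The sketch below is only a model, with hypothetical names (`ZipSketch`, `zip`) and lists in place of observables. Its sample inputs deliberately have different lengths to show that the result stops at the shortest one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of zip; not the RxJava implementation.
class ZipSketch {

    // Joins the i-th item of every source with spaces, stopping at the shortest source.
    static List<String> zip(List<List<String>> sources) {
        int shortest = sources.stream().mapToInt(List::size).min().orElse(0);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < shortest; i++) {
            List<String> row = new ArrayList<>();
            for (List<String> source : sources) {
                row.add(source.get(i));
            }
            out.add(String.join(" ", row));
        }
        return out;
    }

    static List<String> demo() {
        return zip(Arrays.asList(
                Arrays.asList("java", "hibernate", "android"),
                Arrays.asList("c", "c plus"),
                Arrays.asList("maven", "ant", "make")));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [java c maven, hibernate c plus ant]
    }
}
```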