ZOFTINO.COM android and web dev tutorials

RxJava2 Operators

RxJava2 operators let you create observables by transforming, merging, filtering, and grouping the items emitted by other observables. An operator converts one observable into another observable that emits the source observable's items in modified form.

In this post I am going to explain RxJava2 operators with examples. If you want to learn about RxJava2, read my previous post RxJava2 examples.

RxJava2 Operator Types

Several types of operators can be applied to an observable, and most of them return observables. Operators can be categorized by what they do to the items emitted by the source observable.

Operators such as create, range, just, interval, repeat, empty, and defer create new observables which emit data based on the input data or specification.

The operators such as buffer, map, flatMap, scan, compose, and groupBy create observables which emit items after transformation of items emitted by source observable.

The operators such as first, last, distinct, filter, skip, and elementAt create observables which emit items after applying filters on the items emitted by source observable.

Operators such as join, merge, switchOnNext, zip, and combineLatest create observables by combining multiple observables.
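To make the four categories concrete, here is a minimal sketch that uses one operator from each group in a single pipeline. It assumes RxJava 2.x (the io.reactivex package) is on the classpath; the class name OperatorCategoriesExample and the method labelledEvenSquares are hypothetical names used only for illustration.

```java
import io.reactivex.Observable;
import java.util.List;

public class OperatorCategoriesExample {

    // Hypothetical helper that chains one operator from each category
    static List<String> labelledEvenSquares() {
        // creating: range emits 1, 2, 3, 4, 5
        Observable<Integer> numbers = Observable.range(1, 5);
        // transforming: map emits 1, 4, 9, 16, 25
        Observable<Integer> squares = numbers.map(n -> n * n);
        // filtering: filter keeps the even squares 4 and 16
        Observable<Integer> evenSquares = squares.filter(n -> n % 2 == 0);
        // combining: zip pairs each even square with a label
        Observable<String> labelled = Observable.zip(
                evenSquares, Observable.just("a", "b"), (n, label) -> label + n);
        // toList().blockingGet() collects the emitted items for inspection
        return labelled.toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(labelledEvenSquares()); // prints [a4, b16]
    }
}
```

Each step returns a new observable, which is why the operators can be chained freely.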

RxJava Operator All

The all operator takes a Predicate as input, which contains the logic to test whether an item satisfies a condition. If every item satisfies the condition, all emits true; otherwise it emits false. The operator returns a Single<Boolean>, which you can subscribe to for further actions.

The any operator is similar to all, but it emits true if at least one item satisfies the condition.

		Observable<String> observable = Observable.just("java", "hibernate", "android");
		
		//all operator checks whether items from main observable contain letter a
		//return single boolean item observable
	
		Single<Boolean> itemsContainA= observable.all(new Predicate<String>(){
			@Override
			public boolean test(String t) throws Exception {
				return t.contains("a");
			}			
		});
		//subscribe to single observable to do something with the result from all operator
		itemsContainA.subscribe(s -> LOGGER.info("Do items from given observable contain letter a ? "+s));
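The any operator described above has no example of its own, so here is a minimal sketch of it. It assumes RxJava 2.x is on the classpath; the class name AnyOperatorExample and the helper anyItemContains are hypothetical names chosen for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.Single;

public class AnyOperatorExample {

    // Hypothetical helper: true if at least one item contains the given text
    static boolean anyItemContains(String text, String... items) {
        Single<Boolean> result = Observable.fromArray(items)
                .any(item -> item.contains(text));
        // blockingGet waits for the Single and returns its value
        return result.blockingGet();
    }

    public static void main(String[] args) {
        // only "java" contains the letter v
        System.out.println(anyItemContains("v", "java", "hibernate", "android")); // prints true
        // no item contains the letter z
        System.out.println(anyItemContains("z", "java", "hibernate", "android")); // prints false
    }
}
```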

RxJava Operator BlockingLast

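If you want just the last item from an observable, use the blockingLast operator. To get only the first item, use blockingFirst. Both block the calling thread until the item is available, and both throw NoSuchElementException if the observable completes without emitting anything. Other variants of the blocking operators are available, for example ones that take a default item.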

 		Observable<String> observable = Observable.just("java", "spring", "hibernate", "android");
		String s = observable.blockingLast();
		LOGGER.info("last item from the observable "+s); 
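As a companion to blockingLast, the sketch below shows blockingFirst and its variant that takes a default item. It assumes RxJava 2.x is on the classpath; BlockingFirstExample, first, and firstOrDefault are hypothetical names used only for this illustration.

```java
import io.reactivex.Observable;

public class BlockingFirstExample {

    // Hypothetical helper: blocks until the first item arrives and returns it
    static String first(String... items) {
        return Observable.fromArray(items).blockingFirst();
    }

    // Hypothetical helper: returns the fallback when the observable emits nothing
    static String firstOrDefault(String fallback, String... items) {
        return Observable.fromArray(items).blockingFirst(fallback);
    }

    public static void main(String[] args) {
        System.out.println(first("java", "spring", "hibernate")); // prints java
        System.out.println(firstOrDefault("none"));               // prints none
    }
}
```

The default-item variant avoids the NoSuchElementException that the plain variant throws for an empty observable.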

RxJava Operator Buffer

The buffer operator creates an observable which collects items from the source observable and emits them as lists. The number of items in each list can be specified in various ways, for example by count or by time span.

 		Observable<Integer> observable = Observable.range(1, 10);
		Observable<List<Integer>> observableBuffer = observable.buffer(3);
		observableBuffer.subscribe(s -> LOGGER.info("observer - values from buffer "+s));
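One of the other ways to size the lists is the two-argument buffer(count, skip) overload, which starts a new list every skip items. The sketch below assumes RxJava 2.x on the classpath; BufferSkipExample and buffers are hypothetical names for illustration.

```java
import io.reactivex.Observable;
import java.util.List;

public class BufferSkipExample {

    // Hypothetical helper: buffers the numbers 1..10 and collects the emitted lists
    static List<List<Integer>> buffers(int count, int skip) {
        return Observable.range(1, 10)
                .buffer(count, skip)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // skip < count: consecutive lists overlap
        System.out.println(buffers(3, 2)); // prints [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9], [9, 10]]
        // skip > count: items between lists are dropped
        System.out.println(buffers(2, 3)); // prints [[1, 2], [4, 5], [7, 8], [10]]
    }
}
```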
 

RxJava Operator Cache

The cache operator subscribes to the source observable once, stores every item it emits, and replays the items in the same order to every subscriber. It is useful when you want to cache responses from the source observable and emit them to all subscribers when needed.

 		Observable<Long> observable = Observable.intervalRange(1, 8, 100, 600, TimeUnit.MILLISECONDS);
		Observable<Long> observableCache = observable.cache();
		observableCache.subscribe(l -> LOGGER.info("from cache- onNext value "+l));
		
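		//intervalRange emits on a background scheduler, so keep the main thread alive until it finishes
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			//restore the interrupt flag instead of swallowing the exception
			Thread.currentThread().interrupt();
		}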
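The effect of cache is easier to see with more than one subscriber. The sketch below counts how many times the source actually runs, with and without cache. It assumes RxJava 2.x on the classpath; CacheExample and sourceRuns are hypothetical names for illustration.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

public class CacheExample {

    // Hypothetical helper: subscribes twice and reports how often the source ran
    static int sourceRuns(boolean useCache) {
        AtomicInteger runs = new AtomicInteger();
        // the callable is executed every time the source itself is subscribed to
        Observable<String> source =
                Observable.fromCallable(() -> "response " + runs.incrementAndGet());
        Observable<String> shared = useCache ? source.cache() : source;

        shared.subscribe(v -> System.out.println("first subscriber got " + v));
        shared.subscribe(v -> System.out.println("second subscriber got " + v));
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs with cache: " + sourceRuns(true));     // prints runs with cache: 1
        System.out.println("runs without cache: " + sourceRuns(false)); // prints runs without cache: 2
    }
}
```

With cache, the second subscriber receives the stored "response 1" instead of triggering the callable again.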

RxJava Operator Collect

The collect operator creates a Single which emits one item: a data structure containing items emitted by the source observable. The example below uses collect to gather all the odd numbers emitted by the source observable into an ArrayList.

 		Observable<Integer> observable = Observable.range(1, 10);		
		
		Single<ArrayList<Integer>> observableCollect = observable.collect(new Callable<ArrayList<Integer>>(){
			@Override
			public ArrayList<Integer> call() throws Exception {
				return new ArrayList<Integer>();
			}}, 
				new BiConsumer<ArrayList<Integer>, Integer>() {
            @Override
            public void accept(ArrayList<Integer> integers, Integer num) {
            		if(num%2 > 0) { integers.add(num);}
            }
        });
		observableCollect.subscribe(s -> LOGGER.info("values after collect operator "+s)); 

RxJava Operator CombineLatest

The combineLatest operator aggregates the latest items emitted by each of the observables passed to it. How the latest values are aggregated is defined in a Function that is passed to combineLatest. Once every source observable has emitted at least one item, this function is called each time any of the sources emits a value, and its return value is emitted by the observable that combineLatest creates. The aggregating function implements the Function interface, whose apply method takes an array of the latest values from each observable and returns the aggregated value.

 		ObservableSource<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");
		
		ObservableSource<String> observableTwo = Observable.just("language", "di", "orm", "os", "ractive p");
		
		ObservableSource<String> observableThree = Observable.just("1", "2", "3", "4", "5");
		
		Observable<String> observableFin = Observable.combineLatest(new Function<Object[], String>(){
			@Override
			public String apply(Object[] t) throws Exception {
				String finObj = "";
				for(Object ob : t){
					finObj = finObj + ob+ " ";
				}
				return finObj.trim();
			}}, 
				1, 
				observableOne, observableTwo, observableThree);

		observableFin.subscribe(s -> LOGGER.info("values after combineLatest operator "+s)); 
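One detail worth seeing in isolation: sources created with just emit all their items synchronously when subscribed, so the earlier sources have already finished before the last one starts. Only their final values take part in the combination. The sketch below shows this with the two-source overload that takes a BiFunction. It assumes RxJava 2.x on the classpath; CombineLatestOrderExample and combineSynchronousSources are hypothetical names.

```java
import io.reactivex.Observable;
import java.util.List;

public class CombineLatestOrderExample {

    // Hypothetical helper: combines two synchronous sources and collects the results
    static List<String> combineSynchronousSources() {
        Observable<String> letters = Observable.just("a", "b");
        Observable<String> digits = Observable.just("1", "2");
        // letters has emitted "a" and "b" before digits is subscribed,
        // so every digit is paired with the latest letter, "b"
        Observable<String> combined = Observable.combineLatest(
                letters, digits, (letter, digit) -> letter + digit);
        return combined.toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(combineSynchronousSources()); // prints [b1, b2]
    }
}
```

With asynchronous sources such as interval, the emissions interleave and more combinations appear.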

RxJava Operator Compose

The compose operator transforms the source observable and returns a new downstream observable. It takes an ObservableTransformer implementation as input, which converts the upstream observable to the downstream observable. ObservableTransformer has one method, apply, where you provide the logic to convert the source observable. In the example below, compose returns an observable which emits the distinct values from those emitted by the source observable.

 		Observable<String> observableupstream = Observable.just("language", "framework", "os", "library", "os");
		
		//compose operator takes ObservableTransformer implementation to perform transformation of source observable
		Observable<String> observabledownstream = observableupstream.compose(new ObservableTransformer<String, String>(){
			
			//returns new observable that can emit distinct values from source observable 
			@Override
			public ObservableSource<String> apply(Observable<String> upstream) {
				return upstream.distinct();
			}});
		
		observabledownstream.subscribe(s -> LOGGER.info("values after compose operator "+s)); 
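A common reason to use compose is to package several operators into one transformer and reuse it on different observables. The following is a minimal sketch of that idea, assuming RxJava 2.x on the classpath; ComposeReuseExample, trimAndDistinct, and clean are hypothetical names introduced for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import java.util.List;

public class ComposeReuseExample {

    // Hypothetical reusable transformer: trims each value, then drops duplicates
    static ObservableTransformer<String, String> trimAndDistinct() {
        return upstream -> upstream.map(String::trim).distinct();
    }

    // Hypothetical helper: applies the transformer and collects the result
    static List<String> clean(String... values) {
        return Observable.fromArray(values)
                .compose(trimAndDistinct())
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(clean("os", " os", "library ")); // prints [os, library]
        System.out.println(clean("java", "java", "rxjava")); // prints [java, rxjava]
    }
}
```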

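RxJava Operator Concat

The concat operator creates an observable that emits the items from all the source observables one source at a time, in the order the sources are given. It subscribes to the next source only after the previous one completes.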

 		List<ObservableSource<String>> observableLst = new ArrayList<ObservableSource<String>>();
		
		ObservableSource<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");
		observableLst.add(observableOne);
		
		ObservableSource<String> observableTwo = Observable.just("language", "di", "orm", "os", "ractive p");
		observableLst.add(observableTwo);
		
		ObservableSource<String> observableThree = Observable.just("1", "2", "3", "4", "5");
		observableLst.add(observableThree);
		
		//concat operator takes list of ObservableSource object and returns observable
		//which emits all the items from all source observables
		Observable<String> observableFin = Observable.concat(observableLst);

		observableFin.subscribe(s -> LOGGER.info("values after concat operator "+s)); 

RxJava Operator ConcatMap

The concatMap operator maps each item from the source observable to an observable and emits the items of those inner observables one after another, preserving the order of the source items. If you need to transform each value emitted by an observable into another observable while keeping the order, use concatMap. The transformation logic is defined in a Function and passed to concatMap as shown below.

 		Observable<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");		

		Observable<String> observableFin = observableOne.concatMap(new Function<String, ObservableSource<String>>(){
			@Override
			public ObservableSource<String> apply(String t) throws Exception {
				return Observable.just(t+" after thru concatMap");
			}
			});

		observableFin.subscribe(s -> LOGGER.info("values after concatMap operator "+s));

RxJava Operator DoFinally

The doFinally operator lets you specify an action to be executed after the observable terminates with onComplete or onError, or after it is disposed by the downstream.

 		Observable<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");		

		Observable<String> observableFin = observableOne.doFinally(new Action(){

			@Override
			public void run() throws Exception {

				LOGGER.info("do finally action of observableOne");
			}

			});

		observableFin.subscribe(s -> LOGGER.info("values from observable "+s)); 

RxJava Operator FlatMap

The flatMap operator is similar to concatMap, except that it subscribes to the inner observables eagerly and merges their items, so the order of elements may change.

 	public void flatMapOperator(){
		Observable<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");		

		Observable<String> observableFin = observableOne.flatMap(new Function<String, ObservableSource<String>>(){
			@Override
			public ObservableSource<String> apply(String t) throws Exception {
				return Observable.just(t+" after thru flatMap").delay(200, TimeUnit.MILLISECONDS);
				
			}
			});

		observableFin.subscribe(s -> LOGGER.info("values after flatMap operator "+s));
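		//delay emits on a background scheduler, so keep the main thread alive until it finishes
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			//restore the interrupt flag instead of swallowing the exception
			Thread.currentThread().interrupt();
		}	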
	} 
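To see the ordering difference between flatMap and concatMap side by side, the sketch below gives each item a delay proportional to its value, so later items become ready sooner. It assumes RxJava 2.x on the classpath; FlatMapVersusConcatMapExample, delayed, withConcatMap, and withFlatMap are hypothetical names for illustration.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class FlatMapVersusConcatMapExample {

    // Each number n becomes an observable that emits n after n * 100 ms
    static Observable<Integer> delayed(int n) {
        return Observable.just(n).delay(n * 100L, TimeUnit.MILLISECONDS);
    }

    // concatMap waits for each inner observable to complete before starting the next
    static List<Integer> withConcatMap() {
        return Observable.just(3, 2, 1)
                .concatMap(n -> delayed(n))
                .toList()
                .blockingGet();
    }

    // flatMap subscribes to all inner observables at once and merges their items
    static List<Integer> withFlatMap() {
        return Observable.just(3, 2, 1)
                .flatMap(n -> delayed(n))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("concatMap: " + withConcatMap()); // prints concatMap: [3, 2, 1]
        // arrival order depends on timing; with these delays it is typically [1, 2, 3]
        System.out.println("flatMap: " + withFlatMap());
    }
}
```

The concatMap result is deterministic, while the flatMap result reflects whichever inner observable emits first.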

RxJava Operator FromArray

The fromArray operator takes an array of items as input and creates an observable which emits those items.

 		Observable<String> observableOne = Observable.fromArray("java", "spring", "hibernate", "android", "rxjava");		
		observableOne.subscribe(s -> LOGGER.info("values "+s)); 

RxJava Operator FromCallable

The fromCallable operator creates an observable which emits the item returned by a Callable.
 		Observable<String> observableOne = Observable.fromCallable(new Callable<String>(){
			@Override
			public String call() throws Exception {
				return "excellent work";
			}});
		observableOne.subscribe(s -> LOGGER.info("value from callable "+s)); 

RxJava Operator FromFuture

The fromFuture operator lets you create an observable which emits the value returned by the get method of the supplied Future object. The example below implements the Future interface by hand, including its get methods.

 		Observable<String> observable = Observable.fromFuture(new Future<String>(){
			boolean done;

			@Override
			public boolean cancel(boolean arg0) {
				return false;
			}
			@Override
			public String get() throws InterruptedException, ExecutionException {
				done = true;
				//perform task
				return "came from future";
			}
			@Override
			public String get(long arg0, TimeUnit arg1)
					throws InterruptedException, ExecutionException, TimeoutException {
				done = true;
				return "came from future";
			}
			@Override
			public boolean isCancelled() {
				return false;
			}
			@Override
			public boolean isDone() {
				return done;
			}});
		
		observable.subscribe(s -> LOGGER.info("value from future observable "+s)); 
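Writing a Future by hand is rarely needed in practice. As an alternative sketch, the Future returned by a standard java.util.concurrent ExecutorService can be passed to fromFuture directly. This assumes RxJava 2.x on the classpath; FromFutureExecutorExample and valueFromFuture are hypothetical names for illustration.

```java
import io.reactivex.Observable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FromFutureExecutorExample {

    // Hypothetical helper: runs a task on an executor and reads its result through an observable
    static String valueFromFuture() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // submit returns a Future whose get() yields the task's result
            Future<String> future = executor.submit(() -> "came from future");
            // fromFuture calls future.get() on subscription and emits the result
            return Observable.fromFuture(future).blockingFirst();
        } finally {
            // always release the executor's thread
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(valueFromFuture()); // prints came from future
    }
}
```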

RxJava Operator Generate

The generate operator produces an observable which emits values from a function defined in the Consumer's accept method; values are emitted using the Emitter. This function is called repeatedly, emitting at most one value per call, until it sends an onComplete or onError signal through the Emitter object.

 		Observable<String> observable = Observable.generate( new Consumer<Emitter<String>>(){
			private int count=0;
			@Override
			public void accept(Emitter<String> t) throws Exception {
					if(count > 5){
						t.onComplete();
						//stop here so that onNext is not called after completion
						return;
					}
					count++;
					t.onNext("generate "+count);
				
			}});				

		observable.subscribe(s -> LOGGER.info("got value from observable "+s));
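The example above keeps its counter in a mutable field of the Consumer. As an alternative sketch, generate also has an overload that takes an initial state and a function returning the next state, which avoids the mutable field. This assumes RxJava 2.x on the classpath; GenerateWithStateExample and generateValues are hypothetical names for illustration.

```java
import io.reactivex.Emitter;
import io.reactivex.Observable;
import java.util.List;

public class GenerateWithStateExample {

    // Hypothetical helper: emits "generate 1" .. "generate howMany" and collects them
    static List<String> generateValues(int howMany) {
        Observable<String> observable = Observable.generate(
                // initial state: the first counter value
                () -> 1,
                // called once per item; the returned value is the state for the next call
                (Integer count, Emitter<String> emitter) -> {
                    if (count > howMany) {
                        emitter.onComplete();
                    } else {
                        emitter.onNext("generate " + count);
                    }
                    return count + 1;
                });
        return observable.toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(generateValues(3)); // prints [generate 1, generate 2, generate 3]
    }
}
```

Because the state is created per subscriber, each subscription starts counting from 1 again.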