ZOFTINO.COM android and web dev tutorials

RxJava Examples

RxJava helps you build Java and Android applications that run efficiently, because multiple tasks can execute in parallel. Code that uses RxJava is also easier to read and maintain when data from multiple sources needs to be handled.

In this post, I’ll explain RxJava objects and operators, describe scenarios in which RxJava improves performance and maintainability, and provide RxJava examples.

Adding RxJava Library

To use RxJava in your application, you need to add the RxJava library to your project. Below is the Maven and Gradle configuration for adding the library.

Maven

 <dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.0</version>
</dependency> 

Gradle

compile 'io.reactivex.rxjava2:rxjava:2.1.0'

RxJava Objects

The main objects in RxJava are the observable and the observer. An observable emits data items; an observer receives them by subscribing to the observable and reacts to the sequence of items the observable emits. An observer is connected to an observable by calling the subscribe method on the observable object.

An observable emits an item to the observer by calling the observer’s onNext method. It signals that there are no more items by calling the observer’s onComplete method, and it reports an error by calling the observer’s onError method.
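
To make the three callbacks concrete, here is a minimal plain-Java sketch of the contract described above. It does not use RxJava: MiniObservable and MiniObserver are hypothetical stand-ins invented for illustration, and the real Observer interface also has an onSubscribe method that this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an RxJava observer: three callbacks, nothing else.
interface MiniObserver<T> {
    void onNext(T item);
    void onComplete();
    void onError(Throwable error);
}

// Hypothetical stand-in for an observable: it only knows how to push to an observer.
interface MiniObservable<T> {
    void subscribe(MiniObserver<T> observer);
}

class ObserverContractSketch {

    // Subscribes a recording observer and returns the signals in arrival order.
    static List<String> record(MiniObservable<String> source) {
        List<String> signals = new ArrayList<>();
        source.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String item) { signals.add("next:" + item); }
            @Override public void onComplete() { signals.add("complete"); }
            @Override public void onError(Throwable error) { signals.add("error:" + error.getMessage()); }
        });
        return signals;
    }

    // Emits two items, then the completion signal.
    static MiniObservable<String> greetings() {
        return observer -> {
            observer.onNext("Hello");
            observer.onNext("Welcome");
            observer.onComplete();
        };
    }

    // Emits one item, then reports a failure instead of completing.
    static MiniObservable<String> failing() {
        return observer -> {
            observer.onNext("Hello");
            observer.onError(new IllegalStateException("boom"));
        };
    }

    public static void main(String[] args) {
        System.out.println(record(greetings()));
        System.out.println(record(failing()));
    }
}
```

Running main prints [next:Hello, next:Welcome, complete] followed by [next:Hello, error:boom]. A sequence ends with either onComplete or onError, never both.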

Creating Components Using RxJava

Below are the steps and objects needed to create components using RxJava.

  1. Create observable.
  2. Create observer implementing onNext, onComplete and onError methods.
  3. Subscribe observer to observable.
  4. After subscription, observable will start emitting items.
  5. Observer takes action on the items.

RxJava Example

The example below creates an observable and an observer, then subscribes the observer to the observable. The observable emits three strings by calling the onNext method on the subscribed observer.

 	public void basicExample(){
		
		Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
			
			@Override			 
			public void subscribe(ObservableEmitter<String> e) throws Exception {
				e.onNext("Hello");
				e.onNext("Welcome");
				e.onNext("This is your first RxJava example");
				e.onComplete();
			}
		});
		
		Observer<String> observer = new Observer<String>() {
			  @Override
			  public void onSubscribe(Disposable d) {
				  LOGGER.info("observer subscribed to observable - on subscribe");
			  }
			 
			  @Override
			  public void onNext(String value) {
				  LOGGER.info("observer - onNext "+value);
			  }
			 
			  @Override
			  public void onError(Throwable e) {
				  LOGGER.info("observer - onError "+e.toString());
			  }
			 
			  @Override
			  public void onComplete() {
				  LOGGER.info("observer - on complete");
			  }
			};
			 
		
			observable.subscribe(observer);
		
	} 

Below is the output of the above RxJava example.

[Image: RxJava observable observer example]

Creating Observable

You can create your own observable from scratch using the create method, or you can use operators that convert an object, a set of objects, or other observables into an observable.

The code below creates an observable using the Observable.create method, which takes an ObservableOnSubscribe implementation as input. You implement the subscribe method of ObservableOnSubscribe to emit values to the subscribed observer. This method is called once for each subscribing observer.

 			Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
				
				@Override			 
				public void subscribe(ObservableEmitter<Integer> e) throws Exception {
					e.onNext(100);
					e.onNext(200);
					e.onNext(300);
					e.onComplete();
				}
			}); 

You can create an observable using the just operator, which takes one or more objects as input and returns an observable that emits those objects. The just operator accepts one to ten objects.

		Observable<String> observable = Observable.just("java", "spring", "hibernate", "android");
		observable.subscribe(s -> LOGGER.info("observer - onNext value "+s));

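To create an observable that emits a range of consecutive numbers at regular intervals, you can use the intervalRange method as shown below. Its arguments are the start value, the number of values to emit, the initial delay, the period between emissions, the time unit, and the scheduler. The call below emits 1 through 30, starting 200 milliseconds after subscription and then every 100 milliseconds.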

		Observable<Long> observable = Observable.intervalRange(1, 30, 200, 100, TimeUnit.MILLISECONDS, Schedulers.computation());
		observable.subscribe(l -> LOGGER.info("observer - onNext value "+l));
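		// computation scheduler threads are daemon threads, so keep the main thread alive until the items are emitted
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}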
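
The arguments passed to intervalRange above (start 1, count 30, initial delay 200, period 100) can be checked with a few lines of plain Java. This sketch assumes intervalRange counts upward from the start value and schedules the item with zero-based index k at roughly the initial delay plus k periods. The helper names are hypothetical, and the times are nominal, since real timing depends on the scheduler and system load.

```java
class IntervalRangeTiming {

    // Last value emitted when counting `count` values upward from `start`.
    static long lastValue(long start, long count) {
        return start + count - 1;
    }

    // Nominal time, in ms after subscription, at which the item with
    // zero-based index `index` is due.
    static long dueAtMillis(long index, long initialDelayMillis, long periodMillis) {
        return initialDelayMillis + index * periodMillis;
    }

    public static void main(String[] args) {
        long start = 1, count = 30, initialDelay = 200, period = 100;
        System.out.println("last value: " + lastValue(start, count));
        System.out.println("last item due at ms: " + dueAtMillis(count - 1, initialDelay, period));
    }
}
```

With these values the last item is 30 and it is due about 3100 ms after subscription, which is why a 5000 ms sleep is long enough for the main thread to see every item.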

To create an observable that emits a sequence of items repeatedly, you can use the repeat method. The example below creates an observable that emits the items of the source observable five times.

 		Observable<String> observable = Observable.just("java", "spring", "hibernate", "android");
		Observable<String> observableRepeat = observable.repeat(5);
		observableRepeat.subscribe(s -> LOGGER.info("observer - onNext value "+s)); 

RxJava Synchronous & Asynchronous

By default, RxJava does not create additional threads, so execution is single-threaded and synchronous. For example, if you create two observables with an observer subscribed to each, the second observable starts emitting items only after the first has finished emitting. The code in the observable and the code in the observer also run on the same thread by default.

To make RxJava asynchronous, you use schedulers. Calling the subscribeOn method on an observable with a scheduler moves the work off the current thread, that is, the thread handling the event that caused the observable to be created. That thread is then not blocked while the observable and observer communicate; a thread from the scheduler carries out the rest of the execution.

Results often need to go back to the main thread, which handles events and subscribed the observer to the observable in response to one. To have each onNext call delivered there, use the observeOn method on the observable and pass a scheduler that can interact with the main thread.

 		//create observable and subscribe on computation scheduler
		//so main thread will continue execution
		Observable.create(subscriber -> {
			subscriber.onNext("Hello World! - on Thread: "+Thread.currentThread().getId());
			Thread.sleep(500);
			subscriber.onNext("Hello World! after sleep - on Thread:"+Thread.currentThread().getId());
			subscriber.onComplete();
		}).subscribeOn(Schedulers.computation())
		.subscribe(System.out::println);

		//create range observable and subscribe on computation scheduler
		//so main thread will continue execution, but items get emitted on thread created by computation scheduler
		Observable.range(1, 5)
		.subscribeOn(Schedulers.computation())
		.subscribe( i -> LOGGER.info("Range of numbers: "+i+" - on thread : "+Thread.currentThread().getId()));

		Observable.create(subscriber -> {
			subscriber.onNext("Hello Universe! - on Thread: "+Thread.currentThread().getId());
			subscriber.onComplete();
		}).subscribe(s -> LOGGER.info(""+s));

		// computation scheduler threads are daemon threads, so wait here for them to finish
		try {
			LOGGER.info("main thread waiting for other threads to finish - on Thread : "+Thread.currentThread().getId());
			Thread.sleep(4000);
		} catch (InterruptedException e) {
		}
[Image: RxJava asynchronous subscribeOn example]
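
The thread hand-off can be imitated with plain JDK executors, which may help when reading the thread ids in the logs. This is only a sketch of the idea and not how RxJava implements schedulers: the two single-thread executors named producer and consumer are hypothetical stand-ins for the schedulers passed to subscribeOn and observeOn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadHandOffSketch {

    // Runs the emitting code on one named thread and every callback on another,
    // then returns a log of which thread did what.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        // Hypothetical stand-ins for the subscribeOn and observeOn schedulers.
        ExecutorService producer = Executors.newSingleThreadExecutor(r -> new Thread(r, "producer"));
        ExecutorService consumer = Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));

        producer.execute(() -> {
            log.add("emitting on " + Thread.currentThread().getName());
            for (String item : new String[] {"Hello", "Welcome"}) {
                // Hand each item over instead of calling the callback directly.
                consumer.execute(() ->
                        log.add(item + " received on " + Thread.currentThread().getName()));
            }
        });

        // Drain the producer first so all hand-offs are queued, then the consumer.
        producer.shutdown();
        awaitQuietly(producer);
        consumer.shutdown();
        awaitQuietly(consumer);
        return new ArrayList<>(log);
    }

    // Waits for the executor to finish, restoring the interrupt flag if interrupted.
    static void awaitQuietly(ExecutorService executor) {
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The caller of run only waits at the end; the emitting code and the callbacks each stay on their own thread, which is the same separation the schedulers provide.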

RxJava Operators

RxJava provides operators that let you transform, filter, compose, aggregate, and manipulate the items emitted by observables. Most operators are applied to a source observable and return a new observable that emits items after transforming, filtering, or merging the items emitted by the source.

For example, the skip operator skips the first few items. The example below creates an observable that skips the first two items and emits the rest of the items from the source observable.

 		Observable<String> observable = Observable.just("java", "spring", "hibernate", "android");
		Observable<String> observableSkip = observable.skip(2);
		observableSkip.subscribe(s -> LOGGER.info("observer - onNext value "+s));
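
To see why an operator returns a new observable instead of changing the source, here is a plain-Java sketch of a skip-like operator. ItemSource and the just, skip and collect helpers are hypothetical names written for this illustration; they only mimic the shape of the RxJava calls and ignore errors, completion and disposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, stripped-down source: pushes items to a callback.
interface ItemSource<T> {
    void subscribe(Consumer<T> onNext);
}

class SkipOperatorSketch {

    // Source that emits the given items in order.
    @SafeVarargs
    static <T> ItemSource<T> just(T... items) {
        return onNext -> Arrays.asList(items).forEach(onNext);
    }

    // "Operator": returns a new source that drops the first n upstream items.
    static <T> ItemSource<T> skip(ItemSource<T> upstream, long n) {
        return onNext -> {
            long[] seen = {0}; // counter local to each subscription
            upstream.subscribe(item -> {
                if (seen[0]++ >= n) {
                    onNext.accept(item); // forward only after n items have gone by
                }
            });
        };
    }

    // Subscribes and gathers everything the source emits into a list.
    static <T> List<T> collect(ItemSource<T> source) {
        List<T> out = new ArrayList<>();
        source.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        ItemSource<String> source = just("java", "spring", "hibernate", "android");
        System.out.println(collect(skip(source, 2)));
        System.out.println(collect(source));
    }
}
```

Running main prints [hibernate, android] and then [java, spring, hibernate, android], which shows that the source itself is untouched by the operator.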

You can find the list of available operators on the RxJava Wiki.

RxJava Use Cases

We will go over two scenarios in which I know RxJava is helpful and the right thing to use. The first is when you need to run a task in the background in response to a UI event, and update the UI with results at certain stages of processing and again once background processing is complete.

This can be done using RxJava. The actual task, which lives in the subscribe method of the observable, can be made to run on one thread, and the observer’s onNext method can be made to run on a different thread. You specify these threads on the observable by passing schedulers to the subscribeOn and observeOn methods. To understand which code runs on which thread, take a look at the code and console log below.

 		Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
			@Override			 
			public void subscribe(ObservableEmitter<String> e) throws Exception {
				LOGGER.info("observable on Thread: "+Thread.currentThread().getId());
				e.onNext("Hello");
				LOGGER.info("observable on Thread after one next : "+Thread.currentThread().getId());
				e.onNext("Welcome");
				LOGGER.info("observable on Thread after second next : "+Thread.currentThread().getId());
				e.onNext("This is your first RxJava example");
				e.onComplete();
			}
		});

		Observer<String> observer = new Observer<String>() {
			@Override
			public void onSubscribe(Disposable d) {
				LOGGER.info("observer subscribed to observable - on subscribe on Thread: "+Thread.currentThread().getId());
			}

			@Override
			public void onNext(String value) {
				LOGGER.info("observer - onNext "+value + " on Thread: "+Thread.currentThread().getId());
			}

			@Override
			public void onError(Throwable e) {
				LOGGER.info("observer - onError "+e.toString());
			}

			@Override
			public void onComplete() {
				LOGGER.info("observer - on complete");
			}
		};
		
		//subscribeOn specifies the thread on which the observable code (which calls the observer's onNext method) runs
		//observeOn specifies the thread on which the observer's onNext code runs; this is the thread that needs the results

		observable
		.subscribeOn(Schedulers.computation())
		.observeOn(Schedulers.io())
		.subscribe(observer);
		
		try {
			Thread.sleep(7000);
		} catch (InterruptedException e1) {

		}
[Image: RxJava subscribeOn observeOn example]

The second scenario in which RxJava is useful is when you need to perform multiple operations in parallel and then combine, transform, or filter their results to get a final result. You can do this by creating, inside a main observable, a child observable for each task that needs to run in parallel, and then using operators to transform, filter, or merge the items emitted by all the child observables.
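
The fan-out-and-combine shape of this second scenario can be sketched with the JDK alone. Note the swap: this uses CompletableFuture rather than RxJava so that it runs without the library, and fetchPrice and fetchShipping are hypothetical tasks made up for the illustration. In RxJava the combining step would usually be an operator such as zip or merge applied to the child observables.

```java
import java.util.concurrent.CompletableFuture;

class ParallelCombineSketch {

    // Hypothetical slow lookups standing in for two independent tasks.
    static int fetchPrice() {
        pause(200);
        return 40;
    }

    static int fetchShipping() {
        pause(200);
        return 2;
    }

    // Sleeps to simulate work, restoring the interrupt flag if interrupted.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Starts both tasks without waiting for either, then combines the two
    // results once both are available.
    static int total() {
        CompletableFuture<Integer> price =
                CompletableFuture.supplyAsync(ParallelCombineSketch::fetchPrice);
        CompletableFuture<Integer> shipping =
                CompletableFuture.supplyAsync(ParallelCombineSketch::fetchShipping);
        return price.thenCombine(shipping, Integer::sum).join();
    }

    public static void main(String[] args) {
        System.out.println(total());
    }
}
```

Running main prints 42. Neither task waits for the other to start, and the combining function runs only after both results have arrived.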