ZOFTINO.COM

RxJava Use Cases in Android

RxJava helps you write clean Android code that is easy to maintain and extend. It also takes less code to run multiple tasks in parallel with RxJava. More importantly, updating the UI with results from a background process can be implemented efficiently without causing memory leaks.

In this post, I'll cover RxJava use cases in Android. If you want to know about RxJava and RxJava operators, you can read my previous articles RxJava with examples and RxJava operator examples.

Updating UI Thread

The common flow of tasks in any Android app starts with capturing user input or events, then processing them, and finally showing the results back to the user. If processing a user event involves long-running tasks or requires networking, those tasks should be performed on a background thread, because executing long-running tasks on the main thread makes the app appear unresponsive.

But there is a problem when tasks run in the background and the UI needs to be updated with their results: code running on a background thread can't access view objects, because view objects are owned by the UI thread. To solve this problem, Android provides AsyncTask, a mechanism for communicating results from a background process to the UI. You can read Json gson asynctask example to know how to use AsyncTask.

AsyncTask has a problem of its own, which shows up especially when you use multiple AsyncTasks in your app: it can cause memory leaks, which can lead to app crashes.

When the user moves away from the current activity, the activity is destroyed and can be garbage collected. But if the activity has started a background process using AsyncTask, and the task holds a reference to the activity (as a non-static inner class does), the activity can't be garbage collected until the AsyncTask completes. While there are ways to fix the problem and still use AsyncTask without memory leaks, RxJava is the easier option for running background tasks and updating results on the UI thread.
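
The handoff described above (work on a background thread, result delivered to the UI thread) can be sketched in plain Java without Android classes. This is only an illustration under stated assumptions: a single-thread executor named ui-stand-in plays the role of the Android main thread, and HandoffSketch and both thread names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class HandoffSketch {
    // Runs a calculation on a background thread, then hands the result
    // to a second thread that stands in for the Android UI thread.
    static String run() {
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService ui =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-stand-in"));
        try {
            return CompletableFuture
                    // the "long-running" work happens off the UI thread
                    .supplyAsync(() -> 10f / 4f, background)
                    // only this step touches "views", so it runs on the UI stand-in
                    .thenApplyAsync(
                            result -> Thread.currentThread().getName() + " shows " + result,
                            ui)
                    .join();
        } finally {
            background.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

On Android the second executor would be replaced by whatever posts work to the main thread; AsyncTask and RxJava both hide that step behind their own APIs.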

Updating UI Thread Using RxJava Example

To show how to start a background process and communicate its results back to the UI, I created an example app that can be found at https://github.com/srinurp/AndroidRxJava.

The example takes numbers entered by the user, calculates the result in the background (a simple division) and updates the view objects with the result.

There are two points to notice. To run the observable on a separate thread, call subscribeOn and pass the scheduler you want it to run on; this step shifts the work from the Android main thread to a separate thread.

To run the observer code on a particular thread, call observeOn and pass the scheduler you want it to run on; this step shifts processing from the background thread back to the Android main thread.

In the button's onclick handler, the just operator first creates an observable that emits the numbers entered by the user. The resulting observable is subscribed on the computation scheduler, the toList operator collects the items into a List, and the map operator performs the calculation. Finally, observeOn is set to the Android main thread using the AndroidSchedulers class so that the observer code runs on the main thread.
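
The effect of the two calls can be observed on a plain JVM by recording thread names. This is a minimal sketch, assuming RxJava 2 is on the classpath; because AndroidSchedulers.mainThread() is not available outside Android, Schedulers.single() is used here as a stand-in for the main thread, and ThreadHopSketch is a hypothetical name.

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

class ThreadHopSketch {
    // Returns { thread that produced the value, thread that received it }.
    static String[] run() {
        final String[] producerThread = new String[1];
        String receiverThread = Single
                .fromCallable(() -> {
                    // runs on the scheduler given to subscribeOn
                    producerThread[0] = Thread.currentThread().getName();
                    return 42;
                })
                .subscribeOn(Schedulers.computation())
                // everything below this line runs on the observeOn scheduler
                .observeOn(Schedulers.single())
                .map(value -> Thread.currentThread().getName())
                .blockingGet();
        return new String[] { producerThread[0], receiverThread };
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("value produced on: " + threads[0]);
        System.out.println("value received on: " + threads[1]);
    }
}
```

The two names differ from each other and from the calling thread, which is the same shift the Android example relies on.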

Observable part

//creates an observable using just to emit the two input numbers
//subscribeOn computation scheduler - runs the work on a background thread
//toList collects the numbers into a List<Float> (the result is a Single)
//map applies the division to the list
//observeOn main thread - runs the observer code on the main thread so the UI can be updated
//subscribe the observer to the final Single
Observable.just(numOne, numTwo)
        .subscribeOn(Schedulers.computation())
        .toList()
        .map(new Function<List<Float>, Float>() {
            @Override
            public Float apply(List<Float> fl) throws Exception {
                return fl.get(0) / fl.get(1);

            }
        }).observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);

The observer's onSubscribe method is called when it subscribes to the observable. This callback receives a Disposable object, which can be used to dispose of the subscription. This is useful in Android for preventing memory leaks, for example when the user moves away from an activity that has started observables which are still running on a background thread.

In this example SingleObserver is used because the final observable emits a single item. The onSuccess method of SingleObserver is executed on the main thread.
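
To see why a single item comes out, the same pipeline can be run on a plain JVM and blocked on for its result. This is a sketch assuming RxJava 2 on the classpath; DivisionSketch and divide are hypothetical names, and the observeOn step is left out because there is no Android main thread here.

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

class DivisionSketch {
    static float divide(float numOne, float numTwo) {
        // toList() turns the Observable<Float> into a Single<List<Float>>,
        // so the chain ends in exactly one result (or one error).
        Single<Float> result = Observable.just(numOne, numTwo)
                .subscribeOn(Schedulers.computation())
                .toList()
                .map(fl -> fl.get(0) / fl.get(1));
        // blockingGet waits for the background work; for demonstration only
        return result.blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(divide(10f, 4f));
    }
}
```

Note that with Float operands a zero divisor does not throw, it produces Infinity (or NaN for 0/0), so in this sketch such input arrives as a normal result rather than as an error.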

Observer

//observer gets the result from the background task and updates the UI on the main thread
//SingleObserver receives the single item emitted by the final observable
observer = new SingleObserver<Float>() {
    @Override
    public void onSubscribe(Disposable d) {
        subscription = d;
    }
    @Override
    public void onSuccess(@NonNull Float aFloat) {
        TextView tv = (TextView) findViewById(R.id.result_one);
        tv.setText("" + aFloat);
    }
    @Override
    public void onError(Throwable e) {
        TextView tv = (TextView) findViewById(R.id.result_one);
        tv.setText("Error processing calculation");
    }
};
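
The Disposable saved in onSubscribe only helps if it is disposed when the screen goes away. The sketch below shows one possible shape of that cleanup on a plain JVM, assuming RxJava 2 on the classpath. ScreenSketch and its onCreate and onDestroy methods are hypothetical stand-ins for an activity and its lifecycle callbacks, and the interval observable stands in for any long-running source.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import java.util.concurrent.TimeUnit;

class ScreenSketch {
    // collects every subscription started by this "screen"
    private final CompositeDisposable disposables = new CompositeDisposable();

    void onCreate() {
        // a source that would keep emitting long after the screen is gone
        disposables.add(
                Observable.interval(10, TimeUnit.MILLISECONDS)
                        .subscribe(tick -> { /* a real app would update a view here */ }));
    }

    void onDestroy() {
        // disposes all collected subscriptions so they stop emitting
        // and no longer hold references to this object
        disposables.clear();
    }

    int activeSubscriptions() {
        return disposables.size();
    }

    public static void main(String[] args) {
        ScreenSketch screen = new ScreenSketch();
        screen.onCreate();
        System.out.println("before destroy: " + screen.activeSubscriptions());
        screen.onDestroy();
        System.out.println("after destroy: " + screen.activeSubscriptions());
    }
}
```

With a single stored Disposable, as in the observer above, the equivalent step would be calling dispose() on it from the activity's onDestroy method.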

Parallel Processing

Using RxJava, multiple tasks can be run in parallel without having to deal with threads directly in your code. For example, if a screen in your Android app needs data from multiple services, you can send requests to the different services at the same time using RxJava instead of calling them sequentially, which improves the efficiency of your app.

To implement parallel processing, create an observable for each task that needs to run in parallel and subscribe each one on a scheduler.

The example below first creates an observable using the range operator, which takes the two numbers given by the user as a start value and a count. It then uses flatMap to create an observable for each integer emitted by range and subscribes each of these observables on the computation scheduler, which is the step that creates parallel processing. Finally, it calculates the results using map.

Observable

//creates an observable with the range operator; range(start, count) emits numTwo consecutive integers starting at numOne
//subscribeOn computation scheduler to leave the android main thread
//flatMap converts each integer into its own observable, subscribed on the computation scheduler so they are processed in parallel
//map applies the function that calculates each result
//observeOn android main thread to update results in the UI
Observable.range(numOne, numTwo)
        .subscribeOn(Schedulers.computation())
        .flatMap(new Function<Integer, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(Integer num) throws Exception {
                Log.d("flatMap function ", ""+Thread.currentThread().getName());
                //subscribe each observable on separate thread - parallel processing
                return Observable.just(num).subscribeOn(Schedulers.computation());
            }
        }).map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer val) throws Exception {
                Log.d("map calculation ", ""+Thread.currentThread().getName());
                return val * (val + 1);
            }
        }).observeOn(AndroidSchedulers.mainThread())
        .subscribe(multiThreadObserver);
 

Observer

 multiThreadObserver = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        multiThreadSubscription = d;
    }

    @Override
    public void onNext(@NonNull Integer val) {
        Log.d("updating UI onNext ", ""+Thread.currentThread().getName());
        TextView tv = (TextView) findViewById(R.id.result_one);
        tv.setText(tv.getText()+" "+ val);
    }
    @Override
    public void onError(Throwable e) {
        TextView tv = (TextView) findViewById(R.id.result_one);
        tv.setText("Error processing calculation");
    }

    @Override
    public void onComplete() {
        Log.d("updating UI onComplete ", ""+Thread.currentThread().getName());
        TextView tv = (TextView) findViewById(R.id.result_one);
        tv.setText(tv.getText()+" calculation complete");
    }
};
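
The parallel pipeline can also be tried on a plain JVM. The sketch below assumes RxJava 2 on the classpath; ParallelSketch and compute are hypothetical names. Unlike the example above, the calculation is placed inside the inner observable, and the results are sorted before being returned because items processed in parallel can arrive in any order.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

class ParallelSketch {
    static List<Integer> compute(int start, int count) {
        // range(start, count) emits count integers beginning at start
        return Observable.range(start, count)
                // each integer becomes its own observable, subscribed on the
                // computation scheduler so the calculations can overlap
                .flatMap(num -> Observable.just(num)
                        .subscribeOn(Schedulers.computation())
                        .map(val -> val * (val + 1)))
                // arrival order is not guaranteed, so sort before returning
                .toSortedList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // start = 3, count = 4 covers the integers 3, 4, 5 and 6
        System.out.println(compute(3, 4));
    }
}
```

For a start of 3 and a count of 4, the integers 3 to 6 are processed, giving 12, 20, 30 and 42 after sorting.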
 

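Add the libraries below to your Android project's Gradle configuration file to use RxJava and RxAndroid. Current Gradle versions use the implementation keyword in place of the older compile, which has been removed.

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.0'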