RxJava2 operators let you create observables by transforming, merging, filtering, and grouping items emitted by other observables. Each operator converts a source observable into a new observable that emits the source's items in some modified form.
In this post I explain RxJava2 operators with examples. If you want to learn the basics of RxJava2 first, read my previous post, RxJava2 examples.
Many different operators can be applied to an observable, and most of them return observables. Operators can be grouped by what they do to the items emitted by the source observable.
Operators such as create, range, just, interval, repeat, empty, and defer create new observables that emit data based on the given input or specification.
Operators such as buffer, map, flatMap, scan, compose, and groupBy create observables that emit the items of the source observable after transforming them.
Operators such as first, last, distinct, filter, skip, and elementAt create observables that emit only those items of the source observable that pass a filter.
Operators such as join, merge, switchOnNext, zip, and combineLatest create observables by combining multiple observables.
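As a rough illustration of these four categories, the sketch below chains one operator from each: range (creating), map (transforming), filter (filtering), and zip (combining). The class name OperatorCategoriesSketch and the sample values are made up for this example.

```java
import io.reactivex.Observable;
import java.util.List;

// Hypothetical class name, used only for this illustration.
class OperatorCategoriesSketch {

    static List<String> run() {
        // creating: emits 1..6
        Observable<Integer> created = Observable.range(1, 6);
        // transforming: emits 10, 20, ..., 60
        Observable<Integer> transformed = created.map(n -> n * 10);
        // filtering: keeps 40, 50, 60
        Observable<Integer> filtered = transformed.filter(n -> n > 30);
        // combining: pairs each number with a label, item by item
        Observable<String> labels = Observable.just("a", "b", "c");
        return Observable.zip(filtered, labels, (n, s) -> s + n)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a40, b50, c60]
    }
}
```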
The all operator takes a Predicate as input, which contains the logic to test whether an item satisfies a condition. If every item satisfies the condition, all emits true; otherwise it emits false. The all operator returns a Single<Boolean>, which you can subscribe to for further actions.
The any operator is similar to all, but it emits true if at least one item satisfies the condition.
Observable<String> observable = Observable.just("java", "hibernate", "android");
//all operator checks whether items from main observable contain letter a
//return single boolean item observable
Single<Boolean> itemsContainA = observable.all(new Predicate<String>() {
    @Override
    public boolean test(String t) throws Exception {
        return t.contains("a");
    }
});
//subscribe to single observable to do something with the result from all operator
itemsContainA.subscribe(s -> LOGGER.info("Do items from given observable contain letter a ? "+s));
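The any operator described above has no example of its own, so here is a minimal sketch. It reuses the same three items; the class name AnyOperatorSketch and the helper anyContains are hypothetical names chosen for this illustration.

```java
import io.reactivex.Observable;
import io.reactivex.Single;

// Hypothetical class and method names, used only for this illustration.
class AnyOperatorSketch {

    static boolean anyContains(String letter) {
        Observable<String> observable = Observable.just("java", "hibernate", "android");
        // any emits true as soon as one item passes the predicate
        Single<Boolean> result = observable.any(item -> item.contains(letter));
        return result.blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(anyContains("j")); // prints true, only "java" contains j
        System.out.println(anyContains("z")); // prints false, no item contains z
    }
}
```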
If you want just the last item from an observable, use the blockingLast operator. To get only the first item, use blockingFirst. Both block the calling thread until the value is available, and several other blocking variants exist.
Observable<String> observable = Observable.just("java", "spring", "hibernate", "android");
String s = observable.blockingLast();
LOGGER.info("last item from the observable "+s);
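Two of the other variants can be sketched as follows: blockingFirst with a default item, and the plain blockingFirst on an empty observable, which throws NoSuchElementException. The class and method names are assumptions made for this example.

```java
import io.reactivex.Observable;
import java.util.NoSuchElementException;

// Hypothetical class and method names, used only for this illustration.
class BlockingOperatorsSketch {

    // Returns the first item, or "none" when the source is empty.
    static String firstOrDefault(Observable<String> source) {
        return source.blockingFirst("none");
    }

    // Without a default item, an empty source makes blockingFirst throw.
    static boolean firstThrowsOnEmpty() {
        try {
            Observable.<String>empty().blockingFirst();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(firstOrDefault(Observable.just("java", "spring"))); // prints java
        System.out.println(firstOrDefault(Observable.empty()));                // prints none
        System.out.println(firstThrowsOnEmpty());                              // prints true
    }
}
```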
The buffer operator creates an observable that collects items from the source observable and emits them as lists. The number of items in each list can be specified in various ways.
Observable<Integer> observable = Observable.range(1, 10);
Observable<List<Integer>> observableBuffer = observable.buffer(3);
observableBuffer.subscribe(s -> LOGGER.info("observer - values from buffer "+s));
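One of those other ways is the buffer(count, skip) overload, which starts a new list every skip items. The sketch below assumes a count of 3 and a skip of 2, so consecutive lists overlap by one item; the class name is made up for this example.

```java
import io.reactivex.Observable;
import java.util.List;

// Hypothetical class name, used only for this illustration.
class BufferSkipSketch {

    static List<List<Integer>> overlappingBuffers() {
        // A new buffer starts at every 2nd item and holds up to 3 items.
        return Observable.range(1, 6)
                .buffer(3, 2)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [3, 4, 5], [5, 6]]
        System.out.println(overlappingBuffers());
    }
}
```

The last list is shorter because the source completed before that buffer was full.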
The cache operator subscribes to the source observable once, stores all the items it emits, and replays them in the same order to every subscriber. It is useful when you want to cache responses from the source observable and emit them to all subscribers when needed.
Observable<Long> observable = Observable.intervalRange(1, 8, 100, 600, TimeUnit.MILLISECONDS);
Observable<Long> observableCache = observable.cache();
observableCache.subscribe(l -> LOGGER.info("from cache- onNext value "+l));
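//intervalRange emits on a background thread, so keep the current thread alive
try {
    Thread.sleep(5000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}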
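To make the caching visible, the following sketch counts how often the source is subscribed to when two subscribers read from the cached observable. The counter, the class name, and the use of doOnSubscribe are choices made for this illustration.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this illustration.
class CacheSketch {

    static int sourceSubscriptions() {
        AtomicInteger subscriptions = new AtomicInteger();
        Observable<Integer> source = Observable.range(1, 3)
                // counts every subscription that reaches the source
                .doOnSubscribe(d -> subscriptions.incrementAndGet());
        Observable<Integer> cached = source.cache();

        List<Integer> first = cached.toList().blockingGet();  // triggers the source
        List<Integer> second = cached.toList().blockingGet(); // replayed from the cache
        System.out.println(first + " " + second);
        return subscriptions.get();
    }

    public static void main(String[] args) {
        // prints [1, 2, 3] [1, 2, 3] and then 1
        System.out.println(sourceSubscriptions());
    }
}
```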
The collect operator creates a Single that emits one item: a data structure containing items emitted by the source observable. The example below uses collect to gather all the odd numbers emitted by the source observable into an ArrayList.
Observable<Integer> observable = Observable.range(1, 10);
Single<ArrayList<Integer>> observableCollect = observable.collect(new Callable<ArrayList<Integer>>() {
        @Override
        public ArrayList<Integer> call() throws Exception {
            return new ArrayList<Integer>();
        }
    },
    new BiConsumer<ArrayList<Integer>, Integer>() {
        @Override
        public void accept(ArrayList<Integer> integers, Integer num) {
            if (num % 2 > 0) {
                integers.add(num);
            }
        }
    });
observableCollect.subscribe(s -> LOGGER.info("values after collect operator "+s));
The combineLatest operator aggregates the latest items emitted by each of the observables passed to it. You define how the latest values are aggregated in a Function and pass it to combineLatest. Once every source has emitted at least one item, this function is called each time any source emits, and the value it returns is emitted by the observable that combineLatest creates. The Function's apply method takes an array holding the latest value from each observable and returns the aggregated value.
ObservableSource<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");
ObservableSource<String> observableTwo = Observable.just("language", "di", "orm", "os", "ractive p");
ObservableSource<String> observableThree = Observable.just("1", "2", "3", "4", "5");
Observable<String> observableFin = Observable.combineLatest(new Function<Object[], String>() {
        @Override
        public String apply(Object[] t) throws Exception {
            String finObj = "";
            for (Object ob : t) {
                finObj = finObj + ob + " ";
            }
            return finObj.trim();
        }
    },
    1, //bufferSize
    observableOne, observableTwo, observableThree);
observableFin.subscribe(s -> LOGGER.info("values after combineLatest operator "+s));
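Because just emits all of its items at once, the interplay of "latest" values is easier to follow when the emissions are driven by hand. The sketch below assumes two PublishSubject sources and the two-source overload of combineLatest; the subject names and values are invented for this illustration.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class name, used only for this illustration.
class CombineLatestSketch {

    static List<String> run() {
        PublishSubject<String> names = PublishSubject.create();
        PublishSubject<Integer> versions = PublishSubject.create();
        List<String> seen = new ArrayList<>();

        Observable.combineLatest(names, versions, (n, v) -> n + " " + v)
                .subscribe(seen::add);

        names.onNext("java");    // nothing emitted yet, versions has no value
        versions.onNext(8);      // emits "java 8"
        versions.onNext(11);     // emits "java 11"
        names.onNext("kotlin");  // emits "kotlin 11"
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [java 8, java 11, kotlin 11]
    }
}
```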
The compose operator transforms the source observable and returns a new downstream observable. It takes an ObservableTransformer implementation as input, which converts the upstream observable into the downstream observable. ObservableTransformer has one method called apply; this is where you need to provide the logic that converts the source observable. In the example below, compose returns an observable that emits the distinct values emitted by the source observable.
Observable<String> observableupstream = Observable.just("language", "framework", "os", "library", "os");
//compose operator takes ObservableTransformer implementation to perform transformation of source observable
Observable<String> observabledownstream = observableupstream.compose(new ObservableTransformer<String, String>() {
    //returns new observable that can emit distinct values from source observable
    @Override
    public ObservableSource<String> apply(Observable<String> upstream) {
        return upstream.distinct();
    }
});
observabledownstream.subscribe(s -> LOGGER.info("values after compose operator "+s));
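One reason to reach for compose is that a transformer can bundle several operators and be reused on different observables. The following is a minimal sketch of that idea; distinctUpperCase and the class name are hypothetical helpers, not part of RxJava.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import java.util.List;

// Hypothetical class and method names, used only for this illustration.
class ComposeSketch {

    // A reusable transformer: removes duplicates, then upper-cases each item.
    static ObservableTransformer<String, String> distinctUpperCase() {
        return upstream -> upstream.distinct().map(String::toUpperCase);
    }

    static List<String> transform(String... items) {
        return Observable.fromArray(items)
                .compose(distinctUpperCase())
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // prints [LANGUAGE, FRAMEWORK, OS, LIBRARY]
        System.out.println(transform("language", "framework", "os", "library", "os"));
        // the same transformer applied to another source, prints [JAVA, SPRING]
        System.out.println(transform("java", "java", "spring"));
    }
}
```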
The concat operator creates an observable that emits the items from all the source observables, one source after another, without interleaving them.
List<ObservableSource<String>> observableLst = new ArrayList<ObservableSource<String>>();
ObservableSource<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");
observableLst.add(observableOne);
ObservableSource<String> observableTwo = Observable.just("language", "di", "orm", "os", "ractive p");
observableLst.add(observableTwo);
ObservableSource<String> observableThree = Observable.just("1", "2", "3", "4", "5");
observableLst.add(observableThree);
//concat operator takes list of ObservableSource object and returns observable
//which emits all the items from all source observables
Observable<String> observableFin = Observable.concat(observableLst);
observableFin.subscribe(s -> LOGGER.info("values after concat operator "+s));
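The ordering guarantee of concat is easiest to see next to merge, which subscribes to all sources at once. In the sketch below the first source is delayed by 100 milliseconds, an arbitrary value picked for this illustration, as are the class and method names.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical class and method names, used only for this illustration.
class ConcatVsMergeSketch {

    static Observable<String> slow() {
        // every item of this source arrives 100 ms late
        return Observable.just("a1", "a2").delay(100, TimeUnit.MILLISECONDS);
    }

    static Observable<String> fast() {
        return Observable.just("b1", "b2");
    }

    static List<String> concatenated() {
        // concat waits for the slow source to complete before subscribing to the fast one
        return Observable.concat(slow(), fast()).toList().blockingGet();
    }

    static List<String> merged() {
        // merge subscribes to both, so the fast items are not held back
        return Observable.merge(slow(), fast()).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(concatenated()); // prints [a1, a2, b1, b2]
        System.out.println(merged());       // b1 and b2 come before a1 and a2
    }
}
```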
The concatMap operator maps each item from the source observable to an observable and emits the items of those inner observables in the order of the source items. Use concatMap when you need to transform each emitted value into an observable while preserving order. The transformation logic is defined in a Function and passed to concatMap, as shown below.
Observable<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");
Observable<String> observableFin = observableOne.concatMap(new Function<String, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(String t) throws Exception {
        return Observable.just(t + " after thru concatMap");
    }
});
observableFin.subscribe(s -> LOGGER.info("values after concatMap operator "+s));
The doFinally operator lets you specify an action that runs after the observable terminates with onComplete or onError, or is disposed by the downstream.
Observable<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");
Observable<String> observableFin = observableOne.doFinally(new Action() {
    @Override
    public void run() throws Exception {
        LOGGER.info("do finally action of observableOne");
    }
});
observableFin.subscribe(s -> LOGGER.info("values from observable "+s));
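The example above covers the onComplete case. The other two cases, onError and disposal by the downstream, can be sketched as follows; the log list, the messages, and the class name are invented for this illustration.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class name, used only for this illustration.
class DoFinallySketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Case 1: the source fails; the action runs after onError is delivered.
        Observable.<String>error(new IllegalStateException("boom"))
                .doFinally(() -> log.add("finally after error"))
                .subscribe(s -> { }, e -> log.add("error: " + e.getMessage()));

        // Case 2: the source never terminates; the action runs when we dispose.
        Observable.<String>never()
                .doFinally(() -> log.add("finally after dispose"))
                .subscribe()
                .dispose();

        return log;
    }

    public static void main(String[] args) {
        // prints [error: boom, finally after error, finally after dispose]
        System.out.println(run());
    }
}
```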
The flatMap operator is similar to concatMap, except that the inner observables are subscribed to concurrently, so the order of the emitted items may change.
public void flatMapOperator() {
    Observable<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");
    Observable<String> observableFin = observableOne.flatMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String t) throws Exception {
            return Observable.just(t + " after thru flatMap").delay(200, TimeUnit.MILLISECONDS);
        }
    });
    observableFin.subscribe(s -> LOGGER.info("values after flatMap operator " + s));
    //delay emits on a background thread, so keep the current thread alive
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
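The difference in ordering becomes clearer when each inner observable is delayed by a different amount. The sketch below assumes delays of 100 milliseconds times the item value, an arbitrary choice for this illustration; the class and method names are hypothetical as well.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical class and method names, used only for this illustration.
class ConcatMapVsFlatMapSketch {

    // Emits n after n * 100 ms, so smaller numbers are ready sooner.
    static Observable<Integer> delayed(int n) {
        return Observable.just(n).delay(n * 100L, TimeUnit.MILLISECONDS);
    }

    static List<Integer> viaConcatMap() {
        // inner observables run one at a time, source order is kept
        return Observable.just(3, 1, 2)
                .concatMap(n -> delayed(n))
                .toList()
                .blockingGet();
    }

    static List<Integer> viaFlatMap() {
        // inner observables run concurrently, items arrive as they become ready
        return Observable.just(3, 1, 2)
                .flatMap(n -> delayed(n))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(viaConcatMap()); // prints [3, 1, 2]
        System.out.println(viaFlatMap());   // typically [1, 2, 3], ordered by delay
    }
}
```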
The fromArray operator takes an array of items as input and creates an observable that emits those items.
Observable<String> observableOne = Observable.fromArray("java", "spring", "hibernate", "android", "rxjava");
observableOne.subscribe(s -> LOGGER.info("values "+s));
The fromCallable operator creates an observable that emits the value returned by the supplied Callable. The Callable is invoked only when an observer subscribes.
Observable<String> observableOne = Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "excellent work";
    }
});
observableOne.subscribe(s -> LOGGER.info("value from callable "+s));
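A detail worth seeing in code is that fromCallable does not call the Callable when the observable is created, only when someone subscribes, and once per subscriber. The sketch below uses an AtomicInteger as the Callable's result to show this; the counter and class name are made up for this illustration.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this illustration.
class FromCallableSketch {

    static int[] run() {
        AtomicInteger calls = new AtomicInteger();
        Observable<Integer> observable = Observable.fromCallable(calls::incrementAndGet);

        int beforeSubscribe = calls.get();              // 0, nothing has run yet
        int firstValue = observable.blockingFirst();    // 1, first subscription
        int secondValue = observable.blockingFirst();   // 2, the callable runs again
        return new int[] {beforeSubscribe, firstValue, secondValue};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // prints [0, 1, 2]
    }
}
```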
The fromFuture operator lets you create an observable that emits the value returned by the get method of the supplied Future object. The example below implements the Future interface directly, including both get methods.
Observable<String> observable = Observable.fromFuture(new Future<String>() {
    boolean done;
    @Override
    public boolean cancel(boolean arg0) {
        return false;
    }
    @Override
    public String get() throws InterruptedException, ExecutionException {
        done = true;
        //perform task
        return "came from future";
    }
    @Override
    public String get(long arg0, TimeUnit arg1)
            throws InterruptedException, ExecutionException, TimeoutException {
        done = true;
        return "came from future";
    }
    @Override
    public boolean isCancelled() {
        return false;
    }
    @Override
    public boolean isDone() {
        return done;
    }
});
observable.subscribe(s -> LOGGER.info("value from future observable "+s));
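Implementing Future by hand is mostly useful for demonstration. As an alternative sketch, the Future can come from a standard ExecutorService, which is an assumption about typical usage rather than something this post prescribes; the class name is hypothetical.

```java
import io.reactivex.Observable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name, used only for this illustration.
class FromFutureSketch {

    static String run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // submit returns a Future that completes on the executor's thread
            Future<String> future = executor.submit(() -> "came from future");
            // fromFuture calls future.get() when subscribed and emits the result
            return Observable.fromFuture(future).blockingFirst();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints came from future
    }
}
```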
The generate operator produces an observable whose values come from the logic in a Consumer's accept method, which emits them through an Emitter. The method is called repeatedly, and may call onNext at most once per call, until it signals onComplete or onError through the Emitter.
Observable<String> observable = Observable.generate(new Consumer<Emitter<String>>() {
    private int count = 0;
    @Override
    public void accept(Emitter<String> t) throws Exception {
        if (count > 5) {
            t.onComplete();
            //stop here so that onNext is not called after onComplete
            return;
        }
        count++;
        t.onNext("generate " + count);
    }
});
observable.subscribe(s -> LOGGER.info("got value from observable "+s));
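Keeping the counter in a field means every subscriber shares it. A possible alternative is the generate overload that takes an initial state and a BiFunction returning the next state, so each subscription starts from a fresh counter. The sketch below emits the same six values; the class name is made up for this illustration.

```java
import io.reactivex.Emitter;
import io.reactivex.Observable;
import java.util.List;

// Hypothetical class name, used only for this illustration.
class GenerateWithStateSketch {

    static List<String> run() {
        Observable<String> observable = Observable.generate(
                () -> 1, // initial state, created once per subscriber
                (Integer count, Emitter<String> emitter) -> {
                    if (count > 6) {
                        emitter.onComplete();
                    } else {
                        emitter.onNext("generate " + count);
                    }
                    return count + 1; // state passed to the next call
                });
        return observable.toList().blockingGet();
    }

    public static void main(String[] args) {
        // prints [generate 1, generate 2, generate 3, generate 4, generate 5, generate 6]
        System.out.println(run());
    }
}
```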