An Observable is the heart of RxJava. It is the source of data or events in Reactive Programming. RxJava provides many methods in its library to create an Observable. Choosing which one to use can be difficult. My goal from this article is to help you in making this choice simpler by providing you with a mental map of different scenarios and which methods to use in each scenario.
The code samples for this article can be found here. This article is second in the series of RxJava articles that I am writing. Part 1 can be found here.
Observable and OnSubscribe
Think of Observable as an action which starts getting executed as soon as a Subscriber subscribes to it. During the execution of this action, data/events are generated and passed on to the subscriber. Let’s try to understand it by an example. I will be using Observable.create() method to create an Observable.
// Note that below code is not optimal but it helps in demonstration of concepts
// A better version is shown in the next section
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
String result = doSomeTimeTakingIoOperation();
subscriber.onNext(result); // Pass on the data to subscriber
subscriber.onCompleted(); // Signal about the completion subscriber
} catch (Exception e) {
subscriber.onError(e); // Signal about the error to subscriber
}
}
});
In the code above, the following are worth noticing:
- The Action - The create method receives an implementation of Observable.OnSubscribe interface. This implementation defines what action will be taken when a subscriber subscribes to the Observable.
- The Action is lazy - The
call
method is called by library each time a subscriber subscribes to the Observable. Till then the action is not executed, i.e. it is lazy. - Events are pushed to subscriber - onNext, onError and onCompleted methods
on
Subscriber
are used to push the events onto it. As per Rx Design Guidelines, the events pushed from Observable should follow the below rules:- Zero, one or more than one calls to
onNext
- Zero or only one call to either of
onCompleted
oronError
- Zero, one or more than one calls to
In all the different methods for creation of Observable that RxJava library provides, an implementation of the
Observable.OnSubscribe
interface is created internally. We will be seeing these methods in the next section.
Scenarios for creating an Observable
Grouping the different scenarios into categories helps in deciding which method to use for creating an Observable. In the following sections I will be providing the scenarios and what methods are available in the library to help in each scenario.
An Observable that emits a single value after computation
Among all the scenarios, the one that you will come across the most is where you define a computation which returns a single value and then completes. Defining a function that makes an HTTP call to retrieve some information is an example of such a scenario. Such an action can be defined by using the Callable interface. RxJava provides a fromCallable factory method for creating an Observable from a Callable.
The code example which I have shown above emits only one value. We can replace it by the fromCallable
method as
shown below. The emission of value to the subscriber using onNext and either onCompleted or onError is handled by
the library.
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return doSomeTimeTakingIoOperation();
}
});
An Observable that emits multiple values during computation
In few cases more than one value is passed to the Subscriber from the Observer.
An example can be making an HTTP call to retrieve a list but the list in the response is paginated. In this case you will have
to make multiple request to retrieve the complete list and keep on calling Subscriber’s onNext method multiple times
till all the values have been emitted. In such a scenario Observable.create()
can be used.
However creating an Observable using the create()
factory method requires some advanced handling which might be
difficult to write. RxJava library has introduced SyncOnSubscribe and
AsyncOnSubscribe to handle the difficulties.
An Observable created by combining two or more Observables
There are scenarios where you have two or more observables and you would want to combine the result from all of them before doing any further processing. An example of this can be on a server side implementation where a request has to be processed by collecting information from two external services which can be run in parallel. In such a case you can start two parallel computations using two Observables and combine the results of each by using a method like Observable.concat().
There are many other methods which help in combining observables. The differences among them are in the way they combine the observables. Observable.amb(), Observable.combineLatest(), Observable.merge() and Observable.zip() are few that are very useful in combining observables.
Some useful pre-defined Observables
The RxJava library provides few methods for pre-defined Observables. One such method is Observable.interval(). This observable emits a sequential number every specified interval of time. Other such methods are Observable.empty(), Observable.never(), Observable.error(), Observable.just(), Observable.from(), Observable.timer() and Observable.range().
An Observable from applying operator on an instance of Observable
All the methods that we have seen for Observable creation till now are static factory methods on Observable. There is another group of methods which are on instances of an Observable. These are called operators. Whenever an operator is applied on an instance, a new instance of Observable, chained to previous one, is provided. Internally all the operators use lift function to do the chaining. I will be writing about operators in detail in other blog posts in this series.
Summary
There are many methods that are provided by the RxJava library for Observable creation. Using the mental map of scenarios can help in deciding which method to use in a particular scenario.
Next post in this series - RxJava - Part 3 - Multithreading
Update: In May 2016, I had done a talk on RxJava, where I had explained on how to think in RxJava. As this was an introductory talk on RxJava, it serves as a good addition to this blog post.
GeeCON 2016: Praveer Gupta - How to Think, in RxJava, Before Reacting from GeeCON Conference on Vimeo.