RxJava is a library that helps programmers write asynchronous, concurrent and resilient applications. With RxJava you write programs in the reactive programming paradigm. In this article I give a quick introduction to reactive programming and RxJava.
Before we dive into the details, let’s look at a real-world example.
A Real World Example
Suppose you go to an ATM (Automated Teller Machine) to withdraw some cash. You insert your debit card into the machine, enter your PIN, enter the amount you want to withdraw and hit the done button. After you hit the button there are two possible outcomes:
- Either the ATM has the requested amount of cash, in which case it dispenses the cash to you. Once all the money is dispensed, it signals the successful completion of the transaction with a message.
- Or the ATM does not have sufficient cash left, in which case it signals the failure of the transaction with a message.
In this example the ATM is the source of cash and you are the consumer of cash. Based on the transaction details, the cash flows from the ATM to you when you hit the done button. I will use this example to explain the concepts below.
What is Reactive Programming
It is a style of programming where you define a source of data and a consumer of that data. Once you connect the consumer to the source, the library (which in this blog is RxJava) takes care of pushing the data, generated by the source, to the consumer.
The above definition talks about three important things. I will be explaining each of these in detail.
- Source of data
- Consumer of data
- Connecting Consumer to Source
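Before bringing in RxJava, the ATM example can be modelled in plain Java to make the three roles concrete. This is only a sketch: `CashConsumer`, `Atm` and `AtmDemo` are hypothetical names invented for illustration, the ATM is assumed to hold only 100-unit banknotes, and none of this is RxJava's API.

```java
import java.util.ArrayList;
import java.util.List;

class AtmDemo {
    // Hypothetical consumer: receives banknotes, then exactly one final signal.
    interface CashConsumer {
        void onBill(int denomination);   // one banknote pushed to the consumer
        void onFailure(String reason);   // transaction failed, nothing else follows
        void onSuccess();                // all banknotes were dispensed
    }

    // Hypothetical source: an ATM holding a number of 100-unit banknotes.
    static class Atm {
        private int billsLeft;

        Atm(int billsLeft) {
            this.billsLeft = billsLeft;
        }

        // "Hitting the done button": connects the consumer and starts the flow.
        void withdraw(int amount, CashConsumer consumer) {
            int billsNeeded = amount / 100;
            if (amount % 100 != 0 || billsNeeded > billsLeft) {
                consumer.onFailure("cannot dispense " + amount);
                return;
            }
            for (int i = 0; i < billsNeeded; i++) {
                billsLeft--;
                consumer.onBill(100);
            }
            consumer.onSuccess();
        }
    }

    // Runs one withdrawal and records every signal the consumer receives.
    static List<String> run(Atm atm, int amount) {
        List<String> log = new ArrayList<>();
        atm.withdraw(amount, new CashConsumer() {
            @Override public void onBill(int d) { log.add("bill " + d); }
            @Override public void onFailure(String reason) { log.add("failed: " + reason); }
            @Override public void onSuccess() { log.add("success"); }
        });
        return log;
    }

    public static void main(String[] args) {
        Atm atm = new Atm(3);
        System.out.println(run(atm, 200)); // [bill 100, bill 100, success]
        System.out.println(run(atm, 500)); // [failed: cannot dispense 500]
    }
}
```

The source decides when data flows and pushes it to the consumer; the consumer only reacts to what it receives.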
RxJava in Action
Let’s understand each of the points above using example code written with RxJava (the 1.x API). I have intentionally made the code verbose to highlight the details. At the end of this blog I have also provided a concise version of the same code.
import rx.Observable;
import rx.Subscriber;

// defining the source: emits the integers 1 through 5, then completes
Observable<Integer> source = Observable.range(1, 5);

// defining the consumer: one callback per kind of signal
Subscriber<Integer> consumer = new Subscriber<Integer>() {
    @Override
    public void onNext(Integer number) { System.out.println(number); }

    @Override
    public void onError(Throwable e) { System.out.println("error"); }

    @Override
    public void onCompleted() { System.out.println("completed"); }
};

// connecting the consumer to source; emission starts here
source.subscribe(consumer);
Source of data
In the ATM example, the machine, along with the configured transaction details, serves as the source. Similarly, in the code example Observable<T> represents a source. An Observable can be created using one of the many factory methods it provides; Observable.range(int start, int count) is one of them.
In the example above the source emits five numbers, 1 through 5, and then finishes.
Consumer of data
Subscriber<T> serves as the consumer of data. RxJava calls the onNext(T data) method on the Subscriber to push the data emitted by the source, the Observable, to the consumer of data, the Subscriber. In the example above the consumer prints each received number to the console. This is similar to the ATM dispensing banknotes of different denominations.
Once all the data has been emitted by the source, RxJava signals completion by calling the onCompleted() method on the Subscriber. In the example above the consumer just prints completed. In the ATM example, completion is signalled using a successful transaction message.
If any error occurs during the emission of data, RxJava forwards the error to the onError(Throwable e) method on the Subscriber. In the example above the consumer handles the exception by printing error to the console. In the ATM example the error is signalled using a transaction failure message.
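The relationship between the three callbacks can be sketched in plain Java: any number of onNext calls, followed by exactly one of onError or onCompleted. The `Sink` interface and the `emit` and `record` helpers below are hypothetical names made up for this illustration; this is an assumption about the general shape of the behaviour, not RxJava's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

class ErrorRoutingSketch {
    // Hypothetical stand-in for the three Subscriber callbacks.
    interface Sink {
        void onNext(int value);
        void onError(Throwable e);
        void onCompleted();
    }

    // Pushes transform(start) .. transform(start + count - 1) to the sink,
    // then sends exactly one terminal signal.
    static void emit(int start, int count, IntUnaryOperator transform, Sink sink) {
        try {
            for (int i = start; i < start + count; i++) {
                sink.onNext(transform.applyAsInt(i));
            }
        } catch (RuntimeException e) {
            sink.onError(e); // emission stops here; onCompleted is never called
            return;
        }
        sink.onCompleted();
    }

    // Records every signal received for the numbers 1 through 5.
    static List<String> record(IntUnaryOperator transform) {
        List<String> log = new ArrayList<>();
        emit(1, 5, transform, new Sink() {
            @Override public void onNext(int value) { log.add(String.valueOf(value)); }
            @Override public void onError(Throwable e) {
                log.add("error: " + e.getClass().getSimpleName());
            }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(n -> n * 10));       // [10, 20, 30, 40, 50, completed]
        System.out.println(record(n -> 10 / (3 - n))); // [5, 10, error: ArithmeticException]
    }
}
```

In the second run the division by zero at n = 3 ends the sequence: the consumer sees the values produced so far, then the error, and no completion signal.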
Connecting Consumer to Source
This connection is established using the subscribe(Subscriber s) method on Observable. In RxJava the computations defined as part of the Observable and the Subscriber get executed only when the connection between the two is established. This means that the computations are lazy. In the example above the source starts emitting numbers only when a consumer is subscribed to it. In the ATM example, pressing the done button after configuring the transaction details is analogous to subscribing. Until then no cash is dispensed by the machine.
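Laziness can also be illustrated without the library. In the plain-Java sketch below, `RangeSource` is a hypothetical class invented for this post, not an RxJava type: constructing it only stores a description of the work, and the loop runs when `subscribe` is called. The sketch also assumes that every subscription repeats the work from the start, which is how a source like Observable.range behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

class LazySourceSketch {
    // Hypothetical source: it stores only the recipe (start and count).
    static class RangeSource {
        private final int start;
        private final int count;

        RangeSource(int start, int count) {
            this.start = start;
            this.count = count;
        }

        // The emission loop runs only here, once per subscription.
        void subscribe(IntConsumer consumer) {
            for (int i = start; i < start + count; i++) {
                consumer.accept(i);
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        RangeSource source = new RangeSource(1, 3);   // nothing is emitted yet
        log.add("source defined");
        source.subscribe(n -> log.add("got " + n));   // emission starts now
        source.subscribe(n -> log.add("again " + n)); // a second subscription re-runs it
        return log;
    }

    public static void main(String[] args) {
        // [source defined, got 1, got 2, got 3, again 1, again 2, again 3]
        System.out.println(demo());
    }
}
```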
A concise version of the above code, written using Java 8 lambdas, looks like this:
Observable.range(1, 5).subscribe(
    number -> System.out.println(number),
    error -> System.out.println("error"),
    () -> System.out.println("completed")
);
Summary
We saw that when writing programs using RxJava we define an Observable and a Subscriber and then connect the two using the subscribe method on Observable. From that point the Observable starts emitting data and RxJava pushes the data received from the Observable to the Subscriber. I hope this introduction is enough to get you started with RxJava. I have created a repository here which you can use to experiment with RxJava.
In upcoming blog posts I will explain concepts around concurrency, composition and resilience in the context of RxJava. Next post in this series: RxJava - Part 2 - Creating an Observable
Update: In May 2016 I gave a talk on RxJava in which I explained how to think in RxJava. As it was an introductory talk, it is a good complement to this blog post.
GeeCON 2016: Praveer Gupta - How to Think, in RxJava, Before Reacting from GeeCON Conference on Vimeo.