RxJava - Part 1 - A Quick Introduction


RxJava is a library that helps programmers write asynchronous, concurrent and resilient applications. With RxJava you write programs in the reactive programming paradigm. In this article I give a quick introduction to reactive programming and RxJava.

Before we dive into more details, let’s see a real world example.

A Real World Example

Suppose you go to an ATM (Automated Teller Machine) to withdraw some cash. You insert your debit card into the machine, enter your PIN, enter the amount you want to withdraw and hit the done button. After you hit the button there are two possible outcomes:

  • Either the ATM has the requested amount of cash, in which case it will dispense the cash to you. Once all the money is dispensed it will signal to you with a message about the successful transaction completion.
  • Or the ATM does not have sufficient cash left, in which case it will signal with a message of transaction failure.

In this example the ATM is the source of cash and you are the consumer of cash. Based on the transaction details, the cash flows from the ATM to you when you hit the done button. I will use this example to explain the concepts below.
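To make the two outcomes concrete, here is a minimal sketch of the ATM in plain Java. It is only a model of the analogy: `AtmSketch`, `CashConsumer`, `withdraw` and `signalsFor` are hypothetical names that I made up for illustration, and the sketch assumes the machine holds only bills of 100. None of this is RxJava code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the ATM example; none of these names come from RxJava.
public class AtmSketch {

    // The consumer of cash: one callback per kind of signal.
    interface CashConsumer {
        void onBill(int denomination);   // called once per dispensed bill
        void onFailure(String reason);   // the transaction failed
        void onSuccess();                // all requested cash was dispensed
    }

    // The source of cash: pushes bills to the consumer, then exactly one final signal.
    static void withdraw(int amount, int cashInMachine, CashConsumer consumer) {
        if (amount > cashInMachine) {
            consumer.onFailure("insufficient cash");
            return;
        }
        // Assumption: only bills of 100 are available and amount is a multiple of 100.
        for (int dispensed = 0; dispensed < amount; dispensed += 100) {
            consumer.onBill(100);
        }
        consumer.onSuccess();
    }

    // Collects every signal the consumer receives so the two outcomes can be compared.
    public static List<String> signalsFor(int amount, int cashInMachine) {
        List<String> signals = new ArrayList<>();
        withdraw(amount, cashInMachine, new CashConsumer() {
            @Override public void onBill(int denomination) { signals.add("bill " + denomination); }
            @Override public void onFailure(String reason) { signals.add("failed: " + reason); }
            @Override public void onSuccess() { signals.add("success"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(signalsFor(300, 1000)); // [bill 100, bill 100, bill 100, success]
        System.out.println(signalsFor(300, 200));  // [failed: insufficient cash]
    }
}
```

Notice that the consumer never asks for cash bill by bill. The machine pushes each bill and then one final message, which is the shape the rest of this post builds on.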

What is Reactive Programming

It is a style of programming where you define a source of data and a consumer of that data. Once you connect the consumer to the source, the library (which in this blog is RxJava) takes care of pushing the data, generated by the source, to the consumer.

The above definition talks about three important things. I will be explaining each of these in detail.

  • Source of data
  • Consumer of data
  • Connecting Consumer to Source

RxJava in Action

Let’s understand each of the points above using example code written with RxJava. I have intentionally made the code verbose to highlight the details. At the end of this post I also provide a concise version of the same code.

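// RxJava 1.x classes live in the rx package
import rx.Observable;
import rx.Subscriber;

// defining the source: emits the integers 1 through 5, then completes
Observable<Integer> source = Observable.range(1, 5);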

// defining the consumer
Subscriber<Integer> consumer = new Subscriber<Integer>() {
    @Override
    public void onNext(Integer number) { System.out.println(number); }

    @Override
    public void onError(Throwable e) { System.out.println("error"); }

    @Override
    public void onCompleted() { System.out.println("completed"); }
};

// connecting the consumer to source
source.subscribe(consumer);

Source of data

In the ATM example, the machine together with the configured transaction details serves as the source. Similarly, in the code example Observable<T> represents a source. An Observable can be created using one of the many factory methods it provides; Observable.range(int start, int count) is one of them. In the example above the source will emit five numbers, 1 through 5, and then finish.
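One detail that is easy to miss: the second argument of range is a count, not an end value. The sketch below models which values would be emitted, using only the Java standard library. `RangeSketch` and `rangeEmissions` are hypothetical helpers written for this illustration and are not part of RxJava.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java model of which values Observable.range(start, count) emits.
// rangeEmissions is a hypothetical helper, not part of RxJava.
public class RangeSketch {

    public static List<Integer> rangeEmissions(int start, int count) {
        // The second argument is a count, so the last value is start + count - 1.
        // IntStream.range takes an exclusive end instead, hence start + count here.
        return IntStream.range(start, start + count)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(rangeEmissions(1, 5)); // [1, 2, 3, 4, 5]
        System.out.println(rangeEmissions(3, 4)); // [3, 4, 5, 6]
    }
}
```

With a start of 1 the count and the last value happen to coincide, which is why the example in this post reads naturally as "1 through 5".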

Consumer of data

Subscriber<T> serves as the consumer of data. RxJava uses the onNext(T data) method on the Subscriber to push the data emitted by the source, the Observable, to the consumer, the Subscriber. In the example above the consumer prints each received number to the console. This is similar to the ATM dispensing bills/banknotes of different denominations.

Once all the data is emitted by the source, RxJava signals the completion using the onCompleted() method on the Subscriber. In the example above the consumer just prints completed. In the ATM example, completion is signalled using a successful transaction message.

If any error occurs during the emission of data, RxJava forwards the error to the onError(Throwable e) method on the Subscriber. In the example above the consumer handles the exception by printing error to the console. In the ATM example the error is signalled using a transaction failure message.
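The three callbacks arrive in a fixed order: zero or more data signals, followed by either a completion or an error, never both. Here is a minimal plain-Java sketch of that order for a source that fails partway through. `SignalOrderSketch` and `emit` are hypothetical names for this illustration; the sketch only records the signal names and does not call RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

// Hypothetical plain-Java model of the signal order described above; not RxJava code.
public class SignalOrderSketch {

    // Applies 'step' to 1..count and records the signals a consumer would receive.
    public static List<String> emit(int count, IntUnaryOperator step) {
        List<String> signals = new ArrayList<>();
        try {
            for (int i = 1; i <= count; i++) {
                signals.add("onNext " + step.applyAsInt(i));
            }
        } catch (RuntimeException e) {
            // An error ends the sequence: no more data and no completion signal.
            signals.add("onError " + e.getMessage());
            return signals;
        }
        signals.add("onCompleted");
        return signals;
    }

    public static void main(String[] args) {
        // Every step succeeds, so the sequence ends with a completion.
        System.out.println(emit(3, n -> n * 10));
        // [onNext 10, onNext 20, onNext 30, onCompleted]

        // The second step throws, so the sequence ends with an error instead.
        System.out.println(emit(3, n -> {
            if (n == 2) throw new IllegalStateException("boom");
            return n * 10;
        }));
        // [onNext 10, onError boom]
    }
}
```

In the failing run the first value still reaches the consumer before the error does, much like an ATM that jams after dispensing part of the cash.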

Connecting Consumer to Source

The connection is established using the subscribe(Subscriber s) method on Observable. In RxJava the computations defined as part of the Observable and the Subscriber get executed only when the connection between the two is established. This means that the computations are lazy. In the example above the source will start emitting numbers only when a consumer subscribes to it. In the ATM example, pressing the done button after configuring the transaction details is analogous to subscribing. Until then no cash is dispensed by the machine.
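The laziness can be modelled in a few lines of plain Java. In the sketch below, `LazySketch` and `LazySource` are hypothetical names for this illustration: the source merely stores the work to be done, and that work runs only when subscribe is called. This is a model under those assumptions, not how RxJava is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

// Hypothetical plain-Java model of a lazy source; LazySource is not an RxJava type.
public class LazySketch {

    static class LazySource {
        // The work to run for a subscription; it is stored here, not executed.
        private final Consumer<IntConsumer> onSubscribe;

        LazySource(Consumer<IntConsumer> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // Connecting a consumer is what triggers the stored work.
        void subscribe(IntConsumer consumer) {
            onSubscribe.accept(consumer);
        }
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();

        LazySource source = new LazySource(consumer -> {
            log.add("source started");
            for (int i = 1; i <= 3; i++) {
                consumer.accept(i);
            }
        });
        log.add("source defined"); // logged first: defining the source emitted nothing

        source.subscribe(n -> log.add("received " + n));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // [source defined, source started, received 1, received 2, received 3]
    }
}
```

The entry "source defined" is logged before "source started", which shows that nothing inside the source ran until a consumer was connected.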

A concise version of the above code, written using Java 8 lambdas, looks like this.

Observable.range(1, 5).subscribe(
    number -> System.out.println(number),
    error -> System.out.println("error"),
    () -> System.out.println("completed")
);

Summary

We saw that when writing programs using RxJava we define an Observable and a Subscriber and then connect the two using the subscribe method on Observable. From that point the Observable starts emitting data and RxJava pushes the data received from the Observable to the Subscriber. I hope that this introduction is enough to get you started with RxJava. I have created a repository here which you can use to experiment with RxJava.

In upcoming blog posts I will explain concepts around concurrency, composition and resilience in the context of RxJava. Next post in this series: RxJava - Part 2 - Creating an Observable


Update: In May 2016 I gave a talk on RxJava in which I explained how to think in RxJava. As it was an introductory talk, it is a good companion to this blog post.

GeeCON 2016: Praveer Gupta - How to Think, in RxJava, Before Reacting from GeeCON Conference on Vimeo.
