A brief introduction to RxJava

What does reactive mean?

There are as many definitions of what “reactive” is as there are articles on the subject, but the basic idea behind the concept is that there are entities, called observables or streams, that represent a value that changes over time (or a list of values that might grow endlessly) and there are computations involving these entities that will themselves return a value that changes over time. That means we can apply transformations or compositions to one or more observables to get a new observable that represents a different value (or list of values).

What is RxJava?

RxJava is a Java implementation of the reactive pattern. It was open sourced by Netflix in 2013, reached version 1.0 in 2014, and is currently a very popular way to manage asynchronous operations in a range of environments, from Android applications to backend microservices. It’s heavily based on Microsoft’s Reactive Extensions (Rx).
The keystone of RxJava is the Observable data type. It can be thought of as an equivalent to Iterable that is push based rather than pull based. With an Iterable, the consumer pulls values from the producer and the thread is blocked until those values arrive. An Observable, in contrast, pushes values to the consumer whenever they are available. This approach is more flexible, because values can arrive synchronously or asynchronously.
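
To make the pull/push distinction concrete, here is a minimal plain-Java sketch that does not use RxJava at all. The PushSource interface and the class name are hypothetical; they only mimic the shape of subscribing to an Observable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

class PushVsPull {

    // Hypothetical stand-in for an Observable: the producer decides when to call onNext.
    interface PushSource<T> {
        void subscribe(Consumer<T> onNext);
    }

    // Pull: the consumer drives the loop and asks for each value with next().
    static List<Integer> pullAll(Iterable<Integer> source) {
        List<Integer> received = new ArrayList<>();
        Iterator<Integer> it = source.iterator();
        while (it.hasNext()) {
            received.add(it.next());
        }
        return received;
    }

    // Push: the consumer only registers a callback; the producer invokes it.
    static List<Integer> pushAll(PushSource<Integer> source) {
        List<Integer> received = new ArrayList<>();
        source.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3);
        // A synchronous producer; an asynchronous one could call onNext later, from another thread.
        PushSource<Integer> source = onNext -> data.forEach(onNext);

        System.out.println(pullAll(data));   // prints [1, 2, 3]
        System.out.println(pushAll(source)); // prints [1, 2, 3]
    }
}
```

In the pull version the caller cannot continue until next() returns. In the push version the caller hands over a callback and the producer is free to deliver values whenever it has them.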

Example

Let’s go through a simple example to introduce the usage of RxJava. We are going to build a web map showing burglaries in Los Angeles in real time. We will get the data from data.gov and simulate real-time data by processing one crime per second. The full code of the finished example can be found on GitHub.
The first step in building our application is to parse the CSV file. For that, we will use the Apache Commons CSV library. We want to represent the data in the CSV file as an Observable (stream) of records that can then be transformed into an Observable of crimes.
There are multiple ways to create observables. For fixed-size sequences, Observable.just() can be used:

Observable.just(1,2,3);  

This factory method is really helpful for testing purposes, or when returning fallback or one-element responses. Similar factory methods exist for creating observables that terminate immediately:

Observable.empty();  

Or observables that fail immediately:

Observable.error(new RuntimeException());  

In our case, the CSVParser from Apache Commons CSV implements Iterable<CSVRecord>, so we can create our Observable by passing it to the Observable.from() method.

File csvFile = new File(filePath);  
CSVParser parser = CSVParser.parse(csvFile, charset, csvFormat);  
Observable<CSVRecord> records = Observable.from(parser);  

The Observable that we just created will emit one element for each record in the CSV file and then complete.
The next step on the way to building our awesome map is to extract the information needed from each record and create an Observable of crimes. To do this, we will use the map operator.
The map operator transforms the items emitted by an Observable by applying a function to each item. The result is a new Observable of a different (or the same) type.

[Figure: map operator]

Given a method Crime parseCrimeFromCSVRecord(CSVRecord csvRecord) that converts a CSV record into a Crime object, we can use the following code to apply the transformation to all the items in the Observable:

Observable<Crime> crimes = records.map(this::parseCrimeFromCSVRecord);  

(The implementation of parseCrimeFromCSVRecord isn’t included here because it's trivial; it can be found in the GitHub repository containing the full example code.)
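
As a rough mental model of what map does (not RxJava's actual implementation), the plain-Java sketch below wraps a push-style source in a new one that applies a function to each item before passing it on. PushSource, map, collect and the sample strings are hypothetical names and values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class MapSketch {

    // Hypothetical stand-in for an Observable.
    interface PushSource<T> {
        void subscribe(Consumer<T> onNext);
    }

    // Returns a NEW source; the upstream source itself is left untouched.
    static <T, R> PushSource<R> map(PushSource<T> upstream, Function<T, R> fn) {
        return downstream -> upstream.subscribe(item -> downstream.accept(fn.apply(item)));
    }

    // Helper that subscribes and gathers everything the source pushes.
    static <T> List<T> collect(PushSource<T> source) {
        List<T> out = new ArrayList<>();
        source.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        // Sample raw values standing in for CSV records.
        PushSource<String> records = onNext -> Arrays.asList("310", "330", "510").forEach(onNext);
        // The item type changes from String to Integer, as it does from CSVRecord to Crime.
        PushSource<Integer> codes = map(records, Integer::parseInt);
        System.out.println(collect(codes)); // prints [310, 330, 510]
    }
}
```

Nothing is transformed until someone subscribes to the mapped source, which mirrors how the crimes Observable only starts reading records once it has a subscriber.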

At this point we already have an Observable that emits the crimes taken from the CSV. The only problem is that crimes are emitted as fast as they can be read from the file, and we want to process them at a rate of one per second to simulate real-time data. To accomplish this, we will first create an Observable that emits a signal once per second using the interval operator.
[Figure: interval operator]

Observable<Long> timer = Observable.interval(1, TimeUnit.SECONDS);  
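
For intuition, the behaviour of such a timer can be approximated in plain Java with a scheduled executor: a counter is pushed to the consumer at a fixed period, starting at 0 after the first period has elapsed. This is only a sketch under that assumption; the class and method names are hypothetical, and it stops after a fixed number of ticks, whereas interval keeps emitting until it is unsubscribed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class IntervalSketch {

    // Collects `count` ticks emitted every `periodMillis` milliseconds, then stops.
    static List<Long> ticks(int count, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        AtomicLong counter = new AtomicLong();

        // The scheduler thread, not the caller, decides when each value is produced.
        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                received.add(counter.getAndIncrement());
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await(); // block only so that this demo can return the collected ticks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(ticks(3, 100)); // prints [0, 1, 2]
    }
}
```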

Once we have our signal in place, we will use another reactive operator called zip to combine both observables into a new one that emits one crime per second.
The zip operator combines the emissions of multiple Observables via a specified function and emits a single item for each combination, based on the result of that function.
[Figure: zip operator]
In our case the function will simply return the crime object.

return Observable.zip(crimes, timer, (crime, t) -> crime);  
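
The pairing rule can be modelled in plain Java without RxJava: zip takes the n-th item of each input, combines the pair, and ends as soon as either input runs out. The sketch below is a pull-based model over iterables with hypothetical names and sample data. The real operator additionally waits for the slower Observable, which is what spaces the crimes one second apart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

class ZipSketch {

    // Combines the n-th element of each input; stops when either input is exhausted.
    static <A, B, R> List<R> zip(Iterable<A> left, Iterable<B> right, BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        Iterator<A> a = left.iterator();
        Iterator<B> b = right.iterator();
        while (a.hasNext() && b.hasNext()) {
            out.add(combiner.apply(a.next(), b.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> crimes = Arrays.asList("burglary", "theft", "vandalism");
        List<Long> ticks = Arrays.asList(0L, 1L); // only two ticks available

        // Same shape as (crime, t) -> crime: the tick is used for pacing and then discarded.
        List<String> paced = zip(crimes, ticks, (crime, t) -> crime);
        System.out.println(paced); // prints [burglary, theft]
    }
}
```

The third crime is never emitted because there is no third tick to pair it with. In the article's pipeline the timer never ends, so every crime eventually gets its tick.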

It's worth noting that the two Observables emit values at very different rates. In this particular case the faster of the two (crimes) was created from an Iterable and supports backpressure, so it won't be an issue. But it is something to keep in mind when combining observables.
The next step is to filter the crimes we are interested in (burglaries). This can be done really easily with the operator called filter (not a huge surprise, right?). The filter operator emits only those items from an Observable that pass a predicate test. That is, given an Observable and a boolean function, filter will emit only the values for which the function returns true.
[Figure: filter operator]
We want to keep crimes with code 310 (BURGLARY) and 330 (BURGLARY FROM VEHICLE).

Collection<Integer> codes = Arrays.asList(310,330);  
return crimes.filter(crime -> codes.contains(crime.getCrimeCode()));  
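
The same predicate can be tried out without RxJava on a plain list. The sketch below uses java.util.stream, whose filter has the same keep-if-true semantics. The Crime class here is a hypothetical stand-in with only the field the predicate needs, not the class from the example project, and code 510 is just a sample value for a crime that is not a burglary.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class FilterSketch {

    // Hypothetical minimal Crime; the real class carries more fields.
    static class Crime {
        private final int crimeCode;
        Crime(int crimeCode) { this.crimeCode = crimeCode; }
        int getCrimeCode() { return crimeCode; }
    }

    static final Collection<Integer> BURGLARY_CODES = Arrays.asList(310, 330);

    // The boolean function handed to filter: true means "keep this item".
    static final Predicate<Crime> IS_BURGLARY =
            crime -> BURGLARY_CODES.contains(crime.getCrimeCode());

    // Returns the codes of the crimes that pass the predicate, in their original order.
    static List<Integer> burglaryCodes(List<Crime> crimes) {
        return crimes.stream()
                .filter(IS_BURGLARY)
                .map(Crime::getCrimeCode)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Crime> crimes = Arrays.asList(new Crime(310), new Crime(510), new Crime(330));
        System.out.println(burglaryCodes(crimes)); // prints [310, 330]
    }
}
```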

We now have an Observable that emits exactly the data we need. Next, we will use the Spark micro framework to create a simple web page showing the crimes as markers on a map. We will use websockets for the communication between the backend and the frontend.
In the main function of our application we will set up Spark to expose all the files inside the public directory as static resources. We will also configure a redirection from / to /index.html and, lastly, register a websocket handler on the /crimes path.

webSocket("/crimes", WebsocketHandler.class);  
staticFiles.location("/public");  
redirect.get("/", "/index.html");  

The websocket handler will simply keep a collection of active connections (sessions) that we are going to use in order to broadcast crimes as we process them.

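import com.google.gson.Gson;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@WebSocket
public class WebsocketHandler {
    // Static so that broadcast can reach every session; a concurrent queue
    // because sessions may be added and removed from different threads.
    private static final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
    private static final Gson gson = new Gson();

    @OnWebSocketConnect
    public void connected(Session session) {
        sessions.add(session);
    }

    @OnWebSocketClose
    public void closed(Session session, int statusCode, String reason) {
        sessions.remove(session);
    }

    // Sends the crime, serialized as JSON, to every connected client.
    public static void broadcast(Crime crime) throws IOException {
        String json = gson.toJson(crime); // serialize once, not once per session
        for (Session session : sessions) {
            session.getRemote().sendString(json);
        }
    }

}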

On the client side we will use Leaflet.js and OpenStreetMap to create a map and show all the crimes published through the websocket by the backend.

<!doctype html>

<html lang="en">  
<head>  
    <meta charset="utf-8">

    <title>RxJava</title>
    <meta name="description" content="Simple RxJava example">
    <meta name="author" content="Simplex Software">
    <link rel="stylesheet" href="https://unpkg.com/leaflet@1.0.2/dist/leaflet.css" />
    <script src="https://unpkg.com/leaflet@1.0.2/dist/leaflet.js"></script>
    <style>
        #mapid { height: 640px; }
    </style>
</head>

<body>  
<div id="mapid"></div>  
<script>  
    var mymap = L.map('mapid').setView([34.029331, -118.201904], 10);
    L.tileLayer('http://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
        maxZoom: 19,
        attribution: '&copy; <a href="http://www.openstreetmap.org/copyright">OpenStreetMap</a>'
    }).addTo(mymap);

    var socket = new WebSocket("ws://localhost:4567/crimes");
    socket.onmessage = function (event) {
      console.log(event.data);
      var crime = JSON.parse(event.data);
      var marker = L.marker([crime.coordinates.lat, crime.coordinates.lng]).addTo(mymap);
      marker.bindPopup("<b>" + 
                    crime.crimeDescription + 
                    "</b><br>" + crime.areaName + " " + 
                    crime.address).openPopup();
      window.setTimeout(function(){
        mymap.removeLayer(marker);
      }, 3000);
    }
</script>  
</body>  
</html>  

The end result looks like this:
[Figure: end result]

Moving forward

There are some really good resources to start diving into all the possibilities offered by RxJava:
Step by step introduction to most useful operators

Cold and hot observables

Conclusion

Even if the example shown in this article is contrived and uncomplicated, it's easy to see how RxJava provides a handy approach to represent the result of asynchronous computations in a way that is simple to use and composable.