X

This site uses cookies and by using the site you are consenting to this. We utilize cookies to optimize our brand’s web presence and website experience. To learn more about cookies, click here to read our privacy statement.

Stream Smarter: Reactive Systems Using Reactor and Spring

Reactor 3.x is a Java library for writing reactive applications using the Reactive Streams standard. Reactor Core provides a reactive systems implementation similar in style to RxJava 2. Because it is based on standard reactive streams, Reactor can easily integrate with any other reactive streams library (particularly RxJava 2). There are adapters available for RxJava 1 and the CompleteableFuture API as well as conversions for the new JDK 9 Flow reactive stream interfaces. Reactor also provides an IPC API for implementing reactive systems for general networking and off-JVM communication. The main implementation of the IPC API is Reactor Netty which provides TCP, UDP, and HTTP DSLs. There are other modules in the works, but Reactor 3.x is still under heavy development.

Reactive Streams

Reactive streams are an API and pattern to handling streams of data asynchronously and non-blocking while providing backpressure to stream publishers so that stream subscribers do not get overwhelmed and require maintaining internal buffers or cause blocking. The reactive streams standard provides the minimal API to support this style of architecture which is also being introduced in Java 9.

The main classes in this API are Publisher<T> and Subscriber<T>. A Publisher is the source of events T in the stream, and a Subscriber is the destination for those events. The basic idea of how these classes interact is that a Subscriber subscribes to a Publisher which invokesSubscriber::onSubscribe(Subscription). When the Subscriber is ready to start handling events, it signals this via a request to that Subscription Upon receiving this signal, the Publisherbegins to invoke Subscriber::onNext(T) for each event T. This continues until either completion of the stream (Subscriber::onComplete()) or an error occurs during processing (Subscriber::onError(Throwable)).

Reactor

Clearly, the reactive streams API is too low level to make practical use of it in reactive applications. This is where Reactor Core (or RxJava) comes into play by providing a DSL for handling these streams. Reactor provides two main types of publishers: Flux and Mono. A Flux is a general purpose publisher that can contain an unbounded number of events. A Mono is a specialized publisher that can contain only zero or one events. For a hands-on introduction to these classes, take a look at this GitHub project and try to implement all the test stubs. We’ll go over some of the tests below.

Trying Out Reactor

As a neat way to get familiar with the Reactor Core API, we’ll be implementing the code necessary to pass these test stubs. These tests will cover Flux, Mono, Scheduler, StepVerifier, and bridges to other APIs.

Flux

Our first task is to create an empty Flux. Such a Flux will publish no events and will immediately complete without error. Flux provides a static factory method for creating such a Flux:

Flux<String> emptyFlux() {
    return Flux.empty();
}

Our next task is to create a Flux that will stream through two strings and complete. Flux provides another static factory method for this:

Flux<String> fooBarFluxFromValues() {
    return Flux.just("foo", "bar");
}

Next, we need to create a Flux from a standard List collection. Again, we can use a factory method:

Flux<String> fooBarFluxFromList() {
    return Flux.fromIterable(Arrays.asList("foo", "bar"));
}

Next, we want a Flux that errs immediately instead of completing normally. Did you guess that there’s a factory method for it?

Flux<String> errorFlux() {
    return Flux.error(new IllegalStateException());
}

Finally, things get a little interesting by creating a Flux that emits the sequence 0 through 9, and each emission should be spaced out by 100 ms. Here we’ll use the Flux.interval() factory and limit it to only 10 items:

Flux<Long> counter() {
    return Flux.interval(Duration.ofMillis(100))
               .take(10);
}

Mono

The next test class revolves around basic Mono APIs. As with Flux, our first task is to create an empty Mono. This Mono will complete immediately as with an empty Flux:

Mono<String> emptyMono() {
    return Mono.empty();
}

Next, we want to create a Mono that never signals an event, completion, or error. This is available through a factory method:

Mono<String> monoWithNoSignal() {
    return Mono.never();
}

The next task is to create a Mono with a single value. This is available through a similar factory as Flux.just(T...):

Mono<String> fooMono() {
    return Mono.just("foo");
}

Finally, we create a Mono that does not emit any events but instead emits an error right away. This works the same as Flux.error(Throwable):

Mono<String> errorMono() {
    return Mono.error(new IllegalStateException());
}

Transforming Streams

Now that we know the basics of creating Flux and Mono publishers, let’s do something with them. There are two styles of mapping events to new objects: synchronously and asynchronously. The synchronous style is rather simple and works the same with both Flux and Mono:

Mono<String> toUpperCase(Mono<String> mono) {
    return mono.map(String::toUpperCase);
}

However, suppose that our mapped function is asynchronous or could be made asynchronous in order to prevent blocking in our stream. In this style mapping function, instead of returning the same object type, we wrap it into a Mono so the computed value can be taken when ready:

Flux<String> toUpperCase(Flux<String> flux) {
    return flux.flatMap(s -> Mono.just(s.toUpperCase()));
}

But wait, that’s still executing s.toUpperCase() right away! What if we want to wait until it’s consumed to make the computation? We can wrap the execution into a deferred publisher:

Flux<String> asyncToUpperCase(Flux<String> flux) {
    return flux.flatMap(s -> Mono.defer(() -> Mono.just(s.toUpperCase())));
}

Combining Streams

Suppose we have two streams and we wish to interleave their events together into a single stream. We can merge them like so:

Flux<String> interleave(Flux<String> flux1, Flux<String> flux2) {
    return flux1.mergeWith(flux2);
}

Now we wish to combine two streams, but we want events from one stream to all happen before events from the other stream. We can concatenate them:

Flux<String> combine(Flux<String> flux1, Flux<String> flux2) {
    return flux1.concatWith(flux2);
}

Both of these operations can also be performed on Mono publishers.

Handling Errors

One of the things that really make reactive streams more useful than the Java 8 streams is the ability to specify error handling at various levels. For example, say we have a Mono that we wish to provide a default value to return in case of an error:

Mono<String> provideDefaultForError(Mono<String> mono) {
    return mono.otherwiseReturn("error string");
}

Perhaps on error we wish to substitute our stream with a different stream:

Flux<String> provideFallbackStreamForError(Flux<String> flux) {
    return flux.onErrorResumeWith(e -> Flux.just("foo", "bar"));
}

Other Operations

Suppose we have a Person class:

public class Person {
    public final String firstName;
    public final String lastName;
    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }
}

Now imagine we have two streams of data, one corresponding to first names, and the other corresponding to last names. We can zip these streams together to form a new stream:

Flux<Person> zipIntoPersons(Flux<String> firstNames, Flux<String> lastNames) {
    return Flux.zip(firstNames, lastNames)
               .map(pair -> new Person(pair.getT1(), pair.getT2()));
}

Suppose we have two Mono streams and we want to get an item from whichever one can provide it faster:

Mono<String> getFasterStream(Mono<String> mono1, Mono<String> mono2) {
    return mono1.or(mono2);
}

In a similar situation, say we have two Flux streams and want to use whichever stream can begin emitting items first:

Flux<String> getFasterStream(Flux<String> flux1, Flux<String> flux2) {
    return flux1.firstEmittingWith(flux2);
}

Finally, we can take a Flux and return a Mono<Void> that is only used to indicate when the stream has completed or erred:

Mono<Void> getCompletionOf(Flux<String> flux) {
    return flux.then();
}

Reactive/Blocking Conversions

Perhaps we have a Mono or Flux that we wish to make a blocking call to obtain values from for use in a synchronous API. Using a Mono, we do:

String getValue(Mono<String> mono) {
    return mono.block();
}

Using a Flux, we do:

Iterable<String> getValues(Flux<String> flux) {
    return flux.toIterable();
}

Next, we go in the other direction. We wish to convert a collection into a Flux. Perhaps obtaining the collection is expensive, so we defer looking it up until necessary. In this case, we’ll also specify a Scheduler so we don’t block our main thread. This scenario is a fast subscriber but slow publisher:

Flux<Person> asyncPersonLookup(PersonRepository repository) {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
               .subscribeOn(Schedulers.elastic());
}

In the reverse scenario, suppose we have a fast publisher and a slow subscriber such as storing a record in a database. Using a Flux for the publisher, we can publish on a separate Scheduler. Since we’ll be saving data, we’ll just return a Mono<Void> to indicate when the stream has finished processing:

Mono<Void> asyncSavePersons(Flux<Person> flux, PersonRepository repository) {
    return flux.publishOn(Schedulers.parallel())
               .doOnNext(repository::save)
               .then();
}

Blog Microservice Example

In the previous Lagom article, we implemented a blog microservice. Let’s do the same thing here, but we’ll use Reactor Core along with Spring 5. If you’re not already familiar with Lombok, take a look at the Lagom article for more information. The source code is on GitHub for this example.

The goal of this microservice is to provide a way to create, read, and update blog posts. We’ll begin with a data model for our blog posts:

@Value
@AllArgsConstructor(onConstructor = @__(@JsonCreator))
public class PostContent {
    @NonNull
    String title;
    @NonNull
    String author;
    @NonNull
    String body;
}

There are a bit less annotations to specify here than in the Lagom example, but that’s because Spring does not have a strong opinion about mutability. This data model will be used for our REST API and has the same structure as PostContent from the Lagom version.

Next, we’ll specify our REST controller which is equivalent to our Lagom service. Since we’re working with reactive microservices, we’ll be using reactive streams rather than directly using our data model.

@RestController
@RequestMapping(produces = MediaType.APPLICATION_JSON_VALUE)
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class BlogController {

    private final BlogService service;

    @GetMapping("/api/blog/{id}")
    public Mono<PostContent> getPost(@PathVariable final UUID id) {
        return service.getPost(id);
    }

    @PostMapping("/api/blog")
    public Mono<UUID> addPost(@RequestBody Mono<PostContent> content) {
        return service.addPost(content);
    }

    @PutMapping("/api/blog/{id}")
    public Mono<Void> updatePost(@PathVariable final UUID id, @RequestBody final Mono<PostContent> content) {
        return service.updatePost(id, content);
    }
}

Already we can see that Spring 5 has added support for using Mono and Flux for both request bodies and response bodies. For our API, we only use single objects, so we only use Mono. Also note that Mono has replaced our use of Optional as it functions similarly to a reactive Optional.

Next, we’ll define our entity class. You may notice we are no longer using the CQRS pattern; that’s because Lagom made it easy to follow by implementing a bit of the tedium involved in tracking snapshots of state along with doing most of the work of implementing the read side of things. So, instead of using an event log directly, we’ll create a keyspace and table in Cassandra manually and use that. Note that Cassandra itself does implement its storage engine using an event log, so updates are just as fast as creates are regardless of our pattern.

@Value
@AllArgsConstructor(onConstructor = @__(@JsonCreator))
@Table
public class BlogPost {
    @PrimaryKey
    @NonNull
    UUID id;
    @NonNull
    String title;
    @NonNull
    String author;
    @NonNull
    String body;
}

The only new annotations here are @Table and @PrimaryKey which are both from the spring-data-cassandra API. We’ll also define a spring-data repository interface for this class:

public interface BlogRepository extends CrudRepository<BlogPost, UUID> {
}

Since we don’t have any special querying needs, consequently we don’t need to create any custom repository methods.

To set up our Cassandra data store, we’ll create a single keyspace and table for our data. In the example repository, this is handled on startup using an embedded Cassandra instance. However, it could be easily refactored to use an external one that was set up ahead of time.

CREATE KEYSPACE blogs WITH REPLICATION = { 'class' : 'SimpleStrategy' };

CREATE TABLE blogs.blogpost (
    id uuid PRIMARY KEY,
    title text,
    author text,
    body text
);

Finally, we get to the interesting part: bridging the non-reactive Spring Data API into our reactive REST API. Using the reactive stream concepts from earlier, we implement our service class:

@Service
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class BlogService {

    private final BlogRepository repository;

    public Mono<PostContent> getPost(final UUID id) {
        return Mono.defer(() -> Mono.justOrEmpty(repository.findOne(id)))
            .subscribeOn(Schedulers.elastic())
            .map(post -> new PostContent(
                    post.getTitle(), post.getAuthor(), post.getBody()));
    }

    public Mono<UUID> addPost(final Mono<PostContent> contentMono) {
        return contentMono
            .map(content -> new BlogPost(UUID.randomUUID(),
                    content.getTitle(), content.getAuthor(), content.getBody()))
            .publishOn(Schedulers.parallel())
            .doOnNext(repository::save)
            .map(BlogPost::getId);
    }

    public Mono<Void> updatePost(final UUID id, final Mono<PostContent> mono) {
        return mono
            .map(content -> new BlogPost(id,
                    content.getTitle(), content.getAuthor(), content.getBody()))
            .publishOn(Schedulers.parallel())
            .doOnNext(repository::save)
            .then();
    }
}

Notice that our getPost() method uses Mono.defer() to use a blocking API (in this case, the Spring Data CrudRepository API) and fetch an item which may be null, then maps it to our desired data format. Our addPost() method works in reverse by mapping to our entity format and then publishes on a scheduler to save the repository item. Finally, our updatePost() method works essentially the same way.

As can be seen, Spring 5 is making reactive streams programming much easier by building in direct support for Reactor and RxJava into Spring Web. However, there is still room for improvement in further Spring releases to add first class reactive stream support to other Spring projects, particularly Spring Data.