Stream Smarter: Reactive Systems Using Reactor and Spring
Reactor 3.x is a Java library for writing reactive applications using the Reactive Streams standard. Reactor Core provides a reactive programming model similar in style to RxJava 2. Because it is based on the Reactive Streams standard, Reactor can easily integrate with any other Reactive Streams library (particularly RxJava 2). Adapters are available for RxJava 1 and the CompletableFuture API, as well as conversions for the new JDK 9 Flow reactive stream interfaces. Reactor also provides an IPC API for implementing reactive systems for general networking and off-JVM communication. The main implementation of the IPC API is Reactor Netty, which provides TCP, UDP, and HTTP DSLs. There are other modules in the works, but Reactor 3.x is still under heavy development.
Reactive Streams
Reactive streams are an API and pattern for handling streams of data asynchronously and without blocking, while providing backpressure to stream publishers so that stream subscribers are not overwhelmed and forced to maintain internal buffers or block. The Reactive Streams standard provides the minimal API needed to support this style of architecture, and it is also being introduced in Java 9.
The main classes in this API are Publisher&lt;T&gt; and Subscriber&lt;T&gt;. A Publisher is the source of events T in the stream, and a Subscriber is the destination for those events. The basic interaction between these classes is as follows: a Subscriber subscribes to a Publisher, which invokes Subscriber::onSubscribe(Subscription). When the Subscriber is ready to start handling events, it signals this by making a request to that Subscription. Upon receiving this signal, the Publisher begins to invoke Subscriber::onNext(T) for each event T. This continues until either the stream completes (Subscriber::onComplete()) or an error occurs during processing (Subscriber::onError(Throwable)).
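This handshake can be sketched using the JDK 9 java.util.concurrent.Flow interfaces, which mirror the Reactive Streams API one-to-one. The classes below are minimal, hypothetical implementations for illustration only; a production publisher must also handle concurrent demand and cancellation more carefully:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class HandshakeDemo {
    // A trivial synchronous publisher that emits a fixed list of items.
    // It only shows the basic onSubscribe/request/onNext/onComplete flow.
    static class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;
        ListPublisher(List<String> items) { this.items = items; }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int cursor = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Emit up to n items, signaling completion once exhausted.
                    while (n-- > 0 && cursor < items.size()) {
                        subscriber.onNext(items.get(cursor++));
                    }
                    if (cursor == items.size() && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        new ListPublisher(List.of("foo", "bar")).subscribe(new Flow.Subscriber<>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                // Signal unbounded demand; a backpressure-aware subscriber
                // would request smaller batches as it processes them.
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(String item) { received.add(item); }
            @Override
            public void onError(Throwable t) { t.printStackTrace(); }
            @Override
            public void onComplete() { received.add("<complete>"); }
        });
        System.out.println(received); // [foo, bar, <complete>]
    }
}
```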
Reactor
Clearly, the Reactive Streams API is too low level to be practical in reactive applications. This is where Reactor Core (or RxJava) comes into play by providing a DSL for handling these streams. Reactor provides two main types of publishers: Flux and Mono. A Flux is a general-purpose publisher that can emit an unbounded number of events. A Mono is a specialized publisher that can emit at most one event. For a hands-on introduction to these classes, take a look at this GitHub project and try to implement all the test stubs. We’ll go over some of the tests below.
Trying Out Reactor
As a neat way to get familiar with the Reactor Core API, we’ll be implementing the code necessary to pass these test stubs. These tests will cover Flux, Mono, Scheduler, StepVerifier, and bridges to other APIs.
Flux
Our first task is to create an empty Flux. Such a Flux publishes no events and immediately completes without error. Flux provides a static factory method for creating one:
Flux<String> emptyFlux() {
    return Flux.empty();
}
Our next task is to create a Flux that will stream through two strings and complete. Flux provides another static factory method for this:
Flux<String> fooBarFluxFromValues() {
    return Flux.just("foo", "bar");
}
Next, we need to create a Flux from a standard List collection. Again, we can use a factory method:
Flux<String> fooBarFluxFromList() {
    return Flux.fromIterable(Arrays.asList("foo", "bar"));
}
Next, we want a Flux that fails with an error immediately instead of completing normally. Did you guess that there’s a factory method for it?
Flux<String> errorFlux() {
    return Flux.error(new IllegalStateException());
}
Finally, things get a little more interesting: we create a Flux that emits the sequence 0 through 9, with each emission spaced out by 100 ms. Here we’ll use the Flux.interval() factory and limit it to 10 items:
Flux<Long> counter() {
    return Flux.interval(Duration.ofMillis(100))
            .take(10);
}
Mono
The next test class revolves around basic Mono APIs. As with Flux, our first task is to create an empty Mono. This Mono completes immediately, just like an empty Flux:
Mono<String> emptyMono() {
    return Mono.empty();
}
Next, we want to create a Mono that never signals an event, completion, or error. This is available through a factory method:
Mono<String> monoWithNoSignal() {
    return Mono.never();
}
The next task is to create a Mono with a single value. This is available through a factory similar to Flux.just(T...):
Mono<String> fooMono() {
    return Mono.just("foo");
}
Finally, we create a Mono that does not emit any events but instead emits an error right away. This works the same as Flux.error(Throwable):
Mono<String> errorMono() {
    return Mono.error(new IllegalStateException());
}
Transforming Streams
Now that we know the basics of creating Flux and Mono publishers, let’s do something with them. There are two styles of mapping events to new objects: synchronous and asynchronous. The synchronous style is rather simple and works the same with both Flux and Mono:
Mono<String> toUpperCase(Mono<String> mono) {
    return mono.map(String::toUpperCase);
}
However, suppose that our mapping function is asynchronous, or could be made asynchronous in order to prevent blocking in our stream. With this style of mapping function, instead of returning the same object type, we wrap the result in a Mono so the computed value can be taken when ready:
Flux<String> toUpperCase(Flux<String> flux) {
    return flux.flatMap(s -> Mono.just(s.toUpperCase()));
}
But wait, that’s still executing s.toUpperCase() right away! What if we want to wait until the value is consumed to perform the computation? We can wrap the execution in a deferred publisher:
Flux<String> asyncToUpperCase(Flux<String> flux) {
    return flux.flatMap(s -> Mono.defer(() -> Mono.just(s.toUpperCase())));
}
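The difference between eager and deferred computation can be sketched in plain Java without any Reactor dependency. Wrapping work in a Supplier that only runs when its value is requested is the same idea Mono.defer() applies to a whole publisher (the names below are illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeferDemo {
    static String expensiveUpperCase(String s, AtomicInteger calls) {
        calls.incrementAndGet(); // count how many times the work actually runs
        return s.toUpperCase();
    }

    public static void main(String[] args) {
        AtomicInteger eagerCalls = new AtomicInteger();
        AtomicInteger deferredCalls = new AtomicInteger();

        // Eager: the work runs immediately, at assembly time --
        // analogous to building a publisher with Mono.just(...).
        String eager = expensiveUpperCase("foo", eagerCalls);

        // Deferred: the work is wrapped in a Supplier and only runs when
        // someone asks for the value -- analogous to Mono.defer(...).
        Supplier<String> deferred = () -> expensiveUpperCase("foo", deferredCalls);

        System.out.println(eagerCalls.get() + " " + deferredCalls.get()); // 1 0
        System.out.println(deferred.get());                               // FOO
        System.out.println(eagerCalls.get() + " " + deferredCalls.get()); // 1 1
        System.out.println(eager);                                        // FOO
    }
}
```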
Combining Streams
Suppose we have two streams and we wish to interleave their events together into a single stream. We can merge them like so:
Flux<String> interleave(Flux<String> flux1, Flux<String> flux2) {
    return flux1.mergeWith(flux2);
}
Now we wish to combine two streams, but we want events from one stream to all happen before events from the other stream. We can concatenate them:
Flux<String> combine(Flux<String> flux1, Flux<String> flux2) {
    return flux1.concatWith(flux2);
}
Both of these operations can also be performed on Mono publishers.
Handling Errors
One of the things that makes reactive streams more useful than Java 8 streams is the ability to specify error handling at various levels. For example, say we have a Mono for which we wish to provide a default value in case of an error:
Mono<String> provideDefaultForError(Mono<String> mono) {
    return mono.otherwiseReturn("error string");
}
Perhaps on error we wish to substitute our stream with a different stream:
Flux<String> provideFallbackStreamForError(Flux<String> flux) {
    return flux.onErrorResumeWith(e -> Flux.just("foo", "bar"));
}
Other Operations
Suppose we have a Person class:
public class Person {
    public final String firstName;
    public final String lastName;

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }
}
Now imagine we have two streams of data, one corresponding to first names, and the other corresponding to last names. We can zip these streams together to form a new stream:
Flux<Person> zipIntoPersons(Flux<String> firstNames, Flux<String> lastNames) {
    return Flux.zip(firstNames, lastNames)
            .map(pair -> new Person(pair.getT1(), pair.getT2()));
}
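The pairing behavior of zip can be sketched in plain Java: elements are matched index by index, and the combined stream ends as soon as either source runs out, which mirrors how Flux.zip completes when either input completes. This is a hypothetical list-based analogue, not Reactor’s implementation:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ZipDemo {
    // Same shape as the Person class above.
    static class Person {
        final String firstName;
        final String lastName;
        Person(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }
        @Override
        public String toString() { return firstName + " " + lastName; }
    }

    // Pairs elements index-by-index and stops at the shorter input.
    static List<Person> zipIntoPersons(List<String> firstNames, List<String> lastNames) {
        List<Person> result = new ArrayList<>();
        Iterator<String> first = firstNames.iterator();
        Iterator<String> last = lastNames.iterator();
        while (first.hasNext() && last.hasNext()) {
            result.add(new Person(first.next(), last.next()));
        }
        return result;
    }

    public static void main(String[] args) {
        // The third last name has no matching first name, so it is dropped.
        System.out.println(zipIntoPersons(
                List.of("Ada", "Alan"),
                List.of("Lovelace", "Turing", "Extra")));
    }
}
```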
Suppose we have two Mono streams and we want to get an item from whichever one can provide it faster:
Mono<String> getFasterStream(Mono<String> mono1, Mono<String> mono2) {
    return mono1.or(mono2);
}
In a similar situation, say we have two Flux streams and want to use whichever stream can begin emitting items first:
Flux<String> getFasterStream(Flux<String> flux1, Flux<String> flux2) {
    return flux1.firstEmittingWith(flux2);
}
Finally, we can take a Flux and return a Mono<Void> that is only used to indicate when the stream has completed or erred:
Mono<Void> getCompletionOf(Flux<String> flux) {
    return flux.then();
}
Reactive/Blocking Conversions
Perhaps we have a Mono or Flux from which we wish to obtain values via a blocking call, for use in a synchronous API. Using a Mono, we do:
String getValue(Mono<String> mono) {
    return mono.block();
}
Using a Flux, we do:
Iterable<String> getValues(Flux<String> flux) {
    return flux.toIterable();
}
Next, we go in the other direction: we wish to convert a collection into a Flux. Perhaps obtaining the collection is expensive, so we defer looking it up until necessary. In this case, we’ll also specify a Scheduler so we don’t block our main thread. This scenario is a fast subscriber with a slow publisher:
Flux<Person> asyncPersonLookup(PersonRepository repository) {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
            .subscribeOn(Schedulers.elastic());
}
In the reverse scenario, suppose we have a fast publisher and a slow subscriber, such as one storing records in a database. Using a Flux for the publisher, we can publish on a separate Scheduler. Since we’ll be saving data, we’ll just return a Mono<Void> to indicate when the stream has finished processing:
Mono<Void> asyncSavePersons(Flux<Person> flux, PersonRepository repository) {
    return flux.publishOn(Schedulers.parallel())
            .doOnNext(repository::save)
            .then();
}
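The fast-publisher/slow-subscriber pattern can be sketched with the JDK’s own SubmissionPublisher, which delivers items to subscribers on a separate executor (ForkJoinPool.commonPool() by default), roughly the role publishOn plays in Reactor. This is a JDK-only analogy, not Reactor code; the subscriber requests one item at a time to make the backpressure explicit:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class SlowSubscriberDemo {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Items are delivered to the subscriber on the publisher's executor,
        // so a slow subscriber does not block the publishing thread until
        // the flow-control buffer fills up.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<>() {
                private Flow.Subscription subscription;
                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // one item at a time: explicit backpressure
                }
                @Override
                public void onNext(Integer item) {
                    received.add(item); // imagine a slow repository.save() here
                    subscription.request(1);
                }
                @Override
                public void onError(Throwable t) { done.countDown(); }
                @Override
                public void onComplete() { done.countDown(); }
            });
            for (int i = 0; i < 5; i++) {
                publisher.submit(i); // blocks only when the buffer is full
            }
        } // close() signals onComplete once pending items are delivered
        done.await();
        System.out.println(received); // [0, 1, 2, 3, 4]
    }
}
```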
Blog Microservice Example
In the previous Lagom article, we implemented a blog microservice. Let’s do the same thing here, but we’ll use Reactor Core along with Spring 5. If you’re not already familiar with Lombok, take a look at the Lagom article for more information. The source code is on GitHub for this example.
The goal of this microservice is to provide a way to create, read, and update blog posts. We’ll begin with a data model for our blog posts:
@Value
@AllArgsConstructor(onConstructor = @__(@JsonCreator))
public class PostContent {
    @NonNull
    String title;
    @NonNull
    String author;
    @NonNull
    String body;
}
There are slightly fewer annotations to specify here than in the Lagom example, but that’s because Spring does not have a strong opinion about mutability. This data model will be used for our REST API and has the same structure as PostContent from the Lagom version.
Next, we’ll specify our REST controller which is equivalent to our Lagom service. Since we’re working with reactive microservices, we’ll be using reactive streams rather than directly using our data model.
@RestController
@RequestMapping(produces = MediaType.APPLICATION_JSON_VALUE)
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class BlogController {
    private final BlogService service;

    @GetMapping("/api/blog/{id}")
    public Mono<PostContent> getPost(@PathVariable final UUID id) {
        return service.getPost(id);
    }

    @PostMapping("/api/blog")
    public Mono<UUID> addPost(@RequestBody Mono<PostContent> content) {
        return service.addPost(content);
    }

    @PutMapping("/api/blog/{id}")
    public Mono<Void> updatePost(@PathVariable final UUID id, @RequestBody final Mono<PostContent> content) {
        return service.updatePost(id, content);
    }
}
Already we can see that Spring 5 has added support for using Mono and Flux for both request bodies and response bodies. For our API, we only use single objects, so we only use Mono. Also note that Mono has replaced our use of Optional, as it functions similarly to a reactive Optional.
Next, we’ll define our entity class. You may notice we are no longer using the CQRS pattern; that’s because Lagom made it easy to follow by handling much of the tedium of tracking snapshots of state and by doing most of the work of implementing the read side. So, instead of using an event log directly, we’ll manually create a keyspace and table in Cassandra and use those. Note that Cassandra itself implements its storage engine using an event log, so updates are just as fast as creates regardless of our pattern.
@Value
@AllArgsConstructor(onConstructor = @__(@JsonCreator))
@Table
public class BlogPost {
    @PrimaryKey
    @NonNull
    UUID id;
    @NonNull
    String title;
    @NonNull
    String author;
    @NonNull
    String body;
}
The only new annotations here are @Table and @PrimaryKey, which are both from the spring-data-cassandra API. We’ll also define a Spring Data repository interface for this class:
public interface BlogRepository extends CrudRepository<BlogPost, UUID> {
}
Since we don’t have any special querying needs, we don’t need to create any custom repository methods.
To set up our Cassandra data store, we’ll create a single keyspace and table for our data. In the example repository, this is handled on startup using an embedded Cassandra instance. However, it could be easily refactored to use an external one that was set up ahead of time.
CREATE KEYSPACE blogs WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
CREATE TABLE blogs.blogpost (
    id uuid PRIMARY KEY,
    title text,
    author text,
    body text
);
Finally, we get to the interesting part: bridging the non-reactive Spring Data API into our reactive REST API. Using the reactive stream concepts from earlier, we implement our service class:
@Service
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class BlogService {
    private final BlogRepository repository;

    public Mono<PostContent> getPost(final UUID id) {
        return Mono.defer(() -> Mono.justOrEmpty(repository.findOne(id)))
                .subscribeOn(Schedulers.elastic())
                .map(post -> new PostContent(
                        post.getTitle(), post.getAuthor(), post.getBody()));
    }

    public Mono<UUID> addPost(final Mono<PostContent> contentMono) {
        return contentMono
                .map(content -> new BlogPost(UUID.randomUUID(),
                        content.getTitle(), content.getAuthor(), content.getBody()))
                .publishOn(Schedulers.parallel())
                .doOnNext(repository::save)
                .map(BlogPost::getId);
    }

    public Mono<Void> updatePost(final UUID id, final Mono<PostContent> mono) {
        return mono
                .map(content -> new BlogPost(id,
                        content.getTitle(), content.getAuthor(), content.getBody()))
                .publishOn(Schedulers.parallel())
                .doOnNext(repository::save)
                .then();
    }
}
Notice that our getPost() method uses Mono.defer() to call a blocking API (in this case, the Spring Data CrudRepository API) and fetch an item that may be null, then maps it to our desired data format. Our addPost() method works in reverse by mapping to our entity format and then publishing on a scheduler to save the repository item. Finally, our updatePost() method works essentially the same way.
As can be seen, Spring 5 is making reactive streams programming much easier by building direct support for Reactor and RxJava into Spring Web. However, there is still room for improvement in future Spring releases to add first-class reactive stream support to other Spring projects, particularly Spring Data.