11. March 2019
by Andreas Grub | 1498 words | ~8 min read
This post shows how to dispatch a flux of items to services of separated functional domains when using Reactor in Java. The author encountered this problem while developing a larger reactive application, where a strict separation of the application's domains is key to maintaining a clean architecture.
Reactor is a library for developing reactive applications and its reference guide is a good read to understand the basic principles of reactive programming.
The examples the author found for Reactor, or for other reactive libraries, show how to deal with a flux of items without mentioning how to dispatch and share this flux among separated functional domains. When mapping a flux to one functional domain, there is no obvious way to obtain the original flux in order to map it to another functional domain. In the following, an example details the problem and presents a solution for the Reactor library.
This section introduces an example application which will be transformed later to a reactive one. It will dispatch some deletion tasks to independent services, which is a common feature of larger software systems.
A customer is represented by a plain Java class (the usual Java boilerplate such as getters, setters, equals, hashCode and toString is omitted). A customer has its own account and a set of associated invoices. The classes CustomerId, AccountId and InvoiceId are simple wrapper classes that uniquely identify the corresponding entities.
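As an illustration, the customer class and its id wrappers could be sketched as follows. Only the class names come from the text; the field names, the getter names and the String value inside each wrapper are assumptions.

```java
import java.util.Set;

// Simple wrapper classes that uniquely identify the corresponding entities.
// Assumption: each id wraps a String; the wrapped type is not given in the post.
final class CustomerId { final String value; CustomerId(String value) { this.value = value; } }
final class AccountId { final String value; AccountId(String value) { this.value = value; } }
final class InvoiceId { final String value; InvoiceId(String value) { this.value = value; } }

// A customer has its own account and a set of associated invoices.
// equals, hashCode and toString are left out, as in the post.
final class Customer {
    private final CustomerId id;
    private final AccountId accountId;
    private final Set<InvoiceId> invoiceIds;

    Customer(CustomerId id, AccountId accountId, Set<InvoiceId> invoiceIds) {
        this.id = id;
        this.accountId = accountId;
        this.invoiceIds = invoiceIds;
    }

    CustomerId getId() { return id; }
    AccountId getAccountId() { return accountId; }
    Set<InvoiceId> getInvoiceIds() { return invoiceIds; }
}
```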
A service that deletes a set of customers is described by the CustomerService interface. An implementation of CustomerService should take care of deleting the accounts and the invoices as well.
The deletion of the customers themselves is delegated to an underlying customerRepository, which returns a collection of the deleted customers for further processing (this “find and delete” pattern is common for NoSQL databases, such as MongoDB).
Furthermore, the deletion of the associated accounts and invoices is delegated to the respective account service and invoiceService, each of which has its own interface.
Note that this example application has clearly separated domains, which are the customers, the invoices and the accounts.
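The non-reactive design described above might be sketched like this. CustomerService is named in the text; every other interface name, all method names, the class DefaultCustomerService and the String-based id wrappers are assumptions made for illustration.

```java
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical id wrappers and customer class (String ids are an assumption).
final class CustomerId { final String value; CustomerId(String value) { this.value = value; } }
final class AccountId { final String value; AccountId(String value) { this.value = value; } }
final class InvoiceId { final String value; InvoiceId(String value) { this.value = value; } }

final class Customer {
    private final CustomerId id;
    private final AccountId accountId;
    private final Set<InvoiceId> invoiceIds;

    Customer(CustomerId id, AccountId accountId, Set<InvoiceId> invoiceIds) {
        this.id = id;
        this.accountId = accountId;
        this.invoiceIds = invoiceIds;
    }

    CustomerId getId() { return id; }
    AccountId getAccountId() { return accountId; }
    Set<InvoiceId> getInvoiceIds() { return invoiceIds; }
}

// One interface per functional domain; method names are hypothetical.
interface CustomerService { void removeCustomers(Set<CustomerId> customerIds); }
interface CustomerRepository { Collection<Customer> deleteCustomersByIds(Set<CustomerId> customerIds); }
interface AccountService { void deleteAccounts(Set<AccountId> accountIds); }
interface InvoiceService { void deleteInvoices(Set<InvoiceId> invoiceIds); }

final class DefaultCustomerService implements CustomerService {
    private final CustomerRepository customerRepository;
    private final AccountService accountService;
    private final InvoiceService invoiceService;

    DefaultCustomerService(CustomerRepository customerRepository,
                           AccountService accountService,
                           InvoiceService invoiceService) {
        this.customerRepository = customerRepository;
        this.accountService = accountService;
        this.invoiceService = invoiceService;
    }

    @Override
    public void removeCustomers(Set<CustomerId> customerIds) {
        // "Find and delete": the repository returns the customers it removed.
        Collection<Customer> deleted = customerRepository.deleteCustomersByIds(customerIds);

        // Collect everything up front, then hand it over in bulk to each domain.
        Set<AccountId> accountIds = deleted.stream()
                .map(Customer::getAccountId)
                .collect(Collectors.toSet());
        Set<InvoiceId> invoiceIds = deleted.stream()
                .flatMap(customer -> customer.getInvoiceIds().stream())
                .collect(Collectors.toSet());

        accountService.deleteAccounts(accountIds);
        invoiceService.deleteInvoices(invoiceIds);
    }
}
```

The customer service only knows the interfaces of the other domains, which is the separation the rest of the post tries to preserve.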
Turning the service interfaces into reactive services is straightforward.
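As a rough sketch, the reactive interfaces might look like this. Only ReactiveCustomerRepository is named in the text; the other interface names, all method names and the String-based id wrappers are assumptions. It requires reactor-core on the classpath.

```java
import java.util.Set;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical id wrappers and customer class (String ids are an assumption).
final class CustomerId { final String value; CustomerId(String value) { this.value = value; } }
final class AccountId { final String value; AccountId(String value) { this.value = value; } }
final class InvoiceId { final String value; InvoiceId(String value) { this.value = value; } }

final class Customer {
    final CustomerId id;
    final AccountId accountId;
    final Set<InvoiceId> invoiceIds;

    Customer(CustomerId id, AccountId accountId, Set<InvoiceId> invoiceIds) {
        this.id = id;
        this.accountId = accountId;
        this.invoiceIds = invoiceIds;
    }
}

// The customer service signals completion (or an error) through a Mono<Void>.
interface ReactiveCustomerService {
    Mono<Void> removeCustomers(Set<CustomerId> customerIds);
}

// The repository input stays non-reactive; only its result is a flux.
interface ReactiveCustomerRepository {
    Flux<Customer> deleteCustomersByIds(Set<CustomerId> customerIds);
}

// The domain services consume a flux of ids and complete once it is consumed.
interface ReactiveAccountService {
    Mono<Void> deleteAccounts(Flux<AccountId> accountIds);
}

interface ReactiveInvoiceService {
    Mono<Void> deleteInvoices(Flux<InvoiceId> invoiceIds);
}
```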
Note that returning a Mono<Void> is the reactive way of telling the caller that the requested operation has completed (with or without errors). Also note that the input to the ReactiveCustomerRepository stays non-reactive, as we want to focus on the reactive implementation of the CustomerService in combination with the account and invoice services.
A first attempt to implement CustomerService reactively could simply hand the flux of deleted customers to both the account deletion and the invoice deletion.
However, when this attempt is run against a dummy implementation, the output shows that the reactiveCustomerRepository is requested twice to generate the customers, which might be surprising. If the repository weren’t a dummy implementation here, the account deletion would have consumed all those deletedCustomers, and the subsequent invoice deletion would have worked on a completed stream (meaning it would do nothing at all). This is certainly undesired behavior.
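The double request can be reproduced with a small, self-contained sketch. It is not the post's original listing: the class FirstAttemptDemo, the stripped-down Customer with String ids, and the subscription counter are all assumptions made for illustration, and the services are reduced to static methods.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FirstAttemptDemo {
    // Hypothetical, stripped-down customer: one account id and some invoice ids.
    static final class Customer {
        final String accountId;
        final List<String> invoiceIds;

        Customer(String accountId, List<String> invoiceIds) {
            this.accountId = accountId;
            this.invoiceIds = invoiceIds;
        }
    }

    // Dummy repository: a cold flux, so every subscriber triggers it again.
    // The counter records how often that happens.
    static Flux<Customer> deleteCustomers(AtomicInteger subscriptions) {
        return Flux.just(new Customer("account-1", Arrays.asList("invoice-1", "invoice-2")))
                .doOnSubscribe(subscription -> subscriptions.incrementAndGet());
    }

    // Dummy services: consume the given flux and complete afterwards.
    static Mono<Void> deleteAccounts(Flux<String> accountIds) { return accountIds.then(); }
    static Mono<Void> deleteInvoices(Flux<String> invoiceIds) { return invoiceIds.then(); }

    // First attempt: the same cold flux is mapped twice, once per domain.
    static Mono<Void> removeCustomers(AtomicInteger subscriptions) {
        Flux<Customer> deletedCustomers = deleteCustomers(subscriptions);
        Mono<Void> accountsDeleted =
                deleteAccounts(deletedCustomers.map(customer -> customer.accountId));
        Mono<Void> invoicesDeleted =
                deleteInvoices(deletedCustomers.flatMapIterable(customer -> customer.invoiceIds));
        return Flux.merge(accountsDeleted, invoicesDeleted).then();
    }

    public static void main(String[] args) {
        AtomicInteger subscriptions = new AtomicInteger();
        removeCustomers(subscriptions).block();
        // prints "repository subscriptions: 2"
        System.out.println("repository subscriptions: " + subscriptions.get());
    }
}
```

Each service subscribes to its own mapped flux, and each of those subscriptions travels up to the repository, which is why the counter ends at two.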
The reference documentation has an answer to this problem: broadcasting to multiple subscribers with .publish(). The failing attempt should thus be modified to share the repository flux via .publish() followed by .autoConnect(2).
Because .autoConnect(2) is used, the subscription to the repository publisher only happens once two subscriptions have happened downstream. This requires the account service and reactiveInvoiceService to return a Mono<Void> which completes once the given input flux is consumed completely, which ensures one subscription. The second subscription is achieved by merging the output together with the original input flux.
With this change the output is as expected: the repository is requested only once.
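A minimal sketch of the sharing step, under the same assumptions as before (hypothetical class SharedFluxDemo, String ids, a subscription counter). It is a simplified variant: instead of merging a worker result with the input flux, the two services themselves act as the two subscribers that .autoConnect(2) waits for.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class SharedFluxDemo {
    // Hypothetical, stripped-down customer: one account id and some invoice ids.
    static final class Customer {
        final String accountId;
        final List<String> invoiceIds;

        Customer(String accountId, List<String> invoiceIds) {
            this.accountId = accountId;
            this.invoiceIds = invoiceIds;
        }
    }

    // Dummy repository that counts its subscriptions.
    static Flux<Customer> deleteCustomers(AtomicInteger subscriptions) {
        return Flux.just(new Customer("account-1", Arrays.asList("invoice-1", "invoice-2")))
                .doOnSubscribe(subscription -> subscriptions.incrementAndGet());
    }

    static Mono<Void> removeCustomers(AtomicInteger subscriptions, List<String> deletedIds) {
        // Share one upstream subscription; connect once two subscribers are present.
        Flux<Customer> deletedCustomers = deleteCustomers(subscriptions)
                .publish()
                .autoConnect(2);

        // Subscriber 1: stands in for the account domain.
        Mono<Void> accountsDeleted = deletedCustomers
                .map(customer -> customer.accountId)
                .doOnNext(deletedIds::add)
                .then();

        // Subscriber 2: stands in for the invoice domain.
        Mono<Void> invoicesDeleted = deletedCustomers
                .flatMapIterable(customer -> customer.invoiceIds)
                .doOnNext(deletedIds::add)
                .then();

        // Completes once both domains have consumed the shared flux.
        return Mono.when(accountsDeleted, invoicesDeleted);
    }

    public static void main(String[] args) {
        AtomicInteger subscriptions = new AtomicInteger();
        List<String> deletedIds = new ArrayList<>();
        removeCustomers(subscriptions, deletedIds).block();
        // prints "repository subscriptions: 1"
        System.out.println("repository subscriptions: " + subscriptions.get());
    }
}
```

If only one of the two monos were ever subscribed to, the shared flux would never connect and nothing would be deleted, so the subscriber count passed to .autoConnect has to match the number of consumers exactly.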
At this point, the account service and reactiveInvoiceService could each decide to .buffer their given flux if they wanted to delete the accounts or invoices in batches. Each implementation is free to choose its own buffer (or batch) size. This is an advantage over the non-reactive implementation, where all items are collected in one large list beforehand and then handed over in bulk to the services.
The above working solution has already been written such that a generic utility method can be extracted. In that method, Flux.mergeDelayError is used, which handles the situation better if the worker completes with an error. In this particular use case, it is desired that the deletion continues even if one worker fails. The worker is also expected to return a Mono<Void> which completes once the input flux is consumed. The simplest worker function would thus do nothing but wait for the input flux to complete.
The unchecked cast could not be removed, but in this circumstance it should never fail: the merged flux can only contain items of type T, as the Mono<Void> just completes with no items at all.
The utility function can then be used in a more reactive style of coding. Note the pattern of using .transform together with the utility function; the output is the same as in the working example above.
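One way such a utility and its use with .transform could look is sketched below. The class name FluxDispatch, the method name dispatch, its signature and the prefetch value are assumptions, and plain strings stand in for customers. The second worker also batches its input with .buffer(2) to show that each worker picks its own batch size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.concurrent.Queues;

final class FluxDispatch {
    // Hands a shared view of the input to the worker and passes every item on downstream.
    // Assumption: the worker's Mono<Void> completes once it has consumed its input.
    static <T> Flux<T> dispatch(Flux<T> input, Function<Flux<T>, Mono<Void>> worker) {
        // Connect to the input only after the worker and the merge below have both subscribed.
        Flux<T> shared = input.publish().autoConnect(2);
        Mono<Void> workerDone = worker.apply(shared);

        // A worker error is delayed until the shared flux has been passed on completely.
        Flux<Object> merged =
                Flux.<Object>mergeDelayError(Queues.SMALL_BUFFER_SIZE, workerDone, shared);

        // Unchecked but safe: workerDone emits no items, so only T instances arrive.
        @SuppressWarnings("unchecked")
        Flux<T> result = (Flux<T>) (Flux<?>) merged;
        return result;
    }

    public static void main(String[] args) {
        List<String> firstWorker = new ArrayList<>();
        List<List<String>> secondWorker = new ArrayList<>();

        List<String> passedOn = Flux.just("a", "b", "c")
                // Worker 1 handles items one by one.
                .transform(flux -> dispatch(flux, f -> f.doOnNext(firstWorker::add).then()))
                // Worker 2 handles the same items in batches of two.
                .transform(flux -> dispatch(flux, f -> f.buffer(2).doOnNext(secondWorker::add).then()))
                .collectList()
                .block();

        System.out.println(firstWorker + " " + secondWorker + " " + passedOn);
    }
}
```

Because dispatch returns a flux of the original items, further dispatch steps can be chained, and a final .then() turns the chain into the Mono<Void> expected from a reactive customer service.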
Reactive applications should still follow the overall architecture of larger applications, which are usually split into several components for each functional domain. This approach clashes with reactive programming, where usually one stream is mapped with operators and dispatching work to other services is not easily supported. This post shows a solution, although usage in Java of the presented utility function is still somewhat clumsy.
In Kotlin, extension functions would make this utility easier to use, without the rather clumsy .transform pattern above.
It also remains an open question whether there is a better solution to the presented problem. Comments welcome!