Home Post Architecture

Reactive programming in Java with Project Reactor

Mar 31, 2024

Reactive programming is a declarative programming paradigm that focuses on building applications that are responsive, resilient, and scalable in the face of modern challenges like concurrency, distributed systems, and asynchronous data streams.

Reactive programming provides a set of tools, patterns, and abstractions to handle asynchronous and event-driven programming more effectively.

Imperative programming focuses on describing the step-by-step instructions or commands that the computer needs to follow to achieve a specific task. In this paradigm, you explicitly state how to perform each operation and control flow in your code. The emphasis is on "how" the computation should be done.

int sum = 0;
for (int i = 1; i <= 10; i++) {
    sum += i;
}
System.out.println("Sum: " + sum);

Declarative programming emphasizes specifying what you want to achieve rather than detailing how to achieve it. You describe the desired outcome or the properties of the result, and the programming language or framework handles the execution details. The emphasis is on "what" the computation should accomplish.

int sum = IntStream.rangeClosed(1, 10).sum();
System.out.println("Sum: " + sum);

Reactive programming use cases

Here are some common use cases for reactive programming:

1) Web Applications: Reactive programming allows handling incoming HTTP requests, user interactions, and asynchronous operations without blocking the application's responsiveness.

2) Real-Time Data Streams: Reactive programming is ideal for processing real-time data streams such as stock market updates, sensor data from IoT devices, social media feeds, or online gaming events.

3) Interactive User Interfaces: Reactive programming can enhance user interfaces by allowing them to respond immediately to user input, such as mouse clicks, keyboard events, and touch gestures.

4) Microservices and APIs: When building microservices and APIs, reactive programming can help manage multiple incoming requests concurrently, enabling efficient resource utilization and better handling of spikes in traffic.

5) Data Processing Pipelines: For data-intensive applications, such as ETL (Extract, Transform, Load) processes, reactive programming can simplify the processing of large volumes of data by leveraging parallelism and concurrency.

Traditional Vs Reactive programming in Java

In traditional programming, tasks are executed sequentially, and blocking operations are common. If an operation takes time, the program blocks, waiting for that operation to complete before moving on.

This approach can lead to inefficiencies, especially in scenarios with multiple concurrent tasks.

Processed result: Operation Result
Program completed.

Reactive programming, on the other hand, revolves around the concept of reacting to events and handling asynchronous operations without blocking.

Instead of waiting for an operation to complete, reactive programming uses streams of data to react to events as they occur. This approach enables better resource utilization and responsiveness.

Continuing program execution...
Processed result: Reactive Result
Program completed.

In the reactive programming example, the Mono type from the Reactor library represents a reactive stream. The performReactiveOperation() method returns a Mono that encapsulates a non-blocking operation.

By using the subscribeOn(Schedulers.parallel()) scheduler, the operation is executed asynchronously, allowing the program to continue its execution without waiting.

Reactive programming vs CompletableFuture

Also remember that, we can achieve some level of reactive behavior using CompletableFuture as well.

Continuing program execution...
Processed result: CompletableFuture Result
Program completed.

In this version, I've replaced the Reactor's Mono and Schedulers with Java's CompletableFuture and Executors.

The join() method in CompletableFuture is a blocking operation that waits for the result of the asynchronous computation to be available.

While CompletableFuture does offer a way to handle asynchronous operations, joining their result before returning using join() can block the current thread until all CompletableFuture are available.

This is not in line with the fully non-blocking and asynchronous principles of reactive programming.

Reactive programming libraries like Reactor provide more advanced tools for managing complex asynchronous scenarios without blocking the main thread, handling backpressure, and effectively managing concurrency and data streams.

In a web server context, it's crucial to avoid blocking the request threads, allowing them to handle other incoming requests while waiting for asynchronous tasks to complete.

Reactive programming libraries have been designed specifically to address these challenges, providing mechanisms like event loops and efficient thread management to ensure that request threads are not blocked unnecessarily.

In contrast, in the case of CompletableFuture, the request threads are held up, causing poor performance and potential resource exhaustion.

Moreover, CompletableFuture doesn't address the elasticity requirement of the Reactive Manifesto as it does not handle backpressure.

Reactive programming libraries available in Java

There are several reactive programming libraries available in Java that provide tools and abstractions for working with asynchronous and event-driven programming using reactive principles.

Reactor

Reactor (Project Reactor) is a reactive programming library developed by Pivotal.

It's built on top of the Reactive Streams specification and provides both the Flux and Mono types for working with reactive streams.

It's widely used in the Spring ecosystem for building reactive applications.

RxJava

RxJava is a library that implements the ReactiveX (Rx) API for Java.

It offers an extensive set of reactive types and operators for composing and transforming asynchronous and event-based code. RxJava is known for its flexibility and wide range of supported platforms.

Akka Streams

Akka Streams is part of the Akka toolkit, which is designed for building highly concurrent, distributed, and fault-tolerant systems.

Akka Streams provides a high-level DSL for defining and processing reactive streams.

Project Loom (Virtual Threads)

Project Loom is an experimental project led by OpenJDK community to introduce lightweight, virtual threads (also known as fibers) to the Java platform.

Although not a traditional reactive programming library, virtual threads enable a more efficient way of handling concurrency and can be used to implement reactive patterns.

Vert.x

Vert.x is a toolkit for building reactive and event-driven applications on the JVM.

It provides features for building web applications, microservices, and more using an event-driven and non-blocking architecture.

What is Reactor ?

Reactor is an open-source reactive programming library for building asynchronous, event-driven, and non-blocking applications in Java.

It provides abstractions for working with reactive streams, allowing developers to handle concurrency and asynchronous operations more efficiently and with better responsiveness.

Reactor implements the Reactive Streams specification, which defines a standard for asynchronous stream processing with non-blocking backpressure. This helps manage the flow of data and prevents overwhelming downstream components.

Reactor introduces two main types, Flux and Mono, which represent streams of data.

Flux is used for handling sequences of zero or more elements, while Mono represents a stream of at most one element. These types allow developers to work with reactive streams in a clear and expressive way.

Reactor provides a comprehensive set of operators for transforming, filtering, combining, and handling reactive streams. These operators enable developers to create complex data processing pipelines while maintaining the reactive nature of the application.

Reactor offers schedulers that allow you to control the threading context in which reactive operations are executed. This is crucial for managing concurrency, parallelism, and ensuring that certain operations run on specific threads.

Reactor's implementation of the Reactive Streams specification includes built-in support for backpressure, which is a mechanism for controlling the flow of data between publishers and subscribers to prevent resource exhaustion.

Reactor is a core component of the Spring Framework's reactive programming support. It's integrated with Spring's reactive components to enable building reactive microservices, web applications, and data pipelines.

Reactor's API is designed to be composable and functional, making it easier to reason about asynchronous and event-driven code. It encourages using functional programming patterns to create more predictable and maintainable code.

Reactor enables developers to utilize async programming patterns, such as asynchronous methods and callbacks, without introducing callback hell or the complexities associated with traditional callback-based approaches.

How to create a Reactor project in Java

To create a Reactor project in Java, you typically start by setting up a Maven or Gradle project and then adding the Reactor dependency.

1) Open a command prompt or terminal, navigate to your project directory, and run the following Maven command to initialize a new Maven project:

mvn archetype:generate -DgroupId=com.cb -DartifactId=reactor-project-demo -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

2) Open the newly created project in your preferred IDE.

3) Open the pom.xml file in your project directory and add the Reactor dependency to the section:

4) Here's a basic example:

Compile and run the Java class using your IDE or the command line. This example creates a simple Mono that emits a message and subscribes to it to print the message when it's available.

What are Mono and Flux ?

Mono and Flux are fundamental building blocks in reactive programming libraries like Reactor. They represent two different types of reactive streams for handling asynchronous and event-driven operations in a non-blocking and efficient manner.

Mono

Mono is short for "monadic" and represents a stream that emits at most one item or an error.

Monos are suitable for scenarios where you expect either a single result or an error, such as database queries that retrieve a single record.

Flux

Flux represents a stream that emits zero or more items.

Flux can emit multiple values, possibly followed by an error or completion signal.

It's suitable for scenarios where you have multiple pieces of data arriving asynchronously, such as streaming data from a sensor or handling multiple concurrent requests.

Operators and Transformations in 'Reactor'

Reactive programming libraries like Reactor provide a variety of operators that you can use to transform, filter, combine, and handle streams of events or data. Here are some common operators and transformations you can apply to reactive streams:

'Map' in Reactor

The map operator allows you to transform the emitted items in a stream by applying a function to each item.

'FlatMap' in Reactor

The flatMap operator is used to transform each item emitted by the source stream into a new stream (Publisher).

The transformation function (called the "mapper" function) takes an input element and returns a Publisher. The resulting output is a stream of elements that are emitted from the inner streams generated by the mapper function.

A: 1
B: 1
A: 2
B: 2
A: 3
B: 3

The key distinction between map and flatMap is that while map transforms each element into a new element, flatMap transforms each element into a new stream of elements, and then "flattens" these streams into a single output stream.

'Filter' in Reactor

The filter operator lets you selectively include or exclude items from the stream based on a condition.

Number: 2

'Reduce' in Reactor

The reduce operator aggregates the elements in a stream and emits a single value as the result.

6

'Scan' in Reactor

The scan operator is similar to reduce, but it emits the intermediate results as the stream progresses.

0
1
3
6

'Zip' in Reactor

The zip operator combines multiple streams element-wise and emits the combined results as tuples.

Running Cat
Sitting Dog

Differences between the 'flatMap', 'flatMapSequential', and 'concatMap'

The flatMap, flatMapSequential, and concatMap operators are used to transform and process reactive streams in a reactive programming context. While they might seem similar, they exhibit distinct behaviors in terms of concurrency, ordering, and how they handle inner streams.

1) flatMap Operator

Subscribes eagerly to all inner Publishers (inner streams) generated by the transformation function for each element emitted by the source stream.

Does not preserve the original ordering of elements from the source stream in the output. Inner streams can emit their values in any order, resulting in a mixed sequence.

Suitable for scenarios where concurrency among inner streams is desired, and the order of results is not important.

Useful when processing inner streams concurrently, such as making parallel network requests.

2) flatMapSequential Operator

Subscribes eagerly to all inner Publishers (inner streams) like flatMap.

Preserves the original ordering of elements from the source stream. The results are emitted in the order they were encountered in the source stream.

Does not introduce concurrency by default. The next inner stream is subscribed to only after the previous one completes.

Suitable when maintaining the order of emitted results is essential, and you want to avoid interleaving.

3) concatMap Operator

Waits for each inner stream to complete before generating and subscribing to the next inner stream.

Preserves the same order as the source elements. The order of emitted results in the output matches the order of the source elements.

Sequentially processes inner streams in the order they were encountered.

Ensures that the next inner stream only starts once the previous one completes.

Useful when maintaining the order of results and processing inner streams sequentially is required, such as maintaining database transactional behavior.

How backpressure is handled in Reactor ?

Backpressure handling is a critical aspect of reactive programming, as it ensures that data streams are processed in a way that doesn't overwhelm downstream components with more data than they can handle.

Reactor adheres to the Reactive Streams specification, which defines the rules and protocols for reactive stream processing. One of the key features of this specification is backpressure handling.

In Reactor, the subscriber signals its demand for data to the publisher. The subscriber does this by sending requests for a certain number of elements it's ready to consume.

The publisher then responds by emitting elements accordingly.

When a subscriber subscribes to a publisher, the subscriber's onSubscribe() method is called. In this method, the subscriber can request an initial number of elements it's prepared to process.

Publishers only emit elements when they receive requests for them from subscribers. This ensures that the publisher doesn't produce more elements than what the subscriber is ready to consume.

Reactor's publishers use internal buffers or queues to hold elements that haven't been requested by subscribers yet. When a subscriber requests elements, the publisher retrieves them from the buffer and emits them.

Backpressure Strategies

Reactor provides several backpressure strategies that publishers can use when the buffer is full and the subscriber can't keep up with the rate of emission.

BUFFER

The buffer is used to store excess elements until the subscriber can consume them.

ERROR

The publisher throws an exception if the buffer is full and backpressure is not being handled properly.

IGNORE

The publisher drops elements that can't be accommodated in the buffer.

LATEST

The publisher keeps the latest emitted element in the buffer and drops earlier ones.

Handling Backpressure on Subscribers

Subscribers can control backpressure by modifying their demand in response to the number of elements they can process.

Here's how you can handle subscriptions using the Subscriber interface in Reactor:

The subscribe() method

In a reactive programming model, subscribers need to explicitly subscribe to a publisher (such as a Mono or a Flux) in order to start receiving events emitted by that publisher.

Here's how you can use the subscribe() method to consume elements and handle completion and errors in Reactor:

In this example, when you call the subscribe() method on a Mono or Flux, you provide three different callback functions:

OnNext

This callback is invoked whenever an element is emitted from the stream. It receives the emitted value.

OnError

This callback is invoked when an error occurs during the processing of the stream. It receives the error object.

OnComplete

This callback is invoked when the stream completes successfully (no more elements to emit).

In Reactor, the subscribe() method itself does not return a Flux or Mono. It returns a Disposable object that represents the subscription to the reactive stream.

The Disposable allows you to manage the subscription, including methods to cancel it if needed.

You can subscribe to the same Flux multiple times in Reactor. Each subscription will trigger the processing of the stream independently, and each subscriber will receive the events emitted by the Flux.

The block() method

The block() method is used in Reactor to synchronously block the calling thread and wait for the result of a Mono or Flux to be available.

However, it's important to use block() with caution, as it can lead to blocking behavior and potential performance issues, especially in reactive systems.

In this example, the block() method is called on both a Mono and a Flux. For the Mono, it blocks until the value is emitted. For the Flux, it blocks until the entire sequence is complete.

It's often more appropriate to work with reactive operations using composition, chaining, and operators like subscribe, map, filter, and others.

The log() operator

In Reactor, the log() operator is a convenient way to add logging to your reactive pipeline. It allows you to log various events and details as data flows through the reactive operators, helping you debug and understand the behavior of your reactive streams.

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(A)
Received: A
[ INFO] (main) | onNext(B)
Received: B
[ INFO] (main) | onNext(C)
Received: C
[ INFO] (main) | onNext(D)
Received: D
[ INFO] (main) | onNext(E)
Received: E
[ INFO] (main) | onComplete()
Mono completed.

The buffer() operator

In Reactor, the buffer() operator is used to group elements emitted by a Flux into batches or buffers. Each buffer is emitted as a List containing a specified number of elements or based on a given duration.

You can use the buffer(int maxSize) variant to buffer elements based on a specified count. The Flux emits a List of elements whenever the specified number of elements has been emitted.

List: [1, 2]
List: [3, 4]
List: [5, 6]
List: [7, 8]
List: [9, 10]

You can use the buffer(Duration timespan) variant to buffer elements based on a specified time duration. The Flux emits a List of elements at regular intervals defined by the specified duration.

Buffer: [0, 1, 2, 3, 4]
Buffer: [5, 6, 7, 8, 9]
Buffer: [10, 11, 12, 13]

Error Handling in Reactor

Reactive programming provides mechanisms to handle errors and gracefully manage failures that might occur during the processing of reactive streams. Here are some common error handling operators and techniques in Reactor:

1) In subscribe() method

You can use the subscribe() method's overloaded version that takes error handling as a parameter.

1
2
Error happened: For input string: "Three"

2) The onError Operator

The onError operator is used to handle errors that occur within a reactive stream.

1
2
Error happened: For input string: "Three"
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NumberFormatException: For input string: "Three"
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NumberFormatException: For input string: "Three"
Caused by: java.lang.NumberFormatException: For input string: "Three"
. 
.

3) The onErrorContinue Operator

The onErrorContinue operator is used to handle errors by continuing the stream with the next item in the source stream when an error occurs. This allows you to handle errors gracefully without terminating the whole stream.

1
2
Error for value: Three, error: For input string: "Three"
4

4) The onErrorResume Operator

The onErrorResume operator in Reactor is used to handle errors by providing an alternative source (another Mono or Flux) to continue the stream with when an error occurs.

1
2
Error: For input string: "Three"
10
20

5) The doFinally operator

The doFinally operator in Reactor allows you to register a callback that will be executed regardless of whether the sequence completes successfully, completes with an error, or is cancelled.

This operator is useful for performing cleanup or finalization tasks after the reactive stream has completed, regardless of the outcome.

Value: Hello, Reactor!
Completed.
Stream completed successfully.

How to Convert a Flux<T> to Mono<T> ?

You can convert a Flux to a Mono using various operators provided by Reactor. Here are a few ways you can achieve this:

1) Using single() Operator

If you want to convert a Flux that emits only one element into a Mono, you can use the single() operator. This is useful when you know the Flux will emit only one element, and you want to work with it as a Mono.

2) Using next() Operator

Similar to single(), if you want to convert a Flux that emits only one element into a Mono, you can also use the next() operator.

3) Using take(1) Operator

If you want to convert a Flux that emits multiple elements into a Mono by taking the first emitted element and ignoring the rest, you can use the take(1) operator.

How to Convert a Mono<T> to Flux<T> ?

Converting a Mono to a Flux involves transforming a single element into a stream of one element. Reactor provides a few operators to achieve this transformation.

Here's how you can convert a Mono to a Flux:

1) Using flux() Operator

The flux() operator is used to convert a Mono into a Flux containing a single element. This can be useful if you want to treat the single element as part of a larger Flux sequence.

2) Using flatMapMany Operator

You can use the flatMapMany operator to convert a Mono into a Flux. This allows you to perform transformations and create a new stream based on the value of the Mono.

How to Convert a Flux<T> to Mono<List<T>> ?

The collectList() operator in Reactor is used to collect all the elements emitted by a Flux into a single Mono>. It transforms a stream of elements into a Mono that emits a single list containing all the collected elements.

Collected List: [1, 2, 3, 4, 5]

This operator is useful when you want to collect all the elements of a Flux into a list and work with them as a batch. Keep in mind that using collectList() will wait for the entire Flux to complete before emitting the collected list. If you are dealing with a large or infinite Flux, be cautious about memory usage and potential blocking behavior.

How to Count Items in a Flux<T> ?

You can count the number of elements in a Flux using the count() operator in Reactor. The count() operator transforms a Flux into a Mono that emits a single value representing the count of elements in the Flux.

Number of Elements: 5

Keep in mind that the count() operator will wait for the entire Flux to complete before emitting the count. If you're dealing with a large or infinite Flux, the counting operation might not complete until the whole stream has finished emitting elements.

avatar

NK Chauhan

NK Chauhan is a Principal Software Engineer with one of the biggest E Commerce company in the World.

Chauhan has around 12 Yrs of experience with a focus on JVM based technologies and Big Data.

His hobbies include playing Cricket, Video Games and hanging with friends.

Categories
Spring Framework
Microservices
BigData
Core Java
Java Concurrency