Mastering Java Reactive Programming [From Scratch] with Flux: A Practical Introduction Part-3

Ranjeet Kumar
8 min readJun 7, 2024

--

Welcome back to our series on mastering Java Reactive Programming! In this part, we’ll dive deeper into Flux, a powerful component of Project Reactor. Whether you’re new to reactive programming or looking to strengthen your skills, this guide will walk you through Flux with simple explanations and practical examples. Let’s get started!

1. Flux — Just

The Flux.just() method is one of the simplest ways to create a Flux. It creates a Flux that emits a specified sequence of items. Suppose you want to emit a sequence of strings “Hello” and “World”. This is useful when you need to create a simple, predefined sequence of elements that can be easily emitted by a Flux.

Example:

Flux<String> flux = Flux.just("Hello", "World");
flux.subscribe(System.out::println);

Output:

Hello
World

2. Flux — Multiple Subscribers

A Flux can have multiple subscribers. Each subscriber receives the same sequence of events. Suppose you have two components in your application that need to process the same sequence of strings “Hello” and “World”. With Flux, you can have both components subscribe to the same sequence of events.

Example:

Flux<String> flux = Flux.just("Hello", "World");

flux.subscribe(data -> System.out.println("Subscriber 1: " + data));
flux.subscribe(data -> System.out.println("Subscriber 2: " + data));

Output:

Subscriber 1: Hello
Subscriber 1: World
Subscriber 2: Hello
Subscriber 2: World

3. Flux — From Array/List

You can create a Flux from an array or a list using Flux.fromArray() and Flux.fromIterable(). Suppose you have an array and a list of strings and you want to create a Flux from them. This is useful when you want to convert existing collections into a reactive stream for further processing.

Example:

String[] array = {"A", "B", "C"};
Flux<String> fluxFromArray = Flux.fromArray(array);

List<String> list = Arrays.asList("X", "Y", "Z");
Flux<String> fluxFromList = Flux.fromIterable(list);

fluxFromArray.subscribe(System.out::println);
fluxFromList.subscribe(System.out::println);

Output:

A
B
C
X
Y
Z

4. Flux — From Stream

You can also create a Flux from a Java Stream using Flux.fromStream(). Suppose you have a stream of numbers and you want to create a Flux from it. This is useful when working with Java Streams and you want to leverage the reactive programming model.

Example:

Stream<String> stream = Stream.of("1", "2", "3");
Flux<String> fluxFromStream = Flux.fromStream(stream);
fluxFromStream.subscribe(System.out::println);

Output:

1
2
3

5. Flux — Range

Flux.range() creates a Flux that emits a sequence of integers within a specified range. Suppose you want to create a Flux that emits numbers from 1 to 5. This can be useful for generating sequences of numbers for iteration or testing.

Example:

Flux<Integer> rangeFlux = Flux.range(1, 5);
rangeFlux.subscribe(System.out::println);

Output:

1
2
3
4
5

6. Log Operator

The log() operator allows you to log the events in a Flux for debugging purposes. Suppose you want to debug a Flux that emits numbers from 1 to 3. The log() operator helps you see what's happening inside the Flux, such as subscription, request, and emission events.

Example:

Flux<Integer> flux = Flux.range(1, 3).log();
flux.subscribe(System.out::println);

Output (with log details):

| onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
| request(unbounded)
| onNext(1)
1
| onNext(2)
2
| onNext(3)
3
| onComplete()

7. Flux Vs List

A Flux is not just a List. While both can hold multiple elements, a Flux is asynchronous and can handle potentially infinite sequences of data. It also supports backpressure and non-blocking I/O. Suppose you need to process data in real-time as it arrives, rather than waiting for the entire dataset to be available like a List. A Flux is ideal for handling real-time data streams and asynchronous processing.

Example: Flux is used to handle streaming data, whereas a List holds all data in memory.

List Example:

List<String> list = Arrays.asList("A", "B", "C");
list.forEach(System.out::println);

Flux Example:

Flux<String> flux = Flux.just("A", "B", "C");
flux.subscribe(System.out::println);

Output for both:

A
B
C

8. ChatGPT vs Gemini

While ChatGPT is a powerful AI developed by OpenAI, Gemini, part of the Gemini suite, is Google’s AI. Both are advanced in their capabilities, with different strengths. For instance, ChatGPT excels in natural language understanding and generation, while Gemini focuses on AI integration across Google’s products. Comparing the use cases of two advanced AI systems.

Example:

  • ChatGPT: Excellent for creating conversational agents, content generation, and language translation.
  • Gemini: Integrates well with Google services, used for data analysis, and enhancing search capabilities.

9. FAQ — Are Mono & Flux Data Structures?

No, Mono and Flux are not traditional data structures. They are reactive streams that handle asynchronous data sequences. Mono handles a single value or empty, while Flux handles multiple values. Explaining the difference between Mono/Flux and traditional data structures.

Example:

  • Mono: Mono.just("Hello") - Emits a single item "Hello".
  • Flux: Flux.just("A", "B", "C") - Emits multiple items "A", "B", "C".

10. Flux — Non-blocking IO Stream — Demo

Flux supports non-blocking I/O operations, making it ideal for tasks like reading from a file or a web request asynchronously. Reading a file asynchronously using Flux.

Example:

Flux<String> fileFlux = Flux.using(
() -> Files.lines(Paths.get("data.txt")),
Flux::fromStream,
BaseStream::close
);

fileFlux.subscribe(System.out::println);

Output (Assuming data.txt contains):

Line 1
Line 2
Line 3

11. Flux — Interval

Flux.interval() creates a Flux that emits long values at regular intervals. Creating a Flux that emits values every second. This is useful for scheduling tasks or creating timed sequences.

Example:

Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));
intervalFlux.subscribe(System.out::println);

Output (values emitted at 1-second intervals):

0
1
2
...

12. Flux — Empty/Error

You can create an empty Flux or a Flux that emits an error. Handling scenarios where no data or an error condition is possible. This is useful for managing edge cases in your reactive streams.

Example:

Flux<String> emptyFlux = Flux.empty();
emptyFlux.subscribe(System.out::println);

Flux<String> errorFlux = Flux.error(new RuntimeException("Error occurred"));
errorFlux.subscribe(System.out::println, System.err::println);

Output:

Error occurred

13. Flux — Defer

Flux.defer() allows you to create a new Flux instance for each subscriber, providing fresh data. Emitting current time for each new subscriber. This is useful when you want to provide up-to-date data for each subscription.

Example:

Flux<String> deferFlux = Flux.defer(() -> Flux.just("Current Time: " + LocalDateTime.now()));
deferFlux.subscribe(System.out::println);
Thread.sleep(1000);
deferFlux.subscribe(System.out::println);

Output (with 1 second gap):

Current Time: 2024-06-01T12:00:00
Current Time: 2024-06-01T12:00:01

14. Mono/Flux Conversion

You can convert between Mono and Flux using .flux() and .next() methods. Converting a Mono to a Flux and vice versa. This is useful when you need to switch between handling single values and multiple values in your reactive streams.

Example:

Mono<String> mono = Mono.just("Hello");
Flux<String> flux = mono.flux();

Flux<String> stringFlux = Flux.just("A", "B", "C");
Mono<String> firstMono = stringFlux.next();

flux.subscribe(System.out::println);
firstMono.subscribe(System.out::println);

Output:

Hello
A

15. Assignment

Create a Flux that emits numbers from 1 to 10, logs each number, filters out even numbers, and then converts each number to its square.

Example:

1
9
25
49
81

16. Assignment Solution

Solution:

Flux.range(1, 10)
.log()
.filter(n -> n % 2 != 0)
.map(n -> n * n)
.subscribe(System.out::println);

Output:

1
9
25
49
81

17. Summary

In this part, we covered several essential topics on Flux, including creation methods, multiple subscribers, and advanced operators. Understanding these concepts will help you build robust reactive applications. Now let’s answer the some questions listed below , use paper pen to write the answer and check your knowledge.

18. Quiz

1.)What does Flux.just() do?

  • A. Creates a Flux that emits a single item.
  • B. Creates a Flux that emits a specified sequence of items.
  • C. Creates a Flux that emits items from a list.
  • D. Creates a Flux that emits items at intervals.

2. How can you create a Flux from an array?

  • A. Flux.just(array)
  • B. Flux.fromArray(array)
  • C. Flux.create(array)
  • D. Flux.range(array)

3. What is the purpose of the log() operator?

  • A. To handle errors.
  • B. To convert Mono to Flux.
  • C. To log events in a Flux for debugging.
  • D. To filter items in a Flux.

4. How is Flux different from a List?

  • A. Flux is synchronous, List is asynchronous.
  • B. Flux holds all data in memory, List handles data streams.
  • C. Flux is asynchronous and can handle infinite sequences.
  • D. Flux supports only a single value, List supports multiple values.

5. How do you handle errors in a Flux?

  • A. Using the error operator.
  • B. Using the log operator.
  • C. Using the subscribe method with an error consumer.
  • D. Using the map method.

6. What method would you use to create a Flux from a Java Stream?

  • A. Flux.just()
  • B. Flux.fromStream()
  • C. Flux.fromIterable()
  • D. Flux.fromArray()

7. Which operator allows you to create a Flux that emits values at regular intervals?

  • A. interval()
  • B. range()
  • C. just()
  • D. fromIterable()

8. How can you create a new Flux instance for each subscriber, providing fresh data?

  • A. Flux.defer()
  • B. Flux.create()
  • C. Flux.generate()
  • D. Flux.push()

9. What is the output of Flux.range(1, 5)?

  • A. A Flux that emits integers from 1 to 5.
  • B. A Flux that emits integers from 1 to 4.
  • C. A Flux that emits integers from 0 to 4.
  • D. A Flux that emits a single integer 5.

10. Which method would you use to convert a Mono to a Flux?

  • A. next()
  • B. flux()
  • C. defer()
  • D. from()

11. What does Flux.empty() do?

  • A. Creates a Flux that emits no items and completes immediately.
  • B. Creates a Flux that emits a single item and completes.
  • C. Creates a Flux that emits an error.
  • D. Creates a Flux that emits a sequence of null values.

12. What is the use of Flux.error()?

  • A. To log an error in the Flux.
  • B. To create a Flux that emits an error immediately.
  • C. To handle errors in the Flux.
  • D. To filter items in the Flux.

13. How can you convert a Flux to a Mono that emits only the first item?

  • A. next()
  • B. first()
  • C. just()
  • D. single()

14. What is the purpose of Flux.interval()?

  • A. To create a Flux that emits a single item.
  • B. To create a Flux that emits items from a list.
  • C. To create a Flux that emits values at regular intervals.
  • D. To create a Flux that emits an error at intervals.

15. Which method creates a Flux from a list?

  • A. Flux.just()
  • B. Flux.fromIterable()
  • C. Flux.fromArray()
  • D. Flux.range()

16. How can you log events in a Flux for debugging?

  • A. Using the map() method.
  • B. Using the log() method.
  • C. Using the filter() method.
  • D. Using the subscribe() method.

17. What does Flux.defer() provide for each new subscriber?

  • A. The same data for all subscribers.
  • B. Fresh data for each new subscriber.
  • C. An error for each new subscriber.
  • D. A single item for all subscribers.

18. Which operator is used to create a Flux that emits a sequence of integers within a specified range?

  • A. interval()
  • B. range()
  • C. just()
  • D. fromIterable()

19. What is the purpose of Flux.fromArray()?

  • A. To create a Flux from a list.
  • B. To create a Flux from a Java Stream.
  • C. To create a Flux from an array.
  • D. To create a Flux that emits values at intervals.

20. Which method is used to handle asynchronous data sequences that can emit multiple values?

  • A. Mono
  • B. Flux
  • C. Stream
  • D. List

Answers:

  1. B. Creates a Flux that emits a specified sequence of items.
  2. B. Flux.fromArray(array)
  3. C. To log events in a Flux for debugging.
  4. C. Flux is asynchronous and can handle infinite sequences.
  5. C. Using the subscribe method with an error consumer.
  6. B. Flux.fromStream()
  7. A. interval()
  8. A. Flux.defer()
  9. A. A Flux that emits integers from 1 to 5.
  10. B. flux()
  11. A. Creates a Flux that emits no items and completes immediately.
  12. B. To create a Flux that emits an error immediately.
  13. A. next()
  14. C. To create a Flux that emits values at regular intervals.
  15. B. Flux.fromIterable()
  16. B. Using the log() method.
  17. B. Fresh data for each new subscriber.
  18. B. range()
  19. C. To create a Flux from an array.
  20. B. Flux

Feel free to ask any questions in the comments or share your own examples. Happy coding!

--

--