compose() vs. transform() vs. as() vs. map() in Flux and Mono
I found the example in the reference documentation a little difficult to follow,
so I wrote the programs below to wrap my head around the transform vs. compose concept.
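// 'toUpper' is assumed to be a mutable field of the enclosing class;
// a reassigned local variable could not be read from inside the lambda
boolean toUpper;

Function<Flux<String>, Flux<String>> fnstatefull = flux -> {
    Flux<String> f = flux.filter(color -> {
        // only reds are allowed
        return color.equalsIgnoreCase("red");
    });
    // applies the 'toUpperCase' mapping depending on the external flag 'toUpper'
    if (toUpper) {
        f = f.map(String::toUpperCase);
    }
    return f;
};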
Transform
The function is applied immediately, when transform is called (i.e. when the flux is assembled).
toUpper is false at that point, so fnstatefull behaves the same way for both subscribers below.
Flux<String> f = Flux.just("red", "green", "blue");
toUpper = false;
f = f.transform(fnstatefull);
toUpper = true;
f.subscribe(op -> log.error("ONE>>>" + op));
toUpper = false;
f.subscribe(op -> log.error("TWO>>>" + op));
Output
ReactordemoApplication - ONE>>>red
ReactordemoApplication - TWO>>>red
Compose
The function is applied at subscription time, once per subscriber.
Each subscription reads the current value of toUpper, so fnstatefull can behave differently for each subscriber below.
Flux<String> f = Flux.just("red", "green", "blue");
toUpper = false;
f = f.compose(fnstatefull);
toUpper = true;
f.subscribe(op -> log.error("ONE>>>" + op));
toUpper = false;
f.subscribe(op -> log.error("TWO>>>" + op));
Output
ReactordemoApplication - ONE>>>RED
ReactordemoApplication - TWO>>>red
You have two broadly different categories of operators here:
Operators that work on the Flux itself
transform and transformDeferred are for code mutualization
When you compose chains of operators regularly and you have common operator usage patterns in your application, you can mutualize this code or give it a more descriptive name by using transform and transformDeferred.
The difference between the two is when the mutualized operators are applied: transform applies them at instantiation, while transformDeferred applies them at subscription (allowing for dynamic choice of the added operators).
Have a look at the reference documentation for more details and examples.
Note: transformDeferred was called compose in versions prior to 3.3.0.
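To make the timing difference concrete without pulling in Reactor, here is a minimal plain-Java model of the two behaviours. It is only a sketch under stated assumptions: Source, transform and transformDeferred below are hypothetical stand-ins written for this illustration (a Supplier plays the role of the Flux, and each get() call plays the role of one subscription). It is not Reactor's implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class DeferredVsEager {
    // Stand-in for the external 'toUpper' switch used in the question.
    static boolean toUpper = false;

    // Hypothetical mini "publisher": every get() call models one subscription.
    interface Source<T> extends Supplier<List<T>> {}

    // Same idea as fnstatefull: keep only reds, optionally upper-case them.
    // The flag is read at the moment this function is invoked.
    static final Function<Source<String>, Source<String>> onlyRed = src -> {
        boolean upper = toUpper;
        return () -> src.get().stream()
                .filter(c -> c.equalsIgnoreCase("red"))
                .map(c -> upper ? c.toUpperCase() : c)
                .collect(Collectors.toList());
    };

    // Models transform: the function runs right away, once.
    static <T> Source<T> transform(Source<T> src, Function<Source<T>, Source<T>> fn) {
        return fn.apply(src);
    }

    // Models transformDeferred (formerly compose): the function runs on every get().
    static <T> Source<T> transformDeferred(Source<T> src, Function<Source<T>, Source<T>> fn) {
        return () -> fn.apply(src).get();
    }

    public static void main(String[] args) {
        Source<String> colors = () -> Arrays.asList("red", "green", "blue");

        toUpper = false;
        Source<String> eager = transform(colors, onlyRed);
        Source<String> deferred = transformDeferred(colors, onlyRed);

        toUpper = true;
        System.out.println("eager ONE    " + eager.get());    // [red]
        System.out.println("deferred ONE " + deferred.get()); // [RED]

        toUpper = false;
        System.out.println("deferred TWO " + deferred.get()); // [red]
    }
}
```

The eager variant captured toUpper = false once and keeps that choice for every consumer, while the deferred variant re-runs the function for each consumer, which mirrors the ONE/TWO outputs shown in the question.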
as
This is a convenience shortcut to apply a Function to the whole Flux while keeping the whole code in a fluent style.
The major differentiator with transform* operators is that this one doesn't enforce a particular return type. It is all driven by the Function you use, and could for instance be used for testing with a StepVerifier in a fluent style:
Flux.just("test")
.map(String::length)
.as(StepVerifier::create)
//from there on we're dealing with the StepVerifier API
.expectNext(4)
.verifyComplete();
The example shown in the javadoc uses this approach to convert to a Mono using Mono::from, which is a bit confusing because the return type is quite close to Flux.
Note that this approach can also help with external operators that are implemented in a factory method style to "extend" the Flux API.
Take reactor-addons' MathFlux, for example, and compare:
MathFlux.sumInt(Flux.range(1, 10)
.map(i -> i + 2)
.map(i -> i * 10))
.map(isum -> "sum=" + isum);
To:
Flux.range(1, 10)
.map(i -> i + 2)
.map(i -> i * 10)
.as(MathFlux::sumInt)
.map(isum -> "sum=" + isum);
(this can help you deal with the fact that, unlike Kotlin, Java doesn't have extension methods :) )
Operators that work on the data that goes through the Flux
map is all about the data. It applies a 1-1 transformation function to each element in the source, as they become available.
In the MathFlux example above, map is successively used to add 2 to each original integer, then again to multiply each number in the sequence by 10, then a third time at the end to produce a String out of each sum.