The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.
This method is useful, for example, when you have an Observable that emits a series of items that themselves have Observable members or are in other ways transformable into Observables, so that you can create a new Observable that emits the complete collection of items emitted by the sub-Observables of these items.
Note that FlatMap merges the emissions of these Observables, so that they may interleave.
In several of the language-specific implementations there is also an operator that does not interleave the emissions from the transformed Observables, but instead emits these emissions in strict order, often called ConcatMap or something similar.
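The difference between merging and concatenating can be seen in a small library-free JavaScript model. This is only a sketch: arrays of timestamped events stand in for Observables, and the names `mergeAll`, `concatAll`, and `toInner` are hypothetical helpers, not part of any Rx library.

```javascript
// Toy model (not an Rx API): each inner "Observable" is an array of
// { at, value } events, where `at` is a virtual timestamp.

// Merge: all inner sequences run at once, so values come out in timestamp order.
function mergeAll(inners) {
  return inners
    .flat()
    .sort((a, b) => a.at - b.at) // stable sort keeps ties in subscription order
    .map(e => e.value);
}

// Concat: each inner sequence is drained completely before the next one starts,
// so only the order of the inner sequences matters.
function concatAll(inners) {
  return inners.flat().map(e => e.value);
}

// Hypothetical mapping function: item n yields n*10 and n*10+1, two ticks apart,
// starting at the tick on which n was emitted.
function toInner(n, emittedAt) {
  return [
    { at: emittedAt, value: n * 10 },
    { at: emittedAt + 2, value: n * 10 + 1 },
  ];
}

// Assumption: the source emits 1, 2, 3 on ticks 0, 1, 2.
const inners = [1, 2, 3].map((n, i) => toInner(n, i));

const merged = mergeAll(inners);
const concatenated = concatAll(inners);

console.log(merged);       // [ 10, 20, 11, 30, 21, 31 ]
console.log(concatenated); // [ 10, 11, 20, 21, 30, 31 ]
```

In the merged result, values from different inner sequences interleave; in the concatenated result, each inner sequence stays contiguous.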
RxGroovy implements the flatMap operator.
// an Observable that emits three numbers
numbers = Observable.from([1, 2, 3]);

// a closure that returns an Observable emitting two numbers based on the number it is passed
multiples = { n -> Observable.from([ n*2, n*3 ]) };

numbers.flatMap(multiples).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
2
3
4
6
6
9
Sequence complete
Note that if any of the individual Observables mapped to the items from the source Observable by flatMap aborts by invoking onError, the Observable produced by flatMap will itself immediately abort and invoke onError.
A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.
flatMap(Func1)
flatMap(Func1,int)
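The effect of the concurrency limit can be sketched as a virtual-time calculation in plain JavaScript. This is a toy model under stated assumptions, not the RxGroovy implementation: the hypothetical helper `subscriptionTimes` assumes the source emits all of its items at time 0 and that each inner Observable terminates a known number of ticks after it is subscribed to.

```javascript
// Toy model (not an Rx API): compute the virtual time at which each inner
// Observable is subscribed to, given its running time and a concurrency limit.
// Assumption: every source item is available at time 0.
function subscriptionTimes(durations, maxConcurrent) {
  const running = []; // end times of the inner Observables currently subscribed to
  const starts = [];
  for (const duration of durations) {
    let start = 0;
    if (running.length >= maxConcurrent) {
      // At the limit: wait for the earliest-finishing inner Observable to terminate.
      running.sort((a, b) => a - b);
      start = running.shift();
    }
    starts.push(start);
    running.push(start + duration);
  }
  return starts;
}

const unlimited = subscriptionTimes([5, 3, 4, 2], Infinity);
const limited = subscriptionTimes([5, 3, 4, 2], 2);

console.log(unlimited); // [ 0, 0, 0, 0 ]
console.log(limited);   // [ 0, 0, 3, 5 ]
```

With a limit of 2, the third inner Observable is subscribed to only when the second terminates at tick 3, and the fourth only when the first terminates at tick 5.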
Another version of flatMap creates (and flattens) a new Observable for each item and notification from the source Observable.
A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.
flatMap(Func1,Func1,Func0)
flatMap(Func1,Func1,Func0,int)
Another version combines each item from the source Observable with each item emitted by the Observable that the source item maps to, and emits these combinations.
A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.
flatMap(Func1,Func2)
flatMap(Func1,Func2,int)
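The combining variant can be modeled with arrays standing in for Observables. This is a library-free JavaScript sketch; `flatMapWithCombiner` is a hypothetical name, and because arrays are synchronous the model cannot show interleaving, only which pairs are combined.

```javascript
// Array-based model (not an Rx API): `toInner` plays the role of the function
// that maps a source item to an Observable, and `combine` plays the role of
// the function that pairs a source item with one item from that Observable.
function flatMapWithCombiner(source, toInner, combine) {
  const out = [];
  for (const item of source) {
    for (const innerItem of toInner(item)) {
      out.push(combine(item, innerItem));
    }
  }
  return out;
}

const pairs = flatMapWithCombiner(
  [1, 2, 3],
  n => [n * 2, n * 3],      // each source item maps to two inner items
  (n, m) => `${n}->${m}`    // combine the source item with each inner item
);

console.log(pairs); // [ '1->2', '1->3', '2->4', '2->6', '3->6', '3->9' ]
```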
The flatMapIterable variants pair up source items and generated Iterables rather than source items and generated Observables, but otherwise work in much the same way.
flatMapIterable(Func1)
flatMapIterable(Func1,Func2)
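As a rough illustration of using Iterables as intermediaries, the following library-free JavaScript sketch accepts any iterable (a string, a Set, a generator) from the mapping function. The name `flatMapIterableModel` is hypothetical and the source is modeled as an array rather than an Observable.

```javascript
// Model (not an Rx API): the mapping function returns an iterable, and each
// element of that iterable becomes one item of the output.
function* flatMapIterableModel(source, toIterable) {
  for (const item of source) {
    yield* toIterable(item);
  }
}

// Hypothetical mapping function: a generator that counts from 1 up to n.
function* countTo(n) {
  for (let i = 1; i <= n; i++) yield i;
}

const letters = [...flatMapIterableModel(['ab', 'cd'], word => word)]; // strings are iterable
const counts = [...flatMapIterableModel([1, 2, 3], countTo)];

console.log(letters); // [ 'a', 'b', 'c', 'd' ]
console.log(counts);  // [ 1, 1, 2, 1, 2, 3 ]
```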
There is also a concatMap operator, which is like the simpler version of the flatMap operator, but it concatenates rather than merges the resulting Observables in order to generate its own sequence.
concatMap(Func1)
RxGroovy also implements the switchMap operator. It behaves much like flatMap, except that whenever a new item is emitted by the source Observable, it will unsubscribe from and stop mirroring the Observable that was generated from the previously-emitted item, and begin mirroring only the current one.
switchMap(Func1)
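The switching behavior can be sketched with a small virtual-time model in plain JavaScript. This is an illustration under assumptions, not RxGroovy code: `switchMapModel` is a hypothetical helper, the source events are assumed to be sorted by time, and an inner emission that falls on exactly the same tick as the next source item is treated as dropped.

```javascript
// Toy virtual-time model (not an Rx API).
// source: array of { at, value }, sorted by `at`.
// toInner(value): events whose `at` is relative to the moment of subscription.
function switchMapModel(source, toInner) {
  const out = [];
  source.forEach((item, i) => {
    // The inner sequence for this item is abandoned when the next source item arrives.
    const cutoff = i + 1 < source.length ? source[i + 1].at : Infinity;
    for (const e of toInner(item.value)) {
      if (item.at + e.at < cutoff) out.push(item.value + e.value);
    }
  });
  return out;
}

const sourceEvents = [{ at: 0, value: 'a' }, { at: 3, value: 'b' }];

// Hypothetical inner sequence: three emissions, two ticks apart.
const innerFor = () => [
  { at: 0, value: '1' },
  { at: 2, value: '2' },
  { at: 4, value: '3' },
];

const switched = switchMapModel(sourceEvents, innerFor);

console.log(switched); // [ 'a1', 'a2', 'b1', 'b2', 'b3' ]
```

The emission 'a3' never appears, because the sequence generated for 'a' was dropped when 'b' arrived at tick 3.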
In the distinct StringObservable class (not part of RxGroovy by default) there is also a split operator that converts an Observable of Strings into an Observable of Strings that treats the source sequence as a stream and splits it on a specified regex boundary, then merges the results of this split.
split operator
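To make "treats the source sequence as a stream" concrete, here is a library-free JavaScript sketch of stream-style splitting. It is an assumption about the behavior described above, not the StringObservable source: `splitStream` is a hypothetical helper, and an array of chunks stands in for the Observable of Strings.

```javascript
// Sketch (not the StringObservable implementation): chunk boundaries are
// ignored; only matches of the separator regex delimit the output strings.
function splitStream(chunks, separator) {
  const out = [];
  let leftover = '';
  for (const chunk of chunks) {
    const parts = (leftover + chunk).split(separator);
    leftover = parts.pop(); // the last piece may continue in the next chunk
    out.push(...parts);
  }
  out.push(leftover);       // flush what remains when the source completes
  return out;
}

const pieces = splitStream(['a:b', 'c:d', ':e'], /:/);

console.log(pieces); // [ 'a', 'bc', 'd', 'e' ]
```

Note that 'b' and 'c' arrive in different chunks but come out as the single string 'bc', because no separator falls between them.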
RxJava implements the flatMap operator.
Note that if any of the individual Observables mapped to the items from the source Observable by flatMap aborts by invoking onError, the Observable produced by flatMap will itself immediately abort and invoke onError.
A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.
flatMap(Func1)
flatMap(Func1,int)
Another version of flatMap creates (and flattens) a new Observable for each item and notification from the source Observable.
A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.
flatMap(Func1,Func1,Func0)
flatMap(Func1,Func1,Func0,int)
Another version combines each item from the source Observable with each item emitted by the Observable that the source item maps to, and emits these combinations.
A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.
flatMap(Func1,Func2)
flatMap(Func1,Func2,int)
The flatMapIterable variants pair up source items and generated Iterables rather than source items and generated Observables, but otherwise work in much the same way.
flatMapIterable(Func1)
flatMapIterable(Func1,Func2)
There is also a concatMap operator, which is like the simpler version of the flatMap operator, but it concatenates rather than merges the resulting Observables in order to generate its own sequence.
concatMap(Func1)
RxJava also implements the switchMap operator. It behaves much like flatMap, except that whenever a new item is emitted by the source Observable, it will unsubscribe from and stop mirroring the Observable that was generated from the previously-emitted item, and begin mirroring only the current one.
switchMap(Func1)
In the distinct StringObservable class (not part of RxJava by default) there is also a split operator that converts an Observable of Strings into an Observable of Strings that treats the source sequence as a stream and splits it on a specified regex boundary, then merges the results of this split.
RxJS has a wealth of operators that perform FlatMap-like operations. In RxJS, the functions that transform items emitted by the source Observable into Observables typically take as parameters both the item and the index of the item in the Observable sequence.
RxJS implements the basic flatMap operator. It has a variant that allows you to apply a transformative function (an optional second parameter to flatMap) to the items emitted by the Observables generated for each item in the source Observable, before merging and emitting those items.
flatMap works just as well if the function you provide transforms items from the source Observable into Observables, into Promises, or into arrays.
“selectMany” is an alias for flatMap.
var source = Rx.Observable
    .range(1, 2)
    .selectMany(function (x) {
        return Rx.Observable.range(x, 2);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 1
Next: 2
Next: 2
Next: 3
Completed

// Using a promise
var source = Rx.Observable.of(1,2,3,4)
    .selectMany(function (x, i) {
        return Promise.resolve(x + i);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 1
Next: 3
Next: 5
Next: 7
Completed
// Using an array
var source = Rx.Observable.of(1,2,3)
    .flatMap(
        function (x, i) { return [x,i]; },
        // x, ix: source item and its index; y, iy: array element and its index
        function (x, y, ix, iy) { return x + y + ix + iy; }
    );

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 2
Next: 2
Next: 5
Next: 5
Next: 8
Next: 8
Completed
flatMap is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
The flatMapLatest operator behaves much like the standard FlatMap operator, except that whenever a new item is emitted by the source Observable, it will unsubscribe from and stop mirroring the Observable that was generated from the previously-emitted item, and begin mirroring only the current one.
“selectSwitch” is an alias for flatMapLatest.
var source = Rx.Observable
    .range(1, 2)
    .flatMapLatest(function (x) {
        return Rx.Observable.range(x, 2);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 1
Next: 2
Next: 3
Completed
flatMapLatest is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
flatMapObserver creates (and flattens) a new Observable for each item and notification from the source Observable. It accepts a different transformation function to respond to onNext, onError, and onCompleted notifications and to return an Observable for each.
“selectManyObserver” is an alias for flatMapObserver.
var source = Rx.Observable.range(1, 3)
    .flatMapObserver(
        function (x, i) {
            return Rx.Observable.repeat(x, i);
        },
        function (err) {
            return Rx.Observable.return(42);
        },
        function () {
            return Rx.Observable.empty();
        });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 2
Next: 3
Next: 3
Completed
flatMapObserver is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
There is also a concatMap operator, which is like the flatMap operator, but it concatenates rather than merges the resulting Observables in order to generate its own sequence.
As with flatMap, concatMap works just as well if the function you provide transforms items from the source Observable into Observables, into Promises, or into arrays.
“selectConcat” is an alias for concatMap.
concatMap is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
The for operator (and its alias, forIn) is very similar to concatMap, though it has a converse flexibility. While concatMap operates on an Observable source and can use Observable, Promise, or array intermediaries to generate its output sequence, for always uses Observables as its intermediaries, but can operate on a source that is either an Observable, a Promise, or an array.
for is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.experimental.js (requires either rx.js, rx.compat.js, rx.lite.js, or rx.lite.compat.js)
There is also a concatMapObserver operator, which is like the flatMapObserver operator, in that it creates Observables to merge from both the emissions and terminal notifications of the source Observable, but it concatenates rather than merges these resulting Observables in order to generate its own sequence.
“selectConcatObserver” is an alias for concatMapObserver.
concatMapObserver is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
The manySelect operator is often described as a “comonadic bind.” If that clears things up for you, you’re welcome. Elsewise, here’s an explanation:
manySelect internally transforms each item emitted by the source Observable into an Observable that emits that item and all items subsequently emitted by the source Observable, in the same order. So, for example, it internally transforms an Observable that emits the numbers 1,2,3 into three Observables: one that emits 1,2,3, one that emits 2,3, and one that emits 3.
Then manySelect passes each of these Observables into a function that you provide, and emits, as the emissions from the Observable that manySelect returns, the return values from those function calls.
In this way, each item emitted by the resulting Observable is a function of the corresponding item in the source Observable and all of the items emitted by the source Observable after it.
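The description above can be modeled with plain arrays. This is a library-free JavaScript sketch of the behavior, not the RxJS implementation: `manySelectModel` is a hypothetical helper, and arrays stand in for both the source Observable and the Observables handed to your function.

```javascript
// Array-based model (not an Rx API): each item is replaced by the result of
// applying `selector` to the sequence made of that item and everything after it.
function manySelectModel(items, selector) {
  return items.map((_, i) => selector(items.slice(i)));
}

// Hypothetical selector: sum of a sequence.
const sum = xs => xs.reduce((a, b) => a + b, 0);

const suffixes = manySelectModel([1, 2, 3], xs => xs);
const suffixSums = manySelectModel([1, 2, 3], sum);

console.log(suffixes);   // [ [ 1, 2, 3 ], [ 2, 3 ], [ 3 ] ]
console.log(suffixSums); // [ 6, 5, 3 ]
```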
manySelect is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.experimental.js
manySelect requires one of the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxPHP implements this operator as flatMap.
Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/flatMap/flatMap.php

$loop = React\EventLoop\Factory::create();
$scheduler = new Rx\Scheduler\EventLoopScheduler($loop);

$observable = Rx\Observable::range(1, 5);

$selectManyObservable = $observable->flatMap(function ($value) {
    return Rx\Observable::range(1, $value);
}, $scheduler);

$disposable = $selectManyObservable->subscribe($stdoutObserver, $scheduler);

$loop->run();

Next value: 1
Next value: 1
Next value: 2
Next value: 1
Next value: 2
Next value: 1
Next value: 2
Next value: 3
Next value: 1
Next value: 2
Next value: 3
Next value: 3
Next value: 4
Next value: 4
Next value: 5
Complete!
RxPHP also has an operator selectMany.
Alias for flatMap.
RxPHP also has an operator concatMap.
Projects each element of an observable sequence to an observable sequence and concatenates the resulting observable sequences into one observable sequence.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMap.php

$loop = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$source = Rx\Observable::range(0, 5)
    ->concatMap(function ($x, $i) use ($scheduler) {
        return \Rx\Observable::interval(100, $scheduler)
            ->take($x)
            ->map(function () use ($i) {
                return $i;
            });
    });

$subscription = $source->subscribe($stdoutObserver);

$loop->run();

Next value: 1
Next value: 2
Next value: 2
Next value: 3
Next value: 3
Next value: 3
Next value: 4
Next value: 4
Next value: 4
Next value: 4
Complete!
RxPHP also has an operator concatMapTo.
Projects each element of the source observable sequence to the other observable sequence and concatenates the resulting observable sequences into one observable sequence.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMapTo.php

$loop = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$obs = \Rx\Observable::interval(100, $scheduler)
    ->take(3)
    ->mapWithIndex(function ($i) {
        return $i;
    });

$source = Rx\Observable::range(0, 5)
    ->concatMapTo($obs);

$subscription = $source->subscribe($stdoutObserver);

$loop->run();

Next value: 0
Next value: 1
Next value: 2
Next value: 3
Next value: 4
Next value: 5
Next value: 6
Next value: 7
Next value: 8
Next value: 9
Next value: 10
Next value: 11
Next value: 12
Next value: 13
Next value: 14
Complete!