FlatMap

transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable

FlatMap

The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.

This method is useful, for example, when you have an Observable that emits a series of items that themselves have Observable members or are in other ways transformable into Observables, so that you can create a new Observable that emits the complete collection of items emitted by the sub-Observables of these items.

Note that FlatMap merges the emissions of these Observables, so that they may interleave.

In several of the language-specific implementations there is also an operator that does not interleave the emissions from the transformed Observables, but instead emits these emissions in strict order, often called ConcatMap or something similar.

See Also

Language-Specific Information:

flatMap

RxGroovy implements the flatMap operator.

Sample Code

// this closure is an Observable that emits three numbers
numbers   = Observable.from([1, 2, 3]);
// this closure is an Observable that emits two numbers based on what number it is passed
multiples = { n -> Observable.from([ n*2, n*3 ]) };

numbers.flatMap(multiples).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
2
3
4
6
6
9
Sequence complete

Note that if any of the individual Observables mapped to the items from the source Observable by flatMap aborts by invoking onError, the Observable produced by flatMap will itself immediately abort and invoke onError.

A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.

flatMap

Another version of flatMap creates (and flattens) a new Observable for each item and notification from the source Observable.

A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.

flatMap

Another version combines items from the source Observable with the Observable triggered by those source items, and emits these combinations.

A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.

flatMapIterable

The flatMapIterable variants pair up source items and generated Iterables rather than source items and generated Observables, but otherwise work in much the same way.

concatMap

There is also a concatMap operator, which is like the simpler version of the flatMap operator, but it concatenates rather than merges the resulting Observables in order to generate its own sequence.

switchMap

RxGroovy also implements the switchMap operator. It behaves much like flatMap, except that whenever a new item is emitted by the source Observable, it will unsubscribe to and stop mirroring the Observable that was generated from the previously-emitted item, and begin only mirroring the current one.

split

In the distinct StringObservable class (not part of RxGroovy by default) there is also a split operator that converts an Observable of Strings into an Observable of Strings that treats the source sequence as a stream and splits it on a specified regex boundary, then merges the results of this split.

See Also

flatMap

RxJava implements the flatMap operator.

Note that if any of the individual Observables mapped to the items from the source Observable by flatMap aborts by invoking onError, the Observable produced by flatMap will itself immediately abort and invoke onError.

A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.

flatMap

Another version of flatMap creates (and flattens) a new Observable for each item and notification from the source Observable.

A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.

flatMap

Another version combines items from the source Observable with the Observable triggered by those source items, and emits these combinations.

A version of this variant of the operator (still in Beta as of this writing) takes an additional int parameter. This parameter sets the maximum number of concurrent subscriptions that flatMap will attempt to have to the Observables that the items emitted by the source Observable map to. When it reaches this maximum number, it will wait for one of those Observables to terminate before subscribing to another.

flatMapIterable

The flatMapIterable variants pair up source items and generated Iterables rather than source items and generated Observables, but otherwise work in much the same way.

concatMap

There is also a concatMap operator, which is like the simpler version of the flatMap operator, but it concatenates rather than merges the resulting Observables in order to generate its own sequence.

switchMap

RxJava also implements the switchMap operator. It behaves much like flatMap, except that whenever a new item is emitted by the source Observable, it will unsubscribe to and stop mirroring the Observable that was generated from the previously-emitted item, and begin only mirroring the current one.

split

In the distinct StringObservable class (not part of RxJava by default) there is also a split operator that converts an Observable of Strings into an Observable of Strings that treats the source sequence as a stream and splits it on a specified regex boundary, then merges the results of this split.

RxJS has a wealth of operators that perform FlatMap-like operations. In RxJS, the functions that transform items emitted by the source Observable into Observables typically take as parameters both the item and the index of the item in the Observable sequence.

flatMap

RxJS implements the basic flatMap operator. It has a variant that allows you to apply a transformative function (an optional second parameter to flatMap) to the items emitted by the Observables generated for each item in the source Observable, before merging and emitting those items.

flatMap works just as well if the function you provide transforms items from the source Observables into Observables, into Promises, or into arrays.

selectMany” is an alias for flatMap.

Sample Code

var source = Rx.Observable
    .range(1, 2)
    .selectMany(function (x) {
        return Rx.Observable.range(x, 2);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 2
Next: 3
Completed
// Using a promise
var source = Rx.Observable.of(1,2,3,4)
    .selectMany(function (x, i) {
        return Promise.resolve(x + i);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 3
Next: 5
Next: 7
Completed
// Using an array
Rx.Observable.of(1,2,3)
  .flatMap(
    function (x, i) { return [x,i]; },
    function (x, y, ix, iy) { return x + y + ix + iy; }
  );

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 2
Next: 2
Next: 5
Next: 5
Next: 8
Next: 8
Completed

flatMap is found in each of the following distributions:

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
flatMapLatest

The flatMapLatest operator behaves much like the standard FlatMap operator, except that whenever a new item is emitted by the source Observable, it will unsubscribe to and stop mirroring the Observable that was generated from the previously-emitted item, and begin only mirroring the current one.

selectSwitch” is an alias for flatMapLatest.

Sample Code

var source = Rx.Observable
    .range(1, 2)
    .flatMapLatest(function (x) {
        return Rx.Observable.range(x, 2);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 3
Completed

flatMapLatest is found in each of the following distributions:

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
flatMapObserver

flatMapObserver creates (and flattens) a new Observable for each item and notification from the source Observable. It accepts a different transformation function to respond to onNext, onError, and onCompleted notifications and to return an Observable for each.

selectManyObserver” is an alias for flatMapObserver.

Sample Code

var source = Rx.Observable.range(1, 3)
    .flatMapObserver(
        function (x, i) {
            return Rx.Observable.repeat(x, i);
        },
        function (err) {
            return Rx.Observable.return(42);
        },
        function () {
            return Rx.Observable.empty();
        });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 2
Next: 3
Next: 3
Completed

flatMapObserver is found in each of the following distributions:

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
concatMap

There is also a concatMap operator, which is like the flatMap operator, but it concatenates rather than merges the resulting Observables in order to generate its own sequence.

As with flatMap, concatMap works just as well if the function you provide transforms items from the source Observables into Observables, into Promises, or into arrays.

selectConcat” is an alias for concatMap.

concatMap is found in each of the following distributions:

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
for

The for operator (and its alias, forIn) is very similar to concatMap, though it has a converse flexibility. While concatMap operates on an Observable source and can use Observable, Promise, or array intermediaries to generate its output sequence; for always uses Observables as its intermediaries, but can operate on a source that is either an Observable, a Promise, or an array.

concatMap is found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.experimental.js (requires either rx.js, rx.compat.js, rx.lite.js, or rx.lite.compat.js)
concatMapObserver

There is also a concatMapObserver operator, which is like the flatMapObserver operator, in that it creates Observables to merge from both the emissions and terminal notifications of the source Observable, but it concatenates rather than merges these resulting Observables in order to generate its own sequence.

selectConcatObserver” is an alias for concatMapObserver.

concatMapObserver is found in each of the following distributions:

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
manySelect

The manySelect operator is often described as a “comonadic bind.” If that clears things up for you, you’re welcome. Elsewise, here’s an explanation:

manySelect internally transforms each item emitted by the source Observable into an Observable that emits that item and all items subsequently emitted by the source Observable, in the same order. So, for example, it internally transforms an Observable that emits the numbers 1,2,3 into three Observables: one that emits 1,2,3, one that emits 2,3, and one that emits 3.

Then manySelect passes each of these Observables into a function that you provide, and emits, as the emissions from the Observable that manySelect returns, the return values from those function calls.

In this way, each item emitted by the resulting Observable is a function of the corresponding item in the source Observable and all of the items emitted by the source Observable after it.

manySelect is found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.experimental.js

manySelect requires one of the following distributions:

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

See Also

RxPHP implements this operator as flatMap.

Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/flatMap/flatMap.php

$loop      = React\EventLoop\Factory::create();
$scheduler = new Rx\Scheduler\EventLoopScheduler($loop);

$observable = Rx\Observable::range(1, 5);

$selectManyObservable = $observable->flatMap(function ($value) {
    return Rx\Observable::range(1, $value);
}, $scheduler);

$disposable = $selectManyObservable->subscribe($stdoutObserver, $scheduler);

$loop->run();

   
Next value: 1
Next value: 1
Next value: 2
Next value: 1
Next value: 2
Next value: 1
Next value: 2
Next value: 3
Next value: 1
Next value: 2
Next value: 3
Next value: 3
Next value: 4
Next value: 4
Next value: 5
Complete!
    

RxPHP also has an operator selectMany.

Alias for flatMap

RxPHP also has an operator concatMap.

Projects each element of an observable sequence to an observable sequence and concatenates the resulting observable sequences into one observable sequence.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMap.php

$loop      = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$source = Rx\Observable::range(0, 5)
    ->concatMap(function ($x, $i) use ($scheduler) {
        return \Rx\Observable::interval(100, $scheduler)
            ->take($x)
            ->map(function () use ($i) {
                return $i;
            });
    });

$subscription = $source->subscribe($stdoutObserver);

$loop->run();

   
Next value: 1
Next value: 2
Next value: 2
Next value: 3
Next value: 3
Next value: 3
Next value: 4
Next value: 4
Next value: 4
Next value: 4
Complete!
    

RxPHP also has an operator concatMapTo.

Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMapTo.php

$loop      = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$obs = \Rx\Observable::interval(100, $scheduler)
    ->take(3)
    ->mapWithIndex(function ($i) {
        return $i;
    });

$source = Rx\Observable::range(0, 5)
    ->concatMapTo($obs);

$subscription = $source->subscribe($stdoutObserver);

$loop->run();

   
Next value: 0
Next value: 1
Next value: 2
Next value: 3
Next value: 4
Next value: 5
Next value: 6
Next value: 7
Next value: 8
Next value: 9
Next value: 10
Next value: 11
Next value: 12
Next value: 13
Next value: 14
Complete!