You can combine the output of multiple Observables so that they act like a single Observable, by using the Merge operator.
Merge may interleave the items emitted by the merged Observables (a similar operator, Concat, does not interleave items, but emits all of each source Observable’s items in turn before beginning to emit items from the next source Observable).
As shown in the above diagram, an onError notification from any of the source Observables will immediately be passed through to observers and will terminate the merged Observable.
In many ReactiveX implementations there is a second operator, MergeDelayError, that changes this behavior: it holds back onError notifications until all of the merged Observables have completed, and only then passes them along to the observers.
In RxClojure there are six operators of concern here:
merge converts two or more Observables into a single Observable that emits all of the items emitted by all of those Observables.
merge* converts an Observable that emits Observables into a single Observable that emits all of the items emitted by all of the emitted Observables.
merge-delay-error is like merge, but will emit all items from all of the merged Observables even if one or more of those Observables terminates with an onError notification while emissions are still pending.
merge-delay-error* is a similarly modified version of merge*.
interleave is like merge, but more deliberate about how it interleaves the items from the source Observables: the resulting Observable emits the first item emitted by the first source Observable, then the first item emitted by the second source Observable, and so forth, and having reached the last source Observable, then emits the second item emitted by the first source Observable, the second item emitted by the second source Observable, and so forth, until all of the source Observables terminate.
interleave* is similar but operates on an Observable of Observables.
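The round-robin order that interleave produces can be illustrated with a small Python sketch. This is not RxClojure code: each source Observable is modelled as a plain list, the helper name interleave_lists is hypothetical, and the sources are assumed to be of equal length, so the sketch says nothing about timing, errors, or what happens when one source ends early.

```python
from itertools import chain


def interleave_lists(*sources):
    """Round-robin over equal-length lists (a model, not an Rx operator).

    Takes the first item of every source, then the second item of every
    source, and so on.
    """
    return list(chain.from_iterable(zip(*sources)))


result = interleave_lists([1, 3, 5], [2, 4, 6], [10, 20, 30])
print(result)  # [1, 2, 10, 3, 4, 20, 5, 6, 30]
```

A plain merge gives no such ordering guarantee; items arrive in whatever order the sources happen to emit them.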
RxCpp implements this operator as merge.
RxGroovy implements this operator as merge, mergeWith, and mergeDelayError.
For example, the following code merges the odds and evens into a single Observable. (The subscribeOn operator makes odds operate on a different thread from evens so that the two Observables may both emit items at the same time, to demonstrate how Merge may interleave these items.)
    odds  = Observable.from([1, 3, 5, 7]).subscribeOn(someScheduler);
    evens = Observable.from([2, 4, 6]);

    Observable.merge(odds,evens).subscribe(
        { println(it); },                          // onNext
        { println("Error: " + it.getMessage()); }, // onError
        { println("Sequence complete"); }          // onCompleted
    );
1
3
2
5
4
7
6
Sequence complete
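The effect of running the sources on different threads can be sketched outside of Rx. The following Python sketch is not RxGroovy code: it models each Observable as a list fed from its own thread, and merge_threads is a hypothetical helper. Under those assumptions it shows that the merged order may differ from run to run, while the relative order of items from any one source is preserved.

```python
import queue
import threading


def merge_threads(*sources):
    """Run each source list on its own thread; collect items in arrival order."""
    out = queue.Queue()
    done = object()  # sentinel: each producer puts it once when it completes

    def produce(items):
        for item in items:
            out.put(item)
        out.put(done)

    threads = [threading.Thread(target=produce, args=(src,)) for src in sources]
    for t in threads:
        t.start()

    merged, remaining = [], len(sources)
    while remaining:
        item = out.get()
        if item is done:
            remaining -= 1      # one source has completed
        else:
            merged.append(item)
    for t in threads:
        t.join()
    return merged


merged = merge_threads([1, 3, 5, 7], [2, 4, 6])
print(merged)  # the interleaving is not deterministic
```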
Instead of passing multiple Observables (up to nine) into merge, you could also pass in a List<> (or other Iterable) of Observables, an Array of Observables, or even an Observable that emits Observables, and merge will merge their output into the output of a single Observable:
merge(Iterable)
merge(Iterable,int)
merge(Observable[])
merge(Observable[], int) (RxGroovy 1.1)
merge(Observable, Observable) (there are also versions that take up to nine Observables)
If you pass in an Observable of Observables, you have the option of also passing in a value indicating to merge the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
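The subscription cap described above can be modelled in a few lines of Python. This is a sketch under stated assumptions, not the RxGroovy implementation: each inner Observable is a plain list, the active sources take turns emitting one item each (real emission order depends on timing), and merge_with_limit is a hypothetical name.

```python
from collections import deque


def merge_with_limit(inner_sources, max_concurrent):
    """Model of merging a sequence of sources with a subscription cap.

    At most max_concurrent sources are "subscribed" at once; a pending
    source is subscribed only after an active one has completed.
    """
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")
    pending = deque(iter(src) for src in inner_sources)
    active = deque()
    emitted = []
    while pending or active:
        # Subscribe to pending sources while there is spare capacity.
        while pending and len(active) < max_concurrent:
            active.append(pending.popleft())
        source = active.popleft()
        try:
            emitted.append(next(source))
            active.append(source)  # still running: goes to the back of the queue
        except StopIteration:
            pass                   # completed: frees a subscription slot
    return emitted


sources = [[1, 3, 5], [2, 4, 6], [10, 20]]
print(merge_with_limit(sources, 2))  # [1, 2, 3, 4, 5, 6, 10, 20]
print(merge_with_limit(sources, 1))  # [1, 3, 5, 2, 4, 6, 10, 20]
```

With a cap of 2, the third source is not subscribed to until one of the first two has completed. With a cap of 1, the model emits each source in full before moving on to the next, which is the behavior of Concat.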
The instance version of merge is mergeWith, so, for example, in the code sample above, instead of writing Observable.merge(odds,evens) you could also write odds.mergeWith(evens).
mergeWith(Observable)
If any of the individual Observables passed into merge terminates with an onError notification, the Observable produced by merge itself will immediately terminate with an onError notification. If you would prefer a merge that continues emitting the results of the remaining, error-free Observables before reporting the error, use mergeDelayError instead.
mergeDelayError behaves much like merge. The exception is when one of the Observables being merged terminates with an onError notification. If this happens with merge, the merged Observable will immediately issue an onError notification and terminate. mergeDelayError, on the other hand, holds off on reporting the error until the other Observables it is merging have had a chance to finish emitting their items. It emits those items itself and terminates with an onError notification only when all of the other merged Observables have finished.
Because it is possible that more than one of the merged Observables encountered an error, mergeDelayError may pass information about multiple errors in the onError notification (it will never invoke the observer’s onError method more than once). For this reason, if you want to know the nature of these errors, you should write your observers’ onError methods so that they accept a parameter of the class CompositeException.
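The difference between the two error policies can be made concrete with a small Python model. It is only a sketch, not RxGroovy code, and it makes several labeled assumptions: each source is a list, an Exception instance in the list stands for an onError notification that ends that source, the sources take turns emitting one item each, and CompositeError is a hypothetical stand-in for CompositeException. Passing a single delayed error through unwrapped is a choice of this model.

```python
_DONE = object()  # sentinel marking normal completion of a source


class CompositeError(Exception):
    """Hypothetical stand-in for CompositeException: carries all collected errors."""

    def __init__(self, errors):
        super().__init__(f"{len(errors)} errors occurred")
        self.errors = list(errors)


def merge_model(sources, delay_error=False):
    """Return (emitted_items, terminal_error_or_None) for a turn-based merge."""
    active = [iter(src) for src in sources]
    emitted, errors = [], []
    while active:
        still_running = []
        for source in active:
            event = next(source, _DONE)
            if event is _DONE:
                continue                   # source completed normally
            if isinstance(event, Exception):
                if not delay_error:
                    return emitted, event  # fail fast, as merge does
                errors.append(event)       # hold the error back for later
                continue                   # the failed source emits nothing more
            emitted.append(event)
            still_running.append(source)
        active = still_running
    if not errors:
        return emitted, None
    if len(errors) == 1:
        return emitted, errors[0]
    return emitted, CompositeError(errors)


odds = [1, 3, ValueError("whoops"), 5]
evens = [2, 4, 6]
print(merge_model([odds, evens]))                    # stops after 1, 2, 3, 4
print(merge_model([odds, evens], delay_error=True))  # also emits 6 before the error
```

In both runs the item 5 is never emitted, because the source that produced the error has already terminated. The only difference is whether the other source is allowed to finish first.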
mergeDelayError has fewer variants. You cannot pass it an Iterable or Array of Observables, but you can pass it an Observable that emits Observables or between one and nine individual Observables as parameters. There is no instance method version of mergeDelayError as there is for merge.
mergeDelayError(Observable<Observable>)
mergeDelayError(Observable,Observable) (there are also versions that take up to nine Observables)
RxJava implements this operator as merge, mergeWith, and mergeDelayError.
    Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
    Observable<Integer> evens = Observable.just(2, 4, 6);

    Observable.merge(odds, evens)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }

            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.
Instead of passing multiple Observables (up to nine) into merge, you could also pass in a List<> (or other Iterable) of Observables, an Array of Observables, or even an Observable that emits Observables, and merge will merge their output into the output of a single Observable:
merge(Iterable)
merge(Iterable,int)
merge(Observable[])
merge(Observable[], int) (RxJava 1.1)
merge(Observable, Observable) (there are also versions that take up to nine Observables)
If you pass in an Observable of Observables, you have the option of also passing in a value indicating to merge the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
merge(Observable<Observable>)
merge(Observable<Observable>, int) (RxJava 1.1)
The instance version of merge is mergeWith, so, for example, instead of writing Observable.merge(odds,evens) you could also write odds.mergeWith(evens).
If any of the individual Observables passed into merge terminates with an onError notification, the Observable produced by merge itself will immediately terminate with an onError notification. If you would prefer a merge that continues emitting the results of the remaining, error-free Observables before reporting the error, use mergeDelayError instead.
mergeDelayError behaves much like merge. The exception is when one of the Observables being merged terminates with an onError notification. If this happens with merge, the merged Observable will immediately issue an onError notification and terminate. mergeDelayError, on the other hand, holds off on reporting the error until the other Observables it is merging have had a chance to finish emitting their items. It emits those items itself and terminates with an onError notification only when all of the other merged Observables have finished.
Because it is possible that more than one of the merged Observables encountered an error, mergeDelayError may pass information about multiple errors in the onError notification (it will never invoke the observer’s onError method more than once). For this reason, if you want to know the nature of these errors, you should write your observers’ onError methods so that they accept a parameter of the class CompositeException.
mergeDelayError has fewer variants. You cannot pass it an Iterable or Array of Observables, but you can pass it an Observable that emits Observables or between one and nine individual Observables as parameters. There is no instance method version of mergeDelayError as there is for merge.
mergeDelayError(Observable<Observable>)
mergeDelayError(Observable,Observable) (there are also versions that take up to nine Observables)
The first variant of merge is an instance operator that takes a variable number of Observables as parameters, merging each of these Observables with the source (instance) Observable to produce its single output Observable.
This first variant of merge is found in the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
The second variant of merge is a prototype (class) operator that accepts two parameters. The second of these is an Observable that emits the Observables you want to merge. The first is a number that indicates the maximum number of these emitted Observables that you want merge to attempt to be subscribed to at any moment. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
This second variant of merge is found in the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
mergeAll is like this second variant of merge except that it does not allow you to set this maximum subscription count. It only takes the single parameter of an Observable of Observables.
mergeAll is found in the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
If any of the individual Observables passed into merge or mergeAll terminates with an onError notification, the resulting Observable will immediately terminate with an onError notification. If you would prefer a merge that continues emitting the results of the remaining, error-free Observables before reporting the error, use mergeDelayError instead.
    var source1 = Rx.Observable.of(1,2,3);
    var source2 = Rx.Observable.throwError(new Error('whoops!'));
    var source3 = Rx.Observable.of(4,5,6);

    var merged = Rx.Observable.mergeDelayError(source1, source2, source3);

    var subscription = merged.subscribe(
      function (x) { console.log('Next: %s', x); },
      function (err) { console.log('Error: %s', err); },
      function () { console.log('Completed'); }
    );
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Error: Error: whoops!
mergeDelayError is found in the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxKotlin implements this operator as merge, mergeWith, and mergeDelayError.
Instead of passing multiple Observables (up to nine) into merge, you could also pass in a List<> (or other Iterable) of Observables, an Array of Observables, or even an Observable that emits Observables, and merge will merge their output into the output of a single Observable.
If you pass in an Observable of Observables, you have the option of also passing in a value indicating to merge the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
The instance version of merge is mergeWith, so, for example, instead of writing Observable.merge(odds,evens) you could also write odds.mergeWith(evens).
If any of the individual Observables passed into merge terminates with an onError notification, the Observable produced by merge itself will immediately terminate with an onError notification. If you would prefer a merge that continues emitting the results of the remaining, error-free Observables before reporting the error, use mergeDelayError instead.
mergeDelayError behaves much like merge. The exception is when one of the Observables being merged terminates with an onError notification. If this happens with merge, the merged Observable will immediately issue an onError notification and terminate. mergeDelayError, on the other hand, holds off on reporting the error until the other Observables it is merging have had a chance to finish emitting their items. It emits those items itself and terminates with an onError notification only when all of the other merged Observables have finished.
Because it is possible that more than one of the merged Observables encountered an error, mergeDelayError may pass information about multiple errors in the onError notification (it will never invoke the observer’s onError method more than once). For this reason, if you want to know the nature of these errors, you should write your observers’ onError methods so that they accept a parameter of the class CompositeException.
mergeDelayError has fewer variants. You cannot pass it an Iterable or Array of Observables, but you can pass it an Observable that emits Observables or between one and nine individual Observables as parameters. There is no instance method version of mergeDelayError as there is for merge.
Rx.NET implements this operator as Merge.
You can pass Merge an Array of Observables, an Enumerable of Observables, an Observable of Observables, or two individual Observables.
If you pass an Enumerable or Observable of Observables, you have the option of also passing in an integer indicating the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
RxPHP implements this operator as merge.
Merges all the observable sequences into a single observable sequence.
    //from https://github.com/ReactiveX/RxPHP/blob/master/demo/merge/merge.php

    $loop      = React\EventLoop\Factory::create();
    $scheduler = new Rx\Scheduler\EventLoopScheduler($loop);

    $observable       = Rx\Observable::just(42)->repeat();
    $otherObservable  = Rx\Observable::just(21)->repeat();
    $mergedObservable = $observable
        ->merge($otherObservable)
        ->take(10);

    $disposable = $mergedObservable->subscribe($stdoutObserver, $scheduler);

    $loop->run();
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Complete!
RxPHP also has an operator mergeAll.
Merges an observable sequence of observables into an observable sequence.
    //from https://github.com/ReactiveX/RxPHP/blob/master/demo/merge/merge-all.php

    $loop      = React\EventLoop\Factory::create();
    $scheduler = new Rx\Scheduler\EventLoopScheduler($loop);

    $sources = Rx\Observable::range(0, 3)
        ->map(function ($x) {
            return Rx\Observable::range($x, 3);
        });

    $merged = \Rx\Observable::mergeAll($sources);

    $disposable = $merged->subscribe($stdoutObserver, $scheduler);

    $loop->run();
Next value: 0
Next value: 1
Next value: 1
Next value: 2
Next value: 2
Next value: 2
Next value: 3
Next value: 3
Next value: 4
Complete!
RxPY implements this operator as merge and merge_all/merge_observable.
You can either pass merge a set of Observables as individual parameters, or as a single parameter containing an array of those Observables.
merge_all and its alias merge_observable take as their single parameter an Observable that emits Observables. They merge the emissions of all of these Observables to create their own Observable.
Rx.rb implements this operator as merge, merge_concurrent, and merge_all.
merge merges a second Observable into the one it is operating on to create a new merged Observable.
merge_concurrent operates on an Observable that emits Observables, merging the emissions from each of these Observables into its own emissions. You can optionally pass it an integer parameter indicating how many of these emitted Observables merge_concurrent should try to subscribe to concurrently. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification. The default is 1, which makes it equivalent to merge_all.
merge_all is like merge_concurrent(1). It subscribes to each emitted Observable one at a time, mirroring its emissions as its own, and waiting to subscribe to the next Observable until the present one terminates with an onCompleted notification. In this respect it is more like a Concat variant.
RxScala implements this operator as flatten, flattenDelayError, merge, and mergeDelayError.
merge takes a second Observable as a parameter and merges that Observable with the one the merge operator is applied to in order to create a new output Observable.
mergeDelayError is similar to merge except that it will always emit all items from both Observables even if one of the Observables terminates with an onError notification before the other Observable has finished emitting items.
flatten takes as its parameter an Observable that emits Observables. It merges the items emitted by each of these Observables to create its own single Observable sequence. A variant of this operator allows you to pass in an Int indicating the maximum number of these emitted Observables you want flatten to try to be subscribed to at any time. If it hits this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
flattenDelayError is similar to flatten except that it will always emit all items from all of the emitted Observables even if one or more of those Observables terminates with an onError notification before the other Observables have finished emitting items.
RxSwift implements this operator as merge.
merge takes as its parameter an Observable that emits Observables. It merges the items emitted by each of these Observables to create its own single Observable sequence. A variant of this operator, merge(maxConcurrent:), allows you to pass in an Int indicating the maximum number of these emitted Observables you want merge to try to be subscribed to at any time. If it hits this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
    let subject1 = PublishSubject<Int>()
    let subject2 = PublishSubject<Int>()

    Observable.of(subject1, subject2)
        .merge()
        .subscribe {
            print($0)
        }

    subject1.on(.Next(10))
    subject1.on(.Next(11))
    subject1.on(.Next(12))
    subject2.on(.Next(20))
    subject2.on(.Next(21))
    subject1.on(.Next(14))
    subject1.on(.Completed)
    subject2.on(.Next(22))
    subject2.on(.Completed)
Next(10)
Next(11)
Next(12)
Next(20)
Next(21)
Next(14)
Next(22)
Completed