Concat
emit the emissions from two or more Observables without interleaving them
The Concat operator concatenates the output of multiple
Observables so that they act like a single Observable, with all of the items emitted by the
first Observable being emitted before any of the items emitted by the second Observable (and
so forth, if there are more than two).
Concat waits to subscribe to each additional Observable that you
pass to it until the previous Observable completes. Note that because of this, if you try to
concatenate a “hot” Observable, that is, one that begins emitting items immediately
and before it is subscribed to, Concat will not see, and therefore
will not emit, any items that Observable emits before all previous Observables complete and
Concat subscribes to the “hot” Observable.
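The subscription timing described above can be sketched with a toy model. This is a minimal sketch in plain Python, not the API of any ReactiveX library: `Subject` and `concat` are hypothetical names defined here for illustration only, and completion is the only terminal event modeled (no errors, no unsubscription).

```python
class Subject:
    """Toy "hot" source: pushes each item to whoever is subscribed at that moment."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next, on_completed):
        self._observers.append((on_next, on_completed))

    def emit(self, item):
        for on_next, _ in list(self._observers):
            on_next(item)

    def complete(self):
        for _, on_completed in list(self._observers):
            on_completed()


def concat(*sources):
    """Return a subscribe function that subscribes to each source
    only after the previous source has completed."""

    def subscribe(on_next, on_completed):
        def subscribe_to(index):
            if index == len(sources):
                on_completed()  # every source has completed
                return
            # Move on to the next source only when this one completes.
            sources[index].subscribe(on_next, lambda: subscribe_to(index + 1))

        subscribe_to(0)

    return subscribe


first, hot = Subject(), Subject()
received = []
concat(first, hot)(received.append, lambda: received.append("done"))

first.emit(1)
hot.emit("missed")  # concat has not subscribed to `hot` yet, so this item is lost
first.emit(2)
first.complete()    # concat now subscribes to `hot`
hot.emit("seen")
hot.complete()

print(received)  # [1, 2, 'seen', 'done']
```

The item emitted by `hot` while `first` was still active never reaches the observer, which is the behavior the paragraph above warns about.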
In some ReactiveX implementations there is also a ConcatMap operator (a.k.a. concat_all,
concat_map, concatMapObserver, for, forIn/for_in, mapcat, selectConcat, or
selectConcatObserver) that transforms the items emitted by a source Observable into
corresponding Observables and then concatenates the items emitted by each of these
Observables in the order in which they are observed and transformed.
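As a rough illustration of the ConcatMap idea, here is a minimal sketch in plain Python in which ordinary iterables stand in for Observables. The function name `concat_map` and its parameters are assumptions made for this example, and the sketch is synchronous, so it shows only the ordering guarantee, not asynchronous behavior.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def concat_map(source: Iterable[T], project: Callable[[T], Iterable[R]]) -> Iterator[R]:
    """Map each source item to an inner sequence, then emit the inner
    sequences one after another, in source order."""
    for item in source:
        # Drain the inner sequence completely before taking the next source item.
        yield from project(item)


result = list(concat_map([1, 2, 3], lambda x: [x, x * 10]))
print(result)  # [1, 10, 2, 20, 3, 30]
```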
The StartWith operator is similar to Concat, but prepends, rather than appends, items or
emissions of items to those emitted by a source Observable.
The Merge operator is also similar. It
combines the emissions of two or more Observables, but may interleave them, whereas
Concat never interleaves the emissions from multiple Observables.
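The difference between these three operators can be sketched with timestamped items. This is a simplified model in plain Python, not a ReactiveX API: each source is a hypothetical list of `(time, item)` pairs representing a "cold" sequence, merging is modeled as ordering by emission time, and concatenation as appending one whole sequence after the other.

```python
import heapq
from itertools import chain

# Hypothetical cold sources: (emission time, item) pairs, each sorted by time.
a = [(0, "a1"), (20, "a2"), (40, "a3")]
b = [(10, "b1"), (30, "b2")]

# Merge-like: items arrive in time order, so the two sources interleave.
merged = [item for _, item in heapq.merge(a, b)]

# Concat-like: every item of `a` comes before any item of `b`.
concatenated = [item for _, item in chain(a, b)]

# StartWith-like: an extra item is placed in front of the source's items.
started = ["start"] + concatenated

print(merged)        # ['a1', 'b1', 'a2', 'b2', 'a3']
print(concatenated)  # ['a1', 'a2', 'a3', 'b1', 'b2']
print(started)       # ['start', 'a1', 'a2', 'a3', 'b1', 'b2']
```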
Language-Specific Information:
In RxClojure, concat concatenates some number of individual Observables together in the
order in which they are given.
The concat* operator concatenates the Observables emitted by an Observable together, in the
order in which they are emitted.
RxCpp implements this operator as concat.
RxGroovy implements this operator as concat. There are variants of this operator that take
between two and nine Observables as parameters, and that concatenate them in the order they
appear in the parameter list. There is also a variant that takes as a parameter an
Observable of Observables, and concatenates each of these Observables in the order that they
are emitted.
Sample Code
odds = Observable.from([1, 3, 5, 7]);
evens = Observable.from([2, 4, 6]);
Observable.concat(odds, evens).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
1
3
5
7
2
4
6
Sequence complete
There is also an instance method, concatWith, such that Observable.concat(a,b) is
equivalent to a.concatWith(b).
RxJava implements this operator as concat. There are variants of this operator that take
between two and nine Observables as parameters, and that concatenate them in the order they
appear in the parameter list. There is also a variant that takes as a parameter an
Observable of Observables, and concatenates each of these Observables in the order that they
are emitted.
There is also an instance method, concatWith, such that Observable.concat(a,b) is
equivalent to a.concatWith(b).
RxJS implements this operator as concat and concatAll.
concat takes a variable number of Observables (or Promises) as parameters (or a single
array of Observables or Promises), and concatenates them in the order they appear in the
parameter list (or array). It exists both as a static method of Observable and as an
instance method.
concatAll is an instance method that operates on an Observable of Observables,
concatenating each of these Observables in the order they are emitted.
concat and concatAll are found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxKotlin implements this operator as concat. There are variants of this operator that take
between two and nine Observables as parameters, and that concatenate them in the order they
appear in the parameter list. There is also a variant that takes as a parameter an
Observable of Observables, and concatenates each of these Observables in the order that they
are emitted.
There is also an instance method, concatWith, such that Observable.concat(a,b) is
equivalent to a.concatWith(b).
Rx.NET implements this operator as Concat. It accepts either an enumerable of Observables,
an Observable of Observables, or two Observables as parameters, and concatenates these in
the order given.
RxPHP implements this operator as concat.
Concatenates all the observable sequences.
Sample Code
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concat.php
$source1 = \Rx\Observable::just(42);
$source2 = \Rx\Observable::just(56);
$source = \Rx\Observable::emptyObservable()->concat($source1)->concat($source2);
$subscription = $source->subscribe($stdoutObserver);
Next value: 42
Next value: 56
Complete!
RxPHP also has an operator concatAll.
Concatenates a sequence of observable sequences into a single observable sequence.
Sample Code
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatAll.php
$source = Rx\Observable::range(0, 3)
->map(function ($x) {
return \Rx\Observable::range($x, 3);
})
->concatAll();
$subscription = $source->subscribe($stdoutObserver);
Next value: 0
Next value: 1
Next value: 2
Next value: 1
Next value: 2
Next value: 3
Next value: 2
Next value: 3
Next value: 4
Complete!
In RxPY, concat takes a variable number of Observables as parameters (or an array of
Observables), and concatenates them in the order they appear in the parameter list (or
array).
concatAll operates on an Observable of Observables, concatenating each of these Observables
in the order they are emitted.
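The flattening that concatAll performs can be illustrated with ordinary Python iterables. This sketch deliberately does not use RxPY: `itertools.chain.from_iterable` is the standard library's synchronous analogue, and the name `inner_sequences` is a hypothetical stand-in for an Observable of Observables. The input mirrors the RxPHP concatAll sample above, where each value x in 0..2 is mapped to the three values starting at x.

```python
from itertools import chain

# A "sequence of sequences": for x in 0, 1, 2 produce x, x + 1, x + 2.
inner_sequences = (range(x, x + 3) for x in range(0, 3))

# Flatten by draining each inner sequence fully before starting the next one.
flattened = list(chain.from_iterable(inner_sequences))

print(flattened)  # [0, 1, 2, 1, 2, 3, 2, 3, 4]
```

The values come out in the same order as in the RxPHP sample output, because each inner sequence is exhausted before the next one is started.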
In Rx.rb, the concat operator operates on two Observables as an instance operator, or on an
array of Observables as a class method.
The merge_all operator, despite its name, really behaves like a Concat variant in Rx.rb. It
accepts an Observable of Observables as its parameter, and concatenates the emissions from
these Observables.
RxScala implements this operator in two ways. There is a concat operator that accepts an
Observable of Observables as its parameter, and then concatenates each of these Observables
in the order they are emitted. There is also a ++ operator that concatenates one Observable
to another.