The Reduce operator applies a function to the first item emitted by the source Observable, then feeds the result back into that function along with the second item, and continues in this way until the source Observable emits its final item and completes. The Observable returned from Reduce then emits the final value returned by the function.
This sort of operation is sometimes called “accumulate,” “aggregate,” “compress,” “fold,” or “inject” in other contexts.
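The feedback loop described above can be traced step by step. The following is a minimal sketch in plain JavaScript, assuming an array stands in for the items a source Observable emits before it completes. `reduceTrace` is a hypothetical helper written for illustration and is not part of any Rx library.

```javascript
// Hypothetical helper (not an Rx API): models Reduce over an array that stands
// in for the items a source Observable emits before completing.
function reduceTrace(items, accumulator) {
  if (items.length === 0) {
    throw new Error('source emitted no items');
  }
  let acc = items[0];            // the first item starts the accumulation
  const steps = [];
  for (let i = 1; i < items.length; i++) {
    const next = accumulator(acc, items[i]);
    steps.push(`f(${acc}, ${items[i]}) = ${next}`);
    acc = next;                  // feed the result back in with the next item
  }
  return { result: acc, steps: steps };
}

const traced = reduceTrace([1, 2, 3, 4, 5], (a, b) => a + b);
console.log(traced.steps.join('\n'));
console.log('final: ' + traced.result);
```

For the five numbers above, the trace records four calls to the function, `f(1, 2) = 3` through `f(10, 5) = 15`, and only the last result, 15, would be emitted.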
The reduce operator returns an Observable that applies a function of your choosing to the first item emitted by a source Observable, then feeds the result of that function along with the second item emitted by the source Observable into the same function, then feeds the result of that function along with the third item into the same function, and so on until all items have been emitted by the source Observable. Then it emits the final result from the final call to your function as the sole output from the returned Observable.
Note that if the source Observable does not emit any items, reduce will fail with an IllegalArgumentException.
For example, the following code uses reduce to compute, and then emit as an Observable, the sum of the numbers emitted by the source Observable:
numbers = Observable.from([1, 2, 3, 4, 5]);

numbers.reduce({ a, b -> a+b }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
15
Sequence complete
reduce does not by default operate on any particular Scheduler.
reduce(Func2)
There is also a version of reduce to which you can pass a seed item in addition to an accumulator function. Note that passing a null seed is not the same as not passing a seed; the behavior differs. If you pass a seed of null, you seed your reduction with the item null. Note also that if you do pass in a seed and the source Observable emits no items, reduce will emit the seed and complete normally without error.
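The difference between the seeded and unseeded forms is easiest to see side by side. The sketch below is a plain JavaScript model, assuming an array stands in for the source's emissions. `modelReduce` and the `NO_SEED` sentinel are hypothetical names for this illustration, and the thrown `Error` merely stands in for the IllegalArgumentException mentioned above.

```javascript
// Hypothetical sentinel meaning "no seed was passed"; deliberately distinct
// from null, because null is a legitimate seed value.
const NO_SEED = Symbol('no seed');

// Hypothetical model of reduce (not an Rx API): `items` stands in for the
// items a source Observable emits before completing.
function modelReduce(items, accumulator, seed = NO_SEED) {
  let acc = seed;
  let rest = items;
  if (seed === NO_SEED) {
    if (items.length === 0) {
      // Stand-in for the failure described for an empty, unseeded source.
      throw new Error('no items and no seed');
    }
    acc = items[0];              // unseeded: the first item starts the fold
    rest = items.slice(1);
  }
  for (const item of rest) {
    acc = accumulator(acc, item);
  }
  return acc;
}

const sum = (a, b) => a + b;
const unseeded = modelReduce([1, 2, 3], sum);       // 6
const seeded = modelReduce([1, 2, 3], sum, 10);     // 16
const emptySeeded = modelReduce([], sum, 10);       // 10: the seed itself
// A null seed is passed to the accumulator like any other value.
const nullSeeded = modelReduce(['a'], (acc, x) => [acc, x], null); // [null, 'a']

let emptyUnseededError = null;
try {
  modelReduce([], sum);
} catch (e) {
  emptyUnseededError = e.message;                   // 'no items and no seed'
}
```

In this model a seed adds one extra call to the accumulator, which is why a null seed reaches your function as its first argument, whereas omitting the seed skips that call entirely.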
reduce does not by default operate on any particular Scheduler.
reduce(R,Func2)
It is a bad idea to use reduce to collect emitted items into a mutable data structure. Instead, use collect for that purpose.
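The collect operator is similar to reduce but is specialized for the purpose of collecting the whole set of items emitted by the source Observable into a single mutable data structure to be emitted by the resulting Observable. Pass it two parameters: a function that returns the mutable data structure, and a function that, given the data structure and an emitted item, modifies the data structure accordingly.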
collect does not by default operate on any particular Scheduler.
collect(Func0,Action2)
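As a rough illustration of the collect pattern, the sketch below models it in plain JavaScript, assuming an array stands in for the source's emissions. `modelCollect`, `stateFactory`, and `collector` are hypothetical names chosen to mirror the `Func0` and `Action2` roles in the signature above. They are not Rx APIs.

```javascript
// Hypothetical model of collect (not an Rx API).
// stateFactory: returns a fresh mutable container (the Func0 role).
// collector: mutates that container for each item (the Action2 role).
function modelCollect(items, stateFactory, collector) {
  const state = stateFactory();  // a new container for every run
  for (const item of items) {
    collector(state, item);      // mutates in place; any return value is ignored
  }
  return state;
}

const makeList = () => [];
const addToList = (list, x) => { list.push(x); };

const collected = modelCollect([1, 2, 3], makeList, addToList);  // [1, 2, 3]
const collectedAgain = modelCollect([4, 5], makeList, addToList); // [4, 5]
console.log(collected, collectedAgain);
```

Because the container comes from a factory, each run in this model fills its own list, so the second call does not see the items gathered by the first.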
RxJS implements the reduce operator. Pass it an accumulator function, and, optionally, a seed value to pass into the accumulator function with the first item emitted by the source Observable.
var source = Rx.Observable.range(1, 3)
  .reduce(function (acc, x) {
    return acc * x;
  }, 1);

var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 6
Completed
reduce is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.aggregates.js
reduce requires one of the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxPHP implements this operator as reduce.
Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. If a seed value is specified, it is used as the initial accumulator value.
// from https://github.com/ReactiveX/RxPHP/blob/master/demo/reduce/reduce.php
// Without a seed
$source = \Rx\Observable::fromArray(range(1, 3));

$subscription = $source
    ->reduce(function ($acc, $x) {
        return $acc + $x;
    })
    ->subscribe($createStdoutObserver());
Next value: 6
Complete!
// from https://github.com/ReactiveX/RxPHP/blob/master/demo/reduce/reduce-with-seed.php
// With a seed
$source = \Rx\Observable::fromArray(range(1, 3));

$subscription = $source
    ->reduce(function ($acc, $x) {
        return $acc * $x;
    }, 1)
    ->subscribe($createStdoutObserver());
Next value: 6
Complete!