The Scan operator applies a function to the first item emitted by the source Observable and then emits the result of that function as its own first emission. It also feeds the result of the function back into the function along with the second item emitted by the source Observable in order to generate its second emission. It continues to feed back its own subsequent emissions along with the subsequent emissions from the source Observable in order to create the rest of its sequence.
This sort of operator is sometimes called an “accumulator” in other contexts.
TBD
RxGroovy implements this operator as scan
. The following code, for example,
takes an Observable that emits a consecutive sequence of n integers starting with
1
and converts it, via scan
, into an Observable that emits the
first n triangular
numbers:
numbers = Observable.from([1, 2, 3, 4, 5]); numbers.scan({ a, b -> a+b }).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
1 3 6 10 15 Sequence complete
scan(Func2)
There is also a variant of scan
to which you can pass a seed value to pass
to the accumulator function the first time it is called (for the first emission from the
source Observable) in place of the result from the missing prior call to the accumulator.
Note that if you use this version, scan
will emit this seed value as its own
initial emission. Note also that passing a seed of null
is not the
same as passing no seed at all. A null
seed is a valid variety of seed.
scan(R,Func2)
This operator does not by default operate on any particular Scheduler.
RxJava implements this operator as scan
.
Observable.just(1, 2, 3, 4, 5) .scan(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer sum, Integer item) { return sum + item; } }).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Next: 1 Next: 3 Next: 6 Next: 10 Next: 15 Sequence complete.
scan(Func2)
There is also a variant of scan
to which you can pass a seed value to pass
to the accumulator function the first time it is called (for the first emission from the
source Observable) in place of the result from the missing prior call to the accumulator.
Note that if you use this version, scan
will emit this seed value as its own
initial emission. Note also that passing a seed of null
is not the
same as passing no seed at all. A null
seed is a valid variety of seed.
scan(R,Func2)
This operator does not by default operate on any particular Scheduler.
RxJS implements the scan
operator.
var source = Rx.Observable.range(1, 3) .scan( function (acc, x) { return acc + x; }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 3 Next: 6 Completed
You can optionally pass scan
a seed value as an additional parameter.
scan
will pass this seed value to the accumulator function the first time it is
called (for the first emission from the source Observable) in place of the result from the
missing prior call to the accumulator.
var source = Rx.Observable.range(1, 3) .scan( function (acc, x) { return acc * x; }, 1 ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 2 Next: 6 Completed
scan
is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxJS also implements the expand
operator, which is somewhat similar. Rather than applying
the function to the previous return value of the function combined with the next item emitted from the
source Observable, such that the number of items it emits is equal to the number emitted by the source
Observable, expand
simply feeds the return value from the function back into the function
without regard to future emissions from the Observable, such that it will just continue to create new
values at its own pace.
var source = Rx.Observable.return(42) .expand(function (x) { return Rx.Observable.return(42 + x); }) .take(5); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Next: 84 Next: 126 Next: 168 Next: 210 Completed
expand
is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.experimental.js
expand
requires one of the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
TBD
TBD
RxPHP implements this operator as scan
.
Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value. For aggregation behavior with no intermediate results, see Observable.aggregate.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/scan/scan.php //Without a seed $source = Rx\Observable::range(1, 3); $subscription = $source ->scan(function ($acc, $x) { return $acc + $x; }) ->subscribe($createStdoutObserver());
Next value: 1 Next value: 3 Next value: 6 Complete!
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/scan/scan-with-seed.php //With a seed $source = Rx\Observable::range(1, 3); $subscription = $source ->scan(function ($acc, $x) { return $acc * $x; }, 1) ->subscribe($createStdoutObserver());
Next value: 1 Next value: 2 Next value: 6 Complete!
TBD
TBD
TBD
TBD