The Sample operator periodically looks at an Observable and emits whichever item it has most recently emitted since the previous sampling.
Some implementations also provide a ThrottleFirst operator, which is similar but emits the first item emitted during each sample period rather than the most recently emitted one.
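The periodic behavior described above can be modeled without any Rx library. The following is a minimal sketch, not an actual implementation: `samplePeriodic`, the `{ time, value }` event shape, and the rule that an item arriving exactly on a tick belongs to that tick are all assumptions made for illustration.

```javascript
// Simplified model of periodic sampling over a recorded timeline.
// `events` is a hypothetical array of { time, value }, sorted by time (ms).
function samplePeriodic(events, periodMs, endMs) {
  const emitted = [];
  let i = 0;
  for (let tick = periodMs; tick <= endMs; tick += periodMs) {
    let latest;          // most recent item seen since the previous tick
    let hasItem = false;
    // Consume every item that arrived at or before this tick (assumed tie rule).
    while (i < events.length && events[i].time <= tick) {
      latest = events[i].value;
      hasItem = true;
      i++;
    }
    // A period without any items produces no emission.
    if (hasItem) emitted.push({ time: tick, value: latest });
  }
  return emitted;
}

const events = [
  { time: 10, value: 'a' },
  { time: 40, value: 'b' },
  { time: 90, value: 'c' },
  { time: 310, value: 'd' }
];
const sampled = samplePeriodic(events, 100, 400);
console.log(sampled.map(e => e.time + ':' + e.value).join(' ')); // prints "100:c 400:d"
```

In this model the ticks at 200 ms and 300 ms emit nothing, because no item arrived between the previous tick and those ticks.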
RxGroovy implements this operator as sample and throttleLast.
Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from this operator will emit no item for that sampling period.
One variant of sample (or its alias, throttleLast) samples at a periodic time interval that you choose by passing in a TimeUnit and a quantity of such units as parameters to sample.
The following code constructs an Observable that emits the numbers between one and a million, and then samples that Observable every ten milliseconds to see what number it is emitting at that moment.
import rx.Observable

def numbers = Observable.range( 1, 1000000 );

numbers.sample(10, java.util.concurrent.TimeUnit.MILLISECONDS).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
339707
547810
891282
Sequence complete
This variant of sample operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.
sample(long,TimeUnit) and throttleLast(long,TimeUnit)
sample(long,TimeUnit,Scheduler) and throttleLast(long,TimeUnit,Scheduler)
There is also a variant of sample (one that does not have a throttleLast alias) that samples the source Observable each time a second Observable emits an item (or when it terminates). You pass in that second Observable as the parameter to sample.
This variant of sample does not by default operate on any particular Scheduler.
sample(Observable)
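The sampler-driven variant can be pictured as a single "latest item" slot that the source overwrites and the sampler drains. The sketch below is only a model of that idea in plain JavaScript: `sampleWith` and its array inputs are hypothetical, and treating the sampler's termination as one final tick is an assumption based on the description above.

```javascript
// Hypothetical model: `source` is an array of { time, value } and
// `samplerTicks` is an array of times (ms) at which the sampler fires.
function sampleWith(source, samplerTicks) {
  // Merge both timelines into one ordered stream of tagged events.
  // Array.prototype.sort is stable, so an item and a tick with the same
  // time keep the order items-then-ticks (an assumed tie rule).
  const timeline = [
    ...source.map(e => ({ time: e.time, kind: 'item', value: e.value })),
    ...samplerTicks.map(t => ({ time: t, kind: 'tick' }))
  ].sort((a, b) => a.time - b.time);

  const emitted = [];
  let slot = { full: false, value: undefined };
  for (const ev of timeline) {
    if (ev.kind === 'item') {
      // Overwrite the slot: only the most recent item survives.
      slot = { full: true, value: ev.value };
    } else if (slot.full) {
      // Sampler fired and there is something new: emit it once, then clear.
      emitted.push(slot.value);
      slot = { full: false, value: undefined };
    }
  }
  return emitted;
}

const source = [
  { time: 100, value: 1 },
  { time: 200, value: 2 },
  { time: 450, value: 3 },
  { time: 900, value: 4 }
];
// Irregular sampler; the last entry stands in for the sampler terminating.
const ticks = [250, 300, 1000];
console.log(sampleWith(source, ticks).join(' ')); // prints "2 4"
```

The tick at 300 ms emits nothing because the slot was already drained at 250 ms and no new item had arrived.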
There is also a throttleFirst operator, which differs from throttleLast/sample in that it emits the first item emitted by the source Observable in each sampling period rather than the most recently emitted item.
import java.util.concurrent.TimeUnit
import rx.Scheduler
import rx.schedulers.TestScheduler
import rx.subjects.PublishSubject

Scheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();

o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);

// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // deliver
o.onNext(2); // skip
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // deliver
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // skip
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onCompleted();
1
3
7
Sequence complete
throttleFirst operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.
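To see why the throttleFirst example above delivers 1, 3, and 7, it can help to model the operator as a time gate: an item passes if no item has passed yet, or if at least the window length has elapsed since the last item that passed. This gate model is an assumption that happens to match the output shown; the real operator's bookkeeping may differ. The `throttleFirst` function and `{ time, value }` shape below are hypothetical, written in plain JavaScript.

```javascript
// Hypothetical model of throttleFirst over a recorded timeline (times in ms).
function throttleFirst(events, windowMs) {
  const emitted = [];
  let lastEmitTime = null; // time of the most recent item that passed the gate
  for (const { time, value } of events) {
    if (lastEmitTime === null || time - lastEmitTime >= windowMs) {
      emitted.push(value);
      lastEmitTime = time; // passing an item opens a new window
    }
  }
  return emitted;
}

// Same simulated timeline as the TestScheduler example.
const timeline = [
  { time: 0, value: 1 }, { time: 0, value: 2 },
  { time: 501, value: 3 }, { time: 600, value: 4 },
  { time: 700, value: 5 }, { time: 700, value: 6 },
  { time: 1001, value: 7 }
];
console.log(throttleFirst(timeline, 500).join(' ')); // prints "1 3 7"
```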
RxJava implements this operator as sample and throttleLast.
Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from this operator will emit no item for that sampling period.
One variant of sample (or its alias, throttleLast) samples at a periodic time interval that you choose by passing in a TimeUnit and a quantity of such units as parameters to sample.
This variant of sample operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.
sample(long,TimeUnit) and throttleLast(long,TimeUnit)
sample(long,TimeUnit,Scheduler) and throttleLast(long,TimeUnit,Scheduler)
There is also a variant of sample (one that does not have a throttleLast alias) that samples the source Observable each time a second Observable emits an item (or when it terminates). You pass in that second Observable as the parameter to sample.
This variant of sample does not by default operate on any particular Scheduler.
sample(Observable)
There is also a throttleFirst operator, which differs from throttleLast/sample in that it emits the first item emitted by the source Observable in each sampling period rather than the most recently emitted item.
throttleFirst operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.
RxJS implements this operator with two variants of sample.
The first variant accepts as its parameter a sampling period, given as an integer number of milliseconds, and samples the source Observable once per period.
var source = Rx.Observable.interval(1000)
  .sample(5000)
  .take(2);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: ' + x);
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });
Next: 3
Next: 8
Completed
The second variant accepts as its parameter an Observable, and it samples the source Observable whenever this second Observable emits an item.
var source = Rx.Observable.interval(1000)
  .sample(Rx.Observable.interval(5000))
  .take(2);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: ' + x);
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });
Next: 3
Next: 8
Completed
There is also a throttleFirst operator, which differs from sample in that it emits the first item emitted by the source Observable in each sampling period rather than the most recently emitted item.
It does not have the variant that uses the emissions from a second Observable to regulate the sampling periodicity.
var times = [
  { value: 0, time: 100 },
  { value: 1, time: 600 },
  { value: 2, time: 400 },
  { value: 3, time: 900 },
  { value: 4, time: 200 }
];

// Delay each item by time and project value;
var source = Rx.Observable.from(times)
  .flatMap(function (item) {
    return Rx.Observable
      .of(item.value)
      .delay(item.time);
  })
  .throttleFirst(300 /* ms */);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });
Next: 0
Next: 2
Next: 3
Completed
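The output of the throttleFirst example is easier to follow once the items are put in arrival order, since each item is delayed independently and so arrives by its `time`, not by its position in the array. The sketch below traces that reasoning in plain JavaScript. `explainThrottle` is a hypothetical helper, and the rule that an item passes when at least 300 ms have elapsed since the last passed item is an assumption chosen to match the output shown. With real timers, an item landing exactly on the 300 ms boundary (value 2 here) could go either way.

```javascript
// Hypothetical helper: orders items by arrival time and marks which ones
// would pass a throttle gate of `windowMs` milliseconds.
function explainThrottle(items, windowMs) {
  // Independent delays mean arrival order follows `time`, not array order.
  const arrivals = items.slice().sort((a, b) => a.time - b.time);
  let lastEmit = null; // arrival time of the last item that passed
  return arrivals.map(({ value, time }) => {
    const passes = lastEmit === null || time - lastEmit >= windowMs;
    if (passes) lastEmit = time;
    return { value, time, passes };
  });
}

const times = [
  { value: 0, time: 100 },
  { value: 1, time: 600 },
  { value: 2, time: 400 },
  { value: 3, time: 900 },
  { value: 4, time: 200 }
];

const report = explainThrottle(times, 300);
console.log(report.map(r => r.value).join(' '));                        // prints "0 4 2 1 3"
console.log(report.filter(r => r.passes).map(r => r.value).join(' ')); // prints "0 2 3"
```

Values 4 and 1 are dropped because they arrive 100 ms and 200 ms after the previously passed item, which is inside the 300 ms window.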
sample and throttleFirst operate by default on the timeout Scheduler. They are found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.time.js (requires rx.js or rx.compat.js)
rx.lite.js
rx.lite.compat.js