A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.
If you apply the Replay operator to an Observable before you convert it into a connectable Observable, the resulting connectable Observable will always emit the same complete sequence to any future observers, even those observers that subscribe after the connectable Observable has begun to emit items to other subscribed observers.
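The behavior described above can be illustrated with a minimal sketch in plain JavaScript. This is only a model of the idea, not the API of any Rx library: `ReplayConnectable`, its `subscribe` and `connect` methods, and the producer callback are hypothetical names introduced for illustration.

```javascript
// Hypothetical minimal model of a connectable, replaying source (not an Rx API).
class ReplayConnectable {
  constructor(producer) {
    this.producer = producer; // function(emit) that pushes items once connected
    this.buffer = [];         // every item emitted so far
    this.observers = [];      // callbacks of the subscribed observers
  }

  // Subscribing replays the buffered items first, then registers for live ones.
  subscribe(onNext) {
    this.buffer.forEach((item) => onNext(item));
    this.observers.push(onNext);
  }

  // Nothing is emitted until connect() is called.
  connect() {
    this.producer((item) => {
      this.buffer.push(item);
      this.observers.forEach((onNext) => onNext(item));
    });
  }
}

const early = [];
const late = [];
const source = new ReplayConnectable((emit) => [1, 2, 3].forEach(emit));

source.subscribe((x) => early.push(x)); // subscribed before connect: nothing yet
const seenBeforeConnect = early.length;
source.connect();                       // emission starts here
source.subscribe((x) => late.push(x));  // late observer still receives 1, 2, 3

console.log(seenBeforeConnect, early, late);
```

In this sketch the observer that subscribes after all items have been emitted still receives the complete sequence, because subscription drains the buffer before attaching to live emissions.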
In RxGroovy there is a variety of the replay operator that returns a connectable Observable. Observers can subscribe to this connectable Observable at any time, but it does not begin emitting items until you Connect to it.
Variants of this variety of the replay operator permit you to set a maximum buffer size to limit the number of items replay will buffer and replay to subsequent observers, and/or to establish a moving time window that defines when emitted items become too old to buffer and replay.
replay()
replay(Scheduler)
replay(int)
replay(int,Scheduler)
replay(long,TimeUnit)
replay(long,TimeUnit,Scheduler)
replay(int,long,TimeUnit)
replay(int,long,TimeUnit,Scheduler)
There is also a variety of replay that returns an ordinary Observable. These variants take as a parameter a transformative function (a selector); this function accepts the shared, replaying version of the source Observable as its parameter, and returns an Observable whose items are emitted by the resulting Observable. So really, this operator does not replay the source Observable itself but instead emits the source Observable as transformed by this function.
Variants of this variety of the replay operator permit you to set a maximum buffer size to limit the number of items replay will buffer and replay to subsequent observers, and/or to establish a moving time window that defines when emitted items become too old to buffer and replay.
replay(Func1)
replay(Func1,Scheduler)
replay(Func1,int)
replay(Func1,int,Scheduler)
replay(Func1,long,TimeUnit)
replay(Func1,long,TimeUnit,Scheduler)
replay(Func1,int,long,TimeUnit)
replay(Func1,int,long,TimeUnit,Scheduler)
In RxJava there is a variety of the replay operator that returns a connectable Observable. Observers can subscribe to this connectable Observable at any time, but it does not begin emitting items until you Connect to it.
Variants of this variety of the replay operator permit you to set a maximum buffer size to limit the number of items replay will buffer and replay to subsequent observers, and/or to establish a moving time window that defines when emitted items become too old to buffer and replay.
replay()
replay(Scheduler)
replay(int)
replay(int,Scheduler)
replay(long,TimeUnit)
replay(long,TimeUnit,Scheduler)
replay(int,long,TimeUnit)
replay(int,long,TimeUnit,Scheduler)
There is also a variety of replay that returns an ordinary Observable. These variants take as a parameter a transformative function (a selector); this function accepts the shared, replaying version of the source Observable as its parameter, and returns an Observable whose items are emitted by the resulting Observable. So really, this operator does not replay the source Observable itself but instead emits the source Observable as transformed by this function.
Variants of this variety of the replay operator permit you to set a maximum buffer size to limit the number of items replay will buffer and replay to subsequent observers, and/or to establish a moving time window that defines when emitted items become too old to buffer and replay.
replay(Func1)
replay(Func1,Scheduler)
replay(Func1,int)
replay(Func1,int,Scheduler)
replay(Func1,long,TimeUnit)
replay(Func1,long,TimeUnit,Scheduler)
replay(Func1,int,long,TimeUnit)
replay(Func1,int,long,TimeUnit,Scheduler)
In RxJS the replay operator takes four optional parameters and returns an ordinary Observable:
selector
bufferSize
window
scheduler
var interval = Rx.Observable.interval(1000);

var source = interval
  .take(2)
  .do(function (x) { console.log('Side effect'); });

var published = source
  .replay(function (x) {
    return x.take(2).repeat(2);
  }, 3);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
  return Rx.Observer.create(
    function (x) { console.log('Next: ' + tag + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Side effect
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceA0
Next: SourceA1
Completed
Side effect
Next: SourceB1
Next: SourceB0
Next: SourceB1
Completed
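To make the roles of the bufferSize and window parameters more concrete, here is a minimal sketch in plain JavaScript of a buffer bounded by both item count and item age. It is an illustration only and makes no claim about how RxJS implements replay internally: `createReplayBuffer`, `push`, `snapshot`, and the injected `now` clock are hypothetical names.

```javascript
// Hypothetical helper, not part of any Rx library: keeps at most `bufferSize`
// items, each no older than `windowMs` according to the supplied clock.
function createReplayBuffer(bufferSize, windowMs, now) {
  const entries = []; // { value, time } in arrival order

  function trim() {
    const cutoff = now() - windowMs;
    // Drop items that have aged out of the time window.
    while (entries.length > 0 && entries[0].time < cutoff) entries.shift();
    // Drop the oldest items beyond the size limit.
    while (entries.length > bufferSize) entries.shift();
  }

  return {
    push(value) {
      entries.push({ value: value, time: now() });
      trim();
    },
    // The items a newly subscribed observer would receive.
    snapshot() {
      trim();
      return entries.map((entry) => entry.value);
    },
  };
}

// A manually advanced clock keeps the example deterministic.
let clock = 0;
const replayBuffer = createReplayBuffer(2, 1000, () => clock);

replayBuffer.push('a');                 // t = 0
clock = 400; replayBuffer.push('b');    // t = 400
clock = 800; replayBuffer.push('c');    // t = 800, size limit 2 drops 'a'
const bySize = replayBuffer.snapshot(); // ['b', 'c']

clock = 1500;                           // 'b' is now older than 1000 ms
const byAge = replayBuffer.snapshot();  // ['c']

console.log(bySize, byAge);
```

The two limits apply independently: an item is replayed to a new observer only if it is both among the most recent `bufferSize` items and still inside the time window.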
There is also a shareReplay operator, which keeps track of the number of observers, and disconnects from the source Observable when that number drops to zero. shareReplay takes three optional parameters and returns an ordinary Observable:
bufferSize
window
scheduler
var interval = Rx.Observable.interval(1000);

var source = interval
  .take(4)
  .doAction(function (x) { console.log('Side effect'); });

var published = source
  .shareReplay(3);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

// Creating a third subscription after the previous two subscriptions have
// completed. Notice that no side effects result from this subscription,
// because the notifications are cached and replayed.
Rx.Observable
  .return(true)
  .delay(6000)
  .flatMap(published)
  .subscribe(createObserver('SourceC'));

function createObserver(tag) {
  return Rx.Observer.create(
    function (x) { console.log('Next: ' + tag + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Side effect
Next: SourceA2
Next: SourceB2
Side effect
Next: SourceA3
Next: SourceB3
Completed
Completed
Next: SourceC1
Next: SourceC2
Next: SourceC3
Completed
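The observer counting that shareReplay performs can be sketched in plain JavaScript as follows. This models only the connect-on-first, disconnect-on-last bookkeeping and leaves the replay buffer out; it is not the RxJS implementation, and `createRefCounted` and its `connect` callback are hypothetical names.

```javascript
// Hypothetical model of observer counting (not the RxJS implementation).
// `connect` starts the shared subscription and returns a disconnect function.
function createRefCounted(connect) {
  let observerCount = 0;
  let disconnect = null;

  return {
    subscribe() {
      observerCount += 1;
      if (observerCount === 1) {
        disconnect = connect(); // first observer: connect to the source
      }
      let active = true;
      return function unsubscribe() {
        if (!active) return;    // ignore repeated calls
        active = false;
        observerCount -= 1;
        if (observerCount === 0) {
          disconnect();         // last observer left: disconnect
          disconnect = null;
        }
      };
    },
  };
}

const log = [];
const shared = createRefCounted(() => {
  log.push('connect');
  return () => log.push('disconnect');
});

const unsubscribeA = shared.subscribe(); // connects
const unsubscribeB = shared.subscribe(); // reuses the existing connection
unsubscribeA();                          // one observer remains, still connected
unsubscribeB();                          // count reaches zero, disconnects

console.log(log.join(', '));
```

However many observers come and go in between, the source is connected once when the count leaves zero and disconnected once when it returns to zero.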
replay and shareReplay are found in the following distributions:
rx.all.js
rx.all.compat.js
rx.binding.js (requires rx.js or rx.compat.js)
rx.lite.js
rx.lite.compat.js
RxPHP implements this operator as replay.
Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence replaying notifications subject to a maximum time length for the replay buffer. This operator is a specialization of Multicast using a ReplaySubject.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/replay/replay.php

$loop = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$interval = \Rx\Observable::interval(1000);

$source = $interval
    ->take(2)
    ->doOnNext(function ($x) {
        echo $x, " something", PHP_EOL;
        echo "Side effect", PHP_EOL;
    });

$published = $source
    ->replay(function (\Rx\Observable $x) {
        return $x->take(2)->repeat(2);
    }, 3);

$published->subscribe($createStdoutObserver('SourceA '), $scheduler);
$published->subscribe($createStdoutObserver('SourceB '), $scheduler);

$loop->run();
RxPHP also has an operator shareReplay.
Returns an observable sequence that shares a single subscription to the underlying sequence replaying notifications subject to a maximum time length for the replay buffer. This operator is a specialization of replay which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareReplay.php

$loop = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$interval = Rx\Observable::interval(1000);

$source = $interval
    ->take(4)
    ->doOnNext(function ($x) {
        echo 'Side effect', PHP_EOL;
    });

$published = $source
    ->shareReplay(3);

$published->subscribe($createStdoutObserver('SourceA '), $scheduler);
$published->subscribe($createStdoutObserver('SourceB '), $scheduler);

Rx\Observable
    ::just(true)
    ->concatMapTo(\Rx\Observable::timer(6000))
    ->flatMap(function () use ($published) {
        return $published;
    })
    ->subscribe($createStdoutObserver('SourceC '), $scheduler);

$loop->run();
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
Side effect
SourceA Next value: 2
SourceB Next value: 2
Side effect
SourceA Next value: 3
SourceB Next value: 3
SourceA Complete!
SourceB Complete!
SourceC Next value: 1
SourceC Next value: 2
SourceC Next value: 3
SourceC Complete!