A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.
The RefCount operator automates the process of connecting to and disconnecting from a connectable Observable. It operates on a connectable Observable and returns an ordinary Observable. When the first observer subscribes to this Observable, RefCount connects to the underlying connectable Observable. RefCount then keeps track of how many other observers subscribe to it and does not disconnect from the underlying connectable Observable until the last observer has unsubscribed.
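The counting behaviour described above can be sketched in plain JavaScript without any Rx library. This is a toy model under stated assumptions, not the implementation used by any Rx distribution: `createConnectable`, `refCount`, and the manually driven `emit` function are hypothetical names introduced only for illustration.

```javascript
// Toy model only. None of these names come from an Rx library.

// A "connectable" source: subscribing only registers an observer;
// nothing is produced until connect() is called.
function createConnectable(producer) {
  const observers = new Set();
  return {
    subscribe(observer) {
      observers.add(observer);
      return () => observers.delete(observer); // unsubscribe function
    },
    connect() {
      // Start the producer and fan each item out to the current observers.
      return producer((item) => observers.forEach((o) => o(item)));
    },
  };
}

// refCount: connect on the first subscriber, disconnect after the last.
function refCount(connectable) {
  let count = 0;
  let disconnect = null;
  return {
    subscribe(observer) {
      const unsubscribe = connectable.subscribe(observer);
      count += 1;
      if (count === 1) disconnect = connectable.connect();
      let done = false;
      return () => {
        if (done) return; // calling unsubscribe twice is harmless
        done = true;
        unsubscribe();
        count -= 1;
        if (count === 0) {
          disconnect();
          disconnect = null;
        }
      };
    },
  };
}

// Demo: the producer hands out an `emit` function so items can be pushed by hand.
const log = [];
let emit = null;
const source = createConnectable((next) => {
  log.push('connected');
  emit = next;
  return () => {
    log.push('disconnected');
    emit = null;
  };
});

const shared = refCount(source);
const unsubA = shared.subscribe((x) => log.push('A' + x));
const unsubB = shared.subscribe((x) => log.push('B' + x));
emit(0);
unsubA();
emit(1);
unsubB();
console.log(log.join(', '));
// connected, A0, B0, B1, disconnected
```

The source is connected exactly once, when A subscribes, and is disconnected only after B, the last remaining observer, unsubscribes.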
RxGroovy implements this operator as refCount.
refCount()
There is also a share operator, which is the equivalent of applying both the publish and refCount operators to an Observable, in that order.
share()
RxJava implements this operator as refCount.
refCount()
There is also a share operator, which is the equivalent of applying both the publish and refCount operators to an Observable, in that order.
share()
RxJS implements this operator as refCount.
var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publish().refCount();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) {
            console.log('Next: ' + tag + x);
        },
        function (err) {
            console.log('Error: ' + err);
        },
        function () {
            console.log('Completed');
        });
}

Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Completed
refCount is found in the following distributions:
rx.all.js
rx.all.compat.js
rx.binding.js (requires rx.js, rx.compat.js, rx.lite.js, or rx.lite.compat.js)
rx.lite.js
rx.lite.compat.js
There is also a share operator, which is the equivalent of applying both the publish and refCount operators to an Observable, in that order. A variant called shareValue takes as a parameter a single item that it will emit to any subscribers before beginning to emit items from the source Observable.
var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .do(function (x) {
        console.log('Side effect');
    });

var published = source.share();

// When the number of observers subscribed to published observable goes from
// 0 to 1, we connect to the underlying observable sequence.
published.subscribe(createObserver('SourceA'));

// When the second subscriber is added, no additional subscriptions are added to the
// underlying observable sequence. As a result the operations that result in side
// effects are not repeated per subscriber.
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) {
            console.log('Next: ' + tag + x);
        },
        function (err) {
            console.log('Error: ' + err);
        },
        function () {
            console.log('Completed');
        });
}

Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
share and shareValue are found in the following distributions:
rx.all.js
rx.all.compat.js
rx.binding.js (requires rx.js or rx.compat.js)
rx.lite.js
rx.lite.compat.js
RxPHP implements this operator as share.
Returns an observable sequence that shares a single subscription to the underlying sequence. This operator is a specialization of publish which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/share.php

$loop = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

//With Share
$source = \Rx\Observable::interval(1000, $scheduler)
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->share();

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

$loop->run();

Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
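To illustrate why sharing keeps side effects from being repeated per subscriber, here is a dependency-free sketch in plain JavaScript (used to match the earlier examples on this page). It is a toy model, not the RxPHP or RxJS implementation: `coldSource`, `tick`, and this `share` function are hypothetical names, and `tick` stands in for the timer that would normally drive `interval`.

```javascript
// Toy model only. None of these names come from an Rx library.
const listeners = new Set(); // stands in for a timer's registered callbacks
let sideEffects = 0;

// A cold source: every subscription attaches its own listener, so the
// side effect runs once per subscriber for each item.
function coldSource(observer) {
  const listener = (x) => {
    sideEffects += 1;
    observer(x);
  };
  listeners.add(listener);
  return () => listeners.delete(listener); // dispose function
}

// Push one item to every listener currently attached.
const tick = (x) => [...listeners].forEach((l) => l(x));

// share: one upstream subscription while at least one observer is present.
function share(subscribeToSource) {
  const observers = new Set();
  let dispose = null;
  return function subscribe(observer) {
    observers.add(observer);
    if (observers.size === 1) {
      // Zero to one: subscribe to the underlying source.
      dispose = subscribeToSource((x) => observers.forEach((o) => o(x)));
    }
    return () => {
      if (!observers.delete(observer)) return;
      if (observers.size === 0) {
        // Back to zero: dispose the single upstream subscription.
        dispose();
        dispose = null;
      }
    };
  };
}

// Unshared: two subscribers, so the side effect runs twice per item.
const stop1 = coldSource(() => {});
const stop2 = coldSource(() => {});
tick(0);
const unsharedEffects = sideEffects;
stop1();
stop2();

// Shared: two subscribers, but only one upstream subscription.
sideEffects = 0;
const shared = share(coldSource);
const stopA = shared(() => {});
const stopB = shared(() => {});
tick(0);
const sharedEffects = sideEffects;
stopA();
stopB();
const leftover = listeners.size;

console.log(unsharedEffects, sharedEffects, leftover);
// 2 1 0
```

Once both shared observers have unsubscribed, no listener remains attached to the source, which corresponds to the subscription being disposed when the observer count returns to zero.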
RxPHP also has an operator shareValue.
Returns an observable sequence that shares a single subscription to the underlying sequence and starts with an initialValue. This operator is a specialization of publishValue which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareValue.php

$loop = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$source = \Rx\Observable::interval(1000, $scheduler)
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->shareValue(42);

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

$loop->run();

SourceA Next value: 42
SourceB Next value: 42
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
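The ordering in the output above, where each subscriber first sees the seed value and then the shared items, can be modelled with a small plain JavaScript sketch. This is a toy under stated assumptions rather than the library's code: this `shareValue` function, `source`, and `push` are hypothetical names. The sketch also assumes that a late subscriber would receive the most recent item instead of the seed, which may not match every Rx version.

```javascript
// Toy model only. None of these names come from an Rx library.
function shareValue(subscribeToSource, initialValue) {
  const observers = new Set();
  let latest = initialValue; // handed to every new subscriber first
  let stopSource = null;
  return function subscribe(observer) {
    observers.add(observer);
    observer(latest); // seed (or most recent item) arrives before source items
    if (observers.size === 1) {
      // First subscriber: open the single shared upstream subscription.
      stopSource = subscribeToSource((item) => {
        latest = item;
        observers.forEach((o) => o(item));
      });
    }
    return () => {
      if (!observers.delete(observer)) return;
      if (observers.size === 0) {
        // Last subscriber gone: dispose the upstream subscription.
        stopSource();
        stopSource = null;
      }
    };
  };
}

// Demo: a manually driven source that exposes `push` while subscribed.
let push = null;
const source = (next) => {
  push = next;
  return () => {
    push = null;
  };
};

const received = [];
const subscribe = shareValue(source, 42);
const stopA = subscribe((x) => received.push('A:' + x));
const stopB = subscribe((x) => received.push('B:' + x));
push(0);
push(1);
stopA();
stopB();
console.log(received.join(' '));
// A:42 B:42 A:0 B:0 A:1 B:1
```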