A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.
TBD
RxGroovy implements this operator as publish
.
publish()
There is also a variant that takes a function as a parameter. This function takes an emitted item from the source Observable as a parameter and produces the item that will be emitted in its place by the resulting Observable.
publish(Func1)
RxJava implements this operator as publish
.
publish()
There is also a variant that takes a function as a parameter. This function takes an emitted
item from the source Observable as a parameter and produces the item that will be emitted in
its place by the ConnectableObservable
.
publish(Func1)
In RxJS, the publish
operator takes a function as a parameter. This function
takes an emitted item from the source Observable as a parameter and produces the item that
will be emitted in its place by the returned ConnectableObservable
.
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publish(); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed
The publishValue
operator takes, in addition to the function described above, an
initial item to be emitted by the resulting ConnectableObservable
at connection
time before emitting the items from the source Observable. It will not, however, emit this
initial item to observers that subscribe after the time of connection.
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publishValue(42); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Next: SourceA42 Next: SourceB42 Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed Completed
The publishLast
operator is similar to publish
, and takes a
similarly-behaving function as its parameter. It differs from publish
in that
instead of applying that function to, and emitting an item for every item emitted by
the source Observable subsequent to the connection, it only applies that function to and emits
an item for the last item that was emitted by the source Observable, when that source
Observable terminates normally.
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publishLast(); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Side effect Next: SourceA1 Completed Next: SourceB1 Completed
The above operators are available in the following packages:
rx.all.js
rx.all.compat.js
rx.binding.js
(requires either rx.js
or rx.compat.js
)rx.lite.js
rx.lite.compat.js
RxJS also has a multicast
operator which operates on an ordinary Observable,
multicasts that Observable by means of a particular Subject that you specify, applies a
transformative function to each emission, and then emits those transformed values as its own
ordinary Observable sequence. Each subscription to this new Observable will trigger a
new subscription to the underlying multicast Observable.
var subject = new Rx.Subject(); var source = Rx.Observable.range(0, 3) .multicast(subject); var observer = Rx.Observer.create( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); } ); var subscription = source.subscribe(observer); subject.subscribe(observer); var connected = source.connect(); subscription.dispose();
Next: 0 Next: 0 Next: 1 Next: 1 Next: 2 Next: 2 Completed
The multicast
operator is available in the following packages:
rx.all.js
rx.all.compat.js
rx.binding.js
(requires either rx.lite.js
or rx.compat.js
)rx.lite.js
rx.lite.compat.js
There is also a let
operator (the alias letBind
is available for
browsers such as Internet Explorer before IE9 where “let
” is
forbidden). It is similar to multicast
but does not multicast the underlying
Observable through a Subject:
var obs = Rx.Observable.range(1, 3); var source = obs.let(function (o) { return o.concat(o); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 2 Next: 3 Next: 1 Next: 2 Next: 3 Completed
The let
(or letBind
) operator is available in the following packages:
rx.all.js
rx.all.compat.js
rx.experimental.js
It requires one of the following packages:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
TBD
RxPHP implements this operator as multicast
.
Multicasts the source sequence notifications through an instantiated subject into all uses of the sequence within a selector function. Each subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the selector function's invocation. For specializations with fixed subject types, see Publish, PublishLast, and Replay.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/multicast/multicast.php $subject = new \Rx\Subject\Subject(); $source = \Rx\Observable::range(0, 3)->multicast($subject); $subscription = $source->subscribe($stdoutObserver); $subject->subscribe($stdoutObserver); $connected = $source->connect(); $subscription->dispose();
Next value: 0 Next value: 0 Next value: 1 Next value: 1 Next value: 2 Next value: 2 Complete!
RxPHP also has an operator multicastWithSelector
.
Multicasts the source sequence notifications through an instantiated subject from a subject selector factory, into all uses of the sequence within a selector function. Each subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the selector function's invocation. For specializations with fixed subject types, see Publish, PublishLast, and Replay.
RxPHP also has an operator publish
.
Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence. This operator is a specialization of Multicast using a regular Subject.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publish.php /* With publish */ $interval = \Rx\Observable::range(0, 10); $source = $interval ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publish(); $published->subscribe($createStdoutObserver('SourceC ')); $published->subscribe($createStdoutObserver('SourceD ')); $published->connect();
Side effect SourceC Next value: 0 SourceD Next value: 0 Side effect SourceC Next value: 1 SourceD Next value: 1 SourceC Complete! SourceD Complete!
RxPHP also has an operator publishLast
.
Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence containing only the last notification. This operator is a specialization of Multicast using a AsyncSubject.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishLast.php $range = \Rx\Observable::fromArray(range(0, 1000)); $source = $range ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publishLast(); $published->subscribe($createStdoutObserver('SourceA')); $published->subscribe($createStdoutObserver('SourceB')); $connection = $published->connect();
Side effect Side effect SourceANext value: 1 SourceBNext value: 1 SourceAComplete! SourceBComplete!
RxPHP also has an operator publishValue
.
Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue. This operator is a specialization of Multicast using a BehaviorSubject.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishValue.php $range = \Rx\Observable::fromArray(range(0, 1000)); $source = $range ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publishValue(42); $published->subscribe($createStdoutObserver('SourceA')); $published->subscribe($createStdoutObserver('SourceB')); $connection = $published->connect();
SourceANext value: 42 SourceBNext value: 42 Side effect SourceANext value: 0 SourceBNext value: 0 Side effect SourceANext value: 1 SourceBNext value: 1 SourceAComplete! SourceBComplete!
TBD