You can register callbacks that ReactiveX will call when certain events take place on an Observable, where those callbacks will be called independently from the normal set of notifications associated with an Observable cascade. There are a variety of operators that various ReactiveX implementations have designed to allow for this.
TBD
RxGroovy has several Do variants.
The doOnEach
operator allows you to establish a callback that the resulting Observable will
call each time it emits an item. You can pass this callback either in the form of an Action
that takes an onNext
variety of Notification
as its sole parameter, or you can
pass in an Observer whose onNext
method will be called as if it had subscribed to the
Observable.
doOnEach(Action1)
doOnEach(Observer)
The doOnNext
operator is much like doOnEach(Action1)
except that the
Action
that you pass it as a parameter does not accept a Notification
but
instead simply accepts the emitted item.
doOnNext(Action1)
The doOnRequest
operator (new in RxGroovy 1.1) registers an Action
which will be
called whenever an observer requests additional items from the resulting Observable. That
Action
receives as its parameter the number of items that the observer is requesting.
doOnRequest(Action1)
The doOnSubscribe
operator registers an Action
which will be called whenever
an observer subscribes to the resulting Observable.
doOnSubscribe(Action0)
The doOnUnsubscribe
operator registers an Action
which will be called whenever
an observer unsubscribes from the resulting Observable.
doOnUnsubscribe(Action0)
The doOnCompleted
operator registers an Action
which will be called if the
resulting Observable terminates normally, calling onCompleted
.
doOnCompleted(Action0)
The doOnError
operator registers an Action
which will be called if the
resulting Observable terminates abnormally, calling onError
. This Action
will
be passed the Throwable
representing the error.
doOnError(Action1)
The doOnTerminate
operator registers an Action
which will be called just
before the resulting Observable terminates, whether normally or with an error.
doOnTerminate(Action0)
The finallyDo
operator registers an Action
which will be called just
after the resulting Observable terminates, whether normally or with an error.
def numbers = Observable.from([1, 2, 3, 4, 5]); numbers.finallyDo({ println('Finally'); }).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
1 2 3 4 5 Sequence complete Finally
finallyDo(Action0)
RxJava has several Do variants.
The doOnEach
operator allows you to establish a callback that the resulting Observable will
call each time it emits an item. You can pass this callback either in the form of an Action
that takes an onNext
variety of Notification
as its sole parameter, or you can
pass in an Observer whose onNext
method will be called as if it had subscribed to the
Observable.
doOnEach(Action1)
doOnEach(Observer)
The doOnNext
operator is much like doOnEach(Action1)
except that the
Action
that you pass it as a parameter does not accept a Notification
but
instead simply accepts the emitted item.
Observable.just(1, 2, 3) .doOnNext(new Action1<Integer>() { @Override public void call(Integer item) { if( item > 1 ) { throw new RuntimeException( "Item exceeds maximum value" ); } } }).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Next: 1 Error: Item exceeds maximum value
doOnNext(Action1)
The doOnRequest
operator (new in RxJava 1.1) registers an Action
which will be
called whenever an observer requests additional items from the resulting Observable. That
Action
receives as its parameter the number of items that the observer is requesting.
doOnRequest(Action1)
The doOnSubscribe
operator registers an Action
which will be called whenever
an observer subscribes to the resulting Observable.
doOnSubscribe(Action0)
The doOnUnsubscribe
operator registers an Action
which will be called whenever
an observer unsubscribes from the resulting Observable.
doOnUnsubscribe(Action0)
The doOnCompleted
operator registers an Action
which will be called if the
resulting Observable terminates normally, calling onCompleted
.
doOnCompleted(Action0)
The doOnError
operator registers an Action
which will be called if the
resulting Observable terminates abnormally, calling onError
. This Action
will
be passed the Throwable
representing the error.
doOnError(Action1)
The doOnTerminate
operator registers an Action
which will be called just
before the resulting Observable terminates, whether normally or with an error.
doOnTerminate(Action0)
The finallyDo
operator registers an Action
which will be called just
after the resulting Observable terminates, whether normally or with an error.
finallyDo(Action0)
RxJS implements the basic Do operator as do
or tap
(two names for the same operator). You have two choices for how to use this operator:
do
/tap
will call that
Observer’s methods as though that Observer had subscribed to the resulting Observable.onNext
, onError
,
and onCompleted
) that do
/tap
will call along with the
similarly-named functions of any of its observers./* Using an observer */ var observer = Rx.Observer.create( function (x) { console.log('Do Next: %s', x); }, function (err) { console.log('Do Error: %s', err); }, function () { console.log('Do Completed'); } ); var source = Rx.Observable.range(0, 3) .do(observer); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
Do Next: 0 Next: 0 Do Next: 1 Next: 1 Do Next: 2 Next: 2 Do Completed Completed
/* Using a function */ var source = Rx.Observable.range(0, 3) .do( function (x) { console.log('Do Next:', x); }, function (err) { console.log('Do Error:', err); }, function () { console.log('Do Completed'); } ); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
Do Next: 0 Next: 0 Do Next: 1 Next: 1 Do Next: 2 Next: 2 Do Completed Completed
RxJS also implements doOnNext
or tapOnNext
(two names for the same operator).
It is a specialized form of Do that responds only to the
onNext
case, by calling a callback function you provide as a parameter. You may also
optionally pass a second parameter that will be the “this
” object from the
point of view of your callback function when it executes.
var source = Rx.Observable.range(0, 3) .doOnNext( function () { this.log('Do Next: %s', x); }, console ); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
Do Next: 0 Next: 0 Do Next: 1 Next: 1 Do Next: 2 Next: 2 Completed
RxJS also implements doOnError
or tapOnError
(two names for the same operator).
It is a specialized form of Do that responds only to the
onError
case, by calling a callback function you provide as a parameter. You may also
optionally pass a second parameter that will be the “this
” object from the
point of view of your callback function when it executes.
var source = Rx.Observable.throw(new Error()); .doOnError( function (err) { this.log('Do Error: %s', err); }, console ); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
Do Error: Error Error: Error
RxJS also implements doOnCompleted
or tapOnCompleted
(two names for the same
operator). It is a specialized form of Do that responds only to the
onCompleted
case, by calling a callback function you provide as a parameter. You may also
optionally pass a second parameter that will be the “this
” object from the
point of view of your callback function when it executes.
var source = Rx.Observable.range(0, 3) .doOnCompleted( function () { this.log('Do Completed'); }, console ); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Do Completed Completed
RxJS also implements a finally
operator. It takes a function that will be called after the
resulting Observable terminates, whether normally (onCompleted
) or abnormally
(onError
).
var source = Rx.Observable.throw(new Error()) .finally(function () { console.log('Finally'); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Error: Error Finally
do
/tap
, doOnNext
/tapOnNext
,
doOnError
/tapOnError
, doOnCompleted
/tapOnCompleted
, and
finally
are found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
TBD
RxPHP implements this operator as doOnEach
.
Invokes an action for each element in the observable sequence and invokes an action upon graceful or exceptional termination of the observable sequence. This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnEach.php $source = \Rx\Observable::range(0, 3) ->doOnEach(new \Rx\Observer\CallbackObserver( function ($x) { echo 'Do Next:', $x, PHP_EOL; }, function (Exception $err) { echo 'Do Error:', $err->getMessage(), PHP_EOL; }, function () { echo 'Do Completed', PHP_EOL; } )); $subscription = $source->subscribe($stdoutObserver);
Do Next:0 Next value: 0 Do Next:1 Next value: 1 Do Next:2 Next value: 2 Do Completed Complete!
RxPHP also has an operator doOnNext
.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnNext.php $source = \Rx\Observable::range(0, 3) ->doOnNext(function ($x) { echo 'Do Next:', $x, PHP_EOL; }); $subscription = $source->subscribe($stdoutObserver);
Do Next:0 Next value: 0 Do Next:1 Next value: 1 Do Next:2 Next value: 2 Complete!
RxPHP also has an operator doOnError
.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnError.php $source = \Rx\Observable::error(new Exception('Oops')) ->doOnError(function (Exception $err) { echo 'Do Error:', $err->getMessage(), PHP_EOL; }); $subscription = $source->subscribe($stdoutObserver);
Do Error:Oops Exception: Oops
RxPHP also has an operator doOnCompleted
.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnCompleted.php $source = \Rx\Observable::emptyObservable() ->doOnCompleted(function () { echo 'Do Completed', PHP_EOL; }); $subscription = $source->subscribe($stdoutObserver);
Do Completed Complete!
TBD
TBD