The various language-specific implementations of ReactiveX have a variety of operators that you can use to convert an Observable, or a sequence of items emitted by an Observable, into another variety of object or data structure. Some of these block until the Observable terminates and then produce an equivalent object or data structure; others return an Observable that emits such an object or data structure.
In some implementations of ReactiveX, there is also an operator that converts an Observable into a “Blocking” Observable. A Blocking Observable extends the ordinary Observable by providing a set of methods that operate on the items emitted by the Observable and that block while doing so. Some of the To operators belong to this Blocking Observable set of extended operations.
The getIterator operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.
This operator converts an Observable into an Iterator with which you can iterate over the set of items emitted by the source Observable.
BlockingObservable.getIterator()
The toFuture operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.
This operator converts an Observable into a Future that will return the single item emitted by the source Observable. If the source Observable emits more than one item, the Future will receive an IllegalArgumentException; if it completes after emitting no items, the Future will receive a NoSuchElementException.
If you want to convert an Observable that may emit multiple items into a Future, try something like this: myObservable.toList().toBlocking().toFuture().
BlockingObservable.toFuture()
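The single-item rule described above can be sketched in plain JavaScript. This is only an illustration under stated assumptions: singleItemOrThrow and describe are hypothetical helpers, an array stands in for the items a source emitted before completing, and the thrown errors merely borrow the Java exception names. It is not the BlockingObservable API.

```javascript
// Hypothetical helper, not part of any Rx library. The array stands in for
// the items a source Observable emitted before it completed.
function singleItemOrThrow(items) {
  if (items.length === 0) {
    // No items at all: mirrors the NoSuchElementException case.
    throw new Error('NoSuchElementException');
  }
  if (items.length > 1) {
    // More than one item: mirrors the IllegalArgumentException case.
    throw new Error('IllegalArgumentException');
  }
  return items[0];
}

// Hypothetical helper that turns the outcome into a string for inspection.
function describe(items) {
  try {
    return 'value: ' + JSON.stringify(singleItemOrThrow(items));
  } catch (e) {
    return 'error: ' + e.message;
  }
}

const futureResults = [
  describe([42]),        // exactly one item
  describe([]),          // completed without emitting
  describe([1, 2]),      // emitted more than one item
  describe([[1, 2, 3]])  // a multi-item source collected into one list first
];

console.log(futureResults);
```

The last case corresponds to the toList workaround: once the items are gathered into a single list, that list is the one and only item, so the rule is satisfied.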
The toIterable operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.
This operator converts an Observable into an Iterable with which you can iterate over the set of items emitted by the source Observable.
BlockingObservable.toIterable()
Normally, an Observable that emits multiple items will do so by invoking its observer’s onNext method for each such item. You can change this behavior, instructing the Observable to compose a list of these multiple items and then to invoke the observer’s onNext method only once, passing it the entire list, by applying the toList operator to the Observable.
For example, the following rather pointless code takes a list of integers, converts it into an Observable, then converts that Observable into one that emits the original list as a single item:
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

numbers.toList().subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);

[1, 2, 3, 4, 5, 6, 7, 8, 9]
Sequence complete
If the source Observable invokes onCompleted before emitting any items, the Observable returned by toList will emit an empty list before invoking onCompleted. If the source Observable invokes onError, the Observable returned by toList will immediately invoke the onError methods of its observers.
toList does not by default operate on any particular Scheduler.
toList()
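The three behaviors described above (one onNext carrying the whole list, an empty list for an empty source, and errors passed straight through) can be sketched without any Rx library. In this sketch an "Observable" is assumed to be just a function that takes an observer; fromArray, failing, toList, and record are all hypothetical names introduced for illustration, not the RxGroovy API.

```javascript
// Hypothetical stand-in for an Observable: a function that accepts an
// observer and pushes events into it.
function fromArray(items) {
  return function subscribe(observer) {
    items.forEach(function (item) { observer.onNext(item); });
    observer.onCompleted();
  };
}

// Hypothetical source that emits one item and then fails.
function failing(message) {
  return function subscribe(observer) {
    observer.onNext(1);
    observer.onError(new Error(message));
  };
}

// Sketch of toList: buffer every item, then emit the buffer once on completion.
function toList(source) {
  return function subscribe(observer) {
    const buffer = [];
    source({
      onNext: function (item) { buffer.push(item); },
      // Errors are forwarded immediately; the buffer is never emitted.
      onError: function (err) { observer.onError(err); },
      onCompleted: function () {
        observer.onNext(buffer);
        observer.onCompleted();
      }
    });
  };
}

// Records the events an observer receives so they can be inspected.
function record(source) {
  const events = [];
  source({
    onNext: function (x) { events.push('next ' + JSON.stringify(x)); },
    onError: function (e) { events.push('error ' + e.message); },
    onCompleted: function () { events.push('completed'); }
  });
  return events;
}

const listed = record(toList(fromArray([1, 2, 3])));
const emptied = record(toList(fromArray([])));
const failed = record(toList(failing('boom')));

console.log(listed, emptied, failed);
```

Note that in the failing case the observer never sees the item that was emitted before the error, because it was still sitting in the buffer.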
The toMap operator collects the items emitted by the source Observable into a map (by default, a HashMap, but you can optionally supply a factory function that generates another Map variety) and then emits that map. You supply a function that generates the key for each emitted item. You may also optionally supply a function that converts an emitted item into the value to be stored in the map (by default, the item itself is this value).
toMap does not by default operate on any particular Scheduler.
toMap(Func1)
toMap(Func1,Func1)
toMap(Func1,Func1,Func0)
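The roles of the three parameters can be sketched in plain JavaScript. This is an illustration only: toMapSketch is a hypothetical function, an array stands in for the emitted items, and an ES6 Map stands in for the default HashMap. The sketch also assumes that when two items produce the same key, the later one replaces the earlier one, which is simply what Map.set does.

```javascript
// Hypothetical sketch of the toMap parameters, not the RxGroovy API.
//   keySelector   - required, derives the key for each item
//   valueSelector - optional, derives the stored value (default: the item)
//   mapFactory    - optional, creates the map to fill (default: new Map())
function toMapSketch(items, keySelector, valueSelector, mapFactory) {
  const selectValue = valueSelector || function (item) { return item; };
  const map = mapFactory ? mapFactory() : new Map();
  items.forEach(function (item) {
    map.set(keySelector(item), selectValue(item));
  });
  return map;
}

const words = ['apple', 'avocado', 'banana'];

// Key only: the item itself is stored as the value.
const byInitial = toMapSketch(words, function (w) { return w[0]; });

// Key and value selectors: store each word's length under the word.
const lengths = toMapSketch(
  words,
  function (w) { return w; },
  function (w) { return w.length; }
);

console.log(Array.from(byInitial.entries())); // 'apple' was replaced by 'avocado'
console.log(Array.from(lengths.entries()));
```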
The toMultiMap operator is similar to toMap except that each value in the map it generates is a collection of all the items that share a key, rather than a single item. By default that collection is an ArrayList, or you can pass an optional factory function as a fourth parameter to generate the variety of collection you prefer.
toMultiMap does not by default operate on any particular Scheduler.
toMultiMap(Func1)
toMultiMap(Func1,Func1)
toMultiMap(Func1,Func1,Func0)
toMultiMap(Func1,Func1,Func0,Func1)
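The difference from toMap is easiest to see with items that share a key. The following plain-JavaScript sketch is an assumption-laden illustration, not the library's implementation: toMultiMapSketch is a hypothetical function, arrays stand in for the default ArrayList, and the optional collection factory is assumed to receive the key and to return something with a push method.

```javascript
// Hypothetical sketch of the toMultiMap parameters, not the RxGroovy API.
function toMultiMapSketch(items, keySelector, valueSelector, mapFactory, collectionFactory) {
  const selectValue = valueSelector || function (item) { return item; };
  const map = mapFactory ? mapFactory() : new Map();
  // Default collection is a plain array, standing in for ArrayList.
  const newCollection = collectionFactory || function (key) { return []; };
  items.forEach(function (item) {
    const key = keySelector(item);
    if (!map.has(key)) {
      map.set(key, newCollection(key));
    }
    // Every item with this key is kept, in emission order.
    map.get(key).push(selectValue(item));
  });
  return map;
}

const fruit = ['apple', 'avocado', 'banana'];
const grouped = toMultiMapSketch(fruit, function (w) { return w[0]; });

console.log(Array.from(grouped.entries()));
```

Where the toMap sketch of the same input would keep only one word per initial, here both words starting with "a" survive in the same collection.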
The toSortedList operator behaves much like toList except that it sorts the resulting list. By default it sorts the list naturally in ascending order by means of the Comparable interface. If any of the items emitted by the Observable does not support Comparable with respect to the type of every other item emitted by the Observable, toSortedList will throw an exception. However, you can change this default behavior by also passing toSortedList a function that takes two items as its parameters and returns a number; toSortedList will then use that function instead of Comparable to sort the items.
For example, the following code takes a list of unsorted integers, converts it into an Observable, then converts that Observable into one that emits the original list in sorted form as a single item:
numbers = Observable.from([8, 6, 4, 2, 1, 3, 5, 7, 9]);

numbers.toSortedList().subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);

[1, 2, 3, 4, 5, 6, 7, 8, 9]
Sequence complete
Here is an example that provides its own sorting function: in this case, one that sorts numbers according to how close they are to the number 5.
numbers = Observable.from([8, 6, 4, 2, 1, 3, 5, 7, 9]);

numbers.toSortedList({ n, m -> Math.abs(5-n) - Math.abs(5-m) }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);

[5, 6, 4, 3, 7, 8, 2, 1, 9]
Sequence complete
toSortedList does not by default operate on any particular Scheduler.
toSortedList()
toSortedList(Func2)
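The order produced by the "distance from 5" comparator depends on ties keeping their original relative order. A plain-JavaScript sketch makes that visible; toSortedListSketch is a hypothetical function, an array stands in for the emitted items, and the result assumes a stable Array.prototype.sort (guaranteed since ES2019).

```javascript
// Hypothetical sketch of toSortedList, not the RxGroovy API.
// Sorts a copy of the items, using the supplied comparator if there is one.
function toSortedListSketch(items, compare) {
  const naturalOrder = function (a, b) { return a < b ? -1 : (a > b ? 1 : 0); };
  return items.slice().sort(compare || naturalOrder);
}

const unsorted = [8, 6, 4, 2, 1, 3, 5, 7, 9];

// Default: natural ascending order.
const ascending = toSortedListSketch(unsorted);

// Custom comparator: a negative result puts n before m, so items closer
// to 5 come first. Items at equal distance (6 and 4, 3 and 7, ...) stay
// in the order the source emitted them.
const nearFive = toSortedListSketch(unsorted, function (n, m) {
  return Math.abs(5 - n) - Math.abs(5 - m);
});

console.log(ascending); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
console.log(nearFive);  // [5, 6, 4, 3, 7, 8, 2, 1, 9]
```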
RxGroovy also has a nest operator that has one particular purpose: it converts a source Observable into an Observable that emits that source Observable as its sole item.
The getIterator operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.
This operator converts an Observable into an Iterator with which you can iterate over the set of items emitted by the source Observable.
BlockingObservable.getIterator()
The toFuture operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.
This operator converts an Observable into a Future that will return the single item emitted by the source Observable. If the source Observable emits more than one item, the Future will receive an IllegalArgumentException; if it completes after emitting no items, the Future will receive a NoSuchElementException.
If you want to convert an Observable that may emit multiple items into a Future, try something like this: myObservable.toList().toBlocking().toFuture().
BlockingObservable.toFuture()
The toIterable operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.
This operator converts an Observable into an Iterable with which you can iterate over the set of items emitted by the source Observable.
BlockingObservable.toIterable()
Normally, an Observable that emits multiple items will do so by invoking its observer’s onNext method for each such item. You can change this behavior, instructing the Observable to compose a list of these multiple items and then to invoke the observer’s onNext method only once, passing it the entire list, by applying the toList operator to the Observable.
If the source Observable invokes onCompleted before emitting any items, the Observable returned by toList will emit an empty list before invoking onCompleted. If the source Observable invokes onError, the Observable returned by toList will immediately invoke the onError methods of its observers.
toList does not by default operate on any particular Scheduler.
toList()
The toMap operator collects the items emitted by the source Observable into a map (by default, a HashMap, but you can optionally supply a factory function that generates another Map variety) and then emits that map. You supply a function that generates the key for each emitted item. You may also optionally supply a function that converts an emitted item into the value to be stored in the map (by default, the item itself is this value).
toMap does not by default operate on any particular Scheduler.
toMap(Func1)
toMap(Func1,Func1)
toMap(Func1,Func1,Func0)
The toMultiMap operator is similar to toMap except that each value in the map it generates is a collection of all the items that share a key, rather than a single item. By default that collection is an ArrayList, or you can pass an optional factory function as a fourth parameter to generate the variety of collection you prefer.
toMultiMap does not by default operate on any particular Scheduler.
toMultiMap(Func1)
toMultiMap(Func1,Func1)
toMultiMap(Func1,Func1,Func0)
toMultiMap(Func1,Func1,Func0,Func1)
The toSortedList operator behaves much like toList except that it sorts the resulting list. By default it sorts the list naturally in ascending order by means of the Comparable interface. If any of the items emitted by the Observable does not support Comparable with respect to the type of every other item emitted by the Observable, toSortedList will throw an exception. However, you can change this default behavior by also passing toSortedList a function that takes two items as its parameters and returns a number; toSortedList will then use that function instead of Comparable to sort the items.
toSortedList does not by default operate on any particular Scheduler.
toSortedList()
toSortedList(Func2)
RxJava also has a nest operator that has one particular purpose: it converts a source Observable into an Observable that emits that source Observable as its sole item.
Normally, an Observable that emits multiple items will do so by invoking its observer’s onNext method for each such item. You can change this behavior, instructing the Observable to compose an array of these multiple items and then to invoke the observer’s onNext method only once, passing it the entire array, by applying the toArray operator to the Observable.
var source = Rx.Observable.timer(0, 1000)
    .take(5)
    .toArray();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: [0,1,2,3,4]
Completed

toArray is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
The toMap operator collects the items emitted by the source Observable into a Map and then emits that map. You supply a function that generates the key for each emitted item. You may also optionally supply a function that converts an emitted item into the value to be stored in the map (by default, the item itself is this value).
var source = Rx.Observable.timer(0, 1000)
    .take(5)
    .toMap(function (x) { return x * 2; }, function (x) { return x * 4; });

var subscription = source.subscribe(
    function (x) {
        var arr = [];
        // Map.forEach passes (value, key); push the key first so the
        // printed pairs read key, value
        x.forEach(function (value, key) { arr.push(key, value); });
        console.log('Next: ' + arr);
    },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: [0,0,2,4,4,8,6,12,8,16]
Completed

toMap is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.aggregates.js
Normally, an Observable that emits multiple items will do so by invoking its observer’s onNext method for each such item. You can change this behavior, instructing the Observable to compose a Set of these multiple items and then to invoke the observer’s onNext method only once, passing it the entire Set, by applying the toSet operator to the Observable.
Note that this only works in an environment that provides the ES6 Set type, either natively or through a polyfill.
var source = Rx.Observable.timer(0, 1000)
    .take(5)
    .toSet();

var subscription = source.subscribe(
    function (x) {
        var arr = [];
        x.forEach(function (i) { arr.push(i); });
        console.log('Next: ' + arr);
    },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: [0,1,2,3,4]
Completed

toSet is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.aggregates.js
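Because the result is a Set rather than an array, repeated items collapse into one entry. The timer example above never emits a duplicate, so this is easy to miss. The sketch below shows it in plain JavaScript; toSetSketch is a hypothetical function and an array stands in for the emitted items, so this is not the RxJS operator itself.

```javascript
// Hypothetical sketch of toSet, not the RxJS API: add every emitted
// item to an ES6 Set and hand back the Set once the source is done.
function toSetSketch(items) {
  const set = new Set();
  items.forEach(function (item) { set.add(item); });
  return set;
}

const emitted = [0, 1, 1, 2, 0, 3];
const unique = Array.from(toSetSketch(emitted));

console.log('Next: ' + unique); // duplicates of 0 and 1 appear only once
```

If duplicates and their positions matter, collecting into an array with toArray is the better fit.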
RxPHP implements this operator as toArray.
Creates an array from an observable sequence.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/toArray/toArray.php

$source = \Rx\Observable::fromArray([1, 2, 3, 4]);

$observer = $createStdoutObserver();

$subscription = $source->toArray()
    ->subscribe(new CallbackObserver(
        function ($array) use ($observer) {
            $observer->onNext(json_encode($array));
        },
        [$observer, "onError"],
        [$observer, "onCompleted"]
    ));

Next value: [1,2,3,4]
Complete!