The various language-specific implementations of ReactiveX have a variety of operators that you can use to convert an Observable, or a sequence of items emitted by an Observable, into another variety of object or data structure. Some of these block until the Observable terminates and then produce an equivalent object or data structure; others return an Observable that emits such an object or data structure.
In some implementations of ReactiveX, there is also an operator that converts an Observable into a “Blocking” Observable. A Blocking Observable extends the ordinary Observable by providing a set of blocking methods that operate on the items emitted by the Observable. Some of the To operators belong to this Blocking Observable set of extended operations.
The getIterator operator applies to the BlockingObservable subclass, so in order
to use it, you must first convert your source Observable into a BlockingObservable by means
of either the BlockingObservable.from method or the Observable.toBlocking
operator.
This operator converts an Observable into an Iterator with which you can iterate over the
set of items emitted by the source Observable.
BlockingObservable.getIterator()
The toFuture operator applies to the BlockingObservable subclass, so in order
to use it, you must first convert your source Observable into a BlockingObservable by means
of either the BlockingObservable.from method or the Observable.toBlocking
operator.
This operator converts an Observable into a Future that will return the single item emitted
by the source Observable. If the source Observable emits more than one item, the Future will
receive an IllegalArgumentException; if it completes after emitting no items, the
Future will receive a NoSuchElementException.
If you want to convert an Observable that may emit multiple items into a Future, try
something like this: myObservable.toList().toBlocking().toFuture().
BlockingObservable.toFuture()
The toIterable operator applies to the BlockingObservable subclass, so in order
to use it, you must first convert your source Observable into a BlockingObservable by means
of either the BlockingObservable.from method or the Observable.toBlocking
operator.
This operator converts an Observable into an Iterable with which you can iterate over the
set of items emitted by the source Observable.
BlockingObservable.toIterable()
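The idea behind toIterable and getIterator can be traced with a small sketch. It does not use RxGroovy: `fromArray` and `toIterable` below are hypothetical stand-ins written in plain JavaScript, an "Observable" is assumed to be nothing more than a function that pushes notifications to an observer, and the source is assumed to emit synchronously, so nothing actually blocks.

```javascript
// Hypothetical stand-in, not a library API: an "Observable" is modeled as a
// function that synchronously pushes notifications to the observer it is given.
function fromArray(items) {
  return function (observer) {
    items.forEach(function (item) { observer.onNext(item); });
    observer.onCompleted();
  };
}

// Sketch of toIterable: collect what the source emits, then expose it
// through the standard iteration protocol. Assumes a synchronous source.
function toIterable(source) {
  var items = [];
  var failure = null;
  source({
    onNext: function (item) { items.push(item); },
    onError: function (err) { failure = err; },
    onCompleted: function () {}
  });
  var iterable = {};
  iterable[Symbol.iterator] = function* () {
    yield* items;
    // Surface a source error once the items emitted before it are consumed.
    if (failure) { throw failure; }
  };
  return iterable;
}

var seen = [];
for (var item of toIterable(fromArray([1, 2, 3]))) {
  seen.push(item * 10);
}
console.log(seen); // [ 10, 20, 30 ]
```

A real BlockingObservable iterator would instead wait for each item as the source produces it; the sketch only shows how push-style notifications become something a for loop can pull from.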
Normally, an Observable that emits multiple items will do so by invoking its observer’s
onNext method for each such item. You can change this behavior, instructing the Observable to
compose a list of these multiple items and then to invoke the observer’s onNext method only
once, passing it the entire list, by applying the toList operator to the Observable.
For example, the following rather pointless code takes a list of integers, converts it into an Observable, then converts that Observable into one that emits the original list as a single item:
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
numbers.toList().subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[1, 2, 3, 4, 5, 6, 7, 8, 9]
Sequence complete
If the source Observable invokes onCompleted before emitting any items, the Observable
returned by toList will emit an empty list before invoking onCompleted. If the
source Observable invokes onError, the Observable returned by toList will
immediately invoke the onError methods of its observers.
toList does not by default operate on any particular
Scheduler.
toList()
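The two edge cases described above (an empty source and a failing source) can be traced with a small sketch. It does not use RxGroovy: `fromArray`, `toList`, `record` and `failing` are hypothetical stand-ins written in plain JavaScript, and an "Observable" is assumed to be simply a function that pushes notifications synchronously to an observer.

```javascript
// Hypothetical stand-in, not a library API: an "Observable" is modeled as a
// function that synchronously pushes notifications to the observer it is given.
function fromArray(items) {
  return function (observer) {
    items.forEach(function (item) { observer.onNext(item); });
    observer.onCompleted();
  };
}

// Sketch of toList: buffer every item, emit the buffer once on completion,
// and forward an error immediately without emitting the partial buffer.
function toList(source) {
  return function (observer) {
    var buffer = [];
    source({
      onNext: function (item) { buffer.push(item); },
      onError: function (err) { observer.onError(err); },
      onCompleted: function () {
        observer.onNext(buffer);
        observer.onCompleted();
      }
    });
  };
}

// Helper that records the notifications an observer receives.
function record(source) {
  var events = [];
  source({
    onNext: function (item) { events.push(['onNext', item]); },
    onError: function (err) { events.push(['onError', err.message]); },
    onCompleted: function () { events.push(['onCompleted']); }
  });
  return events;
}

// A source that fails after emitting one item.
function failing(observer) {
  observer.onNext(1);
  observer.onError(new Error('boom'));
}

var emptyEvents = record(toList(fromArray([])));
var errorEvents = record(toList(failing));
console.log(JSON.stringify(emptyEvents)); // [["onNext",[]],["onCompleted"]]
console.log(JSON.stringify(errorEvents)); // [["onError","boom"]]
```

Note that the item emitted before the error never reaches the observer: the buffer is only handed over on completion.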
The toMap operator collects the items emitted by the source Observable into a map (by
default, a HashMap, but you can optionally supply a factory function that generates another
Map variety) and then emits that map. You supply a function that generates the key for each
emitted item. You may also optionally supply a function that converts an emitted item into the value to be
stored in the map (by default, the item itself is this value).
toMap does not by default operate on any particular
Scheduler.
toMap(Func1)
toMap(Func1,Func1)
toMap(Func1,Func1,Func0)
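To make the roles of the three parameters concrete, here is a minimal sketch in plain JavaScript. It is not RxGroovy code: `fromArray`, `toMap` and `soleItem` are hypothetical stand-ins, an "Observable" is assumed to be a function that pushes notifications synchronously to an observer, and a JavaScript `Map` plays the part of the default HashMap. The sketch also assumes that a later item replaces an earlier one stored under the same key, which is how a plain map `put` behaves.

```javascript
// Hypothetical stand-in, not a library API: an "Observable" is modeled as a
// function that synchronously pushes notifications to the observer it is given.
function fromArray(items) {
  return function (observer) {
    items.forEach(function (item) { observer.onNext(item); });
    observer.onCompleted();
  };
}

// Sketch of toMap: keySelector is required; valueSelector defaults to the
// item itself; mapFactory defaults to creating an ordinary Map.
function toMap(source, keySelector, valueSelector, mapFactory) {
  valueSelector = valueSelector || function (item) { return item; };
  mapFactory = mapFactory || function () { return new Map(); };
  return function (observer) {
    var map = mapFactory();
    source({
      onNext: function (item) { map.set(keySelector(item), valueSelector(item)); },
      onError: function (err) { observer.onError(err); },
      onCompleted: function () {
        observer.onNext(map);
        observer.onCompleted();
      }
    });
  };
}

// Helper that returns the single item a source emits.
function soleItem(source) {
  var result;
  source({
    onNext: function (item) { result = item; },
    onError: function (err) { throw err; },
    onCompleted: function () {}
  });
  return result;
}

var words = fromArray(['apple', 'avocado', 'banana']);

// Key only: the value is the item itself; 'avocado' replaces 'apple' under 'a'.
var byInitial = soleItem(toMap(words, function (w) { return w.charAt(0); }));
console.log(Array.from(byInitial)); // [ [ 'a', 'avocado' ], [ 'b', 'banana' ] ]

// Key and value selectors: map each word to its length.
var lengths = soleItem(toMap(words,
  function (w) { return w; },
  function (w) { return w.length; }));
console.log(Array.from(lengths)); // [ [ 'apple', 5 ], [ 'avocado', 7 ], [ 'banana', 6 ] ]
```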
The toMultiMap operator is similar to toMap except that each value in the map it generates is
a collection of all the items that share that key (an ArrayList by default; or you can pass an
optional factory method as a fourth parameter by which you generate the variety of collection you prefer).
toMultiMap does not by default operate on any particular
Scheduler.
toMultiMap(Func1)
toMultiMap(Func1,Func1)
toMultiMap(Func1,Func1,Func0)
toMultiMap(Func1,Func1,Func0,Func1)
The toSortedList operator behaves much like toList except that it sorts the
resulting list. By default it sorts the list naturally in ascending order by means of the
Comparable interface. If any of the items emitted by the Observable does not support
Comparable with respect to the type of every other item emitted by the Observable,
toSortedList will throw an exception. However, you can change this default behavior by also
passing in to toSortedList a function that takes as its parameters two items and returns a
number; toSortedList will then use that function instead of Comparable to sort
the items.
For example, the following code takes a list of unsorted integers, converts it into an Observable, then converts that Observable into one that emits the original list in sorted form as a single item:
numbers = Observable.from([8, 6, 4, 2, 1, 3, 5, 7, 9]);
numbers.toSortedList().subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[1, 2, 3, 4, 5, 6, 7, 8, 9]
Sequence complete
Here is an example that provides its own sorting function: in this case, one that sorts numbers according to how close they are to the number 5.
numbers = Observable.from([8, 6, 4, 2, 1, 3, 5, 7, 9]);
numbers.toSortedList({ n, m -> Math.abs(5-n) - Math.abs(5-m) }).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[5, 6, 4, 3, 7, 8, 2, 1, 9]
Sequence complete
toSortedList does not by default operate on any particular
Scheduler.
toSortedList()
toSortedList(Func2)
RxGroovy also has a nest operator that has one particular purpose: it converts a source
Observable into an Observable that emits that source Observable as its sole item.
The getIterator operator applies to the BlockingObservable subclass, so in order
to use it, you must first convert your source Observable into a BlockingObservable by means
of either the BlockingObservable.from method or the Observable.toBlocking
operator.
This operator converts an Observable into an Iterator with which you can iterate over the
set of items emitted by the source Observable.
BlockingObservable.getIterator()
The toFuture operator applies to the BlockingObservable subclass, so in order
to use it, you must first convert your source Observable into a BlockingObservable by means
of either the BlockingObservable.from method or the Observable.toBlocking
operator.
This operator converts an Observable into a Future that will return the single item emitted
by the source Observable. If the source Observable emits more than one item, the Future will
receive an IllegalArgumentException; if it completes after emitting no items, the
Future will receive a NoSuchElementException.
If you want to convert an Observable that may emit multiple items into a Future, try
something like this: myObservable.toList().toBlocking().toFuture().
BlockingObservable.toFuture()
The toIterable operator applies to the BlockingObservable subclass, so in order
to use it, you must first convert your source Observable into a BlockingObservable by means
of either the BlockingObservable.from method or the Observable.toBlocking
operator.
This operator converts an Observable into an Iterable with which you can iterate over the
set of items emitted by the source Observable.
BlockingObservable.toIterable()
Normally, an Observable that emits multiple items will do so by invoking its observer’s
onNext method for each such item. You can change this behavior, instructing the Observable to
compose a list of these multiple items and then to invoke the observer’s onNext method only
once, passing it the entire list, by applying the toList operator to the Observable.
If the source Observable invokes onCompleted before emitting any items, the Observable
returned by toList will emit an empty list before invoking onCompleted. If the
source Observable invokes onError, the Observable returned by toList will
immediately invoke the onError methods of its observers.
toList does not by default operate on any particular
Scheduler.
toList()
The toMap operator collects the items emitted by the source Observable into a map (by
default, a HashMap, but you can optionally supply a factory function that generates another
Map variety) and then emits that map. You supply a function that generates the key for each
emitted item. You may also optionally supply a function that converts an emitted item into the value to be
stored in the map (by default, the item itself is this value).
toMap does not by default operate on any particular
Scheduler.
toMap(Func1)
toMap(Func1,Func1)
toMap(Func1,Func1,Func0)
The toMultiMap operator is similar to toMap except that each value in the map it generates is
a collection of all the items that share that key (an ArrayList by default; or you can pass an
optional factory method as a fourth parameter by which you generate the variety of collection you prefer).
toMultiMap does not by default operate on any particular
Scheduler.
toMultiMap(Func1)
toMultiMap(Func1,Func1)
toMultiMap(Func1,Func1,Func0)
toMultiMap(Func1,Func1,Func0,Func1)
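The grouping that a multimap performs can be sketched in plain JavaScript. This is not RxJava code: `fromArray`, `toMultiMap` and `soleItem` are hypothetical stand-ins, an "Observable" is assumed to be a function that pushes notifications synchronously to an observer, and a JavaScript `Map` of arrays stands in for the default HashMap of ArrayLists. The collection factory is assumed to receive the key and to return something with a `push` method.

```javascript
// Hypothetical stand-in, not a library API: an "Observable" is modeled as a
// function that synchronously pushes notifications to the observer it is given.
function fromArray(items) {
  return function (observer) {
    items.forEach(function (item) { observer.onNext(item); });
    observer.onCompleted();
  };
}

// Sketch of toMultiMap: every key maps to a collection of the values that
// were generated for items sharing that key.
function toMultiMap(source, keySelector, valueSelector, mapFactory, collectionFactory) {
  valueSelector = valueSelector || function (item) { return item; };
  mapFactory = mapFactory || function () { return new Map(); };
  collectionFactory = collectionFactory || function (key) { return []; };
  return function (observer) {
    var map = mapFactory();
    source({
      onNext: function (item) {
        var key = keySelector(item);
        // Create the per-key collection the first time a key is seen.
        if (!map.has(key)) { map.set(key, collectionFactory(key)); }
        map.get(key).push(valueSelector(item));
      },
      onError: function (err) { observer.onError(err); },
      onCompleted: function () {
        observer.onNext(map);
        observer.onCompleted();
      }
    });
  };
}

// Helper that returns the single item a source emits.
function soleItem(source) {
  var result;
  source({
    onNext: function (item) { result = item; },
    onError: function (err) { throw err; },
    onCompleted: function () {}
  });
  return result;
}

var words = fromArray(['apple', 'avocado', 'banana']);
var initial = function (w) { return w.charAt(0); };

var grouped = soleItem(toMultiMap(words, initial));
console.log(Array.from(grouped)); // [ [ 'a', [ 'apple', 'avocado' ] ], [ 'b', [ 'banana' ] ] ]

var groupedLengths = soleItem(toMultiMap(words, initial, function (w) { return w.length; }));
console.log(Array.from(groupedLengths)); // [ [ 'a', [ 5, 7 ] ], [ 'b', [ 6 ] ] ]
```

Compared with a single-valued map, no item is lost when two items produce the same key; both end up in that key's collection in emission order.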
The toSortedList operator behaves much like toList except that it sorts the
resulting list. By default it sorts the list naturally in ascending order by means of the
Comparable interface. If any of the items emitted by the Observable does not support
Comparable with respect to the type of every other item emitted by the Observable,
toSortedList will throw an exception. However, you can change this default behavior by also
passing in to toSortedList a function that takes as its parameters two items and returns a
number; toSortedList will then use that function instead of Comparable to sort
the items.
toSortedList does not by default operate on any particular
Scheduler.
toSortedList()
toSortedList(Func2)
RxJava also has a nest operator that has one particular purpose: it converts a source
Observable into an Observable that emits that source Observable as its sole item.
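What "emits that source Observable as its sole item" means can be shown with a very small sketch. It does not use RxJava: `fromArray` and `nest` are hypothetical stand-ins in plain JavaScript, and an "Observable" is assumed to be a function that pushes notifications synchronously to an observer.

```javascript
// Hypothetical stand-in, not a library API: an "Observable" is modeled as a
// function that synchronously pushes notifications to the observer it is given.
function fromArray(items) {
  return function (observer) {
    items.forEach(function (item) { observer.onNext(item); });
    observer.onCompleted();
  };
}

// Sketch of nest: the outer Observable emits the source itself, then completes.
function nest(source) {
  return function (observer) {
    observer.onNext(source);
    observer.onCompleted();
  };
}

var source = fromArray([1, 2, 3]);
var emitted = [];
nest(source)({
  onNext: function (inner) { emitted.push(inner); },
  onError: function (err) { throw err; },
  onCompleted: function () {}
});
console.log(emitted.length === 1 && emitted[0] === source); // true

// The emitted item is still an Observable and can be subscribed to as usual.
var innerItems = [];
emitted[0]({
  onNext: function (item) { innerItems.push(item); },
  onError: function (err) { throw err; },
  onCompleted: function () {}
});
console.log(innerItems); // [ 1, 2, 3 ]
```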
Normally, an Observable that emits multiple items will do so by invoking its observer’s
onNext method for each such item. You can change this behavior, instructing the Observable to
compose an array of these multiple items and then to invoke the observer’s onNext method only
once, passing it the entire array, by applying the toArray operator to the Observable.
var source = Rx.Observable.timer(0, 1000)
.take(5)
.toArray();
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });
Next: [0,1,2,3,4]
Completed
toArray is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
The toMap operator collects the items emitted by the source Observable into a
Map and then emits that map. You supply a function that generates the key for each emitted
item. You may also optionally supply a function that converts an emitted item into the value to be stored
in the map (by default, the item itself is this value).
var source = Rx.Observable.timer(0, 1000)
.take(5)
.toMap(function (x) { return x * 2; }, function (x) { return x * 4; });
var subscription = source.subscribe(
function (x) {
var arr = [];
x.forEach(function (value, key) { arr.push(key, value); }); // key first, to match the output shown
console.log('Next: ' + arr);
},
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });
Next: [0,0,2,4,4,8,6,12,8,16]
Completed
toMap is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.aggregates.js
Normally, an Observable that emits multiple items will do so by invoking its observer’s
onNext method for each such item. You can change this behavior, instructing the Observable to
compose a Set of these multiple items and then to invoke the observer’s onNext
method only once, passing it the entire Set, by applying the toSet operator to
the Observable.
Note that this works only in an environment that provides the ES6 Set type, either natively or through a polyfill.
var source = Rx.Observable.timer(0, 1000)
.take(5)
.toSet();
var subscription = source.subscribe(
function (x) {
var arr = [];
x.forEach(function (i) { arr.push(i); })
console.log('Next: ' + arr);
},
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });
Next: [0,1,2,3,4]
Completed
toSet is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.aggregates.js
RxPHP implements this operator as toArray.
Creates an array from an observable sequence.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/toArray/toArray.php
$source = \Rx\Observable::fromArray([1, 2, 3, 4]);
$observer = $createStdoutObserver();
$subscription = $source->toArray()
->subscribe(new CallbackObserver(
function ($array) use ($observer) {
$observer->onNext(json_encode($array));
},
[$observer, "onError"],
[$observer, "onCompleted"]
));
Next value: [1,2,3,4]
Complete!