In ReactiveX it is not difficult to get into a situation in which an Observable is emitting items more rapidly than an operator or observer can consume them. This presents the problem of what to do with such a growing backlog of unconsumed items.
For example, imagine using the Zip operator to zip together two infinite Observables, one of which emits items twice as frequently as the other. A naive implementation of the operator would have to maintain an ever-expanding buffer of items emitted by the faster Observable to eventually combine with items emitted by the slower one. This could cause ReactiveX to seize an unwieldy amount of system resources.
There are a variety of strategies with which you can exercise flow control and backpressure in ReactiveX in order to alleviate the problems caused when a quickly-producing Observable meets a slow-consuming observer, which include, in some ReactiveX implementations, reactive pull backpressure and some backpressure-specific operators.
A cold Observable emits a particular sequence of items, but can begin emitting this sequence when its observer finds it to be convenient, and at whatever rate the observer desires, without disrupting the integrity of the sequence. For example if you convert a static iterable into an Observable, that Observable will emit the same sequence of items no matter when it is later subscribed to or how frequently those items are observed. Examples of items emitted by a cold Observable might include the results of a database query, file retrieval, or web request.
A hot Observable begins generating items to emit immediately when it is created. Subscribers typically begin observing the sequence of items emitted by a hot Observable from somewhere in the middle of the sequence, beginning with the first item emitted by the Observable subsequent to the establishment of the subscription. Such an Observable emits items at its own pace, and it is up to its observers to keep up. Examples of items emitted by a hot Observable might include mouse & keyboard events, system events, or stock prices.
When a cold Observable is multicast (when it is converted into a connectable Observable and its Connect method is called), it effectively becomes hot and for the purposes of backpressure and flow-control it should be treated as a hot Observable.
Cold Observables are ideal for the reactive pull model of backpressure implemented by some implementations of ReactiveX (which is described elsewhere). Hot Observables typically do not cope well with a reactive pull model, and are better candidates for other flow control strategies, such as the use of the operators described on this page, or operators like Buffer, Sample, Debounce, or Window.
TBD
RxGroovy implements reactive pull backpressure, and many of its operators support that form of
backpressure. It also has three operators that you can apply to Observables that have not been
written to support backpressure:
A version of this operator that was introduced in RxGroovy 1.1 allows you to set the capacity of the
buffer; applying this operator will cause the resulting Observable to terminate with an error if this
buffer is overrun. A second version, introduced during the same release, allows you to set an
A version of this operator that was introduced in the 1.1 release notifies you, by means of an
onBackpressureBuffer
maintains a buffer of all unobserved emissions from the
source Observable and emits them to downstream observers according to the requests they
generate.
Action
that onBackpressureBuffer
will call if the buffer is overrun.
onBackpressureBuffer()
onBackpressureBuffer(long)
(RxGroovy 1.1)onBackpressureBuffer(long, Action0)
(RxGroovy 1.1)
onBackpressureDrop
drops emissions from the source Observable unless there is a
pending request from a downstream observer, in which case it will emit enough items to fulfill
the request.
Action
you pass as a parameter, when an item has been dropped and which item was dropped.
onBackpressureDrop()
onBackpressureLatest
(new in RxJava 1.1) holds on to the most-recently emitted item from the
source Observable and immediately emits that item to its observer upon request. It drops any other items
that it observes between requests from its observer.
onBackpressureLatest()
RxJava implements reactive pull backpressure, and many of its operators support that form of
backpressure. It also has three operators that you can apply to Observables that have not been
written to support backpressure:
A version of this operator that was introduced in RxJava 1.1 allows you to set the capacity of the
buffer; applying this operator will cause the resulting Observable to terminate with an error if this
buffer is overrun. A second version, introduced during the same release, allows you to set an
A version of this operator that was introduced in the 1.1 release notifies you, by means of an
onBackpressureBuffer
maintains a buffer of all unobserved emissions from the
source Observable and emits them to downstream observers according to the requests they
generate.
Action
that onBackpressureBuffer
will call if the buffer is overrun.
onBackpressureBuffer()
onBackpressureBuffer(long)
(RxJava 1.1)onBackpressureBuffer(long, Action0)
(RxJava 1.1)
onBackpressureDrop
drops emissions from the source Observable unless there is a
pending request from a downstream observer, in which case it will emit enough items to fulfill
the request.
Action
you pass as a parameter, when an item has been dropped and which item was dropped.
onBackpressureDrop()
onBackpressureLatest
(new in RxJava 1.1) holds on to the most-recently emitted item from the
source Observable and immediately emits that item to its observer upon request. It drops any other items
that it observes between requests from its observer.
onBackpressureLatest()
RxJS implements backpressure by transforming an ordinary Observable into a
ControlledObservable
with the controlled
operator. This forces the
Observable to respect pull request
s from its observer rather than pushing items on
its own initiative.
As an alternative to using request
to pull items from a
ControlledObservable
, you may apply the stopAndWait
operator to it.
This operator will request a new item from the Observable each time its observers’
onNext
routine receives the latest item.
A second possibility is to use the windowed(
n)
. This behaves
similarly to stopAndWait
but has an internal buffer of n items, which
allows the ControlledObservable
to run somewhat ahead of the observer from time
to time. windowed(1)
is equivalent to stopAndWait
.
There are also two operators that convert an ordinary Observable into at
PausableObservable
.
If you call the pause
method of a PausableObservable
created with
the pausable
operator, it will drop (ignore) any items emitted by the underlying
source Observable until such time as you call its resume
method, whereupon it
will continue to pass along emitted items to its observers.
If you call the pause
method of a PausableObservable
created with
the pausableBuffered
operator, it will buffer any items emitted by the underlying
source Observable until such time as you call its resume
method, whereupon it
will emit those buffered items and then continue to pass along any additional emitted items to
its observers.
TBD
TBD
TBD
TBD