The Buffer operator transforms an Observable that emits items into an Observable that emits buffered collections of those items. There are a number of variants in the various language-specific implementations of Buffer that differ in how they choose which items go in which buffers.
Note that if the source Observable issues an onError
notification,
Buffer will pass on this notification immediately without first
emitting the buffer it is in the process of assembling, even if that buffer contains items that
were emitted by the source Observable before it issued the error notification.
The Window operator is similar to Buffer but collects items into separate Observables rather than into data structures before reemitting them.
RxCpp implements two variants of Buffer:
buffer(count)
buffer(count)
emits non-overlapping buffers in the form of
vector
s, each of which contains at most count
items from the
source Observable (the final emitted vector
may have fewer than
count
items).
buffer(count, skip)
buffer(count, skip)
creates a new buffer starting with the first
emitted item from the source Observable, and every skip
items thereafter, and
fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as vector
s. Depending on the values
of count
and skip
these buffers may overlap (multiple buffers
may contain the same item), or they may have gaps (where items emitted by the source
Observable are not represented in any buffer).
In RxGroovy there are several variants of Buffer:
buffer(count)
buffer(count)
emits non-overlapping buffers in the form of
List
s, each of which contains at most count
items from the
source Observable (the final emitted List
may have fewer than
count
items).
buffer(int)
buffer(count, skip)
buffer(count, skip)
creates a new buffer starting with the first
emitted item from the source Observable, and every skip
items thereafter, and
fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as List
s. Depending on the values of
count
and skip
these buffers may overlap (multiple buffers may
contain the same item), or they may have gaps (where items emitted by the source
Observable are not represented in any buffer).
buffer(int,int)
buffer(bufferClosingSelector)
When it subscribes to the source Observable, buffer(bufferClosingSelector)
begins to collect its emissions into a List
, and it also calls
bufferClosingSelector
to generate a second Observable. When this second
Observable emits an TClosing
object, buffer
emits the current
List
and repeats this process: beginning a new List
and calling
bufferClosingSelector
to create a new Observable to monitor. It will do this
until the source Observable terminates.
buffer(Func0)
buffer(boundary
[, initialCapacity
])
buffer(boundary)
monitors an Observable, boundary
. Each time
that Observable emits an item, it creates a new List
to begin collecting
items emitted by the source Observable and emits the previous List
.
buffer(Observable)
buffer(Observable,int)
buffer(bufferOpenings, bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
monitors an Observable,
bufferOpenings
, that emits BufferOpening
objects. Each time it
observes such an emitted item, it creates a new List
to begin collecting
items emitted by the source Observable and it passes the bufferOpenings
Observable into the closingSelector
function. That function returns an
Observable. buffer
monitors that Observable and when it detects an emitted
item from it, it closes the List
and emits it as its own emission.
buffer(Observable,Func1)
buffer(timespan, unit
[, scheduler
])
buffer(timespan, unit)
emits a new List
of items
periodically, every timespan
amount of time, containing all items emitted by
the source Observable since the previous bundle emission or, in the case of the first
bundle, since the subscription to the source Observable. There is also a version of this
variant of the operator that takes a Scheduler
as a
parameter and uses it to govern the timespan; by default this variant uses the computation
Scheduler.
buffer(long,TimeUnit)
buffer(long,TimeUnit,Scheduler)
buffer(timespan, unit, count
[, scheduler
])
buffer(timespan, unit, count)
emits a new List
of
items for every count
items emitted by the source Observable, or, if
timespan
has elapsed since its last bundle emission, it emits a bundle of
however many items the source Observable has emitted in that span, even if this is fewer
than count
. There is also a version of this variant of the operator that
takes a Scheduler
as a parameter and uses it to govern the
timespan; by default this variant uses the computation
scheduler.
buffer(long,TimeUnit,int)
buffer(long,TimeUnit,int,Scheduler)
buffer(timespan, timeshift, unit
[, scheduler
])
buffer(timespan, timeshift, unit)
creates a new List
of items every timeshift
period of time, and fills this bundle with every
item emitted by the source Observable from that time until timespan
time has
passed since the bundle’s creation, before emitting this List
as its
own emission. If timespan
is longer than timeshift
, the emitted
bundles will represent time periods that overlap and so they may contain duplicate items.
There is also a version of this variant of the operator that takes a
Scheduler
as a parameter and uses it to govern the
timespan; by default this variant uses the computation
scheduler.
buffer(long,long,TimeUnit)
buffer(long,long,TimeUnit,Scheduler)
You can use the Buffer operator to implement backpressure (that is, to cope with an Observable that may produce items too quickly for its observer to consume).
Buffer can reduce a sequence of many items to a sequence of fewer buffers-of-items, making them more manageable. You could, for example, close and emit a buffer of items from a bursty Observable periodically, at a regular interval of time.
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the Debounce operator to emit a buffer closing indicator to the buffer operator.
// we have to multicast the original bursty Observable so we can use it // both as our source and as the source for our buffer closing selector: Observable<Integer> burstyMulticast = bursty.publish().refCount(); // burstyDebounced will be our buffer closing selector: Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS); // and this, finally, is the Observable of buffers we're interested in: Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
In RxJava there are several variants of Buffer:
buffer(count)
buffer(count)
emits non-overlapping buffers in the form of
List
s, each of which contains at most count
items from the
source Observable (the final emitted List
may have fewer than
count
items).
buffer(int)
buffer(count, skip)
buffer(count, skip)
creates a new buffer starting with the first
emitted item from the source Observable, and every skip
items thereafter, and
fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as List
s. Depending on the values of
count
and skip
these buffers may overlap (multiple buffers may
contain the same item), or they may have gaps (where items emitted by the source
Observable are not represented in any buffer).
buffer(int,int)
buffer(bufferClosingSelector)
When it subscribes to the source Observable, buffer(bufferClosingSelector)
begins to collect its emissions into a List
, and it also calls
bufferClosingSelector
to generate a second Observable. When this second
Observable emits an TClosing
object, buffer
emits the current
List
and repeats this process: beginning a new List
and calling
bufferClosingSelector
to create a new Observable to monitor. It will do this
until the source Observable terminates.
buffer(Func0)
buffer(boundary)
buffer(boundary)
monitors an Observable, boundary
. Each time
that Observable emits an item, it creates a new List
to begin collecting
items emitted by the source Observable and emits the previous List
.
buffer(Observable)
buffer(Observable,int)
buffer(bufferOpenings, bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
monitors an Observable,
bufferOpenings
, that emits BufferOpening
objects. Each time it
observes such an emitted item, it creates a new List
to begin collecting
items emitted by the source Observable and it passes the bufferOpenings
Observable into the closingSelector
function. That function returns an
Observable. buffer
monitors that Observable and when it detects an emitted
item from it, it closes the List
and emits it as its own emission.
buffer(Observable,Func1)
buffer(timespan, unit
[, scheduler
])
buffer(timespan, unit)
emits a new List
of items
periodically, every timespan
amount of time, containing all items emitted by
the source Observable since the previous bundle emission or, in the case of the first
bundle, since the subscription to the source Observable. There is also a version of this
variant of the operator that takes a Scheduler
as a
parameter and uses it to govern the timespan; by default this variant uses the computation
scheduler.
buffer(long,TimeUnit)
buffer(long,TimeUnit,Scheduler)
buffer(timespan, unit, count
[, scheduler
])
buffer(timespan, unit, count)
emits a new List
of
items for every count
items emitted by the source Observable, or, if
timespan
has elapsed since its last bundle emission, it emits a bundle of
however many items the source Observable has emitted in that span, even if this is fewer
than count
. There is also a version of this variant of the operator that
takes a Scheduler
as a parameter and uses it to govern the
timespan; by default this variant uses the computation
scheduler.
buffer(long,TimeUnit,int)
buffer(long,TimeUnit,int,Scheduler)
buffer(timespan, timeshift, unit
[, scheduler
])
buffer(timespan, timeshift, unit)
creates a new List
of items every timeshift
period of time, and fills this bundle with every
item emitted by the source Observable from that time until timespan
time has
passed since the bundle’s creation, before emitting this List
as its
own emission. If timespan
is longer than timeshift
, the emitted
bundles will represent time periods that overlap and so they may contain duplicate items.
There is also a version of this variant of the operator that takes a
Scheduler
as a parameter and uses it to govern the
timespan; by default this variant uses the computation
scheduler.
buffer(long,long,TimeUnit)
buffer(long,long,TimeUnit,Scheduler)
You can use the Buffer operator to implement backpressure (that is, to cope with an Observable that may produce items too quickly for its observer to consume).
Buffer can reduce a sequence of many items to a sequence of fewer buffers-of-items, making them more manageable. You could, for example, close and emit a buffer of items from a bursty Observable periodically, at a regular interval of time.
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the Debounce operator to emit a buffer closing indicator to the buffer operator.
// we have to multicast the original bursty Observable so we can use it // both as our source and as the source for our buffer closing selector: Observable<Integer> burstyMulticast = bursty.publish().refCount(); // burstyDebounced will be our buffer closing selector: Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS); // and this, finally, is the Observable of buffers we're interested in: Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
RxJS has four Buffer operators — buffer
,
bufferWithCount
, bufferWithTime
, and
bufferWithTimeOrCount
— each of which has variants that have different
ways of governing which source Observable items are emitted as part of which buffers.
buffer(bufferBoundaries)
buffer(bufferBoundaries)
monitors an Observable,
bufferBoundaries
. Each time that Observable emits an item, it creates a new
collection to begin collecting items emitted by the source Observable and emits the
previous collection.
buffer(bufferClosingSelector)
When it subscribes to the source Observable, buffer(bufferClosingSelector)
begins to collect its emissions into a collection, and it also calls
bufferClosingSelector
to generate a second Observable. When this second
Observable emits an item, buffer
emits the current collection and repeats
this process: beginning a new collection and calling bufferClosingSelector
to create a new Observable to monitor. It will do this until the source Observable
terminates.
buffer(bufferOpenings,bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
monitors an Observable,
bufferOpenings
, that emits BufferOpening
objects. Each time it
observes such an emitted item, it creates a new collection to begin collecting items
emitted by the source Observable and it passes the bufferOpenings
Observable into the bufferClosingSelector
function. That function returns an
Observable. buffer
monitors that Observable and when it detects an emitted
item from it, it emits the current collection and begins a new one.
buffer
is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.coincidence.js
buffer
requires one of the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
bufferWithCount(count)
bufferWithCount(count)
emits non-overlapping buffers, each of which contains
at most count
items from the source Observable (the final emitted buffer may
contain fewer than count
items).
bufferWithCount(count, skip)
bufferWithCount(count, skip)
creates a new buffer starting with the
first emitted item from the source Observable, and a new one for every skip
items thereafter, and fills each buffer with count
items: the initial item
and count-1
subsequent ones, emitting each buffer when it is complete.
Depending on the values of count
and skip
these buffers may
overlap (multiple buffers may contain the same item), or they may have gaps (where items
emitted by the source Observable are not represented in any buffer).
bufferWithCount
is found in each of the following distributions:
rx.js
rx.compat.js
rx.all.js
rx.all.compat.js
rx.lite.extras.js
bufferWithTime(timeSpan)
bufferWithTime(timeSpan)
emits a new collection of items periodically, every
timeSpan
milliseconds, containing all items emitted by the source Observable
since the previous bundle emission or, in the case of the first bundle, since the
subscription to the source Observable. There is also a version of this variant of the
operator that takes a Scheduler
as a parameter and uses it
to govern the timespan; by default this variant uses the timeout
scheduler.
bufferWithTime(timeSpan, timeShift)
bufferWithTime(timeSpan, timeShift)
creates a new collection of items
every timeShift
milliseconds, and fills this bundle with every item emitted
by the source Observable from that time until timeSpan
milliseconds has
passed since the collection’s creation, before emitting this collection as its own
emission. If timeSpan
is longer than timeShift
, the emitted
bundles will represent time periods that overlap and so they may contain duplicate items.
There is also a version of this variant of the operator that takes a
Scheduler
as a parameter and uses it to govern the
timespan; by default this variant uses the timeout
scheduler.
bufferWithTimeOrCount(timeSpan, count)
bufferWithTimeOrCount(timeSpan, count)
emits a new collection of items
for every count
items emitted by the source Observable, or, if
timeSpan
milliseconds have elapsed since its last collection emission, it
emits a collection of however many items the source Observable has emitted in that span,
even if this is fewer than count
. There is also a version of this variant of
the operator that takes a Scheduler
as a parameter and uses
it to govern the timespan; by default this variant uses the timeout
scheduler.
bufferWithTime
and bufferWithTimeOrCount
are found in each of the
following distributions:
rx.all.js
rx.all.compat.js
rx.time.js
bufferWithTime
and bufferWithTimeOrCount
require one of the
following distributions:
rx.time.js
requires rx.js
or rx.compat.js
rx.lite.js
or rx.lite.compat.js
In RxKotlin there are several variants of Buffer:
buffer(count)
buffer(count)
emits non-overlapping buffers in the form of
List
s, each of which contains at most count
items from the
source Observable (the final emitted List
may have fewer than
count
items).
buffer(count, skip)
buffer(count, skip)
creates a new buffer starting with the first
emitted item from the source Observable, and every skip
items thereafter, and
fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as List
s. Depending on the values of
count
and skip
these buffers may overlap (multiple buffers may
contain the same item), or they may have gaps (where items emitted by the source
Observable are not represented in any buffer).
buffer(bufferClosingSelector)
When it subscribes to the source Observable, buffer(bufferClosingSelector)
begins to collect its emissions into a List
, and it also calls
bufferClosingSelector
to generate a second Observable. When this second
Observable emits an TClosing
object, buffer
emits the current
List
and repeats this process: beginning a new List
and calling
bufferClosingSelector
to create a new Observable to monitor. It will do this
until the source Observable terminates.
buffer(boundary)
buffer(boundary)
monitors an Observable, boundary
. Each time
that Observable emits an item, it creates a new List
to begin collecting
items emitted by the source Observable and emits the previous List
.
buffer(bufferOpenings, bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
monitors an Observable,
bufferOpenings
, that emits BufferOpening
objects. Each time it
observes such an emitted item, it creates a new List
to begin collecting
items emitted by the source Observable and it passes the bufferOpenings
Observable into the closingSelector
function. That function returns an
Observable. buffer
monitors that Observable and when it detects an emitted
item from it, it closes the List
and emits it as its own emission.
buffer(timespan, unit
[, scheduler
])
buffer(timespan, unit)
emits a new List
of items
periodically, every timespan
amount of time, containing all items emitted by
the source Observable since the previous bundle emission or, in the case of the first
bundle, since the subscription to the source Observable. There is also a version of this
variant of the operator that takes a Scheduler
as a
parameter and uses it to govern the timespan; by default this variant uses the computation
scheduler.
buffer(timespan, unit, count
[, scheduler
])
buffer(timespan, unit, count)
emits a new List
of
items for every count
items emitted by the source Observable, or, if
timespan
has elapsed since its last bundle emission, it emits a bundle of
however many items the source Observable has emitted in that span, even if this is fewer
than count
. There is also a version of this variant of the operator that
takes a Scheduler
as a parameter and uses it to govern the
timespan; by default this variant uses the computation
scheduler.
buffer(timespan, timeshift, unit
[, scheduler
])
buffer(timespan, timeshift, unit)
creates a new List
of items every timeshift
period of time, and fills this bundle with every
item emitted by the source Observable from that time until timespan
time has
passed since the bundle’s creation, before emitting this List
as its
own emission. If timespan
is longer than timeshift
, the emitted
bundles will represent time periods that overlap and so they may contain duplicate items.
There is also a version of this variant of the operator that takes a
Scheduler
as a parameter and uses it to govern the
timespan; by default this variant uses the computation
scheduler.
In Rx.NET there are several variants of Buffer. For each variety you can either pass in the source Observable as the first parameter, or you can call it as an instance method of the source Observable (in which case you can omit that parameter):
Buffer(count)
Buffer(count)
emits non-overlapping buffers in the form of
IList
s, each of which contains at most count
items from the
source Observable (the final emitted IList
may have fewer than
count
items).
Buffer(count, skip)
Buffer(count, skip)
creates a new buffer starting with the first
emitted item from the source Observable, and every skip
items thereafter, and
fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as IList
s. Depending on the values of
count
and skip
these buffers may overlap (multiple buffers may
contain the same item), or they may have gaps (where items emitted by the source
Observable are not represented in any buffer).
Buffer(bufferClosingSelector)
When it subscribes to the source Observable, Buffer(bufferClosingSelector)
begins to collect its emissions into an IList
, and it also calls
bufferClosingSelector
to generate a second Observable. When this second
Observable emits an TBufferClosing
object, Buffer
emits the
current IList
and repeats this process: beginning a new IList
and calling bufferClosingSelector
to create a new Observable to monitor. It
will do this until the source Observable terminates.
Buffer(bufferOpenings,bufferClosingSelector)
Buffer(bufferOpenings, bufferClosingSelector)
monitors an Observable,
BufferOpenings
, that emits TBufferOpening
objects. Each time it
observes such an emitted item, it creates a new IList
to begin collecting
items emitted by the source Observable and it passes the TBufferOpening
object into the bufferClosingSelector
function. That function returns an
Observable. Buffer
monitors that Observable and when it detects an emitted
item from it, it closes the IList
and emits it as its own emission.
Buffer(timeSpan)
Buffer(timeSpan)
emits a new IList
of items periodically, every
timeSpan
amount of time, containing all items emitted by the source
Observable since the previous bundle emission or, in the case of the first list, since
the subscription to the source Observable. There is also a version of this variant of the
operator that takes an IScheduler
as a parameter and uses
it to govern the timespan.
Buffer(timeSpan, count)
Buffer(timeSpan, count)
emits a new IList
of items for
every count
items emitted by the source Observable, or, if
timeSpan
has elapsed since its last list emission, it emits a list of
however many items the source Observable has emitted in that span, even if this is fewer
than count
. There is also a version of this variant of the operator that
takes an IScheduler
as a parameter and uses it to govern
the timespan.
Buffer(timeSpan, timeShift)
Buffer(timeSpan, timeShift)
creates a new IList
of items
every timeShift
period of time, and fills this list with every item emitted
by the source Observable from that time until timeSpan
time has passed since
the list’s creation, before emitting this IList
as its own emission. If
timeSpan
is longer than timeShift
, the emitted lists will
represent time periods that overlap and so they may contain duplicate items. There is also
a version of this variant of the operator that takes an
IScheduler
as a parameter and uses it to govern the
timespan.
RxPHP implements this operator as bufferWithCount
.
Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCount.php $source = Rx\Observable::range(1, 6) ->bufferWithCount(2) ->subscribe($stdoutObserver);
Next value: [1,2] Next value: [3,4] Next value: [5,6] Complete!
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCountAndSkip.php $source = Rx\Observable::range(1, 6) ->bufferWithCount(2, 1) ->subscribe($stdoutObserver);
Next value: [1,2] Next value: [2,3] Next value: [3,4] Next value: [4,5] Next value: [5,6] Next value: [6] Complete!
RxPY has several Buffer variants: buffer
,
buffer_with_count
, buffer_with_time
, and
buffer_with_time_or_count
. For each of these variants there are optional
parameters that change the behavior of the operator. As always in RxPY, when an operator may
take more than one optional parameter, be sure to name the parameter in the parameter list
when you call the operator so as to avoid ambiguity.
buffer(buffer_openings)
buffer(buffer_openings=boundaryObservable)
monitors an Observable,
buffer_openings
. Each time that Observable emits an item, it creates a new
array to begin collecting items emitted by the source Observable and emits the
previous array.
buffer(closing_selector)
buffer(closing_selector=closingSelector)
begins collecting items emitted
by the source Observable immediately upon subscription, and also calls the
closing_selector
function to generate a second Observable. It monitors this
new Observable and, when it completes or emits an item, it emits the current array,
begins a new array to collect items from the source Observable, and calls
closing_selector
again to generate a new Observable to monitor in order to
determine when to emit the new array. It repeats this process until the source Observable
terminates, whereupon it emits the final array.
buffer(closing_selector=openingSelector, buffer_closing_selector=closingSelector)
begins by calling closing_selector
to get an Observable. It monitors this
Observable, and, whenever it emits an item, buffer
creates a new array, begins
to collect items subsequently emitted by the source Observable into this array, and calls
buffer_closing_selector
to get a new Observable to govern the closing of that
array. When this new Observable emits an item or terminates, buffer
closes and
emits the array that the Observable governs.
buffer_with_count(count)
buffer_with_count(count)
emits non-overlapping buffers in the form of
arrays, each of which contains at most count
items from the source Observable
(the final emitted array may have fewer than count
items).
buffer_with_count(count, skip)
buffer_with_count(count, skip=skip)
creates a new buffer starting with
the first emitted item from the source Observable, and every skip
items
thereafter, and fills each buffer with count
items: the initial item and
count-1
subsequent ones. It emits these buffers as arrays. Depending on the
values of count
and skip
these buffers may overlap (multiple
buffers may contain the same item), or they may have gaps (where items emitted by the
source Observable are not represented in any buffer).
buffer_with_time(timespan)
buffer_with_time(timespan)
emits a new array of items periodically, every
timespan
milliseconds, containing all items emitted by the source Observable
since the previous bundle emission or, in the case of the first bundle, since the
subscription to the source Observable. There is also a version of this variant of the
operator that takes a scheduler
parameter and uses it to govern the timespan;
by default this variant uses the timeout
scheduler.
buffer_with_time(timespan, timeshift)
buffer(timespan, timeshift=timeshift)
creates a new array of items
every timeshift
milliseconds, and fills this array with every item emitted
by the source Observable from that time until timespan
milliseconds have
passed since the array’s creation, before emitting this array as its own emission.
If timespan
is longer than timeshift
, the emitted arrays will
represent time periods that overlap and so they may contain duplicate items. There is also
a version of this variant of the operator that takes a scheduler
parameter
and uses it to govern the timespan; by default this variant uses the timeout
scheduler.
buffer_with_time_or_count(timespan, count)
buffer_with_time_or_count(timespan, count)
emits a new array of items
for every count
items emitted by the source Observable, or, if
timespan
milliseconds have elapsed since its last bundle emission, it emits
an array of however many items the source Observable has emitted in that span, even if
this is fewer than count
. There is also a version of this variant of the
operator that takes a scheduler
parameter and uses it to govern the timespan;
by default this variant uses the timeout
scheduler.
Rx.rb has three variants of the Buffer operator:
buffer_with_count(count)
buffer_with_count(count)
emits non-overlapping buffers in the form of
arrays, each of which contains at most count
items from the source Observable
(the final emitted array may have fewer than count
items).
buffer_with_count(count,skip)
buffer_with_count(count, skip=skip)
creates a new buffer starting with
the first emitted item from the source Observable, and every skip
items
thereafter, and fills each buffer with count
items: the initial item and
count-1
subsequent ones. It emits these buffers as arrays. Depending on the
values of count
and skip
these buffers may overlap (multiple
buffers may contain the same item), or they may have gaps (where items emitted by the
source Observable are not represented in any buffer).
buffer_with_time(timespan)
buffer_with_time(timespan)
emits a new array of items periodically, every
timespan
milliseconds, containing all items emitted by the source Observable
since the previous bundle emission or, in the case of the first bundle, since the
subscription to the source Observable.
RxScala has two varieties of Buffer —
slidingBuffer
and tumblingBuffer
— each of which has variants
with different ways of assembling the buffers they emit:
slidingBuffer(count, skip)
slidingBuffer(count, skip)
creates a new buffer starting with the first
emitted item from the source Observable, and every skip
items thereafter, and
fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as Seq
s. Depending on the values of
count
and skip
these buffers may overlap (multiple buffers may
contain the same item), or they may have gaps (where items emitted by the source
Observable are not represented in any buffer).
slidingBuffer(timespan, timeshift)
slidingBuffer(timespan, timeshift)
creates a new
Seq
of items every timeshift
(a Duration
), and
fills this buffer with every item emitted by the source Observable from that time until
timespan
(also a Duration
) has passed since the buffer’s
creation, before emitting this Seq
as its own emission. If
timespan
is longer than timeshift
, the emitted arrays will
represent time periods that overlap and so they may contain duplicate items. There is also
a version of this variant of the operator that takes a
Scheduler
as a parameter and uses it to govern the
timespan.
slidingBuffer(openings, closings)
slidingBuffer(openings,closings)
monitors the openings
Observable, and, whenever it emits an Opening
item, slidingBuffer
creates a new Seq
, begins to collect items subsequently emitted by the source
Observable into this buffer, and calls closings
to get a new Observable to
govern the closing of that buffer. When this new Observable emits an item or terminates,
slidingBuffer
closes and emits the Seq
that the Observable
governs.
tumblingBuffer(count)
tumblingBuffer(count)
emits non-overlapping buffers in the form of
Seq
s, each of which contains at most count
items from the source
Observable (the final emitted buffer may have fewer than count
items).
tumblingBuffer(boundary)
tumblingBuffer(boundary)
monitors an Observable, boundary
. Each
time that Observable emits an item, it creates a new Seq
to begin collecting
items emitted by the source Observable and emits the previous Seq
. This
variant of the operator has an optional second parameter, initialCapacity
with which you can indicate the expected size of these buffers so as to make memory
allocation more efficient.
tumblingBuffer(timespan)
tumblingBuffer(timespan)
emits a new Seq
of items periodically,
every timespan
(a Duration
), containing all items emitted by the
source Observable since the previous bundle emission or, in the case of the first bundle,
since the subscription to the source Observable. This variant of the operator has an
optional second parameter, scheduler
, with which you can set the
Scheduler
that you want to govern the timespan
calculation.
tumblingBuffer(timespan, count)
tumblingBuffer(timespan, count)
emits a new Seq
of items
for every count
items emitted by the source Observable, or, if
timespan
(a Duration
) has elapsed since its last bundle
emission, it emits a Seq
containing however many items the source Observable
emitted in that span, even if this is fewer than count
. This variant of the
operator has an optional third parameter, scheduler
, with which you can set
the Scheduler
that you want to govern the timespan
calculation.
TBD