Reactive Programming Using RxDart For Flutter Applications — Part 2

Mohit Chauhan
Mindful Engineering
9 min readAug 8, 2021

In previous part of this , we checked rxdart’s Subjects and Stream Classes. So now in this part we will check the Extension Methods on the Stream Classes which is also known as the Operators of Rx.

Photo by Jordan on Unsplash

Extension Methods on the Stream Classes (Operators of Rx)

Extension Methods is used to perform new functionality to an existing stream class.

extension methods is used with any stream class.

The extension methods of stream classes provided by RxDart is come with many powers. this stream classes is used to transform a source stream into a new stream with unique capabilities, such as throttling or buffering events. let’s see them with example,

BufferExtensions

To extends the stream class with the facility to buffer or store events temporarily in various ways then we can use this extension methods.

buffer() to create a stream where all item is a List containing the items from the source sequence.

bufferCount() to create a buffers of values from the source stream by using count to group them in the buffer and after that clears it and starts a new buffer for each values.

bufferTest() to creates a stream where all item is a List that contains the items from the source sequence and batched when item passes the test.

bufferTime() to create a stream where all item is a List that contains the items from the source sequence that stored in buffer based on time frame.

ConcatExtension

To extends the stream class with the facility to concatenate one stream with the another stream.

concatWith() this method returns a stream that emits all the items from the current stream after that emits all items from the given streams one at a time.

DebounceExtensions

To extends the stream class with the facility to debounce events in different ways.

debounce() to convert a stream in a such a way that will only emit items from the source sequence if a window has completed without the source sequence emitting another item. This window is created after the last debounced event was emitted.

debounceTime() this method is similar with debounce() difference is that when the time span that we defined for duration is passed, without the source sequence emitting another item then only items is emitted.

DefaultIfEmptyExtension

defaultIfEmpty() this method emits an items from the source stream or a single default item if the source stream found to be null.

DelayExtension

To extends the stream class with the facility to delay events being emitted.

delay() to modify a source stream by waiting for some amount of time that we added before emitting each of the source streams items.

DistinctUniqueExtension

To extends the stream class with the facility to emit only unique events.

distinctUnique() to create a stream where data events are skipped if they have already been emitted before.

More commonly known as distinct in other Rx implementations.

DoExtensions

To extends the stream class with the facility to execute interesting callback functions at different points in the streams lifecycle methods.

doOnCancel() executes the callback function when the stream subscription is cancelled.

doOnData() executes the callback function when the stream emits an item.

doOnDone() executes the callback function when the stream finishes emitting items.

doOnEach() executes the callback function when the stream emits data, emits an error, or emits done event of NotificationObject.

doOnError() executes the callback function when the stream emits an error.

doOnListen() executes the callback function when the stream is first listened.

doOnPause() executes the callback function when the stream subscription is paused.

doOnResume() executes the callback function when the stream subscription resumes receiving items.

EndWithExtension

endWith() to attach a value as the final item to the source stream before closing it.

EndWithManyExtension

endWithMany() to attach a sequence of values as final events to the source stream before closing it.

ExhaustMapExtension

exhaustMap() to converts the items from the source stream into a new stream using the mapper function. If an item is being processed then it will ignores other items emitted by the source until the current item completed successfully

FlatMapExtensions

To extends the stream class with the facility to convert the source stream into a new stream each time when the source stream emits an item.

flatMap() to converts all the emitted item into a stream using the mapper function. In this the newly created stream will be be listened to and begin emitting items into it’s downstream by maintaining the same order.

flatMapIterable() to converts all the item into a stream. for this stream must return an Iterable after that each item from the Iterable will be emitted one by one.

GroupByExtension

To extends the stream class with the facility to convert events into streams of events that are united by a key.

groupBy() to divides a stream that emits items into a stream that emits a GroupByStream, each one of which emits some subset of the items from the original source of the stream.

GroupByStream’s key property receives its Type and Value from the grouper function.

IntervalExtension

interval() to creates a stream that emits all item in the stream after a given time duration.

MapToExtension

To extends the stream class with the facility to convert all item in to the same value.

mapTo() to emits the constant value on the output stream every time when the source stream emits a value.

MaterializeExtension

materialize() to converts the onData, onDone, and onError events into NotificationObject that are passed into the downstream’s onData listener.

DematerializeExtension

dematerialize() to converts the onData, onDone, and onError events of NotificationObject from a materialized stream into normal onData, onDone, and onError events.

Basically when a stream has been materialized it emits onData, onDone, and onError events as NotificationObject. Dematerialize reverse this by transforming NotificationObject back to a normal stream of events.

MergeExtension

mergeWith() to combines the items emitted by multiple streams into a single stream of items. The items are emitted in the same order as they are emitted by the sources.

MaxExtension

max() to converts a stream into a Future that completes with the largest item emitted by the stream. this is same as we find the max value from the list, but here the values are asynchronous!

MinExtension

min() to converts a stream into a Future that completes with the smallest item emitted by the stream. this is same as we find the min value from the list, but here the values are asynchronous!

OnErrorExtensions

To extends the stream class with the facility to recover from errors in various ways.

onErrorResume() used to intercepts an onError event of NotificationObject from the source stream and instead of passing the error through to any listeners it replaces it with another stream of items created by the recovery function.

Mostly used to perform logic based on the type of error that was emitted.

onErrorResumeNext() used to intercepts an onError event of NotificationObject from the source stream and instead of passing the error through to any listeners it replaces it with another stream of items.

onErrorReturn() used to intercepts an onError event of NotificationObject from the source stream and instead of passing it through to any observers it replaces it with a given item and then terminates normally.

onErrorReturnWith() used to intercepts an onError event of NotificationObject from the source stream and instead of passing it through to any observers it replaces it with a given item and then terminates normally.

Mostly used to perform logic based on the type of error that was emitted.

PairwiseExtension

pairwise() used to emits the current(n-th) and next(n-1th) events as a pair and in this the first event will not emitted until the second one enter.

SampleExtensions

To extends the stream class with the facility to sample events from the stream.

sample() to emits the most recently emitted item emitted by the source stream since the previous release from the sample stream.

sampleTime() to emits the most recently item emitted by the source stream since the previous release within the periodically time span defined by Duration.

ScanExtension

scan() this method is used to applies an accumulator function to get the result of arithmetic logic on stream sequence and returns each result.

The seed value (0 in this example) is used as the initial accumulator value.

SkipLastExtension

skipLast() this method starts emitting all the items of the stream except the last count items.

SkipUntilExtension

skipUntil() this method starts emitting items only after the given stream emits an item or skip events until another stream emits an item.

StartWithExtension & StartWithManyExtension

startWith() this method emit the given value as the first item of the stream sequence.

startWithMany() this method emit the given values as the first items of the stream sequence.

SwitchIfEmptyExtension

switchIfEmpty() used when we’ve to return an alternative stream if the initial stream completes with no items in it.

SwitchMapExtension

switchMap() to convert one stream into a new stream using the mapper function whenever the source stream emits an item. In this, the newly created stream will be be listened and started emitting the items and any previously created stream will stop emitting the items.

The switchMap operator is similar to the flatMap but the difference is that it only emits items from the most recently created stream. This can be useful when we want the very latest result from our asynchronous APIs response.

TakeLastExtension

takeLast() used to receive only the final count values emitted by the source stream.

TakeUntilExtension

takeUntil() used when we’ve to returns the values from the source stream sequence before the other stream sequence started producing a value.

TakeWhileInclusiveExtension

takeWhileInclusive() to emits the values emitted by the source stream so that all values satisfies the test case and when the test case is not satisfied by a value then it will emit this value as a final event and then complete.

ThrottleExtensions

To extends the stream class with the facility to throttle events in various ways.

throttle() to emits a value from the source stream and then ignores it’s subsequent source values while the window stream is open after that repeats the same process.

If leading is true, then the first item in each window is emitted and If trailing is true, then the last item in each window is emitted.

throttleTime() to emits a value from the source stream and after that ignores it’s subsequent source values for a specific amount of duration, then repeats the same process.

TimeIntervalExtension

timeInterval() used to record the time interval between consecutive values in a stream sequence.

TimeStampExtension

timestamp() to wraps each item emitted by the source stream in a TimeStamped object that includes the emitted item and the time when the item was emitted.

WhereTypeExtension

whereType() used to filter down the events from the source stream and the resulting stream contains only those events of a specific type.

WindowExtensions

To extends the stream class with the ability to use window in our stream.

window() to creates a stream where each item is a stream containing the items from the source sequence.

windowCount() this method creates buffers of a number of values from the source stream using the count and then emits the buffer as a stream and clears it and after that starts a new buffer for other values.

count is the maximum size of the buffer to be emitted.

windowTest() to creates a stream where each item is a stream containing the items from the source sequence and it’s batched when the test case is passes.

windowTime() to creates a stream where each item is a stream containing the items from the source sequence and sampled on a time frame with specified Duration.

WithLatestFromExtensions

withLatestFrom() to creates a stream that emits when the source stream emits and after that combining the latest values from the two streams using the provided function.

ZipWithExtension

zipWith() to create a stream that combines the current stream together with the another stream using a zipper function.

Well done! You’ve survived Part 2 of Reactive Programming Using RxDart in which we check Extension Methods on the Stream Classes which is also known as the Operators of Rx.

Thanks for reading this article. I highly appreciate your patience for the RxDart.

👏👏👏 Claps if this blog helps you and Keep Fluttering.😀

--

--