Reactive Programming Using RxDart For Flutter Applications - Part 1

Mohit Chauhan
Mindful Engineering
6 min readAug 8, 2021

Reactive programming is programming with asynchronous observable streams. In Dart, stream provide an asynchronous sequence of data. In this first part, we will be looking on how to do reactive programming for our flutter applications by extending the capabilities of dart stream with RxDart using it’s Subjects and Stream Classes. In next part we will check the Extension Methods on the Stream Classes aka Operators of Rx.

tPhoto by Jordan on Unsplash

Introduction

Dart streams were developed with reactive programming in mind, Instead of providing an alternative to it, RxDart adds additional functionality using the reactive approach on top of it.

Setup

Add rxdart to your flutter project’s pubspec.yaml file:

dependencies:
rxdart:

Now in our code, we can use it by importing it:

import 'package:rxdart/rxdart.dart';

RxDart does not provide Rx’s main class Observable as alternative for Dart Streams. Instead, it offers several additional Subjects, Stream Classes and Extension Methods on the Stream Classes (Operators of Rx). let’s look over it,

Subjects

In Dart, if we’ve to send data, error and done events to its stream we use StreamController but, in RxDart we have to use Subject which is the same as StreamController but with additional stuff.

all the subjects of RxDart is similar to broadcast StreamController which means we can listened to the subject’s stream as many time as we want.

BehaviorSubject

In BehaviorSubject the most recent item that we added to our subject is dispatched to it’s new listeners. When we listen to our new listener it will receive the latest stored item from the subject and after that new event will be sent to all other listeners. let’s see with example,

We can also assign an initial value to our BehaviorSubject. like this,

PublishSubject

In PublishSubject all the items that we added to our subject is dispatched to it’s listeners. this is most easiest subject among other subjects provided by rxdart.
for this, the sequence of listener matters for adding and listening to the items of PublishSubject. let’s see with example,

ReplaySubject

In ReplaySubject all the items we added to our subject is dispatched to it’s listeners doesn’t matter the sequence when we add first or listen first to that items of ReplaySubject, unlike PublishSubject and BehaviorSubject. let’s see with example,

We can also specify a maxSize value at the time of initializing the ReplaySubject variable. this means we can only listen to the number of latest items that equal to the maxSize. like this,

But maxSize did not work when we added items later / or after the listener.

Stream Classes

The Stream Class is a source of asynchronous data events. RxDart gives us different ways to create a Stream with Stream Classes that have additional capabilities to create a variety of tasks as per our requirement, such as combining or merging Streams!!

We can create the Streams provided by RxDart in two ways. by instantiating the stream class directly like this,

final mergedStream = MergeStream([firstStream, secondStream]);

and using static factories from the Rx class like this,

final mergedStream = Rx.merge([firstStream, secondStream]);

There are so many Stream Classes are provided by RxDart let’s see them with example,

  • CombineLatestStream : This stream joins all the given streams into one single stream sequence by combining them when any of the source stream sequences send forth an item. this stream will emit items after all others streams that we want to combine have emitted at least one item. If any of the streams that we want to combine is empty then the resulting sequence completes instantly without emitting any items.
  • ConcatStream : This stream is used when we want to concatenates all stream sequences. ConcatStream concat the next stream sequence after the previous stream sequence is terminated successfully. In case where the streams is empty then it’s completes instantly without emitting any items.
  • ConcatEagerStream : This stream is similar with ConcatStream only the difference is that instead of subscribing to stream one by one, all the streams are immediately subscribed with correct time.
  • DeferStream : This stream is used when we want to constructs a stream lazily when we subscribes to it. In some case we have to wait until the last minute to generate the stream that have latest data. it’s a single subscription stream but we can make it reusable.
  • ForkJoinStream : This stream is used when we only want sequence that contains only final emitted value of each stream. in the case where any of the inner streams have some error then we will lose the value of any other streams that already completed if we not catch that error correctly on the inner stream.
  • FromCallableStream : This stream is used when we want to return a stream that is based on the result of some function. the stream emits the value that’s returned from that function.
  • MergeStream : This stream is used when we have to flattens the items that emitted by the given streams into a single sequence of stream. In this the items is emitted one after another from all the given streams.
  • NeverStream : This stream returns a non-terminating stream sequence, which can be used for infinite duration. this stream have very specific and limited behavior. as per my knowledge this is used for testing purposes only.
  • RaceStream : This stream returns a stream that emits all of it’s items before any other streams emits it’s items.
  • RangeStream : This stream class basically used when we want to returns a resulting stream that emits a sequence of integers within a particular range that we added.
  • RepeatStream : This stream creates a stream that will recreate it self and re- listen to the source stream for the specified number of times until the stream terminates successfully. In case if we forget to specify the count then it repeats indefinitely.
  • RetryStream : This stream is similar with RepeatStream only the difference is that if the retry count is not specified, it retries indefinitely and if the retry count is met, but the stream has not terminated successfully then all of the errors that caused the failure will be emitted at the end.
  • RetryWhenStream : This stream is somehow similar with the RetryStream the difference is it will take two stream as an argument (streamFactory and retryWhenFactory). if the retryWhenFactory throws an error or returns a stream that emits an error then the original error will be emitted and then the error from retryWhenFactory will be emitted if it is not same as the original error.
  • SequenceEqualStream : This stream is used to check that whether the two streams emit the same sequence of items or not.
  • SwitchLatestStream : This stream is used when we only want the single streams items that emitted most recently from the multiple streams.
  • TimerStream : This stream is useful when we want to emits an item after a some specific amount of time.
  • UsingStream : This stream is used to create a way so we can instruct an stream to create a resource for us that exists only during the lifetime of the stream and is disposed when the stream terminates.
  • ZipStream : This stream is used to merge all the specified streams into a one single stream sequence using the zipper function whenever all of the stream sequences have produced an element at a corresponding index.

Well done! You’ve survived Part 1 of Reactive Programming Using RxDart in which we check RxDart’s Subjects and Stream Classes.

All explanation and example code in this part is based on rxdart version 0.27.1.

Now you’re ready to check Part 2 of Reactive Programming Using RxDart. In that we will check the Extension Methods on the Stream Classes aka Operators of Rx.

--

--