Using Streams in Flutter

This year at Google I/O, the Flutter team presented one of Dart's most powerful features: Streams. They remarked:

Widgets + Streams = Reactive Flutter App

Streams are similar to observables in Rx or LiveData in Android. A stream, as the name suggests, is a sequence of asynchronous events. You define a stream in one place, add data to it in another, and listen for that data somewhere else, for example to refresh part of your UI (such as a widget).

Let’s understand streams through some simple examples.

First, create a Flutter project (say, flutter_streams).

Part 1: Creating a simple stream

Here we’ll create a stream from scratch and walk through the basic workflow.

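// These methods live inside your widget's State class.
@override
void initState() {
  super.initState();

  print("Creating a sample stream...");
  // Wrap the future in a stream that emits its result once, then closes.
  Stream<String> stream = Stream.fromFuture(getData());
  print("Created the stream");

  stream.listen((data) {
    print("DataReceived: " + data);
  }, onDone: () {
    print("Task Done");
  }, onError: (error) {
    print("Some Error");
  });

  // listen() returns immediately, so this runs before any data arrives.
  print("code controller is here");
}

Future<String> getData() async {
  await Future.delayed(Duration(seconds: 5)); // Mock delay
  print("Fetched Data");
  return "This is test data";
}

The output will be (console log prefixes omitted):

Creating a sample stream...
Created the stream
code controller is here
Fetched Data
DataReceived: This is test data
Task Done

The output is self-explanatory.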

First, we create a stream that gets its data from the getData() function, which simply returns a sample string after a delay of 5 seconds.
Next, we listen to the stream for incoming data with the .listen() method. It takes a callback that receives the incoming ‘data’, an onDone callback that marks completion, and an onError callback that is invoked with any error encountered.
This stream closes automatically after emitting its single event, and that is when the onDone() callback fires.
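To see the onError path as well, here is a minimal sketch in plain Dart (no Flutter needed). The helper names failingFetch and collectEvents are made up for this illustration. It records which callbacks fire for a future that succeeds and for one that fails.

```dart
import 'dart:async';

// Hypothetical fetch that always fails, used only for this illustration.
Future<String> failingFetch() async {
  await Future.delayed(const Duration(milliseconds: 10));
  throw StateError('fetch failed');
}

// Listens to a future-backed stream and records which callbacks fire.
Future<List<String>> collectEvents(Future<String> source) {
  final log = <String>[];
  final finished = Completer<List<String>>();

  Stream.fromFuture(source).listen(
    (data) => log.add('data: $data'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () {
      log.add('done');
      finished.complete(log);
    },
  );

  return finished.future;
}

Future<void> main() async {
  print(await collectEvents(Future.value('ok'))); // [data: ok, done]
  print(await collectEvents(failingFetch()));     // an error entry, then done
}
```

In both cases the stream ends with a done event, so onDone is a reasonable place for cleanup whether or not the fetch succeeded.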

You should always close a stream once its task is complete. Otherwise, performance issues may arise as the app grows more complex or when many streams stay active.

*Note that the line ‘code controller is here’ is printed before the stream has emitted its data, because listen() only registers the callbacks and returns immediately.
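The ordering can be checked outside Flutter with a small sketch. It contrasts listen(), which does not wait, with Dart's await for loop, which suspends until the stream is done. The getData() here is a shortened stand-in for the one above, and withListen / withAwaitFor are names invented for this example.

```dart
import 'dart:async';

// Shortened stand-in for the article's getData().
Future<String> getData() async {
  await Future.delayed(const Duration(milliseconds: 50));
  return 'sample';
}

// listen() registers callbacks and returns at once.
Future<List<String>> withListen() {
  final log = <String>[];
  final finished = Completer<List<String>>();
  Stream.fromFuture(getData()).listen(
    (data) => log.add('data: $data'),
    onDone: () => finished.complete(log),
  );
  log.add('after listen'); // recorded before any data arrives
  return finished.future;
}

// await for suspends this function until the stream has closed.
Future<List<String>> withAwaitFor() async {
  final log = <String>[];
  await for (final data in Stream.fromFuture(getData())) {
    log.add('data: $data');
  }
  log.add('after loop'); // recorded only after the stream is done
  return log;
}

Future<void> main() async {
  print(await withListen());   // [after listen, data: sample]
  print(await withAwaitFor()); // [data: sample, after loop]
}
```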

Part 2: StreamController

A stream created directly like this isn’t very useful on its own. So we go one step further and use the StreamController class to create a stream.

// The controller owns the stream; we add events through it.
StreamController<String> streamController = StreamController();

@override
void initState() {
  super.initState();

  print("Creating a StreamController...");
  streamController.stream.listen((data) {
    print("DataReceived: " + data);
  }, onDone: () {
    print("Task Done");
  }, onError: (error) {
    print("Some Error");
  });

  // The event is delivered asynchronously, after initState() returns.
  streamController.add("This is test data");
  print("code controller is here");
}

@override
void dispose() {
  streamController.close(); // Streams must be closed when no longer needed
  super.dispose();
}

The output is (console log prefixes omitted):

Creating a StreamController...
code controller is here
DataReceived: This is test data

Here we create our stream from a StreamController, a class that simply controls the stream it creates. Note that the onDone() callback hasn’t been called here, because our stream is still active and hasn’t been closed yet. That’s why you need to close the stream manually (say, in the dispose() method) to prevent memory leaks.
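A minimal sketch in plain Dart shows that onDone only fires once the controller is closed. The function name run and the log strings are illustrative, not part of the sample project.

```dart
import 'dart:async';

Future<List<String>> run() async {
  final log = <String>[];
  final done = Completer<void>();
  final controller = StreamController<String>();

  controller.stream.listen(
    (data) => log.add('DataReceived: $data'),
    onDone: () {
      log.add('Task Done');
      done.complete();
    },
  );

  controller.add('first');
  await Future<void>.delayed(Duration.zero); // let the event be delivered
  log.add('before close'); // onDone has not fired yet

  await controller.close(); // closing is what triggers onDone
  await done.future;
  return log;
}

Future<void> main() async {
  print(await run()); // [DataReceived: first, before close, Task Done]
}
```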

Part 3: StreamSubscription

Now, if you hover over the listen() method, you can see that it returns a StreamSubscription. The documentation says:

The subscription provides events to the listener, and holds the callbacks used to handle the events. The subscription can also be used to unsubscribe from the events, or to temporarily pause the events from the stream.

A stream subscription can therefore be used to pause and resume events from a stream when required. Make sure to cancel the subscription when it is no longer needed.
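As a rough sketch of what that looks like in plain Dart, the example below pauses, resumes, and then cancels a subscription. The function name run and the log messages are assumptions made for this illustration.

```dart
import 'dart:async';

Future<List<String>> run() async {
  final log = <String>[];
  final controller = StreamController<int>();

  // listen() hands back the subscription we use to control delivery.
  final StreamSubscription<int> subscription =
      controller.stream.listen((value) => log.add('received $value'));

  controller.add(1);
  await Future<void>.delayed(Duration.zero); // 1 is delivered

  subscription.pause();
  controller.add(2); // buffered while the subscription is paused
  await Future<void>.delayed(Duration.zero);
  log.add('still paused');

  subscription.resume();
  await Future<void>.delayed(Duration.zero); // buffered 2 is delivered now

  await subscription.cancel();
  controller.add(3); // dropped: nobody is listening any more
  await Future<void>.delayed(Duration.zero);
  log.add('cancelled');

  await controller.close();
  return log;
}

Future<void> main() async {
  print(await run()); // [received 1, still paused, received 2, cancelled]
}
```

In a Flutter widget, the cancel() call would typically go in dispose(), next to closing the controller.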

Part 4: Broadcast Streams

In Dart there are two types of streams: single-subscription streams and broadcast streams. The key difference between them is the number of subscriptions they can take. The former allows only one subscription, while the latter allows as many as required (as the name suggests, it broadcasts its data).
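The difference is easy to check with a short sketch. Listening a second time to a single-subscription stream throws a StateError, while a broadcast stream accepts the second listener. The helper name canListenTwice is invented for this example.

```dart
import 'dart:async';

// Returns true if the controller's stream accepts a second listener.
bool canListenTwice(StreamController<String> controller) {
  controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {});
    return true;
  } on StateError {
    // Single-subscription streams reject a second listen() call.
    return false;
  } finally {
    controller.close();
  }
}

void main() {
  print(canListenTwice(StreamController<String>()));           // false
  print(canListenTwice(StreamController<String>.broadcast())); // true
}
```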

So whenever you need to listen to the same stream in two different places at the same time, you can use a broadcast stream. Here’s an example:

StreamController<String> streamController = StreamController.broadcast(); // Add .broadcast here

@override
void initState() {
  super.initState();

  print("Creating a StreamController...");
  // First subscription
  streamController.stream.listen((data) {
    print("DataReceived1: " + data);
  }, onDone: () {
    print("Task Done1");
  }, onError: (error) {
    print("Some Error1");
  });
  // Second subscription
  streamController.stream.listen((data) {
    print("DataReceived2: " + data);
  }, onDone: () {
    print("Task Done2");
  }, onError: (error) {
    print("Some Error2");
  });

  // Added once; every active subscription receives it.
  streamController.add("This is test data");
  print("code controller is here");
}

The output is as shown (console log prefixes omitted):

Creating a StreamController...
code controller is here
DataReceived1: This is test data
DataReceived2: This is test data

You can see that we added the data to the stream only once, and it was received by both subscriptions. See how powerful streams are! Let’s say you have a databaseHelper class and you want to listen for changes to a particular piece of data. With broadcast streams, you can listen for this change in multiple places and update the data accordingly. No callbacks, interfaces, or boilerplate code required.
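Here is one way the databaseHelper idea might look, as a sketch only. The DatabaseHelper class, its in-memory map, and the changes / save / dispose members are all assumptions made for illustration; a real helper would wrap an actual database.

```dart
import 'dart:async';

// Hypothetical helper: every member here is an assumption for this sketch.
class DatabaseHelper {
  final _changes = StreamController<String>.broadcast();
  final Map<String, String> _rows = {};

  // Anyone interested in changes listens to this stream.
  Stream<String> get changes => _changes.stream;

  void save(String key, String value) {
    _rows[key] = value;
    _changes.add(key); // announce which key changed
  }

  Future<void> dispose() => _changes.close();
}

Future<List<String>> run() async {
  final db = DatabaseHelper();
  final log = <String>[];

  // Two independent parts of the app listen to the same stream.
  final listScreen =
      db.changes.listen((key) => log.add('list screen: $key changed'));
  final badge = db.changes.listen((key) => log.add('badge: $key changed'));

  db.save('user_1', 'Alice');
  await Future<void>.delayed(Duration.zero); // let both listeners run

  await listScreen.cancel();
  await badge.cancel();
  await db.dispose();
  return log;
}

Future<void> main() async {
  print(await run());
  // [list screen: user_1 changed, badge: user_1 changed]
}
```

Neither listener knows about the other, and the helper does not need to know who is listening.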

So that’s all for this article. I hope you now have a basic understanding of what streams are. The above sample code is available here.

Voilà! Both you and I learnt something new today. Congrats!
Clap! Clap! Clap!

Further Reading:

  1. https://stackoverflow.com/questions/14536437/how-do-you-create-a-stream-in-dart
  2. https://books.google.co.in/books?id=hYGOBQAAQBAJ&pg=PT198&lpg=PT198&dq=Stream.periodic&source=bl&ots=4_tShWWyXE&sig=fl5RadXHL3nqGEbPoWfpQD_EZ4w&hl=en&sa=X&ved=0ahUKEwjxp4aY5qbcAhWDp48KHe-6BWEQ6AEIXjAM#v=onepage&q=Stream.periodic&f=true
  3. https://www.dartlang.org/tutorials/language/streams