Dart asynchronous programming: Streams

Kathy Walrath
Published in Dart · 7 min read · Feb 11, 2020

Note: You might want to check out a version of this article where the code is updated to be null safe. (The video has not changed.)

A simple Flutter app that displays data from a stream

This article covers one of the fundamentals of reactive programming: streams, which are objects of type Stream.

If you’ve read the previous article on futures, you might remember that each future represents a single value (either an error or data) that it delivers asynchronously. Streams work similarly, but instead of a single thing, a stream can deliver zero or more values and errors over time.

This article is the third one based on the Flutter in Focus video series Asynchronous Programming in Dart. The first article, Isolates and event loops, covered the foundations of Dart’s support for background work. The second one, Futures, discussed the Future class.

If you prefer to learn by watching or listening, everything in this article is covered in the following video.

Andrew Brogdon’s video is the inspiration for this article.

If you think about the way a single value relates to an iterator of the same type, that’s how a future relates to a stream.

Just like with futures, the key is deciding in advance, “Here’s what to do when a piece of data is ready, and when there’s an error, and when the stream completes.”

Also just like with futures, the Dart event loop is still running the show.

Streams work with the Dart event loop.

If you’re using the File class’s openRead() method to read data from a file, for example, that method returns a stream.

Chunks of data are read from disk and arrive at the event loop. A Dart library looks at them and says, “Ah, I’ve got somebody waiting for this,” adds the data to the stream, and it pops out in your app’s code.

When another piece of data arrives, in it goes, and out it comes. Timer-based streams, streaming data from a network socket — they work with the event loop, too, using clock and network events.
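
To make the file example concrete, here is a minimal sketch that reads a file through the stream returned by openRead(). The helper name countBytes and the scratch file are assumptions made up for this illustration; only File.openRead() comes from the text above.

```dart
import 'dart:async';
import 'dart:io';

/// Hypothetical helper: adds up the sizes of the chunks that
/// openRead() delivers and completes when the stream is done.
Future<int> countBytes(File file) {
  var total = 0;
  final finished = Completer<int>();
  // openRead() returns a Stream<List<int>>: chunks of raw bytes.
  file.openRead().listen(
    (chunk) {
      print('Got a chunk of ${chunk.length} bytes');
      total += chunk.length;
    },
    onError: finished.completeError,
    onDone: () => finished.complete(total),
  );
  return finished.future;
}

Future<void> main() async {
  // Scratch file created only for this demo.
  final dir = await Directory.systemTemp.createTemp('stream_demo');
  final file = File('${dir.path}/hello.txt');
  await file.writeAsString('hello, streams');

  print('Total: ${await countBytes(file)} bytes');

  await dir.delete(recursive: true);
}
```

Each chunk shows up in the callback only when the event loop hands it over, which is why the total isn't known until the stream reports that it's done.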

Listening to streams

Let’s talk about how to work with data provided by a stream. Say you have a class that gives you a stream that kicks out a new integer once per second: 1, 2, 3, 4, 5…

You can use the listen() method to subscribe to the stream. The only required parameter is a function.

Simple code for creating and listening to a stream.
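
As a rough sketch of what that looks like, the following uses Stream.periodic as a stand-in for the number-producing class. The numberStream function and its parameters are assumptions for this illustration, and the stream stops after a few values so that the program ends.

```dart
import 'dart:async';

/// Stand-in for the article's number stream: emits 1, 2, 3, ... once per
/// [interval] and stops after [count] values so the demo terminates.
Stream<int> numberStream(
  int count, {
  Duration interval = const Duration(seconds: 1),
}) {
  return Stream<int>.periodic(interval, (tick) => tick + 1).take(count);
}

void main() {
  final myStream = numberStream(4);
  // The only required argument to listen() is the data callback.
  myStream.listen((data) => print('Data: $data'));
}
```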

Every time a new value is emitted by the stream, the function gets called and prints the value:

Data: 1
Data: 2
Data: 3
Data: 4
...

That’s how listen() works.

Important: By default, streams are set up for single subscription. They hold onto their values until someone subscribes, and they only allow a single listener for their entire lifespan. If you try to listen to a stream twice, you’ll get an exception.

Fortunately, Dart also offers broadcast streams. You can use the asBroadcastStream() method to make a broadcast stream from a single-subscription one. Broadcast streams work the same as single-subscription streams, but they can have multiple listeners, and if nobody’s listening when a piece of data is ready, that data is tossed out.

To listen to a stream more than once, you need to convert it into a broadcast stream.
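
Here is one way to see the difference, as a sketch. The helper secondListenThrows is a made-up name for this illustration, and the streams come from a plain StreamController, which produces a single-subscription stream.

```dart
import 'dart:async';

/// Hypothetical helper: returns true if a second listen() call on
/// [stream] throws a StateError.
bool secondListenThrows(Stream<int> stream) {
  final first = stream.listen((_) {});
  try {
    stream.listen((_) {}).cancel();
    return false;
  } on StateError {
    return true;
  } finally {
    first.cancel();
  }
}

Future<void> main() async {
  // A plain StreamController gives you a single-subscription stream.
  final single = StreamController<int>().stream;
  print(secondListenThrows(single)); // prints: true

  // After asBroadcastStream(), more than one listener is allowed.
  final controller = StreamController<int>();
  final broadcast = controller.stream.asBroadcastStream();
  broadcast.listen((data) => print('A: $data'));
  broadcast.listen((data) => print('B: $data'));
  controller.add(1);
  controller.add(2);
  await controller.close();
}
```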

Let’s go back to that first listen() call, because there are a couple more things to talk about.

As we mentioned earlier, streams can produce errors just like futures can. By adding an onError function to the listen() call, you can catch and process any error.

There’s also a cancelOnError parameter that’s false by default, so the subscription keeps going even after an error. Set it to true if you want the subscription to be canceled when the first error arrives.

And you can add an onDone function to execute some code when the stream is finished sending data, such as when a file has been completely read.

With all four of those parameters combined — onError, onDone, cancelOnError, and the required parameter (onData) — you can be ready in advance for whatever happens.

This example uses all four parameters to listen().
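
A minimal sketch of such a call might look like the following. The logEvents helper and the controller that feeds it are assumptions for this illustration; cancelOnError is passed explicitly as false so the subscription keeps going after the error.

```dart
import 'dart:async';

/// Hypothetical helper: listens with all four listen() parameters and
/// returns a log of what happened once the stream is done.
Future<List<String>> logEvents(Stream<int> stream) {
  final log = <String>[];
  final finished = Completer<List<String>>();
  stream.listen(
    (data) => log.add('Data: $data'),
    onError: (Object err) => log.add('Error: $err'),
    cancelOnError: false, // keep listening after an error
    onDone: () {
      log.add('Done');
      finished.complete(log);
    },
  );
  return finished.future;
}

Future<void> main() async {
  final controller = StreamController<int>();
  final result = logEvents(controller.stream);

  controller.add(1);
  controller.addError('oops');
  controller.add(2);
  await controller.close();

  (await result).forEach(print);
  // Prints:
  // Data: 1
  // Error: oops
  // Data: 2
  // Done
}
```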

Tip: The little subscription object that listen() returns has some useful methods of its own. It’s a StreamSubscription, and you can use it to pause, resume, and even cancel the flow of data.

You might need the StreamSubscription object that listen() returns.
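
As a sketch of how you might use it, the following holds on to the subscription so it can stop listening later. The takeThenCancel helper and the timings in main are assumptions for this illustration.

```dart
import 'dart:async';

/// Hypothetical helper: listens to [stream], cancels the subscription
/// after [limit] values, and returns what was received.
Future<List<int>> takeThenCancel(Stream<int> stream, int limit) {
  final received = <int>[];
  final finished = Completer<List<int>>();
  late final StreamSubscription<int> subscription;
  subscription = stream.listen(
    (data) {
      received.add(data);
      if (received.length == limit) {
        subscription.cancel(); // stop the flow of data for good
        finished.complete(received);
      }
    },
    onDone: () {
      // The stream ended before reaching the limit.
      if (!finished.isCompleted) finished.complete(received);
    },
  );
  return finished.future;
}

Future<void> main() async {
  final numbers = Stream<int>.periodic(
    const Duration(milliseconds: 100),
    (tick) => tick + 1,
  );
  final subscription = numbers.listen((data) => print('Data: $data'));

  await Future<void>.delayed(const Duration(milliseconds: 350));
  subscription.pause(); // no data events are delivered while paused
  await Future<void>.delayed(const Duration(milliseconds: 300));
  subscription.resume(); // delivery picks up again
  await Future<void>.delayed(const Duration(milliseconds: 250));
  await subscription.cancel();
}
```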

Using and manipulating streams

Now that you know how to use listen() to subscribe to a stream and receive data events, we can talk about what makes streams really cool: manipulating them. Once you’ve got data in a stream, a lot of operations become fluent and elegant.

Going back to that number stream from earlier, we can use a method called map() to take each value from the stream and convert it on the fly into something else. Give map() a function to do the conversion, and it returns a new stream, typed to match the return value of the function. Instead of a stream of ints, you now have a stream of strings. You can throw a listen() call on the end, give it the print() function, and now you’re printing strings directly off the stream, asynchronously, as they arrive.
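
In code, that chain could look something like this sketch. The describe function and the stand-in number stream are assumptions for this illustration.

```dart
import 'dart:async';

/// Hypothetical helper: converts a stream of ints to a stream of strings.
Stream<String> describe(Stream<int> numbers) =>
    numbers.map((i) => 'String $i');

void main() {
  // Stand-in for the number stream: 1, 2, 3, one value per second.
  final numbers = Stream<int>.periodic(
    const Duration(seconds: 1),
    (tick) => tick + 1,
  ).take(3);

  describe(numbers).listen(print);
  // Prints:
  // String 1
  // String 2
  // String 3
}
```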

There are a ton of methods that you can chain up like this. If you only want to print the even numbers, for example, you can use where() to filter the stream. Give it a test function that returns a boolean for each element, and it returns a new stream that only includes values that pass the test.
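
For instance, a sketch of the even-numbers filter, chained with map(), might look like this. The evensOnly function and the stand-in number stream are assumptions for this illustration.

```dart
import 'dart:async';

/// Hypothetical helper: keeps only the even values of [numbers].
Stream<int> evensOnly(Stream<int> numbers) =>
    numbers.where((i) => i % 2 == 0);

void main() {
  // Stand-in for the number stream: 1 through 6, one value per second.
  final numbers = Stream<int>.periodic(
    const Duration(seconds: 1),
    (tick) => tick + 1,
  ).take(6);

  evensOnly(numbers).map((i) => 'String $i').listen(print);
  // Prints:
  // String 2
  // String 4
  // String 6
}
```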

The distinct() method is another good one. If you have an app that uses a Redux store, that store emits new app state objects in an onChange stream. You can use map() to convert that stream of state objects to a stream of view models for one part of the app. Then you can use the distinct() method to get a stream that filters out consecutive identical values (in case the store kicks out a change that doesn’t affect the subset of data in the view model). Then you can listen and update the UI whenever you get a new view model.
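
Here is a rough sketch of that pipeline. It doesn't use a real Redux package: AppState, TitleViewModel, and the onChange stream are all hypothetical stand-ins. Note that distinct() compares consecutive values with ==, so the view model in this sketch defines its own equality.

```dart
import 'dart:async';

/// Hypothetical app state; only [title] matters to the view below.
class AppState {
  const AppState(this.title, this.unreadCount);
  final String title;
  final int unreadCount;
}

/// Hypothetical view model for one part of the app.
class TitleViewModel {
  const TitleViewModel(this.title);
  final String title;

  // distinct() uses == to compare consecutive values.
  @override
  bool operator ==(Object other) =>
      other is TitleViewModel && other.title == title;

  @override
  int get hashCode => title.hashCode;
}

/// Maps state objects to view models and drops consecutive duplicates.
Stream<TitleViewModel> titleViewModels(Stream<AppState> onChange) =>
    onChange.map((state) => TitleViewModel(state.title)).distinct();

void main() {
  final onChange = Stream.fromIterable(const [
    AppState('Inbox', 0),
    AppState('Inbox', 1), // title unchanged, so this one is filtered out
    AppState('Sent', 1),
  ]);

  titleViewModels(onChange).listen((vm) => print('Rebuild with ${vm.title}'));
  // Prints:
  // Rebuild with Inbox
  // Rebuild with Sent
}
```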

There are a bunch of additional methods built into Dart that you can use to shape and modify your streams. Plus, when you’re ready for even more advanced stuff, there’s the async package maintained by the Dart team and available on pub.dev. It has classes that can merge two streams together, cache results, and perform other types of stream-based wizardry.

Try the async package for more stream-based wizardry.

For even more stream magic, take a look at the stream_transform package.

Creating streams

One advanced topic deserves a mention here, and that’s how to create streams of your own. Just like with futures, most of the time you’re going to be working with streams created for you by network libraries, file libraries, state management, and so on. But you can make your own as well, using a StreamController.

Let’s go back to that NumberCreator we’ve been using so far. Here’s the actual code for it:

As you can see, it keeps a running count, and it uses a timer to increment that count each second. The interesting bit, though, is the stream controller.

A StreamController creates a brand new stream from scratch, and gives you access to both ends of it. There’s the stream end itself, where data arrives. (We’ve been using that one throughout this article.)

Stream<int> get stream => _controller.stream;

Then there’s the sink end, which is where new data gets added to the stream:

_controller.sink.add(_count);

NumberCreator here uses both of them. When the timer goes off, it adds the latest count to the controller’s sink, and then it exposes the controller’s stream with a public property so other objects can subscribe to it.
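
Putting those pieces together, a class like NumberCreator could be sketched as follows. This is a reconstruction from the description and the two lines quoted above, not necessarily the exact listing. The interval parameter and the dispose() method are assumptions added so that the example can run quickly and clean up after itself.

```dart
import 'dart:async';

class NumberCreator {
  /// [interval] is an assumption of this sketch; the article's class
  /// emits once per second.
  NumberCreator({Duration interval = const Duration(seconds: 1)}) {
    _timer = Timer.periodic(interval, (_) {
      // Sink end: add the latest count to the stream.
      _controller.sink.add(_count);
      _count++;
    });
  }

  var _count = 1;
  late final Timer _timer;
  final _controller = StreamController<int>();

  /// Stream end: other objects subscribe here.
  Stream<int> get stream => _controller.stream;

  /// Assumption of this sketch: stops the timer and closes the stream.
  void dispose() {
    _timer.cancel();
    _controller.close();
  }
}

void main() {
  final creator = NumberCreator();
  // Print the first three values, then clean up.
  creator.stream.take(3).listen(print, onDone: creator.dispose);
}
```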

Building Flutter widgets using streams

Now that we’ve covered creating, manipulating, and listening to streams, let’s talk about how to put them to work building widgets in Flutter.

If you saw the previous video on futures, you might remember FutureBuilder. You give it a future and a builder method, and it builds widgets based on the state of the future.

For streams, there’s a similar widget called StreamBuilder. Give it a stream and a builder method, and it will rebuild its children whenever a new value is emitted by the stream.

The snapshot parameter is an AsyncSnapshot, just like with FutureBuilder. You can check its connectionState property to see if the stream hasn’t yet sent any data or if it’s completely finished. You can use the hasError property to see if the latest value is an error. And, of course, you can handle data values.

The main thing is just to make sure your builder knows how to handle all the possible states of the stream. Once you’ve got that, it can react to whatever the stream does.
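
As a sketch of a builder that covers those states, consider the following widget. NumberText and the strings it shows are assumptions for this illustration; it needs a Flutter project to run, and it expects to be given a stream such as the number stream used earlier.

```dart
import 'package:flutter/material.dart';

/// Hypothetical widget that displays the latest value from [stream].
class NumberText extends StatelessWidget {
  const NumberText({super.key, required this.stream});

  final Stream<int> stream;

  @override
  Widget build(BuildContext context) {
    return StreamBuilder<int>(
      stream: stream,
      builder: (context, snapshot) {
        // The latest event was an error.
        if (snapshot.hasError) {
          return Text('Error: ${snapshot.error}');
        }
        switch (snapshot.connectionState) {
          case ConnectionState.none:
            return const Text('No stream');
          case ConnectionState.waiting:
            // Subscribed, but nothing has arrived yet.
            return const Text('Waiting for data...');
          case ConnectionState.active:
            return Text('Latest: ${snapshot.data}');
          case ConnectionState.done:
            return Text('Done. Last value: ${snapshot.data}');
        }
      },
    );
  }
}
```

Checking hasError first keeps the error handling in one place, and the switch then covers every value of connectionState.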

Summary

This article talked about what streams represent, how you get values from a stream, ways to manipulate those values, and how StreamBuilder helps you use stream values in a Flutter app.

You can learn more about streams from the Dart and Flutter documentation.

Or go on to the next video in the Asynchronous Programming in Dart series. It talks about async and await, which are two keywords that Dart offers to help you keep your asynchronous code tight and easy to read.

Big thanks to Andrew Brogdon, who created the video that this article is based on.
