Understanding Streams in Flutter (Dart)

Streams are one of the more challenging topics for beginners, and it took me a lot of effort to understand them. To really understand streams, you need to work through several examples; after that, it becomes clear what exactly a stream is.

At the end of the post, I will write a Flutter application based on the same examples that I discuss in Dart.

I will also put my own understanding of streams into words and examples.

Streams are a Dart concept, so I will start with streams in Dart and finish by applying those concepts in Flutter.

Asynchronous programming in Dart is characterized by the Future and Stream classes.
What is a Future?
  • A Future represents a potential value, or error, that will be available at some time in the future.
A simple example of a Future with a potential value:
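// asynchronous data
void main() async {
  // Wait for the future to complete before printing its value.
  String x = await helloAsync();
  print(x);
}

// Completes with a message after a 5-second delay.
Future<String> helloAsync() async {
  await Future.delayed(Duration(seconds: 5));
  return 'Message from Future.';
}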
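The definition above also mentions that a Future can complete with an error. Here is a minimal sketch of that case; failAsync and describe are hypothetical helpers made up for this illustration, and the delay is shortened so it runs quickly.

```dart
import 'dart:async';

// Hypothetical helper: completes with an error after a short delay.
Future<String> failAsync() async {
  await Future.delayed(Duration(milliseconds: 100));
  throw StateError('Something went wrong.');
}

// Hypothetical helper: returns the value, or a description of the error.
Future<String> describe(Future<String> future) async {
  try {
    return await future;
  } on StateError catch (e) {
    return 'Caught: ${e.message}';
  }
}

void main() async {
  print(await describe(failAsync())); // Caught: Something went wrong.
}
```

Awaiting a Future inside try/catch is one common way to handle the error case; it is not the only one.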
What is a Stream?
  • A Stream provides an asynchronous sequence of data.

How are Stream and Future similar?

  • Both work asynchronously.
  • Both carry some potential value.

How are Stream and Future different?

  • A stream can be thought of as a sequence of futures, delivered one after another.
  • A Future completes with only one response, but a Stream can deliver any number of responses.
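To make the "one response versus many responses" difference concrete, here is a small sketch. It assumes an async* generator as the stream source, which the rest of this post does not use; oneValue and manyValues are hypothetical names.

```dart
import 'dart:async';

// A Future completes once, with a single value.
Future<int> oneValue() async => 1;

// A Stream can deliver several values over time.
// (async* generator, used here only for illustration.)
Stream<int> manyValues() async* {
  for (int i = 1; i <= 3; i++) {
    await Future.delayed(Duration(milliseconds: 100));
    yield i;
  }
}

void main() async {
  print(await oneValue()); // 1
  await for (int value in manyValues()) {
    print(value); // 1, then 2, then 3
  }
}
```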

If you have ever sent a request to an API in your Flutter application, then you probably already know about async (Future).

There are two kinds of streams:
  1. Single subscription: The stream can have at most one listener.
  2. Broadcast: The stream can have any number of listeners.

All the examples in this post are based on single-subscription streams. Once you understand single-subscription streams, broadcast streams are easy to follow.
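The difference between the two kinds can be sketched by trying to attach two listeners. This is only an illustration: ticks and canListenTwice are hypothetical helpers, and it uses listen() and asBroadcastStream(), which are part of the Stream API.

```dart
import 'dart:async';

// A short single-subscription stream: emits 0, 1, 2 and then ends.
Stream<int> ticks() =>
    Stream<int>.periodic(Duration(milliseconds: 10), (count) => count).take(3);

// Tries to attach two listeners; returns false if the second one is rejected.
bool canListenTwice(Stream<int> stream) {
  final StreamSubscription<int> first = stream.listen((_) {});
  try {
    final StreamSubscription<int> second = stream.listen((_) {});
    second.cancel();
    return true;
  } on StateError {
    // A single-subscription stream throws when listened to a second time.
    return false;
  } finally {
    first.cancel();
  }
}

void main() {
  print(canListenTwice(ticks())); // false
  print(canListenTwice(ticks().asBroadcastStream())); // true
}
```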


Let’s understand this with examples

The main purpose of these examples is to help you understand how streams work. They will also make you familiar with some of the methods and properties available on Stream.

Stream Methods

All the examples can be run on DartPad. I highly recommend that you run them, look at the pattern of the output, and make sure you understand it.

Example 1:
Stream<T>.periodic() → Stream<T>
// asynchronous data
void main() async {
  Duration interval = Duration(seconds: 2);
  Stream<int> stream = Stream<int>.periodic(interval, callback);
  // Print each event as it arrives.
  await for (int i in stream) {
    print(i);
  }
}

// This callback turns the event counter (0, 1, 2, ...) into even numbers (2, 4, 6, ...).
int callback(int value) {
  return (value + 1) * 2;
}

Let’s understand how this stream is created.

Stream<T>.periodic: Creates a stream that repeatedly emits events at period intervals.

This is one of the simplest streams to create, which makes it a good way to explain streams to anyone.

If the callback is omitted, the event values will all be null, so in null-safe Dart the event type must then be nullable (for example Stream<int?>).

  • This stream gives us data every 2 seconds because we specified the interval as 2 seconds.
  • The argument to the callback is an integer that starts at 0 and is incremented for every event.
  • This is an infinite stream of data because we have not specified any condition to stop it.
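The points above can be checked with a small sketch that passes the counter through unchanged and stops the infinite stream by hand. firstCounters is a hypothetical helper, and the interval is shortened so it finishes quickly; breaking out of an await for loop cancels the subscription.

```dart
import 'dart:async';

// Collects the first `howMany` counter values of an infinite periodic stream.
Future<List<int>> firstCounters(int howMany) async {
  final Stream<int> stream =
      Stream<int>.periodic(Duration(milliseconds: 50), (count) => count);
  final List<int> seen = <int>[];
  await for (final int value in stream) {
    seen.add(value);
    // Leaving the loop cancels the subscription, so the stream stops.
    if (seen.length == howMany) break;
  }
  return seen;
}

void main() async {
  print(await firstCounters(3)); // [0, 1, 2]
}
```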
Example 2:
Stream<T>.take(int count) → Stream<T>

Let’s create a finite stream

// asynchronous data
void main() async {
  Duration interval = Duration(seconds: 1);
  Stream<int> stream = Stream<int>.periodic(interval, transform);
  // Added this statement
  stream = stream.take(5);
  await for (int i in stream) {
    print(i);
  }
}

int transform(int x) {
  return (x + 1) * 2;
}
  • This is a finite stream of data because we have specified a condition: emit only the given number of events.
    This will print 2, 4, 6, 8, 10.
  • take: take(int count) limits the stream by the number of events.
Example 3:
Stream<T>.takeWhile(bool test(T element)) → Stream<T>

You can also specify a condition on the emitted value.

// asynchronous data
void main() async {
  Duration interval = Duration(seconds: 1);
  Stream<int> stream = Stream<int>.periodic(interval, transform);
  // Added this statement
  stream = stream.takeWhile(condition);
  await for (int i in stream) {
    print(i);
  }
}

int transform(int x) {
  return (x + 1) * 2;
}

// Added this function
bool condition(int x) {
  return x <= 10;
}
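  • takeWhile: takeWhile(bool test(T element)) limits the stream based on the value of each emitted event. The stream ends as soon as the test fails, so this example prints 2, 4, 6, 8, 10.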
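One detail that is easy to miss: takeWhile ends the stream at the first value that fails the test, even if later values would pass. The sketch below contrasts it with where(), a Stream method not covered in this post that filters every event instead. The helper names and the input values are made up for the illustration.

```dart
import 'dart:async';

// Keeps events only until the first value greater than 10.
Future<List<int>> takeWhileSmall(List<int> values) =>
    Stream<int>.fromIterable(values).takeWhile((x) => x <= 10).toList();

// Checks every event and keeps all values up to 10.
Future<List<int>> whereSmall(List<int> values) =>
    Stream<int>.fromIterable(values).where((x) => x <= 10).toList();

void main() async {
  final List<int> values = [2, 4, 11, 6];
  print(await takeWhileSmall(values)); // [2, 4]
  print(await whereSmall(values)); // [2, 4, 6]
}
```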
Example 4:
Stream<T>.skip(int count) → Stream<T>

You can skip the first few emitted events.

// asynchronous data
void main() async {
  Duration interval = Duration(seconds: 1);
  Stream<int> stream = Stream<int>.periodic(interval, transform);
  stream = stream.take(10);
  stream = stream.skip(2);
  await for (int i in stream) {
    print(i);
  }
}

int transform(int x) {
  return (x + 1) * 2;
}
  • This will skip the first 2 emitted events (2 and 4) and print the rest, from 6 to 20.
Example 5:
Stream<T>.skipWhile(bool test(T element)) → Stream<T>

You can also skip events based on the event value.

// asynchronous data
void main() async {
  Duration interval = Duration(seconds: 1);
  Stream<int> stream = Stream<int>.periodic(interval, transform);
  stream = stream.take(10);
  stream = stream.skipWhile(condition);
  await for (int i in stream) {
    print(i);
  }
}

int transform(int x) {
  return (x + 1) * 2;
}

bool condition(int x) {
  return x < 5;
}
  • While the test passes, those values are skipped.
  • Once the test fails, the stream starts emitting values.
  • After the first failure, the condition is no longer checked, so every later value is emitted.
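The last point is easier to see with values that are not in increasing order. In this sketch (the helper name and the input list are made up for the illustration), 1 and 3 are smaller than 5 but are still emitted, because the test is no longer checked after 6 has failed it.

```dart
import 'dart:async';

// Skips leading values below 5, then lets everything through.
Future<List<int>> skipSmallPrefix(List<int> values) =>
    Stream<int>.fromIterable(values).skipWhile((x) => x < 5).toList();

void main() async {
  print(await skipSmallPrefix([2, 4, 6, 1, 3])); // [6, 1, 3]
}
```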
Example 6:
Stream<T>.toList() → Future<List<T>>

This method collects all the data from the stream and stores it in a List.

// asynchronous data
void main() async {
  Duration interval = Duration(seconds: 1);
  Stream<int> stream = Stream<int>.periodic(interval, transform);
  stream = stream.take(5);
  List<int> data = await stream.toList();
  for (int i in data) {
    print(i);
  }
}

int transform(int x) {
  return (x + 1) * 2;
}
  • This is asynchronous, which means you need to wait for the task to finish.
  • When the stream ends, the returned future completes with that List.
Example 7:
Stream<T>.listen() → StreamSubscription<T>

There is one specific method for listening to the data events of a stream. I prefer the await for loop approach because it is much friendlier to read.

// asynchronous data
void main() async {
  Duration interval = Duration(seconds: 1);
  Stream<int> stream = Stream<int>.periodic(interval, transform);
  stream = stream.take(10);

  // The callback runs for every event emitted by the stream.
  stream.listen((x) {
    print(x);
  });
}

int transform(int x) {
  return (x + 1) * 2;
}
Example 8:
Stream<T>.forEach() → Future

forEach also runs a callback for every data event, and it returns a Future that completes when the stream ends. I still prefer the await for loop approach because it is much friendlier to read.

// asynchronous data
void main() async {
  Duration interval = Duration(seconds: 1);
  Stream<int> stream = Stream<int>.periodic(interval, transform);
  stream = stream.take(10);

  // Await the returned Future so main continues only after the stream ends.
  await stream.forEach((int x) {
    print(x);
  });
}

int transform(int x) {
  return (x + 1) * 2;
}

Stream Property

Example 9:
Stream<T>.length → Future<int>
// asynchronous data
void main() async {
  Duration interval = Duration(seconds: 1);
  // No callback is given, so every event is null and the event type must be nullable.
  Stream<int?> stream = Stream<int?>.periodic(interval);
  stream = stream.take(10);
  print(await stream.length);
}
  • length is a property of the stream.
  • It gives the number of emitted events (10 in this example).
  • It waits for all events to be emitted, so it only completes when the stream ends.

Now I will create a simple Flutter application based on the examples above. I am pretty sure you will then understand how a stream works in a Flutter application. There are various ways of using streams in a Flutter application; I am showing you one of them.

To build an app like this, most beginners will build a Stateful Widget and call setState((){}) again and again to update the UI.

I am going to build the exact same app which is shown in the image, using:

  • a Stateless Widget
  • no setState((){}) (you can’t call setState((){}) in a Stateless Widget)
  • a Stream
  • a StreamBuilder

What is this StreamBuilder?

StreamBuilder listens to the stream and rebuilds itself on every new event emitted by the stream.

Let’s look at a StreamBuilder implementation.

child: StreamBuilder<T>(
  stream: stream, // a Stream<T> or null
  builder: (BuildContext context, AsyncSnapshot<T> snapshot) {
    if (snapshot.hasError) return Text('Error: ${snapshot.error}');

    switch (snapshot.connectionState) {
      case ConnectionState.none:
        return Text('Not connected to the Stream or null');
      case ConnectionState.waiting:
        return Text('awaiting interaction');
      case ConnectionState.active:
        return Text('Stream has started but not finished');
      case ConnectionState.done:
        return Text('Stream has finished');
    }
  },
),

The example above gives a rough idea of StreamBuilder. StreamBuilder needs two things:

  1. stream: A source of asynchronous data events. We already know how to build our own asynchronous event source (using Stream.periodic()).
  2. builder: The logic that builds the UI from the asynchronous interaction (the events emitted by the stream).

Connection State

  1. ConnectionState.none: The stream is null.
  2. ConnectionState.waiting: Awaiting interaction.
  3. ConnectionState.active: The stream has started emitting data but has not finished yet.
  4. ConnectionState.done: The stream has finished.

Here is the final code.

import 'package:flutter/material.dart';

void main() {
  runApp(MaterialApp(
    home: HomePage(),
    title: 'Stream Demo',
  ));
}

class HomePage extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('Stream Demo'),
      ),
      body: Center(
        child: StreamBuilder<int>(
          builder: (BuildContext context, AsyncSnapshot<int> snapshot) {
            if (snapshot.connectionState == ConnectionState.done) {
              return Text(
                '1 Minute Completed',
                style: TextStyle(
                  fontSize: 30.0,
                ),
              );
            } else if (snapshot.connectionState == ConnectionState.waiting) {
              return Text(
                'Waiting For Stream',
                style: TextStyle(
                  fontSize: 30.0,
                ),
              );
            }
            return Text(
              '00:${snapshot.data.toString().padLeft(2, '0')}',
              style: TextStyle(
                fontSize: 30.0,
              ),
            );
          },
          initialData: 0,
          stream: _stream(),
        ),
      ),
    );
  }

  Stream<int> _stream() {
    Duration interval = Duration(seconds: 1);
    Stream<int> stream = Stream<int>.periodic(interval, transform);
    stream = stream.take(59);
    return stream;
  }

  int transform(int value) {
    return value;
  }
}

This code uses two main things:

  1. Stream
  2. StreamBuilder
If you have any doubts about streams, feel free to ask. I would love to answer your questions.

I will upload the code to GitHub soon and update the link here.