Reactive programming in Flutter

Alvaro Armijos
8 min read · Apr 13, 2023

Reactive programming is one of the most widely used paradigms in mobile applications. In classical programming, when we want some piece of information, we actively request it from its source, which can be a database, an API or a sensor. In reactive programming, instead, we tell the data source that we are interested in its information and ask it to notify us whenever there is a change.

This way, our application updates whenever the data source we are subscribed to changes, and we avoid having to ask constantly for new information.

To achieve this, reactive programming relies on the Observer pattern. This pattern lets us subscribe to the events of a class and get a notification every time there is a change. One party emits data and another listens to it. The emitter is only responsible for sending the data to whoever is subscribed, and nothing more. Listeners can be added or removed at any time, and this is completely transparent to the emitter.

Reactive programming basics

To grasp the reactive programming paradigm, we must understand its basic concepts.

Stream

It’s the continuous flow of data that changes over time. It can be the data coming from a database or a server every time a certain value is modified, or from a sensor every time it updates its reading.

Observable

It’s the object that wraps the stream and notifies the subscribed entities when the stream changes.

Observer

It’s the component that subscribes to the Observable and receives the change notifications from the stream. The Observer subscribes when it wants to get data and can unsubscribe when it’s no longer interested in the information.

Subscription

It’s the connection between the Observer and the Observable, through which the Observer receives notifications when the data changes.

Operator

Operators are functions that modify the data stream before it arrives at the Observer. That is, between the Observable sending the data and each Observer receiving it, there may be an Operator in charge of modifying the flow. For example, an operator can delay each event by one second so that the Observer does not receive the information immediately.
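As a rough sketch of the idea in Dart (whose Stream type is covered later in this article), the built-in where and map methods of a Stream behave like operators: each one takes a stream and returns a new, modified stream. The function name evenSquares is made up for this illustration.

```dart
import 'dart:async';

/// Hypothetical operator chain: keeps the even numbers and squares them.
Stream<int> evenSquares(Stream<int> source) {
  return source
      .where((value) => value.isEven) // filtering operator
      .map((value) => value * value); // transforming operator
}

Future<void> main() async {
  final source = Stream<int>.fromIterable([1, 2, 3, 4]);
  final result = await evenSquares(source).toList();
  print(result); // [4, 16]
}
```

The source stream is untouched; the Observer simply listens to the stream returned by the operator chain instead of the original one.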

Now we are going to implement this in Flutter. The Observer is the one that is notified of every event that comes from the Observable.

abstract class Observer<T> {
  // Called by the Observable every time the value changes.
  void notifyChange(T newValue);
}

The Observable has three functions. The main one adds an Observer, and a second one removes it. Finally, the notifyObservers method notifies all Observers that a value has changed.

abstract class Observable<T> {
  void addObserver(Observer<T> observer);
  void removeObserver(Observer<T> observer);
  void notifyObservers(T newValue);
}

Now we create a singleton DataProvider that is in charge of handling the data. This class implements Observable and its addObserver, removeObserver and notifyObservers methods.

class DataProvider implements Observable<int> {
  // Singleton: every call to DataProvider() returns this same instance.
  static final DataProvider _instance = DataProvider._internal();

  factory DataProvider() {
    return _instance;
  }

  DataProvider._internal();

  // Observers currently registered; typed so only Observer<int> is accepted.
  final List<Observer<int>> _observerList = [];
  int _count = 0;

  // Increments the counter and pushes the new value to every Observer.
  void changeCount() {
    _count++;
    notifyObservers(_count);
  }

  @override
  void addObserver(Observer<int> observer) {
    _observerList.add(observer);
  }

  @override
  void removeObserver(Observer<int> observer) {
    _observerList.remove(observer);
  }

  @override
  void notifyObservers(int newValue) {
    for (final observer in _observerList) {
      observer.notifyChange(newValue);
    }
  }
}

To register the Observers we must keep them in memory, so we create a variable _observerList that holds the list of Observers. When the data changes, we notify each Observer in the list. Finally, we create a Notification widget whose State implements our Observer.

import 'package:flutter/material.dart';

class ObserverPage extends StatelessWidget {
  const ObserverPage({super.key});

  @override
  Widget build(BuildContext context) {
    // DataProvider is a singleton, so this is the same instance
    // that the Notification widget observes.
    final dataProvider = DataProvider();
    return Scaffold(
      appBar: AppBar(
        title: const Text("Observer"),
        actions: const [
          Notification(),
        ],
      ),
      body: ProductGrid(
        onTap: () => dataProvider.changeCount(),
      ),
    );
  }
}

// Note: within this file, this class shadows Flutter's own Notification class.
class Notification extends StatefulWidget {
  const Notification({super.key});

  @override
  State<Notification> createState() => _NotificationState();
}

class _NotificationState extends State<Notification> implements Observer<int> {
  final DataProvider _dataProvider = DataProvider();
  int itemCount = 0;

  @override
  void initState() {
    // super.initState() goes first, then we register as an Observer.
    super.initState();
    _dataProvider.addObserver(this);
  }

  @override
  void dispose() {
    // Unregister so the DataProvider does not notify a disposed State.
    _dataProvider.removeObserver(this);
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return CartNotification(
      count: itemCount,
    );
  }

  @override
  void notifyChange(int newValue) {
    setState(() {
      itemCount = newValue;
    });
  }
}

As we can see, every time the data is updated in the DataProvider, the Observable notifies all the Observers, and the Observer that we implemented in the Notification widget updates the number in the icon, without the widget having to ask constantly for new information.

The above example works, but it is not as reactive as it could be: we register the Observers by hand and call setState() to update the number in the icon. Let's see how we can improve it.

Reactive programming in Flutter

As we mentioned at the beginning, reactive programming is programming with asynchronous data streams. In Dart, this data type is called a Stream.

Streams have long been part of Dart's core libraries (dart:async). In 2018, Google popularized combining them with Flutter widgets, an idea summed up as:

Widgets + Streams = Reactive

Flutter widgets combined with Streams offer a reactive way of handling the UI: data flows through the application as streams, and the UI updates when the data changes.

Streams

In Dart, a stream can be pictured as a pipe with two ends. We can add any type of object asynchronously at one end (the sink) and listen for it at the other end (the stream), for example to update the UI.

Stream characteristics:

  • Streams provide an asynchronous sequence of data.
  • Data sequences include user-generated events and data read from files.
  • You can process a stream using either await for or listen() from the Stream API.
  • Streams provide a way to respond to errors.
  • There are two kinds of streams: single subscription or broadcast.
  • In a Stream we can send a value, event, object, collection, map, error or even another Stream. A stream can transmit any type of data.
  • A Stream also allows the data that flows through it to be processed before it reaches the listener.
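The two ways of consuming a stream mentioned in the list, await for and listen(), can be sketched as follows. This is only an illustration: the helper name sumStream and the error text are made up for the example.

```dart
import 'dart:async';

/// Consumes a stream with `await for` and returns the sum of its values.
Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

Future<void> main() async {
  // First style: `await for`.
  final total = await sumStream(Stream<int>.fromIterable([1, 2, 3]));
  print('sum: $total'); // sum: 6

  // Second style: `listen()` with callbacks for data, errors and completion.
  final controller = StreamController<int>();
  final done = Completer<void>();
  controller.stream.listen(
    (value) => print('data: $value'),
    onError: (Object error) => print('error: $error'),
    onDone: done.complete,
  );

  controller.sink.add(1);
  controller.sink.addError('sensor unavailable'); // errors travel in the stream too
  controller.sink.add(2);
  unawaited(controller.close());
  await done.future; // wait until the listener has seen the done event
}
```

By default an error does not end the subscription, so the listener still receives the value added after it.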

StreamTransformer

A StreamTransformer is used to control the processing of data within a stream. A StreamTransformer:

  • Captures the data that flows inside the stream.
  • Transforms the data.
  • Produces a result that is also a Stream.
  • Can be chained with other StreamTransformers.

Any kind of processing can be done in a StreamTransformer, for example:

  • Filtering,
  • Regrouping,
  • Modifying,
  • Injecting data into other streams,
  • Buffering,
  • Performing any type of action/operation based on the data.

listen() is called on a stream to tell it that we want to receive events and to register the callbacks that will receive those events. The call returns a StreamSubscription, the active object that provides the events and that can be used to pause the subscription temporarily or to stop listening altogether.
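As a minimal sketch of chaining StreamTransformers, the example below builds two of them with StreamTransformer.fromHandlers and applies them one after the other with transform(). The names doublePositives and toLabel are hypothetical and exist only for this illustration.

```dart
import 'dart:async';

/// Hypothetical transformer: drops negative values and doubles the rest.
final StreamTransformer<int, int> doublePositives =
    StreamTransformer<int, int>.fromHandlers(
  handleData: (value, sink) {
    if (value >= 0) {
      sink.add(value * 2);
    }
  },
);

/// Hypothetical transformer: turns each number into a text label.
final StreamTransformer<int, String> toLabel =
    StreamTransformer<int, String>.fromHandlers(
  handleData: (value, sink) => sink.add('value: $value'),
);

Future<void> main() async {
  final source = Stream<int>.fromIterable([1, -2, 3]);

  // Each transform() returns a new Stream, so the calls can be cascaded.
  final labels =
      await source.transform(doublePositives).transform(toLabel).toList();
  print(labels); // [value: 2, value: 6]
}
```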

Single-subscription Streams

A single-subscription stream allows only a single listener during the whole lifetime of the stream. It doesn’t start generating events until it has a listener, and it stops sending events when the listener is unsubscribed, even if the source of events could still provide more.

Listening twice on a single-subscription stream is not allowed, even after the first subscription has been canceled.

Single-subscription streams are generally used for streaming chunks of larger contiguous data, like file I/O.
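The restriction on listening twice can be checked with a small sketch. It assumes a plain StreamController, which creates a single-subscription stream by default; the function name secondListenThrows is made up for the example.

```dart
import 'dart:async';

/// Returns true if a second `listen()` on the same single-subscription
/// stream throws a StateError, even after the first listener was canceled.
Future<bool> secondListenThrows() async {
  final controller = StreamController<int>(); // single-subscription by default
  final first = controller.stream.listen((_) {});
  await first.cancel();

  try {
    controller.stream.listen((_) {}); // second listener on the same stream
    return false;
  } on StateError {
    return true;
  }
}

Future<void> main() async {
  print(await secondListenThrows()); // true
}
```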

Broadcast Streams

A broadcast stream allows any number of listeners, and it fires its events when they are ready, whether there are listeners or not.
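A small sketch of this behavior, assuming a controller created with StreamController.broadcast(): an event added before anyone listens is lost, two listeners receive the same event, and canceling one StreamSubscription does not affect the other. The function name broadcastDemo and the log format are made up for the example.

```dart
import 'dart:async';

/// Records which listener received which event on a broadcast stream.
Future<List<String>> broadcastDemo() async {
  final controller = StreamController<int>.broadcast();
  final log = <String>[];

  controller.add(1); // no listeners yet, so this event is dropped

  final a = controller.stream.listen((v) => log.add('a:$v'));
  final b = controller.stream.listen((v) => log.add('b:$v'));

  controller.add(2); // delivered to both listeners
  await Future<void>.delayed(Duration.zero); // let the event be delivered

  await a.cancel(); // a stops listening, b keeps receiving
  controller.add(3);
  await Future<void>.delayed(Duration.zero);

  await b.cancel();
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await broadcastDemo()); // [a:2, b:2, b:3]
}
```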

RxDart

RxDart extends the capabilities of Dart Streams and StreamControllers.

RxDart adds functionality to Dart Streams in three ways:

  • Stream Classes — create Streams with specific capabilities, such as combining or merging many Streams.
  • Extension Methods — transform a source Stream into a new Stream with different capabilities, such as throttling or buffering events.
  • Subjects — StreamControllers with additional powers

Dart provides the StreamController class to create and manage a Stream. RxDart offers two additional StreamControllers with additional capabilities, known as Subjects:

  • BehaviorSubject — A broadcast StreamController that caches the latest added value or error. When a new listener subscribes to the Stream, the latest value or error will be emitted to the listener. Furthermore, you can synchronously read the last emitted value.
  • ReplaySubject — A broadcast StreamController that caches the added values. When a new listener subscribes to the Stream, the cached values will be emitted to the listener.

Now let’s revisit the previous example using Streams. We create a CartBloc to handle the business logic. First, it exposes the addition Sink, which allows String elements (the products) to be added to the Stream from outside. For the number of products in the cart, it exposes a Stream of int that is backed by a BehaviorSubject and is updated each time a product is added to the cart.

In the CartBloc constructor we add a listener to _additionController to know when a product is added to the Stream. Every time a new product enters the Stream, it is added to the list of products, and the new number of items in the cart is added to the itemCount Stream.

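import 'dart:async';

import 'package:rxdart/rxdart.dart';

class CartBloc {
  // Products currently in the cart.
  final _cart = <String>[];

  // Input: the UI adds products through this Sink.
  Sink<String> get addition => _additionController.sink;

  final _additionController = StreamController<String>();

  // Output: the UI listens to the number of items in the cart.
  Stream<int> get itemCount => _itemCountSubject.stream;

  final _itemCountSubject = BehaviorSubject<int>();

  CartBloc() {
    _additionController.stream.listen(_handle);
  }

  void _handle(String product) {
    _cart.add(product);
    _itemCountSubject.add(_cart.length);
  }

  // Releases both controllers when the bloc is no longer needed.
  void dispose() {
    _additionController.close();
    _itemCountSubject.close();
  }
}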

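In the UI we use a StreamBuilder to listen to the itemCount Stream and rebuild the notification whenever there is a change. CartProvider, whose definition is not shown here, is assumed to be the widget (typically an InheritedWidget) that exposes the CartBloc to the widget tree.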

class ReactivePage extends StatelessWidget {
  const ReactivePage({super.key});

  @override
  Widget build(BuildContext context) {
    final cartBloc = CartProvider.of(context);

    return Scaffold(
      appBar: AppBar(
        title: const Text("Reactive"),
        actions: [
          // Rebuilds only the notification when a new count arrives.
          StreamBuilder<int>(
            stream: cartBloc.itemCount,
            initialData: 0,
            builder: (_, snapshot) {
              return CartNotification(
                // Falls back to 0 if the snapshot carries no data.
                count: snapshot.data ?? 0,
              );
            },
          ),
        ],
      ),
      body: const ProductGrid(),
    );
  }
}

In the ProductGrid widget, each time the user taps on a product, the item is added to the Stream through the addition Sink.

class ProductGrid extends StatelessWidget {
  const ProductGrid({super.key});

  @override
  Widget build(BuildContext context) {
    final cartBloc = CartProvider.of(context);
    return GridView.count(
      crossAxisCount: 2,
      children: products
          .map((product) => ProductSquare(
                product: product,
                onTap: () {
                  cartBloc.addition.add(product);
                },
              ))
          .toList(),
    );
  }
}

In the end we obtain the same result, but now in a reactive way.
