Flashlist PART 3: Implementing the CRUD Stream
It’s like playing “throw and catch” with Serverpod and Riverpod
Intro
With users now able to sign up and connect with each other on Flashlist, our attention turns to enhancing the app’s core functionality. To achieve this, I intend to leverage WebSockets to facilitate real-time transfer of lists and their updates, unlocking the full collaborative potential of Flashlist.
In case you’re new to this series and have never heard of Flashlist, no worries, basically nobody has XD. But if you’re curious or bored, you can catch up here:
- Flashlists Journey with Cutting Edge Technology
- Flashlist PART 1: Serverpod + Riverpod = ServerRiverPodPod
- Flashlist PART 2: Let there be users
In this post I’d like to share the way I interpret streaming CRUD operations from Serverpod to Riverpod.
Serverpod’s WebSocket streaming workflow already works well within a StatefulWidget. Adding Riverpod to cache the streamed state makes for a highly satisfying user experience. It also promotes clean architecture principles by separating business logic from the presentation layer, which fosters a very calm developer experience.
Define the models
To transmit data effectively, we need to define models from which Serverpod generates classes that extend SerializableEntity. In our case, the first ones will be:
class: Flashlist
table: flashlist
fields:
  title: String
  color: String
  items: List<FlashlistItem?>?, !persist
  authors: List<AppUser?>?, !persist

and

class: FlashlistItem
table: flashlist_item
fields:
  name: String
  parentId: int, relation(parent = flashlist, onUpdate = CASCADE, onDelete = CASCADE)
  orderNr: int
Marking items and authors with !persist tells Serverpod not to store these fields in the flashlist table. They are separate entities stored independently in the database and only attached before transmission via stream message.
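To make the "attached before transmission" step a bit more concrete, here is a minimal plain-Dart sketch. The two classes are simplified stand-ins for the generated models, and `attachItems` is a hypothetical helper I made up for illustration; it is not part of Serverpod or of the Flashlist code base.

```dart
// Simplified stand-ins for the generated model classes (assumption).
class FlashlistItem {
  FlashlistItem({
    required this.id,
    required this.parentId,
    required this.name,
    required this.orderNr,
  });
  final int id;
  final int parentId;
  final String name;
  final int orderNr;
}

class Flashlist {
  Flashlist({required this.id, required this.title, this.items});
  final int id;
  final String title;
  // Not stored with the list itself; filled in just before sending.
  List<FlashlistItem>? items;
}

/// Hypothetical helper: groups items by parentId and attaches them to
/// their list, sorted by orderNr.
List<Flashlist> attachItems(
  List<Flashlist> lists,
  List<FlashlistItem> allItems,
) {
  final byParent = <int, List<FlashlistItem>>{};
  for (final item in allItems) {
    byParent.putIfAbsent(item.parentId, () => <FlashlistItem>[]).add(item);
  }
  for (final list in lists) {
    list.items = (byParent[list.id] ?? <FlashlistItem>[])
      ..sort((a, b) => a.orderNr.compareTo(b.orderNr));
  }
  return lists;
}

void main() {
  final lists = attachItems(
    [Flashlist(id: 1, title: 'Groceries'), Flashlist(id: 2, title: 'Empty')],
    [
      FlashlistItem(id: 11, parentId: 1, name: 'Milk', orderNr: 2),
      FlashlistItem(id: 10, parentId: 1, name: 'Bread', orderNr: 1),
    ],
  );
  print(lists[0].items!.map((i) => i.name).toList()); // [Bread, Milk]
  print(lists[1].items!.length); // 0
}
```

A list without items ends up with an empty list rather than null here, which is just one possible choice.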
StreamMessages
To transmit stream messages via WebSocket in Serverpod, our messages also must extend SerializableEntity. The initial message we require is FlashlistBatch, defined as follows:
class: FlashlistBatch
fields:
  collection: List<Flashlist>
As we define the class without a table, we only create a data class extending SerializableEntity. It will be sent to the client when a streaming session starts. This way, even when there are no lists, we have an item telling the frontend that it is no longer loading.
Streaming Endpoint
Before delving into streaming capabilities, it’s essential to establish an endpoint with CRUD operations. You can explore my endpoint here.
As you might have seen in Serverpod’s documentation, its streaming workflow works by overriding two methods: streamOpened and handleStreamMessage.
// ..._server/lib/src/endpoints/flashlist_endpoint.dart
class FlashlistEndpoint extends Endpoint {
  // CRUD logic goes before
  ...
  @override
  Future<void> streamOpened(StreamingSession session) async {
    try {
      // get all Flashlists from the database
      final flashlistsForUser = await getFlashlistsForUser(session);
      // send list-collection
      sendStreamMessage(session, FlashlistBatch(collection: flashlistsForUser));
      // add a listener to a channel associated with user
      // (userId is the id of the session's user; its lookup is left out here)
      session.messages.addListener('channel-user-$userId', (message) {
        sendStreamMessage(session, message);
      });
      // loop over the list of lists and add a listener to a channel per list
      for (final flashlist in flashlistsForUser) {
        session.messages.addListener('flashlist-channel-${flashlist.id}',
            (message) {
          sendStreamMessage(session, message);
        });
      }
    } catch (e) {
      // throw Exception or just print
    }
  }
Alright! Now that we’ve covered what to do when the client connects to the stream, we want to handle what should happen when a user sends something: like a Flashlist, commands to update or delete one, or in our case a FlashlistItem which is to be displayed in the body of the Flashlist within the UI.
  @override
  Future<void> handleStreamMessage(
    StreamingSession session,
    SerializableEntity message,
  ) async {
    if (message is Flashlist) {
      // Use the data-class "Flashlist" to create a db entity
      // It's important to await this db entity and send it, otherwise the flashlist won't know its ID in the UI
      final flashlist = await createFlashlist(session, message);
      // Send the list via the user channel
      // (userId is the id of the session's user; its lookup is left out here)
      session.messages.postMessage('channel-user-$userId', flashlist);
      // Add a listener to this list's channel
      session.messages.addListener('flashlist-channel-${flashlist.id}',
          (message) {
        sendStreamMessage(session, message);
      });
    }
  }
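A posted message only reaches listeners that were registered under exactly the same channel name, so it can help to build those names in one place. A tiny sketch of that idea; both helper names are hypothetical and not something Serverpod provides:

```dart
/// Hypothetical helpers that build channel names in one place, so the
/// names used for addListener and postMessage cannot drift apart.
String userChannel(int userId) => 'channel-user-$userId';

String flashlistChannel(int flashlistId) => 'flashlist-channel-$flashlistId';

void main() {
  print(userChannel(42)); // channel-user-42
  print(flashlistChannel(7)); // flashlist-channel-7
}
```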
Sticking to the metaphor from the start, this is the throwing part and now we need to…
StreamProvider
To “catch” our Flashlist and provide it to the Flutter app when loaded, we need a StreamProvider (among other methods, which you can find here).
// ..._flutter/lib/src/endpoints/flashlist_endpoint.dart
@riverpod
Stream<List<Flashlist?>> flashlistsForUser(FlashlistsForUserRef ref) async* {
  final client = ref.watch(clientProvider);
  // To ensure a new session
  client.flashlist.resetStream();
  // Triggers `streamOpened`
  await client.openStreamingConnection();
  // Close connection when ref is no longer used
  ref.onDispose(() async {
    await client.closeStreamingConnection();
  });
  var streamItems = <Flashlist>[];
  // loop over the messages sent via streaming endpoint
  await for (final message in client.flashlist.stream) {
    // "Unpack" the initial Batch
    if (message is FlashlistBatch) {
      streamItems = [...message.collection];
    }
    // Add Flashlist when one is sent
    if (message is Flashlist) {
      streamItems.add(message);
    }
    yield streamItems;
  }
}
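If you want to play with the "catching" logic without Serverpod or Riverpod, the folding of messages into list snapshots can be sketched in plain Dart. The classes are simplified stand-ins for the generated models, and `flashlistSnapshots` is a hypothetical name. Unlike the provider above, this version yields a fresh unmodifiable list on every message, which is a design choice of the sketch and not something the original code does.

```dart
// Simplified stand-ins for the generated model classes (assumption).
class Flashlist {
  Flashlist(this.id, this.title);
  final int id;
  final String title;
}

class FlashlistBatch {
  FlashlistBatch(this.collection);
  final List<Flashlist> collection;
}

/// Folds incoming messages into list snapshots, one snapshot per message.
Stream<List<Flashlist>> flashlistSnapshots(Stream<Object> messages) async* {
  var items = <Flashlist>[];
  await for (final message in messages) {
    if (message is FlashlistBatch) {
      // The initial batch replaces whatever we had.
      items = [...message.collection];
    } else if (message is Flashlist) {
      // A single new list is appended.
      items = [...items, message];
    }
    yield List.unmodifiable(items);
  }
}

Future<void> main() async {
  final messages = Stream<Object>.fromIterable([
    // An empty batch still produces a snapshot: "done loading, nothing here".
    FlashlistBatch([]),
    Flashlist(1, 'Groceries'),
  ]);
  final snapshots = await flashlistSnapshots(messages).toList();
  print(snapshots.map((s) => s.length).toList()); // [0, 1]
}
```

The empty batch is what lets the UI leave its loading state even when the user has no lists yet.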
With this logic in place we can consume the resulting provider as an AsyncValue in our presentation layer:
// Somewhere in your WidgetTree
ref.watch(flashlistsForUserProvider).when(
  data: (flashlists) {
    // Handle empty state
    if (flashlists.isEmpty) {
      return const Center(
        child: Text('No flashlists yet'),
      );
    }
    return ListView.builder(
      itemCount: flashlists.length,
      itemBuilder: (context, index) {
        final flashlist = flashlists[index];
        return FlashlistCard(
          ...
        );
      },
    );
  },
  loading: () => const Center(
    child: CircularProgressIndicator(),
  ),
  error: (e, st) => Center(
    child: Text(e.toString()),
  ),
);
This ensures that the UI updates seamlessly when a user creates a Flashlist via client.flashlist.sendStreamMessage(Flashlist(…)), effectively covering the creation and reading aspects of our list management.
Update and Delete
Utilizing a WebSocket and subscribing to a list enables us to mirror backend operations to our frontend list. When a Flashlist is sent to the client and a user is subscribed to it, the list should consistently reflect any updates, facilitating the management of updates and deletions.
First we need messages representing update and delete to wrap the data we want to send.
class: UpdateFlashlist
fields:
  id: int
  title: String?
  color: String?

class: DeleteFlashlist
fields:
  flashlistId: int
And when a user sends one of them to the server, we run the operation on the db, and post the message to the list channel.
// ..._server/lib/src/endpoints/flashlist_endpoint.dart
if (message is DeleteFlashlist) {
  // Delete Flashlist from db
  await deleteFlashlist(session, message.flashlistId);
  // Post message to listChannel (same name the listeners were added under)
  session.messages.postMessage(
    'flashlist-channel-${message.flashlistId}',
    message,
  );
  // Unsubscribe listChannel
  // Note: this closure is a new function object, not the one passed to
  // addListener, so it likely won't match; keep a reference to the
  // original callback if the listener really has to be removed.
  session.messages.removeListener(
    'flashlist-channel-${message.flashlistId}',
    (message) {
      sendStreamMessage(session, message);
    },
  );
}
if (message is UpdateFlashlist) {
  // Update Flashlist
  await updateFlashlist(session, message);
  // Post message to listChannel
  session.messages.postMessage(
    'flashlist-channel-${message.id}',
    message,
  );
}
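Unsubscribing is easy to get subtly wrong, because in Dart two closures with identical bodies are still different objects. Here is a minimal in-memory sketch of a channel hub that shows the difference. `ChannelHub` is a hypothetical stand-in that I wrote to mimic the add/post/remove shape used above; it is not Serverpod's implementation.

```dart
typedef Listener = void Function(Object message);

/// Hypothetical in-memory stand-in for a channel-based message hub.
class ChannelHub {
  final _listeners = <String, List<Listener>>{};

  void addListener(String channel, Listener listener) =>
      _listeners.putIfAbsent(channel, () => <Listener>[]).add(listener);

  /// Removes [listener] only if it is the same function object that was added.
  void removeListener(String channel, Listener listener) =>
      _listeners[channel]?.remove(listener);

  void postMessage(String channel, Object message) {
    // Iterate over a copy so listeners may unsubscribe while being notified.
    for (final listener
        in List.of(_listeners[channel] ?? const <Listener>[])) {
      listener(message);
    }
  }
}

void main() {
  final hub = ChannelHub();
  final received = <Object>[];
  const channel = 'flashlist-channel-7';

  // Keep a reference to the callback so it can be removed later.
  final Listener forward = (message) => received.add(message);
  hub.addListener(channel, forward);
  hub.postMessage(channel, 'update');

  // A fresh closure is a different object, so this removes nothing.
  hub.removeListener(channel, (message) => received.add(message));
  hub.postMessage(channel, 'still delivered');

  // Passing the stored callback actually unsubscribes.
  hub.removeListener(channel, forward);
  hub.postMessage(channel, 'dropped');

  print(received); // [update, still delivered]
}
```

In a real endpoint this would mean storing the callback per session and channel, for example in a map, so the same instance can be handed back when unsubscribing.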
In our StreamProvider we now add cases handling these messages and change streamItems accordingly.
// ..._flutter/lib/src/endpoints/flashlist_endpoint.dart
...
if (message is DeleteFlashlist) {
  // Remove list from streamItems
  streamItems.removeWhere((element) => element!.id == message.flashlistId);
}
if (message is UpdateFlashlist) {
  // Find item to update/remove
  final indexToRemove =
      streamItems.indexWhere((element) => element!.id == message.id);
  // indexWhere returns -1 when the list is not part of streamItems
  if (indexToRemove != -1) {
    final itemToRemove = streamItems[indexToRemove];
    // Remove old item
    streamItems.removeAt(indexToRemove);
    // Insert updated item ensuring the UI updates
    streamItems.insert(
      indexToRemove,
      itemToRemove!.copyWith(
        title: message.title,
        color: message.color,
      ),
    );
  }
}
...
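The same update and delete handling can also be written as a small pure function that returns a new list instead of mutating the old one. This is a sketch of an alternative, not what Flashlist does; the classes are simplified stand-ins for the generated ones and `applyMessage` is a hypothetical name.

```dart
// Simplified stand-ins for the generated classes (assumption).
class Flashlist {
  const Flashlist({required this.id, required this.title, required this.color});
  final int id;
  final String title;
  final String color;

  // null means "keep the current value".
  Flashlist copyWith({String? title, String? color}) => Flashlist(
        id: id,
        title: title ?? this.title,
        color: color ?? this.color,
      );
}

class UpdateFlashlist {
  const UpdateFlashlist({required this.id, this.title, this.color});
  final int id;
  final String? title;
  final String? color;
}

class DeleteFlashlist {
  const DeleteFlashlist(this.flashlistId);
  final int flashlistId;
}

/// Returns the list state after applying [message]; unknown ids and
/// unknown message types leave the content unchanged.
List<Flashlist> applyMessage(List<Flashlist> current, Object message) {
  if (message is DeleteFlashlist) {
    return current.where((l) => l.id != message.flashlistId).toList();
  }
  if (message is UpdateFlashlist) {
    return [
      for (final list in current)
        if (list.id == message.id)
          list.copyWith(title: message.title, color: message.color)
        else
          list,
    ];
  }
  return current;
}

void main() {
  var lists = const [
    Flashlist(id: 1, title: 'Groceries', color: 'red'),
    Flashlist(id: 2, title: 'Chores', color: 'blue'),
  ];
  lists = applyMessage(lists, const UpdateFlashlist(id: 2, title: 'Housework'));
  print(lists[1].title); // Housework
  print(lists[1].color); // blue
  lists = applyMessage(lists, const DeleteFlashlist(1));
  print(lists.map((l) => l.id).toList()); // [2]
}
```

Because every change produces a new list, an update for an id we don't hold simply yields the same content and never touches an invalid index.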
With this implementation, we’ve achieved a functional CRUD Stream that automatically synchronises our UI whenever an item is added, updated, or deleted. By mirroring our database operations to the list on the user’s device, we ensure real-time data updates.
In case you only have one list, that’s all you need. If you need a list of nested lists like my humble self, you better stay tuned.
But you should take a short break first. While writing this I also got up, stretched my back and made myself this cappuccino.
FlashlistItem
The FlashlistItem has a field parentId which makes it easy for us to retrieve a List’s items by its ID,
/// Returns all items where [parentId] == [flashlistId].
/// Also sorts them by [orderNr].
Future<List<FlashlistItem?>> getFlashlistItemsByFlashlistId(
  Session session,
  int flashlistId,
) async {
  final items = await FlashlistItem.db.find(
    session,
    where: (currentItem) => currentItem.parentId.equals(flashlistId),
  );
  if (items.isNotEmpty) {
    items.sort((a, b) => a.orderNr.compareTo(b.orderNr));
  }
  return items;
}
and also to catch the item in the frontend and add it to the matching list, which the subscribed user received when the session started or when they added the list.
/// FlashlistItem is a message that contains a single [FlashlistItem] entity.
if (message is FlashlistItem) {
  // Find the parent list via the item's parentId
  final flashlistToUpdate = streamItems
      .firstWhere((streamItem) => streamItem!.id == message.parentId);
  if (flashlistToUpdate!.items == null) {
    flashlistToUpdate.items = <FlashlistItem>[message];
  } else {
    flashlistToUpdate.items!.add(message);
  }
}
Of course we also need messages representing at least the deletion of a FlashlistItem which you can find here: frontend and backend
And for good measure I want to add the option to reorder them…
Reorder Listitems
To enable the reordering of list items, we must first establish their initial order. This is achieved by assigning each item an order number using the ‘orderNr’ field, which we define as ‘list.length + 1’. It’s important to note that this approach necessitates updating the order number of a FlashlistItem whenever a preceding item is deleted or when an item is moved ahead of it.
I tried to cover both cases with one method that works great but is not comfortable to look at XD. But anyway to give you an idea, I’m sharing it here.
Future<void> updateOrderNumbers(
  Session session,
  FlashlistItem flashlistItem,
  int? newOrderNr,
) async {
  final flashlistItems = await FlashlistItem.db.find(session,
      where: (currentItem) =>
          currentItem.parentId.equals(flashlistItem.parentId));
  if (newOrderNr == null) {
    // Item is being deleted, reduce orderNr for all siblings coming after
    for (var currentItem in flashlistItems) {
      if (currentItem.orderNr >= flashlistItem.orderNr &&
          currentItem.id != flashlistItem.id) {
        await FlashlistItem.db.updateRow(
          session,
          currentItem.copyWith(orderNr: currentItem.orderNr - 1),
        );
      }
    }
  } else {
    // Item is being moved
    int oldOrderNr = flashlistItem.orderNr;
    for (var currentItem in flashlistItems) {
      if (currentItem.id != flashlistItem.id) {
        if (oldOrderNr < newOrderNr &&
            currentItem.orderNr > oldOrderNr &&
            currentItem.orderNr <= newOrderNr) {
          // Item is moving down the list, decrease the orderNr of every item between the old and new position by 1
          await FlashlistItem.db.updateRow(
            session,
            currentItem.copyWith(orderNr: currentItem.orderNr - 1),
          );
        } else if (oldOrderNr > newOrderNr &&
            currentItem.orderNr < oldOrderNr &&
            currentItem.orderNr >= newOrderNr) {
          // Item is moving up the list, increase the orderNr of every item between the new and old position by 1
          await FlashlistItem.db.updateRow(
            session,
            currentItem.copyWith(orderNr: currentItem.orderNr + 1),
          );
        }
      }
    }
    // Finally, update the orderNr of the moved item
    await FlashlistItem.db.updateRow(
      session,
      flashlistItem.copyWith(orderNr: newOrderNr),
    );
  }
}
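If the database calls make the idea hard to see, the same shifting rules can be sketched on a plain in-memory list. `Item`, `reorder` and `idsInOrder` are hypothetical names for this illustration only, and the sketch mutates the items in place instead of writing rows.

```dart
// Minimal stand-in for FlashlistItem, reduced to what reordering needs.
class Item {
  Item(this.id, this.orderNr);
  final int id;
  int orderNr;
}

/// Moves [moved] to [newOrderNr], or closes the gap it leaves behind when
/// [newOrderNr] is null (deletion). Mutates the items in place.
void reorder(List<Item> items, Item moved, int? newOrderNr) {
  final old = moved.orderNr;
  for (final item in items) {
    if (item.id == moved.id) continue;
    if (newOrderNr == null) {
      // Deletion: everything after the removed item moves up by one.
      if (item.orderNr > old) item.orderNr--;
    } else if (old < newOrderNr &&
        item.orderNr > old &&
        item.orderNr <= newOrderNr) {
      // Moving down: items between old and new position move up by one.
      item.orderNr--;
    } else if (old > newOrderNr &&
        item.orderNr >= newOrderNr &&
        item.orderNr < old) {
      // Moving up: items between new and old position move down by one.
      item.orderNr++;
    }
  }
  if (newOrderNr != null) moved.orderNr = newOrderNr;
}

/// Returns the item ids sorted by their orderNr.
List<int> idsInOrder(List<Item> items) =>
    ([...items]..sort((a, b) => a.orderNr.compareTo(b.orderNr)))
        .map((i) => i.id)
        .toList();

void main() {
  final a = Item(1, 1), b = Item(2, 2), c = Item(3, 3), d = Item(4, 4);
  final items = [a, b, c, d];

  reorder(items, a, 3); // move the first item to position 3
  print(idsInOrder(items)); // [2, 3, 1, 4]

  reorder(items, c, null); // delete item 3 and close the gap
  items.remove(c);
  print(idsInOrder(items)); // [2, 1, 4]
  print(items.map((i) => i.orderNr).toList()); // [2, 1, 3]
}
```

After every operation the order numbers stay contiguous from 1 to the number of items, which is the invariant the database version has to maintain as well.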
Of course these changes also need to be applied to any copy of the list a user is currently looking at on their phone, which you can also find here.
Conclusion
By using a WebSocket to transfer data, we ensure that the user’s data is always up to date as long as there is an internet connection. We send the most recent state when the session starts and subscribe to any changes while it lasts, enabling real-time updates without the need for manual refreshes. This approach enhances the user experience by providing seamless and responsive interaction with the application.
Thanks for reading this one. As this is the core functionality of Flashlist, it was important for me to share it in a little more detail than most readers might have required. However, I wanted to ensure it was understandable for everyone.
Make sure to subscribe so you don’t miss anything. And of course don’t forget to clap, either on Medium or in real life while sitting in front of your computer. I swear! I will hear it ❤.
Have a great day, and “May the Pods have mercy on your code”.