Photo by Robs on Unsplash

Flashlist PART 4: Mind your Context Sensitivity

Enabling Real-Time Collaboration with Serverpod and Riverpod

Ben Auer
7 min readApr 1, 2024

--

Intro

Now that we’ve established a system where CRUD operations are seamlessly updated via stream, it’s time to explore how users can grant access to this stream to other users. Our stream is already built to update data for multiple users, so let’s see how we can extend this functionality to facilitate collaboration among users.

But before we dive into that, if you missed last week’s post or want to catch up on what we’ve covered in this series so far, you can click on one of these links:

Okay, everybody who is still here can follow me on today's quick tour through a simplified version of the Flashlist Stream. No Photos please!

The Flashlist Stream

In the backend our stream starts when a client connects to it, then the streamOpened method is called.

// ..._server/lib/src/endpoints/flashlist_endpoint.dart

class FlashlistEndpoint extends Endpoint {
// CRUD logic goes before

...
@override
Future<void> streamOpened(StreamingSession session) async {
try {
// get all Flashlists from the database
final flashlistsForUser = await getFlashlistsForUser(session);

// send list-collection
sendStreamMessage(session, FlashlistBatch(collection: flashlistsForUser));

// add a listener to a channel associated with user
session.messages.addListener('channel-user-${currentUser.id}', (message) {
sendStreamMessage(session, message);
});

// loop over the list of lists and add a listener to a channel per list
for (final flashlist in flashlists) {
session.messages.addListener('flashlist-channel-${flashlist.id}',
(message) {
sendStreamMessage(session, message);
});
}

} catch (e) {
// throw Exception or just print
}
}
...

Every message received from the client will call handleStreamMessage:

@override
Future<void> handleStreamMessage(
StreamingSession session,
SerializableEntity message,
) async {
if (message is Flashlist) {
// Use the data-class "Flashlist" to create a db entity
// It's important to await this db entity and send otherwise the flashlist won't know it's ID in the UI
final flashlist = await createFlashlist(session, message);

// Send the list via the user channel
session.messages.postMessage('channel-user-$userId', flashlist);

// Add a listener to this lists channel
session.messages.addListener('flashlist-channel-$listId', (message) {
sendStreamMessage(session, message);
});
}

if (message is DeleteFlashlist) {
// Delete the flashlist from DB
await flashlistHelper.deleteFlashlist(session, message.flashlistId);

// Send the delete-message via list channel
session.messages.postMessage(
'flashlist-channel-$listId',
message,
);

// Remove listener from list channel
session.messages.removeListener(
'flashlist-channel-${message.flashlistId}',
(message) {
sendStreamMessage(session, message);
},
);
}

if (message is UpdateFlashlist) {
// Update DB row
await flashlistHelper.updateFlashlist(session, message);

// Send update-message via list channel
session.messages.postMessage(
'flashlist-channel-${message.id},
message,
);
}
}

And when these messages are sent through either the user- or the list-channel, they have to be applied to the list that the user has checked out on his phone.

// ..._flutter/lib/src/endpoints/flashlist_endpoint.dart

@riverpod
Stream<List<Flashlist?>> flashlistsForUser(FlashlistsForUserRef ref) async* {
final client = ref.watch(clientProvider);

// To ensure a new session
client.flashlist.resetStream();

// Triggers `streamOpened`
await client.openStreamingConnection();

// Close connection when ref is no longer used
ref.onDispose(() async {
await client.closeStreamingConnection();
});

var streamItems = <Flashlist>[];

// loop over the messages sent via streaming endpoint
await for (final message in client.flashlist.stream) {
// "Unpack" the initial Batch
if (message is FlashlistBatch) {
streamItems = [...message.collection];
}

// Add Flashlist when one is sent
if (message is Flashlist) {
streamItems.add(message);
}

// Deletes Flashlist when id == message.flashlistId
if (message is DeleteFlashlist) {
streamItems.removeWhere((element) => element!.id == message.flashlistId);
}

// Removes old Item and replaces with new one
if (message is UpdateFlashlist) {
final indexToRemove =
streamItems.indexWhere((element) => element!.id == message.id);
final itemToRemove = streamItems[indexToRemove];

streamItems.removeAt(indexToRemove);
streamItems.insert(
indexToRemove,
itemToRemove!.copyWith(
title: message.title,
color: message.color,
),
);
}

// Yield streamItems on the end of every iteration
yield streamItems;
}
}

Of course, this is not everything, as we also handle FlashlistItems and reordering. You can review the code here: Server and Flutter.

Adding users

Flashlist will have two ways of adding a user to a Flashlist:

  1. Instant join: If users are connected/friends, they can be added via a tap on the connection’s avatar in the share dialog, and the invitee should be added to the list instantly.
  2. Via email: Similarly to connection requests, a request is added for the invitee and the list, and the invitee will join the list upon confirming the request.

This means, when a user connects to the Websocket:

  • We read the database and send lists associated with the current user.
  • We attach listeners to the user- and list channels.

Now, when a message arrives from the client side:

  • We process the incoming message and determine the corresponding operation.
  • We apply the operation to the database.
  • We send the message to the client.

Upon receiving this message on the client, it will update the UI by either adding a Flashlist to the streamItems yielded by our stream or replacing an item by its ID with the newly sent item.

These operations are the same for every user. However, when a user is added to a list, we are not creating a new list; instead, we are sharing an existing one along with its items and other editors. Additionally, we have to update Flashlist.authors so that existing authors see the new author in their list.

In technical terms this means we:

  • Create a new FlashlistPermission for the invitee and the list
  • Post the message to the channel associated with the list

Moreover, this ensures that when the invitee opens a new session, the list will be included in the initial FlashlistBatch, allowing for seamless integration into their user experience. Additionally, all users subscribed to the list-channel will receive a real-time update of a new co-author being added to the list.

BUT in the case of our main Invitation method (instant-join) we are sending the message to add the invitee from the inviters client. Meaning we can’t access the invitees session in handleStreamMessage.

Or Can we? Spoiler alert: yeah we can. While searching on google and harassing every AI that wants to talk to me, I got a few suggestions like using the Dio http client to send a HTTP request to the same endpoint checking if the invitee has an active session. This might definitely work but in my opinion that’s more of a hack than clean streaming workflow.

So, I decided to take a different approach: Splitting the process into two events and sending the event JoinFlashlist when handling AddUserToFlashlist on the server.

And here is how that looks in code:

if (message is AddUserToFlashlist) {
// Create new permission
await flashlistPermissionHelper.createFlashlistPermission(
session,
message.flashlistId,
message.user.id!,
message.accessLevel,
);

// Post add-user to the list-channel
session.messages.postMessage(
'flashlist-channel-${message.flashlistId}',
message,
);

// posts JoinFlashlist-Event to the invitees user channel
session.messages.postMessage(
'channel-user-${message.user.id!}',
JoinFlashlist(
user: message.user,
flashlistId: message.flashlistId,
accessLevel: message.accessLevel,
),
);
}


if (message is JoinFlashlist) {
// Retrieves the flashlist with items and co-authors
final flashlist = await flashlistHelper.getFlashlistByIdWithAttachments(
session,
message.flashlistId,
);

// Posts the list to the user-channel
session.messages.postMessage(
'channel-user-${message.user.id!}',
flashlist!,
);

// And as this is the invitees session we add a listener to the list-channel
session.messages.addListener(
'flashlist-channel-${message.flashlistId}',
(message) {
sendStreamMessage(session, message);
},
);
}

And to make this work in our messageStreamHandler, we need:

// Sent to everybody that listens to the list-channel
if (message is AddUserToFlashlist) {
// Get list
final flashlistToUpdate =
getFlashlistByFromStream(streamItems, message.flashlistId);

// Add user to Flashlist.authors
if (flashlistToUpdate!.authors == null) {
flashlistToUpdate.authors = <AppUser>[message.user];
} else {
flashlistToUpdate.authors?.add(message.user);
}
}

// Only posted to the invitees channel
if (message is JoinFlashlist) {
final client = ref.read(clientProvider);

// Sends the JoinFlashlist event back to the server
// IF the invitee has an active session
client.flashlist.sendStreamMessage(message);
}

In short: AddUserToFlashlist adds permission, informs co-authors of new editor and sends a JoinFlashlist message to the invitees user-channel to be bounced back to the server.

If no message is received or bounced back, it indicates that the user has no active session. In this case, the next time the user connects to the WebSocket, the streamOpened function will read the new permission and add a listener accordingly.

However, if the message is sent back, it means that the invitee has an active session. In this scenario, we access the invitee’s session in our message handler for JoinFlashlist and attach a listener. This ensures that the invitee’s UI is updated promptly whenever changes are made to the list.

And this is how it looks when compiled:

Removing users

Like mentioned in the intro, a user should also have the option to leave a list or be removed from it. This is where it becomes a little bit tricky. I tried an approach comparable to the others, in which I send an event to the user that is to be removed, removing the list from his streamItems, and an event to remove a user. This worked fine until the user was invited a second time within the same session, which broke the app on the invitee’s side. This is why I decided that when a user leaves, I reset the stream and invalidate its reference. This still allows for seamless updates on the user’s side but guarantees that the departing user does not receive any messages that could break the app on his side.

Here is how it looks like in code:

if (message is RemoveUserFromFlashlist) {
// Delete list permission
await flashlistPermissionHelper.deleteFlashlistPermission(
session,
message.flashlistId,
message.userId,
);

// Post remove-message to the list-channel
session.messages.postMessage(
'flashlist-channel-${message.flashlistId}',
message,
);

/// When received in the frontend, LeaveFlashlist will reset the stream
/// and invalidate the ref, which will trigger a re-fetch of the flashlists
session.messages.postMessage(
'channel-user-${message.userId}',
LeaveFlashlist(
flashlistId: message.flashlistId,
userId: message.userId,
),
);
}

And in the frontend:

if (message is RemoveUserFromFlashlist) {
// Gets list from streamItems
final flashlistToUpdate =
getFlashlistByFromStream(streamItems, message.flashlistId);

// Removes the user from Flashlist.authors
flashlistToUpdate?.authors!
.removeWhere((currentAuthor) => currentAuthor!.id == message.userId);
}

/// [LeaveFlashlist] is a message that is sent to a user that is removed from a list.
/// It resets the stream and invalidates the ref.
if (message is LeaveFlashlist) {
final client = ref.read(clientProvider);

client.flashlist.resetStream();

ref.invalidateSelf();
}

Conclusion

When empowering users to collaborate it is important to understand that an event may need different actions for the involved users based on their roles within the collaboration. Additionally, when users conclude their collaboration on a specific element, it is important to ensure that this does not introduce the potential for unhandled exceptions or other adverse outcomes.

Thanks for reading my update on Flashlist, Happy remaining Easter holidays and “May the pods have mercy on your code!”

--

--