A semi-quick guide to Reliable Stream Processing with RxJS
This article describes an approach I recently used to handle a stream of events (a user drawing on a canvas) that I had to process (send over socket.io to a server). I wanted to be able to pause the syncing process when the socket goes down and resume it when the socket comes back up. I also felt it was important to be able to retry items with a delay in between, while honoring a maximum number of retries.
I am not arrogant enough to think that this is the best solution, nor the only one for the problem. I just had fun coding it up, and would like to share the results with you.
Regarding terminology: throughout this post, I use the term Stream instead of Observable. I hope you don't mind, but I just think of these things as streams.
A Promise only executes its logic once
Promises work really well. A Promise is a great way to handle async interaction, and Promises play well with async/await, so it makes a lot of sense to use them.
That said, a Promise is a once-off concept. When it executes, it either resolves with a value when it succeeds at its job, or rejects with an error when it doesn't. But once it has done either of those, it's finished.
Have a look at this code example. Even though the second call to .then() happens 2 seconds after the first, it returns the same result. This is because the promise has already been resolved, so its result is effectively cached and simply handed back again.
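The original example isn't reproduced here, so the following is a small stand-in sketch in plain JavaScript (no libraries) showing the same behaviour: the executor body runs exactly once, and a .then() attached two seconds later receives the value the promise already settled with. The executions counter and seen array are only there to make that observable.

```javascript
// A promise runs its executor once; every later .then() gets the settled value.
let executions = 0;
const seen = [];

const promise = new Promise((resolve) => {
  executions += 1; // counts how many times the executor body runs
  setTimeout(() => resolve(Math.random()), 100);
});

promise.then((value) => {
  seen.push(value);
  console.log("first then:", value);
});

// Two seconds later, attach another handler: no new work happens,
// the already-resolved value is handed back.
setTimeout(() => {
  promise.then((value) => {
    seen.push(value);
    console.log("second then:", value);
  });
}, 2000);
```

Both handlers log the same random number, and executions stays at 1.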
This is not a problem when you want to do one action, like a once-off HTTP call to save an object to a service, because you can manually introduce some retry logic to ensure that the message makes it to the service.
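For a single once-off call, that manual retry logic might look something like the sketch below. withRetries and saveObject are hypothetical names used for illustration. Because a settled promise can't be re-run, the helper takes a function that creates a fresh promise on each attempt rather than a promise itself.

```javascript
// Hypothetical helper: a settled promise can't be re-run, so retrying
// means calling the function that creates the promise again.
async function withRetries(makePromise, maxRetries) {
  let lastError;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await makePromise();
    } catch (err) {
      lastError = err; // remember the failure and try again
    }
  }
  throw lastError; // every attempt failed
}

// Simulated once-off save that fails twice before succeeding.
let calls = 0;
function saveObject() {
  calls += 1;
  return calls < 3
    ? Promise.reject(new Error("network error"))
    : Promise.resolve("saved");
}

let outcome;
withRetries(saveObject, 3).then((result) => {
  outcome = result;
  console.log(result, "after", calls, "calls");
});
```

This is fine for one call. The trouble starts when you have an endless stream of such calls, which is where the rest of the article goes.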
When working with a stream
Let’s expand the scope of our example. Let’s imagine that you are working with a stream of messages/events, and you need to ensure that each message/event gets passed to a service.
For example, if you are working on a project where you need to send analytics events to a server based on the user’s interaction with an app, as long as that user is using the app, you will need to send data to the server.
That's what we mean when we say it's a stream of events: it doesn't have an upper cap, unlike an array of data items, which has a fixed length.
By the way, this is usually where something like websockets enters the picture, but for the purpose of this article it doesn't really matter what your communication channel to your service looks like. It only matters that you now have a bunch of messages piling up, and that each of them has to cross an async IO boundary, in this case the network.
Ok, enough of that, let’s have a look at some example code.
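The original snippet isn't available here, so here is a minimal stand-in for the naive approach. The names api.syncEvent and syncMouseEvent come from the text; the api object itself is a hypothetical stub that "sends" an event after 10 ms. Every mouse event immediately fires its own promise.

```javascript
// Hypothetical stand-in for the api module: syncEvent sends one
// event over the network and returns a promise.
const sent = [];
const api = {
  syncEvent(event) {
    return new Promise((resolve) => {
      setTimeout(() => {
        sent.push(event);
        resolve(event);
      }, 10);
    });
  },
};

// The naive approach: the event handler itself does the IO.
function syncMouseEvent(event) {
  api
    .syncEvent(event)
    .then(() => console.log("synced", event))
    .catch((err) => console.error("failed to sync", event, err));
}

// Simulate a few mouse moves. In the browser this would be something like
// canvas.addEventListener("mousemove", syncMouseEvent).
for (let i = 0; i < 3; i++) {
  syncMouseEvent({ x: i, y: i * 2 });
}
```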
In the code above, api.syncEvent will be called many times. As long as the user keeps moving the mouse around, the handler calls the syncEvent function and creates a new promise for each event.
As a quick aside, I'm currently working on a course for pluralsight.com on combining React, socket.io, and RethinkDB to build a real-time collaborative whiteboard app. I'll post the GitHub repo containing the source code and the link to the course here when it's ready. In the course, I wanted to ensure that if the connection to the websocket drops while the user is drawing, the app can handle it and sync the changes to the drawing when the websocket is reestablished. As you can imagine, a collaborative drawing app generates a lot of data that needs to be synced to the server, so it made sense to find a nice solution to this problem for the course.
Back to our example. How would you ensure that a Promise generated by an item in your stream of events gets retried when it fails, but only a limited number of times before giving up?
How about this scenario? You have an active stream of events, but you know that your connection to the service has been broken. How do you pause your operations, to ensure that you don’t unnecessarily try sending the messages to the service and burn through the number of retries that you want to allocate per item? In other words, how do you manage the syncing process as a whole?
Modelling for a stream
The problem with the code above is that we tied the processing logic directly to the event handler, which immediately executes a promise. It would be easier to add concerns like retries, delays, and orchestration of the process as a whole if we modelled it as a proper RxJS stream.
It’s definitely possible to handle all these issues without introducing a streaming library like RxJS, but it’s more expressive to do it using a streaming library.
Let's publish the events through a stream and process them from the stream, instead of processing them directly as we attempted in the code above.
In the code above, we haven't really added much, except that we no longer execute the logic directly in the syncMouseEvent event handler function. That said, our items now go through a stream, which gives us an idiomatic way to add concerns like retries, delays between retries, and orchestration of the whole process.
Now that we're working off a stream (an RxJS observable), we can add some code to retry items when the promise fails. First, we're going to just retry items indefinitely.
We added just one line! In the promise's failure handler, the pendingItems.next(item) call publishes the item to the stream again, which means it comes through the subscribe handler again. This would've been easy enough to do with normal imperative code, without using RxJS, but please bear with me. When we start adding more concerns, it'll be clear that RxJS is worth it.
Adding a retry limit
So now we have a stream, but it’s kinda like an infinite loop. A promise executes and if it fails, it just tries again. It would be nice if we could have a maximum number of retries and give up when we reach the max number of retries per item.
To do that, we'll have to store some metadata for each item, so that we know how many times we've tried to execute its logic.
Phew! OK, so that's quite a bit more code. In the code above, we no longer publish directly to our stream from the syncMouseEvent function. Instead, we've introduced a new function called queue, where we decorate the item with metadata so that we can keep track of how many times we've tried executing the promise logic for it. We also added some logic to the queue function so that if an item being queued has already exceeded the maximum retries, it logs an error to the console and exits without adding the item back to the stream.
We also had to adapt the logic where we subscribe to the queue. Because it now receives an item decorated with metadata, we need to pull the item property from it for our business logic API, which builds up the promise. The subscribe logic also uses the queue function when the promise fails, because that's where we increment the counter and stop once the maximum retries have been exceeded.
Adding a delay between retries
Using RxJS makes our stream logic idiomatic, but so far we haven’t really done anything that would’ve been difficult to do without RxJS. That said, with each concept that we add without using a stream library like RxJS, our code will just become more unwieldy, until nobody can make sense of it.
Imagine we wanted to add this retry logic without using RxJS, with a loop for retries. To wait between retries, we would need to add setTimeout calls to our code. And what about later, when we want to pause/resume the whole process? It'll become a mess very quickly.
Let’s see what’s involved when we want to add a delay between retries for each item.
This time around, you'll see that I didn't share the code in its entirety. That's because it's actually just 2 lines of code needed to add a delay between retries for each item!
In the code above, have a look at the two new lines. In the first, we map each item in the stream to an observable that emits the item after a delay, using the RxJS delay operator. If this is the item's first attempt, according to its metadata, we use a delay of zero. In the second, we use the mergeAll operator to flatten the resulting stream of observables and voilà! We now have delays in our sequence!
The above would've been a bit more difficult to follow if we had an infinite loop to reprocess promises along with setTimeout to handle delays. I hope you agree this is way more expressive and elegant.
Subscribing to the stream processing results
So far, we just log to the console when an item exceeds its maximum retries or when it is processed successfully.
It would make sense for our app to be aware of the results of the stream processing, to know when items have been synced successfully or when they failed after exhausting their retry limit.
To do this, we're going to use streams again, so that the rest of our code can subscribe to them and do whatever it wants with the results.
In the above code, we added two new Rx Subjects at the top, which allow us to publish to and subscribe to streams. Instead of handling the failure and success scenarios directly where they happen, we publish to the respective streams.
In the queue function, we publish an item that has exceeded its retries to the failedItems stream, and in the promise's success handler, we publish an item that was successfully processed to the corresponding success stream. At the end, we subscribe to both streams and output a message for each item that comes through. Using streams here allows you to add other concerns too. For example, you could use them to show something to the user in the app.
Making it generic
Going with streams brought us a bit closer to making this pattern generic. If we just make the promise logic a bit more dynamic, we could use this for anything where we have a stream of IO operations that we would like to process.
Also, it would be nice to add the functionality to make this processing logic pausable, so that if we get notified of a broken connection to the service, we could stop the whole process until the network connection is up again.
In the end, what we want is a generic api that allows us to process a stream of operations, giving us the ability to subscribe to the results, configure a delay between items, configure a maximum per-item retry limit, and allows us to pause/resume the processing as a whole. This is what a consumer of the api would look like:
The above is an example of the library that can be found at https://github.com/hendrikswan/rxsync. It builds on what we discussed in this article by adding the concept of pause/resume, but other than that, it's pretty much the same. I felt that going into pause/resume might make this article a bit too heavy, so if you are interested in finding out how it works, go and have a look at the source code!
Streams are awesome, but more importantly, they are ubiquitous. You can pretty much find streams everywhere. When a user interacts with an app, those events are actually a stream of events. When you have a cursor open to a DB with a live feed, that’s a stream. A message queue? Stream! Kafka subscriber? Stream!
The strategy shared in this article can be used anywhere you have a stream of data that you need to sync over an asynchronous IO boundary, but I think it makes the most sense in a front-end, where you can't count on connectivity being reliable. I needed it for a course on pluralsight.com where the sample application is a collaborative drawing app built on socket.io. I think combining socket.io with this approach makes a lot of sense.
I would love to hear your thoughts.