Streaming Data: Exactly Once Processing

By Andrew G. Psaltis

This article was excerpted from the book Streaming Data.

There are plenty of cases where processing a message exactly once is important. For example, let’s say we’re receiving data from an order stream and need to update a 3rd party order system. Because we didn’t implement the 3rd party API, we can’t make any assumptions about its idempotency. Therefore, we need to ensure we don’t send it duplicate messages.

Ideally, we’d handle this with an acknowledgement feature of the streaming API. This interaction pattern, in which the 3rd party API acknowledges each update and the streaming client then acknowledges to the streaming API that the message has been handled, is detailed below in figure 1.

Figure 1 HML with full acknowledgement shown in context

This is fraught with challenges though. First, you need to implement message acknowledgement for the 3rd party API. Then you need to implement it with the streaming API. Unfortunately, in many cases you won’t control the streaming API and may have no way to send back acknowledgements. If that’s the case, how can we ensure we’ve processed a message exactly once?

In this case, we’ll need to keep a record of all the messages we’ve seen for a time period. If there’s a guaranteed unique message ID, we can use it as the key in our “been processed” message store; otherwise we can hash the data or compute some other fingerprint. Keeping a record of all the messages we’ve seen increases our client complexity and storage requirements, but it also protects us when the streaming API crashes and sends a message again. The architecture for this will look like figure 2.

Figure 2 HML with partial ACK’ing and local storage
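
The record-keeping described above can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not a definitive implementation: the message shape (a dict with an optional "id" and a "payload"), the names fingerprint, SeenStore and handle, and the choice of SHA-256 as the hash are all hypothetical and don’t come from any particular streaming API.

```python
import hashlib
import time
from collections import OrderedDict


def fingerprint(message):
    """Use the unique message ID when there is one; otherwise hash the data."""
    if message.get("id") is not None:
        return "id:{}".format(message["id"])
    payload = message["payload"]
    if isinstance(payload, str):
        payload = payload.encode("utf-8")
    return "sha256:" + hashlib.sha256(payload).hexdigest()


class SeenStore:
    """Local record of messages seen during the last retention_seconds."""

    def __init__(self, retention_seconds, clock=time.monotonic):
        self._retention = retention_seconds
        self._clock = clock
        # fingerprint -> time first seen; insertion order keeps oldest first
        self._seen = OrderedDict()

    def _evict_expired(self):
        cutoff = self._clock() - self._retention
        while self._seen:
            _, seen_at = next(iter(self._seen.items()))
            if seen_at > cutoff:
                break
            self._seen.popitem(last=False)  # drop the oldest entry

    def mark_if_new(self, key):
        """Return True only the first time a key is seen inside the window."""
        self._evict_expired()
        if key in self._seen:
            return False
        self._seen[key] = self._clock()
        return True


def handle(message, store, send_to_order_system):
    """Forward a message to the 3rd party system unless it is a duplicate."""
    if not store.mark_if_new(fingerprint(message)):
        return False  # already processed: discard
    send_to_order_system(message)
    return True
```

Note that this sketch records the message before calling the 3rd party system, so a client crash between the two steps would drop a message rather than send it twice. Whether that trade-off is acceptable depends on the system being updated.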

This may seem simple enough, but what if there’s more than one streaming API server? At a certain point in time, one’s going to crash, or perhaps be taken down for maintenance. When this happens our streaming client will connect to a different API server and begin to consume the stream. This raises an interesting question. How do we ensure we don’t process a previously processed message again? Remember, we can’t assume the streaming API is keeping track of what messages were already delivered to us. Perhaps it does during normal maintenance, but what if it crashed before it could record the last message sent to us, and sends us the same message again?

To handle this adequately, we’ll need to use a distributed store for recording the previously seen messages. This is illustrated below in figure 3, where there’s more than one streaming API server and a single streaming client.

Figure 3 HML with partial ACK’ing and distributed storage for processed messages

To keep things simple in figure 3, I only showed a single streaming client. I think you get the picture. In essence, as time goes on we see that the Streaming API server 01 goes away and then Streaming API server 02 sends our streaming client a message we’ve already seen.
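
The failover scenario in figure 3 can be sketched as follows. The SharedProcessedStore class here is only an in-memory stand-in for a distributed store, and its add_if_absent method is a hypothetical name; the assumption is that a real store would offer an equivalent atomic “insert if absent” operation over the network. Streams are modeled as plain lists of (message ID, body) pairs.

```python
import threading


class SharedProcessedStore:
    """In-memory stand-in for a distributed store (hypothetical).

    A real deployment would replace this with a network service that
    provides an atomic "insert if absent" operation.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._keys = set()

    def add_if_absent(self, key):
        """Atomically record key; return True only for the first caller."""
        with self._lock:
            if key in self._keys:
                return False
            self._keys.add(key)
            return True


def consume(stream, store, process):
    """Process each message once; return how many duplicates were discarded."""
    discarded = 0
    for message_id, body in stream:
        if store.add_if_absent(message_id):
            process(body)
        else:
            discarded += 1
    return discarded


# Server 01 goes away after m2; server 02 replays m2 before sending m3.
store = SharedProcessedStore()
handled = []
server_01 = [("m1", "order 1"), ("m2", "order 2")]
server_02 = [("m2", "order 2"), ("m3", "order 3")]
consume(server_01, store, handled.append)
duplicates = consume(server_02, store, handled.append)
```

Because the record of processed messages lives outside any one connection, the replayed message is discarded no matter which streaming API server delivers it.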

In this case, and in all cases, we need to check whether we’ve already processed the message and, if so, discard it. There’s a subtlety to this design you need to keep in mind. Using a distributed store to keep track of previously processed messages protects us from double processing, but it comes at a cost: if we’re processing a stream that has a significant velocity, every request we make to a service over the network may jeopardize our ability to meet an SLA. Keep this in mind and measure the performance; if you notice an impact, consider using a local store that flushes its messages to the distributed store. Figure 4 below illustrates how this may work.

Figure 4 Time lapse of handling a streaming API server changing and keeping exactly once processing

The important thing to point out when looking at figure 4 is that when the streaming client detects the network connection to the streaming API server has been lost, it needs to synchronize the local store with the distributed store. As long as this step happens prior to receiving another message to process, we should be able to ensure that we don’t process a message more than once.
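
One way to picture the local store and its synchronization step is the sketch below. Everything in it is an assumption made for illustration: the TwoTierStore and InMemoryRemote names, the add_many and snapshot methods on the remote store, and the batch size. In particular, pulling a snapshot of the distributed store during synchronization only matters if other clients also write to it, and the local set is left unbounded to keep the example short.

```python
class InMemoryRemote:
    """Stand-in for the distributed store (hypothetical interface)."""

    def __init__(self):
        self.keys = set()
        self.round_trips = 0  # counts simulated network requests

    def add_many(self, keys):
        self.round_trips += 1
        self.keys.update(keys)

    def snapshot(self):
        self.round_trips += 1
        return set(self.keys)


class TwoTierStore:
    """Checks a local set first and flushes new keys to the remote in batches."""

    def __init__(self, remote, flush_every=100):
        self._remote = remote
        self._local = set()
        self._pending = []  # keys recorded locally but not yet flushed
        self._flush_every = flush_every

    def mark_if_new(self, key):
        """Return True the first time a key is seen; no network call needed."""
        if key in self._local:
            return False
        self._local.add(key)
        self._pending.append(key)
        if len(self._pending) >= self._flush_every:
            self.flush()
        return True

    def flush(self):
        """Push locally recorded keys to the distributed store."""
        if self._pending:
            self._remote.add_many(self._pending)
            self._pending = []

    def synchronize(self):
        """Call when the connection to the streaming API server is lost,
        before accepting the next message."""
        self.flush()
        self._local.update(self._remote.snapshot())
```

The per-message check touches only local memory, so the network cost is paid once per batch and once per reconnect rather than once per message.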