GCP Pub/Sub ordering and Apache NiFi

Neil Kolban
Google Cloud - Community
Jan 19, 2020

Note: This article was published before GCP Pub/Sub made available a feature called Ordering Messages. Where that native feature meets your needs, give it precedence over the approach described in this article, which predates it.

GCP Pub/Sub does not guarantee message ordering. This means that if a single publisher publishes the sequence of messages #1, #2 and #3, they are not guaranteed to be received by a single subscriber in that order.

For some use cases, this could be a serious problem. For example, if the logical messages being published in sequence were:

  • Open an account
  • Deposit $100 in the account
  • Transfer $50 from the account

then the order of processing could be very important. Processing any of these messages out of order could easily produce an incorrect outcome; for example, a deposit cannot be applied to an account that has not yet been opened.

Apache NiFi is an open source platform for processing and distributing data. We won’t cover the details of Apache NiFi here, as there is plenty of excellent literature on it elsewhere. What we will look at is how Apache NiFi can be used to provide message ordering for GCP Pub/Sub.

Apache NiFi has a GCP subscriber processor (ConsumeGCPubSub) that can be used as a source of FlowFiles (FlowFiles are NiFi’s units of data; think of them as GCP Pub/Sub messages). We need to ensure that messages are processed in order. One way to achieve this is with the NiFi-supplied processor called EnforceOrder. This processor relies on two important concepts associated with the FlowFiles presented to it.

The first is that each FlowFile is expected to carry a sequence number attribute. It is this attribute that defines the required ordering.

Note: A question that comes up here is why we need a sequence number when each message already has a publication timestamp. Can’t we order our messages by the timestamp? To see why this won’t work, consider a message that arrives with a timestamp of 2:00:00. Are we certain that a message with a timestamp of 1:59:59 won’t arrive afterwards? And if the next message has a timestamp of 2:05:00, are we sure that we aren’t missing a message from 2:03:00? With timestamps alone there is no way to tell whether a gap exists, whereas with consecutive sequence numbers a gap is immediately visible.
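
A small Python sketch makes the point concrete. The helper find_gaps is a hypothetical name used only for illustration. It relies on the assumption that sequence numbers are contiguous integers starting at 1, so the missing ones can be enumerated exactly. There is no equivalent finite list of timestamps that "should" exist between 2:00:00 and 2:05:00.

```python
def find_gaps(seen_seq_nums):
    """Return the sequence numbers missing between 1 and the highest seen.

    Assumes sequence numbers are contiguous integers starting at 1, so any
    number in that range that has not been seen is still outstanding.
    """
    if not seen_seq_nums:
        return []
    seen = set(seen_seq_nums)
    return [n for n in range(1, max(seen) + 1) if n not in seen]


# Messages 1, 2 and 4 have arrived, so we know exactly that 3 is outstanding.
print(find_gaps([1, 2, 4]))  # [3]
```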

Imagine that EnforceOrder is initially expecting a message with a sequence ID of 1. If a message with that ID arrives, it is passed through. EnforceOrder is stateful: it knows which sequence ID it is expecting. When it finds a matching message, it not only passes that message through but also increments the expected sequence ID. In this example, we would now be looking for a message with a sequence ID of 2. If the next message seen has a sequence ID of 3, EnforceOrder realizes that something is amiss. It routes this message to the wait connection and moves on to the next input message. If the next message it sees has a sequence ID of 2, it passes that through and increments the expected sequence ID to 3. Eventually, the message that was routed to wait is re-processed, and we will have emitted the correct ordered sequence.
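
The stateful behavior just described can be modelled in a few lines of Python. This is a sketch of the idea, not NiFi's implementation. The class SequenceState and its offer method are hypothetical names, and only the success and wait outcomes are modelled.

```python
class SequenceState:
    """Hypothetical model of the expected-sequence state (not NiFi code).

    Only two outcomes are modelled: the expected ID passes through
    ("success") and any other ID is held back ("wait").
    """

    def __init__(self, initial_order=1):
        self.expected = initial_order  # next sequence ID we will accept

    def offer(self, seq_id):
        """Pass the expected message through; anything else must wait."""
        if seq_id == self.expected:
            self.expected += 1  # now look for the following ID
            return "success"
        return "wait"


state = SequenceState()
print(state.offer(1))  # success (expected is now 2)
print(state.offer(3))  # wait (3 is early; expected is still 2)
print(state.offer(2))  # success (expected is now 3)
print(state.offer(3))  # success: the waiting message is re-processed
```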

The second concept relating to EnforceOrder is that FlowFiles need not all be ordered relative to one another; instead, they can be ordered within groups. If several independent sequences (for example, the messages for several different accounts) are being processed at the same time, we only need to maintain order within each group, not across all messages.

Mapping these ideas to the properties of the EnforceOrder processor, we find that the sequence ID is read from the incoming FlowFile’s attribute whose name is given in the “Order Attribute” property. The grouping of FlowFiles into sequences is supplied as an expression in the “Group Identifier” property. This will commonly be an expression that evaluates to an attribute of the FlowFile.

In our example, we will imagine that each FlowFile carries the following attributes:

  • seqNum — An integer-valued sequence number starting at 1.
  • groupId — A string value common among FlowFiles of the same group.
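
The two properties can be sketched in Python as a per-group lookup. The CONFIG dictionary, the GroupedOrderer class and its offer method are hypothetical names for illustration. NiFi evaluates Group Identifier as an expression (for our FlowFiles, something like ${groupId}), which this sketch simplifies to a plain attribute name. As before, only success and wait are modelled.

```python
# Hypothetical configuration standing in for the two EnforceOrder properties.
# NiFi evaluates Group Identifier as an expression such as ${groupId}; this
# sketch simplifies both properties to plain attribute names.
CONFIG = {"group_attr": "groupId", "order_attr": "seqNum", "initial_order": 1}


class GroupedOrderer:
    """Keep one independent expected-sequence counter per group (sketch only)."""

    def __init__(self, config):
        self.config = config
        self.expected = {}  # groupId -> next expected seqNum

    def offer(self, attributes):
        """Route a FlowFile given its attributes (NiFi attribute values are strings)."""
        group = attributes[self.config["group_attr"]]
        seq = int(attributes[self.config["order_attr"]])
        # The first sight of a group creates its state at the initial order.
        expected = self.expected.setdefault(group, self.config["initial_order"])
        if seq == expected:
            self.expected[group] = expected + 1
            return "success"
        return "wait"


orderer = GroupedOrderer(CONFIG)
print(orderer.offer({"groupId": "XYZ", "seqNum": "1"}))  # success
print(orderer.offer({"groupId": "XYZ", "seqNum": "2"}))  # success
print(orderer.offer({"groupId": "ABC", "seqNum": "1"}))  # success: ABC has its own counter
print(orderer.offer({"groupId": "ABC", "seqNum": "3"}))  # wait: ABC expects 2
```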

Let us examine EnforceOrder in some more detail to understand how it works and how it is used to solve our problems. If we look at the default properties of the EnforceOrder processor, we will find the following:

We will also find that the processor has the following output connections:

  • success
  • failure
  • wait
  • skipped
  • overtook

The overall goal is for FlowFiles to arrive and be output, in their correct sequence, through the success connection. To see how this happens, let us examine a sequence of messages.

The first message to arrive has a groupId of XYZ and a seqNum of 1. EnforceOrder looks at the groupId and realizes that it hasn’t seen this group of messages before. It starts maintaining state for the group and sets the initial expected order from its Initial Order property, which we assume is set to 1 in our example. The incoming message’s seqNum is examined and found to be 1. This is indeed the next (in our case, first) expected message, so it is passed on through success. The next expected seqNum is incremented from 1 to 2, meaning that the next message we expect should have a seqNum of 2.

Now assume that the next message has a groupId of XYZ and a seqNum of 2. EnforceOrder recognizes a groupId it has seen before, compares the expected seqNum (2) with the seqNum contained in the message (2), and again lets it pass through the success connection. The next expected seqNum is incremented from 2 to 3.

Let us now look at an out-of-sequence message. Assume the next message has a groupId of XYZ and a seqNum of 4. EnforceOrder sees a known groupId (XYZ) and expects the message to have a seqNum of 3. It discovers that the seqNum is 4, so the message has arrived too early (out of sequence). The message is sent to the wait connection and the state of EnforceOrder remains unchanged. The wait connection is assumed to be looped back to EnforceOrder so that the message can be processed again in the future. Take a moment to look at the flow diagram and see that wait is indeed routed back to EnforceOrder.

Messages continue to be processed in this fashion: early messages are sent to wait, while the next expected message is passed through to success and EnforceOrder’s next expected sequence number for the group is incremented. Messages that arrived early and were sent to wait are re-examined later.
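
The walkthrough above can be simulated with a short Python sketch. The function enforce_order and its (groupId, seqNum, relationship) event tuples are hypothetical. The wait connection's loop back to EnforceOrder is modelled as re-queuing the FlowFile. The simulation stops once a full pass over the queue makes no progress, whereas a real NiFi flow would simply keep the FlowFile waiting.

```python
from collections import deque


def enforce_order(flowfiles, initial_order=1):
    """Simulate the walkthrough: early FlowFiles go to "wait" and loop back.

    flowfiles is a list of (groupId, seqNum) pairs in arrival order. Returns
    (groupId, seqNum, relationship) events in the order they were produced.
    Only success and wait are modelled in this sketch.
    """
    expected = {}  # per-group state, created the first time a groupId is seen
    queue = deque(flowfiles)
    events = []
    stalled = 0  # consecutive waits; a full pass without progress ends the run
    while queue and stalled < len(queue):
        group, seq = queue.popleft()
        target = expected.setdefault(group, initial_order)
        if seq == target:
            expected[group] = target + 1  # advance the stored state
            events.append((group, seq, "success"))
            stalled = 0
        else:
            events.append((group, seq, "wait"))
            queue.append((group, seq))  # the wait connection routes it back
            stalled += 1
    return events


for event in enforce_order([("XYZ", 1), ("XYZ", 2), ("XYZ", 4), ("XYZ", 3)]):
    print(event)
# ('XYZ', 1, 'success')
# ('XYZ', 2, 'success')
# ('XYZ', 4, 'wait')
# ('XYZ', 3, 'success')
# ('XYZ', 4, 'success')
```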

This is the core of EnforceOrder but there are other possibilities we haven’t yet discussed which we will now examine.

A duplicate message is one that is received more than once. Thinking this through, we realize that a duplicate is a message whose seqNum equals a value we have already seen. This is equivalent to having a seqNum less than the sequence number we next expect. For example, if we are expecting a message with seqNum 4 and receive a message with seqNum 2, we must have previously processed a message with seqNum 2. In this case, EnforceOrder routes the duplicate message to the skipped connection.
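
With duplicates included, the routing decision for a single message comes down to one comparison. The Python function route below is a hypothetical sketch of the three cases described so far. It is not NiFi source code, and advancing the expected value after a success is left to the caller.

```python
def route(seq_num, expected):
    """Choose a relationship by comparing a seqNum with the expected value.

    Sketch of the three cases described in the text:
      equal   -> "success" (the caller then advances expected)
      greater -> "wait"    (arrived early)
      smaller -> "skipped" (already processed, i.e. a duplicate)
    """
    if seq_num == expected:
        return "success"
    if seq_num > expected:
        return "wait"
    return "skipped"


print(route(4, 4))  # success
print(route(5, 4))  # wait
print(route(2, 4))  # skipped: message 2 was already passed through
```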

What about a broken sequence of messages, where we receive messages 1, 2 and 4 but never receive message 3? This is a problem, as we would hold on to message 4 forever and tie up resources for no reason. This is where the Wait Timeout property comes into play. It is a configurable interval that defines how long a waiting (early received) message should be held. If the message is still waiting after this interval, it is routed to the overtook connection.
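
Wait Timeout can be added to the per-group sketch by remembering when each early message was first sent to wait. WaitingGroup is a hypothetical class, and times are plain numbers of seconds to keep the example deterministic. The sketch makes one explicit assumption beyond the text: after a message overtakes, the expected sequence resumes after it, so a late message 3 is then treated as a duplicate and skipped.

```python
class WaitingGroup:
    """Sketch of Wait Timeout handling for one group (hypothetical, not NiFi code)."""

    def __init__(self, wait_timeout, initial_order=1):
        self.wait_timeout = wait_timeout  # seconds an early message may wait
        self.expected = initial_order
        self.waiting = {}  # seqNum -> time it was first routed to wait

    def offer(self, seq, now):
        """Route one message arriving (or re-arriving via wait) at time now."""
        if seq == self.expected:
            self.expected += 1
            self.waiting.pop(seq, None)  # it may have waited before succeeding
            return "success"
        if seq < self.expected:
            return "skipped"
        first_seen = self.waiting.setdefault(seq, now)
        if now - first_seen >= self.wait_timeout:
            del self.waiting[seq]
            # Assumption of this sketch: the sequence resumes after the
            # message that overtook the missing ones.
            self.expected = seq + 1
            return "overtook"
        return "wait"


group = WaitingGroup(wait_timeout=600)  # 10 minutes
print(group.offer(1, now=0))    # success
print(group.offer(2, now=1))    # success
print(group.offer(4, now=2))    # wait: 3 has not arrived
print(group.offer(4, now=300))  # wait: only 298 s have elapsed
print(group.offer(4, now=602))  # overtook: waited 600 s
print(group.offer(3, now=700))  # skipped, under this sketch's assumption
```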

We have also seen that EnforceOrder maintains state per groupId. Since we don’t know how many messages are in a group, and there is no indication of a last message, it would seem that the state for groups will keep accumulating. This is where the Inactive Timeout property of EnforceOrder can be used. It specifies an interval of time, measured from the last message seen for a given group, after which EnforceOrder forgets about that group. For example, if message number 5 of group XYZ is seen at 12:00 and we define an inactive timeout of 10 minutes, then at 12:10 we can declare that we will no longer see messages belonging to this group and can discard the state we are maintaining for it.
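
The 12:00 / 12:10 example can be expressed as a small Python sketch. GroupStateStore, record and evict_inactive are hypothetical names, not NiFi's state manager. Times are minutes since midnight so that 12:00 is 720.

```python
class GroupStateStore:
    """Sketch of Inactive Timeout: forget groups not seen for a while."""

    def __init__(self, inactive_timeout):
        self.inactive_timeout = inactive_timeout  # minutes
        self.expected = {}   # groupId -> next expected seqNum
        self.last_seen = {}  # groupId -> time the group's last message was seen

    def record(self, group, seq_num, now):
        """Remember that seq_num of group was passed through at time now."""
        self.expected[group] = seq_num + 1
        self.last_seen[group] = now

    def evict_inactive(self, now):
        """Drop state for every group idle for at least inactive_timeout."""
        stale = [g for g, t in self.last_seen.items() if now - t >= self.inactive_timeout]
        for group in stale:
            del self.expected[group]
            del self.last_seen[group]
        return stale


store = GroupStateStore(inactive_timeout=10)
store.record("XYZ", 5, now=12 * 60)            # message 5 of XYZ seen at 12:00
print(store.evict_inactive(now=12 * 60 + 9))   # []: 12:09 is within the timeout
print(store.evict_inactive(now=12 * 60 + 10))  # ['XYZ']: forgotten at 12:10
```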

For GCP Pub/Sub, we can map attributes of the published message to the attributes corresponding to groupId and seqNum to provide the data needed for EnforceOrder.
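
On the publishing side, something has to stamp each message with these attributes. The Python helper below is a hypothetical sketch that keeps one counter per group and returns a Pub/Sub attribute dictionary (Pub/Sub attribute values are strings). It assumes a single publisher owns each group, because the counters live in one process only.

```python
import itertools
from collections import defaultdict


class SequencedAttributes:
    """Hypothetical publisher-side helper: one seqNum counter per groupId.

    Assumes a single publisher owns each group, because the counters live
    in this process only.
    """

    def __init__(self):
        # Each new group gets its own counter starting at 1, matching seqNum.
        self._counters = defaultdict(lambda: itertools.count(1))

    def next_attributes(self, group_id):
        """Return message attributes for the next message in group_id."""
        seq = next(self._counters[group_id])
        return {"groupId": group_id, "seqNum": str(seq)}


sequencer = SequencedAttributes()
print(sequencer.next_attributes("XYZ"))  # {'groupId': 'XYZ', 'seqNum': '1'}
print(sequencer.next_attributes("XYZ"))  # {'groupId': 'XYZ', 'seqNum': '2'}
print(sequencer.next_attributes("ABC"))  # {'groupId': 'ABC', 'seqNum': '1'}

# With the google-cloud-pubsub client, attributes are passed as keyword
# arguments, e.g. publisher.publish(topic_path, data, **sequencer.next_attributes("XYZ")).
```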

IT specialist with 30+ years industry experience. I am also a Google Customer Engineer assisting users to get the most out of Google Cloud Platform.