Creating an Accumulating Batcher Service with Cadence

Nathan Lim
StashAway Engineering
9 min read · May 3, 2020

What is an Accumulating Batcher Service? Essentially, we store up entities one at a time until they hit a certain threshold, then send them off to be processed. Most of the time we need such a service to reduce the number of API calls we have to make: the calls might be charged per call, or we may simply be trying not to overwhelm a particular service. This is especially true when the number of calls we have to make is in the thousands or millions.
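Before bringing in any infrastructure, the core idea fits in a few lines of plain Go. This is a minimal, Cadence-free sketch; the Batcher type and all of its names are hypothetical:

```go
package main

import "fmt"

// Batcher accumulates items and flushes them in groups of batchSize.
type Batcher struct {
	batchSize int
	pending   []string
	flush     func(batch []string) // called with exactly batchSize items
}

// Add stores one item; when the threshold is hit, a batch is flushed
// and any extra items stay pending for the next batch.
func (b *Batcher) Add(item string) {
	b.pending = append(b.pending, item)
	if len(b.pending) >= b.batchSize {
		batch := b.pending[:b.batchSize]
		b.pending = b.pending[b.batchSize:]
		b.flush(batch)
	}
}

func main() {
	var flushed [][]string
	b := &Batcher{batchSize: 3, flush: func(batch []string) {
		flushed = append(flushed, batch)
	}}
	for _, id := range []string{"c1", "c2", "c3", "c4"} {
		b.Add(id)
	}
	fmt.Println(len(flushed), len(b.pending)) // prints "1 1": one flush of 3, one pending
}
```

The rest of the article is about making this simple loop durable: surviving crashes, triggering on a timer, and never losing a pending item.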

This article assumes you have some basic knowledge of Cadence. If you need an introduction, you can read my blog post on Building your first Cadence Workflow.

Here are the requirements for the service that we will be building.

  • The workflow should be able to accumulate entities
  • Processing should start only when the batchSize limit is hit
  • Even if the batch size is never hit, the batch job should still start processing after some time
  • Only a specified maximum number of entities should be processed at one time
  • No entities should be lost between batched jobs

That doesn’t sound too difficult. One way to implement this with a plain web service:

  • we will have a /submit_request endpoint which receives each request and checks whether the batch size has been hit
  • to handle the time trigger, a separate /trigger_batch endpoint which would be called by some external service (like a cron script) once an hour

Now if it’s just your hobby project, this setup should be good enough. But for an enterprise system we also need to look at reliability, and there are a few potential failure points in this design. For example, if the external service makes a call while the web server is inaccessible due to network failure or software errors, the data is lost forever. Or too many REST calls could happen at the same time and block up the server, causing some of them to time out. It would be safer to replace the REST calls with a messaging system like Apache Kafka, which helps ensure that none of the data is lost.

A few other important considerations for a batch service are the following:

  • the ability to retry both automatically and manually
  • a history of previous runs
  • resilience

In the past, people might have used a crontab on the server where the application is installed. In the age of Docker and Kubernetes, using something like Airflow is much easier; it has the benefit of a GUI, which makes tracking errors and restarting jobs super easy and helps fulfil the considerations mentioned above.

Now if all these systems are already set up and running at your business, it would be simple to incorporate this. But for companies that do not have them, your simple batcher service suddenly requires many other resources to be introduced. The bad part about this design is that there are too many services for what is supposedly a simple task. Wouldn’t it be a blessing if we could substitute all these different services with just one?

Batcher Service as Cadence Workflow

When we design it with Cadence instead, we can see that Apache Kafka and Airflow are no longer needed. Cadence workflows are built to be immune to process and Cadence-service failures, so signals sent to a workflow are normally not lost (they can still be lost through coding mistakes; more on this later), which replaces the functionality of Apache Kafka. As this is a long-running workflow and we are able to add retry policies to it, we also no longer need Airflow to manage it. Cadence even comes with its own web GUI to manage currently running workflows!

Concepts

Before we head into the code, it is good to know some concepts about Cadence.

The first thing we know is that we need to accumulate data, and the way to do so in Cadence is with Signals. Go has its own channels and select, but inside workflow code we must always use the Cadence client implementation, e.g. workflow.GetSignalChannel(ctx, signalName) instead of make(chan bool, 1). For triggering the processing after a certain duration, we can make use of a timer future that expires after a given duration.

It is possible to write this as a workflow that never ends and simply triggers processing whenever it hits the batch limit, but that would make the history very long. So, to avoid clogging up the history, we will use the ContinueAsNew function to restart the workflow after each batch we process.

Lastly, I always try to find ways to make it easier to debug our workflow. To do that, we need to have a way to understand the current variables in the workflow while it is running. We can make use of Query Handlers to peek into the workflow while it is running.

As we will focus on writing the workflow in the article, we will be using a boilerplate to start things off. Download this repository and run the command git checkout skeleton-code. You can see that the code to start the worker is already written so you can just focus on writing the workflow. Open the file at worker/workflows/batcher.go to begin.

As always, the first thing to do is to register our workflow. The RegisterWithOptions method allows us to give our workflow a desired name instead of it being named github.com/nndd91/cadence-batcher-example/worker/workflow/batcher.BatcherWorkflow. This is useful when starting the workflow with the CLI, as it shortens the command we need to write.

We will also create a mock activity that simulates a long-running task, in this case just sleeping for 30 seconds. This is the activity that will be run once the batcher accumulates enough input or hits the time limit. You can replace it with any activity of your choice.

I always find that it is good to be able to understand the state of variables when the workflow is running. Let us include a log message when the workflow starts as well as add in a query handler that will return us the value of the accumulated batchedCustomerIds.

With this SetQueryHandler method called, we should now be able to query the variable batchedCustomerIds while the workflow is running. Later on, when the workflow is running, you can visit the Cadence web UI and run the batchedCustomerIds query there.

After the workflow starts, we will need to wait for signals. As workflow code should not create or interact with goroutines directly, we use the workflow.NewSelector function provided by the Cadence client library to create a select.

Here we have set up 2 new functions to make our main workflow code cleaner.

setupSignalHandler will use the AddReceive method to tell Cadence that we are expecting a signal and how to handle it. In this case, once we receive a signal from the specified signal channel, we do some logging and save the value into the variable signalVal.

setupTimerHandler will create a timer that starts running when the function is called. Once the timer future completes, the callback registered with AddFuture sets the timerExpired variable to true.

Now we need to start waiting for the signal. Since we want the workflow to keep receiving signals as long as the timer has not expired and the number of customerIds has not reached batchSize, we will make use of a for loop with an ending variable here.

The first if !isInitialRun {} else {} makes sure the workflow runs the following logic on its first iteration instead of waiting for a signal. We have to do this because the workflow can receive many signals while it is running, but we do not want to immediately process an unlimited number of items; we limit processing to a certain number, and any extra unhandled items are passed on to the next workflow. When that new workflow starts with a large number of items, we want it to immediately start processing them instead of waiting for a signal before it starts.

The switch statement here has a couple of cases. The first is when the timer has expired and there are no items batched. As there is nothing to process, we simply rerun setupTimerHandler to re-arm the timer.

The second case is when there are batched items and the timer has expired, or when the batch limit is reached. As mentioned above, we do not want to handle an unlimited number of items at this point, so before we run the activity we first slice out the items to fit the specified batch size. We pass this slice into the activity and run it before setting the endWorkflow variable to true.

When this logic is done we can finally end the workflow!

As the activity that processes the batched ids is not the last task, it is possible that Cadence will still receive some signals but not handle them. When such issues occur, Cadence will drop an info log saying that there are unhandled signals. Hence, before we end the workflow, we must check the signal channel for any unhandled signals and append them to our batchedCustomerIds variable. Remember that we sliced this variable just now? It should now contain only the items that were not processed, plus any new items found in the signal channel.

We will then start a new instance of this workflow using workflow.NewContinueAsNewError so that we keep the history clean.

Running the workflow

Now that the workflow is completed, we need to test it out. We can start the workflow using the command

docker run --network=host --rm ubercadence/cli:master --domain simple-domain workflow start --tl "batcherTask" --wt BatcherWorkflow --et 600 --dt 600 -w BatcherMain -i '[]'

Once the workflow has started, we can add to batchedCustomerIds by sending it a signal.

docker run --network=host --rm ubercadence/cli:master --domain simple-domain wf signal -w BatcherMain -n batcherSignal -i '"customer1"'

If you send enough signals (replacing customer1 with customer2 and so on), you should see the workflow eventually run ProcessCustomerActivity before finally starting a new workflow with ContinueAsNew.

And that’s it! As mentioned at the start, this is just a general template for a batcher workflow. You can easily replace the activity with a simple HTTP call, or even handle the batch processing in the activity itself. If it gets too complicated, you can replace it with a child workflow. Just keep in mind that Cadence workflows may be immune to service failures, but they are not immune to developer error! If you are working with signals, be sure to check for unhandled signals before ending the workflow, and always add query endpoints to make your workflows easier to debug. Hopefully this article has proved useful to you. Thanks for reading!

We are constantly on the lookout for great tech talent to join our engineering team — visit our website to learn more and feel free to reach out to us!


Full Stack Developer, Special Projects Team @ StashAway. Currently grinding levels in Kotlin + Spring Boot. Proficient with Ruby + Rails, Python + Airflow, JS + React, RN.