A Reactive Computation Scheduler

Dusan Zamurovic · Published in Alter Method · 6 min read · Apr 22, 2019

Photo by Matthew Smith on Unsplash

Computation scheduling and job orchestration always play big parts in data processing platforms and reporting systems. This post is a story about how the team I am part of built a custom job scheduler — one that doesn’t care about Time.
It started as a greenfield project, part of our new and still developing data processing platform. It grew, it evolved, and we faced some interesting challenges along the way.
The most challenging one was a losing battle with Time. It made us realize that the only way to win against it is not to rely on it.

Enter reactive scheduling.
A cool term we made up for a cool approach. We named it that because it is about scheduling jobs to run, but using only events happening in our system to decide what should be activated and when.

To reiterate over the problem we were trying to solve:

  • there is a set of computations to run
  • those computations construct a chain of dependencies
  • each computation has one or more dependencies that have to be computed prior to it
  • the chain is finished when data is imported into our BI tool

Being a small team, we put several hard requirements in front of ourselves. We wanted:

  • to manage as little infrastructure as possible
  • to achieve as much control as possible
  • to keep the expenses as low as possible

These requirements brought us to our own implementation. We connected several AWS services and created our very creatively named JobScheduler.

Job activation — scheduler Lambda function

Since almost all of our computations run through the AWS Data Pipeline service, we had to find a way to trigger a pipeline when we want it to run.
This can be achieved by creating an “on-demand” data pipeline, the kind of pipeline that never runs on its own schedule. To run, it has to be activated manually or programmatically.

We wrote a Python Lambda function for this purpose. It has a couple of features:

  • it is able to list our computations, i.e. data pipelines
  • it is able to find a computation by name
  • it is able to activate a computation and pass parameters to it
  • it “knows” about dependencies between our computations (a sketch of what that could look like follows below)
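To give an idea of what that last point means in practice, the dependency knowledge can be as simple as a static map baked into the function. This is only an illustrative sketch; the job names below are hypothetical, not our real configuration:

# Hypothetical illustration: each job maps to the list of jobs that must
# finish before it is allowed to run.
DEPENDENCIES = {
    "player_entry": [],                       # no upstream dependencies
    "sessions": ["player_entry"],             # runs after player_entry
    "daily_report": ["sessions", "revenue"],  # waits for two upstream jobs
}

def jobs_triggered_by(finished_job):
    # Which jobs list the finished job as one of their dependencies?
    return [job for job, deps in DEPENDENCIES.items() if finished_job in deps]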

The reason we chose Lambda is that it is super cheap for this use case of activating pipelines (we need just a few dozen invocations per night for now) and we don’t have to maintain any infrastructure around it.

It relies on the well-known boto3 Python library to communicate with other AWS services. For example, the code to list all available pipelines could look like this:

def list_pipelines(self):
    # Page through the Data Pipeline API until all pipelines are collected.
    pipelines = []
    response = self.dp_client.list_pipelines()
    pipelines.extend(response["pipelineIdList"])
    while response["hasMoreResults"]:
        response_marker = response["marker"]
        response = self.dp_client.list_pipelines(marker=response_marker)
        pipelines.extend(response["pipelineIdList"])

    # Map pipeline names to their ids for easy lookup by name.
    pipelines_dict = {}
    for p in pipelines:
        pipelines_dict[p["name"]] = p["id"]
    return pipelines_dict

or to activate a pipeline:

if self.is_pipeline_runnable(job_id):
    print("Activating pipeline %s with id %s..." % (name, job_id))
    # Date parameters every pipeline receives.
    param_values = [
        {"id": self.year_arg_id, "stringValue": self.year},
        {"id": self.month_arg_id, "stringValue": self.month},
        {"id": self.dayofmonth_arg_id, "stringValue": self.dayofmonth}
    ]

    # Some pipelines also take an hour parameter.
    if self.hour is not None:
        param_values.append(
            {"id": self.hour_arg_id, "stringValue": self.hour}
        )
    try:
        self.dp_client.activate_pipeline(
            pipelineId=job_id, parameterValues=param_values
        )
    except Exception as exc:
        ...
    else:
        ...
else:
    ...

The first run of the JobScheduler function is triggered by a scheduled CloudWatch event at a certain point in time. A hardcoded time is good enough for starting the chain.
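For completeness, here is a minimal sketch of how such a scheduled trigger could be wired up with boto3. The rule name, cron expression, region and Lambda ARN are placeholders, not our actual values:

import boto3

events = boto3.client("events", region_name="eu-west-1")  # placeholder region

# A CloudWatch Events rule that fires once per night.
events.put_rule(
    Name="job-scheduler-nightly",            # placeholder rule name
    ScheduleExpression="cron(0 2 * * ? *)",  # every day at 02:00 UTC
)

# Point the rule at the JobScheduler Lambda function.
events.put_targets(
    Rule="job-scheduler-nightly",
    Targets=[{
        "Id": "job-scheduler-target",
        "Arn": "arn:aws:lambda:eu-west-1:123456789012:function:JobScheduler",  # placeholder ARN
    }],
)
# The function itself also needs a resource-based permission that allows
# events.amazonaws.com to invoke it.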
But this leaves us with a question: how does the JobScheduler know when to activate the computation(s) that come next in the chain of dependencies?

Job done notification — from S3 over SNS to Lambda

This is where our approach again shows its reactive nature. We do not track whether the jobs have finished. Instead, all of them fire notifications when their work is done.

No matter how many different steps there are inside a computation, there is one final step common to all of them: it pushes a .done file to a predefined S3 bucket. For the time being, the contents of those files are not important, so we just keep them empty.
The names of the files, on the other hand, are very important. For example, if a file named player_entry__20190312.done is pushed to the bucket, it means that a job called player_entry has finished and that it was computing data for the date 2019-03-12.
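The final step itself is nothing more than an S3 upload. A minimal sketch of what it could look like, with a hypothetical bucket name:

import boto3

s3 = boto3.client("s3")

def mark_job_done(job_name, date_str, bucket="computation-done-markers"):
    # Push an empty marker such as player_entry__20190312.done to S3.
    key = "%s__%s.done" % (job_name, date_str)
    s3.put_object(Bucket=bucket, Key=key, Body=b"")  # contents intentionally empty
    return key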

The creation of those files triggers notifications that flow from S3 over an SNS topic to our JobScheduler Lambda function, which listens to that topic. It gets invoked on every creation of a .done file and uses the file name to deduce which job has just finished, for which date, and which computation should be run next. Then it simply activates the appropriate data pipeline using the code snippet from above.
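On the receiving side, the handler has to unwrap the S3 event from the SNS message and split the file name back into a job name and a date. A simplified sketch of that part, assuming the standard S3-over-SNS event shape:

import json

def handler(event, context):
    for record in event["Records"]:
        # The SNS message body carries the original S3 event as a JSON string.
        s3_event = json.loads(record["Sns"]["Message"])
        for s3_record in s3_event["Records"]:
            key = s3_record["s3"]["object"]["key"]  # e.g. player_entry__20190312.done
            if not key.endswith(".done"):
                continue
            job_name, date_str = key[:-len(".done")].rsplit("__", 1)
            # Look up what depends on job_name and activate the corresponding
            # on-demand pipeline(s) using the activation snippet above.
            ...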

In certain cases, when a job has more than one dependency, this is not enough to ensure computation correctness. What I described is a very simple scenario:

  • Job A finished and pushed an A__${date}.done file to S3
  • An SNS notification coming from S3 triggers the JobScheduler Lambda function
  • JobScheduler activates the next job in the chain, e.g. job B

This does not save us from situations where job B depends on both jobs A1 and A2. That particular example would lead to the activation of job B after either A1 or A2, whichever finishes first.
Let’s say it was A1. Activating B when only A1 is done is unsafe from a correctness standpoint because it is possible that the data coming from A2 will not be there in time, and B would produce incorrect results.

bad situation: B starts before A2 has finished

For that reason, our jobs wait for all of their dependencies to finish before they actually start. Each job has a waiting function built in, which ensures that it starts only when all the inputs are where they should be.

good situation: B starts but also waits for A2 to finish

Did all dependencies finish their work?

We took care of a timeout feature, too. There is no reason a job should wait forever if its preconditions are not met. We decided to go with the “fail fast” mantra and break the computational chain if any of the jobs times out while waiting for its input data to show up. There is no use in producing incorrect or incomplete results.

timeout situation: preconditions were not met in time
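The waiting and the timeout live inside the jobs themselves, not in the scheduler, but the idea boils down to polling S3 for the expected input markers and giving up after a deadline. A rough sketch of that step, with hypothetical names and limits:

import time
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")

def wait_for_inputs(bucket, expected_keys, timeout_seconds=3600, poll_seconds=60):
    # Block until every expected marker file exists in S3, or fail fast on timeout.
    deadline = time.time() + timeout_seconds
    missing = set(expected_keys)
    while missing:
        for key in list(missing):
            try:
                s3.head_object(Bucket=bucket, Key=key)
                missing.discard(key)   # this input is ready
            except ClientError:
                pass                   # not there yet
        if not missing:
            return
        if time.time() > deadline:
            raise RuntimeError("Timed out waiting for inputs: %s" % sorted(missing))
        time.sleep(poll_seconds)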

Cherry on top

We also implemented some features that are not directly related to job scheduling but are very nice to have, such as sending messages to the team’s Slack channel when things go south.

Boto3 again comes to the rescue. With just a couple of lines of code, we get an alarm as soon as a problem occurs.

def sns_notification(self, pipeline_name, pipeline_id):
    # Publish an alert message to an SNS topic, which ends up in Slack.
    message = "... %s ... %s" % (pipeline_name, pipeline_id)
    sns = boto3.client('sns', region_name="...")
    sns.publish(TopicArn="...", Message=message, Subject="...")
message received in Slack

Wrap up

We incorporated several AWS services to work to our benefit: CloudWatch, Lambda, Data Pipeline, S3 and SNS. All of them are fully managed by AWS, so we have no infrastructure to maintain.

To recap on our initial goals:

to manage as little infrastructure as possible

Check.

We have built a custom solution. It is not a perfect one, but it fits our needs very well.

to achieve as much control as possible

Check.

The final setup of our JobScheduler amounts to several dozen Lambda invocations per night, roughly the same number of empty files in S3 and a very similar number of SNS messages.

to keep the expenses as low as possible

Check.

All of this combined produces a very small financial cost, insignificantly larger than $0.

Future plans and wishes

There is always room for improvement.

We have already been thinking about using those empty .done files as containers of additional information. They could transport additional parameters for the computations or some meta-information about the computation process itself.

Replaying a job is something we are interested in from time to time, as well as recomputing whole parts of the chain. This is a feature our scheduler still doesn’t have, since it is stateless. In order to achieve this, it would have to track the state of the current computation process.
If we ever go down this road, you guessed it: we will most probably choose a fully managed database service.
