A Reactive Computation Scheduler
Computation scheduling and job orchestration play a big part in data processing platforms and reporting systems. This post is the story of how my team built a custom job scheduler: one that doesn't care about Time.
It started as a greenfield project, part of our new and still-evolving data processing platform. It grew, it evolved, and we fought some interesting challenges along the way.
The most challenging one was a losing battle with Time. It made us realize that the only way to win against it is not to rely on it.
Enter reactive scheduling.
A cool term we made up for a cool approach. We named it this way because it is about scheduling jobs to run, but using only events happening in our system to decide what should be activated, and when.
To reiterate the problem we were trying to solve:
- there is a set of computations to run
- those computations construct a chain of dependencies
- each computation has one or more dependencies that have to be computed prior to it
- the chain is finished when data is imported into our BI tool
Being a small team, we put several hard requirements in front of ourselves. We wanted:
- to manage as little infrastructure as possible
- to achieve as much control as possible
- to keep the expenses as low as possible
These requirements brought us to our own implementation. We connected several AWS services and created our very creatively named JobScheduler.
Job activation — scheduler Lambda function
Since almost all of our computations run through the AWS Data Pipeline service, we had to find a way to trigger a pipeline when we want it to run.
This can be achieved by creating an "on-demand" data pipeline: a pipeline that never runs on its own schedule. To run, it has to be activated manually or programmatically.
We wrote a Python Lambda function for this purpose. It has a couple of features:
- it is able to list our computations, i.e. data pipelines
- it is able to find a computation by name
- it is able to activate a computation and pass parameters to it
- it “knows” about dependencies between our computations
We chose Lambda because it is extremely cheap for this use case of activating pipelines (we need just a few dozen invocations per night for now) and because we don't have to maintain any infrastructure around it.
It relies on the well-known boto3 Python library to communicate with other AWS services. For example, the code to list all available pipelines could look like this:
def list_pipelines(self):
    pipelines = []
    response = self.dp_client.list_pipelines()
    pipelines.extend(response["pipelineIdList"])
    # The API is paginated, so follow the marker until all results are in.
    while response["hasMoreResults"]:
        response_marker = response["marker"]
        response = self.dp_client.list_pipelines(marker=response_marker)
        pipelines.extend(response["pipelineIdList"])
    # Map pipeline names to ids for easy lookup by name.
    pipelines_dict = {}
    for p in pipelines:
        pipelines_dict[p["name"]] = p["id"]
    return pipelines_dict
or to activate a pipeline:
if self.is_pipeline_runnable(job_id):
    print("Activating pipeline %s with id %s..." % (name, job_id))
    param_values = [
        {"id": self.year_arg_id, "stringValue": self.year},
        {"id": self.month_arg_id, "stringValue": self.month},
        {"id": self.dayofmonth_arg_id, "stringValue": self.dayofmonth},
    ]
    if self.hour is not None:
        param_values.append(
            {"id": self.hour_arg_id, "stringValue": self.hour}
        )
    try:
        self.dp_client.activate_pipeline(
            pipelineId=job_id, parameterValues=param_values
        )
    except Exception as exc:
        ...
    else:
        ...
else:
    ...
The first run of the JobScheduler function is triggered by a CloudWatch event at a certain point in time. A hardcoded time is good enough for starting the chain.
But this leaves us with a question: how does the JobScheduler know when to activate the computation(s) that come next in the chain of dependencies?
Job done notification — from S3 over SNS to Lambda
This is where our approach again shows its reactive nature. We do not track whether jobs have finished. Instead, each of them fires a notification when its work is done.
No matter how many different steps there are inside of a computation, all of them share one final step: pushing a .done file to a predefined S3 bucket. For the time being, the contents of those files are not important, so we keep them empty.
The names of the files, on the other hand, are very important. For example, if a file named player_entry__20190312.done is pushed to the bucket, it means that a job called player_entry has finished and that it was computing data for the date 2019-03-12.
The creation of each of those files triggers an SNS notification that flies from S3 over an SNS topic to our JobScheduler Lambda function, which listens to that topic. The function gets invoked on every creation of a .done file and uses its name to deduce which job has just finished, for which date, and which computation should run next. Then it simply activates the appropriate data pipeline using the code snippet from above.
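The filename deduction itself is straightforward. A minimal sketch, assuming only the job__YYYYMMDD.done convention from the example above (the helper name is invented):

```python
from datetime import datetime

def parse_done_key(key):
    """Split a .done object key into (job_name, date).

    Hypothetical helper; relies on the <job>__<YYYYMMDD>.done naming
    convention described above.
    """
    stem = key[: -len(".done")]                 # drop the extension
    job_name, date_str = stem.rsplit("__", 1)   # job names may contain "_"
    return job_name, datetime.strptime(date_str, "%Y%m%d").date()

job, day = parse_done_key("player_entry__20190312.done")
# job == "player_entry", day == date(2019, 3, 12)
```

Using rsplit on the last double underscore keeps the parsing robust for job names that themselves contain underscores, like player_entry.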
In certain cases, when a job has more than one dependency, this is not enough to ensure computation correctness. What I described is a very simple scenario:
- Job A finishes and pushes an A__${date}.done file to S3
- An SNS notification coming from S3 triggers the JobScheduler Lambda function
- The JobScheduler activates the next job in the chain, e.g. job B
This does not save us from situations where job B depends on jobs A1 and A2. In that example, job B would be activated after either A1 or A2, whichever finishes first.
Let's say it was A1. Activating B when only A1 is done is unsafe from a correctness standpoint, because the data coming from A2 may not be there in time, and B would produce incorrect results.
For that reason, our jobs wait for all of their dependencies to finish before they actually start. Each job has a waiting function built in, which ensures it starts only when all of its inputs are where they should be.
Did all dependencies finish their work?
We took care of a timeout feature, too. There is no reason for a job to wait forever if its preconditions are not met. We decided to follow the "fail fast" mantra and break the computation chain if any job times out while waiting for its input data to show up. There is no use in producing incorrect and incomplete results.
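The waiting behaviour can be sketched as a poll loop with a deadline. This is an illustration rather than our actual job code; the input_ready callable stands in for whatever check the job performs (for instance an S3 head_object call on each expected input file):

```python
import time

def wait_for_inputs(input_ready, timeout_s, poll_interval_s=30):
    """Poll until input_ready() is True; fail fast once timeout_s elapses.

    Illustrative sketch: input_ready is any zero-argument callable,
    e.g. a closure checking that all input files exist in S3.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if input_ready():
            return True
        time.sleep(poll_interval_s)
    # Breaking the chain is preferable to computing on partial inputs.
    raise TimeoutError("dependencies did not finish in time")
```

Raising instead of returning False makes the "fail fast" behaviour the default: a job that times out aborts, and nothing downstream of it is activated.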
Cherry on top
We also implemented some features that are not directly related to job scheduling but are very nice to have, such as sending messages to the team's Slack channel when things go south.
Boto3 again comes to the rescue. With just a couple of lines of code, we get an alarm as soon as a problem occurs.
def sns_notification(self, pipeline_name, pipeline_id):
    # Publish a short alert; an SNS-to-Slack subscription does the rest.
    message = "... %s ... %s" % (pipeline_name, pipeline_id)
    sns = boto3.client('sns', region_name="...")
    sns.publish(TopicArn="...", Message=message, Subject="...")
Wrap up
We combined several AWS services to work to our benefit: CloudWatch, Lambda, Data Pipeline, S3 and SNS. All of them are fully managed by AWS, so we have no infrastructure to maintain.
To recap on our initial goals:
to manage as little infrastructure as possible
Check.
We have built a custom solution. It is not a perfect one, but it fits our needs very well.
to achieve as much control as possible
Check.
The final setup of our JobScheduler amounts to several dozen Lambda invocations per night, roughly the same number of empty files in S3, and a very similar number of SNS messages.
to keep the expenses as low as possible
Check.
All of this combined produces a very small financial cost, insignificantly larger than $0.
Future plans and wishes
There is always room for improvement.
We were already thinking about using those empty .done files as containers of additional information. They could transport extra parameters for the computations, or some meta-information about the computation process itself.
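One possible shape for that, sketched here purely as an assumption rather than anything we have built: keep the filename convention driving the scheduling, but put a small JSON document in the file body.

```python
import json

def make_done_payload(job_name, date_str, **extra):
    """Serialize optional metadata into the body of a .done file.

    Hypothetical format: the filename still triggers the scheduler,
    while the body carries extra parameters for downstream jobs.
    """
    payload = {"job": job_name, "date": date_str}
    payload.update(extra)
    return json.dumps(payload)

def read_done_payload(body):
    # An empty body stays valid: it simply means "no extra metadata",
    # which keeps today's empty .done files backwards compatible.
    return json.loads(body) if body else {}

body = make_done_payload("player_entry", "20190312", row_count=1042)
read_done_payload(body)["row_count"]  # 1042
```

Because an empty body decodes to an empty dict, existing jobs that push empty files would keep working unchanged while new jobs opt in to richer payloads.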
Replaying a single job is something we are interested in from time to time, as is recomputing whole parts of the chain. This is a feature our scheduler still doesn't have, since it is stateless: to achieve it, the scheduler would have to track the state of the current computation run.
If we ever go down that road, you guessed it: we will most probably choose a fully managed database service.