Handyman: The task manager

Trilok Jain
Hevo Data Engineering
6 min read · Jul 23, 2020

Most of the direct and auxiliary activities that help in the processing of events by Hevo are carried out as background tasks. Be it the task that checks whether a BigQuery warehouse is still active or the one that fetches data from a billion-row Postgres table, all of these tasks require active management and monitoring.

Goals

The expectations from a task manager at Hevo have evolved over time. Some of them are:

  • Provide a fail-safe means of submitting and tracking tasks
  • Be able to gracefully recover from system failures
  • In a multi-client environment, provide a fair timeshare to all the task groups
  • Limit parallelisation so that client resources are not overwhelmed
  • Provide a mechanism to schedule some tasks on multi-level priorities
  • Prevent any task from starving or going unattended for long

At Hevo, this is accomplished by Handyman, its distributed task manager.

Task

A task is the fundamental unit of execution in Handyman. It consists of the following key parameters (a rough sketch of such a definition follows the list):

- Type (And the corresponding handler)
- An initial priority
- An ageing function - to elevate priority with time
- State change callbacks
- Dependency on other tasks
- Group constraints
- Maximum permitted run time
- Maximum permitted retries
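
To make these parameters concrete, here is a minimal, illustrative sketch of what such a task definition could look like in Java. The TaskSpec record and all of its field names are hypothetical and are not Handyman's actual classes.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;

// Illustrative sketch only: these names are not from the actual Handyman codebase.
record TaskSpec(
        String type,                      // task type, mapped to a handler
        int basePriority,                 // initial priority
        IntUnaryOperator agingFunction,   // waiting minutes -> effective priority
        Consumer<String> onStateChange,   // state change callback
        List<String> dependsOn,           // dependency on other tasks
        String group,                     // group constraint (e.g. a pipeline id)
        Duration maxRunTime,              // maximum permitted run time
        int maxRetries) {                 // maximum permitted retries
}

class TaskSpecExample {
    public static void main(String[] args) {
        TaskSpec spec = new TaskSpec(
                "POSTGRES_POLL",
                5,
                waitMins -> Math.min(5 + waitMins / 10, 10),     // age one step per 10 minutes of waiting
                state -> System.out.println("state -> " + state),
                List.of("task-42"),
                "pipeline-1001",
                Duration.ofHours(1),
                3);
        System.out.println(spec.type() + " starts at priority " + spec.basePriority());
    }
}
```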

As of today, Handyman processes 45 different types of background tasks every day.

Handyman task state transitions. More on this in subsequent sections

Architecture

The building blocks of Handyman

Tasks are submitted to Handyman via its client, which in turn publishes them to an Apache Kafka topic. The tasks go through a duplicate check and are then permanently registered in Handyman. The Handyman processor hands these tasks over to Jesque, which processes them via its workers.
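
A rough sketch of that submission path, assuming a JSON payload published to a hypothetical handyman.task.submissions topic (the topic name, payload shape and TaskSubmitter class are illustrative, not Hevo's actual schema):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// The Handyman client publishes the task to a Kafka topic; a consumer on the other
// side de-duplicates and registers it before handing it to the processor.
public class TaskSubmitter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        String taskId = "task-7f3a";   // also used downstream for the duplicate check
        String payload = "{\"id\":\"" + taskId + "\",\"type\":\"POSTGRES_POLL\",\"priority\":5}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keying by task id keeps retries of the same submission on one partition.
            producer.send(new ProducerRecord<>("handyman.task.submissions", taskId, payload));
            producer.flush();
        }
    }
}
```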

Jesque: This Redis-backed library enables the processing of tasks on several priority queues and is the spine of Handyman.

Task Workers: Worker threads on which tasks are actually processed. The distributed architecture of Jesque helps Handyman scale horizontally via its task workers. The worker thread pools can be namespaced to restrict the kinds of tasks that are executed on them and can be scaled independently of other worker pools.
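
For illustration, a minimal sketch of enqueueing a job and starting a namespaced worker with Jesque. The queue names, the PollJobAction class and the assumption that queues earlier in the worker's list act as higher-priority queues are mine, not Handyman's actual configuration.

```java
import java.util.Arrays;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.ConfigBuilder;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.client.Client;
import net.greghaines.jesque.client.ClientImpl;
import net.greghaines.jesque.worker.MapBasedJobFactory;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerImpl;
import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;

// A Jesque action: jobs are plain Runnables/Callables resolved by name.
class PollJobAction implements Runnable {
    private final String pipelineId;
    public PollJobAction(final String pipelineId) { this.pipelineId = pipelineId; }
    public void run() { System.out.println("polling pipeline " + pipelineId); }
}

public class JesqueSketch {
    public static void main(String[] args) throws Exception {
        final Config config = new ConfigBuilder().withHost("localhost").withPort(6379).build();

        // Enqueue onto a named Redis-backed queue.
        final Client client = new ClientImpl(config);
        client.enqueue("handyman.high", new Job("PollJobAction", "pipeline-1001"));
        client.end();

        // A worker pool that only ever drains the listed queues; separate pools with
        // different queue lists give the namespacing described above.
        final Worker worker = new WorkerImpl(config,
                Arrays.asList("handyman.high", "handyman.low"),
                new MapBasedJobFactory(map(entry("PollJobAction", PollJobAction.class))));
        new Thread(worker).start();
    }
}
```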

Priority Evaluator: Each task is submitted with a default base priority (some being more important than others) and a priority ageing function, which determines the updated priority of a task as a function of the time it has been waiting. As the priority of a task increases, it becomes eligible to be moved to a higher-priority execution queue of Jesque. This evaluator continuously evaluates the priority of tasks that are waiting to be executed and remaps them to the appropriate priority queue of Jesque.
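
A small sketch of the ageing idea: the effective priority grows with waiting time and decides which Jesque queue the task is remapped to. The specific formula, caps and queue names below are illustrative, not Handyman's actual ones.

```java
import java.time.Duration;
import java.time.Instant;

// Effective priority as a function of base priority and waiting time, mapped to a queue band.
public class PriorityEvaluatorSketch {

    // e.g. one priority step gained for every 10 minutes of waiting, capped at 10
    static int effectivePriority(int basePriority, Instant submittedAt, Instant now) {
        long waitedMinutes = Duration.between(submittedAt, now).toMinutes();
        return (int) Math.min(10, basePriority + waitedMinutes / 10);
    }

    static String queueFor(int priority) {
        if (priority >= 8) return "handyman.high";
        if (priority >= 4) return "handyman.medium";
        return "handyman.low";
    }

    public static void main(String[] args) {
        Instant submittedAt = Instant.now().minus(Duration.ofMinutes(45));
        int p = effectivePriority(3, submittedAt, Instant.now());
        System.out.println("priority " + p + " -> queue " + queueFor(p)); // priority 7 -> handyman.medium
    }
}
```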

Kafka topics/consumers: Good old Kafka helps in fault tolerance and prevents the MySQL database from getting overwhelmed with numerous state change updates. Task submissions and state changes are processed asynchronously via a couple of Kafka topics.
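
A hedged sketch of the state-change side of that path: buffering updates from a Kafka topic and persisting them to MySQL in batches rather than one write per transition. The topic and consumer-group names are assumptions.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Drains state-change events and applies them to the database in batches.
public class StateChangeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "handyman-state-changes");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("handyman.task.state_changes"));
            while (true) {
                List<String> batch = new ArrayList<>();
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    batch.add(record.value());
                }
                if (!batch.isEmpty()) {
                    // flush the whole batch to MySQL in one transaction (omitted here)
                    System.out.println("persisting " + batch.size() + " state changes");
                    consumer.commitSync(); // commit only after the batch is durable
                }
            }
        }
    }
}
```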

Dependency Resolver: If you have created a BinLog-based MySQL pipeline, you may have noticed that although several jobs are created for historical loads, the BinLog job is prioritised over the other jobs and is executed exclusively. This is achieved by submitting the poll jobs to Handyman as a DAG. The Dependency Resolver is a lightweight component that ensures that, within a dependency group, tasks ranked higher (when the dependency graph is topologically sorted) are executed before those ranked lower.
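
The core of that resolution can be sketched as a Kahn-style topological sort over a dependency group; the binlog/historical-load DAG below is an illustrative example, not Hevo's actual job graph.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Topologically sorts a dependency group so higher-ranked tasks become runnable first.
public class DependencyResolverSketch {

    static List<String> topologicalOrder(Map<String, List<String>> dependsOn) {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> dependents = new HashMap<>();
        for (String task : dependsOn.keySet()) {
            inDegree.putIfAbsent(task, 0);
            for (String dep : dependsOn.get(task)) {
                inDegree.merge(task, 1, Integer::sum);
                inDegree.putIfAbsent(dep, 0);
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(task);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((task, d) -> { if (d == 0) ready.add(task); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String task = ready.poll();
            order.add(task);
            for (String next : dependents.getOrDefault(task, List.of())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) ready.add(next);
            }
        }
        return order; // tasks earlier in this list are dispatched first
    }

    public static void main(String[] args) {
        Map<String, List<String>> dependsOn = Map.of(
                "binlog-poll", List.of(),
                "historical-orders", List.of("binlog-poll"),
                "historical-users", List.of("binlog-poll"));
        System.out.println(topologicalOrder(dependsOn));
        // e.g. [binlog-poll, historical-orders, historical-users]
    }
}
```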

Utilisation Evaluator: The following chart shows the distribution of the number of poll jobs (the list of jobs you see on a pipeline overview page) across pipelines.

No. of Jobs per pipeline

About 72% of the pipelines in one of the clusters have fewer than 10 jobs. However, about 5.6% of the pipelines have more than 100 jobs, with one pipeline having more than 1,500 jobs! All of the jobs of a pipeline are submitted for execution together, per their schedule. This poses two problems: a few pipelines may hog all of the execution resources, and too many connections to a source may overwhelm it.

The Utilisation Evaluator ensures that all job groups get a fair share of execution time and that only a limited number of parallel executions are permitted within a job group. When there are only a few job groups contending for execution resources, a job group can dynamically scale up to utilise a greater number of resources.
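
One way to picture this (a sketch, not Handyman's actual implementation) is a bounded pool of execution slots per job group; the fixed cap below stands in for the dynamically computed one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Each job group gets a bounded number of parallel execution slots, so a 1,500-job
// pipeline cannot crowd out the rest or overwhelm its source with connections.
public class UtilisationEvaluatorSketch {
    private static final int SLOTS_PER_GROUP = 4; // illustrative cap, not Hevo's actual limit
    private final Map<String, Semaphore> slots = new ConcurrentHashMap<>();

    // Returns true if the group still has a slot; the caller runs the job only on
    // success and releases the slot when the job finishes.
    public boolean tryAcquire(String jobGroup) {
        return slots.computeIfAbsent(jobGroup, g -> new Semaphore(SLOTS_PER_GROUP)).tryAcquire();
    }

    public void release(String jobGroup) {
        Semaphore s = slots.get(jobGroup);
        if (s != null) s.release();
    }

    public static void main(String[] args) {
        UtilisationEvaluatorSketch evaluator = new UtilisationEvaluatorSketch();
        for (int i = 1; i <= 6; i++) {
            System.out.println("job " + i + " admitted: " + evaluator.tryAcquire("pipeline-1001"));
        }
        // the first four are admitted; the rest wait for a slot to be released
    }
}
```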

Revisiting tasks

Each worker thread executes the task on a separate thread that it spawns. Doing so enables the worker to restrict the amount of time a task handler can execute for, by means of a guaranteed interruption. Each task handler, within its purview, handles the interrupted state on the thread.
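
A minimal sketch of that bounded execution, using a plain ExecutorService and a timed Future.get followed by cancellation; the time limit and class names are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// The worker runs the handler on a child thread and guarantees an interruption once
// the permitted run time elapses; the handler is expected to notice it and exit cleanly.
public class BoundedTaskRunner {
    public static void main(String[] args) throws Exception {
        ExecutorService handlerPool = Executors.newSingleThreadExecutor();

        Runnable handler = () -> {
            while (!Thread.currentThread().isInterrupted()) {
                // process the next chunk of work, checkpointing as it goes
            }
            // interrupted: leave checkpoints behind so a re-scheduled run can resume
        };

        Future<?> execution = handlerPool.submit(handler);
        try {
            execution.get(2, TimeUnit.SECONDS);      // max permitted run time (an hour in the post)
        } catch (TimeoutException e) {
            execution.cancel(true);                  // the guaranteed interruption
            System.out.println("timed out; re-scheduling the task for another slice");
        } finally {
            handlerPool.shutdownNow();
        }
    }
}
```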

Once done, the task handler can optionally return the execution result (success or failure) to the worker thread, which can in turn invoke the pre-configured callback handlers for that result.

Certain exceptions have special meaning for Handyman, as it takes cues from them for re-scheduling tasks that may have failed temporarily or may have timed out. For instance, a job that loads data from a large table may take several hours to complete. The Handyman worker thread that processes this job creates a plan to let it run for an hour. At the end of this period, the interruption is intercepted and the same task is scheduled to be executed again immediately. It is the responsibility of the task handler to maintain the checkpoints and other metadata that will help it continue from where it left off.
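
A sketch of the handler-side checkpointing responsibility described above; storing the offset in a local file is purely illustrative, and Handyman would keep such metadata in its own store.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// The handler records how far it got, so an immediately re-scheduled run continues
// from that offset instead of starting the large table over.
public class CheckpointedLoadHandler {
    private static final Path CHECKPOINT = Path.of("/tmp/large_table.offset");

    public static void main(String[] args) throws Exception {
        long offset = Files.exists(CHECKPOINT)
                ? Long.parseLong(Files.readString(CHECKPOINT).trim())
                : 0L;

        while (!Thread.currentThread().isInterrupted()) {
            offset += 10_000_000;                            // pretend we loaded the next chunk of rows
            Files.writeString(CHECKPOINT, Long.toString(offset));
            if (offset >= 1_000_000_000L) break;             // the billion-row table is done
        }
        // If we were interrupted, the re-scheduled task reads CHECKPOINT and resumes here.
    }
}
```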

As a worker begins execution, it creates a shared memory space to store some context around the task being executed. A handle to the actual thread that processes the task is always a part of the context. Sometimes the tasks themselves choose to add some data to the context. This context is typically used when a task is to be cancelled on the fly. For example, if a running model is to be stopped (upon user request or during an application restart), this context is used to first interrupt the thread that is running the model. Next, the contextual data (say, a query watermark stored by the task in the shared memory) is selectively used to terminate the query job running on the destination warehouse, thus freeing up the dangling resources.
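
A sketch of such a shared context registry, assuming hypothetical class and key names: the worker registers the executing thread, the task adds its own hints (such as a warehouse query id), and a cancel request uses both.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Shared execution context used for on-the-fly cancellation.
public class TaskContextRegistry {
    // taskId -> context entries ("thread", plus anything the task itself chooses to store)
    private static final Map<String, Map<String, Object>> CONTEXTS = new ConcurrentHashMap<>();

    static Map<String, Object> register(String taskId, Thread executor) {
        Map<String, Object> ctx = new ConcurrentHashMap<>();
        ctx.put("thread", executor);
        CONTEXTS.put(taskId, ctx);
        return ctx;
    }

    static void cancel(String taskId) {
        Map<String, Object> ctx = CONTEXTS.remove(taskId);
        if (ctx == null) return;
        ((Thread) ctx.get("thread")).interrupt();           // 1. interrupt the running thread
        Object queryId = ctx.get("warehouse_query_id");     // 2. then clean up what the task left behind
        if (queryId != null) {
            System.out.println("cancelling warehouse query " + queryId);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread model = new Thread(() -> {
            Map<String, Object> ctx = register("model-17", Thread.currentThread());
            ctx.put("warehouse_query_id", "bq-job-9d2c");    // the task's own contextual hint
            while (!Thread.currentThread().isInterrupted()) { /* run the model */ }
        });
        model.start();
        Thread.sleep(100);
        cancel("model-17");                                  // e.g. the user pressed "stop"
    }
}
```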

Selected statistics

In a typical production cluster, around 650K tasks are processed by Handyman every day. The execution times vary from a few seconds to as much as an hour.

Time share of different task types
Counts of different task types

Most of the tasks get executed within a minute of submission. The Handyman task workers are auto-scaled horizontally. In a later post, we’ll cover how some selected metrics are used in auto-scaling.

Monitoring & Alerting

Several statistics are collected around Handyman tasks and their state transitions. Persisted in InfluxDB, they are monitored via alerts configured in Grafana. Two of the important ones are the average wait time and the number of failures.
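
As an illustration, recording one such statistic with the influxdb-java client could look like the sketch below; the database, measurement, tag and field names are assumptions, not Hevo's actual schema.

```java
import java.util.concurrent.TimeUnit;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

// Writes one task state-transition data point, tagged so that wait times and
// failure counts can be aggregated per task type in Grafana.
public class TaskMetricsRecorder {
    public static void main(String[] args) {
        InfluxDB influxDB = InfluxDBFactory.connect("http://localhost:8086", "user", "password");
        influxDB.setDatabase("handyman_metrics");

        influxDB.write(Point.measurement("task_state_change")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .tag("task_type", "POLL_JOB")
                .tag("state", "RUNNING")
                .addField("wait_time_ms", 4_200L)
                .build());

        influxDB.close();
    }
}
```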

What lies ahead

Handyman is capable of recovering from failures and actively tries to do so. One of the enhancements being made to the failure handling is better cleanup of client resources when a failure occurs.

Thank you for reading the post till the end. If you have any questions or suggestions for us, please do write to us at dev @ hevodata.com
