High Available Task Scheduling — Design using Kafka and Kafka Streams
In any Enterprise or Cloud application, Task scheduling is a key requirement. A highly available and fault tolerant task scheduling will help you to improve your business goals.
A classic task scheduling infrastructure is typically backed by databases. The instances/service that performs the scheduling, loads the task definitions from the database into memory and performs the task scheduling.
This kind of infrastructure creates issues like stateful services, inability to scale the services horizontally, being prone to frequent failures, etc., If the state of these kinds of service is not maintained well, it may lead to inconsistent and integrity issues.
To mitigate these issues, we will explore a highly available and fault-tolerant task scheduling infrastructure using Kafka, Kafka Streams, and State Store.
How is Fault Tolerance achieved?
The task definitions are stored in a particular partition in a Kafka topic. The state of these tasks is stored in a State store backed by Kafka changelog topics.
If the streaming app that is responsible for processing task definition from a particular partition goes down, the other instance of the streaming app will take the responsibility of processing from the same partition; the previous state of each task will be loaded from the respective state store. Since the states are preserved, if the streaming app fails, the other instance will be able to take it over from the previous state.
High level design
- Task definitions will be created/updated by appropriate entities and sent to a Kafka Topic; say task-schedules
- Horizontally scaled Kafka Streams application will read these definitions from the topic partitions and store them respective state stores
- These Kafka Consumer application will use the Kafka Stream Processor API to manage these task definitions in the state store
- The Transformer implementation will call a Punctuator periodically
- The Punctuator reads all the Task definition from the store and sends them out for execution.
- There will be up to 5 schedulers created in the Transformer implementation; one each for MILLISECOND, SECOND, MINUTE, DAY, and WEEK call the respective Punctuator to send the tasks out. Each scheduler will call the respective Punctuator to send the tasks out.
- When the Punctuator sends out the task upon elapse of the scheduled time, a TopicNameExtractor implementation will be used to decide which target topic the task needs to be sent out.
State Store
This design uses State stores that are connected with the Kafka Streams application to preserve the task definitions and their previous states. There will be an individual state store for each of the period value. For eg, all the tasks schedules that are needed to be scheduled every second will be store in one state store.
Task Avro Schema
Let us look into the avro schema which we will be used to generate the Java POJO.
Task Frequency Time Unit Definition
The enum style constant definition that defines all possible values for the periodicity of tasks scheduling that we will be handling, in terms of Time Units.
// task frequency time unit definition
{
"namespace": "org.cbenaveen.task.scheduling",
"type": "enum",
"name": "TaskFrequencyTimeUnits",
"symbols": ["MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS", "WEEKS"]
}
Task Configuration Definition
The possible configuration that will be created and set when the task scheduling is created. One important configuration to note is the output topic (outputTopic) to which the task needs to be sent out so that the respective consumer will be able to consume the tasks.
// task configuration definition
{
"namespace": "org.cbenaveen.task.scheduling",
"type": "record",
"name": "TaskConfiguration",
"fields": [
{"name": "outputTopic","type": "string"},
{"name": "configurations","type": [ "null", { "type": "map", "values": "string" }],"default": null}
]
}
Task Frequency Definition
The time-frequency at which the task needs to be scheduled.
// task frequency definition
{
"namespace": "org.cbenaveen.task.scheduling",
"type": "record",
"name": "TaskFrequency",
"fields": [
{"name": "frequencyTimeUnit", "type": "TaskFrequencyTimeUnits"},
{"name": "time", "type": "int"}
]
}
Task Definition
The actual task definitions are sent to Kafka Cluster as the task that needs to be scheduled. This schema will use the Task Configuration and Frequency to fine-tune the execution properties. The customer id field will help us to create the same task but for different customers.
// task definition
{
"namespace": "org.cbenaveen.task.scheduling",
"type": "record",
"name": "TaskDefinition",
"fields": [
{"name": "customerId","type": "string"},
{"name": "taskName","type": "string"},
{"name": "frequency","type": ["null", "TaskFrequency"], "default": null},
{"name": "config", "type": "TaskConfiguration"},
{"name": "data","type": [ "null",
{
"type": "map",
"values": "string"
}
], "default": null}
]
}
Task Definition Key
A complex object that will be used as a key when the definitions are sent to the Kafka cluster. The customer id + task name combination will help differentiate the same type of task from one customer to another.
// task definition key
{
"namespace": "org.cbenaveen.task.scheduling",
"type": "record",
"name": "TaskDefinitionKey",
"fields": [
{"name": "customerId","type": "string"},
{"name": "taskName","type": "string"}
]
}
Generated Java Class
All the above avro schemas will be used to generate the Java Pojo object during the compilation time using the avro-maven-plugin plugin. Below is the actual maven avro plugin configuration.
<!-- Maven Avro plugin for generating pojo-->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/schemas/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/generated/</outputDirectory>
<imports>
<import>${project.basedir}/src/main/resources/schemas/task-config.avsc</import>
<import>${project.basedir}/src/main/resources/schemas/frequency-timeunit.avsc</import>
<import>${project.basedir}/src/main/resources/schemas/task-frequency.avsc</import>
<import>${project.basedir}/src/main/resources/schemas/task-definition.avsc</import>
</imports>
</configuration>
</execution>
</executions>
</plugin>
Java Class Hierarchy
Generated java class hierarchy diagram generated using IntelliJ.
Task Handlers
To be able to handle the task scheduling and task definition CRUD on State Store, a task handler interface is defined.
package org.cbenaveen.task.scheduling.handiling;
import org.cbenaveen.task.scheduling.TaskDefinition;
import org.cbenaveen.task.scheduling.TaskDefinitionKey;
import java.util.concurrent.TimeUnit;
public interface TaskHandler {
void add(final TaskDefinitionKey taskDefinitionKey, final TaskDefinition taskDefinition);
TaskDefinition get(final TaskDefinitionKey taskDefinitionKey);
TaskDefinition delete(final TaskDefinitionKey taskDefinitionKey);
TimeUnit handlingDuration();
}
There is a Task Handler for each task schedule type. As we defined different task scheduling options, there are one task handlers per scheduling option. Here are the task handler and implementation hierarchy.
TaskHandler will also extend the Punctuator interface to be able to invoke in a periodic manner using the Processor Context schedule API.
Task Scheduling Transformer
The transformer implementation receives both TaskDefinitionKey and TaskDefinition form the respective partitions and delegate the same to task handler implementations. The handler will take care of storing them in the correct state stores.
Topic Name Extractor
Each task schedule want be sent out to a particular topic, from where respective tasks consumers will pick up the tasks for the execution. Due to the dynamic nature, An implementation of TopicNameExtractor interface is used to decide the target topic. This implementation will use the Task Configuration set in the Task Definition to return the correct topic name.
public class TargetTopicNameExtractor implements TopicNameExtractor<TaskDefinitionKey, TaskDefinition> {
@Override
public String extract(final TaskDefinitionKey taskDefinitionKey,
final TaskDefinition taskDefinition,
final RecordContext recordContext) {
return taskDefinition.getConfig().getOutputTopic().toString();
}
}
Kafka Streams Topology
The Kafka Stream DSL approach is used to connect the transformer implementation to the topology. Since there is no other processor added to the topology, the topology definition looks very simple.
// create a Stream Builder instance
final StreamsBuilder streamsBuilder = new StreamsBuilder();// Create the stream to consume the tasks/tasks definitions from the tasks topics
final KStream<TaskDefinitionKey, TaskDefinition> keyTaskDefinitionKStream = streamsBuilder
.stream(Config.TASK_SCHEDULING_INPUT_TOPIC, consumed);
// add processors using DSL and PAPI
keyTaskDefinitionKStream
.transform(new TaskSchedulingTransformerSupplier(), Config.TASK_STORES)
.to(new TargetTopicNameExtractor(), Produced.with(getTaskDefinitionKeySpecificAvroSerde(), getTaskDefinitionSpecificAvroSerde()));
Complete Implementation
This post gives a brief overview of the implementation, the complete implementation is available the below github. This github also consists of a few test implementations that could be used to generate the test task scheduling.
Follow me on Medium to learn more about Kafka, Spring, Spring Boot, Java, Kubernetes, and many more similar topics.