Distributed scheduling with Work4j

Davit Petrosyan
Fortra Armenia
Mar 12, 2021

Almost every large application has to address the problem of scheduling in one way or another. There is always work that must run at a specific time or on a specific period. Usually you pick a simple library in your favorite language and everything works fine. At a certain scale, however, this becomes a real challenge.

In this article we will discuss how to use Work4j to set up a distributed job orchestration subsystem that runs thousands of tasks across any number of machines, pods, or containers.

What is Work4j?

Work4j is an open-source library built on top of Quartz that adds important features such as instant scaling with load balancing, replication, job management, monitoring, and more.

Separation of Concerns

In contrast with Quartz, Work4j splits the responsibilities into two main pieces: the Scheduler Controller and the Worker Controller. As the names suggest, the first is responsible for defining, monitoring, and ordering jobs, while the second makes sure the actual jobs are executed according to their schedules.

Depending on your needs, you can bundle the scheduler and worker controllers together or host them separately; either way, both are designed to scale.

In this example the Scheduler Controller will be hosted as a REST API, while the Worker Controller will be hosted inside a simple console application.

In a nutshell, we want to achieve the architecture demonstrated in this figure:

Distributed Scheduling Subsystem with Work4j

First, we will create a REST API to handle all job operations; then we will create a worker program that runs the jobs. The messaging infrastructure is optional: its purpose is to deliver requests from the scheduler to the workers immediately. For simplicity, this example relies on polling instead, but you can plug in whatever messaging system your application already uses.

Setup of Scheduler Controller

As mentioned earlier, everything can be bundled within a single application; however, in this example we will host the Scheduler Controller as a separate REST API based on Spring Boot. We will also use MongoDB as the database to store and manage our jobs. You can easily change this by implementing a dedicated interface for your preferred database. Here is the wiki page with more details about the Scheduler Controller and the Scheduler Data Repository.

First of all, let’s add necessary Maven dependencies:

<dependency>
    <groupId>io.imast</groupId>
    <artifactId>work4j-data-mongo</artifactId>
    <version>0.0.1.RC2</version>
</dependency>
<dependency>
    <groupId>io.imast</groupId>
    <artifactId>work4j-controller</artifactId>
    <version>0.0.1.RC2</version>
</dependency>

There are two main dependencies: work4j-data-mongo and work4j-controller. The first is a data provider implemented specifically for MongoDB, while the second is the scheduler controller module itself. Of course, when using the MongoDB-based repository, you will also need to include its driver, mongodb-driver-sync. The complete pom file example can be found here.

Once the dependencies are in place, let’s create some Spring beans:

Configuration of Scheduler Controller API

Assuming you already have a MongoClient and a MongoDatabase to store the data in, all you need to do is create these two beans (a minimal sketch follows the list):

  1. Scheduler Data Repository — for any data manipulation operations,
  2. Scheduler Controller — the entry point for all the scheduler operations.
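
A minimal sketch of such a configuration might look like the following. The MongoDB driver calls are real, but the repository and controller bean types and constructors (SchedulerDataRepository, MongoSchedulerDataRepository, SchedulerController) are assumptions made for illustration; check the Work4j wiki for the actual factory or builder methods.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SchedulerConfig {

    // MongoDB client and database used by the scheduler data repository
    @Bean
    public MongoClient mongoClient() {
        return MongoClients.create("mongodb://localhost:27017");
    }

    @Bean
    public MongoDatabase schedulerDatabase(MongoClient client) {
        return client.getDatabase("work4j");
    }

    // Assumed type and constructor: a Mongo-backed Scheduler Data Repository
    @Bean
    public SchedulerDataRepository schedulerDataRepository(MongoDatabase database) {
        return new MongoSchedulerDataRepository(database);
    }

    // Assumed type and constructor: the Scheduler Controller, the entry
    // point for all scheduler operations
    @Bean
    public SchedulerController schedulerController(SchedulerDataRepository repository) {
        return new SchedulerController(repository);
    }
}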

Then, as we plan to expose our controller as a REST API, we will create a set of endpoints for all the required scheduler operations:

  1. Defining and managing jobs — /api/v1/scheduler/jobs
  2. Managing job executions — /api/v1/scheduler/executions
  3. Monitoring execution iterations — /api/v1/scheduler/iterations
  4. Managing workers and clusters — /api/v1/scheduler/cluster

In this example, we have created four main APIs to handle all the scheduler operations: defining jobs, executing them, monitoring iterations, and tracking cluster state. The complete example can be found here.
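
As a rough sketch, each of these endpoints can simply delegate to the Scheduler Controller bean. The controller method name used below (defineJob) is an assumption for illustration, not the documented Work4j API:

import java.util.Map;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/scheduler/jobs")
public class JobDefinitionsApi {

    private final SchedulerController scheduler; // assumed Work4j type

    public JobDefinitionsApi(SchedulerController scheduler) {
        this.scheduler = scheduler;
    }

    // Define a new job by delegating to the scheduler controller
    // (defineJob is an assumed method name)
    @PostMapping
    public ResponseEntity<?> define(@RequestBody Map<String, Object> definition) {
        return ResponseEntity.ok(this.scheduler.defineJob(definition));
    }
}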

Setup of Workers

A worker is the actual piece of software that executes the jobs. In Work4j, workers are backed by Quartz, which already provides features such as persistence and clustering. In a nutshell, a worker needs two main pieces: a channel to connect to the scheduler and the actual implementations of the jobs to run. It is also important to note that workers and schedulers can synchronize in two ways: polling and messaging. Work4j supports polling-based synchronization out of the box and provides a simple interface to implement synchronization using your favorite messaging system (Kafka, RabbitMQ, or other). In this example we will set up workers to synchronize with the scheduler by polling.

First of all, let’s highlight some of the dependencies needed to build the worker:

<dependency>
    <groupId>io.imast</groupId>
    <artifactId>work4j-worker</artifactId>
    <version>0.0.1.RC2</version>
</dependency>
<dependency>
    <groupId>io.imast</groupId>
    <artifactId>work4j-model</artifactId>
    <version>0.0.1.RC2</version>
</dependency>
<dependency>
    <groupId>io.imast</groupId>
    <artifactId>work4j-channel</artifactId>
    <version>0.0.1.RC2</version>
</dependency>

Here is an example of worker application setup:
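
A minimal sketch of such a setup is shown below. Every Work4j-specific name here (WorkerConfiguration, WorkerKind, WorkerController and their builder methods) is an assumption for illustration only; RestSchedulerChannel is a hypothetical HTTP-based channel sketched later in this section.

import java.time.Duration;

public class WorkerApp {

    public static void main(String[] args) throws Exception {
        // 1. Describe how this worker should behave (assumed builder API)
        var config = WorkerConfiguration.builder()
                .cluster("/")                        // cluster to join
                .name("worker-1")                    // worker identity
                .kind(WorkerKind.BALANCED)           // EXCLUSIVE, REPLICA or BALANCED
                .pollingRate(Duration.ofSeconds(10)) // how often to poll the scheduler
                .build();

        // 2. A channel to reach the scheduler, here over its REST API
        var channel = new RestSchedulerChannel("http://localhost:8080");

        // 3. Register the worker within the scheduler's cluster
        channel.registerWorker(config.getCluster(), config.getName());

        // 4. Create the worker controller, declaring the supported job types
        var worker = WorkerController.builder(config, channel)
                .withJobExecutor("ECHO_JOB", EchoJob.class) // job type -> implementation
                .build();

        // 5. Start executing jobs according to their schedules
        worker.start();
    }
}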

What we do here is very simple. First, a Worker Configuration object is created to describe how we expect the worker to behave. Then we connect to the scheduler to register the worker, either to share the load within the cluster or to replicate it.

When a worker tries to join a cluster, it should declare its kind as one of the following:

  1. Exclusive — the worker should be the only worker in the cluster. If there is already an active worker in the cluster, the connection will fail. In this case, persistence is not necessary.
  2. Replica — the worker will execute all the jobs assigned to the cluster regardless of the existence of other workers.
  3. Balanced — the worker will share the load with other balanced workers in the cluster. This behavior is based on Quartz’s native clustering using one of its shared persistence methods.

Another important point is that we have to implement the Scheduler Channel if we want a custom way of communication between the Scheduler and Worker controllers. In this example, as the Scheduler Controller is exposed as a REST API, the Scheduler Channel should be implemented as an HTTP client that interacts with the Scheduler API.
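
A hypothetical sketch of such a channel follows. The real SchedulerChannel interface in work4j-channel defines its own operations; here we only illustrate the idea of backing one operation, worker registration, with Java's built-in HTTP client against the cluster endpoint created earlier. The JSON payload shape and base URL are assumptions.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestSchedulerChannel /* implements SchedulerChannel (actual interface) */ {

    private final HttpClient http = HttpClient.newHttpClient();
    private final String baseUrl;

    public RestSchedulerChannel(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // Register this worker in a cluster via the Scheduler REST API
    // (the JSON payload shape is assumed for illustration)
    public HttpResponse<String> registerWorker(String cluster, String name) throws Exception {
        var body = "{\"cluster\": \"" + cluster + "\", \"name\": \"" + name + "\"}";
        var request = HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/v1/scheduler/cluster"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        return http.send(request, HttpResponse.BodyHandlers.ofString());
    }
}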

Finally, the Worker Controller can be created and started. This is where you define:

  1. The job types supported by the worker and their implementations,
  2. The necessary dependent modules for each job type, if any,
  3. Optionally, a custom listener that will accept execution-related messages (pausing/resuming jobs, scheduling new ones, removing them, etc.).

Here is an example of how to implement an executor for a specific job type:

Implementation of Echo Job
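
Because Work4j workers are backed by Quartz, the simplest way to picture a job implementation is as a plain Quartz Job; the actual Work4j executor abstraction may differ, so treat the class below as an illustrative sketch that simply echoes its payload.

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

// Illustrative echo job: prints the payload it was scheduled with
public class EchoJob implements Job {

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // The merged data map carries the values supplied in the job definition payload
        var data = context.getMergedJobDataMap();
        System.out.println("Echo: " + data.getWrappedMap());
    }
}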

This is it! Now our infrastructure is ready to serve incoming jobs. More about implementing jobs can be found here.

How to use it?

The last part of this article will show how a client application (user interface, edge service, etc.) should use the scheduling platform. In this example, as we exposed the Scheduler Controller through REST APIs, we will use the scheduler by calling the various API endpoints.

There are two main steps to do: define a job and execute it.

Job Definitions

One of the central objects in Work4j is the Job Definition. This object defines all the required specifications for the desired work to run. Here’s an example:

Creating Job Definition: Submit request to /api/v1/scheduler/jobs

This is what a minimal job definition looks like. It has a name and a folder for easy identification, a type that defines the behavior of the activity, a set of triggers, and the cluster the job is assigned to. In this example, the job /examples/simple_job is assigned to the cluster “/” to run every 30 seconds as an ECHO_JOB. In the job’s payload we put all the input data the job needs to run.
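
As a rough illustration, such a definition can be submitted to the jobs endpoint with any HTTP client; here is a sketch using Java's built-in client. The JSON field names and the localhost base URL are assumptions based on the description above, not the authoritative Work4j schema.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DefineJobExample {

    public static void main(String[] args) throws Exception {
        // Field names are illustrative; check the Work4j wiki for the real schema
        var definition = """
            {
              "name": "simple_job",
              "folder": "/examples/",
              "type": "ECHO_JOB",
              "cluster": "/",
              "triggers": [
                { "type": "PERIODIC", "period": 30000 }
              ],
              "payload": { "message": "Hello, Work4j!" }
            }
            """;

        var request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api/v1/scheduler/jobs"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(definition))
                .build();

        var response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}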

Currently, Work4j supports the following trigger types:

  1. Periodic — repeating every N seconds/minutes/hours/etc.,
  2. Cron — triggering an execution of the job according to a given Cron expression,
  3. Once — triggering the job only once.

Job Executions

In the above example, we defined a job that will be stored in the system, but we did not execute it yet. In order to start actual execution, we should ask the scheduler to create an execution instance. This can be easily done by creating a Job Execution as follows:

Executing job: Submit request to /api/v1/scheduler/executions

In this case we ask the scheduler to execute the job definition with the given identifier. Optionally, you can provide a new cluster value to run it within another cluster, or override any of the payload values. After this operation, the job will start executing on the cluster’s workers.
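
A sketch of that request, again with assumed field names and base URL, could look like this:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ExecuteJobExample {

    public static void main(String[] args) throws Exception {
        // Field names are illustrative; the id is the one returned by the jobs endpoint
        var execution = """
            {
              "jobId": "<definition-id-from-the-jobs-endpoint>",
              "cluster": "/"
            }
            """;

        var request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api/v1/scheduler/executions"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(execution))
                .build();

        var response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}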

That is it! Now the job execution scheduled on the cluster “/” will fire every 30 seconds as mentioned in its trigger.

Conclusion

In this article we went through the setup and usage of a distributed scheduling system based on Work4j.

To test this on your machine, simply check out the sample code from here, build it, and run it. If you have Maven, Docker, and Docker Compose installed, you can run it like this:

cd samples/distributed-sample/
mvn clean package install
sudo docker-compose build
sudo docker-compose up -d

This will build and run four applications: a scheduler, a worker, a MongoDB instance, and a MySQL instance.

Then, to test the scaling and clustering capabilities of the workers, add some more instances and schedule more jobs:

sudo docker-compose up -d --scale worker=4
