Transforming Spark Jobs Scheduling design with the Power of Event-Driven Architecture

kgshukla
DBS Tech Blog
Published in
9 min readFeb 14, 2023

A step-by-step guide to implement event-driven architecture that ensures resilience and operational excellence.

By Kapil Shukla

In previous articles, I wrote about short-term and long-term solutions to optimise the performance of data-intensive applications in Spark. In this article, I’d like to focus on the challenges of job scheduling in Spark, and how to solve it.

As the number of jobs increased, job scheduling proved to be a larger project than expected, making work a bit more challenging for the Middle Office Technology (MOT) Finance team. Shell scripting is often used to solve this issue, but I’d like to share a few cons of using it, and why event-driven architecture works better in this instance.

Shell scripts are easy to start with and was the initial preferred option to submit Spark jobs when starting with big data projects. However, we found that shell scripting is not the best choice for the long haul. With one of our projects that used shell scripts, we encountered the following key issues:

1. Scripts restrict the architecture from evolving

Shell scripts are easy to begin with and the initial task was to submit a Spark job. Over time, the script started to orchestrate other tasks — including processing job dependencies, updating entities’ state in the database, and performing data validation checks after the completion of processing jobs. The architecture and workflow built was a restrictive synchronous architecture, resulting in a tight-coupling pattern that is hard to manage and not scalable.

2. Database transaction management is difficult and prone to errors

In frameworks like Spring and Spring Boot, database transaction management is straightforward. Implementing transaction management in shell scripts is difficult, and the problem multiplies if you have a combination of database updates and third-party API (application programming interfaces) calls that may or may not be idempotent. As time progresses, developers usually implement a best-case flow (i.e. assuming the database or other products will not be down). This ultimately leads to poor error and failure handling.

3. Recovering from failure is challenging

The script was doing a lot of coordination, including upserting into the database and calling third-party APIs. The script would also sometimes spawn a new thread that tracks the progress of the Spark job that has just been submitted. Any form of failure resulted in the operations team spending a lot of time reconciling the state of various entities in the database before re-running the script.

4. You’ll face operational and observability challenges

We observed several issues, including:

a. The script (process and threads) was flushing the logs to several log files, making correlation of logs difficult. The absence of a centralised logging system made it hard to resolve issues.

b. When a Spark job was submitted using the Spark-submit command, yarn did not return the Spark application ID immediately. In cases where 20–40 Spark jobs were submitted by the script at one go, the operations team had to manually associate the Spark job ID with the batch job.

These challenges multiplied when the same script was invoked multiple times to run Spark jobs for multiple countries or business segments.

5. There are untested shell scripts

We’ve noticed that when the script is heavy and untested, the quality of the architecture goes downhill over time. Developers were putting in effort to write unit tests for the Spark Jobs (business logic), however there weren’t any test cases written to unit test the scheduler logic written in the shell scripts.

The Move to Event-Driven Scheduling

To overcome the above issues, we decided to overhaul the scheduling architecture to be event-driven. Event-driven architecture is a design pattern that focuses on the production, detection, and consumption of events.

Among the many benefits of implementing an event-driven architecture, one of them evolved our functionalities in an efficient manner, while providing increased resilience as multiple components were loosely connected. From an architecture perspective, it was a move to an eventual consistency pattern where several components were connected loosely, and communication was asynchronous.

Another consideration was to use workflow management tools, like Airflow. However, we couldn’t use it because the number of jobs and the configuration of each job were dynamic every month. Additionally, the jobs were different for every country. That led us to take the control of the scheduling logic and architecture.

The architecture blueprint below highlights the several components (or microservices) we built to overhaul our scheduling process. Essentially, the shell script-based scheduling was replaced by four products/components:

1. Apache Kafka: Binds Spark jobs and various microservices through events.

2. Apache Livy: Provides restful APIs to submit Spark jobs. Since the jobs will be submitted by a microservice, we needed a mechanism to interact with a Spark cluster.

3. Spring Boot microservices: Various microservices that are responsible for different functionalities.

These include:

  • Job Scheduler microservice, which schedules a Spark job.
  • Data Quality Check microservice, which is invoked to validate whether the data generated by the Spark job is correct once the job is completed.
  • State Reconciler microservice, which periodically gets invoked (a cron job) to ensure that what is observed in the database (the state of the jobs) is consistent with what is happening in reality.

4. Pivotal Cloud Foundry: A container platform that hosts our microservices and provides centralised logging, automatic scalability, and resilience.

Figure 1: Event-Driven Scheduling Architecture

We implemented the following steps:

  1. The Job Scheduler microservice evaluates which job is to be submitted next. The metadata information of all jobs is stored in the transactional database.
  2. The Job Scheduler microservice submits a Spark job via Livy and updates the states of various entities (that we need to monitor and manage) in the transactional database.
  3. The Spark job finishes and publishes an event in a Kafka topic, TOPIC_1. If Kafka is down, it retries 3 times. If the event cannot be published in Kafka, the event information is inserted in a Transactional DB.
  4. The Job Scheduler microservice consumes the event from TOPIC_1, updates the entities’ state in the transactional database, and submits the next eligible Spark job(s) via Livy.
  5. The Job Scheduler microservice also publishes a new event in TOPIC_2, which includes the updated state of the job it consumed from TOPIC_1 in step 4.
  6. The Data Quality Check microservice consumes the event from TOPIC_2. It runs validation checks and updates several tables in the transactional database.
  7. The State Reconciler microservice periodically reconciles the states captured in the transactional database vs the reality. For example, if any event persisted in the database because of Kafka unavailability, then the reconciler service will try to publish it to the Kafka periodically.

Benefits of using Event-Driven Architecture

We observed the following benefits once we implemented the above steps:

  1. Evolving Architecture: We decoupled several functionalities into separate microservices which were acting based on the event consumed by them. For example, Jobs Scheduling and Data Validation are two different functionalities now. Previously, both these tasks were done by the script.
  2. Resilience: We are following the eventual consistency pattern. Even if any microservice is down temporarily, it would eventually catch up asynchronously once it is up and running.
  3. Operational Excellency: Pivotal Cloud Foundry (PCF) provides the consolidated logging mechanism for all microservices and provides a way to automatically scale our microservices. Additionally, Livy provides the application ID after submitting the job, thereby helping to navigate the Spark UI immediately for a specific job.
  4. Observability: This needs to be implemented by the team. We can separate observability-related metrics from the transactional database, allowing the operations team to continue accessing metrics data should the latter not work. This allows us to add functionality in our Observability microservice to cater to future needs.
  5. Unit Tests for Microservices: We unit-tested our several microservices before rolling them out to production. This allowed us to catch and fix bugs at an earlier stage in our development cycle.
  6. Scalability: Every component (Kafka, microservices, databases) of this solution can be scaled on demand and independently of each other, unlike our previous approach where all tasks were done by a single shell script that could not be scaled.

The Tough Part: Writing the Job Scheduler Microservice

While implementing the above blueprint, we faced challenges updating the entities’ state in the database, creating a new event to be published in the Kafka topic, and calling Livy service to submit a job.

This is because a transaction spanned across multiple products, and it was not easy to maintain reliability in such cases. Essentially, we wanted to achieve two objectives:

  1. Atomicity: In the instance where any component (like database, Kafka, or the microservice itself) is unavailable during an event processing, then the state of various components/entities in the database must be consistent. For example, if the Job Scheduler microservice could update the entities’ states in the database but couldn’t publish a new Kafka event to TOPIC_2, then the updates must be rolled back. And if the new event is successfully produced in Kafka, then the database updates must be committed.
  2. Idempotent Consumer: Event consumers must be robust enough to handle a scenario when the event is redelivered due to event processing delays.

These two objectives are critical, yet difficult to implement if we don’t understand the complexities of dealing with transactions spanning multiple products. To achieve them, we resorted to the Idempotent Consumer and Transactional Outbox pattern. The below UML (Unified Modeling Language) diagram highlights the processing order and database transaction boundaries.

Figure 2: Sequencing Flow

From the above image, you’ll notice three events:

  1. There are two microservices, Scheduler and Kafka Consumer. They are part of the same Job Scheduler Microservice, but for reader’s understanding, I’ve split them into two parts in the diagram.
  2. Job Scheduler microservices can be invoked in two ways: (1) through on-demand RESTful API calls, or (2) when it consumes an event. On-demand RESTful calls are required when we want to schedule the next job, i.e., no entities’ updates in the database. On the other hand, when the event is consumed successfully, we need to call the functionality to schedule the next eligible job.
  3. “Begin TXN” and “End TXN” boxes are database transactions boundaries handled in Spring Boot via @Transactional annotation.

Idempotent Consumer Pattern

The Idempotent Consumer pattern is handled using the saveandflush() functionality offered in Spring data (look at the first Begin TXN and End TXN blocks). Save and Flush functionality will acquire a lock on the row in the database table. It would be released if the transaction is either committed or rolled back. If an event is consumed again by the microservice due to processing delays, the processing waits until the first processing is completed (regardless of doing it successfully or unsuccessfully), and then releases the lock.

Where the first processing is successful and the database commits the data, the second processing would fail due to unique constraint error.

Where the first processing is unsuccessful, the second processing would follow instructions written in the Begin TXN and End TXN blocks.

Transactional Outbox Pattern

The Transactional Outbox pattern performs transactions that span multiple products. The Job Scheduler microservice needs to handle multiple calls and updates with different products, including updating the entities’ state in the database, publishing an event in Kafka, and scheduling other jobs via calling Livy service.

Failures can happen at any point in time and the code must be robust enough to maintain eventual consistency. Therefore, rather than producing a message via calling Kafka producer APIs, we made a database entry (via insertNewEventDetails()) in an outbox table, and used the transactional boundaries of the database to commit the transaction along with other database updates. We then leveraged the change data capture (CDC) functionality of the database to produce an event in the Kafka Topic asynchronously. This helped in mitigating the failure and state inconsistency because of updates that had to take place in two different products.

Conclusion

It is liberating to move away from script-based to event-driven scheduling. The team can implement several functionalities at a quicker rate, which was not possible when they were working on a single shell script spanning thousands of lines of code. Event-driven architecture has provided the necessary decoupling, scaling, and resilient architecture that the team was aspiring for.

Kapil is a technologist with over a decade of experience working as a software developer, product manager and solution architect building large-scale enterprise products. In his current role as Head of Engineering and Architecture, Kapil is responsible for defining the technology strategy and adoption, hiring and coaching of talents to build superior products for the finance platform in middle office technology at DBS.

--

--

kgshukla
DBS Tech Blog

Kapil is a technologist with focus on building data intensive applications and data analytics.