Custom Monitoring for Spring Batch Job Pipelines

Siddhant Sorann
MiQ Tech and Analytics
7 min read · Aug 25, 2020

Providing better monitoring and alerting for Spring Batch jobs and our pipelines.

What do we do?

In today’s fast-paced digital landscape, an effective campaign management solution ensures brands and enterprises deliver relevant, personalized experiences to customers across multiple channels and touchpoints. But managing marketing campaigns isn’t easy, and it comes with its share of challenges: channel overload, ROI attribution, automation, personalization, and more.

To get the most out of our campaigns, Trading Lab, MiQ’s campaign management tool, enables traders to manage a campaign’s setup, margin, reporting, and optimizations from a single interface. Trading Lab combines every step of the campaign management cycle into one interface, making life easier for the traders.

Why did we need custom monitoring and alerting for our pipelines?

To ensure that traders have up-to-date information available before making any decisions, we ingest and aggregate data from multiple DSPs (Demand-Side Platforms): AppNexus, The Trade Desk, and DV360. Being a programmatic media buying agency, we at MiQ work closely with DSPs to buy inventory from them. To make this possible, we run 100+ batch jobs every day. Because of the amount of processing involved, and sometimes because of unforced errors, some of our jobs fail, which leaves the data inaccurate for some or all of the DSPs.

The traders manage their campaigns and create reports based on the aggregated data we ingest, so it is very important for us to make sure they are aware of the status of our data ingestion pipelines: is the data ingested into Trading Lab accurate, when was it last updated, and when will it be updated next?

The following drawbacks motivated us to create the new service:

  1. The manual process of updating traders whenever we saw a failure in our pipeline was tedious and slow.
  2. Informing traders required knowing which job is part of which pipeline and what the failure of that job means for the data.
  3. Some of our jobs depend on one another, which made it difficult to understand the exact effect of a job’s failure.
  4. We could only update traders while we were in the office. Since we have offices in almost all timezones and the development team is located in Bengaluru, India, this resulted in delays in informing the traders.
  5. There was no way of letting traders know when the data was last updated and when it would be updated next.

How did we achieve this?

We run our Spring Batch jobs as tasks on Spring Cloud Data Flow (SCDF). You can refer to our previous blog on SCDF here for more information.

To solve these issues, we decided to write a custom service named “Watch”. Its main responsibility was to automate the monitoring and alerting of our pipelines and jobs. Another very important goal we kept in mind while designing this service was to keep it highly dynamic, i.e. to allow tasks on SCDF to be added or modified without having to change any code in Watch. To achieve this dynamic structure, we used MongoDB to store our documents.

We divided the service into two parts:

  1. The first one was responsible for calculating the stability of each of our tasks and alerting us if a task failed.
  2. The second part was responsible for calculating the stability of a pipeline running with respect to each DSP, i.e. making sure that the data for each DSP is accurate. This part was a little more complex as we run multiple jobs to ingest and aggregate data for each DSP.

Task Stability

This part of our service was responsible for checking the status of all our tasks scheduled on SCDF.

  1. Get a list of all tasks added on SCDF. **
  2. Iterate over each task in the list and compute its stability:
  3. Get the cron expression from the task’s schedule. If no schedule is present, i.e. the task is not scheduled, we catch the exception and ignore the task. **
  4. From the cron expression, calculate the task’s previously scheduled time and run an SQL query (shown further below) to figure out whether the task ran successfully between then and now; a condensed sketch of this check follows the list.
  5. If the task failed to run, i.e. a job failed, the task failed to trigger, etc., add it to a list containing all the failed tasks.
  6. The list of failed tasks is returned to the caller of the task stability API. If there are any failures, we also send an alert to Slack via a webhook.

** — These methods are cached to avoid redundant API requests, as the results very rarely change. We have a separate API to clear the cache in case something does change.
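To make the steps above concrete, here is a minimal sketch of the per-task check, assuming the open-source cron-utils library to compute the previous fire time from the cron expression, and a hypothetical TaskExecutionRepository that wraps the SQL query shown further below:

import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;

import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.util.Optional;

public class TaskStabilityChecker {

    // Assumes standard 5-field cron expressions on the SCDF schedules.
    private final CronParser cronParser =
            new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX));

    // Hypothetical repository wrapping the SQL query shown further below.
    private final TaskExecutionRepository taskExecutionRepository;

    public TaskStabilityChecker(TaskExecutionRepository taskExecutionRepository) {
        this.taskExecutionRepository = taskExecutionRepository;
    }

    /** Returns true if the task ran successfully since its previous scheduled fire time. */
    public boolean ranSuccessfullySinceLastSchedule(String taskName, String cronExpression) {
        ExecutionTime executionTime = ExecutionTime.forCron(cronParser.parse(cronExpression));
        Optional<ZonedDateTime> lastScheduled = executionTime.lastExecution(ZonedDateTime.now());
        if (!lastScheduled.isPresent()) {
            return true; // nothing has ever been scheduled, so there is nothing to check
        }
        String status = taskExecutionRepository.latestStatusBetween(
                taskName, lastScheduled.get().toLocalDateTime(), LocalDateTime.now());
        return "NO_FAILURES".equals(status);
    }
}

Any task for which this check fails goes into the failed-tasks list and ultimately into the Slack alert.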

Sample result:

[{
    "taskName": "apn-advertiser-metadata",
    "message": "Some or all Jobs under the Task have failed",
    "taskTriggerType": "SCHEDULER",
    "nextScheduledTaskAt": "2020-08-04T09:10:00",
    "taskStatus": "JOB_FAILURE"
}]

Why can’t we use the task status table in the DB to calculate the task stability?

We run multiple jobs under each task, and Spring Cloud Task is designed so that the task does not fail when a job under it fails. There is a property, spring.cloud.task.batch.fail-on-job-failure=true, that makes the task fail if a job fails, but at the same time it stops the execution of the remaining jobs under the task. This is why we decided to write an SQL query to figure out whether all jobs under the task ran successfully.

SELECT
    t.TASK_NAME,
    IF(t.EXIT_CODE != 0, 'TASK_FAILURE',
        IF((SUM(CASE WHEN e.STATUS = 'failed' THEN 1 END)) > 0, 'JOB_FAILURE',
            IF(COUNT(b.TASK_EXECUTION_ID) = 0, 'NO_JOBS_EXECUTED', 'NO_FAILURES'))) AS TASK_STATUS,
    t.START_TIME,
    t.END_TIME,
    t.TASK_EXECUTION_ID
FROM
    dataflow.TASK_EXECUTION t
LEFT JOIN dataflow.TASK_TASK_BATCH b ON
    b.TASK_EXECUTION_ID = t.TASK_EXECUTION_ID
LEFT JOIN dataflow.BATCH_JOB_EXECUTION e ON
    b.JOB_EXECUTION_ID = e.JOB_EXECUTION_ID
LEFT JOIN dataflow.BATCH_JOB_INSTANCE i ON
    i.JOB_INSTANCE_ID = e.JOB_INSTANCE_ID
WHERE
    t.TASK_NAME = :taskName
    AND t.START_TIME <= :endTime
    AND t.START_TIME >= :startTime
GROUP BY
    t.TASK_EXECUTION_ID
ORDER BY
    t.START_TIME DESC
LIMIT 1;
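As a rough sketch of how such a query could be executed from the service, using Spring’s NamedParameterJdbcTemplate (TASK_STATUS_QUERY is assumed to hold the SQL above, and the repository class is hypothetical):

import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.List;

@Repository
public class TaskExecutionRepository {

    private static final String TASK_STATUS_QUERY = "..."; // the SQL shown above

    private final NamedParameterJdbcTemplate jdbcTemplate;

    public TaskExecutionRepository(NamedParameterJdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /** Returns the TASK_STATUS of the latest execution in the window, or null if nothing ran. */
    public String latestStatusBetween(String taskName, LocalDateTime startTime, LocalDateTime endTime) {
        MapSqlParameterSource params = new MapSqlParameterSource()
                .addValue("taskName", taskName)
                .addValue("startTime", Timestamp.valueOf(startTime))
                .addValue("endTime", Timestamp.valueOf(endTime));

        List<String> statuses = jdbcTemplate.query(
                TASK_STATUS_QUERY, params, (rs, rowNum) -> rs.getString("TASK_STATUS"));

        // No row at all means the task never triggered inside the window;
        // the caller treats anything other than "NO_FAILURES" as a failure.
        return statuses.isEmpty() ? null : statuses.get(0);
    }
}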

DSP Stability

DSP Stability is used to infer if all tasks responsible for ingesting and aggregating data for each DSP ran successfully. This automates the process of letting users know if the data present in the UI is correct or not. It is also used to notify the developers if something goes wrong.

For each DSP we had to create a dependency chart between the tasks, something like:
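For example, for TTD the dependency tree that corresponds to the sample document shown later in this post looks roughly like this:

server-placement-stats-processor
├── ttd-raw-stats
│   ├── ttd-advertiser
│   └── ttd-insertion-order
└── server-campaign-revalidate-lookups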

To get the final status of the tree (DSP Stability):

  1. We perform a pre-order traversal of the above tree structure.
  2. For each node we encounter, we calculate its stability and store its start time.
  3. We then move on to the child nodes, where we check whether each child task ran successfully before its parent task started.
  4. This process goes on until we have covered the entire tree.
  5. If at any point we encounter a failure in a child node, we return from the function with a failure status and a list of all the failed tasks (a simplified sketch of this traversal follows the list).
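A simplified sketch of that traversal, assuming hypothetical TaskNode, TaskRun, and TaskStabilityService types wrapping the task-level check described earlier; the real service records start times and stability values in a map, described further below:

import java.time.LocalDateTime;
import java.util.List;

public class DspStabilityChecker {

    // Hypothetical service wrapping the task-level stability check.
    private final TaskStabilityService taskStabilityService;

    public DspStabilityChecker(TaskStabilityService taskStabilityService) {
        this.taskStabilityService = taskStabilityService;
    }

    /**
     * Pre-order traversal: check the node itself, then check every child against
     * this node's start time. Failed task names are collected in failedTasks,
     * and the traversal stops as soon as any node in the subtree is unstable.
     */
    public boolean isSubtreeStable(TaskNode node, LocalDateTime parentStartTime,
                                   List<String> failedTasks) {
        TaskRun lastRun = taskStabilityService.lastSuccessfulRunBefore(node.getName(), parentStartTime);
        if (lastRun == null) {
            failedTasks.add(node.getName());
            return false; // data for this DSP cannot be trusted
        }
        for (TaskNode child : node.getChildrenTaskNodes()) {
            // each child must have run successfully before its parent started
            if (!isSubtreeStable(child, lastRun.getStartTime(), failedTasks)) {
                return false;
            }
        }
        return true;
    }
}

For the root task, parentStartTime can simply be the current time.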

Sample result:

{
    "featureName": "TTD",
    "updatedTill": "2020-08-04T00:46:20",
    "nextUpdate": "2020-08-04T11:00:00",
    "failedTasks": [
        "ttd-raw-stats"
    ],
    "stable": false
}

This result is also regularly published to our Firebase Realtime Database. From there it is picked up by our UI so that the traders (users) can effortlessly see the stability of the data in Trading Lab. As soon as there is a failure, our Data Health Monitor displays an error on the UI without any manual intervention. Below is a snippet of how our UI looks.

How do we store the tree in our code?

Firstly, we store the dependencies as a JSON document in MongoDB. Following is a sample document:

{
    "name": "TTD",
    "tree": [
        {
            "name": "server-placement-stats-processor",
            "childrenTaskNodes": [
                { "name": "ttd-raw-stats" },
                { "name": "server-campaign-revalidate-lookups" }
            ]
        },
        {
            "name": "ttd-raw-stats",
            "childrenTaskNodes": [
                { "name": "ttd-advertiser" },
                { "name": "ttd-insertion-order" }
            ]
        }
    ]
}

Then we have a method in our code to convert this JSON Document into a Map<String, FeatureTreeNode>. This is how FeatureTreeNode is defined:

@Data
public class FeatureTreeNode {
    List<String> taskList;
    Date startTime;
    // Enum to store values for stability.
    FeatureNodeStability stability;
    String seat;
}

This map stores the stability values and start times for all the nodes in the tree; it is passed between the methods and updated as the traversal progresses. We use a map to make it easier to locate a particular task while calculating the stability of the DSP.
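As a rough sketch, the conversion could look like the following, assuming a hypothetical FeatureTreeDocument/TaskNode shape that mirrors the JSON document above (the exact fields populated per node are simplified here):

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class FeatureTreeMapper {

    /** Builds the lookup map, keyed by task name, from the stored tree document. */
    public Map<String, FeatureTreeNode> toNodeMap(FeatureTreeDocument document) {
        Map<String, FeatureTreeNode> nodeMap = new HashMap<>();
        for (TaskNode taskNode : document.getTree()) {
            FeatureTreeNode node = new FeatureTreeNode();
            // keep the names of this task's children so the traversal can find them quickly
            node.setTaskList(taskNode.getChildrenTaskNodes().stream()
                    .map(TaskNode::getName)
                    .collect(Collectors.toList()));
            nodeMap.put(taskNode.getName(), node);
        }
        return nodeMap;
    }
}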

All the functions that fetch documents from MongoDB, create the tree from the JSON document, and so on are cached, as their results change very rarely. Since the documents live in MongoDB, we have exposed CRUD REST APIs to manage them through Watch itself. This also allows us to modify, add, or remove trees without having to release or restart our service; a sketch of the caching setup follows below.
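With Spring’s caching abstraction (assuming @EnableCaching is configured elsewhere), this can be as simple as annotating the fetch method and exposing an eviction method behind a REST endpoint; FeatureTreeDocument and the cache name are hypothetical:

import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;

@Service
public class FeatureTreeService {

    private final MongoTemplate mongoTemplate;

    public FeatureTreeService(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    // Cached: the tree documents change very rarely.
    @Cacheable("featureTrees")
    public FeatureTreeDocument getTree(String featureName) {
        return mongoTemplate.findOne(
                Query.query(Criteria.where("name").is(featureName)),
                FeatureTreeDocument.class);
    }

    // Called from a REST endpoint so that document changes take effect without a restart.
    @CacheEvict(cacheNames = "featureTrees", allEntries = true)
    public void clearTreeCache() {
    }
}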

Conclusion

With the help of this dynamic service, we managed to get real-time automated updates on the status of our jobs and DSP pipelines. More importantly, our stakeholders could simply look at the status of our pipelines to understand whether the data being displayed is correct or not. Enabling us to recover easily and efficiently when any of our pipelines fails was another big advantage. This automated monitoring saved an estimated 10 hours of dev effort each sprint (two weeks), time that was previously spent figuring out how failures would affect the data and informing the stakeholders. Enabling monitoring for a new pipeline also became effortless, as it only requires adding another document in MongoDB.
