How to do a graceful shutdown of a Spark Streaming job

Manoj Kumar Dhakad
2 min read · May 11, 2019


Spark Streaming is the extension of the core Spark API that enables fault-tolerant, scalable, high-throughput processing of live data streams. Streaming jobs run 24x7, so we need a robust solution that keeps running without losing any data from the stream. Spark Streaming has a data-recovery feature called checkpointing: in case of a hardware failure the job can restart from the checkpoint, so no data is lost (you can read more about checkpointing in the Spark documentation). But what if we want to redeploy? Then we have to stop the running job, and there are several ways to do it:
1. Go to the Spark UI and kill the application
2. Kill the application from the client
3. Graceful shutdown
Approaches 1 and 2 kill the job forcefully. Approach 2 can also stop the application gracefully (see the sketch below), but I didn't choose it because I am running on AWS and I don't want to log in to the client machine.
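As an aside, approach 2 typically works through Spark's built-in shutdown hook: with the configuration flag spark.streaming.stopGracefullyOnShutdown set to true, sending a SIGTERM to the driver process from the client machine stops the streaming context gracefully. A minimal sketch of setting that flag (the app name is a placeholder):

    import org.apache.spark.SparkConf;

    // Sketch: enable Spark's built-in graceful shutdown hook (approach 2).
    // With this flag set, a SIGTERM sent to the driver JVM triggers a graceful
    // stop of the streaming context instead of an immediate kill.
    SparkConf conf = new SparkConf()
            .setAppName("my-streaming-app") // placeholder name
            .set("spark.streaming.stopGracefullyOnShutdown", "true");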

Graceful shutdown: when we stop a Spark job gracefully, it first processes all in-flight events as well as the queued batches, then stops reading from the stream, and only then shuts down, so there is no chance of losing a single event/record. Graceful shutdown is most useful when we want to run the job again later.
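In the Java API this corresponds to the two-argument stop call on the streaming context, where the second flag requests the graceful behaviour (streamCtx here is an already-started JavaStreamingContext, as in the code further below):

    // stop(stopSparkContext, stopGracefully)
    streamCtx.stop(true,  // also stop the underlying SparkContext
                   true); // finish in-flight and queued batches first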

My approach to implementing graceful shutdown: I deploy all my Spark Streaming jobs on AWS, using Kinesis, EMR, S3, DynamoDB, CloudWatch, PostgreSQL, HBase and other components, and here I use S3 to store files such as checkpoints, jars and input files. I want to do all my activities, like monitoring and deployment, from the AWS console, which is why I implemented the graceful shutdown feature as described below.

  1. After starting the streaming context, the job creates a file in the checkpoint directory, named after the streaming application. (You can create the file at any location, with any name.)
  2. The application then continuously checks whether that file still exists at the specified location.
  3. When you want to stop the job gracefully, you only need to delete that file; the job will stop after finishing the current batch as well as any queued batches, so there is no loss of data.
  4. Please find the complete Java pseudo-code below.
    JavaStreamingContext streamCtx = JavaStreamingContext.getOrCreate(
            kinesisCheckPointDir, () -> createContextForKinesis(batchInterval));
    LOG.info("Starting streaming");
    try {
        streamCtx.start();
        String appName = args[1];
        // Step 1: create the marker (PID) file in the checkpoint directory.
        S3Utils.createPIDFile(kinesisCheckPointDir, appName);
        boolean isPidFileExist = true;
        while (isPidFileExist) {
            // Step 2: poll for the marker file.
            isPidFileExist = S3Utils.isPIDFileExist(kinesisCheckPointDir, appName);
            if (!isPidFileExist) {
                LOG.info("Stopping the streaming context gracefully");
                // stop(stopSparkContext = true, stopGracefully = true) finishes
                // the current batch and all queued batches before shutting down.
                streamCtx.stop(true, true);
                LOG.info("Deleting the checkpoint directory");
                S3Utils.deleteS3Dir(kinesisCheckPointDir);
                break; // nothing left to wait for
            }
            Thread.sleep(30000); // check every 30 seconds
        }
    } catch (InterruptedException e) {
        LOG.error(e.getMessage(), e);
    }
    LOG.debug("End streaming");
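
The S3Utils class used above is my own helper, not a library class. Below is a minimal sketch of what it could look like with the AWS SDK for Java v1; the bucket name and key layout are assumptions for illustration, and it assumes the checkpoint directory is passed as a key prefix within a single bucket (a full s3:// URI would need parsing first):

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.ObjectListing;
    import com.amazonaws.services.s3.model.S3ObjectSummary;

    public final class S3Utils {

        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
        private static final String BUCKET = "my-streaming-bucket"; // hypothetical bucket

        // Derive the marker-file key from the checkpoint prefix and app name.
        private static String pidKey(String checkpointDir, String appName) {
            return checkpointDir + "/" + appName + ".pid";
        }

        // Step 1: write an empty marker object next to the checkpoint data.
        public static void createPIDFile(String checkpointDir, String appName) {
            S3.putObject(BUCKET, pidKey(checkpointDir, appName), "");
        }

        // Step 2: the driver polls this; deleting the object triggers shutdown.
        public static boolean isPIDFileExist(String checkpointDir, String appName) {
            return S3.doesObjectExist(BUCKET, pidKey(checkpointDir, appName));
        }

        // Cleanup: S3 has no real directories, so delete every object under the prefix.
        public static void deleteS3Dir(String prefix) {
            ObjectListing listing = S3.listObjects(BUCKET, prefix);
            while (true) {
                for (S3ObjectSummary summary : listing.getObjectSummaries()) {
                    S3.deleteObject(BUCKET, summary.getKey());
                }
                if (!listing.isTruncated()) {
                    break;
                }
                listing = S3.listNextBatchOfObjects(listing);
            }
        }
    }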
