Monitoring Amazon EMR Step Failures
When we run Spark jobs on Amazon EMR, a step can fail for many reasons, and a failed step means the job does not run at all. Unless we are informed of these failures, the downstream workflow is affected.
In our case, the classpath of one of our ETL jobs was changed, but it was not updated on the scheduler side. Because we did not receive any alarms, the resulting step failures were hard to notice.
Receiving alarms through a monitoring service allows us to take quick action. Our solution is to publish failures as Amazon SNS messages and process those messages with an AWS Lambda function. Lambda lets us run code in response to events from other services; in this case, the function generates alarms and sends them to Slack. In this post, I will talk about this solution.
The first step of the solution is to create an Amazon SNS topic. Amazon SNS is a notification service: when an event occurs, a message is published to the topic, and the topic delivers an identical copy of that message to each subscriber, here the Lambda function. You can learn how to create an Amazon SNS topic from the AWS documentation.
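The post does not show how EMR failures reach the topic. One common arrangement, assumed here rather than taken from the original setup, is an Amazon EventBridge rule that matches failed EMR steps and uses the SNS topic as its target. A minimal sketch of such an event pattern, built as a Python dictionary so that it can be serialized for a rule definition (the names `EMR_STEP_FAILED_PATTERN` and `pattern_as_json` are illustrative):

```python
import json

# Assumed EventBridge event pattern: match EMR step status changes
# whose state is FAILED. The rule's target would be the SNS topic.
EMR_STEP_FAILED_PATTERN = {
    "source": ["aws.emr"],
    "detail-type": ["EMR Step Status Change"],
    "detail": {"state": ["FAILED"]},
}


def pattern_as_json(pattern: dict) -> str:
    """Serialize the pattern to the JSON string a rule definition expects."""
    return json.dumps(pattern, indent=2)


if __name__ == "__main__":
    print(pattern_as_json(EMR_STEP_FAILED_PATTERN))
```

Filtering on the state at the rule level keeps successful steps out of the topic, so the Lambda function is only invoked for failures.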
The event that the Lambda function receives is structured as below:
{
  "Records": [
    {
      "Sns": {
        "Type": "Notification",
        "MessageId": "123",
        "TopicArn": "arn:aws:sns:region:123123",
        "Subject": null,
        "Message": "{\"detail-type\":\"Detail Type\", \"detail\":{\"severity\":\"ERROR\", \"clusterId\":\"j-123123\", \"message\":\"Step s-123123 failed.\"}}",
        "Timestamp": "2022-06-10T03:33:48.976Z",
        "SignatureVersion": "1",
        "Signature": "123",
        "SigningCertUrl": "signingCertUrl",
        "UnsubscribeUrl": "unsubUrl",
        "MessageAttributes": {}
      }
    }
  ]
}
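Note that the Message field is itself a JSON string, so it has to be decoded a second time before the detail fields can be read. A minimal sketch of that decoding, using a trimmed copy of the sample event (the helper name `extract_details` is hypothetical and not part of the original code):

```python
import json

# Trimmed copy of the sample event; only the fields used below are kept.
SAMPLE_EVENT = {
    "Records": [
        {
            "Sns": {
                "TopicArn": "arn:aws:sns:region:123123",
                "Message": '{"detail-type": "Detail Type", "detail": '
                           '{"severity": "ERROR", "clusterId": "j-123123", '
                           '"message": "Step s-123123 failed."}}',
            }
        }
    ]
}


def extract_details(event: dict) -> list:
    """Return (detail_type, detail) pairs for every SNS record in the event."""
    results = []
    for record in event.get("Records") or []:
        sns = record.get("Sns") or {}
        # "Message" arrives as a string, so it needs its own json.loads call.
        body = json.loads(sns.get("Message") or "{}")
        results.append((body.get("detail-type", ""), body.get("detail", {})))
    return results


if __name__ == "__main__":
    for detail_type, detail in extract_details(SAMPLE_EVENT):
        print(detail_type, detail.get("clusterId"), detail.get("message"))
```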
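The Lambda function processes the received events:
import json
import logging

# event_parser is the project's own helper module (not shown in this post).
import event_parser


def lambda_handler(event, context):
    records = event.get('Records')
    if not records:
        logging.error("There is no record.")
        return
    for record in records:
        # Extract the 'Sns' payload from the record.
        sns = event_parser.get_sns(record)
        if not sns:
            logging.error("There is no SNS in record.")
            continue
        sns_topic_arn = str(sns.get('TopicArn', ''))
        # 'Message' is a JSON string, so decode it into a dict.
        message = json.loads(str(sns.get('Message', '')) or '{}')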
We should build a generic alarm creator to support processing different types of messages.
from abc import ABC, abstractmethod


class AlarmGenerator(ABC):
    """ Base service for creating the alarm. """

    @abstractmethod
    def create_alarm(self):
        raise NotImplementedError()
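The post does not show how the handler chooses a generator for a given message. One possible approach, sketched here under that assumption, is a small registry keyed by the detail-type value. The names `GENERATORS`, `register`, `build_generator`, and `PrintingAlarmGenerator` are hypothetical, and "EMR Step Status Change" is used as an example key since the sample event only shows a placeholder:

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict, Optional


class AlarmGenerator(ABC):
    """Base service for creating the alarm (repeated so the sketch runs alone)."""

    @abstractmethod
    def create_alarm(self):
        raise NotImplementedError()


# Hypothetical registry: maps a detail-type string to a generator class.
GENERATORS: Dict[str, Callable[[dict, str], AlarmGenerator]] = {}


def register(detail_type: str):
    """Class decorator that registers a generator for one detail-type."""
    def decorator(cls):
        GENERATORS[detail_type] = cls
        return cls
    return decorator


def build_generator(message: dict) -> Optional[AlarmGenerator]:
    """Pick the generator for a decoded SNS message, or None if unsupported."""
    detail_type = message.get("detail-type", "")
    factory = GENERATORS.get(detail_type)
    if factory is None:
        return None
    return factory(message.get("detail", {}), detail_type)


@register("EMR Step Status Change")
class PrintingAlarmGenerator(AlarmGenerator):
    """Stand-in generator used only to demonstrate the dispatch."""

    def __init__(self, detail: dict, detail_type: str):
        self.detail = detail
        self.detail_type = detail_type

    def create_alarm(self):
        return f"{self.detail_type}: {self.detail.get('message', '')}"
```

With a registry like this, supporting a new message type only requires adding another decorated subclass; the handler itself stays unchanged.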
A Lambda layer lets us manage shared libraries from a single source. We had already created a Lambda layer containing the function that sends messages to Slack, and we reuse it here to send the alert messages.
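The layer's Slack function is not shown in this post. As a rough idea of what such a function could look like, the sketch below posts to a Slack incoming webhook using only the standard library. The webhook URL is a placeholder, the payload layout and the helper `build_slack_payload` are assumptions, and only the keyword names (message, channel, username, title) come from how the function is called later in the post. Depending on how the webhook was created, Slack may ignore the channel and username overrides.

```python
import json
import urllib.request

# Placeholder; a real deployment would read this from configuration.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/REPLACE/ME"


def build_slack_payload(message: str, channel: str, username: str, title: str) -> dict:
    """Build an incoming-webhook payload with a single attachment."""
    return {
        "channel": channel,
        "username": username,
        "attachments": [{"title": title, "text": message}],
    }


def send_slack_notification(message: str, channel: str, username: str, title: str,
                            webhook_url: str = SLACK_WEBHOOK_URL) -> int:
    """POST the payload to the webhook and return the HTTP status code."""
    payload = build_slack_payload(message, channel, username, title)
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Keeping the payload construction separate from the network call makes the formatting easy to check locally without sending anything to Slack.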
The last step is to create the EMR step failure alarms. Here, we use the detail.message field of the SNS message as the alarm text because it explains the issue clearly.
class EMRStepFailureAlarmGenerator(AlarmGenerator):
    def __init__(self, detail: dict, detail_type: str):
        self.detail = detail
        self.detail_type = detail_type

    def create_alarm(self):
        """ Create alarm. """
        level = self.__get_log_level()
        cluster_id = self.detail['clusterId']
        title = f'{self.detail_type} - Cluster Id: {cluster_id}'
        message = self.detail['message']
        username = 'Slack Alerts Lambda'
        channel = 'alert-channel'
        # send_slack_notification comes from the shared Lambda layer.
        send_slack_notification(
            message=message,
            channel=channel,
            username=username,
            title=title
        )

    def __get_log_level(self):
        """ Map the event severity to a log level. """
        severity = self.detail['severity']
        if severity.lower() == 'error':
            return 'error'
        else:
            return 'info'
Thanks to this implementation, we detect step failures as soon as they happen and can act on them before they affect the rest of the workflow. I hope this monitoring flow works for you too.