Hadoop/HDFS Events to AWS SQS in Real Time
Hadoop, with its distributed file system HDFS, is one of the most widely deployed on-premise Big Data systems. This project describes a way to send a log of all HDFS activity to the cloud, where it can be used for a variety of purposes: log analysis, metrics, capacity planning, auditing, security scanning, etc. Here is the high level architecture:
The steps involved in this project are straightforward and non-invasive to a running Hadoop cluster. All of the software is open source under the permissive Apache License. The original hdfs-inotify-example was written by Mark Brooks, a.k.a. onefoursix on GitHub. He has archived that project; I forked the source code and continue to develop it here: https://github.com/jagane-infinstor/hdfs-inotify-example
Capture HDFS Events using inotify
HDFS includes functionality called inotify for monitoring file system activity. Inotify is well architected: it imposes minimal load on the NameNode and is non-invasive. In modern Hadoop (circa Feb 2023), the events reported are CREATE, UNLINK, APPEND, CLOSE, RENAME and METADATA.
Mapping HDFS Events to AWS SQS: HDFS Transaction ID
The stream of events provided to the inotify client includes the HDFS Transaction ID for each event, and we put this to use. We send the inotify events to an AWS FIFO SQS queue. FIFO queues support the concept of a Message Deduplication ID: if two messages with the same Deduplication ID are posted to the SQS queue within 5 minutes, AWS SQS will not deliver the second message. This lets us build resilience into the system. We can configure the inotify client as a systemd auto-restart service; if the client crashes and is restarted, it resumes from the last recorded transaction ID. If one or more transactions are repeated in the process, that is perfectly fine, since FIFO SQS will perform deduplication for us.
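The deduplication behavior can be illustrated with a small model. This is a toy simulation of the SQS FIFO semantics, not AWS code; the class and method names are invented for illustration, and only the 5-minute window matches the actual SQS deduplication interval:

```python
import time

DEDUP_WINDOW_SECONDS = 300  # SQS FIFO deduplication interval: 5 minutes

class FifoDedup:
    """Toy model of SQS FIFO deduplication by MessageDeduplicationId."""
    def __init__(self):
        self.first_seen = {}  # dedup_id -> timestamp of first delivery

    def accept(self, dedup_id, now=None):
        """Return True if a message with this dedup ID would be delivered."""
        now = time.time() if now is None else now
        first = self.first_seen.get(dedup_id)
        if first is not None and now - first < DEDUP_WINDOW_SECONDS:
            return False  # duplicate within the window: silently dropped
        self.first_seen[dedup_id] = now
        return True

q = FifoDedup()
assert q.accept("txid-496704", now=0)        # delivered
assert not q.accept("txid-496704", now=60)   # replay after a restart: deduplicated
assert q.accept("txid-496704", now=400)      # outside the 5-minute window
```

This is why replaying a few transactions after a crash is harmless: the deduplication ID (the HDFS transaction ID) makes the replays invisible to the consumer.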
Mapping HDFS Path to AWS SQS Message Group ID:
FIFO SQS queues include the concept of a message group: all messages in a given message group are delivered in order. We exploit this mechanism to preserve the ordering of related messages. For example, the three events CREATE('/user/jagane/testfile.tmp'), CLOSE('/user/jagane/testfile.tmp') and RENAME('/user/jagane/testfile.tmp' to '/user/jagane/testfile') must be processed in that order; processing the RENAME before the CREATE makes no sense. We specify the HDFS path as the SQS Message Group ID. This ensures that operations on the same path are processed in the right order.
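Putting the two ideas together, the mapping from an inotify event to an SQS message can be sketched as follows. This is a hypothetical helper, not code from hdfs-inotify-example; the field names txid and path and the queue URL are illustrative. The resulting dict has the shape expected by boto3's sqs_client.send_message(**msg):

```python
import json

def to_sqs_message(event, queue_url):
    """Build keyword arguments for SQS send_message() from an inotify event.

    The HDFS transaction ID becomes the deduplication ID (safe replays),
    and the HDFS path becomes the message group ID (per-path ordering).
    """
    return {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps(event),
        "MessageDeduplicationId": str(event["txid"]),
        "MessageGroupId": event["path"],
    }

msg = to_sqs_message(
    {"txid": 496704, "type": "CLOSE", "path": "/user/jagane/testfile"},
    "https://sqs.us-east-1.amazonaws.com/123456789012/hdfs-test.fifo",
)
print(msg["MessageGroupId"])   # /user/jagane/testfile
```

Events for the same path share a message group and are delivered in order; events for different paths can be delivered concurrently, which is what lets the FIFO queue keep up under load.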
Step by step instructions
The rest of this article is devoted to a step by step guide for running this software.
AWS Console: Create an IAM user with permission to write to the SQS Queue
First, we create the policy permit-sqs-write-hdfs-test. If you observe the screen capture, you will notice that we are permitting all ‘SQS Write’ actions on all the SQS Queues in this AWS account. Once the SQS Queue has been created we will come back and tighten the permissions.
Next, we create the user hdfs-test-sqs-writer and assign the permit-sqs-write-hdfs-test policy to it. Generate an access key for this user and save the access key ID and secret access key. These will be used when running hdfs-inotify-example.
AWS Console: Create an IAM Role for the AWS Lambda that will process the SQS Messages
First, create the permissions policy called permit-all-hdfs-test that allows all access to our SQS queue. Note that in the screenshot below, we allow all SQS operations on all the queues in that account. Once we create the SQS queue, we will come back to the IAM console and tighten the permissions.
Next, create a role called role-for-hdfs-test-lambda for the hdfs-test lambda and assign it two permissions:
AWSLambdaBasicExecutionRole
permit-all-hdfs-test
AWS Console: Create the SQS Queue
While creating the FIFO queue, I turned on 'High Throughput', as you can see below.
Further, I specified that the user hdfs-test-sqs-writer is the only one who can send messages to this queue, and that role-for-hdfs-test-lambda is the only role that can read from this SQS queue.
AWS Console: Tighten permissions
At this point, for security reasons, you should go back to the permissions policies permit-sqs-write-hdfs-test and permit-all-hdfs-test and modify them, restricting them to just the queue that we created in the previous step.
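The tightened send-side statement might look like the following. This is a sketch of a standard IAM policy document, not a capture from the console; the queue ARN is illustrative (substitute your own region and account ID):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:123456789012:hdfs-test.fifo"
    }
  ]
}
```

The read-side policy for the Lambda role would be restricted to the same queue ARN in the same way.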
AWS Console: Create Lambda that logs SQS Queue Entries to CloudWatch
In this example, I am going to create an AWS Lambda function from the Lambda console. The following screenshot shows that I am 'Authoring from Scratch' a Lambda function called hdfs-test-lambda using Python 3.9.
import json

def lambda_handler(event, context):
    if 'Records' in event:
        for record in event['Records']:
            inotify_entry = json.loads(record['body'])
            print(json.dumps(inotify_entry))
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
Replace the code in the Lambda with the code shown above.
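Before deploying, you can sanity-check the parsing logic locally with a hand-built SQS-style event. The record body below is modeled on the JSON the tool emits; the helper mirrors the Lambda's loop over event['Records']:

```python
import json

# A hypothetical SQS event of the shape the Lambda receives: each record's
# 'body' is the JSON-serialized inotify event sent by hdfs-inotify-example.
sample_event = {
    "Records": [
        {"body": json.dumps({
            "type": "CLOSE",
            "path": "/user/jagane/testfile",
            "timestamp": "1675972595891",
            "fileSize": "1048576",
        })}
    ]
}

def parse_records(event):
    """Mirror of the Lambda's parsing loop, returning entries for inspection."""
    entries = []
    if "Records" in event:
        for record in event["Records"]:
            entries.append(json.loads(record["body"]))
    return entries

parsed = parse_records(sample_event)
print(parsed[0]["type"])   # CLOSE
```

If the shape is wrong, json.loads raises immediately, which is far cheaper to discover locally than in CloudWatch logs.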
AWS Console: Change Lambda Execution Role
Now, we switch the execution role for this Lambda to the one we created earlier — this role allows the Lambda to read from the SQS queue. This is done by going to Lambda -> Configuration -> Permissions.
AWS Console: Configure SQS Queue to trigger Lambda
Go to the SQS Queue Configuration page, choose 'Lambda Triggers' and set up the newly created Lambda as the trigger for this SQS queue.
Hadoop: Compile and run hdfs-inotify-example
First, check out the code:
git clone https://github.com/jagane-infinstor/hdfs-inotify-example.git
Next, compile it:
mvn clean package
Run the program:
java -jar target/hdfs-inotify-example-uber.jar hdfs://hpworkstation.example.com:9000/ 'AKIAAAAAAAAAAAAAAA' 'vnrneilugihuwgtu3495rehgkqewkjgherjkkg' https://sqs.us-east-1.amazonaws.com/123456789012/hdfs-test.fifo
Note that in the above command, my HDFS is available at hdfs://hpworkstation.example.com:9000/ and my queue is at https://sqs.us-east-1.amazonaws.com/123456789012/hdfs-test.fifo
Also, note that I am specifying the AWS Access Key ID and the Secret Access Key for the IAM user hdfs-test-sqs-writer that we created earlier. Specifying credentials on the command line has security risks: they can be visible in the process list and may end up in shell history.
Finally, the command line output should be something like the following:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Entering poll no block
returned batch is not null
getTxidsBehindEstimate=0
TxId = 496704
event type = CLOSE
CloseEvent [path=/user/Robin.Gallagher/test1/hpworkstation.example.com/99, fileSize=1048576, timestamp=1675972595891]
JSON={"type": "CLOSE","path": "/user/Robin.Gallagher/test1/hpworkstation.example.com/99","timestamp": "1675972595891","fileSize": "1048576"}
Entering poll no block
returned batch is null
Sending 1 events
Entering poll with block
Finished poll with block
returned batch is null
Entering poll with block
If you now go over to the CloudWatch Console and look up the Log Group /aws/lambda/hdfs-test-lambda, you will see something like the following:
I have tested this software at scale, and am happy to report that the inotify listener is robust and the SQS Queue has no difficulty keeping up.
That's all, folks!