Consume a RabbitMQ message from AWS Lambda
In my previous post I gave a very brief explanation of RabbitMQ and showed you how to publish a message from AWS Lambda to RabbitMQ (hosted in the cloud of course). But messages that just sit in a queue don’t help anyone, so let’s grab those messages in the queue using AWS Lambda again.
RabbitMQ Setup and Message Publish
If you didn’t go through my previous post on publishing to RabbitMQ, do so now. It will get you set up with RabbitMQ in the cloud and a Lambda Function to publish Messages. As before, this post is designed for people already familiar with Python Lambda Functions.
AWS Lambda Function: Get a Message from the Queue
Create a new Lambda Function from scratch with the Python 3.6 runtime and an execution role that gives the function access to AWS Key Management Service and Amazon CloudWatch Logs.
Add the PikaClient Layer you used in the previous post to the function. Copy and paste the code below into the function (adding blank lines for readability as needed, since Medium removes them):
import os
import boto3
import pika  # Python AMQP Library
from base64 import b64decode

# get Environment Variables
RABBIT_HOST = os.environ['RABBIT_HOST']
RABBIT_USER = os.environ['RABBIT_USER']
RABBIT_PWD_ENCRYPTED = os.environ['RABBIT_PWD']

# Decrypt Password
RABBIT_PWD_DECRYPTED = boto3.client('kms').decrypt(CiphertextBlob=b64decode(RABBIT_PWD_ENCRYPTED))['Plaintext']

credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PWD_DECRYPTED)
# ssl=True is the pika 0.x form; pika 1.x expects ssl_options instead
parameters = pika.ConnectionParameters(credentials=credentials, ssl=True, host=RABBIT_HOST, virtual_host=RABBIT_USER)  # CloudAMQP sets the vhost same as User
# parameters and credentials ready to support calls to RabbitMQ

def lambda_handler(event, context):
    connection = pika.BlockingConnection(parameters)  # Establishes TCP Connection with RabbitMQ
    channel = connection.channel()  # Establishes logical channel within Connection
    method_frame, header_frame, body = channel.basic_get('Lambda')  # get next single message, if any
    if method_frame:
        print('MESSAGE BODY: ' + body.decode('UTF-8'))
        channel.basic_ack(method_frame.delivery_tag)  # Tell RabbitMQ the message was received
    else:
        print('No message returned')
    connection.close()  # Close Connection and Channel(s) within
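If you want to see the password handling on its own, the decrypt step can be pulled into a small helper. This is only a sketch: the name decrypt_env_value is made up for illustration, and the kms_client argument is assumed to be boto3.client('kms') or anything else with a matching decrypt method.

```python
from base64 import b64decode


def decrypt_env_value(ciphertext_b64, kms_client):
    """Decrypt a base64-encoded KMS ciphertext and return it as text.

    decrypt_env_value is a hypothetical helper name. kms_client is assumed
    to offer a boto3-style decrypt(CiphertextBlob=...) method, for example
    boto3.client('kms').
    """
    response = kms_client.decrypt(CiphertextBlob=b64decode(ciphertext_b64))
    return response['Plaintext'].decode('utf-8')  # KMS returns the plaintext as bytes
```

Inside the Lambda Function you would call it as decrypt_env_value(os.environ['RABBIT_PWD'], boto3.client('kms')). Passing the client in as an argument also makes it easy to try the helper locally with a stand-in object.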
Add the Environment Variables RABBIT_HOST, RABBIT_USER, and RABBIT_PWD with the values for your RabbitMQ instance, and encrypt the RABBIT_PWD variable.
Save the Function, create a Test Event, and Test! If a message was in the Queue from the function you developed in the previous post, the log will display its body after the text MESSAGE BODY:. Otherwise the log will contain the entry No message returned.
This code is similar in approach to the previous publish. A Connection/Channel is established to RabbitMQ. Then basic_get is called. This gets one (and only one) message from the Queue named Lambda if there are messages present in the Queue.
If a message is present, the function prints out the message body and then calls basic_ack. This lets RabbitMQ know that the message was received and that it can be removed from the Queue. The connection is then closed.
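Because basic_ack is what removes the message from the Queue, it usually makes sense to call it only after your own processing has succeeded. Here is a minimal sketch of that idea. The function name get_and_process and the handler argument are hypothetical, and channel is assumed to be a pika BlockingChannel like the one created in the function above.

```python
def get_and_process(channel, queue, handler):
    """Fetch at most one message and ack it only if handler succeeds.

    get_and_process and handler are illustrative names, not part of pika.
    Returns True if a message was processed, False if the queue was empty.
    """
    method_frame, properties, body = channel.basic_get(queue)
    if method_frame is None:
        return False  # basic_get returns (None, None, None) when the queue is empty
    try:
        handler(body)
    except Exception:
        # Hand the message back to RabbitMQ so it can be delivered again
        channel.basic_nack(method_frame.delivery_tag, requeue=True)
        raise
    channel.basic_ack(method_frame.delivery_tag)
    return True
```

With this in place, the body of lambda_handler could call get_and_process(channel, 'Lambda', my_handler), where my_handler is whatever function does your real work. If the handler raises, the message goes back on the Queue instead of being lost.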
This is an extremely simple example. In general, message consumption is usually a little more complex than message publishing. In many use cases a consumer will keep a persistent connection open and receive messages as soon as they are placed on the Queue. Or it may connect, receive all messages that exist on the Queue and then disconnect. How you implement your message consumption depends entirely on your use case.
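As one example of the second pattern (connect, receive everything on the Queue, disconnect), you could call basic_get in a loop until it comes back empty. The sketch below is one possible way to do it, not the only one. The drain_queue name and the max_messages cap are my own additions, with the cap there so a busy Queue cannot run the function past its Lambda timeout.

```python
def drain_queue(channel, queue, max_messages=100):
    """Get and ack messages until the queue is empty or max_messages is reached.

    drain_queue and max_messages are illustrative; channel is assumed to be
    a pika BlockingChannel. Returns the decoded message bodies in order.
    """
    bodies = []
    while len(bodies) < max_messages:
        method_frame, properties, body = channel.basic_get(queue)
        if method_frame is None:
            break  # nothing left on the queue
        bodies.append(body.decode('utf-8'))
        channel.basic_ack(method_frame.delivery_tag)
    return bodies
```

Inside lambda_handler you would call drain_queue(channel, 'Lambda') in place of the single basic_get, then close the connection as before.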
I hope you found these posts helpful and you were able to publish and consume RabbitMQ messages from AWS Lambda. Happy Messaging!