Consume a RabbitMQ message from AWS Lambda

Curtis Strain
learningsam


In my previous post, I gave a very brief explanation of RabbitMQ and showed you how to publish a message from AWS Lambda to RabbitMQ (hosted in the cloud, of course). But messages that just sit in a queue don’t help anyone, so let’s use AWS Lambda again to retrieve them.

RabbitMQ Setup and Message Publish

If you didn’t go through my previous post on publishing to RabbitMQ, do so now. It will get you set up with RabbitMQ in the cloud and a Lambda Function to publish Messages. As before, this post is designed for people already familiar with Python Lambda Functions.

AWS Lambda Function: Get a Message from the Queue

Create a new Lambda Function from scratch with the Python 3.6 runtime and an execution role that gives the function access to AWS Key Management Service and Amazon CloudWatch Logs.

Add the PikaClient Layer you used in the previous post to the function. Then copy and paste the code below into the function, adding blank lines for readability if your copy lost them:

import pika   # Python AMQP Library
import boto3
import os
import datetime
from base64 import b64decode

# get Environment Variables
RABBIT_HOST = os.environ['RABBIT_HOST']
RABBIT_USER = os.environ['RABBIT_USER']
RABBIT_PWD_ENCRYPTED = os.environ['RABBIT_PWD']

# Decrypt Password
RABBIT_PWD_DECRYPTED = boto3.client('kms').decrypt(CiphertextBlob=b64decode(RABBIT_PWD_ENCRYPTED))['Plaintext']

credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PWD_DECRYPTED)
parameters = pika.ConnectionParameters(credentials=credentials, ssl=True, host=RABBIT_HOST, virtual_host=RABBIT_USER)  # CloudAMQP sets the vhost same as User
# parameters and credentials ready to support calls to RabbitMQ

def lambda_handler(event, context):
    connection = pika.BlockingConnection(parameters)  # Establishes TCP Connection with RabbitMQ
    channel = connection.channel()  # Establishes logical channel within Connection

    method_frame, header_frame, body = channel.basic_get('Lambda')  # consume next single message, if any
    if method_frame:
        print('MESSAGE BODY: ' + body.decode('UTF-8'))
        channel.basic_ack(method_frame.delivery_tag)
    else:
        print('No message returned')

    connection.close()  # Close Connection and Channel(s) within

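Add the following Environment Variables with the values for your RabbitMQ instance: RABBIT_HOST (the host name), RABBIT_USER (the user name, which the code also uses as the virtual host), and RABBIT_PWD (the password, which you should encrypt).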

Save the Function, create a Test Event, and Test! If a message from the function you developed in the previous post is in the Queue, the log will show its body after the text MESSAGE BODY:. Otherwise, the log will contain the entry No message returned.

What’s Happening

This code is similar in approach to the previous publish. A Connection and a Channel to RabbitMQ are established. Then basic_get is called. This gets one (and only one) message from the Queue named Lambda, if the Queue contains any messages.

If a message is present it prints out the message body, and then calls a basic_ack. This lets RabbitMQ know that the message was received and to remove it from the Queue. The connection is then closed.
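Because the acknowledgement is what removes the message from the Queue, it is worth sending it only after your own processing has succeeded. Here is a minimal sketch of that idea, not part of the function above. The names get_and_process and process are hypothetical, and channel is assumed to be a pika blocking channel like the one created in the handler. If processing raises an exception, the message is handed back to RabbitMQ with basic_nack instead of being acknowledged.

```python
def get_and_process(channel, queue_name, process):
    """Fetch one message, run `process` on its body, and ack only on success.

    `channel` is assumed to behave like a pika blocking channel;
    `process` is a hypothetical callable supplied by you.
    Returns True if a message was handled, False if the queue was empty.
    """
    method_frame, header_frame, body = channel.basic_get(queue_name)
    if method_frame is None:
        return False  # nothing waiting in the queue

    try:
        process(body.decode('UTF-8'))
    except Exception:
        # Processing failed: return the message to the queue, then re-raise
        channel.basic_nack(method_frame.delivery_tag, requeue=True)
        raise

    # Processing succeeded: tell RabbitMQ it can remove the message
    channel.basic_ack(method_frame.delivery_tag)
    return True
```

In the handler you could call it as get_and_process(channel, 'Lambda', print). Keep in mind that a message which always fails will keep coming back if it is requeued every time, so a real consumer usually needs some limit on retries.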

This is an extremely simple example. Message consumption is usually a little more complex than message publishing. In many use cases, a consumer will keep a persistent connection open and receive messages as soon as they are placed on the Queue. Or it may connect, receive all messages that exist on the Queue, and then disconnect. How you implement your message consumption depends entirely on your use case.
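The "connect, receive everything, disconnect" pattern can be sketched by calling basic_get in a loop until it reports that the Queue is empty. This is only an illustration under a few assumptions: drain_queue, handle, and max_messages are hypothetical names I made up, and channel is assumed to be a pika blocking channel, whose basic_get returns None for the method frame when no message is waiting. The cap is there so a busy Queue cannot keep the Lambda running until it times out.

```python
def drain_queue(channel, queue_name, handle, max_messages=100):
    """Read messages one at a time until the queue is empty or the cap is hit.

    `channel` is assumed to behave like a pika blocking channel;
    `handle` is a hypothetical callable that receives each decoded body.
    Returns the number of messages handled.
    """
    handled = 0
    while handled < max_messages:
        method_frame, header_frame, body = channel.basic_get(queue_name)
        if method_frame is None:
            break  # queue is empty, stop reading

        handle(body.decode('UTF-8'))
        channel.basic_ack(method_frame.delivery_tag)  # remove it from the queue
        handled += 1
    return handled
```

Inside lambda_handler, this would replace the single basic_get call, for example count = drain_queue(channel, 'Lambda', print), followed by connection.close() as before.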

I hope you found these posts helpful and you were able to publish and consume RabbitMQ messages from AWS Lambda. Happy Messaging!
