Publish to RabbitMQ from AWS Lambda

The SNS and SQS services offered by AWS are great messaging services and play well with Lambda, but when it comes to messaging, it takes two to tango. What if you are working with Customers, Vendors, Partners or other external entities that already have an established messaging platform? The onus may very well be on you to integrate into their platform of choice.

RabbitMQ (http://www.rabbitmq.com/)

If you’re lucky they are using RabbitMQ. RabbitMQ is a modern Messaging Broker built on the new AMQP protocol and championed by Pivotal. It offers many advantages over older messaging systems such as JMS (IMHO) and is gaining in popularity. AWS Lambda functions can publish messages to RabbitMQ, and this post will present a simple example. This post is designed for people familiar with AWS Lambda using Python, but new to RabbitMQ.

RabbitMQ in the Cloud

The only thing better than on premise RabbitMQ is RabbitMQ hosted in the cloud. CloudAMQP has you covered.

Sign up for a free Little Lemur sized instance at: https://cloudampq.com

Once you’ve signed up, the Instance detail page will show you your hostname, user and password. That’s all you need to connect to your instance.

Queue Setup

On the instance detail page, click the green RabbitMQ Manager button. This takes you to the delivered RabbitMQ web console. This console is quite helpful during development and management afterwards. Click on the Queues tab:

In the ‘Add a new queue’ section, enter Lambda into the Name box and click the Add queue button. You’ve created a Queue that’s ready to be used! Your Queues page should look like:

AWS Lambda Function to Publish to Queue

Pika Layer

Pika is a pure python AMQP client listed on the RabbitMQ site. I’ve created a Lambda Layer with it in us-west-1 and made it accessible to all AWS accounts. Continue here in us-west-1 if you’re able, or I can put the layer into other regions if you post a request in the comments.

arn:aws:lambda:us-west-1:928226111567:layer:PikaClient:1

The Lambda Function

Okay, getting close now! Create a new Lambda Function from scratch with the Python 3.6 runtime and an execution role that gives the function access to AWS Key Management Service and Amazon CloudWatch Logs.

Add the PikaClient Layer to the function. Copy and Paste the below code (and add blank lines as needed that Medium removes) to the function:

import pika   #Python AMQP Library
import boto3
import os
import datetime
from base64 import b64decode
# get Environment Variables
RABBIT_HOST = os.environ['RABBIT_HOST']
RABBIT_USER = os.environ['RABBIT_USER']
RABBIT_PWD_ENCRYPTED = os.environ['RABBIT_PWD']
# Decrypt Password
RABBIT_PWD_DECRYPTED = boto3.client('kms').decrypt(CiphertextBlob=b64decode(RABBIT_PWD_ENCRYPTED))['Plaintext']
credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PWD_DECRYPTED)
parameters = pika.ConnectionParameters(credentials=credentials, ssl=True, host=RABBIT_HOST, virtual_host=RABBIT_USER)  #CloudAMQP sets the vhost same as User
# parameters and credentials ready to support calls to RabbitMQ
def lambda_handler(event, context):

connection = pika.BlockingConnection(parameters) #Establishes TCP Connection with RabbitMQ
    channel = connection.channel()  #Establishes logical channel within Connection

channel.basic_publish(exchange='', routing_key='Lambda', body='Howdy RabbitMQ, Lambda Here!! ' + datetime.datetime.now().strftime("%I:%M%p on %B %d, %Y") + ' UTC') #Send Message

connection.close() #Close Connection and Channel(s) within

Add the Environment Variables shown below with the values for your RabbitMQ instance (and encrypt the password variable):

Environment Variables

Save the Function, create a Test Event, and Test! A message should have been published to RabbitMQ.

Check RabbitMQ for your message

Go back to the RabbitMQ web console, to the Queues tab, and click on the Queue we created earlier, Lambda, to get to the detail page for the Queue:

Queue detail page

Expand the Get Messages section, change the Ack Mode to Ack message requeue false and click the Get Message(s) button. The web console pulls the message from the Queue and displays the body!

Hopefully your message made it to the Queue and you were able to get it. If not check your Lambda execution results, Pika gives pretty descriptive errors. If you’re still not able to get it working, post a comment and I will do my best to help.

In my next post I will show you how to consume a RabbitMQ message from a Lambda Function.

A Note on Security

The code sets the connection to use SSL/TLS to protect the message contents and the instance password. This appears to be working with the developer only having to set a flag to True. This is a testament to the quality of both AWS and CloudAMQP. When working with on premise messaging, I like to confirm the encryption by using a proxy or checking with my friendly (hopefully) networking team. Being in the cloud makes this a bit trickier. Check with your cloud providers to confirm your messages are encrypted before putting sensitive information in your messages. If you think your RabbitMQ instance password may be compromised, change it.

Extra Credit

The above example is the “cleanest” way, it opens and closes the connection with each function call. If you publish numerous messages, this open and close for each message will be needlessly expensive…

In this case a better way to do it is the code below. This code opens the connection/channel outside the lambda_handler and eliminates the close. This is similar to how AWS recommends opening connections to databases from Lambda. This connection/channel is then used for subsequent function calls. The connection/channel stays open until the Lambda container is torn down or the RabbitMQ server terminates it. You can confirm the connection/channel is still open by checking the connection/channel tabs in the RabbitMQ web console after your function has finished:

Connection open after function is done

Persistent Connection code:

import pika   #Python AMQP Library
import boto3
import os
import datetime
from base64 import b64decode
# get Environment Variables
RABBIT_HOST = os.environ['RABBIT_HOST']
RABBIT_USER = os.environ['RABBIT_USER']
RABBIT_PWD_ENCRYPTED = os.environ['RABBIT_PWD']
# Decrypt Password
RABBIT_PWD_DECRYPTED = boto3.client('kms').decrypt(CiphertextBlob=b64decode(RABBIT_PWD_ENCRYPTED))['Plaintext']
credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PWD_DECRYPTED)
parameters = pika.ConnectionParameters(credentials=credentials, ssl=True, host=RABBIT_HOST, virtual_host=RABBIT_USER)  #CloudAMQP sets the vhost same as User
# parameters and credentials ready to support calls to RabbitMQ
connection = pika.BlockingConnection(parameters) #Establishes TCP Connection with RabbitMQ
channel = connection.channel()  #Establishes logical channel within Connection
def lambda_handler(event, context):
    channel.basic_publish(exchange='', routing_key='Lambda', body='Howdy RabbitMQ, Lambda Here!! ' + datetime.datetime.now().strftime("%I:%M%p on %B %d, %Y") + ' UTC') #Send Message