Integration testing in Python — RabbitMQ

When testing, we mostly think about unit testing. Even though the lines are a bit fuzzy, most agree that a unit test needs to run without any external dependencies and that it must run fast. In most cases, a unit is a single method or class that we test to see if it gives the expected output. If the unit has dependencies, they should be exchanged for a test double/mock to make sure that we only test the logic inside the unit and not the workings of the dependencies.

But we can’t test everything using a unit test. To expand our test coverage, one way to go is to use integration tests. With an integration test we want to test how a unit interacts with other units or external dependencies.

Motivation for integration tests

To motivate why integration tests are needed in some cases, I have a few examples. Also take a look at this discussion on Stack Exchange about the topic.

import mysql.connector


def get_data_from_mysql(cnx, key):
    cursor = cnx.cursor(prepared=True)

    query = "SELECT field FROM items WHERE key = %s"

    cursor.execute(query, (key,))

    total = 0
    for row in cursor:
        total += row[0]

    cursor.close()

    return total


This method has an external dependency: the database connection. It does a simple SELECT to fetch one or more rows, loops over them, and returns the sum of a field. Simple enough?

In a normal unit test, we would mock the database connection and make it return a known value for the rows. Like in the test below.

import unittest
import unittest.mock
from unittest import TestCase
from unittest.mock import MagicMock

import mysql.connector
from mysql.connector.cursor import MySQLCursorPrepared

# get_data_from_mysql is the function from the snippet above


class TestMySQL(TestCase):
    def setUp(self):
        self.connection = unittest.mock.create_autospec(mysql.connector.MySQLConnection)
        cursor = MagicMock(MySQLCursorPrepared)
        # the function iterates over the cursor directly, so mock the iteration
        cursor.__iter__.return_value = iter([[2], [3]])
        self.connection.cursor = MagicMock(return_value=cursor)

    def test_that_sum_is_calculated_correctly(self):
        self.assertEqual(5, get_data_from_mysql(self.connection, "testkey"))


if __name__ == '__main__':
    unittest.main()


With this test, we can validate that the business logic, the loop, works as expected. Job done? Not really.

Example — difference in expectations

Did you notice that the SQL statement selects a field called field, but that this is actually a spelling error, since the field’s actual name in the database schema is fields? No, of course not. I did not show you the schema of the database, so you could not have known. Also, the keyword key is a reserved word, and it is not escaped.

This is one class of errors that a unit test can’t catch: our expectation of how a dependency works is wrong. Because we exchange the dependencies for objects that we control, the test world is incomplete and different from the production environment. Of course, this is what we want in a unit test, since we only want to test the business logic and not the interaction with the dependencies. But unit tests can’t always stand alone. You could argue that the SQL part should be refactored into a separate object that has database communication as its responsibility, but that is not my point.

Example — unexpected behavior

Another example is when the SQL statement does not work as expected. In the example above, the SELECT statement would effectively become “SELECT field FROM items WHERE `key` = 1234”. If the key field is a string field and this is run on a MySQL server, we have the following gotcha.

Given three rows like this

key      field
1234     5
9999     8
1234-6   5

The above query will return both the first and the last row. This is because 1234 in the statement is an integer, so to compare, the key field is cast to an integer, causing both '1234' and '1234-6' to end up with the key value 1234. This is a very subtle error. Even if we follow the advice of many test advocates to use an in-memory database like SQLite, we would probably never catch it. The only way to do so is to set up an integration environment with the actual dependency, MySQL, and write a test case against it.
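Both problems can be fixed in the query itself: select the correct column, escape the reserved word, and bind the key as a string so MySQL compares strings instead of casting to integers. A sketch, assuming the schema described above (the column name fields comes from the first example):

```python
# Corrected query: the column is `fields` (not `field`), and the reserved
# word `key` is escaped with backticks.
QUERY = "SELECT fields FROM items WHERE `key` = %s"


def get_data_from_mysql(cnx, key):
    cursor = cnx.cursor(prepared=True)

    # bind the key as a string so MySQL compares strings and does not
    # cast the key column to an integer
    cursor.execute(QUERY, (str(key),))

    total = sum(row[0] for row in cursor)

    cursor.close()
    return total
```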

I hope these examples motivate why integration tests might be a good idea in some cases.

Testing a RabbitMQ queue

As motivated above, we can’t rely only on unit tests; we also need integration tests. On a project I’m working on, I wanted to use RabbitMQ as the message queue platform. But I’m not very familiar with this software, so to avoid making errors in the implementation, I decided to test it thoroughly using integration tests. To my surprise, I was unable to find any example of integration testing a queue. I found many examples of integration testing databases, but none for queues. If anyone knows of a good example of integration testing queues, please write in the comments.

In my project, I have multiple workers/consumers that connect to a queue and consume the messages. To consume a message, a simple callback function is invoked, like in the example below.

import pika


class SimpleConsumer(object):
    def __init__(self, channel, consume_queue_name):
        self.channel = channel

        self.properties = pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        )

        self.channel.queue_declare(queue=consume_queue_name, durable=True)

        self.channel.basic_consume(self.callback,
                                   queue=consume_queue_name,
                                   no_ack=False)

        self.channel.start_consuming()

    def callback(self, ch: pika.adapters.blocking_connection.BlockingChannel, method, _, body):
        print(body)

        ch.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    consumer = SimpleConsumer(connection.channel(), 'queue_name')


Even if you don’t have prior experience with queues, this should be easy enough to understand. It connects to a queue on localhost called “queue_name”. Whenever a message appears in the queue, the callback function is called; it prints the message to the console and acknowledges the message.

This implementation ignores many details of interacting with a queue: for example, what happens if a message is invalid, if the channel is closed, if the queue crashes, and so on. And even though the “business logic” of this consumer is almost too simple, a single print statement, it is part of the class that also interacts with the queue service. This violates the single responsibility principle, which for larger consumers can cause problems.

We want to have a Consumer class that handles all the details of the queue interaction and has the business logic as an injected dependency. This gives us the advantage that we can integration test a generic consumer without worrying about any business logic.

Splitting out the business logic from the consumer

We can inject the callback method into the GenericConsumer. If the callback returns True, the consumer knows that the message was processed correctly; if it returns False, it knows that the processing failed.

import pika


class GenericConsumer(object):
    def __init__(self, channel, consume_queue_name, callback):
        self.channel = channel
        # store the injected callback under a different name, so it does
        # not shadow the on_message method below
        self.message_callback = callback

        self.properties = pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        )

        self.channel.queue_declare(queue=consume_queue_name, durable=True)

        self.channel.basic_consume(self.on_message,
                                   queue=consume_queue_name,
                                   no_ack=False)

        self.channel.start_consuming()

    def on_message(self, ch: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
        if self.message_callback(ch, method, properties, body):
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # failed to handle the message
            pass


if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    def callback(ch, method, properties, body):
        print(body)
        return True

    consumer = GenericConsumer(connection.channel(), 'queue_name', callback)


This handles the split between the consumer and the business code.

The next step is to set up the integration test.

Setting up an integration test

An important point about unit tests is that they have no external dependencies: they are guaranteed to always run and always provide the same result. When running an integration test, we need access to the actual dependency, which causes a few extra problems. The dependency needs to be reset to a known state before each test, and it needs to be installed in the environment we run the tests on.

To integration test with RabbitMQ, a running instance is needed. This can easily be handled using pifpaf, a small Python tool that lets us spin up an instance of many different dependencies and automatically kill it after the test is done. RabbitMQ is supported out of the box.

To make the setup easy, I use tox to install the dependencies and run the test command. This later integrates nicely with most CI/CD systems.
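A minimal sketch of what such a tox.ini could look like (env names, dependencies, and the test command are assumptions, not the project’s actual configuration):

```ini
[tox]
envlist = py36

[testenv]
deps =
    pifpaf
    pika
    pytest
commands =
    pifpaf run rabbitmq -- pytest tests/
```

pifpaf starts a throw-away RabbitMQ instance, runs the command after `--` against it, and tears the instance down again when the command exits.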

When I run the tox command, it installs the dependencies, starts up RabbitMQ using pifpaf, and runs the test cases. You can get the full project here. It contains fewer than 10 files, so it should be easy to get an overview. If you view the complete code for the consumer, you will notice that it is very event-driven. Almost all the code sets up a callback function to be called when a specific event happens: for example, when the connection is successfully opened, one callback is called; when a channel is opened, another callback is called.

I find this type of code very difficult to unit test, because much of the pika library needs to be mocked, which is error-prone in itself. So this is handled better with an integration test.

Getting around the blocking operation of consuming

Because waiting for messages is a blocking operation, we would be unable to control the consumer from the test; it would just block everything. This is overcome by running the consumer as a thread, which lets the consumer run while the test case asserts on its operation. It brings some extra challenges, but they are manageable.
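In the tests below the consumer is started with start() and stopped with join(), i.e. it behaves like a thread. The idea can be sketched like this (class and parameter names here are assumptions, not the project’s actual code):

```python
import threading


# A minimal sketch of running a blocking consume loop in a thread, so the
# test process stays free to make assertions while the consumer runs.
# In the real consumer, run() would call pika's start_consuming(); here a
# plain callable stands in for it.
class ConsumerThread(threading.Thread):
    def __init__(self, consume_loop):
        super().__init__(daemon=True)  # don't block interpreter exit
        self._consume_loop = consume_loop

    def run(self):
        # blocks until the consume loop returns (e.g. the channel closes)
        self._consume_loop()
```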

Since the consumer runs in another thread, we need a way to monitor its status. This is done through the logging interface: we implement a simple log handler that saves all log messages to memory. One example: when starting the consumer, we need to wait until it is ready to consume messages before we assert anything. This is implemented with a small loop that waits for a specific message to appear in the log.

import time


def wait_for_log_contains(self, text, level='info'):
    timeout = 1  # seconds to sleep between tries
    max_tries = 5
    while max_tries > 0:
        if any(text in s for s in self.consumer_log_messages[level]):
            return True

        time.sleep(timeout)
        max_tries -= 1

    self.fail("Log message was not found: %s" % text)

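The in-memory log handler backing this can be sketched as follows (class and attribute names are assumptions; the test class would expose the handler’s messages as consumer_log_messages):

```python
import logging


# A sketch of an in-memory log handler: each test can read `messages` to
# see what the consumer thread has logged so far, keyed by level name.
class MemoryLogHandler(logging.Handler):
    def __init__(self):
        super().__init__()
        self.messages = {'debug': [], 'info': [], 'warning': [],
                         'error': [], 'critical': []}

    def emit(self, record):
        # store the message text under its (lower-cased) level name
        self.messages[record.levelname.lower()].append(record.getMessage())
```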

Testing

Before any test is run, a few prerequisites must be in place.

For every test we:

  1. Clear the log handler so we have a clean state
  2. Set up a connection to RabbitMQ using pika
  3. Delete the queues that we will later use, to make sure we start in a known state

Testing that messages are accepted

The first and easiest test we want to perform is that messages published to the queue the consumer listens on are processed and acknowledged. A few steps are needed to do this.

For this test case we:

  1. Publish a message to the queue and assert that there is 1 message in the queue, to make sure that the dependency works as expected.
  2. Set up the BasicConsumer with a callback function that approves any message.
  3. Finally, assert that the queue is empty and that the callback was called once.

It is easy to see that an integration test consists of multiple steps, many more than a normal unit test would have.

def test_that_consume_on_valid_message_are_consumed_from_the_queue(self):
    # publish a message
    self.channel.basic_publish(
        exchange='',
        routing_key=self.queue_name,
        body="dummymessage"
    )

    # see that the message is in the queue
    res = self.create_queue(self.queue_name, True)
    self.assertEqual(res.method.message_count, 1)

    # accept any message
    message_callback = MagicMock(return_value=True)

    consumer = BasicConsumer(self.rabbitmq_url,
                             self.queue_name,
                             self.queue_arguments,
                             message_callback)

    # start consumer
    consumer.start()

    # wait for the consumer to be ready
    self.wait_for_log_contains(BasicConsumer.CONSUMER_READY)

    # wait for the consumer to signal that it has consumed a message
    self.wait_for_log_contains(BasicConsumer.CONSUMER_PROCESSED_MESSAGE)

    # assert that the queue is empty
    res = self.create_queue(self.queue_name, True)
    self.assertEqual(res.method.message_count, 0)

    message_callback.assert_called_once()

    # stop the consumer
    consumer.join()


Testing rejected messages

As soon as we move away from the success path in software, the road becomes more twisted and full of holes. So when handling a rejected message, we need to make several decisions. I used this article by Lorna Mitchell as inspiration for what to do with messages that can’t be consumed.

There are two causes for rejecting messages, and they need to be handled differently. A message can be invalid, meaning it will never be possible to process it; such a message should be taken off the queue. But a message that the consumer, for whatever reason, was temporarily unable to process should be rescheduled and processed later.

This means that we need to make it possible for the callback to signal three outcomes: success, invalid, and temporary failure. Returning True signals success, returning False signals an invalid message, and raising an exception signals a temporary failure. Invalid messages might be interesting to look at later, so instead of throwing them away they are sent to a “dead letters exchange”. If, for example, we mistakenly deploy a new producer that publishes a new version of the messages while the consumers are not updated, every message will be rejected as invalid, and we don’t want to lose them. Routing them to the dead letters exchange gives us the option to publish them back on the original queue later.
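The three outcomes can be mapped to queue operations roughly like this (a sketch, not the repository’s actual code; the dead-letter routing itself is configured on the queue via its arguments, so the consumer only has to reject without requeueing):

```python
# Map the callback's outcome to pika channel operations: ack on success,
# reject without requeue for invalid messages (the broker then dead-letters
# them), and reject with requeue for temporary failures.
def handle_message_result(channel, method, message_callback, *args):
    try:
        ok = message_callback(*args)
    except Exception:
        # temporary failure: requeue so the message is processed again later
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
        return

    if ok:
        # success: acknowledge the message
        channel.basic_ack(delivery_tag=method.delivery_tag)
    else:
        # invalid: reject without requeue; with a dead-letter exchange
        # configured on the queue, the broker moves the message there
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
```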

First, the test for invalid messages. To assert that the consumer behaves as expected, the test checks the following.

  1. Assert that the consumer processed a message
  2. Assert that the queue is empty
  3. Assert that the dead letters queue contains 1 message
  4. Assert that the callback was called once

def test_that_consume_on_invalid_message_moves_message_to_dead_letter_queue(self):
    # publish a message
    self.channel.basic_publish(
        exchange='',
        routing_key=self.queue_name,
        body="dummymessage"
    )

    res = self.create_queue(self.queue_name, True)
    self.assertEqual(res.method.message_count, 1)

    # reject any message as invalid
    message_callback = MagicMock(return_value=False)

    consumer = BasicConsumer(self.rabbitmq_url,
                             self.queue_name,
                             self.queue_arguments,
                             message_callback)

    # start consumer
    consumer.start()

    # wait for the consumer to be ready
    self.wait_for_log_contains(BasicConsumer.CONSUMER_READY)

    # wait for the consumer to signal that it has consumed a message
    self.wait_for_log_contains(BasicConsumer.CONSUMER_PROCESSED_MESSAGE)

    # assert that the queue is empty
    res = self.create_queue(self.queue_name, True)
    self.assertEqual(res.method.message_count, 0)

    # assert that the message is in the dead-letters queue
    res = self.channel.queue_declare(queue=self.queue_name_dead_letters, passive=True)
    self.assertEqual(res.method.message_count, 1)

    message_callback.assert_called_once()

    # stop the consumer
    consumer.join()


The second test covers temporary failures. It follows a slightly different flow than the previous test.

  1. Assert that the consumer processed a message
  2. Assert that the message is still in the queue
  3. Assert that the callback was called

Notice that we can’t assert that the callback was called a single time: because the message is rescheduled on the same queue, the consumer may try to consume it multiple times before the test finishes. This should be improved at a later time.

Finishing notes

The repository also contains a test for how the consumer handles reconnecting after a lost connection, and a test that it stops when it receives a close message from RabbitMQ. Many more tests could be added, but I think the test suite I created gives reasonable coverage.

Integration testing creates an additional safety net compared to unit tests. The tests are slower to run and more fragile, but I think they are worth the extra effort, especially in cases where the code would be difficult or impossible to cover with unit tests.


Originally published at Datadriven-investment.com.