Simple Python client for an SQS emulator

Introduction

This post shows how to write a simple Python client for an SQS emulator, for use in functional testing.

How to run the queue emulator

Pre-requisites:

  • docker
  • docker-compose

The easiest way to run a queue emulator, locally or in a deployed environment, is to use a Docker image. There is a publicly available base image that you can use; you only need to add your queue configuration on top of it.

Create a config file called emulator.conf:

include classpath("application.conf")
queues {
  test-queue {
    fifo = true
  }
}

Create a Dockerfile that adds the config in a location where the emulator can find it:

FROM softwaremill/elasticmq-native
COPY emulator.conf /opt/elasticmq.conf

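The emulator listens on port 9324 by default, so you can start it with docker-compose up using the following configuration in your docker-compose.yaml file. The build: emulator entry expects the Dockerfile and emulator.conf to live in a directory called emulator next to docker-compose.yaml: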

services:
  emulator:
    build: emulator
    ports:
      - "9324:9324"
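
Functional tests that start right after docker-compose up can race the emulator's startup. A minimal sketch of a readiness check, assuming the emulator is reachable on localhost:9324 as configured above; the helper name wait_for_port and its timeout values are illustrative, not part of any library:

```python
import socket
import time


def wait_for_port(host: str = "localhost", port: int = 9324, timeout: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect only proves the port is open, not that
            # every queue has been created yet.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.5)  # not listening yet, retry shortly
    return False
```

Calling wait_for_port() before constructing the client lets the test run fail fast with a clear message if the container never came up.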

How to create a simple Python emulator client

To interact with the emulator and test the functionality it supports, you can use a simple Python client like this:

import json
import uuid
import boto3


class SqsEmulatorClient:

    def __init__(self):
        self._client = boto3.client(
            "sqs",
            region_name="elasticmq",
            endpoint_url="http://localhost:9324",
            aws_access_key_id="secret-key-id",
            aws_secret_access_key="secret-key",
            use_ssl=False,
        )
        self._queue = "test-queue.fifo"
        self._queue_url = self._get_queue_url(self._queue)

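    def send_message(self, content: str) -> int:
        message_data = {
            "content": content,
        }
        # The body is assembled by hand and sent as an opaque string. The
        # trailing comma before the closing brace means strict JSON parsers
        # will reject it.
        message = (
            """{
        "Type": "Notification",
        "MessageId":"""
            + '"some-message-id"'
            + """,
        "Message": """
            + json.dumps(message_data)
            + """,
        }"""
        )

        response = self._client.send_message(
            QueueUrl=self._queue_url,
            MessageBody=message,
            # FIFO queues require a group id and, unless content-based
            # deduplication is enabled, a deduplication id.
            MessageGroupId=str(uuid.uuid4()),
            MessageDeduplicationId=str(uuid.uuid4()),
        )
        # Return the HTTP status code of the call (an int, e.g. 200).
        return response["ResponseMetadata"]["HTTPStatusCode"]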

    def read_messages(self) -> list:
        messages = self._client.receive_message(
            QueueUrl=self._queue_url,
            WaitTimeSeconds=5,
            MaxNumberOfMessages=10,
            ReceiveRequestAttemptId=str(uuid.uuid4()),
        )
        # "Messages" can be absent when nothing was received, so default to
        # an empty list rather than returning None.
        return messages.get("Messages", [])

    def delete_message(self, receipt_handle: str):
        self._client.delete_message(QueueUrl=self._queue_url, ReceiptHandle=receipt_handle)

    def purge_queue(self):
        self._client.purge_queue(QueueUrl=self._queue_url)

    def _get_queue_url(self, queue_name: str) -> str:
        return self._client.get_queue_url(QueueName=queue_name)["QueueUrl"]
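
The send_message method above builds the body by string concatenation, which leaves a trailing comma before the closing brace, so a consumer that parses the body as JSON will reject it. If your consumer does parse the body, one option is to build the envelope as a dict and serialise it in one step. This is a sketch only: build_notification_body is a hypothetical helper, and the field layout simply mirrors the one used in the client.

```python
import json


def build_notification_body(content: str, message_id: str = "some-message-id") -> str:
    """Serialise the notification envelope used by the client as valid JSON."""
    envelope = {
        "Type": "Notification",
        "MessageId": message_id,
        "Message": {"content": content},
    }
    # json.dumps handles quoting and escaping, and never emits trailing commas.
    return json.dumps(envelope)
```

The result could be passed as MessageBody in place of the hand-built string. Note that the body text, and therefore the MD5OfBody value reported by the emulator, will differ from the hand-built version.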

The only prerequisite for this client is the boto3 library. Assuming you use Poetry to manage dependencies, run:

poetry add boto3

Now you can interact with the emulator using the client above.

Send a message and read it back.

>>> client = SqsEmulatorClient()
>>> client.send_message("Hello")
200
>>> client.read_messages()
[{'MessageId': '1d2d8ca9-0165-4405-ae05-048a6f4559d8', 'ReceiptHandle': '1d2d8ca9-0165-4405-ae05-048a6f4559d8#df729bbc-706d-4ff7-941f-36f6772aec21', 'MD5OfBody': '4192ab737454363e4a69257e68315cf3', 'Body': '{\n        "Type": "Notification",\n        "MessageId":"some-message-id",\n        "Message": {"content": "Hello"},\n        }'}]

Send another message and read both messages.

>>> client.send_message("Moose")
200
>>> client.read_messages()
[{'MessageId': 'd1abaa75-5d94-4977-b49a-cbf9a6c8d4d4', 'ReceiptHandle': 'd1abaa75-5d94-4977-b49a-cbf9a6c8d4d4#f15b5809-e792-499f-a74e-087dbb645427', 'MD5OfBody': '41a63a8331c41a520929af9f3575fbc7', 'Body': '{\n        "Type": "Notification",\n        "MessageId":"some-message-id",\n        "Message": {"content": "Moose"},\n        }'}, 
{'MessageId': '1d2d8ca9-0165-4405-ae05-048a6f4559d8', 'ReceiptHandle': '1d2d8ca9-0165-4405-ae05-048a6f4559d8#6c687b57-b751-4fee-b164-8e9abbb22af8', 'MD5OfBody': '4192ab737454363e4a69257e68315cf3', 'Body': '{\n        "Type": "Notification",\n        "MessageId":"some-message-id",\n        "Message": {"content": "Hello"},\n        }'}]
>>>

Note that reading a message does not remove it from the queue. To remove a message, delete it using the receipt handle that was returned when the message was read.

>>> client.delete_message(receipt_handle="d1abaa75-5d94-4977-b49a-cbf9a6c8d4d4#f15b5809-e792-499f-a74e-087dbb645427")
>>> client.read_messages()
[{'MessageId': '1d2d8ca9-0165-4405-ae05-048a6f4559d8', 'ReceiptHandle': '1d2d8ca9-0165-4405-ae05-048a6f4559d8#35c591a7-f3ae-4fd2-96ed-be35e5f14173', 'MD5OfBody': '4192ab737454363e4a69257e68315cf3', 'Body': '{\n        "Type": "Notification",\n        "MessageId":"some-message-id",\n        "Message": {"content": "Hello"},\n        }'}]
>>>
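
In a test you usually want to consume messages rather than copy receipt handles by hand. The read-then-delete pattern can be sketched as a small helper that works with any object exposing read_messages() and delete_message(), such as the client above. The name drain_queue and the max_rounds limit are illustrative assumptions:

```python
def drain_queue(client, max_rounds: int = 10) -> int:
    """Read and delete messages until a read comes back empty; return the count deleted."""
    deleted = 0
    for _ in range(max_rounds):
        # Treat both None and [] as "nothing received".
        messages = client.read_messages() or []
        if not messages:
            break
        for message in messages:
            # Each read returns a fresh receipt handle; use the latest one.
            client.delete_message(receipt_handle=message["ReceiptHandle"])
            deleted += 1
    return deleted
```

Because read_messages uses WaitTimeSeconds=5, the final empty read can take up to five seconds before the helper returns.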

You can also purge the entire queue:

>>> client.purge_queue()
>>>
>>> client.read_messages()
[]

Summary

Emulating an SQS queue for testing is straightforward, and it removes the dependency on a real queue. It also means that your local, CI and deployed testing can use exactly the same mechanism for validating your code.
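
To reuse the same client across local, CI and deployed runs, the endpoint has to be configurable, since the emulator will not always be on localhost. A minimal sketch, assuming hypothetical environment variable names (SQS_ENDPOINT_URL and SQS_REGION are not defined by boto3 or the emulator) and falling back to the values used in this post:

```python
import os


def emulator_settings(env=None) -> dict:
    """Build keyword arguments for boto3.client("sqs", ...) from the environment."""
    env = os.environ if env is None else env
    endpoint = env.get("SQS_ENDPOINT_URL", "http://localhost:9324")
    return {
        "region_name": env.get("SQS_REGION", "elasticmq"),
        "endpoint_url": endpoint,
        # Dummy credentials, same placeholder values as the client in this post.
        "aws_access_key_id": "secret-key-id",
        "aws_secret_access_key": "secret-key",
        "use_ssl": endpoint.startswith("https://"),
    }
```

The client constructor could then call boto3.client("sqs", **emulator_settings()), and a CI job could point it at the compose service by setting SQS_ENDPOINT_URL to, for example, http://emulator:9324.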