Message Queues
Through the module fluke.queues, Fluke provides a straightforward API that makes it easy to interact with various message queue services in the cloud, such as Amazon SQS and Azure Queue Storage. The simplicity of this API comes from the fact that it is built around just three basic operations:
Push - Sending messages to a queue.
Peek - Getting a glimpse of the messages within a queue.
Poll - Fetching messages from a queue, thereby deleting them.
Below we'll take a closer look at each one.
Pushing messages into a queue
As already mentioned, the push operation is used to send messages to the queue. After gaining access to a queue, you can simply invoke its push method, providing it with the string message you wish to send:
from fluke.auth import AWSAuth
from fluke.queues import AmazonSQSQueue

auth = AWSAuth(
    aws_access_key_id="aws_access_key",
    aws_secret_access_key="aws_secret_key")

with AmazonSQSQueue(auth=auth, queue='queue') as queue:
    is_delivered = queue.push(message="Hello!")
Invoking the push method returns a boolean value that indicates whether the message has been successfully delivered to the queue or not.
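Because push signals failure through its return value rather than by raising an exception, you may want to retry failed deliveries yourself. The helper below is a hypothetical sketch, not part of Fluke; it assumes only that the supplied callable returns a boolean as described above:

```python
import time

def push_with_retry(push, message, attempts=3, delay=1.0):
    """Retry a push operation that signals failure via its boolean return."""
    for attempt in range(attempts):
        if push(message):
            return True
        # Back off before the next attempt, except after the final one.
        if attempt < attempts - 1:
            time.sleep(delay)
    return False
```

With a queue in scope, this could be used as push_with_retry(lambda m: queue.push(message=m), "Hello!").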
Peeking at messages in a queue
Next up, you might like to take a peek at the messages in a queue without actually deleting them. This operation can come in quite handy when, for example, you want to observe the format of the messages within a queue so that you can build a parser to extract the information you are interested in. Whatever its use, peeking at messages is possible by invoking a queue instance's peek method, which returns a list containing up to 10 messages found within the queue:
from fluke.auth import AWSAuth
from fluke.queues import AmazonSQSQueue

auth = AWSAuth(
    aws_access_key_id="aws_access_key",
    aws_secret_access_key="aws_secret_key")

with AmazonSQSQueue(auth=auth, queue='queue') as queue:
    messages = queue.peek()
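As a minimal sketch of the parser use case mentioned above: assuming the peeked messages happen to be JSON-encoded strings (a common format, though nothing in Fluke guarantees it), the returned list could be inspected like so:

```python
import json

def parse_json_messages(messages):
    """Parse raw string messages as JSON, skipping any malformed ones."""
    parsed = []
    for raw in messages:
        try:
            parsed.append(json.loads(raw))
        except json.JSONDecodeError:
            continue  # leave malformed messages for manual inspection
    return parsed
```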
Warning
Please note that even though the peek method does not explicitly delete messages, it can still result in a message being removed from the queue, as every time a message is fetched its receive count is increased by one. Most queues are configured in such a way that if a message is fetched a specific number of times, it is automatically deleted.
Polling messages from a queue
Finally, you may want to fetch messages from a queue while at the same time removing them from it entirely. This is where the poll operation comes in. By invoking a queue instance's poll method, you essentially get access to an iterator that goes through the queue's messages in distinct batches, removing any received messages in the process:
from fluke.auth import AWSAuth
from fluke.queues import AmazonSQSQueue

auth = AWSAuth(
    aws_access_key_id="aws_access_key",
    aws_secret_access_key="aws_secret_key")

with AmazonSQSQueue(auth=auth, queue='queue') as queue:
    for batch in queue.poll(batch_size=10):
        for msg in batch:
            # Process the message...
            ...
By default, the poll method fetches all of the messages available in the queue. If you just want to fetch a particular number of messages, you can set the method's num_messages parameter accordingly:
from fluke.auth import AWSAuth
from fluke.queues import AmazonSQSQueue

auth = AWSAuth(
    aws_access_key_id="aws_access_key",
    aws_secret_access_key="aws_secret_key")

with AmazonSQSQueue(auth=auth, queue='queue') as queue:
    for batch in queue.poll(num_messages=100, batch_size=10):
        for msg in batch:
            # Process the message...
            ...
Continuous polling
When either all messages or a subset of them has been successfully received, depending on the value of num_messages, the iterator reaches its end and the program carries on. If you wish to keep polling the queue for any new messages, you can do so by setting the polling_frequency parameter:
from fluke.auth import AWSAuth
from fluke.queues import AmazonSQSQueue

auth = AWSAuth(
    aws_access_key_id="aws_access_key",
    aws_secret_access_key="aws_secret_key")

with AmazonSQSQueue(auth=auth, queue='queue') as queue:
    for batch in queue.poll(
        num_messages=100,
        batch_size=10,
        polling_frequency=60
    ):
        for msg in batch:
            # Process the message...
            ...
By defining a value for this parameter, each time the specified number of messages has been fetched from the queue, the program will wait for a number of seconds equal to the value of polling_frequency, at which point the queue is polled again until either a total of num_messages messages have been received or the queue is empty. This goes on indefinitely until either an unhandled exception occurs or the process is explicitly killed.
Pre- vs post-delivery delete
Whenever a batch of messages is delivered, the messages within the batch have not yet actually been deleted from the queue; they are only deleted just before the next batch of messages arrives, after all messages within the current batch have already been processed. This is the queue's default behaviour when polling messages, as defined by the default value of the pre_delivery_delete parameter, and it guarantees that no batch of messages will be lost in case something goes wrong during their processing:
from fluke.auth import AWSAuth
from fluke.queues import AmazonSQSQueue

auth = AWSAuth(
    aws_access_key_id="aws_access_key",
    aws_secret_access_key="aws_secret_key")

with AmazonSQSQueue(auth=auth, queue='queue') as queue:
    for batch in queue.poll():
        # Although an exception is raised, no messages will be lost
        # as, at this point, the messages within the batch have yet
        # to be deleted.
        raise Exception()
Nevertheless, if something goes wrong after one or more messages within the batch have already been processed, there is always the danger of reprocessing the exact same messages, as they may be included in any of the subsequent batches:
from fluke.auth import AWSAuth
from fluke.queues import AmazonSQSQueue

auth = AWSAuth(
    aws_access_key_id="aws_access_key",
    aws_secret_access_key="aws_secret_key")

with AmazonSQSQueue(auth=auth, queue='queue') as queue:
    for batch in queue.poll(num_messages=2, batch_size=2):
        for i, msg in enumerate(batch):
            if i == 0:
                # Process message...
                ...
            else:
                raise Exception()
In the example above, the first message of the batch will be processed as expected. However, during the processing of the second message, an exception will be thrown, causing the program to exit. This means that despite the second message being the only one that was not processed, the queue still contains both messages, as the next batch of messages never arrived.
If you want to avoid this, you can set pre_delivery_delete to True. This results in any batch of messages being removed from the queue before it is actually delivered to you:
from fluke.auth import AWSAuth
from fluke.queues import AmazonSQSQueue

auth = AWSAuth(
    aws_access_key_id="aws_access_key",
    aws_secret_access_key="aws_secret_key")

with AmazonSQSQueue(auth=auth, queue='queue') as queue:
    for batch in queue.poll(pre_delivery_delete=True):
        # At this point, all messages within the batch
        # have already been removed from the queue.
        ...
As a general rule of thumb, set pre_delivery_delete to False if you don't mind processing the same message twice and are more concerned about losing a message without processing it first, and set pre_delivery_delete to True if you cannot afford to process the same message more than once.
Warning
Pre- and post-delivery delete are concepts specific to Fluke and do not apply to the underlying message queue services. Note that you should always enforce your own checks for duplicate messages if this is a matter of concern to you, as such messages can still arrive regardless of the value of pre_delivery_delete. For instance, the default standard queues offered by Amazon SQS support at-least-once delivery, which dictates that any message pushed to the queue will be delivered to you at least once, but there is always the chance that a message is re-delivered due to the distributed nature of the queues. In order to avoid any surprises, be sure to read the specifications of the type of queue you are working with and take any necessary precautions, no matter whether you are accessing said queue through its standard API or through the API provided by Fluke.
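As one possible way to enforce such a check yourself, the sketch below (hypothetical, not part of Fluke) keeps a set of content hashes across batches and filters out any message whose content has been seen before. A production setup would typically persist this state externally, and would key on a message ID rather than raw content when the service provides one:

```python
import hashlib

def filter_duplicates(batch, seen_hashes):
    """Return only the messages in batch whose content has not been seen yet."""
    fresh = []
    for msg in batch:
        digest = hashlib.sha256(msg.encode("utf-8")).hexdigest()
        if digest not in seen_hashes:
            seen_hashes.add(digest)
            fresh.append(msg)
    return fresh
```

Within a polling loop, you would call filter_duplicates(batch, seen_hashes) on each delivered batch and process only the messages it returns.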