Tag SQS

Idempotent Django Email Sender with Amazon SQS and Memcache

Recently I wrote about my problem with duplicate Amazon SQS messages causing multiple emails for Difio. After considering several options and feedback from @Answers4AWS I wrote a small decorator to fix this.

It uses the cache backend to prevent the task from executing twice during the specified time frame. The code is available at https://djangosnippets.org/snippets/3010/.

As stated on Twitter you should use Memcache (or ElastiCache) for this. If using Amazon S3 with my django-s3-cache don't use the us-east-1 region because it is eventually consistent.

The solution is fast and simple on the development side and uses my existing cache infrastructure so it doesn't cost anything more!

There is still a race condition between marking the message as processed and the second check but nevertheless this should minimize the possibility of receiving duplicate emails to an accepted level. Only time will tell though!

There are comments.

Duplicate Amazon SQS Messages Cause Multiple Emails

Beware if using Amazon Simple Queue Service to send email messages! Sometime SQS messages are duplicated which results in multiple copies of the messages being sent. This happened today at Difio and is really annoying to users. In this post I will explain why there is no easy way of fixing it.

Q: Can a deleted message be received again?

Yes, under rare circumstances you might receive a previously deleted message again. This can occur in the rare situation in which a DeleteMessage operation doesn't delete all copies of a message because one of the servers in the distributed Amazon SQS system isn't available at the time of the deletion. That message copy can then be delivered again. You should design your application so that no errors or inconsistencies occur if you receive a deleted message again.

Amazon FAQ

In my case the cron scheduler logs say:

>>> <AsyncResult: a9e5a73a-4d4a-4995-a91c-90295e27100a>

While on the worker nodes the logs say:

[2013-12-06 10:13:06,229: INFO/MainProcess] Got task from broker: tasks.cron_monthly_email_reminder[a9e5a73a-4d4a-4995-a91c-90295e27100a]
[2013-12-06 10:18:09,456: INFO/MainProcess] Got task from broker: tasks.cron_monthly_email_reminder[a9e5a73a-4d4a-4995-a91c-90295e27100a]

This clearly shows the same message (see the UUID) has been processed twice! This resulted in hundreds of duplicate emails :(.

Why This Is Hard To Fix

There are two basic approaches to solve this issue:

  • Check some log files or database for previous record of the message having been processed;
  • Use idempotent operations that if you process the message again, you get the same results, and that those results don't create duplicate files/records.

The problem with checking for duplicate messages is:

  • There is a race condition between marking the message as processed and the second check;
  • You need to use some sort of locking mechanism to safe-guard against the race condition;
  • In the event of an eventual consistency of the log/DB you can't guarantee that the previous attempt will show up and so can't guarantee that you won't process the message twice.

All of the above don't seem to work well for distributed applications not to mention Difio processes millions of messages per month, per node and the logs are quite big.

The second option is to have control of the Message-Id or some other email header so that the second message will be discarded either at the server (Amazon SES in my case) or at the receiving MUA. I like this better but I don't think it is technically possible with the current environment. Need to check though.

I've asked AWS support to look into this thread and hopefully they will have some more hints. If you have any other ideas please post in the comments! Thanks!

There are comments.

Tip: Caching Large Objects for Celery and Amazon SQS

Some time ago a guy called Matt asked about passing large objects through their messaging queue. They were switching from RabbitMQ to Amazon SQS which has a limit of 64K total message size.

Recently I've made some changes in Difio which require passing larger objects as parameters to a Celery task. Since Difio is also using SQS I faced the same problem. Here is the solution using a cache back-end:

from celery.task import task
from django.core import cache as cache_module

def some_method():
    ... skip ...

    task_cache = cache_module.get_cache('taskq')
    task_cache.set(uuid, data, 3600)

    handle_data.delay(uuid)

    ... skip ...

@task
def handle_data(uuid):
    task_cache = cache_module.get_cache('taskq')
    data = task_cache.get(uuid)

    if data is None:
        return

    ... do stuff ...

Objects are persisted in a secondary cache back-end, not the default one, to avoid accidental destruction. uuid parameter is a string.

Although the objects passed are smaller than 64K I haven't seen any issues with this solution so far. Let me know if you are using something similar in your code and how it works for you.

There are comments.


Page 1 / 1