Embracing celery - Part 2 - Extinguishing
This post works upon internal library kombu used by celery for amqp transport mechanisms. For people that are not a option visit blog one where we extend the functionality of celery
Gotchas of celery
- celery events gossip
- celery canvas large payloads
- The code
- Metadata celery uses
- Unintutive defaults
But I want resilience of celery
dead letter queues !!!
But but but where are my retries and delayed tasks?
History
- Started of as a simple script to implement celery via gevent here https://gist.github.com/thulasi-ram/6f0a867e8303047a76cb0d2b56af8de4
- Then evolved to a gist here with basic structure: https://gist.github.com/thulasi-ram/3c166f587a3df69af8aaccb4f2449db5
- Then to a full blown library packaged in github here: https://github.com/thulasi-ram/kombu-rabbitmq
One can down download the code as zip and include in their lib. Perhaps someday will make a pip package out of it. Sneak peek of examples for django and flask as below.
Django Example
import os
import sys
import django
from rabbitmq.django import Rabbitmq, handle_db_conn_close
from rabbitmq.utils import maybe_jsonify
rmq = Rabbitmq()
my_exchange = rmq.E('my-exchange', 'topic')
my_queue = rmq.Q(
queue_name='my-queue',
routing_key='#',
exchange=my_exchange,
)
@maybe_jsonify
@handle_db_conn_close
def process_message(body, message):
print(body, message)
def main():
rmq.publish(my_queue, {"hi": "there"})
rmq.consume(
callback=process_message,
queue=my_queue,
enable_retries=False
)
if __name__ == '__main__':
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.config.settings')
django.setup()
sys.exit(main() or 0)
Flask Example
import os
import sys
from rabbitmq.flask import Rabbitmq
from rabbitmq.utils import maybe_jsonify
rmq = Rabbitmq()
my_exchange = rmq.E('my-exchange', 'topic')
my_queue = rmq.Q(
queue_name='my-queue',
routing_key='#',
exchange=my_exchange,
)
@maybe_jsonify
def process_message(body, message):
print(body, message)
def main():
rmq.publish(my_queue, {"hi": "there"})
rmq.consume(
callback=process_message,
queue=my_queue,
enable_retries=False
)
if __name__ == '__main__':
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.config.settings')
from flask import current_app
rmq.init_app(current_app)
sys.exit(main() or 0)