Embracing celery - Part 2 - Extinguishing

July 19, 20202 min read#python, #celery

This post works upon internal library kombu used by celery for amqp transport mechanisms. For people that are not a option visit blog one where we extend the functionality of celery

  1. Gotchas of celery

    1. celery events gossip
    2. celery canvas large payloads
    3. The code
    4. Metadata celery uses
    5. Unintutive defaults
  2. But I want resilience of celery

  3. dead letter queues !!!

  4. But but but where are my retries and delayed tasks?

History

One can down download the code as zip and include in their lib. Perhaps someday will make a pip package out of it. Sneak peek of examples for django and flask as below.

Django Example

import os
import sys

import django

from rabbitmq.django import Rabbitmq, handle_db_conn_close
from rabbitmq.utils import maybe_jsonify

rmq = Rabbitmq()

my_exchange = rmq.E('my-exchange', 'topic')

my_queue = rmq.Q(
    queue_name='my-queue',
    routing_key='#',
    exchange=my_exchange,
)


@maybe_jsonify
@handle_db_conn_close
def process_message(body, message):
    print(body, message)


def main():
    rmq.publish(my_queue, {"hi": "there"})

    rmq.consume(
        callback=process_message,
        queue=my_queue,
        enable_retries=False
    )


if __name__ == '__main__':
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.config.settings')
    django.setup()
    sys.exit(main() or 0)

Flask Example

import os
import sys


from rabbitmq.flask import Rabbitmq
from rabbitmq.utils import maybe_jsonify

rmq = Rabbitmq()

my_exchange = rmq.E('my-exchange', 'topic')

my_queue = rmq.Q(
    queue_name='my-queue',
    routing_key='#',
    exchange=my_exchange,
)


@maybe_jsonify
def process_message(body, message):
    print(body, message)


def main():
    rmq.publish(my_queue, {"hi": "there"})

    rmq.consume(
        callback=process_message,
        queue=my_queue,
        enable_retries=False
    )


if __name__ == '__main__':
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.config.settings')
    from flask import current_app
    rmq.init_app(current_app)
    sys.exit(main() or 0)

RELATED POSTS

  • Underrated python libraries
    next
  • Embracing celery - Part 1 - Extending
    prev

[similar articles]

COMMENTS

LinkedIn iconTwitter icon

© Copyright 2022, Ahiravan