Embracing celery - Part 1 - Extending

June 20, 2020 · 7 min read · #python, #celery

Celery is touted as the async task processor in Python. It's resilient, it's fast, and it ships with a large set of utilities and support for multiple messaging systems. But there's not one but two problems with it for me.

  1. The learning curve is steep, not just for the concepts but for the code itself, and worse, it's configured with unintuitive defaults.
  2. It lives up to its name of distributed task processing, but it's overkill if you just want a simple message consumer.

But sometimes we are stuck. We cannot get rid of it since there are so many things managed by it. But what if the producer is not celery? Out of the box, you cannot consume such messages with celery.

In this post we will cover the use case where we want to retain celery for other tasks but still be able to consume from producers that are not celery. As a bonus we will also set up a dead-letter queue for our queue so that messages are not discarded. For use cases where getting rid of celery entirely can be considered, take a look at the flask-extension-like implementation in part 2 of this post here.

NOTE: The below example has race conditions. Please copy-paste at your own caution.

1. Custom consumer

Although celery makes it needlessly difficult, it luckily provides us with a barebones implementation of a consumer: custom-message-consumers - celery docs. The part we are most concerned with is

from celery import Celery, bootsteps
from kombu import Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://')


class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()


app.steps['consumer'].add(MyConsumerStep)

This can pick up a message that's not published by celery. But what about decode errors? What about my retries?
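
For instance, a plain kombu producer, with nothing celery about it, can put such a message on that queue. A minimal sketch, assuming a local RabbitMQ broker (the connection URL is illustrative) and that the worker has already declared the queue and exchange:

from kombu import Connection, Exchange, Producer

with Connection('amqp://guest:guest@localhost//') as conn:
    producer = Producer(conn)
    producer.publish(
        {'hello': 'world'},           # plain JSON body, no celery envelope
        exchange=Exchange('custom'),
        routing_key='routing_key',
        serializer='json',
    )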

Being Ambitious

My first approach was to convert this message to a celery task, such that the user is not aware, by replicating some of celery's magic (this was discarded, see here). So inside the MyConsumerStep I added the following:

def __init__(self, *args, **kwargs):
    self.parent = None
    super().__init__(*args, **kwargs)

def start(self, parent):
    self.parent = parent
    logger.info(f'Consuming from {my_queue.name} key={my_queue.routing_key}, exchange={my_queue.exchange}')
    return super().start(parent)

This parent here is the celery.worker.consumer.Consumer instance. This class stores a strategy for every task registered in the app, in a map called strategies. A strategy in celery knows how to run a task and handle its failures.
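
Roughly, the lookup works like this (a rough sketch based on celery internals; the attribute names may shift between versions, and the task name is hypothetical):

# 'myapp.tasks.process_message' is a hypothetical registered task name
strategy = self.parent.strategies['myapp.tasks.process_message']
# invoking the strategy runs the task request with celery's full
# error-handling and retry machinery attached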

Let's say I have a function called process_message, and now I have to call it in handle_message, and I want this to be as DRY as possible.

My train of thought would be to handle process_message as a task. I end up with the following handle_message:

def handle_message(self, body, message):
    task = shared_task(process_message)
    strategy = task.start_strategy(self.parent.app, self.parent)
    # promise, call_soon and callbacks come from celery's strategy internals
    strategy(
        message, body,
        promise(call_soon, (message.ack_log_error,)),
        promise(call_soon, (message.reject_log_error,)),
        callbacks,
    )
    # ...
    # following this way you would end up having to access
    # process_message.request.body inside `process_message`, like you
    # would for a bound task, which gets very complicated quickly

Bailing out

We would be using so many celery internals that it would make our code brittle. I bailed out here. Instead, I just republish the message after converting it into a task.

def handle_message(self, body, message):
    task = shared_task(process_message)
    task.delay(body)  # republish the body as a celery task
    message.ack()     # ack the original delivery

# now my process_message looks like

def process_message(body, **kwargs):
    pass

This gives us the ability to retain the retry mechanisms celery offers, as well as to consume a custom message. Below is a properly honed version of the above as celery_consumer.py, made as reusable as possible while still encapsulating the internals.

# celery_consumer.py

import logging

from celery import bootsteps, current_app, Task, shared_task
from kombu import Consumer

# queue name -> callback, to guard against registering a queue twice
_registry = {}

logger = logging.getLogger(__name__)

def _custom_consumer_factory(queue, callback, kwargs):
    class BaseCustomConsumer(bootsteps.ConsumerStep):
        requires = (
            'celery.worker.consumer:Connection',
            'celery.worker.consumer.tasks:Tasks',
        )

        def start(self, parent):
            logger.info(f'Consuming from {queue.name} key={queue.routing_key}, exchange={queue.exchange}')
            return super().start(parent)

        def handle_message(self, body, message):
            # hand the body over to celery as a task so retries etc. apply
            callback.delay(body)
            message.ack()

        def on_decode_error(self, message, exc):
            # a message we cannot decode is rejected instead of being
            # redelivered forever (and dead-lettered if a DLQ is set up)
            message.reject()

        def get_consumers(self, channel):
            options = {
                'accept': ['json']
            }
            consumer_options = kwargs.pop('consumer_options', {})
            options.update(consumer_options)
            return [
                Consumer(
                    channel,
                    queues=[queue],
                    callbacks=[self.handle_message],
                    on_decode_error=self.on_decode_error,
                    **options
                )
            ]

    return type(f'{queue.name}Consumer', (BaseCustomConsumer,), {})


def _make_consumer(func, **kwargs):
    from kombu import Queue

    queue = kwargs.pop('queue', None)

    if not queue:
        raise RuntimeError("queue is a required parameter for consume_from")
    if isinstance(queue, Queue):
        queue_obj = queue
    else:
        _qoptions = {'no_declare': False}
        _qoptions.update(kwargs.pop('queue_options', {}))
        # forward remaining kombu.Queue kwargs such as routing_key
        for key in ('exchange', 'routing_key'):
            if key in kwargs:
                _qoptions[key] = kwargs.pop(key)
        queue_obj = Queue(queue, **_qoptions)

    return _custom_consumer_factory(queue_obj, func, kwargs), queue_obj


def consume_from(*args, **kwargs):
    """
    :param args: empty. will resolve to the wrapped function
    :param kwargs: queue - either string or kombu.Queue Instance, and other kwargs of kombu.Queue
    :return: the wrapped function

    Usage
    -----

    a)
    @consume_from(queue='<queue_name>', routing_key='<rk>')
    def process_message(body, message):
        pass


    b)
    exchange = Exchange("example", "topic")
    queue = Queue("example", exchange, routing_key="com.example")

    @consume_from(queue=queue)
    def process_message(body, message):
        pass

    """

    def decorator(**options):
        def __inner(func):
            # wrap plain functions as tasks up front so the consumer can
            # safely call .delay() on the callback
            if not isinstance(func, Task):
                func = shared_task(func)
            consumer, queue = _make_consumer(func, **options)

            if queue.name in _registry:
                raise RuntimeError(f"Already registered {_registry[queue.name]} for {queue.name}")
            logger.info(f"Registering: {func} for queue {queue.name} and routing key {queue.routing_key}")
            _registry[queue.name] = func
            current_app.steps['consumer'].add(consumer)

            return func

        return __inner

    if len(args) == 1 and callable(args[0]):
        return decorator(**kwargs)(args[0])
    return decorator(*args, **kwargs)

So now process_message looks like this:

from celery import shared_task
from celery_consumer import consume_from

@consume_from(queue='custom_queue', routing_key='#')
def process_event(body):
    pass

# (or)

@consume_from(queue='custom_queue', routing_key='#')
@shared_task(autoretry_for=(Exception,), retry_backoff=2)
def process_event(body):
    pass
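
Since consume_from hands back a shared_task either way, process_event keeps its celery identity, so the rest of the codebase can still invoke it like any other task (illustrative):

# still a regular celery task; this bypasses the custom queue entirely
process_event.delay({'event': 'ping'})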

2. Dead letter queues in celery go brrrrrrr

This problem applies to both aspects here. Firstly, what if a celery task fails even after retries? It's ignored and the state is stored in a result backend. Secondly, what happens if the message from the custom queue encounters a decode error? We don't store that at all, since it's a custom message.

Enter dead letter queues. But there's no provision for dead-letter queues in celery; you will have to set them up yourself.

Here is the celery_dlq.py file:

# celery_dlq.py

from celery import bootsteps
from kombu import Queue, Exchange


def setup_default_dlq(app, dlq_suffix='dead'):
    queue = Queue(
        app.conf.task_default_queue,
        Exchange(app.conf.task_default_exchange, type='direct'),
        routing_key=app.conf.task_default_routing_key
    )
    setup_dlq(app, queue, dlq_suffix)
    app.conf.task_queues = (queue,)


def setup_dlq(app, queue: Queue, dlq_suffix='dead'):
    deadletter_queue_name = f'{queue.name}.{dlq_suffix}'
    deadletter_exchange_name = f'{queue.name}.{dlq_suffix}'
    deadletter_routing_key = f'{queue.routing_key}.{dlq_suffix}'

    if queue.queue_arguments is None:
        queue.queue_arguments = {}
    queue.queue_arguments.update({
        'x-dead-letter-exchange': deadletter_exchange_name,
        'x-dead-letter-routing-key': deadletter_routing_key
    })

    class DeclareDLXnDLQ(bootsteps.StartStopStep):
        """
        Celery Bootstep to declare the DL exchange and queues before the worker starts
            processing tasks
        """
        requires = {'celery.worker.components:Pool'}

        def start(self, worker):
            dlx = Exchange(deadletter_exchange_name, type='direct')

            dead_letter_queue = Queue(
                deadletter_queue_name, dlx, routing_key=deadletter_routing_key)

            with worker.app.pool.acquire() as conn:
                dead_letter_queue.bind(conn).declare()

    app.steps['worker'].add(DeclareDLXnDLQ)

As you can see, we use a StartStopStep to declare the DLX and DLQ before the worker starts processing tasks. More details can be found here. When the celery app starts we can do something like this:

app = Celery(broker='amqp://')
app.autodiscover_tasks()
setup_default_dlq(app)

This creates a celery.dead exchange and queue where all the failed messages live, so we can inspect them later. Now let's use setup_dlq in our earlier _make_consumer to declare the DLQ automatically when a flag is present.

def _make_consumer(func, **kwargs):
    from kombu import Queue

    from celery_dlq import setup_dlq

    queue = kwargs.pop('queue', None)

    if not queue:
        raise RuntimeError("queue is a required parameter for consume_from")
    should_setup_dlq = bool(kwargs.pop('setup_dlq', False))  # added newly
    if isinstance(queue, Queue):
        queue_obj = queue
    else:
        _qoptions = {'no_declare': not should_setup_dlq}  # added newly
        _qoptions.update(kwargs.pop('queue_options', {}))
        # forward remaining kombu.Queue kwargs such as routing_key
        for key in ('exchange', 'routing_key'):
            if key in kwargs:
                _qoptions[key] = kwargs.pop(key)
        queue_obj = Queue(queue, **_qoptions)

    # added newly
    if should_setup_dlq:
        setup_dlq(current_app, queue_obj)

    return _custom_consumer_factory(queue_obj, func, kwargs), queue_obj

Now process_event looks like this, with an extra setup_dlq=True:

@consume_from(queue='example', routing_key='com.example', setup_dlq=True)
@shared_task(autoretry_for=(Exception,), retry_backoff=2)
def process_event(body):
    print(f"Received yoyo: {body}")
