
Handling Celery events (EN)


The code used in this article was written for Celery 5.0.5 but it should work with older versions as well (4.1.1 and higher). It’s available on GitHub.

According to the Celery official documentation, a worker is able to generate events if something relevant happens. Those events can be consumed by monitoring tools like Flower, so a user can see what’s going on with tasks and workers in real-time.

If you have ever wondered how this works underneath, you may want to read this article. Of course, we’ll not cover all the features that mature monitoring tools (like Flower) offer, but we’ll take a close look at task-related events. The list of all task-related events sent by the workers looks as follows:

  • task-sent
  • task-received
  • task-started
  • task-succeeded
  • task-failed
  • task-rejected
  • task-revoked
  • task-retried

Apart from the mentioned task-related events, there are worker-related events as well (not covered in this article):

  • worker-online
  • worker-heartbeat
  • worker-offline

Before we move on to the essence, let’s create a simple Celery application that will be used for testing.

Sample Celery application

import requests
from urllib.parse import urlparse
from celery import Celery
from celery.exceptions import Reject
from bs4 import BeautifulSoup

# Create the Celery application with tasks defined in the 
# example_tasks/tasks.py (this) file.
# The application uses RabbitMQ as the broker and for results' storage.
app = Celery(
    'example_tasks.tasks', 
    broker='pyamqp://guest@localhost', 
    backend='rpc://'
)

# Enable sending the "task-sent" notification.
app.conf.task_send_sent_event = True

@app.task(
    autoretry_for=(requests.HTTPError,),
    retry_kwargs={'max_retries': 3},
    retry_backoff=True,
    acks_late=True  # required if we want the task-rejected event to be sent
)
def visit(url, url_params=[]):
    """
    Visits the given URL address by sending the HTTP GET request.
    It also detects all occurrences of <a> and <img> tags on the 
    given page and stores all extracted data in a text file.
    Args:
        url: Full URL address to be visited.
        url_params: Additional parameters for the HTTP query string.
                    E.g. [('q', 'search phrase')]
    """
    try:
        response = requests.get(url, params=url_params)
        response.raise_for_status()
    except requests.exceptions.ConnectionError as ex:
        raise Reject(ex, requeue=False)
    
    print('Response from {0}: {1}.'.format(url, response.status_code))
    
    data = { 'url': url }
    
    # Chain the child tasks execution. First, it calls the 
    # 'extract_data_from_html' task and then it passes results to the
    # 'store_data' task.
    (
        extract_data_from_html.s(response.text, data) 
        | store_data.s()
    ).delay()

@app.task
def extract_data_from_html(html, data):
    """
    Uses the Beautiful Soup library to extract all links (href) and 
    images (src) on a given page and stores the results in the 
    dictionary (dict). The dictionary with collected data is returned
    from the task.
    Args:
        html: String with HTML that will be parsed.
        data: dict object that will be complemented by extracted data.
    Returns:
        dict: Dictionary containing extracted information.
    """
    soup = BeautifulSoup(html, 'html.parser')
    data['links'] = [] 
    data['images'] = []
    
    for link in soup.find_all('a'):
        data['links'].append(link.get('href'))
    
    for img in soup.find_all('img'):
        data['images'].append(img.get('src'))

    print('Links and images extracted.')

    return data

@app.task
def store_data(data):
    """
    Saves received data (dict) in a text file.
    Args:
        data: dict object containing data fetched from visited URL.
              Expected format: {'url':'', 'links':[], 'images':[]}.
    """
    url = urlparse(data['url'])
    filename = url.netloc.replace(':', '') + '.txt'

    with open(filename, 'w') as f:
        f.write('=== URL: {}\n'.format(data['url']))
        f.write('=== LINKS:\n')

        for link in data['links']:
            f.write(str(link) + '\n')

        f.write('=== IMAGES:\n')

        for img in data['images']:
            f.write(str(img) + '\n')

    print('Fetched data saved to file {0}'.format(filename))

The application is simple and the code is rather self-explanatory. Nevertheless, let’s walk through it together.

First, we create a simple Celery application that uses RabbitMQ, with default credentials, as both the message broker and the result backend. You might have noticed that the RPC protocol (rpc://) is used for storing the results. The difference between the RPC and AMQP result backends is nicely explained in this Stack Overflow thread: Celery rpc vs amqp result backend.

Next, we enable the “task-sent” events:

app.conf.task_send_sent_event = True

Without this setting, we wouldn’t be able to handle the task-sent events, generated the moment a task message is published.

Now, let’s go to the visit task’s definition. This task may be considered the core element of the whole application. Its responsibility is to visit a given URL (in other words: to send an HTTP GET request using the requests library), to parse the fetched HTML document and to store the extracted information in a text file. As you may have already noticed, it accepts two parameters (one positional argument url and one keyword argument url_params). The url parameter should contain a full URL address. The second argument is optional and should contain the HTTP query’s parameters, in the form of a list of tuples, as accepted by the requests.get method (the params argument). For example, if we’d like to visit the address https://en.wikipedia.org/w/index.php?search=celery, we should call the visit task as follows:

visit.delay('https://en.wikipedia.org/w/index.php', url_params=[('search', 'celery')])

I believe you already noticed the configuration passed to the @app.task decorator. In short, it causes a maximum of 3 retries of the task, but only if the requests.HTTPError exception is raised. In case of any other exception, the task fails. The “retry_backoff” parameter indicates that the exponential backoff algorithm is used to calculate the retry intervals. For more information about retrying tasks, please visit the Celery documentation.
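
To illustrate what autoretry_for does under the hood, here is a minimal, hypothetical sketch of the same behaviour written with a manual self.retry() call (the countdown formula only approximates what retry_backoff computes):

import requests
from celery import Celery

app = Celery('retry_sketch', broker='pyamqp://guest@localhost')

@app.task(bind=True, max_retries=3)
def visit_manually_retried(self, url):
    try:
        response = requests.get(url)
        response.raise_for_status()
    except requests.HTTPError as ex:
        # Re-publish the task message and try again later. The growing
        # countdown (1s, 2s, 4s, ...) mimics the exponential backoff that
        # retry_backoff enables for autoretried tasks.
        raise self.retry(exc=ex, countdown=2 ** self.request.retries)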

And the last thing to mention is calling nested tasks: extract_data_from_html and store_data:

(extract_data_from_html.s(response.text, data) | store_data.s()).delay()

Please note that we used the linking mechanism instead of synchronous invocation. That’s because calling nested tasks synchronously is strongly discouraged, as it may exhaust the execution pool and eventually lead to deadlocks.
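
For reference, the pipe operator is just shorthand for celery.chain, so the same workflow could be built explicitly; a minimal sketch with placeholder inputs:

from celery import chain
from example_tasks.tasks import extract_data_from_html, store_data

html_text = '<html><a href="/wiki/Apium">Apium</a></html>'  # placeholder HTML
data = {'url': 'https://example.org'}  # placeholder dict

# Equivalent to: (extract_data_from_html.s(html_text, data) | store_data.s()).delay()
chain(extract_data_from_html.s(html_text, data), store_data.s()).delay()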

Let’s quickly analyze the two nested tasks. The first one (extract_data_from_html) is responsible for extracting links (the href attribute of the <a> tag) and images (the src attribute of the <img> tag) from the HTML document passed to the task as an argument. The task uses the Beautiful Soup library for parsing the HTML. The second task (store_data) accepts a dict object with the collected information and stores it in a text file. That’s it!

Finally, it’s time to run our application. If you use any virtual environment (e.g. virtualenv), now is the time to activate it. Assuming that the code is in the ./example_tasks/tasks.py file, let’s start the Celery worker:

celery -A example_tasks.tasks worker --loglevel=INFO -E

The command is pretty straightforward. The crucial part is the -E (or --task-events or --events) flag. It’s responsible for sending task-related events that can be captured by monitoring applications. In other words, without this flag we wouldn’t be able to capture and handle any task-related events.
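
If you prefer configuration over command-line flags, the same behaviour can be enabled in code, which should be equivalent to passing -E:

app.conf.worker_send_task_events = True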

Now, let’s start the Python console and run the visit task:

>>> from example_tasks.tasks import visit
>>> visit.delay('https://en.wikipedia.org/w/index.php', url_params=[('search', 'celery')])

The Celery worker’s output should be similar to the output presented below:

[2020-04-24 19:54:31,154: INFO/MainProcess] Received task: example_tasks.tasks.visit[c283f0ea-7670-45e4-91fd-408acb5c385f]  
[2020-04-24 19:54:31,460: WARNING/ForkPoolWorker-2] Response from https://en.wikipedia.org/w/index.php: 200.
[2020-04-24 19:54:31,478: INFO/MainProcess] Received task: example_tasks.tasks.extract_data_from_html[b421f9e7-8f26-406e-97bf-a8135cc16c5e]  
[2020-04-24 19:54:31,479: INFO/ForkPoolWorker-2] Task example_tasks.tasks.visit[c283f0ea-7670-45e4-91fd-408acb5c385f] succeeded in 0.3243288990001929s: None
[2020-04-24 19:54:31,625: WARNING/ForkPoolWorker-2] Links and images extracted.
[2020-04-24 19:54:31,629: INFO/ForkPoolWorker-2] Task example_tasks.tasks.extract_data_from_html[b421f9e7-8f26-406e-97bf-a8135cc16c5e] succeeded in 0.14442902700011473s: {'url': 'https://en.wikipedia.org/w/index.php', 'links': [None, '#mw-head', '#p-search', '/wiki/Celery_(software)', '/wiki/File:Illustration_Apium_graveolens0.jpg', '/wiki/Conservation_status', '/wiki/Least_Concern', '/wiki/IUCN_Red_List', '#cite_note-iucn-1', '/wiki/Taxonomy_(biology)', '/wiki/Template:Taxonomy/Apium', '/wiki/Plant', '/wiki/Vascular_plant', '/wiki/Flowering_plant', '/wiki/Eudicots', '/wiki/Asterids', '/wiki/Apiales', '/wiki/Apiaceae', '/wiki/Apium', '/wiki/Binomial_nomenclature', '/wiki/Synonym_(taxonomy)', '#cite_note-GRIN-2', '/wiki/Apiaceae', '/wiki/Vegetable', '/wiki/Hypocotyl', '/wiki/Herbal_medicine', '#Description', '#Etymology', '#Taxonomy', '#Cultivation', '#North_America', '#Europe', '#Asia', '#Wild', '#Harvesting_and_storage', '#Sulfites', '#Uses', '#Leaves', '#Seeds', '#Celery_salt', '#Herbalism', '#Celery_juice_trend', '#Nutrition', '#Allergies', '#Chemistry', '#History', '#Cultural_depictions', '#See_also', '#References', '#Further_reading', '#External_links', '/w/index.php?titl...', ...]}
[2020-04-24 19:54:31,629: INFO/MainProcess] Received task: example_tasks.tasks.store_data[4baa4ccf-dd8f-43d6-b07d-43b62c0de201]  
[2020-04-24 19:54:31,634: WARNING/ForkPoolWorker-2] Fetched data saved to file en.wikipedia.org.txt
[2020-04-24 19:54:31,634: INFO/ForkPoolWorker-2] Task example_tasks.tasks.store_data[4baa4ccf-dd8f-43d6-b07d-43b62c0de201] succeeded in 0.002919067999755498s: None

Additionally, the text file with the extracted data should appear in the current directory: ./en.wikipedia.org.txt.
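
Based on the store_data task, its contents should look roughly like this (abbreviated):

=== URL: https://en.wikipedia.org/w/index.php
=== LINKS:
None
#mw-head
#p-search
/wiki/Celery_(software)
...
=== IMAGES:
...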

listening for events

We know that our application works. Apart from the tasks’ own output, what we saw in the Celery worker’s log is actually a record of events that occurred while the tasks were handled (e.g. “task-received” and “task-succeeded”). Now, what if we would like to react to those events in a specific way? Fortunately, we can do it by writing a valid events consumer.

import argparse
from celery import Celery
from datetime import datetime as dt

class Logger:
    def log_task_status_change(self, task, event):
        print('[{}] {} {} (STATE={}, UUID={})'.format(
            self._to_datetime(task.timestamp),
            event['type'].upper(),
            task.name,
            task.state.upper(),
            task.uuid
        ))

    def log_event_details(self, event):
        print('EVENT DETAILS: {}'.format(event))

    def log_task_details(self, task):
        print('TASK DETAILS:')
        print('UUID: {}'.format(task.uuid))
        print('Name: {}'.format(task.name))
        print('State: {}'.format(task.state))
        print('Received: {}'.format(self._to_datetime(task.received)))
        print('Sent: {}'.format(self._to_datetime(task.sent)))
        print('Started: {}'.format(self._to_datetime(task.started)))
        print('Rejected: {}'.format(self._to_datetime(task.rejected)))
        print('Succeeded: {}'.format(self._to_datetime(task.succeeded)))
        print('Failed: {}'.format(self._to_datetime(task.failed)))
        print('Retried: {}'.format(self._to_datetime(task.retried)))
        print('Revoked: {}'.format(self._to_datetime(task.revoked)))
        print('args (arguments): {}'.format(task.args))
        print('kwargs (keyword arguments): {}'.format(task.kwargs))
        print('ETA (Estimated Time of Arrival): {}'.format(task.eta))
        print('Expires: {}'.format(task.expires))
        print('Retries: {}'.format(task.retries))
        print('Worker: {}'.format(task.worker))
        print('Result: {}'.format(task.result))
        print('Exception: {}'.format(task.exception))
        print('Timestamp: {}'.format(self._to_datetime(task.timestamp)))
        print('Runtime: {}'.format(task.runtime))
        print('Traceback: {}'.format(task.traceback))
        print('Exchange: {}'.format(task.exchange))
        print('Routing Key: {}'.format(task.routing_key))
        print('Clock: {}'.format(task.clock))
        print('Client: {}'.format(task.client))
        print('Root: {}'.format(task.root))
        print('Root ID: {}'.format(task.root_id))
        print('Parent: {}'.format(task.parent))
        print('Parent ID: {}'.format(task.parent_id))
        print('Children:')

        for child in task.children:
            print('\t{}\n'.format(str(child)))

    def _to_datetime(self, timestamp):
        return dt.fromtimestamp(timestamp) if timestamp is not None else None 

class CeleryEventsHandler:
    def __init__(self, celery_app, verbose_logging=False):
        self._app = celery_app
        self._state = celery_app.events.State()
        self._logger = Logger()
        self._verbose_logging = verbose_logging

    def _event_handler(handler):
        def wrapper(self, event):
            self._state.event(event)
            task = self._state.tasks.get(event['uuid'])
            self._logger.log_task_status_change(task, event)
            if self._verbose_logging:
                self._logger.log_event_details(event)
                self._logger.log_task_details(task)
            handler(self, event)
        return wrapper

    @_event_handler
    def _on_task_sent(self, event):
        pass

    @_event_handler
    def _on_task_received(self, event):
        pass

    @_event_handler
    def _on_task_started(self, event):
        pass

    @_event_handler
    def _on_task_succeeded(self, event):
        pass

    @_event_handler
    def _on_task_failed(self, event):
         pass

    @_event_handler
    def _on_task_rejected(self, event):
         pass

    @_event_handler
    def _on_task_revoked(self, event):
         pass

    @_event_handler
    def _on_task_retried(self, event):
        pass

    def start_listening(self):
        with self._app.connection() as connection:
            recv = self._app.events.Receiver(connection, handlers={
                'task-sent': self._on_task_sent,
                'task-received': self._on_task_received,
                'task-started': self._on_task_started,
                'task-succeeded': self._on_task_succeeded,
                'task-failed': self._on_task_failed,
                'task-rejected': self._on_task_rejected,
                'task-revoked': self._on_task_revoked,
                'task-retried': self._on_task_retried
            })
            recv.capture(limit=None, timeout=10)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Dummy monitor of task-related events, generated by a Celery worker.'
    )
    parser.add_argument(
        '--verbose', 
        action='store_true', 
        help='Print detailed information about an event and a related task.'
    )
    args = parser.parse_args()

    # Use RabbitMQ with default credentials as a broker.
    app = Celery(broker='pyamqp://guest@localhost')
    events_handler = CeleryEventsHandler(app, args.verbose)
    events_handler.start_listening()

According to the official documentation, in order to process events in real-time, the following elements are needed:

  • An event consumer (celery.events.receiver.EventReceiver).
  • A set of handlers called when events come in.
  • Optionally: app.events.State.

All three elements mentioned above have been wrapped in the CeleryEventsHandler class. Let’s start with the event consumer, which is an instance of the EventReceiver class. Its constructor accepts multiple arguments, but for event-monitoring purposes only two are important:

  • connection – represents the connection to the broker (RabbitMQ in this case). It’s an instance of kombu’s Connection class. Reminder: Kombu is a messaging library for Python and Celery uses it extensively.
  • handlers – a dict object mapping event type names to their handlers.

def start_listening(self):
    with self._app.connection() as connection:
        recv = self._app.events.Receiver(connection, handlers={
            'task-sent': self._on_task_sent,
            'task-received': self._on_task_received,
            'task-started': self._on_task_started,
            'task-succeeded': self._on_task_succeeded,
            'task-failed': self._on_task_failed,
            'task-rejected': self._on_task_rejected,
            'task-revoked': self._on_task_revoked,
            'task-retried': self._on_task_retried
        })
        recv.capture(limit=None, timeout=10)

The code snippet presented above is responsible for the creation of the EventReceiver object. First, a connection to the broker is established, and the connection object is passed to the EventReceiver along with the handlers map. You probably noticed that we don’t call EventReceiver‘s constructor explicitly; instead, we call the Receiver() method defined on the celery.app.events.Events class. If you take a look at Celery’s source code, you’ll notice that the mentioned method not only returns the expected object but also wraps it in a cached property.
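
For completeness, a minimal sketch of what instantiating the receiver directly could look like, assuming the celery.events.receiver.EventReceiver constructor accepts the connection, the handlers map and the app:

from celery.events.receiver import EventReceiver

def on_any_event(event):
    print(event['type'], event.get('uuid'))

with app.connection() as connection:
    recv = EventReceiver(connection, handlers={'*': on_any_event}, app=app)
    recv.capture(limit=5)  # process a few event-loop iterations, then return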

The handlers map above contains all the task-related event types and their handlers. Of course, you don’t have to define handlers for all event types. For example, if you’d like to catch the task-received events only, your map should contain only one mapping:

handlers = {
    'task-received': self._on_task_received
}

Other events will simply be ignored. On the other hand, if you wish to have one handler for all event types (including worker-related events), just use the asterisk symbol as the key and define a universal handler:

handlers = {
    '*': self._one_method_to_handle_them_all
}

Last but not least, the capture() method, called on the EventReceiver object, opens up a consumer responsible for capturing events. When you look at the source code, you’ll see that the method calls another method, kombu.mixins.ConsumerMixin.consume, passing all its arguments along (the EventReceiver class inherits from the ConsumerMixin class). This, in turn, causes the consumer to wait for events from the server (see kombu.Connection.drain_events and Consumer for more information). Regarding the parameters that the capture() method accepts (see also the sketch after this list):

  • limit – to understand the meaning of this parameter, we should know that the aforementioned kombu.Connection.drain_events method is a blocking method that polls for new events for a given amount of time (1 second by default). This method is called in a for loop within the kombu.mixins.ConsumerMixin.consume method, and the limit parameter determines the maximum number of iterations of that loop. If it’s set to None (the default), the loop runs until it’s explicitly stopped.
  • timeout – determines how long the consumer keeps waiting when no events arrive before giving up with a timeout error. The default value is None, i.e. wait indefinitely.
  • wakeup – passed as a keyword argument to the ConsumerMixin.consume() method and set to True by default; it makes the receiver broadcast a “wakeup” signal to all workers once the consumer is ready, so they immediately announce themselves with a heartbeat event.
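
A few ways of calling capture(), as a sketch (recv being the EventReceiver created earlier):

# Block and handle events indefinitely (the defaults, spelled out).
recv.capture(limit=None, timeout=None, wakeup=True)

# Return after at most 10 iterations of the internal event loop.
recv.capture(limit=10)

# Give up (with a timeout error) if no event arrives for roughly 30 seconds.
recv.capture(timeout=30)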

Now that the consumer is working, we can take a look at the registered handlers. As you noticed, we defined 8 different handler methods within the CeleryEventsHandler class. These methods themselves do nothing, because each and every one of them is wrapped in the @_event_handler decorator that does the actual job:

def _event_handler(handler):
    def wrapper(self, event):
        self._state.event(event)
        task = self._state.tasks.get(event['uuid'])
        self._logger.log_task_status_change(task, event)
        if self._verbose_logging:
            self._logger.log_event_details(event)
            self._logger.log_task_details(task)
        handler(self, event)
    return wrapper

It feeds the task’s state with the event data, updating the in-memory representation of the related task (more about the state a little bit later), logs the task’s status change and, in the verbose mode, also prints the event details (the event object is simply a dictionary with only two required fields: type and timestamp) and the full task state.

a task’s state

As it was mentioned before, we can use the app.events.State object for real-time events processing, but what does that mean, exactly? The official documentation says:

app.events.State is a convenient in-memory representation of tasks and workers in the cluster that’s updated as events come in. (…) It encapsulates solutions for many common things, like checking if a worker is still alive (by verifying heartbeats), merging event fields together as events come in, making sure time-stamps are in sync, and so on.

Source: Monitoring and Management Guide – Real Time Processing

This module implements a data-structure used to keep track of the state of a cluster of workers and the tasks it is working on (by consuming events). (…) For every event consumed the state is updated, so the state represents the state of the cluster at the time of the last event.

Source: celery.events.state

Long story short, each time the self._state.event(event) method is called, the in-memory task object’s state is updated and we can examine the current state of the task. Should you be interested in the technical details, please review the Task.event() method’s source code. You’ll see, among other things, that the Task object is created the first time a related event is received and updated each time the state object is fed with a new event.
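
To see this mechanism in isolation, we can feed a hand-crafted event into a State object ourselves; a minimal sketch, with all field values made up:

from celery import Celery

app = Celery(broker='pyamqp://guest@localhost')
state = app.events.State()

# Normally the EventReceiver feeds events in; here we do it by hand.
state.event({
    'type': 'task-received',
    'uuid': '3e8d4c5e-0000-0000-0000-000000000000',  # hypothetical id
    'name': 'example_tasks.tasks.visit',
    'hostname': 'worker@localhost',
    'timestamp': 1604500000.0,
    'local_received': 1604500000.0,
    'clock': 1,
})

task = state.tasks.get('3e8d4c5e-0000-0000-0000-000000000000')
print(task.name, task.state)  # example_tasks.tasks.visit RECEIVED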

OK, but what information can we get from a task’s state? Well, according to the _fields property of the celery.events.state.Task object, we have access to the following data:

  • uuid – A unique identifier of the task (UUID = Universally Unique Identifier).
  • name – The name of the task, of course.
  • state – The current state of the task (e.g. PENDING, RECEIVED, STARTED, RETRY, FAILURE, SUCCESS, REVOKED).
  • received, sent, started, rejected, succeeded, failed, retried, revoked – These are timestamps indicating when exactly a specific event occurred.
  • args, kwargs – Arguments and keyword arguments passed to the task, respectively. For example, for the sample task def visit(url, url_params=[]), args will contain the url value and kwargs will contain the url_params value.
  • eta – Estimated time of arrival. In order to set the ETA value, the apply_async method must be used.
  • expires – Determines when a task expires (we can set the expiration time via the “expires” keyword argument passed to the apply_async method). When a worker receives an expired task, it marks the task as REVOKED.
  • retries – Shows how many times a task has already been retried (e.g. due to an exception). The default retry delay is 180 seconds.
  • worker – Represents a worker object responsible for handling the task.
  • exception – An exception that was raised by the task. Please note that if retries are enabled, the “exception” field has a value assigned only in case of the TASK-FAILED event.
  • result – A result of the task. The property has a value assigned only in case of the TASK-SUCCEEDED event, i.e. when the task has been successfully executed.
  • timestamp – The moment the task was last updated. For example, if the task was fed with the TASK-SUCCEEDED event, the “timestamp” field’s value will be the same as the event’s timestamp and the task’s “succeeded” value (mentioned above in this list).
  • runtime – Time it took to execute the task using the pool (starting when the task is sent to a worker pool, and ending when the pool’s result handler callback is called).
  • traceback – Contains the Python call stack in case of an error.
  • exchange, routing_key – Properties related to AMQP’s routing capabilities. See the documentation on advanced task calling for more details.
  • clock – A Lamport clock (counter) received from the latest event.
  • client – Shows a hostname but only when a client’s event is received (TASK-SENT), so the task_send_sent_event setting must be enabled if we want to get this information.
  • root_id – A unique id of the first task in the workflow this task is part of (when one task calls another). The “root” property points to the related Task object.
  • parent_id – A unique id of the task that called this task. The “parent” property points to the related Task object (see the sketch after this list).
  • children – A WeakSet object containing the child tasks. The set is filled only when the task succeeds.
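
For the chained tasks in the sample application, these relationships could be inspected inside any of the handlers; a sketch assuming the CeleryEventsHandler internals shown earlier:

def _on_task_received(self, event):
    task = self._state.tasks.get(event['uuid'])
    if task.parent_id:
        parent = self._state.tasks.get(task.parent_id)
        print('{} was called by {}'.format(
            task.name, parent.name if parent else task.parent_id))
    if task.root_id:
        print('workflow root: {}'.format(task.root_id))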

testing

We are ready to start testing different scenarios and check what events are generated. First, run the events monitoring application (if you work with a virtual environment, like “virtualenv”, it’s time to activate it):

python example_tasks/events_monitor.py

If we run the script with the “--verbose” parameter, detailed information about tasks and handled events will be displayed. However, let’s stick to the compact version for readability. Now, start the sample Celery application:

celery -A example_tasks.tasks worker --loglevel=INFO -E

Finally, enter the Python console and import the “visit” task:

>>> from example_tasks.tasks import visit

happy path

Let’s start testing with the easiest “Happy Path” scenario. We’ll visit the “https://en.wikipedia.org/w/index.php?search=celery” address with the “visit” task defined in our sample Celery application. As mentioned earlier, the task visits a given URL, extracts links and images from the retrieved HTML page and stores them in an output text file.

>>> visit.delay('https://en.wikipedia.org/w/index.php', url_params=[('search', 'celery')])

The events monitoring application’s output should be similar to:

[2020-11-03 19:22:40.190246] TASK-RECEIVED example_tasks.tasks.extract_data_from_html (STATE=RECEIVED, UUID=8d7bd838-ab79-48ec-bc80-231f1e482f8d)
[2020-11-03 19:22:40.186597] TASK-SENT example_tasks.tasks.extract_data_from_html (STATE=RECEIVED, UUID=8d7bd838-ab79-48ec-bc80-231f1e482f8d)
[2020-11-03 19:22:40.208708] TASK-STARTED example_tasks.tasks.extract_data_from_html (STATE=STARTED, UUID=8d7bd838-ab79-48ec-bc80-231f1e482f8d)
[2020-11-03 19:22:40.208838] TASK-SUCCEEDED None (STATE=SUCCESS, UUID=68cc7035-2859-43e8-a294-1922e7dd130d)
[2020-11-03 19:22:40.382471] TASK-SENT example_tasks.tasks.store_data (STATE=PENDING, UUID=095f6331-be69-45aa-94ac-0acd65a72751)
[2020-11-03 19:22:40.384208] TASK-RECEIVED example_tasks.tasks.store_data (STATE=RECEIVED, UUID=095f6331-be69-45aa-94ac-0acd65a72751)
[2020-11-03 19:22:40.386922] TASK-STARTED example_tasks.tasks.store_data (STATE=STARTED, UUID=095f6331-be69-45aa-94ac-0acd65a72751)
[2020-11-03 19:22:40.389827] TASK-SUCCEEDED example_tasks.tasks.store_data (STATE=SUCCESS, UUID=095f6331-be69-45aa-94ac-0acd65a72751)
[2020-11-03 19:22:40.389924] TASK-SUCCEEDED example_tasks.tasks.extract_data_from_html (STATE=SUCCESS, UUID=8d7bd838-ab79-48ec-bc80-231f1e482f8d)

Please note the nested tasks’ execution – each task generates its own set of events. You may also have noticed that the events displayed in the output are not in order (e.g. the TASK-RECEIVED event is displayed before the TASK-SENT event). That’s because the monitoring application is very simple and doesn’t sort events before displaying them – it prints an event as soon as it receives one.
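
If ordered output mattered, one simple approach would be to buffer events briefly and sort them by timestamp before printing; a sketch with a hypothetical, arbitrarily sized reordering window:

import heapq
from itertools import count

_buffer = []
_tiebreak = count()

def _display_in_order(event):
    # Keep a small window of events and always print the oldest one,
    # smoothing out the minor reordering seen in the raw stream.
    heapq.heappush(_buffer, (event['timestamp'], next(_tiebreak), event))
    if len(_buffer) > 10:
        _, _, oldest = heapq.heappop(_buffer)
        print(oldest['type'], oldest.get('uuid'))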

retry and fail

Now, let’s try to generate the TASK-RETRIED and TASK-FAILED events. The sample application is configured to retry the “visit” task 3 times on any “requests.HTTPError” exception. We can trigger that exception by visiting a resource that doesn’t exist, which makes raise_for_status() raise HTTPError for the “404 Not Found” response:

>>> visit.delay('https://en.wikipedia.org/not-existing')

The output:

[2020-11-04 20:14:13.159171] TASK-SENT example_tasks.tasks.visit (STATE=PENDING, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:13.160125] TASK-RECEIVED example_tasks.tasks.visit (STATE=RECEIVED, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:13.163311] TASK-STARTED example_tasks.tasks.visit (STATE=STARTED, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:13.263379] TASK-SENT example_tasks.tasks.visit (STATE=STARTED, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:13.264929] TASK-RETRIED example_tasks.tasks.visit (STATE=RETRY, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:13.266419] TASK-RECEIVED example_tasks.tasks.visit (STATE=RECEIVED, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:13.267890] TASK-STARTED example_tasks.tasks.visit (STATE=STARTED, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:13.367687] TASK-RETRIED example_tasks.tasks.visit (STATE=RETRY, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:13.366267] TASK-SENT example_tasks.tasks.visit (STATE=PENDING, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:13.369117] TASK-RECEIVED example_tasks.tasks.visit (STATE=RECEIVED, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:17.274704] TASK-STARTED example_tasks.tasks.visit (STATE=STARTED, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:17.384223] TASK-SENT example_tasks.tasks.visit (STATE=STARTED, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:17.384223] TASK-RECEIVED example_tasks.tasks.visit (STATE=STARTED, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:17.386771] TASK-RETRIED example_tasks.tasks.visit (STATE=RETRY, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:22.341228] TASK-STARTED example_tasks.tasks.visit (STATE=STARTED, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)
[2020-11-04 20:14:22.441907] TASK-FAILED example_tasks.tasks.visit (STATE=FAILURE, UUID=80f4eef7-5bda-40c3-b262-f97df6a568cc)

reject

A task can be rejected by raising the “celery.exceptions.Reject” exception, which is equivalent to calling the AMQP 0-9-1 protocol’s “basic.reject” method. It allows a client to reject a delivered message, instructing the broker to either discard the message or requeue it. In the case of a RabbitMQ broker, a message’s rejection might result in sending the message to a Dead Letter Exchange (a normal exchange that can be any of the usual types and is declared as usual).
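
For instance, a dead-letter exchange could be attached to the task queue through kombu’s queue arguments; a hypothetical sketch (the 'visits' and 'dlx' names are made up):

from kombu import Exchange, Queue

app.conf.task_queues = [
    Queue(
        'visits',
        Exchange('visits'),
        routing_key='visits',
        # Rejected (and otherwise dead-lettered) messages are re-published
        # to the 'dlx' exchange, where a monitoring queue can pick them up.
        queue_arguments={'x-dead-letter-exchange': 'dlx'},
    ),
]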

Additionally, a task meant to be rejected must be configured to be acknowledged late (the acks_late argument of the @app.task decorator), so the message is acknowledged after the task is executed and not before (as it is by default). Without the late acknowledgement, the “Reject” exception will have no effect.

The sample application rejects the “visit” task in case of the “requests.exceptions.ConnectionError” exception, so, for example, the task will be rejected if it tries to visit a non-existent web address:

>>> visit.delay('https://not-existing')

The output should be similar to:

[2020-11-17 15:28:01.144513] TASK-SENT example_tasks.tasks.visit (STATE=PENDING, UUID=5577c21d-bd1a-46a1-8336-8fd4de4ee326)
[2020-11-17 15:28:01.146605] TASK-RECEIVED example_tasks.tasks.visit (STATE=RECEIVED, UUID=5577c21d-bd1a-46a1-8336-8fd4de4ee326)
[2020-11-17 15:28:01.148861] TASK-STARTED example_tasks.tasks.visit (STATE=STARTED, UUID=5577c21d-bd1a-46a1-8336-8fd4de4ee326)
[2020-11-17 15:28:01.172036] TASK-REJECTED example_tasks.tasks.visit (STATE=STARTED, UUID=5577c21d-bd1a-46a1-8336-8fd4de4ee326)

revoke an expired task

The last event type that we are going to generate is TASK-REVOKED. It’s published when a task expires. The expiration time of a task can be defined when triggering the task with the “apply_async” method. We just need to set the “expires” parameter to the desired number of seconds.

Let’s try to repeat the earlier “Retry and fail” scenario, but this time with a short expiration time. In other words, we need to set the expiration time short enough (1 second, for example) for the task to be revoked before it fails:

>>> visit.apply_async(('https://en.wikipedia.org/not-existing',), expires=1)

As we can see in the output below, the task was retried twice before it was revoked (if we had set a longer expiration time, the task would probably have failed before the revocation):

[2020-11-17 15:36:53.581788] TASK-SENT example_tasks.tasks.visit (STATE=PENDING, UUID=f7bf1603-bbf6-4125-b746-0a2c0ab162d3)
[2020-11-17 15:36:53.583529] TASK-RECEIVED example_tasks.tasks.visit (STATE=RECEIVED, UUID=f7bf1603-bbf6-4125-b746-0a2c0ab162d3)
[2020-11-17 15:36:53.584229] TASK-STARTED example_tasks.tasks.visit (STATE=STARTED, UUID=f7bf1603-bbf6-4125-b746-0a2c0ab162d3)
[2020-11-17 15:36:53.838836] TASK-SENT example_tasks.tasks.visit (STATE=STARTED, UUID=f7bf1603-bbf6-4125-b746-0a2c0ab162d3)
[2020-11-17 15:36:53.838836] TASK-RECEIVED example_tasks.tasks.visit (STATE=STARTED, UUID=f7bf1603-bbf6-4125-b746-0a2c0ab162d3)
[2020-11-17 15:36:53.847037] TASK-STARTED example_tasks.tasks.visit (STATE=STARTED, UUID=f7bf1603-bbf6-4125-b746-0a2c0ab162d3)
[2020-11-17 15:36:53.847480] TASK-RETRIED example_tasks.tasks.visit (STATE=RETRY, UUID=f7bf1603-bbf6-4125-b746-0a2c0ab162d3)
[2020-11-17 15:36:53.958531] TASK-RECEIVED example_tasks.tasks.visit (STATE=RECEIVED, UUID=f7bf1603-bbf6-4125-b746-0a2c0ab162d3)
[2020-11-17 15:36:53.957614] TASK-SENT example_tasks.tasks.visit (STATE=RECEIVED, UUID=f7bf1603-bbf6-4125-b746-0a2c0ab162d3)
[2020-11-17 15:36:53.962367] TASK-RETRIED example_tasks.tasks.visit (STATE=RETRY, UUID=f7bf1603-bbf6-4125-b746-0a2c0ab162d3)
[2020-11-17 15:36:54.948875] TASK-REVOKED example_tasks.tasks.visit (STATE=REVOKED, UUID=f7bf1603-bbf6-4125-b746-0a2c0ab162d3)

