Celery. The Missing Blog

Taking care of Celery (the distributed Python task queue, not the vegetable)

May 01, 2018

Unit Testing Celery Tasks

Posted by Bjoern Stiel

All source code examples used in this blog post can be found on GitHub:
https://github.com/ZoomerAnalytics/python-celery-unit-testing

While you might get away with not writing unit tests for very simple Rest API endpoints, doing the same for celery tasks is recipe for frustration (and disaster).

Celery tasks are asynchronous by design and therefore a lot harder to get a grip on using a “development driven development” approach.

Test Driven Development (TDD) might not have taken us to the promised land we had hoped for, but when it comes to celery tasks, it most definitely is essential to a sane, effective and efficient development process - and having that peace of mind when releasing your code into production.

A Celery task

Let’s have a look at this simple celery task:

import requests
import os

from datetime import datetime
from worker import app

@app.task(bind=True, name='fetch_data')
def fetch_data(self, url):
    response = requests.get(url)
    path = './data'
    if response.ok:
        if not os.path.exists(path):
            os.makedirs(path)
        slug = datetime.utcnow().strftime('%Y%m%dT%H%M%S%f')
        with open(os.path.join(path, slug), 'w') as f:
            f.write(response.text)
    else:
        raise ValueError('Unexpected response')

This celery tasks executes a GET request against the argument url and saves the response body to the file system. There are several strategies to test this Celery task.

Strategy 1: Wait for the task to finish

Some authors recommend calling the Celery task asynchronously and then making the code wait until the task is ready to fetch the result and evaluate the test assertions.

def test_fetch_data(self):
    task = fetch_data.s(url='...').delay()
    result = task.get()
    self.assertEqual(task.status, 'SUCCESS')
    ...

Pros

  • Tests the Celery stuff
  • Testcase and real world are nearly identical
  • Very close to real environment

Cons

  • Dependency on message broker
  • Requires a celery worker
  • More of an integration than unit test

Strategy 2: Just test the method

The Celery docs suggest Celery tasks should just be tested like any other Python method.

def test_fetch_data(self):
    fetch_data(url='...')
    self.assertEqual(...)
    ...

Pros

  • Very simple
  • No dependency on message broker
  • No celery worker required
  • An isolated unit test

Cons

  • Does not test the Celery stuff
  • Testcase and real world differ

Strategy 3: Call the task synchronously

This strategy combines the best of both worlds. We call the Celery task in nearly the same way we do in real life, but synchronously (no need to wait) and locally (in the same process).

def test_fetch_data(self):
    task = fetch_data.s(url='...').apply()
    self.assertEqual(task.result, 'SUCCESS')
    ...

Pros

  • Very simple
  • No dependency on message broker
  • No celery worker required
  • An isolated unit test
  • Tests the Celery stuff
  • Testcase and real world are very close

Cons

  • Not sure there are any

How to apply this

Invoking your Celery tasks inside your tests with the apply() method executes the task synchronously and locally. This allows you to write tests that look and feel very similar to the ones for your API endpoints.

In our next blog post, we will look at how to test Celery chains.

All source code examples used in this blog post can be found on GitHub:
https://github.com/ZoomerAnalytics/python-celery-unit-testing

Please comment below if you have any questions, suggestions or find something wrong.



Subscribe to our mailing list