Как настроить Celery в Django

В этом руководстве по использованию Celery совместно с Django я расскажу:

  1. Как настроить Celery с Django.
  2. Как протестировать Celery-задачу в Django-оболочке.
  3. Где контролировать работу Celery-приложения.

Вы можете использовать на исходный код проекта из этого репозитория.

Зачем приложению на Django нужен Celery

Celery нужен для запуска задач в отдельном рабочем процессе (worker), что позволяет немедленно отправить HTTP-ответ пользователю в веб-процессе (даже если задача в рабочем процессе все еще выполняется). Цикл обработки запроса не будет заблокирован, что повысит качество взаимодействия с пользователем.
Ниже приведены некоторые примеры использования Celery:

  • Вы создали приложение с функцией отправки комментариев, в которых пользователь может использовать символ @, чтобы упомянуть другого пользователя, после чего последний получит уведомление по электронной почте. Если пользователь упоминает 10 человек в своем комментарии, веб-процессу необходимо обработать и отправить 10 электронных писем. Иногда это занимает много времени (сеть, сервер и другие факторы). В данном случае Celery может организовать отправку писем в фоновом режиме, что в свою очередь позволит вернуть HTTP-ответ пользователю без ожидания.
  • Нужно создать миниатюру загруженного пользователем изображения? Такую задачу стоит выполнить в рабочем процессе.
  • Вам необходимо делать что-то периодически, например, генерировать ежедневный отчет, очищать данные истекшей сессии. Используйте Celery для отправки задач рабочему процессу в назначенное время.

Когда вы создаете веб-приложение, постарайтесь сделать время отклика не более, чем 500мс (используйте New Relic или Scout APM), если пользователь ожидает ответа слишком долго, выясните причину и попытайтесь устранить ее. В решении такой проблемы может помочь Celery.

Celery или RQ

RQ (Redis Queue) — еще одна библиотека Python, которая решает вышеуказанные проблемы.
Логика работы RQ схожа с Celery (используется шаблон проектирования производитель/потребитель). Далее я проведу поверхностное сравнение для лучшего понимания, какой из инструментов более подходит для задачи.

  • RQ (Redis Queue) проста в освоении, направлена на снижение барьера в использовании асинхронного рабочего процесса. В ней отсутствуют некоторые функции, и она работает только с Redis и Python.
  • Celery предоставляет больше возможностей, поддерживает множество различных серверных конфигураций. Одним из минусов такой гибкости является более сложная документация, что довольно часто пугает новичков.

Я предпочитаю Celery, поскольку он замечательно подходит для решения многих проблем. Данная статья написана мной, чтобы помочь читателю (особенно новичку) быстро изучить Celery!

Брокер сообщений и бэкенд результатов

Брокер сообщений — это хранилище, которое играет роль транспорта между производителем и потребителем.
Из документации Celery рекомендуемым брокером является RabbitMQ, потому что он поддерживает AMQP (расширенный протокол очереди сообщений).

Так как во многих случаях нам не нужно использовать AMQP, другой диспетчер очереди, такой как Redis, также подойдет.

Бэкенд результатов — это хранилище, которое содержит информацию о результатах выполнения Celery-задач и о возникших ошибках.

Здесь рекомендуется использовать Redis.

Как настроить Celery

Celery не работает на Windows. Используйте Linux или терминал Ubuntu в Windows.

Далее я покажу вам, как импортировать Celery worker в ваш Django-проект.

Мы будем использовать Redis в качестве брокера сообщений и бэкенда результатов, что немного упрощает задачу. Но вы свободны в выборе любой другой комбинации, которая удовлетворяет требованиям вашего приложения.

Используйте Docker для подготовки среды разработки

Если вы работаете в Linux или Mac, у вас есть возможность использовать менеджер пакетов для настройки Redis (brew, apt-get install), однако я хотел бы порекомендовать вам попробовать применить Docker для установки сервера redis.

  1. Вы можете скачать Docker-клиент здесь.
  2. Затем попробуйте запустить службу Redis $ docker run -p 6379: 6379 --name some-redis -d redis

Команда выше запустит Redis на 127.0.0.1:6379.

  1. Если вы намерены использовать RabbitMQ в качестве брокера сообщений, вам нужно изменить только приведенную выше команду.
  2. Закончив работу с проектом, вы можете закрыть Docker-контейнер — окружение вашей рабочей машины по-прежнему будет чистым.

Теперь импортируем Celery в наш Django-проект.

Создание Django-проекта

Рекомендую создать отдельное виртуальное окружение и работать в нем.

$ pip install django==3.1
$ django-admin startproject celery_django
$ python manage.py startapp polls

Ниже представлена структура проекта.

├── celery_django 
│   ├── __init__.py

│   ├── asgi.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── manage.py
└── polls
    ├── __init__.py
    ├── admin.py
    ├── apps.py
    ├── migrations
    │   └── __init__.py
    ├── models.py
    ├── tests.py
    └── views.py

Файл celery.py

Давайте приступим к установке и настройке Celery.

pip install celery==4.4.7 redis==3.5.3 flower==0.9.7

Создайте файл celery_django/celery.py рядом с celery_django/wsgi.py.

"""
Файл настроек Celery
https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html
"""
from __future__ import absolute_import
import os
from celery import Celery

# этот код скопирован с manage.py
# он установит модуль настроек по умолчанию Django для приложения 'celery'.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_django.settings')

# здесь вы меняете имя
app = Celery("celery_django")

# Для получения настроек Django, связываем префикс "CELERY" с настройкой celery
app.config_from_object('django.conf:settings', namespace='CELERY')

# загрузка tasks.py в приложение django
app.autodiscover_tasks()


@app.task
def add(x, y):
    return x / y

Файл __init__.py

Давайте продолжим изменять проект, в celery_django/__init__.py добавьте.

from __future__ import absolute_import, unicode_literals

# Это позволит убедиться, что приложение всегда импортируется, когда запускается Django
from .celery import app as celery_app

__all__ = ('celery_app',)

Дополнение settings.py

Поскольку Celery может читать конфигурацию из файла настроек Django, мы внесем в него следующие изменения.

CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0"

Есть кое-что, о чем следует помнить.

При изучении документации Celery вы вероятно увидите, что broker_url — это ключ конфигурации, который вы должны установить для диспетчера сообщений, однако в приведенном выше celery.py:

  1. app.config_from_object('django.conf: settings', namespace = 'CELERY') сообщает Celery, чтобы он считывал значение из пространства имен CELERY, поэтому, если вы установите просто broker_url в своем файле настроек Django, этот параметр не будет работать. Правило применяется для всех ключей конфигурации в документации Celery.
  2. Некоторые конфигурационные ключи различаются между Celery 3 и Celery 4, так что, пожалуйста, загляните в документацию при настройке.

Отправка заданий Celery

После завершение работы с конфигурацией все готово к использованию Celery. Мы будем запускать некоторые команды в отдельном терминале, но я рекомендую вам взглянуть на Tmux, когда у вас будет время.

Сначала запустите Redis-клиент, потом celery worker в другом терминале, celery_django — это имя Celery-приложения, которое вы установили в celery_django/celery.py.

$ celery worker -A celery_django --loglevel=info

  -------------- celery@DESKTOP-111111 v4.4.7 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-4.4.0-19041-Microsoft-x86_64-with-glibc2.27 2021-03-15 15:03:44
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_django:0x7ff07f818ac0
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery_django.celery.add

Далее запустим приложение в новом терминале, которое поможет нам отслеживать Celery-задачу (я расскажу об этом чуть позже).


$ flower -A celery_django --port=5555

[I 210315 16:11:39 command:135] Visit me at http://localhost:5555
[I 210315 16:11:39 command:142] Broker: redis://127.0.0.1:6379/0
[I 210315 16:11:39 command:143] Registered tasks:
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap',
     'celery_django.celery.add']
[I 210315 16:11:39 mixins:229] Connected to redis://127.0.0.1:6379/0

Затем откройте http://localhost:5555/. Вы должны увидеть информационную панель, на которой отображаются детали выполнения рабочего процесса Celery.

Теперь войдем в Django shell и попробуем отправить Celery несколько задач.

$ python manage.py migrate
$ python manage.py shell
...
>>> from celery_django.celery import add
>>> task = add.delay(1, 2)

Рассмотрим некоторые моменты:

  1. Мы используем xxx.delay для отправки сообщения брокеру. Рабочий процесс получает эту задачу и выполняет ее.
  2. Когда вы нажимаете клавишу enter для ввода task = add.delay(1, 2), кажется, что команда быстро завершает выполнение (отсутствие блокировки), но метод добавления все еще активен в рабочем процессе Celery.
  3. Если вы проверите вывод терминала, где был запущен Celery, то увидите что-то вроде этого:
[2021-03-15 15:04:32,859: INFO/MainProcess] Received task: celery_django.celery.add[e1964774-fd3b-4add-96ff-116e3578de
de]
[2021-03-15 15:04:32,882: INFO/ForkPoolWorker-1] Task celery_django.celery.add[e1964774-fd3b-4add-96ff-116e3578dede] s
ucceeded in 0.013418699999988348s: 0.5

Рабочий процесс получил задачу в 15:04:32, и она была успешно выполнена.
Думаю, теперь у вас уже есть базовое представление об использовании Celery. Попробуем ввести еще один блок кода.

>>> print(task.state, task.result)
SUCCESS 0.5

Затем давайте попробуем вызвать ошибку в Celery worker и посмотрим, что произойдет.

>>> task = add.delay(1, 0)
>>> type(task)
celery.result.AsyncResult

>>> task.state
'FAILURE'
>>> task.result
ZeroDivisionError('division by zero')

Как видите, результатом вызова метода delay является экземпляр AsyncResult.
Мы можем использовать его следующим образом:

  1. Проверить состояние задачи.
  2. Узнать возвращенное значение (результат) или сведения об исключении.
  3. Получить другие метаданные.

Мониторинг Celery с помощью Flower

Flower позволяет отобразить информацию о работу Celery более наглядно на веб-странице с дружественным интерфейсом. Это значительно упрощает понимание происходящего, поэтому я хочу обратить внимание на Flower, прежде чем углубиться в дальнейшее рассмотрение Celery.

URL-адрес панели управления: http://127.0.0.1:5555/. Откройте страницу задач — Tasks.

Как настроить Celery в Django


При изучении Celery довольно полезно использовать Flower для лучшего понимания деталей.
Когда вы развертываете свой проект на сервере, Flower не является обязательным компонентом. Я имею в виду, что вы можете напрямую использовать команды Celery, чтобы управлять приложением и проверять статус рабочего процесса.

Заключение

В этой статье я рассказал об основных аспектах Celery. Надеюсь, что после прочтения вы стали лучше понимать процесс работы с ним. Исходный код проекта доступен по ссылке в начале статьи.

Максим
Я создал этот блог в 2018 году, чтобы распространять полезные учебные материалы, документации и уроки на русском. На сайте опубликовано множество статей по основам python и библиотекам, уроков для начинающих и примеров написания программ.
Мои контакты: Почта
admin@pythonru.comAlex Zabrodin2018-10-26OnlinePython, Programming, HTML, CSS, JavaScript