2. Celery

В одном из файлов встречается использование sleep.

@celery.task(name="users.rebuild_tags") def rebuild_tags(student_id: str | int) -> None: # preventing race condition when task looks for user which isn't saved into db yet  time.sleep(1) # ❌  user = apps.get_model("users.User").objects.get(pk=student_id) generate_tags(user) 

Нормально использовать sleep после операции, которая требует ожидания, но здесь такого нет, а это уже плохо пахнет.

Причина появления sleep указана в комментарии. Как же так получается, что student_id уже есть, а пользователя связанного с этим id в БД ещё нет? Всё дело в транзакциях. Пока транзакция не завершится, созданные ею изменения не видны.

Рассмотрим следующий пример

class UserView(APIView): def post(request): user = create_user(request) rebuild_tags.delay(student_id=user.id) # ❌  long_time_db_operation() return Response({"id": user.id}, status=201) 

По умолчанию в django все http-запросы заворачиваются в транзакции. Это значит, что созданный пользователь виден только в представлении (view), которое его создала. Код за пределами представления не видит этого пользователя, пока не завершится транзакция. Для простоты, будем считать, что транзакция завершается, когда выполниться return.
Задача rebuild_tags ставится в очередь раньше, чем будет выполнен return, а значит возможна ситуация, когда задача выполнится раньше завершения транзакции. sleep должен был решить эту проблему, но представим себе, что long_time_db_operation может иметь переменную длительность в зависимости от загруженности базы данных, тогда в некоторых случаях длительность может быть больше одной секунды и тогда sleep не поможет.

Есть надёжный вариант

class UserView(APIView): def post(request): user = create_user(request) task = rebuild_tags.s(student_id=user.id).delay transaction.on_commit(task) # ✅  long_time_db_operation() return Response({"id": user.id}, status=201) 

transaction.on_commit – принимает на вход функцию, которую нужно выполнить после завершения транзакции, поэтому у .delay нет скобок
Параметры задачи переезжают в метод .s, это позволяет задаче запуститься с нужными параметрами.
Таким образом, не важно сколько времени занимает транзакция, фоновая задача будет запущена, только после успешного завершения транзакции.

Offtop. С точки зрения TDD вариант со sleep правильный, т.к. TDD требует писать минимальный код, который будет проходить тесты. Но если написать тест, который проверяет, что задача не ставится в очередь при падении транзакции, то тогда без on_commit не обойтись.

Используйте именованные аргументы

Ещё посмотреть на события celery во flower, то информация о задаче может выглядеть rebuild_tags args=(1) kwargs={}. Когда параметров много, это могло выглядеть так args=(10, 1000, true, 1, 100). Неизвестно что означают эти числа. Когда перед глазами сотни событий celery, это не очень хорошо. Исправить это очень легко, достаточно добавить *, перед аргументами задачи. В нашем примере это будет так:

@celery.task(name="users.rebuild_tags") def rebuild_tags(*, student_id: str | int) -> None: # ✅  ... 

Во flower это будет отображаться так rebuild_tags args=() kwargs={"student_id": 1} – так намного понятней.

Рассмотрим ещё один пример. Пусть в celery beat добавлена такая задача.

@celery.task(name="Рассылка") def send_mails(*, hour: int = 8, minute: int = 0) -> None: users = find_users(hour) # Найти пользователей, у сейчас которых подходящее локальное время  if now().minute = minute: # Странная проверка, которая мешает тестированию  ... 

Как такое тестировать? Это не проблема. Благодаря именованным аргументам, их можно задать через админку в блоке Arguments в формате json: {"hour":17, "minute":47}


Source: DEV Community.


Leave a Reply

Your email address will not be published. Required fields are marked *

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.

The reCAPTCHA verification period has expired. Please reload the page.