起步
在 《分布式任務隊列Celery使用說明》 中介紹了在 Python 中使用 Celery 來實驗異步任務和定時任務功能。本文介紹如何在 Django 中使用 Celery。
安裝
pip install django-celery
這個命令使用的依賴是 Celery 3.x 的版本,所以會把我之前安裝的 4.x 卸載,不過對功能上并沒有什么影響。我們也完全可以僅用Celery在django中使用,但使用 django-celery 模塊能更好的管理 celery。
使用
可以把有關 Celery 的配置放到 settings.py 里去,但我比較習慣單獨一個文件來放,然后在 settings.py 引入進來:
# celery_config.pyimport djceleryimport osos.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')djcelery.setup_loader()BROKER_URL = 'redis://127.0.0.1:6379/1'CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'# UTCCELERY_ENABLE_UTC = TrueCELERY_TIMEZONE = 'Asia/Shanghai'CELERY_IMPORTS = ( 'app.tasks',)# 有些情況可以防止死鎖CELERY_FORCE_EXECV = True# 設置并發的worker數量CELERYD_CONCURRENCY = 4# 任務發送完成是否需要確認,這一項對性能有一點影響CELERY_ACKS_LATE = True# 每個worker執行了多少任務就會銷毀,防止內存泄露,默認是無限的CELERYD_MAX_TASKS_PER_CHILD = 40# 規定完成任務的時間CELERYD_TASK_TIME_LIMIT = 15 * 60 # 在15分鐘內完成任務,否則執行該任務的worker將被殺死,任務移交給父進程# 設置默認的隊列名稱,如果一個消息不符合其他的隊列就會放在默認隊列里面,如果什么都不設置的話,數據都會發送到默認的隊列中CELERY_DEFAULT_QUEUE = "default"# 設置詳細的隊列CELERY_QUEUES = { "default": { # 這是上面指定的默認隊列 "exchange": "default", "exchange_type": "direct", "routing_key": "default" }, "beat_queue": { "exchange": "beat_queue", "exchange_type": "direct", "routing_key": "beat_queue" }}
配置文件中設置了 CELERY_IMPORTS 導入的任務,所以在django app中創建相應的任務文件:
# app/tasks.pyfrom celery.task import Taskimport timeclass TestTask(Task): name = 'test-task' # 給任務設置個自定義名稱 def run(self, *args, **kwargs): print('start test task') time.sleep(4) print('args={}, kwargs={}'.format(args, kwargs)) print('end test task')
在 settings.py 添加:
INSTALLED_APPS = [ # ... 'djcelery',]# Celeryfrom learn_django.celery_config import *
觸發任務或提交任務可以在view中來調用:
# views.pyfrom django.http import HttpResponsefrom app.tasks import TestTaskdef test_task(request): # 執行異步任務 print('start do request') t = TestTask() t.delay() print('end do request') return HttpResponse('ok')
啟動 woker 的命令是:
python manage.py celery worker -l info
新聞熱點
疑難解答