起步
Celery 是一個簡單、靈活且可靠的,處理大量消息的分布式系統,并且提供維護這樣一個系統的必需工具。它是一個專注于實時處理的任務隊列,同時也支持任務調度。
運行模式是生產者消費者模式:
任務隊列:任務隊列是一種在線程或機器間分發任務的機制。
消息隊列:消息隊列的輸入是工作的一個單元,稱為任務,獨立的職程(Worker)進程持續監視隊列中是否有需要處理的新任務。
Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,之后中間人把消息派送給職程,職程對消息進行處理。
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件:Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ, Redis, MongoDB等,本文使用 redis 。
任務執行單元:Worker是Celery提供的任務執行的單元,worker并發的運行在分布式的系統節點中
任務結果存儲:Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。
安裝
通過 pip 命令即可安裝:
pip install celery
本文使用 redis 做消息中間件,所以需要在安裝:
pip install redis
redis軟件也要安裝,官網只提供了 linux 版本的下載:https://redis.io/download,windows 的可以到 https://github.com/MicrosoftArchive/redis 下載 exe 安裝包。
簡單的demo
為了運行一個簡單的任務,從中說明 celery 的使用方式。在項目文件夾內創建 app.py 和 tasks.py 。tasks.py 用來定義任務:
# tasks.pyimport timefrom celery import Celerybroker = 'redis://127.0.0.1:6379/1'backend = 'redis://127.0.0.1:6379/2'app = Celery('my_tasks', broker=broker, backend=backend)@app.taskdef add(x, y): print('enter task') time.sleep(3) return x + y
這些代碼做了什么事。 broker 指定任務隊列的消息中間件,backend 指定了任務執行結果的存儲。app 就是我們創建的 Celery 對象。通過 app.task 修飾器將 add 函數變成一個一部的任務。
# app.pyfrom tasks import addif __name__ == '__main__': print('start task') result = add.delay(2, 18) print('end task') print(result)
add.delay 函數將任務序列化發送到消息中間件。終端執行 python app.py 可以看到輸出一個任務的唯一識別:
新聞熱點
疑難解答