起步
Celery 是一個簡單、靈活且可靠的,處理大量消息的分布式系統(tǒng),并且提供維護這樣一個系統(tǒng)的必需工具。它是一個專注于實時處理的任務(wù)隊列,同時也支持任務(wù)調(diào)度。
運行模式是生產(chǎn)者消費者模式:

任務(wù)隊列:任務(wù)隊列是一種在線程或機器間分發(fā)任務(wù)的機制。
消息隊列:消息隊列的輸入是工作的一個單元,稱為任務(wù),獨立的職程(Worker)進程持續(xù)監(jiān)視隊列中是否有需要處理的新任務(wù)。
Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,之后中間人把消息派送給職程,職程對消息進行處理。
Celery的架構(gòu)由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(task result store)組成。
消息中間件:Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ, Redis, MongoDB等,本文使用 redis 。
任務(wù)執(zhí)行單元:Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運行在分布式的系統(tǒng)節(jié)點中
任務(wù)結(jié)果存儲:Task result store用來存儲Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲任務(wù)的結(jié)果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲的,就先選用Redis來存儲任務(wù)執(zhí)行結(jié)果。
安裝
通過 pip 命令即可安裝:
pip install celery
本文使用 redis 做消息中間件,所以需要在安裝:
pip install redis
redis軟件也要安裝,官網(wǎng)只提供了 linux 版本的下載:https://redis.io/download,windows 的可以到 https://github.com/MicrosoftArchive/redis 下載 exe 安裝包。
簡單的demo
為了運行一個簡單的任務(wù),從中說明 celery 的使用方式。在項目文件夾內(nèi)創(chuàng)建 app.py 和 tasks.py 。tasks.py 用來定義任務(wù):
# tasks.pyimport timefrom celery import Celerybroker = 'redis://127.0.0.1:6379/1'backend = 'redis://127.0.0.1:6379/2'app = Celery('my_tasks', broker=broker, backend=backend)@app.taskdef add(x, y): print('enter task') time.sleep(3) return x + y這些代碼做了什么事。 broker 指定任務(wù)隊列的消息中間件,backend 指定了任務(wù)執(zhí)行結(jié)果的存儲。app 就是我們創(chuàng)建的 Celery 對象。通過 app.task 修飾器將 add 函數(shù)變成一個一部的任務(wù)。
# app.pyfrom tasks import addif __name__ == '__main__': print('start task') result = add.delay(2, 18) print('end task') print(result)add.delay 函數(shù)將任務(wù)序列化發(fā)送到消息中間件。終端執(zhí)行 python app.py 可以看到輸出一個任務(wù)的唯一識別:
新聞熱點
疑難解答