試到一半的 Celery ~”~ (未來會繼續)
Introduction
simple, flexible and reliable distributed system to process vast amounts of messages
task queue with focus on real-time processing, while also supporting task scheduling
BSD license
Tutorial
Choosing a Broker
Broker 的功能是收發 messages,這邊通常會用專門的 message broker 來處理, 例如 :
- RabbitMQ
feature-complete
stable
durable
excellent choice for a production environment
- Redis
feature-complete
more susceptible to data loss in the event of abrupt termination or power failures
- Database
不推薦拿 database 來當 message queue
但對於小需求可能就足夠
- Other Brokers
Amazon SQS
MongoDB
IronMQ
Broker Overview : (2015-01-27)
Name |
Status |
Monitoring |
Remote Control |
---|---|---|---|
RabbitMQ |
Stable |
Yes |
Yes |
Redis |
Stable |
Yes |
Yes |
Mongo DB |
Experimental |
Yes |
Yes |
Beanstalk |
Experimental |
No |
No |
Amazon SQS |
Experimental |
No |
No |
Couch DB |
Experimental |
No |
No |
Zookeeper |
Experimental |
No |
No |
Django DB |
Experimental |
No |
No |
SQLAlchemy |
Experimental |
No |
No |
Iron MQ |
3rd party |
No |
No |
command on Arch Linux :
yaourt -S rabbitmq
sudo systemctl start rabbitmq
安裝 Celery
要裝 Celery 沒有什麼特別的,用 pip
就可以裝,
通常再搭配上 virtualenv 做隔離
command :
pip install celery
Application
Celery 裝完後要做的就是先建立一個 Celery instance (Celery application), 這個 instance 會是所有要交給 Celery 完成的事的 entry-point, 所以要讓各 modules 都可以 import
# proj/celery.py
from celery import Celery
app = Celery('proj',
broker='amqp://localhost:5672', # 沒寫 port 的話會自動找 5672 port
backend='amqp://',
include=['proj.tasks']) # include 裡面是要跑的 tasks
# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
)
if __name__ == '__main__':
app.start()
寫完 instance 後,可以開始給 task 了
# proj/tasks.py
from proj.celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
啟動 Celery
接下來可以用 command 啟動 celery
# celery -A ${app instance} worker -l info
# 如果 -A 後面接的 folder 的話,會去找裡面的 celery.py
# 等同於 "celery -A proj.celery worker -l info"
# 如果在 proj 底下的 app instance 檔案叫 mycelery.py 的話
# command 就變成 "celery -A proj.mycelery worker -l info"
celery -A proj worker -l info
# More Help
celery worker --help
celery help
啟動畫面 :
-------------- celery@linux-dv v3.1.17 (Cipater)
---- **** -----
--- * *** * -- Linux-3.17.3-1-ARCH-x86_64-with-arch
-- * - **** ---
1. ** ---------- [config]
2. ** ---------- .> app: __main__:0x7fc92f14e0f0
3. ** ---------- .> transport: amqp://guest:**@localhost:5672//
4. ** ---------- .> results: amqp://
5. *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. proj.task.add
. proj.task.mul
. proj.task.xsum
開始丟 Task
from proj.tasks import add
add(4, 5) # 直接 call 不換產生 task 給 Celery,而是會直接做
add.delay(4, 5) # 用 delay 會產生 task 交給 Celery,所以是 asynchronous 的
# delay 其實 "apply_async" 的 shortcut
# add.delay(4, 5) 等同於 add.apply_async((4, 5))
data = add.delay(8, 9) # type(data) : celery.result.AsyncResult
data.get(timeout=1) # 取值,等超過 1 秒還沒完成就 timeout,會 raise TimeoutError
data.revoke(terminate=True) # 強制停止 task
強制停止 task (from outside) :
from celery.task.control import revoke
# 參數一是 task id,可以從 celery 的 log 中的得知
revoke("69511b35-dcbc-4f93-9252-e428ed8114f5", terminate=True)
保存結果
如果要持續監控 tasks 的狀態的話,需要把 Celery 接上 backend 來儲存, 內建的 backend 支援有很多種, 例如: SQLAlchemy、Django ORM、Memcached、Redis、AMQP (RabbitMQ)、MongoDB 等, 或者是可以自己接新的 backend。
backend 的設定在 “Celery” 的 backend
argument 或是 configuration module 裡的 CELERY_RESULT_BACKEND
AMQP 當 backend :
app = Celery('tasks', backend='amqp', broker='amqp://')
Redis 當 backend :
app = Celery('tasks', backend='redis://localhost', broker='amqp://')
在用 delay 來丟 task 的時候,回傳的會是一個 AsyncResult
instance,
接著可以用 ready()
這個 method 來確認完成了沒。
result = add.delay(4, 4) # AsyncResult
result.ready() # True / False
另外可以用 get()
method 來一段等待時間,超過時間還沒完成就 timeout,
最後就會拿到 TimeoutError 這個 exception。
result.get(timeout=1) # result value / TimeoutError exception / exception in task
如果 task 裡 raise 了 exception, get()
會再 re-raise exception,
如果不想要再 re-raise 的話,就在 get 裡加上 propagate=False
這參數,
加上這參數後,如果 task 裡 raise 了 exception 的話,
“get()” 拿到的會是 exception instance,
而不是直接 re-raise exception,
另外有 exception 的時候可以在 traceback
這個 attribute 裡看到原本的 traceback。
假設現在寫了一種新的 task :
# proj/tasks.py
from proj.celery import app
@app.task
def exception():
raise ValueError("just kidding")
接著在別的地方來丟出 task :
from proj import tasks
result = tasks.exception.delay()
result.get() # exception and traceback
# ValueError: just kidding
tmp = result.get(propagate=False) # instance of ValueError
print(tmp) # just kidding
print(repr(tmp)) # ValueError('just kidding',)
isinstance(tmp, ValueError) # True
print(result.traceback)
Configuration
用預設的東西,不太別去調設定,其實就可以良好運作了。 但是 Celery 也有提供更多的設定讓使用者可以自己調整,
serializer
Celery with non-Python
Frequently Asked Questions
Celery 的 FAQ 的節錄 (2015-01-27)
General
我該拿 Celery 做什麼 ?
- 把東西放在 background 跑
例如 web request 盡可能地早點回傳給使用者,接著再把耗時的東西陸續傳給使用者,這會讓使用者感覺回應時間減少、performance 變好
在 web request 結束後跑其他額外的工作
確保工作有完成 (asynchronously 執行,定期檢查、重試)
定期執行的工作
分散式計算
平行化執行
Misconceptions
Celery 一定需要 pickle ?
No.
Celery 支援各種 serialization scheme, 目前內建支援的有 JSON、YAML、Pickle、msgpack, 每個 task 要用什麼 serialization scheme 是可以分別指定的, 預設使用 pickle 是因為可以傳送複雜的 Python objects, 如果需要跟不同語言溝通的話可以選擇其他適合的 format。
Troubleshooting
清除所有 waiting tasks ?
celery -A proj purge
Results
取得某 task id 的東西
result = my_task.AsyncResult(task_id)
result.get()
Security
用 pickle 會有 security 問題吧 ?
是,用 pickle 會有安全疑慮, 基本上你要確保不該有 access 權限的地方不能 access 到你的 broker、databases、other services。
可以設定 CELERY_TASK_SERIALIZER
來改變 task messages 格式成 json 或 yaml 之類的。
我想要加密
對於支援 SSL 的 AMQP brokers 可以設定 BROKER_USE_SSL
來加密
Tasks
用名稱來 call task ?
app.send_task('tasks.add', args=[2, 2], kwargs={})
取得 task id ?
@app.task(bind=True)
def mytask(self):
cache.set(self.request.id, "Running")
我要跑連續的 task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task
def add(x, y):
return x + y
@app.task(ignore_result=True)
def log_result(result):
logger.info("log_result got: %r", result)
# run task
# 這邊注意到用的是 ".s" 而不是直接 call ".delay"
# 詳細看 celery.canvas.Signature
(add.s(2, 2) | log_result.s()).delay()