Celery 学习与实践
1 celery 简要概述
Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。
它是一个任务队列,专注于实时处理,同时还支持任务调度。
celery 的优点
- 简单:celery的 配置和使用还是比较简单的, 非常容易使用和维护和不需要配置文件
- 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
如果连接丢失或发生故障,worker和client 将自动重试,并且一些代理通过主/主或主/副本复制方式支持HA。 - 快速:一个单进程的celery每分钟可处理上百万个任务
- 灵活: 几乎celery的各个组件都可以被扩展及自定制
1.1 celery 可以做什么?
典型的应用场景, 比如
- 异步发邮件 , 一般发邮件比较耗时的操作,需要及时返回给前端,这个时候 只需要提交任务给celery 就可以了.之后 由worker 进行发邮件的操作 .
- 比如有些 跑批接口的任务,需要耗时比较长,这个时候 也可以做成异步任务 .
- 定时调度任务等
2 celery 的核心模块
2-1 celery 的5个角色
Task:就是任务,有异步任务和定时任务
Broker:中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
Worker:执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
Beat:定时任务调度器,根据配置定时将任务发送给Broker。
Backend:用于存储任务的执行结果。
3 实战
选择 Broker
使用 RabbitMQ 作为 Broker
$ docker run -d -p 5672:5672 rabbitmq
命令完成后,代理已经在后台运行,准备好为您移动消息:正在启动 rabbitmq-server: SUCCESS。
安装 Celery
$ pip install celery
应用
Let’s create the file tasks.py:
from celery import Celery
# app = Celery('tasks', broker='pyamqp://guest@localhost//')
# 对接MQ
app = Celery('tasks', broker='pyamqp://guest@98.142.143.145:5672//')
@app.task
def add(x, y):
return x + y
Running the Celery worker server
You can now run the worker by executing our program with the worker argument:
第一个参数为当前模块的名称,只有在 main 模块中定义任务时才会生产名称。
第二个参数为中间人(Broker)的链接 URL ,实例中使用的 RabbitMQ(Celery默认使用的也是RabbitMQ)。
$ celery -A tasks worker --loglevel=INFO
执行结果:
[root@quant celery_test]# celery -A tasks worker --loglevel=INFO
/root/anaconda3/lib/python3.7/site-packages/celery/platforms.py:841: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=0 euid=0 gid=0 egid=0
uid=uid, euid=euid, gid=gid, egid=egid,
-------------- celery@quant v5.2.6 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.13.12-1.el7.elrepo.x86_64-x86_64-with-centos-7.9.2009-Core 2022-05-02 11:04:45
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f21f5813390
- ** ---------- .> transport: amqp://guest:**@98.142.143.145:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2022-05-02 11:04:45,801: INFO/MainProcess] Connected to amqp://guest:**@98.142.143.145:5672//
[2022-05-02 11:04:45,805: INFO/MainProcess] mingle: searching for neighbors
[2022-05-02 11:04:46,825: INFO/MainProcess] mingle: all alone
[2022-05-02 11:04:46,849: INFO/MainProcess] celery@quant ready.
需要调用我们创建的实例任务,可以通过 delay() 进行调用。
delay() 是 apply_async() 的快捷方法,可以更好的控制任务的执行(详情:调用任务:Calling Tasks):
>>> from tasks import add
>>> add.delay(4, 4)
该任务已经有职程(Worker)开始处理,可以通过控制台输出的日志进行查看执行情况。
调用任务会返回一个 AsyncResult 的实例,用于检测任务的状态,等待任务完成获取返回值(如果任务执行失败,会抛出异常)。默认这个功能是不开启的,如果开启则需要配置 Celery 的结果后端,下一小节会详细说明。
针对大型的项目,建议使用专用配置模块,进行针对 Celery 配置。不建议使用硬编码,建议将所有的配置项集中化配置。集中化配置可以像系统管理员一样,当系统发生故障时可针对其进行微调。
可以通过 app.config_from_object() 进行加载配置模块:
app.config_from_object('celeryconfig')
其中 celeryconfig
为配置模块的名称,这个是可以自定义修改的、
在上面的实例中,需要在同级目录下创建一个名为 celeryconfig.py
的文件,添加以下内容
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
相关文章:
Github|celery/celery
Celery中文手册
First Steps with Celery
celery 简要概述
知乎|Celery详解
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)