Celery 学习与实践

1 celery 简要概述

Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。
它是一个任务队列,专注于实时处理,同时还支持任务调度

celery 的优点

  • 简单:celery的 配置和使用还是比较简单的, 非常容易使用和维护和不需要配置文件
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
    如果连接丢失或发生故障,worker和client 将自动重试,并且一些代理通过主/主或主/副本复制方式支持HA。
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活: 几乎celery的各个组件都可以被扩展及自定制

1.1 celery 可以做什么?

典型的应用场景, 比如

  • 异步发邮件 , 一般发邮件比较耗时的操作,需要及时返回给前端,这个时候 只需要提交任务给celery 就可以了.之后 由worker 进行发邮件的操作 .
  • 比如有些 跑批接口的任务,需要耗时比较长,这个时候 也可以做成异步任务 .
  • 定时调度任务等

2 celery 的核心模块

2-1 celery 的5个角色

Task:就是任务,有异步任务和定时任务
Broker:中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
Worker:执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
Beat:定时任务调度器,根据配置定时将任务发送给Broker。
Backend:用于存储任务的执行结果。

file

3 实战

选择 Broker

使用 RabbitMQ 作为 Broker

$ docker run -d -p 5672:5672 rabbitmq

命令完成后,代理已经在后台运行,准备好为您移动消息:正在启动 rabbitmq-server: SUCCESS。

安装 Celery

$ pip install celery

应用

Let’s create the file tasks.py:

from celery import Celery

# app = Celery('tasks', broker='pyamqp://guest@localhost//')

# 对接MQ
app = Celery('tasks', broker='pyamqp://guest@98.142.143.145:5672//')

@app.task
def add(x, y):
    return x + y

Running the Celery worker server
You can now run the worker by executing our program with the worker argument:

第一个参数为当前模块的名称,只有在 main 模块中定义任务时才会生产名称。
第二个参数为中间人(Broker)的链接 URL ,实例中使用的 RabbitMQ(Celery默认使用的也是RabbitMQ)。

$ celery -A tasks worker --loglevel=INFO

执行结果:

[root@quant celery_test]# celery -A tasks worker --loglevel=INFO
/root/anaconda3/lib/python3.7/site-packages/celery/platforms.py:841: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@quant v5.2.6 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-5.13.12-1.el7.elrepo.x86_64-x86_64-with-centos-7.9.2009-Core 2022-05-02 11:04:45
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f21f5813390
- ** ---------- .> transport:   amqp://guest:**@98.142.143.145:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2022-05-02 11:04:45,801: INFO/MainProcess] Connected to amqp://guest:**@98.142.143.145:5672//
[2022-05-02 11:04:45,805: INFO/MainProcess] mingle: searching for neighbors
[2022-05-02 11:04:46,825: INFO/MainProcess] mingle: all alone
[2022-05-02 11:04:46,849: INFO/MainProcess] celery@quant ready.

需要调用我们创建的实例任务,可以通过 delay() 进行调用。
delay() 是 apply_async() 的快捷方法,可以更好的控制任务的执行(详情:调用任务:Calling Tasks):

>>> from tasks import add
>>> add.delay(4, 4)

该任务已经有职程(Worker)开始处理,可以通过控制台输出的日志进行查看执行情况。
调用任务会返回一个 AsyncResult 的实例,用于检测任务的状态,等待任务完成获取返回值(如果任务执行失败,会抛出异常)。默认这个功能是不开启的,如果开启则需要配置 Celery 的结果后端,下一小节会详细说明。

针对大型的项目,建议使用专用配置模块,进行针对 Celery 配置。不建议使用硬编码,建议将所有的配置项集中化配置。集中化配置可以像系统管理员一样,当系统发生故障时可针对其进行微调。
可以通过 app.config_from_object() 进行加载配置模块:

app.config_from_object('celeryconfig')

其中 celeryconfig 为配置模块的名称,这个是可以自定义修改的、
在上面的实例中,需要在同级目录下创建一个名为 celeryconfig.py 的文件,添加以下内容

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

相关文章:
Github|celery/celery
Celery中文手册
First Steps with Celery
celery 简要概述
知乎|Celery详解

为者常成,行者常至