Celery 分布式任务队列入门

一、Celery介绍和基本使用

Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery。

Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,使用非常简单。 celery适用异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。

celery的特点是:

简单,易于使用和维护,有丰富的文档。 高效,单个celery进程每分钟可以处理数百万个任务。 灵活,celery中几乎每个部分都可以自定义扩展。 celery非常易于集成到一些web开发框架中.

Celery 安装

pip install -U Celery

任务队列

任务队列是一种跨线程、跨机器工作的一种机制.

任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.

celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。

 一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。

borker

Celery需要一种解决消息的发送和接受的方式,我们把这种用来存储消息的的中间装置叫做message broker, 也可叫做消息中间人。

Celery的默认broker是RabbitMQ, 仅需配置一行就可以

broker_url = 'amqp://guest:guest@localhost:5672//'

rabbitMQ 没装的话请装一下,安装rabbitMQ参考

使用Redis做broker也可以,需要安装redis组件

pip install -U "celery[redis]"

配置:

配置消息中间人地址:

app.conf.broker_url = 'redis://localhost:6379/0'

如果想获取每个任务的执行结果,还需要配置一下把任务结果存在哪

app.conf.result_backend = 'redis://localhost:6379/0'

创建应用

创建一个celery application 用来定义你的任务列表。

创建一个任务文件 tasks.py

from celery import Celery

# Celery第一个参数是给其设定一个名字, 第二参数我们设定一个中间人broker, 在这里我们使用Redis作为中间人。

app = Celery('test', broker='redis://192.168.20.71:6379/0')


# 任务函数,通过加上装饰器app.task, 将其注册到broker的队列中。
@app.task
def print_task():
    print("my celery 异步任务")

现在我们在创建工作者worker, 等待处理队列中的任务.

进入到 tasks.py目录下

celery -A tasks worker -l info

注意:如果是在windows下使用celery 3.1.17以上版本可能会报错请使用下面命令开启worker:

celery对windows支持不太好。参考GitHub问题

celery -A tasks worker  -l info --pool=solo

调用任务

任务加入到broker队列中。如何将任务函数加入到队列中,可使用delay()方法。

from tasks import *
print_task.delay()

或者:

print_task.apply_async()

调用任务函数后,在worker的控制台,有一个任务被执行,返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。

保存结果

如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方,这里我们同样保存在redis中。

from celery import Celery

# Celery第一个参数是给其设定一个名字, 第二参数我们设定一个中间人broker, 在这里我们使用Redis作为中间人。
# backend 指定保存结果,指定保存在redis中。
app = Celery('test', broker='redis://192.168.20.71:6379/0',
             backend='redis://192.168.20.71:6379/1'
             )


# 任务函数,通过加上装饰器app.task, 将其注册到broker的队列中。
# 任务函数有返回值,并且增加两个参数。
@app.task
def print_task(a, b):
    print("任务函数正在执行....")
    return a + b

result对象方法:

方法 说明
result 查看任务函数返回的结果
state/status PENDING 任务正在等待执行。 STARTED 任务已经开始。 RETRY 任务将被重试,可能是因为失败。FAILURE 该任务引发了一个例外,或者超过了重试限制。该result属性包含任务引发的异常。 SUCCESS 执行成功
failed 执行成功返回 flask,执行失败返回 True
forget 删除这个任务的结果
get 等待任务准备就绪,然后返回结果,等待任务中的任务可能会导致死锁。参数:timeout等待多久超时
info 任务执行完成后,将包含返回值。如果任务引发异常,则这将是异常实例。
ready True如果任务已执行则返回,如果任务仍在运行,挂起或正在等待重试,则False返回。

更多AsyncResult对象信息

Celery 配置

Celery 配置简单,Celery有很多配置选项可以由开发人员配置,但是默认的配置都可以满足大部分应用场景了。 配置信息可以直接在app中设置,或者通过专有的配置模块来配置。

直接通过app配置:

from celery import Celery
app = Celery('test')

# 增加celery配置
app.conf.update(
    result_backend='redis://192.168.20.71:6379/2',
    broker_url='redis://192.168.20.71:6379/1',
)

通过配置文件配置: 对于比较大的项目,我们建议配置信息作为一个单独的模块。我们可以通过调用app的函数来告诉Celery使用我们的配置模块。 我们将Celery配置放在celeryconfig.py的模块中,这个模块名自定义。但需要确保能在tasks.py中导入。

下面我们在tasks.py模块 同级目录下创建配置模块celeryconfig.py:

broker_url = 'redis://192.168.20.71:6379/0'
result_backend = 'redis://192.168.20.71:6379/1'

注意:celery 4.0以上的版本引入了小写配置,但是还会兼容大写配置。如果使用旧版本,会识别不了小写配置,更多Celery配置参考.

修改tasks.py模块:

import time
from celery import Celery
import celeryconfig


app = Celery('test')

# 加载celery配置文件中的配置
app.config_from_object(celeryconfig)



# 任务函数,通过加上装饰器app.task, 将其注册到broker的队列中。
# 任务函数有返回值,并且增加两个参数。
@app.task
def print_task(a, b):
    time.sleep(20)
    print("任务函数正在执行....")
    return a + b

在项目中使用celery

创建一个名为 celery_test目录作为celery文件目录。

celery.py 内容:



from celery import Celery
from . import celeryconfig

# 创建celery应用
app = Celery('u_test')

# 加载celery配置文件中的配置
app.config_from_object(celeryconfig)

# 自动搜索任务,指定目录名
app.autodiscover_tasks(['celery_test'])

celeryconfig.py 内容:

# 消息传输的中间人 broker
broker_url = 'redis://192.168.20.71:6379/0'
# 保存结果后端
result_backend = 'redis://192.168.20.71:6379/1'

tasks.py 内容:

from .celery import app

@app.task
def print_task(a, b):

    print("任务函数正在执行....")
    return a + b

@app.task
def print_task1(a, b):
    print("任务函数正在执行....")
    return a + b

@app.task
def print_task2(a, b):
    print("任务函数正在执行....")
    return a + b

@app.task
def print_task3(a, b):
    print("任务函数正在执行....")
    return a + b

启动worker并制定队列名称为 'myqueue':

celery -A celery_test worker -l info -Q 'myqueue'

调用任务

调用任务之前一直使用delay()方法,还有apply_async()方法也可以调用任务,apply_async()方法可以接收参数、执行选项,实际上delay方法也是调用 apply_async()方法,但是不能配置执行选项。

print_task.apply_async((2, 2), queue='my_queue', countdown=10)

任务my_task将会被发送到my_queue队列中,并且在发送10秒之后执行。

如果想要使用delay()方法,并且需要配置执行选项,可以使用signature函数进行封装,将参数以及选项封装到任务函数中。

In [31]: a = tasks.print_task.signature((3,2),countdown=10)

In [32]: a.delay()

任务组

group: 一组任务并行执行,返回一组返回值,并可以按顺序检索返回值。

chain: 任务一个一个执行,一个执行完将执行return结果传递给下一个任务函数.

group使用:

在celery_test同级目录下,创建一个test.py文件

from celery_test.tasks import print_task
from celery_test.tasks import print_task1
from celery_test.tasks import print_task2
from celery_test.tasks import print_task3
from celery import group

# 将多个signature放入同一组中

my_group = group(print_task.signature((2, 3),
                                      queue='myqueue'),
                 print_task1.signature((1, 2),
                                       queue='myqueue'),
                 print_task2.signature((1, 2),
                                       queue='myqueue'),
                 print_task3.signature((1, 2),
                                       queue='myqueue')
                 )
# 执行组任务,多个任务同时执行
res = my_group.delay()
# get会阻塞等待返回结果
print(res.get())  # [5, 3, 3, 3]

chain 使用:

from celery_test.tasks import print_task
from celery_test.tasks import print_task1
from celery_test.tasks import print_task2
from celery_test.tasks import print_task3

from celery import chain

# chain 将多个signature组成一个任务链
# print_task 的返回值传给print_task1的第一个参数
# print_task1 的返回值传给print_task2的第一个参数
my_chain = chain(print_task.signature((1, 3), queue='myqueue') |
                 print_task.signature((3,), queue='myqueue') |
                 print_task.signature((6,), queue='myqueue')
                 )
# 多个任务顺序执行
res = my_chain.delay()
# 最终返回print_task2 的结果
print(res.get())  # 返回13

任务路由

在生产中可以有多个worker,分别处理不同的任务。比如一个worker处理图片上传,一个worker专门处理短信发送。 我们创建两个队列,一个专门用于图片上传任务队列和图像处理,一个用来短信发送任务队列。 celery支持队列路由设置的,也可以在调用任务的时候指定队列。

首先在tasks.py中增加任务函数:


@app.task
def upload_img():
    print("upload_img 任务函数正在执行....")


@app.task
def handle_img():
    print("handel_img 任务函数正在执行....")

@app.task
def send_message():
    print("send_message 任务函数正在执行....")

celeryconfig.py模块中配置 CELERY_ROUTES 选项:

broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

task_routes = ({
    'celery_test.tasks.upload_img': {'queue': 'queue1'},
    'celery_test.tasks1.handle_img': {'queue': 'queue1'},
    'celery_test.tasks2.send_message': {'queue': 'queue2'},
    },
)

在test.py模块中将调用任务:

from celery_test.tasks import *

# 将任务加入队列
a = upload_img.delay()
print(a)
b = handle_img.delay()
print(b)
c = send_message.delay()
print(c)

打开两个终端开启两个worker,两个队列

celery -A celery_test  worker -l info -Q 'queue1'

celery -A celery_test  worker -l info -Q 'queue2'

也执行一个worker处理两个队列

celery -A celery_test  worker -l info -Q queue2,queue1

二、celery 实现定时任务

celery 可以实现周期性任务,如果想要某个任务周期性执行,那么需要增加beat_schedule 配置信息.

在celeryconfig.py配置文件中增加

# 配置周期性任务
beat_schedule = {
    'every-5-seconds': # 任务名称
        {
            'task': 'celery_test.tasks.upload_img', # 指定任务
            'schedule': 5.0,  # 每5秒执行一次
        }
}

启动worker执行周期性任务:

注意如果要执行周期性任务,启动是需要加 --beat 参数

celery -A celery_test  worker -l info -Q queue2,queue1 --beat

定时任务配置

  • task 任务函数
  • schedule 执行频率,可以是时间秒,也可以是 crontab对象
  • args 列表或者元祖,传递给任务函数的参数
  • kwargs 关键字参数,字典类型
  • options apply_async函数支持的执行选项
from celery.schedules import crontab

beat_schedule = {
    # 每周一早上7点半执行
    'add-every-monday-morning': {
        'task': 'celery_test.tasks.upload_img',# 任务
        'schedule': crontab(hour=7, minute=30, day_of_week=1),#时间
        'args': (16, 16),#参数
    },
}

Crontab时间表

时间 说明
crontab() 每分钟执行一次
crontab(minute=0, hour=0) # 每天凌晨十二点执行
crontab(minute=0, hour='*/3') 每三小时执行一次,午夜,上午3点,上午6点,上午9点,中午,下午3点,下午6点,晚上9点。
crontab(minute='*/15') 每15分钟执行一次
crontab(minute='*/10',hour='3,17,22', day_of_week='thu,fri') 每十分钟执行一次,但仅在周四或周五的上午3-4点,下午5-6点和下午10-11点之间执行。
crontab(0, 0,day_of_month='1-7,15-21') 在本月的第一周和第三周执行。
crontab(0, 0, day_of_month='2') 每个月的第二天执行。
crontab(minute=0, hour='*/3,8-17') 每3小时执行一次,但只在上午8点至下午5点)之间执行。
crontab(0, 0, day_of_month='11',month_of_year='5') 每年5月11日执行。

如果周期性任务中断后,想要继续上一次的时间,需要访问执行celery 目录下的一个文件名为celerybeat-schedule 的文件。使用-s 参数是文件目录

celery -A celery_test  worker -l info -Q queue2,queue1 --beat -s celerybeat-schedule

三、flask异步发送邮件

from flask import Flask
from celery import Celery
from flask_mail import Mail, Message

app = Flask(__name__)  #type:Flask

app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/15'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/15'

# 创建celery实例
celery_app = Celery(app.name, broker=app.config['CELERY_BROKER_URL'],
                    backend=app.config['CELERY_RESULT_BACKEND'])

# 为Celery同步Flask上的配置
celery_app.conf.update(app.config)

# 配置 发送邮箱
app.config.update(
    DEBUG=True,
    MAIL_SERVER='smtp.163.com',  # 服务器地址
    MAIL_PROT=25,   # 邮件服务器端口
    MAIL_USE_TLS=True,  # 使用tls 安全协议
    MAIL_USERNAME='pydjango@163.com',  # 发送邮箱
    MAIL_PASSWORD='qwer1234',   # 密码
)

mail = Mail(app)
@app.route('/')
def hello_world():
    # 调用任务,将任务加入任务队列
    send_async_email.delay('来自flask',app.config['MAIL_USERNAME'],
                           ['pydjango@163.com'],'celery 异步发送邮件')
    return 'Hello World!'


@celery_app.task
def send_async_email(sub,sender,recipients,body):
    """发送邮件任务"""
    msg = Message(subject=sub, sender=sender, recipients=recipients,)
    # 邮件内容
    msg.body = body

    # 添加附件 content_type对照表: http://tool.oschina.net/commons/
    # 上传一张图片作为附件
    msg.attach(filename='01.jpg',content_type='image/jpeg',data=open('01.jpg','rb').read())
    # 因为Flask-Mail需要应用的context,所以需要在调用send方法前先创建应用的context环境。
    with app.app_context():
        mail.send(msg)

if __name__ == '__main__':
    app.run()

在项目同级目录下执行开启worker

celery -A TestCelery.celery_app worker -l info

当访问 hello_world 视图时,将任务交给worker去处理。

使用celery发送邮件,不会因为网络问题,上传慢导致程序阻塞。直接将邮件交给后台去发送,视图函数直接返回。

Iyoyo电子书 一本集作者多年开发经验的python电子书 all right reserved,powered by Gitbook文件修订时间: 2022年 09:47:52