Celery 分布式任务队列入门
一、Celery介绍和基本使用
Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery。
Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,使用非常简单。 celery适用异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。
celery的特点是:
简单,易于使用和维护,有丰富的文档。 高效,单个celery进程每分钟可以处理数百万个任务。 灵活,celery中几乎每个部分都可以自定义扩展。 celery非常易于集成到一些web开发框架中.
Celery 安装
pip install -U Celery
任务队列
任务队列是一种跨线程、跨机器工作的一种机制.
任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.
celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。
一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。
borker
Celery需要一种解决消息的发送和接受的方式,我们把这种用来存储消息的的中间装置叫做message broker, 也可叫做消息中间人。
Celery的默认broker是RabbitMQ, 仅需配置一行就可以
broker_url = 'amqp://guest:guest@localhost:5672//'
rabbitMQ 没装的话请装一下,安装rabbitMQ参考
使用Redis做broker也可以,需要安装redis组件
pip install -U "celery[redis]"
配置:
配置消息中间人地址:
app.conf.broker_url = 'redis://localhost:6379/0'
如果想获取每个任务的执行结果,还需要配置一下把任务结果存在哪
app.conf.result_backend = 'redis://localhost:6379/0'
创建应用
创建一个celery application 用来定义你的任务列表。
创建一个任务文件 tasks.py
from celery import Celery
# Celery第一个参数是给其设定一个名字, 第二参数我们设定一个中间人broker, 在这里我们使用Redis作为中间人。
app = Celery('test', broker='redis://192.168.20.71:6379/0')
# 任务函数,通过加上装饰器app.task, 将其注册到broker的队列中。
@app.task
def print_task():
print("my celery 异步任务")
现在我们在创建工作者worker, 等待处理队列中的任务.
进入到 tasks.py目录下
celery -A tasks worker -l info
注意:如果是在windows下使用celery 3.1.17以上版本可能会报错请使用下面命令开启worker:
celery对windows支持不太好。参考GitHub问题
celery -A tasks worker -l info --pool=solo
调用任务
任务加入到broker队列中。如何将任务函数加入到队列中,可使用delay()方法。
from tasks import *
print_task.delay()
或者:
print_task.apply_async()
调用任务函数后,在worker的控制台,有一个任务被执行,返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。
保存结果
如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方,这里我们同样保存在redis中。
from celery import Celery
# Celery第一个参数是给其设定一个名字, 第二参数我们设定一个中间人broker, 在这里我们使用Redis作为中间人。
# backend 指定保存结果,指定保存在redis中。
app = Celery('test', broker='redis://192.168.20.71:6379/0',
backend='redis://192.168.20.71:6379/1'
)
# 任务函数,通过加上装饰器app.task, 将其注册到broker的队列中。
# 任务函数有返回值,并且增加两个参数。
@app.task
def print_task(a, b):
print("任务函数正在执行....")
return a + b
result对象方法:
方法 | 说明 |
---|---|
result | 查看任务函数返回的结果 |
state/status | PENDING 任务正在等待执行。 STARTED 任务已经开始。 RETRY 任务将被重试,可能是因为失败。FAILURE 该任务引发了一个例外,或者超过了重试限制。该result属性包含任务引发的异常。 SUCCESS 执行成功 |
failed | 执行成功返回 flask,执行失败返回 True |
forget | 删除这个任务的结果 |
get | 等待任务准备就绪,然后返回结果,等待任务中的任务可能会导致死锁。参数:timeout等待多久超时 |
info | 任务执行完成后,将包含返回值。如果任务引发异常,则这将是异常实例。 |
ready | True如果任务已执行则返回,如果任务仍在运行,挂起或正在等待重试,则False返回。 |
Celery 配置
Celery 配置简单,Celery有很多配置选项可以由开发人员配置,但是默认的配置都可以满足大部分应用场景了。 配置信息可以直接在app中设置,或者通过专有的配置模块来配置。
直接通过app配置:
from celery import Celery
app = Celery('test')
# 增加celery配置
app.conf.update(
result_backend='redis://192.168.20.71:6379/2',
broker_url='redis://192.168.20.71:6379/1',
)
通过配置文件配置: 对于比较大的项目,我们建议配置信息作为一个单独的模块。我们可以通过调用app的函数来告诉Celery使用我们的配置模块。 我们将Celery配置放在celeryconfig.py的模块中,这个模块名自定义。但需要确保能在tasks.py中导入。
下面我们在tasks.py模块 同级目录下创建配置模块celeryconfig.py:
broker_url = 'redis://192.168.20.71:6379/0'
result_backend = 'redis://192.168.20.71:6379/1'
注意:celery 4.0以上的版本引入了小写配置,但是还会兼容大写配置。如果使用旧版本,会识别不了小写配置,更多Celery配置参考.
修改tasks.py模块:
import time
from celery import Celery
import celeryconfig
app = Celery('test')
# 加载celery配置文件中的配置
app.config_from_object(celeryconfig)
# 任务函数,通过加上装饰器app.task, 将其注册到broker的队列中。
# 任务函数有返回值,并且增加两个参数。
@app.task
def print_task(a, b):
time.sleep(20)
print("任务函数正在执行....")
return a + b
在项目中使用celery
创建一个名为 celery_test目录作为celery文件目录。
celery.py 内容:
from celery import Celery
from . import celeryconfig
# 创建celery应用
app = Celery('u_test')
# 加载celery配置文件中的配置
app.config_from_object(celeryconfig)
# 自动搜索任务,指定目录名
app.autodiscover_tasks(['celery_test'])
celeryconfig.py 内容:
# 消息传输的中间人 broker
broker_url = 'redis://192.168.20.71:6379/0'
# 保存结果后端
result_backend = 'redis://192.168.20.71:6379/1'
tasks.py 内容:
from .celery import app
@app.task
def print_task(a, b):
print("任务函数正在执行....")
return a + b
@app.task
def print_task1(a, b):
print("任务函数正在执行....")
return a + b
@app.task
def print_task2(a, b):
print("任务函数正在执行....")
return a + b
@app.task
def print_task3(a, b):
print("任务函数正在执行....")
return a + b
启动worker并制定队列名称为 'myqueue':
celery -A celery_test worker -l info -Q 'myqueue'
调用任务
调用任务之前一直使用delay()方法,还有apply_async()方法也可以调用任务,apply_async()方法可以接收参数、执行选项,实际上delay方法也是调用 apply_async()方法,但是不能配置执行选项。
print_task.apply_async((2, 2), queue='my_queue', countdown=10)
任务my_task将会被发送到my_queue队列中,并且在发送10秒之后执行。
如果想要使用delay()方法,并且需要配置执行选项,可以使用signature函数进行封装,将参数以及选项封装到任务函数中。
In [31]: a = tasks.print_task.signature((3,2),countdown=10)
In [32]: a.delay()
任务组
group: 一组任务并行执行,返回一组返回值,并可以按顺序检索返回值。
chain: 任务一个一个执行,一个执行完将执行return结果传递给下一个任务函数.
group使用:
在celery_test同级目录下,创建一个test.py文件
from celery_test.tasks import print_task
from celery_test.tasks import print_task1
from celery_test.tasks import print_task2
from celery_test.tasks import print_task3
from celery import group
# 将多个signature放入同一组中
my_group = group(print_task.signature((2, 3),
queue='myqueue'),
print_task1.signature((1, 2),
queue='myqueue'),
print_task2.signature((1, 2),
queue='myqueue'),
print_task3.signature((1, 2),
queue='myqueue')
)
# 执行组任务,多个任务同时执行
res = my_group.delay()
# get会阻塞等待返回结果
print(res.get()) # [5, 3, 3, 3]
chain 使用:
from celery_test.tasks import print_task
from celery_test.tasks import print_task1
from celery_test.tasks import print_task2
from celery_test.tasks import print_task3
from celery import chain
# chain 将多个signature组成一个任务链
# print_task 的返回值传给print_task1的第一个参数
# print_task1 的返回值传给print_task2的第一个参数
my_chain = chain(print_task.signature((1, 3), queue='myqueue') |
print_task.signature((3,), queue='myqueue') |
print_task.signature((6,), queue='myqueue')
)
# 多个任务顺序执行
res = my_chain.delay()
# 最终返回print_task2 的结果
print(res.get()) # 返回13
任务路由
在生产中可以有多个worker,分别处理不同的任务。比如一个worker处理图片上传,一个worker专门处理短信发送。 我们创建两个队列,一个专门用于图片上传任务队列和图像处理,一个用来短信发送任务队列。 celery支持队列路由设置的,也可以在调用任务的时候指定队列。
首先在tasks.py中增加任务函数:
@app.task
def upload_img():
print("upload_img 任务函数正在执行....")
@app.task
def handle_img():
print("handel_img 任务函数正在执行....")
@app.task
def send_message():
print("send_message 任务函数正在执行....")
celeryconfig.py模块中配置 CELERY_ROUTES 选项:
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
task_routes = ({
'celery_test.tasks.upload_img': {'queue': 'queue1'},
'celery_test.tasks1.handle_img': {'queue': 'queue1'},
'celery_test.tasks2.send_message': {'queue': 'queue2'},
},
)
在test.py模块中将调用任务:
from celery_test.tasks import *
# 将任务加入队列
a = upload_img.delay()
print(a)
b = handle_img.delay()
print(b)
c = send_message.delay()
print(c)
打开两个终端开启两个worker,两个队列
celery -A celery_test worker -l info -Q 'queue1'
celery -A celery_test worker -l info -Q 'queue2'
也执行一个worker处理两个队列
celery -A celery_test worker -l info -Q queue2,queue1
二、celery 实现定时任务
celery 可以实现周期性任务,如果想要某个任务周期性执行,那么需要增加beat_schedule 配置信息.
在celeryconfig.py配置文件中增加
# 配置周期性任务
beat_schedule = {
'every-5-seconds': # 任务名称
{
'task': 'celery_test.tasks.upload_img', # 指定任务
'schedule': 5.0, # 每5秒执行一次
}
}
启动worker执行周期性任务:
注意如果要执行周期性任务,启动是需要加 --beat 参数
celery -A celery_test worker -l info -Q queue2,queue1 --beat
定时任务配置
- task 任务函数
- schedule 执行频率,可以是时间秒,也可以是 crontab对象
- args 列表或者元祖,传递给任务函数的参数
- kwargs 关键字参数,字典类型
- options apply_async函数支持的执行选项
from celery.schedules import crontab
beat_schedule = {
# 每周一早上7点半执行
'add-every-monday-morning': {
'task': 'celery_test.tasks.upload_img',# 任务
'schedule': crontab(hour=7, minute=30, day_of_week=1),#时间
'args': (16, 16),#参数
},
}
Crontab时间表
时间 | 说明 |
---|---|
crontab() | 每分钟执行一次 |
crontab(minute=0, hour=0) | # 每天凌晨十二点执行 |
crontab(minute=0, hour='*/3') | 每三小时执行一次,午夜,上午3点,上午6点,上午9点,中午,下午3点,下午6点,晚上9点。 |
crontab(minute='*/15') | 每15分钟执行一次 |
crontab(minute='*/10',hour='3,17,22', day_of_week='thu,fri') | 每十分钟执行一次,但仅在周四或周五的上午3-4点,下午5-6点和下午10-11点之间执行。 |
crontab(0, 0,day_of_month='1-7,15-21') | 在本月的第一周和第三周执行。 |
crontab(0, 0, day_of_month='2') | 每个月的第二天执行。 |
crontab(minute=0, hour='*/3,8-17') | 每3小时执行一次,但只在上午8点至下午5点)之间执行。 |
crontab(0, 0, day_of_month='11',month_of_year='5') | 每年5月11日执行。 |
如果周期性任务中断后,想要继续上一次的时间,需要访问执行celery 目录下的一个文件名为celerybeat-schedule 的文件。使用-s 参数是文件目录
celery -A celery_test worker -l info -Q queue2,queue1 --beat -s celerybeat-schedule
三、flask异步发送邮件
from flask import Flask
from celery import Celery
from flask_mail import Mail, Message
app = Flask(__name__) #type:Flask
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/15'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/15'
# 创建celery实例
celery_app = Celery(app.name, broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND'])
# 为Celery同步Flask上的配置
celery_app.conf.update(app.config)
# 配置 发送邮箱
app.config.update(
DEBUG=True,
MAIL_SERVER='smtp.163.com', # 服务器地址
MAIL_PROT=25, # 邮件服务器端口
MAIL_USE_TLS=True, # 使用tls 安全协议
MAIL_USERNAME='pydjango@163.com', # 发送邮箱
MAIL_PASSWORD='qwer1234', # 密码
)
mail = Mail(app)
@app.route('/')
def hello_world():
# 调用任务,将任务加入任务队列
send_async_email.delay('来自flask',app.config['MAIL_USERNAME'],
['pydjango@163.com'],'celery 异步发送邮件')
return 'Hello World!'
@celery_app.task
def send_async_email(sub,sender,recipients,body):
"""发送邮件任务"""
msg = Message(subject=sub, sender=sender, recipients=recipients,)
# 邮件内容
msg.body = body
# 添加附件 content_type对照表: http://tool.oschina.net/commons/
# 上传一张图片作为附件
msg.attach(filename='01.jpg',content_type='image/jpeg',data=open('01.jpg','rb').read())
# 因为Flask-Mail需要应用的context,所以需要在调用send方法前先创建应用的context环境。
with app.app_context():
mail.send(msg)
if __name__ == '__main__':
app.run()
在项目同级目录下执行开启worker
celery -A TestCelery.celery_app worker -l info
当访问 hello_world 视图时,将任务交给worker去处理。
使用celery发送邮件,不会因为网络问题,上传慢导致程序阻塞。直接将邮件交给后台去发送,视图函数直接返回。