0%

Celery实现定时任务

Celery定时任务

celery beat

定时任务由 Celery Beat 进程周期性地将任务发往任务队列,所以我们需要启动一个Celery Beat和一个Celery Worker。Celery Beat启动方式和Celery Worker一致,下面我们先来看下Celery定时任务怎么coding。

设置

首先是定时任务的各种设置

时区

默认情况下是UTC 时区,可以通过timezone修改:

1
2
3
4
timezone='Europe/London'
# 或者配置中国时间
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

beat_schedule

定时任务的核心配置

task: 指定任务的名字
schedule : 设定任务的调度方式,可以是一个表示秒的整数,也可以是一个 timedelta 对象,或者是一个 crontab 对象(后面介绍),总之就是设定任务如何重复执行
args: 任务的位置参数以列表的形式
kwargs:任务的关键字参数,以字典的形式
options:所有 apply_async 所支持的参数

一个常见的例子:

1
2
3
4
5
6
7
8
myapp.conf.beat_schedule = {
# m每分钟一次demo_schedule任务
'sending_email_minute': {
'task': 'sending_email',
'schedule': crontab(),
'args': ("123456@qq.com", "hello"),
},
}

执行周期的对象

timedelta 对象

表示间隔时间执行,是比较简单的一种定时方式。

1
2
3
from datetime import timedelta

timedelta(seconds=3), #每三秒 执行一次

crontab 对象

参考与linux的crontab定制方式,可进行细节化定制定时任务

1
2
3
4
5
6
7
8
#schedule配置举例
from celery.schedules import crontab
# 每分钟执行一次
crontab()
# 每天凌晨十二点执行
crontab(minute=0, hour=0)
# 或者这么写,每小时执行一次
crontab(minute=0, hour="*/1")

加载配置

1、app.conf.参数名称 = 参数值
2、app.config_from_object(配置文件路径)
3、app.conf.update(
参数名称=参数值,
参数名称=参数值
)

Celery beat 和 Worker启动

1、celery -A celery_server.myapp beat -l debug

2、celery -A celery_server.myapp worker

定时任务demo

celery_server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# -*- coding: utf-8 -*-
# redis开启后 启动下面两个任务
# 1、celery -A celery_server.myapp beat -l debug
# 2、celery -A celery_server.myapp worker

from celery import Celery
from celery.schedules import crontab

myapp = Celery('schedule')
myapp.conf.broker_url = 'redis://localhost:6379/0'
myapp.conf.result_backend = 'redis://localhost:6379/1'
myapp.conf.imports = ['celery_task']
myapp.conf.worker_concurrency = 1

myapp.conf.beat_schedule = {
# 每分钟一次demo_schedule任务
'sending_email_minute': {
'task': 'sending_email',
'schedule': crontab(),
'args': ("123456@qq.com", "hello"),
},
}

celery_task.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14

# -*- coding: utf-8 -*-

# redis开启后 启动下面两个任务
# 1、celery -A celery_server.myapp beat -l debug
# 2、celery -A celery_server.myapp worker

import requests
from celery_server import myapp

@myapp.task(name='sending_email')
def send_email(email,content):
requests.get("http://redis.xxxxx.ceye.io")
print(email+"发送成功")

进入到代码所在目录,启动:

celery -A celery_server.myapp beat -l debug

celery -A celery_server.myapp worker

定时任务就会定时执行。

备注

后续如果遇到更复杂的定时任务的时候再接着补充,目前仅用到了一些简单的定时任务模块。


采用署名-非商业性使用-相同方式共享 4.0(CC BY-NC-SA 4.0)许可协议
「分享也是一种学习」