使用celery异步需要的包

1
2
3
4
使用的版本:
pip install django==2.0.4
pip install celery==4.4.2
pip install eventlet==0.25.2

django配置celery

在settings中配置

1
2
3
4
5
6
#代理url,异步任务代理
CELERY_BROKER_URL = 'redis://localhost:6379/'
#保存结果
CELERY_RESULT_BACKEND = 'redis://localhost:6379/'
#保存类型格式,使用json
CELERY_RESULT_SERIALIZER = 'json'

在和settings同一级目录下,创建celery.py文件,进行配置文件,用来启动服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mydjango.settings')

# 注册Celery的APP
app = Celery('mydjango')
# 绑定配置文件,声明命名空间,可以在电脑任何位置启动
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现各个app下的tasks.py文件
app.autodiscover_tasks()

在和settings同一级目录下的init.py文件下,当启动服务,加载init.py文件配置

1
2
3
4
5
6
7
8
9
10
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

#导包
import pymysql
#初始化
pymysql.install_as_MySQLdb()

#加载celery应用
__all__ = ['celery_app']

在myapp文件夹下,创建一个tasks.py文件,自定义任务

1
2
3
4
5
6
7
8
9
from celery.task import task


#自定义异步任务
#实用装饰器声明是异步任务
@task
def async_test():
print('异步任务')
return '123123'

触发异步任务队列

1
2
3
4
5
6
7
#调用tasks异步
from myapp import tasks
#触发异步任务视图
def celery_test(request):
#使用delay()异步任务调用
res = tasks.async_test.delay()
return JsonResponse({'task_id':res.task_id})

配置url

1
2
3
4
5
from myapp.views import celery_test
urlpatterns = [
#定义超链接路由
path('celery_test/',celery_test),
]

启动celery服务

1
2
3
1.先启动django服务:python manage.py runserver
2.在manage.py文件的同级目录下,启动celery服务
3.以协程方式启动服务命令:celery worker -A mydjango -l info -P eventlet

selery

实例:做邮件发送使用celery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#task中:
from celery.task import task
from mydjango.settings import MY_MAIL,MY_PASS
from email.mime.text import MIMEText
from email.utils import formataddr
import smtplib

@task
def mail(subject,content,mailaddr):
#声明邮件对象
msg = MIMEText(content,'plain','utf-8')
#设置发送方对象
msg['From'] = formataddr(['在线教育平台',MY_MAIL])
#设置收件方对象
msg['To'] = formataddr(['尊敬的客户',mailaddr])
#设置标题
msg['Subject'] = subject
#设置smtp服务器
server = smtplib.SMTP_SSL('smtp.qq.com',465)
#登录邮箱
server.login(MY_MAIL,MY_PASS)
#发送邮件
server.sendmail(MY_MAIL,[mailaddr],msg.as_string())
#关闭smtp连接,节约系统资源
server.quit()#退出

#在视图中
class Register(APIView):
def post(self,request):
email = request.data.get('email') #获取前端用户邮件
subject = '亲爱的用户您好:,欢迎注册在线教育平台' #邮件标题
yanzhengma = random.randint(1000,9999)
print(yanzhengma) #数字验证码
r.set('email',yanzhengma,120) #存到redis
print(123,r.get('email'))
content = '欢迎注册在线教育平台,您的验证码是%s,请在2分钟内输入,失效请重新获取' % yanzhengma
print(content) #邮件内容
mailaddr = email #给发送邮件
#参数需要存放到delay(里)
send_email = tasks.mail.delay(subject, content, mailaddr) #异步发送邮件调用邮件
#使用celery需要使用JsonResponse返回
return JsonResponse({'code':200,'yzm':yanzhengma,'send_email':send_email.task_id})

评论





载入天数...载入时分秒...

Blog content follows the Attribution-NonCommercial-ShareAlike 4.0 International (CC BY-NC-SA 4.0) License

Use WZH as theme, total visits times