Celery是Python开发的简单、灵活可靠的、处理大量消息的分布式任务调度模块
安装:
pip install celery # 安装celery库
pip install redis # celery依赖于redis
pip install eventlet # Windows下需要安装
项目结构:
主项目目录下celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery # django_test是我的项目名 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_test.settings') # 设置django环境 app = Celery('django_test') app.config_from_object('django.conf:settings', namespace='CELERY') # 使用CELERY_ 作为前缀,在settings中写配置 app.autodiscover_tasks()
在主项目下__init__.py添加下面代码
# celery
from .celery import app as celery_app
__all__ = ['celery_app']
在settings.py添加celery配置
# celery 配置
redis_host = "redis://:" + redispwd + "@" + redishost + ":" + redisport + "/3"
CELERY_BROKER_URL = redis_host
redis_host = "redis://:" + redispwd + "@" + redishost + ":" + redisport + "/4"
CELERY_RESULT_BACKEND = redis_host
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 24 * 60 * 60
# CELERY_MAX_TASKS_PER_CHILD = 10
CELERY_TIMEZONE = 'Asia/Shanghai'
在app下面新建tasks.py
from __future__ import absolute_import, unicode_literals import time from celery import shared_task
from django.conf import settings @shared_task
def add(x, y):
time.sleep(30)
print("celery结果")
return x + y @shared_task
def mul(x, y):
print('x*y')
return x * y @shared_task
def reduce(number): with open(settings.BASE_DIR + '/data.py', 'r') as f:
tot = int(f.read())
print(tot)
print('celery_t')
if tot == 0:
return 0
else:
data = tot - number
with open(settings.BASE_DIR+'/data.py', 'w') as f:
f.write(str(data))
return 1
我的views.py文件
import datetime
import json
import traceback from django.conf import settings
from django.contrib.auth.hashers import make_password
from django.core.cache import cache
from django.core.paginator import Paginator
from django.http import JsonResponse
from rest_framework.response import Response from rest_framework.decorators import APIView
from . import tasks class Test(APIView): def get(self, request):
li = []
body = request.query_params.dict()
task_id = body.get('task_id')
from celery import result
ar = result.AsyncResult(task_id)
print("--------", ar)
print(ar.ready(), ar.state)
if ar.ready():
print(ar.state, ar.get())
if ar.state == 'SUCCESS':
ret = ar.get() resp = {"success": 1, "data": ret} elif ar.state == "FAILURE" or ar.state == "REVOKED": resp = {"success": 1, "data": 0}
else: resp = {"success": 1, "data": 0} return Response(resp) def post(self, request):
li = []
resp = {"success": 1, "data": li}
# with open(settings.BASE_DIR+'/data.py', 'r') as f:
# total = int(f.read())
# print(total) t = tasks.reduce.delay(1)
print(t.id)
resp = {"success": 1, "data": t.id}
return Response(resp)
post请求创建新的任务,get请求通过任务id查询任务状态。
我的data.py文件写入了一个数字,代表商品所剩数量,每次post请求创建新的任务,则会打开data.py文件判断里面数字是否为0,为0表示商品已被抢完,否则数字减1,表示抢到该商品。