django中使用celery,模拟商品秒杀。

2023-05-13,,

Celery是Python开发的简单、灵活可靠的、处理大量消息的分布式任务调度模块

安装:

  pip install celery  # 安装celery库

  pip install redis  # celery依赖于redis

  pip install eventlet  # Windows下需要安装

项目结构:

  

主项目目录下celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery # django_test是我的项目名 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_test.settings') # 设置django环境 app = Celery('django_test') app.config_from_object('django.conf:settings', namespace='CELERY') # 使用CELERY_ 作为前缀,在settings中写配置 app.autodiscover_tasks()

在主项目下__init__.py添加下面代码

# celery
from .celery import app as celery_app
__all__ = ['celery_app']

在settings.py添加celery配置

# celery 配置
redis_host = "redis://:" + redispwd + "@" + redishost + ":" + redisport + "/3"
CELERY_BROKER_URL = redis_host
redis_host = "redis://:" + redispwd + "@" + redishost + ":" + redisport + "/4"
CELERY_RESULT_BACKEND = redis_host
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 24 * 60 * 60
# CELERY_MAX_TASKS_PER_CHILD = 10
CELERY_TIMEZONE = 'Asia/Shanghai'

在app下面新建tasks.py

from __future__ import absolute_import, unicode_literals

import time

from celery import shared_task
from django.conf import settings @shared_task
def add(x, y):
time.sleep(30)
print("celery结果")
return x + y @shared_task
def mul(x, y):
print('x*y')
return x * y @shared_task
def reduce(number): with open(settings.BASE_DIR + '/data.py', 'r') as f:
tot = int(f.read())
print(tot)
print('celery_t')
if tot == 0:
return 0
else:
data = tot - number
with open(settings.BASE_DIR+'/data.py', 'w') as f:
f.write(str(data))
return 1

我的views.py文件

import datetime
import json
import traceback from django.conf import settings
from django.contrib.auth.hashers import make_password
from django.core.cache import cache
from django.core.paginator import Paginator
from django.http import JsonResponse
from rest_framework.response import Response from rest_framework.decorators import APIView
from . import tasks class Test(APIView): def get(self, request):
li = []
body = request.query_params.dict()
task_id = body.get('task_id')
from celery import result
ar = result.AsyncResult(task_id)
print("--------", ar)
print(ar.ready(), ar.state)
if ar.ready():
print(ar.state, ar.get())
if ar.state == 'SUCCESS':
ret = ar.get() resp = {"success": 1, "data": ret} elif ar.state == "FAILURE" or ar.state == "REVOKED": resp = {"success": 1, "data": 0}
else: resp = {"success": 1, "data": 0} return Response(resp) def post(self, request):
li = []
resp = {"success": 1, "data": li}
# with open(settings.BASE_DIR+'/data.py', 'r') as f:
# total = int(f.read())
# print(total) t = tasks.reduce.delay(1)
print(t.id)
resp = {"success": 1, "data": t.id}
return Response(resp)

post请求创建新的任务,get请求通过任务id查询任务状态。

我的data.py文件写入了一个数字,代表商品所剩数量,每次post请求创建新的任务,则会打开data.py文件判断里面数字是否为0,为0表示商品已被抢完,否则数字减1,表示抢到该商品。

django中使用celery,模拟商品秒杀。的相关教程结束。

《django中使用celery,模拟商品秒杀。.doc》

下载本文的Word格式文档,以方便收藏与打印。