celery+Rabbit MQ简单的Demo

2023-07-11,,

介绍

一个简单celery + rabbitmq 的搭建例子,用于记录

Celery

异步处理框架, 安装命令

pip install celery

RabbitMQ

消息中间件,用来做队列

安装配置参考 https://juejin.cn/post/6993229378729541646

项目目录结构

先写celery_app文件夹里面的内容

celery_app/init.py

引入配置文件,并指定用什么做队列

from celery import Celery

# app = Celery("celery_app", backend='amqp')
app = Celery("celery_app", backend='amqp')
app.config_from_object("celery_app.celeryconfig")

celery_app/celeryconfig.py

celery的配置文件

RABBIT_MQ = {
'HOST': '192.168.254.128',
'PORT': 5672,
'USER': 'test',
'PASSWORD': '123456'
} BROKER_URL = 'amqp://%s:%s@%s:%s/myvhost' % (RABBIT_MQ['USER'], RABBIT_MQ['PASSWORD'], RABBIT_MQ['HOST'], RABBIT_MQ['PORT']) CELERYD_LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s' CELERY_TIMEZONE = 'Asia/Shanghai' # 时区
CELERY_IMPORTS = ( # 指定需要导入的任务模块
'celery_app.task1', # task1和task2是模块名称
'celery_app.task2',
) # 指定队列
CELERY_ROUTES={
"add":{"queue":"add",},
"multiply":{"queue":"mul"},
} # CELERY_ROUTES = {
# 'add': {'queue': 'add', 'routing_key': 'key1'},
# 'multiply': {'queue': 'mul', 'routing_key': 'key2'},
# }

引入两个异步任务

celery_app/task1.py

import time
from celery_app import app @app.task(queue="add") # 指定队列
def add(x,y):
time.sleep(2)
return x + y

celery_app/task2.py

import time
from celery_app import app @app.task(queue="multiply")
def multiply(x, y):
time.sleep(2)
return x * y

最后,最外层的测试,将任务push进队列中去执行异步的任务

api.py

from celery_app.task1 import add
from celery_app.task2 import multiply print("start...") result = add.delay(1,2)
# result2 = multiply.delay(1,2)
print('异步。。',result, result2)
# print('异步。。',result) print("end...")

启动

按照文章上面的步骤,安装好 rabbitmq 之后,启动 rabbitmq

service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务

然后启动 celery

celery -A celery_app worker --loglevel=info -Q add # celery_app 是celery指定的app名字,-l 是输出日志, -q 是指定队列

测试运行celery + rabbitmq

完。

celery+Rabbit MQ简单的Demo的相关教程结束。

《celery+Rabbit MQ简单的Demo.doc》

下载本文的Word格式文档,以方便收藏与打印。