您现在的位置是:亿华云 > IT科技
如何快速创建一个拥有异步任务队列集群的 Rest Api
亿华云2025-10-04 02:43:11【IT科技】0人已围观
简介异步任务是 Web 后端开发中最常见的需求,非常适合多任务、高并发的场景。本文分享如何使用 docker-compose、FastAPI、rq 来快速创建一个包含异步任务队列集群的 REST API,
异步任务是何快 Web 后端开发中最常见的需求,非常适合多任务、速创高并发的建个集群场景。本文分享如何使用 docker-compose、拥有异步FastAPI、任务rq 来快速创建一个包含异步任务队列集群的队列的 REST API,后端执行任务的何快节点可以随意扩展。
系统的速创架构图:
上图中的每一个方框都可以理解为一个服务器。
用户请求 api,建个集群 api 将任务放入 redis 队列,拥有异步worker 自动去 redis 队列取出任务并执行,任务worker 节点可以任意水平扩展。队列的
接下来,何快我们来实现这一架构的速创 demo,你可以看到 docker 的建个集群强大和方便之处。
1、先创建一个虚拟环境,安装依赖
依赖 fastapi,redis,rq 库,安装后生成一个 requirements.txt 文件
mkdir myproject python3 -m venv env source env/bin/activate pip install rq pip install fastapi pip install redis pip freeze > requirements.txt2、编码实现 REST API、Worker
REST 是云南idc服务商一种风格,这里不是重点,我们使用 FastAPI 来快速创建一个接口,新建一个 api.py 的文件,内容如下:
from fastapi import FastAPI from redis import Redis from rq import Queue from worker import send_captcha app = FastAPI() # 需要注意,这里的 host 是主机名,在 docker 中就是服务名,后面的 docker-compose.ymal 中的服务名称也要是这个 redis_conn = Redis(host=myproj_redis, port=6379, db=0) # 定义一个队列,名称是 my_queue q = Queue(my_queue, connection=redis_conn) @app.get(/hello) def hello(): """Test endpoint""" return { hello: world} # Rest API 示例 @app.post(/send_captcha/{ phone_number}, status_code=201) def addTask(phone_number: str): """ Adds tasks to worker queue. Expects body as dictionary matching the Group class. """ job = q.enqueue(send_captcha, phone_number) return { job: "tasks add done."}这里的 send_captcha 函数就是一个异步任务,从 worker.py 中导入,worker.py 的内容如下:
import time def send_captcha(phone_number): """ 模拟一个耗时的异步任务 """ print(f{ time.strftime("%T")} 准备发送手机验证码) # in place of actual logging print(f{ time.strftime("%T")} 生成随机验证码并存入 redis,设置 5 分钟过期时间) time.sleep(5) # simulate long running task print(f{ time.strftime("%T")} { phone_number}发送完成) return { phone_number: task complete}return { phone_number: task complete}
3、构建 Dokcer 镜像
现在的目标是实现一个拥有两个执行节点的集群。我们需要启动 4 个容器来完成一个集群部署:
容器1:运行 FastAPI app 容器2:运行 Redis 服务 容器3:运行 worker 1 服务 容器4:运行 worker 2 服务其中容器 1、3、4 都是 Python 应用,亿华云计算可以共用一个 Python 镜像。
为了方便调试,我们可以让 1、3、4 容器共享我们的本地路径,这样改了代码就不需要重新构建镜像,比较方便。
创建一个包含依赖的 Python 镜像
现在我们来创建一个包含前文 requirements.txt 依赖的 Python 镜像,编写 Dockerfile,内容如下:
FROM python:3.8-alpine RUN adduser -D myproj WORKDIR /home/myproj COPY requirements.txt requirements.txt RUN pip install -r requirements.txt RUN chown -R myproj:myproj ./ USER myproj CMD uvicorn api:app --host 0.0.0.0 --port 5057内容说明:
FROM python:3.8-alpine
指定使用 python:3.8-alpine,这个容器已经预装了 Python3.8,可以在命令行执行 docker search python 看看有哪些 Python 镜像。
RUN adduser -D myproj
添加一个用户 myproj,这一步的主要目的是为了生成目录 /home/myproj
WORKDIR /home/myproj
设置程序的执行路径为 /home/myproj
COPY requirements.txt requirements.txt
复制当前路径下的 requirements.txt 到容器的 /home/myproj,这里没有复制 .py 文件是因为后面我们启动容器的时候会共享本地路径,不需要再复制了,生产部署时最好复制到窗口内部,这样容器就不会依赖本机。
RUN pip install -r requirements.txt
在容器中安装依赖
RUN chown -R myproj:myproj ./
将 /home/myproj 路径下的文件的拥有者和所属组改为 myproj,这一步为了使用 myproj 用户来启动 fastapi 服务,源码下载生产环境通常用 root 用户启动,也就不需要这个指令了。
USER myproj
切换到 myproj 用户
CMD uvicorn api:app --host 0.0.0.0 --port 5057
容器启动后执行的命令,服务端口为 5057
更多的 Dockerfile 语法请参考官方文档,这里仅是简要说明。
现在 Dockerfile 所在的目录执行下面的命令构建一个镜像:
docker build -t myproject:latest .创建完成后,可以使用 docker images 来查看:
❯ docker images | grep myproj myproject4、启动集群
这里使用 Docker Compose 来启动 4 个容器,为什么用 Docker Compose 呢?因为方便,如果不用的话,需要手动一个容器一个容器启动。
Docker Compose 会读取一个 yaml 格式的配置文件,依据配置文件来启动容器,各容器共享同一网络。还记得 api.py 中使用的 Redis 主机名吗,这里就需要将 redis 服务名设置为那个主机名。
编写一个 docker-compose.yml 内容如下:
version: 3 services: myproj_redis: image: redis:4.0-alpine ports: - "6379:6379" volumes: - ./redis:/data myproj_api: image: myproject:latest command: uvicorn api:app --host 0.0.0.0 --port 5057 ports: - "5057:5057" volumes: - ./:/home/myproj myproj_worker1: image: myproject:latest command: rq worker --url redis://myproj_redis:6379 my_queue volumes: - ./:/home/myproj myproj_worker2: image: myproject:latest command: rq worker --url redis://myproj_redis:6379 my_queue volumes: - ./:/home/myproj第一个容器是 myproj_redis,运行着 redis 服务, redis 的数据通过 volumes 方式保存在本地,因此需要在本地创建一个 redis 目录,来映射容器内部的 /data 目录。
第二个容器就是 fastapi 服务,端口 5057,使用本地路径映射为 /home/myproj
第三个容器和第四个容器是 worker 节点,虽然也映射了本地路径,但它仅使用 worker.py 文件。当任务太多时,worker 节点可以扩展,解决负载压力,
最终的目录是这样:
执行 docker compose 命令启动 4 个容器:
docker compose -f docker-compose.yml up可以看到 4 个服务均启动并正常打印了日志输出。
4、测试
现在来测试一下,左边的窗口,我使用 Python 快速发送了 3 个 post 请求:
import subprocess for i in range(3): subprocess.run("curl -v -X POST http://localhost:5057/send_captcha/18012345678",shell = True)从右边窗口的日志输出可以看出 worker1 和 worker2 都执行了任务,其中 worker1 执行了 2 个,worker2 执行了 1 个。
查看完整代码请点击「阅读原文」
最后的话
本文分享了如何使用 Dockerfile 构建一个镜像,使用 Docker Compose 管理一个容器集群,以此为基础实现了一个具有异步任务队列集群的 REST API,抛砖引玉,关于 Dockerfile、docker-compose 的详细用法,还请参考 Docker 官方文档
很赞哦!(87121)
上一篇: 以上的就是为大家介绍的关于域名的详解
相关文章
- 4、待所有域名查询结束后可在右侧点击导出结果,即可以excel的文件方式将查询到的结果导出。
- C/C++ 单元自动化测试解决方案实践
- 如何在 Flutter 中构建增强现实应用
- 面试突击:为什么单例一定要加 Volatile?
- 3、不明先知,根据相关征兆预测可能发生的事件,以便提前做好准备,赶紧注册相关域名。;不差钱域名;buchaqian抢先注册,就是这种敏感类型。预言是最敏感的状态。其次,你应该有眼力。所谓眼力,就是善于从社会上时不时出现的各种热点事件中获取与事件相关的域名资源。眼力的前提是对域名领域的熟悉和丰富的知识。
- 真厉害!1 秒写入 10 万条消息,Kafka 写得这么快,都是因为这些优化!
- 软件研发的十大浪费:研发效能的另一面
- 哪个版本的JVM最快?
- ④注册门槛低
- 超快微服务:当Microstream遇上Wildfly