记录 Flask 与 Celery 集成实现异步任务执行的基本流程,适用于延时任务、耗时操作(如邮件发送、批量处理等)场景。


一、基础依赖安装

pip install celery

消息队列示例使用 Redis:

pip install redis

Redis 本地默认监听在 localhost:6379


二、Flask 与 Celery 简单整合

项目结构示意:

myapp/
├── app.py
├── celery_worker.py
├── tasks.py

app.py(Flask 主程序)

from flask import Flask, jsonify
from tasks import long_task

app = Flask(__name__)

@app.route('/start-task')
def start_task():
    result = long_task.delay()
    return jsonify({'task_id': result.id})

tasks.py(任务定义)

from celery import Celery

celery_app = Celery('myapp', broker='redis://localhost:6379/0')

@celery_app.task
def long_task():
    import time
    time.sleep(5)
    return "任务完成"

celery_worker.py(启动 Celery)

from tasks import celery_app

# 执行:celery -A celery_worker worker --loglevel=info

说明:

  • broker 设置为 Redis 地址
  • 启动 Celery 时以 worker 方式运行
  • delay() 用于异步调用任务

三、返回结果查询(可选)

可以通过 AsyncResult 查询任务状态:

from celery.result import AsyncResult

@app.route('/task-status/<task_id>')
def get_status(task_id):
    res = AsyncResult(task_id)
    return jsonify({'status': res.status, 'result': res.result})

四、小结

  • Flask 可通过 Celery 集成异步任务处理
  • Redis 是常用的中间件(也可替换为 RabbitMQ 等)
  • 任务使用 .delay() 异步触发,避免阻塞主线程