Celery 任务:SQLAlchemy 会话处理指南
最近在做 AI RAG
相关的项目功能,对于 RAG
需要生成一些文本处理的异步任务,使用到了 Celery 。今天就写写关于Celery
任务的文章,SQLAlchemy 的真实情况是:
- 它的学习曲线比 Django ORM 更陡峭
- 需要一些示例代码
- 你需要了解一些较低层次的概念
- 有一些难以理解的文档
如果您确实使用 SQLAlchemy
,则必须为 Celery
任务编写一些示例文件,其中包含风险意大利面条代码。
在本文中,我将向您介绍一些基本的 SQLAlchemy 概念,并向您展示如何在 Celery 任务中使用 SQLAlchemy
,而无需求助于第三方包,这
- 帮助您了解事物是如何运作的
- 提供了一个通用的解决方案,即使没有 Web 框架,也可以与
Flask
、FastAPI
或其他任何东西一起使用
SQLAlchemy
Django ORM
世界中的生活非常简单。数据库操作通过模型对象提供:
from celery import Celery
app = Celery(...)
@app.task()
def my_task():
book = Book.objects.get(title="To Kill a Mockingbird")
...
book.save()
在 SQLAlchemy
世界中,情况非常不同。所有数据库操作都是通过会话对象执行的。会话与模型对象严格分开:
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
app = Celery(...)
engine = create_engine("...")
@app.task()
def my_task():
session = Session(engine)
book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
...
session.add(book)
session.commit()
session.close()
会话建立与数据库的对话,并代表您在其生命周期内加载、创建或操作的所有对象的暂存区域。
会话管理
您可以将 SQLAlchemy
会话视为数据库事务。作为一般规则,会话的生命周期应该与访问和操作数据库数据的函数和对象分开并位于外部。会议应该很短。例如,在传入 Celery 任务请求的上下文中,应在任务代码的开头创建会话并在结束时关闭,而不是无限期地保持打开状态并在任务之间共享。
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
app = Celery(...)
engine = create_engine("...")
@app.task()
def my_task():
session = Session(engine)
book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
...
session.add(book)
session.commit()
session.close()
或者,使用上下文管理器:
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
app = Celery(...)
engine = create_engine("...")
@app.task()
def my_task():
with Session(engine) as session:
book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
...
session.add(book)
session.commit()
Celery task
我对上述两个选项的问题是,它在每个任务中涉及大量重复的锅炉代码。如果每个 Celery 任务请求都带有一个现成的会话对象,而不必在开始时创建它并在最后关闭它,那就太好了。像这样的事情:
def my_task(session):
book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
...
session.add(book)
session.commit()
事实证明,在运行时注入会话变量是不可能的。不过可以绑定任务。绑定任务始终将任务实例作为其第一个参数。
@app.task(bind=True)
def my_task(self):
...
默认情况下,self
类型为celery.Task
。celery.Task
定义了可用于 Celery 任务的所有方法,例如apply_async
和retry
。
您的代码和 Celery 任务之间的每次交互以及您的工作线程和 Celery 任务之间的每次交互都是通过这些celery.Task
方法发生的。事实上,当你的worker处理一个任务时,它总是遵循以下顺序:
- 跑步
before_start
- 运行任务
- 跑步
after_return
即使步骤 2 中的任务抛出异常,也after_return
能保证运行。您可以使用它来简化 SQLAlchemy 会话的创建和拆卸:
- 在中创建会话
before_start
- 使会话可用于绑定任务
- 关闭会话
after_return
import celery
from sqlalchemy.orm import Session
class MyTask(celery.Task):
def __init__(self):
self.sessions = {}
def before_start(self, task_id, args, kwargs):
self.sessions[task_id] = Session(...)
super().before_start(task_id, args, kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
session = self.sessions.pop(task_id)
session.close()
super().after_return(status, retval, task_id, args, kwargs, einfo)
@property
def session(self):
return self.sessions[self.request.id]
请注意,每个流程只有一个任务实例,这意味着流程中的每个任务共享相同的任务对象。为了隔离每个任务请求的 SQLAlchemy 会话,我使用字典和唯一的任务请求 ID 作为键。
绑定任务
到目前为止我们有:
MyTask
,自定义celery.Task
实现- 一个任务,绑定
celery.Task
到 Celery 任务
缺少的是绑定MyTask
而不是celery.Task
任务。为此,Celery 提供了以下base
参数:
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
app = Celery(...)
engine = create_engine("...")
class MyTask(celery.Task):
def __init__(self):
self.sessions = {}
def before_start(self, task_id, args, kwargs):
self.sessions[task_id] = Session(...)
super().before_start(task_id, args, kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
session = self.sessions.pop(task_id)
session.close()
super().after_return(status, retval, task_id, args, kwargs, einfo)
@property
def session(self):
return self.sessions[self.request.id]
@app.task(bind=True, base=MyTask)
def my_task(self):
book = self.session.query(Book).filter_by(title="To Kill a Mockingbird").one()
...
self.session.add(book)
self.session.commit()
这是一个通用解决方案,将 SQLAlchemy 会话处理委托给自定义任务类。它使您的任务代码免受重复的样板代码的影响。你怎么认为?可以在评论区讨论👇。