sqlalchemy 2.x 如何写异步 session + 事务上下文管理器

SQLAlchemy 2.x异步需用AsyncSession和async with管理事务:1.用create_async_engine配asyncpg等驱动;2.显式async with session.begin()开启事务;3.支持begin_nested和手动commit/rollback;4.所有操作需await,禁用laz

y loading。

SQLAlchemy 2.x 原生支持异步,需用 AsyncSession 配合 async with 实现事务上下文管理,不能直接复用同步的 sessionmaker

1. 创建异步引擎和 session 工厂

必须使用 create_async_engine,并指定异步驱动(如 postgresql+asyncpgmysql+aiomysql):

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

# 异步引擎(注意 driver!)
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=True,
    pool_pre_ping=True,
)

# 异步 session 工厂(不是 session 实例!)
AsyncSessionLocal = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,  # 异步下推荐设为 False,避免 await 后对象过期
)

2. 使用 async with 管理事务生命周期

AsyncSession 本身不自动开启事务,需显式用 async with session.begin() 进入事务上下文。成功则自动 commit,异常则 rollback:

from sqlalchemy.exc import SQLAlchemyError

async def create_user(name: str, email: str):
    async with AsyncSessionLocal() as session:
        try:
            async with session.begin():  # ✅ 关键:显式开启事务
                user = User(name=name, email=email)
                session.add(user)
                # 可继续 add / delete / execute 等操作
                await session.flush()  # 可选:获取插入后的主键(如 user.id)
                return user.id
        except SQLAlchemyError:
            # session 已自动 rollback,无需手动调用
            raise

3. 手动控制 commit/rollback(进阶场景)

若需在事务中分阶段提交(如保存部分状态),可用 session.commit()session.rollback(),但必须确保最终退出时事务已结束:

  • async with session.begin_nested() 支持 savepoint(嵌套事务)
  • 显式调用 await session.commit() 后,该事务段即结束,后续操作属于新事务
  • 手动 commit/rollback 后,不能再对同一 session 调用 begin() —— 应改用新 session 或重新 begin

4. 注意事项

  • 所有数据库操作(addexecutescalarsscalarall 等)都必须 await
  • 对象延迟加载(lazy loading)在异步 session 中默认禁用,需显式用 selectinloadjoinedload 预加载
  • 不要在异步 session 中混用同步方法(如 session.query() 已废弃)
  • 关闭引擎用 await engine.dispose(),而非 engine.dispose()