DB

SQLAlchemy async session management with transactions, table locking, and row-change polling.

Info

This module has been developed and tested against PostgreSQL only.

Overview

The db module provides helpers to create FastAPI dependencies and context managers for AsyncSession, along with utilities for nested transactions, table locking, and polling for row changes.

Session dependency

Use create_db_dependency to create a FastAPI dependency that yields a session and auto-commits on success:

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from fastapi_toolsets.db import create_db_dependency

engine = create_async_engine(url="postgresql+asyncpg://...", future=True)
session_maker = async_sessionmaker(bind=engine, expire_on_commit=False)

# Dependency that yields an AsyncSession per request
get_db = create_db_dependency(session_maker=session_maker)

router = APIRouter()

@router.get("/users")
async def list_users(session: AsyncSession = Depends(get_db)):
    ...
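
How create_db_dependency implements "auto-commits on success" is not shown on this page. The following is a minimal sketch of that pattern, assuming a generator-style dependency that commits after the handler returns and rolls back if it raises. make_db_dependency and RecordingSession are hypothetical names: RecordingSession is a stand-in that only records calls so the sketch runs without a database, and asynccontextmanager is used here to roughly imitate how a yield-dependency is entered and exited.

```python
import asyncio
from contextlib import asynccontextmanager


class RecordingSession:
    """Hypothetical stand-in for AsyncSession; it only records method calls."""

    def __init__(self):
        self.calls = []

    async def commit(self):
        self.calls.append("commit")

    async def rollback(self):
        self.calls.append("rollback")

    async def close(self):
        self.calls.append("close")


def make_db_dependency(session_factory):
    """Hypothetical helper (not the library's code): build a yield-dependency."""

    async def get_db():
        session = session_factory()
        try:
            yield session
            await session.commit()  # reached only if the handler did not raise
        except Exception:
            await session.rollback()  # undo the work of the failed handler
            raise
        finally:
            await session.close()  # always release the session

    return get_db


async def demo():
    sessions = []

    def factory():
        session = RecordingSession()
        sessions.append(session)
        return session

    scope = asynccontextmanager(make_db_dependency(factory))

    async with scope():
        pass  # handler succeeds

    try:
        async with scope():
            raise RuntimeError("handler failed")
    except RuntimeError:
        pass

    return [session.calls for session in sessions]


print(asyncio.run(demo()))
# [['commit', 'close'], ['rollback', 'close']]
```

The first scope commits and closes; the second rolls back and closes, and the original exception still reaches the caller.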

Session context manager

Use create_db_context for sessions outside request handlers (e.g. background tasks, CLI commands):

from fastapi_toolsets.db import create_db_context

db_context = create_db_context(session_maker=session_maker)

async def seed():
    async with db_context() as session:
        ...

Nested transactions

get_transaction opens a transaction, or a savepoint when one is already in progress, so calls can be nested safely:

from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_toolsets.db import get_transaction

async def create_user_with_role(session: AsyncSession):
    async with get_transaction(session=session):
        ...  # runs in the outer transaction
        async with get_transaction(session=session):  # uses savepoint
            ...
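
To illustrate the idea behind the savepoint handling, here is a minimal sketch, not the library's implementation. It assumes the helper only needs three AsyncSession methods that exist in SQLAlchemy (in_transaction(), begin(), and begin_nested()). transaction_or_savepoint and RecordingSession are hypothetical names; RecordingSession mimics those three methods so the sketch runs without a database.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def transaction_or_savepoint(session):
    """Open a transaction, or a savepoint if a transaction is already active."""
    if session.in_transaction():
        async with session.begin_nested():  # SAVEPOINT
            yield
    else:
        async with session.begin():  # BEGIN
            yield


class RecordingSession:
    """Hypothetical stand-in for the three AsyncSession methods used above."""

    def __init__(self):
        self.depth = 0
        self.events = []

    def in_transaction(self):
        return self.depth > 0

    @asynccontextmanager
    async def _scope(self, kind):
        self.events.append(f"{kind}:open")
        self.depth += 1
        try:
            yield
        except BaseException:
            self.events.append(f"{kind}:rollback")
            raise
        else:
            self.events.append(f"{kind}:commit")
        finally:
            self.depth -= 1

    def begin(self):
        return self._scope("transaction")

    def begin_nested(self):
        return self._scope("savepoint")


async def demo():
    session = RecordingSession()
    async with transaction_or_savepoint(session):
        try:
            async with transaction_or_savepoint(session):
                raise ValueError("inner step failed")
        except ValueError:
            pass  # only the savepoint is rolled back; the outer scope continues
    return session.events


print(asyncio.run(demo()))
# ['transaction:open', 'savepoint:open', 'savepoint:rollback', 'transaction:commit']
```

The point of the nesting is visible in the output: a failure in the inner block rolls back to the savepoint, while the outer transaction still commits.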

Table locking

lock_tables acquires PostgreSQL table-level locks before executing critical sections:

from fastapi_toolsets.db import lock_tables

async with lock_tables(session=session, tables=[User], mode="EXCLUSIVE"):
    # No other transaction can modify User until this block exits
    ...

Available lock modes are defined in LockMode: ACCESS_SHARE, ROW_SHARE, ROW_EXCLUSIVE, SHARE_UPDATE_EXCLUSIVE, SHARE, SHARE_ROW_EXCLUSIVE, EXCLUSIVE, ACCESS_EXCLUSIVE.
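
Each of these names corresponds to a PostgreSQL lock mode, written with spaces in SQL (for example SHARE ROW EXCLUSIVE). The sketch below shows the LOCK TABLE statement such a mode translates to. PgLockMode and lock_statement are hypothetical names for illustration only and are not claimed to match how LockMode or lock_tables are defined. Note that PostgreSQL only accepts LOCK TABLE inside a transaction block and holds the lock until that transaction ends.

```python
from enum import Enum


class PgLockMode(Enum):
    """Hypothetical mirror of the eight PostgreSQL table lock modes."""

    ACCESS_SHARE = "ACCESS SHARE"
    ROW_SHARE = "ROW SHARE"
    ROW_EXCLUSIVE = "ROW EXCLUSIVE"
    SHARE_UPDATE_EXCLUSIVE = "SHARE UPDATE EXCLUSIVE"
    SHARE = "SHARE"
    SHARE_ROW_EXCLUSIVE = "SHARE ROW EXCLUSIVE"
    EXCLUSIVE = "EXCLUSIVE"
    ACCESS_EXCLUSIVE = "ACCESS EXCLUSIVE"


def quote_identifier(name):
    """Double-quote an identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def lock_statement(table_names, mode):
    """Build the LOCK TABLE statement for the given tables and mode."""
    tables = ", ".join(quote_identifier(name) for name in table_names)
    return f"LOCK TABLE {tables} IN {mode.value} MODE"


print(lock_statement(["users"], PgLockMode.EXCLUSIVE))
# LOCK TABLE "users" IN EXCLUSIVE MODE
```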

Row-change polling

wait_for_row_change polls a row until a watched column changes value, which is useful when waiting on asynchronous side effects:

from fastapi_toolsets.db import wait_for_row_change

# Wait up to 30s for order.status to change
await wait_for_row_change(
    session=session,
    model=Order,
    pk_value=order_id,
    columns=[Order.status],
    interval=1.0,
    timeout=30.0,
)
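
The interval and timeout parameters suggest a simple polling loop. The following is a minimal sketch of such a loop under stated assumptions, not the library's implementation: poll_until_changed is a hypothetical name, the fetch callable stands in for re-reading the watched columns from the database, and raising TimeoutError on expiry is an assumption about the failure behavior.

```python
import asyncio
import time


async def poll_until_changed(fetch, interval=1.0, timeout=30.0):
    """Re-run fetch() every `interval` seconds until its result differs
    from the first snapshot; raise TimeoutError after `timeout` seconds."""
    initial = await fetch()
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("watched values did not change in time")
        # Never sleep past the deadline
        await asyncio.sleep(min(interval, remaining))
        current = await fetch()
        if current != initial:
            return current


async def demo():
    row = {"status": "pending"}  # stand-in for a database row

    async def fetch():
        return (row["status"],)

    async def side_effect():
        # Simulates another process updating the row a moment later
        await asyncio.sleep(0.05)
        row["status"] = "paid"

    task = asyncio.create_task(side_effect())
    changed = await poll_until_changed(fetch, interval=0.01, timeout=1.0)
    await task
    return changed


print(asyncio.run(demo()))
# ('paid',)
```

A shorter interval detects the change sooner at the cost of more queries; the timeout bounds how long the caller can be blocked.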

API Reference