DB

SQLAlchemy async session management with transactions, table locking, and row-change polling.

Info

This module is written for and tested against PostgreSQL only.

Overview

The db module provides helpers to create FastAPI dependencies and context managers for AsyncSession, along with utilities for nested transactions, table locking, and polling for row changes.

Session dependency

Use create_db_dependency to create a FastAPI dependency that yields a session and auto-commits on success:

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from fastapi_toolsets.db import create_db_dependency

engine = create_async_engine(url="postgresql+asyncpg://...", future=True)
session_maker = async_sessionmaker(bind=engine, expire_on_commit=False)

get_db = create_db_dependency(session_maker=session_maker)

router = APIRouter()

@router.get("/users")
async def list_users(session: AsyncSession = Depends(get_db)):
    ...
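To make "auto-commits on success" concrete, here is a minimal sketch of the commit-or-rollback pattern that such a dependency typically follows. It is not the library's implementation: FakeSession and session_scope are hypothetical names, and rolling back on error is an assumption about the failure path, which the text above does not describe.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only records calls."""

    def __init__(self):
        self.calls = []

    async def commit(self):
        self.calls.append("commit")

    async def rollback(self):
        self.calls.append("rollback")

    async def close(self):
        self.calls.append("close")


@asynccontextmanager
async def session_scope(session):
    """Commit if the body succeeds, roll back if it raises, always close."""
    try:
        yield session
        await session.commit()
    except Exception:
        # Assumption: a failed request discards its pending changes.
        await session.rollback()
        raise
    finally:
        await session.close()


async def run(fail):
    session = FakeSession()
    try:
        async with session_scope(session):
            if fail:
                raise ValueError("handler failed")
    except ValueError:
        pass
    return session.calls


ok_calls = asyncio.run(run(fail=False))
failed_calls = asyncio.run(run(fail=True))
```

In the successful run the session is committed and then closed; in the failing run it is rolled back and then closed, and the exception still reaches the caller.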

Session context manager

Use create_db_context for sessions outside request handlers (e.g. background tasks, CLI commands):

from fastapi_toolsets.db import create_db_context

db_context = create_db_context(session_maker=session_maker)

async def seed():
    async with db_context() as session:
        ...

Nested transactions

get_transaction handles savepoints automatically, allowing safe nesting:

from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_toolsets.db import get_transaction

async def create_user_with_role(session: AsyncSession):
    async with get_transaction(session=session):
        ...
        async with get_transaction(session=session):  # uses savepoint
            ...
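The nesting behaviour can be illustrated with a small sketch: the outermost block issues BEGIN and COMMIT, while inner blocks use SAVEPOINT so that an inner failure undoes only the inner work. This is not how get_transaction is implemented; NestedTx is a hypothetical helper, and the standard-library sqlite3 module stands in for PostgreSQL only so that the example runs without a server.

```python
import sqlite3
from contextlib import contextmanager


class NestedTx:
    """Hypothetical helper: BEGIN at depth 0, SAVEPOINT when nested."""

    def __init__(self, conn):
        self.conn = conn
        self.depth = 0

    @contextmanager
    def transaction(self):
        outermost = self.depth == 0
        name = f"sp_{self.depth}"
        self.conn.execute("BEGIN" if outermost else f"SAVEPOINT {name}")
        self.depth += 1
        try:
            yield
        except Exception:
            if outermost:
                self.conn.execute("ROLLBACK")
            else:
                # Undo only the inner work, then drop the savepoint.
                self.conn.execute(f"ROLLBACK TO SAVEPOINT {name}")
                self.conn.execute(f"RELEASE SAVEPOINT {name}")
            raise
        else:
            self.conn.execute("COMMIT" if outermost else f"RELEASE SAVEPOINT {name}")
        finally:
            self.depth -= 1


# isolation_level=None lets us issue BEGIN/COMMIT ourselves.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (name TEXT)")
tx = NestedTx(conn)

with tx.transaction():
    conn.execute("INSERT INTO users VALUES ('alice')")
    try:
        with tx.transaction():  # nested: uses a savepoint
            conn.execute("INSERT INTO users VALUES ('bob')")
            raise RuntimeError("inner failure")
    except RuntimeError:
        pass  # the outer transaction continues

names = [row[0] for row in conn.execute("SELECT name FROM users ORDER BY name")]
```

Only the outer insert survives: the inner block's row is rolled back to the savepoint while the outer transaction commits normally.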

Table locking

lock_tables acquires PostgreSQL table-level locks before executing critical sections:

from fastapi_toolsets.db import lock_tables

async with lock_tables(session=session, tables=[User], mode="EXCLUSIVE"):
    # No other transaction can modify User until this block exits
    ...

Available lock modes are defined in LockMode: ACCESS_SHARE, ROW_SHARE, ROW_EXCLUSIVE, SHARE_UPDATE_EXCLUSIVE, SHARE, SHARE_ROW_EXCLUSIVE, EXCLUSIVE, ACCESS_EXCLUSIVE.
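For orientation, the lock modes listed above correspond to the mode keywords of PostgreSQL's LOCK TABLE statement. The sketch below shows how such a statement could be assembled. PgLockMode and build_lock_statement are hypothetical names used for illustration, not part of fastapi_toolsets, and the SQL that lock_tables actually emits may differ.

```python
from enum import Enum


class PgLockMode(Enum):
    """Hypothetical mirror of the mode names, mapped to SQL keywords."""

    ACCESS_SHARE = "ACCESS SHARE"
    ROW_SHARE = "ROW SHARE"
    ROW_EXCLUSIVE = "ROW EXCLUSIVE"
    SHARE_UPDATE_EXCLUSIVE = "SHARE UPDATE EXCLUSIVE"
    SHARE = "SHARE"
    SHARE_ROW_EXCLUSIVE = "SHARE ROW EXCLUSIVE"
    EXCLUSIVE = "EXCLUSIVE"
    ACCESS_EXCLUSIVE = "ACCESS EXCLUSIVE"


def quote_identifier(name):
    """Double-quote an identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_lock_statement(table_names, mode):
    """Build a LOCK TABLE statement for one or more tables."""
    if not table_names:
        raise ValueError("at least one table is required")
    tables = ", ".join(quote_identifier(name) for name in table_names)
    return f"LOCK TABLE {tables} IN {mode.value} MODE"


statement = build_lock_statement(["users", "orders"], PgLockMode.SHARE_ROW_EXCLUSIVE)
```

In PostgreSQL, table locks are held until the surrounding transaction ends. EXCLUSIVE still allows concurrent plain reads, whereas ACCESS_EXCLUSIVE blocks those as well.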

Row-change polling

wait_for_row_change polls a row until a specific column changes value, useful for waiting on async side effects:

from fastapi_toolsets.db import wait_for_row_change

# Wait up to 30s for order.status to change
await wait_for_row_change(
    session=session,
    model=Order,
    pk_value=order_id,
    columns=[Order.status],
    interval=1.0,
    timeout=30.0,
)
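The polling idea can be sketched with plain asyncio: read the value once, then re-read it every interval seconds until it differs or the deadline passes. poll_until_changed is a hypothetical function, not the library's code, and raising TimeoutError at the deadline is an assumption about what happens when nothing changes.

```python
import asyncio


async def poll_until_changed(fetch, interval, timeout):
    """Re-run `fetch` until its result differs from the first reading."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    initial = await fetch()
    while loop.time() < deadline:
        await asyncio.sleep(interval)
        current = await fetch()
        if current != initial:
            return current
    # Assumption: an unchanged value at the deadline is reported as an error.
    raise TimeoutError(f"value did not change within {timeout}s")


async def demo():
    row = {"status": "pending"}  # stands in for a database row

    async def fetch():
        return row["status"]

    async def ship_later():
        await asyncio.sleep(0.05)
        row["status"] = "shipped"

    task = asyncio.create_task(ship_later())
    changed = await poll_until_changed(fetch, interval=0.01, timeout=1.0)
    await task

    # Nothing modifies the row any more, so this call runs into the timeout.
    try:
        await poll_until_changed(fetch, interval=0.01, timeout=0.05)
        timed_out = False
    except TimeoutError:
        timed_out = True
    return changed, timed_out


changed, timed_out = asyncio.run(demo())
```

A shorter interval notices changes sooner at the cost of more queries, so the value is a trade-off between latency and database load.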

Creating a database

Added in v2.1

create_database creates a new database on a PostgreSQL server. It connects to server_url and issues a CREATE DATABASE statement for db_name:

from fastapi_toolsets.db import create_database

SERVER_URL = "postgresql+asyncpg://postgres:postgres@localhost/postgres"

await create_database(db_name="myapp_test", server_url=SERVER_URL)

For test isolation, use create_worker_database from the pytest module instead: it drops any leftover database, creates a fresh one, and drops it again afterwards.

Cleaning up tables

Added in v2.1

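cleanup_tables truncates every table defined on the given declarative base, which makes it a convenient way to reset state between tests: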

import pytest
from fastapi_toolsets.db import cleanup_tables

@pytest.fixture(autouse=True)
async def clean(db_session):
    yield
    await cleanup_tables(session=db_session, base=Base)
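As a rough illustration of what truncating all tables means in SQL terms, the sketch below builds a single PostgreSQL TRUNCATE statement from a list of table names. build_truncate_statement is a hypothetical helper, and the RESTART IDENTITY and CASCADE options are assumptions. The statement cleanup_tables actually issues may differ.

```python
def quote_identifier(name):
    """Double-quote an identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_truncate_statement(table_names):
    """Build one TRUNCATE statement covering every given table."""
    if not table_names:
        raise ValueError("at least one table is required")
    tables = ", ".join(quote_identifier(name) for name in table_names)
    # Assumption: sequences are reset and dependent tables are truncated too.
    return f"TRUNCATE TABLE {tables} RESTART IDENTITY CASCADE"


statement = build_truncate_statement(["users", "posts"])
```

Truncating all tables in one statement avoids having to order them by foreign-key dependency.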

Many-to-Many helpers

SQLAlchemy's ORM collection API triggers lazy-loads when you append to a relationship inside a savepoint (e.g. inside lock_tables or a nested get_transaction). The three m2m_* helpers bypass the ORM collection entirely and issue direct SQL against the association table.

m2m_add — insert associations

m2m_add inserts one or more rows into a secondary table without touching the ORM collection:

from fastapi_toolsets.db import lock_tables, m2m_add

async with lock_tables(session, [Tag]):
    tag = await TagCrud.create(session, TagCreate(name="python"))
    await m2m_add(session, post, Post.tags, tag)

Pass ignore_conflicts=True to silently skip associations that already exist:

await m2m_add(session, post, Post.tags, tag, ignore_conflicts=True)
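The effect of skipping existing associations can be shown at the SQL level with an ON CONFLICT DO NOTHING insert. This is a sketch of the general technique, not a statement about the SQL that m2m_add generates. The post_tags table is hypothetical, and the standard-library sqlite3 module (SQLite 3.24 or newer) stands in for PostgreSQL, which accepts the same clause.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE post_tags ("
    "post_id INTEGER, tag_id INTEGER, PRIMARY KEY (post_id, tag_id))"
)

insert = "INSERT INTO post_tags (post_id, tag_id) VALUES (?, ?)"
conn.execute(insert, (1, 10))

# A plain insert of the same pair violates the composite primary key.
try:
    conn.execute(insert, (1, 10))
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

# With ON CONFLICT DO NOTHING the duplicate is skipped silently.
conn.execute(insert + " ON CONFLICT DO NOTHING", (1, 10))

row_count = conn.execute("SELECT COUNT(*) FROM post_tags").fetchone()[0]
```

The table still holds a single row afterwards, which is what makes the conflict-ignoring insert safe to repeat.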

m2m_remove — delete associations

m2m_remove deletes specific association rows. Removing a non-existent association is a no-op:

from fastapi_toolsets.db import get_transaction, m2m_remove

async with get_transaction(session):
    await m2m_remove(session, post, Post.tags, tag1, tag2)

m2m_set — replace the full set

m2m_set atomically replaces all associations: it deletes every existing row for the owner instance, then inserts the new set. Passing no related instances clears the association entirely:

from fastapi_toolsets.db import get_transaction, m2m_set

# Replace all tags
async with get_transaction(session):
    await m2m_set(session, post, Post.tags, tag_a, tag_b)

# Clear all tags
async with get_transaction(session):
    await m2m_set(session, post, Post.tags)
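The delete-then-insert behaviour described for m2m_set can be sketched directly against an association table. replace_tags and the post_tags table are hypothetical, and sqlite3 is used only so the example runs without a PostgreSQL server. Running both statements in one transaction is what keeps the replacement atomic.

```python
import sqlite3


def replace_tags(conn, post_id, tag_ids):
    """Replace every tag of one post inside a single transaction."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.execute("DELETE FROM post_tags WHERE post_id = ?", (post_id,))
        conn.executemany(
            "INSERT INTO post_tags (post_id, tag_id) VALUES (?, ?)",
            [(post_id, tag_id) for tag_id in tag_ids],
        )


def tags_of(conn, post_id):
    rows = conn.execute(
        "SELECT tag_id FROM post_tags WHERE post_id = ? ORDER BY tag_id",
        (post_id,),
    )
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE post_tags ("
    "post_id INTEGER, tag_id INTEGER, PRIMARY KEY (post_id, tag_id))"
)
conn.executemany(
    "INSERT INTO post_tags (post_id, tag_id) VALUES (?, ?)",
    [(1, 10), (1, 11), (2, 10)],
)
conn.commit()

replace_tags(conn, 1, [11, 12])  # replace the full set for post 1
after_replace = tags_of(conn, 1)

replace_tags(conn, 1, [])  # an empty set clears the association
after_clear = tags_of(conn, 1)

other_post = tags_of(conn, 2)  # rows of other owners are untouched
```

Only rows belonging to the owner are deleted, so associations of other posts are left alone.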

All three helpers raise TypeError if the relationship attribute is not a Many-to-Many (i.e. has no secondary table).


API Reference