db

Here's the reference for all database session utilities, transaction helpers, and locking functions.

You can import them directly from fastapi_toolsets.db:

from fastapi_toolsets.db import (
    LockMode,
    cleanup_tables,
    create_database,
    create_db_dependency,
    create_db_context,
    get_transaction,
    lock_tables,
    m2m_add,
    m2m_remove,
    m2m_set,
    wait_for_row_change,
)

fastapi_toolsets.db.LockMode

Bases: str, Enum

PostgreSQL table lock modes.

See: https://www.postgresql.org/docs/current/explicit-locking.html
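
As an illustration of what a str-based enum of lock modes can look like, here is a minimal sketch. SketchLockMode is a hypothetical stand-in, not the library's class: its member names are assumed from PostgreSQL's eight documented table lock modes, and only SHARE_UPDATE_EXCLUSIVE and EXCLUSIVE are confirmed by this page.

```python
from enum import Enum


class SketchLockMode(str, Enum):
    """Hypothetical stand-in for LockMode; values are PostgreSQL lock mode keywords."""

    ACCESS_SHARE = "ACCESS SHARE"
    ROW_SHARE = "ROW SHARE"
    ROW_EXCLUSIVE = "ROW EXCLUSIVE"
    SHARE_UPDATE_EXCLUSIVE = "SHARE UPDATE EXCLUSIVE"
    SHARE = "SHARE"
    SHARE_ROW_EXCLUSIVE = "SHARE ROW EXCLUSIVE"
    EXCLUSIVE = "EXCLUSIVE"
    ACCESS_EXCLUSIVE = "ACCESS EXCLUSIVE"


# Because the enum also subclasses str, members compare equal to plain strings,
# and .value gives the exact keyword to place in a LOCK TABLE statement.
mode = SketchLockMode.SHARE_UPDATE_EXCLUSIVE
statement = f"LOCK TABLE users IN {mode.value} MODE"
```

Using .value explicitly avoids relying on how a mixed-in enum is formatted, which has changed between Python versions.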

fastapi_toolsets.db.create_db_dependency(session_maker)

Create a FastAPI dependency for database sessions.

Creates a dependency function that yields a session and auto-commits if a transaction is active when the request completes.

Parameters:

Name           Type                           Description                                          Default
session_maker  async_sessionmaker[_SessionT]  Async session factory from create_session_factory()  required

Returns:

Type                                           Description
Callable[[], AsyncGenerator[_SessionT, None]]  An async generator function usable with FastAPI's Depends()

Example
from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from fastapi_toolsets.db import create_db_dependency

app = FastAPI()

engine = create_async_engine("postgresql+asyncpg://...")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
get_db = create_db_dependency(SessionLocal)

@app.get("/users")
async def list_users(session: AsyncSession = Depends(get_db)):
    ...
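
To make the "yield a session, then commit if a transaction is active" behaviour concrete, here is a minimal sketch of how such a dependency factory could be structured. It is not the library's implementation: StubSession, make_db_dependency, and simulate_request are hypothetical names, the stub only records calls instead of talking to a database, and the rollback-on-exception branch is an assumption.

```python
import asyncio
from typing import AsyncGenerator, Callable, List


class StubSession:
    """Hypothetical stand-in for AsyncSession that only records what happened."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self._active = False

    def add(self, obj: str) -> None:
        self._active = True  # pretend that adding work opens a transaction
        self.events.append(f"add:{obj}")

    def in_transaction(self) -> bool:
        return self._active

    async def commit(self) -> None:
        self._active = False
        self.events.append("commit")

    async def rollback(self) -> None:
        self._active = False
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


def make_db_dependency(
    factory: Callable[[], StubSession],
) -> Callable[[], AsyncGenerator[StubSession, None]]:
    async def get_db() -> AsyncGenerator[StubSession, None]:
        session = factory()
        try:
            yield session  # the request handler runs while we are suspended here
            if session.in_transaction():
                await session.commit()  # auto-commit only when there is work pending
        except Exception:
            await session.rollback()  # assumption: failed requests are rolled back
            raise
        finally:
            await session.close()

    return get_db


async def simulate_request() -> List[str]:
    # Drive the generator by hand, roughly the way a framework would.
    gen = make_db_dependency(StubSession)()
    session = await gen.__anext__()
    session.add("user")
    try:
        await gen.__anext__()  # resume after the yield: commit, then close
    except StopAsyncIteration:
        pass
    return session.events


events = asyncio.run(simulate_request())
```

Wrapping the generator in a factory lets each application bind its own session maker once and reuse the resulting callable in every route.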

fastapi_toolsets.db.create_db_context(session_maker)

Create a context manager for database sessions.

Creates a context manager for use outside of FastAPI request handlers, such as in background tasks, CLI commands, or tests.

Parameters:

Name           Type                           Description                                          Default
session_maker  async_sessionmaker[_SessionT]  Async session factory from create_session_factory()  required

Returns:

Type                                                  Description
Callable[[], AbstractAsyncContextManager[_SessionT]]  An async context manager function

Example
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from fastapi_toolsets.db import create_db_context

engine = create_async_engine("postgresql+asyncpg://...")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
get_db_context = create_db_context(SessionLocal)

async def background_task():
    # UserCrud and User are your application's own CRUD class and model
    async with get_db_context() as session:
        user = await UserCrud.get(session, [User.id == 1])
        ...
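
The relationship between a generator-style dependency and a context manager can be sketched with the standard library alone. This is only one plausible way to build such a helper, not a description of what the library does internally; get_db here is a hypothetical generator that yields a placeholder string instead of a real session.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator, List

log: List[str] = []


async def get_db() -> AsyncGenerator[str, None]:
    log.append("open")
    try:
        yield "session"  # placeholder object standing in for a real session
    finally:
        log.append("close")  # cleanup runs even if the block raises


# asynccontextmanager turns the generator function into a factory
# whose return value works with "async with".
get_db_context = asynccontextmanager(get_db)


async def background_task() -> None:
    async with get_db_context() as session:
        log.append(f"use {session}")


asyncio.run(background_task())
```

The same setup and cleanup code then serves both request handlers and code that runs outside a request, such as background tasks or tests.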

fastapi_toolsets.db.get_transaction(session) async

Get a transaction context, handling nested transactions.

If already in a transaction, creates a savepoint (nested transaction). Otherwise, starts a new transaction.

Parameters:

Name     Type          Description            Default
session  AsyncSession  AsyncSession instance  required

Yields:

Type                                Description
AsyncGenerator[AsyncSession, None]  The session within the transaction context

Example
from fastapi_toolsets.db import get_transaction

async with get_transaction(session):
    session.add(model)
    # Auto-commits on exit, rolls back on exception
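
To show what a savepoint buys you inside an outer transaction, the sketch below runs the equivalent raw SQL. It uses SQLite through the standard sqlite3 module purely so that it runs without a server; the table and savepoint names are made up, and this is not how get_transaction itself is implemented.

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode, so the
# BEGIN / SAVEPOINT / COMMIT statements below are issued explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE items (name TEXT)")

conn.execute("BEGIN")  # outer transaction
conn.execute("INSERT INTO items VALUES ('outer')")

conn.execute("SAVEPOINT inner_tx")  # nested transaction
conn.execute("INSERT INTO items VALUES ('inner')")
conn.execute("ROLLBACK TO SAVEPOINT inner_tx")  # undo only the nested work
conn.execute("RELEASE SAVEPOINT inner_tx")

conn.execute("COMMIT")  # the outer insert survives

names = [row[0] for row in conn.execute("SELECT name FROM items")]
```

Rolling back to the savepoint discards only the nested insert, so a failure in an inner block does not have to abort the surrounding transaction.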

fastapi_toolsets.db.lock_tables(session, tables, *, mode=LockMode.SHARE_UPDATE_EXCLUSIVE, timeout='5s') async

Lock PostgreSQL tables for the duration of a transaction.

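Acquires table-level locks that are held until the transaction ends. Useful for serializing critical operations against other transactions that request a conflicting lock. Note that the default SHARE UPDATE EXCLUSIVE mode does not conflict with the ROW EXCLUSIVE locks taken by ordinary INSERT, UPDATE, and DELETE statements, so pass a stronger mode such as EXCLUSIVE when concurrent writes must be blocked.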

Parameters:

Name     Type                         Description                                  Default
session  AsyncSession                 AsyncSession instance                        required
tables   list[type[DeclarativeBase]]  List of SQLAlchemy model classes to lock     required
mode     LockMode                     Lock mode (default: SHARE UPDATE EXCLUSIVE)  SHARE_UPDATE_EXCLUSIVE
timeout  str                          Lock timeout (default: "5s")                 '5s'

Yields:

Type                                Description
AsyncGenerator[AsyncSession, None]  The session with locked tables

Raises:

Type             Description
SQLAlchemyError  If lock cannot be acquired within timeout

Example
from fastapi_toolsets.db import lock_tables, LockMode

async with lock_tables(session, [User, Account]):
    # Tables are locked with SHARE UPDATE EXCLUSIVE mode
    user = await UserCrud.get(session, [User.id == 1])
    user.balance += 100

# With custom lock mode
async with lock_tables(session, [Order], mode=LockMode.EXCLUSIVE):
    # EXCLUSIVE lock: other transactions can still read the table, but cannot write to it
    await process_order(session, order_id)
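
For orientation, the PostgreSQL statements that a helper like this would plausibly issue can be sketched as plain strings. build_lock_statements and quote_ident are hypothetical names, and the exact SQL the library sends is an assumption; SET LOCAL lock_timeout and LOCK TABLE ... IN ... MODE are standard PostgreSQL.

```python
from typing import List, Sequence


def quote_ident(name: str) -> str:
    """Quote an identifier the PostgreSQL way, doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_lock_statements(
    table_names: Sequence[str],
    mode: str = "SHARE UPDATE EXCLUSIVE",
    timeout: str = "5s",
) -> List[str]:
    if not table_names:
        raise ValueError("at least one table is required")
    timeout_literal = timeout.replace("'", "''")  # escape for a SQL string literal
    tables = ", ".join(quote_ident(name) for name in table_names)
    return [
        # SET LOCAL lasts only until the current transaction ends.
        f"SET LOCAL lock_timeout = '{timeout_literal}'",
        # Table locks are released automatically at commit or rollback.
        f"LOCK TABLE {tables} IN {mode} MODE",
    ]


statements = build_lock_statements(["users", "accounts"])
```

Both statements only make sense inside a transaction, which matches the description of locks being held until the transaction ends.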

fastapi_toolsets.db.wait_for_row_change(session, model, pk_value, *, columns=None, interval=0.5, timeout=None) async

Poll a database row until a change is detected.

Queries the row every interval seconds and returns the model instance once a change is detected in any column (or only the specified columns).

Parameters:

Name      Type              Description                                                                Default
session   AsyncSession      AsyncSession instance                                                      required
model     type[_M]          SQLAlchemy model class                                                     required
pk_value  Any               Primary key value of the row to watch                                      required
columns   list[str] | None  Optional list of column names to watch. If None, all columns are watched.  None
interval  float             Polling interval in seconds (default: 0.5)                                 0.5
timeout   float | None      Maximum time to wait in seconds. None means wait forever.                  None

Returns:

Type  Description
_M    The refreshed model instance with updated values

Raises:

Type           Description
NotFoundError  If the row does not exist or is deleted during polling
TimeoutError   If timeout expires before a change is detected

Example
from fastapi_toolsets.db import wait_for_row_change

# Wait for any column to change
updated = await wait_for_row_change(session, User, user_id)

# Watch specific columns with a timeout
updated = await wait_for_row_change(
    session, User, user_id,
    columns=["status", "email"],
    interval=1.0,
    timeout=30.0,
)
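
The polling behaviour described above can be sketched independently of any database. In this sketch poll_until_changed and fetch_row are hypothetical: fetch_row stands in for "load the row by primary key" and returns a plain dict, and the NotFoundError case is not modelled.

```python
from __future__ import annotations

import asyncio
import time
from typing import Any, Awaitable, Callable


async def poll_until_changed(
    fetch_row: Callable[[], Awaitable[dict[str, Any]]],
    *,
    columns: list[str] | None = None,
    interval: float = 0.5,
    timeout: float | None = None,
) -> dict[str, Any]:
    def snapshot(row: dict[str, Any]) -> tuple:
        # Compare only the watched columns, or every column when none are given.
        keys = columns if columns is not None else sorted(row)
        return tuple(row[key] for key in keys)

    baseline = snapshot(await fetch_row())
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("no change detected before the timeout expired")
        await asyncio.sleep(interval)
        row = await fetch_row()
        if snapshot(row) != baseline:
            return row


# Usage with a fake row whose status flips on the third read.
state = {"reads": 0}


async def fake_fetch() -> dict[str, Any]:
    state["reads"] += 1
    return {"id": 1, "status": "done" if state["reads"] >= 3 else "pending"}


changed = asyncio.run(
    poll_until_changed(fake_fetch, columns=["status"], interval=0.01, timeout=2.0)
)
```

A shorter interval detects changes sooner at the cost of more queries, so the value is a trade-off between latency and database load.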

fastapi_toolsets.db.create_database(db_name, *, server_url) async

Create a database.

Connects to server_url using AUTOCOMMIT isolation and issues a CREATE DATABASE statement for db_name.

Parameters:

Name        Type  Description                                                                             Default
db_name     str   Name of the database to create.                                                         required
server_url  str   URL used for server-level DDL (must point to an existing database on the same server).  required

Example
from fastapi_toolsets.db import create_database

SERVER_URL = "postgresql+asyncpg://postgres:postgres@localhost/postgres"
await create_database("myapp_test", server_url=SERVER_URL)

fastapi_toolsets.db.cleanup_tables(session, base) async

Truncate all tables for fast between-test cleanup.

Executes a single TRUNCATE … RESTART IDENTITY CASCADE statement across every table in base's metadata, which is significantly faster than dropping and re-creating tables between tests.

This is a no-op when the metadata contains no tables.

Parameters:

Name     Type                   Description                                                  Default
session  AsyncSession           An active async database session.                            required
base     type[DeclarativeBase]  SQLAlchemy DeclarativeBase class containing model metadata.  required

Example
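import pytest
from fastapi_toolsets.db import cleanup_tables

# create_db_session, worker_db_url, and Base are assumed to come from your test setup
@pytest.fixture
async def db_session(worker_db_url):
    async with create_db_session(worker_db_url, Base) as session:
        yield session
        await cleanup_tables(session, Base)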
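
The single-statement cleanup described above can be sketched as a string builder. build_truncate_statement is a hypothetical helper that takes table names directly rather than reading them from metadata, and the exact SQL the library emits is an assumption.

```python
from typing import Optional, Sequence


def build_truncate_statement(table_names: Sequence[str]) -> Optional[str]:
    """Return one TRUNCATE covering every table, or None when there is nothing to do."""
    if not table_names:
        return None  # mirrors the documented no-op for empty metadata
    quoted = ", ".join('"' + name.replace('"', '""') + '"' for name in table_names)
    # RESTART IDENTITY resets sequences; CASCADE also truncates referencing tables.
    return f"TRUNCATE TABLE {quoted} RESTART IDENTITY CASCADE"


statement = build_truncate_statement(["users", "orders"])
nothing = build_truncate_statement([])
```

Listing all tables in one statement avoids having to order the truncations by foreign-key dependency.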

fastapi_toolsets.db.m2m_add(session, instance, rel_attr, *related, ignore_conflicts=False) async

Insert rows into a Many-to-Many association table without loading the ORM collection.

Parameters:

Name              Type                Description                                                                                          Default
session           AsyncSession        DB async session.                                                                                    required
instance          DeclarativeBase     The "owner" side model instance (e.g. the A in A.b_list).                                            required
rel_attr          QueryableAttribute  The M2M relationship attribute on the model class (e.g. A.b_list).                                   required
*related          DeclarativeBase     One or more related instances to associate with instance.                                            ()
ignore_conflicts  bool                When True, silently skip rows that already exist in the association table (ON CONFLICT DO NOTHING).  False

Raises:

Type       Description
TypeError  If rel_attr is not a Many-to-Many relationship.
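
The effect of ignore_conflicts can be illustrated at the SQL level. The sketch below uses SQLite through the standard sqlite3 module so that it runs anywhere (ON CONFLICT DO NOTHING needs SQLite 3.24 or newer); the post_tag table and the add_links helper are hypothetical and do not reflect the library's internals.

```python
import sqlite3
from typing import Iterable

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE post_tag ("
    " post_id INTEGER NOT NULL,"
    " tag_id INTEGER NOT NULL,"
    " PRIMARY KEY (post_id, tag_id))"
)
conn.execute("INSERT INTO post_tag VALUES (1, 10)")  # an existing association
conn.commit()


def add_links(owner_id: int, related_ids: Iterable[int], ignore_conflicts: bool = False) -> None:
    sql = "INSERT INTO post_tag (post_id, tag_id) VALUES (?, ?)"
    if ignore_conflicts:
        sql += " ON CONFLICT DO NOTHING"  # skip rows that already exist
    with conn:  # commit on success, roll back on error
        conn.executemany(sql, [(owner_id, related_id) for related_id in related_ids])


# (1, 10) already exists and is skipped; (1, 11) is inserted.
add_links(1, [10, 11], ignore_conflicts=True)
rows = sorted(conn.execute("SELECT post_id, tag_id FROM post_tag").fetchall())
```

Without ignore_conflicts, inserting a pair that already exists violates the primary key and raises sqlite3.IntegrityError in this sketch.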

fastapi_toolsets.db.m2m_remove(session, instance, rel_attr, *related) async

Remove rows from a Many-to-Many association table without loading the ORM collection.

Parameters:

Name      Type                Description                                                         Default
session   AsyncSession        DB async session.                                                   required
instance  DeclarativeBase     The "owner" side model instance (e.g. the A in A.b_list).           required
rel_attr  QueryableAttribute  The M2M relationship attribute on the model class (e.g. A.b_list).  required
*related  DeclarativeBase     One or more related instances to disassociate from instance.        ()

Raises:

Type       Description
TypeError  If rel_attr is not a Many-to-Many relationship.

fastapi_toolsets.db.m2m_set(session, instance, rel_attr, *related) async

Replace the entire Many-to-Many association set atomically.

Parameters:

Name      Type                Description                                                         Default
session   AsyncSession        DB async session.                                                   required
instance  DeclarativeBase     The "owner" side model instance (e.g. the A in A.b_list).           required
rel_attr  QueryableAttribute  The M2M relationship attribute on the model class (e.g. A.b_list).  required
*related  DeclarativeBase     The new complete set of related instances.                          ()

Raises:

Type       Description
TypeError  If rel_attr is not a Many-to-Many relationship.
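
What "atomically" means for replacing an association set can be sketched as a delete followed by an insert inside a single transaction. As before, this uses SQLite via the standard sqlite3 module so it can run without a server; the post_tag table and replace_links helper are hypothetical, and the library's actual statements may differ.

```python
import sqlite3
from typing import Iterable

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE post_tag ("
    " post_id INTEGER NOT NULL,"
    " tag_id INTEGER NOT NULL,"
    " PRIMARY KEY (post_id, tag_id))"
)
conn.executemany("INSERT INTO post_tag VALUES (?, ?)", [(1, 10), (1, 11), (2, 10)])
conn.commit()


def replace_links(owner_id: int, related_ids: Iterable[int]) -> None:
    with conn:  # one transaction: both steps commit together or not at all
        conn.execute("DELETE FROM post_tag WHERE post_id = ?", (owner_id,))
        conn.executemany(
            "INSERT INTO post_tag (post_id, tag_id) VALUES (?, ?)",
            [(owner_id, related_id) for related_id in related_ids],
        )


# Post 1 ends up linked to exactly tags 12 and 13; post 2 is untouched.
replace_links(1, [12, 13])
rows = sorted(conn.execute("SELECT post_id, tag_id FROM post_tag").fetchall())
```

If the insert step fails, the delete is rolled back as well, so readers never observe the owner with a half-replaced set.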