
db

Here's the reference for the database session utilities, transaction helpers, locking functions, and the row-polling helper.

You can import them directly from fastapi_toolsets.db:

from fastapi_toolsets.db import (
    LockMode,
    create_db_dependency,
    create_db_context,
    get_transaction,
    lock_tables,
    wait_for_row_change,
)

fastapi_toolsets.db.LockMode

Bases: str, Enum

PostgreSQL table lock modes.

See: https://www.postgresql.org/docs/current/explicit-locking.html
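
As a rough illustration of what a str-based enum of lock modes can look like, the sketch below lists the eight table-level lock modes named in the PostgreSQL manual. The class name LockModeSketch and the member values are assumptions made for illustration; only SHARE_UPDATE_EXCLUSIVE and EXCLUSIVE are confirmed as LockMode members by the examples on this page.

```python
from enum import Enum


class LockModeSketch(str, Enum):
    """Hypothetical stand-in for LockMode; values are PostgreSQL lock mode names."""

    ACCESS_SHARE = "ACCESS SHARE"
    ROW_SHARE = "ROW SHARE"
    ROW_EXCLUSIVE = "ROW EXCLUSIVE"
    SHARE_UPDATE_EXCLUSIVE = "SHARE UPDATE EXCLUSIVE"
    SHARE = "SHARE"
    SHARE_ROW_EXCLUSIVE = "SHARE ROW EXCLUSIVE"
    EXCLUSIVE = "EXCLUSIVE"
    ACCESS_EXCLUSIVE = "ACCESS EXCLUSIVE"


# Because the enum also subclasses str, members compare equal to plain strings,
# and .value can be placed directly into SQL text.
mode = LockModeSketch.SHARE_UPDATE_EXCLUSIVE
statement = f"LOCK TABLE users IN {mode.value} MODE"
```

Subclassing str alongside Enum keeps the members usable anywhere a plain string is expected, which is presumably why the real class declares both bases.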

fastapi_toolsets.db.create_db_dependency(session_maker)

Create a FastAPI dependency for database sessions.

Creates a dependency function that yields a session and auto-commits if a transaction is active when the request completes.

Parameters:

Name           Type                              Description                                          Default
session_maker  async_sessionmaker[AsyncSession]  Async session factory from create_session_factory()  required

Returns:

Type                                              Description
Callable[[], AsyncGenerator[AsyncSession, None]]  An async generator function usable with FastAPI's Depends()

Example
from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from fastapi_toolsets.db import create_db_dependency

app = FastAPI()

engine = create_async_engine("postgresql+asyncpg://...")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
get_db = create_db_dependency(SessionLocal)

@app.get("/users")
async def list_users(session: AsyncSession = Depends(get_db)):
    ...
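
To make the "yield a session, then commit if a transaction is active" behaviour concrete, here is a minimal sketch of how such a dependency could be structured. It is not the library's implementation. FakeSession, make_dependency, and simulate_request are hypothetical names, and FakeSession stands in for AsyncSession so the sketch runs without a database. The rollback-on-error and close steps are assumptions as well.

```python
import asyncio
from typing import AsyncGenerator, Callable


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only records what happened."""

    def __init__(self) -> None:
        self.active = False
        self.events: list[str] = []

    def add(self, obj: object) -> None:
        self.active = True  # pretend the first write begins a transaction
        self.events.append("add")

    def in_transaction(self) -> bool:
        return self.active

    async def commit(self) -> None:
        self.active = False
        self.events.append("commit")

    async def rollback(self) -> None:
        self.active = False
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


def make_dependency(
    session_maker: Callable[[], FakeSession],
) -> Callable[[], AsyncGenerator[FakeSession, None]]:
    """Build an async-generator dependency around a session factory."""

    async def get_db() -> AsyncGenerator[FakeSession, None]:
        session = session_maker()
        try:
            yield session
            # Reached only if the handler finished without raising.
            if session.in_transaction():
                await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

    return get_db


async def simulate_request() -> list[str]:
    """Drive the generator the way a framework would around a handler."""
    gen = make_dependency(FakeSession)()
    session = await gen.__anext__()  # setup: code before the yield
    session.add(object())  # the "handler" writes something
    try:
        await gen.__anext__()  # teardown: code after the yield
    except StopAsyncIteration:
        pass
    return session.events


events = asyncio.run(simulate_request())
```

In this sketch the recorded events come out as add, commit, close: the commit happens only because the simulated handler started a transaction before the generator resumed.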

fastapi_toolsets.db.create_db_context(session_maker)

Create a context manager for database sessions.

Creates a context manager for use outside of FastAPI request handlers, such as in background tasks, CLI commands, or tests.

Parameters:

Name           Type                              Description                                          Default
session_maker  async_sessionmaker[AsyncSession]  Async session factory from create_session_factory()  required

Returns:

Type                                                     Description
Callable[[], AbstractAsyncContextManager[AsyncSession]]  An async context manager function

Example
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from fastapi_toolsets.db import create_db_context

engine = create_async_engine("postgresql+asyncpg://...")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
get_db_context = create_db_context(SessionLocal)

# User and UserCrud are application-defined names (a model and its CRUD helper), not shown here.
async def background_task():
    async with get_db_context() as session:
        user = await UserCrud.get(session, [User.id == 1])
        ...
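
One plausible way to build such a factory is contextlib.asynccontextmanager from the standard library. The sketch below is an illustration under assumptions, not the library's code. FakeSession, make_context, and run_task are hypothetical names, and the commit-on-success and rollback-on-error behaviour is assumed to mirror the request dependency described earlier on this page.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Callable


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only records what happened."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def commit(self) -> None:
        self.events.append("commit")

    async def rollback(self) -> None:
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


def make_context(session_maker: Callable[[], FakeSession]):
    """Return a zero-argument callable producing an async context manager."""

    @asynccontextmanager
    async def db_context() -> AsyncIterator[FakeSession]:
        session = session_maker()
        try:
            yield session
            await session.commit()  # assumed: commit when the block succeeds
        except Exception:
            await session.rollback()  # assumed: roll back when it raises
            raise
        finally:
            await session.close()

    return db_context


async def run_task(fail: bool) -> list[str]:
    """Run a pretend background task and report what the session saw."""
    get_ctx = make_context(FakeSession)
    seen: list[FakeSession] = []
    try:
        async with get_ctx() as session:
            seen.append(session)
            if fail:
                raise ValueError("task failed")
    except ValueError:
        pass
    return seen[0].events


ok_events = asyncio.run(run_task(fail=False))
failed_events = asyncio.run(run_task(fail=True))
```

The two runs show both exit paths: a clean block ends with commit then close, while a failing block ends with rollback then close.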

fastapi_toolsets.db.get_transaction(session) async

Get a transaction context, handling nested transactions.

If already in a transaction, creates a savepoint (nested transaction). Otherwise, starts a new transaction.

Parameters:

Name     Type          Description            Default
session  AsyncSession  AsyncSession instance  required

Yields:

Type                                Description
AsyncGenerator[AsyncSession, None]  The session within the transaction context

Example
async with get_transaction(session):
    session.add(model)
    # Auto-commits on exit, rolls back on exception
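
The savepoint-or-new-transaction decision can be sketched as follows. This is an illustrative sketch, not the library's source. FakeSession, transaction_sketch, and demo are hypothetical names. The fake imitates only the in_transaction(), begin(), and begin_nested() methods that SQLAlchemy's AsyncSession provides, so the example runs without a database.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeSession:
    """Hypothetical stand-in exposing the three AsyncSession methods used below."""

    def __init__(self) -> None:
        self.depth = 0
        self.events: list[str] = []

    def in_transaction(self) -> bool:
        return self.depth > 0

    @asynccontextmanager
    async def _scope(self, name: str) -> AsyncIterator[None]:
        self.events.append(f"{name} start")
        self.depth += 1
        try:
            yield
            self.events.append(f"{name} commit")
        except Exception:
            self.events.append(f"{name} rollback")
            raise
        finally:
            self.depth -= 1

    def begin(self):
        return self._scope("transaction")

    def begin_nested(self):
        return self._scope("savepoint")


@asynccontextmanager
async def transaction_sketch(session: FakeSession) -> AsyncIterator[FakeSession]:
    """Open a savepoint when a transaction is already active, else a new one."""
    if session.in_transaction():
        async with session.begin_nested():
            yield session
    else:
        async with session.begin():
            yield session


async def demo() -> list[str]:
    session = FakeSession()
    async with transaction_sketch(session):  # outer: new transaction
        try:
            async with transaction_sketch(session):  # inner: savepoint
                raise ValueError("inner step failed")
        except ValueError:
            pass  # only the savepoint is rolled back
    return session.events


events = asyncio.run(demo())
```

The point of the nesting is visible in the recorded events: the inner failure rolls back only the savepoint, and the outer transaction still commits.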

fastapi_toolsets.db.lock_tables(session, tables, *, mode=LockMode.SHARE_UPDATE_EXCLUSIVE, timeout='5s') async

Lock PostgreSQL tables for the duration of a transaction.

Acquires table-level locks that are held until the transaction ends. Useful for preventing concurrent modifications during critical operations.

Parameters:

Name     Type                         Description                                  Default
session  AsyncSession                 AsyncSession instance                        required
tables   list[type[DeclarativeBase]]  List of SQLAlchemy model classes to lock     required
mode     LockMode                     Lock mode (default: SHARE UPDATE EXCLUSIVE)  SHARE_UPDATE_EXCLUSIVE
timeout  str                          Lock timeout (default: "5s")                 '5s'

Yields:

Type                                Description
AsyncGenerator[AsyncSession, None]  The session with locked tables

Raises:

Type             Description
SQLAlchemyError  If lock cannot be acquired within timeout

Example
from fastapi_toolsets.db import lock_tables, LockMode

async with lock_tables(session, [User, Account]):
    # Tables are locked with SHARE UPDATE EXCLUSIVE mode
    user = await UserCrud.get(session, [User.id == 1])
    user.balance += 100

# With custom lock mode
async with lock_tables(session, [Order], mode=LockMode.EXCLUSIVE):
    # EXCLUSIVE mode: other transactions can only read (plain SELECT); writes are blocked
    await process_order(session, order_id)
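
For orientation, the SQL behind a helper like this would plausibly be a transaction-scoped lock timeout followed by a LOCK TABLE statement. The sketch below only builds those strings. build_lock_statements is a hypothetical function, and whether the library issues exactly these statements is an assumption. SET LOCAL, lock_timeout, and LOCK TABLE ... IN ... MODE are standard PostgreSQL.

```python
import re


def build_lock_statements(
    table_names: list[str],
    mode: str = "SHARE UPDATE EXCLUSIVE",
    timeout: str = "5s",
) -> list[str]:
    """Return SQL a lock helper could run at the start of a transaction."""
    # The timeout is interpolated into SQL text, so accept only a plain
    # number with an optional PostgreSQL time unit.
    if not re.fullmatch(r"\d+(us|ms|s|min|h|d)?", timeout):
        raise ValueError(f"unexpected timeout format: {timeout!r}")
    tables = ", ".join(table_names)
    return [
        # SET LOCAL lasts only until the current transaction ends.
        f"SET LOCAL lock_timeout = '{timeout}'",
        f"LOCK TABLE {tables} IN {mode} MODE",
    ]


statements = build_lock_statements(["users", "accounts"])
```

In real code the table names would come from the model classes rather than from literal strings. Because table locks are released only when the transaction ends, the locked section should be kept as short as possible.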

fastapi_toolsets.db.wait_for_row_change(session, model, pk_value, *, columns=None, interval=0.5, timeout=None) async

Poll a database row until a change is detected.

Queries the row every interval seconds and returns the model instance once a change is detected in any column (or only the specified columns).

Parameters:

Name      Type              Description                                                                Default
session   AsyncSession      AsyncSession instance                                                      required
model     type[_M]          SQLAlchemy model class                                                     required
pk_value  Any               Primary key value of the row to watch                                      required
columns   list[str] | None  Optional list of column names to watch. If None, all columns are watched.  None
interval  float             Polling interval in seconds (default: 0.5)                                 0.5
timeout   float | None      Maximum time to wait in seconds. None means wait forever.                  None

Returns:

Type  Description
_M    The refreshed model instance with updated values

Raises:

Type          Description
LookupError   If the row does not exist or is deleted during polling
TimeoutError  If timeout expires before a change is detected

Example
from fastapi_toolsets.db import wait_for_row_change

# Wait for any column to change
updated = await wait_for_row_change(session, User, user_id)

# Watch specific columns with a timeout
updated = await wait_for_row_change(
    session, User, user_id,
    columns=["status", "email"],
    interval=1.0,
    timeout=30.0,
)
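
The polling behaviour described above can be approximated with a small loop: take a snapshot, re-fetch every interval seconds, and compare. The sketch below is a simplified illustration using plain dictionaries instead of model instances. poll_for_change, fetch_row, and demo are hypothetical names, and the library's actual change detection may differ.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional

Row = dict[str, Any]


async def poll_for_change(
    fetch_row: Callable[[], Awaitable[Optional[Row]]],
    columns: Optional[list[str]] = None,
    interval: float = 0.5,
    timeout: Optional[float] = None,
) -> Row:
    """Re-fetch a row until a watched column differs from the first snapshot."""

    def snapshot(row: Row) -> Row:
        return dict(row) if columns is None else {c: row[c] for c in columns}

    first = await fetch_row()
    if first is None:
        raise LookupError("row does not exist")
    baseline = snapshot(first)
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("no change detected before timeout")
        await asyncio.sleep(interval)
        current = await fetch_row()
        if current is None:
            raise LookupError("row was deleted during polling")
        if snapshot(current) != baseline:
            return current


async def demo() -> Row:
    # Hypothetical in-memory "row" that another task updates.
    row: Row = {"id": 1, "status": "pending", "email": "a@example.com"}

    async def fetch_row() -> Optional[Row]:
        return dict(row)

    async def writer() -> None:
        await asyncio.sleep(0.03)
        row["status"] = "done"

    task = asyncio.create_task(writer())
    changed = await poll_for_change(
        fetch_row, columns=["status"], interval=0.01, timeout=1.0
    )
    await task
    return changed


result = asyncio.run(demo())
```

With an ORM session, each poll also has to bypass cached state (for example by refreshing the instance), otherwise the loop would keep comparing stale values.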