db
Here's the reference for all database session utilities, transaction helpers, and locking functions.
You can import them directly from fastapi_toolsets.db:
from fastapi_toolsets.db import (
    LockMode,
    create_db_dependency,
    create_db_context,
    get_transaction,
    lock_tables,
    wait_for_row_change,
)
fastapi_toolsets.db.LockMode
fastapi_toolsets.db.create_db_dependency(session_maker)
Create a FastAPI dependency for database sessions.
Creates a dependency function that yields a session and auto-commits if a transaction is active when the request completes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_maker | async_sessionmaker[AsyncSession] | Async session factory from create_session_factory() | required |
Returns:
| Type | Description |
|---|---|
| Callable[[], AsyncGenerator[AsyncSession, None]] | An async generator function usable with FastAPI's Depends() |
Example
from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from fastapi_toolsets.db import create_db_dependency

app = FastAPI()

engine = create_async_engine("postgresql+asyncpg://...")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
get_db = create_db_dependency(SessionLocal)


@app.get("/users")
async def list_users(session: AsyncSession = Depends(get_db)):
    ...
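To make the auto-commit behaviour described above concrete, here is a minimal sketch of how a dependency of this kind could be structured. It is not the library's source: `FakeSession` and `make_db_dependency_sketch` are hypothetical names used only for illustration, and the sketch does not cover error handling or rollback.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for AsyncSession; tracks only what the sketch needs."""

    def __init__(self):
        self.active = False      # True once the handler has started a transaction
        self.committed = False

    def in_transaction(self):
        return self.active

    async def commit(self):
        self.committed = True
        self.active = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False             # do not swallow exceptions


def make_db_dependency_sketch(session_maker):
    """Build an async generator function in the shape FastAPI's Depends() expects."""

    async def get_db():
        async with session_maker() as session:
            yield session                      # the request handler runs here
            if session.in_transaction():       # commit only if work is pending
                await session.commit()

    return get_db


async def demo():
    get_db = make_db_dependency_sketch(FakeSession)
    gen = get_db()
    session = await gen.__anext__()    # setup phase, before the handler
    session.active = True              # pretend the handler wrote something
    try:
        await gen.__anext__()          # teardown phase, after the handler
    except StopAsyncIteration:
        pass
    return session.committed


committed = asyncio.run(demo())
```

The `demo` coroutine drives the generator by hand in the same two phases FastAPI uses for dependencies with `yield`: everything before the `yield` runs before the handler, everything after it runs once the handler has returned.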
fastapi_toolsets.db.create_db_context(session_maker)
Create a context manager for database sessions.
Creates a context manager for use outside of FastAPI request handlers, such as in background tasks, CLI commands, or tests.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_maker | async_sessionmaker[AsyncSession] | Async session factory from create_session_factory() | required |
Returns:
| Type | Description |
|---|---|
| Callable[[], AbstractAsyncContextManager[AsyncSession]] | An async context manager function |
Example
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from fastapi_toolsets.db import create_db_context

engine = create_async_engine("postgresql+asyncpg://...")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
get_db_context = create_db_context(SessionLocal)


async def background_task():
    # User and UserCrud are application-defined (a model and its CRUD helper).
    async with get_db_context() as session:
        user = await UserCrud.get(session, [User.id == 1])
        ...
fastapi_toolsets.db.get_transaction(session)
async
Get a transaction context, handling nested transactions.
If already in a transaction, creates a savepoint (nested transaction). Otherwise, starts a new transaction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session | AsyncSession | AsyncSession instance | required |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[AsyncSession, None] | The session within the transaction context |
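The savepoint-or-new-transaction decision described above can be illustrated with a short sketch. This is an assumption about the general shape of such a helper, not the library's implementation. `FakeSession` and `get_transaction_sketch` are hypothetical names, and the fake session only records calls. With a real `AsyncSession`, the corresponding methods are `in_transaction()`, `begin_nested()`, and `begin()`.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession; records which scopes were opened."""

    def __init__(self):
        self.depth = 0    # number of currently open transaction scopes
        self.log = []     # "begin" / "savepoint" / "release" / "commit"

    def in_transaction(self):
        return self.depth > 0

    @asynccontextmanager
    async def begin(self):
        self.log.append("begin")
        self.depth += 1
        try:
            yield self
        finally:
            self.depth -= 1
        self.log.append("commit")      # reached only if the body did not raise

    @asynccontextmanager
    async def begin_nested(self):
        self.log.append("savepoint")
        self.depth += 1
        try:
            yield self
        finally:
            self.depth -= 1
        self.log.append("release")     # reached only if the body did not raise


@asynccontextmanager
async def get_transaction_sketch(session):
    """Open a savepoint if a transaction is active, otherwise a new transaction."""
    if session.in_transaction():
        async with session.begin_nested():
            yield session
    else:
        async with session.begin():
            yield session


async def demo():
    session = FakeSession()
    async with get_transaction_sketch(session):        # outer call: new transaction
        async with get_transaction_sketch(session):    # inner call: savepoint
            pass
    return session.log


log = asyncio.run(demo())
```

Because the inner call detects the open transaction, code that wraps its work in this helper can be called both on its own and from inside a caller's transaction without raising on a second `begin()`.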
fastapi_toolsets.db.lock_tables(session, tables, *, mode=LockMode.SHARE_UPDATE_EXCLUSIVE, timeout='5s')
async
Lock PostgreSQL tables for the duration of a transaction.
Acquires table-level locks that are held until the transaction ends. Useful for preventing concurrent modifications during critical operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session | AsyncSession | AsyncSession instance | required |
| tables | list[type[DeclarativeBase]] | List of SQLAlchemy model classes to lock | required |
| mode | LockMode | Lock mode (default: SHARE UPDATE EXCLUSIVE) | LockMode.SHARE_UPDATE_EXCLUSIVE |
| timeout | str | Lock timeout (default: "5s") | '5s' |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[AsyncSession, None] | The session with locked tables |
Raises:
| Type | Description |
|---|---|
| SQLAlchemyError | If lock cannot be acquired within timeout |
Example
from fastapi_toolsets.db import lock_tables, LockMode

# Run inside an async function, with `session` an open AsyncSession.
async with lock_tables(session, [User, Account]):
    # Tables are locked with SHARE UPDATE EXCLUSIVE mode
    user = await UserCrud.get(session, [User.id == 1])
    user.balance += 100

# With custom lock mode
async with lock_tables(session, [Order], mode=LockMode.EXCLUSIVE):
    # EXCLUSIVE mode: other transactions can still run plain SELECTs,
    # but cannot write to or lock the table
    await process_order(session, order_id)
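For orientation, the sketch below shows the kind of PostgreSQL statements a table-lock helper like this could issue inside a transaction. Whether `lock_tables` emits exactly these statements is an assumption, since the reference does not say. `build_lock_statements` is a hypothetical helper, and the table names stand in for what the model classes would provide (for example through `__tablename__`).

```python
def build_lock_statements(table_names, mode="SHARE UPDATE EXCLUSIVE", timeout="5s"):
    """Return the SQL a table-lock helper could run inside an open transaction.

    Values are interpolated directly, so this is only safe for trusted,
    hard-coded table names, modes, and timeouts.
    """
    tables = ", ".join(f'"{name}"' for name in table_names)
    return [
        # SET LOCAL limits the timeout to the current transaction.
        f"SET LOCAL lock_timeout = '{timeout}'",
        # Table locks are released automatically when the transaction ends.
        f"LOCK TABLE {tables} IN {mode} MODE",
    ]


statements = build_lock_statements(["users", "accounts"])
for statement in statements:
    print(statement)
```

In PostgreSQL, a lock that cannot be obtained before `lock_timeout` expires makes the statement fail, which is consistent with the `SQLAlchemyError` listed under Raises.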
fastapi_toolsets.db.wait_for_row_change(session, model, pk_value, *, columns=None, interval=0.5, timeout=None)
async
Poll a database row until a change is detected.
Queries the row every interval seconds and returns the model instance
once a change is detected in any column (or only the specified columns).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session | AsyncSession | AsyncSession instance | required |
| model | type[_M] | SQLAlchemy model class | required |
| pk_value | Any | Primary key value of the row to watch | required |
| columns | list[str] \| None | Optional list of column names to watch. If None, all columns are watched. | None |
| interval | float | Polling interval in seconds (default: 0.5) | 0.5 |
| timeout | float \| None | Maximum time to wait in seconds. None means wait forever. | None |
Returns:
| Type | Description |
|---|---|
| _M | The refreshed model instance with updated values |
Raises:
| Type | Description |
|---|---|
| LookupError | If the row does not exist or is deleted during polling |
| TimeoutError | If timeout expires before a change is detected |
Example
from fastapi_toolsets.db import wait_for_row_change

# Wait for any column to change
updated = await wait_for_row_change(session, User, user_id)

# Watch specific columns with a timeout
updated = await wait_for_row_change(
    session, User, user_id,
    columns=["status", "email"],
    interval=1.0,
    timeout=30.0,
)
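The polling behaviour documented above (snapshot, sleep, re-read, compare, with `LookupError` and `TimeoutError` as failure modes) can be sketched without a database. This is an illustrative approximation, not the library's code. `poll_until_changed` and the in-memory `row` dictionary are hypothetical, and a real implementation would re-query the row through the session instead.

```python
import asyncio
import time


async def poll_until_changed(fetch, columns=None, interval=0.5, timeout=None):
    """Call `fetch()` every `interval` seconds until a watched value differs."""

    def snapshot(row):
        if row is None:
            raise LookupError("row does not exist")
        keys = columns if columns is not None else sorted(row)
        return {key: row[key] for key in keys}

    initial = snapshot(await fetch())
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("no change detected before timeout")
        await asyncio.sleep(interval)
        row = await fetch()
        if snapshot(row) != initial:    # also raises LookupError if the row vanished
            return row


async def demo():
    # Hypothetical in-memory "row" standing in for a database record.
    row = {"id": 1, "status": "pending", "email": "a@example.com"}

    async def fetch():
        return dict(row)

    async def writer():
        await asyncio.sleep(0.03)
        row["status"] = "done"          # simulates another transaction's update

    task = asyncio.create_task(writer())
    changed = await poll_until_changed(
        fetch, columns=["status"], interval=0.01, timeout=1.0
    )
    await task
    return changed


result = asyncio.run(demo())
```

A shorter `interval` detects changes sooner at the cost of more queries, so the default of 0.5 seconds is a trade-off worth revisiting for rows that many clients watch at once.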