db¶
Here's the reference for all database session utilities, transaction helpers, and locking functions.
You can import them directly from fastapi_toolsets.db:
from fastapi_toolsets.db import (
LockMode,
cleanup_tables,
create_database,
create_db_dependency,
create_db_context,
get_transaction,
lock_tables,
m2m_add,
m2m_remove,
m2m_set,
wait_for_row_change,
)
fastapi_toolsets.db.LockMode
¶
fastapi_toolsets.db.create_db_dependency(session_maker)
¶
Create a FastAPI dependency for database sessions.
Creates a dependency function that yields a session and auto-commits if a transaction is active when the request completes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_maker
|
async_sessionmaker[_SessionT]
|
Async session factory from create_session_factory() |
required |
Returns:
| Type | Description |
|---|---|
Callable[[], AsyncGenerator[_SessionT, None]]
|
An async generator function usable with FastAPI's Depends() |
Example
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from fastapi_toolsets.db import create_db_dependency
engine = create_async_engine("postgresql+asyncpg://...")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
get_db = create_db_dependency(SessionLocal)
@app.get("/users")
async def list_users(session: AsyncSession = Depends(get_db)):
...
fastapi_toolsets.db.create_db_context(session_maker)
¶
Create a context manager for database sessions.
Creates a context manager for use outside of FastAPI request handlers, such as in background tasks, CLI commands, or tests.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_maker
|
async_sessionmaker[_SessionT]
|
Async session factory from create_session_factory() |
required |
Returns:
| Type | Description |
|---|---|
Callable[[], AbstractAsyncContextManager[_SessionT]]
|
An async context manager function |
Example
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from fastapi_toolsets.db import create_db_context
engine = create_async_engine("postgresql+asyncpg://...")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
get_db_context = create_db_context(SessionLocal)
async def background_task():
async with get_db_context() as session:
user = await UserCrud.get(session, [User.id == 1])
...
fastapi_toolsets.db.get_transaction(session)
async
¶
Get a transaction context, handling nested transactions.
If already in a transaction, creates a savepoint (nested transaction). Otherwise, starts a new transaction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
AsyncSession
|
AsyncSession instance |
required |
Yields:
| Type | Description |
|---|---|
AsyncGenerator[AsyncSession, None]
|
The session within the transaction context |
fastapi_toolsets.db.lock_tables(session, tables, *, mode=LockMode.SHARE_UPDATE_EXCLUSIVE, timeout='5s')
async
¶
Lock PostgreSQL tables for the duration of a transaction.
Acquires table-level locks that are held until the transaction ends. Useful for preventing concurrent modifications during critical operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
AsyncSession
|
AsyncSession instance |
required |
tables
|
list[type[DeclarativeBase]]
|
List of SQLAlchemy model classes to lock |
required |
mode
|
LockMode
|
Lock mode (default: SHARE UPDATE EXCLUSIVE) |
SHARE_UPDATE_EXCLUSIVE
|
timeout
|
str
|
Lock timeout (default: "5s") |
'5s'
|
Yields:
| Type | Description |
|---|---|
AsyncGenerator[AsyncSession, None]
|
The session with locked tables |
Raises:
| Type | Description |
|---|---|
SQLAlchemyError
|
If lock cannot be acquired within timeout |
Example
from fastapi_toolsets.db import lock_tables, LockMode
async with lock_tables(session, [User, Account]):
# Tables are locked with SHARE UPDATE EXCLUSIVE mode
user = await UserCrud.get(session, [User.id == 1])
user.balance += 100
# With custom lock mode
async with lock_tables(session, [Order], mode=LockMode.EXCLUSIVE):
# Exclusive lock - no other transactions can access
await process_order(session, order_id)
fastapi_toolsets.db.wait_for_row_change(session, model, pk_value, *, columns=None, interval=0.5, timeout=None)
async
¶
Poll a database row until a change is detected.
Queries the row every interval seconds and returns the model instance
once a change is detected in any column (or only the specified columns).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
AsyncSession
|
AsyncSession instance |
required |
model
|
type[_M]
|
SQLAlchemy model class |
required |
pk_value
|
Any
|
Primary key value of the row to watch |
required |
columns
|
list[str] | None
|
Optional list of column names to watch. If None, all columns are watched. |
None
|
interval
|
float
|
Polling interval in seconds (default: 0.5) |
0.5
|
timeout
|
float | None
|
Maximum time to wait in seconds. None means wait forever. |
None
|
Returns:
| Type | Description |
|---|---|
_M
|
The refreshed model instance with updated values |
Raises:
| Type | Description |
|---|---|
NotFoundError
|
If the row does not exist or is deleted during polling |
TimeoutError
|
If timeout expires before a change is detected |
Example
from fastapi_toolsets.db import wait_for_row_change
# Wait for any column to change
updated = await wait_for_row_change(session, User, user_id)
# Watch specific columns with a timeout
updated = await wait_for_row_change(
session, User, user_id,
columns=["status", "email"],
interval=1.0,
timeout=30.0,
)
fastapi_toolsets.db.create_database(db_name, *, server_url)
async
¶
Create a database.
Connects to server_url using AUTOCOMMIT isolation and issues a
CREATE DATABASE statement for db_name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
db_name
|
str
|
Name of the database to create. |
required |
server_url
|
str
|
URL used for server-level DDL (must point to an existing database on the same server). |
required |
fastapi_toolsets.db.cleanup_tables(session, base)
async
¶
Truncate all tables for fast between-test cleanup.
Executes a single TRUNCATE … RESTART IDENTITY CASCADE statement
across every table in base's metadata, which is significantly faster
than dropping and re-creating tables between tests.
This is a no-op when the metadata contains no tables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
AsyncSession
|
An active async database session. |
required |
base
|
type[DeclarativeBase]
|
SQLAlchemy DeclarativeBase class containing model metadata. |
required |
fastapi_toolsets.db.m2m_add(session, instance, rel_attr, *related, ignore_conflicts=False)
async
¶
Insert rows into a Many-to-Many association table without loading the ORM collection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
AsyncSession
|
DB async session. |
required |
instance
|
DeclarativeBase
|
The "owner" side model instance (e.g. the |
required |
rel_attr
|
QueryableAttribute
|
The M2M relationship attribute on the model class (e.g. |
required |
*related
|
DeclarativeBase
|
One or more related instances to associate with |
()
|
ignore_conflicts
|
bool
|
When |
False
|
Raises:
| Type | Description |
|---|---|
TypeError
|
If |
fastapi_toolsets.db.m2m_remove(session, instance, rel_attr, *related)
async
¶
Remove rows from a Many-to-Many association table without loading the ORM collection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
AsyncSession
|
DB async session. |
required |
instance
|
DeclarativeBase
|
The "owner" side model instance (e.g. the |
required |
rel_attr
|
QueryableAttribute
|
The M2M relationship attribute on the model class (e.g. |
required |
*related
|
DeclarativeBase
|
One or more related instances to disassociate from |
()
|
Raises:
| Type | Description |
|---|---|
TypeError
|
If |
fastapi_toolsets.db.m2m_set(session, instance, rel_attr, *related)
async
¶
Replace the entire Many-to-Many association set atomically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
AsyncSession
|
DB async session. |
required |
instance
|
DeclarativeBase
|
The "owner" side model instance (e.g. the |
required |
rel_attr
|
QueryableAttribute
|
The M2M relationship attribute on the model class (e.g. |
required |
*related
|
DeclarativeBase
|
The new complete set of related instances. |
()
|
Raises:
| Type | Description |
|---|---|
TypeError
|
If |