Database Engine & ORM

ZodiacCore provides a high-performance, async-first database abstraction layer built on top of SQLModel and SQLAlchemy 2.0. It simplifies session management, connection pooling, and standardizes model definitions.

1. Core Concepts

The Database Manager

The DatabaseManager (exposed as the global db instance) is a strict singleton that manages the SQLAlchemy AsyncEngine and async_sessionmaker for each named database. Every app or container in the process reuses the one connection pool registered under a given name instead of creating its own, which is critical for performance and resource management.
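
The singleton behaviour can be pictured with a small standard-library sketch. TinyManager is a hypothetical stand-in, not the ZodiacCore class; it only mirrors a `__new__`-based singleton so that every construction returns the same object, and therefore the same name-keyed registry.

```python
class TinyManager:
    """Hypothetical stand-in that mimics a __new__-based strict singleton."""

    _instance = None

    def __new__(cls):
        # Create the single instance (and its registry) only once.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._pools = {}
        return cls._instance

    def register(self, name: str, pool: object) -> None:
        # Pools are keyed by name in the one shared registry.
        self._pools[name] = pool

    def get(self, name: str) -> object:
        return self._pools[name]


first = TinyManager()
second = TinyManager()
first.register("default", "pool-A")

# Both names refer to the same object, so the pool is visible through either.
shared_pool = second.get("default")
```

Because both constructions return the same instance, a pool registered through one reference is what any other caller in the process gets back.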

The Repository Pattern

We encourage the use of the Repository Pattern via BaseSQLRepository. This decouples your business logic from database-specific code, making your application more maintainable and easier to unit test with mocks.
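
As a rough illustration of the testing benefit, the sketch below exercises business logic against a mocked repository using only the standard library. SignupService and is_username_taken are hypothetical names invented for this example; the only assumption about the repository is that it exposes an async find_by_username method.

```python
import asyncio
from unittest.mock import AsyncMock


class SignupService:
    """Hypothetical business-logic class that depends only on a repository."""

    def __init__(self, repo) -> None:
        self.repo = repo

    async def is_username_taken(self, username: str) -> bool:
        # The service never touches SQL; it only calls the repository.
        return await self.repo.find_by_username(username) is not None


async def run_checks():
    repo = AsyncMock()  # stands in for a real repository, no database needed

    repo.find_by_username.return_value = {"username": "alice"}
    taken = await SignupService(repo).is_username_taken("alice")

    repo.find_by_username.return_value = None
    free = await SignupService(repo).is_username_taken("bob")
    return taken, free


taken, free = asyncio.run(run_checks())
```

The service is verified without an engine, a session, or a running database, which is the decoupling the pattern is meant to provide.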


2. Model Definitions

ZodiacCore provides several mixins and base classes in zodiac_core.db.sql to standardize your database schema.

Standard Base Models

Instead of inheriting from SQLModel directly, we recommend using our pre-configured base models:

Base Model | Primary Key | Timestamps
IntIDModel | id: int (Auto-increment) | created_at, updated_at
UUIDModel | id: UUID (v4) | created_at, updated_at

Example: Using Base Models

from zodiac_core.db.sql import IntIDModel
from sqlmodel import Field

class User(IntIDModel, table=True):
    username: str = Field(unique=True, index=True)
    email: str

Automatic Timestamps

Both IntIDModel and UUIDModel include SQLDateTimeMixin, which provides:

  • created_at: Automatically set on insertion.
  • updated_at: Automatically updated on every save via a SQLAlchemy event listener.

3. Configuration & Lifecycle

You should initialize the database during your application's startup and ensure it shuts down cleanly. Calling db.setup(...) again with the same name is allowed only when the effective configuration is identical; different settings for an existing name raise RuntimeError. Lifecycle control is now name-aware:

  • await db.shutdown(name="...") disposes only the selected named database.
  • await db.shutdown() disposes all registered databases.

This lets multiple apps, containers, or resources share the global manager while still releasing only the resource they own.
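
The setup and shutdown rules above can be modelled in a few lines of plain Python. TinyRegistry is a hypothetical simplification that stores only configuration dictionaries and creates no engines; it is a sketch of the described semantics, not the library's implementation.

```python
import asyncio
from typing import Dict, List, Optional


class TinyRegistry:
    """Hypothetical model of the name-aware rules; it creates no real engines."""

    def __init__(self) -> None:
        self._configs: Dict[str, dict] = {}

    def setup(self, url: str, name: str = "default", **options) -> None:
        current = {"url": url, "options": options}
        if name in self._configs:
            if self._configs[name] == current:
                return  # identical settings: repeated setup is a no-op
            raise RuntimeError(f"Database '{name}' is already configured with different settings")
        self._configs[name] = current

    async def shutdown(self, name: Optional[str] = None) -> None:
        if name is not None:
            self._configs.pop(name, None)  # release only the named entry
            return
        self._configs.clear()  # release everything that is registered

    def names(self) -> List[str]:
        return sorted(self._configs)


async def demo():
    reg = TinyRegistry()
    reg.setup("sqlite+aiosqlite:///a.db", name="default")
    reg.setup("sqlite+aiosqlite:///a.db", name="default")  # same settings: accepted
    reg.setup("sqlite+aiosqlite:///b.db", name="read_only")
    try:
        reg.setup("sqlite+aiosqlite:///other.db", name="default")
        conflict = False
    except RuntimeError:
        conflict = True
    await reg.shutdown(name="read_only")
    after_named = reg.names()
    await reg.shutdown()
    return conflict, after_named, reg.names()


conflict, after_named, after_all = asyncio.run(demo())
```

A named shutdown leaves the other entries registered, while the argument-free call empties the registry.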

FastAPI Integration

We recommend using the lifespan context manager (FastAPI 0.93+). The legacy on_event("startup") / on_event("shutdown") hooks are deprecated.

from contextlib import asynccontextmanager
from fastapi import FastAPI
from zodiac_core.db.session import db

@asynccontextmanager
async def lifespan(app: FastAPI):
    db.setup(
        "postgresql+asyncpg://user:pass@localhost/dbname",
        pool_size=20,
        max_overflow=10,
        echo=False
    )
    await db.create_all()  # Optional: create tables if they don't exist
    yield
    await db.shutdown()

app = FastAPI(lifespan=lifespan)

For a single-app service, await db.shutdown() is still the simplest shutdown path. If you register multiple named databases or share the global db across multiple app lifecycles, prefer await db.shutdown(name="...") for scoped cleanup.


4. Working with Repositories

Inherit from BaseSQLRepository to create your data access layer.

from zodiac_core.db.repository import BaseSQLRepository
from sqlalchemy import select
from .models import User

class UserRepository(BaseSQLRepository):
    async def find_by_username(self, username: str) -> User | None:
        async with self.session() as session:
            stmt = select(User).where(User.username == username)
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

    async def create_user(self, user: User) -> User:
        async with self.session() as session:
            session.add(user)
            await session.commit()
            await session.refresh(user)
            return user

5. Multi-Database Support

ZodiacCore supports multiple database connections simultaneously. This is essential for architectures involving:

  • Read-Write Splitting: Routing writes to a Master and reads to a Replica.
  • Vertical Partitioning: Storing different modules (e.g., Users, Analytics) in separate databases.

Registering Named Databases

You can call db.setup() multiple times with different name arguments.

# Primary Database (Master)
db.setup("postgresql+asyncpg://master_db_url", name="default")

# Read-only Replica
db.setup("postgresql+asyncpg://replica_db_url", name="read_only")

Releasing Named Databases

Named shutdown is the companion to named setup:

# Dispose only the replica pool
await db.shutdown(name="read_only")

# Dispose everything registered in the manager
await db.shutdown()

Use named shutdown when the process keeps other databases alive, such as multi-app hosting, plugin-based services, or multiple DI resources sharing the same global manager.

Binding Repositories to a Database

When creating a repository, specify which database it should use via db_name.

class ReadOnlyUserRepository(BaseSQLRepository):
    def __init__(self):
        # This repo will always use the 'read_only' engine
        super().__init__(db_name="read_only")

    async def get_total_users(self) -> int:
        async with self.session() as session:
            # Executes on replica
            ...

6. API Reference

Session & Lifecycle

zodiac_core.db.session

DEFAULT_DB_NAME = 'default' module-attribute
db = DatabaseManager() module-attribute
DatabaseManager

Manages multiple Async Database Engines and Session Factories. Implemented as a Strict Singleton to coordinate connection pools.

Integration Examples:

  1. Native FastAPI (Lifespan + Depends):

    # main.py
    from contextlib import asynccontextmanager
    from fastapi import FastAPI, Depends
    from sqlalchemy.ext.asyncio import AsyncSession
    from zodiac_core.db.session import db, get_session
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        db.setup("sqlite+aiosqlite:///database.db")
        yield
        await db.shutdown()
    
    app = FastAPI(lifespan=lifespan)
    
    @app.get("/items")
    async def list_items(session: AsyncSession = Depends(get_session)):
        return {"status": "ok"}
    
  2. Dependency Injector (Using provided init_db_resource):

    # containers.py
    from dependency_injector import containers, providers
    from zodiac_core.utils import strtobool
    from zodiac_core.db.session import init_db_resource
    
    class Container(containers.DeclarativeContainer):
        config = providers.Configuration(strict=True)
    
        # Use the pre-built resource helper
        db_manager = providers.Resource(
            init_db_resource,
            database_url=config.db.url,
            echo=config.db.echo.as_(strtobool),
        )
    
Source code in zodiac_core/db/session.py
class DatabaseManager:
    """
    Manages multiple Async Database Engines and Session Factories.
    Implemented as a Strict Singleton to coordinate connection pools.

    Integration Examples:

    1. **Native FastAPI (Lifespan + Depends):**

        ```python
        # main.py
        from contextlib import asynccontextmanager
        from fastapi import FastAPI, Depends
        from sqlalchemy.ext.asyncio import AsyncSession
        from zodiac_core.db.session import db, get_session

        @asynccontextmanager
        async def lifespan(app: FastAPI):
            db.setup("sqlite+aiosqlite:///database.db")
            yield
            await db.shutdown()

        app = FastAPI(lifespan=lifespan)

        @app.get("/items")
        async def list_items(session: AsyncSession = Depends(get_session)):
            return {"status": "ok"}
        ```

    2. **Dependency Injector (Using provided init_db_resource):**

        ```python
        # containers.py
        from dependency_injector import containers, providers
        from zodiac_core.utils import strtobool
        from zodiac_core.db.session import init_db_resource

        class Container(containers.DeclarativeContainer):
            config = providers.Configuration(strict=True)

            # Use the pre-built resource helper
            db_manager = providers.Resource(
                init_db_resource,
                database_url=config.db.url,
                echo=config.db.echo.as_(strtobool),
            )
        ```
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._engines: Dict[str, AsyncEngine] = {}
            cls._instance._session_factories: Dict[str, async_sessionmaker[AsyncSession]] = {}
            cls._instance._setup_configs: Dict[str, Dict[str, Any]] = {}
        return cls._instance

    def get_engine(self, name: str = DEFAULT_DB_NAME) -> AsyncEngine:
        """Access a specific SQLAlchemy AsyncEngine by name."""
        if name not in self._engines:
            raise RuntimeError(f"Database engine '{name}' is not initialized. Call db.setup(name='{name}') first.")
        return self._engines[name]

    def get_factory(self, name: str = DEFAULT_DB_NAME) -> async_sessionmaker[AsyncSession]:
        """Access a specific AsyncSession factory by name."""
        if name not in self._session_factories:
            raise RuntimeError(f"Session factory for '{name}' is not initialized. Call db.setup(name='{name}') first.")
        return self._session_factories[name]

    @property
    def engine(self) -> AsyncEngine:
        """Access the default SQLAlchemy AsyncEngine."""
        return self.get_engine(DEFAULT_DB_NAME)

    @property
    def session_factory(self) -> async_sessionmaker[AsyncSession]:
        """Access the default AsyncSession factory."""
        return self.get_factory(DEFAULT_DB_NAME)

    def setup(
        self,
        database_url: str,
        name: str = DEFAULT_DB_NAME,
        echo: bool = False,
        pool_size: int = 10,
        max_overflow: int = 20,
        pool_pre_ping: bool = True,
        connect_args: Optional[dict] = None,
        **kwargs,
    ) -> None:
        """Initialize an Async Engine and Session Factory with a specific name."""
        engine_args = {
            "echo": echo,
            "pool_pre_ping": pool_pre_ping,
            "connect_args": connect_args or {},
            **kwargs,
        }

        if "sqlite" not in database_url:
            engine_args["pool_size"] = pool_size
            engine_args["max_overflow"] = max_overflow

        current = {
            "database_url": database_url,
            "engine_args": deepcopy(engine_args),
        }

        if name in self._engines:
            existing = self._setup_configs.get(name)
            if existing == current:
                logger.debug(f"Database '{name}' is already configured with the same settings, skipping.")
                return
            raise RuntimeError(f"Database '{name}' is already configured with different settings")

        engine = create_async_engine(database_url, **engine_args)
        factory = async_sessionmaker(
            bind=engine,
            class_=AsyncSession,
            expire_on_commit=False,
            autoflush=False,
        )

        self._engines[name] = engine
        self._session_factories[name] = factory
        self._setup_configs[name] = current
        logger.info(f"Database '{name}' initialized successfully.")

    async def shutdown(self, name: str | None = None) -> None:
        """
        Dispose database resources.

        Args:
            name: Optional database name. When provided, only that engine/factory
                  is disposed. When omitted, all registered databases are disposed.
        """
        if name is not None:
            engine = self._engines.pop(name, None)
            self._session_factories.pop(name, None)
            self._setup_configs.pop(name, None)
            if engine is not None:
                await engine.dispose()
            return

        for engine in self._engines.values():
            await engine.dispose()
        self._engines.clear()
        self._session_factories.clear()
        self._setup_configs.clear()

    @asynccontextmanager
    async def session(self, name: str = DEFAULT_DB_NAME) -> AsyncGenerator[AsyncSession, None]:
        """
        Context Manager for obtaining a NEW database session from a specific engine.

        Note:
            This context manager does NOT auto-commit. You must explicitly call
            `await session.commit()` to persist changes to the database.

        Example:
            ```python
            async with db.session() as session:
                session.add(user)
                await session.commit()  # Required to persist changes
            ```
        """
        async with manage_session(self.get_factory(name)) as session:
            yield session

    async def verify(self, name: str = DEFAULT_DB_NAME) -> bool:
        """
        Verify the database connection is working.

        Args:
            name: The database name to verify.

        Returns:
            True if connection is successful.

        Raises:
            RuntimeError: If the database is not initialized.
            Exception: If the connection test fails.
        """
        async with self.session(name) as session:
            await session.execute(text("SELECT 1"))
        logger.info(f"Database '{name}' connection verified.")
        return True

    async def create_all(self, name: str = DEFAULT_DB_NAME, metadata: Any = None) -> None:
        """
        Create tables in the database.

        Args:
            name: The database name to create tables in.
            metadata: SQLAlchemy MetaData object. If None, uses SQLModel.metadata
                      which includes ALL registered models. For production, consider
                      using Alembic migrations instead.

        Example:
            ```python
            # Development: create all tables
            await db.create_all()

            # With custom metadata (only specific tables)
            from sqlalchemy import MetaData
            my_metadata = MetaData()
            await db.create_all(metadata=my_metadata)
            ```
        """
        target_metadata = metadata if metadata is not None else SQLModel.metadata
        async with self.get_engine(name).begin() as conn:
            await conn.run_sync(target_metadata.create_all)
engine property

Access the default SQLAlchemy AsyncEngine.

session_factory property

Access the default AsyncSession factory.

create_all(name=DEFAULT_DB_NAME, metadata=None) async

Create tables in the database.

Parameters:

Name | Type | Description | Default
name | str | The database name to create tables in. | DEFAULT_DB_NAME
metadata | Any | SQLAlchemy MetaData object. If None, uses SQLModel.metadata which includes ALL registered models. For production, consider using Alembic migrations instead. | None

Example
# Development: create all tables
await db.create_all()

# With custom metadata (only specific tables)
from sqlalchemy import MetaData
my_metadata = MetaData()
await db.create_all(metadata=my_metadata)
Source code in zodiac_core/db/session.py
async def create_all(self, name: str = DEFAULT_DB_NAME, metadata: Any = None) -> None:
    """
    Create tables in the database.

    Args:
        name: The database name to create tables in.
        metadata: SQLAlchemy MetaData object. If None, uses SQLModel.metadata
                  which includes ALL registered models. For production, consider
                  using Alembic migrations instead.

    Example:
        ```python
        # Development: create all tables
        await db.create_all()

        # With custom metadata (only specific tables)
        from sqlalchemy import MetaData
        my_metadata = MetaData()
        await db.create_all(metadata=my_metadata)
        ```
    """
    target_metadata = metadata if metadata is not None else SQLModel.metadata
    async with self.get_engine(name).begin() as conn:
        await conn.run_sync(target_metadata.create_all)
get_engine(name=DEFAULT_DB_NAME)

Access a specific SQLAlchemy AsyncEngine by name.

Source code in zodiac_core/db/session.py
def get_engine(self, name: str = DEFAULT_DB_NAME) -> AsyncEngine:
    """Access a specific SQLAlchemy AsyncEngine by name."""
    if name not in self._engines:
        raise RuntimeError(f"Database engine '{name}' is not initialized. Call db.setup(name='{name}') first.")
    return self._engines[name]
get_factory(name=DEFAULT_DB_NAME)

Access a specific AsyncSession factory by name.

Source code in zodiac_core/db/session.py
def get_factory(self, name: str = DEFAULT_DB_NAME) -> async_sessionmaker[AsyncSession]:
    """Access a specific AsyncSession factory by name."""
    if name not in self._session_factories:
        raise RuntimeError(f"Session factory for '{name}' is not initialized. Call db.setup(name='{name}') first.")
    return self._session_factories[name]
session(name=DEFAULT_DB_NAME) async

Context Manager for obtaining a NEW database session from a specific engine.

Note

This context manager does NOT auto-commit. You must explicitly call await session.commit() to persist changes to the database.

Example
async with db.session() as session:
    session.add(user)
    await session.commit()  # Required to persist changes
Source code in zodiac_core/db/session.py
@asynccontextmanager
async def session(self, name: str = DEFAULT_DB_NAME) -> AsyncGenerator[AsyncSession, None]:
    """
    Context Manager for obtaining a NEW database session from a specific engine.

    Note:
        This context manager does NOT auto-commit. You must explicitly call
        `await session.commit()` to persist changes to the database.

    Example:
        ```python
        async with db.session() as session:
            session.add(user)
            await session.commit()  # Required to persist changes
        ```
    """
    async with manage_session(self.get_factory(name)) as session:
        yield session
setup(database_url, name=DEFAULT_DB_NAME, echo=False, pool_size=10, max_overflow=20, pool_pre_ping=True, connect_args=None, **kwargs)

Initialize an Async Engine and Session Factory with a specific name.

Source code in zodiac_core/db/session.py
def setup(
    self,
    database_url: str,
    name: str = DEFAULT_DB_NAME,
    echo: bool = False,
    pool_size: int = 10,
    max_overflow: int = 20,
    pool_pre_ping: bool = True,
    connect_args: Optional[dict] = None,
    **kwargs,
) -> None:
    """Initialize an Async Engine and Session Factory with a specific name."""
    engine_args = {
        "echo": echo,
        "pool_pre_ping": pool_pre_ping,
        "connect_args": connect_args or {},
        **kwargs,
    }

    if "sqlite" not in database_url:
        engine_args["pool_size"] = pool_size
        engine_args["max_overflow"] = max_overflow

    current = {
        "database_url": database_url,
        "engine_args": deepcopy(engine_args),
    }

    if name in self._engines:
        existing = self._setup_configs.get(name)
        if existing == current:
            logger.debug(f"Database '{name}' is already configured with the same settings, skipping.")
            return
        raise RuntimeError(f"Database '{name}' is already configured with different settings")

    engine = create_async_engine(database_url, **engine_args)
    factory = async_sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
        autoflush=False,
    )

    self._engines[name] = engine
    self._session_factories[name] = factory
    self._setup_configs[name] = current
    logger.info(f"Database '{name}' initialized successfully.")
shutdown(name=None) async

Dispose database resources.

Parameters:

Name | Type | Description | Default
name | str | None | Optional database name. When provided, only that engine/factory is disposed. When omitted, all registered databases are disposed. | None

Source code in zodiac_core/db/session.py
async def shutdown(self, name: str | None = None) -> None:
    """
    Dispose database resources.

    Args:
        name: Optional database name. When provided, only that engine/factory
              is disposed. When omitted, all registered databases are disposed.
    """
    if name is not None:
        engine = self._engines.pop(name, None)
        self._session_factories.pop(name, None)
        self._setup_configs.pop(name, None)
        if engine is not None:
            await engine.dispose()
        return

    for engine in self._engines.values():
        await engine.dispose()
    self._engines.clear()
    self._session_factories.clear()
    self._setup_configs.clear()
verify(name=DEFAULT_DB_NAME) async

Verify the database connection is working.

Parameters:

Name | Type | Description | Default
name | str | The database name to verify. | DEFAULT_DB_NAME

Returns:

Type | Description
bool | True if connection is successful.

Raises:

Type | Description
RuntimeError | If the database is not initialized.
Exception | If the connection test fails.

Source code in zodiac_core/db/session.py
async def verify(self, name: str = DEFAULT_DB_NAME) -> bool:
    """
    Verify the database connection is working.

    Args:
        name: The database name to verify.

    Returns:
        True if connection is successful.

    Raises:
        RuntimeError: If the database is not initialized.
        Exception: If the connection test fails.
    """
    async with self.session(name) as session:
        await session.execute(text("SELECT 1"))
    logger.info(f"Database '{name}' connection verified.")
    return True
get_session(name=DEFAULT_DB_NAME) async

FastAPI Dependency for obtaining a database session.

Note

This dependency does NOT auto-commit. You must explicitly call await session.commit() within your endpoint to persist changes.

Example
# Default database — use directly as a dependency
@app.post("/users")
async def create_user(session: AsyncSession = Depends(get_session)):
    user = User(name="test")  # keep a reference so it can be returned
    session.add(user)
    await session.commit()
    return user

# Named database — wrap in a thin dependency
async def analytics_session():
    async for s in get_session("analytics"):
        yield s

@app.get("/reports")
async def get_reports(session: AsyncSession = Depends(analytics_session)):
    ...
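
The thin-wrapper pattern for named databases is plain async-generator delegation. The stand-alone sketch below swaps get_session for a hypothetical fake_get_session that yields a string and records events, so it runs without FastAPI or a database; it only shows that the wrapper forwards the yielded value and lets the inner generator finish its cleanup.

```python
import asyncio
from typing import AsyncGenerator, List

events: List[str] = []


async def fake_get_session(name: str = "default") -> AsyncGenerator[str, None]:
    # Hypothetical stand-in for get_session: yields a label instead of a session.
    events.append(f"open:{name}")
    try:
        yield f"session:{name}"
    finally:
        events.append(f"close:{name}")


async def analytics_session() -> AsyncGenerator[str, None]:
    # Same shape as the wrapper in the example: bind the name, re-yield the value.
    async for s in fake_get_session("analytics"):
        yield s


async def consume() -> List[str]:
    # Drive the wrapper to completion, as a caller that finishes the request would.
    return [s async for s in analytics_session()]


seen = asyncio.run(consume())
```

Once the wrapper is exhausted, the inner generator's finally block has run, which is where a real session would be closed.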
Source code in zodiac_core/db/session.py
async def get_session(name: str = DEFAULT_DB_NAME) -> AsyncGenerator[AsyncSession, None]:
    """
    FastAPI Dependency for obtaining a database session.

    Note:
        This dependency does NOT auto-commit. You must explicitly call
        `await session.commit()` within your endpoint to persist changes.

    Example:
        ```python
        # Default database — use directly as a dependency
        @app.post("/users")
        async def create_user(session: AsyncSession = Depends(get_session)):
            user = User(name="test")  # keep a reference so it can be returned
            session.add(user)
            await session.commit()
            return user

        # Named database — wrap in a thin dependency
        async def analytics_session():
            async for s in get_session("analytics"):
                yield s

        @app.get("/reports")
        async def get_reports(session: AsyncSession = Depends(analytics_session)):
            ...
        ```
    """
    async with db.session(name) as session:
        yield session
init_db_resource(database_url, name=DEFAULT_DB_NAME, echo=False, connect_args=None, **kwargs) async

A helper for dependency_injector's Resource provider. Handles the setup and shutdown lifecycle of the global db instance. Cleanup is scoped to the provided database name, so other registered databases remain available.

Source code in zodiac_core/db/session.py
async def init_db_resource(
    database_url: str,
    name: str = DEFAULT_DB_NAME,
    echo: bool = False,
    connect_args: Optional[dict] = None,
    **kwargs,
) -> AsyncGenerator[DatabaseManager, None]:
    """
    A helper for dependency_injector's Resource provider.
    Handles the setup and shutdown lifecycle of the global `db` instance.
    Cleanup is scoped to the provided database `name`, so other registered
    databases remain available.
    """
    db.setup(database_url=database_url, name=name, echo=echo, connect_args=connect_args, **kwargs)
    try:
        yield db
    finally:
        await db.shutdown(name=name)

Repository Base

zodiac_core.db.repository.BaseSQLRepository

Standard base class for SQL-based repositories.

Supports multiple database instances via db_name and provides professional utilities for common operations like pagination.

Source code in zodiac_core/db/repository.py
class BaseSQLRepository:
    """
    Standard base class for SQL-based repositories.

    Supports multiple database instances via `db_name` and provides
    professional utilities for common operations like pagination.
    """

    def __init__(
        self,
        session_factory: Optional[async_sessionmaker[AsyncSession]] = None,
        db_name: str = DEFAULT_DB_NAME,
        options: Optional[Any] = None,
    ) -> None:
        """
        Initialize the repository.

        Args:
            session_factory: Optional custom session factory. If provided, db_name is ignored.
            db_name: The name of the database engine registered in db.setup(). Defaults to DEFAULT_DB_NAME ("default").
            options: Optional configuration/options for the repository.
        """
        self._session_factory = session_factory
        self.db_name = db_name
        self.options = options

    @asynccontextmanager
    async def session(self) -> AsyncIterator[AsyncSession]:
        """
        Async context manager for obtaining a database session.
        Uses the injected factory or resolves one from the global 'db' via 'db_name'.

        Note:
            This context manager does NOT auto-commit. You must explicitly call
            `await session.commit()` to persist changes to the database.
        """
        factory = self._session_factory or db.get_factory(self.db_name)
        async with manage_session(factory) as session:
            yield session

    async def paginate(
        self,
        session: AsyncSession,
        statement: Any,
        params: PageParams,
        transformer: Optional[Type[T]] = None,
    ) -> PagedResponse[T]:
        """
        Execute a paginated query with automatic count and paging.

        Performs:
        1. Automatic total count query using the provided statement.
        2. Automatic limit/offset application.
        3. Packaging results into a standardized PagedResponse.

        Args:
            session: The active AsyncSession.
            statement: The SQLAlchemy select statement (without limit/offset).
            params: Standard PageParams (page, size).
            transformer: Optional Pydantic model to transform DB objects into.

        Example:
            ```python
            async with self.session() as session:
                stmt = select(UserModel).order_by(UserModel.created_at.desc())
                return await self.paginate(session, stmt, params)
            ```
        """
        # 1. Execute Count Query
        # Remove limit/offset (if any) for count query, then wrap in subquery
        # subquery() handles order_by correctly, and wrapping in subquery handles complex queries (joins, groups)
        count_base = statement.limit(None).offset(None)
        count_stmt = select(func.count()).select_from(count_base.subquery())
        total = (await session.execute(count_stmt)).scalar() or 0

        # 2. Execute Paged Query
        skip = (params.page - 1) * params.size
        paged_stmt = statement.offset(skip).limit(params.size)
        result = await session.execute(paged_stmt)
        items = result.scalars().all()

        # 3. Optional Transformation
        if transformer:
            items = [transformer.model_validate(item) for item in items]

        return PagedResponse.create(items=list(items), total=total, params=params)

    async def paginate_query(
        self,
        statement: Any,
        params: PageParams,
        transformer: Optional[Type[T]] = None,
    ) -> PagedResponse[T]:
        """
        Convenience method that automatically manages session for pagination.

        This is a wrapper around `paginate()` that handles session management,
        making it easier to use in repository methods.

        Args:
            statement: The SQLAlchemy select statement (without limit/offset).
            params: Standard PageParams (page, size).
            transformer: Optional Pydantic model to transform DB objects into.

        Example:
            ```python
            async def list_items(self, params: PageParams) -> PagedResponse[ItemModel]:
                stmt = select(ItemModel).order_by(ItemModel.id)
                return await self.paginate_query(stmt, params)
            ```
        """
        async with self.session() as session:
            return await self.paginate(session, statement, params, transformer)
__init__(session_factory=None, db_name=DEFAULT_DB_NAME, options=None)

Initialize the repository.

Parameters:

Name | Type | Description | Default
session_factory | Optional[async_sessionmaker[AsyncSession]] | Optional custom session factory. If provided, db_name is ignored. | None
db_name | str | The name of the database engine registered in db.setup(). Defaults to DEFAULT_DB_NAME ("default"). | DEFAULT_DB_NAME
options | Optional[Any] | Optional configuration/options for the repository. | None

Source code in zodiac_core/db/repository.py
def __init__(
    self,
    session_factory: Optional[async_sessionmaker[AsyncSession]] = None,
    db_name: str = DEFAULT_DB_NAME,
    options: Optional[Any] = None,
) -> None:
    """
    Initialize the repository.

    Args:
        session_factory: Optional custom session factory. If provided, db_name is ignored.
        db_name: The name of the database engine registered in db.setup(). Defaults to DEFAULT_DB_NAME ("default").
        options: Optional configuration/options for the repository.
    """
    self._session_factory = session_factory
    self.db_name = db_name
    self.options = options
paginate(session, statement, params, transformer=None) async

Execute a paginated query with automatic count and paging.

Performs:

  1. Automatic total count query using the provided statement.
  2. Automatic limit/offset application.
  3. Packaging results into a standardized PagedResponse.

Parameters:

Name | Type | Description | Default
session | AsyncSession | The active AsyncSession. | required
statement | Any | The SQLAlchemy select statement (without limit/offset). | required
params | PageParams | Standard PageParams (page, size). | required
transformer | Optional[Type[T]] | Optional Pydantic model to transform DB objects into. | None

Example
async with self.session() as session:
    stmt = select(UserModel).order_by(UserModel.created_at.desc())
    return await self.paginate(session, stmt, params)
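
The limit/offset step reduces to simple arithmetic on a 1-based page number. The helper below is a hypothetical illustration: the offset formula matches the one in the source listing, (page - 1) * size, while the page count is derived here only for the example and is not claimed to be a field of PagedResponse.

```python
import math


def page_window(page: int, size: int, total: int) -> dict:
    """Return the offset/limit for a 1-based page plus a derived page count."""
    skip = (page - 1) * size  # rows to skip before the requested page
    # Assumption for illustration: number of pages needed to hold `total` rows.
    pages = math.ceil(total / size) if size else 0
    return {"offset": skip, "limit": size, "pages": pages}


# 45 rows at 20 per page: page 3 starts after the first 40 rows.
window = page_window(page=3, size=20, total=45)
```

With these inputs the query would skip 40 rows and request up to 20, returning the remaining 5.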
Source code in zodiac_core/db/repository.py
async def paginate(
    self,
    session: AsyncSession,
    statement: Any,
    params: PageParams,
    transformer: Optional[Type[T]] = None,
) -> PagedResponse[T]:
    """
    Execute a paginated query with automatic count and paging.

    Performs:
    1. Automatic total count query using the provided statement.
    2. Automatic limit/offset application.
    3. Packaging results into a standardized PagedResponse.

    Args:
        session: The active AsyncSession.
        statement: The SQLAlchemy select statement (without limit/offset).
        params: Standard PageParams (page, size).
        transformer: Optional Pydantic model to transform DB objects into.

    Example:
        ```python
        async with self.session() as session:
            stmt = select(UserModel).order_by(UserModel.created_at.desc())
            return await self.paginate(session, stmt, params)
        ```
    """
    # 1. Execute Count Query
    # Strip any limit/offset so the count covers the full result set, then wrap the
    # statement in a subquery so that joins, GROUP BY, and ORDER BY are counted correctly.
    count_base = statement.limit(None).offset(None)
    count_stmt = select(func.count()).select_from(count_base.subquery())
    total = (await session.execute(count_stmt)).scalar() or 0

    # 2. Execute Paged Query
    skip = (params.page - 1) * params.size
    paged_stmt = statement.offset(skip).limit(params.size)
    result = await session.execute(paged_stmt)
    items = result.scalars().all()

    # 3. Optional Transformation
    if transformer:
        items = [transformer.model_validate(item) for item in items]

    return PagedResponse.create(items=list(items), total=total, params=params)
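
The count-then-window flow above can be tried without a running ZodiacCore application. The following is a minimal sketch that uses the standard-library sqlite3 module rather than SQLAlchemy; the helper name paginate_rows, the users table, and the sample data are assumptions made for illustration and are not part of the library.

```python
import sqlite3


def paginate_rows(conn, base_sql, page, size):
    """Hypothetical helper: return (rows, total) for one page of base_sql.

    base_sql must be a SELECT without LIMIT/OFFSET, mirroring the
    `statement` argument described above.
    """
    # 1. Count every row the base query returns by wrapping it in a subquery.
    total = conn.execute(f"SELECT COUNT(*) FROM ({base_sql})").fetchone()[0]

    # 2. Apply the window. Pages are 1-based, so page 1 starts at offset 0.
    skip = (page - 1) * size
    rows = conn.execute(f"{base_sql} LIMIT ? OFFSET ?", (size, skip)).fetchall()
    return rows, total


# Illustrative data: seven users in an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users (username) VALUES (?)",
    [(f"user{i}",) for i in range(1, 8)],
)

rows, total = paginate_rows(
    conn, "SELECT id, username FROM users ORDER BY id", page=2, size=3
)
```

Counting through a subquery keeps the total independent of the window, which is why the statement passed to paginate() should not carry its own limit or offset.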
paginate_query(statement, params, transformer=None) async

Convenience method that automatically manages session for pagination.

This is a wrapper around paginate() that handles session management, making it easier to use in repository methods.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| statement | Any | The SQLAlchemy select statement (without limit/offset). | required |
| params | PageParams | Standard PageParams (page, size). | required |
| transformer | Optional[Type[T]] | Optional Pydantic model to transform DB objects into. | None |

Example
async def list_items(self, params: PageParams) -> PagedResponse[ItemModel]:
    stmt = select(ItemModel).order_by(ItemModel.id)
    return await self.paginate_query(stmt, params)
Source code in zodiac_core/db/repository.py
async def paginate_query(
    self,
    statement: Any,
    params: PageParams,
    transformer: Optional[Type[T]] = None,
) -> PagedResponse[T]:
    """
    Convenience method that automatically manages session for pagination.

    This is a wrapper around `paginate()` that handles session management,
    making it easier to use in repository methods.

    Args:
        statement: The SQLAlchemy select statement (without limit/offset).
        params: Standard PageParams (page, size).
        transformer: Optional Pydantic model to transform DB objects into.

    Example:
        ```python
        async def list_items(self, params: PageParams) -> PagedResponse[ItemModel]:
            stmt = select(ItemModel).order_by(ItemModel.id)
            return await self.paginate_query(stmt, params)
        ```
    """
    async with self.session() as session:
        return await self.paginate(session, statement, params, transformer)
session() async

Async context manager for obtaining a database session. Uses the injected factory or resolves one from the global 'db' via 'db_name'.

Note

This context manager does NOT auto-commit. You must explicitly call await session.commit() to persist changes to the database.

Source code in zodiac_core/db/repository.py
@asynccontextmanager
async def session(self) -> AsyncIterator[AsyncSession]:
    """
    Async context manager for obtaining a database session.
    Uses the injected factory or resolves one from the global 'db' via 'db_name'.

    Note:
        This context manager does NOT auto-commit. You must explicitly call
        `await session.commit()` to persist changes to the database.
    """
    factory = self._session_factory or db.get_factory(self.db_name)
    async with manage_session(factory) as session:
        yield session
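
The no-auto-commit note is easy to overlook, so here is a small sketch of what it implies. It does not use the real AsyncSession or manage_session: FakeSession and open_session are hypothetical stand-ins, built only on asyncio and contextlib, that mimic a context manager which closes the session on exit but never commits for the caller.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession: tracks pending vs. committed items."""

    def __init__(self, store):
        self._store = store
        self._pending = []

    def add(self, item):
        self._pending.append(item)

    async def commit(self):
        # Only an explicit commit moves pending items into the store.
        self._store.extend(self._pending)
        self._pending.clear()

    async def close(self):
        # Anything still pending is discarded, like an uncommitted transaction.
        self._pending.clear()


@asynccontextmanager
async def open_session(store):
    """Yield a session and always close it; never commit on the caller's behalf."""
    session = FakeSession(store)
    try:
        yield session
    finally:
        await session.close()


async def demo():
    store = []
    async with open_session(store) as session:
        session.add("alice")  # no commit: discarded when the block exits
    after_no_commit = list(store)

    async with open_session(store) as session:
        session.add("bob")
        await session.commit()  # explicit commit: persisted
    return after_no_commit, list(store)


after_no_commit, after_commit = asyncio.run(demo())
```

In a real repository method the same shape applies: make the changes inside `async with self.session() as session:` and call `await session.commit()` before the block ends.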

SQL Models & Mixins

zodiac_core.db.sql

IntIDModel

Bases: SQLBase, IntIDMixin

Base SQLModel with Integer ID and Timestamps. Includes: ID (int) + CreatedAt + UpdatedAt.

Source code in zodiac_core/db/sql.py
class IntIDModel(SQLBase, IntIDMixin):
    """
    Base SQLModel with Integer ID and Timestamps.
    Includes: ID (int) + CreatedAt + UpdatedAt.
    """
UUIDModel

Bases: SQLBase, UUIDMixin

Base SQLModel with UUID and Timestamps. Includes: ID (UUID) + CreatedAt + UpdatedAt.

Source code in zodiac_core/db/sql.py
class UUIDModel(SQLBase, UUIDMixin):
    """
    Base SQLModel with UUID and Timestamps.
    Includes: ID (UUID) + CreatedAt + UpdatedAt.
    """
SQLDateTimeMixin

Bases: SQLModel

Mixin for created_at and updated_at with SQLAlchemy server-side defaults. Supports PostgreSQL, MySQL, and SQLite with proper UTC handling.

Source code in zodiac_core/db/sql.py
class SQLDateTimeMixin(SQLModel):
    """
    Mixin for created_at and updated_at with SQLAlchemy server-side defaults.
    Supports PostgreSQL, MySQL, and SQLite with proper UTC handling.
    """

    created_at: datetime = Field(
        default_factory=utc_now,
        sa_column_kwargs={
            "server_default": utcnow(),
            "nullable": False,
        },
        sa_type=DateTime(timezone=True),
        description="Record creation timestamp (UTC)",
    )
    updated_at: datetime = Field(
        default_factory=utc_now,
        sa_column_kwargs={
            "server_default": utcnow(),
            "onupdate": utcnow(),
            "nullable": False,
        },
        sa_type=DateTime(timezone=True),
        description="Record last update timestamp (UTC)",
    )
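
The mixin's Python-side behavior can be illustrated with a plain dataclass. This is a sketch under assumptions: the body of the library's utc_now helper is not shown in this reference, so the version below is a guess at an equivalent, and Record with its touch() method is a hypothetical stand-in for the column-level onupdate hook.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


def utc_now() -> datetime:
    """Assumed equivalent of the library helper: a timezone-aware UTC timestamp."""
    return datetime.now(timezone.utc)


@dataclass
class Record:
    # Both fields default to the creation time, as default_factory=utc_now does above.
    created_at: datetime = field(default_factory=utc_now)
    updated_at: datetime = field(default_factory=utc_now)

    def touch(self) -> None:
        # Mimics onupdate: refresh updated_at and leave created_at untouched.
        self.updated_at = utc_now()


record = Record()
created = record.created_at
record.touch()
```

Using timezone-aware values on the Python side matches the DateTime(timezone=True) column type, so comparisons between application-generated and database-generated timestamps do not mix naive and aware datetimes.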