Database Engine & ORM
ZodiacCore provides a high-performance, async-first database abstraction layer built on top of SQLModel and SQLAlchemy 2.0. It simplifies session management and connection pooling, and it standardizes model definitions.
1. Core Concepts
The Database Manager
The DatabaseManager (exposed as the global db instance) is a strict singleton that manages the SQLAlchemy AsyncEngine and async_sessionmaker. It ensures that your process can reuse connection pools for the same named database instead of letting each app/container create its own pool, which is critical for performance and resource management.
The Repository Pattern
We encourage the use of the Repository Pattern via BaseSQLRepository. This decouples your business logic from database-specific code, making your application more maintainable and easier to unit test with mocks.
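To make the testability claim concrete, here is a minimal sketch that uses only the standard library. The `UserRepo` protocol, `InMemoryUserRepo`, and `email_for` are hypothetical names chosen for illustration and are not part of ZodiacCore. The business logic depends only on the repository's method shape, so a test can substitute an in-memory double for the real database-backed class.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Protocol


@dataclass
class User:
    username: str
    email: str


class UserRepo(Protocol):
    """Hypothetical interface the business logic depends on."""

    async def find_by_username(self, username: str) -> Optional[User]: ...


class InMemoryUserRepo:
    """Test double: same method shape as a real repository, no database."""

    def __init__(self, users: list) -> None:
        self._users = {u.username: u for u in users}

    async def find_by_username(self, username: str) -> Optional[User]:
        return self._users.get(username)


async def email_for(repo: UserRepo, username: str) -> Optional[str]:
    """Business logic that only knows about the repository interface."""
    user = await repo.find_by_username(username)
    return user.email if user else None


repo = InMemoryUserRepo([User("ada", "ada@example.com")])
found = asyncio.run(email_for(repo, "ada"))
missing = asyncio.run(email_for(repo, "nobody"))
```

In production code the same `email_for` function would presumably receive a repository derived from BaseSQLRepository instead of the in-memory double.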
2. Model Definitions
ZodiacCore provides several mixins and base classes in zodiac_core.db.sql to standardize your database schema.
Standard Base Models
Instead of inheriting from SQLModel directly, we recommend using our pre-configured base models:
| Base Model | Primary Key | Timestamps |
|---|---|---|
| IntIDModel | id: int (Auto-increment) | created_at, updated_at |
| UUIDModel | id: UUID (v4) | created_at, updated_at |
Example: Using Base Models
from zodiac_core.db.sql import IntIDModel
from sqlmodel import Field


class User(IntIDModel, table=True):
    username: str = Field(unique=True, index=True)
    email: str
Automatic Timestamps
Both IntIDModel and UUIDModel include SQLDateTimeMixin, which provides:
- created_at: Automatically set on insertion.
- updated_at: Automatically updated on every save via a SQLAlchemy event listener.
3. Configuration & Lifecycle
You should initialize the database during your application's startup and ensure it shuts down cleanly.
Calling db.setup(...) again with the same name is allowed only when the effective configuration is identical; different settings for an existing name raise RuntimeError.
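The rule above can be pictured with a small stand-in. This is a sketch of the described behavior, not ZodiacCore's implementation: `NamedRegistry` is a hypothetical class that stores one configuration per name, accepts an identical repeat, and rejects a conflicting one.

```python
from typing import Any, Dict


class NamedRegistry:
    """Hypothetical name-keyed registry; illustrates the rule, not the real code."""

    def __init__(self) -> None:
        self._configs: Dict[str, Dict[str, Any]] = {}

    def setup(self, url: str, name: str = "default", **options: Any) -> None:
        config = {"url": url, **options}
        existing = self._configs.get(name)
        if existing is None:
            # First registration for this name.
            self._configs[name] = config
        elif existing != config:
            # Same name, different settings: refuse instead of silently replacing.
            raise RuntimeError(f"database {name!r} is already configured differently")
        # Identical settings: nothing to do, the existing entry is reused.


registry = NamedRegistry()
registry.setup("sqlite+aiosqlite:///app.db", echo=False)
registry.setup("sqlite+aiosqlite:///app.db", echo=False)  # same settings: accepted

try:
    registry.setup("sqlite+aiosqlite:///app.db", echo=True)  # different settings
    conflict = False
except RuntimeError:
    conflict = True
```

Failing loudly on a mismatch keeps two parts of a process from each believing they own differently tuned pools under the same name.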
Lifecycle control is now name-aware:
- await db.shutdown(name="...") disposes only the selected named database.
- await db.shutdown() disposes all registered databases.
This lets multiple apps, containers, or resources share the global manager while still releasing only the resource they own.
FastAPI Integration
We recommend using the lifespan context manager (FastAPI 0.93+). The legacy on_event("startup") / on_event("shutdown") hooks are deprecated.
from contextlib import asynccontextmanager
from fastapi import FastAPI
from zodiac_core.db.session import db


@asynccontextmanager
async def lifespan(app: FastAPI):
    db.setup(
        "postgresql+asyncpg://user:pass@localhost/dbname",
        pool_size=20,
        max_overflow=10,
        echo=False
    )
    await db.create_all()  # Optional: create tables if they don't exist
    yield
    await db.shutdown()


app = FastAPI(lifespan=lifespan)
For a single-app service, await db.shutdown() is still the simplest shutdown path.
If you register multiple named databases or share the global db across multiple app lifecycles, prefer await db.shutdown(name="...") for scoped cleanup.
4. Working with Repositories
Inherit from BaseSQLRepository to create your data access layer.
from zodiac_core.db.repository import BaseSQLRepository
from sqlalchemy import select
from .models import User


class UserRepository(BaseSQLRepository):
    async def find_by_username(self, username: str) -> User | None:
        async with self.session() as session:
            stmt = select(User).where(User.username == username)
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

    async def create_user(self, user: User) -> User:
        async with self.session() as session:
            session.add(user)
            await session.commit()
            await session.refresh(user)
            return user
5. Multi-Database Support
ZodiacCore supports multiple database connections simultaneously. This is essential for architectures involving:
- Read-Write Splitting: Routing writes to a Master and reads to a Replica.
- Vertical Partitioning: Storing different modules (e.g., Users, Analytics) in separate databases.
Registering Named Databases
You can call db.setup() multiple times with different name arguments.
# Primary Database (Master)
db.setup("postgresql+asyncpg://master_db_url", name="default")
# Read-only Replica
db.setup("postgresql+asyncpg://replica_db_url", name="read_only")
Releasing Named Databases
Named shutdown is the companion to named setup:
# Dispose only the replica pool
await db.shutdown(name="read_only")
# Dispose everything registered in the manager
await db.shutdown()
Use named shutdown when the process keeps other databases alive, such as multi-app hosting, plugin-based services, or multiple DI resources sharing the same global manager.
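The difference between scoped and global disposal can be sketched without a real database. `FakeEngine` and `EngineRegistry` below are hypothetical stand-ins written for this illustration; they only mirror the semantics described above and say nothing about how DatabaseManager is actually implemented.

```python
import asyncio
from typing import Dict, List, Optional


class FakeEngine:
    """Stand-in for an async engine; only records that it was disposed."""

    def __init__(self) -> None:
        self.disposed = False

    async def dispose(self) -> None:
        self.disposed = True


class EngineRegistry:
    """Hypothetical registry with name-aware shutdown."""

    def __init__(self) -> None:
        self._engines: Dict[str, FakeEngine] = {}

    def register(self, name: str) -> FakeEngine:
        return self._engines.setdefault(name, FakeEngine())

    async def shutdown(self, name: Optional[str] = None) -> None:
        # With a name: dispose one engine. Without: dispose all of them.
        names = [name] if name is not None else list(self._engines)
        for key in names:
            engine = self._engines.pop(key, None)
            if engine is not None:
                await engine.dispose()

    def names(self) -> List[str]:
        return sorted(self._engines)


async def demo():
    reg = EngineRegistry()
    primary = reg.register("default")
    replica = reg.register("read_only")

    await reg.shutdown(name="read_only")  # scoped: the primary stays alive
    after_named = (primary.disposed, replica.disposed, reg.names())

    await reg.shutdown()                  # global: everything is released
    after_all = (primary.disposed, reg.names())
    return after_named, after_all


after_named, after_all = asyncio.run(demo())
```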
Binding Repositories to a Database
When creating a repository, specify which database it should use via db_name.
class ReadOnlyUserRepository(BaseSQLRepository):
    def __init__(self):
        # This repo will always use the 'read_only' engine
        super().__init__(db_name="read_only")

    async def get_total_users(self) -> int:
        async with self.session() as session:
            # Executes on replica
            ...
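One way to apply read-write splitting on top of name-bound repositories is to let a service hold one repository per role. The sketch below uses only the standard library; `FakeRepo` and `UserService` are hypothetical names, and the fake simply reports which database name it is bound to instead of running SQL.

```python
import asyncio
from typing import List


class FakeRepo:
    """Stand-in for a repository bound to one named database."""

    def __init__(self, db_name: str) -> None:
        self.db_name = db_name
        self.calls: List[str] = []

    async def run(self, action: str) -> str:
        self.calls.append(action)
        return f"{action} on {self.db_name}"


class UserService:
    """Sends writes to the primary repository and reads to the replica one."""

    def __init__(self, writer: FakeRepo, reader: FakeRepo) -> None:
        self._writer = writer
        self._reader = reader

    async def register(self, username: str) -> str:
        return await self._writer.run(f"insert {username}")

    async def count(self) -> str:
        return await self._reader.run("count users")


async def demo():
    writer, reader = FakeRepo("default"), FakeRepo("read_only")
    service = UserService(writer, reader)
    created = await service.register("ada")
    counted = await service.count()
    return created, counted, writer.calls, reader.calls


created, counted, writer_calls, reader_calls = asyncio.run(demo())
```

Keeping the routing decision in the service, rather than inside each query, means a repository never has to know whether it is talking to a primary or a replica.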
6. API Reference
Session & Lifecycle
zodiac_core.db.session
DEFAULT_DB_NAME = 'default'
module-attribute
db = DatabaseManager()
module-attribute
DatabaseManager
Manages multiple Async Database Engines and Session Factories. Implemented as a Strict Singleton to coordinate connection pools.
Integration Examples:
- Native FastAPI (Lifespan + Depends):

    # main.py
    from contextlib import asynccontextmanager
    from fastapi import FastAPI, Depends
    from sqlalchemy.ext.asyncio import AsyncSession
    from zodiac_core.db.session import db, get_session

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        db.setup("sqlite+aiosqlite:///database.db")
        yield
        await db.shutdown()

    app = FastAPI(lifespan=lifespan)

    @app.get("/items")
    async def list_items(session: AsyncSession = Depends(get_session)):
        return {"status": "ok"}

- Dependency Injector (Using provided init_db_resource):

    # containers.py
    from dependency_injector import containers, providers
    from zodiac_core.utils import strtobool
    from zodiac_core.db.session import init_db_resource

    class Container(containers.DeclarativeContainer):
        config = providers.Configuration(strict=True)

        # Use the pre-built resource helper
        db_manager = providers.Resource(
            init_db_resource,
            database_url=config.db.url,
            echo=config.db.echo.as_(strtobool),
        )
Source code in zodiac_core/db/session.py
engine
property
Access the default SQLAlchemy AsyncEngine.
session_factory
property
Access the default AsyncSession factory.
create_all(name=DEFAULT_DB_NAME, metadata=None)
async
Create tables in the database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The database name to create tables in. | DEFAULT_DB_NAME |
| metadata | Any | SQLAlchemy MetaData object. If None, uses SQLModel.metadata which includes ALL registered models. For production, consider using Alembic migrations instead. | None |
Example
Source code in zodiac_core/db/session.py
get_engine(name=DEFAULT_DB_NAME)
Access a specific SQLAlchemy AsyncEngine by name.
Source code in zodiac_core/db/session.py
get_factory(name=DEFAULT_DB_NAME)
Access a specific AsyncSession factory by name.
Source code in zodiac_core/db/session.py
session(name=DEFAULT_DB_NAME)
async
Context Manager for obtaining a NEW database session from a specific engine.
Note
This context manager does NOT auto-commit. You must explicitly call
await session.commit() to persist changes to the database.
Example
Source code in zodiac_core/db/session.py
setup(database_url, name=DEFAULT_DB_NAME, echo=False, pool_size=10, max_overflow=20, pool_pre_ping=True, connect_args=None, **kwargs)
Initialize an Async Engine and Session Factory with a specific name.
Source code in zodiac_core/db/session.py
shutdown(name=None)
async
Dispose database resources.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | Optional database name. When provided, only that engine/factory is disposed. When omitted, all registered databases are disposed. | None |
Source code in zodiac_core/db/session.py
verify(name=DEFAULT_DB_NAME)
async
Verify the database connection is working.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The database name to verify. | DEFAULT_DB_NAME |
Returns:
| Type | Description |
|---|---|
| bool | True if connection is successful. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the database is not initialized. |
| Exception | If the connection test fails. |
Source code in zodiac_core/db/session.py
get_session(name=DEFAULT_DB_NAME)
async
FastAPI Dependency for obtaining a database session.
Note
This dependency does NOT auto-commit. You must explicitly call
await session.commit() within your endpoint to persist changes.
Example
# Default database: use directly as a dependency
@app.post("/users")
async def create_user(session: AsyncSession = Depends(get_session)):
    user = User(name="test")  # keep a reference so it can be returned
    session.add(user)
    await session.commit()
    await session.refresh(user)  # reload attributes expired by the commit
    return user


# Named database: wrap in a thin dependency
async def analytics_session():
    async for s in get_session("analytics"):
        yield s


@app.get("/reports")
async def get_reports(session: AsyncSession = Depends(analytics_session)):
    ...
Source code in zodiac_core/db/session.py
init_db_resource(database_url, name=DEFAULT_DB_NAME, echo=False, connect_args=None, **kwargs)
async
A helper for dependency_injector's Resource provider.
Handles the setup and shutdown lifecycle of the global db instance.
Cleanup is scoped to the provided database name, so other registered
databases remain available.
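The setup, yield, teardown shape that such a resource helper follows can be sketched with a plain async generator. This is an illustration under stated assumptions, not the source of init_db_resource: `db_resource` and the module-level `registered` set are hypothetical, and the set stands in for the manager's internal registry.

```python
import asyncio
from typing import AsyncIterator, Set

# Stand-in for the manager's registry of configured database names.
registered: Set[str] = set()


async def db_resource(name: str) -> AsyncIterator[str]:
    """Hypothetical resource: register on entry, release only `name` on exit."""
    registered.add(name)              # setup phase
    try:
        yield name                    # the resource is in use here
    finally:
        registered.discard(name)      # teardown scoped to this name only


async def demo():
    registered.add("default")         # a database owned by some other component
    gen = db_resource("analytics")
    name = await gen.__anext__()      # runs setup up to the yield
    during = sorted(registered)
    await gen.aclose()                # triggers the finally block
    after = sorted(registered)
    return name, during, after


name, during, after = asyncio.run(demo())
```

Because teardown discards only its own name, the "default" entry registered by another component survives the resource's shutdown.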
Source code in zodiac_core/db/session.py
Repository Base
zodiac_core.db.repository.BaseSQLRepository
Standard base class for SQL-based repositories.
Supports multiple database instances via db_name and provides
professional utilities for common operations like pagination.
Source code in zodiac_core/db/repository.py
__init__(session_factory=None, db_name=DEFAULT_DB_NAME, options=None)
Initialize the repository.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_factory | Optional[async_sessionmaker[AsyncSession]] | Optional custom session factory. If provided, db_name is ignored. | None |
| db_name | str | The name of the database engine registered in db.setup(). Defaults to DEFAULT_DB_NAME ("default"). | DEFAULT_DB_NAME |
| options | Optional[Any] | Optional configuration/options for the repository. | None |
Source code in zodiac_core/db/repository.py
paginate(session, statement, params, transformer=None)
async
Execute a paginated query with automatic count and paging.
Performs:
1. Automatic total count query using the provided statement.
2. Automatic limit/offset application.
3. Packaging results into a standardized PagedResponse.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session | AsyncSession | The active AsyncSession. | required |
| statement | Any | The SQLAlchemy select statement (without limit/offset). | required |
| params | PageParams | Standard PageParams (page, size). | required |
| transformer | Optional[Type[T]] | Optional Pydantic model to transform DB objects into. | None |
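The three steps (count, limit/offset, packaging) can be illustrated on an in-memory list. This sketch makes several assumptions: pages are 1-based, and the `PageParams` and `Page` dataclasses here are simplified stand-ins defined for the example, not ZodiacCore's PageParams or PagedResponse types.

```python
from dataclasses import dataclass
from math import ceil
from typing import List, Sequence


@dataclass
class PageParams:
    page: int = 1   # assumed to be 1-based
    size: int = 20


@dataclass
class Page:
    items: List[int]
    total: int
    page: int
    size: int
    pages: int


def paginate_list(rows: Sequence[int], params: PageParams) -> Page:
    total = len(rows)                              # step 1: count before slicing
    offset = (params.page - 1) * params.size       # step 2: derive the offset
    items = list(rows[offset:offset + params.size])
    pages = ceil(total / params.size) if total else 0
    return Page(items, total, params.page, params.size, pages)  # step 3: package


result = paginate_list(list(range(1, 46)), PageParams(page=3, size=20))
```

With 45 rows and a page size of 20, page 3 starts at offset 40 and holds the last five rows. In SQL the same arithmetic would feed the statement's limit and offset clauses.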
Example
Source code in zodiac_core/db/repository.py
paginate_query(statement, params, transformer=None)
async
Convenience method that automatically manages session for pagination.
This is a wrapper around paginate() that handles session management,
making it easier to use in repository methods.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| statement | Any | The SQLAlchemy select statement (without limit/offset). | required |
| params | PageParams | Standard PageParams (page, size). | required |
| transformer | Optional[Type[T]] | Optional Pydantic model to transform DB objects into. | None |
Example
Source code in zodiac_core/db/repository.py
session()
async
Async context manager for obtaining a database session. Uses the injected factory or resolves one from the global 'db' via 'db_name'.
Note
This context manager does NOT auto-commit. You must explicitly call
await session.commit() to persist changes to the database.
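The practical effect of "no auto-commit" can be demonstrated with the standard library's sqlite3 module. This swaps in a different, synchronous driver purely for illustration; it does not use ZodiacCore or SQLAlchemy, but the consequence is the same: work that is never committed is discarded when the connection closes.

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

# Create the table first so the schema exists for the later connections.
conn = sqlite3.connect(path)
conn.execute("CREATE TABLE users (name TEXT)")
conn.commit()
conn.close()

# Insert without committing: the pending transaction is lost on close.
conn = sqlite3.connect(path)
conn.execute("INSERT INTO users VALUES ('lost')")
conn.close()

# Insert and commit explicitly: the row is persisted.
conn = sqlite3.connect(path)
conn.execute("INSERT INTO users VALUES ('kept')")
conn.commit()
conn.close()

# Read back what actually reached the database file.
conn = sqlite3.connect(path)
names = [row[0] for row in conn.execute("SELECT name FROM users")]
conn.close()
```

Only the committed row survives, which is why repository methods that write data should end with an explicit await session.commit().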
Source code in zodiac_core/db/repository.py
SQL Models & Mixins
zodiac_core.db.sql
IntIDModel
Bases: SQLBase, IntIDMixin
Base SQLModel with Integer ID and Timestamps. Includes: ID (int) + CreatedAt + UpdatedAt.
Source code in zodiac_core/db/sql.py
UUIDModel
SQLDateTimeMixin
Bases: SQLModel
Mixin for created_at and updated_at with SQLAlchemy server-side defaults. Supports PostgreSQL, MySQL, and SQLite with proper UTC handling.