Skip to main content

SQLAlchemy Adapter

A detailed guide on configuring the SQLAlchemy integration for DatabaseEnforcerProvider.

Supported databases

Any database supported by SQLAlchemy with an async driver works:

DatabaseDriverConnection string
PostgreSQLasyncpgpostgresql+asyncpg://user:pass@host/db
MySQLaiomysqlmysql+aiomysql://user:pass@host/db
SQLiteaiosqlitesqlite+aiosqlite:///./app.db

Engine and session factory

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

# Create the engine
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/mydb",
echo=False, # set True for SQL logging
pool_size=10,
max_overflow=20,
)

# Create the session factory
async_session = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)

Policy model

The policy model can have any shape — DatabaseEnforcerProvider uses your policy_mapper to convert rows, so you control the mapping:

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, DateTime
from datetime import datetime

class Base(DeclarativeBase):
pass

class Policy(Base):
__tablename__ = "casbin_policies"

id: Mapped[int] = mapped_column(primary_key=True)
sub: Mapped[str] = mapped_column(String(100), index=True)
obj: Mapped[str] = mapped_column(String(100), index=True)
act: Mapped[str] = mapped_column(String(100))
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
is_active: Mapped[bool] = mapped_column(default=True)

Custom policy mapper

The policy_mapper receives each row and must return a tuple of strings matching your Casbin model's [policy_definition]:

# Standard RBAC: (sub, obj, act)
policy_mapper=lambda p: (p.sub, p.obj, p.act)

# Only include active policies
# (filter in query or in mapper)
policy_mapper=lambda p: (p.subject, p.resource, p.permission)

# 4-field model: (sub, dom, obj, act)
policy_mapper=lambda p: (p.sub, p.domain, p.obj, p.act)

Database initialization

Use FastAPI's lifespan to create tables and seed initial data:

from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# Create tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)

# Seed initial policies
await seed_policies()

yield

await engine.dispose()


async def seed_policies() -> None:
async with async_session() as session:
from sqlalchemy import select
result = await session.execute(select(Policy))
if result.scalars().first() is not None:
return # already seeded

session.add_all([
Policy(sub="admin", obj="post", act="read"),
Policy(sub="admin", obj="post", act="write"),
Policy(sub="admin", obj="post", act="delete"),
Policy(sub="editor", obj="post", act="read"),
Policy(sub="editor", obj="post", act="write"),
Policy(sub="viewer", obj="post", act="read"),
])
await session.commit()


app = FastAPI(lifespan=lifespan)

Managing policies at runtime

Since policies live in the database, you can add a management API:

from pydantic import BaseModel

class PolicyCreate(BaseModel):
sub: str
obj: str
act: str

@app.post("/admin/policies")
async def add_policy(data: PolicyCreate) -> dict:
async with async_session() as session:
policy = Policy(sub=data.sub, obj=data.obj, act=data.act)
session.add(policy)
await session.commit()
return {"created": True}

@app.delete("/admin/policies")
async def remove_policy(sub: str, obj: str, act: str) -> dict:
from sqlalchemy import delete
async with async_session() as session:
await session.execute(
delete(Policy).where(
Policy.sub == sub,
Policy.obj == obj,
Policy.act == act,
)
)
await session.commit()
return {"deleted": True}
note

Policy changes take effect on the next request since DatabaseEnforcerProvider creates a fresh enforcer on each call.