Skip to content

Module 11: Database Layer

Overview

Production Solana DApps need persistent storage for indexed blockchain data, user preferences, analytics, and cached state. This module covers designing database schemas for blockchain applications, using PostgreSQL for production and SQLite for development, with Alembic for version-controlled migrations.

Learning Objectives

By the end of this module, you will be able to:

  • Design database schemas for blockchain data
  • Implement async database operations with SQLAlchemy
  • Create efficient indexes for common query patterns
  • Set up Alembic migrations for schema versioning
  • Migrate from SQLite to PostgreSQL for production

Part A: Schema Design

Blockchain Data Considerations

Blockchain data has unique characteristics that affect schema design:

  1. Immutability: On-chain data is append-only
  2. Public keys as identifiers: 32-byte base58 strings
  3. Large integers: Token amounts use u64 (up to 18.4 quintillion)
  4. Timestamps: Unix timestamps in seconds
  5. State machines: Entities with defined lifecycle states

Core Tables

-- database/schemas/common.sql
-- Core indexing tables. All TIMESTAMP values are stored in UTC.

-- Accounts table for tracking watched addresses.
-- pubkey is a base58-encoded 32-byte key (at most 44 characters).
CREATE TABLE accounts (
    id SERIAL PRIMARY KEY,
    pubkey VARCHAR(44) UNIQUE NOT NULL,
    label VARCHAR(255),
    account_type VARCHAR(50) NOT NULL,
    owner_program VARCHAR(44),
    lamports BIGINT DEFAULT 0,          -- balance in lamports
    data_hash VARCHAR(64),              -- hash of account data, for change detection
    slot BIGINT,                        -- slot at which this row was last refreshed
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- NOTE: no separate index on accounts(pubkey) -- the UNIQUE constraint
-- already creates one; a duplicate index only slows down writes.
CREATE INDEX idx_accounts_type ON accounts(account_type);
CREATE INDEX idx_accounts_owner ON accounts(owner_program);

-- Transactions table for history.
-- signature is base58 of a 64-byte signature (at most 88 characters).
CREATE TABLE transactions (
    id SERIAL PRIMARY KEY,
    signature VARCHAR(88) UNIQUE NOT NULL,
    slot BIGINT NOT NULL,
    block_time TIMESTAMP,
    fee BIGINT NOT NULL,                -- fee in lamports
    status VARCHAR(20) NOT NULL,
    program_id VARCHAR(44),
    instruction_type VARCHAR(50),
    accounts JSONB,                     -- involved account keys
    raw_data JSONB,                     -- full decoded transaction payload
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- signature is already indexed by its UNIQUE constraint.
CREATE INDEX idx_transactions_slot ON transactions(slot DESC);
CREATE INDEX idx_transactions_program ON transactions(program_id);
CREATE INDEX idx_transactions_block_time ON transactions(block_time DESC);
-- GIN index supports JSONB containment queries (accounts @> '["<pubkey>"]')
CREATE INDEX idx_transactions_accounts ON transactions USING GIN(accounts);

Token Escrow Schema

-- database/schemas/escrows.sql

-- Escrow lifecycle states (mirrors the on-chain state machine)
CREATE TYPE escrow_state AS ENUM ('created', 'funded', 'completed', 'cancelled');

-- Escrow records: one row per on-chain escrow account.
-- NOTE: amounts are raw u64 token amounts; values above 2^63-1 would
-- overflow signed BIGINT -- use NUMERIC if the full u64 range is required.
CREATE TABLE escrows (
    id SERIAL PRIMARY KEY,
    pubkey VARCHAR(44) UNIQUE NOT NULL,
    escrow_id BIGINT NOT NULL,
    maker VARCHAR(44) NOT NULL,
    taker VARCHAR(44),                  -- NULL until an escrow is taken
    mint_a VARCHAR(44) NOT NULL,
    mint_b VARCHAR(44) NOT NULL,
    amount_a BIGINT NOT NULL,
    amount_b BIGINT NOT NULL,
    vault VARCHAR(44) NOT NULL,
    state escrow_state NOT NULL DEFAULT 'created',
    created_slot BIGINT NOT NULL,
    completed_slot BIGINT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- pubkey is already indexed by its UNIQUE constraint; index only the
-- additional lookup columns.
CREATE INDEX idx_escrows_maker ON escrows(maker);
CREATE INDEX idx_escrows_taker ON escrows(taker);
CREATE INDEX idx_escrows_state ON escrows(state);
CREATE INDEX idx_escrows_mints ON escrows(mint_a, mint_b);

-- Escrow events for audit trail (append-only)
CREATE TABLE escrow_events (
    id SERIAL PRIMARY KEY,
    escrow_pubkey VARCHAR(44) NOT NULL
        CONSTRAINT fk_escrow_events_escrow REFERENCES escrows(pubkey),
    event_type VARCHAR(50) NOT NULL,
    signature VARCHAR(88) NOT NULL,
    slot BIGINT NOT NULL,
    data JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_escrow_events_escrow ON escrow_events(escrow_pubkey);
CREATE INDEX idx_escrow_events_type ON escrow_events(event_type);

NFT Marketplace Schema

-- database/schemas/marketplace.sql

-- Listing lifecycle states
CREATE TYPE listing_state AS ENUM ('active', 'sold', 'cancelled');

-- NFT listings: one row per on-chain listing account
CREATE TABLE listings (
    id SERIAL PRIMARY KEY,
    pubkey VARCHAR(44) UNIQUE NOT NULL,
    marketplace VARCHAR(44) NOT NULL,
    seller VARCHAR(44) NOT NULL,
    mint VARCHAR(44) NOT NULL,
    price BIGINT NOT NULL,              -- price in lamports
    state listing_state NOT NULL DEFAULT 'active',
    created_slot BIGINT NOT NULL,
    sold_slot BIGINT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- pubkey is already indexed by its UNIQUE constraint.
CREATE INDEX idx_listings_seller ON listings(seller);
CREATE INDEX idx_listings_mint ON listings(mint);
CREATE INDEX idx_listings_state ON listings(state);
-- Partial index: only active listings are browsed/sorted by price
CREATE INDEX idx_listings_price ON listings(price) WHERE state = 'active';

-- Sales history (append-only)
CREATE TABLE sales (
    id SERIAL PRIMARY KEY,
    listing_pubkey VARCHAR(44) NOT NULL REFERENCES listings(pubkey),
    buyer VARCHAR(44) NOT NULL,
    price BIGINT NOT NULL,
    signature VARCHAR(88) NOT NULL,
    slot BIGINT NOT NULL,
    royalty_amount BIGINT DEFAULT 0,
    marketplace_fee BIGINT DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_sales_listing ON sales(listing_pubkey);
CREATE INDEX idx_sales_buyer ON sales(buyer);
CREATE INDEX idx_sales_slot ON sales(slot DESC);

-- Collections: aggregated per-collection metadata and stats
CREATE TABLE collections (
    id SERIAL PRIMARY KEY,
    mint VARCHAR(44) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    symbol VARCHAR(10),
    uri VARCHAR(500),                   -- off-chain metadata URI
    creator VARCHAR(44),
    verified BOOLEAN DEFAULT FALSE,
    floor_price BIGINT,                 -- denormalized; refreshed by indexer
    total_volume BIGINT DEFAULT 0,      -- denormalized running total
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- mint is already indexed by its UNIQUE constraint.
CREATE INDEX idx_collections_creator ON collections(creator);

DeFi AMM Schema

-- database/schemas/amm.sql

-- Liquidity pools: one row per on-chain pool account
CREATE TABLE pools (
    id SERIAL PRIMARY KEY,
    pubkey VARCHAR(44) UNIQUE NOT NULL,
    token_a_mint VARCHAR(44) NOT NULL,
    token_b_mint VARCHAR(44) NOT NULL,
    token_a_vault VARCHAR(44) NOT NULL,
    token_b_vault VARCHAR(44) NOT NULL,
    lp_mint VARCHAR(44) NOT NULL,
    fee_rate INTEGER NOT NULL,          -- presumably basis points; confirm against the on-chain program
    token_a_reserve BIGINT DEFAULT 0,
    token_b_reserve BIGINT DEFAULT 0,
    lp_supply BIGINT DEFAULT 0,
    cumulative_volume_a BIGINT DEFAULT 0,
    cumulative_volume_b BIGINT DEFAULT 0,
    created_slot BIGINT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- pubkey is already indexed by its UNIQUE constraint.
-- NOTE(review): pair lookups assume a canonical ordering of
-- (token_a_mint, token_b_mint); verify the indexer guarantees this.
CREATE INDEX idx_pools_tokens ON pools(token_a_mint, token_b_mint);
CREATE INDEX idx_pools_lp_mint ON pools(lp_mint);

-- Swap transactions (append-only)
CREATE TABLE swaps (
    id SERIAL PRIMARY KEY,
    pool_pubkey VARCHAR(44) NOT NULL REFERENCES pools(pubkey),
    user_address VARCHAR(44) NOT NULL,
    input_mint VARCHAR(44) NOT NULL,
    output_mint VARCHAR(44) NOT NULL,
    input_amount BIGINT NOT NULL,
    output_amount BIGINT NOT NULL,
    fee_amount BIGINT NOT NULL,
    signature VARCHAR(88) NOT NULL,
    slot BIGINT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_swaps_pool ON swaps(pool_pubkey);
CREATE INDEX idx_swaps_user ON swaps(user_address);
CREATE INDEX idx_swaps_slot ON swaps(slot DESC);

-- Liquidity events (add/remove), append-only
CREATE TABLE liquidity_events (
    id SERIAL PRIMARY KEY,
    pool_pubkey VARCHAR(44) NOT NULL REFERENCES pools(pubkey),
    user_address VARCHAR(44) NOT NULL,
    event_type VARCHAR(20) NOT NULL, -- 'add' or 'remove'
    token_a_amount BIGINT NOT NULL,
    token_b_amount BIGINT NOT NULL,
    lp_amount BIGINT NOT NULL,
    signature VARCHAR(88) NOT NULL,
    slot BIGINT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_liquidity_events_pool ON liquidity_events(pool_pubkey);
CREATE INDEX idx_liquidity_events_user ON liquidity_events(user_address);
CREATE INDEX idx_liquidity_events_type ON liquidity_events(event_type);

DAO Governance Schema

-- database/schemas/governance.sql

-- Proposal lifecycle states
CREATE TYPE proposal_state AS ENUM (
    'active', 'succeeded', 'defeated', 'queued', 'executed', 'cancelled'
);

-- Governance configuration: one row per DAO
CREATE TABLE governances (
    id SERIAL PRIMARY KEY,
    pubkey VARCHAR(44) UNIQUE NOT NULL,
    authority VARCHAR(44) NOT NULL,
    governance_token_mint VARCHAR(44) NOT NULL,
    treasury VARCHAR(44) NOT NULL,
    proposal_count BIGINT DEFAULT 0,
    voting_period BIGINT NOT NULL,      -- duration; units set by the on-chain program
    proposal_threshold BIGINT NOT NULL, -- min token weight to propose
    quorum_votes BIGINT NOT NULL,
    execution_delay BIGINT NOT NULL,    -- timelock before execution
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- pubkey is already indexed by its UNIQUE constraint.
CREATE INDEX idx_governances_token ON governances(governance_token_mint);

-- Proposals
CREATE TABLE proposals (
    id SERIAL PRIMARY KEY,
    pubkey VARCHAR(44) UNIQUE NOT NULL,
    governance_pubkey VARCHAR(44) NOT NULL REFERENCES governances(pubkey),
    proposal_id BIGINT NOT NULL,
    proposer VARCHAR(44) NOT NULL,
    title VARCHAR(255) NOT NULL,
    description_uri VARCHAR(500),
    votes_for BIGINT DEFAULT 0,
    votes_against BIGINT DEFAULT 0,
    state proposal_state NOT NULL DEFAULT 'active',
    voting_starts_at TIMESTAMP NOT NULL,
    voting_ends_at TIMESTAMP NOT NULL,
    execution_time TIMESTAMP,
    created_slot BIGINT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- On-chain proposal ids are sequential per governance; enforcing the
    -- pair prevents the indexer from double-inserting a proposal.
    CONSTRAINT uq_proposals_governance_proposal UNIQUE (governance_pubkey, proposal_id)
);

-- pubkey is already indexed by its UNIQUE constraint, and governance_pubkey
-- lookups are covered by the (governance_pubkey, proposal_id) unique index prefix.
CREATE INDEX idx_proposals_proposer ON proposals(proposer);
CREATE INDEX idx_proposals_state ON proposals(state);
-- Partial index: only active proposals are polled for voting deadlines
CREATE INDEX idx_proposals_voting ON proposals(voting_ends_at) WHERE state = 'active';

-- Vote records: one per (proposal, voter)
CREATE TABLE votes (
    id SERIAL PRIMARY KEY,
    pubkey VARCHAR(44) UNIQUE NOT NULL,
    proposal_pubkey VARCHAR(44) NOT NULL REFERENCES proposals(pubkey),
    voter VARCHAR(44) NOT NULL,
    support BOOLEAN NOT NULL,           -- TRUE = for, FALSE = against
    vote_weight BIGINT NOT NULL,
    signature VARCHAR(88) NOT NULL,
    slot BIGINT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- The composite unique index below also serves proposal_pubkey-only lookups
-- (leftmost prefix), so no separate idx_votes_proposal is needed.
CREATE INDEX idx_votes_voter ON votes(voter);
CREATE UNIQUE INDEX idx_votes_unique ON votes(proposal_pubkey, voter);

-- Proposal executions (append-only; may record failed attempts)
CREATE TABLE executions (
    id SERIAL PRIMARY KEY,
    proposal_pubkey VARCHAR(44) NOT NULL REFERENCES proposals(pubkey),
    executor VARCHAR(44) NOT NULL,
    signature VARCHAR(88) NOT NULL,
    slot BIGINT NOT NULL,
    success BOOLEAN NOT NULL,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_executions_proposal ON executions(proposal_pubkey);

Part B: SQLAlchemy Models

Base Configuration

# api/app/db/base.py
# Async SQLAlchemy setup: declarative base, engine, session factory, and the
# FastAPI dependency that hands out one session per request.
from datetime import datetime  # NOTE(review): appears unused in this module
from typing import AsyncGenerator

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

from ..config import get_settings

# Naming convention for constraints.
# Deterministic names keep Alembic autogenerate diffs stable and let
# migrations drop/alter constraints by name on any backend.
convention = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}

metadata = MetaData(naming_convention=convention)


class Base(DeclarativeBase):
    """Base class for all SQLAlchemy models."""
    metadata = metadata


# Create async engine.
# pool_pre_ping checks connection liveness at checkout so stale connections
# (e.g. dropped by the server) are transparently replaced.
settings = get_settings()
engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,  # SQL statement logging only in debug mode
    pool_pre_ping=True,
)

# Session factory.
# expire_on_commit=False keeps ORM objects usable after commit, which is
# needed when responses are serialized after the session scope ends.
async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency for getting database sessions.

    Yields one session per request; commits when the request handler
    returns normally and rolls back (then re-raises) on any exception.
    The session itself is closed by the ``async with`` block.
    """
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Escrow Model

# api/app/db/models/escrow.py
"""SQLAlchemy models for escrows and their audit-trail events."""
from datetime import datetime
from enum import Enum as PyEnum
from typing import Optional

from sqlalchemy import JSON, BigInteger, DateTime, Enum, ForeignKey, Index, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

from ..base import Base


class EscrowState(PyEnum):
    """Lifecycle states mirroring the escrow_state enum in the SQL schema."""
    CREATED = "created"
    FUNDED = "funded"
    COMPLETED = "completed"
    CANCELLED = "cancelled"


class Escrow(Base):
    """Escrow database model: one row per on-chain escrow account."""
    __tablename__ = "escrows"

    id: Mapped[int] = mapped_column(primary_key=True)
    # unique=True already creates an index; a separate index=True would
    # build a redundant second index on the same column.
    pubkey: Mapped[str] = mapped_column(String(44), unique=True, nullable=False)
    escrow_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
    maker: Mapped[str] = mapped_column(String(44), nullable=False, index=True)
    taker: Mapped[Optional[str]] = mapped_column(String(44), nullable=True, index=True)
    mint_a: Mapped[str] = mapped_column(String(44), nullable=False)
    mint_b: Mapped[str] = mapped_column(String(44), nullable=False)
    # Raw u64 token amounts (see schema note on signed BIGINT overflow).
    amount_a: Mapped[int] = mapped_column(BigInteger, nullable=False)
    amount_b: Mapped[int] = mapped_column(BigInteger, nullable=False)
    vault: Mapped[str] = mapped_column(String(44), nullable=False)
    state: Mapped[EscrowState] = mapped_column(
        Enum(EscrowState),
        nullable=False,
        default=EscrowState.CREATED,
        index=True
    )
    created_slot: Mapped[int] = mapped_column(BigInteger, nullable=False)
    completed_slot: Mapped[Optional[int]] = mapped_column(BigInteger, nullable=True)
    # NOTE: datetime.utcnow is deprecated in Python 3.12+; kept here because
    # the columns are timezone-naive TIMESTAMP and values are UTC by convention.
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow
    )

    # Deleting an escrow removes its audit events as well.
    events: Mapped[list["EscrowEvent"]] = relationship(
        back_populates="escrow",
        cascade="all, delete-orphan"
    )

    __table_args__ = (
        # Composite index for pair lookups (mint_a, mint_b).
        Index("idx_escrows_mints", "mint_a", "mint_b"),
    )


class EscrowEvent(Base):
    """Append-only audit trail of state changes for an escrow."""
    __tablename__ = "escrow_events"

    id: Mapped[int] = mapped_column(primary_key=True)
    escrow_pubkey: Mapped[str] = mapped_column(
        String(44),
        ForeignKey("escrows.pubkey"),
        nullable=False,
        index=True
    )
    event_type: Mapped[str] = mapped_column(String(50), nullable=False, index=True)
    signature: Mapped[str] = mapped_column(String(88), nullable=False)
    slot: Mapped[int] = mapped_column(BigInteger, nullable=False)
    # Explicit JSON type: a bare mapped_column() cannot infer a column type
    # for dict and fails at mapper configuration. JSON is portable across
    # SQLite and PostgreSQL; use JSON().with_variant(JSONB, "postgresql")
    # if native JSONB indexing is required.
    data: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

    escrow: Mapped["Escrow"] = relationship(back_populates="events")

Pool Model

# api/app/db/models/pool.py
"""SQLAlchemy models for AMM pools, swaps, and liquidity events."""
from datetime import datetime

# ForeignKey must be imported: it is used by Swap and LiquidityEvent below
# (the original import list omitted it, causing a NameError at import time).
from sqlalchemy import BigInteger, DateTime, ForeignKey, Index, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

from ..base import Base


class Pool(Base):
    """AMM Pool database model: one row per on-chain pool account."""
    __tablename__ = "pools"

    id: Mapped[int] = mapped_column(primary_key=True)
    # unique=True already creates an index; no separate index=True needed.
    pubkey: Mapped[str] = mapped_column(String(44), unique=True, nullable=False)
    token_a_mint: Mapped[str] = mapped_column(String(44), nullable=False)
    token_b_mint: Mapped[str] = mapped_column(String(44), nullable=False)
    token_a_vault: Mapped[str] = mapped_column(String(44), nullable=False)
    token_b_vault: Mapped[str] = mapped_column(String(44), nullable=False)
    lp_mint: Mapped[str] = mapped_column(String(44), nullable=False, index=True)
    # Presumably basis points -- confirm against the on-chain program.
    fee_rate: Mapped[int] = mapped_column(Integer, nullable=False)
    token_a_reserve: Mapped[int] = mapped_column(BigInteger, default=0)
    token_b_reserve: Mapped[int] = mapped_column(BigInteger, default=0)
    lp_supply: Mapped[int] = mapped_column(BigInteger, default=0)
    cumulative_volume_a: Mapped[int] = mapped_column(BigInteger, default=0)
    cumulative_volume_b: Mapped[int] = mapped_column(BigInteger, default=0)
    created_slot: Mapped[int] = mapped_column(BigInteger, nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow
    )

    # Relationships
    swaps: Mapped[list["Swap"]] = relationship(back_populates="pool")
    liquidity_events: Mapped[list["LiquidityEvent"]] = relationship(back_populates="pool")

    __table_args__ = (
        # Composite index for token-pair lookups.
        Index("idx_pools_tokens", "token_a_mint", "token_b_mint"),
    )


class Swap(Base):
    """Swap transaction record (append-only)."""
    __tablename__ = "swaps"

    id: Mapped[int] = mapped_column(primary_key=True)
    pool_pubkey: Mapped[str] = mapped_column(
        String(44),
        ForeignKey("pools.pubkey"),
        nullable=False,
        index=True
    )
    user_address: Mapped[str] = mapped_column(String(44), nullable=False, index=True)
    input_mint: Mapped[str] = mapped_column(String(44), nullable=False)
    output_mint: Mapped[str] = mapped_column(String(44), nullable=False)
    input_amount: Mapped[int] = mapped_column(BigInteger, nullable=False)
    output_amount: Mapped[int] = mapped_column(BigInteger, nullable=False)
    fee_amount: Mapped[int] = mapped_column(BigInteger, nullable=False)
    signature: Mapped[str] = mapped_column(String(88), nullable=False)
    slot: Mapped[int] = mapped_column(BigInteger, nullable=False, index=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

    pool: Mapped["Pool"] = relationship(back_populates="swaps")


class LiquidityEvent(Base):
    """Liquidity add/remove event (append-only)."""
    __tablename__ = "liquidity_events"

    id: Mapped[int] = mapped_column(primary_key=True)
    pool_pubkey: Mapped[str] = mapped_column(
        String(44),
        ForeignKey("pools.pubkey"),
        nullable=False,
        index=True
    )
    user_address: Mapped[str] = mapped_column(String(44), nullable=False, index=True)
    # 'add' or 'remove'
    event_type: Mapped[str] = mapped_column(String(20), nullable=False, index=True)
    token_a_amount: Mapped[int] = mapped_column(BigInteger, nullable=False)
    token_b_amount: Mapped[int] = mapped_column(BigInteger, nullable=False)
    lp_amount: Mapped[int] = mapped_column(BigInteger, nullable=False)
    signature: Mapped[str] = mapped_column(String(88), nullable=False)
    slot: Mapped[int] = mapped_column(BigInteger, nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

    pool: Mapped["Pool"] = relationship(back_populates="liquidity_events")

Part C: Alembic Migrations

Setup

# Initialize Alembic
# (creates migrations/ with env.py, script.py.mako, and a versions/ directory,
# plus an alembic.ini next to it)
cd api
alembic init migrations

Configuration

# api/migrations/env.py
# Alembic environment: wires the app's database URL and model metadata into
# the migration runner. Executed as a script by the `alembic` CLI.
import asyncio
from logging.config import fileConfig

from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context

from app.config import get_settings
from app.db.base import Base
# Importing the model modules registers their tables on Base.metadata so
# `alembic revision --autogenerate` can diff them against the database.
from app.db.models import escrow, pool as pool_model, governance  # Import all models

config = context.config

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata

# Override the alembic.ini URL with the application's configured URL so
# migrations always target the same database as the app.
settings = get_settings()
config.set_main_option("sqlalchemy.url", settings.database_url)


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode."""
    # Emits SQL to stdout without a DB connection; literal_binds inlines
    # parameters so the output is a standalone runnable script.
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    # Runs inside the sync adapter supplied by connection.run_sync() below;
    # Alembic's migration context is synchronous.
    context.configure(connection=connection, target_metadata=target_metadata)

    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    """Run migrations in 'online' mode with async engine."""
    # NullPool: a short-lived migration process gains nothing from pooling.
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""
    asyncio.run(run_async_migrations())


# Module-level dispatch: Alembic executes this file top to bottom.
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Initial Migration

# api/migrations/versions/001_initial.py
"""Initial migration

Revision ID: 001
Revises:
Create Date: 2024-01-01 00:00:00.000000
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

revision: str = '001'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the accounts and escrows tables plus their indexes."""
    # Accounts table.
    # server_default (not `default`) is required in migrations: `default` is a
    # Python-side ORM value and is ignored by emitted DDL, so rows inserted
    # outside the ORM would get NULLs instead of the intended defaults.
    op.create_table(
        'accounts',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('pubkey', sa.String(44), nullable=False),
        sa.Column('label', sa.String(255), nullable=True),
        sa.Column('account_type', sa.String(50), nullable=False),
        sa.Column('owner_program', sa.String(44), nullable=True),
        sa.Column('lamports', sa.BigInteger(), server_default=sa.text('0')),
        sa.Column('data_hash', sa.String(64), nullable=True),
        sa.Column('slot', sa.BigInteger(), nullable=True),
        sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(), server_default=sa.func.now()),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('pubkey')
    )
    # The UNIQUE constraint on pubkey already creates an index; only
    # additional lookup columns need explicit indexes.
    op.create_index('idx_accounts_type', 'accounts', ['account_type'])

    # Escrows table
    op.create_table(
        'escrows',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('pubkey', sa.String(44), nullable=False),
        sa.Column('escrow_id', sa.BigInteger(), nullable=False),
        sa.Column('maker', sa.String(44), nullable=False),
        sa.Column('taker', sa.String(44), nullable=True),
        sa.Column('mint_a', sa.String(44), nullable=False),
        sa.Column('mint_b', sa.String(44), nullable=False),
        sa.Column('amount_a', sa.BigInteger(), nullable=False),
        sa.Column('amount_b', sa.BigInteger(), nullable=False),
        sa.Column('vault', sa.String(44), nullable=False),
        sa.Column('state', sa.String(20), nullable=False),
        sa.Column('created_slot', sa.BigInteger(), nullable=False),
        sa.Column('completed_slot', sa.BigInteger(), nullable=True),
        sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(), server_default=sa.func.now()),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('pubkey')
    )
    op.create_index('idx_escrows_maker', 'escrows', ['maker'])
    op.create_index('idx_escrows_state', 'escrows', ['state'])


def downgrade() -> None:
    """Drop both tables (no FK between them, so order is free)."""
    op.drop_table('escrows')
    op.drop_table('accounts')

Migration Commands

# Create new migration
# (autogenerate only sees models that env.py imports -- keep that list current)
alembic revision --autogenerate -m "description"

# Apply all migrations
alembic upgrade head

# Rollback one migration
alembic downgrade -1

# Show migration history
alembic history

# Show current revision
alembic current

Part D: Query Patterns

Repository Pattern

# api/app/repositories/escrow.py
"""Repository layer for escrow persistence.

Methods flush but never commit: transaction boundaries belong to the
caller (the get_db dependency commits once per request).
"""
from typing import Optional, List
from datetime import datetime

from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession

from ..db.models.escrow import Escrow, EscrowEvent, EscrowState


class EscrowRepository:
    """Repository for escrow database operations."""

    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_pubkey(self, pubkey: str) -> Optional[Escrow]:
        """Return the escrow with the given public key, or None if absent."""
        result = await self.session.execute(
            select(Escrow).where(Escrow.pubkey == pubkey)
        )
        return result.scalar_one_or_none()

    async def get_by_maker(
        self,
        maker: str,
        state: Optional[EscrowState] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[Escrow]:
        """Return escrows created by `maker`, newest first.

        Args:
            maker: maker public key.
            state: optional state filter; None returns all states.
            limit/offset: page window.
        """
        query = select(Escrow).where(Escrow.maker == maker)

        # Explicit None check so a filter value is never silently skipped.
        if state is not None:
            query = query.where(Escrow.state == state)

        query = query.order_by(Escrow.created_at.desc())
        query = query.limit(limit).offset(offset)

        result = await self.session.execute(query)
        return list(result.scalars().all())

    async def get_active(
        self,
        limit: int = 100,
        offset: int = 0
    ) -> List[Escrow]:
        """Return escrows still open for taking (created or funded), newest first."""
        result = await self.session.execute(
            select(Escrow)
            .where(Escrow.state.in_([EscrowState.CREATED, EscrowState.FUNDED]))
            .order_by(Escrow.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        return list(result.scalars().all())

    async def create(self, escrow_data: dict) -> Escrow:
        """Insert a new escrow row; flush so the generated id is populated."""
        escrow = Escrow(**escrow_data)
        self.session.add(escrow)
        await self.session.flush()
        return escrow

    async def update_state(
        self,
        pubkey: str,
        new_state: EscrowState,
        completed_slot: Optional[int] = None
    ) -> bool:
        """Transition an escrow to `new_state`.

        Returns True if a row was updated, False if no escrow matched.
        """
        update_data = {
            "state": new_state,
            "updated_at": datetime.utcnow()
        }
        # `is not None` -- slot 0 is a valid Solana slot; a truthiness check
        # would silently drop it.
        if completed_slot is not None:
            update_data["completed_slot"] = completed_slot

        result = await self.session.execute(
            update(Escrow)
            .where(Escrow.pubkey == pubkey)
            .values(**update_data)
        )
        return result.rowcount > 0

    async def add_event(
        self,
        escrow_pubkey: str,
        event_type: str,
        signature: str,
        slot: int,
        data: Optional[dict] = None
    ) -> EscrowEvent:
        """Append an event to the escrow audit trail and flush it."""
        event = EscrowEvent(
            escrow_pubkey=escrow_pubkey,
            event_type=event_type,
            signature=signature,
            slot=slot,
            data=data
        )
        self.session.add(event)
        await self.session.flush()
        return event

Using Repositories

# api/app/routers/escrows.py
"""HTTP routes for escrow lookups."""
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession

from ..db.base import get_db
# EscrowState must be imported here (it is referenced below); the original
# file used it without any import, raising NameError at request time.
from ..db.models.escrow import EscrowState
from ..repositories.escrow import EscrowRepository
from ..models.escrow import EscrowResponse, EscrowListResponse

router = APIRouter(prefix="/escrows", tags=["Token Escrow"])


@router.get("/{pubkey}", response_model=EscrowResponse)
async def get_escrow(
    pubkey: str,
    db: AsyncSession = Depends(get_db)
):
    """Get escrow by public key; 404 if unknown."""
    repo = EscrowRepository(db)
    escrow = await repo.get_by_pubkey(pubkey)

    if not escrow:
        raise HTTPException(status_code=404, detail="Escrow not found")

    return EscrowResponse.model_validate(escrow)


# No conflict with /{pubkey}: that route matches one path segment,
# this one matches two.
@router.get("/maker/{address}")
async def get_maker_escrows(
    address: str,
    state: Optional[str] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db)
):
    """Get all escrows by maker, optionally filtered by state, paginated."""
    repo = EscrowRepository(db)

    # Reject unknown state strings with 400 rather than letting ValueError
    # surface as a 500.
    state_filter: Optional[EscrowState] = None
    if state is not None:
        try:
            state_filter = EscrowState(state)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid state: {state!r}"
            )

    offset = (page - 1) * page_size

    escrows = await repo.get_by_maker(
        maker=address,
        state=state_filter,
        limit=page_size,
        offset=offset
    )

    return {
        "escrows": [EscrowResponse.model_validate(e) for e in escrows],
        "page": page,
        "page_size": page_size
    }

Part E: SQLite for Development

Configuration

# For SQLite (development)
# (aiosqlite driver; file dev.db created next to the working directory)
DATABASE_URL = "sqlite+aiosqlite:///./dev.db"

# For PostgreSQL (production)
# NOTE: never commit real credentials -- load this from the environment
# or a secrets manager.
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:5432/solana_dapps"

SQLite Considerations

# api/app/db/sqlite_compat.py
"""SQLite compatibility utilities."""

from sqlalchemy import event
from sqlalchemy.engine import Engine


@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Enable foreign keys and WAL mode for SQLite connections.

    This listener fires for EVERY Engine in the process -- including the
    PostgreSQL engine in production -- so it must detect SQLite and no-op
    otherwise: PRAGMA is not valid SQL on other backends.
    """
    # Detect SQLite by the driver module name; this covers both the stdlib
    # sqlite3 driver and aiosqlite wrappers. TODO(review): confirm the
    # adapted-connection class name under the exact driver in use.
    if "sqlite" not in type(dbapi_connection).__module__:
        return

    cursor = dbapi_connection.cursor()
    # Foreign key enforcement is OFF by default in SQLite.
    cursor.execute("PRAGMA foreign_keys=ON")
    # WAL allows concurrent readers while a writer is active (better for dev).
    cursor.execute("PRAGMA journal_mode=WAL")
    cursor.close()

Type Differences

| Feature   | PostgreSQL      | SQLite              |
|-----------|-----------------|---------------------|
| BIGINT    | Native          | Stored as INTEGER   |
| ENUM      | Native type     | Stored as a string  |
| JSONB     | Native, indexed | JSON stored as text |
| Array     | Native          | Not supported       |
| Full-text | tsvector        | FTS5 extension      |

Summary

This module covered the database layer for Solana DApps:

Schema Design: - Tables for escrows, listings, pools, proposals - Proper indexes for query patterns - Audit trail with event tables - State enums for lifecycle management

SQLAlchemy: - Async models with mapped columns - Relationships and foreign keys - Repository pattern for data access - Session management with dependency injection

Alembic Migrations: - Version-controlled schema changes - Async migration runner - Upgrade and downgrade support

Best Practices: - Use BIGINT for token amounts (u64) - VARCHAR(44) for public keys - Index frequently queried columns - GIN indexes for JSONB queries - Separate event tables for audit trails

Next Steps