Skip to content

Module 10: Backend Services

Overview

While Solana programs handle on-chain logic, production DApps require backend services for indexing, caching, analytics, and providing efficient APIs. This module covers building high-performance backends using FastAPI (Python) for rapid development and Poem (Rust) for performance-critical services.

Learning Objectives

By the end of this module, you will be able to:

  • Design backend architectures for Solana DApps
  • Build async APIs with FastAPI and proper error handling
  • Index blockchain data efficiently using RPC subscriptions
  • Implement high-performance Rust services with Poem
  • Set up transaction relay with priority fees
  • Handle rate limiting, caching, and webhook integrations

Part A: FastAPI Backend

Why FastAPI for Solana Backends?

FastAPI offers several advantages for blockchain backends:

  1. Async-first: Native async/await support for non-blocking RPC calls
  2. Type safety: Pydantic models ensure data validation
  3. Auto-documentation: OpenAPI/Swagger docs generated automatically
  4. Performance: One of the fastest Python frameworks
  5. Python ecosystem: Access to data science and ML libraries

Project Structure

api/
├── app/
│   ├── __init__.py
│   ├── main.py              # Application entry point
│   ├── config.py            # Configuration management
│   ├── dependencies.py      # Dependency injection
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── escrows.py       # Token escrow endpoints
│   │   ├── listings.py      # NFT marketplace endpoints
│   │   ├── pools.py         # AMM pool endpoints
│   │   └── proposals.py     # DAO governance endpoints
│   ├── models/
│   │   ├── __init__.py
│   │   ├── common.py        # Shared models
│   │   ├── escrow.py        # Escrow data models
│   │   ├── listing.py       # Listing data models
│   │   ├── pool.py          # Pool data models
│   │   └── proposal.py      # Proposal data models
│   ├── services/
│   │   ├── __init__.py
│   │   ├── solana_client.py # RPC client wrapper
│   │   ├── indexer.py       # Blockchain indexer
│   │   └── cache.py         # Caching service
│   └── db/
│       ├── __init__.py
│       ├── session.py       # Database session
│       └── models.py        # SQLAlchemy models
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   └── test_api.py
├── pyproject.toml
├── Dockerfile
└── .env.example

Configuration Management

# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    """Application settings with environment variable support."""

    # API Configuration
    app_name: str = "Solana DApps API"
    debug: bool = False
    api_prefix: str = "/api/v1"

    # Solana Configuration
    solana_rpc_url: str = "https://api.devnet.solana.com"
    solana_ws_url: str = "wss://api.devnet.solana.com"
    commitment: str = "confirmed"

    # Program IDs
    token_escrow_program_id: str = ""
    nft_marketplace_program_id: str = ""
    defi_amm_program_id: str = ""
    dao_governance_program_id: str = ""

    # Database
    database_url: str = "sqlite+aiosqlite:///./app.db"

    # Redis Cache
    redis_url: str = "redis://localhost:6379"
    cache_ttl: int = 60  # seconds

    # Rate Limiting
    rate_limit_requests: int = 100
    rate_limit_period: int = 60  # seconds

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"

@lru_cache
def get_settings() -> Settings:
    """Cached settings instance."""
    return Settings()

Pydantic Models for Blockchain Data

# app/models/common.py
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from datetime import datetime
from enum import Enum

class PublicKeyStr(str):
    """Validated Solana public key string."""

    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if not isinstance(v, str):
            raise TypeError("string required")
        if len(v) not in (43, 44):
            raise ValueError("invalid public key length")
        # Base58 character check
        import re
        if not re.match(r'^[1-9A-HJ-NP-Za-km-z]+$', v):
            raise ValueError("invalid base58 characters")
        return cls(v)

class TransactionStatus(str, Enum):
    """Transaction confirmation status."""
    PENDING = "pending"
    CONFIRMED = "confirmed"
    FINALIZED = "finalized"
    FAILED = "failed"

class SolanaTransaction(BaseModel):
    """Solana transaction response model."""
    signature: str
    slot: int
    block_time: Optional[int] = None
    fee: int
    status: TransactionStatus

    class Config:
        json_schema_extra = {
            "example": {
                "signature": "5...abc",
                "slot": 123456789,
                "block_time": 1699999999,
                "fee": 5000,
                "status": "confirmed"
            }
        }

class PaginatedResponse(BaseModel):
    """Generic paginated response."""
    items: List
    total: int
    page: int
    page_size: int
    has_next: bool
    has_prev: bool

Escrow Models

# app/models/escrow.py
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
from enum import Enum
from .common import PublicKeyStr

class EscrowState(str, Enum):
    """Escrow lifecycle states."""
    CREATED = "created"
    FUNDED = "funded"
    COMPLETED = "completed"
    CANCELLED = "cancelled"

class EscrowBase(BaseModel):
    """Base escrow model."""
    maker: PublicKeyStr
    mint_a: PublicKeyStr
    mint_b: PublicKeyStr
    amount_a: int = Field(ge=0)
    amount_b: int = Field(ge=0)

class EscrowCreate(EscrowBase):
    """Request model for creating escrow."""
    pass

class EscrowResponse(EscrowBase):
    """Response model for escrow data."""
    public_key: PublicKeyStr
    taker: Optional[PublicKeyStr] = None
    vault: PublicKeyStr
    state: EscrowState
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True

class EscrowListResponse(BaseModel):
    """Paginated list of escrows."""
    escrows: list[EscrowResponse]
    total: int
    page: int
    page_size: int

Async Solana RPC Client

# app/services/solana_client.py
import httpx
from typing import Optional, Any, List
from contextlib import asynccontextmanager
import asyncio
from ..config import get_settings

class SolanaRPCError(Exception):
    """Custom exception for RPC errors."""
    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message
        super().__init__(f"RPC Error {code}: {message}")

class SolanaClient:
    """Async Solana RPC client with connection pooling."""

    def __init__(self, rpc_url: Optional[str] = None):
        settings = get_settings()
        self.rpc_url = rpc_url or settings.solana_rpc_url
        self.commitment = settings.commitment
        self._client: Optional[httpx.AsyncClient] = None
        self._request_id = 0

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            base_url=self.rpc_url,
            timeout=30.0,
            limits=httpx.Limits(max_connections=100)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._client:
            await self._client.aclose()

    def _next_id(self) -> int:
        self._request_id += 1
        return self._request_id

    async def _request(
        self,
        method: str,
        params: Optional[List] = None
    ) -> Any:
        """Make JSON-RPC request."""
        if not self._client:
            raise RuntimeError("Client not initialized. Use async context manager.")

        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": method,
            "params": params or []
        }

        response = await self._client.post("", json=payload)
        response.raise_for_status()

        data = response.json()
        if "error" in data:
            raise SolanaRPCError(
                data["error"].get("code", -1),
                data["error"].get("message", "Unknown error")
            )

        return data.get("result")

    async def get_account_info(
        self,
        pubkey: str,
        encoding: str = "base64"
    ) -> Optional[dict]:
        """Get account info by public key."""
        result = await self._request(
            "getAccountInfo",
            [pubkey, {"encoding": encoding, "commitment": self.commitment}]
        )
        return result.get("value") if result else None

    async def get_multiple_accounts(
        self,
        pubkeys: List[str],
        encoding: str = "base64"
    ) -> List[Optional[dict]]:
        """Get multiple accounts in single request."""
        result = await self._request(
            "getMultipleAccounts",
            [pubkeys, {"encoding": encoding, "commitment": self.commitment}]
        )
        return result.get("value", []) if result else []

    async def get_signatures_for_address(
        self,
        address: str,
        limit: int = 100,
        before: Optional[str] = None,
        until: Optional[str] = None
    ) -> List[dict]:
        """Get transaction signatures for an address."""
        options = {"limit": limit, "commitment": self.commitment}
        if before:
            options["before"] = before
        if until:
            options["until"] = until

        return await self._request(
            "getSignaturesForAddress",
            [address, options]
        )

    async def get_transaction(
        self,
        signature: str,
        encoding: str = "json"
    ) -> Optional[dict]:
        """Get transaction details by signature."""
        return await self._request(
            "getTransaction",
            [signature, {"encoding": encoding, "commitment": self.commitment}]
        )

    async def get_slot(self) -> int:
        """Get current slot."""
        return await self._request("getSlot", [{"commitment": self.commitment}])

    async def get_balance(self, pubkey: str) -> int:
        """Get SOL balance in lamports."""
        result = await self._request(
            "getBalance",
            [pubkey, {"commitment": self.commitment}]
        )
        return result.get("value", 0)

# Dependency for FastAPI
async def get_solana_client():
    """FastAPI dependency for Solana client."""
    async with SolanaClient() as client:
        yield client

Blockchain Indexer Service

# app/services/indexer.py
import asyncio
from typing import Optional, Callable, List, Dict, Any
from datetime import datetime
import logging
from .solana_client import SolanaClient
from ..config import get_settings

logger = logging.getLogger(__name__)

class BlockchainIndexer:
    """Service for indexing on-chain data."""

    def __init__(
        self,
        program_id: str,
        on_account_update: Optional[Callable] = None,
        on_transaction: Optional[Callable] = None
    ):
        self.program_id = program_id
        self.on_account_update = on_account_update
        self.on_transaction = on_transaction
        self._running = False
        self._last_signature: Optional[str] = None

    async def start_polling(self, interval: int = 5):
        """Start polling for new transactions."""
        self._running = True
        logger.info(f"Starting indexer for program {self.program_id}")

        async with SolanaClient() as client:
            while self._running:
                try:
                    await self._poll_transactions(client)
                except Exception as e:
                    logger.error(f"Polling error: {e}")

                await asyncio.sleep(interval)

    async def _poll_transactions(self, client: SolanaClient):
        """Poll for new transactions."""
        signatures = await client.get_signatures_for_address(
            self.program_id,
            limit=50,
            until=self._last_signature
        )

        if not signatures:
            return

        # Process oldest first
        for sig_info in reversed(signatures):
            signature = sig_info["signature"]

            if self.on_transaction:
                tx = await client.get_transaction(signature)
                if tx:
                    await self.on_transaction(tx)

            self._last_signature = signature

    async def index_program_accounts(
        self,
        client: SolanaClient,
        account_type: str,
        discriminator: bytes
    ) -> List[Dict[str, Any]]:
        """Index all accounts of a specific type."""
        accounts = await client._request(
            "getProgramAccounts",
            [
                self.program_id,
                {
                    "encoding": "base64",
                    "commitment": client.commitment,
                    "filters": [
                        {
                            "memcmp": {
                                "offset": 0,
                                "bytes": discriminator.hex()
                            }
                        }
                    ]
                }
            ]
        )

        return accounts or []

    def stop(self):
        """Stop the indexer."""
        self._running = False
        logger.info(f"Stopping indexer for program {self.program_id}")

Caching Service

# app/services/cache.py
from typing import Optional, Any
import json
import hashlib
from functools import wraps
from ..config import get_settings

# Try to import redis, fall back to in-memory cache
try:
    import redis.asyncio as redis
    REDIS_AVAILABLE = True
except ImportError:
    REDIS_AVAILABLE = False

class CacheService:
    """Caching service with Redis or in-memory fallback."""

    def __init__(self):
        settings = get_settings()
        self.ttl = settings.cache_ttl
        self._redis: Optional[redis.Redis] = None
        self._memory_cache: dict = {}

    async def connect(self):
        """Connect to Redis if available."""
        if REDIS_AVAILABLE:
            settings = get_settings()
            try:
                self._redis = redis.from_url(settings.redis_url)
                await self._redis.ping()
            except Exception:
                self._redis = None

    async def disconnect(self):
        """Disconnect from Redis."""
        if self._redis:
            await self._redis.close()

    async def get(self, key: str) -> Optional[Any]:
        """Get cached value."""
        if self._redis:
            value = await self._redis.get(key)
            if value:
                return json.loads(value)
        else:
            cached = self._memory_cache.get(key)
            if cached:
                return cached.get("value")
        return None

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ):
        """Set cached value."""
        ttl = ttl or self.ttl

        if self._redis:
            await self._redis.setex(
                key,
                ttl,
                json.dumps(value)
            )
        else:
            self._memory_cache[key] = {"value": value}

    async def delete(self, key: str):
        """Delete cached value."""
        if self._redis:
            await self._redis.delete(key)
        else:
            self._memory_cache.pop(key, None)

    async def clear_pattern(self, pattern: str):
        """Clear all keys matching pattern."""
        if self._redis:
            keys = await self._redis.keys(pattern)
            if keys:
                await self._redis.delete(*keys)
        else:
            keys_to_delete = [
                k for k in self._memory_cache
                if pattern.replace("*", "") in k
            ]
            for k in keys_to_delete:
                del self._memory_cache[k]

def cached(prefix: str, ttl: Optional[int] = None):
    """Decorator for caching function results."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key_data = f"{prefix}:{func.__name__}:{args}:{kwargs}"
            cache_key = hashlib.md5(key_data.encode()).hexdigest()

            cache = args[0] if hasattr(args[0], '_cache') else None
            if cache:
                cached_value = await cache._cache.get(cache_key)
                if cached_value is not None:
                    return cached_value

            result = await func(*args, **kwargs)

            if cache and result is not None:
                await cache._cache.set(cache_key, result, ttl)

            return result
        return wrapper
    return decorator

API Router Implementation

# app/routers/escrows.py
from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Optional
from ..models.escrow import EscrowResponse, EscrowListResponse, EscrowState
from ..services.solana_client import SolanaClient, get_solana_client
from ..services.cache import CacheService
from ..config import get_settings
import base64
import struct

router = APIRouter(prefix="/escrows", tags=["Token Escrow"])

# Anchor discriminator for Escrow account
ESCROW_DISCRIMINATOR = bytes([0x88, 0x45, 0x6a, 0xc3, 0x15, 0x7e, 0x8b, 0x22])

def parse_escrow_account(pubkey: str, data: bytes) -> dict:
    """Parse raw escrow account data."""
    # Skip 8-byte discriminator
    offset = 8

    # Parse fields based on struct layout
    maker = base64.b58encode(data[offset:offset+32]).decode()
    offset += 32

    taker = base64.b58encode(data[offset:offset+32]).decode()
    offset += 32

    mint_a = base64.b58encode(data[offset:offset+32]).decode()
    offset += 32

    mint_b = base64.b58encode(data[offset:offset+32]).decode()
    offset += 32

    amount_a = struct.unpack("<Q", data[offset:offset+8])[0]
    offset += 8

    amount_b = struct.unpack("<Q", data[offset:offset+8])[0]
    offset += 8

    state = data[offset]

    return {
        "public_key": pubkey,
        "maker": maker,
        "taker": taker if taker != "1" * 44 else None,
        "mint_a": mint_a,
        "mint_b": mint_b,
        "amount_a": amount_a,
        "amount_b": amount_b,
        "state": EscrowState(["created", "funded", "completed", "cancelled"][state])
    }

@router.get("/", response_model=EscrowListResponse)
async def list_escrows(
    state: Optional[EscrowState] = None,
    maker: Optional[str] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    client: SolanaClient = Depends(get_solana_client)
):
    """
    List all escrows with optional filtering.

    - **state**: Filter by escrow state
    - **maker**: Filter by maker address
    - **page**: Page number (1-indexed)
    - **page_size**: Items per page (max 100)
    """
    settings = get_settings()

    accounts = await client._request(
        "getProgramAccounts",
        [
            settings.token_escrow_program_id,
            {
                "encoding": "base64",
                "filters": [
                    {"memcmp": {"offset": 0, "bytes": base64.b64encode(ESCROW_DISCRIMINATOR).decode()}}
                ]
            }
        ]
    )

    if not accounts:
        return EscrowListResponse(escrows=[], total=0, page=page, page_size=page_size)

    escrows = []
    for account in accounts:
        pubkey = account["pubkey"]
        data = base64.b64decode(account["account"]["data"][0])

        try:
            escrow = parse_escrow_account(pubkey, data)
            if state and escrow["state"] != state:
                continue
            if maker and escrow["maker"] != maker:
                continue
            escrows.append(escrow)
        except Exception:
            continue

    total = len(escrows)
    start = (page - 1) * page_size
    end = start + page_size

    return EscrowListResponse(
        escrows=escrows[start:end],
        total=total,
        page=page,
        page_size=page_size
    )

@router.get("/{pubkey}", response_model=EscrowResponse)
async def get_escrow(
    pubkey: str,
    client: SolanaClient = Depends(get_solana_client)
):
    """Get escrow details by public key."""
    account = await client.get_account_info(pubkey)

    if not account:
        raise HTTPException(status_code=404, detail="Escrow not found")

    data = base64.b64decode(account["data"][0])

    try:
        return parse_escrow_account(pubkey, data)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to parse escrow: {e}")

Main Application Entry Point

# app/main.py
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
import time
import logging

from .config import get_settings
from .routers import escrows, listings, pools, proposals
from .services.cache import CacheService

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

cache_service = CacheService()

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler."""
    logger.info("Starting Solana DApps API")
    await cache_service.connect()
    yield
    logger.info("Shutting down Solana DApps API")
    await cache_service.disconnect()

def create_app() -> FastAPI:
    """Create and configure FastAPI application."""
    settings = get_settings()

    app = FastAPI(
        title=settings.app_name,
        description="Backend API for Solana DApps course",
        version="1.0.0",
        docs_url="/docs",
        redoc_url="/redoc",
        lifespan=lifespan
    )

    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    @app.middleware("http")
    async def add_timing_header(request: Request, call_next):
        start_time = time.time()
        response = await call_next(request)
        process_time = time.time() - start_time
        response.headers["X-Process-Time"] = str(process_time)
        return response

    @app.exception_handler(Exception)
    async def global_exception_handler(request: Request, exc: Exception):
        logger.error(f"Unhandled exception: {exc}")
        return JSONResponse(
            status_code=500,
            content={"detail": "Internal server error"}
        )

    app.include_router(escrows.router, prefix=settings.api_prefix)
    app.include_router(listings.router, prefix=settings.api_prefix)
    app.include_router(pools.router, prefix=settings.api_prefix)
    app.include_router(proposals.router, prefix=settings.api_prefix)

    @app.get("/health")
    async def health_check():
        return {"status": "healthy", "version": "1.0.0"}

    return app

app = create_app()

Rate Limiting

# app/dependencies.py
from fastapi import HTTPException, Request
from typing import Dict
import time
from ..config import get_settings

class RateLimiter:
    """Simple in-memory rate limiter."""

    def __init__(self):
        self.requests: Dict[str, list] = {}
        settings = get_settings()
        self.max_requests = settings.rate_limit_requests
        self.period = settings.rate_limit_period

    def is_allowed(self, client_id: str) -> bool:
        """Check if request is allowed."""
        now = time.time()

        if client_id in self.requests:
            self.requests[client_id] = [
                t for t in self.requests[client_id]
                if now - t < self.period
            ]
        else:
            self.requests[client_id] = []

        if len(self.requests[client_id]) >= self.max_requests:
            return False

        self.requests[client_id].append(now)
        return True

rate_limiter = RateLimiter()

async def check_rate_limit(request: Request):
    """FastAPI dependency for rate limiting."""
    client_id = request.client.host if request.client else "unknown"

    if not rate_limiter.is_allowed(client_id):
        raise HTTPException(
            status_code=429,
            detail="Too many requests"
        )

Part B: Poem (Rust) Services

Why Poem for High-Performance Services?

Poem is a full-featured Rust web framework that offers:

  1. Blazing fast: Near-native performance
  2. Type safety: Rust's compile-time guarantees
  3. Async runtime: Built on Tokio for high concurrency
  4. WebSocket support: Native WebSocket handling
  5. Shared types: Share structs with Anchor programs

Service Architecture

services/
├── indexer/                 # Blockchain indexer service
│   ├── Cargo.toml
│   ├── src/
│   │   ├── main.rs
│   │   ├── config.rs
│   │   ├── subscription.rs
│   │   ├── parser.rs
│   │   └── db.rs
│   └── Dockerfile
├── relay/                   # Transaction relay service
│   ├── Cargo.toml
│   ├── src/
│   │   ├── main.rs
│   │   ├── config.rs
│   │   ├── priority.rs
│   │   └── submit.rs
│   └── Dockerfile
└── shared/                  # Shared types
    ├── Cargo.toml
    └── src/
        └── lib.rs

Shared Types Library

// services/shared/src/lib.rs
use serde::{Deserialize, Serialize};

/// Account subscription configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionConfig {
    pub commitment: String,
    pub encoding: String,
}

impl Default for SubscriptionConfig {
    fn default() -> Self {
        Self {
            commitment: "confirmed".to_string(),
            encoding: "base64".to_string(),
        }
    }
}

/// WebSocket message types
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum WsMessage {
    Subscribe { account: String },
    Unsubscribe { account: String },
    AccountUpdate { account: String, data: String, slot: u64 },
    Error { message: String },
}

/// Transaction submission request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmitTxRequest {
    pub transaction: String,
    pub priority_fee: Option<u64>,
    pub skip_preflight: bool,
}

/// Transaction submission response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmitTxResponse {
    pub signature: String,
    pub slot: u64,
    pub confirmations: Option<u64>,
}

/// Priority fee estimate
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PriorityFeeEstimate {
    pub low: u64,
    pub medium: u64,
    pub high: u64,
    pub percentiles: Vec<u64>,
}

Transaction Relay Service

// services/relay/src/main.rs
use poem::{
    get, handler,
    listener::TcpListener,
    middleware::{Cors, Tracing},
    post,
    web::{Data, Json, Path},
    Route, Server,
};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use std::sync::Arc;

mod config;
mod priority;
mod submit;

use config::Config;
use shared::{PriorityFeeEstimate, SubmitTxRequest, SubmitTxResponse};

struct AppState {
    rpc_client: RpcClient,
    config: Config,
}

#[handler]
async fn health() -> Json<serde_json::Value> {
    Json(serde_json::json!({
        "status": "healthy",
        "service": "relay"
    }))
}

#[handler]
async fn get_priority_fees(
    Data(state): Data<&Arc<AppState>>,
) -> poem::Result<Json<PriorityFeeEstimate>> {
    let estimate = priority::get_fee_estimate(&state.rpc_client).await
        .map_err(|e| poem::Error::from_string(
            e.to_string(),
            poem::http::StatusCode::INTERNAL_SERVER_ERROR
        ))?;

    Ok(Json(estimate))
}

#[handler]
async fn submit_transaction(
    Data(state): Data<&Arc<AppState>>,
    Json(request): Json<SubmitTxRequest>,
) -> poem::Result<Json<SubmitTxResponse>> {
    let response = submit::submit_transaction(&state.rpc_client, request).await
        .map_err(|e| poem::Error::from_string(
            e.to_string(),
            poem::http::StatusCode::BAD_REQUEST
        ))?;

    Ok(Json(response))
}

#[handler]
async fn get_transaction_status(
    Data(state): Data<&Arc<AppState>>,
    Path(signature): Path<String>,
) -> poem::Result<Json<serde_json::Value>> {
    let sig: solana_sdk::signature::Signature = signature.parse()
        .map_err(|_| poem::Error::from_string(
            "Invalid signature",
            poem::http::StatusCode::BAD_REQUEST
        ))?;

    let status = state.rpc_client
        .get_signature_status(&sig)
        .await
        .map_err(|e| poem::Error::from_string(
            e.to_string(),
            poem::http::StatusCode::INTERNAL_SERVER_ERROR
        ))?;

    Ok(Json(serde_json::json!({
        "signature": signature,
        "status": status.map(|s| format!("{:?}", s))
    })))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    let config = Config::from_env()?;

    let rpc_client = RpcClient::new_with_commitment(
        config.rpc_url.clone(),
        CommitmentConfig::confirmed(),
    );

    let state = Arc::new(AppState { rpc_client, config: config.clone() });

    let app = Route::new()
        .at("/health", get(health))
        .at("/priority-fees", get(get_priority_fees))
        .at("/submit", post(submit_transaction))
        .at("/status/:signature", get(get_transaction_status))
        .with(Cors::new())
        .with(Tracing)
        .data(state);

    let addr = format!("0.0.0.0:{}", config.port);
    tracing::info!("Starting relay service on {}", addr);

    Server::new(TcpListener::bind(&addr))
        .run(app)
        .await?;

    Ok(())
}

Priority Fee Estimation

// services/relay/src/priority.rs
use solana_client::nonblocking::rpc_client::RpcClient;
use shared::PriorityFeeEstimate;

pub async fn get_fee_estimate(
    client: &RpcClient,
) -> Result<PriorityFeeEstimate, Box<dyn std::error::Error>> {
    let fees = client.get_recent_prioritization_fees(&[]).await?;

    if fees.is_empty() {
        return Ok(PriorityFeeEstimate {
            low: 1000,
            medium: 10000,
            high: 100000,
            percentiles: vec![],
        });
    }

    let mut fee_values: Vec<u64> = fees.iter()
        .map(|f| f.prioritization_fee)
        .collect();
    fee_values.sort();

    let len = fee_values.len();
    let p25 = fee_values.get(len / 4).copied().unwrap_or(0);
    let p50 = fee_values.get(len / 2).copied().unwrap_or(0);
    let p75 = fee_values.get(len * 3 / 4).copied().unwrap_or(0);
    let p90 = fee_values.get(len * 9 / 10).copied().unwrap_or(0);

    Ok(PriorityFeeEstimate {
        low: p25.max(1000),
        medium: p50.max(5000),
        high: p75.max(25000),
        percentiles: vec![p25, p50, p75, p90],
    })
}

pub fn calculate_priority_fee(
    estimate: &PriorityFeeEstimate,
    urgency: Urgency,
    compute_units: u32,
) -> u64 {
    let micro_lamports_per_cu = match urgency {
        Urgency::Low => estimate.low,
        Urgency::Medium => estimate.medium,
        Urgency::High => estimate.high,
    };

    (micro_lamports_per_cu as u128 * compute_units as u128 / 1_000_000) as u64
}

#[derive(Debug, Clone, Copy)]
pub enum Urgency {
    Low,
    Medium,
    High,
}

Transaction Submission

// services/relay/src/submit.rs
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
    commitment_config::CommitmentConfig,
    compute_budget::ComputeBudgetInstruction,
    instruction::Instruction,
    message::Message,
    transaction::Transaction,
};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use shared::{SubmitTxRequest, SubmitTxResponse};

pub async fn submit_transaction(
    client: &RpcClient,
    request: SubmitTxRequest,
) -> Result<SubmitTxResponse, Box<dyn std::error::Error>> {
    let tx_bytes = BASE64.decode(&request.transaction)?;
    let mut transaction: Transaction = bincode::deserialize(&tx_bytes)?;

    if let Some(priority_fee) = request.priority_fee {
        inject_priority_fee(&mut transaction, priority_fee)?;
    }

    let blockhash = client.get_latest_blockhash().await?;
    transaction.message.recent_blockhash = blockhash;

    let config = solana_client::rpc_config::RpcSendTransactionConfig {
        skip_preflight: request.skip_preflight,
        preflight_commitment: Some(CommitmentConfig::confirmed().commitment),
        encoding: None,
        max_retries: Some(3),
        min_context_slot: None,
    };

    let signature = client
        .send_transaction_with_config(&transaction, config)
        .await?;

    let slot = client.get_slot().await?;

    Ok(SubmitTxResponse {
        signature: signature.to_string(),
        slot,
        confirmations: None,
    })
}

fn inject_priority_fee(
    transaction: &mut Transaction,
    priority_fee_lamports: u64,
) -> Result<(), Box<dyn std::error::Error>> {
    let compute_limit_ix = ComputeBudgetInstruction::set_compute_unit_limit(200_000);

    let micro_lamports_per_cu = priority_fee_lamports * 1_000_000 / 200_000;
    let compute_price_ix = ComputeBudgetInstruction::set_compute_unit_price(micro_lamports_per_cu);

    let mut new_instructions = vec![compute_limit_ix, compute_price_ix];
    new_instructions.extend(
        transaction.message.instructions.clone().into_iter().map(|ix| {
            Instruction {
                program_id: transaction.message.account_keys[ix.program_id_index as usize],
                accounts: ix.accounts.iter().map(|&i| {
                    solana_sdk::instruction::AccountMeta {
                        pubkey: transaction.message.account_keys[i as usize],
                        is_signer: transaction.message.is_signer(i as usize),
                        is_writable: transaction.message.is_writable(i as usize),
                    }
                }).collect(),
                data: ix.data.clone(),
            }
        })
    );

    let payer = transaction.message.account_keys[0];
    transaction.message = Message::new(&new_instructions, Some(&payer));

    Ok(())
}

WebSocket Subscription Service

// services/indexer/src/subscription.rs
use futures_util::{SinkExt, StreamExt};
use poem::web::websocket::{Message, WebSocket};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::broadcast;

pub struct SubscriptionManager {
    pubsub_url: String,
    update_tx: broadcast::Sender<AccountUpdate>,
}

#[derive(Debug, Clone)]
pub struct AccountUpdate {
    pub pubkey: String,
    pub data: String,
    pub slot: u64,
}

impl SubscriptionManager {
    pub fn new(pubsub_url: String) -> Self {
        let (update_tx, _) = broadcast::channel(1000);
        Self { pubsub_url, update_tx }
    }

    pub async fn subscribe_account(
        &self,
        pubkey: String,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let pubsub_url = self.pubsub_url.clone();
        let update_tx = self.update_tx.clone();

        tokio::spawn(async move {
            // WebSocket subscription logic here
            tracing::info!("Subscribed to account: {}", pubkey);
        });

        Ok(())
    }

    pub fn subscribe_updates(&self) -> broadcast::Receiver<AccountUpdate> {
        self.update_tx.subscribe()
    }
}

pub async fn websocket_handler(
    ws: WebSocket,
    manager: Arc<SubscriptionManager>,
) -> impl poem::IntoResponse {
    ws.on_upgrade(move |socket| async move {
        let (mut sink, mut stream) = socket.split();
        let mut update_rx = manager.subscribe_updates();

        let manager_clone = manager.clone();
        let recv_task = tokio::spawn(async move {
            while let Some(Ok(msg)) = stream.next().await {
                if let Message::Text(text) = msg {
                    if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text) {
                        if let Some(account) = parsed.get("subscribe").and_then(|v| v.as_str()) {
                            let _ = manager_clone.subscribe_account(account.to_string()).await;
                        }
                    }
                }
            }
        });

        let send_task = tokio::spawn(async move {
            while let Ok(update) = update_rx.recv().await {
                let msg = json!({
                    "type": "accountUpdate",
                    "pubkey": update.pubkey,
                    "data": update.data,
                    "slot": update.slot
                });

                if sink.send(Message::Text(msg.to_string())).await.is_err() {
                    break;
                }
            }
        });

        tokio::select! {
            _ = recv_task => {},
            _ = send_task => {},
        }
    })
}

Cargo Configuration

# services/relay/Cargo.toml
[package]
name = "relay"
version = "0.1.0"
edition = "2021"

[dependencies]
poem = { version = "3", features = ["websocket"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
solana-client = "1.18"
solana-sdk = "1.18"
bincode = "1.3"
base64 = "0.21"
tracing = "0.1"
tracing-subscriber = "0.3"
dotenvy = "0.15"

shared = { path = "../shared" }

[profile.release]
lto = true
codegen-units = 1
opt-level = 3

Deployment

Docker Configuration

# api/Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY pyproject.toml poetry.lock* ./
RUN pip install poetry && poetry install --no-dev

COPY app ./app

CMD ["poetry", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# services/relay/Dockerfile
FROM rust:1.75 as builder

WORKDIR /app
COPY . .
RUN cargo build --release

FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/relay /usr/local/bin/relay

CMD ["relay"]

Environment Variables

# .env.example for API
SOLANA_RPC_URL=https://api.devnet.solana.com
SOLANA_WS_URL=wss://api.devnet.solana.com
DATABASE_URL=postgresql+asyncpg://user:pass@localhost:5432/solana_dapps
REDIS_URL=redis://localhost:6379
TOKEN_ESCROW_PROGRAM_ID=Escr1111111111111111111111111111111111111111
NFT_MARKETPLACE_PROGRAM_ID=NFTM1111111111111111111111111111111111111111
DEFI_AMM_PROGRAM_ID=AMM1111111111111111111111111111111111111111
DAO_GOVERNANCE_PROGRAM_ID=Gov11111111111111111111111111111111111111111

Summary

This module covered building production backend services for Solana DApps:

FastAPI (Python): - Async RPC client with connection pooling - Pydantic models for type-safe blockchain data - Caching with Redis or in-memory fallback - Rate limiting middleware - Blockchain indexing service

Poem (Rust): - High-performance API endpoints - WebSocket subscriptions for real-time updates - Transaction relay with priority fee injection - Shared types with Anchor programs

Key Patterns: - Use async/await for all RPC operations - Cache frequently accessed data - Implement proper error handling - Use typed models for safety - Monitor and index on-chain events

Next Steps