Module 10: Backend Services
Overview
While Solana programs handle on-chain logic, production DApps also need backend services for indexing, caching, analytics, and efficient API access. This module covers building these backends with FastAPI (Python) for rapid development and Poem (Rust) for performance-critical services.
Learning Objectives
By the end of this module, you will be able to:
- Design backend architectures for Solana DApps
- Build async APIs with FastAPI and proper error handling
- Index blockchain data efficiently using RPC subscriptions
- Implement high-performance Rust services with Poem
- Set up transaction relay with priority fees
- Handle rate limiting, caching, and webhook integrations
Part A: FastAPI Backend
Why FastAPI for Solana Backends?
FastAPI offers several advantages for blockchain backends:
- Async-first: Native async/await support for non-blocking RPC calls
- Type safety: Pydantic models ensure data validation
- Auto-documentation: OpenAPI/Swagger docs generated automatically
- Performance: Built on Starlette and served by an ASGI server such as Uvicorn, it is among the faster Python web frameworks
- Python ecosystem: Access to data science and ML libraries
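To make the async-first point concrete, here is a minimal sketch of issuing several RPC-style calls concurrently with `asyncio.gather`. `fake_rpc_call` is a hypothetical stand-in that sleeps instead of contacting a real Solana endpoint, so the timings are only illustrative.

```python
import asyncio
import time


async def fake_rpc_call(method: str, delay: float = 0.2) -> dict:
    """Hypothetical stand-in for a network round trip to an RPC node."""
    await asyncio.sleep(delay)  # yields control so other calls can proceed
    return {"method": method, "result": "ok"}


async def fetch_all(methods: list[str]) -> list[dict]:
    """Run all calls concurrently; results come back in input order."""
    return await asyncio.gather(*(fake_rpc_call(m) for m in methods))


def run_demo() -> tuple[list[dict], float]:
    """Return the results and the wall-clock time the batch took."""
    start = time.perf_counter()
    results = asyncio.run(fetch_all(["getSlot", "getBalance", "getAccountInfo"]))
    return results, time.perf_counter() - start
```

Because the three calls overlap, the batch should take roughly as long as the slowest call rather than the sum of all three.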
Project Structure
api/
├── app/
│   ├── __init__.py
│   ├── main.py              # Application entry point
│   ├── config.py            # Configuration management
│   ├── dependencies.py      # Dependency injection
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── escrows.py       # Token escrow endpoints
│   │   ├── listings.py      # NFT marketplace endpoints
│   │   ├── pools.py         # AMM pool endpoints
│   │   └── proposals.py     # DAO governance endpoints
│   ├── models/
│   │   ├── __init__.py
│   │   ├── common.py        # Shared models
│   │   ├── escrow.py        # Escrow data models
│   │   ├── listing.py       # Listing data models
│   │   ├── pool.py          # Pool data models
│   │   └── proposal.py      # Proposal data models
│   ├── services/
│   │   ├── __init__.py
│   │   ├── solana_client.py # RPC client wrapper
│   │   ├── indexer.py       # Blockchain indexer
│   │   └── cache.py         # Caching service
│   └── db/
│       ├── __init__.py
│       ├── session.py       # Database session
│       └── models.py        # SQLAlchemy models
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   └── test_api.py
├── pyproject.toml
├── Dockerfile
└── .env.example
Configuration Management
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
"""Application settings with environment variable support."""
# API Configuration
app_name: str = "Solana DApps API"
debug: bool = False
api_prefix: str = "/api/v1"
# Solana Configuration
solana_rpc_url: str = "https://api.devnet.solana.com"
solana_ws_url: str = "wss://api.devnet.solana.com"
commitment: str = "confirmed"
# Program IDs
token_escrow_program_id: str = ""
nft_marketplace_program_id: str = ""
defi_amm_program_id: str = ""
dao_governance_program_id: str = ""
# Database
database_url: str = "sqlite+aiosqlite:///./app.db"
# Redis Cache
redis_url: str = "redis://localhost:6379"
cache_ttl: int = 60 # seconds
# Rate Limiting
rate_limit_requests: int = 100
rate_limit_period: int = 60 # seconds
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
@lru_cache
def get_settings() -> Settings:
"""Cached settings instance."""
return Settings()
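The `@lru_cache` on `get_settings()` means the environment is read once per process. The stdlib-only sketch below imitates that behaviour with a dataclass. `DemoSettings` and the `DEMO_RPC_URL` variable are hypothetical stand-ins for the Pydantic `Settings` class, chosen so the example runs without extra dependencies.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for the Pydantic Settings class."""
    rpc_url: str


@lru_cache
def get_demo_settings() -> DemoSettings:
    # Read the environment once; later calls return the cached instance.
    return DemoSettings(
        rpc_url=os.environ.get("DEMO_RPC_URL", "https://api.devnet.solana.com")
    )


os.environ["DEMO_RPC_URL"] = "http://localhost:8899"
first = get_demo_settings()

# Changing the environment afterwards has no effect until the cache is cleared.
os.environ["DEMO_RPC_URL"] = "https://example.invalid"
second = get_demo_settings()

get_demo_settings.cache_clear()  # e.g. in a test fixture
third = get_demo_settings()
```

In tests that override environment variables, calling `cache_clear()` between cases is one way to avoid stale settings.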
Pydantic Models for Blockchain Data
# app/models/common.py
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from datetime import datetime
from enum import Enum
class PublicKeyStr(str):
"""Validated Solana public key string."""
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if not isinstance(v, str):
raise TypeError("string required")
# Base58-encoded 32-byte keys are 32 to 44 characters long
if not 32 <= len(v) <= 44:
raise ValueError("invalid public key length")
# Base58 character check
import re
if not re.match(r'^[1-9A-HJ-NP-Za-km-z]+$', v):
raise ValueError("invalid base58 characters")
return cls(v)
class TransactionStatus(str, Enum):
"""Transaction confirmation status."""
PENDING = "pending"
CONFIRMED = "confirmed"
FINALIZED = "finalized"
FAILED = "failed"
class SolanaTransaction(BaseModel):
"""Solana transaction response model."""
signature: str
slot: int
block_time: Optional[int] = None
fee: int
status: TransactionStatus
class Config:
json_schema_extra = {
"example": {
"signature": "5...abc",
"slot": 123456789,
"block_time": 1699999999,
"fee": 5000,
"status": "confirmed"
}
}
class PaginatedResponse(BaseModel):
"""Generic paginated response."""
items: List
total: int
page: int
page_size: int
has_next: bool
has_prev: bool
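Length and alphabet checks still accept some strings that are not valid 32-byte keys. A stricter check, sketched here with only the standard library, decodes the base58 string and verifies that the result is exactly 32 bytes. The helper names `b58decode` and `is_valid_pubkey` are illustrative; in practice a maintained library would likely do the decoding.

```python
B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
B58_INDEX = {ch: i for i, ch in enumerate(B58_ALPHABET)}


def b58decode(text: str) -> bytes:
    """Decode a base58 string (Bitcoin alphabet, as used by Solana)."""
    num = 0
    for ch in text:
        if ch not in B58_INDEX:
            raise ValueError(f"invalid base58 character: {ch!r}")
        num = num * 58 + B58_INDEX[ch]
    body = num.to_bytes((num.bit_length() + 7) // 8, "big")
    # Each leading '1' encodes one leading zero byte.
    leading_zeros = len(text) - len(text.lstrip("1"))
    return b"\x00" * leading_zeros + body


def is_valid_pubkey(text: str) -> bool:
    """True if text decodes to exactly 32 bytes."""
    try:
        return len(b58decode(text)) == 32
    except ValueError:
        return False
```

For example, a string of 44 `1` characters passes a length-and-alphabet check but decodes to 44 zero bytes, so `is_valid_pubkey` rejects it.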
Escrow Models
# app/models/escrow.py
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
from enum import Enum
from .common import PublicKeyStr
class EscrowState(str, Enum):
"""Escrow lifecycle states."""
CREATED = "created"
FUNDED = "funded"
COMPLETED = "completed"
CANCELLED = "cancelled"
class EscrowBase(BaseModel):
"""Base escrow model."""
maker: PublicKeyStr
mint_a: PublicKeyStr
mint_b: PublicKeyStr
amount_a: int = Field(ge=0)
amount_b: int = Field(ge=0)
class EscrowCreate(EscrowBase):
"""Request model for creating escrow."""
pass
class EscrowResponse(EscrowBase):
"""Response model for escrow data."""
public_key: PublicKeyStr
taker: Optional[PublicKeyStr] = None
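# Optional: parse_escrow_account (routers/escrows.py) does not populate vault or timestamps
vault: Optional[PublicKeyStr] = None
state: EscrowState
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None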
class Config:
from_attributes = True
class EscrowListResponse(BaseModel):
"""Paginated list of escrows."""
escrows: list[EscrowResponse]
total: int
page: int
page_size: int
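The pagination fields in `PaginatedResponse` and `EscrowListResponse` follow from simple arithmetic on `total`, `page`, and `page_size`. A minimal sketch, where `paginate` is a hypothetical helper rather than part of the course code:

```python
from typing import Any, Sequence


def paginate(items: Sequence[Any], page: int, page_size: int) -> dict:
    """Slice items for a 1-indexed page and derive the navigation flags."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    total = len(items)
    start = (page - 1) * page_size
    end = start + page_size
    return {
        "items": list(items[start:end]),
        "total": total,
        "page": page,
        "page_size": page_size,
        "has_next": end < total,  # more items exist past this page
        "has_prev": page > 1,
    }
```

With 45 items and a page size of 20, page 3 holds the last five items and has no next page.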
Async Solana RPC Client
# app/services/solana_client.py
import httpx
from typing import Optional, Any, List
from contextlib import asynccontextmanager
import asyncio
from ..config import get_settings
class SolanaRPCError(Exception):
"""Custom exception for RPC errors."""
def __init__(self, code: int, message: str):
self.code = code
self.message = message
super().__init__(f"RPC Error {code}: {message}")
class SolanaClient:
"""Async Solana RPC client with connection pooling."""
def __init__(self, rpc_url: Optional[str] = None):
settings = get_settings()
self.rpc_url = rpc_url or settings.solana_rpc_url
self.commitment = settings.commitment
self._client: Optional[httpx.AsyncClient] = None
self._request_id = 0
async def __aenter__(self):
self._client = httpx.AsyncClient(
base_url=self.rpc_url,
timeout=30.0,
limits=httpx.Limits(max_connections=100)
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._client:
await self._client.aclose()
def _next_id(self) -> int:
self._request_id += 1
return self._request_id
async def _request(
self,
method: str,
params: Optional[List] = None
) -> Any:
"""Make JSON-RPC request."""
if not self._client:
raise RuntimeError("Client not initialized. Use async context manager.")
payload = {
"jsonrpc": "2.0",
"id": self._next_id(),
"method": method,
"params": params or []
}
response = await self._client.post("", json=payload)
response.raise_for_status()
data = response.json()
if "error" in data:
raise SolanaRPCError(
data["error"].get("code", -1),
data["error"].get("message", "Unknown error")
)
return data.get("result")
async def get_account_info(
self,
pubkey: str,
encoding: str = "base64"
) -> Optional[dict]:
"""Get account info by public key."""
result = await self._request(
"getAccountInfo",
[pubkey, {"encoding": encoding, "commitment": self.commitment}]
)
return result.get("value") if result else None
async def get_multiple_accounts(
self,
pubkeys: List[str],
encoding: str = "base64"
) -> List[Optional[dict]]:
"""Get multiple accounts in single request."""
result = await self._request(
"getMultipleAccounts",
[pubkeys, {"encoding": encoding, "commitment": self.commitment}]
)
return result.get("value", []) if result else []
async def get_signatures_for_address(
self,
address: str,
limit: int = 100,
before: Optional[str] = None,
until: Optional[str] = None
) -> List[dict]:
"""Get transaction signatures for an address."""
options = {"limit": limit, "commitment": self.commitment}
if before:
options["before"] = before
if until:
options["until"] = until
return await self._request(
"getSignaturesForAddress",
[address, options]
)
async def get_transaction(
self,
signature: str,
encoding: str = "json"
) -> Optional[dict]:
"""Get transaction details by signature."""
return await self._request(
"getTransaction",
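# maxSupportedTransactionVersion is required to fetch versioned (v0) transactions
[signature, {"encoding": encoding, "commitment": self.commitment, "maxSupportedTransactionVersion": 0}]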
)
async def get_slot(self) -> int:
"""Get current slot."""
return await self._request("getSlot", [{"commitment": self.commitment}])
async def get_balance(self, pubkey: str) -> int:
"""Get SOL balance in lamports."""
result = await self._request(
"getBalance",
[pubkey, {"commitment": self.commitment}]
)
return result.get("value", 0) if result else 0
# Dependency for FastAPI
async def get_solana_client():
"""FastAPI dependency for Solana client."""
async with SolanaClient() as client:
yield client
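The `_request` method wraps every call in a JSON-RPC 2.0 envelope and turns an `error` member into an exception. The sketch below isolates that logic from the HTTP layer so it can be exercised offline. `build_payload`, `unwrap_response`, and `RPCError` are illustrative names, and the sample bodies in the usage are made up.

```python
import itertools
import json
from typing import Any, Optional

_ids = itertools.count(1)  # monotonically increasing request ids


class RPCError(Exception):
    """Raised when the node answers with a JSON-RPC error object."""

    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message
        super().__init__(f"RPC Error {code}: {message}")


def build_payload(method: str, params: Optional[list] = None) -> str:
    """Serialize a JSON-RPC 2.0 request body."""
    return json.dumps(
        {"jsonrpc": "2.0", "id": next(_ids), "method": method, "params": params or []}
    )


def unwrap_response(body: str) -> Any:
    """Return the `result` member, or raise if the node reported an error."""
    data = json.loads(body)
    if "error" in data:
        err = data["error"]
        raise RPCError(err.get("code", -1), err.get("message", "Unknown error"))
    return data.get("result")
```

Keeping the envelope handling separate from transport makes it straightforward to unit-test error mapping without a live RPC node.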
Blockchain Indexer Service
# app/services/indexer.py
import asyncio
from typing import Optional, Callable, List, Dict, Any
from datetime import datetime
import logging
from .solana_client import SolanaClient
from ..config import get_settings
logger = logging.getLogger(__name__)
class BlockchainIndexer:
"""Service for indexing on-chain data."""
def __init__(
self,
program_id: str,
on_account_update: Optional[Callable] = None,
on_transaction: Optional[Callable] = None
):
self.program_id = program_id
self.on_account_update = on_account_update
self.on_transaction = on_transaction
self._running = False
self._last_signature: Optional[str] = None
async def start_polling(self, interval: int = 5):
"""Start polling for new transactions."""
self._running = True
logger.info(f"Starting indexer for program {self.program_id}")
async with SolanaClient() as client:
while self._running:
try:
await self._poll_transactions(client)
except Exception as e:
logger.error(f"Polling error: {e}")
await asyncio.sleep(interval)
async def _poll_transactions(self, client: SolanaClient):
"""Poll for new transactions."""
signatures = await client.get_signatures_for_address(
self.program_id,
limit=50,
until=self._last_signature
)
if not signatures:
return
# Process oldest first
for sig_info in reversed(signatures):
signature = sig_info["signature"]
if self.on_transaction:
tx = await client.get_transaction(signature)
if tx:
await self.on_transaction(tx)
self._last_signature = signature
    async def index_program_accounts(
        self,
        client: SolanaClient,
        account_type: str,
        discriminator: bytes
    ) -> List[Dict[str, Any]]:
        """Index all accounts of a specific type."""
        # Imported here because only this method needs it
        import base64
        accounts = await client._request(
            "getProgramAccounts",
            [
                self.program_id,
                {
                    "encoding": "base64",
                    "commitment": client.commitment,
                    "filters": [
                        {
                            "memcmp": {
                                "offset": 0,
                                # memcmp bytes are read as base58 unless an encoding
                                # is given; hex strings are not accepted
                                "bytes": base64.b64encode(discriminator).decode(),
                                "encoding": "base64"
                            }
                        }
                    ]
                }
            ]
        )
        return accounts or []
def stop(self):
"""Stop the indexer."""
self._running = False
logger.info(f"Stopping indexer for program {self.program_id}")
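`_poll_transactions` relies on `getSignaturesForAddress` returning signatures newest-first and on `until` acting as an exclusive lower bound. The in-memory simulation below sketches that cursor logic, including paging backwards with `before` when more than one page of new signatures has accumulated (a case the single-page poll above does not handle). `FakeLedger` is a hypothetical stand-in for the RPC node.

```python
from typing import List, Optional


class FakeLedger:
    """Hypothetical stand-in for an RPC node; stores signatures oldest-first."""

    def __init__(self) -> None:
        self.signatures: List[str] = []

    def get_signatures(
        self, limit: int, before: Optional[str] = None, until: Optional[str] = None
    ) -> List[str]:
        newest_first = list(reversed(self.signatures))
        if before is not None:
            # Start strictly after `before`, i.e. with older signatures.
            newest_first = newest_first[newest_first.index(before) + 1:]
        if until is not None and until in newest_first:
            # Stop just before `until`; it has already been processed.
            newest_first = newest_first[: newest_first.index(until)]
        return newest_first[:limit]


def fetch_new(ledger: FakeLedger, last_seen: Optional[str], limit: int = 2) -> List[str]:
    """Collect everything newer than last_seen, returned oldest-first."""
    collected: List[str] = []
    before: Optional[str] = None
    while True:
        page = ledger.get_signatures(limit, before=before, until=last_seen)
        collected.extend(page)
        if len(page) < limit:
            break  # a short page means the cursor has been reached
        before = page[-1]  # continue with older signatures
    return list(reversed(collected))
```

Processing the result oldest-first and storing the last element as the new cursor keeps handlers in chronological order.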
Caching Service
# app/services/cache.py
from typing import Optional, Any
import json
import hashlib
from functools import wraps
from ..config import get_settings
# Try to import redis, fall back to in-memory cache
try:
import redis.asyncio as redis
REDIS_AVAILABLE = True
except ImportError:
REDIS_AVAILABLE = False
class CacheService:
"""Caching service with Redis or in-memory fallback."""
def __init__(self):
settings = get_settings()
self.ttl = settings.cache_ttl
self._redis: Optional[redis.Redis] = None
self._memory_cache: dict = {}
async def connect(self):
"""Connect to Redis if available."""
if REDIS_AVAILABLE:
settings = get_settings()
try:
self._redis = redis.from_url(settings.redis_url)
await self._redis.ping()
except Exception:
self._redis = None
async def disconnect(self):
"""Disconnect from Redis."""
if self._redis:
await self._redis.close()
async def get(self, key: str) -> Optional[Any]:
"""Get cached value."""
if self._redis:
value = await self._redis.get(key)
if value:
return json.loads(value)
else:
cached = self._memory_cache.get(key)
if cached:
return cached.get("value")
return None
async def set(
self,
key: str,
value: Any,
ttl: Optional[int] = None
):
"""Set cached value."""
ttl = ttl or self.ttl
if self._redis:
await self._redis.setex(
key,
ttl,
json.dumps(value)
)
else:
# NOTE: the in-memory fallback ignores ttl, so entries never expire
self._memory_cache[key] = {"value": value}
async def delete(self, key: str):
"""Delete cached value."""
if self._redis:
await self._redis.delete(key)
else:
self._memory_cache.pop(key, None)
async def clear_pattern(self, pattern: str):
"""Clear all keys matching pattern."""
if self._redis:
keys = await self._redis.keys(pattern)
if keys:
await self._redis.delete(*keys)
else:
keys_to_delete = [
k for k in self._memory_cache
if pattern.replace("*", "") in k
]
for k in keys_to_delete:
del self._memory_cache[k]
def cached(prefix: str, ttl: Optional[int] = None):
    """Decorator for caching results of async methods whose owner exposes `_cache`."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Only methods on objects that expose a `_cache` attribute are cached
            owner = args[0] if args and hasattr(args[0], "_cache") else None
            # Leave the owner out of the key: its default repr embeds a memory address
            key_args = args[1:] if owner is not None else args
            key_data = f"{prefix}:{func.__name__}:{key_args}:{sorted(kwargs.items())}"
            cache_key = hashlib.md5(key_data.encode()).hexdigest()
            if owner is not None:
                cached_value = await owner._cache.get(cache_key)
                if cached_value is not None:
                    return cached_value
            result = await func(*args, **kwargs)
            if owner is not None and result is not None:
                await owner._cache.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator
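The in-memory fallback above stores values without an expiry, so the `ttl` argument only takes effect when Redis is available. One way to honour it in memory is to store an expiry timestamp next to each value, as sketched below. `TTLCache` and its injectable `clock` parameter are illustrative and not part of the course code.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """Minimal in-memory cache with per-entry expiry."""

    def __init__(self, default_ttl: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._default_ttl = default_ttl
        self._clock = clock  # injectable so tests can control time
        self._store: Dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        lifetime = self._default_ttl if ttl is None else ttl
        self._store[key] = (self._clock() + lifetime, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # lazily evict expired entries on read
            return None
        return value
```

Using a monotonic clock avoids surprises when the system time is adjusted. Entries that are never read again stay in memory, so a periodic sweep may be needed for long-running processes.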
API Router Implementation
# app/routers/escrows.py
from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Optional
from ..models.escrow import EscrowResponse, EscrowListResponse, EscrowState
from ..services.solana_client import SolanaClient, get_solana_client
from ..services.cache import CacheService
from ..config import get_settings
import base64
import struct
import base58  # third-party (pip install base58); the stdlib base64 module has no base58 helpers
router = APIRouter(prefix="/escrows", tags=["Token Escrow"])
# Anchor discriminator for Escrow account
ESCROW_DISCRIMINATOR = bytes([0x88, 0x45, 0x6a, 0xc3, 0x15, 0x7e, 0x8b, 0x22])
# 32 zero bytes (the default Pubkey) encode to 32 '1' characters in base58
DEFAULT_PUBKEY = "1" * 32
def parse_escrow_account(pubkey: str, data: bytes) -> dict:
    """Parse raw escrow account data."""
    # Skip 8-byte discriminator
    offset = 8
    # Parse fields based on struct layout
    maker = base58.b58encode(data[offset:offset+32]).decode()
    offset += 32
    taker = base58.b58encode(data[offset:offset+32]).decode()
    offset += 32
    mint_a = base58.b58encode(data[offset:offset+32]).decode()
    offset += 32
    mint_b = base58.b58encode(data[offset:offset+32]).decode()
    offset += 32
    # Amounts are little-endian u64 values
    amount_a = struct.unpack("<Q", data[offset:offset+8])[0]
    offset += 8
    amount_b = struct.unpack("<Q", data[offset:offset+8])[0]
    offset += 8
    state = data[offset]
    return {
        "public_key": pubkey,
        "maker": maker,
        "taker": taker if taker != DEFAULT_PUBKEY else None,
        "mint_a": mint_a,
        "mint_b": mint_b,
        "amount_a": amount_a,
        "amount_b": amount_b,
        "state": EscrowState(["created", "funded", "completed", "cancelled"][state])
    }
@router.get("/", response_model=EscrowListResponse)
async def list_escrows(
state: Optional[EscrowState] = None,
maker: Optional[str] = None,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
client: SolanaClient = Depends(get_solana_client)
):
"""
List all escrows with optional filtering.
- **state**: Filter by escrow state
- **maker**: Filter by maker address
- **page**: Page number (1-indexed)
- **page_size**: Items per page (max 100)
"""
settings = get_settings()
accounts = await client._request(
"getProgramAccounts",
[
settings.token_escrow_program_id,
{
"encoding": "base64",
"filters": [
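# memcmp bytes are read as base58 by default, so the base64 encoding must be declared
{"memcmp": {"offset": 0, "bytes": base64.b64encode(ESCROW_DISCRIMINATOR).decode(), "encoding": "base64"}}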
]
}
]
)
if not accounts:
return EscrowListResponse(escrows=[], total=0, page=page, page_size=page_size)
escrows = []
for account in accounts:
pubkey = account["pubkey"]
data = base64.b64decode(account["account"]["data"][0])
try:
escrow = parse_escrow_account(pubkey, data)
if state and escrow["state"] != state:
continue
if maker and escrow["maker"] != maker:
continue
escrows.append(escrow)
except Exception:
continue
total = len(escrows)
start = (page - 1) * page_size
end = start + page_size
return EscrowListResponse(
escrows=escrows[start:end],
total=total,
page=page,
page_size=page_size
)
@router.get("/{pubkey}", response_model=EscrowResponse)
async def get_escrow(
pubkey: str,
client: SolanaClient = Depends(get_solana_client)
):
"""Get escrow details by public key."""
account = await client.get_account_info(pubkey)
if not account:
raise HTTPException(status_code=404, detail="Escrow not found")
data = base64.b64decode(account["data"][0])
try:
return parse_escrow_account(pubkey, data)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Failed to parse escrow: {e}")
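The field offsets in `parse_escrow_account` can be checked against a synthetic buffer. The sketch below unpacks the same layout (8-byte discriminator, four 32-byte keys, two little-endian `u64` amounts, one state byte) in a single `struct` call, keeping keys as raw bytes to stay within the standard library. The layout mirrors the parser above and is an assumption about the on-chain struct, which is not shown in this module.

```python
import struct

# discriminator, maker, taker, mint_a, mint_b, amount_a, amount_b, state
ESCROW_LAYOUT = struct.Struct("<8s32s32s32s32sQQB")
STATES = ["created", "funded", "completed", "cancelled"]


def unpack_escrow(data: bytes) -> dict:
    """Decode the fixed-size prefix of an escrow account."""
    if len(data) < ESCROW_LAYOUT.size:
        raise ValueError(f"need {ESCROW_LAYOUT.size} bytes, got {len(data)}")
    _disc, maker, taker, mint_a, mint_b, amount_a, amount_b, state = (
        ESCROW_LAYOUT.unpack_from(data)
    )
    if state >= len(STATES):
        raise ValueError(f"unknown state byte: {state}")
    return {
        "maker": maker,
        "taker": None if taker == bytes(32) else taker,  # all zeros = unset
        "mint_a": mint_a,
        "mint_b": mint_b,
        "amount_a": amount_a,
        "amount_b": amount_b,
        "state": STATES[state],
    }
```

Checking the buffer length and the state byte up front turns truncated or unexpected accounts into explicit errors instead of index failures deep in the parser.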
Main Application Entry Point
# app/main.py
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
import time
import logging
from .config import get_settings
from .routers import escrows, listings, pools, proposals
from .services.cache import CacheService
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
cache_service = CacheService()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler."""
logger.info("Starting Solana DApps API")
await cache_service.connect()
yield
logger.info("Shutting down Solana DApps API")
await cache_service.disconnect()
def create_app() -> FastAPI:
"""Create and configure FastAPI application."""
settings = get_settings()
app = FastAPI(
title=settings.app_name,
description="Backend API for Solana DApps course",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
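# Wildcard origins should not be combined with credentials; list explicit origins if cookies are needed
allow_origins=["*"],
allow_credentials=False,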
allow_methods=["*"],
allow_headers=["*"],
)
@app.middleware("http")
async def add_timing_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.error(f"Unhandled exception: {exc}")
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"}
)
app.include_router(escrows.router, prefix=settings.api_prefix)
app.include_router(listings.router, prefix=settings.api_prefix)
app.include_router(pools.router, prefix=settings.api_prefix)
app.include_router(proposals.router, prefix=settings.api_prefix)
@app.get("/health")
async def health_check():
return {"status": "healthy", "version": "1.0.0"}
return app
app = create_app()
Rate Limiting
# app/dependencies.py
from fastapi import HTTPException, Request
from typing import Dict
import time
from .config import get_settings
class RateLimiter:
"""Simple in-memory rate limiter."""
def __init__(self):
self.requests: Dict[str, list] = {}
settings = get_settings()
self.max_requests = settings.rate_limit_requests
self.period = settings.rate_limit_period
def is_allowed(self, client_id: str) -> bool:
"""Check if request is allowed."""
now = time.time()
if client_id in self.requests:
self.requests[client_id] = [
t for t in self.requests[client_id]
if now - t < self.period
]
else:
self.requests[client_id] = []
if len(self.requests[client_id]) >= self.max_requests:
return False
self.requests[client_id].append(now)
return True
rate_limiter = RateLimiter()
async def check_rate_limit(request: Request):
"""FastAPI dependency for rate limiting."""
client_id = request.client.host if request.client else "unknown"
if not rate_limiter.is_allowed(client_id):
raise HTTPException(
status_code=429,
detail="Too many requests"
)
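`RateLimiter` above keeps a log of request timestamps per client and counts those inside the window. The variant below sketches the same sliding-window idea with a `deque` per client, so expired timestamps are dropped from the left instead of rebuilding the list, and with an injectable clock for testing. `SlidingWindowLimiter` is an illustrative name, not part of the course code.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most max_requests per client within a rolling period."""

    def __init__(self, max_requests: int, period: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_requests = max_requests
        self.period = period
        self._clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def is_allowed(self, client_id: str) -> bool:
        now = self._clock()
        hits = self._hits[client_id]
        # Drop timestamps that have left the window.
        while hits and now - hits[0] >= self.period:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True
```

Like the original, this state lives in one process, so deployments with several workers would need a shared store to enforce a global limit.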
Part B: Poem (Rust) Services
Why Poem for High-Performance Services?
Poem is a full-featured Rust web framework that offers:
- Performance: Compiles to native code and runs without a garbage collector
- Type safety: Rust's compile-time guarantees
- Async runtime: Built on Tokio for high concurrency
- WebSocket support: Native WebSocket handling
- Shared types: Share structs with Anchor programs
Service Architecture
services/
├── indexer/                 # Blockchain indexer service
│   ├── Cargo.toml
│   ├── src/
│   │   ├── main.rs
│   │   ├── config.rs
│   │   ├── subscription.rs
│   │   ├── parser.rs
│   │   └── db.rs
│   └── Dockerfile
├── relay/                   # Transaction relay service
│   ├── Cargo.toml
│   ├── src/
│   │   ├── main.rs
│   │   ├── config.rs
│   │   ├── priority.rs
│   │   └── submit.rs
│   └── Dockerfile
└── shared/                  # Shared types
    ├── Cargo.toml
    └── src/
        └── lib.rs
Shared Types Library
// services/shared/src/lib.rs
use serde::{Deserialize, Serialize};
/// Account subscription configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionConfig {
pub commitment: String,
pub encoding: String,
}
impl Default for SubscriptionConfig {
fn default() -> Self {
Self {
commitment: "confirmed".to_string(),
encoding: "base64".to_string(),
}
}
}
/// WebSocket message types
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum WsMessage {
Subscribe { account: String },
Unsubscribe { account: String },
AccountUpdate { account: String, data: String, slot: u64 },
Error { message: String },
}
/// Transaction submission request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmitTxRequest {
pub transaction: String,
pub priority_fee: Option<u64>,
pub skip_preflight: bool,
}
/// Transaction submission response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmitTxResponse {
pub signature: String,
pub slot: u64,
pub confirmations: Option<u64>,
}
/// Priority fee estimate
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PriorityFeeEstimate {
pub low: u64,
pub medium: u64,
pub high: u64,
pub percentiles: Vec<u64>,
}
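With `#[serde(tag = "type", content = "data")]`, serde uses its adjacently tagged representation: the variant name goes in `type` and the variant's fields in `data`. A Python client talking to a service that serializes `WsMessage` would therefore exchange JSON shaped as below. The helper names are illustrative, and the account string in the usage is a placeholder.

```python
import json


def encode_subscribe(account: str) -> str:
    """Build the JSON for WsMessage::Subscribe { account }."""
    return json.dumps({"type": "Subscribe", "data": {"account": account}})


def decode_message(raw: str) -> tuple[str, dict]:
    """Split an adjacently tagged message into (variant, fields)."""
    msg = json.loads(raw)
    return msg["type"], msg.get("data", {})
```

For instance, `encode_subscribe("Acct111")` produces `{"type": "Subscribe", "data": {"account": "Acct111"}}`, and an `AccountUpdate` arrives with `account`, `data`, and `slot` nested under `data`.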
Transaction Relay Service
// services/relay/src/main.rs
use poem::{
get, handler,
listener::TcpListener,
middleware::{Cors, Tracing},
post,
web::{Data, Json, Path},
EndpointExt, Route, Server, // EndpointExt provides .with() and .data()
};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use std::sync::Arc;
mod config;
mod priority;
mod submit;
use config::Config;
use shared::{PriorityFeeEstimate, SubmitTxRequest, SubmitTxResponse};
struct AppState {
rpc_client: RpcClient,
config: Config,
}
#[handler]
async fn health() -> Json<serde_json::Value> {
Json(serde_json::json!({
"status": "healthy",
"service": "relay"
}))
}
#[handler]
async fn get_priority_fees(
Data(state): Data<&Arc<AppState>>,
) -> poem::Result<Json<PriorityFeeEstimate>> {
let estimate = priority::get_fee_estimate(&state.rpc_client).await
.map_err(|e| poem::Error::from_string(
e.to_string(),
poem::http::StatusCode::INTERNAL_SERVER_ERROR
))?;
Ok(Json(estimate))
}
#[handler]
async fn submit_transaction(
Data(state): Data<&Arc<AppState>>,
Json(request): Json<SubmitTxRequest>,
) -> poem::Result<Json<SubmitTxResponse>> {
let response = submit::submit_transaction(&state.rpc_client, request).await
.map_err(|e| poem::Error::from_string(
e.to_string(),
poem::http::StatusCode::BAD_REQUEST
))?;
Ok(Json(response))
}
#[handler]
async fn get_transaction_status(
Data(state): Data<&Arc<AppState>>,
Path(signature): Path<String>,
) -> poem::Result<Json<serde_json::Value>> {
let sig: solana_sdk::signature::Signature = signature.parse()
.map_err(|_| poem::Error::from_string(
"Invalid signature",
poem::http::StatusCode::BAD_REQUEST
))?;
let status = state.rpc_client
.get_signature_status(&sig)
.await
.map_err(|e| poem::Error::from_string(
e.to_string(),
poem::http::StatusCode::INTERNAL_SERVER_ERROR
))?;
Ok(Json(serde_json::json!({
"signature": signature,
"status": status.map(|s| format!("{:?}", s))
})))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let config = Config::from_env()?;
let rpc_client = RpcClient::new_with_commitment(
config.rpc_url.clone(),
CommitmentConfig::confirmed(),
);
let state = Arc::new(AppState { rpc_client, config: config.clone() });
let app = Route::new()
.at("/health", get(health))
.at("/priority-fees", get(get_priority_fees))
.at("/submit", post(submit_transaction))
.at("/status/:signature", get(get_transaction_status))
.with(Cors::new())
.with(Tracing)
.data(state);
let addr = format!("0.0.0.0:{}", config.port);
tracing::info!("Starting relay service on {}", addr);
Server::new(TcpListener::bind(&addr))
.run(app)
.await?;
Ok(())
}
Priority Fee Estimation
// services/relay/src/priority.rs
use solana_client::nonblocking::rpc_client::RpcClient;
use shared::PriorityFeeEstimate;
pub async fn get_fee_estimate(
client: &RpcClient,
) -> Result<PriorityFeeEstimate, Box<dyn std::error::Error>> {
let fees = client.get_recent_prioritization_fees(&[]).await?;
if fees.is_empty() {
return Ok(PriorityFeeEstimate {
low: 1000,
medium: 10000,
high: 100000,
percentiles: vec![],
});
}
let mut fee_values: Vec<u64> = fees.iter()
.map(|f| f.prioritization_fee)
.collect();
fee_values.sort();
let len = fee_values.len();
let p25 = fee_values.get(len / 4).copied().unwrap_or(0);
let p50 = fee_values.get(len / 2).copied().unwrap_or(0);
let p75 = fee_values.get(len * 3 / 4).copied().unwrap_or(0);
let p90 = fee_values.get(len * 9 / 10).copied().unwrap_or(0);
Ok(PriorityFeeEstimate {
low: p25.max(1000),
medium: p50.max(5000),
high: p75.max(25000),
percentiles: vec![p25, p50, p75, p90],
})
}
pub fn calculate_priority_fee(
estimate: &PriorityFeeEstimate,
urgency: Urgency,
compute_units: u32,
) -> u64 {
let micro_lamports_per_cu = match urgency {
Urgency::Low => estimate.low,
Urgency::Medium => estimate.medium,
Urgency::High => estimate.high,
};
(micro_lamports_per_cu as u128 * compute_units as u128 / 1_000_000) as u64
}
#[derive(Debug, Clone, Copy)]
pub enum Urgency {
Low,
Medium,
High,
}
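The compute-unit price is quoted in micro-lamports per compute unit, so the total priority fee in lamports is the price times the compute units divided by 1,000,000. The sketch below mirrors the index-based percentiles of `get_fee_estimate` and the arithmetic of `calculate_priority_fee` in Python. The sample fee values are made up for illustration.

```python
def percentile_by_index(sorted_values: list[int], numerator: int, denominator: int) -> int:
    """Pick the element at floor(len * numerator / denominator), as the Rust code does."""
    if not sorted_values:
        return 0
    return sorted_values[len(sorted_values) * numerator // denominator]


def priority_fee_lamports(micro_lamports_per_cu: int, compute_units: int) -> int:
    """Total priority fee in lamports (integer division truncates)."""
    return micro_lamports_per_cu * compute_units // 1_000_000


sample = sorted([0, 0, 500, 2_000, 10_000, 12_000, 40_000, 90_000])  # made-up data
p50 = percentile_by_index(sample, 1, 2)
p75 = percentile_by_index(sample, 3, 4)
medium = max(p50, 5_000)  # same floor as the Rust estimate
fee = priority_fee_lamports(medium, 200_000)
```

Because the division truncates, very low prices combined with small compute budgets round down to a fee of zero lamports.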
Transaction Submission
// services/relay/src/submit.rs
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
commitment_config::CommitmentConfig,
compute_budget::ComputeBudgetInstruction,
instruction::Instruction,
message::Message,
transaction::Transaction,
};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use shared::{SubmitTxRequest, SubmitTxResponse};
pub async fn submit_transaction(
client: &RpcClient,
request: SubmitTxRequest,
) -> Result<SubmitTxResponse, Box<dyn std::error::Error>> {
let tx_bytes = BASE64.decode(&request.transaction)?;
let mut transaction: Transaction = bincode::deserialize(&tx_bytes)?;
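// NOTE: both steps below change the message, which invalidates any signatures
// already attached. They are only safe if the transaction is signed after this
// point (for example by a relay-held fee payer); otherwise forward the client's
// signed bytes unchanged.
if let Some(priority_fee) = request.priority_fee {
inject_priority_fee(&mut transaction, priority_fee)?;
}
let blockhash = client.get_latest_blockhash().await?;
transaction.message.recent_blockhash = blockhash;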
let config = solana_client::rpc_config::RpcSendTransactionConfig {
skip_preflight: request.skip_preflight,
preflight_commitment: Some(CommitmentConfig::confirmed().commitment),
encoding: None,
max_retries: Some(3),
min_context_slot: None,
};
let signature = client
.send_transaction_with_config(&transaction, config)
.await?;
let slot = client.get_slot().await?;
Ok(SubmitTxResponse {
signature: signature.to_string(),
slot,
confirmations: None,
})
}
fn inject_priority_fee(
transaction: &mut Transaction,
priority_fee_lamports: u64,
) -> Result<(), Box<dyn std::error::Error>> {
let compute_limit_ix = ComputeBudgetInstruction::set_compute_unit_limit(200_000);
let micro_lamports_per_cu = priority_fee_lamports * 1_000_000 / 200_000;
let compute_price_ix = ComputeBudgetInstruction::set_compute_unit_price(micro_lamports_per_cu);
let mut new_instructions = vec![compute_limit_ix, compute_price_ix];
new_instructions.extend(
transaction.message.instructions.clone().into_iter().map(|ix| {
Instruction {
program_id: transaction.message.account_keys[ix.program_id_index as usize],
accounts: ix.accounts.iter().map(|&i| {
solana_sdk::instruction::AccountMeta {
pubkey: transaction.message.account_keys[i as usize],
is_signer: transaction.message.is_signer(i as usize),
is_writable: transaction.message.is_writable(i as usize),
}
}).collect(),
data: ix.data.clone(),
}
})
);
let payer = transaction.message.account_keys[0];
transaction.message = Message::new(&new_instructions, Some(&payer));
Ok(())
}
WebSocket Subscription Service
// services/indexer/src/subscription.rs
use futures_util::{SinkExt, StreamExt};
use poem::web::websocket::{Message, WebSocket};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::broadcast;
pub struct SubscriptionManager {
pubsub_url: String,
update_tx: broadcast::Sender<AccountUpdate>,
}
#[derive(Debug, Clone)]
pub struct AccountUpdate {
pub pubkey: String,
pub data: String,
pub slot: u64,
}
impl SubscriptionManager {
pub fn new(pubsub_url: String) -> Self {
let (update_tx, _) = broadcast::channel(1000);
Self { pubsub_url, update_tx }
}
pub async fn subscribe_account(
&self,
pubkey: String,
) -> Result<(), Box<dyn std::error::Error>> {
let pubsub_url = self.pubsub_url.clone();
let update_tx = self.update_tx.clone();
tokio::spawn(async move {
// WebSocket subscription logic here
tracing::info!("Subscribed to account: {}", pubkey);
});
Ok(())
}
pub fn subscribe_updates(&self) -> broadcast::Receiver<AccountUpdate> {
self.update_tx.subscribe()
}
}
pub async fn websocket_handler(
ws: WebSocket,
manager: Arc<SubscriptionManager>,
) -> impl poem::IntoResponse {
ws.on_upgrade(move |socket| async move {
let (mut sink, mut stream) = socket.split();
let mut update_rx = manager.subscribe_updates();
let manager_clone = manager.clone();
let recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = stream.next().await {
if let Message::Text(text) = msg {
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text) {
if let Some(account) = parsed.get("subscribe").and_then(|v| v.as_str()) {
let _ = manager_clone.subscribe_account(account.to_string()).await;
}
}
}
}
});
let send_task = tokio::spawn(async move {
while let Ok(update) = update_rx.recv().await {
let msg = json!({
"type": "accountUpdate",
"pubkey": update.pubkey,
"data": update.data,
"slot": update.slot
});
if sink.send(Message::Text(msg.to_string())).await.is_err() {
break;
}
}
});
tokio::select! {
_ = recv_task => {},
_ = send_task => {},
}
})
}
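`SubscriptionManager` uses a Tokio broadcast channel so that every connected WebSocket client receives every account update. The same fan-out idea can be sketched in Python with one `asyncio.Queue` per subscriber. `Broadcaster` is a hypothetical class; unlike Tokio's bounded broadcast channel, this version uses unbounded queues and never drops messages for slow receivers.

```python
import asyncio
from typing import Any, List


class Broadcaster:
    """Fan out each published item to every subscriber's queue."""

    def __init__(self) -> None:
        self._queues: List[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.append(queue)
        return queue

    def publish(self, item: Any) -> int:
        for queue in self._queues:
            queue.put_nowait(item)
        return len(self._queues)  # number of subscribers that received the item


async def demo() -> list:
    hub = Broadcaster()
    a, b = hub.subscribe(), hub.subscribe()
    hub.publish({"pubkey": "Acct111", "slot": 1})
    hub.publish({"pubkey": "Acct111", "slot": 2})
    # Each subscriber sees both updates, in publish order.
    return [await a.get(), await a.get(), await b.get(), await b.get()]
```

A production version would also remove a subscriber's queue when its connection closes, and would likely bound the queues to protect against slow consumers.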
Cargo Configuration
# services/relay/Cargo.toml
[package]
name = "relay"
version = "0.1.0"
edition = "2021"
[dependencies]
poem = { version = "3", features = ["websocket"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
solana-client = "1.18"
solana-sdk = "1.18"
bincode = "1.3"
base64 = "0.21"
tracing = "0.1"
tracing-subscriber = "0.3"
dotenvy = "0.15"
shared = { path = "../shared" }
[profile.release]
lto = true
codegen-units = 1
opt-level = 3
Deployment
Docker Configuration
# api/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY pyproject.toml poetry.lock* ./
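# --only main replaces the removed --no-dev flag; --no-root skips installing the project, which is not copied yet
RUN pip install poetry && poetry install --only main --no-root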
COPY app ./app
CMD ["poetry", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# services/relay/Dockerfile
FROM rust:1.75 as builder
WORKDIR /app
COPY . .
RUN cargo build --release
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/relay /usr/local/bin/relay
CMD ["relay"]
Environment Variables
# .env.example for API
SOLANA_RPC_URL=https://api.devnet.solana.com
SOLANA_WS_URL=wss://api.devnet.solana.com
DATABASE_URL=postgresql+asyncpg://user:pass@localhost:5432/solana_dapps
REDIS_URL=redis://localhost:6379
TOKEN_ESCROW_PROGRAM_ID=Escr1111111111111111111111111111111111111111
NFT_MARKETPLACE_PROGRAM_ID=NFTM1111111111111111111111111111111111111111
DEFI_AMM_PROGRAM_ID=AMM1111111111111111111111111111111111111111
DAO_GOVERNANCE_PROGRAM_ID=Gov11111111111111111111111111111111111111111
Summary
This module covered building production backend services for Solana DApps:
FastAPI (Python):
- Async RPC client with connection pooling
- Pydantic models for type-safe blockchain data
- Caching with Redis or in-memory fallback
- Rate limiting as a FastAPI dependency
- Blockchain indexing service
Poem (Rust):
- High-performance API endpoints
- WebSocket subscriptions for real-time updates
- Transaction relay with priority fee injection
- Shared types with Anchor programs
Key Patterns:
- Use async/await for all RPC operations
- Cache frequently accessed data
- Implement proper error handling
- Use typed models for safety
- Monitor and index on-chain events
Next Steps
- Module 11: Database Layer - PostgreSQL schemas and migrations
- Module 12: Kubernetes Deployment - Container orchestration