Database Patterns

Effectively storing and querying blockchain data requires specialized database patterns that handle immutable history, real-time updates, and complex relationships.

Database Schema Design

Text
┌─────────────────────────────────────────────────────────────────┐
│                  Database Schema Architecture                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐    │
│  │   Accounts   │────▶│Token Accounts│────▶│   Tokens     │    │
│  │              │     │              │     │   (Mints)    │    │
│  └──────────────┘     └──────────────┘     └──────────────┘    │
│         │                                         │            │
│         │                                         │            │
│         ▼                                         ▼            │
│  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐    │
│  │ Transactions │────▶│ Instructions │────▶│   Events     │    │
│  │              │     │              │     │              │    │
│  └──────────────┘     └──────────────┘     └──────────────┘    │
│         │                                                      │
│         │                                                      │
│         ▼                                                      │
│  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐    │
│  │   Balances   │     │   Prices     │     │   NFTs       │    │
│  │  (Snapshots) │     │  (History)   │     │  (Metadata)  │    │
│  └──────────────┘     └──────────────┘     └──────────────┘    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

PostgreSQL Schema

SQL
-- Core tables for blockchain data

-- Accounts table: one row per on-chain account, overwritten in place
-- (latest state only; balance history lives in balance_snapshots).
CREATE TABLE accounts (
    address TEXT PRIMARY KEY,             -- base58 account address (natural key)
    owner TEXT NOT NULL,                  -- owning program id
    lamports BIGINT NOT NULL DEFAULT 0,   -- native balance in lamports
    executable BOOLEAN DEFAULT FALSE,
    rent_epoch BIGINT,
    data_hash TEXT,                       -- presumably a hash of account data for change detection -- TODO confirm
    slot_updated BIGINT NOT NULL,         -- slot at which this row was last refreshed
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()  -- NOTE(review): nothing shown here bumps this on UPDATE; needs a trigger or explicit SET
);

-- owner: "all accounts of program X"; slot_updated: find stale rows to refresh.
CREATE INDEX idx_accounts_owner ON accounts(owner);
CREATE INDEX idx_accounts_slot ON accounts(slot_updated);

-- Token mints: SPL mint state plus token metadata (name/symbol/uri).
CREATE TABLE token_mints (
    address TEXT PRIMARY KEY,
    decimals SMALLINT NOT NULL,
    supply NUMERIC(78, 0) NOT NULL DEFAULT 0,   -- raw units; NUMERIC(78,0) can hold a full 256-bit integer
    mint_authority TEXT,                        -- NULL presumably means minting disabled -- TODO confirm
    freeze_authority TEXT,
    name TEXT,
    symbol TEXT,
    uri TEXT,
    is_nft BOOLEAN DEFAULT FALSE,
    slot_updated BIGINT NOT NULL,               -- slot at which this row was last refreshed
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_token_mints_symbol ON token_mints(symbol);
-- Partial index: only NFT rows are indexed, keeping the index small.
CREATE INDEX idx_token_mints_nft ON token_mints(is_nft) WHERE is_nft = TRUE;

-- Token accounts: current balance per token-account address (latest state only).
CREATE TABLE token_accounts (
    address TEXT PRIMARY KEY,
    mint TEXT NOT NULL REFERENCES token_mints(address),
    owner TEXT NOT NULL,                        -- wallet that owns this token account
    amount NUMERIC(78, 0) NOT NULL DEFAULT 0,   -- raw units; scale by token_mints.decimals for UI amount
    delegate TEXT,
    delegated_amount NUMERIC(78, 0) DEFAULT 0,
    is_frozen BOOLEAN DEFAULT FALSE,
    is_native BOOLEAN DEFAULT FALSE,            -- presumably marks wrapped-SOL accounts -- TODO confirm
    close_authority TEXT,
    slot_updated BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- NOTE(review): idx_token_accounts_owner is a left prefix of the (owner, mint)
-- composite below and is likely redundant -- verify with pg_stat_user_indexes.
CREATE INDEX idx_token_accounts_owner ON token_accounts(owner);
CREATE INDEX idx_token_accounts_mint ON token_accounts(mint);
CREATE INDEX idx_token_accounts_owner_mint ON token_accounts(owner, mint);

-- Transactions: append-only; one row per confirmed transaction.
CREATE TABLE transactions (
    signature TEXT PRIMARY KEY,
    slot BIGINT NOT NULL,
    block_time TIMESTAMPTZ,                 -- nullable: may be unknown at ingest time
    fee BIGINT NOT NULL,                    -- in lamports
    success BOOLEAN NOT NULL,
    err JSONB,                              -- error detail for failed transactions
    memo TEXT,
    compute_units_consumed BIGINT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- slot serves keyset pagination (getTransactions pages by slot DESC).
CREATE INDEX idx_transactions_slot ON transactions(slot);
CREATE INDEX idx_transactions_block_time ON transactions(block_time DESC);
-- NOTE(review): a full index on a boolean is rarely selective; consider a
-- partial index (e.g. WHERE success = FALSE) if failure lookups dominate.
CREATE INDEX idx_transactions_success ON transactions(success);

-- Transaction account mappings (for address lookups)
-- Junction table: which addresses each transaction touched, and in what role.
CREATE TABLE transaction_accounts (
    signature TEXT NOT NULL REFERENCES transactions(signature),
    address TEXT NOT NULL,
    is_signer BOOLEAN NOT NULL DEFAULT FALSE,
    is_writable BOOLEAN NOT NULL DEFAULT FALSE,
    position SMALLINT NOT NULL,   -- index of the address within the tx's account list
    PRIMARY KEY (signature, address)
);

-- address alone serves "all txs touching X"; the partial index is a smaller
-- variant for signer-only lookups.
CREATE INDEX idx_tx_accounts_address ON transaction_accounts(address);
CREATE INDEX idx_tx_accounts_address_signer ON transaction_accounts(address)
    WHERE is_signer = TRUE;

-- Instructions: flattened top-level and inner instructions per transaction.
CREATE TABLE instructions (
    id BIGSERIAL PRIMARY KEY,
    signature TEXT NOT NULL REFERENCES transactions(signature),
    program_id TEXT NOT NULL,
    instruction_index SMALLINT NOT NULL,   -- position within the transaction
    inner_index SMALLINT,                  -- NULL for top-level; set for inner instructions
    data BYTEA,                            -- raw instruction data
    parsed_type TEXT,                      -- decoded instruction name, when a parser exists
    parsed_info JSONB                      -- decoded instruction fields, when a parser exists
);

-- signature: fetch a tx's instructions; program/parsed_type: analytics filters.
CREATE INDEX idx_instructions_signature ON instructions(signature);
CREATE INDEX idx_instructions_program ON instructions(program_id);
CREATE INDEX idx_instructions_parsed_type ON instructions(parsed_type);

-- Events (program logs): append-only parsed program events.
CREATE TABLE events (
    id BIGSERIAL PRIMARY KEY,   -- monotonic id; used as keyset-pagination cursor by getEvents
    signature TEXT NOT NULL REFERENCES transactions(signature),
    program_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    data JSONB NOT NULL,        -- parsed event payload
    slot BIGINT NOT NULL,
    block_time TIMESTAMPTZ      -- nullable: may be unknown at ingest time
);

-- FIX: the FK column signature was previously unindexed, so fetching a
-- transaction's events required a sequential scan; every other child table
-- here (instructions, transaction_accounts) indexes its signature column.
CREATE INDEX idx_events_signature ON events(signature);
CREATE INDEX idx_events_program ON events(program_id);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_program_type ON events(program_id, event_type);
CREATE INDEX idx_events_block_time ON events(block_time DESC);

-- Balance snapshots (for historical queries)
-- Append-only point-in-time native balances. UNIQUE(address, slot) also
-- provides the supporting index for per-address, per-slot lookups.
CREATE TABLE balance_snapshots (
    id BIGSERIAL PRIMARY KEY,
    address TEXT NOT NULL,
    slot BIGINT NOT NULL,
    lamports BIGINT NOT NULL,
    block_time TIMESTAMPTZ NOT NULL,
    UNIQUE(address, slot)
);

-- Serves "balance of X at/just before time T" (ORDER BY block_time DESC LIMIT 1),
-- as used by getBalanceHistory.
CREATE INDEX idx_balance_snapshots_address_time ON balance_snapshots(address, block_time DESC);

-- Token balance snapshots
-- Same pattern as balance_snapshots, for token balances. mint/owner are
-- denormalized from token_accounts -- presumably so history survives account
-- closure or ownership changes; confirm that intent.
CREATE TABLE token_balance_snapshots (
    id BIGSERIAL PRIMARY KEY,
    token_account TEXT NOT NULL,
    mint TEXT NOT NULL,
    owner TEXT NOT NULL,
    amount NUMERIC(78, 0) NOT NULL,   -- raw units at this slot
    slot BIGINT NOT NULL,
    block_time TIMESTAMPTZ NOT NULL,
    UNIQUE(token_account, slot)
);

CREATE INDEX idx_token_balance_snapshots_owner_mint ON token_balance_snapshots(owner, mint, block_time DESC);

-- Price history
-- Append-only USD price points per mint, from external feeds.
CREATE TABLE price_history (
    id BIGSERIAL PRIMARY KEY,
    mint TEXT NOT NULL,                 -- NOTE(review): no FK to token_mints -- confirm intentional (prices may arrive before the mint row)
    price_usd NUMERIC(20, 8) NOT NULL,
    volume_24h NUMERIC(20, 2),
    market_cap NUMERIC(20, 2),
    timestamp TIMESTAMPTZ NOT NULL,     -- NOTE(review): "timestamp" shadows the type name; legal but consider e.g. observed_at
    source TEXT NOT NULL                -- price feed identifier
);

-- Serves "latest price for mint" (ORDER BY timestamp DESC LIMIT 1), as used
-- by getAccountWithTokens.
CREATE INDEX idx_price_history_mint_time ON price_history(mint, timestamp DESC);

-- NFT metadata
-- One row per NFT mint: on-chain metadata fields plus resolved off-chain JSON
-- (image/attributes). Field names suggest Metaplex metadata -- TODO confirm.
CREATE TABLE nft_metadata (
    mint TEXT PRIMARY KEY REFERENCES token_mints(address),
    name TEXT NOT NULL,
    symbol TEXT,
    uri TEXT NOT NULL,                        -- location of the off-chain metadata JSON
    seller_fee_basis_points SMALLINT,         -- royalty, in basis points
    primary_sale_happened BOOLEAN DEFAULT FALSE,
    is_mutable BOOLEAN DEFAULT TRUE,
    collection_mint TEXT,
    collection_verified BOOLEAN DEFAULT FALSE,
    attributes JSONB,                         -- trait array, e.g. [{"trait_type": ..., "value": ...}] (see getNFTs)
    image TEXT,
    animation_url TEXT,
    external_url TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_nft_metadata_collection ON nft_metadata(collection_mint);
-- GIN index supports the JSONB containment (@>) trait filters used by getNFTs.
CREATE INDEX idx_nft_metadata_attributes ON nft_metadata USING GIN(attributes);

Query Patterns

TypeScript
import { Pool } from "pg";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// ==================== Account Queries ====================

// Shape returned by getAccountWithTokens: native balance plus one entry per
// non-empty token account, enriched with the latest known USD price.
interface AccountWithTokens {
  address: string;
  lamports: bigint;
  tokens: Array<{
    mint: string;
    symbol: string;
    balance: string;         // UI amount (raw amount / 10^decimals), as a string
    decimals: number;
    priceUsd: number | null; // null when no price_history row exists for the mint
    valueUsd: number | null; // null when priceUsd is null
  }>;
}

/**
 * Loads an account row plus all of its non-empty token balances, each joined
 * with mint metadata and the most recent price_history entry for that mint.
 * Returns null when no accounts row exists for `address`.
 *
 * NOTE(review): the two queries run outside a transaction, so the account row
 * and the token list can reflect slightly different points in time.
 */
export async function getAccountWithTokens(
  address: string,
): Promise<AccountWithTokens | null> {
  const accountResult = await pool.query(
    "SELECT * FROM accounts WHERE address = $1",
    [address],
  );

  if (accountResult.rows.length === 0) return null;

  // LEFT JOIN LATERAL picks the newest price per mint while keeping tokens
  // with no price history (ph.price_usd is NULL for those). Rows are ordered
  // by estimated USD value; unpriced tokens sort last via COALESCE -> 0.
  const tokensResult = await pool.query(
    `SELECT 
       ta.mint,
       tm.symbol,
       ta.amount as balance,
       tm.decimals,
       ph.price_usd
     FROM token_accounts ta
     JOIN token_mints tm ON ta.mint = tm.address
     LEFT JOIN LATERAL (
       SELECT price_usd
       FROM price_history
       WHERE mint = ta.mint
       ORDER BY timestamp DESC
       LIMIT 1
     ) ph ON true
     WHERE ta.owner = $1 AND ta.amount > 0
     ORDER BY (ta.amount::numeric / power(10, tm.decimals)) * COALESCE(ph.price_usd, 0) DESC`,
    [address],
  );

  const tokens = tokensResult.rows.map((row) => {
    // NOTE(review): parseFloat on a NUMERIC(78,0) amount loses precision above
    // 2^53 raw units -- confirm acceptable for display-only balances.
    const balance = parseFloat(row.balance) / Math.pow(10, row.decimals);
    return {
      mint: row.mint,
      symbol: row.symbol,
      balance: balance.toString(),
      decimals: row.decimals,
      priceUsd: row.price_usd ? parseFloat(row.price_usd) : null,
      valueUsd: row.price_usd ? balance * parseFloat(row.price_usd) : null,
    };
  });

  return {
    address,
    lamports: BigInt(accountResult.rows[0].lamports),
    tokens,
  };
}

// ==================== Transaction Queries ====================

// Filter options for getTransactions; all fields are optional and AND-combined.
interface TransactionFilter {
  address?: string;   // only txs that touch this account (via transaction_accounts)
  programId?: string; // only txs invoking this program (via instructions)
  startTime?: Date;
  endTime?: Date;
  success?: boolean;
  limit?: number;     // capped at 100 (default 20)
  cursor?: string;    // slot value from a previous page (keyset pagination)
}

/**
 * Keyset-paginated transaction listing. The WHERE clause is assembled from
 * fixed condition templates; user values are always passed as $n bind
 * parameters, never interpolated, so the dynamic SQL is injection-safe.
 *
 * Pagination: fetches limit+1 rows ordered by slot DESC; the extra row only
 * signals hasMore, and the last returned row's slot becomes nextCursor.
 * NOTE(review): paging on slot alone can skip or repeat rows when many txs
 * share a slot -- confirm whether a (slot, signature) keyset is needed.
 */
export async function getTransactions(filter: TransactionFilter) {
  const conditions: string[] = [];
  const params: any[] = [];
  let paramIndex = 1; // next $n placeholder; kept in sync with params.length + 1

  if (filter.address) {
    conditions.push(`
      EXISTS (
        SELECT 1 FROM transaction_accounts ta
        WHERE ta.signature = t.signature AND ta.address = $${paramIndex}
      )
    `);
    params.push(filter.address);
    paramIndex++;
  }

  if (filter.programId) {
    conditions.push(`
      EXISTS (
        SELECT 1 FROM instructions i
        WHERE i.signature = t.signature AND i.program_id = $${paramIndex}
      )
    `);
    params.push(filter.programId);
    paramIndex++;
  }

  if (filter.startTime) {
    conditions.push(`t.block_time >= $${paramIndex}`);
    params.push(filter.startTime);
    paramIndex++;
  }

  if (filter.endTime) {
    conditions.push(`t.block_time <= $${paramIndex}`);
    params.push(filter.endTime);
    paramIndex++;
  }

  if (filter.success !== undefined) {
    conditions.push(`t.success = $${paramIndex}`);
    params.push(filter.success);
    paramIndex++;
  }

  if (filter.cursor) {
    // Strictly-less-than keeps pagination exclusive of the previous page's last slot.
    conditions.push(`t.slot < $${paramIndex}`);
    params.push(parseInt(filter.cursor));
    paramIndex++;
  }

  const limit = Math.min(filter.limit || 20, 100);
  params.push(limit + 1); // Fetch one extra to determine hasMore

  const query = `
    SELECT 
      t.signature,
      t.slot,
      t.block_time,
      t.fee,
      t.success,
      t.err,
      t.memo,
      t.compute_units_consumed
    FROM transactions t
    ${conditions.length > 0 ? "WHERE " + conditions.join(" AND ") : ""}
    ORDER BY t.slot DESC
    LIMIT $${paramIndex}
  `;

  const result = await pool.query(query, params);
  const hasMore = result.rows.length > limit;
  const transactions = result.rows.slice(0, limit); // drop the sentinel row

  return {
    transactions,
    pagination: {
      hasMore,
      nextCursor: hasMore
        ? transactions[transactions.length - 1].slot.toString()
        : null,
    },
  };
}

// ==================== Historical Balance Queries ====================

/**
 * Returns a regular time series of native balances for `address` between
 * startTime and endTime, one point per `interval` bucket.
 *
 * For each generated bucket, the most recent balance_snapshots row at or
 * before the bucket is used (last observation carried forward).
 * NOTE(review): COALESCE(..., 0) reports 0 for buckets before the first
 * snapshot, so "no data yet" and "zero balance" are indistinguishable --
 * confirm that is intended.
 */
export async function getBalanceHistory(
  address: string,
  startTime: Date,
  endTime: Date,
  interval: "hour" | "day" | "week" = "day",
) {
  // Maps the interval name (also a valid date_trunc field) to the
  // generate_series step.
  const intervalMap = {
    hour: "1 hour",
    day: "1 day",
    week: "1 week",
  };

  const result = await pool.query(
    `WITH time_series AS (
       SELECT generate_series(
         date_trunc($1, $2::timestamptz),
         date_trunc($1, $3::timestamptz),
         $4::interval
       ) AS bucket
     )
     SELECT 
       ts.bucket,
       COALESCE(
         (SELECT lamports 
          FROM balance_snapshots 
          WHERE address = $5 AND block_time <= ts.bucket 
          ORDER BY block_time DESC 
          LIMIT 1),
         0
       ) AS lamports
     FROM time_series ts
     ORDER BY ts.bucket`,
    [interval, startTime, endTime, intervalMap[interval], address],
  );

  return result.rows.map((row) => ({
    timestamp: row.bucket,
    // BigInt preserves the exact lamport count; sol is a float for display.
    lamports: BigInt(row.lamports),
    sol: parseFloat(row.lamports) / 1e9,
  }));
}

// ==================== NFT Queries ====================

// Filter options for getNFTs; all fields optional and AND-combined.
interface NFTFilter {
  owner?: string;      // current holder (token_accounts.owner)
  collection?: string; // collection mint address
  attributes?: Record<string, string | number>; // trait_type -> value, matched via JSONB containment
  limit?: number;      // capped at 100 (default 20)
  offset?: number;     // offset pagination (default 0)
}

/**
 * Paginated NFT listing with optional owner / collection / trait filters.
 * The WHERE clause is assembled from fixed condition templates; user values
 * are always bound via $n parameters (never interpolated), so the dynamic
 * SQL is injection-safe. Returns the page plus a total count computed with
 * the same filters.
 */
export async function getNFTs(filter: NFTFilter) {
  const conditions: string[] = [];
  const params: any[] = [];
  let paramIndex = 1; // next $n placeholder; kept in sync with params.length + 1

  if (filter.owner) {
    conditions.push(`ta.owner = $${paramIndex}`);
    params.push(filter.owner);
    paramIndex++;
  }

  if (filter.collection) {
    conditions.push(`nm.collection_mint = $${paramIndex}`);
    params.push(filter.collection);
    paramIndex++;
  }

  if (filter.attributes) {
    // JSONB containment: nm.attributes must contain [{trait_type, value}]
    // for every requested trait (served by the GIN index on attributes).
    for (const [key, value] of Object.entries(filter.attributes)) {
      conditions.push(`nm.attributes @> $${paramIndex}::jsonb`);
      params.push(JSON.stringify([{ trait_type: key, value }]));
      paramIndex++;
    }
  }

  const limit = Math.min(filter.limit || 20, 100);
  const offset = filter.offset || 0;

  // limit/offset are appended AFTER the filter params, so they bind to
  // placeholders $paramIndex and $paramIndex + 1.
  params.push(limit, offset);

  // BUG FIX: LIMIT/OFFSET previously referenced $paramIndex-1/$paramIndex,
  // which re-bound the last filter value as LIMIT and produced an invalid $0
  // placeholder when no filters were supplied.
  const query = `
    SELECT 
      nm.mint,
      nm.name,
      nm.symbol,
      nm.uri,
      nm.image,
      nm.collection_mint,
      nm.attributes,
      ta.owner
    FROM nft_metadata nm
    JOIN token_accounts ta ON ta.mint = nm.mint AND ta.amount > 0
    ${conditions.length > 0 ? "WHERE " + conditions.join(" AND ") : ""}
    ORDER BY nm.created_at DESC
    LIMIT $${paramIndex} OFFSET $${paramIndex + 1}
  `;

  const result = await pool.query(query, params);

  // Total count with the same filters (drop the trailing limit/offset params).
  const countQuery = `
    SELECT COUNT(*) as total
    FROM nft_metadata nm
    JOIN token_accounts ta ON ta.mint = nm.mint AND ta.amount > 0
    ${conditions.length > 0 ? "WHERE " + conditions.join(" AND ") : ""}
  `;
  const countResult = await pool.query(countQuery, params.slice(0, -2));

  return {
    nfts: result.rows,
    pagination: {
      total: parseInt(countResult.rows[0].total),
      limit,
      offset,
      hasMore:
        offset + result.rows.length < parseInt(countResult.rows[0].total),
    },
  };
}

// ==================== Event Queries ====================

/**
 * Keyset-paginated event listing for one (programId, eventType) pair.
 * The WHERE clause is built from fixed condition templates with $n bind
 * parameters only (values never interpolated), so the dynamic SQL is
 * injection-safe.
 *
 * Pagination uses the BIGSERIAL id as cursor: fetch limit+1 rows ordered by
 * id DESC; the extra row signals hasMore and the last returned id becomes
 * nextCursor.
 */
export async function getEvents(
  programId: string,
  eventType: string,
  options: {
    startTime?: Date;
    endTime?: Date;
    limit?: number;  // capped at 200 (default 50)
    cursor?: bigint; // exclusive upper bound on id from the previous page
  } = {},
) {
  const conditions = ["program_id = $1", "event_type = $2"];
  const params: any[] = [programId, eventType];
  let paramIndex = 3; // next $n placeholder; kept in sync with params.length + 1

  if (options.startTime) {
    conditions.push(`block_time >= $${paramIndex}`);
    params.push(options.startTime);
    paramIndex++;
  }

  if (options.endTime) {
    conditions.push(`block_time <= $${paramIndex}`);
    params.push(options.endTime);
    paramIndex++;
  }

  if (options.cursor) {
    conditions.push(`id < $${paramIndex}`);
    params.push(options.cursor);
    paramIndex++;
  }

  const limit = Math.min(options.limit || 50, 200);
  params.push(limit + 1); // one extra row to detect hasMore

  const result = await pool.query(
    `SELECT id, signature, data, slot, block_time
     FROM events
     WHERE ${conditions.join(" AND ")}
     ORDER BY id DESC
     LIMIT $${paramIndex}`,
    params,
  );

  const hasMore = result.rows.length > limit;
  const events = result.rows.slice(0, limit); // drop the sentinel row

  return {
    events,
    pagination: {
      hasMore,
      nextCursor: hasMore ? events[events.length - 1].id : null,
    },
  };
}

Real-time Updates with LISTEN/NOTIFY

TypeScript
import { Pool, Client } from "pg";

// Database trigger for notifications.
// Migration-time SQL: a single trigger function broadcasts row-level changes
// on the hot tables to the 'db_changes' NOTIFY channel as JSON payloads
// ({table, operation, data}), consumed by DatabaseListener below.
// NOTE(review): pg_notify payloads are limited to ~8000 bytes; row_to_json of
// a row with large JSONB columns (e.g. events.data) can exceed that -- confirm
// row sizes before relying on full-row payloads.
const setupTriggers = `
-- Function to notify on changes
CREATE OR REPLACE FUNCTION notify_changes()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify(
    'db_changes',
    json_build_object(
      'table', TG_TABLE_NAME,
      'operation', TG_OP,
      'data', CASE 
        WHEN TG_OP = 'DELETE' THEN row_to_json(OLD)
        ELSE row_to_json(NEW)
      END
    )::text
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Apply to tables
CREATE TRIGGER accounts_notify
  AFTER INSERT OR UPDATE ON accounts
  FOR EACH ROW EXECUTE FUNCTION notify_changes();

CREATE TRIGGER token_accounts_notify
  AFTER INSERT OR UPDATE ON token_accounts
  FOR EACH ROW EXECUTE FUNCTION notify_changes();

CREATE TRIGGER transactions_notify
  AFTER INSERT ON transactions
  FOR EACH ROW EXECUTE FUNCTION notify_changes();

CREATE TRIGGER events_notify
  AFTER INSERT ON events
  FOR EACH ROW EXECUTE FUNCTION notify_changes();
`;

// Listener class
// Subscribes to the Postgres 'db_changes' NOTIFY channel and fans each
// parsed payload out to per-table subscriber callbacks.
export class DatabaseListener {
  private client: Client;
  private handlers: Map<string, Set<(data: any) => void>> = new Map();

  constructor(connectionString: string) {
    this.client = new Client({ connectionString });
  }

  /** Opens the connection, issues LISTEN, and wires up payload dispatch. */
  async connect() {
    await this.client.connect();
    await this.client.query("LISTEN db_changes");

    this.client.on("notification", (msg) => {
      // Ignore other channels and empty payloads.
      if (msg.channel !== "db_changes" || !msg.payload) return;
      const parsed = JSON.parse(msg.payload);
      this.emit(parsed.table, { operation: parsed.operation, data: parsed.data });
    });
  }

  /** Registers a handler for one table; returns an unsubscribe function. */
  subscribe(table: string, handler: (data: any) => void) {
    let set = this.handlers.get(table);
    if (set === undefined) {
      set = new Set();
      this.handlers.set(table, set);
    }
    set.add(handler);

    return () => {
      this.handlers.get(table)?.delete(handler);
    };
  }

  /** Invokes every registered handler for the given table. */
  private emit(table: string, data: any) {
    this.handlers.get(table)?.forEach((handler) => handler(data));
  }

  /** Closes the underlying connection. */
  async disconnect() {
    await this.client.end();
  }
}

// Usage example: connect once, then register per-table handlers.
// NOTE(review): top-level await requires an ES-module context.
const listener = new DatabaseListener(process.env.DATABASE_URL!);
await listener.connect();

// data is the full changed row (see notify_changes trigger payload).
listener.subscribe("accounts", ({ operation, data }) => {
  console.log(`Account ${operation}:`, data.address);
  // Invalidate cache, notify WebSocket clients, etc.
});

listener.subscribe("events", ({ operation, data }) => {
  console.log(`Event ${operation}:`, data.event_type);
  // Process events in real-time
});

Time-series Data with TimescaleDB

SQL
-- Enable TimescaleDB
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Raw per-observation price points; chunked by time via the hypertable below.
CREATE TABLE price_ticks (
    time TIMESTAMPTZ NOT NULL,
    mint TEXT NOT NULL,
    price_usd NUMERIC(20, 8) NOT NULL,
    volume NUMERIC(20, 2),
    liquidity NUMERIC(20, 2)
);

-- Convert to a hypertable (automatic time-based partitioning into chunks).
SELECT create_hypertable('price_ticks', 'time');

-- Add compression
-- Segmenting by mint groups each mint's ticks together, which improves
-- compression ratios and per-mint scan speed.
ALTER TABLE price_ticks SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'mint'
);

-- Compress chunks once they are older than 7 days.
SELECT add_compression_policy('price_ticks', INTERVAL '7 days');

-- Continuous aggregates for OHLCV
-- first()/last() are TimescaleDB aggregates ordered by the second argument.
CREATE MATERIALIZED VIEW price_ohlcv_1h
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    mint,
    first(price_usd, time) AS open,
    max(price_usd) AS high,
    min(price_usd) AS low,
    last(price_usd, time) AS close,
    sum(volume) AS volume
FROM price_ticks
GROUP BY bucket, mint;

-- Refresh policy
-- Every hour, recompute the window from 3h to 1h ago; the most recent
-- (still-mutable) hour is deliberately excluded from materialization.
SELECT add_continuous_aggregate_policy('price_ohlcv_1h',
    start_offset => INTERVAL '3 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');

Query Optimization

TypeScript
// Connection pooling
// Pool sizing: up to 20 connections, drop idle connections after 30s, and
// fail fast (2s) when a connection cannot be obtained.
// NOTE(review): this redeclares `pool` from the Query Patterns section above;
// these snippets appear to be standalone illustrations, not one module.
const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
  max: 20,
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 2000,
});

// Prepared statements for frequent queries
// Supplying a statement `name` to pg makes the server parse/plan the query
// once per connection and reuse the plan on subsequent executions.
const preparedStatements = {
  // Single-account lookup by primary key.
  getAccount: {
    name: "get_account",
    text: "SELECT * FROM accounts WHERE address = $1",
  },
  // All non-empty token balances for a wallet, with mint metadata.
  getTokenBalances: {
    name: "get_token_balances",
    text: `
      SELECT ta.mint, tm.symbol, ta.amount, tm.decimals
      FROM token_accounts ta
      JOIN token_mints tm ON ta.mint = tm.address
      WHERE ta.owner = $1 AND ta.amount > 0
    `,
  },
  // Most recent transactions touching an address, newest slot first.
  getRecentTransactions: {
    name: "get_recent_transactions",
    text: `
      SELECT t.signature, t.slot, t.block_time, t.success
      FROM transactions t
      JOIN transaction_accounts ta ON t.signature = ta.signature
      WHERE ta.address = $1
      ORDER BY t.slot DESC
      LIMIT $2
    `,
  },
};

// Batch loading with DataLoader pattern
import DataLoader from "dataloader";

// Coalesces individual account lookups made within one batch window into a
// single `address = ANY($1)` query (up to 100 addresses per batch).
const accountLoader = new DataLoader(
  async (addresses: readonly string[]) => {
    const result = await pool.query(
      "SELECT * FROM accounts WHERE address = ANY($1)",
      [addresses],
    );

    // DataLoader requires results in the same order as the requested keys;
    // addresses with no row map to null.
    const accountMap = new Map(result.rows.map((r) => [r.address, r]));
    return addresses.map((addr) => accountMap.get(addr) || null);
  },
  // NOTE(review): cache: true memoizes per loader instance with no TTL; for
  // mutable chain state, scope a loader per request or clear it on updates.
  { maxBatchSize: 100, cache: true },
);

// Usage
const accounts = await Promise.all([
  accountLoader.load("address1"),
  accountLoader.load("address2"),
  accountLoader.load("address3"),
]); // Single batched query

Best Practices

| Pattern | Description |
| --- | --- |
| Immutable history | Never update transaction/event data, only append |
| Snapshots | Store periodic balance snapshots for historical queries |
| Denormalization | Pre-compute frequently accessed aggregations |
| Partitioning | Partition large tables by time (slot, block_time) |
| Indexes | Create indexes based on actual query patterns |
| Connection pooling | Use connection pools with appropriate limits |

Next: Deployment - Production deployment strategies for Solana applications.