Database Patterns
Effectively storing and querying blockchain data requires specialized database patterns that handle immutable history, real-time updates, and complex relationships.
Database Schema Design
Text
┌─────────────────────────────────────────────────────────────────┐
│ Database Schema Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Accounts │────▶│Token Accounts│────▶│ Tokens │ │
│ │ │ │ │ │ (Mints) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Transactions │────▶│ Instructions │────▶│ Events │ │
│ │ │ │ │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │
│ │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Balances │ │ Prices │ │ NFTs │ │
│ │ (Snapshots) │ │ (History) │ │ (Metadata) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
PostgreSQL Schema
SQL
-- Core tables for blockchain data
-- Accounts table
CREATE TABLE accounts (
address TEXT PRIMARY KEY,
owner TEXT NOT NULL,
lamports BIGINT NOT NULL DEFAULT 0,
executable BOOLEAN DEFAULT FALSE,
rent_epoch BIGINT,
data_hash TEXT,
slot_updated BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_accounts_owner ON accounts(owner);
CREATE INDEX idx_accounts_slot ON accounts(slot_updated);
-- Token mints
CREATE TABLE token_mints (
address TEXT PRIMARY KEY,
decimals SMALLINT NOT NULL,
supply NUMERIC(78, 0) NOT NULL DEFAULT 0,
mint_authority TEXT,
freeze_authority TEXT,
name TEXT,
symbol TEXT,
uri TEXT,
is_nft BOOLEAN DEFAULT FALSE,
slot_updated BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_token_mints_symbol ON token_mints(symbol);
CREATE INDEX idx_token_mints_nft ON token_mints(is_nft) WHERE is_nft = TRUE;
-- Token accounts
CREATE TABLE token_accounts (
address TEXT PRIMARY KEY,
mint TEXT NOT NULL REFERENCES token_mints(address),
owner TEXT NOT NULL,
amount NUMERIC(78, 0) NOT NULL DEFAULT 0,
delegate TEXT,
delegated_amount NUMERIC(78, 0) DEFAULT 0,
is_frozen BOOLEAN DEFAULT FALSE,
is_native BOOLEAN DEFAULT FALSE,
close_authority TEXT,
slot_updated BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_token_accounts_owner ON token_accounts(owner);
CREATE INDEX idx_token_accounts_mint ON token_accounts(mint);
CREATE INDEX idx_token_accounts_owner_mint ON token_accounts(owner, mint);
-- Transactions
CREATE TABLE transactions (
signature TEXT PRIMARY KEY,
slot BIGINT NOT NULL,
block_time TIMESTAMPTZ,
fee BIGINT NOT NULL,
success BOOLEAN NOT NULL,
err JSONB,
memo TEXT,
compute_units_consumed BIGINT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_transactions_slot ON transactions(slot);
CREATE INDEX idx_transactions_block_time ON transactions(block_time DESC);
CREATE INDEX idx_transactions_success ON transactions(success);
-- Transaction account mappings (for address lookups)
CREATE TABLE transaction_accounts (
signature TEXT NOT NULL REFERENCES transactions(signature),
address TEXT NOT NULL,
is_signer BOOLEAN NOT NULL DEFAULT FALSE,
is_writable BOOLEAN NOT NULL DEFAULT FALSE,
position SMALLINT NOT NULL,
PRIMARY KEY (signature, address)
);
CREATE INDEX idx_tx_accounts_address ON transaction_accounts(address);
CREATE INDEX idx_tx_accounts_address_signer ON transaction_accounts(address)
WHERE is_signer = TRUE;
-- Instructions
CREATE TABLE instructions (
id BIGSERIAL PRIMARY KEY,
signature TEXT NOT NULL REFERENCES transactions(signature),
program_id TEXT NOT NULL,
instruction_index SMALLINT NOT NULL,
inner_index SMALLINT,
data BYTEA,
parsed_type TEXT,
parsed_info JSONB
);
CREATE INDEX idx_instructions_signature ON instructions(signature);
CREATE INDEX idx_instructions_program ON instructions(program_id);
CREATE INDEX idx_instructions_parsed_type ON instructions(parsed_type);
-- Events (program logs)
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
signature TEXT NOT NULL REFERENCES transactions(signature),
program_id TEXT NOT NULL,
event_type TEXT NOT NULL,
data JSONB NOT NULL,
slot BIGINT NOT NULL,
block_time TIMESTAMPTZ
);
CREATE INDEX idx_events_program ON events(program_id);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_program_type ON events(program_id, event_type);
CREATE INDEX idx_events_block_time ON events(block_time DESC);
-- Balance snapshots (for historical queries)
CREATE TABLE balance_snapshots (
id BIGSERIAL PRIMARY KEY,
address TEXT NOT NULL,
slot BIGINT NOT NULL,
lamports BIGINT NOT NULL,
block_time TIMESTAMPTZ NOT NULL,
UNIQUE(address, slot)
);
CREATE INDEX idx_balance_snapshots_address_time ON balance_snapshots(address, block_time DESC);
-- Token balance snapshots
CREATE TABLE token_balance_snapshots (
id BIGSERIAL PRIMARY KEY,
token_account TEXT NOT NULL,
mint TEXT NOT NULL,
owner TEXT NOT NULL,
amount NUMERIC(78, 0) NOT NULL,
slot BIGINT NOT NULL,
block_time TIMESTAMPTZ NOT NULL,
UNIQUE(token_account, slot)
);
CREATE INDEX idx_token_balance_snapshots_owner_mint ON token_balance_snapshots(owner, mint, block_time DESC);
-- Price history
CREATE TABLE price_history (
id BIGSERIAL PRIMARY KEY,
mint TEXT NOT NULL,
price_usd NUMERIC(20, 8) NOT NULL,
volume_24h NUMERIC(20, 2),
market_cap NUMERIC(20, 2),
timestamp TIMESTAMPTZ NOT NULL,
source TEXT NOT NULL
);
CREATE INDEX idx_price_history_mint_time ON price_history(mint, timestamp DESC);
-- NFT metadata
CREATE TABLE nft_metadata (
mint TEXT PRIMARY KEY REFERENCES token_mints(address),
name TEXT NOT NULL,
symbol TEXT,
uri TEXT NOT NULL,
seller_fee_basis_points SMALLINT,
primary_sale_happened BOOLEAN DEFAULT FALSE,
is_mutable BOOLEAN DEFAULT TRUE,
collection_mint TEXT,
collection_verified BOOLEAN DEFAULT FALSE,
attributes JSONB,
image TEXT,
animation_url TEXT,
external_url TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_nft_metadata_collection ON nft_metadata(collection_mint);
CREATE INDEX idx_nft_metadata_attributes ON nft_metadata USING GIN(attributes);
Query Patterns
TypeScript
import { Pool } from "pg";
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
// ==================== Account Queries ====================
interface AccountWithTokens {
address: string;
lamports: bigint;
tokens: Array<{
mint: string;
symbol: string;
balance: string;
decimals: number;
priceUsd: number | null;
valueUsd: number | null;
}>;
}
export async function getAccountWithTokens(
address: string,
): Promise<AccountWithTokens | null> {
const accountResult = await pool.query(
"SELECT * FROM accounts WHERE address = $1",
[address],
);
if (accountResult.rows.length === 0) return null;
const tokensResult = await pool.query(
`SELECT
ta.mint,
tm.symbol,
ta.amount as balance,
tm.decimals,
ph.price_usd
FROM token_accounts ta
JOIN token_mints tm ON ta.mint = tm.address
LEFT JOIN LATERAL (
SELECT price_usd
FROM price_history
WHERE mint = ta.mint
ORDER BY timestamp DESC
LIMIT 1
) ph ON true
WHERE ta.owner = $1 AND ta.amount > 0
ORDER BY (ta.amount::numeric / power(10, tm.decimals)) * COALESCE(ph.price_usd, 0) DESC`,
[address],
);
const tokens = tokensResult.rows.map((row) => {
const balance = parseFloat(row.balance) / Math.pow(10, row.decimals);
return {
mint: row.mint,
symbol: row.symbol,
balance: balance.toString(),
decimals: row.decimals,
priceUsd: row.price_usd ? parseFloat(row.price_usd) : null,
valueUsd: row.price_usd ? balance * parseFloat(row.price_usd) : null,
};
});
return {
address,
lamports: BigInt(accountResult.rows[0].lamports),
tokens,
};
}
// ==================== Transaction Queries ====================
interface TransactionFilter {
address?: string;
programId?: string;
startTime?: Date;
endTime?: Date;
success?: boolean;
limit?: number;
cursor?: string;
}
export async function getTransactions(filter: TransactionFilter) {
const conditions: string[] = [];
const params: any[] = [];
let paramIndex = 1;
if (filter.address) {
conditions.push(`
EXISTS (
SELECT 1 FROM transaction_accounts ta
WHERE ta.signature = t.signature AND ta.address = $${paramIndex}
)
`);
params.push(filter.address);
paramIndex++;
}
if (filter.programId) {
conditions.push(`
EXISTS (
SELECT 1 FROM instructions i
WHERE i.signature = t.signature AND i.program_id = $${paramIndex}
)
`);
params.push(filter.programId);
paramIndex++;
}
if (filter.startTime) {
conditions.push(`t.block_time >= $${paramIndex}`);
params.push(filter.startTime);
paramIndex++;
}
if (filter.endTime) {
conditions.push(`t.block_time <= $${paramIndex}`);
params.push(filter.endTime);
paramIndex++;
}
if (filter.success !== undefined) {
conditions.push(`t.success = $${paramIndex}`);
params.push(filter.success);
paramIndex++;
}
if (filter.cursor) {
conditions.push(`t.slot < $${paramIndex}`);
params.push(parseInt(filter.cursor));
paramIndex++;
}
const limit = Math.min(filter.limit || 20, 100);
params.push(limit + 1); // Fetch one extra to determine hasMore
const query = `
SELECT
t.signature,
t.slot,
t.block_time,
t.fee,
t.success,
t.err,
t.memo,
t.compute_units_consumed
FROM transactions t
${conditions.length > 0 ? "WHERE " + conditions.join(" AND ") : ""}
ORDER BY t.slot DESC
LIMIT $${paramIndex}
`;
const result = await pool.query(query, params);
const hasMore = result.rows.length > limit;
const transactions = result.rows.slice(0, limit);
return {
transactions,
pagination: {
hasMore,
nextCursor: hasMore
? transactions[transactions.length - 1].slot.toString()
: null,
},
};
}
// ==================== Historical Balance Queries ====================
export async function getBalanceHistory(
address: string,
startTime: Date,
endTime: Date,
interval: "hour" | "day" | "week" = "day",
) {
const intervalMap = {
hour: "1 hour",
day: "1 day",
week: "1 week",
};
const result = await pool.query(
`WITH time_series AS (
SELECT generate_series(
date_trunc($1, $2::timestamptz),
date_trunc($1, $3::timestamptz),
$4::interval
) AS bucket
)
SELECT
ts.bucket,
COALESCE(
(SELECT lamports
FROM balance_snapshots
WHERE address = $5 AND block_time <= ts.bucket
ORDER BY block_time DESC
LIMIT 1),
0
) AS lamports
FROM time_series ts
ORDER BY ts.bucket`,
[interval, startTime, endTime, intervalMap[interval], address],
);
return result.rows.map((row) => ({
timestamp: row.bucket,
lamports: BigInt(row.lamports),
sol: parseFloat(row.lamports) / 1e9,
}));
}
// ==================== NFT Queries ====================
interface NFTFilter {
owner?: string;
collection?: string;
attributes?: Record<string, string | number>;
limit?: number;
offset?: number;
}
export async function getNFTs(filter: NFTFilter) {
const conditions: string[] = [];
const params: any[] = [];
let paramIndex = 1;
if (filter.owner) {
conditions.push(`ta.owner = $${paramIndex}`);
params.push(filter.owner);
paramIndex++;
}
if (filter.collection) {
conditions.push(`nm.collection_mint = $${paramIndex}`);
params.push(filter.collection);
paramIndex++;
}
if (filter.attributes) {
for (const [key, value] of Object.entries(filter.attributes)) {
conditions.push(`nm.attributes @> $${paramIndex}::jsonb`);
params.push(JSON.stringify([{ trait_type: key, value }]));
paramIndex++;
}
}
const limit = Math.min(filter.limit || 20, 100);
const offset = filter.offset || 0;
params.push(limit, offset);
const query = `
SELECT
nm.mint,
nm.name,
nm.symbol,
nm.uri,
nm.image,
nm.collection_mint,
nm.attributes,
ta.owner
FROM nft_metadata nm
JOIN token_accounts ta ON ta.mint = nm.mint AND ta.amount > 0
${conditions.length > 0 ? "WHERE " + conditions.join(" AND ") : ""}
ORDER BY nm.created_at DESC
LIMIT $${paramIndex - 1} OFFSET $${paramIndex}
`;
const result = await pool.query(query, params);
// Get total count
const countQuery = `
SELECT COUNT(*) as total
FROM nft_metadata nm
JOIN token_accounts ta ON ta.mint = nm.mint AND ta.amount > 0
${conditions.length > 0 ? "WHERE " + conditions.join(" AND ") : ""}
`;
const countResult = await pool.query(countQuery, params.slice(0, -2));
return {
nfts: result.rows,
pagination: {
total: parseInt(countResult.rows[0].total),
limit,
offset,
hasMore:
offset + result.rows.length < parseInt(countResult.rows[0].total),
},
};
}
// ==================== Event Queries ====================
export async function getEvents(
programId: string,
eventType: string,
options: {
startTime?: Date;
endTime?: Date;
limit?: number;
cursor?: bigint;
} = {},
) {
const conditions = ["program_id = $1", "event_type = $2"];
const params: any[] = [programId, eventType];
let paramIndex = 3;
if (options.startTime) {
conditions.push(`block_time >= $${paramIndex}`);
params.push(options.startTime);
paramIndex++;
}
if (options.endTime) {
conditions.push(`block_time <= $${paramIndex}`);
params.push(options.endTime);
paramIndex++;
}
if (options.cursor) {
conditions.push(`id < $${paramIndex}`);
params.push(options.cursor);
paramIndex++;
}
const limit = Math.min(options.limit || 50, 200);
params.push(limit + 1);
const result = await pool.query(
`SELECT id, signature, data, slot, block_time
FROM events
WHERE ${conditions.join(" AND ")}
ORDER BY id DESC
LIMIT $${paramIndex}`,
params,
);
const hasMore = result.rows.length > limit;
const events = result.rows.slice(0, limit);
return {
events,
pagination: {
hasMore,
nextCursor: hasMore ? events[events.length - 1].id : null,
},
};
}
Real-time Updates with LISTEN/NOTIFY
TypeScript
import { Pool, Client } from "pg";
// Database trigger for notifications
const setupTriggers = `
-- Function to notify on changes
CREATE OR REPLACE FUNCTION notify_changes()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify(
'db_changes',
json_build_object(
'table', TG_TABLE_NAME,
'operation', TG_OP,
'data', CASE
WHEN TG_OP = 'DELETE' THEN row_to_json(OLD)
ELSE row_to_json(NEW)
END
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Apply to tables
CREATE TRIGGER accounts_notify
AFTER INSERT OR UPDATE ON accounts
FOR EACH ROW EXECUTE FUNCTION notify_changes();
CREATE TRIGGER token_accounts_notify
AFTER INSERT OR UPDATE ON token_accounts
FOR EACH ROW EXECUTE FUNCTION notify_changes();
CREATE TRIGGER transactions_notify
AFTER INSERT ON transactions
FOR EACH ROW EXECUTE FUNCTION notify_changes();
CREATE TRIGGER events_notify
AFTER INSERT ON events
FOR EACH ROW EXECUTE FUNCTION notify_changes();
`;
// Listener class
export class DatabaseListener {
private client: Client;
private handlers: Map<string, Set<(data: any) => void>> = new Map();
constructor(connectionString: string) {
this.client = new Client({ connectionString });
}
async connect() {
await this.client.connect();
await this.client.query("LISTEN db_changes");
this.client.on("notification", (msg) => {
if (msg.channel === "db_changes" && msg.payload) {
const { table, operation, data } = JSON.parse(msg.payload);
this.emit(table, { operation, data });
}
});
}
subscribe(table: string, handler: (data: any) => void) {
if (!this.handlers.has(table)) {
this.handlers.set(table, new Set());
}
this.handlers.get(table)!.add(handler);
return () => {
this.handlers.get(table)?.delete(handler);
};
}
private emit(table: string, data: any) {
const handlers = this.handlers.get(table);
if (handlers) {
for (const handler of handlers) {
handler(data);
}
}
}
async disconnect() {
await this.client.end();
}
}
// Usage
const listener = new DatabaseListener(process.env.DATABASE_URL!);
await listener.connect();
listener.subscribe("accounts", ({ operation, data }) => {
console.log(`Account ${operation}:`, data.address);
// Invalidate cache, notify WebSocket clients, etc.
});
listener.subscribe("events", ({ operation, data }) => {
console.log(`Event ${operation}:`, data.event_type);
// Process events in real-time
});
Time-series Data with TimescaleDB
SQL
-- Enable TimescaleDB
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- Create hypertable for price history
CREATE TABLE price_ticks (
time TIMESTAMPTZ NOT NULL,
mint TEXT NOT NULL,
price_usd NUMERIC(20, 8) NOT NULL,
volume NUMERIC(20, 2),
liquidity NUMERIC(20, 2)
);
SELECT create_hypertable('price_ticks', 'time');
-- Add compression
ALTER TABLE price_ticks SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'mint'
);
SELECT add_compression_policy('price_ticks', INTERVAL '7 days');
-- Continuous aggregates for OHLCV
CREATE MATERIALIZED VIEW price_ohlcv_1h
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS bucket,
mint,
first(price_usd, time) AS open,
max(price_usd) AS high,
min(price_usd) AS low,
last(price_usd, time) AS close,
sum(volume) AS volume
FROM price_ticks
GROUP BY bucket, mint;
-- Refresh policy
SELECT add_continuous_aggregate_policy('price_ohlcv_1h',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
Query Optimization
TypeScript
// Connection pooling
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: 20,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000,
});
// Prepared statements for frequent queries
const preparedStatements = {
getAccount: {
name: "get_account",
text: "SELECT * FROM accounts WHERE address = $1",
},
getTokenBalances: {
name: "get_token_balances",
text: `
SELECT ta.mint, tm.symbol, ta.amount, tm.decimals
FROM token_accounts ta
JOIN token_mints tm ON ta.mint = tm.address
WHERE ta.owner = $1 AND ta.amount > 0
`,
},
getRecentTransactions: {
name: "get_recent_transactions",
text: `
SELECT t.signature, t.slot, t.block_time, t.success
FROM transactions t
JOIN transaction_accounts ta ON t.signature = ta.signature
WHERE ta.address = $1
ORDER BY t.slot DESC
LIMIT $2
`,
},
};
// Batch loading with DataLoader pattern
import DataLoader from "dataloader";
const accountLoader = new DataLoader(
async (addresses: readonly string[]) => {
const result = await pool.query(
"SELECT * FROM accounts WHERE address = ANY($1)",
[addresses],
);
const accountMap = new Map(result.rows.map((r) => [r.address, r]));
return addresses.map((addr) => accountMap.get(addr) || null);
},
{ maxBatchSize: 100, cache: true },
);
// Usage
const accounts = await Promise.all([
accountLoader.load("address1"),
accountLoader.load("address2"),
accountLoader.load("address3"),
]); // Single batched query
Best Practices
| Pattern | Description |
|---|---|
| Immutable history | Never update transaction/event data, only append |
| Snapshots | Store periodic balance snapshots for historical queries |
| Denormalization | Pre-compute frequently accessed aggregations |
| Partitioning | Partition large tables by time (slot, block_time) |
| Indexes | Create indexes based on actual query patterns |
| Connection pooling | Use connection pools with appropriate limits |
Next: Deployment - Production deployment strategies for Solana applications.