Data Indexing Overview
Indexing transforms raw blockchain data into queryable structures. On Solana, this involves parsing accounts, transactions, and program logs efficiently.
Why Index?
Raw RPC calls are too slow and too limited for application queries, so an indexer ingests on-chain data once and serves it from a database built for fast lookups.
Text
┌─────────────────────────────────────────────────────────────────┐
│ Indexing Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Solana │ │ Indexer │ │ Database │ │
│ │ Node │─────▶│ Service │─────▶│ (Postgres) │ │
│ │ │ │ │ │ │ │
│ │ • Blocks │ │ • Parse │ │ • Tables │ │
│ │ • Txs │ │ • Decode │ │ • Indexes │ │
│ │ • Accounts │ │ • Transform│ │ • Views │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │
│ │ ┌─────────────┐ │ │
│ │ │ Webhooks │◀───────────────┤ │
│ │ └─────────────┘ │ │
│ │ ▼ │
│ │ ┌─────────────────┐ │
│ └─────────────────────────────▶│ GraphQL / │ │
│ │ REST API │ │
│ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Frontend │ │
│ │ Application│ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Indexing Approaches
| Approach | Latency | Complexity | Best For |
|---|---|---|---|
| RPC Polling | High | Low | Prototyping |
| WebSocket | Medium | Medium | Real-time updates |
| Geyser Plugin | Low | High | High throughput |
| Yellowstone gRPC | Low | Medium | Production indexing |
| Third-party (Helius, Triton) | Low | Low | Quick deployment |
Data Sources
1. Account Data
TypeScript
import { Connection, PublicKey } from "@solana/web3.js";
import { deserialize } from "borsh";
// Account layout for the example user program.
// NOTE: this must be a class, not an interface — borsh's legacy Map-based
// schema API uses the constructor as a runtime schema key and instantiates
// it during deserialization; an interface has no runtime value, so the
// original `interface UserAccount` failed to compile when referenced below.
class UserAccount {
  balance: bigint;
  lastUpdate: bigint;
  owner: Uint8Array;

  constructor(fields: {
    balance: bigint;
    lastUpdate: bigint;
    owner: Uint8Array;
  }) {
    this.balance = fields.balance;
    this.lastUpdate = fields.lastUpdate;
    this.owner = fields.owner;
  }
}

// borsh (v0.x) schema: u64 balance + i64 lastUpdate + 32-byte owner
// = 48 bytes total, matching the dataSize filter used when fetching.
const USER_SCHEMA = new Map([
  [
    UserAccount,
    {
      kind: "struct",
      fields: [
        ["balance", "u64"],
        ["lastUpdate", "i64"],
        ["owner", [32]],
      ],
    },
  ],
]);
async function indexProgramAccounts(
connection: Connection,
programId: PublicKey,
) {
// Fetch all accounts owned by program
const accounts = await connection.getProgramAccounts(programId, {
commitment: "confirmed",
filters: [
{ dataSize: 48 }, // Filter by account size
],
});
const indexed = accounts.map((account) => {
const data = deserialize(USER_SCHEMA, UserAccount, account.account.data);
return {
pubkey: account.pubkey.toBase58(),
balance: data.balance.toString(),
lastUpdate: new Date(Number(data.lastUpdate) * 1000),
owner: new PublicKey(data.owner).toBase58(),
};
});
return indexed;
}
2. Transaction Logs
TypeScript
import {
Connection,
PublicKey,
ParsedTransactionWithMeta,
} from "@solana/web3.js";
// One decoded program event extracted from a transaction's log messages.
interface ParsedEvent {
  name: string; // event type name, e.g. "Transfer"
  data: Record<string, unknown>; // decoded event payload fields
  signature: string; // signature of the transaction the event came from
  slot: number; // slot in which the transaction landed
  timestamp: number; // block time in unix seconds; 0 when unavailable
}
/**
 * Fetch a confirmed transaction and decode the events its program emitted
 * as `Program data:` log lines (base64-encoded payloads).
 *
 * @param connection RPC connection used to fetch the transaction.
 * @param programId  Program of interest (reserved for caller-side filtering).
 * @param signature  Transaction signature to parse.
 * @returns Decoded events, or an empty array when the transaction or its
 *          log messages are unavailable.
 */
async function parseTransactionLogs(
  connection: Connection,
  programId: PublicKey,
  signature: string,
): Promise<ParsedEvent[]> {
  const tx = await connection.getParsedTransaction(signature, {
    commitment: "confirmed",
    maxSupportedTransactionVersion: 0,
  });
  if (!tx?.meta?.logMessages) return [];

  // Single pass: select "Program data:" lines, strip the prefix,
  // base64-decode, and decode the event in one loop.
  const events: ParsedEvent[] = [];
  for (const line of tx.meta.logMessages) {
    if (!line.startsWith("Program data:")) continue;
    const payload = line.replace("Program data: ", "");
    const event = decodeEvent(Buffer.from(payload, "base64"));
    if (!event) continue;
    events.push({
      ...event,
      signature,
      slot: tx.slot,
      timestamp: tx.blockTime ?? 0,
    });
  }
  return events;
}
// Discriminator (hex) → event-name lookup. Hoisted to module scope so the
// table is not rebuilt on every call; in practice this is generated from
// the program's IDL.
const EVENT_DISCRIMINATORS: Record<string, string> = {
  "f5e23ed0d2ae2cf7": "Transfer",
  "82a90824d59b3e8c": "Deposit",
};

/**
 * Decode one `Program data:` payload into a named event.
 *
 * Layout: the first 8 bytes are the Anchor-style event discriminator;
 * the remainder is the event body.
 *
 * @param data Raw event bytes (discriminator + payload).
 * @returns Decoded event, or null when the buffer is too short or the
 *          discriminator is unknown.
 */
function decodeEvent(
  data: Buffer,
): { name: string; data: Record<string, unknown> } | null {
  // Anything shorter than a discriminator cannot be a valid event.
  if (data.length < 8) return null;

  // subarray() replaces the deprecated Buffer#slice and does not copy.
  const discriminator = data.subarray(0, 8);
  const eventData = data.subarray(8);

  const eventName = EVENT_DISCRIMINATORS[discriminator.toString("hex")];
  if (!eventName) return null;

  // Decode the payload using the per-event schema (generated from the IDL).
  return {
    name: eventName,
    data: decodeEventData(eventName, eventData),
  };
}
3. Account Subscriptions
TypeScript
import { Connection, PublicKey } from "@solana/web3.js";
/**
 * Tracks account-change WebSocket subscriptions so they can be cleanly
 * removed later. Thin wrapper around Connection.onAccountChange /
 * removeAccountChangeListener.
 */
class AccountWatcher {
  // pubkey (base58) -> RPC subscription id
  private subscriptions: Map<string, number> = new Map();

  constructor(
    private connection: Connection,
    private onAccountChange: (pubkey: string, data: Buffer) => void,
  ) {}

  /**
   * Subscribe to change notifications for `pubkey`.
   * Fix: watching a key that is already watched previously overwrote its
   * subscription id in the map, leaking the old subscription forever —
   * now the old subscription is removed first.
   */
  watch(pubkey: PublicKey): void {
    const key = pubkey.toBase58();
    if (this.subscriptions.has(key)) {
      this.unwatch(pubkey);
    }
    const subscriptionId = this.connection.onAccountChange(
      pubkey,
      (accountInfo) => {
        this.onAccountChange(key, accountInfo.data);
      },
      "confirmed",
    );
    this.subscriptions.set(key, subscriptionId);
  }

  /** Remove the subscription for `pubkey`, if any (fire-and-forget). */
  unwatch(pubkey: PublicKey): void {
    const key = pubkey.toBase58();
    const subscriptionId = this.subscriptions.get(key);
    if (subscriptionId !== undefined) {
      void this.connection.removeAccountChangeListener(subscriptionId);
      this.subscriptions.delete(key);
    }
  }

  /** Remove every active subscription, awaiting each removal. */
  async unwatchAll(): Promise<void> {
    for (const subscriptionId of this.subscriptions.values()) {
      await this.connection.removeAccountChangeListener(subscriptionId);
    }
    this.subscriptions.clear();
  }
}
// Usage
// NOTE(review): illustrative snippet — assumes a `Connection` named
// `connection` is already in scope, and a real base58 pubkey replaces "...".
const watcher = new AccountWatcher(connection, (pubkey, data) => {
  console.log(`Account ${pubkey} changed:`, data);
  // Update database
});
watcher.watch(new PublicKey("..."));
Database Schema Design
SQL
-- Core tables for Solana indexing
-- Accounts table: latest known state of each tracked account.
CREATE TABLE accounts (
    pubkey TEXT PRIMARY KEY,
    program_id TEXT NOT NULL,
    lamports BIGINT NOT NULL,
    data BYTEA,
    owner TEXT NOT NULL,
    executable BOOLEAN NOT NULL DEFAULT FALSE,
    rent_epoch BIGINT NOT NULL,
    slot BIGINT NOT NULL,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- PostgreSQL does not support inline INDEX clauses inside CREATE TABLE
-- (that is MySQL syntax); indexes must be separate statements.
CREATE INDEX idx_accounts_program ON accounts (program_id);
CREATE INDEX idx_accounts_owner ON accounts (owner);
CREATE INDEX idx_accounts_slot ON accounts (slot);
-- Transactions table: one row per indexed transaction.
CREATE TABLE transactions (
    signature TEXT PRIMARY KEY,
    slot BIGINT NOT NULL,
    block_time TIMESTAMP WITH TIME ZONE,
    fee BIGINT NOT NULL,
    status TEXT NOT NULL, -- 'success' or 'failed'
    err JSONB,
    logs TEXT[]
);

-- Inline INDEX clauses are MySQL syntax; Postgres needs CREATE INDEX.
CREATE INDEX idx_transactions_slot ON transactions (slot);
CREATE INDEX idx_transactions_block_time ON transactions (block_time);
-- Transaction instructions (outer and inner), linked to their transaction.
CREATE TABLE instructions (
    id BIGSERIAL PRIMARY KEY,
    signature TEXT NOT NULL REFERENCES transactions(signature),
    program_id TEXT NOT NULL,
    instruction_index INTEGER NOT NULL,
    inner_index INTEGER, -- NULL for outer instructions
    data BYTEA NOT NULL,
    accounts TEXT[] NOT NULL
);

-- Inline INDEX clauses are MySQL syntax; Postgres needs CREATE INDEX.
CREATE INDEX idx_instructions_signature ON instructions (signature);
CREATE INDEX idx_instructions_program ON instructions (program_id);
-- Program events (decoded from logs).
CREATE TABLE events (
    id BIGSERIAL PRIMARY KEY,
    signature TEXT NOT NULL REFERENCES transactions(signature),
    program_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    data JSONB NOT NULL,
    slot BIGINT NOT NULL,
    block_time TIMESTAMP WITH TIME ZONE
);

-- Inline INDEX clauses are MySQL syntax; Postgres needs CREATE INDEX.
CREATE INDEX idx_events_program_type ON events (program_id, event_type);
CREATE INDEX idx_events_block_time ON events (block_time);
-- Token transfers (SPL Token specific).
CREATE TABLE token_transfers (
    id BIGSERIAL PRIMARY KEY,
    signature TEXT NOT NULL,
    mint TEXT NOT NULL,
    source TEXT NOT NULL,
    destination TEXT NOT NULL,
    amount NUMERIC NOT NULL,
    decimals INTEGER NOT NULL,
    slot BIGINT NOT NULL,
    block_time TIMESTAMP WITH TIME ZONE
);

-- Inline INDEX clauses are MySQL syntax; Postgres needs CREATE INDEX.
CREATE INDEX idx_transfers_mint ON token_transfers (mint);
CREATE INDEX idx_transfers_source ON token_transfers (source);
CREATE INDEX idx_transfers_destination ON token_transfers (destination);
CREATE INDEX idx_transfers_block_time ON token_transfers (block_time);
-- NFT metadata, one row per mint.
CREATE TABLE nft_metadata (
    mint TEXT PRIMARY KEY,
    name TEXT,
    symbol TEXT,
    uri TEXT,
    seller_fee_basis_points INTEGER,
    creators JSONB,
    collection TEXT,
    attributes JSONB,
    image TEXT,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Inline INDEX clauses are MySQL syntax; Postgres needs CREATE INDEX.
CREATE INDEX idx_nft_collection ON nft_metadata (collection);
CREATE INDEX idx_nft_name ON nft_metadata (name);
Indexer Implementation
TypeScript
import { Connection, PublicKey, Commitment } from "@solana/web3.js";
import { Pool } from "pg";
// Configuration for SolanaIndexer.
interface IndexerConfig {
  rpcUrl: string; // HTTP RPC endpoint to poll
  programIds: string[]; // only instructions from these programs are stored
  startSlot?: number; // explicit resume slot; falls back to the saved checkpoint
  batchSize: number; // maximum number of slots processed per batch
  commitment: Commitment; // commitment level used for all RPC calls
}
/**
 * Polling-based Solana indexer: walks slots sequentially, fetches each block,
 * and writes transactions / instructions / events to Postgres inside one DB
 * transaction per block. Progress is checkpointed so restarts resume.
 *
 * Fixes vs. the original:
 *  - `processAccountChanges` was called but never defined (compile error).
 *  - `Buffer.from(ix.data, "base58")` — Node's Buffer has no "base58"
 *    encoding and throws at runtime; replaced with a local base58 decoder.
 *  - A skipped slot made `getBlock` throw, and the outer retry restarted
 *    from the same slot forever (livelock); now handled per slot.
 *  - The checkpoint upsert omitted `id`, so `ON CONFLICT (id)` never fired
 *    and rows accumulated on every batch.
 */
class SolanaIndexer {
  // Bitcoin/Solana base58 alphabet (no 0, O, I, l).
  private static readonly BASE58_ALPHABET =
    "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz";

  private connection: Connection;
  private db: Pool;
  private running = false;
  private currentSlot = 0;

  constructor(
    private config: IndexerConfig,
    db: Pool,
  ) {
    this.connection = new Connection(config.rpcUrl, {
      commitment: config.commitment,
    });
    this.db = db;
  }

  /** Run the polling loop until stop() is called. */
  async start(): Promise<void> {
    this.running = true;
    // Resume from the explicit start slot or the last saved checkpoint.
    this.currentSlot =
      this.config.startSlot ?? (await this.getLastIndexedSlot());
    console.log(`Starting indexer from slot ${this.currentSlot}`);
    while (this.running) {
      try {
        await this.indexBatch();
        await this.sleep(100); // Rate limiting
      } catch (error) {
        console.error("Indexer error:", error);
        await this.sleep(5000); // Back off on error
      }
    }
  }

  /** Signal the polling loop to exit after the current batch. */
  stop(): void {
    this.running = false;
  }

  /** Process up to `batchSize` slots, then advance and save the checkpoint. */
  private async indexBatch(): Promise<void> {
    const latestSlot = await this.connection.getSlot();
    if (this.currentSlot >= latestSlot) {
      await this.sleep(400); // Wait for new slots
      return;
    }
    const endSlot = Math.min(
      this.currentSlot + this.config.batchSize,
      latestSlot,
    );
    for (let slot = this.currentSlot; slot <= endSlot; slot++) {
      let block;
      try {
        block = await this.connection.getBlock(slot, {
          maxSupportedTransactionVersion: 0,
          transactionDetails: "full",
        });
      } catch (error) {
        // Skipped slots make getBlock throw. Previously the error bubbled
        // up and the batch restarted at the same slot forever. Log and
        // move on. NOTE(review): transient RPC errors are also skipped
        // here — add error-code discrimination if that matters.
        console.warn(`Skipping slot ${slot}:`, error);
        continue;
      }
      if (!block) continue;
      await this.processBlock(slot, block);
    }
    this.currentSlot = endSlot + 1;
    await this.saveCheckpoint(this.currentSlot);
  }

  /** Write every transaction of a block inside a single DB transaction. */
  private async processBlock(slot: number, block: any): Promise<void> {
    const client = await this.db.connect();
    try {
      await client.query("BEGIN");
      for (const tx of block.transactions) {
        await this.processTransaction(client, slot, block.blockTime, tx);
      }
      await client.query("COMMIT");
    } catch (error) {
      await client.query("ROLLBACK");
      throw error;
    } finally {
      client.release();
    }
  }

  /** Index one transaction: row, instructions, log events, balance changes. */
  private async processTransaction(
    client: any,
    slot: number,
    blockTime: number | null,
    tx: any,
  ): Promise<void> {
    const signature = tx.transaction.signatures[0];
    const meta = tx.meta;
    // Failed transactions are skipped entirely. If you want them stored,
    // remove this early return — the status/err columns and the ternary
    // below already support it.
    if (meta?.err) {
      return;
    }
    await client.query(
      `INSERT INTO transactions (signature, slot, block_time, fee, status, logs)
       VALUES ($1, $2, $3, $4, $5, $6)
       ON CONFLICT (signature) DO NOTHING`,
      [
        signature,
        slot,
        blockTime ? new Date(blockTime * 1000) : null,
        meta?.fee ?? 0,
        meta?.err ? "failed" : "success",
        meta?.logMessages ?? [],
      ],
    );
    // Process instructions
    await this.processInstructions(client, signature, tx.transaction.message);
    // Process log events
    if (meta?.logMessages) {
      await this.processLogs(
        client,
        signature,
        slot,
        blockTime,
        meta.logMessages,
      );
    }
    // Process account balance changes
    if (meta?.postBalances && meta?.preBalances) {
      await this.processAccountChanges(
        client,
        slot,
        tx.transaction.message.accountKeys,
        meta.preBalances,
        meta.postBalances,
      );
    }
  }

  /** Store outer instructions belonging to the configured programs. */
  private async processInstructions(
    client: any,
    signature: string,
    message: any,
  ): Promise<void> {
    const accountKeys = message.accountKeys;
    for (let i = 0; i < message.instructions.length; i++) {
      const ix = message.instructions[i];
      const programId = accountKeys[ix.programIdIndex].toBase58();
      // Only index configured programs
      if (!this.config.programIds.includes(programId)) continue;
      await client.query(
        `INSERT INTO instructions (signature, program_id, instruction_index, data, accounts)
         VALUES ($1, $2, $3, $4, $5)`,
        [
          signature,
          programId,
          i,
          // ix.data is base58 text; Buffer.from has no "base58" encoding,
          // so decode it ourselves (see decodeBase58).
          this.decodeBase58(ix.data),
          ix.accounts.map((idx: number) => accountKeys[idx].toBase58()),
        ],
      );
      // NOTE(review): inner instructions (meta.innerInstructions) are not
      // indexed here even though the table has an inner_index column.
    }
  }

  /** Decode `Program data:` log lines into rows of the events table. */
  private async processLogs(
    client: any,
    signature: string,
    slot: number,
    blockTime: number | null,
    logs: string[],
  ): Promise<void> {
    for (const log of logs) {
      if (!log.startsWith("Program data:")) continue;
      const data = log.replace("Program data: ", "");
      const decoded = Buffer.from(data, "base64");
      const event = this.decodeEvent(decoded);
      if (!event) continue;
      await client.query(
        `INSERT INTO events (signature, program_id, event_type, data, slot, block_time)
         VALUES ($1, $2, $3, $4, $5, $6)`,
        [
          signature,
          event.programId,
          event.type,
          JSON.stringify(event.data),
          slot,
          blockTime ? new Date(blockTime * 1000) : null,
        ],
      );
    }
  }

  /**
   * Persist lamport balance changes for accounts touched by a transaction.
   * Previously called but never defined — the class did not compile.
   * NOTE(review): only updates rows that already exist in `accounts`;
   * adjust to an upsert if new accounts should be created here.
   */
  private async processAccountChanges(
    client: any,
    slot: number,
    accountKeys: any[],
    preBalances: number[],
    postBalances: number[],
  ): Promise<void> {
    for (let i = 0; i < accountKeys.length; i++) {
      if (preBalances[i] === postBalances[i]) continue;
      const key = accountKeys[i];
      // accountKeys entries may be PublicKey objects or plain strings
      // depending on the RPC response shape — handle both.
      const pubkey =
        typeof key === "string" ? key : key.toBase58();
      await client.query(
        `UPDATE accounts
         SET lamports = $1, slot = $2, updated_at = NOW()
         WHERE pubkey = $3`,
        [postBalances[i], slot, pubkey],
      );
    }
  }

  /**
   * Decode a base58 string to bytes. Node's Buffer does not support a
   * "base58" encoding, and adding a bs58 dependency is out of scope, so
   * this implements the standard big-integer decode.
   * @throws Error on characters outside the base58 alphabet.
   */
  private decodeBase58(value: string): Buffer {
    const bytes: number[] = []; // little-endian accumulator
    for (const char of value) {
      const digit = SolanaIndexer.BASE58_ALPHABET.indexOf(char);
      if (digit < 0) throw new Error(`Invalid base58 character: ${char}`);
      let carry = digit;
      for (let i = 0; i < bytes.length; i++) {
        carry += bytes[i] * 58;
        bytes[i] = carry & 0xff;
        carry >>= 8;
      }
      while (carry > 0) {
        bytes.push(carry & 0xff);
        carry >>= 8;
      }
    }
    // Leading '1' characters encode leading zero bytes.
    let zeros = 0;
    for (const char of value) {
      if (char !== "1") break;
      zeros++;
    }
    const out = Buffer.alloc(zeros + bytes.length);
    for (let i = 0; i < bytes.length; i++) {
      out[zeros + i] = bytes[bytes.length - 1 - i];
    }
    return out;
  }

  /** Stub: decode an event buffer using the program's IDL. */
  private decodeEvent(data: Buffer): {
    programId: string;
    type: string;
    data: Record<string, unknown>;
  } | null {
    // Implementation depends on your program's event structure.
    // Use the IDL to decode.
    return null;
  }

  /** Last checkpointed slot, or 0 when no checkpoint exists. */
  private async getLastIndexedSlot(): Promise<number> {
    const result = await this.db.query(
      "SELECT slot FROM indexer_checkpoint ORDER BY slot DESC LIMIT 1",
    );
    return result.rows[0]?.slot ?? 0;
  }

  /**
   * Upsert the single checkpoint row (id = 1). The original insert omitted
   * id, so ON CONFLICT (id) never fired and a new row was appended per batch.
   */
  private async saveCheckpoint(slot: number): Promise<void> {
    await this.db.query(
      `INSERT INTO indexer_checkpoint (id, slot, updated_at)
       VALUES (1, $1, NOW())
       ON CONFLICT (id) DO UPDATE SET slot = $1, updated_at = NOW()`,
      [slot],
    );
  }

  /** Promise-based sleep helper. */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
API Layer
TypeScript
import express from "express";
import { Pool } from "pg";
const app = express();
const db = new Pool();

// Get the latest indexed state of a single account.
app.get("/accounts/:pubkey", async (req, res) => {
  const { pubkey } = req.params;
  try {
    const account = await db.query(
      "SELECT * FROM accounts WHERE pubkey = $1",
      [pubkey],
    );
    if (account.rows.length === 0) {
      return res.status(404).json({ error: "Account not found" });
    }
    res.json(account.rows[0]);
  } catch (err) {
    // Express 4 does not catch async rejections; without this the
    // request would hang on a DB error.
    console.error(err);
    res.status(500).json({ error: "Internal server error" });
  }
});
// Get events by type, newest first, with bounded pagination.
app.get("/events/:programId/:eventType", async (req, res) => {
  const { programId, eventType } = req.params;
  // Query values arrive as strings; coerce and clamp them — an unbounded
  // LIMIT would let a client dump the whole table.
  const limit = Math.min(Number(req.query.limit) || 100, 1000);
  const offset = Math.max(Number(req.query.offset) || 0, 0);
  try {
    const events = await db.query(
      `SELECT * FROM events
       WHERE program_id = $1 AND event_type = $2
       ORDER BY block_time DESC
       LIMIT $3 OFFSET $4`,
      [programId, eventType, limit, offset],
    );
    res.json(events.rows);
  } catch (err) {
    // Express 4 does not catch async rejections — respond instead of hanging.
    console.error(err);
    res.status(500).json({ error: "Internal server error" });
  }
});
// Get token transfers
app.get("/transfers/:mint", async (req, res) => {
  const { mint } = req.params;
  const { from, to } = req.query;
  // Coerce and clamp limit — it arrives as a string and must not be unbounded.
  const limit = Math.min(Number(req.query.limit) || 100, 1000);
  // Build the query incrementally; values always go through placeholders,
  // never string interpolation, so this stays injection-safe.
  let query = `SELECT * FROM token_transfers WHERE mint = $1`;
  const params: any[] = [mint];
  if (from) {
    params.push(from);
    query += ` AND block_time >= $${params.length}`;
  }
  if (to) {
    params.push(to);
    query += ` AND block_time <= $${params.length}`;
  }
  params.push(limit);
  query += ` ORDER BY block_time DESC LIMIT $${params.length}`;
  try {
    const transfers = await db.query(query, params);
    res.json(transfers.rows);
  } catch (err) {
    // Express 4 does not catch async rejections — respond instead of hanging.
    console.error(err);
    res.status(500).json({ error: "Internal server error" });
  }
});
Performance Optimization
| Technique | Description | Impact |
|---|---|---|
| Batch inserts | Group multiple rows per INSERT | 10x faster writes |
| Connection pooling | Reuse DB connections | Lower latency |
| Partial indexes | Index only relevant rows | Smaller index size |
| Materialized views | Pre-compute aggregations | Faster reads |
| Partitioning | Split tables by time | Faster pruning |
Next: Geyser Plugin - Low-latency streaming with Geyser.