Geyser Plugin
Geyser is Solana's validator plugin interface for streaming account, transaction, slot, and block data in real time. Because updates are pushed directly from the validator rather than polled, it delivers sub-second latency where RPC polling typically takes seconds.
Architecture
Text
┌──────────────────────────────────────────────┐
│               Solana Validator               │
│                                              │
│   ┌──────────────┐      ┌──────────────┐    │
│   │ Accounts DB  │      │ Transactions │    │
│   │              │      │    Queue     │    │
│   └──────┬───────┘      └──────┬───────┘    │
│          └───────────┬─────────┘            │
│                      ▼                      │
│          ┌──────────────────────┐           │
│          │    Geyser Plugin     │           │
│          │      Interface       │           │
│          └──────────┬───────────┘           │
└─────────────────────┼───────────────────────┘
                      ▼
┌──────────────────────────────────────────────┐
│         Custom Plugin Implementation         │
│                                              │
│   • Postgres Plugin      • Kafka Plugin      │
│   • gRPC Plugin          • Custom Processing │
└──────────────────────────────────────────────┘
Plugin Interface
Rust
use {
    solana_geyser_plugin_interface::geyser_plugin_interface::{
        GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions,
        ReplicaTransactionInfoVersions, Result, SlotStatus,
    },
    sqlx::PgPool,
    std::fmt::{Debug, Formatter},
};

pub struct MyGeyserPlugin {
    config: PluginConfig,
    db_pool: Option<PgPool>,
    // Keep the runtime alive for the plugin's lifetime; dropping it would
    // kill the pool's background connection tasks.
    runtime: Option<tokio::runtime::Runtime>,
}
#[derive(Debug, serde::Deserialize)]
pub struct PluginConfig {
    pub database_url: String,
    pub program_ids: Vec<String>,
    pub batch_size: usize,
}

impl Debug for MyGeyserPlugin {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "MyGeyserPlugin")
    }
}
impl GeyserPlugin for MyGeyserPlugin {
    fn name(&self) -> &'static str {
        "my-geyser-plugin"
    }

    fn on_load(&mut self, config_file: &str) -> Result<()> {
        // Load configuration
        let config_content = std::fs::read_to_string(config_file)
            .map_err(|e| GeyserPluginError::ConfigFileReadError { msg: e.to_string() })?;
        self.config = serde_json::from_str(&config_content)
            .map_err(|e| GeyserPluginError::ConfigFileReadError { msg: e.to_string() })?;

        // Initialize the database pool on a runtime that outlives this call
        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
        let pool = runtime
            .block_on(PgPool::connect(&self.config.database_url))
            .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
        self.db_pool = Some(pool);
        self.runtime = Some(runtime);
        Ok(())
    }

    fn on_unload(&mut self) {
        // Close the pool on the runtime that created it; pool.close() is
        // async and does nothing unless driven to completion
        if let (Some(pool), Some(runtime)) = (self.db_pool.take(), self.runtime.as_ref()) {
            runtime.block_on(pool.close());
        }
    }
    fn update_account(
        &self,
        account: ReplicaAccountInfoVersions,
        slot: u64,
        is_startup: bool,
    ) -> Result<()> {
        // Skip startup loading if not needed
        if is_startup {
            return Ok(());
        }

        let account_info = match account {
            ReplicaAccountInfoVersions::V0_0_3(info) => info,
            _ => return Ok(()),
        };

        // Filter by program
        let owner = bs58::encode(account_info.owner).into_string();
        if !self.config.program_ids.contains(&owner) {
            return Ok(());
        }

        // Process account update
        let pubkey = bs58::encode(account_info.pubkey).into_string();
        let data = account_info.data.to_vec();
        let lamports = account_info.lamports;

        // Queue for batch insert
        self.queue_account_update(pubkey, owner, data, lamports, slot);
        Ok(())
    }
    fn notify_transaction(
        &self,
        transaction: ReplicaTransactionInfoVersions,
        slot: u64,
    ) -> Result<()> {
        let tx_info = match transaction {
            ReplicaTransactionInfoVersions::V0_0_2(info) => info,
            _ => return Ok(()),
        };

        // Skip failed transactions
        if tx_info.transaction_status_meta.status.is_err() {
            return Ok(());
        }

        let signature = bs58::encode(tx_info.signature).into_string();

        // Process logs for events
        if let Some(log_messages) = &tx_info.transaction_status_meta.log_messages {
            self.process_logs(&signature, slot, log_messages)?;
        }
        Ok(())
    }
    fn notify_end_of_startup(&self) -> Result<()> {
        // Called when historical replay is complete
        log::info!("Geyser plugin startup complete");
        Ok(())
    }

    fn update_slot_status(
        &self,
        slot: u64,
        _parent: Option<u64>,
        status: SlotStatus,
    ) -> Result<()> {
        match status {
            SlotStatus::Confirmed => {
                // Slot confirmed, safe to use data
                self.flush_batch(slot)?;
            }
            SlotStatus::Rooted => {
                // Slot finalized, won't be rolled back
            }
            _ => {}
        }
        Ok(())
    }

    fn account_data_notifications_enabled(&self) -> bool {
        true
    }

    fn transaction_notifications_enabled(&self) -> bool {
        true
    }
}
impl MyGeyserPlugin {
    fn queue_account_update(
        &self,
        pubkey: String,
        owner: String,
        data: Vec<u8>,
        lamports: u64,
        slot: u64,
    ) {
        // Implementation: add to batch queue
    }
    fn process_logs(
        &self,
        signature: &str,
        slot: u64,
        logs: &[String],
    ) -> Result<()> {
        for log in logs {
            // Anchor emits events as "Program data: <base64>" log lines
            if let Some(data) = log.strip_prefix("Program data: ") {
                let decoded = base64::decode(data)
                    .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
                // Decode and store event
                self.process_event(signature, slot, &decoded)?;
            }
        }
        Ok(())
    }
    fn process_event(&self, signature: &str, slot: u64, data: &[u8]) -> Result<()> {
        // Decode event using discriminator
        Ok(())
    }

    fn flush_batch(&self, slot: u64) -> Result<()> {
        // Flush pending updates to database
        Ok(())
    }
}
#[no_mangle]
#[allow(improper_ctypes_definitions)]
pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
    Box::into_raw(Box::new(MyGeyserPlugin {
        config: PluginConfig {
            database_url: String::new(),
            program_ids: vec![],
            batch_size: 100,
        },
        db_pool: None,
        runtime: None,
    }))
}
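The validator loads this crate as a shared library and calls `_create_plugin` to obtain the trait object, so Cargo.toml must set `crate-type = ["cdylib"]`. Keep `on_load` strict about configuration errors: the validator aborts startup if a plugin fails to load, which is preferable to indexing silently with a bad config.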
Yellowstone gRPC
For easier integration without running your own validator, use Yellowstone gRPC (Triton's Geyser plugin that exposes the stream over gRPC):
TypeScript
import Client, {
  CommitmentLevel,
  SubscribeRequestFilterAccounts,
} from "@triton-one/yellowstone-grpc";
import bs58 from "bs58";

interface StreamConfig {
  endpoint: string;
  token: string;
  programIds: string[];
}
async function streamAccounts(config: StreamConfig) {
  const client = new Client(config.endpoint, config.token, {});
  const stream = await client.subscribe();

  // Handle incoming data; pubkeys and signatures arrive as raw bytes
  stream.on("data", (data) => {
    if (data.account) {
      const account = data.account;
      console.log("Account update:", {
        pubkey: bs58.encode(account.account.pubkey),
        slot: account.slot,
        lamports: account.account.lamports,
        data: account.account.data,
      });
    }
    if (data.transaction) {
      const tx = data.transaction;
      console.log("Transaction:", {
        signature: bs58.encode(tx.transaction.signature),
        slot: tx.slot,
      });
    }
  });

  stream.on("error", (error) => {
    console.error("Stream error:", error);
  });

  // Subscribe to accounts owned by the configured programs; the map key
  // ("program_accounts") is just a client-chosen label for the filter
  const accountFilter: SubscribeRequestFilterAccounts = {
    account: [],
    owner: config.programIds,
    filters: [],
  };

  await new Promise<void>((resolve, reject) => {
    stream.write(
      {
        slots: {},
        accounts: { program_accounts: accountFilter },
        transactions: {},
        blocks: {},
        blocksMeta: {},
        commitment: CommitmentLevel.CONFIRMED,
        accountsDataSlice: [],
        ping: undefined,
      },
      (err: Error | null) => {
        if (err) reject(err);
        else resolve();
      },
    );
  });

  return stream;
}
// Usage
const stream = await streamAccounts({
  endpoint: "https://grpc.example.com",
  token: "your-token",
  programIds: ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
});
Full Indexer with Yellowstone
TypeScript
import Client, {
  CommitmentLevel,
  SubscribeRequest,
} from "@triton-one/yellowstone-grpc";
import { Pool } from "pg";
import { BorshCoder, BorshAccountsCoder } from "@coral-xyz/anchor";
import bs58 from "bs58";

interface IndexerConfig {
  grpcEndpoint: string;
  grpcToken: string;
  databaseUrl: string;
  programId: string;
  idl: any;
}

class YellowstoneIndexer {
  private client: Client;
  private db: Pool;
  private coder: BorshCoder;
  private running = false;
  private pingId = 0;

  constructor(private config: IndexerConfig) {
    this.client = new Client(config.grpcEndpoint, config.grpcToken, {});
    this.db = new Pool({ connectionString: config.databaseUrl });
    this.coder = new BorshCoder(config.idl);
  }
  async start(): Promise<void> {
    this.running = true;
    while (this.running) {
      try {
        await this.runStream();
      } catch (error) {
        console.error("Stream error, reconnecting:", error);
        await this.sleep(5000);
      }
    }
  }
  private async runStream(): Promise<void> {
    const stream = await this.client.subscribe();

    // Surface failures to the reconnect loop in start(); throwing inside
    // an event handler would never reach that try/catch
    const closed = new Promise<void>((_resolve, reject) => {
      stream.on("error", reject);
      stream.on("end", () => reject(new Error("Stream ended unexpectedly")));
    });
    // Avoid an unhandled-rejection warning if the stream fails before we
    // await `closed` below
    closed.catch(() => {});

    stream.on("data", async (data) => {
      try {
        await this.processData(data);
      } catch (error) {
        console.error("Processing error:", error);
      }
    });

    // Subscribe request: an empty map means no subscription for that category
    const request: SubscribeRequest = {
      slots: {},
      accounts: {
        program: {
          account: [],
          owner: [this.config.programId],
          filters: [],
        },
      },
      transactions: {
        program: {
          vote: false,
          failed: false,
          signature: undefined,
          accountInclude: [this.config.programId],
          accountExclude: [],
          accountRequired: [],
        },
      },
      blocks: {},
      blocksMeta: {},
      commitment: CommitmentLevel.CONFIRMED,
      accountsDataSlice: [],
      ping: undefined,
    };

    await new Promise<void>((resolve, reject) => {
      stream.write(request, (err: Error | null) => {
        if (err) reject(err);
        else resolve();
      });
    });

    // Keep the stream alive with periodic pings; a small counter keeps
    // the id within int32 range
    const pingInterval = setInterval(() => {
      stream.write({ ping: { id: ++this.pingId } }, () => {});
    }, 30000);

    // Wait for the stream to fail or end, then let start() reconnect
    try {
      await closed;
    } finally {
      clearInterval(pingInterval);
    }
  }
  private async processData(data: any): Promise<void> {
    if (data.account) {
      await this.processAccount(data.account);
    }
    if (data.transaction) {
      await this.processTransaction(data.transaction);
    }
    if (data.ping) {
      console.log("Ping received:", data.ping.id);
    }
  }
  private async processAccount(accountData: any): Promise<void> {
    // Yellowstone delivers pubkeys and account data as raw bytes
    const pubkey = bs58.encode(accountData.account.pubkey);
    const data = Buffer.from(accountData.account.data);
    const slot = accountData.slot;

    // Decode account data using the IDL
    const accountType = this.detectAccountType(data);
    if (!accountType) return;
    const decoded = this.coder.accounts.decode(accountType, data);

    // Upsert, but never overwrite newer data with an older slot
    await this.db.query(
      `INSERT INTO accounts (pubkey, account_type, data, slot, updated_at)
       VALUES ($1, $2, $3, $4, NOW())
       ON CONFLICT (pubkey) DO UPDATE SET
         data = EXCLUDED.data,
         slot = EXCLUDED.slot,
         updated_at = NOW()
       WHERE accounts.slot < EXCLUDED.slot`,
      [pubkey, accountType, JSON.stringify(decoded), slot],
    );
  }
  private async processTransaction(txData: any): Promise<void> {
    const signature = bs58.encode(txData.transaction.signature);
    const slot = txData.slot;
    const meta = txData.transaction.meta;
    if (!meta?.logMessages) return;

    // Extract events from logs
    const events = this.extractEvents(meta.logMessages);
    for (const event of events) {
      await this.db.query(
        `INSERT INTO events (signature, event_type, data, slot, created_at)
         VALUES ($1, $2, $3, $4, NOW())`,
        [signature, event.name, JSON.stringify(event.data), slot],
      );
    }
  }
  private detectAccountType(data: Buffer): string | null {
    const discriminator = data.subarray(0, 8);
    // The IDL's `accounts` field is an array of { name, ... } entries;
    // match the on-chain discriminator against each account name using
    // Anchor's static helper
    for (const account of this.config.idl.accounts ?? []) {
      const expected = BorshAccountsCoder.accountDiscriminator(account.name);
      if (discriminator.equals(expected)) {
        return account.name;
      }
    }
    return null;
  }
  private extractEvents(logs: string[]): Array<{ name: string; data: any }> {
    const events: Array<{ name: string; data: any }> = [];
    for (const log of logs) {
      if (!log.startsWith("Program data: ")) continue;
      const base64Data = log.slice("Program data: ".length);
      // Anchor's event coder takes the base64 payload and matches the
      // discriminator against the IDL's events internally
      const decoded = this.coder.events.decode(base64Data);
      if (decoded) {
        events.push({ name: decoded.name, data: decoded.data });
      }
    }
    return events;
  }
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  stop(): void {
    this.running = false;
  }
}
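Below is a sketch of wiring the indexer up end to end. The table definitions match the INSERT statements used above; the endpoint, token, program ID, and IDL import are placeholders, and the migrate helper is illustrative rather than part of any library:
TypeScript
import { Pool } from "pg";
import idl from "./idl.json"; // placeholder: your program's Anchor IDL

// One-time setup: create the tables the indexer writes to
async function migrate(db: Pool): Promise<void> {
  await db.query(`
    CREATE TABLE IF NOT EXISTS accounts (
      pubkey       TEXT PRIMARY KEY,
      account_type TEXT NOT NULL,
      data         JSONB NOT NULL,
      slot         BIGINT NOT NULL,
      updated_at   TIMESTAMPTZ NOT NULL
    )`);
  await db.query(`
    CREATE TABLE IF NOT EXISTS events (
      id         BIGSERIAL PRIMARY KEY,
      signature  TEXT NOT NULL,
      event_type TEXT NOT NULL,
      data       JSONB NOT NULL,
      slot       BIGINT NOT NULL,
      created_at TIMESTAMPTZ NOT NULL
    )`);
}

const databaseUrl = "postgres://user:pass@localhost:5432/solana_index";
const setupPool = new Pool({ connectionString: databaseUrl });
await migrate(setupPool);
await setupPool.end();

const indexer = new YellowstoneIndexer({
  grpcEndpoint: "https://grpc.example.com", // placeholder endpoint
  grpcToken: "your-token",
  databaseUrl,
  programId: "YourProgramId111111111111111111111111111111",
  idl,
});

// stop() prevents reconnection once the current stream terminates
process.on("SIGINT", () => indexer.stop());

await indexer.start();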
Plugin Configuration
JSON
{
  "libpath": "/path/to/libmy_geyser_plugin.so",
  "database_url": "postgres://user:pass@localhost:5432/solana_index",
  "program_ids": [
    "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
    "YourProgramId111111111111111111111111111111"
  ],
  "batch_size": 1000,
  "accounts": {
    "owners": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"]
  },
  "transactions": {
    "mentions": ["YourProgramId111111111111111111111111111111"]
  }
}
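Start the validator with `--geyser-plugin-config /path/to/config.json` to load the plugin. The validator itself reads only `libpath`; the rest of the file is handed to the plugin's `on_load`, so everything beyond `libpath` is plugin-defined.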
Comparison
| Feature | RPC | WebSocket | Geyser Plugin | Yellowstone |
|---|---|---|---|---|
| Latency | High (poll interval) | Medium | ~50ms | ~100ms |
| Reliability | Moderate | Moderate | High | High |
| Throughput | Low | Medium | Very High | High |
| Setup | Easy | Easy | Hard | Medium |
| Infrastructure | RPC only | RPC only | Validator | gRPC server |
Next: Helius Integration - Using Helius webhooks and APIs.