Geyser Plugin

Geyser is Solana's streaming interface for real-time account and transaction data. It provides sub-second latency compared to RPC polling.

Architecture

Text
┌─────────────────────────────────────────────────────────────────┐
│                    Geyser Architecture                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                   Solana Validator                        │   │
│  │                                                          │   │
│  │  ┌──────────────┐    ┌──────────────┐                    │   │
│  │  │   Accounts   │    │ Transactions │                    │   │
│  │  │     DB       │    │    Queue     │                    │   │
│  │  └──────┬───────┘    └──────┬───────┘                    │   │
│  │         │                   │                            │   │
│  │         └─────────┬─────────┘                            │   │
│  │                   │                                      │   │
│  │                   ▼                                      │   │
│  │         ┌──────────────────┐                             │   │
│  │         │  Geyser Plugin   │                             │   │
│  │         │  Interface       │                             │   │
│  │         └────────┬─────────┘                             │   │
│  │                  │                                       │   │
│  └──────────────────┼───────────────────────────────────────┘   │
│                     │                                           │
│                     ▼                                           │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │              Custom Plugin Implementation                 │   │
│  │                                                          │   │
│  │  • Postgres Plugin         • Kafka Plugin                │   │
│  │  • gRPC Plugin             • Custom Processing           │   │
│  │                                                          │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Plugin Interface

Rust
use {
    solana_geyser_plugin_interface::geyser_plugin_interface::{
        GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions,
        ReplicaTransactionInfoVersions, Result, SlotStatus,
    },
    std::fmt::{Debug, Formatter},
};

/// Plugin state: the loaded configuration plus an optional Postgres pool.
pub struct MyGeyserPlugin {
    /// Settings; overwritten in `on_load` with the contents of the config file.
    config: PluginConfig,
    /// Connection pool; `None` until `on_load` has connected successfully.
    db_pool: Option<PgPool>,
}

/// Plugin settings, deserialized from the JSON config file handed to `on_load`.
#[derive(Debug, serde::Deserialize)]
pub struct PluginConfig {
    /// Connection string passed to `PgPool::connect`.
    pub database_url: String,
    /// Base58 program ids; account updates whose owner is not listed are skipped.
    pub program_ids: Vec<String>,
    /// NOTE(review): not read by any code visible here — presumably the size at
    /// which the batch queue should be flushed; confirm against the queue code.
    pub batch_size: usize,
}

// Hand-written `Debug`: prints only the type name, never the configuration or
// the connection pool.
impl Debug for MyGeyserPlugin {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("MyGeyserPlugin")
    }
}

impl GeyserPlugin for MyGeyserPlugin {
    /// Name under which this plugin identifies itself.
    fn name(&self) -> &'static str {
        "my-geyser-plugin"
    }

    /// Loads the JSON configuration at `config_file` and connects the Postgres pool.
    ///
    /// # Errors
    /// Returns `ConfigFileReadError` when the file cannot be read or parsed, and
    /// `Custom` when the Tokio runtime cannot be created or the database
    /// connection fails. Nothing in here panics: a panic inside a plugin would
    /// take the hosting process down with it.
    fn on_load(&mut self, config_file: &str) -> Result<()> {
        // Load configuration
        let config_content = std::fs::read_to_string(config_file).map_err(|e| {
            GeyserPluginError::ConfigFileReadError { msg: e.to_string() }
        })?;

        self.config = serde_json::from_str(&config_content).map_err(|e| {
            GeyserPluginError::ConfigFileReadError { msg: e.to_string() }
        })?;

        // Initialize database pool. The plugin callbacks are synchronous, so the
        // async connect is driven on a runtime created just for this call.
        // NOTE(review): this runtime is dropped when `on_load` returns; if the
        // pool is later used for queries, a runtime owned by the plugin is
        // probably required — confirm against the flush implementation.
        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
        let pool = runtime
            .block_on(PgPool::connect(&self.config.database_url))
            .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
        self.db_pool = Some(pool);

        Ok(())
    }

    /// Releases the database pool, if one was created.
    fn on_unload(&mut self) {
        // `close()` only returns a future; it has to be driven to completion for
        // the shutdown to actually run. Taking the pool out of `self` also makes
        // a second `on_unload` call a no-op.
        if let Some(pool) = self.db_pool.take() {
            match tokio::runtime::Runtime::new() {
                Ok(runtime) => runtime.block_on(pool.close()),
                Err(e) => log::warn!("Could not create runtime to close database pool: {}", e),
            }
        }
    }

    /// Queues an account update for batch insertion.
    ///
    /// Updates are ignored when they are delivered during startup, when the
    /// account info is not the `V0_0_3` layout, or when the account's owner is
    /// not listed in `config.program_ids`.
    fn update_account(
        &self,
        account: ReplicaAccountInfoVersions,
        slot: u64,
        is_startup: bool,
    ) -> Result<()> {
        // Skip startup loading if not needed
        if is_startup {
            return Ok(());
        }

        let account_info = match account {
            ReplicaAccountInfoVersions::V0_0_3(info) => info,
            _ => return Ok(()),
        };

        // Filter by program: the configured ids are base58 strings, so the raw
        // owner bytes are encoded before comparing.
        let owner = bs58::encode(account_info.owner).into_string();
        if !self.config.program_ids.contains(&owner) {
            return Ok(());
        }

        // Copy everything out of the borrowed account info so it can be queued.
        let pubkey = bs58::encode(account_info.pubkey).into_string();
        let data = account_info.data.to_vec();
        let lamports = account_info.lamports;

        // Queue for batch insert
        self.queue_account_update(pubkey, owner, data, lamports, slot);

        Ok(())
    }

    /// Extracts events from the logs of a successful transaction.
    ///
    /// Transactions that are not the `V0_0_2` layout, that failed, or that carry
    /// no log messages are ignored.
    ///
    /// # Errors
    /// Propagates any error returned by `process_logs`.
    fn notify_transaction(
        &self,
        transaction: ReplicaTransactionInfoVersions,
        slot: u64,
    ) -> Result<()> {
        let tx_info = match transaction {
            ReplicaTransactionInfoVersions::V0_0_2(info) => info,
            _ => return Ok(()),
        };

        // Skip failed transactions
        if tx_info.transaction_status_meta.status.is_err() {
            return Ok(());
        }

        let signature = bs58::encode(tx_info.signature).into_string();

        // Process logs for events
        if let Some(log_messages) = &tx_info.transaction_status_meta.log_messages {
            self.process_logs(&signature, slot, log_messages)?;
        }

        Ok(())
    }

    /// Logs that startup has finished.
    fn notify_end_of_startup(&self) -> Result<()> {
        log::info!("Geyser plugin startup complete");
        Ok(())
    }

    /// Flushes the pending batch when a slot reaches `Confirmed`.
    ///
    /// `Rooted` and every other status are currently ignored, and `parent` is
    /// not used.
    ///
    /// # Errors
    /// Propagates any error returned by `flush_batch`.
    fn update_slot_status(
        &self,
        slot: u64,
        parent: Option<u64>,
        status: SlotStatus,
    ) -> Result<()> {
        match status {
            SlotStatus::Confirmed => {
                // Slot confirmed, safe to use data
                self.flush_batch(slot)?;
            }
            SlotStatus::Rooted => {
                // Slot finalized, won't be rolled back
            }
            _ => {}
        }
        Ok(())
    }

    /// Opts in to account update notifications.
    fn account_data_notifications_enabled(&self) -> bool {
        true
    }

    /// Opts in to transaction notifications.
    fn transaction_notifications_enabled(&self) -> bool {
        true
    }
}

impl MyGeyserPlugin {
    /// Adds one account update to the pending batch.
    ///
    /// Placeholder: the body is intentionally empty in this example.
    fn queue_account_update(
        &self,
        pubkey: String,
        owner: String,
        data: Vec<u8>,
        lamports: u64,
        slot: u64,
    ) {
        // Implementation: add to batch queue
    }

    /// Decodes every `Program data: <base64>` log line and hands the bytes to
    /// `process_event`.
    ///
    /// Lines without that exact prefix (including the trailing space) are
    /// skipped.
    ///
    /// # Errors
    /// Returns `GeyserPluginError::Custom` when a payload is not valid base64,
    /// and propagates any error from `process_event`. Processing stops at the
    /// first failing line.
    fn process_logs(
        &self,
        signature: &str,
        slot: u64,
        logs: &[String],
    ) -> Result<()> {
        for log in logs {
            // Test and strip with the same prefix in one step, so a line such as
            // "Program data:" with no trailing space is skipped rather than
            // panicking on an unwrap.
            if let Some(data) = log.strip_prefix("Program data: ") {
                let decoded = base64::decode(data)
                    .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;

                // Decode and store event
                self.process_event(signature, slot, &decoded)?;
            }
        }
        Ok(())
    }

    /// Handles one decoded event payload.
    ///
    /// Placeholder: always returns `Ok(())` in this example.
    fn process_event(&self, signature: &str, slot: u64, data: &[u8]) -> Result<()> {
        // Decode event using discriminator
        Ok(())
    }

    /// Writes the pending batch to the database.
    ///
    /// Placeholder: always returns `Ok(())` in this example.
    fn flush_batch(&self, slot: u64) -> Result<()> {
        // Flush pending updates to database
        Ok(())
    }
}

/// Exported constructor: builds a plugin with placeholder settings and no
/// database pool, and returns it as a raw trait-object pointer.
///
/// # Safety
/// The returned pointer comes from `Box::into_raw`; the caller takes ownership
/// of the allocation and is responsible for reclaiming it.
#[no_mangle]
// A `*mut dyn Trait` is a fat pointer, which the `improper_ctypes_definitions`
// lint rejects in an `extern "C"` signature; the lint is silenced on purpose.
#[allow(improper_ctypes_definitions)]
pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
    // Placeholder settings until `on_load` replaces them from the config file.
    let initial_config = PluginConfig {
        database_url: String::new(),
        program_ids: Vec::new(),
        batch_size: 100,
    };
    let plugin: Box<dyn GeyserPlugin> = Box::new(MyGeyserPlugin {
        config: initial_config,
        db_pool: None,
    });
    Box::into_raw(plugin)
}

Yellowstone gRPC

For easier integration, use Yellowstone gRPC (Triton's Geyser interface):

TypeScript
import {
  SubscribeRequestFilterAccounts,
  CommitmentLevel,
} from "@triton-one/yellowstone-grpc";
import Client from "@triton-one/yellowstone-grpc";

/** Connection and filter settings for `streamAccounts`. */
interface StreamConfig {
  /** Endpoint passed as the first argument to the Yellowstone `Client`. */
  endpoint: string;
  /** Token passed as the second argument to the Yellowstone `Client`. */
  token: string;
  /** Program ids used as the `owner` list of the account filter. */
  programIds: string[];
}

/**
 * Opens a Yellowstone subscription for accounts owned by `config.programIds`,
 * logs every account and transaction message to the console, and returns the
 * stream once the subscribe request has been written.
 *
 * Rejects if writing the subscribe request fails. Stream errors after that
 * point are only logged.
 */
async function streamAccounts(config: StreamConfig) {
  const { endpoint, token, programIds } = config;
  const grpcClient = new Client(endpoint, token, {});

  const subscription = await grpcClient.subscribe();

  // Log incoming updates; a single message may carry an account, a transaction, or neither.
  subscription.on("data", (update) => {
    const { account, transaction } = update;

    if (account) {
      console.log("Account update:", {
        pubkey: account.account.pubkey,
        slot: account.slot,
        lamports: account.account.lamports,
        data: account.account.data,
      });
    }

    if (transaction) {
      console.log("Transaction:", {
        signature: transaction.transaction.signature,
        slot: transaction.slot,
      });
    }
  });

  subscription.on("error", (error) => {
    console.error("Stream error:", error);
  });

  // Only accounts owned by the configured programs are requested.
  const ownedByPrograms: SubscribeRequestFilterAccounts = {
    account: [],
    owner: programIds,
    filters: [],
  };

  const subscribeRequest = {
    slots: {},
    accounts: { program_accounts: ownedByPrograms },
    transactions: {},
    blocks: {},
    blocksMeta: {},
    commitment: CommitmentLevel.CONFIRMED,
    accountsDataSlice: [],
    ping: undefined,
  };

  // Wait until the request has been written before handing the stream back.
  await new Promise<void>((resolve, reject) => {
    const onWritten = (err: Error | null) => {
      if (err) {
        reject(err);
        return;
      }
      resolve();
    };
    subscription.write(subscribeRequest, onWritten);
  });

  return subscription;
}

// Usage: subscribe to every account owned by the SPL Token program.
const streamConfig: StreamConfig = {
  endpoint: "https://grpc.example.com",
  token: "your-token",
  programIds: ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
};
const stream = await streamAccounts(streamConfig);

Full Indexer with Yellowstone

TypeScript
import Client, {
  CommitmentLevel,
  SubscribeRequest,
} from "@triton-one/yellowstone-grpc";
import { Pool } from "pg";
import { BorshCoder } from "@coral-xyz/anchor";

/** Settings for `YellowstoneIndexer`. */
interface IndexerConfig {
  /** Endpoint passed as the first argument to the Yellowstone `Client`. */
  grpcEndpoint: string;
  /** Token passed as the second argument to the Yellowstone `Client`. */
  grpcToken: string;
  /** Connection string for the `pg` pool. */
  databaseUrl: string;
  /** Program whose accounts (by owner) and transactions (by inclusion) are indexed. */
  programId: string;
  /** IDL handed to `BorshCoder`; its `accounts` and `events` lists drive decoding. */
  idl: any;
}

/**
 * Indexes one Anchor program's accounts and events from a Yellowstone gRPC
 * stream into Postgres, reconnecting whenever the stream fails or ends.
 */
class YellowstoneIndexer {
  private client: Client;
  private db: Pool;
  private coder: BorshCoder;
  private running = false;

  constructor(private config: IndexerConfig) {
    this.client = new Client(config.grpcEndpoint, config.grpcToken, {});
    this.db = new Pool({ connectionString: config.databaseUrl });
    this.coder = new BorshCoder(config.idl);
  }

  /**
   * Runs the indexer until `stop()` is called, reconnecting five seconds
   * after every stream failure.
   */
  async start(): Promise<void> {
    this.running = true;

    while (this.running) {
      try {
        await this.runStream();
      } catch (error) {
        console.error("Stream error, reconnecting:", error);
        await this.sleep(5000);
      }
    }
  }

  /**
   * Opens one subscription and resolves or rejects when it terminates.
   *
   * Rejects when the subscribe request cannot be written, when the stream
   * emits `error`, or when it ends while the indexer is still running.
   * Resolves only when the stream ends after `stop()` was called.
   */
  private async runStream(): Promise<void> {
    const stream = await this.client.subscribe();

    stream.on("data", async (data) => {
      try {
        await this.processData(data);
      } catch (error) {
        console.error("Processing error:", error);
      }
    });

    // Termination is funnelled through one promise. Throwing from inside an
    // event handler would surface as an uncaught exception instead of reaching
    // the try/catch in start().
    const terminated = new Promise<void>((resolve, reject) => {
      stream.on("error", (error) => {
        console.error("Stream error:", error);
        reject(error);
      });

      stream.on("end", () => {
        console.log("Stream ended");
        if (this.running) {
          reject(new Error("Stream ended unexpectedly"));
        } else {
          resolve();
        }
      });
    });
    // If the subscribe write below fails first, nobody awaits `terminated`;
    // this keeps a later rejection from being reported as unhandled.
    terminated.catch(() => {});

    // Subscribe request
    const request: SubscribeRequest = {
      slots: {},
      accounts: {
        program: {
          account: [],
          owner: [this.config.programId],
          filters: [],
        },
      },
      transactions: {
        program: {
          vote: false,
          failed: false,
          signature: undefined,
          accountInclude: [this.config.programId],
          accountExclude: [],
          accountRequired: [],
        },
      },
      blocks: {},
      blocksMeta: {},
      commitment: CommitmentLevel.CONFIRMED,
      accountsDataSlice: [],
      ping: undefined,
    };

    let pingInterval: ReturnType<typeof setInterval> | undefined;
    try {
      await new Promise<void>((resolve, reject) => {
        stream.write(request, (err: Error | null) => {
          if (err) reject(err);
          else resolve();
        });
      });

      // Keep stream alive with pings. The ping id is a 32-bit integer, so a
      // small counter is used rather than a millisecond timestamp, and the
      // filter fields are sent empty because the request encoder expects them
      // to be present.
      let pingId = 0;
      pingInterval = setInterval(() => {
        pingId += 1;
        stream.write(
          {
            slots: {},
            accounts: {},
            transactions: {},
            blocks: {},
            blocksMeta: {},
            commitment: undefined,
            accountsDataSlice: [],
            ping: { id: pingId },
          },
          (err: Error | null | undefined) => {
            if (err) console.error("Ping failed:", err);
          },
        );
      }, 30000);

      // Wait for the stream to terminate.
      await terminated;
    } finally {
      // Stop pinging on every exit path, not only on a clean end.
      if (pingInterval !== undefined) clearInterval(pingInterval);
    }
  }

  /** Dispatches one stream message to the matching handler. */
  private async processData(data: any): Promise<void> {
    if (data.account) {
      await this.processAccount(data.account);
    }

    if (data.transaction) {
      await this.processTransaction(data.transaction);
    }

    if (data.ping) {
      console.log("Ping received:", data.ping.id);
    }
  }

  /**
   * Decodes an account with the IDL and upserts it, keeping the row with the
   * highest slot. Accounts whose discriminator matches no IDL account are
   * skipped.
   */
  private async processAccount(accountData: any): Promise<void> {
    // NOTE(review): pubkey is stored exactly as received; if the client
    // delivers raw bytes it probably needs base58 encoding first — confirm.
    const pubkey = accountData.account.pubkey;
    const data = Buffer.from(accountData.account.data);
    const slot = accountData.slot;

    // Decode account data using IDL
    const accountType = this.detectAccountType(data);
    if (!accountType) return;

    const decoded = this.coder.accounts.decode(accountType, data);

    await this.db.query(
      `INSERT INTO accounts (pubkey, account_type, data, slot, updated_at)
       VALUES ($1, $2, $3, $4, NOW())
       ON CONFLICT (pubkey) DO UPDATE SET
         data = EXCLUDED.data,
         slot = EXCLUDED.slot,
         updated_at = NOW()
       WHERE accounts.slot < EXCLUDED.slot`,
      [pubkey, accountType, JSON.stringify(decoded), slot],
    );
  }

  /** Stores every IDL event found in a transaction's log messages. */
  private async processTransaction(txData: any): Promise<void> {
    // NOTE(review): signature is stored exactly as received; if the client
    // delivers raw bytes it probably needs base58 encoding first — confirm.
    const signature = txData.transaction.signature;
    const slot = txData.slot;
    const meta = txData.transaction.meta;

    if (!meta?.logMessages) return;

    // Extract events from logs
    const events = this.extractEvents(meta.logMessages);

    for (const event of events) {
      await this.db.query(
        `INSERT INTO events (signature, event_type, data, slot, created_at)
         VALUES ($1, $2, $3, $4, NOW())`,
        [signature, event.name, JSON.stringify(event.data), slot],
      );
    }
  }

  /**
   * Returns the name of the IDL account whose 8-byte discriminator prefixes
   * `data`, or null when none matches.
   */
  private detectAccountType(data: Buffer): string | null {
    if (data.length < 8) return null;
    const discriminator = data.subarray(0, 8);

    // `idl.accounts` is a list of definitions, so the name comes from each
    // entry rather than from its position in the list.
    for (const account of this.config.idl.accounts ?? []) {
      const expected = this.coder.accounts.accountDiscriminator(account.name);
      if (discriminator.equals(expected)) {
        return account.name;
      }
    }

    return null;
  }

  /**
   * Decodes every `Program data: <base64>` log line that matches an event in
   * the IDL; lines that match no event are skipped.
   */
  private extractEvents(logs: string[]): Array<{ name: string; data: any }> {
    const prefix = "Program data: ";
    const events: Array<{ name: string; data: any }> = [];

    for (const log of logs) {
      if (!log.startsWith(prefix)) continue;

      // The event coder takes the base64 payload itself, matches the
      // discriminator internally, and returns null for unknown events.
      const decoded = this.coder.events.decode(log.slice(prefix.length));
      if (decoded) {
        events.push({ name: decoded.name, data: decoded.data });
      }
    }

    return events;
  }

  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Asks the indexer to stop. This only clears the flag: the loop in `start()`
   * exits once the current stream terminates.
   */
  stop(): void {
    this.running = false;
  }
}

Plugin Configuration

JSON
{
  "libpath": "/path/to/libmy_geyser_plugin.so",
  "database_url": "postgres://user:pass@localhost:5432/solana_index",
  "program_ids": [
    "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
    "YourProgramId111111111111111111111111111111"
  ],
  "batch_size": 1000,
  "accounts": {
    "owners": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"]
  },
  "transactions": {
    "mentions": ["YourProgramId111111111111111111111111111111"]
  }
}

Comparison

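| Feature | RPC | WebSocket | Geyser Plugin | Yellowstone |
| --- | --- | --- | --- | --- |
| Latency | High | Medium | ~50ms | ~100ms |
| Reliability | Moderate | Moderate | High | High |
| Throughput | Low | Medium | Very High | High |
| Setup | Easy | Easy | Hard | Medium |
| Infrastructure | RPC only | RPC only | Validator | gRPC server |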

Next: Helius Integration - Using Helius webhooks and APIs.