The ability to efficiently query and analyze blockchain data separates amateur projects from production-grade applications. While Solana's raw throughput of thousands of transactions per second creates unprecedented opportunities for data analysis, it also presents unique challenges that require sophisticated indexing strategies. This comprehensive guide explores the architecture, implementation patterns, and optimization techniques necessary for building custom indexers that can keep pace with Solana's high-performance blockchain.
Understanding Solana's Data Architecture
Before diving into indexer implementation, it's crucial to understand how Solana structures and stores data. Unlike account-based blockchains like Ethereum, Solana employs a unique account model where all state is stored in accounts, and programs (smart contracts) are stateless executors that operate on these accounts.
Each Solana account contains several key fields: the owner program, lamports balance, data buffer, and executable flag. The data buffer is particularly important for indexing purposes, as it contains the serialized state that programs use to track application-specific information. Understanding the serialization format—typically Borsh for native programs or Anchor's IDL-based serialization—is essential for correctly parsing account data.
Solana's transaction structure also differs significantly from other blockchains. Each transaction contains a list of instructions, where each instruction specifies a program to invoke along with the accounts it will read from or write to. This explicit account declaration enables Solana's parallel transaction processing but also means indexers must understand the relationship between programs and the accounts they manage.
Indexing Strategies Overview
There are three primary approaches to indexing Solana data, each with distinct trade-offs in terms of latency, completeness, and infrastructure requirements.
RPC Polling
The simplest approach involves periodically querying RPC nodes for account changes and new transactions. While straightforward to implement, RPC polling suffers from several limitations. The polling interval creates inherent latency—you can only detect changes as frequently as you poll. Additionally, high-frequency polling can quickly exhaust RPC rate limits and incur significant costs when using commercial RPC providers.
// Basic RPC polling implementation
import { Connection, PublicKey } from '@solana/web3.js';
class RpcPoller {
private connection: Connection;
private lastSignature: string | undefined;
constructor(rpcUrl: string) {
this.connection = new Connection(rpcUrl, 'confirmed');
}
async pollTransactions(programId: PublicKey, callback: (tx: any) => void) {
const signatures = await this.connection.getSignaturesForAddress(
programId,
{ until: this.lastSignature, limit: 100 }
);
if (signatures.length > 0) {
this.lastSignature = signatures[0].signature;
for (const sig of signatures.reverse()) {
const tx = await this.connection.getParsedTransaction(
sig.signature,
{ maxSupportedTransactionVersion: 0 }
);
if (tx) callback(tx);
}
}
}
}
RPC polling remains useful for low-throughput applications or as a fallback mechanism when real-time streaming is unavailable. However, for production systems requiring near-real-time data, more sophisticated approaches are necessary.
WebSocket Subscriptions
Solana RPC nodes support WebSocket connections for real-time notifications. You can subscribe to account changes, program updates, slot notifications, and transaction signatures. This approach provides significantly lower latency than polling while reducing the load on RPC infrastructure.
// WebSocket subscription implementation
import { Connection, PublicKey } from '@solana/web3.js';
class WebSocketIndexer {
private connection: Connection;
private subscriptionIds: number[] = [];
constructor(rpcUrl: string) {
this.connection = new Connection(rpcUrl, {
wsEndpoint: rpcUrl.replace('https', 'wss'),
commitment: 'confirmed'
});
}
async subscribeToProgram(
programId: PublicKey,
callback: (accountInfo: any, context: any) => void
) {
const subscriptionId = this.connection.onProgramAccountChange(
programId,
(accountInfo, context) => {
callback(accountInfo, context);
},
'confirmed',
[{ dataSize: 165 }] // Optional filters
);
this.subscriptionIds.push(subscriptionId);
return subscriptionId;
}
async subscribeTologs(
programId: PublicKey,
callback: (logs: any, context: any) => void
) {
const subscriptionId = this.connection.onLogs(
programId,
(logs, context) => {
callback(logs, context);
},
'confirmed'
);
this.subscriptionIds.push(subscriptionId);
return subscriptionId;
}
async unsubscribeAll() {
for (const id of this.subscriptionIds) {
await this.connection.removeAccountChangeListener(id);
}
this.subscriptionIds = [];
}
}
WebSocket subscriptions work well for monitoring specific accounts or programs with moderate activity. However, they have limitations when dealing with high-throughput programs or when you need historical data replay capabilities.
Geyser Plugins
For the highest performance and most complete data access, Geyser plugins represent the gold standard. Geyser is Solana's plugin interface that allows validators to stream account updates, transactions, and slot notifications directly to external systems. This approach bypasses the RPC layer entirely, providing the lowest possible latency and highest throughput.
Geyser plugins receive data synchronously as the validator processes blocks, meaning you get updates before they're even available via RPC. This is critical for applications like MEV searchers, high-frequency trading systems, or real-time analytics platforms where milliseconds matter.
// Geyser plugin interface (Rust)
use solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, ReplicaAccountInfoVersions, Result
};
pub struct CustomGeyserPlugin {
// Plugin state
}
impl GeyserPlugin for CustomGeyserPlugin {
fn name(&self) -> &'static str {
"custom-indexer-plugin"
}
fn on_load(&mut self, config_file: &str) -> Result<()> {
// Initialize plugin, connect to database, etc.
Ok(())
}
fn update_account(
&self,
account: ReplicaAccountInfoVersions,
slot: u64,
is_startup: bool
) -> Result<()> {
match account {
ReplicaAccountInfoVersions::V0_0_3(info) => {
// Process account update
// info.pubkey, info.data, info.owner, etc.
}
}
Ok(())
}
fn notify_transaction(
&self,
transaction: ReplicaTransactionInfoVersions,
slot: u64
) -> Result<()> {
// Process transaction
Ok(())
}
}
The primary drawback of Geyser plugins is infrastructure complexity. You need to run your own validator or work with an RPC provider that supports custom Geyser plugins. Services like Helius, Triton, and QuickNode offer managed Geyser streaming through their Yellowstone gRPC interfaces, providing a middle ground between running your own infrastructure and using basic RPC methods.
Database Design Patterns
The choice of database and schema design significantly impacts indexer performance. Different use cases call for different database technologies.
PostgreSQL for Relational Data
PostgreSQL remains an excellent choice for most indexing applications. Its JSONB support allows flexible schema evolution, while traditional relational features enable complex queries across related entities. For Solana indexing, consider the following schema patterns:
-- Core tables for Solana indexing
CREATE TABLE accounts (
pubkey BYTEA PRIMARY KEY,
owner BYTEA NOT NULL,
lamports BIGINT NOT NULL,
data BYTEA,
slot BIGINT NOT NULL,
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_accounts_owner ON accounts(owner);
CREATE INDEX idx_accounts_slot ON accounts(slot);
CREATE TABLE transactions (
signature BYTEA PRIMARY KEY,
slot BIGINT NOT NULL,
block_time TIMESTAMP,
success BOOLEAN NOT NULL,
fee BIGINT NOT NULL,
compute_units_consumed BIGINT,
accounts BYTEA[] NOT NULL,
log_messages TEXT[]
);
CREATE INDEX idx_transactions_slot ON transactions(slot);
CREATE INDEX idx_transactions_block_time ON transactions(block_time);
CREATE INDEX idx_transactions_accounts ON transactions USING GIN(accounts);
-- Program-specific decoded data
CREATE TABLE token_transfers (
signature BYTEA REFERENCES transactions(signature),
instruction_index INT NOT NULL,
source BYTEA NOT NULL,
destination BYTEA NOT NULL,
mint BYTEA NOT NULL,
amount NUMERIC NOT NULL,
PRIMARY KEY (signature, instruction_index)
);
CREATE INDEX idx_token_transfers_source ON token_transfers(source);
CREATE INDEX idx_token_transfers_destination ON token_transfers(destination);
CREATE INDEX idx_token_transfers_mint ON token_transfers(mint);
TimescaleDB for Time-Series Analytics
When your primary use case involves time-based analytics—tracking metrics over time, computing moving averages, or generating historical charts—TimescaleDB extends PostgreSQL with optimized time-series capabilities:
-- TimescaleDB hypertable for metrics
CREATE TABLE program_metrics (
time TIMESTAMPTZ NOT NULL,
program_id BYTEA NOT NULL,
transaction_count INT NOT NULL,
compute_units BIGINT NOT NULL,
unique_signers INT NOT NULL,
fee_total BIGINT NOT NULL
);
SELECT create_hypertable('program_metrics', 'time');
-- Continuous aggregate for hourly rollups
CREATE MATERIALIZED VIEW program_metrics_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS bucket,
program_id,
SUM(transaction_count) as tx_count,
SUM(compute_units) as total_cu,
SUM(fee_total) as total_fees
FROM program_metrics
GROUP BY bucket, program_id;
ClickHouse for Heavy Analytics
For applications requiring analytical queries across billions of rows—such as blockchain explorers or comprehensive analytics platforms—ClickHouse provides superior query performance through columnar storage and vectorized execution:
-- ClickHouse table for transaction analytics
CREATE TABLE transactions (
signature FixedString(64),
slot UInt64,
block_time DateTime,
success UInt8,
fee UInt64,
compute_units UInt64,
program_ids Array(FixedString(32)),
signer FixedString(32)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(block_time)
ORDER BY (block_time, signature);
-- Materialized view for program statistics
CREATE MATERIALIZED VIEW program_stats_mv
ENGINE = SummingMergeTree()
ORDER BY (program_id, date)
AS SELECT
arrayJoin(program_ids) as program_id,
toDate(block_time) as date,
count() as tx_count,
sum(fee) as total_fees,
sum(compute_units) as total_cu
FROM transactions
GROUP BY program_id, date;
Production Pipeline Architecture
A production-grade indexing pipeline requires careful attention to reliability, scalability, and monitoring. The following architecture provides a robust foundation:
// Production indexer architecture
import { Connection, PublicKey } from '@solana/web3.js';
import { Pool } from 'pg';
import Redis from 'ioredis';
interface IndexerConfig {
rpcUrl: string;
wsUrl: string;
databaseUrl: string;
redisUrl: string;
programs: string[];
}
class ProductionIndexer {
private connection: Connection;
private db: Pool;
private redis: Redis;
private config: IndexerConfig;
private isRunning: boolean = false;
private lastProcessedSlot: number = 0;
constructor(config: IndexerConfig) {
this.config = config;
this.connection = new Connection(config.rpcUrl, {
wsEndpoint: config.wsUrl,
commitment: 'confirmed'
});
this.db = new Pool({ connectionString: config.databaseUrl });
this.redis = new Redis(config.redisUrl);
}
async start() {
this.isRunning = true;
// Restore last processed slot from Redis
const savedSlot = await this.redis.get('indexer:last_slot');
this.lastProcessedSlot = savedSlot ? parseInt(savedSlot) : 0;
// Start WebSocket subscriptions
await this.setupSubscriptions();
// Start gap detection and backfill
this.startGapDetection();
// Start health monitoring
this.startHealthCheck();
}
private async setupSubscriptions() {
for (const programId of this.config.programs) {
const pubkey = new PublicKey(programId);
this.connection.onProgramAccountChange(
pubkey,
async (accountInfo, context) => {
await this.processAccountUpdate(pubkey, accountInfo, context.slot);
},
'confirmed'
);
this.connection.onLogs(
pubkey,
async (logs, context) => {
await this.processLogs(logs, context.slot);
},
'confirmed'
);
}
}
private async processAccountUpdate(
programId: PublicKey,
accountInfo: any,
slot: number
) {
const client = await this.db.connect();
try {
await client.query('BEGIN');
await client.query(`
INSERT INTO accounts (pubkey, owner, lamports, data, slot)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (pubkey) DO UPDATE SET
owner = EXCLUDED.owner,
lamports = EXCLUDED.lamports,
data = EXCLUDED.data,
slot = EXCLUDED.slot,
updated_at = NOW()
WHERE accounts.slot < EXCLUDED.slot
`, [
accountInfo.accountId.toBuffer(),
accountInfo.accountInfo.owner.toBuffer(),
accountInfo.accountInfo.lamports,
accountInfo.accountInfo.data,
slot
]);
await client.query('COMMIT');
await this.updateLastSlot(slot);
} catch (error) {
await client.query('ROLLBACK');
await this.handleError('processAccountUpdate', error);
} finally {
client.release();
}
}
private async processLogs(logs: any, slot: number) {
// Parse and decode instruction logs
// Store relevant events in database
}
private async updateLastSlot(slot: number) {
if (slot > this.lastProcessedSlot) {
this.lastProcessedSlot = slot;
await this.redis.set('indexer:last_slot', slot.toString());
}
}
private startGapDetection() {
setInterval(async () => {
const currentSlot = await this.connection.getSlot();
const gaps = await this.detectGaps(this.lastProcessedSlot, currentSlot);
for (const gap of gaps) {
await this.backfillSlotRange(gap.start, gap.end);
}
}, 60000); // Check every minute
}
private async detectGaps(start: number, end: number): Promise<{start: number, end: number}[]> {
// Query database for missing slots
const result = await this.db.query(`
WITH slot_range AS (
SELECT generate_series($1::bigint, $2::bigint) as slot
)
SELECT sr.slot as gap_start,
LEAD(sr.slot) OVER (ORDER BY sr.slot) as gap_end
FROM slot_range sr
LEFT JOIN transactions t ON t.slot = sr.slot
WHERE t.slot IS NULL
`, [start, end]);
return result.rows.filter(r => r.gap_end - r.gap_start > 1);
}
private async backfillSlotRange(start: number, end: number) {
// Fetch and process historical blocks
}
private startHealthCheck() {
setInterval(async () => {
const metrics = {
lastSlot: this.lastProcessedSlot,
dbConnections: this.db.totalCount,
isRunning: this.isRunning
};
await this.redis.hset('indexer:health', metrics);
}, 10000);
}
private async handleError(context: string, error: any) {
console.error(`[Indexer Error] ${context}:`, error);
// Send to error tracking service
// Implement retry logic
}
}
Decoding Program Data
Raw account data and transaction logs are often serialized using Borsh or other binary formats. Correctly decoding this data requires understanding the program's data structures. For Anchor programs, the IDL (Interface Definition Language) provides a schema for automatic deserialization:
// Anchor IDL-based decoder
import { BorshCoder, Idl } from '@coral-xyz/anchor';
class ProgramDecoder {
private coder: BorshCoder;
constructor(idl: Idl) {
this.coder = new BorshCoder(idl);
}
decodeAccount<T>(accountName: string, data: Buffer): T | null {
try {
return this.coder.accounts.decode<T>(accountName, data);
} catch (error) {
console.error(`Failed to decode ${accountName}:`, error);
return null;
}
}
decodeInstruction(data: Buffer): { name: string; data: any } | null {
try {
return this.coder.instruction.decode(data);
} catch (error) {
console.error('Failed to decode instruction:', error);
return null;
}
}
decodeEvent(eventData: string): any | null {
try {
return this.coder.events.decode(eventData);
} catch (error) {
return null;
}
}
}
// Usage with specific program
import { IDL as RaydiumAmmIdl } from './idl/raydium_amm';
const decoder = new ProgramDecoder(RaydiumAmmIdl);
const poolState = decoder.decodeAccount('PoolState', accountData);
Monitoring and Alerting
Production indexers require comprehensive monitoring to ensure data completeness and system health. Key metrics to track include indexing lag (difference between current slot and last indexed slot), throughput (accounts and transactions processed per second), error rates, and database performance.
// Prometheus metrics for indexer monitoring
import { Registry, Counter, Gauge, Histogram } from 'prom-client';
const registry = new Registry();
const indexerMetrics = {
processedAccounts: new Counter({
name: 'indexer_processed_accounts_total',
help: 'Total number of processed account updates',
labelNames: ['program'],
registers: [registry]
}),
processedTransactions: new Counter({
name: 'indexer_processed_transactions_total',
help: 'Total number of processed transactions',
labelNames: ['program', 'success'],
registers: [registry]
}),
indexingLag: new Gauge({
name: 'indexer_lag_slots',
help: 'Number of slots behind current tip',
registers: [registry]
}),
processingDuration: new Histogram({
name: 'indexer_processing_duration_seconds',
help: 'Time spent processing updates',
labelNames: ['operation'],
buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1],
registers: [registry]
}),
databaseQueryDuration: new Histogram({
name: 'indexer_db_query_duration_seconds',
help: 'Database query duration',
labelNames: ['query_type'],
buckets: [0.001, 0.01, 0.05, 0.1, 0.5, 1, 5],
registers: [registry]
})
};
Optimization Techniques
Several optimization strategies can dramatically improve indexer performance. Batch processing reduces database round-trips by accumulating updates and writing them in batches. Connection pooling with tools like PgBouncer maximizes database throughput. Parallel processing with worker threads or separate processes can increase throughput for CPU-intensive decoding operations.
Caching frequently accessed data in Redis reduces database load for read-heavy operations. Proper indexing of database tables is crucial—analyze query patterns and create appropriate indexes, but avoid over-indexing which slows down writes. Partitioning large tables by time (for transactions) or by program (for accounts) enables efficient data management and query optimization.
Conclusion
Building custom indexers for Solana requires understanding the unique characteristics of Solana's data model and choosing appropriate technologies for your specific use case. Start with WebSocket subscriptions for simpler applications, graduate to Geyser plugins for high-performance requirements, and invest in robust monitoring and error handling for production deployments.
The patterns and code examples presented in this guide provide a foundation for building production-grade indexing infrastructure. As Solana's ecosystem continues to evolve, staying current with new developments in validator software, RPC improvements, and community tools will help you maintain efficient and reliable data pipelines.
Remember that indexer development is iterative—start with a minimal viable implementation, measure performance, identify bottlenecks, and optimize based on real-world data. The investment in proper indexing infrastructure pays dividends through faster queries, more reliable applications, and the ability to derive insights from on-chain data that would otherwise be inaccessible.