
SubscribeCheckpoints - Real-Time Checkpoint Streaming

Stream Sui blockchain checkpoints in real-time using gRPC server-streaming RPC. Monitor network activity, track finality, and build event-driven applications with Dwellir's high-performance infrastructure.

Real-Time Blockchain Monitoring with gRPC Streaming

The SubscribeCheckpoints method of the Subscription Service streams Sui blockchain checkpoints over a server-streaming RPC. Applications can monitor network activity continuously, receive checkpoint data as it finalizes, and build event-driven systems without polling.

Overview

Checkpoints represent finalized states of the Sui blockchain. Each checkpoint contains a batch of executed transactions and represents a guaranteed point of finality. By subscribing to checkpoints, applications can:

  • Monitor blockchain activity in real-time
  • Track transaction finality as it occurs
  • Build event-driven architectures without polling
  • Maintain synchronized local state
  • Analyze network throughput and gas consumption

Key Features

  • Server Streaming: Continuous data flow from server to client
  • Real-Time Updates: Receive checkpoints as they finalize (sub-second latency)
  • Field Masking: Control data volume by selecting specific fields
  • Automatic Resumption: Reconnection support for reliable streaming
  • Low Overhead: HTTP/2 multiplexing enables efficient long-lived connections

Method Signature

Service: sui.rpc.v2.SubscriptionService
Method: SubscribeCheckpoints
Type: Server-streaming RPC (single request, stream of responses)
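
To illustrate the "single request, stream of responses" shape, here is a minimal sketch that consumes a stream with `for await`. The `CheckpointMessage` type and the `mockCheckpointStream` generator are hypothetical stand-ins for illustration, not part of any Sui or Dwellir API. With a real gRPC client you would iterate over the call object your client library returns instead.

```typescript
// Hypothetical, simplified checkpoint shape for illustration only.
interface CheckpointMessage {
  sequence_number: string;
}

// Stand-in for a server-streaming call: yields a finite run of checkpoints.
async function* mockCheckpointStream(
  start: number,
  count: number
): AsyncGenerator<CheckpointMessage> {
  for (let i = 0; i < count; i++) {
    yield { sequence_number: String(start + i) };
  }
}

// One request opens the stream; the loop body runs once per response.
async function collectSequences(
  stream: AsyncIterable<CheckpointMessage>
): Promise<number[]> {
  const seen: number[] = [];
  for await (const checkpoint of stream) {
    seen.push(Number(checkpoint.sequence_number));
  }
  return seen;
}
```

A real subscription does not end on its own, so production code would process each checkpoint inside the loop rather than collect them into an array.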

Use Cases

Real-Time Network Monitor

Build a live blockchain monitoring dashboard:

TypeScript
class NetworkMonitor {
  private checkpointCount: number = 0;
  private startTime: number;
  private lastSequence: number = 0;

  constructor() {
    this.startTime = Date.now();
  }

  processCheckpoint(checkpoint: any): void {
    this.checkpointCount++;
    const currentSequence = parseInt(checkpoint.sequence_number);

    // Calculate metrics
    const elapsed = (Date.now() - this.startTime) / 1000;
    const checkpointsPerSecond = this.checkpointCount / elapsed;

    // Detect missed checkpoints
    if (this.lastSequence > 0) {
      const gap = currentSequence - this.lastSequence - 1;
      if (gap > 0) {
        console.warn(`⚠️ Missed ${gap} checkpoint(s)`);
      }
    }

    this.lastSequence = currentSequence;

    // Display metrics
    console.log('\n📊 Network Metrics:');
    console.log(`Checkpoints/sec: ${checkpointsPerSecond.toFixed(2)}`);
    console.log(`Total Checkpoints: ${this.checkpointCount}`);
    console.log(`Network TPS: ${this.calculateTPS(checkpoint)}`);
  }

  private calculateTPS(checkpoint: any): number {
    const txCount = checkpoint.transactions?.length || 0;
    // Assuming ~1 checkpoint per 0.5 seconds
    return txCount * 2;
  }
}
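
The fixed 0.5-second interval in `calculateTPS` above is only an approximation. As a sketch of an alternative, the throughput can be measured from the timestamps of two consecutive checkpoints. This assumes each checkpoint carries a `timestamp_ms` field and a `transactions` array, as the examples in this document do. The `TimedCheckpoint` type and `measuredTps` function are hypothetical names introduced here.

```typescript
// Hypothetical minimal shape; field names follow the examples in this document.
interface TimedCheckpoint {
  timestamp_ms: string;
  transactions?: unknown[];
}

// Transactions per second over the interval between two consecutive checkpoints.
// Returns 0 when the timestamps do not advance, to avoid dividing by zero.
function measuredTps(previous: TimedCheckpoint, current: TimedCheckpoint): number {
  const elapsedMs = Number(current.timestamp_ms) - Number(previous.timestamp_ms);
  if (elapsedMs <= 0) return 0;
  const txCount = current.transactions?.length ?? 0;
  return txCount / (elapsedMs / 1000);
}
```

For example, five transactions in a checkpoint that arrives 250 ms after the previous one work out to 5 / 0.25 = 20 transactions per second.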

Transaction Finality Tracker

Monitor when transactions reach finality:

TypeScript
class FinalityTracker {
  private pendingTxs: Set<string> = new Set();

  watchTransaction(txDigest: string): void {
    this.pendingTxs.add(txDigest);
    console.log(`Watching transaction: ${txDigest}`);
  }

  processCheckpoint(checkpoint: any): void {
    const finalizedTxs: string[] = [];

    for (const txDigest of checkpoint.transactions || []) {
      if (this.pendingTxs.has(txDigest)) {
        finalizedTxs.push(txDigest);
        this.pendingTxs.delete(txDigest);
      }
    }

    if (finalizedTxs.length > 0) {
      console.log('\n✅ Transactions Finalized:');
      finalizedTxs.forEach(tx => {
        console.log(`  ${tx}`);
        this.onTransactionFinalized(tx, checkpoint.sequence_number);
      });
    }
  }

  private onTransactionFinalized(txDigest: string, checkpoint: string): void {
    // Trigger callbacks, update UI, send notifications
  }
}
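
Callers often want to await finality for a single transaction rather than register a callback. The sketch below shows one possible promise-based variant. The `FinalityWaiter` class is hypothetical, and like the tracker above it assumes `checkpoint.transactions` is a list of transaction digest strings.

```typescript
// Hypothetical promise-based variant of the tracker above.
// Supports one waiter per digest; a second waitFor() for the same digest replaces the first.
class FinalityWaiter {
  private waiters = new Map<string, (checkpointSeq: string) => void>();

  // Resolves with the sequence number of the checkpoint that includes txDigest.
  waitFor(txDigest: string): Promise<string> {
    return new Promise((resolve) => {
      this.waiters.set(txDigest, resolve);
    });
  }

  // Call once per streamed checkpoint; returns how many waiters were resolved.
  processCheckpoint(checkpoint: {
    sequence_number: string;
    transactions?: string[];
  }): number {
    let resolved = 0;
    for (const digest of checkpoint.transactions ?? []) {
      const resolve = this.waiters.get(digest);
      if (resolve) {
        resolve(checkpoint.sequence_number);
        this.waiters.delete(digest);
        resolved++;
      }
    }
    return resolved;
  }

  // Number of transactions still waiting for a checkpoint.
  pendingCount(): number {
    return this.waiters.size;
  }
}
```

A caller would write `const seq = await waiter.waitFor(digest)` while the stream handler feeds every checkpoint into `processCheckpoint`.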

Epoch Transition Detector

Detect and handle epoch transitions:

TypeScript
class EpochMonitor {
  private currentEpoch: number = 0;

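  processCheckpoint(checkpoint: any): void {
    if (checkpoint.end_of_epoch_data) {
      // Prefer the epoch reported by the checkpoint itself (this assumes an
      // `epoch` field is present in the response). The local counter starts
      // at 0, so it is only correct when streaming began at genesis.
      const endedEpoch = checkpoint.epoch !== undefined
        ? Number(checkpoint.epoch)
        : this.currentEpoch;
      const newEpoch = endedEpoch + 1;

      console.log('\n🔄 Epoch Transition Detected:');
      console.log(`Old Epoch: ${endedEpoch}`);
      console.log(`New Epoch: ${newEpoch}`);
      console.log(`Transition Checkpoint: ${checkpoint.sequence_number}`);

      this.handleEpochTransition(newEpoch, checkpoint.end_of_epoch_data);
      this.currentEpoch = newEpoch;
    }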
  }

  private handleEpochTransition(newEpoch: number, epochData: any): void {
    // Process epoch change
    // - Update validator set
    // - Record gas costs
    // - Calculate staking rewards
    // - Update system parameters
  }
}

Blockchain Analytics Pipeline

Stream checkpoint data to an analytics system:

TypeScript
interface CheckpointMetrics {
  sequence: number;
  timestamp: Date;
  transactionCount: number;
  totalTransactions: number;
  throughput: number;
}

class AnalyticsPipeline {
  private metrics: CheckpointMetrics[] = [];
  private windowSize: number = 100;  // Keep last 100 checkpoints

  processCheckpoint(checkpoint: any): void {
    const metrics: CheckpointMetrics = {
      sequence: parseInt(checkpoint.sequence_number),
      timestamp: new Date(parseInt(checkpoint.timestamp_ms)),
      transactionCount: checkpoint.transactions?.length || 0,
      totalTransactions: parseInt(checkpoint.network_total_transactions),
      throughput: this.calculateThroughput(checkpoint)
    };

    this.metrics.push(metrics);

    // Maintain rolling window
    if (this.metrics.length > this.windowSize) {
      this.metrics.shift();
    }

    // Compute aggregates
    this.computeAggregates();
  }

  private calculateThroughput(checkpoint: any): number {
    const txCount = checkpoint.transactions?.length || 0;
    // Approximate TPS based on checkpoint interval
    return txCount * 2;  // Assuming 0.5s per checkpoint
  }

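  private computeAggregates(): void {
    const totalTx = this.metrics.reduce((sum, m) => sum + m.transactionCount, 0);
    // Average the per-checkpoint TPS estimates. Dividing totalTx by the
    // window length would give transactions per checkpoint, not per second.
    const avgThroughput =
      this.metrics.reduce((sum, m) => sum + m.throughput, 0) / this.metrics.length;

    console.log(`\n📈 Analytics (last ${this.metrics.length} checkpoints):`);
    console.log(`Average TPS: ${avgThroughput.toFixed(2)}`);
    console.log(`Total Transactions: ${totalTx}`);
  }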
}

Stream Management

Reconnection Strategy

Implement robust reconnection logic:

TypeScript
class ResilientSubscriber {
  private maxRetries: number = 10;
  private retryCount: number = 0;
  private baseDelay: number = 1000;  // 1 second

  async connectWithRetry(): Promise<void> {
    while (this.retryCount < this.maxRetries) {
      try {
        await this.connect();
        return;  // Server closed the stream cleanly
      } catch (error) {
        this.retryCount++;
        // Exponential backoff: 1s, 2s, 4s, ... capped at 30 seconds
        const delay = Math.min(
          this.baseDelay * Math.pow(2, this.retryCount - 1),
          30000
        );

        console.log(
          `Connection failed. Retry ${this.retryCount}/${this.maxRetries} in ${delay}ms`
        );

        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }

    throw new Error('Max retries exceeded');
  }

  private connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      // `client`, `request`, and `metadata` are assumed to be created elsewhere
      const stream = client.SubscribeCheckpoints(request, metadata);

      stream.on('data', (checkpoint: any) => {
        // Data is flowing, so the connection is healthy: reset the backoff
        this.retryCount = 0;
        this.handleCheckpoint(checkpoint);
      });

      stream.on('error', (error: Error) => {
        reject(error);
      });

      stream.on('end', () => {
        resolve();
      });
    });
  }

  private handleCheckpoint(checkpoint: any): void {
    // Application-specific checkpoint processing
  }
}
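
The reconnection loop above uses capped exponential backoff. A common refinement is to add random jitter so that many clients reconnecting at once do not retry in lockstep. The sketch below isolates the delay calculation. The `backoffDelay` and `jitteredDelay` helpers are hypothetical names, and the 1-second base and 30-second cap simply mirror the values in the class above.

```typescript
// Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped.
function backoffDelay(attempt: number, baseMs = 1000, capMs = 30000): number {
  return Math.min(baseMs * Math.pow(2, attempt - 1), capMs);
}

// "Full jitter": pick a delay uniformly in [0, backoffDelay).
// `random` is injectable so the function can be tested deterministically.
function jitteredDelay(
  attempt: number,
  random: () => number = Math.random,
  baseMs = 1000,
  capMs = 30000
): number {
  return Math.floor(random() * backoffDelay(attempt, baseMs, capMs));
}
```

With these defaults the un-jittered delays run 1 s, 2 s, 4 s, 8 s, 16 s, and then stay at the 30 s cap.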

Health Monitoring

Monitor stream health and detect stalls:

TypeScript
class HealthMonitor {
  private lastCheckpointTime: number = Date.now();
  private healthCheckInterval: number = 10000;  // 10 seconds
  private maxStallDuration: number = 30000;  // 30 seconds

  startMonitoring(): void {
    setInterval(() => {
      this.checkHealth();
    }, this.healthCheckInterval);
  }

  onCheckpointReceived(): void {
    this.lastCheckpointTime = Date.now();
  }

  private checkHealth(): void {
    const stallDuration = Date.now() - this.lastCheckpointTime;

    if (stallDuration > this.maxStallDuration) {
      console.error('⚠️ Stream stalled! No checkpoints received.');
      this.handleStall();
    }
  }

  private handleStall(): void {
    // Restart stream, alert operators, switch to backup
  }
}

Performance Considerations

Bandwidth Optimization

Minimize bandwidth with selective field masking:

Field Mask           | Checkpoint Size | Bandwidth (1000 checkpoints)
All fields           | ~2.5 KB         | 2.5 MB
Standard (5 fields)  | ~400 bytes      | 400 KB
Minimal (2 fields)   | ~80 bytes       | 80 KB

Recommendation: Request only fields needed for your use case.
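
The bandwidth column is the per-checkpoint size multiplied by the number of checkpoints. The sketch below reproduces that arithmetic using the approximate sizes from the table, in decimal units (1 KB = 1000 bytes). The `APPROX_BYTES` constant and `estimateBandwidthBytes` function are hypothetical names, and real sizes will vary with checkpoint contents.

```typescript
// Approximate per-checkpoint sizes in bytes, taken from the table above.
const APPROX_BYTES = { all: 2500, standard: 400, minimal: 80 } as const;

// Estimated bytes transferred for a given mask and number of checkpoints.
function estimateBandwidthBytes(
  mask: keyof typeof APPROX_BYTES,
  checkpoints: number
): number {
  return APPROX_BYTES[mask] * checkpoints;
}
```

For 1000 checkpoints this gives 2,500,000 bytes (2.5 MB) with all fields and 80,000 bytes (80 KB) with the minimal mask.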

Backpressure Handling

When processing is slower than the stream, buffer checkpoints in a bounded queue so the stream handler never blocks:

TypeScript
class BackpressureHandler {
  private queue: any[] = [];
  private processing: boolean = false;
  private maxQueueSize: number = 100;

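  async onCheckpoint(checkpoint: any): Promise<void> {
    if (this.queue.length >= this.maxQueueSize) {
      // Dropping leaves a gap in the sequence; backfill later if completeness matters
      console.warn('⚠️ Queue full, dropping checkpoint');
      return;
    }

    this.queue.push(checkpoint);

    if (!this.processing) {
      // Drain in the background so the stream handler returns immediately
      this.processQueue().catch(err =>
        console.error('Checkpoint processing failed:', err)
      );
    }
  }

  private async processQueue(): Promise<void> {
    this.processing = true;

    try {
      while (this.queue.length > 0) {
        const checkpoint = this.queue.shift();
        await this.processCheckpoint(checkpoint);
      }
    } finally {
      // Always clear the flag so a thrown error cannot stall the queue
      this.processing = false;
    }
  }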

  private async processCheckpoint(checkpoint: any): Promise<void> {
    // Heavy processing logic
  }
}

Best Practices

Implement Graceful Shutdown

Handle termination signals properly:

TypeScript
let isShuttingDown = false;

process.on('SIGINT', async () => {
  if (isShuttingDown) return;

  isShuttingDown = true;
  console.log('\nGraceful shutdown initiated...');

  // Cancel stream
  stream.cancel();

  // Flush pending data
  await flushPendingData();

  // Close connections
  await closeConnections();

  process.exit(0);
});

Monitor Stream Health

Track metrics and detect issues:

TypeScript
const metrics = {
  checkpointsReceived: 0,
  lastCheckpointTime: Date.now(),
  errors: 0,
  reconnections: 0
};

setInterval(() => {
  console.log('Stream Metrics:', metrics);

  if (Date.now() - metrics.lastCheckpointTime > 60000) {
    console.error('No checkpoints for 60s!');
  }
}, 30000);

Use Field Masking

Request only necessary fields:

TypeScript
// ✅ Good: Only request needed fields
const maskedRequest = {
  read_mask: {
    paths: ['sequence_number', 'transactions']
  }
};

// ❌ Bad: Request all fields
const unmaskedRequest = {};  // Wastes bandwidth

Performance Benefits

gRPC Streaming vs HTTP Polling:

Metric              | gRPC Streaming              | HTTP Polling (5s interval)
Latency             | Less than 1s                | 2.5s average
Bandwidth (1 hour)  | 1.4 MB                      | 8.6 MB
Overhead            | Minimal (single connection) | High (720 requests)
Missed Updates      | None                        | Possible
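
Two of the polling figures follow directly from the 5-second interval. The sketch below shows the arithmetic. The helper names are hypothetical, and the latency estimate assumes an update lands at a uniformly random moment within the polling interval. The bandwidth figures depend on payload sizes and are not derived here.

```typescript
// Number of requests a poller issues over a time window.
function pollRequests(windowSeconds: number, intervalSeconds: number): number {
  return Math.floor(windowSeconds / intervalSeconds);
}

// Expected wait before a poller notices an update that lands at a uniformly
// random moment within the polling interval.
function averagePollLatencySeconds(intervalSeconds: number): number {
  return intervalSeconds / 2;
}
```

Over one hour at a 5-second interval, that is 3600 / 5 = 720 requests and an average detection delay of 5 / 2 = 2.5 seconds.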

Need help with real-time streaming? Contact our support team or check the gRPC overview.