SubscribeCheckpoints

Real-Time Blockchain Monitoring with gRPC Streaming#

The SubscribeCheckpoints method of the Subscription Service streams Sui blockchain checkpoints in real time over a server-streaming RPC. Applications can use it to monitor network activity continuously, receive checkpoint data as it finalizes, and build event-driven systems without polling.

Overview#

Checkpoints represent finalized states of the Sui blockchain. Each checkpoint contains a batch of executed transactions and represents a guaranteed point of finality. By subscribing to checkpoints, applications can:

  • Monitor blockchain activity in real-time
  • Track transaction finality as it occurs
  • Build event-driven architectures without polling
  • Maintain synchronized local state
  • Analyze network throughput and gas consumption

Key Features#

  • Server Streaming: Continuous data flow from server to client
  • Real-Time Updates: Receive checkpoints as they finalize (sub-second latency)
  • Field Masking: Control data volume by selecting specific fields
  • Reconnection Support: Clients can re-subscribe after a dropped stream for reliable streaming
  • Low Overhead: HTTP/2 multiplexing enables efficient long-lived connections

Method Signature#

Service: sui.rpc.v2beta2.SubscriptionService
Method: SubscribeCheckpoints
Type: Server-streaming RPC (single request, stream of responses)

Parameters#

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| read_mask | FieldMask | No | Specifies which checkpoint fields to include in the stream |

Field Mask Options#

The read_mask parameter controls which fields are streamed. Available paths include:

| Path | Description |
| --- | --- |
| sequence_number | Checkpoint sequence number (incrementing counter) |
| digest | Cryptographic hash of the checkpoint |
| network_total_transactions | Cumulative transaction count across network |
| previous_digest | Previous checkpoint's digest (for chain verification) |
| epoch_rolling_gas_cost_summary | Gas cost aggregates for the epoch |
| timestamp_ms | Checkpoint finalization timestamp (milliseconds) |
| transactions | Array of transaction digests included in checkpoint |
| end_of_epoch_data | Epoch transition data (present only at epoch boundaries) |
| validator_signature | Validator signatures proving checkpoint validity |

Example Field Masks#

// Minimal: Only sequence and timestamp
read_mask: {
  paths: ["sequence_number", "timestamp_ms"]
}

// Standard: Core checkpoint data
read_mask: {
  paths: [
    "sequence_number",
    "digest",
    "timestamp_ms",
    "network_total_transactions"
  ]
}

// Complete: All checkpoint fields
read_mask: {
  paths: [
    "sequence_number",
    "digest",
    "network_total_transactions",
    "previous_digest",
    "epoch_rolling_gas_cost_summary",
    "timestamp_ms",
    "transactions",
    "end_of_epoch_data",
    "validator_signature"
  ]
}
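A small helper can catch misspelled paths before a request is sent. The sketch below is a convenience under stated assumptions, not part of the API: `CHECKPOINT_PATHS`, `ReadMask`, and `buildReadMask` are hypothetical names, and the allowed paths are simply copied from the Field Mask Options table.

```typescript
// Paths copied from the Field Mask Options table; all names here are hypothetical.
const CHECKPOINT_PATHS = [
  'sequence_number',
  'digest',
  'network_total_transactions',
  'previous_digest',
  'epoch_rolling_gas_cost_summary',
  'timestamp_ms',
  'transactions',
  'end_of_epoch_data',
  'validator_signature',
] as const;

type CheckpointPath = (typeof CHECKPOINT_PATHS)[number];

interface ReadMask {
  paths: CheckpointPath[];
}

// Validates the requested paths and removes duplicates, keeping first-seen order.
function buildReadMask(paths: readonly string[]): ReadMask {
  const allowed = new Set<string>(CHECKPOINT_PATHS);
  const seen = new Set<string>();
  const result: CheckpointPath[] = [];
  for (const path of paths) {
    if (!allowed.has(path)) {
      throw new Error(`Unknown checkpoint field path: ${path}`);
    }
    if (!seen.has(path)) {
      seen.add(path);
      result.push(path as CheckpointPath);
    }
  }
  return { paths: result };
}

const minimalMask = buildReadMask(['sequence_number', 'timestamp_ms', 'sequence_number']);
// minimalMask.paths is ['sequence_number', 'timestamp_ms']
```

The returned object has the same shape as the read_mask values shown above, so it could be passed as the `read_mask` of a request.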

Response Structure#

The stream yields a series of Checkpoint messages:

message Checkpoint {
  uint64 sequence_number = 1;
  string digest = 2;
  uint64 network_total_transactions = 3;
  string previous_digest = 4;
  uint64 timestamp_ms = 5;
  repeated string transactions = 6;
  EpochData end_of_epoch_data = 7;
  ValidatorSignature validator_signature = 8;
  GasCostSummary epoch_rolling_gas_cost_summary = 9;
}

Response Fields#

| Field | Type | Description |
| --- | --- | --- |
| sequence_number | uint64 | Monotonically increasing checkpoint identifier |
| digest | string | Checkpoint digest hash |
| network_total_transactions | uint64 | Total transactions executed on network since genesis |
| previous_digest | string | Previous checkpoint's digest for chain linking |
| timestamp_ms | uint64 | Unix timestamp in milliseconds |
| transactions | repeated string | Transaction digests in this checkpoint |
| end_of_epoch_data | EpochData | Epoch transition information (only at epoch end) |
| validator_signature | ValidatorSignature | Signature data from validators |
| epoch_rolling_gas_cost_summary | GasCostSummary | Cumulative gas costs for current epoch |
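When a proto loader is configured to decode 64-bit integers as strings (as the `longs: String` option in the code example on this page does), the uint64 fields arrive as strings. The sketch below shows one way to convert them without losing precision. `RawCheckpoint`, `ParsedCheckpoint`, and `parseCheckpoint` are hypothetical names, only the scalar fields are modeled, and every field is treated as optional because a read_mask may exclude it.

```typescript
// Hypothetical types: the raw shape assumes uint64 values decoded as strings.
interface RawCheckpoint {
  sequence_number?: string;
  digest?: string;
  network_total_transactions?: string;
  previous_digest?: string;
  timestamp_ms?: string;
  transactions?: string[];
}

interface ParsedCheckpoint {
  sequenceNumber: bigint | null;
  digest: string | null;
  networkTotalTransactions: bigint | null;
  timestamp: Date | null;
  transactions: string[];
}

// Missing or empty values become null instead of 0, so "not requested"
// stays distinguishable from a genuine zero.
function toBigInt(value: string | undefined): bigint | null {
  return value === undefined || value === '' ? null : BigInt(value);
}

function parseCheckpoint(raw: RawCheckpoint): ParsedCheckpoint {
  const timestampMs = toBigInt(raw.timestamp_ms);
  return {
    sequenceNumber: toBigInt(raw.sequence_number),
    digest: raw.digest ?? null,
    networkTotalTransactions: toBigInt(raw.network_total_transactions),
    // Millisecond timestamps are far below 2^53, so Number() is safe here.
    timestamp: timestampMs === null ? null : new Date(Number(timestampMs)),
    transactions: raw.transactions ?? [],
  };
}

const parsed = parseCheckpoint({
  sequence_number: '18446744073709551615', // max uint64, not representable as a number
  timestamp_ms: '1700000000000',
});
```

Using `bigint` for counters avoids the silent rounding that `parseInt` would introduce for values above `Number.MAX_SAFE_INTEGER`.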

Code Examples#

import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

// Configuration
const ENDPOINT = 'api-sui-mainnet-full.n.dwellir.com';
const API_TOKEN = 'your_api_token_here';

// Load proto definition
const packageDefinition = protoLoader.loadSync(
  './protos/subscription.proto',
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true,
    includeDirs: ['./protos']
  }
);

const protoDescriptor = grpc.loadPackageDefinition(packageDefinition) as any;

// Create client with TLS
const credentials = grpc.credentials.createSsl();
const client = new protoDescriptor.sui.rpc.v2beta2.SubscriptionService(
  ENDPOINT,
  credentials
);

// Setup authentication
const metadata = new grpc.Metadata();
metadata.add('x-api-key', API_TOKEN);

// Subscribe to checkpoint stream
class CheckpointSubscriber {
  private stream: any;
  private isConnected: boolean = false;
  // Set by disconnect() so that the resulting CANCELLED error does not trigger a reconnect
  private shuttingDown: boolean = false;

  connect(): void {
    const request = {
      read_mask: {
        paths: [
          'sequence_number',
          'digest',
          'timestamp_ms',
          'network_total_transactions',
          'transactions'
        ]
      }
    };

    this.stream = client.SubscribeCheckpoints(request, metadata);
    this.isConnected = true;

    // Handle incoming checkpoints
    this.stream.on('data', (checkpoint: any) => {
      this.handleCheckpoint(checkpoint);
    });

    // Handle stream errors
    this.stream.on('error', (error: any) => {
      console.error('Stream error:', error.message);
      this.isConnected = false;
      this.reconnect();
    });

    // Handle stream end
    this.stream.on('end', () => {
      console.log('Stream ended');
      this.isConnected = false;
      this.reconnect();
    });

    console.log('✓ Checkpoint subscription started');
  }

  private handleCheckpoint(checkpoint: any): void {
    // timestamp_ms arrives as a string because of `longs: String`
    const timestamp = new Date(parseInt(checkpoint.timestamp_ms, 10));

    console.log('\n📦 New Checkpoint:');
    console.log('================');
    console.log('Sequence:', checkpoint.sequence_number);
    console.log('Digest:', checkpoint.digest);
    console.log('Time:', timestamp.toISOString());
    console.log('Total Transactions:', checkpoint.network_total_transactions);
    console.log('Transactions in Checkpoint:', checkpoint.transactions?.length || 0);

    // Process checkpoint data
    this.processCheckpoint(checkpoint);
  }

  private processCheckpoint(checkpoint: any): void {
    // Custom processing logic
    // - Update local database
    // - Trigger webhooks
    // - Analyze transaction patterns
    // - Calculate network metrics
  }

  private reconnect(): void {
    // Skip reconnection after an intentional disconnect()
    if (this.shuttingDown) return;

    console.log('Reconnecting in 5 seconds...');
    setTimeout(() => {
      // The isConnected check prevents a double connect when both
      // 'error' and 'end' fire for the same stream
      if (!this.isConnected && !this.shuttingDown) {
        this.connect();
      }
    }, 5000);
  }

  disconnect(): void {
    if (this.stream) {
      this.shuttingDown = true;
      this.stream.cancel();
      this.isConnected = false;
      console.log('✓ Disconnected from checkpoint stream');
    }
  }
}

// Usage
const subscriber = new CheckpointSubscriber();
subscriber.connect();

// Graceful shutdown
process.on('SIGINT', () => {
  console.log('\nShutting down...');
  subscriber.disconnect();
  process.exit(0);
});

// Keep process alive
process.stdin.resume();

Use Cases#

Real-Time Network Monitor#

Build a live blockchain monitoring dashboard:

class NetworkMonitor {
  private checkpointCount: number = 0;
  private startTime: number;
  private lastSequence: number = 0;

  constructor() {
    this.startTime = Date.now();
  }

  processCheckpoint(checkpoint: any): void {
    this.checkpointCount++;
    const currentSequence = parseInt(checkpoint.sequence_number);

    // Calculate metrics
    const elapsed = (Date.now() - this.startTime) / 1000;
    const checkpointsPerSecond = this.checkpointCount / elapsed;

    // Detect missed checkpoints
    if (this.lastSequence > 0) {
      const gap = currentSequence - this.lastSequence - 1;
      if (gap > 0) {
        console.warn(`⚠️ Missed ${gap} checkpoint(s)`);
      }
    }

    this.lastSequence = currentSequence;

    // Display metrics
    console.log('\n📊 Network Metrics:');
    console.log(`Checkpoints/sec: ${checkpointsPerSecond.toFixed(2)}`);
    console.log(`Total Checkpoints: ${this.checkpointCount}`);
    console.log(`Network TPS: ${this.calculateTPS(checkpoint)}`);
  }

  private calculateTPS(checkpoint: any): number {
    const txCount = checkpoint.transactions?.length || 0;
    // Assuming ~1 checkpoint per 0.5 seconds
    return txCount * 2;
  }
}
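The gap check above reports how many checkpoints were skipped. Since the request documented on this page carries only a read_mask, a client that wants the skipped data would have to fetch it some other way; the sketch below only works out which sequence numbers are missing. `SequenceRange`, `GapDetector`, and `observe` are hypothetical names, and sequence numbers are assumed to fit in a JavaScript number, as in the monitor above.

```typescript
// Hypothetical helper types for reporting missing checkpoints.
interface SequenceRange {
  from: number; // first missing sequence number (inclusive)
  to: number;   // last missing sequence number (inclusive)
}

class GapDetector {
  private lastSequence: number | null = null;

  // Returns the range skipped since the previous checkpoint, or null if none.
  // Duplicate or out-of-order sequence numbers are ignored.
  observe(sequence: number): SequenceRange | null {
    const previous = this.lastSequence;
    if (previous !== null && sequence <= previous) {
      return null;
    }
    this.lastSequence = sequence;
    if (previous === null || sequence === previous + 1) {
      return null;
    }
    return { from: previous + 1, to: sequence - 1 };
  }
}

const detector = new GapDetector();
const observed = [100, 101, 105, 105, 106].map((seq) => detector.observe(seq));
// observed[2] is { from: 102, to: 104 }; every other entry is null
```

Tracking the previous value as `number | null` rather than `0` also lets the detector handle a stream whose first checkpoint has sequence number 0.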

Transaction Finality Tracker#

Monitor when transactions reach finality:

class FinalityTracker {
  private pendingTxs: Set<string> = new Set();

  watchTransaction(txDigest: string): void {
    this.pendingTxs.add(txDigest);
    console.log(`Watching transaction: ${txDigest}`);
  }

  processCheckpoint(checkpoint: any): void {
    const finalizedTxs: string[] = [];

    for (const txDigest of checkpoint.transactions || []) {
      if (this.pendingTxs.has(txDigest)) {
        finalizedTxs.push(txDigest);
        this.pendingTxs.delete(txDigest);
      }
    }

    if (finalizedTxs.length > 0) {
      console.log('\n✅ Transactions Finalized:');
      finalizedTxs.forEach(tx => {
        console.log(` ${tx}`);
        this.onTransactionFinalized(tx, checkpoint.sequence_number);
      });
    }
  }

  private onTransactionFinalized(txDigest: string, checkpoint: string): void {
    // Trigger callbacks, update UI, send notifications
  }
}

Epoch Transition Detector#

Detect and handle epoch transitions:

class EpochMonitor {
  // Starts at 0 for illustration; seed this with the network's actual
  // current epoch before subscribing, or the logged numbers will be wrong
  private currentEpoch: number = 0;

  processCheckpoint(checkpoint: any): void {
    if (checkpoint.end_of_epoch_data) {
      const newEpoch = this.currentEpoch + 1;

      console.log('\n🔄 Epoch Transition Detected:');
      console.log(`Old Epoch: ${this.currentEpoch}`);
      console.log(`New Epoch: ${newEpoch}`);
      console.log(`Transition Checkpoint: ${checkpoint.sequence_number}`);

      this.handleEpochTransition(newEpoch, checkpoint.end_of_epoch_data);
      this.currentEpoch = newEpoch;
    }
  }

  private handleEpochTransition(newEpoch: number, epochData: any): void {
    // Process epoch change
    // - Update validator set
    // - Record gas costs
    // - Calculate staking rewards
    // - Update system parameters
  }
}

Blockchain Analytics Pipeline#

Stream checkpoint data to analytics system:

interface CheckpointMetrics {
  sequence: number;
  timestamp: Date;
  transactionCount: number;
  totalTransactions: number;
  throughput: number;
}

class AnalyticsPipeline {
  private metrics: CheckpointMetrics[] = [];
  private windowSize: number = 100; // Keep last 100 checkpoints

  processCheckpoint(checkpoint: any): void {
    const metrics: CheckpointMetrics = {
      sequence: parseInt(checkpoint.sequence_number),
      timestamp: new Date(parseInt(checkpoint.timestamp_ms)),
      transactionCount: checkpoint.transactions?.length || 0,
      totalTransactions: parseInt(checkpoint.network_total_transactions),
      throughput: this.calculateThroughput(checkpoint)
    };

    this.metrics.push(metrics);

    // Maintain rolling window
    if (this.metrics.length > this.windowSize) {
      this.metrics.shift();
    }

    // Compute aggregates
    this.computeAggregates();
  }

  private calculateThroughput(checkpoint: any): number {
    const txCount = checkpoint.transactions?.length || 0;
    // Approximate TPS based on checkpoint interval
    return txCount * 2; // Assuming 0.5s per checkpoint
  }

  private computeAggregates(): void {
    const totalTx = this.metrics.reduce((sum, m) => sum + m.transactionCount, 0);
    // Average the per-checkpoint TPS estimates; totalTx / length would be
    // transactions per checkpoint, not per second
    const avgThroughput =
      this.metrics.reduce((sum, m) => sum + m.throughput, 0) / this.metrics.length;

    console.log(`\n📈 Analytics (last ${this.metrics.length} checkpoints):`);
    console.log(`Average TPS: ${avgThroughput.toFixed(2)}`);
    console.log(`Total Transactions: ${totalTx}`);
  }
}
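The throughput figures above rest on an assumed 0.5 s checkpoint interval. One way to avoid that assumption is to divide the change in network_total_transactions by the change in timestamp_ms between two checkpoints. A minimal sketch, using hypothetical names (`ThroughputSample`, `measuredTps`) and assuming both fields were requested in the read_mask:

```typescript
// Hypothetical sample type built from two fields of a checkpoint.
interface ThroughputSample {
  networkTotalTransactions: number; // cumulative count reported by the checkpoint
  timestampMs: number;              // checkpoint timestamp in milliseconds
}

// Transactions per second between two checkpoints, or null when the
// timestamps do not advance (which would otherwise divide by zero).
function measuredTps(older: ThroughputSample, newer: ThroughputSample): number | null {
  const elapsedMs = newer.timestampMs - older.timestampMs;
  if (elapsedMs <= 0) {
    return null;
  }
  const txDelta = newer.networkTotalTransactions - older.networkTotalTransactions;
  return (txDelta * 1000) / elapsedMs;
}

const tps = measuredTps(
  { networkTotalTransactions: 1000000, timestampMs: 1700000000000 },
  { networkTotalTransactions: 1000150, timestampMs: 1700000000500 },
);
// 150 transactions in 0.5 s, so tps is 300
```

Applied to the oldest and newest entries of a rolling window, the same function would give an average over the whole window rather than a per-checkpoint estimate.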

Stream Management#

Reconnection Strategy#

Implement robust reconnection logic:

class ResilientSubscriber {
  private maxRetries: number = 10;
  private retryCount: number = 0;
  private baseDelay: number = 1000; // 1 second

  async connectWithRetry(): Promise<void> {
    while (this.retryCount < this.maxRetries) {
      let delay = this.baseDelay;

      try {
        // Settles only when the stream ends (resolve) or fails (reject)
        await this.connect();
        console.log('Stream ended. Resubscribing...');
      } catch (error) {
        this.retryCount++;
        delay = Math.min(
          this.baseDelay * Math.pow(2, this.retryCount),
          30000 // Max 30 seconds
        );

        console.log(
          `Connection failed. Retry ${this.retryCount}/${this.maxRetries} in ${delay}ms`
        );
      }

      await new Promise(resolve => setTimeout(resolve, delay));
    }

    throw new Error('Max retries exceeded');
  }

  private connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      // `client`, `request`, and `metadata` as defined in the code example above
      const stream = client.SubscribeCheckpoints(request, metadata);

      stream.on('data', (checkpoint: any) => {
        this.retryCount = 0; // Reset once data is flowing again
        this.handleCheckpoint(checkpoint);
      });

      stream.on('error', (error: any) => {
        reject(error);
      });

      stream.on('end', () => {
        resolve();
      });
    });
  }

  private handleCheckpoint(checkpoint: any): void {
    // Application-specific processing
  }
}
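The delay calculation in the retry loop can be pulled out into a pure function, which makes it easy to test and to extend. The sketch below also adds random jitter, which is not part of the example above: it is a common addition so that many clients do not all retry at the same instant after an outage. `backoffDelayMs` is a hypothetical name, and the defaults mirror the 1 second base and 30 second cap used above.

```typescript
// Exponential backoff with "full jitter": a random delay between 0 and the
// capped exponential value. `random` is injectable so results are testable.
// Hypothetical helper, not part of any gRPC library.
function backoffDelayMs(
  attempt: number, // 0 for the first retry
  baseDelayMs: number = 1000,
  maxDelayMs: number = 30000,
  random: () => number = Math.random,
): number {
  const exponential = baseDelayMs * Math.pow(2, attempt);
  const capped = Math.min(exponential, maxDelayMs);
  return Math.floor(random() * capped);
}

// Fixing the random source at 1 shows the upper bound for each attempt.
const upperBound = () => 1;
const delays = [0, 1, 2, 5, 10].map((attempt) =>
  backoffDelayMs(attempt, 1000, 30000, upperBound),
);
// delays is [1000, 2000, 4000, 30000, 30000]
```

With the default `Math.random`, each call returns a value somewhere below those upper bounds.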

Health Monitoring#

Monitor stream health and detect stalls:

class HealthMonitor {
  private lastCheckpointTime: number = Date.now();
  private healthCheckInterval: number = 10000; // 10 seconds
  private maxStallDuration: number = 30000; // 30 seconds

  startMonitoring(): void {
    setInterval(() => {
      this.checkHealth();
    }, this.healthCheckInterval);
  }

  onCheckpointReceived(): void {
    this.lastCheckpointTime = Date.now();
  }

  private checkHealth(): void {
    const stallDuration = Date.now() - this.lastCheckpointTime;

    if (stallDuration > this.maxStallDuration) {
      console.error('⚠️ Stream stalled! No checkpoints received.');
      this.handleStall();
    }
  }

  private handleStall(): void {
    // Restart stream, alert operators, switch to backup
  }
}
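A periodic check like the one above fires on every interval for as long as the stall lasts, which could restart the stream repeatedly. The sketch below is one possible variation that reports each stall once and takes the current time as an argument so it can be tested without timers. `StallDetector`, `markReceived`, and `check` are hypothetical names.

```typescript
// Hypothetical stall detector with an injectable clock (times in milliseconds).
class StallDetector {
  private maxStallMs: number;
  private lastSeenMs: number;
  private reported: boolean = false;

  constructor(maxStallMs: number, nowMs: number) {
    this.maxStallMs = maxStallMs;
    this.lastSeenMs = nowMs;
  }

  // Call whenever a checkpoint arrives.
  markReceived(nowMs: number): void {
    this.lastSeenMs = nowMs;
    this.reported = false;
  }

  // Returns true only the first time a given stall is detected, so repeated
  // health checks during one stall do not trigger repeated restarts.
  check(nowMs: number): boolean {
    const stalled = nowMs - this.lastSeenMs > this.maxStallMs;
    if (stalled && !this.reported) {
      this.reported = true;
      return true;
    }
    return false;
  }
}

const stall = new StallDetector(30000, 0);
const results: boolean[] = [
  stall.check(10000), // false: within the 30 s limit
  stall.check(40000), // true: first detection of the stall
  stall.check(50000), // false: this stall was already reported
];
stall.markReceived(55000);
results.push(stall.check(60000)); // false: the stream has recovered
```

In a running service, `Date.now()` would be passed as `nowMs` from both the data handler and the interval callback.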

Performance Considerations#

Bandwidth Optimization#

Minimize bandwidth with selective field masking:

| Field Mask | Checkpoint Size | Bandwidth (1000 checkpoints) |
| --- | --- | --- |
| All fields | ~2.5 KB | 2.5 MB |
| Standard (5 fields) | ~400 bytes | 400 KB |
| Minimal (2 fields) | ~80 bytes | 80 KB |

Recommendation: Request only fields needed for your use case.
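The bandwidth column is the per-checkpoint size multiplied by the number of checkpoints. The short calculation below reproduces the table's figures and the relative saving of the minimal mask. The sizes are the approximate values from the table (with 1 KB taken as 1000 bytes), and `estimateBytes` is a hypothetical helper name.

```typescript
// Estimated stream volume in bytes for a given per-checkpoint size.
function estimateBytes(bytesPerCheckpoint: number, checkpoints: number): number {
  return bytesPerCheckpoint * checkpoints;
}

// Approximate per-checkpoint sizes from the table above, in bytes.
const sizes = { all: 2500, standard: 400, minimal: 80 };

const per1000 = {
  all: estimateBytes(sizes.all, 1000),           // 2,500,000 bytes = 2.5 MB
  standard: estimateBytes(sizes.standard, 1000), // 400,000 bytes = 400 KB
  minimal: estimateBytes(sizes.minimal, 1000),   // 80,000 bytes = 80 KB
};

// Fraction of bandwidth saved by the minimal mask relative to all fields.
const savedFraction = 1 - sizes.minimal / sizes.all; // about 0.968, i.e. roughly 97%
```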

Backpressure Handling#

Queue incoming checkpoints when they arrive faster than they can be processed:

class BackpressureHandler {
  private queue: any[] = [];
  private processing: boolean = false;
  private maxQueueSize: number = 100;

  async onCheckpoint(checkpoint: any): Promise<void> {
    if (this.queue.length >= this.maxQueueSize) {
      console.warn('⚠️ Queue full, dropping checkpoint');
      return;
    }

    this.queue.push(checkpoint);

    if (!this.processing) {
      // Not awaited on purpose: the stream handler must return quickly
      void this.processQueue();
    }
  }

  private async processQueue(): Promise<void> {
    this.processing = true;

    try {
      while (this.queue.length > 0) {
        const checkpoint = this.queue.shift();
        try {
          await this.processCheckpoint(checkpoint);
        } catch (error) {
          // Log and continue so one bad checkpoint does not stall the queue
          console.error('Failed to process checkpoint:', error);
        }
      }
    } finally {
      // Always clear the flag, otherwise the queue would never drain again
      this.processing = false;
    }
  }

  private async processCheckpoint(checkpoint: any): Promise<void> {
    // Heavy processing logic
  }
}
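The handler above discards the newest checkpoint when the queue is full. A different policy, sketched below as an alternative rather than a recommendation, evicts the oldest entry instead and hands it back to the caller, so the dropped sequence numbers can be recorded. `BoundedQueue` is a hypothetical name.

```typescript
// Hypothetical fixed-capacity FIFO queue with a drop-oldest policy.
class BoundedQueue<T> {
  private items: T[] = [];
  private capacity: number;

  constructor(capacity: number) {
    if (capacity < 1) {
      throw new Error('capacity must be at least 1');
    }
    this.capacity = capacity;
  }

  // Adds an item. When the queue is full, the oldest item is evicted and
  // returned; otherwise the return value is undefined.
  push(item: T): T | undefined {
    const evicted = this.items.length >= this.capacity ? this.items.shift() : undefined;
    this.items.push(item);
    return evicted;
  }

  // Removes and returns the oldest item, or undefined when empty.
  shift(): T | undefined {
    return this.items.shift();
  }

  get length(): number {
    return this.items.length;
  }
}

const queue = new BoundedQueue<number>(3);
const droppedSequences: number[] = [];
for (const sequence of [1, 2, 3, 4, 5]) {
  const evicted = queue.push(sequence);
  if (evicted !== undefined) {
    droppedSequences.push(evicted);
  }
}
// droppedSequences is [1, 2]; the queue now holds 3, 4, 5
```

Which policy fits depends on the application: dropping the newest keeps history contiguous, while dropping the oldest keeps the consumer closer to the head of the chain.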

Error Handling#

Common streaming errors and solutions:

// Registered inside a subscriber class, where `this` provides the reconnect helpers
stream.on('error', (error: any) => {
  switch (error.code) {
    case grpc.status.UNAVAILABLE:
      console.error('Service unavailable. Reconnecting...');
      this.reconnect();
      break;

    case grpc.status.UNAUTHENTICATED:
    case grpc.status.PERMISSION_DENIED:
      console.error('Authentication failed. Check API token.');
      // Don't reconnect, fix authentication
      break;

    case grpc.status.RESOURCE_EXHAUSTED:
      console.error('Rate limit exceeded. Backoff required.');
      this.reconnectWithBackoff();
      break;

    case grpc.status.CANCELLED:
      console.log('Stream cancelled by client');
      // Expected during shutdown
      break;

    default:
      console.error('Unexpected error:', error.message);
      this.reconnect();
  }
});
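The decision in the switch above can be separated from its side effects, which makes the policy testable on its own. The sketch below maps a status code to an action name. The numeric values are the standard gRPC status codes; `GrpcStatus`, `StreamAction`, and `actionForStatus` are hypothetical names, and treating UNAUTHENTICATED like PERMISSION_DENIED is an assumption about how a service may report a bad token.

```typescript
// Standard gRPC status code values; defined locally so the sketch has no
// dependencies. With @grpc/grpc-js, grpc.status provides the same constants.
const GrpcStatus = {
  CANCELLED: 1,
  PERMISSION_DENIED: 7,
  RESOURCE_EXHAUSTED: 8,
  UNAVAILABLE: 14,
  UNAUTHENTICATED: 16,
} as const;

type StreamAction = 'reconnect' | 'backoff' | 'stop' | 'ignore';

// Maps a gRPC status code to what the subscriber should do next.
function actionForStatus(code: number): StreamAction {
  switch (code) {
    case GrpcStatus.CANCELLED:
      return 'ignore'; // expected during client-initiated shutdown
    case GrpcStatus.PERMISSION_DENIED:
    case GrpcStatus.UNAUTHENTICATED:
      return 'stop'; // retrying cannot fix bad credentials
    case GrpcStatus.RESOURCE_EXHAUSTED:
      return 'backoff'; // rate limited: wait longer before retrying
    case GrpcStatus.UNAVAILABLE:
      return 'reconnect';
    default:
      return 'reconnect'; // same fallback as the handler above
  }
}

const actions = [14, 7, 16, 8, 1, 13].map(actionForStatus);
// actions is ['reconnect', 'stop', 'stop', 'backoff', 'ignore', 'reconnect']
```

An error handler could then call `actionForStatus(error.code)` and dispatch on the returned string.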

Best Practices#

Implement Graceful Shutdown#

Handle termination signals properly:

// `stream`, flushPendingData(), and closeConnections() are defined by your application
let isShuttingDown = false;

process.on('SIGINT', async () => {
  if (isShuttingDown) return;

  isShuttingDown = true;
  console.log('\nGraceful shutdown initiated...');

  // Cancel stream
  stream.cancel();

  // Flush pending data
  await flushPendingData();

  // Close connections
  await closeConnections();

  process.exit(0);
});

Monitor Stream Health#

Track metrics and detect issues:

const metrics = {
  checkpointsReceived: 0,
  lastCheckpointTime: Date.now(),
  errors: 0,
  reconnections: 0
};

setInterval(() => {
  console.log('Stream Metrics:', metrics);

  if (Date.now() - metrics.lastCheckpointTime > 60000) {
    console.error('No checkpoints for 60s!');
  }
}, 30000);

Use Field Masking#

Request only necessary fields:

// ✅ Good: Only request needed fields
const request = {
  read_mask: {
    paths: ['sequence_number', 'transactions']
  }
};

// ❌ Bad: Request all fields
const unmaskedRequest = {}; // Wastes bandwidth

Performance Benefits#

gRPC Streaming vs HTTP Polling:

| Metric | gRPC Streaming | HTTP Polling (5s interval) |
| --- | --- | --- |
| Latency | Less than 1s | 2.5s average |
| Bandwidth (1 hour) | 1.4 MB | 8.6 MB |
| Overhead | Minimal (single connection) | High (720 requests) |
| Missed Updates | None | Possible |
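The polling figures in the table follow from simple arithmetic: a 5 second interval over one hour gives 720 requests, and an update that lands at a random point within an interval waits half the interval on average. A short sketch of that calculation, with `pollingCost` as a hypothetical name:

```typescript
// Request count and average detection delay for fixed-interval polling.
// The average assumes updates land uniformly at random within an interval.
function pollingCost(intervalSeconds: number, periodSeconds: number) {
  return {
    requests: periodSeconds / intervalSeconds,
    averageDelaySeconds: intervalSeconds / 2,
  };
}

const fiveSecondPolling = pollingCost(5, 3600);
// fiveSecondPolling is { requests: 720, averageDelaySeconds: 2.5 }
```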
