SubscribeCheckpoints
Real-Time Blockchain Monitoring with gRPC Streaming#
The SubscribeCheckpoints method of the Subscription Service streams Sui blockchain checkpoints in real time over a server-streaming RPC. Applications can use it to monitor network activity continuously, receive checkpoint data as it finalizes, and build event-driven systems with minimal latency.
Overview#
Checkpoints represent finalized states of the Sui blockchain. Each checkpoint contains a batch of executed transactions and represents a guaranteed point of finality. By subscribing to checkpoints, applications can:
- Monitor blockchain activity in real-time
- Track transaction finality as it occurs
- Build event-driven architectures without polling
- Maintain synchronized local state
- Analyze network throughput and gas consumption
Key Features#
- Server Streaming: Continuous data flow from server to client
- Real-Time Updates: Receive checkpoints as they finalize (sub-second latency)
- Field Masking: Control data volume by selecting specific fields
- Automatic Resumption: Reconnection support for reliable streaming
- Low Overhead: HTTP/2 multiplexing enables efficient long-lived connections
Method Signature#
Service: sui.rpc.v2beta2.SubscriptionService
Method: SubscribeCheckpoints
Type: Server-streaming RPC (single request, stream of responses)
Parameters#
| Parameter | Type | Required | Description |
|---|---|---|---|
| read_mask | FieldMask | No | Specifies which checkpoint fields to include in the stream |
Field Mask Options#
The read_mask parameter controls which fields are streamed. Available paths include:
| Path | Description |
|---|---|
| sequence_number | Checkpoint sequence number (incrementing counter) |
| digest | Cryptographic hash of the checkpoint |
| network_total_transactions | Cumulative transaction count across the network |
| previous_digest | Previous checkpoint's digest (for chain verification) |
| epoch_rolling_gas_cost_summary | Gas cost aggregates for the epoch |
| timestamp_ms | Checkpoint finalization timestamp (milliseconds) |
| transactions | Array of transaction digests included in the checkpoint |
| end_of_epoch_data | Epoch transition data (present only at epoch boundaries) |
| validator_signature | Validator signatures proving checkpoint validity |
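The sequence_number, digest, and previous_digest paths together make it possible to check that consecutive streamed checkpoints link up. The following is a minimal sketch, assuming checkpoints arrive as plain objects with string-encoded uint64 values and that all three paths are included in the read mask. `ChainLink` and `checkContinuity` are hypothetical names used only for illustration; they are not part of the API.

```typescript
// Hypothetical shape: only the fields needed for the continuity check.
interface ChainLink {
  sequence_number: string;
  digest: string;
  previous_digest: string;
}

type Continuity = 'ok' | 'gap' | 'digest_mismatch';

// Compares a newly received checkpoint against the last one seen.
function checkContinuity(prev: ChainLink, next: ChainLink): Continuity {
  const prevSeq = Number(prev.sequence_number);
  const nextSeq = Number(next.sequence_number);
  if (nextSeq !== prevSeq + 1) {
    return 'gap'; // one or more checkpoints were skipped or replayed
  }
  if (next.previous_digest !== prev.digest) {
    return 'digest_mismatch'; // consecutive numbers, but the hashes do not link
  }
  return 'ok';
}
```

A result other than `'ok'` is a reasonable trigger for backfilling the missing range before processing continues.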
Example Field Masks#
// Minimal: Only sequence and timestamp
read_mask: {
paths: ["sequence_number", "timestamp_ms"]
}
// Standard: Core checkpoint data
read_mask: {
paths: [
"sequence_number",
"digest",
"timestamp_ms",
"network_total_transactions"
]
}
// Complete: All checkpoint fields
read_mask: {
paths: [
"sequence_number",
"digest",
"network_total_transactions",
"previous_digest",
"epoch_rolling_gas_cost_summary",
"timestamp_ms",
"transactions",
"end_of_epoch_data",
"validator_signature"
]
}
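A typo in a path name is easy to miss when masks are written by hand. The sketch below builds the request object from a list of paths and rejects anything not listed in the Field Mask Options table. `KNOWN_PATHS`, `SubscribeRequest`, and `buildRequest` are hypothetical helper names, and the check is purely client-side; how the server itself treats unknown paths is not covered here.

```typescript
// Paths taken from the Field Mask Options table.
const KNOWN_PATHS: string[] = [
  'sequence_number',
  'digest',
  'network_total_transactions',
  'previous_digest',
  'epoch_rolling_gas_cost_summary',
  'timestamp_ms',
  'transactions',
  'end_of_epoch_data',
  'validator_signature'
];

// Hypothetical request shape matching the examples above.
interface SubscribeRequest {
  read_mask: { paths: string[] };
}

// Validates the requested paths and removes duplicates, preserving order.
function buildRequest(paths: string[]): SubscribeRequest {
  const unique: string[] = [];
  for (let i = 0; i < paths.length; i++) {
    const path = paths[i];
    if (KNOWN_PATHS.indexOf(path) === -1) {
      throw new Error(`Unknown read_mask path: ${path}`);
    }
    if (unique.indexOf(path) === -1) {
      unique.push(path);
    }
  }
  return { read_mask: { paths: unique } };
}
```

For example, `buildRequest(['sequence_number', 'timestamp_ms'])` yields the "Minimal" mask shown above.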
Response Structure#
The stream yields a series of Checkpoint messages:
message Checkpoint {
uint64 sequence_number = 1;
string digest = 2;
uint64 network_total_transactions = 3;
string previous_digest = 4;
uint64 timestamp_ms = 5;
repeated string transactions = 6;
EpochData end_of_epoch_data = 7;
ValidatorSignature validator_signature = 8;
GasCostSummary epoch_rolling_gas_cost_summary = 9;
}
Response Fields#
| Field | Type | Description |
|---|---|---|
| sequence_number | uint64 | Monotonically increasing checkpoint identifier |
| digest | string | Checkpoint digest hash |
| network_total_transactions | uint64 | Total transactions executed on the network since genesis |
| previous_digest | string | Previous checkpoint's digest for chain linking |
| timestamp_ms | uint64 | Unix timestamp in milliseconds |
| transactions | repeated string | Transaction digests in this checkpoint |
| end_of_epoch_data | EpochData | Epoch transition information (only at epoch end) |
| validator_signature | ValidatorSignature | Signature data from validators |
| epoch_rolling_gas_cost_summary | GasCostSummary | Cumulative gas costs for the current epoch |
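When the proto is loaded with `longs: String`, as in the TypeScript example on this page, uint64 fields reach application code as strings. The sketch below converts a raw message into typed values. `RawCheckpoint`, `ParsedCheckpoint`, and `parseCheckpoint` are hypothetical names. It assumes values stay below 2^53, the limit up to which a JavaScript `number` is exact, and it treats fields left out of the read mask as empty.

```typescript
// Raw message as delivered with `longs: String`: uint64 fields are strings.
interface RawCheckpoint {
  sequence_number?: string;
  digest?: string;
  timestamp_ms?: string;
  network_total_transactions?: string;
  transactions?: string[];
}

// Typed view that is more convenient for application code.
interface ParsedCheckpoint {
  sequence: number;
  digest: string;
  timestamp: Date;
  totalTransactions: number;
  transactionDigests: string[];
}

// Converts string-encoded integers to numbers and fills in defaults
// for fields that were not requested in the read mask.
function parseCheckpoint(raw: RawCheckpoint): ParsedCheckpoint {
  return {
    sequence: Number(raw.sequence_number || '0'),
    digest: raw.digest || '',
    timestamp: new Date(Number(raw.timestamp_ms || '0')),
    totalTransactions: Number(raw.network_total_transactions || '0'),
    transactionDigests: raw.transactions || []
  };
}
```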
Code Examples#
The same subscriber is shown below in TypeScript, Python, and Go, in that order.
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
// Configuration
const ENDPOINT = 'api-sui-mainnet-full.n.dwellir.com';
const API_TOKEN = 'your_api_token_here';
// Load proto definition
const packageDefinition = protoLoader.loadSync(
'./protos/subscription.proto',
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
includeDirs: ['./protos']
}
);
const protoDescriptor = grpc.loadPackageDefinition(packageDefinition) as any;
// Create client with TLS
const credentials = grpc.credentials.createSsl();
const client = new protoDescriptor.sui.rpc.v2beta2.SubscriptionService(
ENDPOINT,
credentials
);
// Setup authentication
const metadata = new grpc.Metadata();
metadata.add('x-api-key', API_TOKEN);
// Subscribe to checkpoint stream
class CheckpointSubscriber {
private stream: any;
private isConnected: boolean = false;
connect(): void {
const request = {
read_mask: {
paths: [
'sequence_number',
'digest',
'timestamp_ms',
'network_total_transactions',
'transactions'
]
}
};
this.stream = client.SubscribeCheckpoints(request, metadata);
this.isConnected = true;
// Handle incoming checkpoints
this.stream.on('data', (checkpoint: any) => {
this.handleCheckpoint(checkpoint);
});
// Handle stream errors
this.stream.on('error', (error: any) => {
console.error('Stream error:', error.message);
this.isConnected = false;
this.reconnect();
});
// Handle stream end
this.stream.on('end', () => {
console.log('Stream ended');
this.isConnected = false;
this.reconnect();
});
console.log('✓ Checkpoint subscription started');
}
private handleCheckpoint(checkpoint: any): void {
const timestamp = new Date(parseInt(checkpoint.timestamp_ms));
console.log('\n📦 New Checkpoint:');
console.log('================');
console.log('Sequence:', checkpoint.sequence_number);
console.log('Digest:', checkpoint.digest);
console.log('Time:', timestamp.toISOString());
console.log('Total Transactions:', checkpoint.network_total_transactions);
console.log('Transactions in Checkpoint:', checkpoint.transactions?.length || 0);
// Process checkpoint data
this.processCheckpoint(checkpoint);
}
private processCheckpoint(checkpoint: any): void {
// Custom processing logic
// - Update local database
// - Trigger webhooks
// - Analyze transaction patterns
// - Calculate network metrics
}
private reconnect(): void {
console.log('Reconnecting in 5 seconds...');
setTimeout(() => {
if (!this.isConnected) {
this.connect();
}
}, 5000);
}
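disconnect(): void {
if (this.stream) {
// Detach the handlers first: cancel() surfaces as a CANCELLED error,
// which would otherwise trigger reconnect()
this.stream.removeAllListeners();
this.stream.on('error', () => {}); // Swallow the expected CANCELLED error
this.stream.cancel();
this.isConnected = false;
console.log('✓ Disconnected from checkpoint stream');
}
}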
}
// Usage
const subscriber = new CheckpointSubscriber();
subscriber.connect();
// Graceful shutdown
process.on('SIGINT', () => {
console.log('\nShutting down...');
subscriber.disconnect();
process.exit(0);
});
// Keep process alive
process.stdin.resume();
import grpc
import signal
import sys
import time
from datetime import datetime
from google.protobuf import field_mask_pb2
import subscription_service_pb2
import subscription_service_pb2_grpc

# Configuration
ENDPOINT = 'api-sui-mainnet-full.n.dwellir.com'
API_TOKEN = 'your_api_token_here'


class CheckpointSubscriber:
    def __init__(self, endpoint: str, api_token: str):
        self.endpoint = endpoint
        self.api_token = api_token
        self.channel = None
        self.is_running = False

    def connect(self):
        """Stream checkpoints over a secure channel, reconnecting until stop() is called"""
        self.is_running = True
        # Loop instead of recursing so repeated reconnects cannot exhaust the stack
        while self.is_running:
            # Create secure channel
            credentials = grpc.ssl_channel_credentials()
            self.channel = grpc.secure_channel(self.endpoint, credentials)
            # Create client
            client = subscription_service_pb2_grpc.SubscriptionServiceStub(self.channel)
            # Setup authentication
            metadata = [('x-api-key', self.api_token)]
            # Build subscription request
            request = subscription_service_pb2.SubscribeCheckpointsRequest(
                read_mask=field_mask_pb2.FieldMask(
                    paths=[
                        'sequence_number',
                        'digest',
                        'timestamp_ms',
                        'network_total_transactions',
                        'transactions'
                    ]
                )
            )
            print('✓ Checkpoint subscription started')
            try:
                # Start streaming
                for checkpoint in client.SubscribeCheckpoints(request, metadata=metadata):
                    if not self.is_running:
                        break
                    self.handle_checkpoint(checkpoint)
            except grpc.RpcError as e:
                print(f'Stream error: {e.code()}: {e.details()}')
            except Exception as e:
                print(f'Unexpected error: {e}')
            self.reconnect()

    def handle_checkpoint(self, checkpoint):
        """Process incoming checkpoint data"""
        timestamp = datetime.fromtimestamp(checkpoint.timestamp_ms / 1000)
        print('\n📦 New Checkpoint:')
        print('================')
        print(f'Sequence: {checkpoint.sequence_number}')
        print(f'Digest: {checkpoint.digest}')
        print(f'Time: {timestamp.isoformat()}')
        print(f'Total Transactions: {checkpoint.network_total_transactions}')
        print(f'Transactions in Checkpoint: {len(checkpoint.transactions)}')
        # Custom processing
        self.process_checkpoint(checkpoint)

    def process_checkpoint(self, checkpoint):
        """Custom checkpoint processing logic"""
        # - Store in database
        # - Trigger events
        # - Update metrics
        # - Analyze patterns
        pass

    def reconnect(self):
        """Close the stale channel and wait before the next attempt"""
        if self.channel:
            self.channel.close()
        if self.is_running:
            print('Reconnecting in 5 seconds...')
            time.sleep(5)

    def stop(self):
        """Gracefully stop the subscription"""
        print('\nShutting down...')
        self.is_running = False
        if self.channel:
            self.channel.close()
        print('✓ Disconnected from checkpoint stream')


# Usage
def main():
    subscriber = CheckpointSubscriber(ENDPOINT, API_TOKEN)

    # Setup signal handlers for graceful shutdown
    def signal_handler(sig, frame):
        subscriber.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    # Start subscription
    subscriber.connect()


if __name__ == '__main__':
    main()
package main
import (
"context"
"fmt"
"io"
"log"
"os"
"os/signal"
"syscall"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/fieldmaskpb"
pb "sui-grpc-client/sui/rpc/v2beta2"
)
const (
endpoint = "api-sui-mainnet-full.n.dwellir.com"
apiToken = "your_api_token_here"
)
type CheckpointSubscriber struct {
conn *grpc.ClientConn
client pb.SubscriptionServiceClient
cancel context.CancelFunc
}
func NewCheckpointSubscriber(endpoint, token string) (*CheckpointSubscriber, error) {
// Create TLS credentials
creds := credentials.NewClientTLSFromCert(nil, "")
// Establish connection
conn, err := grpc.Dial(
endpoint,
grpc.WithTransportCredentials(creds),
)
if err != nil {
return nil, fmt.Errorf("failed to connect: %w", err)
}
// Create client
client := pb.NewSubscriptionServiceClient(conn)
return &CheckpointSubscriber{
conn: conn,
client: client,
}, nil
}
func (cs *CheckpointSubscriber) Subscribe(ctx context.Context) error {
// Add authentication metadata
ctx = metadata.AppendToOutgoingContext(ctx, "x-api-key", apiToken)
// Create cancellable context
ctx, cancel := context.WithCancel(ctx)
cs.cancel = cancel
// Build subscription request
request := &pb.SubscribeCheckpointsRequest{
ReadMask: &fieldmaskpb.FieldMask{
Paths: []string{
"sequence_number",
"digest",
"timestamp_ms",
"network_total_transactions",
"transactions",
},
},
}
// Start streaming
stream, err := cs.client.SubscribeCheckpoints(ctx, request)
if err != nil {
return fmt.Errorf("failed to subscribe: %w", err)
}
log.Println("✓ Checkpoint subscription started")
// Receive stream data
for {
checkpoint, err := stream.Recv()
if err == io.EOF {
log.Println("Stream ended")
return nil
}
if err != nil {
return fmt.Errorf("stream error: %w", err)
}
cs.handleCheckpoint(checkpoint)
}
}
func (cs *CheckpointSubscriber) handleCheckpoint(checkpoint *pb.Checkpoint) {
timestamp := time.UnixMilli(int64(checkpoint.TimestampMs))
fmt.Println("\n📦 New Checkpoint:")
fmt.Println("================")
fmt.Printf("Sequence: %d\n", checkpoint.SequenceNumber)
fmt.Printf("Digest: %s\n", checkpoint.Digest)
fmt.Printf("Time: %s\n", timestamp.Format(time.RFC3339))
fmt.Printf("Total Transactions: %d\n", checkpoint.NetworkTotalTransactions)
fmt.Printf("Transactions in Checkpoint: %d\n", len(checkpoint.Transactions))
// Process checkpoint
cs.processCheckpoint(checkpoint)
}
func (cs *CheckpointSubscriber) processCheckpoint(checkpoint *pb.Checkpoint) {
// Custom processing logic
// - Store in database
// - Trigger webhooks
// - Update metrics
// - Analyze data
}
func (cs *CheckpointSubscriber) Close() error {
if cs.cancel != nil {
cs.cancel()
}
return cs.conn.Close()
}
func main() {
// Create subscriber
subscriber, err := NewCheckpointSubscriber(endpoint, apiToken)
if err != nil {
log.Fatalf("Failed to create subscriber: %v", err)
}
defer subscriber.Close()
// Setup signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
// Start subscription in goroutine
errChan := make(chan error, 1)
go func() {
errChan <- subscriber.Subscribe(context.Background())
}()
// Wait for signal or error
select {
case <-sigChan:
log.Println("\nShutting down...")
subscriber.Close()
case err := <-errChan:
if err != nil {
log.Fatalf("Subscription error: %v", err)
}
}
log.Println("✓ Disconnected from checkpoint stream")
}
Use Cases#
Real-Time Network Monitor#
Build a live blockchain monitoring dashboard:
class NetworkMonitor {
private checkpointCount: number = 0;
private startTime: number;
private lastSequence: number = 0;
constructor() {
this.startTime = Date.now();
}
processCheckpoint(checkpoint: any): void {
this.checkpointCount++;
const currentSequence = parseInt(checkpoint.sequence_number);
// Calculate metrics
const elapsed = (Date.now() - this.startTime) / 1000;
const checkpointsPerSecond = this.checkpointCount / elapsed;
// Detect missed checkpoints
if (this.lastSequence > 0) {
const gap = currentSequence - this.lastSequence - 1;
if (gap > 0) {
console.warn(`⚠️ Missed ${gap} checkpoint(s)`);
}
}
this.lastSequence = currentSequence;
// Display metrics
console.log('\n📊 Network Metrics:');
console.log(`Checkpoints/sec: ${checkpointsPerSecond.toFixed(2)}`);
console.log(`Total Checkpoints: ${this.checkpointCount}`);
console.log(`Network TPS: ${this.calculateTPS(checkpoint)}`);
}
private calculateTPS(checkpoint: any): number {
const txCount = checkpoint.transactions?.length || 0;
// Assuming ~1 checkpoint per 0.5 seconds
return txCount * 2;
}
}
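The fixed "2 checkpoints per second" multiplier above is only an approximation. Where both timestamp_ms and network_total_transactions are in the read mask, throughput can instead be measured from two consecutive checkpoints. This is a sketch under that assumption; `TpsSample` and `measuredTps` are hypothetical names.

```typescript
// Hypothetical shape: the two fields needed for a measured rate.
interface TpsSample {
  timestamp_ms: string;
  network_total_transactions: string;
}

// Transactions per second between two checkpoints, or null when the
// timestamps do not advance (a rate cannot be computed in that case).
function measuredTps(prev: TpsSample, curr: TpsSample): number | null {
  const elapsedMs = Number(curr.timestamp_ms) - Number(prev.timestamp_ms);
  if (elapsedMs <= 0) {
    return null;
  }
  const txDelta =
    Number(curr.network_total_transactions) - Number(prev.network_total_transactions);
  return (txDelta * 1000) / elapsedMs;
}
```

Because network_total_transactions is cumulative, the difference between two samples also stays correct when the samples are not adjacent, for example after a reconnect.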
Transaction Finality Tracker#
Monitor when transactions reach finality:
class FinalityTracker {
private pendingTxs: Set<string> = new Set();
watchTransaction(txDigest: string): void {
this.pendingTxs.add(txDigest);
console.log(`Watching transaction: ${txDigest}`);
}
processCheckpoint(checkpoint: any): void {
const finalizedTxs: string[] = [];
for (const txDigest of checkpoint.transactions || []) {
if (this.pendingTxs.has(txDigest)) {
finalizedTxs.push(txDigest);
this.pendingTxs.delete(txDigest);
}
}
if (finalizedTxs.length > 0) {
console.log('\n✅ Transactions Finalized:');
finalizedTxs.forEach(tx => {
console.log(` ${tx}`);
this.onTransactionFinalized(tx, checkpoint.sequence_number);
});
}
}
private onTransactionFinalized(txDigest: string, checkpoint: string): void {
// Trigger callbacks, update UI, send notifications
}
}
Epoch Transition Detector#
Detect and handle epoch transitions:
class EpochMonitor {
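// Placeholder: seed this with the actual current epoch at startup,
// otherwise the numbers below only count transitions seen since launch
private currentEpoch: number = 0;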
processCheckpoint(checkpoint: any): void {
if (checkpoint.end_of_epoch_data) {
const newEpoch = this.currentEpoch + 1;
console.log('\n🔄 Epoch Transition Detected:');
console.log(`Old Epoch: ${this.currentEpoch}`);
console.log(`New Epoch: ${newEpoch}`);
console.log(`Transition Checkpoint: ${checkpoint.sequence_number}`);
this.handleEpochTransition(newEpoch, checkpoint.end_of_epoch_data);
this.currentEpoch = newEpoch;
}
}
private handleEpochTransition(newEpoch: number, epochData: any): void {
// Process epoch change
// - Update validator set
// - Record gas costs
// - Calculate staking rewards
// - Update system parameters
}
}
Blockchain Analytics Pipeline#
Stream checkpoint data to analytics system:
interface CheckpointMetrics {
sequence: number;
timestamp: Date;
transactionCount: number;
totalTransactions: number;
throughput: number;
}
class AnalyticsPipeline {
private metrics: CheckpointMetrics[] = [];
private windowSize: number = 100; // Keep last 100 checkpoints
processCheckpoint(checkpoint: any): void {
const metrics: CheckpointMetrics = {
sequence: parseInt(checkpoint.sequence_number),
timestamp: new Date(parseInt(checkpoint.timestamp_ms)),
transactionCount: checkpoint.transactions?.length || 0,
totalTransactions: parseInt(checkpoint.network_total_transactions),
throughput: this.calculateThroughput(checkpoint)
};
this.metrics.push(metrics);
// Maintain rolling window
if (this.metrics.length > this.windowSize) {
this.metrics.shift();
}
// Compute aggregates
this.computeAggregates();
}
private calculateThroughput(checkpoint: any): number {
const txCount = checkpoint.transactions?.length || 0;
// Approximate TPS based on checkpoint interval
return txCount * 2; // Assuming 0.5s per checkpoint
}
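private computeAggregates(): void {
const totalTx = this.metrics.reduce((sum, m) => sum + m.transactionCount, 0);
// Average the per-checkpoint throughput estimates; totalTx / length would be
// transactions per checkpoint, not transactions per second
const avgThroughput =
this.metrics.reduce((sum, m) => sum + m.throughput, 0) / this.metrics.length;
console.log(`\n📈 Analytics (last ${this.metrics.length} checkpoints):`);
console.log(`Average TPS (estimated): ${avgThroughput.toFixed(2)}`);
console.log(`Total Transactions: ${totalTx}`);
}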
}
Stream Management#
Reconnection Strategy#
Implement robust reconnection logic:
// `client`, `request`, and `metadata` are the objects created in the TypeScript example above
class ResilientSubscriber {
private maxRetries: number = 10;
private retryCount: number = 0;
private baseDelay: number = 1000; // 1 second
async connectWithRetry(): Promise<void> {
while (this.retryCount < this.maxRetries) {
try {
await this.connect(); // Resolves only when the server ends the stream
console.log('Stream ended.');
} catch (error) {
console.log('Stream failed:', (error as Error).message);
}
// Exponential backoff: 1s, 2s, 4s, ... capped at 30 seconds
const delay = Math.min(
this.baseDelay * Math.pow(2, this.retryCount),
30000 // Max 30 seconds
);
this.retryCount++;
console.log(
`Retry ${this.retryCount}/${this.maxRetries} in ${delay}ms`
);
await new Promise(resolve => setTimeout(resolve, delay));
}
throw new Error('Max retries exceeded');
}
private connect(): Promise<void> {
return new Promise((resolve, reject) => {
const stream = client.SubscribeCheckpoints(request, metadata);
stream.on('data', (checkpoint: any) => {
this.retryCount = 0; // Reset once data is flowing again
this.handleCheckpoint(checkpoint);
});
stream.on('error', (error: any) => {
reject(error);
});
stream.on('end', () => {
resolve();
});
});
}
private handleCheckpoint(checkpoint: any): void {
// Application-specific processing
}
}
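When many clients lose the same connection at once, identical backoff schedules make them all retry at the same moment. A common refinement, not part of the class above, is to randomize each delay ("full jitter"). The sketch below is one way to do that. `backoffDelay` is a hypothetical helper, and the random source is injectable so the behavior can be tested deterministically.

```typescript
// Exponential backoff with full jitter: the delay is drawn uniformly
// from [0, min(baseMs * 2^attempt, maxMs)).
function backoffDelay(
  attempt: number,
  baseMs: number,
  maxMs: number,
  random: () => number = Math.random
): number {
  const ceiling = Math.min(baseMs * Math.pow(2, attempt), maxMs);
  return Math.floor(random() * ceiling);
}
```

With a base of 1000 ms and a cap of 30000 ms, attempt 0 waits at most one second and every attempt from 5 onward is capped at 30 seconds.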
Health Monitoring#
Monitor stream health and detect stalls:
class HealthMonitor {
private lastCheckpointTime: number = Date.now();
private healthCheckInterval: number = 10000; // 10 seconds
private maxStallDuration: number = 30000; // 30 seconds
startMonitoring(): void {
setInterval(() => {
this.checkHealth();
}, this.healthCheckInterval);
}
onCheckpointReceived(): void {
this.lastCheckpointTime = Date.now();
}
private checkHealth(): void {
const stallDuration = Date.now() - this.lastCheckpointTime;
if (stallDuration > this.maxStallDuration) {
console.error('⚠️ Stream stalled! No checkpoints received.');
this.handleStall();
}
}
private handleStall(): void {
// Restart stream, alert operators, switch to backup
}
}
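The stall check itself is easier to unit-test when the clock is passed in rather than read from `Date.now()` directly. The following sketch isolates that logic. `StallDetector` is a hypothetical name, and the 30-second threshold used in the usage note is taken from the example above.

```typescript
// Tracks the time of the last received checkpoint and reports a stall
// once more than maxStallMs has elapsed without a new one.
class StallDetector {
  private lastSeenMs: number;

  constructor(
    private maxStallMs: number,
    private now: () => number = () => Date.now()
  ) {
    this.lastSeenMs = this.now();
  }

  // Call from the stream's 'data' handler.
  markReceived(): void {
    this.lastSeenMs = this.now();
  }

  // Call periodically, for example from setInterval.
  isStalled(): boolean {
    return this.now() - this.lastSeenMs > this.maxStallMs;
  }
}
```

In production the default clock is sufficient: `new StallDetector(30000)`.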
Performance Considerations#
Bandwidth Optimization#
Minimize bandwidth with selective field masking:
| Field Mask | Checkpoint Size | Bandwidth (1000 checkpoints) |
|---|---|---|
| All fields | ~2.5 KB | 2.5 MB |
| Standard (5 fields) | ~400 bytes | 400 KB |
| Minimal (2 fields) | ~80 bytes | 80 KB |
Recommendation: Request only fields needed for your use case.
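The bandwidth column is simply the per-checkpoint size multiplied by the number of checkpoints. The sketch below reproduces that arithmetic for the figures in the table, using decimal units (1 KB = 1000 bytes). `totalBytes` and `formatBytes` are hypothetical helpers, and the sizes themselves are the approximate values quoted above, not measurements.

```typescript
// Total bytes transferred for a given number of checkpoints.
function totalBytes(bytesPerCheckpoint: number, checkpointCount: number): number {
  return bytesPerCheckpoint * checkpointCount;
}

// Formats a byte count using decimal units.
function formatBytes(bytes: number): string {
  if (bytes >= 1000000) {
    return `${(bytes / 1000000).toFixed(1)} MB`;
  }
  if (bytes >= 1000) {
    return `${(bytes / 1000).toFixed(1)} KB`;
  }
  return `${bytes} bytes`;
}

// Approximate per-checkpoint sizes from the table, in bytes.
const allFields = totalBytes(2500, 1000);
const standard = totalBytes(400, 1000);
const minimal = totalBytes(80, 1000);
console.log(formatBytes(allFields), formatBytes(standard), formatBytes(minimal));
```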
Backpressure Handling#
Buffer checkpoints in a bounded queue when processing cannot keep up with the stream:
class BackpressureHandler {
private queue: any[] = [];
private processing: boolean = false;
private maxQueueSize: number = 100;
async onCheckpoint(checkpoint: any): Promise<void> {
if (this.queue.length >= this.maxQueueSize) {
console.warn('⚠️ Queue full, dropping checkpoint');
return;
}
this.queue.push(checkpoint);
if (!this.processing) {
this.processQueue();
}
}
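private async processQueue(): Promise<void> {
this.processing = true;
while (this.queue.length > 0) {
const checkpoint = this.queue.shift();
try {
await this.processCheckpoint(checkpoint);
} catch (error) {
// Log and continue so one failure cannot stall the queue
console.error('Checkpoint processing failed:', error);
}
}
this.processing = false;
}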
private async processCheckpoint(checkpoint: any): Promise<void> {
// Heavy processing logic
}
}
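Dropping checkpoints when the queue is full loses data. An alternative is to pause the stream at a high watermark and resume it at a low one. The sketch below assumes the gRPC client stream behaves like a Node.js Readable, which exposes `pause()` and `resume()`. `Pausable` and `WatermarkBuffer` are hypothetical names introduced for illustration.

```typescript
// Minimal view of a readable stream: only pause() and resume() are needed.
interface Pausable {
  pause(): void;
  resume(): void;
}

class WatermarkBuffer<T> {
  private items: T[] = [];
  private paused = false;

  constructor(
    private source: Pausable,
    private high: number,
    private low: number
  ) {}

  // Call from the stream's 'data' handler.
  push(item: T): void {
    this.items.push(item);
    if (!this.paused && this.items.length >= this.high) {
      this.paused = true;
      this.source.pause(); // stop reading instead of dropping data
    }
  }

  // Call from the consumer when it is ready for the next item.
  shift(): T | undefined {
    const item = this.items.shift();
    if (this.paused && this.items.length <= this.low) {
      this.paused = false;
      this.source.resume();
    }
    return item;
  }

  size(): number {
    return this.items.length;
  }
}
```

The gap between the two watermarks prevents the stream from toggling between paused and flowing on every item.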
Error Handling#
Common streaming errors and solutions:
stream.on('error', (error: any) => {
switch (error.code) {
case grpc.status.UNAVAILABLE:
console.error('Service unavailable. Reconnecting...');
this.reconnect();
break;
case grpc.status.UNAUTHENTICATED:
case grpc.status.PERMISSION_DENIED:
console.error('Authentication failed. Check API token.');
// Don't reconnect, fix authentication
break;
case grpc.status.RESOURCE_EXHAUSTED:
console.error('Rate limit exceeded. Backoff required.');
this.reconnectWithBackoff();
break;
case grpc.status.CANCELLED:
console.log('Stream cancelled by client');
// Expected during shutdown
break;
default:
console.error('Unexpected error:', error.message);
this.reconnect();
}
});
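The same policy can be expressed as a pure function, which keeps the decision separate from the stream wiring and makes it testable without a live connection. In this sketch the numeric values are the standard gRPC status codes. `actionFor` and the action names are hypothetical, and treating UNAUTHENTICATED like PERMISSION_DENIED is an assumption added here.

```typescript
// Standard gRPC status code values used by the handler above.
const STATUS = {
  CANCELLED: 1,
  PERMISSION_DENIED: 7,
  RESOURCE_EXHAUSTED: 8,
  UNAVAILABLE: 14,
  UNAUTHENTICATED: 16
};

type StreamAction = 'reconnect' | 'backoff' | 'fix_credentials' | 'stop';

// Maps a status code to the recovery action described in this section.
function actionFor(code: number): StreamAction {
  switch (code) {
    case STATUS.UNAVAILABLE:
      return 'reconnect';
    case STATUS.RESOURCE_EXHAUSTED:
      return 'backoff';
    case STATUS.PERMISSION_DENIED:
    case STATUS.UNAUTHENTICATED:
      return 'fix_credentials'; // retrying cannot succeed until the token is fixed
    case STATUS.CANCELLED:
      return 'stop'; // expected during client-initiated shutdown
    default:
      return 'reconnect';
  }
}
```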
Best Practices#
Implement Graceful Shutdown#
Handle termination signals properly:
let isShuttingDown = false;
process.on('SIGINT', async () => {
if (isShuttingDown) return;
isShuttingDown = true;
console.log('\nGraceful shutdown initiated...');
// Cancel stream
stream.cancel();
// Flush pending data
await flushPendingData();
// Close connections
await closeConnections();
process.exit(0);
});
Monitor Stream Health#
Track metrics and detect issues:
const metrics = {
checkpointsReceived: 0,
lastCheckpointTime: Date.now(),
errors: 0,
reconnections: 0
};
setInterval(() => {
console.log('Stream Metrics:', metrics);
if (Date.now() - metrics.lastCheckpointTime > 60000) {
console.error('No checkpoints for 60s!');
}
}, 30000);
Use Field Masking#
Request only necessary fields:
// ✅ Good: Only request needed fields
const maskedRequest = {
read_mask: {
paths: ['sequence_number', 'transactions']
}
};
// ❌ Bad: Request all fields
const unmaskedRequest = {}; // Wastes bandwidth
Related Methods#
- GetCheckpoint - Query historical checkpoints
- GetTransaction - Get transaction details from checkpoint
Performance Benefits#
gRPC Streaming vs HTTP Polling:
| Metric | gRPC Streaming | HTTP Polling (5s interval) |
|---|---|---|
| Latency | Less than 1s | 2.5s average |
| Bandwidth (1 hour) | 1.4 MB | 8.6 MB |
| Overhead | Minimal (single connection) | High (720 requests) |
| Missed Updates | None | Possible |
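The polling figures in the table follow from the interval alone. The sketch below reproduces them, assuming an update is equally likely to land at any point between two polls, so the average wait is half the interval. `pollingCost` is a hypothetical helper. The bandwidth figures depend on payload sizes and are not derived here.

```typescript
// Request count and average detection latency for fixed-interval polling.
function pollingCost(
  intervalSeconds: number,
  windowSeconds: number
): { requests: number; averageLatencySeconds: number } {
  return {
    requests: Math.floor(windowSeconds / intervalSeconds),
    averageLatencySeconds: intervalSeconds / 2
  };
}

// One hour of polling every 5 seconds.
const hourly = pollingCost(5, 3600);
console.log(hourly.requests, hourly.averageLatencySeconds);
```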