sui_getCheckpoint
Retrieves detailed information about a specific checkpoint on the Sui blockchain, including transactions, network metrics, and consensus data.
Overview​
The sui_getCheckpoint
method provides comprehensive information about Sui network checkpoints. Checkpoints represent finalized batches of transactions that have been agreed upon by the network consensus. This method is essential for blockchain analytics, network monitoring, historical data analysis, and understanding the state progression of the Sui blockchain.
Parameters​
Parameter | Type | Required | Description |
---|---|---|---|
id | string | Yes | Checkpoint identifier - can be checkpoint sequence number or digest |
Checkpoint Identifier Types​
You can query checkpoints using either:
- Sequence Number: String representation of the checkpoint number (e.g., "1234567")
- Digest: Checkpoint content hash (e.g., "0x123abc...")
Returns​
Returns a comprehensive checkpoint object containing all checkpoint information.
Field | Type | Description |
---|---|---|
epoch | string | Epoch number containing this checkpoint |
sequenceNumber | string | Sequential checkpoint number |
digest | string | Content hash of the checkpoint |
networkTotalTransactions | string | Total transactions processed by the network at this checkpoint |
previousDigest | string | Digest of the previous checkpoint |
timestampMs | string | Checkpoint timestamp in milliseconds |
transactions | array | Array of transaction digests included in this checkpoint |
checkpointCommitments | array | Validator commitments for this checkpoint |
validatorSignature | string | Aggregated validator signature |
endOfEpochData | object | End-of-epoch information (if applicable) |
End of Epoch Data​
When a checkpoint marks the end of an epoch, additional information is included:
Field | Type | Description |
---|---|---|
nextEpochCommittee | array | Validator committee for the next epoch |
nextEpochProtocolVersion | string | Protocol version for the next epoch |
epochCommitments | array | Epoch-level commitments |
Code Examples​
- cURL
- JavaScript
- Python
# Get checkpoint by sequence number
curl -X POST https://sui-mainnet.dwellir.com/YOUR_API_KEY \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"method": "sui_getCheckpoint",
"params": [
"1234567"
],
"id": 1
}'
# Get checkpoint by digest
curl -X POST https://sui-mainnet.dwellir.com/YOUR_API_KEY \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"method": "sui_getCheckpoint",
"params": [
"0x8c123c0b23c456789abcdef0123456789abcdef0123456789abcdef0123456789a"
],
"id": 1
}'
# Get latest checkpoint (using highest known sequence number)
curl -X POST https://sui-mainnet.dwellir.com/YOUR_API_KEY \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"method": "sui_getLatestCheckpointSequenceNumber",
"params": [],
"id": 1
}'
# Then use that number to get the checkpoint
curl -X POST https://sui-mainnet.dwellir.com/YOUR_API_KEY \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"method": "sui_getCheckpoint",
"params": [
"LATEST_SEQUENCE_NUMBER"
],
"id": 2
}'
import { SuiClient } from '@mysten/sui.js/client';
const client = new SuiClient({
url: 'https://sui-mainnet.dwellir.com/YOUR_API_KEY'
});
// Get checkpoint by sequence number
async function getCheckpointBySequence(sequenceNumber) {
try {
const checkpoint = await client.getCheckpoint({
id: sequenceNumber.toString()
});
console.log(`Checkpoint ${checkpoint.sequenceNumber}:`);
console.log(` Epoch: ${checkpoint.epoch}`);
console.log(` Timestamp: ${new Date(parseInt(checkpoint.timestampMs))}`);
console.log(` Transactions: ${checkpoint.transactions.length}`);
console.log(` Network Total Txs: ${checkpoint.networkTotalTransactions}`);
return checkpoint;
} catch (error) {
console.error(`Failed to get checkpoint ${sequenceNumber}:`, error);
return null;
}
}
// Get checkpoint by digest
async function getCheckpointByDigest(digest) {
try {
return await client.getCheckpoint({ id: digest });
} catch (error) {
console.error(`Failed to get checkpoint ${digest}:`, error);
return null;
}
}
// Get latest checkpoint
async function getLatestCheckpoint() {
try {
const latestSequence = await client.getLatestCheckpointSequenceNumber();
return await getCheckpointBySequence(latestSequence);
} catch (error) {
console.error('Failed to get latest checkpoint:', error);
return null;
}
}
// Checkpoint range analysis
async function analyzeCheckpointRange(startSeq, endSeq) {
const analysis = {
startSequence: startSeq,
endSequence: endSeq,
totalCheckpoints: endSeq - startSeq + 1,
totalTransactions: 0,
averageTxPerCheckpoint: 0,
timeSpan: 0,
epochsSpanned: new Set(),
checkpointDetails: []
};
const checkpoints = [];
for (let seq = startSeq; seq <= endSeq; seq++) {
const checkpoint = await getCheckpointBySequence(seq);
if (checkpoint) {
checkpoints.push(checkpoint);
analysis.totalTransactions += checkpoint.transactions.length;
analysis.epochsSpanned.add(checkpoint.epoch);
analysis.checkpointDetails.push({
sequence: checkpoint.sequenceNumber,
epoch: checkpoint.epoch,
timestamp: checkpoint.timestampMs,
txCount: checkpoint.transactions.length,
networkTotalTxs: checkpoint.networkTotalTransactions
});
}
}
if (checkpoints.length > 0) {
analysis.averageTxPerCheckpoint = analysis.totalTransactions / checkpoints.length;
const startTime = parseInt(checkpoints[0].timestampMs);
const endTime = parseInt(checkpoints[checkpoints.length - 1].timestampMs);
analysis.timeSpan = endTime - startTime;
analysis.timeSpanReadable = `${Math.round(analysis.timeSpan / 1000)} seconds`;
}
analysis.epochsSpanned = analysis.epochsSpanned.size;
return analysis;
}
// Network metrics from checkpoints
class NetworkMetricsTracker {
constructor(client) {
this.client = client;
this.metrics = {
currentEpoch: null,
checkpointsPerEpoch: {},
transactionThroughput: [],
networkGrowth: [],
validatorActivity: {}
};
}
async trackMetrics(checkpointCount = 100) {
const latestSequence = await this.client.getLatestCheckpointSequenceNumber();
const startSequence = Math.max(1, parseInt(latestSequence) - checkpointCount + 1);
console.log(`Tracking metrics from checkpoint ${startSequence} to ${latestSequence}`);
const checkpoints = [];
for (let seq = startSequence; seq <= parseInt(latestSequence); seq++) {
const checkpoint = await getCheckpointBySequence(seq);
if (checkpoint) {
checkpoints.push(checkpoint);
}
// Small delay to prevent overwhelming the RPC
if (seq % 10 === 0) {
await new Promise(resolve => setTimeout(resolve, 100));
}
}
this.analyzeCheckpoints(checkpoints);
return this.metrics;
}
analyzeCheckpoints(checkpoints) {
if (checkpoints.length === 0) return;
this.metrics.currentEpoch = checkpoints[checkpoints.length - 1].epoch;
// Analyze per-epoch metrics
checkpoints.forEach(checkpoint => {
const epoch = checkpoint.epoch;
if (!this.metrics.checkpointsPerEpoch[epoch]) {
this.metrics.checkpointsPerEpoch[epoch] = {
count: 0,
totalTransactions: 0,
startTime: null,
endTime: null
};
}
const epochMetrics = this.metrics.checkpointsPerEpoch[epoch];
epochMetrics.count++;
epochMetrics.totalTransactions += checkpoint.transactions.length;
const timestamp = parseInt(checkpoint.timestampMs);
if (!epochMetrics.startTime || timestamp < epochMetrics.startTime) {
epochMetrics.startTime = timestamp;
}
if (!epochMetrics.endTime || timestamp > epochMetrics.endTime) {
epochMetrics.endTime = timestamp;
}
});
// Calculate throughput metrics
let previousTimestamp = null;
let previousNetworkTxs = null;
checkpoints.forEach(checkpoint => {
const timestamp = parseInt(checkpoint.timestampMs);
const networkTxs = parseInt(checkpoint.networkTotalTransactions);
if (previousTimestamp && previousNetworkTxs) {
const timeDiff = timestamp - previousTimestamp; // milliseconds
const txDiff = networkTxs - previousNetworkTxs;
if (timeDiff > 0) {
const tps = (txDiff / timeDiff) * 1000; // transactions per second
this.metrics.transactionThroughput.push({
checkpoint: checkpoint.sequenceNumber,
timestamp: timestamp,
tps: tps,
transactionCount: txDiff
});
}
}
previousTimestamp = timestamp;
previousNetworkTxs = networkTxs;
// Track network growth
this.metrics.networkGrowth.push({
checkpoint: checkpoint.sequenceNumber,
timestamp: timestamp,
totalTransactions: networkTxs,
epoch: checkpoint.epoch
});
});
// Calculate average TPS
if (this.metrics.transactionThroughput.length > 0) {
const totalTPS = this.metrics.transactionThroughput.reduce(
(sum, metric) => sum + metric.tps, 0
);
this.metrics.averageTPS = totalTPS / this.metrics.transactionThroughput.length;
}
}
getEpochSummary(epoch) {
const epochData = this.metrics.checkpointsPerEpoch[epoch];
if (!epochData) return null;
const duration = epochData.endTime - epochData.startTime;
const avgTxPerCheckpoint = epochData.totalTransactions / epochData.count;
return {
epoch: epoch,
checkpointCount: epochData.count,
totalTransactions: epochData.totalTransactions,
averageTransactionsPerCheckpoint: avgTxPerCheckpoint,
durationMs: duration,
durationReadable: `${Math.round(duration / 1000)} seconds`
};
}
getNetworkSummary() {
const epochs = Object.keys(this.metrics.checkpointsPerEpoch);
const latestGrowth = this.metrics.networkGrowth[this.metrics.networkGrowth.length - 1];
return {
currentEpoch: this.metrics.currentEpoch,
epochsAnalyzed: epochs.length,
totalNetworkTransactions: latestGrowth?.totalTransactions || 0,
averageTPS: this.metrics.averageTPS || 0,
checkpointsAnalyzed: this.metrics.networkGrowth.length,
timeSpan: {
start: this.metrics.networkGrowth[0]?.timestamp || 0,
end: latestGrowth?.timestamp || 0,
duration: (latestGrowth?.timestamp || 0) - (this.metrics.networkGrowth[0]?.timestamp || 0)
}
};
}
}
// Checkpoint monitoring and alerts
class CheckpointMonitor {
constructor(client) {
this.client = client;
this.lastCheckpoint = null;
this.isMonitoring = false;
this.alerts = [];
this.thresholds = {
maxTransactionsPerCheckpoint: 10000,
minTimeBetweenCheckpoints: 1000, // ms
maxTimeBetweenCheckpoints: 30000 // ms
};
}
async startMonitoring(intervalMs = 5000) {
if (this.isMonitoring) return;
console.log('Starting checkpoint monitoring...');
this.isMonitoring = true;
// Get initial checkpoint
this.lastCheckpoint = await getLatestCheckpoint();
const monitorLoop = async () => {
if (!this.isMonitoring) return;
try {
const currentCheckpoint = await getLatestCheckpoint();
if (currentCheckpoint &&
(!this.lastCheckpoint ||
currentCheckpoint.sequenceNumber !== this.lastCheckpoint.sequenceNumber)) {
await this.analyzeNewCheckpoint(currentCheckpoint);
this.lastCheckpoint = currentCheckpoint;
}
} catch (error) {
console.error('Error in checkpoint monitoring:', error);
}
setTimeout(monitorLoop, intervalMs);
};
setTimeout(monitorLoop, intervalMs);
}
async analyzeNewCheckpoint(checkpoint) {
console.log(`📦 New checkpoint: ${checkpoint.sequenceNumber}`);
console.log(` Epoch: ${checkpoint.epoch}`);
console.log(` Transactions: ${checkpoint.transactions.length}`);
console.log(` Network Total: ${checkpoint.networkTotalTransactions}`);
// Check for alerts
await this.checkThresholds(checkpoint);
// Emit events
this.onNewCheckpoint?.(checkpoint);
}
async checkThresholds(checkpoint) {
const alerts = [];
// Check transaction count
if (checkpoint.transactions.length > this.thresholds.maxTransactionsPerCheckpoint) {
alerts.push({
type: 'HIGH_TRANSACTION_COUNT',
checkpoint: checkpoint.sequenceNumber,
value: checkpoint.transactions.length,
threshold: this.thresholds.maxTransactionsPerCheckpoint,
message: `Checkpoint has ${checkpoint.transactions.length} transactions (threshold: ${this.thresholds.maxTransactionsPerCheckpoint})`
});
}
// Check timing between checkpoints
if (this.lastCheckpoint) {
const timeDiff = parseInt(checkpoint.timestampMs) - parseInt(this.lastCheckpoint.timestampMs);
if (timeDiff < this.thresholds.minTimeBetweenCheckpoints) {
alerts.push({
type: 'FAST_CHECKPOINT_CREATION',
checkpoint: checkpoint.sequenceNumber,
value: timeDiff,
threshold: this.thresholds.minTimeBetweenCheckpoints,
message: `Only ${timeDiff}ms between checkpoints (min threshold: ${this.thresholds.minTimeBetweenCheckpoints}ms)`
});
}
if (timeDiff > this.thresholds.maxTimeBetweenCheckpoints) {
alerts.push({
type: 'SLOW_CHECKPOINT_CREATION',
checkpoint: checkpoint.sequenceNumber,
value: timeDiff,
threshold: this.thresholds.maxTimeBetweenCheckpoints,
message: `${timeDiff}ms between checkpoints (max threshold: ${this.thresholds.maxTimeBetweenCheckpoints}ms)`
});
}
}
// Check for end of epoch
if (checkpoint.endOfEpochData) {
alerts.push({
type: 'END_OF_EPOCH',
checkpoint: checkpoint.sequenceNumber,
epoch: checkpoint.epoch,
message: `End of epoch ${checkpoint.epoch} detected`
});
}
// Log alerts
alerts.forEach(alert => {
console.log(`🚨 Alert: ${alert.type} - ${alert.message}`);
this.alerts.push({
...alert,
timestamp: Date.now()
});
});
// Keep only recent alerts
const hourAgo = Date.now() - (60 * 60 * 1000);
this.alerts = this.alerts.filter(alert => alert.timestamp > hourAgo);
return alerts;
}
stopMonitoring() {
this.isMonitoring = false;
console.log('Checkpoint monitoring stopped');
}
getRecentAlerts(hours = 1) {
const cutoff = Date.now() - (hours * 60 * 60 * 1000);
return this.alerts.filter(alert => alert.timestamp > cutoff);
}
}
// Historical analysis
async function analyzeNetworkHistory(daysBack = 7) {
const latestSequence = await client.getLatestCheckpointSequenceNumber();
const latestCheckpoint = await getCheckpointBySequence(latestSequence);
if (!latestCheckpoint) {
throw new Error('Failed to get latest checkpoint');
}
const endTime = parseInt(latestCheckpoint.timestampMs);
const startTime = endTime - (daysBack * 24 * 60 * 60 * 1000);
console.log(`Analyzing network history from ${new Date(startTime)} to ${new Date(endTime)}`);
// Binary search to find approximate starting checkpoint
let startSeq = Math.max(1, parseInt(latestSequence) - (daysBack * 24 * 60 * 10)); // Rough estimate
let endSeq = parseInt(latestSequence);
// Find more accurate start sequence
while (startSeq < endSeq - 1) {
const midSeq = Math.floor((startSeq + endSeq) / 2);
const midCheckpoint = await getCheckpointBySequence(midSeq);
if (midCheckpoint && parseInt(midCheckpoint.timestampMs) > startTime) {
endSeq = midSeq;
} else {
startSeq = midSeq;
}
await new Promise(resolve => setTimeout(resolve, 50));
}
console.log(`Found starting checkpoint: ${startSeq}`);
// Analyze the range
const analysis = await analyzeCheckpointRange(startSeq, parseInt(latestSequence));
return {
period: `${daysBack} days`,
startTime: new Date(startTime).toISOString(),
endTime: new Date(endTime).toISOString(),
...analysis
};
}
// Usage examples
// Get specific checkpoint
const checkpoint = await getCheckpointBySequence(1000000);
console.log('Checkpoint:', checkpoint?.sequenceNumber);
// Get latest checkpoint
const latest = await getLatestCheckpoint();
console.log('Latest checkpoint:', latest?.sequenceNumber);
// Analyze checkpoint range
const rangeAnalysis = await analyzeCheckpointRange(1000, 1100);
console.log('Range Analysis:', rangeAnalysis);
// Network metrics tracking
const metricsTracker = new NetworkMetricsTracker(client);
const metrics = await metricsTracker.trackMetrics(50);
console.log('Network Summary:', metricsTracker.getNetworkSummary());
// Checkpoint monitoring
const monitor = new CheckpointMonitor(client);
monitor.onNewCheckpoint = (checkpoint) => {
console.log(`🎯 New checkpoint processed: ${checkpoint.sequenceNumber}`);
};
await monitor.startMonitoring(10000); // Check every 10 seconds
// Historical analysis
const history = await analyzeNetworkHistory(1); // Last 1 day
console.log('Network History:', history);
import requests
import json
import time
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from collections import defaultdict
import threading
@dataclass
class CheckpointAnalysis:
start_sequence: int
end_sequence: int
total_checkpoints: int
total_transactions: int
average_tx_per_checkpoint: float
time_span_ms: int
epochs_spanned: int
checkpoint_details: List[Dict[str, Any]] = field(default_factory=list)
class SuiCheckpointClient:
def __init__(self, rpc_url: str):
self.rpc_url = rpc_url
def get_checkpoint(self, checkpoint_id: Union[str, int]) -> Optional[Dict[str, Any]]:
"""Get checkpoint by sequence number or digest"""
payload = {
"jsonrpc": "2.0",
"method": "sui_getCheckpoint",
"params": [str(checkpoint_id)],
"id": 1
}
try:
response = requests.post(
self.rpc_url,
headers={'Content-Type': 'application/json'},
data=json.dumps(payload),
timeout=30
)
result = response.json()
if 'error' in result:
print(f"RPC Error for checkpoint {checkpoint_id}: {result['error']}")
return None
return result['result']
except Exception as e:
print(f"Error fetching checkpoint {checkpoint_id}: {e}")
return None
def get_latest_checkpoint_sequence(self) -> Optional[int]:
"""Get the latest checkpoint sequence number"""
payload = {
"jsonrpc": "2.0",
"method": "sui_getLatestCheckpointSequenceNumber",
"params": [],
"id": 1
}
try:
response = requests.post(
self.rpc_url,
headers={'Content-Type': 'application/json'},
data=json.dumps(payload),
timeout=30
)
result = response.json()
if 'error' in result:
print(f"RPC Error getting latest sequence: {result['error']}")
return None
return int(result['result'])
except Exception as e:
print(f"Error fetching latest checkpoint sequence: {e}")
return None
def get_latest_checkpoint(self) -> Optional[Dict[str, Any]]:
"""Get the latest checkpoint"""
latest_seq = self.get_latest_checkpoint_sequence()
if latest_seq is None:
return None
return self.get_checkpoint(latest_seq)
def analyze_checkpoint_range(
self,
start_seq: int,
end_seq: int
) -> CheckpointAnalysis:
"""Analyze a range of checkpoints"""
print(f"Analyzing checkpoints {start_seq} to {end_seq}")
checkpoints = []
total_transactions = 0
epochs = set()
for seq in range(start_seq, end_seq + 1):
checkpoint = self.get_checkpoint(seq)
if checkpoint:
checkpoints.append(checkpoint)
total_transactions += len(checkpoint.get('transactions', []))
epochs.add(checkpoint['epoch'])
# Small delay to prevent overwhelming the RPC
if seq % 10 == 0:
time.sleep(0.1)
if not checkpoints:
raise ValueError("No checkpoints found in range")
# Calculate time span
start_time = int(checkpoints[0]['timestampMs'])
end_time = int(checkpoints[-1]['timestampMs'])
time_span = end_time - start_time
# Create detailed list
checkpoint_details = [
{
'sequence': cp['sequenceNumber'],
'epoch': cp['epoch'],
'timestamp': cp['timestampMs'],
'tx_count': len(cp.get('transactions', [])),
'network_total_txs': cp['networkTotalTransactions'],
'digest': cp['digest']
}
for cp in checkpoints
]
return CheckpointAnalysis(
start_sequence=start_seq,
end_sequence=end_seq,
total_checkpoints=len(checkpoints),
total_transactions=total_transactions,
average_tx_per_checkpoint=total_transactions / len(checkpoints),
time_span_ms=time_span,
epochs_spanned=len(epochs),
checkpoint_details=checkpoint_details
)
class NetworkMetricsTracker:
def __init__(self, client: SuiCheckpointClient):
self.client = client
self.metrics = {
'current_epoch': None,
'checkpoints_per_epoch': defaultdict(lambda: {
'count': 0,
'total_transactions': 0,
'start_time': None,
'end_time': None
}),
'transaction_throughput': [],
'network_growth': [],
'average_tps': 0
}
def track_metrics(self, checkpoint_count: int = 100) -> Dict[str, Any]:
"""Track network metrics from recent checkpoints"""
latest_sequence = self.client.get_latest_checkpoint_sequence()
if not latest_sequence:
raise ValueError("Failed to get latest checkpoint sequence")
start_sequence = max(1, latest_sequence - checkpoint_count + 1)
print(f"Tracking metrics from checkpoint {start_sequence} to {latest_sequence}")
checkpoints = []
for seq in range(start_sequence, latest_sequence + 1):
checkpoint = self.client.get_checkpoint(seq)
if checkpoint:
checkpoints.append(checkpoint)
# Progress indicator
if seq % 20 == 0:
print(f" Processed {seq - start_sequence + 1}/{checkpoint_count} checkpoints")
time.sleep(0.1)
self._analyze_checkpoints(checkpoints)
return self.metrics
def _analyze_checkpoints(self, checkpoints: List[Dict[str, Any]]):
"""Analyze checkpoint data and update metrics"""
if not checkpoints:
return
self.metrics['current_epoch'] = checkpoints[-1]['epoch']
# Analyze per-epoch metrics
for checkpoint in checkpoints:
epoch = checkpoint['epoch']
epoch_metrics = self.metrics['checkpoints_per_epoch'][epoch]
epoch_metrics['count'] += 1
epoch_metrics['total_transactions'] += len(checkpoint.get('transactions', []))
timestamp = int(checkpoint['timestampMs'])
if epoch_metrics['start_time'] is None or timestamp < epoch_metrics['start_time']:
epoch_metrics['start_time'] = timestamp
if epoch_metrics['end_time'] is None or timestamp > epoch_metrics['end_time']:
epoch_metrics['end_time'] = timestamp
# Calculate throughput metrics
previous_timestamp = None
previous_network_txs = None
for checkpoint in checkpoints:
timestamp = int(checkpoint['timestampMs'])
network_txs = int(checkpoint['networkTotalTransactions'])
if previous_timestamp is not None and previous_network_txs is not None:
time_diff = timestamp - previous_timestamp # milliseconds
tx_diff = network_txs - previous_network_txs
if time_diff > 0:
tps = (tx_diff / time_diff) * 1000 # transactions per second
self.metrics['transaction_throughput'].append({
'checkpoint': checkpoint['sequenceNumber'],
'timestamp': timestamp,
'tps': tps,
'transaction_count': tx_diff
})
previous_timestamp = timestamp
previous_network_txs = network_txs
# Track network growth
self.metrics['network_growth'].append({
'checkpoint': checkpoint['sequenceNumber'],
'timestamp': timestamp,
'total_transactions': network_txs,
'epoch': checkpoint['epoch']
})
# Calculate average TPS
if self.metrics['transaction_throughput']:
total_tps = sum(metric['tps'] for metric in self.metrics['transaction_throughput'])
self.metrics['average_tps'] = total_tps / len(self.metrics['transaction_throughput'])
def get_epoch_summary(self, epoch: str) -> Optional[Dict[str, Any]]:
"""Get summary for a specific epoch"""
epoch_data = self.metrics['checkpoints_per_epoch'].get(epoch)
if not epoch_data:
return None
duration = epoch_data['end_time'] - epoch_data['start_time'] if epoch_data['start_time'] and epoch_data['end_time'] else 0
avg_tx_per_checkpoint = epoch_data['total_transactions'] / epoch_data['count'] if epoch_data['count'] > 0 else 0
return {
'epoch': epoch,
'checkpoint_count': epoch_data['count'],
'total_transactions': epoch_data['total_transactions'],
'average_transactions_per_checkpoint': avg_tx_per_checkpoint,
'duration_ms': duration,
'duration_readable': f"{duration / 1000:.1f} seconds"
}
def get_network_summary(self) -> Dict[str, Any]:
"""Get overall network summary"""
epochs = list(self.metrics['checkpoints_per_epoch'].keys())
latest_growth = self.metrics['network_growth'][-1] if self.metrics['network_growth'] else {}
time_span = 0
if len(self.metrics['network_growth']) >= 2:
start_time = self.metrics['network_growth'][0]['timestamp']
end_time = self.metrics['network_growth'][-1]['timestamp']
time_span = end_time - start_time
return {
'current_epoch': self.metrics['current_epoch'],
'epochs_analyzed': len(epochs),
'total_network_transactions': latest_growth.get('total_transactions', 0),
'average_tps': self.metrics['average_tps'],
'checkpoints_analyzed': len(self.metrics['network_growth']),
'time_span': {
'start': self.metrics['network_growth'][0]['timestamp'] if self.metrics['network_growth'] else 0,
'end': latest_growth.get('timestamp', 0),
'duration_ms': time_span,
'duration_readable': f"{time_span / 1000:.1f} seconds"
}
}
class CheckpointMonitor:
def __init__(self, client: SuiCheckpointClient):
self.client = client
self.last_checkpoint = None
self.is_monitoring = False
self.alerts = []
self.thresholds = {
'max_transactions_per_checkpoint': 10000,
'min_time_between_checkpoints': 1000, # ms
'max_time_between_checkpoints': 30000 # ms
}
self.monitor_thread = None
def start_monitoring(
self,
interval_seconds: int = 5,
max_duration_minutes: int = 60
):
"""Start monitoring checkpoints for alerts"""
if self.is_monitoring:
print("Monitoring already in progress")
return
print(f"Starting checkpoint monitoring (max {max_duration_minutes} minutes)")
self.is_monitoring = True
# Get initial checkpoint
self.last_checkpoint = self.client.get_latest_checkpoint()
def monitor_loop():
start_time = time.time()
max_duration = max_duration_minutes * 60
while self.is_monitoring and (time.time() - start_time) < max_duration:
try:
current_checkpoint = self.client.get_latest_checkpoint()
if (current_checkpoint and
(not self.last_checkpoint or
current_checkpoint['sequenceNumber'] != self.last_checkpoint['sequenceNumber'])):
self._analyze_new_checkpoint(current_checkpoint)
self.last_checkpoint = current_checkpoint
time.sleep(interval_seconds)
except Exception as e:
print(f"Error in checkpoint monitoring: {e}")
time.sleep(interval_seconds)
self.is_monitoring = False
print("Checkpoint monitoring completed")
self.monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
self.monitor_thread.start()
def _analyze_new_checkpoint(self, checkpoint: Dict[str, Any]):
"""Analyze new checkpoint and generate alerts"""
print(f"📦 New checkpoint: {checkpoint['sequenceNumber']}")
print(f" Epoch: {checkpoint['epoch']}")
print(f" Transactions: {len(checkpoint.get('transactions', []))}")
print(f" Network Total: {checkpoint['networkTotalTransactions']}")
# Check for alerts
alerts = self._check_thresholds(checkpoint)
# Log alerts
for alert in alerts:
print(f"🚨 Alert: {alert['type']} - {alert['message']}")
self.alerts.append({
**alert,
'timestamp': time.time()
})
# Keep only recent alerts (last hour)
hour_ago = time.time() - 3600
self.alerts = [alert for alert in self.alerts if alert['timestamp'] > hour_ago]
def _check_thresholds(self, checkpoint: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Check checkpoint against thresholds and generate alerts"""
alerts = []
# Check transaction count
tx_count = len(checkpoint.get('transactions', []))
if tx_count > self.thresholds['max_transactions_per_checkpoint']:
alerts.append({
'type': 'HIGH_TRANSACTION_COUNT',
'checkpoint': checkpoint['sequenceNumber'],
'value': tx_count,
'threshold': self.thresholds['max_transactions_per_checkpoint'],
'message': f"Checkpoint has {tx_count} transactions (threshold: {self.thresholds['max_transactions_per_checkpoint']})"
})
# Check timing between checkpoints
if self.last_checkpoint:
time_diff = int(checkpoint['timestampMs']) - int(self.last_checkpoint['timestampMs'])
if time_diff < self.thresholds['min_time_between_checkpoints']:
alerts.append({
'type': 'FAST_CHECKPOINT_CREATION',
'checkpoint': checkpoint['sequenceNumber'],
'value': time_diff,
'threshold': self.thresholds['min_time_between_checkpoints'],
'message': f"Only {time_diff}ms between checkpoints (min threshold: {self.thresholds['min_time_between_checkpoints']}ms)"
})
if time_diff > self.thresholds['max_time_between_checkpoints']:
alerts.append({
'type': 'SLOW_CHECKPOINT_CREATION',
'checkpoint': checkpoint['sequenceNumber'],
'value': time_diff,
'threshold': self.thresholds['max_time_between_checkpoints'],
'message': f"{time_diff}ms between checkpoints (max threshold: {self.thresholds['max_time_between_checkpoints']}ms)"
})
# Check for end of epoch
if checkpoint.get('endOfEpochData'):
alerts.append({
'type': 'END_OF_EPOCH',
'checkpoint': checkpoint['sequenceNumber'],
'epoch': checkpoint['epoch'],
'message': f"End of epoch {checkpoint['epoch']} detected"
})
return alerts
def stop_monitoring(self):
"""Stop checkpoint monitoring"""
self.is_monitoring = False
if self.monitor_thread:
self.monitor_thread.join(timeout=5)
print("Checkpoint monitoring stopped")
def get_recent_alerts(self, hours: int = 1) -> List[Dict[str, Any]]:
"""Get alerts from the last N hours"""
cutoff = time.time() - (hours * 3600)
return [alert for alert in self.alerts if alert['timestamp'] > cutoff]
class NetworkHistoryAnalyzer:
def __init__(self, client: SuiCheckpointClient):
self.client = client
def analyze_network_history(self, days_back: int = 7) -> Dict[str, Any]:
"""Analyze network history over specified time period"""
latest_checkpoint = self.client.get_latest_checkpoint()
if not latest_checkpoint:
raise ValueError("Failed to get latest checkpoint")
end_time = int(latest_checkpoint['timestampMs'])
start_time = end_time - (days_back * 24 * 60 * 60 * 1000)
print(f"Analyzing network history from {datetime.fromtimestamp(start_time/1000)} to {datetime.fromtimestamp(end_time/1000)}")
# Binary search to find approximate starting checkpoint
latest_seq = int(latest_checkpoint['sequenceNumber'])
start_seq = self._find_checkpoint_by_time(start_time, 1, latest_seq)
print(f"Found starting checkpoint: {start_seq}")
# Analyze the range
analysis = self.client.analyze_checkpoint_range(start_seq, latest_seq)
return {
'period': f"{days_back} days",
'start_time': datetime.fromtimestamp(start_time/1000).isoformat(),
'end_time': datetime.fromtimestamp(end_time/1000).isoformat(),
'start_sequence': analysis.start_sequence,
'end_sequence': analysis.end_sequence,
'total_checkpoints': analysis.total_checkpoints,
'total_transactions': analysis.total_transactions,
'average_tx_per_checkpoint': analysis.average_tx_per_checkpoint,
'time_span_ms': analysis.time_span_ms,
'epochs_spanned': analysis.epochs_spanned,
'checkpoints_per_day': analysis.total_checkpoints / days_back,
'transactions_per_day': analysis.total_transactions / days_back
}
def _find_checkpoint_by_time(self, target_time: int, start_seq: int, end_seq: int) -> int:
"""Binary search to find checkpoint near target time"""
while start_seq < end_seq - 1:
mid_seq = (start_seq + end_seq) // 2
mid_checkpoint = self.client.get_checkpoint(mid_seq)
if mid_checkpoint and int(mid_checkpoint['timestampMs']) > target_time:
end_seq = mid_seq
else:
start_seq = mid_seq
time.sleep(0.05) # Small delay
return start_seq
# Usage examples
client = SuiCheckpointClient('https://sui-mainnet.dwellir.com/YOUR_API_KEY')
# Example 1: Get specific checkpoint
checkpoint = client.get_checkpoint(1000000)
if checkpoint:
print(f"Checkpoint {checkpoint['sequenceNumber']}:")
print(f" Epoch: {checkpoint['epoch']}")
print(f" Transactions: {len(checkpoint.get('transactions', []))}")
print(f" Timestamp: {datetime.fromtimestamp(int(checkpoint['timestampMs'])/1000)}")
# Example 2: Get latest checkpoint
latest = client.get_latest_checkpoint()
if latest:
print(f"\nLatest checkpoint: {latest['sequenceNumber']}")
print(f" Network total transactions: {latest['networkTotalTransactions']}")
# Example 3: Analyze checkpoint range
try:
range_analysis = client.analyze_checkpoint_range(1000, 1100)
print(f"\nRange Analysis (1000-1100):")
print(f" Total checkpoints: {range_analysis.total_checkpoints}")
print(f" Total transactions: {range_analysis.total_transactions}")
print(f" Average tx per checkpoint: {range_analysis.average_tx_per_checkpoint:.2f}")
print(f" Time span: {range_analysis.time_span_ms / 1000:.1f} seconds")
print(f" Epochs spanned: {range_analysis.epochs_spanned}")
except Exception as e:
print(f"Error in range analysis: {e}")
# Example 4: Network metrics tracking
metrics_tracker = NetworkMetricsTracker(client)
try:
metrics = metrics_tracker.track_metrics(50)
summary = metrics_tracker.get_network_summary()
print(f"\nNetwork Summary:")
print(f" Current epoch: {summary['current_epoch']}")
print(f" Epochs analyzed: {summary['epochs_analyzed']}")
print(f" Total network transactions: {summary['total_network_transactions']}")
print(f" Average TPS: {summary['average_tps']:.2f}")
print(f" Checkpoints analyzed: {summary['checkpoints_analyzed']}")
except Exception as e:
print(f"Error in metrics tracking: {e}")
# Example 5: Checkpoint monitoring
monitor = CheckpointMonitor(client)
# Start monitoring for 2 minutes
monitor.start_monitoring(interval_seconds=10, max_duration_minutes=2)
# Wait for monitoring to complete
time.sleep(130)
# Check for any alerts
recent_alerts = monitor.get_recent_alerts()
print(f"\nRecent alerts: {len(recent_alerts)}")
for alert in recent_alerts:
print(f" {alert['type']}: {alert['message']}")
# Example 6: Historical analysis
analyzer = NetworkHistoryAnalyzer(client)
try:
history = analyzer.analyze_network_history(1) # Last 1 day
print(f"\nNetwork History Analysis:")
print(f" Period: {history['period']}")
print(f" Total checkpoints: {history['total_checkpoints']}")
print(f" Total transactions: {history['total_transactions']}")
print(f" Checkpoints per day: {history['checkpoints_per_day']:.1f}")
print(f" Transactions per day: {history['transactions_per_day']:.0f}")
except Exception as e:
print(f"Error in historical analysis: {e}")
Response Example​
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"epoch": "251",
"sequenceNumber": "18523456",
"digest": "0x8c123c0b23c456789abcdef0123456789abcdef0123456789abcdef0123456789a",
"networkTotalTransactions": "450123789",
"previousDigest": "0x7d456e78f90123456789abcdef0123456789abcdef0123456789abcdef012345",
"timestampMs": "1703097600000",
"transactions": [
"0x9f234567890abcdef0123456789abcdef0123456789abcdef0123456789abcdef0",
"0xa1b2c3d4e5f6789012345678901234567890123456789012345678901234567890",
"0xb2c3d4e5f6789012345678901234567890123456789012345678901234567890ab"
],
"checkpointCommitments": [],
"validatorSignature": "0xValidatorSignatureHere123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0",
"endOfEpochData": null
}
}
End of Epoch Checkpoint Example​
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"epoch": "251",
"sequenceNumber": "18525000",
"digest": "0x1a2b3c4d5e6f789012345678901234567890123456789012345678901234567890",
"networkTotalTransactions": "450125000",
"previousDigest": "0x8c123c0b23c456789abcdef0123456789abcdef0123456789abcdef0123456789a",
"timestampMs": "1703100000000",
"transactions": [
"0xe1f2a3b4c5d6789012345678901234567890123456789012345678901234567890"
],
"checkpointCommitments": [],
"validatorSignature": "0xEndOfEpochValidatorSignature123456789abcdef0123456789abcdef0123456789abcdef0",
"endOfEpochData": {
"nextEpochCommittee": [
{
"authorityName": "0x1234567890abcdef0123456789abcdef0123456789abcdef0123456789abcdef01",
"stakeUnit": "100000000000",
"networkAddress": "/ip4/192.168.1.1/tcp/8080",
"primaryAddress": "/ip4/192.168.1.1/tcp/8081"
}
],
"nextEpochProtocolVersion": "1.0.0",
"epochCommitments": []
}
}
}
Common Use Cases​
1. Network Health Monitoring​
async function monitorNetworkHealth() {
const monitor = new CheckpointMonitor(client);
// Set custom thresholds for alerts
monitor.thresholds = {
maxTransactionsPerCheckpoint: 5000,
minTimeBetweenCheckpoints: 2000,
maxTimeBetweenCheckpoints: 15000
};
monitor.onNewCheckpoint = (checkpoint) => {
const txCount = checkpoint.transactions.length;
const timestamp = new Date(parseInt(checkpoint.timestampMs));
console.log(`📊 Network Status:`);
console.log(` Checkpoint: ${checkpoint.sequenceNumber}`);
console.log(` TPS: ${txCount} transactions`);
console.log(` Time: ${timestamp.toLocaleTimeString()}`);
};
await monitor.startMonitoring(5000); // Check every 5 seconds
}
2. Transaction Throughput Analysis​
async function analyzeThroughput(checkpointCount = 100) {
const metricsTracker = new NetworkMetricsTracker(client);
const metrics = await metricsTracker.trackMetrics(checkpointCount);
const throughput = metrics.transactionThroughput;
const analysis = {
averageTPS: metrics.averageTPS,
peakTPS: Math.max(...throughput.map(t => t.tps)),
minTPS: Math.min(...throughput.map(t => t.tps)),
medianTPS: throughput.map(t => t.tps).sort()[Math.floor(throughput.length / 2)],
tpsStandardDeviation: calculateStandardDeviation(throughput.map(t => t.tps))
};
return analysis;
}
3. Epoch Transition Tracking​
async function trackEpochTransitions(epochCount = 5) {
const latestCheckpoint = await getLatestCheckpoint();
const currentEpoch = parseInt(latestCheckpoint.epoch);
const epochTransitions = [];
for (let epoch = currentEpoch - epochCount; epoch <= currentEpoch; epoch++) {
// Find last checkpoint of epoch (simplified search)
let foundTransition = false;
let checkpointSeq = parseInt(latestCheckpoint.sequenceNumber) - (currentEpoch - epoch) * 1000;
for (let i = 0; i < 2000 && !foundTransition; i++) {
const checkpoint = await getCheckpointBySequence(checkpointSeq + i);
if (checkpoint && checkpoint.endOfEpochData) {
epochTransitions.push({
epoch: checkpoint.epoch,
checkpoint: checkpoint.sequenceNumber,
timestamp: checkpoint.timestampMs,
nextEpochCommitteeSize: checkpoint.endOfEpochData.nextEpochCommittee?.length || 0
});
foundTransition = true;
}
}
}
return epochTransitions;
}
4. Data Availability Verification​
async function verifyDataAvailability(startSeq, endSeq) {
const verification = {
requestedRange: { start: startSeq, end: endSeq },
availableCheckpoints: [],
missingCheckpoints: [],
consecutiveRanges: [],
availabilityPercentage: 0
};
let currentRange = null;
for (let seq = startSeq; seq <= endSeq; seq++) {
const checkpoint = await getCheckpointBySequence(seq);
if (checkpoint) {
verification.availableCheckpoints.push(seq);
if (currentRange) {
currentRange.end = seq;
} else {
currentRange = { start: seq, end: seq };
}
} else {
verification.missingCheckpoints.push(seq);
if (currentRange) {
verification.consecutiveRanges.push(currentRange);
currentRange = null;
}
}
}
if (currentRange) {
verification.consecutiveRanges.push(currentRange);
}
verification.availabilityPercentage =
(verification.availableCheckpoints.length / (endSeq - startSeq + 1)) * 100;
return verification;
}
Best Practices​
1. Efficient Checkpoint Queries​
// Cache frequently accessed checkpoints
const checkpointCache = new Map();
async function getCachedCheckpoint(id) {
if (checkpointCache.has(id)) {
return checkpointCache.get(id);
}
const checkpoint = await client.getCheckpoint({ id });
if (checkpoint) {
checkpointCache.set(id, checkpoint);
// Limit cache size
if (checkpointCache.size > 1000) {
const firstKey = checkpointCache.keys().next().value;
checkpointCache.delete(firstKey);
}
}
return checkpoint;
}
2. Batch Processing​
// Process checkpoints in batches to avoid overwhelming the RPC
async function processCheckpointRange(start, end, batchSize = 10) {
const results = [];
for (let i = start; i <= end; i += batchSize) {
const batchEnd = Math.min(i + batchSize - 1, end);
const batchPromises = [];
for (let j = i; j <= batchEnd; j++) {
batchPromises.push(getCheckpointBySequence(j));
}
const batchResults = await Promise.allSettled(batchPromises);
results.push(...batchResults.map(r => r.value).filter(Boolean));
// Small delay between batches
if (batchEnd < end) {
await new Promise(resolve => setTimeout(resolve, 200));
}
}
return results;
}
3. Error Handling and Retry Logic​
async function robustGetCheckpoint(id, retries = 3) {
for (let attempt = 1; attempt <= retries; attempt++) {
try {
const checkpoint = await client.getCheckpoint({ id });
return { success: true, data: checkpoint };
} catch (error) {
console.warn(`Attempt ${attempt} failed for checkpoint ${id}:`, error.message);
if (attempt === retries) {
return { success: false, error: error.message };
}
// Exponential backoff
await new Promise(resolve =>
setTimeout(resolve, Math.pow(2, attempt - 1) * 1000)
);
}
}
}
Performance Considerations​
- Rate Limiting: Implement delays between requests to avoid overwhelming the RPC endpoint
- Caching: Cache checkpoint data as it's immutable once finalized
- Batch Processing: Process checkpoints in batches rather than individual requests
- Selective Queries: Only fetch the checkpoint data you need
- Parallel Processing: Use Promise.all for independent checkpoint queries
Error Handling​
async function handleCheckpointErrors(id) {
try {
const checkpoint = await client.getCheckpoint({ id });
return { success: true, data: checkpoint };
} catch (error) {
if (error.message.includes('not found')) {
return {
success: false,
error: 'Checkpoint not found',
notFound: true
};
}
if (error.message.includes('timeout')) {
return {
success: false,
error: 'Request timeout',
retry: true
};
}
return {
success: false,
error: error.message
};
}
}
Related Methods​
- suix_getLatestSuiSystemState - Get complete system state
- sui_getTransactionBlock - Get transactions in checkpoint
- suix_queryTransactionBlocks - Query transactions by checkpoint
Notes​
- Checkpoints are immutable once finalized by consensus
- Sequential numbering ensures no gaps in the checkpoint sequence
- End of epoch checkpoints contain additional validator committee information
- Timestamps are in milliseconds since Unix epoch
- Transaction digests in checkpoints can be used to fetch detailed transaction data
- Validator signatures provide cryptographic proof of consensus
Need help? Contact our support team or check the Sui documentation.