StreamBlocks
Stream continuous block data starting from a timestamp, providing real-time access to Hyperliquid blockchain state changes.
When to Use This Method
`StreamBlocks` is essential for:
- Blockchain Monitoring - Track all network activity and state changes
- Block Explorers - Build real-time blockchain data applications
- Analytics Systems - Collect comprehensive blockchain metrics
- Compliance & Auditing - Monitor all network transactions and events
Method Signature
rpc StreamBlocks(Timestamp) returns (stream Block) {}
Request Message
// Timestamp is the request message for StreamBlocks.
message Timestamp {
// Requested stream start. Currently not implemented on the server side:
// streams start from "now" regardless of the value sent.
int64 timestamp = 1;
}
Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.
Response Stream
// Block is one item of the StreamBlocks response stream.
message Block {
// JSON-encoded block object, in the same format as the files in the
// Hyperliquid data directory "replica_cmds".
bytes data = 1;
}
The `data` field contains a JSON-encoded block object with:
- Block header information (height, timestamp, hash)
- Transaction data and execution results
- State changes and events
- Validator signatures and consensus data
Implementation Examples
- Go
- Python
- Node.js
package main
import (
"context"
"encoding/json"
"log"
"time"
pb "your-project/hyperliquid_l1_gateway/v1"
"google.golang.org/grpc"
)
// BlockData is the subset of the JSON block payload that this example decodes.
// NOTE(review): the field names and the unit of Timestamp are placeholders;
// confirm them against the actual block structure before relying on them.
type BlockData struct {
Height int64 `json:"height"`
Hash string `json:"hash"`
Timestamp int64 `json:"timestamp"`
TxCount int `json:"tx_count"`
// Add other fields based on actual block structure
}
// BlockProcessor consumes the StreamBlocks RPC and keeps running statistics
// about the blocks it has decoded.
type BlockProcessor struct {
// client is the gateway client used to open the block stream.
client pb.HyperliquidL1GatewayClient
// stats accumulates counters as blocks are processed.
stats *BlockStats
}
// BlockStats holds the running counters maintained by BlockProcessor.
type BlockStats struct {
// BlocksProcessed is the number of blocks decoded successfully.
BlocksProcessed int64
// LastBlockHeight is the height of the most recently processed block.
LastBlockHeight int64
// LastBlockTime is the timestamp of the most recently processed block.
LastBlockTime time.Time
// TotalTxs is the sum of the transaction counts of all processed blocks.
TotalTxs int64
}
// NewBlockProcessor returns a BlockProcessor that streams through client and
// starts with zeroed statistics.
func NewBlockProcessor(client pb.HyperliquidL1GatewayClient) *BlockProcessor {
	processor := new(BlockProcessor)
	processor.client = client
	processor.stats = new(BlockStats)
	return processor
}
// StreamBlocks opens the StreamBlocks RPC and processes every received block
// until the stream fails or ctx is cancelled.
//
// It returns ctx.Err() when the stream stopped because ctx was cancelled or
// timed out, and the receive error otherwise. Blocks that fail to decode are
// logged and skipped; they do not end the stream.
func (bp *BlockProcessor) StreamBlocks(ctx context.Context) error {
	// The request timestamp is sent as 0 ("start from now").
	stream, err := bp.client.StreamBlocks(ctx, &pb.Timestamp{Timestamp: 0})
	if err != nil {
		return err
	}
	log.Println("Starting block stream...")

	for {
		// Recv blocks until a message arrives, so a select on ctx.Done() with
		// a default branch around it cannot observe a cancellation that
		// happens while waiting. Instead, classify the error after Recv
		// returns: the stream is bound to ctx, so cancellation surfaces here.
		block, err := stream.Recv()
		if err != nil {
			if ctxErr := ctx.Err(); ctxErr != nil {
				log.Println("Stream context cancelled")
				return ctxErr
			}
			log.Printf("Stream error: %v", err)
			return err
		}

		if err := bp.processBlock(block.Data); err != nil {
			log.Printf("Failed to process block: %v", err)
		}
	}
}
// processBlock decodes one JSON block payload, analyzes it against the
// previously seen block, and then folds it into the running statistics.
// It returns the JSON decoding error, if any; in that case the statistics are
// left untouched.
func (bp *BlockProcessor) processBlock(data []byte) error {
	var blockData BlockData
	if err := json.Unmarshal(data, &blockData); err != nil {
		return err
	}

	log.Printf("Block #%d: %d transactions, hash: %s",
		blockData.Height, blockData.TxCount, blockData.Hash)

	// Analyze BEFORE updating the statistics: analyzeBlock compares the new
	// block with bp.stats.LastBlockTime, which must still describe the
	// previous block at that point.
	bp.analyzeBlock(&blockData)

	// Update statistics
	bp.stats.BlocksProcessed++
	bp.stats.LastBlockHeight = blockData.Height
	bp.stats.LastBlockTime = time.Unix(blockData.Timestamp, 0)
	bp.stats.TotalTxs += int64(blockData.TxCount)

	return nil
}

// analyzeBlock logs notable properties of block: an unusually large
// transaction count, and a long gap since the previously processed block.
//
// It must be called before bp.stats is updated for block, so that
// bp.stats.LastBlockTime still holds the previous block's time.
// NOTE(review): block.Timestamp is interpreted as Unix seconds, matching the
// time.Unix call in processBlock — confirm the unit against real payloads.
func (bp *BlockProcessor) analyzeBlock(block *BlockData) {
	// Example: detect large blocks
	if block.TxCount > 100 {
		log.Printf("Large block detected: %d transactions", block.TxCount)
	}

	// Example: check block time intervals. The very first block has no
	// predecessor to compare with.
	if bp.stats.BlocksProcessed == 0 {
		return
	}
	interval := time.Unix(block.Timestamp, 0).Sub(bp.stats.LastBlockTime)
	if interval > 15*time.Second {
		log.Printf("Slow block: %v interval", interval)
	}
}
// GetStats returns the processor's live statistics object (not a copy).
// The fields are written by processBlock without synchronization, so reading
// them from another goroutine while a stream is running is not synchronized
// with those writes.
func (bp *BlockProcessor) GetStats() *BlockStats {
	current := bp.stats
	return current
}
// main connects to the gateway, streams blocks in a background goroutine and
// prints statistics every 30 seconds until the stream ends.
func main() {
	// Connect to Hyperliquid L1 Gateway.
	// NOTE(review): grpc.WithInsecure is deprecated in current grpc-go
	// releases; its replacement needs the credentials/insecure package, which
	// this file does not import.
	conn, err := grpc.Dial(
		"<YOUR_HYPERLIQUID_ENDPOINT>", // Contact support@dwellir.com
		grpc.WithInsecure(),
	)
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	processor := NewBlockProcessor(pb.NewHyperliquidL1GatewayClient(conn))

	// Start streaming with context for graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// done is closed when the streaming goroutine returns, so main can stop
	// instead of printing stale statistics forever.
	done := make(chan struct{})
	go func() {
		defer close(done)
		if err := processor.StreamBlocks(ctx); err != nil {
			log.Printf("Stream ended: %v", err)
		}
	}()

	// Print stats periodically
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-done:
			// Returning lets the deferred Stop, cancel and Close run.
			return
		case <-ticker.C:
			stats := processor.GetStats()
			log.Printf("Stats - Blocks: %d, Height: %d, Total TXs: %d",
				stats.BlocksProcessed, stats.LastBlockHeight, stats.TotalTxs)
		}
	}
}
import grpc
import json
import time
import threading
from collections import deque
from hyperliquid_l1_gateway_pb2 import Timestamp
from hyperliquid_l1_gateway_pb2_grpc import HyperliquidL1GatewayStub
class BlockStreamAnalyzer:
    """Consume the StreamBlocks RPC and keep rolling statistics.

    The analyzer keeps a bounded history of the most recent blocks and a small
    dictionary of running counters. ``stream_blocks`` blocks the calling
    thread; ``stop`` may be called from another thread to end it.
    """

    def __init__(self, endpoint):
        """Open an insecure channel to ``endpoint`` and reset all counters."""
        self.channel = grpc.insecure_channel(endpoint)
        self.stub = HyperliquidL1GatewayStub(self.channel)
        self.stats = {
            'blocks_processed': 0,
            'last_block_height': 0,
            'last_block_time': 0,
            'total_transactions': 0,
            'start_time': time.time()
        }
        self.recent_blocks = deque(maxlen=100)  # Keep last 100 blocks
        self.running = False
        # Handle of the in-flight streaming RPC, kept so stop() can cancel it.
        self._call = None

    def stream_blocks(self, callback=None):
        """Stream blocks until the RPC ends, fails, or ``stop`` is called.

        Args:
            callback: optional callable invoked with each decoded block dict
                after the analyzer's own processing.
        """
        self.running = True
        timestamp = Timestamp(timestamp=0)  # Start from now
        try:
            self._call = self.stub.StreamBlocks(timestamp)
            for block in self._call:
                if not self.running:
                    break
                # Parse block data
                try:
                    block_data = json.loads(block.data)
                    self.process_block(block_data)
                    # Call custom callback if provided
                    if callback:
                        callback(block_data)
                except json.JSONDecodeError as e:
                    print(f"JSON parsing error: {e}")
                    continue
        except grpc.RpcError as e:
            # stop() clears `running` before cancelling the call, so an error
            # seen while `running` is already False is the requested shutdown
            # and not worth reporting.
            if self.running:
                print(f"gRPC error: {e}")
        except KeyboardInterrupt:
            print("Stream interrupted by user")
        finally:
            self.running = False
            self._call = None

    def process_block(self, block_data):
        """Update counters and history from one decoded block dict.

        Missing ``height``, ``timestamp`` and ``tx_count`` keys default to 0,
        and a missing ``hash`` defaults to the empty string.
        """
        height = block_data.get('height', 0)
        timestamp = block_data.get('timestamp', 0)
        tx_count = block_data.get('tx_count', 0)
        # Update statistics
        self.stats['blocks_processed'] += 1
        self.stats['last_block_height'] = height
        self.stats['last_block_time'] = timestamp
        self.stats['total_transactions'] += tx_count
        # Store block info
        block_info = {
            'height': height,
            'timestamp': timestamp,
            'tx_count': tx_count,
            'hash': block_data.get('hash', ''),
            'processed_at': time.time()
        }
        self.recent_blocks.append(block_info)
        print(f"Block #{height}: {tx_count} transactions")
        # Perform analysis
        self.analyze_block_patterns()

    def analyze_block_patterns(self):
        """Print warnings for a slow block interval or a busy block.

        Compares the two most recent history entries; does nothing until at
        least two blocks have been recorded.
        """
        if len(self.recent_blocks) < 2:
            return
        current = self.recent_blocks[-1]
        previous = self.recent_blocks[-2]
        # Calculate block interval
        time_diff = current['timestamp'] - previous['timestamp']
        if time_diff > 15:  # Assuming ~12 second block time
            print(f"⚠️ Slow block: {time_diff}s interval")
        # Detect transaction volume spikes
        if current['tx_count'] > 50:
            print(f"📈 High activity block: {current['tx_count']} transactions")

    def get_throughput_stats(self, window_minutes=5):
        """Summarize the blocks processed within the last ``window_minutes``.

        Returns:
            A dict of throughput figures, or ``None`` when no block in the
            history was processed inside the window. Rates are computed over
            the full window length, not over the span actually covered by the
            retained blocks.
        """
        cutoff_time = time.time() - (window_minutes * 60)
        recent = [b for b in self.recent_blocks
                  if b['processed_at'] > cutoff_time]
        if not recent:
            return None
        total_blocks = len(recent)
        total_txs = sum(b['tx_count'] for b in recent)
        avg_block_time = window_minutes * 60 / total_blocks if total_blocks > 0 else 0
        return {
            'blocks_per_minute': total_blocks / window_minutes,
            'transactions_per_second': total_txs / (window_minutes * 60),
            'average_block_time': avg_block_time,
            'average_txs_per_block': total_txs / total_blocks if total_blocks > 0 else 0
        }

    def stop(self):
        """Stop the stream.

        Clears the running flag and cancels the in-flight RPC, so a
        ``stream_blocks`` call waiting for the next block returns promptly
        instead of waiting for another block to arrive.
        """
        self.running = False
        call = self._call
        if call is not None:
            call.cancel()
def custom_block_handler(block_data):
    """Sample per-block callback: alert when a block carries many transactions.

    Reads ``height`` and ``tx_count`` from ``block_data`` (both default to 0)
    and prints an alert when the transaction count is above 100.
    """
    block_height = block_data.get('height', 0)
    tx_total = block_data.get('tx_count', 0)
    # Example: Save blocks to database
    # save_block_to_db(block_data)
    # Example: Trigger alerts for specific conditions
    if tx_total <= 100:
        return
    print(f"🚨 Alert: Block {block_height} has high transaction count")
def main():
    """Run the analyzer in a background thread and print stats every 30 s."""
    analyzer = BlockStreamAnalyzer('<YOUR_HYPERLIQUID_ENDPOINT>')
    # Start streaming in background thread
    stream_thread = threading.Thread(
        target=analyzer.stream_blocks,
        args=(custom_block_handler,)
    )
    stream_thread.start()
    try:
        # Print stats every 30 seconds. Poll the thread itself rather than
        # analyzer.running: that flag is set inside the thread and may still
        # be False right after start(), which would skip this loop entirely.
        while stream_thread.is_alive():
            # join(timeout) doubles as the 30 s sleep but returns early when
            # the stream ends.
            stream_thread.join(timeout=30)
            if not stream_thread.is_alive():
                break
            stats = analyzer.get_throughput_stats()
            if stats:
                print(f"\n📊 5-min Stats:")
                print(f" Blocks/min: {stats['blocks_per_minute']:.2f}")
                print(f" TXs/sec: {stats['transactions_per_second']:.2f}")
                print(f" Avg block time: {stats['average_block_time']:.1f}s")
                print(f" Avg TXs/block: {stats['average_txs_per_block']:.1f}")
            print(f"Total processed: {analyzer.stats['blocks_processed']} blocks")
    except KeyboardInterrupt:
        print("\nStopping stream...")
        analyzer.stop()
    stream_thread.join()


if __name__ == '__main__':
    main()
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const EventEmitter = require('events');
// Options for parsing the gateway .proto: keep field names as written,
// represent 64-bit integers and enums as strings, fill in default values and
// expose oneof membership.
const protoLoaderOptions = {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
};

// Parsed service definition, loaded synchronously at module load time.
const packageDefinition = protoLoader.loadSync(
  'hyperliquid_l1_gateway.proto',
  protoLoaderOptions
);
/**
 * Manages one StreamBlocks subscription and re-publishes it as events.
 *
 * Events emitted:
 *  - 'block'        (blockData)             every decoded block
 *  - 'highActivity' (blockInfo)             block with more than 50 transactions
 *  - 'slowBlock'    ({ current, interval }) gap to previous block above 15
 *  - 'error'        (error)                 the active stream failed
 *  - 'end'          ()                      the active stream ended
 *
 * A stream stopped through stopStream() emits neither 'error' nor 'end'.
 */
class BlockStreamManager extends EventEmitter {
  /**
   * @param {string} endpoint gateway address passed to the gRPC client.
   */
  constructor(endpoint) {
    super();
    const proto = grpc.loadPackageDefinition(packageDefinition);
    this.client = new proto.hyperliquid_l1_gateway.v1.HyperliquidL1Gateway(
      endpoint, // Contact support@dwellir.com
      grpc.credentials.createInsecure()
    );
    this.stats = {
      blocksProcessed: 0,
      lastBlockHeight: 0,
      lastBlockTime: 0,
      totalTransactions: 0,
      startTime: Date.now()
    };
    this.recentBlocks = [];
    this.maxRecentBlocks = 100;
    this.isStreaming = false;
    this.stream = null;
  }

  /** Opens the block stream unless one is already running. */
  startStream() {
    if (this.isStreaming) {
      console.log('Stream already running');
      return;
    }
    this.isStreaming = true;
    const stream = this.client.StreamBlocks({ timestamp: 0 });
    this.stream = stream;

    // Terminal events are honoured only while `stream` is still the active
    // one. This keeps a cancellation requested via stopStream(), or late
    // events from a stream that has already been replaced, from being
    // reported as failures or from clobbering the state of a newer stream.
    const isActive = () => this.stream === stream;

    stream.on('data', (block) => {
      try {
        const blockData = JSON.parse(block.data.toString());
        this.processBlock(blockData);
      } catch (error) {
        console.error('JSON parsing error:', error.message);
      }
    });
    stream.on('error', (error) => {
      if (!isActive()) return;
      console.error('Stream error:', error.message);
      // Reset state BEFORE emitting so a listener may call startStream()
      // synchronously without being rejected as "already running".
      this.stream = null;
      this.isStreaming = false;
      this.emit('error', error);
    });
    stream.on('end', () => {
      if (!isActive()) return;
      console.log('Stream ended');
      this.stream = null;
      this.isStreaming = false;
      this.emit('end');
    });
    console.log('Block stream started');
  }

  /**
   * Folds one decoded block into the statistics and history, then emits the
   * matching events. Missing numeric fields default to 0.
   */
  processBlock(blockData) {
    const height = blockData.height || 0;
    const timestamp = blockData.timestamp || 0;
    const txCount = blockData.tx_count || 0;
    // Update statistics
    this.stats.blocksProcessed++;
    this.stats.lastBlockHeight = height;
    this.stats.lastBlockTime = timestamp;
    this.stats.totalTransactions += txCount;
    // Store recent block info
    const blockInfo = {
      height,
      timestamp,
      txCount,
      hash: blockData.hash || '',
      processedAt: Date.now()
    };
    this.recentBlocks.push(blockInfo);
    if (this.recentBlocks.length > this.maxRecentBlocks) {
      this.recentBlocks.shift();
    }
    console.log(`Block #${height}: ${txCount} transactions`);
    // Emit events for different block types
    this.emit('block', blockData);
    if (txCount > 50) {
      this.emit('highActivity', blockInfo);
    }
    // Analyze block timing
    this.analyzeBlockTiming(blockInfo);
  }

  /** Emits 'slowBlock' when the gap to the previous block exceeds 15. */
  analyzeBlockTiming(currentBlock) {
    if (this.recentBlocks.length < 2) return;
    const previousBlock = this.recentBlocks[this.recentBlocks.length - 2];
    const timeDiff = currentBlock.timestamp - previousBlock.timestamp;
    if (timeDiff > 15) {
      console.log(`⚠️ Slow block: ${timeDiff}s interval`);
      this.emit('slowBlock', { current: currentBlock, interval: timeDiff });
    }
  }

  /**
   * Throughput over the last `windowMinutes`, based on local processing time.
   * @returns {object|null} null when no block was processed in the window.
   */
  getThroughputMetrics(windowMinutes = 5) {
    const cutoffTime = Date.now() - (windowMinutes * 60 * 1000);
    const recentBlocks = this.recentBlocks.filter(
      block => block.processedAt > cutoffTime
    );
    if (recentBlocks.length === 0) return null;
    const totalBlocks = recentBlocks.length;
    const totalTxs = recentBlocks.reduce((sum, block) => sum + block.txCount, 0);
    const timeSpan = windowMinutes * 60;
    return {
      blocksPerMinute: totalBlocks / windowMinutes,
      transactionsPerSecond: totalTxs / timeSpan,
      averageTxsPerBlock: totalTxs / totalBlocks,
      blockCount: totalBlocks
    };
  }

  /** Lifetime counters plus uptime and average rates since construction. */
  getRealtimeStats() {
    const now = Date.now();
    const uptimeSeconds = (now - this.stats.startTime) / 1000;
    // With zero elapsed time the rates would be NaN or Infinity; report 0.
    const perSecond = (count) => (uptimeSeconds > 0 ? count / uptimeSeconds : 0);
    return {
      ...this.stats,
      uptimeSeconds,
      blocksPerSecond: perSecond(this.stats.blocksProcessed),
      txsPerSecond: perSecond(this.stats.totalTransactions)
    };
  }

  /** Cancels the active stream, if any, without emitting 'error' or 'end'. */
  stopStream() {
    const stream = this.stream;
    if (!stream) return;
    // Detach first so the cancellation surfaced by the stream is ignored by
    // the handlers installed in startStream().
    this.stream = null;
    this.isStreaming = false;
    stream.cancel();
    console.log('Stream stopped');
  }
}
// Example usage and monitoring
/**
 * Creates a BlockStreamManager, attaches logging and analytics listeners,
 * starts the stream, reports statistics every 30 seconds and installs a
 * SIGINT handler that stops everything and exits.
 *
 * @returns {BlockStreamManager} the running manager.
 */
function setupBlockMonitoring() {
  const RESTART_DELAY_MS = 5000;
  const STATS_PERIOD_MS = 30000;

  const manager = new BlockStreamManager('<YOUR_HYPERLIQUID_ENDPOINT>');

  // Writes one statistics report to the console.
  const reportStats = () => {
    const lifetime = manager.getRealtimeStats();
    const windowed = manager.getThroughputMetrics(5);
    console.log('\n📊 Stream Statistics:');
    console.log(` Blocks processed: ${lifetime.blocksProcessed}`);
    console.log(` Last block height: ${lifetime.lastBlockHeight}`);
    console.log(` Uptime: ${Math.floor(lifetime.uptimeSeconds)}s`);
    if (!windowed) return;
    console.log(` Recent throughput:`);
    console.log(` ${windowed.blocksPerMinute.toFixed(2)} blocks/min`);
    console.log(` ${windowed.transactionsPerSecond.toFixed(2)} txs/sec`);
    console.log(` ${windowed.averageTxsPerBlock.toFixed(1)} avg txs/block`);
  };

  // Set up event listeners
  manager.on('block', (blockData) => {
    // Custom block processing logic
    processBlockForAnalytics(blockData);
  });
  manager.on('highActivity', (info) => {
    console.log(`📈 High activity block #${info.height}: ${info.txCount} txs`);
  });
  manager.on('slowBlock', (event) => {
    console.log(`🐌 Slow block #${event.current.height}: ${event.interval}s interval`);
  });
  manager.on('error', (error) => {
    console.error('Stream error, attempting restart...', error.message);
    setTimeout(() => manager.startStream(), RESTART_DELAY_MS);
  });

  // Start streaming
  manager.startStream();

  // Print stats every 30 seconds
  const statsTimer = setInterval(reportStats, STATS_PERIOD_MS);

  // Graceful shutdown
  process.on('SIGINT', () => {
    console.log('\nShutting down...');
    clearInterval(statsTimer);
    manager.stopStream();
    process.exit(0);
  });

  return manager;
}
/**
 * Sample analytics hook invoked for every block.
 * Logs an alert when the block's `tx_count` is above 100.
 */
function processBlockForAnalytics(blockData) {
  // Example: Save to database
  // saveBlockToDatabase(blockData);
  // Example: Update metrics dashboard
  // updateMetricsDashboard(blockData);
  // Example: Trigger alerts based on block content
  const isHighVolume = blockData.tx_count > 100;
  if (!isHighVolume) {
    return;
  }
  console.log(`🚨 Alert: Block ${blockData.height} has high transaction volume`);
}
// Start monitoring: runs as soon as this script is loaded. The returned
// manager is discarded here.
setupBlockMonitoring();
Common Use Cases
1. Block Explorer Backend
/**
 * Indexes every block published by a stream manager into a bounded
 * in-memory cache, for use as a block-explorer backend.
 */
class BlockExplorerService {
  /**
   * @param {EventEmitter} streamManager source of 'block' events.
   * @param {number} [maxCachedBlocks=1000] upper bound on cached blocks; the
   *   oldest entries are evicted first. Non-positive or non-integer values
   *   fall back to the default.
   */
  constructor(streamManager, maxCachedBlocks = 1000) {
    this.streamManager = streamManager;
    this.blockCache = new Map();
    this.maxCachedBlocks =
      Number.isInteger(maxCachedBlocks) && maxCachedBlocks > 0 ? maxCachedBlocks : 1000;

    streamManager.on('block', (blockData) => {
      // indexBlock is async: without a catch, a failure inside it would
      // surface as an unhandled promise rejection.
      this.indexBlock(blockData).catch((error) => {
        console.error(`Failed to index block ${blockData.height}:`, error.message);
      });
    });
  }

  /** Caches the block, then indexes its transactions and chain metrics. */
  async indexBlock(blockData) {
    // Store block in cache/database
    this.blockCache.set(blockData.height, blockData);
    // A Map iterates in insertion order, so its first key is the oldest
    // entry. Evict until the cache is back within its bound.
    while (this.blockCache.size > this.maxCachedBlocks) {
      const oldestHeight = this.blockCache.keys().next().value;
      this.blockCache.delete(oldestHeight);
    }

    // Index transactions
    if (blockData.transactions) {
      for (const tx of blockData.transactions) {
        await this.indexTransaction(tx, blockData.height);
      }
    }

    // Update blockchain metrics
    await this.updateChainMetrics(blockData);
  }

  /** Placeholder: persist one transaction together with its block height. */
  async indexTransaction(tx, blockHeight) {
    // Store transaction data with block reference
    // Update address balances
    // Index events and logs
  }

  /** Placeholder: refresh chain-wide metrics after a block. */
  async updateChainMetrics(blockData) {
    // Update total supply, validator info, etc.
  }
}
2. Network Health Monitor
class NetworkHealthMonitor:
    """Derive a coarse network health score from an analyzer's block history."""

    def __init__(self, analyzer):
        """Keep a reference to ``analyzer`` and create empty metric series."""
        self.analyzer = analyzer
        self.health_metrics = {
            series: []
            for series in ('avg_block_time', 'tx_throughput', 'block_sizes')
        }

    def analyze_health(self):
        """Summarize the ten most recent blocks.

        Returns:
            ``None`` while fewer than ten blocks are available; otherwise a
            dict with the mean interval between consecutive blocks, the
            transactions per unit of block time, and the resulting score.
        """
        if len(self.analyzer.recent_blocks) < 10:
            return None

        window = list(self.analyzer.recent_blocks)[-10:]
        stamps = [entry['timestamp'] for entry in window]

        # Mean gap between each block and its predecessor.
        gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
        mean_gap = sum(gaps) / len(gaps)

        # Transactions per unit of time across the whole window.
        tx_total = sum(entry['tx_count'] for entry in window)
        elapsed = stamps[-1] - stamps[0]
        rate = tx_total / elapsed if elapsed > 0 else 0

        return {
            'average_block_time': mean_gap,
            'transaction_throughput': rate,
            'health_score': self.calculate_health_score(mean_gap, rate)
        }

    def calculate_health_score(self, block_time, throughput):
        """Calculate network health score (0-100).

        Starts at 100, subtracts up to 30 for block times above 15 (two
        points per unit over), subtracts 20 when throughput is below 1, and
        never returns less than 0.
        """
        slow_penalty = min(30, (block_time - 15) * 2) if block_time > 15 else 0
        idle_penalty = 20 if throughput < 1 else 0
        return max(0, 100 - slow_penalty - idle_penalty)
3. Compliance and Auditing
// ComplianceMonitor scans streamed blocks and publishes alerts for
// transactions that need review.
type ComplianceMonitor struct {
// client is the gateway client used to open the block stream.
client pb.HyperliquidL1GatewayClient
// alerts receives one Alert per flagged transaction.
alerts chan Alert
}
// Alert describes one compliance finding.
type Alert struct {
// Type is a machine-readable category, e.g. "LARGE_TRANSACTION".
Type string
// Message is a human-readable description of the finding.
Message string
// BlockData carries the data that triggered the alert.
BlockData interface{}
// Timestamp is when the alert was created.
Timestamp time.Time
}
// MonitorCompliance streams blocks and runs the compliance checks on each one
// until the stream fails or ctx is cancelled. Failures are logged and end the
// monitor; they do not terminate the process.
func (cm *ComplianceMonitor) MonitorCompliance(ctx context.Context) {
	stream, err := cm.client.StreamBlocks(ctx, &pb.Timestamp{Timestamp: 0})
	if err != nil {
		// log.Fatal would exit the whole program from inside a monitor
		// method; report and return like the receive-error path below, so
		// the caller decides what happens next.
		log.Printf("Failed to open block stream: %v", err)
		return
	}

	for {
		block, err := stream.Recv()
		if err != nil {
			log.Printf("Stream error: %v", err)
			return
		}
		cm.analyzeBlockForCompliance(block.Data)
	}
}
// analyzeBlockForCompliance decodes one JSON block payload and sends an Alert
// on cm.alerts for every transaction whose "value" exceeds the reporting
// threshold. Payloads that fail to decode are logged and skipped.
//
// The send on cm.alerts blocks until a receiver takes the alert.
func (cm *ComplianceMonitor) analyzeBlockForCompliance(data []byte) {
	var blockData map[string]interface{}
	if err := json.Unmarshal(data, &blockData); err != nil {
		log.Printf("Failed to decode block for compliance check: %v", err)
		return
	}

	// Check for large transactions
	transactions, ok := blockData["transactions"].([]interface{})
	if !ok {
		return
	}
	for _, tx := range transactions {
		txData, ok := tx.(map[string]interface{})
		if !ok {
			continue
		}
		value, exists := txData["value"]
		if !exists {
			continue
		}
		// Flag large transactions for review
		if parseValue(value) > 1000000 { // Example threshold
			cm.alerts <- Alert{
				Type:      "LARGE_TRANSACTION",
				Message:   "Transaction exceeds reporting threshold",
				BlockData: txData,
				Timestamp: time.Now(),
			}
		}
	}
}
Error Handling and Reconnection
/**
 * Starts a block stream with bounded retries and exponential backoff.
 *
 * This class calls `this.startStream()`, which it does not define itself:
 * a subclass or the caller must provide it as a function returning a promise
 * that rejects when the stream cannot be started.
 */
class RobustBlockStreamer {
  /**
   * @param {string} endpoint gateway address kept for use by startStream().
   */
  constructor(endpoint) {
    this.endpoint = endpoint;
    this.maxRetries = 5;
    this.retryDelay = 1000; // Start with 1 second
    this.currentRetries = 0;
  }

  /**
   * Calls startStream() until it succeeds or maxRetries consecutive attempts
   * have failed, doubling the wait between attempts.
   *
   * @returns {Promise<void>} resolves after the first successful start.
   * @throws {Error} 'Max retry attempts exceeded' after the last failure.
   */
  async startStreamWithRetry() {
    while (this.currentRetries < this.maxRetries) {
      try {
        await this.startStream();
        this.currentRetries = 0; // Reset on successful connection
        this.retryDelay = 1000;
        // Leave the loop on success. Without this, the reset counter keeps
        // the loop condition true and startStream() is invoked again
        // immediately, forever.
        return;
      } catch (error) {
        this.currentRetries++;
        console.error(`Stream attempt ${this.currentRetries} failed:`, error.message);
        if (this.currentRetries >= this.maxRetries) {
          throw new Error('Max retry attempts exceeded');
        }
        // Exponential backoff
        await this.sleep(this.retryDelay);
        this.retryDelay *= 2;
      }
    }
  }

  /** Resolves after `ms` milliseconds. */
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
Best Practices
- Connection Management: Implement robust reconnection logic with exponential backoff
- Memory Management: Use bounded collections for storing recent blocks to prevent memory leaks
- Performance: Process blocks asynchronously to avoid blocking the stream
- Monitoring: Track stream health and performance metrics
- Error Recovery: Handle various error types (network, parsing, processing) gracefully
- Resource Cleanup: Properly close streams and connections on shutdown
Current Limitations
- Historical Data: Cannot stream from historical timestamps; only real-time streaming available
- Data Retention: Node maintains only 24 hours of historical block data
- Backpressure: High-volume periods may require careful handling to avoid overwhelming downstream systems
Need help? Contact our support team or check the Hyperliquid gRPC documentation.