StreamBlocks
Stream continuous block data starting from a timestamp, providing real-time access to Hyperliquid blockchain state changes.
When to Use This Method#
StreamBlocks is essential for:
- Blockchain Monitoring - Track all network activity and state changes
- Block Explorers - Build real-time blockchain data applications
- Analytics Systems - Collect comprehensive blockchain metrics
- Compliance & Auditing - Monitor all network transactions and events
Method Signature#
rpc StreamBlocks(Timestamp) returns (stream Block) {}
Request Message#
message Timestamp {
int64 timestamp = 1;
}
Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.
Response Stream#
message Block {
// JSON-encoded object conforming to files of
// Hyperliquid data dir "replica_cmds"
bytes data = 1;
}
The data field contains a JSON-encoded block object with:
- Block header information (height
round, timestamptime) - Transaction data and execution results
- State changes and events
- Validator signatures and consensus data
- Note: The sample format does not include a top-level block hash; bundle hashes are present per
signed_action_bundles.
- Note: The sample format does not include a top-level block hash; bundle hashes are present per
Full Block Spec#
StreamBlocks emits a JSON payload matching Hyperliquid's replica_cmds BlockData. Below is the exact structure.
Top‑level keys (always present):
{
"abci_block": {
"time": "2025-09-08T06:41:57.997372546", // ISO-8601 with nanosecond precision
"round": 992814678, // Current block number (always incrementing)
"parent_round": 992814677, // Previous block (always round - 1)
"proposer": "0x5ac99df645f3414876c816caa18b2d234024b487", // 40 hex chars, lowercase
"hardfork": {
"version": 57, // Protocol version number
"round": 990500929 // Block when this version activated
},
"signed_action_bundles": [ // Array of [hash, bundle_data] pairs
[
"0xb4b1f5a9c233f9d90fd24b9961fd12708b36cc3d56f8fda47f32b667ee8d1227", // Bundle hash (64 hex)
{
"signed_actions": [ // Array of transactions in this bundle
{
"signature": {
"r": "0xd931f13565ae66c3bc41a05da4180bb795dbd9ed2d365efaf639fd23b3774ac6",
"s": "0x4a7a0534bf0a4238dfe404a88d335ab4c9b8222909100d773635e328d2ab864c",
"v": 27 // Recovery ID: always 27 or 28
},
"action": {
"type": "order", // Action type (order/cancel/evmRawTx/etc)
"orders": [{ // Type-specific payload
"a": 170, // Asset ID
"b": true, // Buy=true, Sell=false
"p": "0.038385", // Price as string
"s": "1514", // Size as string
"r": false, // Reduce-only flag
"t": {
"limit": {
"tif": "Ioc" // Time-in-force: Ioc/Alo/Gtc
}
},
"c": "0x7192c49bcadb32d394e38617ea99cc09" // Client order ID
}]
},
"nonce": 1757313597362 // Unique transaction nonce
}
// ... more signed_actions
],
"broadcaster": "0x67e451964e0421f6e7d07be784f35c530667c2b3", // Who sent bundle
"broadcaster_nonce": 1757313597367 // Bundle-level nonce
}
]
// ... more bundles (typically 1-6 total)
]
},
"resps": {
"Full": [ // Matches signed_action_bundles structure
[
"0xb4b1f5a9c233f9d90fd24b9961fd12708b36cc3d56f8fda47f32b667ee8d1227", // Same bundle hash
[ // One response per signed_action
{
"user": "0xecb63caa47c7c4e77f60f1ce858cf28dc2b82b00", // Address of action signer
"res": {
"status": "ok", // "ok" or "err"
"response": {
"type": "order", // Response type
"data": {
"statuses": [{
"filled": { // Order state: filled/resting/error
"totalSz": "1514.0", // Filled size
"avgPx": "0.038385", // Average fill price
"oid": 156190414943, // Order ID assigned
"cloid": "0x7192c49bcadb32d394e38617ea99cc09" // Client order ID
}
}]
}
}
}
}
// ... more responses (one per action)
]
]
// ... more response bundles (matches signed_action_bundles count)
]
}
}
Bundle entry (abci_block.signed_action_bundles[i]):
[
"0x...", // bundle_hash
{
"signed_actions": [ /* SignedAction */ ],
"broadcaster": "0x...",
"broadcaster_nonce": 1757313597367
}
]
SignedAction envelope (common fields across all actions):
{
"signature": { "r": "0x...", "s": "0x...", "v": 27 },
"action": { "type": "order" }, // one of: order | cancel | cancelByCloid | batchModify | evmRawTx
"nonce": 1757313597362,
"vaultAddress": "0x...", // optional
"expiresAfter": 1757313718705 // optional
}
Action payloads (in action):
- type: "order"
{
"type": "order",
"orders": [
{
"a": 204, // asset id
"b": true, // buy (true) / sell (false)
"p": "0.20847", // price (string)
"s": "1866", // size (string)
"r": false, // reduce-only
"t": { "limit": { "tif": "Gtc" } }, // tif: Gtc | Ioc | Alo
"c": "0x..." // optional client order id
}
],
"grouping": "na"
}
- type: "cancel"
{ "type": "cancel", "cancels": [ { "a": 204, "o": 156190390863 } ] }
- type: "cancelByCloid"
{ "type": "cancelByCloid", "cancels": [ { "asset": 159, "cloid": "0x..." } ] }
- type: "batchModify"
{
"type": "batchModify",
"modifies": [
{
"oid": 156190366007,
"order": { "a": 8, "b": true, "p": "0.58625", "s": "40.2", "r": false, "t": { "limit": { "tif": "Alo" } }, "c": "0x..." }
}
]
}
- type: "evmRawTx"
{ "type": "evmRawTx", "data": "0x..." }
Execution responses (resps.Full):
[
"0xBUNDLE_HASH",
[
{
"user": "0x...",
"res": {
"status": "ok", // or "err"
"response": {
"type": "order", // e.g., order | cancel | default
"data": {
"statuses": [
{ "filled": { "totalSz": "1514.0", "avgPx": "0.038385", "oid": 156190414943, "cloid": "0x..." } },
{ "resting": { "oid": 156190414944, "cloid": "0x..." } },
{ "error": "..." }
]
}
}
}
}
]
]
Guarantees and alignment:
abci_blockandresps.Fullare always present.resps.Full.length === abci_block.signed_action_bundles.length.- For each bundle,
responses.length === signed_actions.length. - Height is
abci_block.round, parent isabci_block.parent_round. - Timestamp
abci_block.timeis ISO‑8601 with nanosecond precision (treat as UTC if no suffix).
Developer tips:
- Dispatch on
action.typeand handle new types defensively. - Store
bundle_hashas the stable join key between actions and responses. - Normalize prices/sizes (strings) to numeric types as appropriate.
Implementation Examples#
- Go
- Python
- Node.js
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"
pb "hyperliquid-grpc-client/api"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
func streamBlocks() {
const endpoint = "hl-cendars.n.dwellir.com"
fmt.Println("🚀 Hyperliquid Go gRPC Client - Stream Blocks")
fmt.Println("===========================================")
fmt.Printf("📡 Endpoint: %s\n\n", endpoint)
// Set up TLS connection
creds := credentials.NewTLS(nil)
// Connection options
opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(150 * 1024 * 1024), // 150MB max
),
}
// Connect to server
fmt.Println("🔌 Connecting to gRPC server...")
conn, err := grpc.NewClient(endpoint, opts...)
if err != nil {
log.Fatalf("❌ Failed to connect: %v", err)
}
defer conn.Close()
// Create the client using generated code
client := pb.NewHyperLiquidL1GatewayClient(conn)
fmt.Println("✅ Connected successfully!\n")
// Create context without timeout for continuous streaming
ctx := context.Background()
// Create request - 0 means latest/current blocks
request := &pb.Timestamp{
Timestamp: 0,
}
fmt.Println("📥 Starting block stream...")
fmt.Println("Press Ctrl+C to stop streaming\n")
// Start streaming blocks
stream, err := client.StreamBlocks(ctx, request)
if err != nil {
log.Fatalf("❌ Failed to start stream: %v", err)
}
blockCount := 0
for {
response, err := stream.Recv()
if err != nil {
fmt.Printf("❌ Stream ended: %v\n", err)
break
}
blockCount++
fmt.Printf("\n===== BLOCK #%d =====\n", blockCount)
fmt.Printf("📦 Response size: %d bytes\n", len(response.Data))
// Process each block
processBlock(response.Data, blockCount)
fmt.Println("\n" + strings.Repeat("─", 50))
}
fmt.Printf("\n📊 Total blocks received: %d\n", blockCount)
}
func processBlock(data []byte, blockNum int) {
// Parse JSON
var block map[string]interface{}
if err := json.Unmarshal(data, &block); err != nil {
fmt.Printf("❌ Failed to parse JSON: %v\n", err)
fmt.Printf("Raw data (first 200 bytes): %.200s\n", data)
return
}
fmt.Printf("🧱 BLOCK #%d DETAILS\n", blockNum)
fmt.Println("===================")
// Display block height
if height, ok := block["height"].(float64); ok {
fmt.Printf("📏 Height: %.0f\n", height)
}
// Display timestamp
if timestamp, ok := block["time"].(float64); ok {
t := time.Unix(int64(timestamp/1000), 0)
fmt.Printf("⏰ Time: %s\n", t.Format("2006-01-02 15:04:05 UTC"))
}
// Display hash if available
if hash, ok := block["hash"].(string); ok {
fmt.Printf("🔗 Hash: %s\n", hash)
}
// Display number of transactions
if txs, ok := block["txs"].([]interface{}); ok {
fmt.Printf("📋 Transactions: %d\n", len(txs))
// Show first few transaction details
maxTxs := 3
if len(txs) < maxTxs {
maxTxs = len(txs)
}
for i := 0; i < maxTxs; i++ {
if tx, ok := txs[i].(map[string]interface{}); ok {
fmt.Printf(" • TX %d: ", i+1)
if txType, ok := tx["type"].(string); ok {
fmt.Printf("Type: %s", txType)
}
if txHash, ok := tx["hash"].(string); ok {
fmt.Printf(", Hash: %.12s...", txHash)
}
fmt.Println()
}
}
if len(txs) > maxTxs {
fmt.Printf(" ... and %d more transactions\n", len(txs)-maxTxs)
}
}
// Display any other interesting fields
fmt.Printf("\n📊 Block Summary:\n")
for key, value := range block {
switch key {
case "height", "time", "hash", "txs":
// Already displayed above
continue
default:
// Display other fields
fmt.Printf("• %s: %v\n", key, value)
}
}
}
func runStreamBlocks() {
streamBlocks()
}
func main() {
streamBlocks()
}
import grpc
import json
import signal
import sys
import os
from datetime import datetime
from dotenv import load_dotenv
import hyperliquid_pb2
import hyperliquid_pb2_grpc
from pprint import pprint as pp
# Load environment variables
load_dotenv()
def stream_blocks():
endpoint = os.getenv('HYPERLIQUID_ENDPOINT')
api_key = os.getenv('API_KEY')
if not endpoint:
print("Error: HYPERLIQUID_ENDPOINT environment variable is required.")
print("Please create a .env file from .env.example and set your endpoint.")
sys.exit(1)
if not api_key:
print("Error: API_KEY environment variable is required.")
print("Please set your API key in the .env file.")
sys.exit(1)
print('🚀 Hyperliquid Python gRPC Client - Stream Blocks')
print('===============================================')
print(f'📡 Endpoint: {endpoint}\n')
# Create SSL credentials
credentials = grpc.ssl_channel_credentials()
# Create channel with options
options = [
('grpc.max_receive_message_length', 150 * 1024 * 1024), # 150MB max
]
# Prepare metadata with API key
metadata = [('x-api-key', api_key)]
print('🔌 Connecting to gRPC server...')
with grpc.secure_channel(endpoint, credentials, options=options) as channel:
# Create client stub
client = hyperliquid_pb2_grpc.HyperLiquidL1GatewayStub(channel)
print('✅ Connected successfully!\n')
# Create request - 0 means latest/current blocks
request = hyperliquid_pb2.Timestamp(timestamp=0)
print('📥 Starting block stream...')
print('Press Ctrl+C to stop streaming\n')
block_count = 0
# Set up signal handler for graceful shutdown
def signal_handler(sig, frame):
print('\n🛑 Stopping stream...')
print(f'📊 Total blocks received: {block_count}')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
try:
# Start streaming blocks with metadata
for response in client.StreamBlocks(request, metadata=metadata):
block_count += 1
print(f'\n===== BLOCK #{block_count} =====')
print(f'📦 Response size: {len(response.data)} bytes')
# Process each block
process_block(response.data, block_count)
print('\n' + '─' * 50)
except grpc.RpcError as e:
print(f'❌ Stream error: {e}')
except KeyboardInterrupt:
print('\n🛑 Stopping stream...')
print(f'\n📊 Total blocks received: {block_count}')
def process_block(data, block_num):
try:
# Parse JSON
block = json.loads(data.decode('utf-8'))
print(f'🧱 BLOCK #{block_num} DETAILS')
print('===================')
# Count action types (counting individual orders within each action)
action_type_counts = {}
if 'abci_block' in block:
abci_block = block['abci_block']
if 'proposer' in abci_block:
print(f'👤 Proposer: {abci_block["proposer"]}')
if 'signed_action_bundles' in abci_block:
if isinstance(abci_block['signed_action_bundles'], list):
for action_bundle in abci_block['signed_action_bundles']:
# Each action_bundle is [hash, data_object]
if isinstance(action_bundle, list) and len(action_bundle) > 1:
bundle_data = action_bundle[1]
if 'signed_actions' in bundle_data:
if isinstance(bundle_data['signed_actions'], list):
for signed_action in bundle_data['signed_actions']:
if 'action' in signed_action:
action = signed_action['action']
if isinstance(action, dict) and 'type' in action:
action_type = action['type']
# For order type, count the number of orders
if action_type == 'order' and 'orders' in action:
if isinstance(action['orders'], list):
count = len(action['orders'])
action_type_counts[action_type] = action_type_counts.get(action_type, 0) + count
else:
action_type_counts[action_type] = action_type_counts.get(action_type, 0) + 1
else:
# For other action types, count as 1
action_type_counts[action_type] = action_type_counts.get(action_type, 0) + 1
total_actions = sum(action_type_counts.values())
print(f'📋 Action types:')
for action_type, count in action_type_counts.items():
print(f' • {action_type}: {count}')
print(f' Total actions: {total_actions}')
# Count order statuses (success vs error) from resps.Full section
success_count = 0
error_count = 0
if 'resps' in block:
resps = block['resps']
if isinstance(resps, dict) and 'Full' in resps:
full_data = resps['Full']
if isinstance(full_data, list):
for item in full_data:
# Each item is [hash, [entries...]]
if isinstance(item, list) and len(item) > 1:
entries = item[1] # Skip the hash, get the second part
if isinstance(entries, list):
for entry in entries:
if isinstance(entry, dict) and 'res' in entry:
res = entry['res']
if isinstance(res, dict) and 'response' in res:
response = res['response']
if isinstance(response, dict) and response.get('type') == 'order':
if 'data' in response and 'statuses' in response['data']:
statuses = response['data']['statuses']
if isinstance(statuses, list):
for status in statuses:
if isinstance(status, dict):
if 'error' in status:
error_count += 1
else:
# Success status (resting, filled, etc.)
success_count += 1
total_statuses = success_count + error_count
print(f'\n📊 Order Statuses:')
print(f' ✅ Success: {success_count}')
print(f' ❌ Error: {error_count}')
print(f' Total statuses: {total_statuses}')
print(f'\n🔍 Match check: Actions={total_actions}, Statuses={total_statuses}, Match={total_actions == total_statuses}')
except json.JSONDecodeError as e:
print(f'❌ Failed to parse JSON: {e}')
print(f'Raw data (first 200 bytes): {data[:200]}')
except Exception as e:
print(f'❌ Error processing block: {e}')
if __name__ == '__main__':
stream_blocks()
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');
require('dotenv').config();
// Load proto file
const PROTO_PATH = path.join(__dirname, 'hyperliquid.proto');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const proto = grpc.loadPackageDefinition(packageDefinition);
// Main function
async function streamBlocks() {
const endpoint = process.env.HYPERLIQUID_ENDPOINT;
const apiKey = process.env.API_KEY;
if (!endpoint) {
console.error('Error: HYPERLIQUID_ENDPOINT environment variable is required.');
console.error('Please create a .env file from .env.example and set your endpoint.');
process.exit(1);
}
if (!apiKey) {
console.error('Error: API_KEY environment variable is required.');
console.error('Please set your API key in the .env file.');
process.exit(1);
}
console.log('🚀 Hyperliquid Node.js gRPC Client - Stream Blocks');
console.log('================================================');
console.log(`📡 Endpoint: ${endpoint}\n`);
// Create metadata with API key
const metadata = new grpc.Metadata();
metadata.add('x-api-key', apiKey);
// Create client with SSL credentials and 150MB message size limit
const client = new proto.hyperliquid_l1_gateway.v1.HyperLiquidL1Gateway(
endpoint,
grpc.credentials.createSsl(),
{
'grpc.max_receive_message_length': 150 * 1024 * 1024
}
);
console.log('📥 Starting block stream...');
console.log('Press Ctrl+C to stop streaming\n');
// Make the gRPC call
const stream = client.StreamBlocks({ timestamp: 0 }, metadata);
let blockCount = 0;
stream.on('data', (data) => {
blockCount++;
try {
const block = JSON.parse(data.data);
console.log(`\n===== BLOCK #${blockCount} =====`);
console.log(`📦 Response size: ${data.data.length} bytes`);
// Process each block
processBlock(block, blockCount);
console.log('\n' + '─'.repeat(50));
} catch (error) {
console.error(`❌ Failed to parse message #${blockCount}:`, error.message);
}
});
stream.on('error', (error) => {
console.error('❌ Stream error:', error.message);
});
stream.on('end', () => {
console.log('Stream ended');
console.log(`\n📊 Total blocks received: ${blockCount}`);
});
}
function processBlock(block, blockNum) {
console.log(`🧱 BLOCK #${blockNum} DETAILS`);
console.log('===================');
// Count action types (counting individual orders within each action)
const actionTypeCounts = {};
if (block.abci_block && block.abci_block.proposer) {
console.log(`👤 Proposer: ${block.abci_block.proposer}`);
}
if (block.abci_block && block.abci_block.signed_action_bundles) {
for (const actionBundle of block.abci_block.signed_action_bundles) {
if (Array.isArray(actionBundle) && actionBundle.length > 1) {
const bundleData = actionBundle[1];
if (bundleData.signed_actions && Array.isArray(bundleData.signed_actions)) {
for (const signedAction of bundleData.signed_actions) {
if (signedAction.action && signedAction.action.type) {
const actionType = signedAction.action.type;
// For order type, count the number of orders
if (actionType === 'order' && signedAction.action.orders) {
const count = Array.isArray(signedAction.action.orders) ? signedAction.action.orders.length : 1;
actionTypeCounts[actionType] = (actionTypeCounts[actionType] || 0) + count;
} else {
// For other action types, count as 1
actionTypeCounts[actionType] = (actionTypeCounts[actionType] || 0) + 1;
}
}
}
}
}
}
}
const totalActions = Object.values(actionTypeCounts).reduce((sum, count) => sum + count, 0);
console.log('📋 Action types:');
for (const [actionType, count] of Object.entries(actionTypeCounts)) {
console.log(` • ${actionType}: ${count}`);
}
console.log(` Total actions: ${totalActions}`);
// Count order statuses (success vs error) from resps.Full section
let successCount = 0;
let errorCount = 0;
if (block.resps && block.resps.Full && Array.isArray(block.resps.Full)) {
for (const item of block.resps.Full) {
if (Array.isArray(item) && item.length > 1) {
const entries = item[1];
if (Array.isArray(entries)) {
for (const entry of entries) {
if (entry.res && entry.res.response && entry.res.response.type === 'order' && entry.res.response.data && entry.res.response.data.statuses) {
for (const status of entry.res.response.data.statuses) {
if (status.error) {
errorCount++;
} else {
successCount++;
}
}
}
}
}
}
}
}
const totalStatuses = successCount + errorCount;
console.log('\n📊 Order Statuses:');
console.log(` ✅ Success: ${successCount}`);
console.log(` ❌ Error: ${errorCount}`);
console.log(` Total statuses: ${totalStatuses}`);
console.log(`\n🔍 Match check: Actions=${totalActions}, Statuses=${totalStatuses}, Match=${totalActions === totalStatuses}`);
}
// Run it
streamBlocks();
Common Use Cases#
1. Block Explorer Backend#
class BlockExplorerService {
constructor(streamManager) {
this.streamManager = streamManager;
this.blockCache = new Map();
streamManager.on('block', (blockData) => {
this.indexBlock(blockData);
});
}
async indexBlock(blockData) {
// Store block in cache/database
this.blockCache.set(blockData.height, blockData);
// Index transactions
if (blockData.transactions) {
for (const tx of blockData.transactions) {
await this.indexTransaction(tx, blockData.height);
}
}
// Update blockchain metrics
await this.updateChainMetrics(blockData);
}
async indexTransaction(tx, blockHeight) {
// Store transaction data with block reference
// Update address balances
// Index events and logs
}
async updateChainMetrics(blockData) {
// Update total supply, validator info, etc.
}
}
2. Network Health Monitor#
class NetworkHealthMonitor:
def __init__(self, analyzer):
self.analyzer = analyzer
self.health_metrics = {
'avg_block_time': [],
'tx_throughput': [],
'block_sizes': []
}
def analyze_health(self):
"""Analyze network health from recent blocks"""
if len(self.analyzer.recent_blocks) < 10:
return None
recent = list(self.analyzer.recent_blocks)[-10:]
# Calculate average block time
time_diffs = []
for i in range(1, len(recent)):
diff = recent[i]['timestamp'] - recent[i-1]['timestamp']
time_diffs.append(diff)
avg_block_time = sum(time_diffs) / len(time_diffs)
# Calculate transaction throughput
total_txs = sum(b['tx_count'] for b in recent)
time_span = recent[-1]['timestamp'] - recent[0]['timestamp']
throughput = total_txs / time_span if time_span > 0 else 0
return {
'average_block_time': avg_block_time,
'transaction_throughput': throughput,
'health_score': self.calculate_health_score(avg_block_time, throughput)
}
def calculate_health_score(self, block_time, throughput):
"""Calculate network health score (0-100)"""
score = 100
# Penalize slow blocks
if block_time > 15:
score -= min(30, (block_time - 15) * 2)
# Penalize low throughput
if throughput < 1:
score -= 20
return max(0, score)
3. Compliance and Auditing#
type ComplianceMonitor struct {
client pb.HyperliquidL1GatewayClient
alerts chan Alert
}
type Alert struct {
Type string
Message string
BlockData interface{}
Timestamp time.Time
}
func (cm *ComplianceMonitor) MonitorCompliance(ctx context.Context) {
stream, err := cm.client.StreamBlocks(ctx, &pb.Timestamp{Timestamp: 0})
if err != nil {
log.Fatal(err)
}
for {
block, err := stream.Recv()
if err != nil {
log.Printf("Stream error: %v", err)
return
}
cm.analyzeBlockForCompliance(block.Data)
}
}
func (cm *ComplianceMonitor) analyzeBlockForCompliance(data []byte) {
var blockData map[string]interface{}
json.Unmarshal(data, &blockData)
// Check for large transactions
if transactions, ok := blockData["transactions"].([]interface{}); ok {
for _, tx := range transactions {
if txData, ok := tx.(map[string]interface{}); ok {
if value, exists := txData["value"]; exists {
// Flag large transactions for review
if parseValue(value) > 1000000 { // Example threshold
cm.alerts <- Alert{
Type: "LARGE_TRANSACTION",
Message: "Transaction exceeds reporting threshold",
BlockData: txData,
Timestamp: time.Now(),
}
}
}
}
}
}
}
Error Handling and Reconnection#
class RobustBlockStreamer {
constructor(endpoint) {
this.endpoint = endpoint;
this.maxRetries = 5;
this.retryDelay = 1000; // Start with 1 second
this.currentRetries = 0;
}
async startStreamWithRetry() {
while (this.currentRetries < this.maxRetries) {
try {
await this.startStream();
this.currentRetries = 0; // Reset on successful connection
this.retryDelay = 1000;
} catch (error) {
this.currentRetries++;
console.error(`Stream attempt ${this.currentRetries} failed:`, error.message);
if (this.currentRetries >= this.maxRetries) {
throw new Error('Max retry attempts exceeded');
}
// Exponential backoff
await this.sleep(this.retryDelay);
this.retryDelay *= 2;
}
}
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
Best Practices#
- Connection Management: Implement robust reconnection logic with exponential backoff
- Memory Management: Use bounded collections for storing recent blocks to prevent memory leaks
- Performance: Process blocks asynchronously to avoid blocking the stream
- Monitoring: Track stream health and performance metrics
- Error Recovery: Handle various error types (network, parsing, processing) gracefully
- Resource Cleanup: Properly close streams and connections on shutdown
Current Limitations#
- Historical Data: Cannot stream from historical timestamps; only real-time streaming available
- Data Retention: Node maintains only 24 hours of historical block data
- Backpressure: High-volume periods may require careful handling to avoid overwhelming downstream systems
Need help? Contact our support team or check the Hyperliquid gRPC documentation.