
StreamBlocks

Stream continuous block data starting from a timestamp, providing real-time access to Hyperliquid blockchain state changes.

When to Use This Method

StreamBlocks is essential for:

  • Blockchain Monitoring - Track all network activity and state changes
  • Block Explorers - Build real-time blockchain data applications
  • Analytics Systems - Collect comprehensive blockchain metrics
  • Compliance & Auditing - Monitor all network transactions and events

Method Signature

rpc StreamBlocks(Timestamp) returns (stream Block) {}

Request Message

message Timestamp {
  int64 timestamp = 1;
}

Current Limitation: The timestamp parameter is not yet implemented. The stream always starts from "now" and moves forward only, regardless of the timestamp value provided.

Response Stream

message Block {
  // JSON-encoded object conforming to files of
  // Hyperliquid data dir "replica_cmds"
  bytes data = 1;
}

The data field contains a JSON-encoded block object with:

  • Block header information (height, timestamp, hash)
  • Transaction data and execution results
  • State changes and events
  • Validator signatures and consensus data
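
Because the payload mirrors the node's "replica_cmds" files rather than a fixed proto schema, the exact field names may vary between node versions. A defensive first step, sketched below, is to decode the raw bytes into a generic map and inspect the top-level keys before committing to a typed struct. The sample payload is hypothetical; real data comes from the Data field of a StreamBlocks response.

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// inspectBlock decodes a raw StreamBlocks payload into a generic map and
// logs the type of each top-level field, which helps discover the actual
// field names before defining a typed struct.
func inspectBlock(data []byte) error {
	var raw map[string]interface{}
	if err := json.Unmarshal(data, &raw); err != nil {
		return fmt.Errorf("decode block: %w", err)
	}
	for key, value := range raw {
		log.Printf("field %q has type %T", key, value)
	}
	return nil
}

func main() {
	// Hypothetical payload, used only to exercise inspectBlock; real data
	// comes from block.Data in a StreamBlocks response.
	sample := []byte(`{"height": 123, "hash": "0xabc", "txs": []}`)
	if err := inspectBlock(sample); err != nil {
		log.Fatal(err)
	}
}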

Implementation Examples

package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	pb "your-project/hyperliquid_l1_gateway/v1"
	"google.golang.org/grpc"
)

type BlockData struct {
	Height    int64  `json:"height"`
	Hash      string `json:"hash"`
	Timestamp int64  `json:"timestamp"`
	TxCount   int    `json:"tx_count"`
	// Add other fields based on actual block structure
}

type BlockProcessor struct {
	client pb.HyperliquidL1GatewayClient
	stats  *BlockStats
}

type BlockStats struct {
	BlocksProcessed int64
	LastBlockHeight int64
	LastBlockTime   time.Time
	TotalTxs        int64
}

func NewBlockProcessor(client pb.HyperliquidL1GatewayClient) *BlockProcessor {
	return &BlockProcessor{
		client: client,
		stats:  &BlockStats{},
	}
}

func (bp *BlockProcessor) StreamBlocks(ctx context.Context) error {
	// Start streaming from now
	stream, err := bp.client.StreamBlocks(
		ctx,
		&pb.Timestamp{Timestamp: 0},
	)
	if err != nil {
		return err
	}

	log.Println("Starting block stream...")

	for {
		select {
		case <-ctx.Done():
			log.Println("Stream context cancelled")
			return ctx.Err()
		default:
			block, err := stream.Recv()
			if err != nil {
				log.Printf("Stream error: %v", err)
				return err
			}

			if err := bp.processBlock(block.Data); err != nil {
				log.Printf("Failed to process block: %v", err)
				continue
			}
		}
	}
}

func (bp *BlockProcessor) processBlock(data []byte) error {
	var blockData BlockData
	if err := json.Unmarshal(data, &blockData); err != nil {
		return err
	}

	// Remember the previous block time before overwriting it, so that
	// analyzeBlock can compute the interval between consecutive blocks.
	prevBlockTime := bp.stats.LastBlockTime

	// Update statistics
	bp.stats.BlocksProcessed++
	bp.stats.LastBlockHeight = blockData.Height
	bp.stats.LastBlockTime = time.Unix(blockData.Timestamp, 0)
	bp.stats.TotalTxs += int64(blockData.TxCount)

	// Process block data
	log.Printf("Block #%d: %d transactions, hash: %s",
		blockData.Height, blockData.TxCount, blockData.Hash)

	// Add custom block processing logic here
	bp.analyzeBlock(&blockData, prevBlockTime)

	return nil
}

func (bp *BlockProcessor) analyzeBlock(block *BlockData, prevBlockTime time.Time) {
	// Example: detect large blocks
	if block.TxCount > 100 {
		log.Printf("Large block detected: %d transactions", block.TxCount)
	}

	// Example: check the interval between consecutive blocks
	if bp.stats.BlocksProcessed > 1 {
		interval := time.Unix(block.Timestamp, 0).Sub(prevBlockTime)
		if interval > 15*time.Second {
			log.Printf("Slow block: %v interval", interval)
		}
	}
}

func (bp *BlockProcessor) GetStats() *BlockStats {
	return bp.stats
}

func main() {
	// Connect to Hyperliquid L1 Gateway
	conn, err := grpc.Dial(
		"<YOUR_HYPERLIQUID_ENDPOINT>", // Contact support@dwellir.com
		grpc.WithInsecure(),
	)
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewHyperliquidL1GatewayClient(conn)
	processor := NewBlockProcessor(client)

	// Start streaming with context for graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		if err := processor.StreamBlocks(ctx); err != nil {
			log.Printf("Stream ended: %v", err)
		}
	}()

	// Print stats periodically
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		stats := processor.GetStats()
		log.Printf("Stats - Blocks: %d, Height: %d, Total TXs: %d",
			stats.BlocksProcessed, stats.LastBlockHeight, stats.TotalTxs)
	}
}

Common Use Cases

1. Block Explorer Backend

class BlockExplorerService {
  constructor(streamManager) {
    this.streamManager = streamManager;
    this.blockCache = new Map();

    streamManager.on('block', (blockData) => {
      this.indexBlock(blockData);
    });
  }

  async indexBlock(blockData) {
    // Store block in cache/database
    this.blockCache.set(blockData.height, blockData);

    // Index transactions
    if (blockData.transactions) {
      for (const tx of blockData.transactions) {
        await this.indexTransaction(tx, blockData.height);
      }
    }

    // Update blockchain metrics
    await this.updateChainMetrics(blockData);
  }

  async indexTransaction(tx, blockHeight) {
    // Store transaction data with block reference
    // Update address balances
    // Index events and logs
  }

  async updateChainMetrics(blockData) {
    // Update total supply, validator info, etc.
  }
}

2. Network Health Monitor

class NetworkHealthMonitor:
    def __init__(self, analyzer):
        self.analyzer = analyzer
        self.health_metrics = {
            'avg_block_time': [],
            'tx_throughput': [],
            'block_sizes': []
        }

    def analyze_health(self):
        """Analyze network health from recent blocks"""
        if len(self.analyzer.recent_blocks) < 10:
            return None

        recent = list(self.analyzer.recent_blocks)[-10:]

        # Calculate average block time
        time_diffs = []
        for i in range(1, len(recent)):
            diff = recent[i]['timestamp'] - recent[i - 1]['timestamp']
            time_diffs.append(diff)

        avg_block_time = sum(time_diffs) / len(time_diffs)

        # Calculate transaction throughput
        total_txs = sum(b['tx_count'] for b in recent)
        time_span = recent[-1]['timestamp'] - recent[0]['timestamp']
        throughput = total_txs / time_span if time_span > 0 else 0

        return {
            'average_block_time': avg_block_time,
            'transaction_throughput': throughput,
            'health_score': self.calculate_health_score(avg_block_time, throughput)
        }

    def calculate_health_score(self, block_time, throughput):
        """Calculate network health score (0-100)"""
        score = 100

        # Penalize slow blocks
        if block_time > 15:
            score -= min(30, (block_time - 15) * 2)

        # Penalize low throughput
        if throughput < 1:
            score -= 20

        return max(0, score)

3. Compliance and Auditing

type ComplianceMonitor struct {
	client pb.HyperliquidL1GatewayClient
	alerts chan Alert
}

type Alert struct {
	Type      string
	Message   string
	BlockData interface{}
	Timestamp time.Time
}

func (cm *ComplianceMonitor) MonitorCompliance(ctx context.Context) {
	stream, err := cm.client.StreamBlocks(ctx, &pb.Timestamp{Timestamp: 0})
	if err != nil {
		log.Fatal(err)
	}

	for {
		block, err := stream.Recv()
		if err != nil {
			log.Printf("Stream error: %v", err)
			return
		}

		cm.analyzeBlockForCompliance(block.Data)
	}
}

func (cm *ComplianceMonitor) analyzeBlockForCompliance(data []byte) {
	var blockData map[string]interface{}
	if err := json.Unmarshal(data, &blockData); err != nil {
		log.Printf("Failed to decode block: %v", err)
		return
	}

	// Check for large transactions
	if transactions, ok := blockData["transactions"].([]interface{}); ok {
		for _, tx := range transactions {
			if txData, ok := tx.(map[string]interface{}); ok {
				if value, exists := txData["value"]; exists {
					// parseValue is a user-supplied helper that converts the
					// JSON value into a comparable number.
					// Flag large transactions for review
					if parseValue(value) > 1000000 { // Example threshold
						cm.alerts <- Alert{
							Type:      "LARGE_TRANSACTION",
							Message:   "Transaction exceeds reporting threshold",
							BlockData: txData,
							Timestamp: time.Now(),
						}
					}
				}
			}
		}
	}
}

Error Handling and Reconnection

class RobustBlockStreamer {
  constructor(endpoint) {
    this.endpoint = endpoint;
    this.maxRetries = 5;
    this.retryDelay = 1000; // Start with 1 second
    this.currentRetries = 0;
  }

  async startStreamWithRetry() {
    while (this.currentRetries < this.maxRetries) {
      try {
        // startStream() opens the gRPC stream and resolves when it ends;
        // implement it with your gRPC client of choice.
        await this.startStream();
        this.currentRetries = 0; // Reset on successful connection
        this.retryDelay = 1000;
      } catch (error) {
        this.currentRetries++;
        console.error(`Stream attempt ${this.currentRetries} failed:`, error.message);

        if (this.currentRetries >= this.maxRetries) {
          throw new Error('Max retry attempts exceeded');
        }

        // Exponential backoff
        await this.sleep(this.retryDelay);
        this.retryDelay *= 2;
      }
    }
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

Best Practices

  1. Connection Management: Implement robust reconnection logic with exponential backoff
  2. Memory Management: Use bounded collections for storing recent blocks to prevent memory leaks
  3. Performance: Process blocks asynchronously to avoid blocking the stream (both points are illustrated in the sketch after this list)
  4. Monitoring: Track stream health and performance metrics
  5. Error Recovery: Handle various error types (network, parsing, processing) gracefully
  6. Resource Cleanup: Properly close streams and connections on shutdown
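
As a rough illustration of points 2 and 3, the sketch below hands each received payload to a worker over a bounded channel and keeps only a fixed window of recent blocks in memory. The names (recentBlocks, queue) and the sample payloads are hypothetical; in practice the send replaces the direct processBlock call inside your StreamBlocks loop.

package main

import (
	"log"
	"sync"
)

// recentBlocks keeps at most max raw payloads in memory so the process
// does not grow without bound (point 2 above).
type recentBlocks struct {
	mu     sync.Mutex
	max    int
	blocks [][]byte
}

func (r *recentBlocks) add(b []byte) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.blocks = append(r.blocks, b)
	if len(r.blocks) > r.max {
		r.blocks = r.blocks[1:] // drop the oldest payload
	}
}

func main() {
	queue := make(chan []byte, 1024) // bounded hand-off between reader and worker (point 3)
	recent := &recentBlocks{max: 1000}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for data := range queue {
			recent.add(data)
			// ... parse and index the block here ...
		}
	}()

	// In a real StreamBlocks loop, replace the direct processBlock call
	// with a non-blocking send so a slow consumer never stalls Recv():
	for i := 0; i < 3; i++ {
		payload := []byte(`{"height": 1}`) // stand-in for block.Data
		select {
		case queue <- payload:
		default:
			log.Println("queue full, dropping block") // or block here instead
		}
	}

	close(queue)
	wg.Wait()
	log.Printf("buffered %d recent blocks", len(recent.blocks))
}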

Current Limitations

  • Historical Data: Cannot stream from historical timestamps; only real-time streaming available
  • Data Retention: The node maintains only 24 hours of historical block data (see the persistence sketch below)
  • Backpressure: High-volume periods may require careful handling to avoid overwhelming downstream systems
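
Because the node retains roughly 24 hours of data and the stream cannot be replayed from an earlier timestamp, any history you need beyond that window has to be persisted as it arrives. A minimal sketch, assuming each raw payload is appended as one JSON line to a local file (a hypothetical blockArchiver helper; swap in a database for production use):

package main

import (
	"bufio"
	"log"
	"os"
)

// blockArchiver appends each raw block payload as one JSON line, so history
// older than the node's ~24 hour retention window stays available locally
// for later reprocessing.
type blockArchiver struct {
	file *os.File
	w    *bufio.Writer
}

func newBlockArchiver(path string) (*blockArchiver, error) {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return nil, err
	}
	return &blockArchiver{file: f, w: bufio.NewWriter(f)}, nil
}

func (a *blockArchiver) Archive(data []byte) error {
	if _, err := a.w.Write(data); err != nil {
		return err
	}
	return a.w.WriteByte('\n')
}

func (a *blockArchiver) Close() error {
	if err := a.w.Flush(); err != nil {
		return err
	}
	return a.file.Close()
}

func main() {
	archiver, err := newBlockArchiver("blocks.jsonl")
	if err != nil {
		log.Fatalf("open archive: %v", err)
	}
	defer archiver.Close()

	// In the stream loop, call archiver.Archive(block.Data) for every
	// received block before (or after) processing it.
	if err := archiver.Archive([]byte(`{"height": 1}`)); err != nil {
		log.Fatalf("archive block: %v", err)
	}
}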

Need help? Contact our support team or check the Hyperliquid gRPC documentation.