
StreamBlocks

Stream continuous block data starting from a timestamp, providing real-time access to Hyperliquid blockchain state changes.

When to Use This Method#

StreamBlocks is essential for:

  • Blockchain Monitoring - Track all network activity and state changes
  • Block Explorers - Build real-time blockchain data applications
  • Analytics Systems - Collect comprehensive blockchain metrics
  • Compliance & Auditing - Monitor all network transactions and events

Method Signature#

rpc StreamBlocks(Timestamp) returns (stream Block) {}

Request Message#

message Timestamp {
  int64 timestamp = 1;
}

Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.
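Because the parameter is ignored today, clients conventionally pass 0. A short Go sketch, assuming the generated pb types and the client/ctx variables from the Implementation Examples section below:

// The timestamp value is currently ignored; the stream always starts at "now".
// Passing 0 makes that explicit.
request := &pb.Timestamp{Timestamp: 0}
stream, err := client.StreamBlocks(ctx, request)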

Response Stream#

message Block {
  // JSON-encoded object conforming to files of
  // Hyperliquid data dir "replica_cmds"
  bytes data = 1;
}

The data field contains a JSON-encoded block object with:

  • Block header information (height in round, timestamp in time; see the decoding sketch below)
  • Transaction data and execution results
  • State changes and events
  • Validator signatures and consensus data
  • Note: the sample format does not include a top-level block hash; bundle hashes are present per signed_action_bundles.
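To make those field names concrete, here is a minimal decoding sketch in Go. The struct covers only an illustrative subset of the payload (the full structure is documented under Full Block Spec below), and the type names are ours, not part of the generated API.

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// BlockHeader mirrors a subset of the abci_block header fields.
type BlockHeader struct {
	Time        string `json:"time"`         // ISO-8601 with nanosecond precision
	Round       int64  `json:"round"`        // block height
	ParentRound int64  `json:"parent_round"` // always round - 1
	Proposer    string `json:"proposer"`     // 40 hex chars, lowercase
}

// BlockPayload is the top-level envelope carried in the data field.
type BlockPayload struct {
	ABCIBlock BlockHeader `json:"abci_block"`
}

func main() {
	// In practice, data comes from stream.Recv() (see Implementation Examples).
	data := []byte(`{"abci_block":{"time":"2025-09-08T06:41:57.997372546","round":992814678,"parent_round":992814677,"proposer":"0x5ac99df645f3414876c816caa18b2d234024b487"}}`)

	var b BlockPayload
	if err := json.Unmarshal(data, &b); err != nil {
		log.Fatalf("decode: %v", err)
	}
	fmt.Printf("height=%d parent=%d time=%s\n", b.ABCIBlock.Round, b.ABCIBlock.ParentRound, b.ABCIBlock.Time)
}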

Full Block Spec#

StreamBlocks emits a JSON payload matching Hyperliquid's replica_cmds BlockData. Below is the exact structure.

Top‑level keys (always present):

{
  "abci_block": {
    "time": "2025-09-08T06:41:57.997372546", // ISO-8601 with nanosecond precision
    "round": 992814678,                      // Current block number (always incrementing)
    "parent_round": 992814677,               // Previous block (always round - 1)
    "proposer": "0x5ac99df645f3414876c816caa18b2d234024b487", // 40 hex chars, lowercase
    "hardfork": {
      "version": 57,     // Protocol version number
      "round": 990500929 // Block when this version activated
    },
    "signed_action_bundles": [ // Array of [hash, bundle_data] pairs
      [
        "0xb4b1f5a9c233f9d90fd24b9961fd12708b36cc3d56f8fda47f32b667ee8d1227", // Bundle hash (64 hex)
        {
          "signed_actions": [ // Array of transactions in this bundle
            {
              "signature": {
                "r": "0xd931f13565ae66c3bc41a05da4180bb795dbd9ed2d365efaf639fd23b3774ac6",
                "s": "0x4a7a0534bf0a4238dfe404a88d335ab4c9b8222909100d773635e328d2ab864c",
                "v": 27 // Recovery ID: always 27 or 28
              },
              "action": {
                "type": "order",   // Action type (order/cancel/evmRawTx/etc)
                "orders": [{       // Type-specific payload
                  "a": 170,        // Asset ID
                  "b": true,       // Buy=true, Sell=false
                  "p": "0.038385", // Price as string
                  "s": "1514",     // Size as string
                  "r": false,      // Reduce-only flag
                  "t": {
                    "limit": {
                      "tif": "Ioc" // Time-in-force: Ioc/Alo/Gtc
                    }
                  },
                  "c": "0x7192c49bcadb32d394e38617ea99cc09" // Client order ID
                }]
              },
              "nonce": 1757313597362 // Unique transaction nonce
            }
            // ... more signed_actions
          ],
          "broadcaster": "0x67e451964e0421f6e7d07be784f35c530667c2b3", // Who sent bundle
          "broadcaster_nonce": 1757313597367 // Bundle-level nonce
        }
      ]
      // ... more bundles (typically 1-6 total)
    ]
  },
  "resps": {
    "Full": [ // Matches signed_action_bundles structure
      [
        "0xb4b1f5a9c233f9d90fd24b9961fd12708b36cc3d56f8fda47f32b667ee8d1227", // Same bundle hash
        [ // One response per signed_action
          {
            "user": "0xecb63caa47c7c4e77f60f1ce858cf28dc2b82b00", // Address of action signer
            "res": {
              "status": "ok", // "ok" or "err"
              "response": {
                "type": "order", // Response type
                "data": {
                  "statuses": [{
                    "filled": {            // Order state: filled/resting/error
                      "totalSz": "1514.0", // Filled size
                      "avgPx": "0.038385", // Average fill price
                      "oid": 156190414943, // Order ID assigned
                      "cloid": "0x7192c49bcadb32d394e38617ea99cc09" // Client order ID
                    }
                  }]
                }
              }
            }
          }
          // ... more responses (one per action)
        ]
      ]
      // ... more response bundles (matches signed_action_bundles count)
    ]
  }
}

Bundle entry (abci_block.signed_action_bundles[i]):

[
  "0x...", // bundle_hash
  {
    "signed_actions": [ /* SignedAction */ ],
    "broadcaster": "0x...",
    "broadcaster_nonce": 1757313597367
  }
]

SignedAction envelope (common fields across all actions):

{
  "signature": { "r": "0x...", "s": "0x...", "v": 27 },
  "action": { "type": "order" }, // one of: order | cancel | cancelByCloid | batchModify | evmRawTx
  "nonce": 1757313597362,
  "vaultAddress": "0x...",       // optional
  "expiresAfter": 1757313718705  // optional
}

Action payloads (in action; illustrative struct mappings follow this list):

  • type: "order"

{
  "type": "order",
  "orders": [
    {
      "a": 204,       // asset id
      "b": true,      // buy (true) / sell (false)
      "p": "0.20847", // price (string)
      "s": "1866",    // size (string)
      "r": false,     // reduce-only
      "t": { "limit": { "tif": "Gtc" } }, // tif: Gtc | Ioc | Alo
      "c": "0x..."    // optional client order id
    }
  ],
  "grouping": "na"
}

  • type: "cancel"

{ "type": "cancel", "cancels": [ { "a": 204, "o": 156190390863 } ] }

  • type: "cancelByCloid"

{ "type": "cancelByCloid", "cancels": [ { "asset": 159, "cloid": "0x..." } ] }

  • type: "batchModify"

{
  "type": "batchModify",
  "modifies": [
    {
      "oid": 156190366007,
      "order": { "a": 8, "b": true, "p": "0.58625", "s": "40.2", "r": false, "t": { "limit": { "tif": "Alo" } }, "c": "0x..." }
    }
  ]
}

  • type: "evmRawTx"

{ "type": "evmRawTx", "data": "0x..." }
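For statically typed consumers, the "order" payload maps naturally onto structs. The Go types below are an illustrative sketch inferred from the single-letter keys above, not an official schema:

// LimitTif holds the time-in-force of a limit order.
type LimitTif struct {
	Tif string `json:"tif"` // "Gtc" | "Ioc" | "Alo"
}

// OrderKind distinguishes order variants; only limit is shown here.
type OrderKind struct {
	Limit *LimitTif `json:"limit,omitempty"`
}

// Order mirrors one entry of the orders array.
type Order struct {
	Asset      int       `json:"a"`
	IsBuy      bool      `json:"b"`
	Price      string    `json:"p"` // decimal string
	Size       string    `json:"s"` // decimal string
	ReduceOnly bool      `json:"r"`
	Kind       OrderKind `json:"t"`
	Cloid      string    `json:"c,omitempty"` // optional client order id
}

// OrderAction is the full action payload of type "order".
type OrderAction struct {
	Type     string  `json:"type"` // always "order"
	Orders   []Order `json:"orders"`
	Grouping string  `json:"grouping,omitempty"`
}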

Execution responses (resps.Full):

[
  "0xBUNDLE_HASH",
  [
    {
      "user": "0x...",
      "res": {
        "status": "ok", // or "err"
        "response": {
          "type": "order", // e.g., order | cancel | default
          "data": {
            "statuses": [
              { "filled": { "totalSz": "1514.0", "avgPx": "0.038385", "oid": 156190414943, "cloid": "0x..." } },
              { "resting": { "oid": 156190414944, "cloid": "0x..." } },
              { "error": "..." }
            ]
          }
        }
      }
    }
  ]
]

Guarantees and alignment (a validation sketch follows this list):

  • abci_block and resps.Full are always present.
  • resps.Full.length === abci_block.signed_action_bundles.length.
  • For each bundle, responses.length === signed_actions.length.
  • Height is abci_block.round, parent is abci_block.parent_round.
  • Timestamp abci_block.time is ISO‑8601 with nanosecond precision (treat as UTC if no suffix).
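A defensive consumer can assert these invariants before indexing. Below is a minimal Go sketch over the generically decoded payload (it assumes import "fmt" and a block decoded with json.Unmarshal into map[string]interface{}):

// validateAlignment checks the structural guarantees listed above.
func validateAlignment(block map[string]interface{}) error {
	abci, ok := block["abci_block"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("missing abci_block")
	}
	bundles, _ := abci["signed_action_bundles"].([]interface{})

	resps, ok := block["resps"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("missing resps")
	}
	full, _ := resps["Full"].([]interface{})

	// resps.Full has exactly one entry per signed_action_bundle.
	if len(full) != len(bundles) {
		return fmt.Errorf("bundle/response count mismatch: %d vs %d", len(bundles), len(full))
	}

	for i := range bundles {
		bundlePair, _ := bundles[i].([]interface{})
		respPair, _ := full[i].([]interface{})
		if len(bundlePair) != 2 || len(respPair) != 2 {
			return fmt.Errorf("malformed [hash, payload] pair at index %d", i)
		}
		// Hashes are the stable join key and must match positionally.
		if bundlePair[0] != respPair[0] {
			return fmt.Errorf("bundle hash mismatch at index %d", i)
		}
		bundle, _ := bundlePair[1].(map[string]interface{})
		actions, _ := bundle["signed_actions"].([]interface{})
		responses, _ := respPair[1].([]interface{})
		// One response per signed action within each bundle.
		if len(actions) != len(responses) {
			return fmt.Errorf("action/response count mismatch in bundle %d", i)
		}
	}
	return nil
}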

Developer tips:

  • Dispatch on action.type and handle new types defensively (see the sketch below).
  • Store bundle_hash as the stable join key between actions and responses.
  • Normalize prices/sizes (strings) to numeric types as appropriate.
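The first tip can look like the following Go sketch; action is one decoded entry of signed_actions[i].action, and unknown types are skipped rather than treated as fatal:

// assumes: import "log"

// dispatchAction routes a signed action by its "type" field.
func dispatchAction(action map[string]interface{}) {
	actionType, _ := action["type"].(string)
	switch actionType {
	case "order":
		// index new orders
	case "cancel", "cancelByCloid":
		// index cancellations
	case "batchModify":
		// index in-place order modifications
	case "evmRawTx":
		// decode and index the embedded EVM transaction
	default:
		// New action types can appear after hardforks; log and skip.
		log.Printf("unhandled action type: %q", actionType)
	}
}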

Implementation Examples#

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"time"

	pb "hyperliquid-grpc-client/api"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

func streamBlocks() {
	const endpoint = "hl-cendars.n.dwellir.com"

	fmt.Println("🚀 Hyperliquid Go gRPC Client - Stream Blocks")
	fmt.Println("===========================================")
	fmt.Printf("📡 Endpoint: %s\n\n", endpoint)

	// Set up TLS connection
	creds := credentials.NewTLS(nil)

	// Connection options
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(creds),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(150 * 1024 * 1024), // 150MB max
		),
	}

	// Connect to server
	fmt.Println("🔌 Connecting to gRPC server...")
	conn, err := grpc.NewClient(endpoint, opts...)
	if err != nil {
		log.Fatalf("❌ Failed to connect: %v", err)
	}
	defer conn.Close()

	// Create the client using generated code
	client := pb.NewHyperLiquidL1GatewayClient(conn)
	fmt.Print("✅ Connected successfully!\n\n")

	// Create context without timeout for continuous streaming
	ctx := context.Background()

	// Create request - 0 means latest/current blocks (the value is currently ignored)
	request := &pb.Timestamp{
		Timestamp: 0,
	}

	fmt.Println("📥 Starting block stream...")
	fmt.Print("Press Ctrl+C to stop streaming\n\n")

	// Start streaming blocks
	stream, err := client.StreamBlocks(ctx, request)
	if err != nil {
		log.Fatalf("❌ Failed to start stream: %v", err)
	}

	blockCount := 0
	for {
		response, err := stream.Recv()
		if err != nil {
			fmt.Printf("❌ Stream ended: %v\n", err)
			break
		}

		blockCount++
		fmt.Printf("\n===== BLOCK #%d =====\n", blockCount)
		fmt.Printf("📦 Response size: %d bytes\n", len(response.Data))

		// Process each block
		processBlock(response.Data, blockCount)

		fmt.Println("\n" + strings.Repeat("─", 50))
	}

	fmt.Printf("\n📊 Total blocks received: %d\n", blockCount)
}

func processBlock(data []byte, blockNum int) {
	// Parse the JSON payload (structure documented in "Full Block Spec" above)
	var block map[string]interface{}
	if err := json.Unmarshal(data, &block); err != nil {
		fmt.Printf("❌ Failed to parse JSON: %v\n", err)
		fmt.Printf("Raw data (first 200 bytes): %.200s\n", data)
		return
	}

	fmt.Printf("🧱 BLOCK #%d DETAILS\n", blockNum)
	fmt.Println("===================")

	abci, ok := block["abci_block"].(map[string]interface{})
	if !ok {
		fmt.Println("⚠️ Payload missing abci_block")
		return
	}

	// Height is abci_block.round
	if round, ok := abci["round"].(float64); ok {
		fmt.Printf("📏 Height: %.0f\n", round)
	}

	// Timestamp is ISO-8601 with nanosecond precision; treat as UTC
	if ts, ok := abci["time"].(string); ok {
		if t, err := time.Parse("2006-01-02T15:04:05.999999999", ts); err == nil {
			fmt.Printf("⏰ Time: %s\n", t.Format("2006-01-02 15:04:05 UTC"))
		} else {
			fmt.Printf("⏰ Time: %s\n", ts)
		}
	}

	// Proposer address
	if proposer, ok := abci["proposer"].(string); ok {
		fmt.Printf("👤 Proposer: %s\n", proposer)
	}

	// Count bundles and the signed actions inside them
	if bundles, ok := abci["signed_action_bundles"].([]interface{}); ok {
		actionCount := 0
		for _, b := range bundles {
			pair, ok := b.([]interface{})
			if !ok || len(pair) != 2 {
				continue
			}
			if bundle, ok := pair[1].(map[string]interface{}); ok {
				if actions, ok := bundle["signed_actions"].([]interface{}); ok {
					actionCount += len(actions)
				}
			}
		}
		fmt.Printf("📋 Bundles: %d, signed actions: %d\n", len(bundles), actionCount)
	}
}

func main() {
	streamBlocks()
}

Common Use Cases#

1. Block Explorer Backend#

class BlockExplorerService {
  constructor(streamManager) {
    this.streamManager = streamManager;
    this.blockCache = new Map();

    streamManager.on('block', (blockData) => {
      this.indexBlock(blockData);
    });
  }

  async indexBlock(blockData) {
    // Store block in cache/database
    this.blockCache.set(blockData.height, blockData);

    // Index transactions
    if (blockData.transactions) {
      for (const tx of blockData.transactions) {
        await this.indexTransaction(tx, blockData.height);
      }
    }

    // Update blockchain metrics
    await this.updateChainMetrics(blockData);
  }

  async indexTransaction(tx, blockHeight) {
    // Store transaction data with block reference
    // Update address balances
    // Index events and logs
  }

  async updateChainMetrics(blockData) {
    // Update total supply, validator info, etc.
  }
}

2. Network Health Monitor#

class NetworkHealthMonitor:
    def __init__(self, analyzer):
        self.analyzer = analyzer
        self.health_metrics = {
            'avg_block_time': [],
            'tx_throughput': [],
            'block_sizes': []
        }

    def analyze_health(self):
        """Analyze network health from recent blocks"""
        if len(self.analyzer.recent_blocks) < 10:
            return None

        recent = list(self.analyzer.recent_blocks)[-10:]

        # Calculate average block time
        time_diffs = []
        for i in range(1, len(recent)):
            diff = recent[i]['timestamp'] - recent[i-1]['timestamp']
            time_diffs.append(diff)

        avg_block_time = sum(time_diffs) / len(time_diffs)

        # Calculate transaction throughput
        total_txs = sum(b['tx_count'] for b in recent)
        time_span = recent[-1]['timestamp'] - recent[0]['timestamp']
        throughput = total_txs / time_span if time_span > 0 else 0

        return {
            'average_block_time': avg_block_time,
            'transaction_throughput': throughput,
            'health_score': self.calculate_health_score(avg_block_time, throughput)
        }

    def calculate_health_score(self, block_time, throughput):
        """Calculate network health score (0-100)"""
        score = 100

        # Penalize slow blocks
        if block_time > 15:
            score -= min(30, (block_time - 15) * 2)

        # Penalize low throughput
        if throughput < 1:
            score -= 20

        return max(0, score)

3. Compliance and Auditing#

type ComplianceMonitor struct {
	client pb.HyperLiquidL1GatewayClient
	alerts chan Alert
}

type Alert struct {
	Type      string
	Message   string
	BlockData interface{}
	Timestamp time.Time
}

func (cm *ComplianceMonitor) MonitorCompliance(ctx context.Context) {
	stream, err := cm.client.StreamBlocks(ctx, &pb.Timestamp{Timestamp: 0})
	if err != nil {
		log.Fatal(err)
	}

	for {
		block, err := stream.Recv()
		if err != nil {
			log.Printf("Stream error: %v", err)
			return
		}

		cm.analyzeBlockForCompliance(block.Data)
	}
}

func (cm *ComplianceMonitor) analyzeBlockForCompliance(data []byte) {
	var blockData map[string]interface{}
	if err := json.Unmarshal(data, &blockData); err != nil {
		log.Printf("Parse error: %v", err)
		return
	}

	// Check for large transactions
	if transactions, ok := blockData["transactions"].([]interface{}); ok {
		for _, tx := range transactions {
			if txData, ok := tx.(map[string]interface{}); ok {
				if value, exists := txData["value"]; exists {
					// Flag large transactions for review
					// (parseValue is a user-supplied helper; 1000000 is an example threshold)
					if parseValue(value) > 1000000 {
						cm.alerts <- Alert{
							Type:      "LARGE_TRANSACTION",
							Message:   "Transaction exceeds reporting threshold",
							BlockData: txData,
							Timestamp: time.Now(),
						}
					}
				}
			}
		}
	}
}

Error Handling and Reconnection#

class RobustBlockStreamer {
  constructor(endpoint) {
    this.endpoint = endpoint;
    this.maxRetries = 5;
    this.retryDelay = 1000; // Start with 1 second
    this.currentRetries = 0;
  }

  async startStreamWithRetry() {
    while (this.currentRetries < this.maxRetries) {
      try {
        await this.startStream();
        this.currentRetries = 0; // Reset on successful connection
        this.retryDelay = 1000;
      } catch (error) {
        this.currentRetries++;
        console.error(`Stream attempt ${this.currentRetries} failed:`, error.message);

        if (this.currentRetries >= this.maxRetries) {
          throw new Error('Max retry attempts exceeded');
        }

        // Exponential backoff
        await this.sleep(this.retryDelay);
        this.retryDelay *= 2;
      }
    }
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

Best Practices#

  1. Connection Management: Implement robust reconnection logic with exponential backoff
  2. Memory Management: Use bounded collections for storing recent blocks to prevent memory leaks
  3. Performance: Process blocks asynchronously to avoid blocking the stream (see the backpressure sketch after this list)
  4. Monitoring: Track stream health and performance metrics
  5. Error Recovery: Handle various error types (network, parsing, processing) gracefully
  6. Resource Cleanup: Properly close streams and connections on shutdown
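For practices 2 and 3, a bounded channel between Recv() and processing gives natural backpressure: when consumers lag, the producer goroutine blocks and gRPC flow control slows the stream instead of memory growing without bound. A minimal Go sketch, reusing stream, ctx, and processBlock from the Implementation Examples section:

blocks := make(chan []byte, 256) // bounded buffer caps in-flight memory

go func() {
	defer close(blocks)
	for {
		resp, err := stream.Recv()
		if err != nil {
			log.Printf("stream ended: %v", err)
			return
		}
		select {
		case blocks <- resp.Data: // blocks when the buffer is full (backpressure)
		case <-ctx.Done():
			return
		}
	}
}()

// Consume on the main goroutine; the bounded buffer absorbs short bursts.
blockNum := 0
for data := range blocks {
	blockNum++
	processBlock(data, blockNum)
}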

Current Limitations#

  • Historical Data: Cannot stream from historical timestamps; only real-time streaming available
  • Data Retention: Node maintains only 24 hours of historical block data
  • Backpressure: High-volume periods may require careful handling to avoid overwhelming downstream systems

Need help? Contact our support team or check the Hyperliquid gRPC documentation.