StreamBlocks

Stream continuous block data starting from a timestamp, providing real-time access to Hyperliquid blockchain state changes.

When to Use This Method

StreamBlocks is essential for:

  • Blockchain Monitoring - Track all network activity and state changes
  • Block Explorers - Build real-time blockchain data applications
  • Analytics Systems - Collect comprehensive blockchain metrics
  • Compliance & Auditing - Monitor all network transactions and events

Method Signature

rpc StreamBlocks(Timestamp) returns (stream Block) {}

Request Message

message Timestamp {
  int64 timestamp = 1;
}

Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.
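
Because the value is ignored for now, clients can simply pass 0. A minimal sketch, assuming the generated Go client and stream types used in the examples below:

// Sketch: the timestamp argument is ignored by the server today, so any
// value (0 here) starts the stream from "now".
func startStream(ctx context.Context, client pb.HyperliquidL1GatewayClient) (pb.HyperliquidL1Gateway_StreamBlocksClient, error) {
	return client.StreamBlocks(ctx, &pb.Timestamp{Timestamp: 0})
}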

Response Stream

message Block {
  // JSON-encoded object conforming to files of
  // Hyperliquid data dir "replica_cmds"
  bytes data = 1;
}

The data field contains a JSON-encoded block object with:

  • Block header information (height as round, timestamp as time)
  • Transaction data and execution results
  • State changes and events
  • Validator signatures and consensus data
    • Note: The sample format does not include a top-level block hash; bundle hashes appear per entry in signed_action_bundles.
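
The header fields can be pulled out with a few struct tags; a minimal Go sketch, with field names following the Full Block Spec below:

import "encoding/json"

type blockHeader struct {
	AbciBlock struct {
		Time        string `json:"time"`         // ISO-8601, nanosecond precision
		Round       int64  `json:"round"`        // block height
		ParentRound int64  `json:"parent_round"` // always round - 1
	} `json:"abci_block"`
}

func decodeHeader(data []byte) (*blockHeader, error) {
	var h blockHeader
	if err := json.Unmarshal(data, &h); err != nil {
		return nil, err
	}
	return &h, nil
}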

Full Block Spec

StreamBlocks emits a JSON payload matching Hyperliquid's replica_cmds BlockData. Below is the exact structure.

Top‑level keys (always present):

{
  "abci_block": {
    "time": "2025-09-08T06:41:57.997372546", // ISO-8601 with nanosecond precision
    "round": 992814678,       // Current block number (always incrementing)
    "parent_round": 992814677, // Previous block (always round - 1)
    "proposer": "0x5ac99df645f3414876c816caa18b2d234024b487", // 40 hex chars, lowercase
    "hardfork": {
      "version": 57,     // Protocol version number
      "round": 990500929 // Block when this version activated
    },
    "signed_action_bundles": [ // Array of [hash, bundle_data] pairs
      [
        "0xb4b1f5a9c233f9d90fd24b9961fd12708b36cc3d56f8fda47f32b667ee8d1227", // Bundle hash (64 hex)
        {
          "signed_actions": [ // Array of transactions in this bundle
            {
              "signature": {
                "r": "0xd931f13565ae66c3bc41a05da4180bb795dbd9ed2d365efaf639fd23b3774ac6",
                "s": "0x4a7a0534bf0a4238dfe404a88d335ab4c9b8222909100d773635e328d2ab864c",
                "v": 27 // Recovery ID: always 27 or 28
              },
              "action": {
                "type": "order", // Action type (order/cancel/evmRawTx/etc)
                "orders": [{     // Type-specific payload
                  "a": 170,        // Asset ID
                  "b": true,       // Buy=true, Sell=false
                  "p": "0.038385", // Price as string
                  "s": "1514",     // Size as string
                  "r": false,      // Reduce-only flag
                  "t": {
                    "limit": {
                      "tif": "Ioc" // Time-in-force: Ioc/Alo/Gtc
                    }
                  },
                  "c": "0x7192c49bcadb32d394e38617ea99cc09" // Client order ID
                }]
              },
              "nonce": 1757313597362 // Unique transaction nonce
            }
            // ... more signed_actions
          ],
          "broadcaster": "0x67e451964e0421f6e7d07be784f35c530667c2b3", // Who sent bundle
          "broadcaster_nonce": 1757313597367 // Bundle-level nonce
        }
      ]
      // ... more bundles (typically 1-6 total)
    ]
  },
  "resps": {
    "Full": [ // Matches signed_action_bundles structure
      [
        "0xb4b1f5a9c233f9d90fd24b9961fd12708b36cc3d56f8fda47f32b667ee8d1227", // Same bundle hash
        [ // One response per signed_action
          {
            "user": "0xecb63caa47c7c4e77f60f1ce858cf28dc2b82b00", // Address of action signer
            "res": {
              "status": "ok", // "ok" or "err"
              "response": {
                "type": "order", // Response type
                "data": {
                  "statuses": [{
                    "filled": {            // Order state: filled/resting/error
                      "totalSz": "1514.0", // Filled size
                      "avgPx": "0.038385", // Average fill price
                      "oid": 156190414943, // Order ID assigned
                      "cloid": "0x7192c49bcadb32d394e38617ea99cc09" // Client order ID
                    }
                  }]
                }
              }
            }
          }
          // ... more responses (one per action)
        ]
      ]
      // ... more response bundles (matches signed_action_bundles count)
    ]
  }
}

Bundle entry (abci_block.signed_action_bundles[i]):

[
  "0x...", // bundle_hash
  {
    "signed_actions": [ /* SignedAction */ ],
    "broadcaster": "0x...",
    "broadcaster_nonce": 1757313597367
  }
]
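
Each entry is a two-element array of mixed types ([hash, body]), which does not map directly onto a Go struct; a sketch of splitting and decoding one entry:

import "encoding/json"

type bundleBody struct {
	SignedActions    []json.RawMessage `json:"signed_actions"`
	Broadcaster      string            `json:"broadcaster"`
	BroadcasterNonce int64             `json:"broadcaster_nonce"`
}

func decodeBundle(entry json.RawMessage) (hash string, body bundleBody, err error) {
	// Split the [hash, body] pair first, then decode each element.
	var pair [2]json.RawMessage
	if err = json.Unmarshal(entry, &pair); err != nil {
		return
	}
	if err = json.Unmarshal(pair[0], &hash); err != nil {
		return
	}
	err = json.Unmarshal(pair[1], &body)
	return
}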

SignedAction envelope (common fields across all actions):

{
  "signature": { "r": "0x...", "s": "0x...", "v": 27 },
  "action": { "type": "order" }, // one of: order | cancel | cancelByCloid | batchModify | evmRawTx
  "nonce": 1757313597362,
  "vaultAddress": "0x...", // optional
  "expiresAfter": 1757313718705 // optional
}
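
As a Go struct the envelope might look like the sketch below (names are illustrative, not from an official SDK); the optional fields become pointers so absence is distinguishable from a zero value:

import "encoding/json"

type signedAction struct {
	Signature struct {
		R string `json:"r"`
		S string `json:"s"`
		V int    `json:"v"` // always 27 or 28
	} `json:"signature"`
	Action       json.RawMessage `json:"action"` // decode further based on action.type
	Nonce        int64           `json:"nonce"`
	VaultAddress *string         `json:"vaultAddress"` // optional
	ExpiresAfter *int64          `json:"expiresAfter"` // optional
}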

Action payloads (in action):

  • type: "order"
    {
      "type": "order",
      "orders": [
        {
          "a": 204,       // asset id
          "b": true,      // buy (true) / sell (false)
          "p": "0.20847", // price (string)
          "s": "1866",    // size (string)
          "r": false,     // reduce-only
          "t": { "limit": { "tif": "Gtc" } }, // tif: Gtc | Ioc | Alo
          "c": "0x..."    // optional client order id
        }
      ],
      "grouping": "na"
    }
  • type: "cancel"
    { "type": "cancel", "cancels": [ { "a": 204, "o": 156190390863 } ] }
  • type: "cancelByCloid"
    { "type": "cancelByCloid", "cancels": [ { "asset": 159, "cloid": "0x..." } ] }
  • type: "batchModify"
    {
      "type": "batchModify",
      "modifies": [
        {
          "oid": 156190366007,
          "order": { "a": 8, "b": true, "p": "0.58625", "s": "40.2", "r": false, "t": { "limit": { "tif": "Alo" } }, "c": "0x..." }
        }
      ]
    }
  • type: "evmRawTx"
    { "type": "evmRawTx", "data": "0x..." }

Execution responses (resps.Full):

[
  "0xBUNDLE_HASH",
  [
    {
      "user": "0x...",
      "res": {
        "status": "ok", // or "err"
        "response": {
          "type": "order", // e.g., order | cancel | default
          "data": {
            "statuses": [
              { "filled": { "totalSz": "1514.0", "avgPx": "0.038385", "oid": 156190414943, "cloid": "0x..." } },
              { "resting": { "oid": 156190414944, "cloid": "0x..." } },
              { "error": "..." }
            ]
          }
        }
      }
    }
  ]
]
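
Each status object carries exactly one of the filled / resting / error keys, which maps naturally onto optional pointers; a minimal sketch:

import "encoding/json"

type orderStatus struct {
	Filled *struct {
		TotalSz string `json:"totalSz"`
		AvgPx   string `json:"avgPx"`
		Oid     int64  `json:"oid"`
	} `json:"filled"`
	Resting *struct {
		Oid int64 `json:"oid"`
	} `json:"resting"`
	Error *string `json:"error"`
}

func describeStatus(raw json.RawMessage) string {
	var s orderStatus
	if err := json.Unmarshal(raw, &s); err != nil {
		return "unparsable status"
	}
	switch {
	case s.Filled != nil:
		return "filled " + s.Filled.TotalSz + " @ " + s.Filled.AvgPx
	case s.Resting != nil:
		return "resting"
	case s.Error != nil:
		return "error: " + *s.Error
	}
	return "unknown status"
}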

Guarantees and alignment (a validation sketch follows this list):

  • abci_block and resps.Full are always present.
  • resps.Full.length === abci_block.signed_action_bundles.length.
  • For each bundle, responses.length === signed_actions.length.
  • Height is abci_block.round, parent is abci_block.parent_round.
  • Timestamp abci_block.time is ISO‑8601 with nanosecond precision (treat as UTC if no suffix).
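
A sketch of the count check; the per-bundle check is analogous once each bundle and response pair is split with the tuple decoding shown earlier:

import (
	"encoding/json"
	"fmt"
)

type alignmentView struct {
	AbciBlock struct {
		SignedActionBundles []json.RawMessage `json:"signed_action_bundles"`
	} `json:"abci_block"`
	Resps struct {
		Full []json.RawMessage `json:"Full"`
	} `json:"resps"`
}

func checkAlignment(data []byte) error {
	var v alignmentView
	if err := json.Unmarshal(data, &v); err != nil {
		return err
	}
	if len(v.Resps.Full) != len(v.AbciBlock.SignedActionBundles) {
		return fmt.Errorf("bundle/response count mismatch: %d vs %d",
			len(v.AbciBlock.SignedActionBundles), len(v.Resps.Full))
	}
	return nil
}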

Developer tips:

  • Dispatch on action.type and handle new types defensively (see the sketch after this list).
  • Store bundle_hash as the stable join key between actions and responses.
  • Normalize prices/sizes (strings) to numeric types as appropriate.
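
A sketch combining these tips: dispatch on action.type, treat unknown types as non-fatal, and normalize string-encoded decimals (a production system would prefer a decimal library over float64):

import (
	"encoding/json"
	"log"
	"strconv"
)

// actionEnvelope models just enough of a SignedAction to dispatch on type.
type actionEnvelope struct {
	Action struct {
		Type   string `json:"type"`
		Orders []struct {
			P string `json:"p"` // price (string)
			S string `json:"s"` // size (string)
		} `json:"orders"`
	} `json:"action"`
}

func handleAction(raw json.RawMessage) {
	var env actionEnvelope
	if err := json.Unmarshal(raw, &env); err != nil {
		log.Printf("skipping unparsable action: %v", err)
		return
	}
	switch env.Action.Type {
	case "order":
		for _, o := range env.Action.Orders {
			px, _ := strconv.ParseFloat(o.P, 64)
			sz, _ := strconv.ParseFloat(o.S, 64)
			log.Printf("order px=%f sz=%f", px, sz)
		}
	case "cancel", "cancelByCloid", "batchModify", "evmRawTx":
		// Known types: handle as needed.
	default:
		// Defensive: new action types may appear; log and continue.
		log.Printf("unknown action type %q", env.Action.Type)
	}
}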

Implementation Examples

package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	pb "your-project/hyperliquid_l1_gateway/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// BlockData models the replica_cmds payload documented above;
// only the fields used below are included.
type BlockData struct {
	AbciBlock struct {
		Time                string            `json:"time"`         // ISO-8601, nanosecond precision
		Round               int64             `json:"round"`        // block height
		ParentRound         int64             `json:"parent_round"` // always round - 1
		SignedActionBundles []json.RawMessage `json:"signed_action_bundles"`
	} `json:"abci_block"`
}

type BlockProcessor struct {
	client pb.HyperliquidL1GatewayClient
	stats  *BlockStats
}

type BlockStats struct {
	BlocksProcessed int64
	LastBlockHeight int64
	LastBlockTime   time.Time
	TotalBundles    int64
}

func NewBlockProcessor(client pb.HyperliquidL1GatewayClient) *BlockProcessor {
	return &BlockProcessor{
		client: client,
		stats:  &BlockStats{},
	}
}

func (bp *BlockProcessor) StreamBlocks(ctx context.Context) error {
	// Start streaming from now
	stream, err := bp.client.StreamBlocks(
		ctx,
		&pb.Timestamp{Timestamp: 0},
	)
	if err != nil {
		return err
	}

	log.Println("Starting block stream...")

	for {
		select {
		case <-ctx.Done():
			log.Println("Stream context cancelled")
			return ctx.Err()
		default:
			block, err := stream.Recv()
			if err != nil {
				log.Printf("Stream error: %v", err)
				return err
			}

			if err := bp.processBlock(block.Data); err != nil {
				log.Printf("Failed to process block: %v", err)
				continue
			}
		}
	}
}

func (bp *BlockProcessor) processBlock(data []byte) error {
	var blockData BlockData
	if err := json.Unmarshal(data, &blockData); err != nil {
		return err
	}

	// Parse the ISO-8601 block time (nanosecond precision, no zone suffix)
	blockTime, err := time.Parse("2006-01-02T15:04:05.999999999", blockData.AbciBlock.Time)
	if err != nil {
		return err
	}

	// Example: check block time intervals against the previous block
	if bp.stats.BlocksProcessed > 0 {
		if interval := blockTime.Sub(bp.stats.LastBlockTime); interval > 15*time.Second {
			log.Printf("Slow block: %v interval", interval)
		}
	}

	// Update statistics
	bundles := len(blockData.AbciBlock.SignedActionBundles)
	bp.stats.BlocksProcessed++
	bp.stats.LastBlockHeight = blockData.AbciBlock.Round
	bp.stats.LastBlockTime = blockTime
	bp.stats.TotalBundles += int64(bundles)

	// Process block data
	log.Printf("Block #%d: %d action bundles", blockData.AbciBlock.Round, bundles)

	// Add custom block processing logic here
	bp.analyzeBlock(&blockData)

	return nil
}

func (bp *BlockProcessor) analyzeBlock(block *BlockData) {
	// Example: detect busy blocks
	if n := len(block.AbciBlock.SignedActionBundles); n > 100 {
		log.Printf("Large block detected: %d bundles", n)
	}
}

func (bp *BlockProcessor) GetStats() *BlockStats {
	return bp.stats
}

func main() {
	// Connect to Hyperliquid L1 Gateway
	conn, err := grpc.Dial(
		"<YOUR_HYPERLIQUID_ENDPOINT>", // Contact support@dwellir.com
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewHyperliquidL1GatewayClient(conn)
	processor := NewBlockProcessor(client)

	// Start streaming with context for graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		if err := processor.StreamBlocks(ctx); err != nil {
			log.Printf("Stream ended: %v", err)
		}
	}()

	// Print stats periodically
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		stats := processor.GetStats()
		log.Printf("Stats - Blocks: %d, Height: %d, Total Bundles: %d",
			stats.BlocksProcessed, stats.LastBlockHeight, stats.TotalBundles)
	}
}

Common Use Cases

1. Block Explorer Backend

class BlockExplorerService {
  constructor(streamManager) {
    this.streamManager = streamManager;
    this.blockCache = new Map();

    // Assumes the stream manager has already parsed block.data and
    // normalized it to { height, transactions, ... }
    streamManager.on('block', (blockData) => {
      this.indexBlock(blockData);
    });
  }

  async indexBlock(blockData) {
    // Store block in cache/database
    this.blockCache.set(blockData.height, blockData);

    // Index transactions
    if (blockData.transactions) {
      for (const tx of blockData.transactions) {
        await this.indexTransaction(tx, blockData.height);
      }
    }

    // Update blockchain metrics
    await this.updateChainMetrics(blockData);
  }

  async indexTransaction(tx, blockHeight) {
    // Store transaction data with block reference
    // Update address balances
    // Index events and logs
  }

  async updateChainMetrics(blockData) {
    // Update total supply, validator info, etc.
  }
}

2. Network Health Monitor

class NetworkHealthMonitor:
    def __init__(self, analyzer):
        # analyzer is assumed to keep a bounded deque of parsed blocks,
        # each with 'timestamp' (seconds) and 'tx_count' fields
        self.analyzer = analyzer
        self.health_metrics = {
            'avg_block_time': [],
            'tx_throughput': [],
            'block_sizes': []
        }

    def analyze_health(self):
        """Analyze network health from recent blocks"""
        if len(self.analyzer.recent_blocks) < 10:
            return None

        recent = list(self.analyzer.recent_blocks)[-10:]

        # Calculate average block time
        time_diffs = []
        for i in range(1, len(recent)):
            diff = recent[i]['timestamp'] - recent[i-1]['timestamp']
            time_diffs.append(diff)

        avg_block_time = sum(time_diffs) / len(time_diffs)

        # Calculate transaction throughput
        total_txs = sum(b['tx_count'] for b in recent)
        time_span = recent[-1]['timestamp'] - recent[0]['timestamp']
        throughput = total_txs / time_span if time_span > 0 else 0

        return {
            'average_block_time': avg_block_time,
            'transaction_throughput': throughput,
            'health_score': self.calculate_health_score(avg_block_time, throughput)
        }

    def calculate_health_score(self, block_time, throughput):
        """Calculate network health score (0-100)"""
        score = 100

        # Penalize slow blocks
        if block_time > 15:
            score -= min(30, (block_time - 15) * 2)

        # Penalize low throughput
        if throughput < 1:
            score -= 20

        return max(0, score)

3. Compliance and Auditing

type ComplianceMonitor struct {
	client pb.HyperliquidL1GatewayClient
	alerts chan Alert
}

type Alert struct {
	Type      string
	Message   string
	BlockData interface{}
	Timestamp time.Time
}

func (cm *ComplianceMonitor) MonitorCompliance(ctx context.Context) {
	stream, err := cm.client.StreamBlocks(ctx, &pb.Timestamp{Timestamp: 0})
	if err != nil {
		log.Fatal(err)
	}

	for {
		block, err := stream.Recv()
		if err != nil {
			log.Printf("Stream error: %v", err)
			return
		}

		cm.analyzeBlockForCompliance(block.Data)
	}
}

func (cm *ComplianceMonitor) analyzeBlockForCompliance(data []byte) {
	// Assumes blocks have been normalized upstream into a flat
	// {"transactions": [...]} shape with per-transaction "value" fields
	var blockData map[string]interface{}
	if err := json.Unmarshal(data, &blockData); err != nil {
		log.Printf("Failed to parse block: %v", err)
		return
	}

	// Check for large transactions
	if transactions, ok := blockData["transactions"].([]interface{}); ok {
		for _, tx := range transactions {
			if txData, ok := tx.(map[string]interface{}); ok {
				if value, exists := txData["value"]; exists {
					// Flag large transactions for review
					if parseValue(value) > 1000000 { // Example threshold
						cm.alerts <- Alert{
							Type:      "LARGE_TRANSACTION",
							Message:   "Transaction exceeds reporting threshold",
							BlockData: txData,
							Timestamp: time.Now(),
						}
					}
				}
			}
		}
	}
}

// parseValue coerces a JSON number or numeric string into float64.
func parseValue(v interface{}) float64 {
	switch n := v.(type) {
	case float64:
		return n
	case string:
		f, _ := strconv.ParseFloat(n, 64)
		return f
	}
	return 0
}

Error Handling and Reconnection

class RobustBlockStreamer {
  constructor(endpoint) {
    this.endpoint = endpoint;
    this.maxRetries = 5;
    this.retryDelay = 1000; // Start with 1 second
    this.currentRetries = 0;
  }

  async startStreamWithRetry() {
    while (this.currentRetries < this.maxRetries) {
      try {
        // startStream() is application-defined: open the gRPC stream and
        // resolve/reject when the stream ends or errors
        await this.startStream();
        this.currentRetries = 0; // Reset on successful connection
        this.retryDelay = 1000;
      } catch (error) {
        this.currentRetries++;
        console.error(`Stream attempt ${this.currentRetries} failed:`, error.message);

        if (this.currentRetries >= this.maxRetries) {
          throw new Error('Max retry attempts exceeded');
        }

        // Exponential backoff
        await this.sleep(this.retryDelay);
        this.retryDelay *= 2;
      }
    }
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

Best Practices

  1. Connection Management: Implement robust reconnection logic with exponential backoff
  2. Memory Management: Use bounded collections for storing recent blocks to prevent memory leaks
  3. Performance: Process blocks asynchronously to avoid blocking the stream (see the sketch after this list)
  4. Monitoring: Track stream health and performance metrics
  5. Error Recovery: Handle various error types (network, parsing, processing) gracefully
  6. Resource Cleanup: Properly close streams and connections on shutdown
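
Points 2 and 3 can be combined by draining the stream into a bounded queue, so a slow consumer applies backpressure instead of growing memory without bound. A minimal Go sketch, where recv and handle stand in for the stream's Recv call and your processing logic:

import (
	"context"
	"log"
)

func handle(data []byte) {
	// Application-specific processing (parse, index, alert, ...)
	_ = data
}

func processAsync(ctx context.Context, recv func() ([]byte, error)) error {
	queue := make(chan []byte, 256) // bounded: the reader blocks when full

	go func() {
		defer close(queue)
		for {
			data, err := recv()
			if err != nil {
				log.Printf("stream ended: %v", err)
				return
			}
			select {
			case queue <- data:
			case <-ctx.Done():
				return
			}
		}
	}()

	for data := range queue {
		handle(data)
	}
	return ctx.Err()
}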

Current Limitations

  • Historical Data: Cannot stream from historical timestamps; only real-time streaming available
  • Data Retention: Node maintains only 24 hours of historical block data
  • Backpressure: High-volume periods may require careful handling to avoid overwhelming downstream systems

Need help? Contact our support team or check the Hyperliquid gRPC documentation.