Skip to main content

StreamOrderbookSnapshots

Stream continuous order book snapshots starting from a position, providing real-time access to market depth and liquidity data on Hyperliquid.

Full Code Examples

Clone our gRPC Code Examples Repository for complete, runnable implementations in Go, Python, and Node.js.

When to Use This Method#

StreamOrderbookSnapshots is essential for:

  • Market Making - Monitor bid/ask spreads and adjust quotes in real-time
  • Trading Algorithms - Access live order book data for execution strategies
  • Market Analysis - Track liquidity and depth changes over time
  • Risk Management - Monitor market conditions and liquidity availability

Method Signature#

rpc StreamOrderbookSnapshots(Position) returns (stream OrderBookSnapshot) {}

Request Message#

message Position {
// Leave all fields unset or zero to target the latest data.
oneof position {
int64 timestamp_ms = 1; // ms since Unix epoch, inclusive
int64 block_height = 2; // block height, inclusive
}
}

The Position message allows flexible stream starting points:

  • timestamp_ms: Start streaming from a specific time (milliseconds since Unix epoch)
  • block_height: Start streaming from a specific block height
  • Empty/zero: Start streaming from the latest order book snapshot

Response Stream#

message OrderBookSnapshot {
// JSON-encoded Hyperliquid order book snapshot.
bytes data = 1;
}

The data field contains a JSON-encoded order book snapshot with:

  • Bid and ask levels with price, size, and order count
  • Trading pair symbol
  • Timestamp of the snapshot

Data Structure#

{
"coin": "string", // Trading pair symbol (e.g., "@1" for ETH)
"time": 1757672867000, // Unix timestamp in milliseconds
"levels": [
[ // Bid levels (index 0)
{
"px": "18.414", // Price level
"sz": "100.0", // Total size at this level
"n": 1 // Number of orders at this level
}
],
[ // Ask levels (index 1)
{
"px": "18.515", // Price level
"sz": "50.5", // Total size at this level
"n": 2 // Number of orders at this level
}
]
]
}

Implementation Examples#

package main

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strings"

pb "hyperliquid-grpc-client/api/v2"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)

// OrderBookLevel is one price level on a single side of the book,
// decoded from the snapshot JSON fields "px"/"sz"/"n".
type OrderBookLevel struct {
Price string `json:"px"` // price as a decimal string
Size string `json:"sz"` // total size resting at this price, decimal string
Orders int `json:"n"` // number of orders aggregated at this level
}

// OrderBookData is the decoded payload of an OrderBookSnapshot message.
// Levels holds two slices: index 0 is bids, index 1 is asks.
type OrderBookData struct {
Coin string `json:"coin"` // trading pair symbol
Time float64 `json:"time"` // snapshot time, ms since Unix epoch (decoded as a JSON number)
Levels [][]OrderBookLevel `json:"levels"` // [0] = bids, [1] = asks
}

// streamOrderbookSnapshots connects to the Hyperliquid gRPC gateway over TLS
// and prints a summary of every order book snapshot received on the stream
// until it ends or fails.
//
// Configuration comes from the environment:
//   - HYPERLIQUID_ENDPOINT: host:port of the gateway
//   - API_KEY: value sent as the x-api-key metadata header
func streamOrderbookSnapshots() {
	endpoint := os.Getenv("HYPERLIQUID_ENDPOINT")
	apiKey := os.Getenv("API_KEY")

	fmt.Println("🚀 Hyperliquid Go gRPC Client - Stream Order Book Snapshots")
	fmt.Println("============================================================")
	fmt.Printf("📡 Endpoint: %s\n\n", endpoint)

	// nil tls.Config uses the system root CAs and sane defaults.
	creds := credentials.NewTLS(nil)

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(creds),
		grpc.WithDefaultCallOptions(
			// Snapshots can be large; raise the receive limit to 150MB.
			grpc.MaxCallRecvMsgSize(150 * 1024 * 1024),
		),
	}

	fmt.Println("🔌 Connecting to gRPC server...")
	conn, err := grpc.NewClient(endpoint, opts...)
	if err != nil {
		log.Fatalf("❌ Failed to connect: %v", err)
	}
	defer conn.Close()

	// Create the client using generated code.
	client := pb.NewHyperliquidL1GatewayClient(conn)
	// NOTE: Println already appends a newline; passing "...\n" trips go vet.
	fmt.Println("✅ Connected successfully!")
	fmt.Println()

	// Authenticate each RPC via outgoing metadata.
	ctx := metadata.AppendToOutgoingContext(context.Background(), "x-api-key", apiKey)

	// Empty Position means: start from the latest snapshot.
	request := &pb.Position{}

	fmt.Println("📥 Starting order book snapshots stream...")
	fmt.Println("Press Ctrl+C to stop streaming")
	fmt.Println()

	stream, err := client.StreamOrderbookSnapshots(ctx, request)
	if err != nil {
		log.Fatalf("❌ Failed to start stream: %v", err)
	}

	snapshotCount := 0
	for {
		response, err := stream.Recv()
		if err != nil {
			// Recv returns io.EOF on a clean end of stream; anything else
			// is a transport or server failure.
			if errors.Is(err, io.EOF) {
				fmt.Println("✅ Stream ended")
			} else {
				fmt.Printf("❌ Stream error: %v\n", err)
			}
			break
		}

		snapshotCount++
		fmt.Printf("\n===== SNAPSHOT #%d =====\n", snapshotCount)
		fmt.Printf("📦 Response size: %d bytes\n", len(response.Data))

		// Decode and summarize each snapshot.
		processOrderBookSnapshot(response.Data, snapshotCount)

		fmt.Println("\n" + strings.Repeat("─", 50))
	}

	fmt.Printf("\n📊 Total snapshots received: %d\n", snapshotCount)
}

// processOrderBookSnapshot decodes one JSON snapshot and prints a summary:
// coin, snapshot time, per-side level counts, and the best bid/ask when
// both sides are present. Malformed JSON is reported and skipped.
func processOrderBookSnapshot(data []byte, snapshotNum int) {
	var orderBook OrderBookData
	if err := json.Unmarshal(data, &orderBook); err != nil {
		fmt.Printf("❌ Failed to parse JSON: %v\n", err)
		fmt.Printf("Raw data (first 200 bytes): %.200s\n", data)
		return
	}

	fmt.Printf("📊 ORDER BOOK SNAPSHOT #%d\n", snapshotNum)
	fmt.Println("==========================")

	fmt.Printf("🪙 Coin: %s\n", orderBook.Coin)
	// Time was previously parsed but never shown; it is milliseconds
	// since the Unix epoch.
	fmt.Printf("🕒 Time: %.0f ms\n", orderBook.Time)

	if len(orderBook.Levels) >= 2 {
		bids := orderBook.Levels[0] // index 0 = bid side
		asks := orderBook.Levels[1] // index 1 = ask side

		fmt.Printf("📋 Bids: %d levels\n", len(bids))
		fmt.Printf("📋 Asks: %d levels\n", len(asks))

		// Display top of book (best bid/ask) when both sides have depth.
		if len(bids) > 0 && len(asks) > 0 {
			fmt.Printf("\n💰 Best Bid: %s @ %s (Orders: %d)\n",
				bids[0].Size, bids[0].Price, bids[0].Orders)
			fmt.Printf("💰 Best Ask: %s @ %s (Orders: %d)\n",
				asks[0].Size, asks[0].Price, asks[0].Orders)
		}
	}
}

// main runs the streaming example until the stream ends or the process
// is interrupted.
func main() {
streamOrderbookSnapshots()
}

Common Use Cases#

1. Real-time Market Making#

// Reactive market maker: subscribes to order book snapshots and re-centers
// its two-sided quotes around the mid price with a slightly tightened spread.
class MarketMaker {
  constructor(streamManager) {
    this.streamManager = streamManager;
    this.currentSpread = null;
    this.quotesActive = false;

    // Re-quote whenever a fresh snapshot arrives.
    streamManager.on('orderbook', (snapshot) => this.updateQuotes(snapshot));
  }

  updateQuotes(snapshot) {
    const [bids = [], asks = []] = snapshot.levels || [];
    if (bids.length === 0 || asks.length === 0) return;

    const bestBid = parseFloat(bids[0].px);
    const bestAsk = parseFloat(asks[0].px);
    const midPrice = (bestBid + bestAsk) / 2;
    const spread = bestAsk - bestBid;

    // Quote 10% inside the observed spread, floored at 0.001.
    const targetSpread = Math.max(spread * 0.9, 0.001);
    const half = targetSpread / 2;
    const newBid = midPrice - half;
    const newAsk = midPrice + half;

    console.log(`Mid: ${midPrice}, Spread: ${spread}, Quotes: ${newBid}/${newAsk}`);

    // Only push new quotes when the change is worth it.
    if (this.shouldUpdateQuotes(newBid, newAsk)) {
      this.sendQuotes(newBid, newAsk);
    }
  }

  shouldUpdateQuotes(newBid, newAsk) {
    // Placeholder: always re-quote; replace with a threshold-based check.
    return true;
  }

  sendQuotes(bid, ask) {
    // Placeholder for the real order-placement call to the exchange.
    console.log(`Sending quotes: ${bid} / ${ask}`);
  }
}

2. Liquidity Depth Monitor#

class LiquidityMonitor:
    """Tracks order book depth over time and flags large swings."""

    def __init__(self):
        self.depth_history = []
        self.alerts_threshold = 0.2  # alert on a >20% change in total depth

    def analyze_depth(self, snapshot):
        """Analyze order book depth and detect significant changes.

        ``snapshot`` is the decoded JSON order book (keys: ``coin``,
        ``time``, ``levels``). Returns a depth-summary dict, or ``None``
        when the snapshot has fewer than two level lists.
        """
        levels = snapshot.get('levels', [])
        if len(levels) < 2:
            return None

        bids = levels[0]
        asks = levels[1]

        # Depth within 1% of the best price on each side.
        bid_depth_1pct = self.calculate_depth_at_percent(bids, 0.01)
        ask_depth_1pct = self.calculate_depth_at_percent(asks, 0.01)
        total = bid_depth_1pct + ask_depth_1pct

        depth_data = {
            'timestamp': snapshot.get('time'),
            'coin': snapshot.get('coin'),
            'bid_depth_1pct': bid_depth_1pct,
            'ask_depth_1pct': ask_depth_1pct,
            'total_depth': total,
            # Guard the empty-book case: the previous code raised
            # ZeroDivisionError when both sides had zero depth.
            'imbalance': (bid_depth_1pct - ask_depth_1pct) / total if total > 0 else 0.0,
        }

        self.check_for_alerts(depth_data)
        self.depth_history.append(depth_data)

        return depth_data

    def calculate_depth_at_percent(self, levels, percent):
        """Total size within ``percent`` of the best (first) price.

        Levels are assumed sorted best-first; iteration stops at the
        first level outside the band.
        """
        if not levels:
            return 0

        best_price = float(levels[0]['px'])
        threshold = best_price * percent
        total_depth = 0

        for level in levels:
            price = float(level['px'])
            if abs(price - best_price) <= threshold:
                total_depth += float(level['sz'])
            else:
                break

        return total_depth

    def check_for_alerts(self, current_depth):
        """Print an alert when total depth moved more than the threshold
        versus the most recent recorded snapshot.

        Called before ``current_depth`` is appended, so ``history[-1]``
        is the previous observation. The previous ``len(...) < 2`` guard
        was off by one and silently skipped the first valid comparison.
        """
        if not self.depth_history:
            return

        prev_depth = self.depth_history[-1]
        change = abs(current_depth['total_depth'] - prev_depth['total_depth'])
        change_pct = change / prev_depth['total_depth'] if prev_depth['total_depth'] > 0 else 0

        if change_pct > self.alerts_threshold:
            print(f"⚠️ Significant depth change: {change_pct:.1%}")

3. Spread Analytics#

// SpreadAnalyzer consumes order book snapshots and records top-of-book
// spread observations.
type SpreadAnalyzer struct {
client pb.HyperliquidL1GatewayClient // gateway stub used to open streams
spreads []SpreadData // in-memory series; NOTE(review): unbounded — cap it in production
}

// SpreadData is a single top-of-book spread observation.
type SpreadData struct {
Timestamp int64 // observation time, seconds since Unix epoch (from time.Now().Unix())
BestBid float64 // best bid price
BestAsk float64 // best ask price
Spread float64 // BestAsk - BestBid, in price units
SpreadBps float64 // spread relative to BestBid, in basis points
}

// AnalyzeSpreads opens a snapshot stream from the latest position and
// processes snapshots until the stream terminates or ctx is cancelled.
func (sa *SpreadAnalyzer) AnalyzeSpreads(ctx context.Context) {
stream, err := sa.client.StreamOrderbookSnapshots(ctx, &pb.Position{})
if err != nil {
log.Fatal(err)
}

for {
snapshot, err := stream.Recv()
if err != nil {
// Recv returns io.EOF on a clean end of stream as well as transport errors.
log.Printf("Stream error: %v", err)
return
}

sa.processSnapshot(snapshot.Data)
}
}

// processSnapshot decodes one snapshot, derives the top-of-book spread,
// and appends it to the in-memory series. Malformed payloads are logged
// and skipped rather than silently ignored or panicked on.
func (sa *SpreadAnalyzer) processSnapshot(data []byte) {
	var orderBook map[string]interface{}
	// The previous version discarded this error, silently processing
	// a zero-value map on bad input.
	if err := json.Unmarshal(data, &orderBook); err != nil {
		log.Printf("Failed to decode snapshot: %v", err)
		return
	}

	levels, ok := orderBook["levels"].([]interface{})
	if !ok || len(levels) < 2 {
		return
	}

	// Checked assertions: the originals panicked on malformed levels.
	bids, ok := levels[0].([]interface{})
	if !ok {
		return
	}
	asks, ok := levels[1].([]interface{})
	if !ok {
		return
	}

	if len(bids) == 0 || len(asks) == 0 {
		return
	}

	bestBid := parseLevel(bids[0])
	bestAsk := parseLevel(asks[0])

	spread := bestAsk.Price - bestBid.Price
	// Basis points relative to the bid side.
	spreadBps := (spread / bestBid.Price) * 10000

	spreadData := SpreadData{
		Timestamp: time.Now().Unix(),
		BestBid:   bestBid.Price,
		BestAsk:   bestAsk.Price,
		Spread:    spread,
		SpreadBps: spreadBps,
	}

	sa.spreads = append(sa.spreads, spreadData)
	log.Printf("Spread: %.6f (%.2f bps)", spread, spreadBps)
}

Error Handling and Reconnection#

// Wraps a streaming session with bounded, exponentially backed-off retries.
class RobustOrderbookStreamer {
  constructor(endpoint) {
    this.endpoint = endpoint;
    this.maxRetries = 5;      // give up after this many consecutive failures
    this.retryDelay = 1000;   // current backoff in ms; doubles on each failure
    this.currentRetries = 0;
  }

  async startStreamWithRetry() {
    while (this.currentRetries < this.maxRetries) {
      try {
        await this.startStream();
        // A clean stream exit resets the failure bookkeeping before the
        // loop reconnects.
        this.currentRetries = 0;
        this.retryDelay = 1000;
      } catch (error) {
        this.currentRetries += 1;
        console.error(`Stream attempt ${this.currentRetries} failed:`, error.message);

        if (this.currentRetries >= this.maxRetries) {
          throw new Error('Max retry attempts exceeded');
        }

        // Exponential backoff: wait, then double the delay for next time.
        await this.sleep(this.retryDelay);
        this.retryDelay *= 2;
      }
    }
  }

  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}

Best Practices#

  1. Connection Management: Implement robust reconnection logic with exponential backoff
  2. Memory Management: Use bounded collections for storing historical snapshots
  3. Performance: Process snapshots asynchronously to avoid blocking the stream
  4. Monitoring: Track stream health and snapshot rates
  5. Error Recovery: Handle various error types gracefully
  6. Resource Cleanup: Properly close streams and connections on shutdown

Current Limitations#

  • Historical Data: Streaming from a past position (timestamp or block height) is limited to the node's retention window; positions older than the retained data cannot be replayed
  • Data Retention: Node maintains only 24 hours of historical order book data
  • Backpressure: High-volume periods may require careful handling to avoid overwhelming downstream systems

Resources#

Need help? Contact our support team or check the Hyperliquid gRPC documentation.