StreamOrderbookSnapshots
Stream continuous order book snapshots starting from a position, providing real-time access to market depth and liquidity data on Hyperliquid.
Full Code Examples
Clone our gRPC Code Examples Repository for complete, runnable implementations in Go, Python, and Node.js.
When to Use This Method
StreamOrderbookSnapshots is essential for:
- Market Making - Monitor bid/ask spreads and adjust quotes in real-time
- Trading Algorithms - Access live order book data for execution strategies
- Market Analysis - Track liquidity and depth changes over time
- Risk Management - Monitor market conditions and liquidity availability
Method Signature
rpc StreamOrderbookSnapshots(Position) returns (stream OrderBookSnapshot) {}
Request Message
// Position selects where a stream starts. At most one of the oneof fields is
// set; both bounds are inclusive.
message Position {
// Leave all fields unset or zero to target the latest data.
oneof position {
int64 timestamp_ms = 1; // ms since Unix epoch, inclusive
int64 block_height = 2; // block height, inclusive
}
}
The Position message allows flexible stream starting points:
- timestamp_ms: Start streaming from a specific time (milliseconds since Unix epoch)
- block_height: Start streaming from a specific block height
- Empty/zero: Start streaming from the latest order book snapshot
Response Stream
// OrderBookSnapshot is one message of the response stream. The payload is
// opaque bytes at the protobuf level and must be JSON-decoded by the client.
message OrderBookSnapshot {
// JSON-encoded Hyperliquid order book snapshot.
bytes data = 1;
}
The data field contains a JSON-encoded order book snapshot with:
- Bid and ask levels with price, size, and order count
- Trading pair symbol
- Timestamp of the snapshot
Data Structure
{
"coin": "string", // Trading pair symbol (e.g., "@1" for ETH)
"time": 1757672867000, // Unix timestamp in milliseconds
"levels": [
[ // Bid levels (index 0)
{
"px": "18.414", // Price level
"sz": "100.0", // Total size at this level
"n": 1 // Number of orders at this level
}
],
[ // Ask levels (index 1)
{
"px": "18.515", // Price level
"sz": "50.5", // Total size at this level
"n": 2 // Number of orders at this level
}
]
]
}
Implementation Examples
- Go
- Python
- Node.js
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strings"
pb "hyperliquid-grpc-client/api/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
// Types used to decode the JSON payload carried in OrderBookSnapshot.data.
type (
	// OrderBookLevel is one aggregated price level on one side of the book.
	// Price and size stay as the decimal strings found in the JSON.
	OrderBookLevel struct {
		Price  string `json:"px"` // price of the level
		Size   string `json:"sz"` // total size resting at the level
		Orders int    `json:"n"`  // number of orders at the level
	}

	// OrderBookData is a full decoded snapshot for a single coin.
	OrderBookData struct {
		Coin string  `json:"coin"` // trading pair symbol
		Time float64 `json:"time"` // Unix timestamp in milliseconds
		// Levels holds both sides of the book: index 0 is the bid levels,
		// index 1 is the ask levels.
		Levels [][]OrderBookLevel `json:"levels"`
	}
)
// streamOrderbookSnapshots opens a StreamOrderbookSnapshots stream against the
// gateway named by the HYPERLIQUID_ENDPOINT environment variable, authenticates
// with the API_KEY environment variable (sent as "x-api-key" metadata), and
// prints a summary of every snapshot until Recv returns an error.
//
// Both environment variables are required; the process exits via log.Fatal when
// either is missing, matching the validation in the Python and Node.js examples.
func streamOrderbookSnapshots() {
	endpoint := os.Getenv("HYPERLIQUID_ENDPOINT")
	apiKey := os.Getenv("API_KEY")

	// Fail fast with a clear message instead of a confusing dial or auth error.
	if endpoint == "" {
		log.Fatal("❌ HYPERLIQUID_ENDPOINT environment variable is required")
	}
	if apiKey == "" {
		log.Fatal("❌ API_KEY environment variable is required")
	}

	fmt.Println("🚀 Hyperliquid Go gRPC Client - Stream Order Book Snapshots")
	fmt.Println("============================================================")
	fmt.Printf("📡 Endpoint: %s\n\n", endpoint)

	// Set up TLS connection
	creds := credentials.NewTLS(nil)

	// Connection options. The receive limit is raised because a single
	// snapshot message can be large.
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(creds),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(150 * 1024 * 1024), // 150MB max
		),
	}

	// Create the client connection. NOTE(review): grpc.NewClient is documented
	// as performing no I/O, so a bad endpoint typically surfaces later, when
	// the stream is used — confirm against the grpc-go version in go.mod.
	fmt.Println("🔌 Connecting to gRPC server...")
	conn, err := grpc.NewClient(endpoint, opts...)
	if err != nil {
		log.Fatalf("❌ Failed to connect: %v", err)
	}
	defer conn.Close()

	// Create the client using generated code
	client := pb.NewHyperliquidL1GatewayClient(conn)
	// A separate Println emits the blank line; a trailing "\n" inside Println
	// is flagged by go vet as a redundant newline.
	fmt.Println("✅ Connected successfully!")
	fmt.Println()

	// Create context with API key
	ctx := metadata.AppendToOutgoingContext(context.Background(), "x-api-key", apiKey)

	// Create request - empty Position means latest/current snapshots
	request := &pb.Position{}

	fmt.Println("📥 Starting order book snapshots stream...")
	fmt.Println("Press Ctrl+C to stop streaming")
	fmt.Println()

	// Start streaming order book snapshots
	stream, err := client.StreamOrderbookSnapshots(ctx, request)
	if err != nil {
		// log.Fatalf exits immediately, so the deferred conn.Close is skipped;
		// the process is terminating anyway.
		log.Fatalf("❌ Failed to start stream: %v", err)
	}

	snapshotCount := 0
	for {
		response, err := stream.Recv()
		if err != nil {
			// Any Recv error, including a normal end of stream, stops the loop.
			fmt.Printf("❌ Stream ended: %v\n", err)
			break
		}

		snapshotCount++
		fmt.Printf("\n===== SNAPSHOT #%d =====\n", snapshotCount)
		fmt.Printf("📦 Response size: %d bytes\n", len(response.Data))

		// Process each snapshot
		processOrderBookSnapshot(response.Data, snapshotCount)

		fmt.Println("\n" + strings.Repeat("─", 50))
	}

	fmt.Printf("\n📊 Total snapshots received: %d\n", snapshotCount)
}
// processOrderBookSnapshot decodes one JSON snapshot payload and prints the
// coin, the number of bid and ask levels, and the best bid and ask.
//
// data is the raw JSON payload; snapshotNum is the running snapshot counter
// shown in the header. When the payload is not valid JSON, the error and the
// start of the raw payload are printed and nothing else is shown.
func processOrderBookSnapshot(data []byte, snapshotNum int) {
	var book OrderBookData
	err := json.Unmarshal(data, &book)
	if err != nil {
		fmt.Printf("❌ Failed to parse JSON: %v\n", err)
		fmt.Printf("Raw data (first 200 bytes): %.200s\n", data)
		return
	}

	fmt.Printf("📊 ORDER BOOK SNAPSHOT #%d\n", snapshotNum)
	fmt.Println("==========================")
	fmt.Printf("🪙 Coin: %s\n", book.Coin)

	// Both sides must be present: index 0 is bids, index 1 is asks.
	if len(book.Levels) < 2 {
		return
	}
	bidSide, askSide := book.Levels[0], book.Levels[1]

	fmt.Printf("📋 Bids: %d levels\n", len(bidSide))
	fmt.Printf("📋 Asks: %d levels\n", len(askSide))

	// The top of book is only shown when both sides have at least one level.
	if len(bidSide) == 0 || len(askSide) == 0 {
		return
	}
	topBid, topAsk := bidSide[0], askSide[0]
	fmt.Printf("\n💰 Best Bid: %s @ %s (Orders: %d)\n",
		topBid.Size, topBid.Price, topBid.Orders)
	fmt.Printf("💰 Best Ask: %s @ %s (Orders: %d)\n",
		topAsk.Size, topAsk.Price, topAsk.Orders)
}
// main is the program entry point; it runs the order book streaming example.
func main() {
streamOrderbookSnapshots()
}
import grpc
import json
import signal
import sys
import os
from datetime import datetime
from dotenv import load_dotenv
import hyperliquid_pb2
import hyperliquid_pb2_grpc
# Load environment variables
load_dotenv()
def stream_orderbook_snapshots():
    """Stream order book snapshots from the gateway and print a summary of each.

    Reads HYPERLIQUID_ENDPOINT and API_KEY from the environment and exits with
    status 1 when either is missing. Opens a TLS channel, requests the stream
    starting from the latest snapshot, and hands every payload to
    process_orderbook_snapshot. A SIGINT handler prints the running total and
    exits with status 0.
    """
    endpoint = os.getenv('HYPERLIQUID_ENDPOINT')
    api_key = os.getenv('API_KEY')

    if not endpoint:
        print("Error: HYPERLIQUID_ENDPOINT environment variable is required.")
        print("Please create a .env file from .env.example and set your endpoint.")
        sys.exit(1)

    if not api_key:
        print("Error: API_KEY environment variable is required.")
        print("Please set your API key in the .env file.")
        sys.exit(1)

    print('🚀 Hyperliquid Python gRPC Client - Stream Order Book Snapshots')
    print('================================================================')
    print(f'📡 Endpoint: {endpoint}\n')

    # TLS credentials, a raised receive limit (150MB), and the API key metadata.
    tls_credentials = grpc.ssl_channel_credentials()
    channel_options = [('grpc.max_receive_message_length', 150 * 1024 * 1024)]
    call_metadata = [('x-api-key', api_key)]

    print('🔌 Connecting to gRPC server...')
    with grpc.secure_channel(endpoint, tls_credentials, options=channel_options) as channel:
        stub = hyperliquid_pb2_grpc.HyperliquidL1GatewayStub(channel)
        print('✅ Connected successfully!\n')

        # An empty Position asks for the latest/current snapshots.
        request = hyperliquid_pb2.Position()

        print('📥 Starting order book snapshots stream...')
        print('Press Ctrl+C to stop streaming\n')

        snapshot_count = 0

        def signal_handler(sig, frame):
            # Report the running total (read from the enclosing scope) and exit.
            print('\n🛑 Stopping stream...')
            print(f'📊 Total snapshots received: {snapshot_count}')
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)

        try:
            responses = stub.StreamOrderbookSnapshots(request, metadata=call_metadata)
            # enumerate keeps snapshot_count equal to the number of messages seen.
            for snapshot_count, response in enumerate(responses, start=1):
                print(f'\n===== SNAPSHOT #{snapshot_count} =====')
                print(f'📦 Response size: {len(response.data)} bytes')

                process_orderbook_snapshot(response.data, snapshot_count)

                print('\n' + '─' * 50)
        except grpc.RpcError as e:
            print(f'❌ Stream error: {e}')
        except KeyboardInterrupt:
            print('\n🛑 Stopping stream...')

        print(f'\n📊 Total snapshots received: {snapshot_count}')
def process_orderbook_snapshot(data, snapshot_num):
    """Decode one JSON order book snapshot and print a human-readable summary.

    Args:
        data: Raw snapshot payload as UTF-8 encoded JSON bytes.
        snapshot_num: Running snapshot counter shown in the header.

    Prints the coin, the snapshot time in UTC, the number of bid/ask levels,
    the best bid/ask and the spread. Nothing is returned and nothing is raised:
    JSON errors print the start of the raw payload, and any other processing
    error is reported on stdout.
    """
    # Imported locally so the block does not depend on the file-level imports
    # providing `timezone`.
    from datetime import timezone

    try:
        # Parse JSON
        order_book = json.loads(data.decode('utf-8'))

        print(f'📊 ORDER BOOK SNAPSHOT #{snapshot_num}')
        print('==========================')

        # Display coin/symbol
        if 'coin' in order_book:
            print(f'🪙 Coin: {order_book["coin"]}')

        # Display timestamp. The value is milliseconds since the Unix epoch;
        # convert with an explicit UTC tzinfo so the "UTC" label is true
        # regardless of the machine's local time zone.
        if 'time' in order_book:
            timestamp = order_book['time']
            dt = datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
            print(f'⏰ Time: {dt.strftime("%Y-%m-%d %H:%M:%S UTC")}')

        # Display levels: index 0 holds bids, index 1 holds asks
        if 'levels' in order_book and len(order_book['levels']) >= 2:
            bids = order_book['levels'][0]
            asks = order_book['levels'][1]

            print(f'\n📋 Bids: {len(bids)} levels')
            print(f'📋 Asks: {len(asks)} levels')

            # Display best bid and ask
            if len(bids) > 0 and len(asks) > 0:
                best_bid = bids[0]
                best_ask = asks[0]

                print(f'\n💰 Best Bid: {best_bid["sz"]} @ {best_bid["px"]} (Orders: {best_bid["n"]})')
                print(f'💰 Best Ask: {best_ask["sz"]} @ {best_ask["px"]} (Orders: {best_ask["n"]})')

                # Calculate spread in price units and in basis points of the bid
                bid_price = float(best_bid['px'])
                ask_price = float(best_ask['px'])
                spread = ask_price - bid_price
                spread_bps = (spread / bid_price) * 10000

                print(f'\n📊 Spread: {spread:.6f} ({spread_bps:.2f} bps)')

    except json.JSONDecodeError as e:
        print(f'❌ Failed to parse JSON: {e}')
        print(f'Raw data (first 200 bytes): {data[:200]}')
    except Exception as e:
        # Best-effort display: a malformed snapshot must not stop the stream.
        print(f'❌ Error processing snapshot: {e}')
# Run the streaming example only when this file is executed as a script.
if __name__ == '__main__':
stream_orderbook_snapshots()
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');
require('dotenv').config();
// Load proto file
// The service definition is read from v2.proto, located next to this script.
const PROTO_PATH = path.join(__dirname, 'v2.proto');
// Parse the .proto synchronously. The options choose how proto-loader maps
// field names, 64-bit integers, enums, default values and oneofs into JS.
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
// Build the gRPC package object from which the service client constructor is taken.
const proto = grpc.loadPackageDefinition(packageDefinition);
// Main function
/**
 * Opens the StreamOrderbookSnapshots stream and prints a summary of every
 * snapshot received.
 *
 * Reads HYPERLIQUID_ENDPOINT and API_KEY from the environment and exits with
 * status 1 when either is missing. The API key is sent as "x-api-key" metadata
 * and the stream starts from the latest snapshot (empty Position).
 */
async function streamOrderbookSnapshots() {
  const endpoint = process.env.HYPERLIQUID_ENDPOINT;
  const apiKey = process.env.API_KEY;

  if (!endpoint) {
    console.error('Error: HYPERLIQUID_ENDPOINT environment variable is required.');
    console.error('Please create a .env file from .env.example and set your endpoint.');
    process.exit(1);
  }

  if (!apiKey) {
    console.error('Error: API_KEY environment variable is required.');
    console.error('Please set your API key in the .env file.');
    process.exit(1);
  }

  console.log('🚀 Hyperliquid Node.js gRPC Client - Stream Order Book Snapshots');
  console.log('=================================================================');
  console.log(`📡 Endpoint: ${endpoint}\n`);

  // Per-call metadata carrying the API key.
  const metadata = new grpc.Metadata();
  metadata.add('x-api-key', apiKey);

  // TLS client with a 150MB receive limit.
  const channelOptions = { 'grpc.max_receive_message_length': 150 * 1024 * 1024 };
  const GatewayClient = proto.hyperliquid_l1_gateway.v2.HyperliquidL1Gateway;
  const client = new GatewayClient(endpoint, grpc.credentials.createSsl(), channelOptions);

  console.log('📥 Starting order book snapshots stream...');
  console.log('Press Ctrl+C to stop streaming\n');

  // Empty Position: start from the latest snapshot.
  const stream = client.StreamOrderbookSnapshots({}, metadata);
  let snapshotCount = 0;

  const handleMessage = (message) => {
    snapshotCount += 1;
    try {
      // Parse first so a malformed payload prints only the failure line.
      const orderBook = JSON.parse(message.data);

      console.log(`\n===== SNAPSHOT #${snapshotCount} =====`);
      console.log(`📦 Response size: ${message.data.length} bytes`);

      processOrderbookSnapshot(orderBook, snapshotCount);

      console.log('\n' + '─'.repeat(50));
    } catch (error) {
      console.error(`❌ Failed to parse message #${snapshotCount}:`, error.message);
    }
  };

  const handleError = (error) => {
    console.error('❌ Stream error:', error.message);
  };

  const handleEnd = () => {
    console.log('Stream ended');
    console.log(`\n📊 Total snapshots received: ${snapshotCount}`);
  };

  stream.on('data', handleMessage);
  stream.on('error', handleError);
  stream.on('end', handleEnd);
}
/**
 * Prints a human-readable summary of one decoded order book snapshot.
 *
 * @param {object} orderBook - Decoded snapshot with optional `coin`, `time`
 *   (milliseconds since the Unix epoch) and `levels` ([bids, asks]).
 * @param {number} snapshotNum - Running snapshot counter shown in the header.
 */
function processOrderbookSnapshot(orderBook, snapshotNum) {
  console.log(`📊 ORDER BOOK SNAPSHOT #${snapshotNum}`);
  console.log('==========================');

  // Coin and timestamp are shown only when present and truthy.
  if (orderBook.coin) {
    console.log(`🪙 Coin: ${orderBook.coin}`);
  }
  if (orderBook.time) {
    console.log(`⏰ Time: ${new Date(orderBook.time).toISOString()}`);
  }

  // Both sides are needed: index 0 is bids, index 1 is asks.
  const levels = orderBook.levels || [];
  if (levels.length < 2) {
    return;
  }
  const bidSide = levels[0];
  const askSide = levels[1];

  console.log(`\n📋 Bids: ${bidSide.length} levels`);
  console.log(`📋 Asks: ${askSide.length} levels`);

  // Top of book and spread need at least one level on each side.
  if (bidSide.length === 0 || askSide.length === 0) {
    return;
  }
  const topBid = bidSide[0];
  const topAsk = askSide[0];

  console.log(`\n💰 Best Bid: ${topBid.sz} @ ${topBid.px} (Orders: ${topBid.n})`);
  console.log(`💰 Best Ask: ${topAsk.sz} @ ${topAsk.px} (Orders: ${topAsk.n})`);

  // Spread in price units and in basis points of the best bid.
  const bidPx = parseFloat(topBid.px);
  const askPx = parseFloat(topAsk.px);
  const spread = askPx - bidPx;
  const spreadBps = (spread / bidPx) * 10000;

  console.log(`\n📊 Spread: ${spread.toFixed(6)} (${spreadBps.toFixed(2)} bps)`);
}
// Run it
streamOrderbookSnapshots();
Common Use Cases
1. Real-time Market Making
/**
 * Example market maker that recomputes quotes around the mid price whenever
 * the stream manager emits an 'orderbook' event.
 */
class MarketMaker {
  /**
   * @param {object} streamManager - Event source exposing `on(event, handler)`;
   *   the 'orderbook' event is expected to deliver a decoded snapshot.
   */
  constructor(streamManager) {
    this.streamManager = streamManager;
    this.currentSpread = null;
    this.quotesActive = false;

    streamManager.on('orderbook', (snapshot) => {
      this.updateQuotes(snapshot);
    });
  }

  /**
   * Derives new bid/ask quotes from the top of book and sends them when
   * shouldUpdateQuotes agrees. Snapshots missing either side are ignored.
   *
   * @param {object} snapshot - Decoded snapshot with `levels` ([bids, asks]).
   */
  updateQuotes(snapshot) {
    const book = snapshot.levels || [];
    if (book.length < 2) return;

    const bidSide = book[0];
    const askSide = book[1];
    if (bidSide.length === 0 || askSide.length === 0) return;

    const bestBid = parseFloat(bidSide[0].px);
    const bestAsk = parseFloat(askSide[0].px);
    const spread = bestAsk - bestBid;
    const midPrice = (bestBid + bestAsk) / 2;

    // Quote at 90% of the observed spread, but never tighter than 0.001.
    const targetSpread = Math.max(spread * 0.9, 0.001);
    const halfTarget = targetSpread / 2;
    const newBid = midPrice - halfTarget;
    const newAsk = midPrice + halfTarget;

    console.log(`Mid: ${midPrice}, Spread: ${spread}, Quotes: ${newBid}/${newAsk}`);

    if (!this.shouldUpdateQuotes(newBid, newAsk)) return;
    this.sendQuotes(newBid, newAsk);
  }

  /**
   * Placeholder for threshold-based update logic; currently always true.
   */
  shouldUpdateQuotes(newBid, newAsk) {
    return true;
  }

  /**
   * Placeholder for submitting quotes to the exchange; currently only logs.
   */
  sendQuotes(bid, ask) {
    console.log(`Sending quotes: ${bid} / ${ask}`);
  }
}
2. Liquidity Depth Monitor
class LiquidityMonitor:
    """Track order book depth near the top of book and flag large changes.

    Each analysed snapshot is appended to ``depth_history``; an alert is
    printed when total depth moves by more than ``alerts_threshold`` relative
    to the previously analysed snapshot.
    """

    def __init__(self):
        # History of depth_data dicts, oldest first.
        self.depth_history = []
        self.alerts_threshold = 0.2  # 20% depth change

    def analyze_depth(self, snapshot):
        """Analyze order book depth and detect significant changes.

        Args:
            snapshot: Decoded snapshot dict with ``levels`` ([bids, asks]) and
                optional ``time`` and ``coin`` keys.

        Returns:
            A dict with the depth within 1% of the best price on each side,
            their total and the bid/ask imbalance, or None when the snapshot
            does not contain both sides.
        """
        levels = snapshot.get('levels', [])
        if len(levels) < 2:
            return None

        bids = levels[0]
        asks = levels[1]

        # Calculate depth at different price levels
        bid_depth_1pct = self.calculate_depth_at_percent(bids, 0.01)
        ask_depth_1pct = self.calculate_depth_at_percent(asks, 0.01)
        total_depth = bid_depth_1pct + ask_depth_1pct

        # With no depth on either side the imbalance is undefined; report 0
        # instead of raising ZeroDivisionError.
        if total_depth > 0:
            imbalance = (bid_depth_1pct - ask_depth_1pct) / total_depth
        else:
            imbalance = 0.0

        depth_data = {
            'timestamp': snapshot.get('time'),
            'coin': snapshot.get('coin'),
            'bid_depth_1pct': bid_depth_1pct,
            'ask_depth_1pct': ask_depth_1pct,
            'total_depth': total_depth,
            'imbalance': imbalance
        }

        # Compare against history before appending, so the newest history
        # entry is still the previous snapshot.
        self.check_for_alerts(depth_data)
        self.depth_history.append(depth_data)

        return depth_data

    def calculate_depth_at_percent(self, levels, percent):
        """Calculate total depth within X% of best price.

        Args:
            levels: One side of the book, best price first; each level has
                string ``px`` and ``sz`` values.
            percent: Fraction of the best price (0.01 means 1%).

        Returns:
            Sum of sizes of the leading levels within the band, or 0 when the
            side is empty.
        """
        if not levels:
            return 0

        best_price = float(levels[0]['px'])
        threshold = best_price * percent

        total_depth = 0
        for level in levels:
            price = float(level['px'])
            if abs(price - best_price) <= threshold:
                total_depth += float(level['sz'])
            else:
                # Stop at the first level outside the band; this relies on the
                # levels being ordered by distance from the best price.
                break

        return total_depth

    def check_for_alerts(self, current_depth):
        """Check for significant depth changes against the previous snapshot.

        Args:
            current_depth: The depth_data dict for the snapshot being analysed
                (not yet appended to ``depth_history``).
        """
        # One prior entry is enough to compare against; requiring two would
        # silently skip the first possible comparison.
        if not self.depth_history:
            return

        prev_depth = self.depth_history[-1]
        change = abs(current_depth['total_depth'] - prev_depth['total_depth'])
        change_pct = change / prev_depth['total_depth'] if prev_depth['total_depth'] > 0 else 0

        if change_pct > self.alerts_threshold:
            print(f"⚠️ Significant depth change: {change_pct:.1%}")
3. Spread Analytics
type (
	// SpreadAnalyzer consumes the order book snapshot stream and keeps a
	// history of the spreads it observes.
	SpreadAnalyzer struct {
		client  pb.HyperliquidL1GatewayClient // gateway client used to open the stream
		spreads []SpreadData                  // one entry per processed snapshot, oldest first
	}

	// SpreadData is a single top-of-book spread observation.
	SpreadData struct {
		Timestamp int64   // Unix seconds at which the snapshot was processed
		BestBid   float64 // best bid price
		BestAsk   float64 // best ask price
		Spread    float64 // BestAsk - BestBid
		SpreadBps float64 // Spread relative to BestBid, in basis points
	}
)
// AnalyzeSpreads opens the order book snapshot stream starting at the latest
// snapshot and feeds every payload to processSnapshot. It blocks until the
// stream cannot be opened or Recv returns an error; both cases are logged and
// the method returns.
func (sa *SpreadAnalyzer) AnalyzeSpreads(ctx context.Context) {
	stream, err := sa.client.StreamOrderbookSnapshots(ctx, &pb.Position{})
	if err != nil {
		// Log and return instead of log.Fatal: a method on a reusable type
		// should not terminate the whole process, and this matches how Recv
		// errors are handled below.
		log.Printf("Failed to start stream: %v", err)
		return
	}

	for {
		snapshot, err := stream.Recv()
		if err != nil {
			log.Printf("Stream error: %v", err)
			return
		}

		sa.processSnapshot(snapshot.Data)
	}
}
// processSnapshot decodes one JSON snapshot payload, computes the top-of-book
// spread and appends it to sa.spreads. Payloads that are not valid JSON, lack
// both sides of the book, have an unexpected shape, or carry a non-positive
// best bid are skipped without recording anything.
//
// NOTE(review): parseLevel is not defined in this snippet; it is assumed to
// return a value with a float64 Price field — confirm against its definition.
func (sa *SpreadAnalyzer) processSnapshot(data []byte) {
	var orderBook map[string]interface{}
	if err := json.Unmarshal(data, &orderBook); err != nil {
		log.Printf("Skipping snapshot: invalid JSON: %v", err)
		return
	}

	levels, ok := orderBook["levels"].([]interface{})
	if !ok || len(levels) < 2 {
		return
	}

	// Checked assertions: a malformed payload must not panic the stream loop.
	bids, bidsOK := levels[0].([]interface{})
	asks, asksOK := levels[1].([]interface{})
	if !bidsOK || !asksOK {
		return
	}

	if len(bids) == 0 || len(asks) == 0 {
		return
	}

	bestBid := parseLevel(bids[0])
	bestAsk := parseLevel(asks[0])

	// A zero bid would make the basis-point figure Inf or NaN.
	if bestBid.Price <= 0 {
		return
	}

	spread := bestAsk.Price - bestBid.Price
	spreadBps := (spread / bestBid.Price) * 10000

	spreadData := SpreadData{
		// Processing time, not the snapshot's own "time" field.
		Timestamp: time.Now().Unix(),
		BestBid:   bestBid.Price,
		BestAsk:   bestAsk.Price,
		Spread:    spread,
		SpreadBps: spreadBps,
	}

	sa.spreads = append(sa.spreads, spreadData)

	log.Printf("Spread: %.6f (%.2f bps)", spread, spreadBps)
}
Error Handling and Reconnection
/**
 * Runs an order book stream and restarts it with exponential backoff when it
 * fails. `startStream` is not defined in this class and must be supplied
 * elsewhere (for example by a subclass).
 */
class RobustOrderbookStreamer {
  /**
   * @param {string} endpoint - Address of the gRPC endpoint to stream from.
   */
  constructor(endpoint) {
    this.endpoint = endpoint;
    this.maxRetries = 5;
    this.retryDelay = 1000;
    this.currentRetries = 0;
  }

  /**
   * Calls startStream in a loop. A run that completes without throwing resets
   * the retry counter and delay; a failure is logged, waits `retryDelay`
   * milliseconds and doubles the delay. Throws once `maxRetries` consecutive
   * attempts have failed.
   */
  async startStreamWithRetry() {
    while (this.currentRetries < this.maxRetries) {
      let failed = false;
      let failure;
      try {
        await this.startStream();
      } catch (error) {
        failed = true;
        failure = error;
      }

      if (!failed) {
        // Clean completion: start over with a fresh retry budget.
        this.currentRetries = 0;
        this.retryDelay = 1000;
        continue;
      }

      this.currentRetries += 1;
      console.error(`Stream attempt ${this.currentRetries} failed:`, failure.message);

      if (this.currentRetries >= this.maxRetries) {
        throw new Error('Max retry attempts exceeded');
      }

      // Exponential backoff
      await this.sleep(this.retryDelay);
      this.retryDelay *= 2;
    }
  }

  /**
   * @param {number} ms - Delay in milliseconds.
   * @returns {Promise<void>} Promise that resolves after the delay.
   */
  sleep(ms) {
    return new Promise((resolve) => {
      setTimeout(resolve, ms);
    });
  }
}
Best Practices
- Connection Management: Implement robust reconnection logic with exponential backoff
- Memory Management: Use bounded collections for storing historical snapshots
- Performance: Process snapshots asynchronously to avoid blocking the stream
- Monitoring: Track stream health and snapshot rates
- Error Recovery: Handle various error types gracefully
- Resource Cleanup: Properly close streams and connections on shutdown
Current Limitations
- Historical Data: Although the Position message accepts timestamp_ms and block_height, streaming from historical positions is not currently available; only real-time streaming is supported
- Data Retention: Node maintains only 24 hours of historical order book data
- Backpressure: High-volume periods may require careful handling to avoid overwhelming downstream systems
Resources
- GitHub: gRPC Code Examples - Complete working examples
- Copy Trading Bot - Production-ready trading bot example
Need help? Contact our support team or check the Hyperliquid gRPC documentation.