⚠️Blast API (blastapi.io) ends Oct 31. Migrate to Dwellir and skip Alchemy's expensive compute units.
Switch Today →
Skip to main content

StreamOrderBookSnapshots

Stream continuous order book snapshots for real-time market data and liquidity tracking.

When to Use This Method

StreamOrderBookSnapshots is essential for:

  • Algorithmic Trading - Real-time order book analysis for trading algorithms
  • Market Making - Monitor market depth and adjust spread strategies
  • Risk Management - Track liquidity changes and market impact
  • Price Discovery - Analyze bid/ask dynamics and market sentiment

Method Signature

rpc StreamOrderBookSnapshots(Timestamp) returns (stream OrderBookSnapshot) {}

Request Message

message Timestamp {   
int64 timestamp = 1;
}

Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.

Response Stream

message OrderBookSnapshot {
// JSON-encoded object conforming to a
// Hyperliquid order book snapshot
bytes data = 1;
}

Each snapshot contains:

  • Complete bid orders sorted by price (descending)
  • Complete ask orders sorted by price (ascending)
  • Market depth at various price levels
  • Timestamp of the snapshot
  • Sequence number for ordering

Implementation Examples

package main

import (
"context"
"encoding/json"
"log"
"sync"
"time"

pb "your-project/hyperliquid_l1_gateway/v1"
"google.golang.org/grpc"
)

type OrderBookSnapshot struct {
Bids [][]string `json:"bids"`
Asks [][]string `json:"asks"`
Timestamp int64 `json:"timestamp"`
Sequence int64 `json:"sequence"`
}

type MarketDataManager struct {
client pb.HyperliquidL1GatewayClient
currentBook *OrderBookSnapshot
bookMutex sync.RWMutex
subscribers []chan *OrderBookSnapshot
subscribersMux sync.Mutex

// Analytics
updateCount int64
lastUpdate time.Time
spreadHistory []float64
}

func NewMarketDataManager(client pb.HyperliquidL1GatewayClient) *MarketDataManager {
return &MarketDataManager{
client: client,
subscribers: make([]chan *OrderBookSnapshot, 0),
spreadHistory: make([]float64, 0, 1000), // Keep last 1000 spreads
}
}

func (mdm *MarketDataManager) StartStreaming(ctx context.Context) error {
stream, err := mdm.client.StreamOrderBookSnapshots(
ctx,
&pb.Timestamp{Timestamp: 0},
)
if err != nil {
return err
}

log.Println("Starting order book stream...")

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
response, err := stream.Recv()
if err != nil {
log.Printf("Stream error: %v", err)
return err
}

if err := mdm.processSnapshot(response.Data); err != nil {
log.Printf("Failed to process snapshot: %v", err)
continue
}
}
}
}

func (mdm *MarketDataManager) processSnapshot(data []byte) error {
var snapshot OrderBookSnapshot
if err := json.Unmarshal(data, &snapshot); err != nil {
return err
}

// Update current book with thread safety
mdm.bookMutex.Lock()
mdm.currentBook = &snapshot
mdm.updateCount++
mdm.lastUpdate = time.Now()
mdm.bookMutex.Unlock()

// Calculate and store spread
spread := mdm.calculateSpread(&snapshot)
if spread > 0 {
mdm.addSpreadSample(spread)
}

// Notify subscribers
mdm.notifySubscribers(&snapshot)

log.Printf("Order book updated - Bids: %d, Asks: %d, Spread: %.6f",
len(snapshot.Bids), len(snapshot.Asks), spread)

return nil
}

func (mdm *MarketDataManager) calculateSpread(book *OrderBookSnapshot) float64 {
if len(book.Bids) == 0 || len(book.Asks) == 0 {
return 0
}

bestBid := parsePrice(book.Bids[0][0])
bestAsk := parsePrice(book.Asks[0][0])

if bestBid <= 0 || bestAsk <= 0 {
return 0
}

return bestAsk - bestBid
}

func (mdm *MarketDataManager) addSpreadSample(spread float64) {
if len(mdm.spreadHistory) >= cap(mdm.spreadHistory) {
// Remove oldest sample
copy(mdm.spreadHistory, mdm.spreadHistory[1:])
mdm.spreadHistory = mdm.spreadHistory[:len(mdm.spreadHistory)-1]
}
mdm.spreadHistory = append(mdm.spreadHistory, spread)
}

func (mdm *MarketDataManager) GetCurrentBook() *OrderBookSnapshot {
mdm.bookMutex.RLock()
defer mdm.bookMutex.RUnlock()

if mdm.currentBook == nil {
return nil
}

// Return a copy to avoid race conditions
bookCopy := *mdm.currentBook
return &bookCopy
}

func (mdm *MarketDataManager) Subscribe() <-chan *OrderBookSnapshot {
mdm.subscribersMux.Lock()
defer mdm.subscribersMux.Unlock()

ch := make(chan *OrderBookSnapshot, 100) // Buffered channel
mdm.subscribers = append(mdm.subscribers, ch)
return ch
}

func (mdm *MarketDataManager) notifySubscribers(snapshot *OrderBookSnapshot) {
mdm.subscribersMux.Lock()
defer mdm.subscribersMux.Unlock()

for _, ch := range mdm.subscribers {
select {
case ch <- snapshot:
default:
// Channel full, skip this update
log.Printf("Warning: Subscriber channel full, skipping update")
}
}
}

func (mdm *MarketDataManager) GetAnalytics() map[string]interface{} {
mdm.bookMutex.RLock()
defer mdm.bookMutex.RUnlock()

analytics := map[string]interface{}{
"update_count": mdm.updateCount,
"last_update": mdm.lastUpdate,
"spread_samples": len(mdm.spreadHistory),
}

if len(mdm.spreadHistory) > 0 {
sum := 0.0
min := mdm.spreadHistory[0]
max := mdm.spreadHistory[0]

for _, spread := range mdm.spreadHistory {
sum += spread
if spread < min {
min = spread
}
if spread > max {
max = spread
}
}

analytics["avg_spread"] = sum / float64(len(mdm.spreadHistory))
analytics["min_spread"] = min
analytics["max_spread"] = max
}

return analytics
}

func parsePrice(priceStr string) float64 {
// Implement price parsing logic
// This is a placeholder - implement based on actual price format
var price float64
// ... parsing logic
return price
}

func main() {
// Connect to Hyperliquid L1 Gateway
conn, err := grpc.Dial(
"<YOUR_HYPERLIQUID_ENDPOINT>", // Contact support@dwellir.com
grpc.WithInsecure(),
)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()

client := pb.NewHyperliquidL1GatewayClient(conn)
manager := NewMarketDataManager(client)

// Subscribe to order book updates
bookUpdates := manager.Subscribe()

// Start streaming in background
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
if err := manager.StartStreaming(ctx); err != nil {
log.Printf("Stream ended: %v", err)
}
}()

// Process order book updates
go func() {
for book := range bookUpdates {
// Implement your trading logic here
processTradingSignals(book)
}
}()

// Print analytics periodically
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
analytics := manager.GetAnalytics()
log.Printf("Analytics: %+v", analytics)
}
}
}

func processTradingSignals(book *OrderBookSnapshot) {
// Implement your trading algorithm here
// Example: Check for arbitrage opportunities, adjust orders, etc.
}

Common Use Cases

1. Algorithmic Trading

class AlgoTradingEngine:
def __init__(self, analyzer):
self.analyzer = analyzer
self.signals = []
analyzer.subscribe(self.on_book_update)

def on_book_update(self, book_data):
# Mean reversion strategy
signals = self.detect_mean_reversion_signals(book_data)

# Momentum strategy
momentum_signals = self.detect_momentum_signals(book_data)

# Combine signals
all_signals = signals + momentum_signals

for signal in all_signals:
self.execute_signal(signal)

def detect_mean_reversion_signals(self, book_data):
# Implement mean reversion logic
pass

def execute_signal(self, signal):
# Execute trading signal
print(f"Executing signal: {signal}")

2. Risk Management

class RiskManager {
constructor(streamer) {
this.streamer = streamer;
this.positionLimits = { max: 1000, current: 0 };
this.spreadThresholds = { warning: 0.001, critical: 0.005 };

streamer.on('snapshot', (bookData) => {
this.assessRisk(bookData);
});
}

assessRisk(bookData) {
const spread = this.streamer.calculateSpread(bookData);
const depth = this.calculateMarketDepth(bookData);

// Check spread risk
if (spread > this.spreadThresholds.critical) {
this.emit('riskAlert', {
type: 'WIDE_SPREAD',
severity: 'CRITICAL',
data: { spread, threshold: this.spreadThresholds.critical }
});
}

// Check liquidity risk
if (depth < 100) { // Minimum depth threshold
this.emit('riskAlert', {
type: 'LOW_LIQUIDITY',
severity: 'HIGH',
data: { depth }
});
}
}

calculateMarketDepth(bookData, maxLevels = 10) {
const bids = (bookData.bids || []).slice(0, maxLevels);
const asks = (bookData.asks || []).slice(0, maxLevels);

const bidDepth = bids.reduce((sum, order) => sum + parseFloat(order[1] || 0), 0);
const askDepth = asks.reduce((sum, order) => sum + parseFloat(order[1] || 0), 0);

return bidDepth + askDepth;
}
}

3. Market Data Distribution

type MarketDataDistributor struct {
manager *MarketDataManager
clients map[string]chan *OrderBookSnapshot
clientsMutex sync.RWMutex
}

func (mdd *MarketDataDistributor) AddClient(clientID string) <-chan *OrderBookSnapshot {
mdd.clientsMutex.Lock()
defer mdd.clientsMutex.Unlock()

ch := make(chan *OrderBookSnapshot, 100)
mdd.clients[clientID] = ch
return ch
}

func (mdd *MarketDataDistributor) RemoveClient(clientID string) {
mdd.clientsMutex.Lock()
defer mdd.clientsMutex.Unlock()

if ch, exists := mdd.clients[clientID]; exists {
close(ch)
delete(mdd.clients, clientID)
}
}

func (mdd *MarketDataDistributor) DistributeSnapshot(snapshot *OrderBookSnapshot) {
mdd.clientsMutex.RLock()
defer mdd.clientsMutex.RUnlock()

for clientID, ch := range mdd.clients {
select {
case ch <- snapshot:
default:
log.Printf("Client %s channel full, dropping update", clientID)
}
}
}

Performance Optimization

1. Efficient Processing

class OptimizedBookProcessor {
constructor() {
this.processingQueue = [];
this.isProcessing = false;
this.batchSize = 10;
}

async processSnapshot(snapshot) {
// Add to queue instead of processing immediately
this.processingQueue.push(snapshot);

if (!this.isProcessing) {
this.processBatch();
}
}

async processBatch() {
this.isProcessing = true;

while (this.processingQueue.length > 0) {
const batch = this.processingQueue.splice(0, this.batchSize);

// Process batch in parallel
await Promise.all(batch.map(snapshot => this.processIndividual(snapshot)));
}

this.isProcessing = false;
}

async processIndividual(snapshot) {
// Individual processing logic
}
}

2. Memory Management

class MemoryEfficientAnalyzer:
def __init__(self, max_history=1000):
self.max_history = max_history
self.spread_buffer = collections.deque(maxlen=max_history)
self.depth_buffer = collections.deque(maxlen=max_history)

def add_sample(self, spread, depth):
# Automatically manages memory with fixed-size deques
self.spread_buffer.append(spread)
self.depth_buffer.append(depth)

def get_recent_stats(self, window=100):
recent_spreads = list(itertools.islice(self.spread_buffer, max(0, len(self.spread_buffer) - window), None))
return {
'mean_spread': statistics.mean(recent_spreads) if recent_spreads else 0,
'samples': len(recent_spreads)
}

Best Practices

  1. Buffer Management: Use appropriately sized buffers to handle bursts of updates
  2. Statistical Analysis: Track key metrics like spread, depth, and volatility over time
  3. Event Detection: Implement algorithms to detect significant market events
  4. Resource Management: Monitor memory usage and implement cleanup for long-running processes
  5. Error Recovery: Handle stream interruptions gracefully with automatic reconnection
  6. Data Validation: Validate order book data integrity before processing

Current Limitations

  • Historical Data: Cannot stream from historical timestamps; only real-time streaming available
  • Data Retention: Node maintains only 24 hours of historical data
  • Order Book Depth: Full depth may vary depending on market conditions
  • Update Frequency: Update frequency depends on market activity and network conditions

Need help? Contact our support team or check the Hyperliquid gRPC documentation.