StreamOrderBookSnapshots
Stream continuous order book snapshots for real-time market data and liquidity tracking.
When to Use This Method
StreamOrderBookSnapshots is essential for:
- Algorithmic Trading - Real-time order book analysis for trading algorithms
- Market Making - Monitor market depth and adjust spread strategies
- Risk Management - Track liquidity changes and market impact
- Price Discovery - Analyze bid/ask dynamics and market sentiment
Method Signature
rpc StreamOrderBookSnapshots(Timestamp) returns (stream OrderBookSnapshot) {}
Request Message
message Timestamp {
int64 timestamp = 1;
}
Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.
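A minimal request sketch in Python, assuming the generated stubs used in the full Python example further below; because the parameter is currently ignored, passing 0 is the convention used throughout this page:

import grpc
from hyperliquid_l1_gateway_pb2 import Timestamp
from hyperliquid_l1_gateway_pb2_grpc import HyperliquidL1GatewayStub

channel = grpc.insecure_channel('<YOUR_HYPERLIQUID_ENDPOINT>')  # Contact support@dwellir.com
stub = HyperliquidL1GatewayStub(channel)

# The timestamp value is currently ignored: the stream always starts from "now"
for snapshot in stub.StreamOrderBookSnapshots(Timestamp(timestamp=0)):
    print(f"received {len(snapshot.data)} bytes")  # raw JSON-encoded snapshot
    break  # take one snapshot and stop; production code keeps consuming the stream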
Response Stream
message OrderBookSnapshot {
// JSON-encoded object conforming to a
// Hyperliquid order book snapshot
bytes data = 1;
}
Each snapshot contains (see the illustrative payload after this list):
- Complete bid orders sorted by price (descending)
- Complete ask orders sorted by price (ascending)
- Market depth at various price levels
- Timestamp of the snapshot
- Sequence number for ordering
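The authoritative schema is the Hyperliquid order book snapshot format; the payload below is illustrative only, with field names mirroring what the examples on this page parse:

import json

# Illustrative payload -- bids/asks are [price, size] strings, best prices first
raw = (
    b'{"bids": [["64230.5", "1.25"], ["64230.0", "0.80"]], '
    b'"asks": [["64231.0", "0.60"], ["64231.5", "2.10"]], '
    b'"timestamp": 1718000000000, "sequence": 123456}'
)

book = json.loads(raw)                 # same call the examples apply to snapshot.data
best_bid = float(book["bids"][0][0])   # bids sorted by price, descending
best_ask = float(book["asks"][0][0])   # asks sorted by price, ascending
print("spread:", best_ask - best_bid)  # 0.5 in this illustration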
Implementation Examples
Go
package main
import (
    "context"
    "encoding/json"
    "log"
    "strconv"
    "sync"
    "time"

    pb "your-project/hyperliquid_l1_gateway/v1"

    "google.golang.org/grpc"
)
type OrderBookSnapshot struct {
Bids [][]string `json:"bids"`
Asks [][]string `json:"asks"`
Timestamp int64 `json:"timestamp"`
Sequence int64 `json:"sequence"`
}
type MarketDataManager struct {
client pb.HyperliquidL1GatewayClient
currentBook *OrderBookSnapshot
bookMutex sync.RWMutex
subscribers []chan *OrderBookSnapshot
subscribersMux sync.Mutex
// Analytics
updateCount int64
lastUpdate time.Time
spreadHistory []float64
}
func NewMarketDataManager(client pb.HyperliquidL1GatewayClient) *MarketDataManager {
return &MarketDataManager{
client: client,
subscribers: make([]chan *OrderBookSnapshot, 0),
spreadHistory: make([]float64, 0, 1000), // Keep last 1000 spreads
}
}
func (mdm *MarketDataManager) StartStreaming(ctx context.Context) error {
stream, err := mdm.client.StreamOrderBookSnapshots(
ctx,
&pb.Timestamp{Timestamp: 0},
)
if err != nil {
return err
}
log.Println("Starting order book stream...")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
response, err := stream.Recv()
if err != nil {
log.Printf("Stream error: %v", err)
return err
}
if err := mdm.processSnapshot(response.Data); err != nil {
log.Printf("Failed to process snapshot: %v", err)
continue
}
}
}
}
func (mdm *MarketDataManager) processSnapshot(data []byte) error {
var snapshot OrderBookSnapshot
if err := json.Unmarshal(data, &snapshot); err != nil {
return err
}
// Update current book with thread safety
mdm.bookMutex.Lock()
mdm.currentBook = &snapshot
mdm.updateCount++
mdm.lastUpdate = time.Now()
mdm.bookMutex.Unlock()
// Calculate and store spread
spread := mdm.calculateSpread(&snapshot)
if spread > 0 {
mdm.addSpreadSample(spread)
}
// Notify subscribers
mdm.notifySubscribers(&snapshot)
log.Printf("Order book updated - Bids: %d, Asks: %d, Spread: %.6f",
len(snapshot.Bids), len(snapshot.Asks), spread)
return nil
}
func (mdm *MarketDataManager) calculateSpread(book *OrderBookSnapshot) float64 {
if len(book.Bids) == 0 || len(book.Asks) == 0 {
return 0
}
bestBid := parsePrice(book.Bids[0][0])
bestAsk := parsePrice(book.Asks[0][0])
if bestBid <= 0 || bestAsk <= 0 {
return 0
}
return bestAsk - bestBid
}
func (mdm *MarketDataManager) addSpreadSample(spread float64) {
    // Take the write lock: GetAnalytics reads spreadHistory under the same mutex
    mdm.bookMutex.Lock()
    defer mdm.bookMutex.Unlock()
    if len(mdm.spreadHistory) >= cap(mdm.spreadHistory) {
        // Drop the oldest sample to keep the buffer bounded
        copy(mdm.spreadHistory, mdm.spreadHistory[1:])
        mdm.spreadHistory = mdm.spreadHistory[:len(mdm.spreadHistory)-1]
    }
    mdm.spreadHistory = append(mdm.spreadHistory, spread)
}
func (mdm *MarketDataManager) GetCurrentBook() *OrderBookSnapshot {
mdm.bookMutex.RLock()
defer mdm.bookMutex.RUnlock()
if mdm.currentBook == nil {
return nil
}
// Return a copy to avoid race conditions
bookCopy := *mdm.currentBook
return &bookCopy
}
func (mdm *MarketDataManager) Subscribe() <-chan *OrderBookSnapshot {
mdm.subscribersMux.Lock()
defer mdm.subscribersMux.Unlock()
ch := make(chan *OrderBookSnapshot, 100) // Buffered channel
mdm.subscribers = append(mdm.subscribers, ch)
return ch
}
func (mdm *MarketDataManager) notifySubscribers(snapshot *OrderBookSnapshot) {
mdm.subscribersMux.Lock()
defer mdm.subscribersMux.Unlock()
for _, ch := range mdm.subscribers {
select {
case ch <- snapshot:
default:
// Channel full, skip this update
log.Printf("Warning: Subscriber channel full, skipping update")
}
}
}
func (mdm *MarketDataManager) GetAnalytics() map[string]interface{} {
mdm.bookMutex.RLock()
defer mdm.bookMutex.RUnlock()
analytics := map[string]interface{}{
"update_count": mdm.updateCount,
"last_update": mdm.lastUpdate,
"spread_samples": len(mdm.spreadHistory),
}
if len(mdm.spreadHistory) > 0 {
sum := 0.0
min := mdm.spreadHistory[0]
max := mdm.spreadHistory[0]
for _, spread := range mdm.spreadHistory {
sum += spread
if spread < min {
min = spread
}
if spread > max {
max = spread
}
}
analytics["avg_spread"] = sum / float64(len(mdm.spreadHistory))
analytics["min_spread"] = min
analytics["max_spread"] = max
}
return analytics
}
func parsePrice(priceStr string) float64 {
    // Prices arrive as decimal strings; treat unparsable values as 0 so
    // callers can skip them (calculateSpread ignores non-positive prices)
    price, err := strconv.ParseFloat(priceStr, 64)
    if err != nil {
        return 0
    }
    return price
}
func main() {
// Connect to Hyperliquid L1 Gateway
conn, err := grpc.Dial(
"<YOUR_HYPERLIQUID_ENDPOINT>", // Contact support@dwellir.com
grpc.WithInsecure(),
)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewHyperliquidL1GatewayClient(conn)
manager := NewMarketDataManager(client)
// Subscribe to order book updates
bookUpdates := manager.Subscribe()
// Start streaming in background
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
if err := manager.StartStreaming(ctx); err != nil {
log.Printf("Stream ended: %v", err)
}
}()
// Process order book updates
go func() {
for book := range bookUpdates {
// Implement your trading logic here
processTradingSignals(book)
}
}()
// Print analytics periodically
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
    for range ticker.C {
        analytics := manager.GetAnalytics()
        log.Printf("Analytics: %+v", analytics)
    }
}
func processTradingSignals(book *OrderBookSnapshot) {
// Implement your trading algorithm here
// Example: Check for arbitrage opportunities, adjust orders, etc.
}
Python
import grpc
import json
import time
import threading
from collections import deque
from statistics import mean, stdev
from hyperliquid_l1_gateway_pb2 import Timestamp
from hyperliquid_l1_gateway_pb2_grpc import HyperliquidL1GatewayStub
class OrderBookAnalyzer:
def __init__(self, endpoint):
self.channel = grpc.insecure_channel(endpoint)
self.stub = HyperliquidL1GatewayStub(self.channel)
self.current_book = None
self.book_lock = threading.RLock()
self.subscribers = []
self.running = False
# Analytics
self.update_count = 0
self.spread_history = deque(maxlen=1000)
self.depth_history = deque(maxlen=100)
self.last_update_time = None
def stream_order_books(self):
"""Start streaming order book snapshots"""
self.running = True
timestamp = Timestamp(timestamp=0)
try:
for snapshot in self.stub.StreamOrderBookSnapshots(timestamp):
if not self.running:
break
try:
book_data = json.loads(snapshot.data)
self.process_snapshot(book_data)
except json.JSONDecodeError as e:
print(f"JSON parsing error: {e}")
continue
except grpc.RpcError as e:
print(f"gRPC error: {e}")
except KeyboardInterrupt:
print("Stream interrupted by user")
finally:
self.running = False
def process_snapshot(self, book_data):
"""Process individual order book snapshot"""
with self.book_lock:
self.current_book = book_data
self.update_count += 1
self.last_update_time = time.time()
# Calculate metrics
spread = self.calculate_spread(book_data)
if spread is not None:
self.spread_history.append(spread)
depth_metrics = self.calculate_depth_metrics(book_data)
if depth_metrics:
self.depth_history.append(depth_metrics)
# Notify subscribers
self.notify_subscribers(book_data)
print(f"Book update - Spread: {spread:.6f}" if spread else "Book update")
def calculate_spread(self, book_data):
"""Calculate bid-ask spread"""
bids = book_data.get('bids', [])
asks = book_data.get('asks', [])
if not bids or not asks:
return None
try:
best_bid = float(bids[0][0])
best_ask = float(asks[0][0])
return best_ask - best_bid
except (ValueError, IndexError):
return None
def calculate_depth_metrics(self, book_data, levels=10):
"""Calculate market depth metrics"""
bids = book_data.get('bids', [])[:levels]
asks = book_data.get('asks', [])[:levels]
try:
bid_volume = sum(float(order[1]) for order in bids)
ask_volume = sum(float(order[1]) for order in asks)
return {
'bid_depth': bid_volume,
'ask_depth': ask_volume,
'total_depth': bid_volume + ask_volume,
'depth_imbalance': (bid_volume - ask_volume) / (bid_volume + ask_volume) if (bid_volume + ask_volume) > 0 else 0
}
except (ValueError, IndexError):
return None
def get_current_book(self):
"""Get current order book snapshot"""
with self.book_lock:
return self.current_book.copy() if self.current_book else None
def subscribe(self, callback):
"""Subscribe to order book updates"""
self.subscribers.append(callback)
def notify_subscribers(self, book_data):
"""Notify all subscribers of new data"""
for callback in self.subscribers:
try:
callback(book_data)
except Exception as e:
print(f"Subscriber callback error: {e}")
def get_spread_statistics(self):
"""Get spread statistics"""
if not self.spread_history:
return None
spreads = list(self.spread_history)
return {
'count': len(spreads),
'mean': mean(spreads),
'std': stdev(spreads) if len(spreads) > 1 else 0,
'min': min(spreads),
'max': max(spreads),
'latest': spreads[-1]
}
def get_depth_statistics(self):
"""Get depth statistics"""
if not self.depth_history:
return None
latest = self.depth_history[-1]
avg_depth = mean([d['total_depth'] for d in self.depth_history])
avg_imbalance = mean([d['depth_imbalance'] for d in self.depth_history])
return {
'latest_depth': latest['total_depth'],
'average_depth': avg_depth,
'average_imbalance': avg_imbalance,
'latest_imbalance': latest['depth_imbalance']
}
def detect_market_events(self, book_data):
"""Detect significant market events"""
events = []
spread = self.calculate_spread(book_data)
if spread and self.spread_history:
recent_spreads = list(self.spread_history)[-20:] # Last 20 spreads
if len(recent_spreads) > 5:
avg_recent_spread = mean(recent_spreads)
# Detect spread expansion
if spread > avg_recent_spread * 2:
events.append({
'type': 'SPREAD_EXPANSION',
'severity': 'HIGH' if spread > avg_recent_spread * 3 else 'MEDIUM',
'data': {'current_spread': spread, 'average_spread': avg_recent_spread}
})
# Detect spread compression
elif spread < avg_recent_spread * 0.5:
events.append({
'type': 'SPREAD_COMPRESSION',
'severity': 'MEDIUM',
'data': {'current_spread': spread, 'average_spread': avg_recent_spread}
})
# Detect depth changes
depth_metrics = self.calculate_depth_metrics(book_data)
if depth_metrics and self.depth_history:
recent_depth = mean([d['total_depth'] for d in list(self.depth_history)[-10:]])
if depth_metrics['total_depth'] < recent_depth * 0.5:
events.append({
'type': 'LIQUIDITY_DROP',
'severity': 'HIGH',
'data': {'current_depth': depth_metrics['total_depth'], 'average_depth': recent_depth}
})
return events
def stop(self):
"""Stop streaming"""
self.running = False
class TradingStrategy:
def __init__(self, analyzer):
self.analyzer = analyzer
self.position = 0
self.orders = []
# Subscribe to order book updates
analyzer.subscribe(self.on_book_update)
def on_book_update(self, book_data):
"""Handle order book updates"""
# Detect market events
events = self.analyzer.detect_market_events(book_data)
for event in events:
self.handle_market_event(event, book_data)
# Run main strategy logic
self.run_strategy_logic(book_data)
def handle_market_event(self, event, book_data):
"""Handle detected market events"""
if event['type'] == 'SPREAD_EXPANSION':
print(f"⚠️ Spread expansion detected: {event['data']['current_spread']:.6f}")
# Consider reducing position or widening spreads
elif event['type'] == 'LIQUIDITY_DROP':
print(f"📉 Liquidity drop detected: {event['data']['current_depth']:.4f}")
# Consider tightening risk management
def run_strategy_logic(self, book_data):
"""Main strategy logic"""
# Implement your trading strategy here
pass
def main():
analyzer = OrderBookAnalyzer('<YOUR_HYPERLIQUID_ENDPOINT>')
strategy = TradingStrategy(analyzer)
# Start streaming in background thread
stream_thread = threading.Thread(target=analyzer.stream_order_books)
stream_thread.start()
try:
# Print statistics every 30 seconds
        while stream_thread.is_alive():  # checking the thread avoids racing on analyzer.running at startup
time.sleep(30)
spread_stats = analyzer.get_spread_statistics()
depth_stats = analyzer.get_depth_statistics()
if spread_stats:
print(f"\n📊 Spread Stats:")
print(f" Current: {spread_stats['latest']:.6f}")
print(f" Average: {spread_stats['mean']:.6f}")
print(f" Min/Max: {spread_stats['min']:.6f}/{spread_stats['max']:.6f}")
if depth_stats:
print(f"📈 Depth Stats:")
print(f" Current: {depth_stats['latest_depth']:.4f}")
print(f" Average: {depth_stats['average_depth']:.4f}")
print(f" Imbalance: {depth_stats['latest_imbalance']:.3f}")
print(f"Updates processed: {analyzer.update_count}")
except KeyboardInterrupt:
print("\nStopping...")
analyzer.stop()
stream_thread.join()
if __name__ == '__main__':
main()
Node.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const EventEmitter = require('events');
// Load proto file
const packageDefinition = protoLoader.loadSync(
'hyperliquid_l1_gateway.proto',
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
}
);
class OrderBookStreamer extends EventEmitter {
constructor(endpoint) {
super();
const proto = grpc.loadPackageDefinition(packageDefinition);
this.client = new proto.hyperliquid_l1_gateway.v1.HyperliquidL1Gateway(
endpoint, // Contact support@dwellir.com
grpc.credentials.createInsecure()
);
this.currentBook = null;
this.updateCount = 0;
this.spreadHistory = [];
this.maxHistorySize = 1000;
this.isStreaming = false;
// Market making specific metrics
this.tickSize = 0.000001; // Adjust based on market
this.minSpread = 0.0001;
}
startStream() {
if (this.isStreaming) {
console.log('Stream already running');
return;
}
this.isStreaming = true;
const stream = this.client.StreamOrderBookSnapshots({ timestamp: 0 });
stream.on('data', (snapshot) => {
try {
const bookData = JSON.parse(snapshot.data.toString());
this.processSnapshot(bookData);
} catch (error) {
console.error('JSON parsing error:', error.message);
}
});
stream.on('error', (error) => {
console.error('Stream error:', error.message);
this.emit('error', error);
this.isStreaming = false;
});
stream.on('end', () => {
console.log('Stream ended');
this.emit('end');
this.isStreaming = false;
});
this.stream = stream;
console.log('Order book stream started');
}
processSnapshot(bookData) {
this.currentBook = bookData;
this.updateCount++;
// Calculate spread
const spread = this.calculateSpread(bookData);
if (spread !== null) {
this.addSpreadSample(spread);
}
// Emit events
this.emit('snapshot', bookData);
// Check for market making opportunities
const mmSignals = this.analyzeMarketMakingOpportunities(bookData);
if (mmSignals.length > 0) {
this.emit('marketMakingSignals', mmSignals);
}
// Detect significant changes
const marketEvents = this.detectMarketEvents(bookData);
if (marketEvents.length > 0) {
this.emit('marketEvents', marketEvents);
}
}
calculateSpread(bookData) {
const bids = bookData.bids || [];
const asks = bookData.asks || [];
if (bids.length === 0 || asks.length === 0) {
return null;
}
    const bestBid = parseFloat(bids[0][0]);
    const bestAsk = parseFloat(asks[0][0]);
    // parseFloat never throws; malformed strings yield NaN, so check explicitly
    if (Number.isNaN(bestBid) || Number.isNaN(bestAsk)) {
      return null;
    }
    return bestAsk - bestBid;
}
addSpreadSample(spread) {
this.spreadHistory.push(spread);
if (this.spreadHistory.length > this.maxHistorySize) {
this.spreadHistory.shift();
}
}
analyzeMarketMakingOpportunities(bookData) {
const signals = [];
const bids = bookData.bids || [];
const asks = bookData.asks || [];
if (bids.length === 0 || asks.length === 0) {
return signals;
}
const bestBid = parseFloat(bids[0][0]);
const bestAsk = parseFloat(asks[0][0]);
const spread = bestAsk - bestBid;
const midPrice = (bestBid + bestAsk) / 2;
// Analyze spread opportunity
if (spread > this.minSpread * 2) {
signals.push({
type: 'WIDE_SPREAD_OPPORTUNITY',
data: {
spread,
suggestedBid: midPrice - (spread * 0.4),
suggestedAsk: midPrice + (spread * 0.4),
expectedProfit: spread * 0.2
}
});
}
// Analyze depth imbalance
const bidDepth = this.calculateDepth(bids, 10);
const askDepth = this.calculateDepth(asks, 10);
const totalDepth = bidDepth + askDepth;
if (totalDepth > 0) {
const imbalance = (bidDepth - askDepth) / totalDepth;
if (Math.abs(imbalance) > 0.3) {
signals.push({
type: 'DEPTH_IMBALANCE',
data: {
imbalance,
direction: imbalance > 0 ? 'BID_HEAVY' : 'ASK_HEAVY',
bidDepth,
askDepth
}
});
}
}
return signals;
}
calculateDepth(orders, levels = 10) {
return orders
.slice(0, levels)
.reduce((sum, order) => sum + parseFloat(order[1] || 0), 0);
}
detectMarketEvents(bookData) {
const events = [];
if (this.spreadHistory.length < 20) return events;
const currentSpread = this.calculateSpread(bookData);
if (currentSpread === null) return events;
const recentSpreads = this.spreadHistory.slice(-20);
const avgSpread = recentSpreads.reduce((a, b) => a + b, 0) / recentSpreads.length;
// Detect spread anomalies
if (currentSpread > avgSpread * 2) {
events.push({
type: 'SPREAD_EXPANSION',
severity: currentSpread > avgSpread * 3 ? 'HIGH' : 'MEDIUM',
data: { currentSpread, averageSpread: avgSpread }
});
} else if (currentSpread < avgSpread * 0.5) {
events.push({
type: 'SPREAD_COMPRESSION',
severity: 'MEDIUM',
data: { currentSpread, averageSpread: avgSpread }
});
}
return events;
}
getSpreadStatistics() {
if (this.spreadHistory.length === 0) return null;
const spreads = [...this.spreadHistory];
const mean = spreads.reduce((a, b) => a + b, 0) / spreads.length;
const variance = spreads.reduce((a, b) => a + Math.pow(b - mean, 2), 0) / spreads.length;
const stdDev = Math.sqrt(variance);
return {
count: spreads.length,
mean,
stdDev,
min: Math.min(...spreads),
max: Math.max(...spreads),
latest: spreads[spreads.length - 1]
};
}
getCurrentBook() {
return this.currentBook;
}
stopStream() {
if (this.stream) {
this.stream.cancel();
this.isStreaming = false;
console.log('Stream stopped');
}
}
}
// Market making strategy example
class MarketMakingStrategy {
constructor(streamer) {
this.streamer = streamer;
this.position = 0;
this.orders = new Map(); // orderId -> order
this.maxPosition = 100;
this.targetSpread = 0.0002;
this.setupEventHandlers();
}
setupEventHandlers() {
this.streamer.on('snapshot', (bookData) => {
this.updateStrategy(bookData);
});
this.streamer.on('marketMakingSignals', (signals) => {
this.handleMarketMakingSignals(signals);
});
this.streamer.on('marketEvents', (events) => {
this.handleMarketEvents(events);
});
}
updateStrategy(bookData) {
// Main strategy logic
const bids = bookData.bids || [];
const asks = bookData.asks || [];
if (bids.length === 0 || asks.length === 0) return;
const bestBid = parseFloat(bids[0][0]);
const bestAsk = parseFloat(asks[0][0]);
const midPrice = (bestBid + bestAsk) / 2;
// Calculate target prices
const targetBid = midPrice - (this.targetSpread / 2);
const targetAsk = midPrice + (this.targetSpread / 2);
console.log(`Strategy update - Target bid: ${targetBid.toFixed(6)}, Target ask: ${targetAsk.toFixed(6)}`);
// In a real implementation, you would place/update orders here
this.manageOrders(targetBid, targetAsk);
}
handleMarketMakingSignals(signals) {
for (const signal of signals) {
switch (signal.type) {
case 'WIDE_SPREAD_OPPORTUNITY':
console.log('🎯 Wide spread opportunity:', signal.data);
break;
case 'DEPTH_IMBALANCE':
console.log('⚖️ Depth imbalance:', signal.data);
break;
}
}
}
handleMarketEvents(events) {
for (const event of events) {
switch (event.type) {
case 'SPREAD_EXPANSION':
console.log('📈 Spread expansion detected:', event.data);
// Widen our spreads
this.targetSpread *= 1.2;
break;
case 'SPREAD_COMPRESSION':
console.log('📉 Spread compression detected:', event.data);
// Tighten our spreads
this.targetSpread *= 0.9;
break;
}
}
}
manageOrders(targetBid, targetAsk) {
// Placeholder for order management logic
// In a real implementation, you would:
// 1. Check existing orders
// 2. Cancel orders that are too far from target
// 3. Place new orders at target prices
// 4. Manage position risk
}
}
// Usage example
function main() {
const streamer = new OrderBookStreamer('<YOUR_HYPERLIQUID_ENDPOINT>');
const strategy = new MarketMakingStrategy(streamer);
streamer.startStream();
// Print statistics every 30 seconds
setInterval(() => {
const stats = streamer.getSpreadStatistics();
if (stats) {
console.log('\n📊 Spread Statistics:');
console.log(` Updates: ${streamer.updateCount}`);
console.log(` Mean spread: ${stats.mean.toFixed(6)}`);
console.log(` Std dev: ${stats.stdDev.toFixed(6)}`);
console.log(` Min/Max: ${stats.min.toFixed(6)}/${stats.max.toFixed(6)}`);
console.log(` Latest: ${stats.latest.toFixed(6)}`);
}
}, 30000);
// Graceful shutdown
process.on('SIGINT', () => {
console.log('\nShutting down...');
streamer.stopStream();
process.exit(0);
});
}
main();
Common Use Cases
1. Algorithmic Trading
class AlgoTradingEngine:
def __init__(self, analyzer):
self.analyzer = analyzer
self.signals = []
analyzer.subscribe(self.on_book_update)
def on_book_update(self, book_data):
# Mean reversion strategy
signals = self.detect_mean_reversion_signals(book_data)
# Momentum strategy
momentum_signals = self.detect_momentum_signals(book_data)
# Combine signals
all_signals = signals + momentum_signals
for signal in all_signals:
self.execute_signal(signal)
    def detect_mean_reversion_signals(self, book_data):
        # Implement mean reversion logic; return a list of signal dicts
        return []

    def detect_momentum_signals(self, book_data):
        # Implement momentum logic; return a list of signal dicts
        return []
def execute_signal(self, signal):
# Execute trading signal
print(f"Executing signal: {signal}")
2. Risk Management
// Assumes EventEmitter is in scope: const EventEmitter = require('events');
class RiskManager extends EventEmitter {
  constructor(streamer) {
    super();
    this.streamer = streamer;
this.positionLimits = { max: 1000, current: 0 };
this.spreadThresholds = { warning: 0.001, critical: 0.005 };
streamer.on('snapshot', (bookData) => {
this.assessRisk(bookData);
});
}
assessRisk(bookData) {
const spread = this.streamer.calculateSpread(bookData);
const depth = this.calculateMarketDepth(bookData);
// Check spread risk
if (spread > this.spreadThresholds.critical) {
this.emit('riskAlert', {
type: 'WIDE_SPREAD',
severity: 'CRITICAL',
data: { spread, threshold: this.spreadThresholds.critical }
});
}
// Check liquidity risk
if (depth < 100) { // Minimum depth threshold
this.emit('riskAlert', {
type: 'LOW_LIQUIDITY',
severity: 'HIGH',
data: { depth }
});
}
}
calculateMarketDepth(bookData, maxLevels = 10) {
const bids = (bookData.bids || []).slice(0, maxLevels);
const asks = (bookData.asks || []).slice(0, maxLevels);
const bidDepth = bids.reduce((sum, order) => sum + parseFloat(order[1] || 0), 0);
const askDepth = asks.reduce((sum, order) => sum + parseFloat(order[1] || 0), 0);
return bidDepth + askDepth;
}
}
3. Market Data Distribution
type MarketDataDistributor struct {
    manager      *MarketDataManager
    clients      map[string]chan *OrderBookSnapshot
    clientsMutex sync.RWMutex
}

func NewMarketDataDistributor(manager *MarketDataManager) *MarketDataDistributor {
    // The clients map must be initialized before AddClient is called
    return &MarketDataDistributor{
        manager: manager,
        clients: make(map[string]chan *OrderBookSnapshot),
    }
}
func (mdd *MarketDataDistributor) AddClient(clientID string) <-chan *OrderBookSnapshot {
mdd.clientsMutex.Lock()
defer mdd.clientsMutex.Unlock()
ch := make(chan *OrderBookSnapshot, 100)
mdd.clients[clientID] = ch
return ch
}
func (mdd *MarketDataDistributor) RemoveClient(clientID string) {
mdd.clientsMutex.Lock()
defer mdd.clientsMutex.Unlock()
if ch, exists := mdd.clients[clientID]; exists {
close(ch)
delete(mdd.clients, clientID)
}
}
func (mdd *MarketDataDistributor) DistributeSnapshot(snapshot *OrderBookSnapshot) {
mdd.clientsMutex.RLock()
defer mdd.clientsMutex.RUnlock()
for clientID, ch := range mdd.clients {
select {
case ch <- snapshot:
default:
log.Printf("Client %s channel full, dropping update", clientID)
}
}
}
Performance Optimization
1. Efficient Processing
class OptimizedBookProcessor {
constructor() {
this.processingQueue = [];
this.isProcessing = false;
this.batchSize = 10;
}
async processSnapshot(snapshot) {
// Add to queue instead of processing immediately
this.processingQueue.push(snapshot);
if (!this.isProcessing) {
this.processBatch();
}
}
async processBatch() {
this.isProcessing = true;
while (this.processingQueue.length > 0) {
const batch = this.processingQueue.splice(0, this.batchSize);
// Process batch in parallel
await Promise.all(batch.map(snapshot => this.processIndividual(snapshot)));
}
this.isProcessing = false;
}
async processIndividual(snapshot) {
// Individual processing logic
}
}
2. Memory Management
import collections
import itertools
import statistics

class MemoryEfficientAnalyzer:
def __init__(self, max_history=1000):
self.max_history = max_history
self.spread_buffer = collections.deque(maxlen=max_history)
self.depth_buffer = collections.deque(maxlen=max_history)
def add_sample(self, spread, depth):
# Automatically manages memory with fixed-size deques
self.spread_buffer.append(spread)
self.depth_buffer.append(depth)
def get_recent_stats(self, window=100):
recent_spreads = list(itertools.islice(self.spread_buffer, max(0, len(self.spread_buffer) - window), None))
return {
'mean_spread': statistics.mean(recent_spreads) if recent_spreads else 0,
'samples': len(recent_spreads)
}
Best Practices
- Buffer Management: Use appropriately sized buffers to handle bursts of updates
- Statistical Analysis: Track key metrics like spread, depth, and volatility over time
- Event Detection: Implement algorithms to detect significant market events
- Resource Management: Monitor memory usage and implement cleanup for long-running processes
- Error Recovery: Handle stream interruptions gracefully with automatic reconnection (see the sketch after this list)
- Data Validation: Validate order book data integrity before processing
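For the Error Recovery practice, a minimal reconnection wrapper with exponential backoff (Python); stream_once and stop_event are caller-supplied assumptions, e.g. a thin wrapper around analyzer.stream_order_books() from the example above:

import threading
import grpc

def stream_with_reconnect(stream_once, stop_event: threading.Event, max_backoff=30):
    """Keep re-opening the stream until stop_event is set."""
    backoff = 1
    while not stop_event.is_set():
        try:
            stream_once()              # blocks until the stream ends or fails
            backoff = 1                # clean end: reset the backoff window
        except grpc.RpcError as err:
            print(f"Stream dropped: {err}; retrying in {backoff}s")
        stop_event.wait(backoff)       # sleep, but wake early on shutdown
        backoff = min(backoff * 2, max_backoff)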
Current Limitations
- Historical Data: Cannot stream from historical timestamps; only real-time streaming is available
- Data Retention: The node maintains only 24 hours of historical data (see the persistence sketch after this list)
- Order Book Depth: Full depth may vary depending on market conditions
- Update Frequency: Depends on market activity and network conditions
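Because only about 24 hours of data are retained and historical replay is not yet supported, longer-horizon analysis requires capturing snapshots yourself. A minimal persistence sketch (Python); the one-file-per-day JSON Lines layout is an assumption, not part of the gateway:

import json
import time

def persist_snapshot(raw_bytes, path_prefix="orderbook"):
    # Append one JSON line per snapshot, rotating the file daily (illustrative scheme)
    record = {"received_at": time.time(), "snapshot": json.loads(raw_bytes)}
    filename = f"{path_prefix}-{time.strftime('%Y%m%d')}.jsonl"
    with open(filename, "a") as fh:
        fh.write(json.dumps(record) + "\n")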
Need help? Contact our support team or check the Hyperliquid gRPC documentation.