StreamBlockTrades
Stream continuous trade data as blocks are produced, providing real-time access to all trading activity on Hyperliquid.
When to Use This Method
StreamBlockTrades is essential for:
- Trading Analytics - Analyze volume patterns and market trends
- Price Discovery - Track executed trades and market prices
- Market Surveillance - Monitor trading behavior and detect anomalies
- Performance Analytics - Measure execution quality and slippage
Method Signature
rpc StreamBlockTrades(Timestamp) returns (stream BlockTrades) {}
Request Message
// Timestamp is the request message for StreamBlockTrades.
message Timestamp {
  // Requested start position of the stream. Not yet honoured: streams
  // currently start from "now" regardless of the value provided.
  int64 timestamp = 1;
}
Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.
Response Stream
// BlockTrades is one element of the StreamBlockTrades response stream.
message BlockTrades {
  // JSON-encoded object in the same format as the files found in the
  // Hyperliquid data directory "node_trades".
  bytes data = 1;
}
The data field contains JSON-encoded trade information including:
- Trade execution details (price, size, side)
- Market identifiers and trading pairs
- Execution timestamps and block heights
- Trade IDs and order matching information
Implementation Examples
- Go
- Python
- Node.js
package main
import (
"context"
"encoding/json"
"log"
"sync"
"time"
pb "your-project/hyperliquid_l1_gateway/v1"
"google.golang.org/grpc"
)
// Trade is a single executed trade as decoded from the JSON payload of a
// BlockTrades message.
type Trade struct {
	// ID identifies the trade.
	ID string `json:"id"`
	// Symbol is the market the trade executed on; analytics are keyed by it.
	Symbol string `json:"symbol"`
	// Price is the execution price.
	Price float64 `json:"price"`
	// Size is the executed quantity.
	Size float64 `json:"size"`
	Side string `json:"side"` // "buy" or "sell"
	// Timestamp is the execution time.
	// NOTE(review): the unit (seconds vs. milliseconds) is not visible here — confirm.
	Timestamp int64 `json:"timestamp"`
	// BlockHash is overwritten by processBlockTrades with the hash of the
	// enclosing block.
	BlockHash string `json:"block_hash"`
}
// TradeAnalytics accumulates running statistics for one symbol.
// It contains a mutex, so values must be handled through a pointer.
type TradeAnalytics struct {
	// TotalVolume is the sum of Size over all trades seen.
	TotalVolume float64
	// TradeCount is the number of trades seen.
	TradeCount int64
	// BuyVolume is the summed Size of trades whose Side is "buy".
	BuyVolume float64
	// SellVolume is the summed Size of all other trades.
	SellVolume float64
	// WeightedAvgPrice is the running volume-weighted average price.
	WeightedAvgPrice float64
	// LastTrade is the most recent trade recorded for the symbol.
	LastTrade *Trade
	// TimeWindow is set to one hour on creation; nothing in the visible
	// code reads it yet.
	TimeWindow time.Duration
	// StartTime is when the first trade for the symbol was recorded.
	StartTime time.Time
	// mutex guards every field above.
	mutex sync.RWMutex
}
// TradeStreamProcessor consumes the StreamBlockTrades stream, maintains
// per-symbol analytics and fans each trade out to registered handlers.
type TradeStreamProcessor struct {
	// client is the gRPC client used to open the trade stream.
	client pb.HyperliquidL1GatewayClient
	analytics map[string]*TradeAnalytics // symbol -> analytics
	// mutex guards the analytics map.
	mutex sync.RWMutex
	// Event handlers
	// tradeHandlers is appended to by AddTradeHandler and read by
	// notifyHandlers without locking.
	tradeHandlers []func(*Trade)
}
// NewTradeStreamProcessor returns a processor bound to client with an empty
// analytics table and no trade handlers registered.
func NewTradeStreamProcessor(client pb.HyperliquidL1GatewayClient) *TradeStreamProcessor {
	tsp := new(TradeStreamProcessor)
	tsp.client = client
	tsp.analytics = map[string]*TradeAnalytics{}
	tsp.tradeHandlers = []func(*Trade){}
	return tsp
}
// StreamTrades opens the StreamBlockTrades server stream and hands every
// received block to processBlockTrades. It returns when the stream cannot be
// opened, when Recv fails, or when ctx is done (returning ctx.Err()).
func (tsp *TradeStreamProcessor) StreamTrades(ctx context.Context) error {
	request := &pb.Timestamp{Timestamp: 0}
	stream, err := tsp.client.StreamBlockTrades(ctx, request)
	if err != nil {
		return err
	}

	log.Println("Starting trade stream...")

	for {
		// Check for cancellation before blocking on the next message.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		blockTrades, recvErr := stream.Recv()
		if recvErr != nil {
			log.Printf("Stream error: %v", recvErr)
			return recvErr
		}

		// A block that fails to decode is logged and skipped; the stream
		// itself keeps running.
		if procErr := tsp.processBlockTrades(blockTrades.Data); procErr != nil {
			log.Printf("Failed to process trades: %v", procErr)
		}
	}
}
// processBlockTrades decodes the JSON payload of one BlockTrades message,
// stamps every trade with the block hash, updates the per-symbol analytics
// and notifies the registered handlers. It returns the JSON decoding error,
// if any.
func (tsp *TradeStreamProcessor) processBlockTrades(data []byte) error {
	var tradesData struct {
		Trades []Trade `json:"trades"`
		Block  struct {
			Height    int64  `json:"height"`
			Hash      string `json:"hash"`
			Timestamp int64  `json:"timestamp"`
		} `json:"block"`
	}
	if err := json.Unmarshal(data, &tradesData); err != nil {
		return err
	}

	log.Printf("Processing %d trades from block %d",
		len(tradesData.Trades), tradesData.Block.Height)

	// Process each trade
	for i := range tradesData.Trades {
		// Give every iteration its own Trade value. The pointer is retained
		// by the analytics and handed to handlers running in their own
		// goroutines, so it must not alias a variable that the next
		// iteration overwrites (which is what a range value variable does
		// before Go 1.22).
		trade := tradesData.Trades[i]
		trade.BlockHash = tradesData.Block.Hash
		tsp.updateAnalytics(&trade)
		tsp.notifyHandlers(&trade)
	}
	return nil
}
// updateAnalytics folds trade into the running statistics of its symbol,
// creating the per-symbol entry on first use, and logs the trade.
func (tsp *TradeStreamProcessor) updateAnalytics(trade *Trade) {
	tsp.mutex.Lock()
	defer tsp.mutex.Unlock()

	analytics, exists := tsp.analytics[trade.Symbol]
	if !exists {
		analytics = &TradeAnalytics{
			StartTime:  time.Now(),
			TimeWindow: time.Hour, // 1-hour window
		}
		tsp.analytics[trade.Symbol] = analytics
	}

	analytics.mutex.Lock()
	defer analytics.mutex.Unlock()

	// Update trade counts and volume
	previousVolume := analytics.TotalVolume
	analytics.TradeCount++
	analytics.TotalVolume += trade.Size

	// Keep a private copy so later changes to the caller's Trade cannot
	// alter or race with the stored value.
	last := *trade
	analytics.LastTrade = &last

	// Track buy/sell volume
	if trade.Side == "buy" {
		analytics.BuyVolume += trade.Size
	} else {
		analytics.SellVolume += trade.Size
	}

	// Update weighted average price. Skip the update while no volume has
	// accumulated: dividing by zero would store NaN, and every later average
	// derived from it would be NaN as well.
	if analytics.TotalVolume > 0 {
		totalValue := analytics.WeightedAvgPrice*previousVolume + trade.Price*trade.Size
		analytics.WeightedAvgPrice = totalValue / analytics.TotalVolume
	}

	log.Printf("Trade: %s %.6f @ %.6f (%s)",
		trade.Symbol, trade.Size, trade.Price, trade.Side)
}
// AddTradeHandler registers handler to be invoked for every processed trade.
// It takes no lock.
func (tsp *TradeStreamProcessor) AddTradeHandler(handler func(*Trade)) {
	registered := tsp.tradeHandlers
	tsp.tradeHandlers = append(registered, handler)
}
// notifyHandlers starts one goroutine per registered handler, passing each
// the same trade pointer. It does not wait for the handlers to finish.
func (tsp *TradeStreamProcessor) notifyHandlers(trade *Trade) {
	handlers := tsp.tradeHandlers
	for i := range handlers {
		go handlers[i](trade) // Handle asynchronously
	}
}
// GetAnalytics returns a snapshot of the statistics for symbol, or nil if no
// trade has been recorded for it. The snapshot is independent of the live
// data: it has its own (unlocked) mutex and its own copy of LastTrade.
func (tsp *TradeStreamProcessor) GetAnalytics(symbol string) *TradeAnalytics {
	tsp.mutex.RLock()
	defer tsp.mutex.RUnlock()

	analytics, exists := tsp.analytics[symbol]
	if !exists {
		return nil
	}

	analytics.mutex.RLock()
	defer analytics.mutex.RUnlock()

	// Copy field by field. Dereferencing the whole struct would also copy
	// the RWMutex in its currently read-locked state, which sync forbids.
	snapshot := &TradeAnalytics{
		TotalVolume:      analytics.TotalVolume,
		TradeCount:       analytics.TradeCount,
		BuyVolume:        analytics.BuyVolume,
		SellVolume:       analytics.SellVolume,
		WeightedAvgPrice: analytics.WeightedAvgPrice,
		TimeWindow:       analytics.TimeWindow,
		StartTime:        analytics.StartTime,
	}
	if analytics.LastTrade != nil {
		last := *analytics.LastTrade
		snapshot.LastTrade = &last
	}
	return snapshot
}
// GetAllAnalytics returns a snapshot of the statistics for every symbol seen
// so far, keyed by symbol. The returned map and its values are independent of
// the live data.
func (tsp *TradeStreamProcessor) GetAllAnalytics() map[string]*TradeAnalytics {
	tsp.mutex.RLock()
	defer tsp.mutex.RUnlock()

	result := make(map[string]*TradeAnalytics, len(tsp.analytics))
	for symbol, analytics := range tsp.analytics {
		analytics.mutex.RLock()
		// Copy field by field so the (read-locked) RWMutex is never copied.
		snapshot := &TradeAnalytics{
			TotalVolume:      analytics.TotalVolume,
			TradeCount:       analytics.TradeCount,
			BuyVolume:        analytics.BuyVolume,
			SellVolume:       analytics.SellVolume,
			WeightedAvgPrice: analytics.WeightedAvgPrice,
			TimeWindow:       analytics.TimeWindow,
			StartTime:        analytics.StartTime,
		}
		if analytics.LastTrade != nil {
			last := *analytics.LastTrade
			snapshot.LastTrade = &last
		}
		analytics.mutex.RUnlock()

		result[symbol] = snapshot
	}
	return result
}
// Advanced analytics functions
// CalculateVWAP returns the session-wide volume-weighted average price for
// symbol, or 0 when the symbol is unknown. The window argument is accepted
// but not used: no time-windowed history is kept in this simplified version.
func (tsp *TradeStreamProcessor) CalculateVWAP(symbol string, window time.Duration) float64 {
	if snapshot := tsp.GetAnalytics(symbol); snapshot != nil {
		return snapshot.WeightedAvgPrice
	}
	return 0
}
// GetMarketSentiment classifies symbol by the share of buy volume in total
// volume: above 60% is "BULLISH", below 40% is "BEARISH", anything else
// (including no volume at all) is "NEUTRAL". Unknown symbols yield "UNKNOWN".
func (tsp *TradeStreamProcessor) GetMarketSentiment(symbol string) string {
	snapshot := tsp.GetAnalytics(symbol)
	if snapshot == nil {
		return "UNKNOWN"
	}

	buy, sell := snapshot.BuyVolume, snapshot.SellVolume
	combined := buy + sell
	if combined == 0 {
		return "NEUTRAL"
	}

	switch ratio := buy / combined; {
	case ratio > 0.6:
		return "BULLISH"
	case ratio < 0.4:
		return "BEARISH"
	default:
		return "NEUTRAL"
	}
}
// main connects to the gateway, registers two example trade handlers, runs
// the trade stream in the background and prints per-symbol analytics every
// 30 seconds until the stream ends.
func main() {
	// Connect to Hyperliquid L1 Gateway
	conn, err := grpc.Dial(
		"<YOUR_HYPERLIQUID_ENDPOINT>", // Contact support@dwellir.com
		grpc.WithInsecure(),
	)
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewHyperliquidL1GatewayClient(conn)
	processor := NewTradeStreamProcessor(client)

	// Add custom trade handlers
	processor.AddTradeHandler(func(trade *Trade) {
		// Large trade alert
		if trade.Size > 1000 {
			log.Printf("🚨 Large trade alert: %s %.2f @ %.6f",
				trade.Symbol, trade.Size, trade.Price)
		}
	})

	// Price change detection. The handler remembers the last price it saw
	// per symbol itself: the processor's analytics are updated before the
	// handlers run, so their LastTrade is already the trade being handled
	// and cannot serve as the "previous" price. Handlers run in their own
	// goroutines, hence the mutex.
	var (
		lastPriceMu sync.Mutex
		lastPrice   = make(map[string]float64)
	)
	processor.AddTradeHandler(func(trade *Trade) {
		lastPriceMu.Lock()
		prevPrice, seen := lastPrice[trade.Symbol]
		lastPrice[trade.Symbol] = trade.Price
		lastPriceMu.Unlock()

		// Nothing to compare against yet, or a zero base price that would
		// make the relative change undefined.
		if !seen || prevPrice == 0 {
			return
		}

		priceChange := (trade.Price - prevPrice) / prevPrice
		if abs(priceChange) > 0.01 { // 1% price change
			log.Printf("📈 Price movement: %s %.2f%% to %.6f",
				trade.Symbol, priceChange*100, trade.Price)
		}
	})

	// Start streaming in background
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	streamDone := make(chan struct{})
	go func() {
		defer close(streamDone)
		if err := processor.StreamTrades(ctx); err != nil {
			log.Printf("Stream ended: %v", err)
		}
	}()

	// Print analytics periodically
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-streamDone:
			// The stream is gone; stop reporting instead of looping forever.
			return
		case <-ticker.C:
			allAnalytics := processor.GetAllAnalytics()
			log.Println("=== TRADE ANALYTICS ===")
			for symbol, analytics := range allAnalytics {
				sentiment := processor.GetMarketSentiment(symbol)
				log.Printf("%s: %d trades, %.2f volume, VWAP: %.6f, Sentiment: %s",
					symbol, analytics.TradeCount, analytics.TotalVolume,
					analytics.WeightedAvgPrice, sentiment)
			}
		}
	}
}
// abs returns the magnitude of x: negative inputs are negated, everything
// else is returned as is.
func abs(x float64) float64 {
	switch {
	case x < 0:
		return -x
	default:
		return x
	}
}
import grpc
import json
import time
import threading
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Dict, List, Optional, Callable
from statistics import mean, median
from hyperliquid_l1_gateway_pb2 import Timestamp
from hyperliquid_l1_gateway_pb2_grpc import HyperliquidL1GatewayStub
@dataclass
class Trade:
    """A single executed trade parsed from a block's JSON payload."""
    # Trade identifier.
    id: str
    # Market the trade executed on; all analytics are keyed by it.
    symbol: str
    # Execution price.
    price: float
    # Executed quantity.
    size: float
    side: str # 'buy' or 'sell'
    # Execution time. NOTE(review): get_trade_velocity compares this with
    # time.time(), i.e. it assumes seconds since the epoch — confirm.
    timestamp: int
    # Hash of the block the trade was reported in.
    block_hash: str = ""
class TradeStreamAnalyzer:
    """Consume the StreamBlockTrades stream and keep per-symbol analytics.

    Every trade updates bounded per-symbol histories and running totals and is
    then passed to the registered handlers. All shared state is guarded by a
    re-entrant lock, so handlers may call the query methods.
    """

    def __init__(self, endpoint):
        """Open an insecure channel to ``endpoint`` and create empty state."""
        self.channel = grpc.insecure_channel(endpoint)
        self.stub = HyperliquidL1GatewayStub(self.channel)

        # Trade storage
        self.recent_trades = defaultdict(lambda: deque(maxlen=1000))
        self.trade_handlers = []
        self.running = False

        # Analytics
        self.trade_counts = defaultdict(int)
        self.volume_by_symbol = defaultdict(float)
        self.buy_volume = defaultdict(float)
        self.sell_volume = defaultdict(float)
        self.price_history = defaultdict(lambda: deque(maxlen=100))

        self.lock = threading.RLock()

        # Handle of the active streaming call, kept so stop() can cancel it.
        self._call = None

    def stream_trades(self):
        """Start streaming trade data.

        Blocks until the stream ends, fails, or stop() is called. Blocks that
        cannot be decoded or contain malformed fields are reported and skipped.
        """
        self.running = True
        timestamp = Timestamp(timestamp=0)

        try:
            call = self.stub.StreamBlockTrades(timestamp)
            self._call = call
            for block_trades in call:
                if not self.running:
                    break

                try:
                    trades_data = json.loads(block_trades.data)
                    self.process_block_trades(trades_data)
                except json.JSONDecodeError as e:
                    print(f"JSON parsing error: {e}")
                    continue
                except (AttributeError, TypeError, ValueError) as e:
                    # Valid JSON with an unexpected shape or non-numeric
                    # price/size/timestamp must not end the whole stream.
                    print(f"Malformed trade data: {e}")
                    continue

        except grpc.RpcError as e:
            # Cancelling the call from stop() surfaces here as well; only
            # report errors that were not requested by the caller.
            if self.running:
                print(f"gRPC error: {e}")
        except KeyboardInterrupt:
            print("Stream interrupted by user")
        finally:
            self.running = False
            self._call = None

    def process_block_trades(self, trades_data):
        """Process trades from a single block.

        ``trades_data`` is the decoded JSON object; its 'block' and 'trades'
        entries are optional. Raises ValueError/TypeError when a numeric field
        cannot be converted.
        """
        block_info = trades_data.get('block', {})
        trades = trades_data.get('trades', [])

        print(f"Processing {len(trades)} trades from block {block_info.get('height', '?')}")

        with self.lock:
            for trade_data in trades:
                trade = Trade(
                    id=trade_data.get('id', ''),
                    symbol=trade_data.get('symbol', ''),
                    price=float(trade_data.get('price', 0)),
                    size=float(trade_data.get('size', 0)),
                    side=trade_data.get('side', ''),
                    timestamp=int(trade_data.get('timestamp', 0)),
                    block_hash=block_info.get('hash', '')
                )

                self.update_analytics(trade)
                self.notify_handlers(trade)

    def update_analytics(self, trade: Trade):
        """Update trade analytics"""
        symbol = trade.symbol

        # Update counters
        self.trade_counts[symbol] += 1
        self.volume_by_symbol[symbol] += trade.size

        # Track buy/sell volume; anything that is not 'buy' counts as a sell.
        if trade.side == 'buy':
            self.buy_volume[symbol] += trade.size
        else:
            self.sell_volume[symbol] += trade.size

        # Store recent trades and prices
        self.recent_trades[symbol].append(trade)
        self.price_history[symbol].append(trade.price)

        print(f"Trade: {symbol} {trade.size:.6f} @ {trade.price:.6f} ({trade.side})")

    def add_trade_handler(self, handler: Callable[[Trade], None]):
        """Add a custom trade handler"""
        self.trade_handlers.append(handler)

    def notify_handlers(self, trade: Trade):
        """Notify all registered handlers; a failing handler is logged and skipped."""
        for handler in self.trade_handlers:
            try:
                handler(trade)
            except Exception as e:
                print(f"Handler error: {e}")

    def get_vwap(self, symbol: str, window_trades: int = 100) -> Optional[float]:
        """Calculate Volume Weighted Average Price over the last N trades.

        Returns None when the symbol has no trades or no volume.
        """
        with self.lock:
            # .get() keeps a lookup of an unknown symbol from inserting an
            # empty entry into the defaultdict.
            trades = list(self.recent_trades.get(symbol, ()))

            if not trades:
                return None

            # Use last N trades
            recent_trades = trades[-window_trades:] if len(trades) > window_trades else trades

            total_value = sum(trade.price * trade.size for trade in recent_trades)
            total_volume = sum(trade.size for trade in recent_trades)

            return total_value / total_volume if total_volume > 0 else None

    def get_price_statistics(self, symbol: str) -> Dict:
        """Get price statistics for a symbol ({} when no prices are stored)."""
        with self.lock:
            prices = list(self.price_history.get(symbol, ()))

            if not prices:
                return {}

            return {
                'current': prices[-1],
                'high': max(prices),
                'low': min(prices),
                'mean': mean(prices),
                'median': median(prices),
                'samples': len(prices)
            }

    def get_market_sentiment(self, symbol: str) -> str:
        """Determine market sentiment based on buy/sell volume"""
        with self.lock:
            buy_vol = self.buy_volume.get(symbol, 0.0)
            sell_vol = self.sell_volume.get(symbol, 0.0)
            total_vol = buy_vol + sell_vol

            if total_vol == 0:
                return "NEUTRAL"

            buy_ratio = buy_vol / total_vol

            if buy_ratio > 0.65:
                return "STRONGLY_BULLISH"
            elif buy_ratio > 0.55:
                return "BULLISH"
            elif buy_ratio < 0.35:
                return "STRONGLY_BEARISH"
            elif buy_ratio < 0.45:
                return "BEARISH"
            else:
                return "NEUTRAL"

    def detect_large_trades(self, trade: Trade, threshold: float = 1000.0) -> bool:
        """Detect unusually large trades"""
        return trade.size > threshold

    def detect_price_anomalies(self, trade: Trade, threshold: float = 0.02) -> Optional[Dict]:
        """Detect significant price movements.

        Compares the trade price with the mean of the nine prices preceding
        the newest stored one. Returns a description dict when the relative
        change exceeds ``threshold``, otherwise None.
        """
        with self.lock:
            recent_prices = list(self.price_history.get(trade.symbol, ()))

            if len(recent_prices) < 10:
                return None

            # Calculate recent average price
            recent_avg = mean(recent_prices[-10:-1])  # Exclude current price
            if recent_avg == 0:
                # A relative change against a zero baseline is undefined.
                return None
            price_change = (trade.price - recent_avg) / recent_avg

            if abs(price_change) > threshold:
                return {
                    'symbol': trade.symbol,
                    'price_change_pct': price_change * 100,
                    'new_price': trade.price,
                    'recent_avg': recent_avg,
                    'direction': 'UP' if price_change > 0 else 'DOWN'
                }

            return None

    def get_trade_velocity(self, symbol: str, time_window: int = 300) -> float:
        """Calculate trades per second in recent time window"""
        with self.lock:
            trades = list(self.recent_trades.get(symbol, ()))

            if not trades:
                return 0.0

            current_time = time.time()
            recent_trades = [
                t for t in trades
                if current_time - t.timestamp <= time_window
            ]

            return len(recent_trades) / time_window if recent_trades else 0.0

    def get_analytics_summary(self) -> Dict:
        """Get comprehensive analytics summary"""
        with self.lock:
            summary = {}

            for symbol in list(self.trade_counts):
                summary[symbol] = {
                    'trade_count': self.trade_counts[symbol],
                    'total_volume': self.volume_by_symbol[symbol],
                    'buy_volume': self.buy_volume[symbol],
                    'sell_volume': self.sell_volume[symbol],
                    'vwap': self.get_vwap(symbol),
                    'sentiment': self.get_market_sentiment(symbol),
                    'price_stats': self.get_price_statistics(symbol),
                    'trade_velocity': self.get_trade_velocity(symbol)
                }

            return summary

    def stop(self):
        """Stop streaming.

        Clears the running flag and cancels the active call so a stream that
        is blocked waiting for the next message returns promptly.
        """
        self.running = False
        call = getattr(self, '_call', None)
        if call is not None:
            call.cancel()
class TradingSignalGenerator:
    """Derive trading signals from the trades seen by a TradeStreamAnalyzer.

    The generator registers itself as a trade handler on construction and
    keeps a bounded list of the signals it produced.
    """

    def __init__(self, analyzer: TradeStreamAnalyzer):
        self.analyzer = analyzer
        self.signals = []

        # Register as trade handler
        analyzer.add_trade_handler(self.on_trade)

    def on_trade(self, trade: Trade):
        """Process individual trades for signals"""
        # Check for large trade signals
        if self.analyzer.detect_large_trades(trade, threshold=500):
            self.generate_signal({
                'type': 'LARGE_TRADE',
                'symbol': trade.symbol,
                'size': trade.size,
                'price': trade.price,
                'side': trade.side,
                'timestamp': trade.timestamp
            })

        # Check for price anomaly signals
        anomaly = self.analyzer.detect_price_anomalies(trade, threshold=0.015)
        if anomaly:
            self.generate_signal({
                'type': 'PRICE_ANOMALY',
                **anomaly,
                'timestamp': trade.timestamp
            })

        # Check for momentum signals
        self.check_momentum_signals(trade)

    def check_momentum_signals(self, trade: Trade):
        """Emit BULLISH_MOMENTUM when price is >0.5% above the 50-trade VWAP
        and sentiment is bullish."""
        vwap = self.analyzer.get_vwap(trade.symbol, 50)
        if vwap is None:
            return

        # Price above VWAP with bullish sentiment
        if trade.price > vwap * 1.005:  # 0.5% above VWAP
            sentiment = self.analyzer.get_market_sentiment(trade.symbol)

            if sentiment in ['BULLISH', 'STRONGLY_BULLISH']:
                self.generate_signal({
                    'type': 'BULLISH_MOMENTUM',
                    'symbol': trade.symbol,
                    'price': trade.price,
                    'vwap': vwap,
                    'sentiment': sentiment,
                    'timestamp': trade.timestamp
                })

    def generate_signal(self, signal: Dict):
        """Generate and store trading signal"""
        self.signals.append(signal)

        # Keep only recent signals: once past 1000, drop down to the newest 500.
        if len(self.signals) > 1000:
            self.signals = self.signals[-500:]

        print(f"🔔 Signal: {signal['type']} - {signal.get('symbol', '')}")

    def get_recent_signals(self, limit: int = 20) -> List[Dict]:
        """Get the ``limit`` most recent trading signals, oldest first.

        A non-positive ``limit`` yields an empty list.
        """
        # A slice of [-0:] would be the entire list, so handle it explicitly.
        if limit <= 0:
            return []
        return self.signals[-limit:]
def main():
    """Run the example: stream trades in a background thread and print a
    summary every 30 seconds until the stream ends or the user interrupts."""
    analyzer = TradeStreamAnalyzer('<YOUR_HYPERLIQUID_ENDPOINT>')
    signal_generator = TradingSignalGenerator(analyzer)

    # Add custom handlers
    def large_trade_handler(trade: Trade):
        if trade.size > 1000:
            print(f"🐋 Whale alert: {trade.symbol} {trade.size:.2f} @ {trade.price:.6f}")

    def volume_spike_handler(trade: Trade):
        velocity = analyzer.get_trade_velocity(trade.symbol, 60)  # 1-minute window
        if velocity > 0.5:  # More than 0.5 trades/second
            print(f"🚀 Volume spike: {trade.symbol} - {velocity:.2f} trades/sec")

    analyzer.add_trade_handler(large_trade_handler)
    analyzer.add_trade_handler(volume_spike_handler)

    # Start streaming in background thread
    stream_thread = threading.Thread(target=analyzer.stream_trades)
    stream_thread.start()

    try:
        # Print analytics every 30 seconds. The loop is driven by the thread's
        # liveness rather than analyzer.running, which the new thread may not
        # have set yet when the loop is first entered.
        while stream_thread.is_alive():
            # Wakes up early if the stream ends before the 30 seconds are over.
            stream_thread.join(timeout=30)
            if not stream_thread.is_alive():
                break

            summary = analyzer.get_analytics_summary()
            signals = signal_generator.get_recent_signals(5)

            print(f"\n📊 Trading Analytics:")
            for symbol, stats in summary.items():
                # The VWAP is None while a symbol has no traded volume.
                vwap = stats['vwap']
                vwap_text = f"{vwap:.6f}" if vwap is not None else "N/A"

                print(f" {symbol}:")
                print(f" Trades: {stats['trade_count']}, Volume: {stats['total_volume']:.2f}")
                print(f" VWAP: {vwap_text}, Sentiment: {stats['sentiment']}")
                print(f" Velocity: {stats['trade_velocity']:.3f} trades/sec")

            if signals:
                print(f"\n🔔 Recent Signals:")
                for signal in signals[-3:]:
                    print(f" {signal['type']}: {signal.get('symbol', '')} @ {signal.get('timestamp', '')}")

    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        # Always stop and wait for the worker so the process can exit cleanly.
        analyzer.stop()
        stream_thread.join()
# Run the example only when this file is executed as a script.
if __name__ == '__main__':
    main()
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const EventEmitter = require('events');
// Load proto file
// The service definition is parsed once, synchronously, when the module is
// loaded. The options object controls how @grpc/proto-loader represents
// field names, 64-bit integers, enums, default values and oneofs.
const packageDefinition = protoLoader.loadSync(
  'hyperliquid_l1_gateway.proto',
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  }
);
/**
 * Consumes the StreamBlockTrades gRPC stream and keeps bounded per-symbol
 * analytics.
 *
 * Events: 'trade' (one per valid trade), 'error' (stream failure) and 'end'
 * (server closed the stream). Nothing is emitted for a stream that was
 * stopped with stopStream().
 */
class TradeStreamProcessor extends EventEmitter {
  /**
   * @param {string} endpoint gateway address passed to the gRPC client.
   */
  constructor(endpoint) {
    super();

    const proto = grpc.loadPackageDefinition(packageDefinition);
    this.client = new proto.hyperliquid_l1_gateway.v1.HyperliquidL1Gateway(
      endpoint, // Contact support@dwellir.com
      grpc.credentials.createInsecure()
    );

    // Trade storage and analytics
    this.recentTrades = new Map(); // symbol -> array of recent trades
    this.tradeCounts = new Map();
    this.volumeBySymbol = new Map();
    this.buyVolume = new Map();
    this.sellVolume = new Map();
    this.priceHistory = new Map();

    this.maxTradeHistory = 1000;
    this.maxPriceHistory = 100;
    this.isStreaming = false;
    this.stream = null;

    // Performance metrics
    this.tradesProcessed = 0;
    this.startTime = Date.now();
  }

  /** Open the trade stream unless one is already running. */
  startStream() {
    if (this.isStreaming) {
      console.log('Stream already running');
      return;
    }

    this.isStreaming = true;

    const stream = this.client.StreamBlockTrades({ timestamp: 0 });
    this.stream = stream;

    stream.on('data', (blockTrades) => {
      let tradesData;
      try {
        tradesData = JSON.parse(blockTrades.data.toString());
      } catch (error) {
        console.error('JSON parsing error:', error.message);
        return;
      }

      // Failures while processing (unexpected payload shape, a throwing
      // 'trade' listener) are reported for what they are and must not take
      // the stream down.
      try {
        this.processBlockTrades(tradesData);
      } catch (error) {
        console.error('Trade processing error:', error.message);
      }
    });

    stream.on('error', (error) => {
      // Events from a stream that is no longer current (stopped by
      // stopStream(), or already reported as failed) are ignored.
      if (this.stream !== stream) return;
      this.stream = null;
      // Reset the flag before notifying listeners so it is cleared even if
      // emit() throws because no 'error' listener is registered.
      this.isStreaming = false;
      console.error('Stream error:', error.message);
      this.emit('error', error);
    });

    stream.on('end', () => {
      if (this.stream !== stream) return;
      this.stream = null;
      this.isStreaming = false;
      console.log('Stream ended');
      this.emit('end');
    });

    console.log('Trade stream started');
  }

  /**
   * Convert the trades of one decoded block, update analytics and emit a
   * 'trade' event for each. Trades whose price or size is not a finite
   * number are skipped.
   */
  processBlockTrades(tradesData) {
    const blockInfo = tradesData.block || {};
    const trades = tradesData.trades || [];

    console.log(`Processing ${trades.length} trades from block ${blockInfo.height || '?'}`);

    for (const tradeData of trades) {
      const trade = {
        id: tradeData.id || '',
        symbol: tradeData.symbol || '',
        price: parseFloat(tradeData.price || 0),
        size: parseFloat(tradeData.size || 0),
        side: tradeData.side || '',
        timestamp: parseInt(tradeData.timestamp || 0, 10),
        blockHash: blockInfo.hash || '',
        blockHeight: blockInfo.height || 0
      };

      // A NaN price or size would turn every running total for the symbol
      // into NaN for the rest of the session.
      if (!Number.isFinite(trade.price) || !Number.isFinite(trade.size)) {
        console.warn(`Skipping malformed trade ${trade.id || '(no id)'}`);
        continue;
      }

      this.updateAnalytics(trade);
      this.emit('trade', trade);
      this.tradesProcessed++;
    }
  }

  /** Fold one trade into the per-symbol histories and totals. */
  updateAnalytics(trade) {
    const { symbol, price, size, side } = trade;

    // Initialize maps if needed
    if (!this.recentTrades.has(symbol)) {
      this.recentTrades.set(symbol, []);
      this.tradeCounts.set(symbol, 0);
      this.volumeBySymbol.set(symbol, 0);
      this.buyVolume.set(symbol, 0);
      this.sellVolume.set(symbol, 0);
      this.priceHistory.set(symbol, []);
    }

    // Update trade storage
    const trades = this.recentTrades.get(symbol);
    trades.push(trade);
    if (trades.length > this.maxTradeHistory) {
      trades.shift();
    }

    // Update counters
    this.tradeCounts.set(symbol, this.tradeCounts.get(symbol) + 1);
    this.volumeBySymbol.set(symbol, this.volumeBySymbol.get(symbol) + size);

    // Update buy/sell volume; anything that is not 'buy' counts as a sell.
    if (side === 'buy') {
      this.buyVolume.set(symbol, this.buyVolume.get(symbol) + size);
    } else {
      this.sellVolume.set(symbol, this.sellVolume.get(symbol) + size);
    }

    // Update price history
    const prices = this.priceHistory.get(symbol);
    prices.push(price);
    if (prices.length > this.maxPriceHistory) {
      prices.shift();
    }

    console.log(`Trade: ${symbol} ${size.toFixed(6)} @ ${price.toFixed(6)} (${side})`);
  }

  /**
   * Volume-weighted average price over the last `windowSize` trades.
   * @returns {number|null} null when there are no trades or no volume.
   */
  getVWAP(symbol, windowSize = 100) {
    const trades = this.recentTrades.get(symbol);
    if (!trades || trades.length === 0) return null;

    const recentTrades = trades.slice(-windowSize);
    const totalValue = recentTrades.reduce((sum, trade) => sum + (trade.price * trade.size), 0);
    const totalVolume = recentTrades.reduce((sum, trade) => sum + trade.size, 0);

    return totalVolume > 0 ? totalValue / totalVolume : null;
  }

  /**
   * Summary statistics over the stored price history of a symbol.
   * @returns {object|null} null when no prices are stored.
   */
  getPriceStatistics(symbol) {
    const prices = this.priceHistory.get(symbol);
    if (!prices || prices.length === 0) return null;

    const sortedPrices = [...prices].sort((a, b) => a - b);
    const sum = prices.reduce((a, b) => a + b, 0);

    // With an even number of samples the median is the mean of the two
    // middle values.
    const mid = Math.floor(sortedPrices.length / 2);
    const median = sortedPrices.length % 2 === 0
      ? (sortedPrices[mid - 1] + sortedPrices[mid]) / 2
      : sortedPrices[mid];

    return {
      current: prices[prices.length - 1],
      high: Math.max(...prices),
      low: Math.min(...prices),
      mean: sum / prices.length,
      median,
      samples: prices.length
    };
  }

  /** Classify a symbol by the share of buy volume in its total volume. */
  getMarketSentiment(symbol) {
    const buyVol = this.buyVolume.get(symbol) || 0;
    const sellVol = this.sellVolume.get(symbol) || 0;
    const totalVol = buyVol + sellVol;

    if (totalVol === 0) return 'NEUTRAL';

    const buyRatio = buyVol / totalVol;

    if (buyRatio > 0.65) return 'STRONGLY_BULLISH';
    if (buyRatio > 0.55) return 'BULLISH';
    if (buyRatio < 0.35) return 'STRONGLY_BEARISH';
    if (buyRatio < 0.45) return 'BEARISH';
    return 'NEUTRAL';
  }

  /** True when the trade size exceeds `threshold`. */
  detectLargeTrade(trade, threshold = 1000) {
    return trade.size > threshold;
  }

  /**
   * Compare the trade price with the mean of the nine prices preceding the
   * newest stored one.
   * @returns {object|null} a description when the change exceeds
   *   `thresholdPercent`, otherwise null.
   */
  detectPriceAnomaly(trade, thresholdPercent = 2) {
    const prices = this.priceHistory.get(trade.symbol);
    if (!prices || prices.length < 10) return null;

    // Calculate recent average (excluding current price)
    const recentPrices = prices.slice(-10, -1);
    const recentAvg = recentPrices.reduce((a, b) => a + b, 0) / recentPrices.length;
    // A relative change against a zero baseline is undefined.
    if (recentAvg === 0) return null;

    const priceChange = (trade.price - recentAvg) / recentAvg;
    const priceChangePercent = Math.abs(priceChange) * 100;

    if (priceChangePercent > thresholdPercent) {
      return {
        symbol: trade.symbol,
        priceChangePercent: priceChange * 100,
        newPrice: trade.price,
        recentAvg: recentAvg,
        direction: priceChange > 0 ? 'UP' : 'DOWN'
      };
    }

    return null;
  }

  /** Trades per second for a symbol over the last `timeWindowSeconds`. */
  getTradeVelocity(symbol, timeWindowSeconds = 300) {
    const trades = this.recentTrades.get(symbol);
    if (!trades) return 0;

    const cutoffTime = Date.now() - (timeWindowSeconds * 1000);
    const recentTrades = trades.filter(trade => trade.timestamp * 1000 > cutoffTime);

    return recentTrades.length / timeWindowSeconds;
  }

  /** Per-symbol summary of all analytics, keyed by symbol. */
  getAnalyticsSummary() {
    const summary = {};

    for (const symbol of this.recentTrades.keys()) {
      summary[symbol] = {
        tradeCount: this.tradeCounts.get(symbol),
        totalVolume: this.volumeBySymbol.get(symbol),
        buyVolume: this.buyVolume.get(symbol),
        sellVolume: this.sellVolume.get(symbol),
        vwap: this.getVWAP(symbol),
        sentiment: this.getMarketSentiment(symbol),
        priceStats: this.getPriceStatistics(symbol),
        tradeVelocity: this.getTradeVelocity(symbol)
      };
    }

    return summary;
  }

  /** Throughput counters since construction. */
  getPerformanceMetrics() {
    const uptimeSeconds = (Date.now() - this.startTime) / 1000;

    return {
      tradesProcessed: this.tradesProcessed,
      uptimeSeconds,
      // Avoid NaN/Infinity when queried within the creation millisecond.
      tradesPerSecond: uptimeSeconds > 0 ? this.tradesProcessed / uptimeSeconds : 0,
      activeSymbols: this.recentTrades.size
    };
  }

  /** Cancel the active stream, if any, without emitting 'error' or 'end'. */
  stopStream() {
    if (this.stream) {
      const stream = this.stream;
      // Detach first: cancelling surfaces as events on the stream, and
      // those must not be mistaken for a failure that needs a restart.
      this.stream = null;
      this.isStreaming = false;
      stream.cancel();
      console.log('Stream stopped');
    }
  }
}
/**
 * Derives trading signals from the 'trade' events of a processor. Every
 * signal is stored in a bounded list and re-emitted on the processor as a
 * 'signal' event.
 */
class TradingSignalEngine {
  /**
   * @param {object} processor event source that also provides
   *   detectLargeTrade, detectPriceAnomaly, getVWAP and getMarketSentiment.
   */
  constructor(processor) {
    this.processor = processor;
    this.signals = [];
    this.maxSignals = 1000;

    processor.on('trade', (trade) => {
      this.analyzeTradeForSignals(trade);
    });
  }

  /** Run all signal checks against one trade. */
  analyzeTradeForSignals(trade) {
    // Large trade detection
    if (this.processor.detectLargeTrade(trade, 500)) {
      this.generateSignal({
        type: 'LARGE_TRADE',
        symbol: trade.symbol,
        size: trade.size,
        price: trade.price,
        side: trade.side,
        timestamp: trade.timestamp
      });
    }

    // Price anomaly detection
    const anomaly = this.processor.detectPriceAnomaly(trade, 1.5);
    if (anomaly) {
      this.generateSignal({
        type: 'PRICE_ANOMALY',
        ...anomaly,
        timestamp: trade.timestamp
      });
    }

    // Momentum signals
    this.checkMomentumSignals(trade);
  }

  /**
   * Emit a momentum signal when the price is more than 0.5% away from the
   * 50-trade VWAP and sentiment points the same way.
   */
  checkMomentumSignals(trade) {
    const vwap = this.processor.getVWAP(trade.symbol, 50);
    // Also skips a VWAP of 0, which would make the ratio below undefined.
    if (!vwap) return;

    const priceVsVwap = (trade.price - vwap) / vwap;
    const sentiment = this.processor.getMarketSentiment(trade.symbol);

    // Bullish momentum: price significantly above VWAP with bullish sentiment
    if (priceVsVwap > 0.005 && ['BULLISH', 'STRONGLY_BULLISH'].includes(sentiment)) {
      this.generateSignal({
        type: 'BULLISH_MOMENTUM',
        symbol: trade.symbol,
        price: trade.price,
        vwap: vwap,
        priceVsVwap: priceVsVwap,
        sentiment: sentiment,
        timestamp: trade.timestamp
      });
    }

    // Bearish momentum: price significantly below VWAP with bearish sentiment
    if (priceVsVwap < -0.005 && ['BEARISH', 'STRONGLY_BEARISH'].includes(sentiment)) {
      this.generateSignal({
        type: 'BEARISH_MOMENTUM',
        symbol: trade.symbol,
        price: trade.price,
        vwap: vwap,
        priceVsVwap: priceVsVwap,
        sentiment: sentiment,
        timestamp: trade.timestamp
      });
    }
  }

  /** Store, log and re-emit one signal. */
  generateSignal(signal) {
    this.signals.push(signal);

    // Keep only recent signals: once past maxSignals, keep the newest half.
    if (this.signals.length > this.maxSignals) {
      this.signals = this.signals.slice(-this.maxSignals / 2);
    }

    console.log(`🔔 Signal: ${signal.type} - ${signal.symbol || 'N/A'}`);

    // Emit for external handlers
    this.processor.emit('signal', signal);
  }

  /**
   * The `limit` most recent signals, oldest first. A non-positive limit
   * yields an empty array.
   */
  getRecentSignals(limit = 20) {
    // slice(-0) is slice(0), i.e. the whole array, so handle it explicitly.
    if (!(limit > 0)) return [];
    return this.signals.slice(-limit);
  }

  /**
   * The `limit` most recent signals of one type, oldest first. A
   * non-positive limit yields an empty array.
   */
  getSignalsByType(type, limit = 10) {
    if (!(limit > 0)) return [];
    return this.signals
      .filter(signal => signal.type === type)
      .slice(-limit);
  }
}
// Usage example
/**
 * Wire a processor and a signal engine together, start streaming, print a
 * report every 30 seconds and shut down on SIGINT.
 * @returns {{processor: object, signalEngine: object}}
 */
function setupTradeMonitoring() {
  const processor = new TradeStreamProcessor('<YOUR_HYPERLIQUID_ENDPOINT>');
  const signalEngine = new TradingSignalEngine(processor);

  // Custom trade processing: report whale trades only.
  const reportWhaleTrade = (trade) => {
    if (!(trade.size > 1000)) return;
    console.log(`🐋 Whale trade: ${trade.symbol} ${trade.size.toFixed(2)} @ ${trade.price.toFixed(6)}`);
  };

  // Handle trading signals
  const reportSignal = (signal) => {
    console.log(`📈 Trading signal: ${JSON.stringify(signal, null, 2)}`);
  };

  // Any stream error schedules a restart five seconds later.
  const restartAfterError = (error) => {
    console.error('Stream error, attempting restart...', error.message);
    setTimeout(() => {
      processor.startStream();
    }, 5000);
  };

  // Set up event handlers
  processor.on('trade', reportWhaleTrade);
  processor.on('signal', reportSignal);
  processor.on('error', restartAfterError);

  // Start streaming
  processor.startStream();

  // Print analytics periodically
  const printReport = () => {
    const summary = processor.getAnalyticsSummary();
    const performance = processor.getPerformanceMetrics();
    const recentSignals = signalEngine.getRecentSignals(3);

    console.log('\n📊 Trading Analytics:');
    console.log(`Performance: ${performance.tradesProcessed} trades, ${performance.tradesPerSecond.toFixed(2)} trades/sec`);

    Object.keys(summary).forEach((symbol) => {
      const stats = summary[symbol];
      const vwapText = stats.vwap ? stats.vwap.toFixed(6) : 'N/A';
      console.log(` ${symbol}:`);
      console.log(` Trades: ${stats.tradeCount}, Volume: ${stats.totalVolume.toFixed(2)}`);
      console.log(` VWAP: ${vwapText}, Sentiment: ${stats.sentiment}`);
      console.log(` Velocity: ${stats.tradeVelocity.toFixed(3)} trades/sec`);
    });

    if (recentSignals.length > 0) {
      console.log('\n🔔 Recent Signals:');
      for (const signal of recentSignals) {
        console.log(` ${signal.type}: ${signal.symbol || 'N/A'}`);
      }
    }
  };
  const analyticsInterval = setInterval(printReport, 30000);

  // Graceful shutdown
  process.on('SIGINT', () => {
    console.log('\nShutting down...');
    clearInterval(analyticsInterval);
    processor.stopStream();
    process.exit(0);
  });

  return { processor, signalEngine };
}
// Start monitoring as soon as the script is loaded; the returned processor
// and signal engine are not used further here.
setupTradeMonitoring();
Common Use Cases
1. Volume-Weighted Average Price (VWAP) Calculation
class VWAPCalculator:
    """VWAP helper offering named trade-count windows on top of an analyzer."""

    def __init__(self, analyzer):
        self.analyzer = analyzer
        self.vwap_windows = {
            'short': 50,    # 50 trades
            'medium': 200,  # 200 trades
            'long': 500     # 500 trades
        }

    def calculate_vwap(self, symbol: str, window_type: str = 'medium') -> Optional[float]:
        """Return the VWAP of ``symbol`` over the named window.

        Unknown window names fall back to 200 trades. The result is whatever
        the analyzer's get_vwap returns, including None when it has no data.
        """
        window_size = self.vwap_windows.get(window_type, 200)
        return self.analyzer.get_vwap(symbol, window_size)

    def get_vwap_deviation(self, trade: Trade, window_type: str = 'medium') -> float:
        """Return how far the trade price is from the VWAP, in percent.

        Returns 0.0 when no VWAP is available or when it is zero, since a
        relative deviation is undefined in both cases.
        """
        vwap = self.calculate_vwap(trade.symbol, window_type)
        if not vwap:
            return 0.0

        return (trade.price - vwap) / vwap * 100  # Percentage deviation
2. Market Impact Analysis
/**
 * Logs trades whose price deviates noticeably from the recent mean price,
 * labelled with a size category.
 */
class MarketImpactAnalyzer {
  /**
   * @param {object} processor emitter of 'trade' events that also provides
   *   getPriceStatistics(symbol).
   */
  constructor(processor) {
    this.processor = processor;
    // Minimum trade size per category, in ascending order.
    this.impactThresholds = {
      small: 100,
      medium: 500,
      large: 1000,
      whale: 5000
    };

    processor.on('trade', (trade) => {
      this.analyzeMarketImpact(trade);
    });
  }

  /**
   * Log the trade when it moved the price by more than 0.1% relative to
   * the recent mean. Needs at least 20 price samples.
   */
  analyzeMarketImpact(trade) {
    const stats = this.processor.getPriceStatistics(trade.symbol);
    if (!stats || stats.samples < 20) return;

    // Calculate price impact
    const baseline = stats.mean;
    const impactPercent = (trade.price - baseline) / baseline * 100;

    // Categorize trade size: walk the categories in declaration order and
    // stop at the first threshold the trade does not reach.
    let category = 'small';
    for (const name of Object.keys(this.impactThresholds)) {
      if (!(trade.size >= this.impactThresholds[name])) break;
      category = name;
    }

    // 0.1% price impact
    if (!(Math.abs(impactPercent) > 0.1)) return;

    const direction = impactPercent > 0 ? 'increase' : 'decrease';
    console.log(`💥 Market Impact: ${trade.symbol} ${category} trade (${trade.size.toFixed(2)}) caused ${impactPercent.toFixed(2)}% price ${direction}`);
  }
}
3. Arbitrage Opportunity Detection
// ArbitrageDetector compares Hyperliquid trade prices with reference prices
// from other exchanges and records the spreads it finds.
type ArbitrageDetector struct {
	// processor is the trade stream this detector is associated with.
	processor *TradeStreamProcessor
	priceFeeds map[string]map[string]float64 // symbol -> exchange -> price
	// spreadAlerts accumulates every alert raised so far; nothing in the
	// visible code ever trims it.
	spreadAlerts []ArbitrageAlert
}
// ArbitrageAlert describes one price spread found between two exchanges.
type ArbitrageAlert struct {
	// Symbol is the market the spread was observed on.
	Symbol string
	// Exchange1 and Price1 identify the first venue and its price.
	Exchange1 string
	Price1 float64
	// Exchange2 and Price2 identify the second venue and its price.
	Exchange2 string
	Price2 float64
	// SpreadPct is the spread as a percentage of Price1.
	SpreadPct float64
	// Timestamp is when the alert was created.
	Timestamp time.Time
}
// CheckArbitrageOpportunity compares the price of trade with the reference
// price of every other exchange known for the same symbol and records and
// logs an alert for each spread above 0.5%. Trades or reference prices that
// are not strictly positive are ignored.
func (ad *ArbitrageDetector) CheckArbitrageOpportunity(trade *Trade) {
	// The spread is relative to the trade price, so a zero or negative
	// price would yield Inf/NaN or a meaningless sign.
	if trade.Price <= 0 {
		return
	}

	// Compare with other exchange prices
	exchangePrices, exists := ad.priceFeeds[trade.Symbol]
	if !exists {
		return
	}

	for exchange, price := range exchangePrices {
		if exchange == "hyperliquid" {
			continue
		}
		// A missing or zeroed reference price would otherwise show up as a
		// 100% spread.
		if price <= 0 {
			continue
		}

		spread := abs(trade.Price-price) / trade.Price
		if spread > 0.005 { // 0.5% arbitrage opportunity
			alert := ArbitrageAlert{
				Symbol:    trade.Symbol,
				Exchange1: "hyperliquid",
				Price1:    trade.Price,
				Exchange2: exchange,
				Price2:    price,
				SpreadPct: spread * 100,
				Timestamp: time.Now(),
			}

			ad.spreadAlerts = append(ad.spreadAlerts, alert)

			log.Printf("🔄 Arbitrage opportunity: %s %.2f%% spread between %s (%.6f) and %s (%.6f)",
				alert.Symbol, alert.SpreadPct, alert.Exchange1, alert.Price1, alert.Exchange2, alert.Price2)
		}
	}
}
4. Trade Pattern Recognition
class PatternRecognizer:
    """Detect simple trade patterns from an analyzer's recent history.

    The recognizer registers itself as a trade handler on construction and
    appends every detected pattern to ``self.patterns``.
    """

    def __init__(self, analyzer):
        self.analyzer = analyzer
        self.patterns = []

        analyzer.add_trade_handler(self.analyze_patterns)

    def analyze_patterns(self, trade: Trade):
        """Run every pattern detector against one trade."""
        # Detect accumulation patterns
        self.detect_accumulation(trade)

        # Detect distribution patterns
        self.detect_distribution(trade)

        # Detect momentum patterns
        self.detect_momentum_patterns(trade)

    def detect_accumulation(self, trade: Trade):
        """Detect accumulation patterns (many small buys)"""
        recent_trades = list(self.analyzer.recent_trades[trade.symbol])[-20:]

        if len(recent_trades) < 20:
            return

        buy_trades = [t for t in recent_trades if t.side == 'buy']

        # More than 15 of the last 20 trades are buys, and every buy is small.
        if len(buy_trades) > 15 and all(t.size < 100 for t in buy_trades):
            self.patterns.append({
                'type': 'ACCUMULATION',
                'symbol': trade.symbol,
                'buy_trades': len(buy_trades),
                'avg_size': sum(t.size for t in buy_trades) / len(buy_trades),
                'timestamp': trade.timestamp
            })

            print(f"📈 Accumulation pattern detected: {trade.symbol}")

    def detect_distribution(self, trade: Trade):
        """Detect distribution patterns (many small sells)"""
        # Not implemented: intended to mirror detect_accumulation for sells.
        pass

    def detect_momentum_patterns(self, trade: Trade):
        """Detect momentum breakout patterns.

        Records a BULLISH_BREAKOUT when the trade price is more than 2% above
        the highest of the 19 prices preceding the newest stored one and
        sentiment is bullish. Needs at least 50 stored prices.
        """
        prices = list(self.analyzer.price_history[trade.symbol])

        if len(prices) < 50:
            return

        # Check for price breakout above recent highs
        recent_high = max(prices[-20:-1])  # Exclude current price
        if recent_high <= 0:
            # Any positive price "breaks out" of a zero high, and the
            # percentage below would divide by zero.
            return

        if trade.price > recent_high * 1.02:  # 2% breakout
            sentiment = self.analyzer.get_market_sentiment(trade.symbol)

            if sentiment in ['BULLISH', 'STRONGLY_BULLISH']:
                self.patterns.append({
                    'type': 'BULLISH_BREAKOUT',
                    'symbol': trade.symbol,
                    'breakout_price': trade.price,
                    'previous_high': recent_high,
                    'breakout_pct': (trade.price - recent_high) / recent_high * 100,
                    'timestamp': trade.timestamp
                })

                print(f"🚀 Bullish breakout: {trade.symbol} broke above {recent_high:.6f}")
Error Handling and Reconnection
/**
 * TradeStreamProcessor that reconnects with exponential backoff when the
 * stream fails or ends, up to `maxRetries` consecutive attempts. Emits
 * 'maxRetriesExceeded' when it gives up.
 */
class RobustTradeStreamer extends TradeStreamProcessor {
  constructor(endpoint) {
    super(endpoint);
    this.maxRetries = 5;
    this.retryDelay = 1000; // base delay in ms, doubled on every attempt
    this.currentRetries = 0;
    this.reconnectInterval = null;

    // Handle of the pending reconnect timer, if any.
    this.retryTimer = null;
    // Set by stopStream() so a deliberate stop is never followed by a retry.
    this.stopRequested = false;
    // Guards against registering the retry listeners more than once.
    this.retryListenersAttached = false;
  }

  /** Start the stream and keep it alive across errors and disconnects. */
  startStreamWithRetry() {
    this.stopRequested = false;

    if (!this.retryListenersAttached) {
      this.retryListenersAttached = true;

      this.on('error', (error) => {
        this.handleStreamError(error);
      });

      this.on('end', () => {
        this.handleStreamEnd();
      });

      // Trades flowing again means the connection has recovered, so the
      // next failure starts over with a full retry budget.
      this.on('trade', () => {
        this.currentRetries = 0;
      });
    }

    this.startStream();
  }

  /** Schedule a reconnect with exponential backoff, or give up. */
  handleStreamError(error) {
    // Ignore failures caused by a deliberate stop, and do not stack a second
    // retry while one is already pending (an 'error' may be followed by an
    // 'end' for the same stream).
    if (this.stopRequested || this.retryTimer) return;

    console.error(`Stream error (attempt ${this.currentRetries + 1}):`, error.message);

    if (this.currentRetries >= this.maxRetries) {
      console.error('Max retry attempts exceeded. Manual intervention required.');
      this.emit('maxRetriesExceeded');
      return;
    }

    this.currentRetries++;
    const delay = this.retryDelay * Math.pow(2, this.currentRetries - 1);

    console.log(`Retrying in ${delay}ms...`);

    this.retryTimer = setTimeout(() => {
      this.retryTimer = null;
      if (!this.stopRequested) {
        this.startStream();
      }
    }, delay);
  }

  /** Treat a server-side close as a reason to reconnect. */
  handleStreamEnd() {
    // When a retry is already pending this 'end' belongs to a stream whose
    // error was handled; resetting the counter here would defeat maxRetries.
    if (this.stopRequested || this.retryTimer) return;

    console.log('Stream ended unexpectedly, attempting reconnection...');
    this.currentRetries = 0; // Reset retry counter on clean disconnect
    this.handleStreamError(new Error('Stream ended'));
  }

  /** Stop the stream and cancel any pending reconnect. */
  stopStream() {
    this.stopRequested = true;

    if (this.retryTimer) {
      clearTimeout(this.retryTimer);
      this.retryTimer = null;
    }

    if (this.reconnectInterval) {
      clearInterval(this.reconnectInterval);
      this.reconnectInterval = null;
    }

    super.stopStream();
  }
}
Best Practices
- Memory Management: Use bounded collections to prevent memory leaks in long-running processes
- Performance Optimization: Process trades asynchronously to maintain stream throughput
- Signal Generation: Implement sophisticated algorithms for detecting trading opportunities
- Risk Management: Monitor trade patterns for unusual activity or market manipulation
- Data Validation: Verify trade data integrity and handle malformed messages gracefully
- Persistence: Consider storing critical trade data for historical analysis and recovery
Current Limitations
- Historical Data: Cannot stream from historical timestamps; only real-time data available
- Data Retention: Node maintains only 24 hours of historical trade data
- Market Coverage: Trade data limited to active markets on Hyperliquid
- Latency: Network latency may affect real-time trading applications
Need help? Contact our support team or check the Hyperliquid gRPC documentation.