
StreamBlockTrades

Stream continuous trade data as blocks are produced, providing real-time access to all trading activity on Hyperliquid.

When to Use This Method

StreamBlockTrades is essential for:

  • Trading Analytics - Analyze volume patterns and market trends
  • Price Discovery - Track executed trades and market prices
  • Market Surveillance - Monitor trading behavior and detect anomalies
  • Performance Analytics - Measure execution quality and slippage

Method Signature

rpc StreamBlockTrades(Timestamp) returns (stream BlockTrades) {}

Request Message

message Timestamp {
    int64 timestamp = 1;
}

Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.
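
Until the parameter is honored, every value produces the same forward-only stream. A minimal Go fragment illustrating this (the client, ctx, and generated pb stubs are assumed to be set up as in the full example below):

// The gateway currently ignores this value; the stream always starts at "now"
// and moves forward only.
stream, err := client.StreamBlockTrades(ctx, &pb.Timestamp{Timestamp: 0})
if err != nil {
    log.Fatalf("failed to open trade stream: %v", err)
}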

Response Stream

message BlockTrades {
    // JSON-encoded object conforming to the files in the
    // Hyperliquid data directory "node_trades"
    bytes data = 1;
}

The data field contains JSON-encoded trade information including:

  • Trade execution details (price, size, side)
  • Market identifiers and trading pairs
  • Execution timestamps and block heights
  • Trade IDs and order matching information
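
Because the payload arrives as opaque JSON bytes, a reasonable first step is to decode it generically and inspect its top-level fields before committing to a concrete struct. A minimal Go sketch (the concrete field names used later, such as trades and block, are assumptions about the node_trades layout rather than a documented schema):

import (
    "encoding/json"
    "fmt"
)

// inspectPayload decodes a BlockTrades.data payload into a generic map and
// lists its top-level fields, which is useful for verifying the layout your
// node version actually emits.
func inspectPayload(data []byte) error {
    var payload map[string]interface{}
    if err := json.Unmarshal(data, &payload); err != nil {
        return fmt.Errorf("malformed trade payload: %w", err)
    }
    for key := range payload {
        fmt.Println("top-level field:", key)
    }
    return nil
}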

Implementation Examples

package main

import (
    "context"
    "encoding/json"
    "log"
    "sync"
    "time"

    pb "your-project/hyperliquid_l1_gateway/v1"
    "google.golang.org/grpc"
)

type Trade struct {
    ID        string  `json:"id"`
    Symbol    string  `json:"symbol"`
    Price     float64 `json:"price"`
    Size      float64 `json:"size"`
    Side      string  `json:"side"` // "buy" or "sell"
    Timestamp int64   `json:"timestamp"`
    BlockHash string  `json:"block_hash"`
}

type TradeAnalytics struct {
    TotalVolume      float64
    TradeCount       int64
    BuyVolume        float64
    SellVolume       float64
    WeightedAvgPrice float64
    LastTrade        *Trade
    TimeWindow       time.Duration
    StartTime        time.Time

    mutex sync.RWMutex
}

type TradeStreamProcessor struct {
    client    pb.HyperliquidL1GatewayClient
    analytics map[string]*TradeAnalytics // symbol -> analytics
    mutex     sync.RWMutex

    // Event handlers
    tradeHandlers []func(*Trade)
}

func NewTradeStreamProcessor(client pb.HyperliquidL1GatewayClient) *TradeStreamProcessor {
    return &TradeStreamProcessor{
        client:        client,
        analytics:     make(map[string]*TradeAnalytics),
        tradeHandlers: make([]func(*Trade), 0),
    }
}

func (tsp *TradeStreamProcessor) StreamTrades(ctx context.Context) error {
    stream, err := tsp.client.StreamBlockTrades(
        ctx,
        &pb.Timestamp{Timestamp: 0},
    )
    if err != nil {
        return err
    }

    log.Println("Starting trade stream...")

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            blockTrades, err := stream.Recv()
            if err != nil {
                log.Printf("Stream error: %v", err)
                return err
            }

            if err := tsp.processBlockTrades(blockTrades.Data); err != nil {
                log.Printf("Failed to process trades: %v", err)
                continue
            }
        }
    }
}

func (tsp *TradeStreamProcessor) processBlockTrades(data []byte) error {
    var tradesData struct {
        Trades []Trade `json:"trades"`
        Block  struct {
            Height    int64  `json:"height"`
            Hash      string `json:"hash"`
            Timestamp int64  `json:"timestamp"`
        } `json:"block"`
    }

    if err := json.Unmarshal(data, &tradesData); err != nil {
        return err
    }

    log.Printf("Processing %d trades from block %d",
        len(tradesData.Trades), tradesData.Block.Height)

    // Process each trade. Copy into a fresh variable so the pointer handed
    // to asynchronous handlers is not shared across loop iterations.
    for i := range tradesData.Trades {
        trade := tradesData.Trades[i]
        trade.BlockHash = tradesData.Block.Hash

        tsp.updateAnalytics(&trade)
        tsp.notifyHandlers(&trade)
    }

    return nil
}

func (tsp *TradeStreamProcessor) updateAnalytics(trade *Trade) {
    tsp.mutex.Lock()
    defer tsp.mutex.Unlock()

    analytics, exists := tsp.analytics[trade.Symbol]
    if !exists {
        analytics = &TradeAnalytics{
            StartTime:  time.Now(),
            TimeWindow: time.Hour, // 1-hour window
        }
        tsp.analytics[trade.Symbol] = analytics
    }

    analytics.mutex.Lock()
    defer analytics.mutex.Unlock()

    // Update trade counts and volume
    analytics.TradeCount++
    analytics.TotalVolume += trade.Size
    analytics.LastTrade = trade

    // Track buy/sell volume
    if trade.Side == "buy" {
        analytics.BuyVolume += trade.Size
    } else {
        analytics.SellVolume += trade.Size
    }

    // Update weighted average price
    totalValue := analytics.WeightedAvgPrice * (analytics.TotalVolume - trade.Size)
    totalValue += trade.Price * trade.Size
    analytics.WeightedAvgPrice = totalValue / analytics.TotalVolume

    log.Printf("Trade: %s %.6f @ %.6f (%s)",
        trade.Symbol, trade.Size, trade.Price, trade.Side)
}

func (tsp *TradeStreamProcessor) AddTradeHandler(handler func(*Trade)) {
    tsp.tradeHandlers = append(tsp.tradeHandlers, handler)
}

func (tsp *TradeStreamProcessor) notifyHandlers(trade *Trade) {
    for _, handler := range tsp.tradeHandlers {
        go handler(trade) // Handle asynchronously
    }
}

func (tsp *TradeStreamProcessor) GetAnalytics(symbol string) *TradeAnalytics {
    tsp.mutex.RLock()
    defer tsp.mutex.RUnlock()

    analytics, exists := tsp.analytics[symbol]
    if !exists {
        return nil
    }

    analytics.mutex.RLock()
    defer analytics.mutex.RUnlock()

    // Return a copy to avoid race conditions
    analyticsCopy := *analytics
    return &analyticsCopy
}

func (tsp *TradeStreamProcessor) GetAllAnalytics() map[string]*TradeAnalytics {
    tsp.mutex.RLock()
    defer tsp.mutex.RUnlock()

    result := make(map[string]*TradeAnalytics)
    for symbol, analytics := range tsp.analytics {
        analytics.mutex.RLock()
        analyticsCopy := *analytics
        analytics.mutex.RUnlock()
        result[symbol] = &analyticsCopy
    }

    return result
}

// Advanced analytics functions
func (tsp *TradeStreamProcessor) CalculateVWAP(symbol string, window time.Duration) float64 {
    analytics := tsp.GetAnalytics(symbol)
    if analytics == nil {
        return 0
    }

    // In a real implementation, you'd maintain a time-windowed history.
    // This is a simplified version using current session data.
    return analytics.WeightedAvgPrice
}

func (tsp *TradeStreamProcessor) GetMarketSentiment(symbol string) string {
    analytics := tsp.GetAnalytics(symbol)
    if analytics == nil {
        return "UNKNOWN"
    }

    totalVolume := analytics.BuyVolume + analytics.SellVolume
    if totalVolume == 0 {
        return "NEUTRAL"
    }

    buyRatio := analytics.BuyVolume / totalVolume

    if buyRatio > 0.6 {
        return "BULLISH"
    } else if buyRatio < 0.4 {
        return "BEARISH"
    }

    return "NEUTRAL"
}

func main() {
    // Connect to Hyperliquid L1 Gateway
    conn, err := grpc.Dial(
        "<YOUR_HYPERLIQUID_ENDPOINT>", // Contact support@dwellir.com
        grpc.WithInsecure(),
    )
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewHyperliquidL1GatewayClient(conn)
    processor := NewTradeStreamProcessor(client)

    // Add custom trade handlers
    processor.AddTradeHandler(func(trade *Trade) {
        // Large trade alert
        if trade.Size > 1000 {
            log.Printf("🚨 Large trade alert: %s %.2f @ %.6f",
                trade.Symbol, trade.Size, trade.Price)
        }
    })

    // Price change detection. The handler keeps its own map of previous
    // prices because analytics.LastTrade has already been set to the
    // current trade by the time handlers run.
    var lastPricesMu sync.Mutex
    lastPrices := make(map[string]float64)
    processor.AddTradeHandler(func(trade *Trade) {
        lastPricesMu.Lock()
        prevPrice, seen := lastPrices[trade.Symbol]
        lastPrices[trade.Symbol] = trade.Price
        lastPricesMu.Unlock()

        if seen && prevPrice > 0 {
            priceChange := (trade.Price - prevPrice) / prevPrice

            if abs(priceChange) > 0.01 { // 1% price change
                log.Printf("📈 Price movement: %s %.2f%% to %.6f",
                    trade.Symbol, priceChange*100, trade.Price)
            }
        }
    })

    // Start streaming in background
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        if err := processor.StreamTrades(ctx); err != nil {
            log.Printf("Stream ended: %v", err)
        }
    }()

    // Print analytics periodically
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            allAnalytics := processor.GetAllAnalytics()

            log.Println("=== TRADE ANALYTICS ===")
            for symbol, analytics := range allAnalytics {
                sentiment := processor.GetMarketSentiment(symbol)
                log.Printf("%s: %d trades, %.2f volume, VWAP: %.6f, Sentiment: %s",
                    symbol, analytics.TradeCount, analytics.TotalVolume,
                    analytics.WeightedAvgPrice, sentiment)
            }
        }
    }
}

func abs(x float64) float64 {
    if x < 0 {
        return -x
    }
    return x
}
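
The CalculateVWAP method above falls back to the session-wide weighted average. A minimal sketch of a genuinely time-windowed VWAP, assuming you retain recent trades per symbol (the TimedTrade type and WindowedVWAP helper are illustrative additions that can live in the same package, not part of the gateway API):

// TimedTrade is a minimal stand-in carrying only the fields needed for a
// windowed VWAP calculation.
type TimedTrade struct {
    Price float64
    Size  float64
    At    time.Time
}

// WindowedVWAP computes the volume-weighted average price of the trades that
// fall inside the given window, ignoring older entries. It returns 0 when no
// trade is inside the window.
func WindowedVWAP(trades []TimedTrade, window time.Duration, now time.Time) float64 {
    cutoff := now.Add(-window)

    var totalValue, totalSize float64
    for _, t := range trades {
        if t.At.Before(cutoff) {
            continue // outside the window
        }
        totalValue += t.Price * t.Size
        totalSize += t.Size
    }

    if totalSize == 0 {
        return 0
    }
    return totalValue / totalSize
}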

Common Use Cases

1. Volume-Weighted Average Price (VWAP) Calculation

class VWAPCalculator:
    def __init__(self, analyzer):
        self.analyzer = analyzer
        self.vwap_windows = {
            'short': 50,    # 50 trades
            'medium': 200,  # 200 trades
            'long': 500     # 500 trades
        }

    def calculate_vwap(self, symbol: str, window_type: str = 'medium') -> float:
        window_size = self.vwap_windows.get(window_type, 200)
        return self.analyzer.get_vwap(symbol, window_size)

    def get_vwap_deviation(self, trade: Trade, window_type: str = 'medium') -> float:
        vwap = self.calculate_vwap(trade.symbol, window_type)
        if vwap is None:
            return 0.0

        return (trade.price - vwap) / vwap * 100  # Percentage deviation

2. Market Impact Analysis

class MarketImpactAnalyzer {
    constructor(processor) {
        this.processor = processor;
        this.impactThresholds = {
            small: 100,
            medium: 500,
            large: 1000,
            whale: 5000
        };

        processor.on('trade', (trade) => {
            this.analyzeMarketImpact(trade);
        });
    }

    analyzeMarketImpact(trade) {
        const priceStats = this.processor.getPriceStatistics(trade.symbol);
        if (!priceStats || priceStats.samples < 20) return;

        // Calculate price impact
        const recentAvg = priceStats.mean;
        const priceImpact = (trade.price - recentAvg) / recentAvg * 100;

        // Categorize trade size
        let sizeCategory = 'small';
        for (const [category, threshold] of Object.entries(this.impactThresholds)) {
            if (trade.size >= threshold) {
                sizeCategory = category;
            } else {
                break;
            }
        }

        if (Math.abs(priceImpact) > 0.1) { // 0.1% price impact
            console.log(`💥 Market Impact: ${trade.symbol} ${sizeCategory} trade (${trade.size.toFixed(2)}) caused ${priceImpact.toFixed(2)}% price ${priceImpact > 0 ? 'increase' : 'decrease'}`);
        }
    }
}

3. Arbitrage Opportunity Detection

type ArbitrageDetector struct {
    processor    *TradeStreamProcessor
    priceFeeds   map[string]map[string]float64 // symbol -> exchange -> price
    spreadAlerts []ArbitrageAlert
}

type ArbitrageAlert struct {
    Symbol    string
    Exchange1 string
    Price1    float64
    Exchange2 string
    Price2    float64
    SpreadPct float64
    Timestamp time.Time
}

func (ad *ArbitrageDetector) CheckArbitrageOpportunity(trade *Trade) {
    // Compare with other exchange prices
    exchangePrices, exists := ad.priceFeeds[trade.Symbol]
    if !exists {
        return
    }

    for exchange, price := range exchangePrices {
        if exchange == "hyperliquid" {
            continue
        }

        spread := abs(trade.Price-price) / trade.Price

        if spread > 0.005 { // 0.5% arbitrage opportunity
            alert := ArbitrageAlert{
                Symbol:    trade.Symbol,
                Exchange1: "hyperliquid",
                Price1:    trade.Price,
                Exchange2: exchange,
                Price2:    price,
                SpreadPct: spread * 100,
                Timestamp: time.Now(),
            }

            ad.spreadAlerts = append(ad.spreadAlerts, alert)
            log.Printf("🔄 Arbitrage opportunity: %s %.2f%% spread between %s (%.6f) and %s (%.6f)",
                alert.Symbol, alert.SpreadPct, alert.Exchange1, alert.Price1, alert.Exchange2, alert.Price2)
        }
    }
}

4. Trade Pattern Recognition

class PatternRecognizer:
    def __init__(self, analyzer):
        self.analyzer = analyzer
        self.patterns = []
        analyzer.add_trade_handler(self.analyze_patterns)

    def analyze_patterns(self, trade: Trade):
        # Detect accumulation patterns
        self.detect_accumulation(trade)

        # Detect distribution patterns
        self.detect_distribution(trade)

        # Detect momentum patterns
        self.detect_momentum_patterns(trade)

    def detect_accumulation(self, trade: Trade):
        """Detect accumulation patterns (many small buys)."""
        recent_trades = list(self.analyzer.recent_trades[trade.symbol])[-20:]

        if len(recent_trades) < 20:
            return

        buy_trades = [t for t in recent_trades if t.side == 'buy']

        # Check if the majority are small buy trades
        if len(buy_trades) > 15 and all(t.size < 100 for t in buy_trades):
            self.patterns.append({
                'type': 'ACCUMULATION',
                'symbol': trade.symbol,
                'buy_trades': len(buy_trades),
                'avg_size': sum(t.size for t in buy_trades) / len(buy_trades),
                'timestamp': trade.timestamp
            })
            print(f"📈 Accumulation pattern detected: {trade.symbol}")

    def detect_distribution(self, trade: Trade):
        """Detect distribution patterns (many small sells)."""
        # Similar logic for sell trades
        pass

    def detect_momentum_patterns(self, trade: Trade):
        """Detect momentum breakout patterns."""
        prices = list(self.analyzer.price_history[trade.symbol])

        if len(prices) < 50:
            return

        # Check for a price breakout above recent highs
        recent_high = max(prices[-20:-1])  # Exclude current price

        if trade.price > recent_high * 1.02:  # 2% breakout
            sentiment = self.analyzer.get_market_sentiment(trade.symbol)
            if sentiment in ['BULLISH', 'STRONGLY_BULLISH']:
                self.patterns.append({
                    'type': 'BULLISH_BREAKOUT',
                    'symbol': trade.symbol,
                    'breakout_price': trade.price,
                    'previous_high': recent_high,
                    'breakout_pct': (trade.price - recent_high) / recent_high * 100,
                    'timestamp': trade.timestamp
                })
                print(f"🚀 Bullish breakout: {trade.symbol} broke above {recent_high:.6f}")

Error Handling and Reconnection

class RobustTradeStreamer extends TradeStreamProcessor {
    constructor(endpoint) {
        super(endpoint);
        this.maxRetries = 5;
        this.retryDelay = 1000;
        this.currentRetries = 0;
        this.reconnectInterval = null;
    }

    startStreamWithRetry() {
        this.startStream();

        this.on('error', (error) => {
            this.handleStreamError(error);
        });

        this.on('end', () => {
            this.handleStreamEnd();
        });
    }

    handleStreamError(error) {
        console.error(`Stream error (attempt ${this.currentRetries + 1}):`, error.message);

        if (this.currentRetries < this.maxRetries) {
            this.currentRetries++;
            const delay = this.retryDelay * Math.pow(2, this.currentRetries - 1);

            console.log(`Retrying in ${delay}ms...`);
            setTimeout(() => {
                this.startStream();
            }, delay);
        } else {
            console.error('Max retry attempts exceeded. Manual intervention required.');
            this.emit('maxRetriesExceeded');
        }
    }

    handleStreamEnd() {
        console.log('Stream ended unexpectedly, attempting reconnection...');
        this.currentRetries = 0; // Reset retry counter on clean disconnect
        this.handleStreamError(new Error('Stream ended'));
    }

    stopStream() {
        if (this.reconnectInterval) {
            clearInterval(this.reconnectInterval);
        }
        super.stopStream();
    }
}

Best Practices

  1. Memory Management: Use bounded collections to prevent memory leaks in long-running processes (see the sketch after this list)
  2. Performance Optimization: Process trades asynchronously to maintain stream throughput
  3. Signal Generation: Implement sophisticated algorithms for detecting trading opportunities
  4. Risk Management: Monitor trade patterns for unusual activity or market manipulation
  5. Data Validation: Verify trade data integrity and handle malformed messages gracefully
  6. Persistence: Consider storing critical trade data for historical analysis and recovery
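
For the memory-management point above, one simple option is a fixed-capacity ring buffer of recent trades per symbol. A minimal sketch that reuses the Trade struct from the Go example (the RecentTrades type is illustrative, not part of the gateway API, and assumes a capacity greater than zero):

// RecentTrades keeps at most `capacity` trades, overwriting the oldest entry
// once the buffer is full, so per-symbol history never grows without bound.
type RecentTrades struct {
    capacity int
    trades   []Trade
    next     int
    full     bool
}

func NewRecentTrades(capacity int) *RecentTrades {
    return &RecentTrades{capacity: capacity, trades: make([]Trade, capacity)}
}

// Add stores a trade, evicting the oldest one when the buffer is full.
func (r *RecentTrades) Add(t Trade) {
    r.trades[r.next] = t
    r.next = (r.next + 1) % r.capacity
    if r.next == 0 {
        r.full = true
    }
}

// Snapshot returns the buffered trades in insertion order as a fresh slice.
func (r *RecentTrades) Snapshot() []Trade {
    if !r.full {
        return append([]Trade(nil), r.trades[:r.next]...)
    }
    return append(append([]Trade(nil), r.trades[r.next:]...), r.trades[:r.next]...)
}

Add is O(1) and the buffer never exceeds capacity entries, however long the stream runs, which keeps windowed calculations cheap and memory use predictable.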

Current Limitations

  • Historical Data: Cannot stream from historical timestamps; only real-time data available
  • Data Retention: Node maintains only 24 hours of historical trade data
  • Market Coverage: Trade data limited to active markets on Hyperliquid
  • Latency: Network latency may affect real-time trading applications

Need help? Contact our support team or check the Hyperliquid gRPC documentation.