StreamBlockFills
Stream continuous fill data for executed orders, providing real-time visibility into order execution and settlement.
When to Use This Method
`StreamBlockFills` is essential for:
- Order Execution Tracking - Monitor fill rates and execution quality
- Portfolio Management - Track position changes and settlement
- Execution Analytics - Analyze slippage and execution performance
- Trade Settlement - Confirm order fills and update balances
Method Signature
// Server-streaming RPC: accepts one Timestamp request and returns a stream of BlockFills messages.
rpc StreamBlockFills(Timestamp) returns (stream BlockFills) {}
Request Message
// Request message for StreamBlockFills; carries a single 64-bit integer.
message Timestamp {
  // Requested start position. NOTE(review): the unit (seconds vs. milliseconds)
  // is not stated here — confirm against the service documentation.
  int64 timestamp = 1;
}
Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.
Response Stream
// Response message streamed by StreamBlockFills.
message BlockFills {
  // JSON-encoded object conforming to the files found in the
  // Hyperliquid data directory "node_fills".
  bytes data = 1;
}
The `data` field contains JSON-encoded fill information including:
- Fill execution details (filled size, execution price)
- Order references and matching information
- User accounts and position updates
- Settlement timestamps and block references
- Fee calculations and distributions
Implementation Examples
- Go
- Python
- Node.js
package main
import (
"context"
"encoding/json"
"log"
"sync"
"time"
pb "your-project/hyperliquid_l1_gateway/v1"
"google.golang.org/grpc"
)
// Fill is one element of the "fills" array that processBlockFills decodes
// from a BlockFills JSON payload.
type Fill struct {
	ID          string  `json:"id"`
	OrderID     string  `json:"order_id"`
	UserAddress string  `json:"user_address"` // key used for the per-user statistics
	Symbol      string  `json:"symbol"`       // key used to group fills in CalculateExecutionQuality
	Side        string  `json:"side"`
	FilledSize  float64 `json:"filled_size"`  // accumulated into the volume totals
	FilledPrice float64 `json:"filled_price"`
	Fee         float64 `json:"fee"`          // accumulated into the fee totals
	FeeAsset    string  `json:"fee_asset"`
	Timestamp   int64   `json:"timestamp"`    // NOTE(review): unit is not established in this file — confirm
	BlockHeight int64   `json:"block_height"` // overwritten by processBlockFills with the enclosing block's height
	BlockHash   string  `json:"block_hash"`   // overwritten by processBlockFills with the enclosing block's hash
	SequenceNum int64   `json:"sequence_num"`
}
// ExecutionStats holds running totals over every fill passed to updateStats,
// both globally and keyed by user address.
type ExecutionStats struct {
	TotalFills      int64   // number of fills seen
	TotalVolume     float64 // sum of Fill.FilledSize
	TotalFees       float64 // sum of Fill.Fee
	AverageSlippage float64 // NOTE(review): never written by the code in this example
	FillRate        float64 // Percentage of orders filled; NOTE(review): never written by the code in this example
	// Per-user statistics
	UserFills  map[string]int64   // fill count per Fill.UserAddress
	UserVolume map[string]float64 // summed FilledSize per Fill.UserAddress
	UserFees   map[string]float64 // summed Fee per Fill.UserAddress
	mutex      sync.RWMutex       // taken by updateStats (write) and the stats getters (read)
}
// FillStreamProcessor consumes the StreamBlockFills stream, keeps aggregate
// statistics plus a bounded history of recent fills, and fans each fill out
// to registered handlers.
type FillStreamProcessor struct {
	client      pb.HyperliquidL1GatewayClient // gRPC client used by StreamFills
	stats       *ExecutionStats               // running totals; has its own mutex
	recentFills []Fill                        // most recent fills, oldest first
	maxHistory  int                           // maximum length of recentFills
	// Event handlers
	fillHandlers []func(*Fill) // each is started in its own goroutine per fill
	mutex        sync.RWMutex  // held while a block is processed and while recentFills is read
}
// NewFillStreamProcessor returns a processor bound to client with empty
// statistics, no handlers and a recent-fill history capped at 10000 entries.
func NewFillStreamProcessor(client pb.HyperliquidL1GatewayClient) *FillStreamProcessor {
	stats := new(ExecutionStats)
	stats.UserFills = map[string]int64{}
	stats.UserVolume = map[string]float64{}
	stats.UserFees = map[string]float64{}

	fsp := new(FillStreamProcessor)
	fsp.client = client
	fsp.stats = stats
	fsp.recentFills = []Fill{}
	fsp.maxHistory = 10000
	fsp.fillHandlers = []func(*Fill){}
	return fsp
}
// StreamFills opens the StreamBlockFills stream (requesting timestamp 0) and
// processes every received message until ctx is cancelled or the stream
// returns an error. A payload that fails to process is logged and skipped.
// The returned error is ctx.Err() on cancellation, otherwise the stream error.
func (fsp *FillStreamProcessor) StreamFills(ctx context.Context) error {
	request := &pb.Timestamp{Timestamp: 0}
	stream, err := fsp.client.StreamBlockFills(ctx, request)
	if err != nil {
		return err
	}
	log.Println("Starting fill stream...")

	for {
		// Check for cancellation before blocking on the next message.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		msg, recvErr := stream.Recv()
		if recvErr != nil {
			log.Printf("Stream error: %v", recvErr)
			return recvErr
		}

		if procErr := fsp.processBlockFills(msg.Data); procErr != nil {
			// One bad payload must not terminate the stream.
			log.Printf("Failed to process fills: %v", procErr)
		}
	}
}
// processBlockFills decodes one BlockFills JSON payload and, for every fill
// in it, stamps the block height and hash onto the fill, updates the
// statistics, stores it in the recent history and notifies the handlers.
//
// It returns the json.Unmarshal error if the payload cannot be decoded; in
// that case no state is modified. The processor mutex is held for the whole
// block so readers of the history never observe a half-processed block.
func (fsp *FillStreamProcessor) processBlockFills(data []byte) error {
	var payload struct {
		Fills []Fill `json:"fills"`
		Block struct {
			Height    int64  `json:"height"`
			Hash      string `json:"hash"`
			Timestamp int64  `json:"timestamp"`
		} `json:"block"`
	}
	if err := json.Unmarshal(data, &payload); err != nil {
		return err
	}

	log.Printf("Processing %d fills from block %d",
		len(payload.Fills), payload.Block.Height)

	fsp.mutex.Lock()
	defer fsp.mutex.Unlock()

	// Process each fill
	for i := range payload.Fills {
		// Take a private copy per iteration: notifyHandlers hands the pointer
		// to goroutines that outlive this iteration, so they must not share a
		// loop variable that is overwritten by the next fill (which is what
		// a ranged value variable does before Go 1.22).
		fill := payload.Fills[i]
		fill.BlockHeight = payload.Block.Height
		fill.BlockHash = payload.Block.Hash

		fsp.updateStats(&fill)
		fsp.storeFill(&fill)
		fsp.notifyHandlers(&fill)
	}
	return nil
}
// updateStats folds one fill into the global and per-user totals under the
// statistics write lock, then logs a one-line summary of the fill.
func (fsp *FillStreamProcessor) updateStats(fill *Fill) {
	st := fsp.stats
	st.mutex.Lock()
	defer st.mutex.Unlock()

	size, fee, who := fill.FilledSize, fill.Fee, fill.UserAddress

	// Global totals.
	st.TotalFills++
	st.TotalVolume += size
	st.TotalFees += fee

	// Totals for the user that owns this fill.
	st.UserFills[who]++
	st.UserVolume[who] += size
	st.UserFees[who] += fee

	log.Printf("Fill: %s %s %.6f @ %.6f (Fee: %.6f %s)",
		who, fill.Symbol, size, fill.FilledPrice, fee, fill.FeeAsset)
}
// storeFill appends a copy of fill to the recent history and drops the
// oldest entries so that at most maxHistory fills are kept.
func (fsp *FillStreamProcessor) storeFill(fill *Fill) {
	history := append(fsp.recentFills, *fill)
	if excess := len(history) - fsp.maxHistory; excess > 0 {
		// Keep only the newest maxHistory entries.
		history = history[excess:]
	}
	fsp.recentFills = history
}
// AddFillHandler registers handler to be invoked, in its own goroutine, for
// every fill processed after registration. A nil handler is ignored because
// invoking it would panic inside the dispatch goroutine.
//
// The processor mutex is taken so that handlers can be registered while a
// stream is running: the handler list is read under that same mutex during
// block processing.
func (fsp *FillStreamProcessor) AddFillHandler(handler func(*Fill)) {
	if handler == nil {
		return
	}
	fsp.mutex.Lock()
	defer fsp.mutex.Unlock()
	fsp.fillHandlers = append(fsp.fillHandlers, handler)
}
// notifyHandlers starts one goroutine per registered handler, passing each
// the same fill pointer. It does not wait for the handlers to finish.
func (fsp *FillStreamProcessor) notifyHandlers(fill *Fill) {
	for i := 0; i < len(fsp.fillHandlers); i++ {
		go fsp.fillHandlers[i](fill) // Handle asynchronously
	}
}
// GetUserStats returns a snapshot of the statistics recorded for userAddress
// with the keys "fills" (int64), "volume", "fees", "avg_size" and "avg_fee"
// (all float64). An address with no recorded fills yields zeros.
//
// The averages are computed inline under a single read lock. Calling the
// calculateAvg* helpers from here would read-lock the statistics mutex a
// second time on the same goroutine, which sync.RWMutex forbids: if a writer
// queues up between the two RLock calls, the inner RLock waits for the writer
// and the writer waits for the outer read lock, deadlocking both.
func (fsp *FillStreamProcessor) GetUserStats(userAddress string) map[string]interface{} {
	fsp.stats.mutex.RLock()
	defer fsp.stats.mutex.RUnlock()

	fills := fsp.stats.UserFills[userAddress]
	volume := fsp.stats.UserVolume[userAddress]
	fees := fsp.stats.UserFees[userAddress]

	var avgSize, avgFee float64
	if fills != 0 {
		avgSize = volume / float64(fills)
		avgFee = fees / float64(fills)
	}

	return map[string]interface{}{
		"fills":    fills,
		"volume":   volume,
		"fees":     fees,
		"avg_size": avgSize,
		"avg_fee":  avgFee,
	}
}
// calculateAvgFillSize returns the recorded volume of userAddress divided by
// its number of fills, or 0 when the user has no fills. It takes the
// statistics read lock itself.
func (fsp *FillStreamProcessor) calculateAvgFillSize(userAddress string) float64 {
	st := fsp.stats
	st.mutex.RLock()
	defer st.mutex.RUnlock()

	if count := st.UserFills[userAddress]; count != 0 {
		return st.UserVolume[userAddress] / float64(count)
	}
	return 0
}
// calculateAvgFeePerFill returns the recorded fees of userAddress divided by
// its number of fills, or 0 when the user has no fills. It takes the
// statistics read lock itself.
func (fsp *FillStreamProcessor) calculateAvgFeePerFill(userAddress string) float64 {
	st := fsp.stats
	st.mutex.RLock()
	defer st.mutex.RUnlock()

	if count := st.UserFills[userAddress]; count != 0 {
		return st.UserFees[userAddress] / float64(count)
	}
	return 0
}
// GetGlobalStats returns a snapshot of the totals across all users with the
// keys "total_fills" (int64), "total_volume", "total_fees", "avg_fill_size",
// "avg_fee_per_fill" (float64) and "unique_users" (int).
//
// Both averages are 0 until the first fill has been recorded; without the
// guard the fee average would be 0/0 = NaN.
func (fsp *FillStreamProcessor) GetGlobalStats() map[string]interface{} {
	fsp.stats.mutex.RLock()
	defer fsp.stats.mutex.RUnlock()

	var avgFillSize, avgFeePerFill float64
	if fsp.stats.TotalFills > 0 {
		n := float64(fsp.stats.TotalFills)
		avgFillSize = fsp.stats.TotalVolume / n
		avgFeePerFill = fsp.stats.TotalFees / n
	}

	return map[string]interface{}{
		"total_fills":      fsp.stats.TotalFills,
		"total_volume":     fsp.stats.TotalVolume,
		"total_fees":       fsp.stats.TotalFees,
		"avg_fill_size":    avgFillSize,
		"unique_users":     len(fsp.stats.UserFills),
		"avg_fee_per_fill": avgFeePerFill,
	}
}
// GetRecentFills returns a copy of the newest fills in the history, oldest
// first, containing at most limit entries. A limit that is zero or negative
// yields an empty slice (a negative limit previously reached make and
// panicked). The result never aliases the internal history.
func (fsp *FillStreamProcessor) GetRecentFills(limit int) []Fill {
	fsp.mutex.RLock()
	defer fsp.mutex.RUnlock()

	if limit > len(fsp.recentFills) {
		limit = len(fsp.recentFills)
	}
	if limit <= 0 {
		return []Fill{}
	}

	result := make([]Fill, limit)
	copy(result, fsp.recentFills[len(fsp.recentFills)-limit:])
	return result
}
// Advanced analytics

// CalculateExecutionQuality scores the price consistency of each symbol in
// the recent-fill history as 1/(1+variance) of its fill prices, so a lower
// variance gives a score closer to 1. Symbols with fewer than two fills in
// the history are omitted; an empty history yields an empty map.
func (fsp *FillStreamProcessor) CalculateExecutionQuality() map[string]float64 {
	fsp.mutex.RLock()
	defer fsp.mutex.RUnlock()

	scores := map[string]float64{}
	if len(fsp.recentFills) == 0 {
		return scores
	}

	// Collect the fill prices of every symbol, in history order.
	pricesBySymbol := make(map[string][]float64)
	for i := range fsp.recentFills {
		f := &fsp.recentFills[i]
		pricesBySymbol[f.Symbol] = append(pricesBySymbol[f.Symbol], f.FilledPrice)
	}

	for symbol, prices := range pricesBySymbol {
		if len(prices) >= 2 {
			// Higher score = better consistency
			scores[symbol] = 1.0 / (1.0 + calculateVariance(prices))
		}
	}
	return scores
}
// calculateVariance returns the population variance of values (the sum of
// squared deviations from the mean divided by the number of values), or 0
// for an empty slice.
func calculateVariance(values []float64) float64 {
	count := len(values)
	if count == 0 {
		return 0
	}
	n := float64(count)

	var sum float64
	for i := 0; i < count; i++ {
		sum += values[i]
	}
	mean := sum / n

	var squares float64
	for i := 0; i < count; i++ {
		delta := values[i] - mean
		squares += delta * delta
	}
	return squares / n
}
// main connects to the gateway, registers two alerting handlers, streams
// fills in a background goroutine and logs aggregate analytics every 30
// seconds. It returns once the stream goroutine has ended, instead of
// printing unchanged statistics forever.
func main() {
	// Connect to Hyperliquid L1 Gateway
	// NOTE(review): grpc.WithInsecure is deprecated in current grpc-go releases
	// in favour of transport credentials from the credentials/insecure package;
	// switching requires an extra import in this file.
	conn, err := grpc.Dial(
		"<YOUR_HYPERLIQUID_ENDPOINT>", // Contact support@dwellir.com
		grpc.WithInsecure(),
	)
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewHyperliquidL1GatewayClient(conn)
	processor := NewFillStreamProcessor(client)

	// Add custom fill handlers
	processor.AddFillHandler(func(fill *Fill) {
		// Large fill alert
		if fill.FilledSize > 1000 {
			log.Printf("🔥 Large fill: %s %.2f @ %.6f",
				fill.Symbol, fill.FilledSize, fill.FilledPrice)
		}
	})
	processor.AddFillHandler(func(fill *Fill) {
		// High fee alert
		notional := fill.FilledSize * fill.FilledPrice
		if notional <= 0 {
			// A zero notional would make the rate +Inf or NaN and raise a
			// meaningless alert.
			return
		}
		feeRate := fill.Fee / notional
		if feeRate > 0.001 { // 0.1% fee rate
			log.Printf("💰 High fee: %s %.4f%% fee rate",
				fill.Symbol, feeRate*100)
		}
	})

	// Start streaming in background
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan struct{}) // closed when StreamFills returns
	go func() {
		defer close(done)
		if err := processor.StreamFills(ctx); err != nil {
			log.Printf("Stream ended: %v", err)
		}
	}()

	// Print analytics periodically
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			globalStats := processor.GetGlobalStats()
			executionQuality := processor.CalculateExecutionQuality()

			log.Println("=== FILL ANALYTICS ===")
			log.Printf("Total fills: %v, Volume: %.2f, Fees: %.6f",
				globalStats["total_fills"], globalStats["total_volume"],
				globalStats["total_fees"])
			log.Printf("Avg fill size: %.4f, Unique users: %v",
				globalStats["avg_fill_size"], globalStats["unique_users"])

			if len(executionQuality) > 0 {
				log.Println("Execution Quality:")
				for symbol, quality := range executionQuality {
					log.Printf(" %s: %.4f", symbol, quality)
				}
			}
		}
	}
}
import grpc
import json
import time
import threading
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Dict, List, Optional, Callable
from statistics import mean, variance
from hyperliquid_l1_gateway_pb2 import Timestamp
from hyperliquid_l1_gateway_pb2_grpc import HyperliquidL1GatewayStub
@dataclass
class Fill:
    """One executed fill, as built by FillStreamAnalyzer.process_block_fills."""

    id: str
    order_id: str
    user_address: str  # key used for the per-user statistics
    symbol: str  # key used for the per-symbol statistics
    side: str
    filled_size: float
    filled_price: float
    fee: float
    fee_asset: str
    timestamp: int  # NOTE(review): unit is not established in this file — confirm
    block_height: int = 0  # taken from the payload's "block" object, not from the fill itself
    block_hash: str = ""  # taken from the payload's "block" object, not from the fill itself
    sequence_num: int = 0
class FillStreamAnalyzer:
    """Consume the StreamBlockFills stream and maintain running fill analytics.

    State is kept per user, per symbol and globally, plus a bounded deque of
    the most recent fills. All mutation and every analytics getter runs under
    ``self.lock`` (a re-entrant lock, so handlers invoked during processing
    may call the getters).
    """

    def __init__(self, endpoint):
        """Open an insecure gRPC channel to ``endpoint`` and create empty state."""
        self.channel = grpc.insecure_channel(endpoint)
        self.stub = HyperliquidL1GatewayStub(self.channel)

        # Fill storage
        self.recent_fills = deque(maxlen=10000)
        self.fill_handlers = []
        self.running = False

        # Analytics
        self.user_stats = defaultdict(lambda: {
            'fills': 0,
            'volume': 0.0,
            'fees': 0.0,
            'avg_price': 0.0,
            'symbols': set()
        })
        self.symbol_stats = defaultdict(lambda: {
            'fills': 0,
            'volume': 0.0,
            'unique_users': set(),
            'price_history': deque(maxlen=1000),
            'size_history': deque(maxlen=1000)
        })
        self.global_stats = {
            'total_fills': 0,
            'total_volume': 0.0,
            'total_fees': 0.0,
            'start_time': time.time()
        }
        self.lock = threading.RLock()

    @staticmethod
    def _peek(stats, key):
        """Return ``stats[key]`` without inserting a default for unknown keys.

        Indexing a defaultdict creates the entry as a side effect, so a
        read-only query for an unknown user or symbol would otherwise inflate
        the counts reported by get_market_participation. For a missing key a
        throw-away default record is returned instead.
        """
        if key in stats:
            return stats[key]
        return stats.default_factory()

    def stream_fills(self):
        """Start streaming fill data.

        Blocks until the stream ends, a gRPC error occurs, or ``stop()`` has
        been called and the next message arrives. Payloads that are not valid
        JSON are reported and skipped.
        """
        self.running = True
        timestamp = Timestamp(timestamp=0)
        try:
            for block_fills in self.stub.StreamBlockFills(timestamp):
                if not self.running:
                    break
                try:
                    fills_data = json.loads(block_fills.data)
                    self.process_block_fills(fills_data)
                except json.JSONDecodeError as e:
                    print(f"JSON parsing error: {e}")
                    continue
        except grpc.RpcError as e:
            print(f"gRPC error: {e}")
        except KeyboardInterrupt:
            print("Stream interrupted by user")
        finally:
            self.running = False

    def process_block_fills(self, fills_data):
        """Process fills from a single block.

        ``fills_data`` is the decoded payload; its ``'fills'`` list and
        ``'block'`` object are read, with missing fields replaced by defaults.
        Each fill updates the analytics and is then passed to the handlers.
        """
        block_info = fills_data.get('block', {})
        fills = fills_data.get('fills', [])
        print(f"Processing {len(fills)} fills from block {block_info.get('height', '?')}")
        with self.lock:
            for fill_data in fills:
                fill = Fill(
                    id=fill_data.get('id', ''),
                    order_id=fill_data.get('order_id', ''),
                    user_address=fill_data.get('user_address', ''),
                    symbol=fill_data.get('symbol', ''),
                    side=fill_data.get('side', ''),
                    filled_size=float(fill_data.get('filled_size', 0)),
                    filled_price=float(fill_data.get('filled_price', 0)),
                    fee=float(fill_data.get('fee', 0)),
                    fee_asset=fill_data.get('fee_asset', ''),
                    timestamp=int(fill_data.get('timestamp', 0)),
                    block_height=int(block_info.get('height', 0)),
                    block_hash=block_info.get('hash', ''),
                    sequence_num=int(fill_data.get('sequence_num', 0))
                )
                self.update_analytics(fill)
                self.notify_handlers(fill)

    def update_analytics(self, fill: Fill):
        """Update fill analytics with one fill and append it to the history."""
        user = fill.user_address
        symbol = fill.symbol

        # Update user stats
        user_stat = self.user_stats[user]
        user_stat['fills'] += 1
        user_stat['volume'] += fill.filled_size
        user_stat['fees'] += fill.fee
        user_stat['symbols'].add(symbol)

        # Update weighted average price: rebuild the previous total value from
        # the old average and the volume before this fill was added.
        prev_total_value = user_stat['avg_price'] * (user_stat['volume'] - fill.filled_size)
        new_total_value = prev_total_value + (fill.filled_price * fill.filled_size)
        user_stat['avg_price'] = new_total_value / user_stat['volume'] if user_stat['volume'] > 0 else 0

        # Update symbol stats
        symbol_stat = self.symbol_stats[symbol]
        symbol_stat['fills'] += 1
        symbol_stat['volume'] += fill.filled_size
        symbol_stat['unique_users'].add(user)
        symbol_stat['price_history'].append(fill.filled_price)
        symbol_stat['size_history'].append(fill.filled_size)

        # Update global stats
        self.global_stats['total_fills'] += 1
        self.global_stats['total_volume'] += fill.filled_size
        self.global_stats['total_fees'] += fill.fee

        # Store recent fill
        self.recent_fills.append(fill)
        print(f"Fill: {user[:8]}...{user[-4:]} {symbol} {fill.filled_size:.6f} @ {fill.filled_price:.6f}")

    def add_fill_handler(self, handler: Callable[[Fill], None]):
        """Add a custom fill handler, called once per processed fill."""
        self.fill_handlers.append(handler)

    def notify_handlers(self, fill: Fill):
        """Notify all registered handlers; a failing handler is reported and skipped."""
        for handler in self.fill_handlers:
            try:
                handler(fill)
            except Exception as e:
                print(f"Handler error: {e}")

    def get_user_analytics(self, user_address: str) -> Dict:
        """Get analytics for specific user.

        Unknown users yield all-zero analytics and are not added to the
        statistics. ``recent_fills_count`` counts the user's fills among the
        last 1000 stored fills.
        """
        with self.lock:
            user_stat = self._peek(self.user_stats, user_address)

            # Get user's recent fills
            user_fills = [f for f in list(self.recent_fills)[-1000:] if f.user_address == user_address]
            fills = user_stat['fills']
            analytics = {
                'total_fills': fills,
                'total_volume': user_stat['volume'],
                'total_fees': user_stat['fees'],
                'avg_fill_price': user_stat['avg_price'],
                'symbols_traded': len(user_stat['symbols']),
                'avg_fill_size': user_stat['volume'] / fills if fills > 0 else 0,
                'avg_fee_per_fill': user_stat['fees'] / fills if fills > 0 else 0,
                'recent_fills_count': len(user_fills)
            }
            return analytics

    def get_symbol_analytics(self, symbol: str) -> Dict:
        """Get analytics for specific symbol.

        Price and size statistics are included only when the respective
        history is non-empty. Unknown symbols yield zero totals and are not
        added to the statistics.
        """
        with self.lock:
            symbol_stat = self._peek(self.symbol_stats, symbol)
            price_history = list(symbol_stat['price_history'])
            size_history = list(symbol_stat['size_history'])
            analytics = {
                'total_fills': symbol_stat['fills'],
                'total_volume': symbol_stat['volume'],
                'unique_traders': len(symbol_stat['unique_users']),
                'avg_fill_size': symbol_stat['volume'] / symbol_stat['fills'] if symbol_stat['fills'] > 0 else 0,
            }
            if price_history:
                analytics.update({
                    'price_mean': mean(price_history),
                    'price_variance': variance(price_history) if len(price_history) > 1 else 0,
                    'price_range': max(price_history) - min(price_history),
                    'latest_price': price_history[-1]
                })
            if size_history:
                analytics.update({
                    'size_mean': mean(size_history),
                    'size_variance': variance(size_history) if len(size_history) > 1 else 0,
                    'largest_fill': max(size_history),
                    'smallest_fill': min(size_history)
                })
            return analytics

    def detect_large_fills(self, fill: Fill, percentile: float = 95) -> bool:
        """Detect unusually large fills based on historical data.

        With fewer than 20 recorded sizes for the symbol a fixed threshold of
        1000 is used. Otherwise the fill is large when its size exceeds the
        recorded size at the given percentile; the index is clamped to the
        history bounds so that ``percentile=100`` (or a negative value) cannot
        index outside the list.
        """
        with self.lock:
            symbol_stat = self._peek(self.symbol_stats, fill.symbol)
            size_history = list(symbol_stat['size_history'])
            if len(size_history) < 20:
                return fill.filled_size > 1000  # Default threshold

            # Calculate percentile threshold
            sorted_sizes = sorted(size_history)
            threshold_index = int(len(sorted_sizes) * (percentile / 100))
            threshold_index = max(0, min(threshold_index, len(sorted_sizes) - 1))
            threshold = sorted_sizes[threshold_index]
            return fill.filled_size > threshold

    def calculate_execution_quality(self, symbol: str, window: int = 100) -> Optional[float]:
        """Calculate execution quality based on price consistency.

        Uses the last ``window`` recorded prices of the symbol. Returns None
        with fewer than 10 prices or a non-positive mean; otherwise
        ``max(0, 1 - cv)`` where ``cv`` is the coefficient of variation.
        """
        with self.lock:
            symbol_stat = self._peek(self.symbol_stats, symbol)
            price_history = list(symbol_stat['price_history'])[-window:]
            if len(price_history) < 10:
                return None
            price_var = variance(price_history)
            price_mean = mean(price_history)

            # Quality score: lower coefficient of variation = better execution
            if price_mean > 0:
                cv = (price_var ** 0.5) / price_mean
                return max(0, 1 - cv)  # Score from 0 to 1
            return None

    def get_fee_analysis(self) -> Dict:
        """Analyze fee patterns over the last 1000 stored fills.

        Returns an empty dict when no fills are stored. Fee rates are computed
        only for fills with positive size and price.
        """
        with self.lock:
            recent_fills = list(self.recent_fills)[-1000:]  # Last 1000 fills
            if not recent_fills:
                return {}
            fees = [f.fee for f in recent_fills]
            fee_rates = [
                f.fee / (f.filled_size * f.filled_price)
                for f in recent_fills
                if f.filled_size > 0 and f.filled_price > 0
            ]
            fee_by_symbol = defaultdict(list)
            for f in recent_fills:
                if f.filled_size > 0 and f.filled_price > 0:
                    rate = f.fee / (f.filled_size * f.filled_price)
                    fee_by_symbol[f.symbol].append(rate)
            analysis = {
                'total_fees_paid': sum(fees),
                'avg_fee_per_fill': mean(fees),
                'avg_fee_rate': mean(fee_rates) if fee_rates else 0,
                'fee_rate_by_symbol': {}
            }
            for symbol, rates in fee_by_symbol.items():
                if rates:
                    analysis['fee_rate_by_symbol'][symbol] = {
                        'avg_rate': mean(rates),
                        'min_rate': min(rates),
                        'max_rate': max(rates)
                    }
            return analysis

    def get_market_participation(self) -> Dict:
        """Analyze market participation patterns, including the top 10 traders by volume."""
        with self.lock:
            total_users = len(self.user_stats)
            total_symbols = len(self.symbol_stats)

            # Calculate user activity distribution
            user_fills = [stat['fills'] for stat in self.user_stats.values()]
            user_volumes = [stat['volume'] for stat in self.user_stats.values()]
            participation = {
                'total_unique_users': total_users,
                'total_symbols_traded': total_symbols,
                'avg_fills_per_user': mean(user_fills) if user_fills else 0,
                'avg_volume_per_user': mean(user_volumes) if user_volumes else 0
            }

            # Top traders by volume
            top_traders = sorted(
                [(user, stat['volume']) for user, stat in self.user_stats.items()],
                key=lambda x: x[1],
                reverse=True
            )[:10]
            participation['top_traders_by_volume'] = [
                {'user': user, 'volume': volume}
                for user, volume in top_traders
            ]
            return participation

    def stop(self):
        """Stop streaming; takes effect when the stream loop sees the next message."""
        self.running = False
class FillAlertSystem:
    """Raise alerts for large fills, high fee rates and rapid execution.

    The instance registers ``on_fill`` with the analyzer on construction and
    keeps a bounded in-memory list of alert dicts.
    """

    def __init__(self, analyzer: FillStreamAnalyzer):
        self.analyzer = analyzer
        self.alerts = []

        # Register as fill handler
        analyzer.add_fill_handler(self.on_fill)

    @staticmethod
    def _mask(address: str) -> str:
        """Abbreviate an address to its first 8 and last 4 characters."""
        return address[:8] + "..." + address[-4:]

    def on_fill(self, fill: Fill):
        """Process individual fills for alerts"""
        # Large fill alert
        if self.analyzer.detect_large_fills(fill):
            self.generate_alert({
                'type': 'LARGE_FILL',
                'symbol': fill.symbol,
                'user': self._mask(fill.user_address),
                'size': fill.filled_size,
                'price': fill.filled_price,
                'timestamp': fill.timestamp
            })

        # High fee alert; skipped when the notional would be zero
        if fill.filled_size > 0 and fill.filled_price > 0:
            fee_rate = fill.fee / (fill.filled_size * fill.filled_price)
            if fee_rate > 0.005:  # 0.5% fee rate
                self.generate_alert({
                    'type': 'HIGH_FEE',
                    'symbol': fill.symbol,
                    'user': self._mask(fill.user_address),
                    'fee_rate': fee_rate * 100,
                    'timestamp': fill.timestamp
                })

        # Rapid execution alert (user making many fills quickly)
        self.check_rapid_execution(fill)

    def check_rapid_execution(self, fill: Fill):
        """Check for rapid execution patterns among the last 100 stored fills."""
        # NOTE(review): the 60-unit window assumes timestamps are in seconds — confirm.
        recent_fills = [
            f for f in list(self.analyzer.recent_fills)[-100:]
            if f.user_address == fill.user_address and
            f.timestamp > fill.timestamp - 60  # Last minute
        ]
        if len(recent_fills) > 10:  # More than 10 fills in a minute
            self.generate_alert({
                'type': 'RAPID_EXECUTION',
                'user': self._mask(fill.user_address),
                'fills_count': len(recent_fills),
                'time_window': '1 minute',
                'timestamp': fill.timestamp
            })

    def generate_alert(self, alert: Dict):
        """Generate and store alert"""
        self.alerts.append(alert)

        # Keep only recent alerts: once past 1000, trim back to the newest 500
        if len(self.alerts) > 1000:
            self.alerts = self.alerts[-500:]
        print(f"🚨 Alert: {alert['type']} - {alert.get('symbol', '')}")

    def get_recent_alerts(self, limit: int = 10) -> List[Dict]:
        """Get the newest ``limit`` alerts, oldest first.

        A limit of zero or less returns an empty list; ``alerts[-0:]`` would
        otherwise return every stored alert.
        """
        if limit <= 0:
            return []
        return self.alerts[-limit:]
def main():
    """Run the analyzer against the configured endpoint and print analytics every 30 seconds."""
    analyzer = FillStreamAnalyzer('<YOUR_HYPERLIQUID_ENDPOINT>')
    alert_system = FillAlertSystem(analyzer)

    # Add custom handlers
    def settlement_tracker(fill: Fill):
        """Track settlement patterns"""
        print(f"Settlement: {fill.symbol} {fill.filled_size:.4f} @ {fill.filled_price:.6f}")

    def volume_tracker(fill: Fill):
        """Track volume patterns"""
        symbol_analytics = analyzer.get_symbol_analytics(fill.symbol)
        if symbol_analytics['total_volume'] > 10000:  # High volume symbol
            print(f"📊 High volume symbol: {fill.symbol} (Total: {symbol_analytics['total_volume']:.2f})")

    analyzer.add_fill_handler(settlement_tracker)
    analyzer.add_fill_handler(volume_tracker)

    # Start streaming in background thread
    stream_thread = threading.Thread(target=analyzer.stream_fills)
    stream_thread.start()
    try:
        # Print analytics every 30 seconds.
        # Loop on the thread's liveness: analyzer.running is only set once the
        # thread has actually started executing, so testing it right after
        # start() can see False and skip the loop entirely.
        while stream_thread.is_alive():
            time.sleep(30)

            # Snapshot under the analyzer lock; the stream thread mutates these
            # dicts and iterating them concurrently can raise RuntimeError.
            with analyzer.lock:
                global_stats = dict(analyzer.global_stats)
                symbol_volumes = [
                    (symbol, stat['volume'])
                    for symbol, stat in analyzer.symbol_stats.items()
                ]

            print(f"\n📊 Fill Analytics:")
            print(f"Global: {global_stats}")

            # Show top symbols by volume
            top_symbols = sorted(symbol_volumes, key=lambda x: x[1], reverse=True)[:5]
            print(f"Top symbols by volume:")
            for symbol, volume in top_symbols:
                analytics = analyzer.get_symbol_analytics(symbol)
                quality = analyzer.calculate_execution_quality(symbol)
                # Format the score separately: a trailing conditional applied to
                # the concatenated f-strings replaces the whole line with "N/A".
                quality_text = f"{quality:.3f}" if quality is not None else "N/A"
                print(f"  {symbol}: {volume:.2f} volume, {analytics['unique_traders']} traders, "
                      f"quality: {quality_text}")

            # Show recent alerts
            recent_alerts = alert_system.get_recent_alerts(3)
            if recent_alerts:
                print(f"\n🚨 Recent Alerts:")
                for alert in recent_alerts:
                    print(f"  {alert['type']}: {alert}")
    except KeyboardInterrupt:
        print("\nStopping...")
        analyzer.stop()
    stream_thread.join()


if __name__ == '__main__':
    main()
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const EventEmitter = require('events');
// Load proto file
// The option meanings below follow the @grpc/proto-loader documentation.
const packageDefinition = protoLoader.loadSync(
  'hyperliquid_l1_gateway.proto',
  {
    keepCase: true, // keep field names as written in the .proto instead of camel-casing them
    longs: String, // represent 64-bit integer fields as strings
    enums: String, // represent enum values by name
    defaults: true, // populate unset fields with their default values
    oneofs: true // expose virtual oneof properties naming the set field
  }
);
/**
 * Consumes the StreamBlockFills gRPC stream and maintains running analytics
 * per user, per symbol and globally, plus a bounded list of recent fills.
 *
 * Emits: 'fill' (one normalised fill object), 'error' (stream error) and
 * 'end' (stream closed by the server).
 */
class FillStreamProcessor extends EventEmitter {
  /**
   * @param {string} endpoint Address of the gateway's gRPC endpoint.
   */
  constructor(endpoint) {
    super();
    const proto = grpc.loadPackageDefinition(packageDefinition);
    this.client = new proto.hyperliquid_l1_gateway.v1.HyperliquidL1Gateway(
      endpoint, // Contact support@dwellir.com
      grpc.credentials.createInsecure()
    );

    // Fill storage and analytics
    this.recentFills = [];
    this.maxFillHistory = 10000;
    this.userStats = new Map();
    this.symbolStats = new Map();
    this.globalStats = {
      totalFills: 0,
      totalVolume: 0,
      totalFees: 0,
      startTime: Date.now()
    };
    this.isStreaming = false;
  }

  /**
   * Arithmetic mean of a numeric array; 0 for an empty array so that callers
   * never report NaN.
   * @param {number[]} values
   * @returns {number}
   */
  static _mean(values) {
    if (values.length === 0) return 0;
    return values.reduce((a, b) => a + b, 0) / values.length;
  }

  /** Opens the stream unless one is already running. */
  startStream() {
    if (this.isStreaming) {
      console.log('Stream already running');
      return;
    }
    this.isStreaming = true;
    const stream = this.client.StreamBlockFills({ timestamp: 0 });

    stream.on('data', (blockFills) => {
      try {
        const fillsData = JSON.parse(blockFills.data.toString());
        this.processBlockFills(fillsData);
      } catch (error) {
        console.error('JSON parsing error:', error.message);
      }
    });

    stream.on('error', (error) => {
      console.error('Stream error:', error.message);
      // Clear the flag before emitting: emit('error') throws when no listener
      // is registered, and a listener may call startStream() synchronously —
      // either way the flag must already say "not streaming".
      this.isStreaming = false;
      this.emit('error', error);
    });

    stream.on('end', () => {
      console.log('Stream ended');
      this.isStreaming = false;
      this.emit('end');
    });

    this.stream = stream;
    console.log('Fill stream started');
  }

  /**
   * Normalises every fill of one decoded payload, updates the analytics and
   * emits a 'fill' event per fill.
   * @param {object} fillsData Decoded payload; its `fills` array and `block` object are read.
   */
  processBlockFills(fillsData) {
    const blockInfo = fillsData.block || {};
    const fills = fillsData.fills || [];
    console.log(`Processing ${fills.length} fills from block ${blockInfo.height || '?'}`);

    for (const fillData of fills) {
      const fill = {
        id: fillData.id || '',
        orderId: fillData.order_id || '',
        userAddress: fillData.user_address || '',
        symbol: fillData.symbol || '',
        side: fillData.side || '',
        filledSize: parseFloat(fillData.filled_size || 0),
        filledPrice: parseFloat(fillData.filled_price || 0),
        fee: parseFloat(fillData.fee || 0),
        feeAsset: fillData.fee_asset || '',
        // Explicit radix so the values are always read as decimal.
        timestamp: parseInt(fillData.timestamp || 0, 10),
        blockHeight: blockInfo.height || 0,
        blockHash: blockInfo.hash || '',
        sequenceNum: parseInt(fillData.sequence_num || 0, 10)
      };
      this.updateAnalytics(fill);
      this.emit('fill', fill);
    }
  }

  /**
   * Folds one fill into the user, symbol and global statistics and appends
   * it to the bounded recent-fill list.
   * @param {object} fill Normalised fill as built by processBlockFills.
   */
  updateAnalytics(fill) {
    const { userAddress, symbol, filledSize, filledPrice, fee } = fill;

    // Update user stats
    if (!this.userStats.has(userAddress)) {
      this.userStats.set(userAddress, {
        fills: 0,
        volume: 0,
        fees: 0,
        symbols: new Set(),
        avgPrice: 0
      });
    }
    const userStat = this.userStats.get(userAddress);
    userStat.fills += 1;
    userStat.volume += filledSize;
    userStat.fees += fee;
    userStat.symbols.add(symbol);

    // Update weighted average price: rebuild the previous total value from
    // the old average and the volume before this fill was added.
    const prevTotalValue = userStat.avgPrice * (userStat.volume - filledSize);
    const newTotalValue = prevTotalValue + (filledPrice * filledSize);
    userStat.avgPrice = userStat.volume > 0 ? newTotalValue / userStat.volume : 0;

    // Update symbol stats
    if (!this.symbolStats.has(symbol)) {
      this.symbolStats.set(symbol, {
        fills: 0,
        volume: 0,
        uniqueUsers: new Set(),
        priceHistory: [],
        sizeHistory: []
      });
    }
    const symbolStat = this.symbolStats.get(symbol);
    symbolStat.fills += 1;
    symbolStat.volume += filledSize;
    symbolStat.uniqueUsers.add(userAddress);
    symbolStat.priceHistory.push(filledPrice);
    symbolStat.sizeHistory.push(filledSize);

    // Keep history bounded
    if (symbolStat.priceHistory.length > 1000) {
      symbolStat.priceHistory.shift();
    }
    if (symbolStat.sizeHistory.length > 1000) {
      symbolStat.sizeHistory.shift();
    }

    // Update global stats
    this.globalStats.totalFills += 1;
    this.globalStats.totalVolume += filledSize;
    this.globalStats.totalFees += fee;

    // Store recent fill
    this.recentFills.push(fill);
    if (this.recentFills.length > this.maxFillHistory) {
      this.recentFills.shift();
    }
    console.log(`Fill: ${userAddress.slice(0, 8)}...${userAddress.slice(-4)} ${symbol} ${filledSize.toFixed(6)} @ ${filledPrice.toFixed(6)}`);
  }

  /**
   * @param {string} userAddress
   * @returns {object|null} Analytics for the user, or null when the user is unknown.
   */
  getUserAnalytics(userAddress) {
    const userStat = this.userStats.get(userAddress);
    if (!userStat) return null;

    const userFills = this.recentFills
      .filter(f => f.userAddress === userAddress)
      .slice(-100);

    return {
      totalFills: userStat.fills,
      totalVolume: userStat.volume,
      totalFees: userStat.fees,
      avgFillPrice: userStat.avgPrice,
      symbolsTraded: userStat.symbols.size,
      avgFillSize: userStat.volume / userStat.fills,
      avgFeePerFill: userStat.fees / userStat.fills,
      recentFillsCount: userFills.length
    };
  }

  /**
   * @param {string} symbol
   * @returns {object|null} Analytics for the symbol, or null when the symbol is unknown.
   */
  getSymbolAnalytics(symbol) {
    const symbolStat = this.symbolStats.get(symbol);
    if (!symbolStat) return null;

    const priceHistory = symbolStat.priceHistory;
    const sizeHistory = symbolStat.sizeHistory;
    const analytics = {
      totalFills: symbolStat.fills,
      totalVolume: symbolStat.volume,
      uniqueTraders: symbolStat.uniqueUsers.size,
      avgFillSize: symbolStat.volume / symbolStat.fills
    };

    if (priceHistory.length > 0) {
      const sum = priceHistory.reduce((a, b) => a + b, 0);
      const mean = sum / priceHistory.length;
      const variance = priceHistory.reduce((acc, price) => acc + Math.pow(price - mean, 2), 0) / priceHistory.length;
      analytics.priceMean = mean;
      analytics.priceVariance = variance;
      analytics.priceRange = Math.max(...priceHistory) - Math.min(...priceHistory);
      analytics.latestPrice = priceHistory[priceHistory.length - 1];
    }

    if (sizeHistory.length > 0) {
      const sum = sizeHistory.reduce((a, b) => a + b, 0);
      analytics.sizeMean = sum / sizeHistory.length;
      analytics.largestFill = Math.max(...sizeHistory);
      analytics.smallestFill = Math.min(...sizeHistory);
    }
    return analytics;
  }

  /**
   * Reports whether a fill is unusually large for its symbol. With fewer than
   * 20 recorded sizes a fixed threshold of 1000 is used; otherwise the size
   * at the given percentile of the recorded history.
   * @param {object} fill
   * @param {number} [percentile=95]
   * @returns {boolean}
   */
  detectLargeFill(fill, percentile = 95) {
    const symbolStat = this.symbolStats.get(fill.symbol);
    if (!symbolStat || symbolStat.sizeHistory.length < 20) {
      return fill.filledSize > 1000; // Default threshold
    }
    const sortedSizes = [...symbolStat.sizeHistory].sort((a, b) => a - b);
    const rawIndex = Math.floor(sortedSizes.length * (percentile / 100));
    // Clamp into range: percentile = 100 would index one past the end and
    // compare against undefined, which is always false.
    const thresholdIndex = Math.min(Math.max(rawIndex, 0), sortedSizes.length - 1);
    const threshold = sortedSizes[thresholdIndex];
    return fill.filledSize > threshold;
  }

  /**
   * Scores price consistency over the last `window` prices of a symbol as
   * max(0, 1 - coefficient of variation).
   * @param {string} symbol
   * @param {number} [window=100]
   * @returns {number|null} null for unknown symbols, fewer than 10 prices, or a non-positive mean.
   */
  calculateExecutionQuality(symbol, window = 100) {
    const symbolStat = this.symbolStats.get(symbol);
    if (!symbolStat) return null;

    const priceHistory = symbolStat.priceHistory.slice(-window);
    if (priceHistory.length < 10) return null;

    const mean = priceHistory.reduce((a, b) => a + b, 0) / priceHistory.length;
    const variance = priceHistory.reduce((acc, price) => acc + Math.pow(price - mean, 2), 0) / priceHistory.length;
    if (mean > 0) {
      const cv = Math.sqrt(variance) / mean;
      return Math.max(0, 1 - cv); // Score from 0 to 1
    }
    return null;
  }

  /**
   * Fee statistics over the last 1000 stored fills. Fee rates are computed
   * only for fills with positive size and price; the average rate is 0 when
   * no such fill exists.
   * @returns {object} Empty object when no fills are stored.
   */
  getFeeAnalysis() {
    if (this.recentFills.length === 0) return {};

    const recentFills = this.recentFills.slice(-1000);
    const fees = recentFills.map(f => f.fee);
    const feeRates = recentFills
      .filter(f => f.filledSize > 0 && f.filledPrice > 0)
      .map(f => f.fee / (f.filledSize * f.filledPrice));

    const feeBySymbol = new Map();
    for (const fill of recentFills) {
      if (fill.filledSize > 0 && fill.filledPrice > 0) {
        const rate = fill.fee / (fill.filledSize * fill.filledPrice);
        if (!feeBySymbol.has(fill.symbol)) {
          feeBySymbol.set(fill.symbol, []);
        }
        feeBySymbol.get(fill.symbol).push(rate);
      }
    }

    const analysis = {
      totalFeesPaid: fees.reduce((a, b) => a + b, 0),
      avgFeePerFill: FillStreamProcessor._mean(fees),
      avgFeeRate: FillStreamProcessor._mean(feeRates),
      feeRateBySymbol: {}
    };

    for (const [symbol, rates] of feeBySymbol) {
      if (rates.length > 0) {
        analysis.feeRateBySymbol[symbol] = {
          avgRate: FillStreamProcessor._mean(rates),
          minRate: Math.min(...rates),
          maxRate: Math.max(...rates)
        };
      }
    }
    return analysis;
  }

  /**
   * Participation summary including the top 10 traders by volume. Averages
   * are 0 while no user has been recorded.
   * @returns {object}
   */
  getMarketParticipation() {
    const totalUsers = this.userStats.size;
    const totalSymbols = this.symbolStats.size;
    const userFills = Array.from(this.userStats.values()).map(stat => stat.fills);
    const userVolumes = Array.from(this.userStats.values()).map(stat => stat.volume);

    const participation = {
      totalUniqueUsers: totalUsers,
      totalSymbolsTraded: totalSymbols,
      avgFillsPerUser: FillStreamProcessor._mean(userFills),
      avgVolumePerUser: FillStreamProcessor._mean(userVolumes)
    };

    // Top traders by volume
    const topTraders = Array.from(this.userStats.entries())
      .sort((a, b) => b[1].volume - a[1].volume)
      .slice(0, 10)
      .map(([user, stat]) => ({ user, volume: stat.volume }));
    participation.topTradersByVolume = topTraders;
    return participation;
  }

  /** Cancels the current stream, if any. */
  stopStream() {
    if (this.stream) {
      this.stream.cancel();
      this.isStreaming = false;
      console.log('Stream stopped');
    }
  }
}
/**
 * Listens to a processor's 'fill' events and raises alerts for large fills,
 * high fee rates and rapid execution. Alerts are kept in a bounded in-memory
 * list and re-emitted as 'alert' events.
 */
class FillAlertSystem extends EventEmitter {
  /**
   * @param {EventEmitter} processor Source of 'fill' events; its
   *   detectLargeFill method and recentFills array are also used.
   */
  constructor(processor) {
    super();
    this.processor = processor;
    this.alerts = [];
    this.maxAlerts = 1000;

    processor.on('fill', (fill) => {
      this.analyzeFillForAlerts(fill);
    });
  }

  /**
   * Abbreviates an address to its first 8 and last 4 characters.
   * @param {string} address
   * @returns {string}
   */
  static _maskAddress(address) {
    return `${address.slice(0, 8)}...${address.slice(-4)}`;
  }

  /**
   * Runs all alert checks against one fill.
   * @param {object} fill
   */
  analyzeFillForAlerts(fill) {
    // Large fill alert
    if (this.processor.detectLargeFill(fill)) {
      this.generateAlert({
        type: 'LARGE_FILL',
        symbol: fill.symbol,
        user: FillAlertSystem._maskAddress(fill.userAddress),
        size: fill.filledSize,
        price: fill.filledPrice,
        timestamp: fill.timestamp
      });
    }

    // High fee alert; skipped when the notional would be zero
    if (fill.filledSize > 0 && fill.filledPrice > 0) {
      const feeRate = fill.fee / (fill.filledSize * fill.filledPrice);
      if (feeRate > 0.005) { // 0.5% fee rate
        this.generateAlert({
          type: 'HIGH_FEE',
          symbol: fill.symbol,
          user: FillAlertSystem._maskAddress(fill.userAddress),
          feeRate: feeRate * 100,
          timestamp: fill.timestamp
        });
      }
    }

    // Rapid execution alert
    this.checkRapidExecution(fill);
  }

  /**
   * Raises RAPID_EXECUTION when the same user has more than 10 stored fills
   * within the window before this fill.
   * @param {object} fill
   */
  checkRapidExecution(fill) {
    // NOTE(review): the 60-unit window assumes timestamps are in seconds — confirm.
    const recentUserFills = this.processor.recentFills
      .filter(f => f.userAddress === fill.userAddress &&
        f.timestamp > fill.timestamp - 60) // Last minute
      .slice(-100);

    if (recentUserFills.length > 10) {
      this.generateAlert({
        type: 'RAPID_EXECUTION',
        user: FillAlertSystem._maskAddress(fill.userAddress),
        fillsCount: recentUserFills.length,
        timeWindow: '1 minute',
        timestamp: fill.timestamp
      });
    }
  }

  /**
   * Stores, logs and emits one alert; once the list exceeds maxAlerts it is
   * trimmed to the newest half.
   * @param {object} alert
   */
  generateAlert(alert) {
    this.alerts.push(alert);
    if (this.alerts.length > this.maxAlerts) {
      this.alerts = this.alerts.slice(-this.maxAlerts / 2);
    }
    console.log(`🚨 Alert: ${alert.type} - ${alert.symbol || 'N/A'}`);
    this.emit('alert', alert);
  }

  /**
   * Returns the newest `limit` alerts, oldest first. A limit of zero or less
   * returns an empty array; slice(-0) would otherwise return every alert.
   * @param {number} [limit=10]
   * @returns {object[]}
   */
  getRecentAlerts(limit = 10) {
    if (limit <= 0) return [];
    return this.alerts.slice(-limit);
  }
}
// Usage example
/**
 * Create a fill processor and alert system, start streaming, and print
 * analytics every 30 seconds until the process receives SIGINT.
 *
 * Stream errors schedule a single restart after 5 seconds; further errors
 * that arrive while a restart is pending do not schedule additional ones.
 *
 * @returns {{processor: FillStreamProcessor, alertSystem: FillAlertSystem}}
 */
function setupFillMonitoring() {
  const processor = new FillStreamProcessor('<YOUR_HYPERLIQUID_ENDPOINT>');
  const alertSystem = new FillAlertSystem(processor);

  let restartTimer = null;
  let shuttingDown = false;

  // Set up event handlers
  processor.on('fill', (fill) => {
    // Custom fill processing
    if (fill.filledSize > 1000) {
      console.log(`🔥 Large fill: ${fill.symbol} ${fill.filledSize.toFixed(2)} @ ${fill.filledPrice.toFixed(6)}`);
    }
  });

  alertSystem.on('alert', (alert) => {
    // Handle alerts
    console.log(`📢 Alert generated: ${JSON.stringify(alert, null, 2)}`);
  });

  processor.on('error', (error) => {
    const message = error && error.message;
    if (shuttingDown) {
      return;
    }
    if (restartTimer) {
      // A restart is already queued; starting another would open a second stream.
      console.error('Stream error while restart is pending:', message);
      return;
    }
    console.error('Stream error, attempting restart...', message);
    restartTimer = setTimeout(() => {
      restartTimer = null;
      processor.startStream();
    }, 5000);
  });

  // Start streaming
  processor.startStream();

  // Print analytics periodically
  const analyticsInterval = setInterval(() => {
    console.log('\n📊 Fill Analytics:');
    console.log(`Global: ${JSON.stringify(processor.globalStats, null, 2)}`);

    // Show top symbols by volume
    const symbolVolumes = Array.from(processor.symbolStats.entries())
      .map(([symbol, stat]) => ({ symbol, volume: stat.volume }))
      .sort((a, b) => b.volume - a.volume)
      .slice(0, 5);

    console.log('\nTop symbols by volume:');
    for (const { symbol, volume } of symbolVolumes) {
      const analytics = processor.getSymbolAnalytics(symbol);
      const quality = processor.calculateExecutionQuality(symbol);
      // NOTE(review): a quality of exactly 0 prints as 'N/A' — confirm that is intended.
      console.log(`  ${symbol}: ${volume.toFixed(2)} volume, ${analytics.uniqueTraders} traders, quality: ${quality ? quality.toFixed(3) : 'N/A'}`);
    }

    // Show recent alerts
    const recentAlerts = alertSystem.getRecentAlerts(3);
    if (recentAlerts.length > 0) {
      console.log('\n🚨 Recent Alerts:');
      recentAlerts.forEach(alert => {
        console.log(`  ${alert.type}: ${alert.symbol || 'N/A'}`);
      });
    }
  }, 30000);

  // Graceful shutdown
  process.on('SIGINT', () => {
    console.log('\nShutting down...');
    shuttingDown = true;
    clearInterval(analyticsInterval);
    if (restartTimer) {
      clearTimeout(restartTimer);
      restartTimer = null;
    }
    processor.stopStream();
    process.exit(0);
  });

  return { processor, alertSystem };
}

// Start monitoring
setupFillMonitoring();
Common Use Cases
1. Portfolio Position Tracking
class PositionTracker:
    """Keep a running net position per symbol for a single user address.

    On construction the tracker registers ``track_position`` with the
    analyzer through ``add_fill_handler``.
    """

    def __init__(self, analyzer, user_address):
        self.analyzer = analyzer
        self.user_address = user_address
        # symbol -> net position; symbols not seen yet read as 0.0
        self.positions = defaultdict(float)
        analyzer.add_fill_handler(self.track_position)

    def track_position(self, fill: Fill):
        """Apply one fill to the tracked positions.

        Fills belonging to other users are ignored. A fill whose side is
        'sell' lowers the net position; any other side raises it.
        """
        if fill.user_address != self.user_address:
            return

        symbol = fill.symbol
        signed_size = -fill.filled_size if fill.side == 'sell' else fill.filled_size
        self.positions[symbol] += signed_size
        print(f"Position update: {symbol} {self.positions[symbol]:+.6f}")

    def get_current_positions(self):
        """Return a plain-dict snapshot of symbol -> net position."""
        return {symbol: net for symbol, net in self.positions.items()}
2. Execution Cost Analysis
/**
 * Aggregates fills per order and reports the running volume-weighted average
 * price (VWAP) and fee rate of each order as fills arrive.
 */
class ExecutionCostAnalyzer {
  /**
   * @param {object} processor Fill source exposing `on('fill', handler)`.
   * @param {number} [maxTrackedOrders=10000] Upper bound on the number of
   *   orders kept in `executionData`; the oldest orders are evicted first so
   *   a long-running process does not grow without limit.
   */
  constructor(processor, maxTrackedOrders = 10000) {
    this.processor = processor;
    this.maxTrackedOrders = maxTrackedOrders;
    this.executionData = new Map(); // orderId -> execution info
    processor.on('fill', (fill) => {
      this.analyzeExecution(fill);
    });
  }

  /**
   * Fold one fill into its order's running totals and log the result.
   *
   * @param {object} fill Fill with orderId, symbol, filledSize, filledPrice
   *   and fee fields.
   */
  analyzeExecution(fill) {
    let execution = this.executionData.get(fill.orderId);
    if (!execution) {
      execution = {
        fills: [],
        totalFilled: 0,
        totalValue: 0,
        weightedPrice: 0,
        totalFees: 0
      };
      this.executionData.set(fill.orderId, execution);
      this.evictOldestOrders();
    }

    execution.fills.push(fill);
    execution.totalFilled += fill.filledSize;
    // Running notional total, so the VWAP does not need a full pass over
    // every earlier fill each time a new one arrives.
    execution.totalValue += fill.filledPrice * fill.filledSize;
    execution.totalFees += fill.fee;

    // Calculate weighted average price (0 rather than NaN when nothing is filled)
    execution.weightedPrice = execution.totalFilled > 0
      ? execution.totalValue / execution.totalFilled
      : 0;

    // Calculate execution cost
    const notional = execution.totalFilled * execution.weightedPrice;
    const feeRate = notional > 0 ? execution.totalFees / notional : 0;

    // NOTE(review): the first fill of an order is logged as "Complete fill",
    // but the order's full size is not known here — confirm the wording.
    if (execution.fills.length === 1) {
      console.log(`✅ Complete fill: ${fill.symbol} ${fill.filledSize.toFixed(6)} @ ${fill.filledPrice.toFixed(6)}`);
    } else {
      console.log(`📊 Partial fill ${execution.fills.length}: ${fill.symbol} VWAP: ${execution.weightedPrice.toFixed(6)}, Fee rate: ${(feeRate * 100).toFixed(3)}%`);
    }
  }

  /**
   * Drop the oldest tracked orders until the map fits `maxTrackedOrders`.
   * Relies on Map iterating keys in insertion order.
   */
  evictOldestOrders() {
    while (this.executionData.size > this.maxTrackedOrders) {
      const oldestOrderId = this.executionData.keys().next().value;
      this.executionData.delete(oldestOrderId);
    }
  }
}
3. Settlement Reconciliation
// SettlementReconciler holds the state used to compare fills that were
// expected against the fills actually observed, along with every mismatch
// recorded so far.
type SettlementReconciler struct {
	processor *FillStreamProcessor

	// Both maps are keyed by order ID: expectedFills holds the expected fill
	// for each order, settledFills the actual fill.
	expectedFills, settledFills map[string]Fill

	discrepancies []Discrepancy
}
// Discrepancy records one order whose observed fill did not match the
// expected size or price.
type Discrepancy struct {
	OrderID string

	ExpectedSize, ActualSize   float64
	ExpectedPrice, ActualPrice float64

	// Difference is a single figure summarising the mismatch.
	Difference float64

	Timestamp time.Time
}
// settlementTolerance is the largest absolute size or price difference that
// is still treated as a match.
const settlementTolerance = 0.000001

// CheckSettlement reconciles an observed fill against the fill expected for
// the same order ID.
//
// A nil fill, or a fill with no expected entry, is logged and otherwise
// ignored. For a known order the fill is compared with the expectation; a
// size or price difference above settlementTolerance is appended to
// sr.discrepancies. In both the matching and mismatching case the fill is
// stored in sr.settledFills and the order is removed from sr.expectedFills.
func (sr *SettlementReconciler) CheckSettlement(fill *Fill) {
	if fill == nil {
		log.Print("⚠️ Ignoring nil fill")
		return
	}

	expected, ok := sr.expectedFills[fill.OrderID]
	if !ok {
		log.Printf("⚠️ Unexpected fill: %s", fill.OrderID)
		return
	}

	// Check for discrepancies
	sizeDiff := absDiff(expected.FilledSize, fill.FilledSize)
	priceDiff := absDiff(expected.FilledPrice, fill.FilledPrice)

	if sizeDiff > settlementTolerance || priceDiff > settlementTolerance {
		discrepancy := Discrepancy{
			OrderID:       fill.OrderID,
			ExpectedSize:  expected.FilledSize,
			ActualSize:    fill.FilledSize,
			ExpectedPrice: expected.FilledPrice,
			ActualPrice:   fill.FilledPrice,
			Difference:    sizeDiff + priceDiff,
			Timestamp:     time.Now(),
		}
		sr.discrepancies = append(sr.discrepancies, discrepancy)
		log.Printf("🔍 Settlement discrepancy detected: %+v", discrepancy)
	} else {
		log.Printf("✅ Settlement confirmed: %s", fill.OrderID)
	}

	// Assigning into a nil map panics, so create it on first use.
	if sr.settledFills == nil {
		sr.settledFills = make(map[string]Fill)
	}
	sr.settledFills[fill.OrderID] = *fill
	delete(sr.expectedFills, fill.OrderID)
}

// absDiff returns the absolute difference between a and b.
func absDiff(a, b float64) float64 {
	if a < b {
		return b - a
	}
	return a - b
}
4. Fee Optimization Tracking
class FeeOptimizationTracker:
    """Track each user's volume-based fee tier and compare charged fees with it.

    The tracker registers ``track_fees`` with the analyzer on construction and
    reads per-user statistics back through ``get_user_analytics``.
    """

    def __init__(self, analyzer):
        self.analyzer = analyzer
        # user -> most recently computed fee tier
        self.fee_tiers = {}
        # Example volume thresholds; reaching thresholds[i] puts a user in tier i + 1
        self.volume_thresholds = [1000, 5000, 10000, 50000]
        # Fee rate (%) for each tier, one more entry than there are thresholds
        self.fee_rates = [0.1, 0.08, 0.06, 0.04, 0.02]
        analyzer.add_fill_handler(self.track_fees)

    def track_fees(self, fill: Fill):
        """Update the user's tier and report a fee that differs from the tier rate.

        Nothing happens when the analyzer has no statistics for the user.
        """
        user = fill.user_address

        stats = self.analyzer.get_user_analytics(user)
        if not stats:
            return

        # Walk up the thresholds until the first one the volume has not reached.
        volume = stats['total_volume']
        tier = 0
        while tier < len(self.volume_thresholds) and volume >= self.volume_thresholds[tier]:
            tier += 1
        self.fee_tiers[user] = tier

        # Fee implied by the tier's rate versus the fee actually charged.
        notional = fill.filled_size * fill.filled_price
        expected_fee = notional * (self.fee_rates[tier] / 100)
        actual_fee = fill.fee

        fee_gap = abs(actual_fee - expected_fee)
        if fee_gap > 0.000001:
            print(f"💰 Fee discrepancy for {user[:8]}...{user[-4:]}: "
                  f"Expected {expected_fee:.6f}, Actual {actual_fee:.6f}")

    def get_fee_optimization_report(self):
        """Build a per-user report for every user that has a recorded tier.

        Users for which the analyzer returns no statistics are left out.
        """
        report = {}
        threshold_count = len(self.volume_thresholds)

        for user, tier in self.fee_tiers.items():
            stats = self.analyzer.get_user_analytics(user)
            if not stats:
                continue

            # The highest tier has no further threshold to reach.
            if tier < threshold_count:
                next_threshold = self.volume_thresholds[tier]
            else:
                next_threshold = float('inf')
            volume_to_next_tier = max(0, next_threshold - stats['total_volume'])

            report[user] = {
                'current_tier': tier,
                'current_fee_rate': self.fee_rates[tier],
                'volume_to_next_tier': volume_to_next_tier,
                'potential_savings': self.calculate_potential_savings(stats, tier),
            }

        return report

    def calculate_potential_savings(self, user_stats, current_tier):
        """Estimate the fee saved on current volume at the next tier's rate.

        Returns 0 when ``current_tier`` is already the last entry in
        ``fee_rates``.
        """
        last_tier = len(self.fee_rates) - 1
        if current_tier >= last_tier:
            return 0

        rate_now = self.fee_rates[current_tier] / 100
        rate_next = self.fee_rates[current_tier + 1] / 100

        avg_trade_value = user_stats['total_volume'] * user_stats['avg_fill_price']
        return avg_trade_value * (rate_now - rate_next)
Performance Monitoring
1. Real-time Metrics Dashboard
/**
 * Console dashboard that recomputes hourly fill metrics from the processor's
 * `recentFills` once per second. Call `stop()` to release the timer and the
 * `fill` listener.
 */
class FillMetricsDashboard {
  /**
   * @param {object} processor Fill source exposing `on('fill', handler)`,
   *   a `recentFills` array and `globalStats.totalFills`.
   */
  constructor(processor) {
    this.processor = processor;
    this.metrics = {
      fillsPerSecond: 0,
      avgFillSize: 0,
      avgExecutionPrice: 0,
      totalFeesPerHour: 0,
      uniqueUsersPerHour: new Set()
    };

    // Keep a reference to the handler so stop() can detach it again.
    this.onFill = (fill) => {
      this.trackRealtimeMetrics(fill);
    };
    processor.on('fill', this.onFill);

    this.updateInterval = setInterval(() => {
      this.updateMetrics();
    }, 1000);
  }

  /**
   * Record the fill's user between refreshes so the active-user count does
   * not lag. The set is rebuilt from the last hour of fills on every refresh.
   *
   * @param {object} fill
   */
  trackRealtimeMetrics(fill) {
    if (fill && fill.userAddress) {
      this.metrics.uniqueUsersPerHour.add(fill.userAddress);
    }
  }

  /**
   * Recompute every metric from the fills of the last hour and redraw the
   * console dashboard. Fill timestamps are multiplied by 1000 before being
   * compared with Date.now(), i.e. they are treated as seconds.
   */
  updateMetrics() {
    const cutoff = Date.now() - 3600000; // Last hour
    const source = this.processor.recentFills || [];
    const recentFills = source.filter(f => f.timestamp * 1000 > cutoff);

    let totalSize = 0;
    let totalPrice = 0;
    let totalFees = 0;
    const users = new Set();
    for (const f of recentFills) {
      totalSize += f.filledSize;
      totalPrice += f.filledPrice;
      totalFees += f.fee;
      if (f.userAddress) {
        users.add(f.userAddress);
      }
    }

    // Assign on every refresh, including an empty window, so values from an
    // earlier hour are not left on screen.
    const count = recentFills.length;
    this.metrics.fillsPerSecond = count / 3600;
    this.metrics.avgFillSize = count > 0 ? totalSize / count : 0;
    this.metrics.avgExecutionPrice = count > 0 ? totalPrice / count : 0;
    this.metrics.totalFeesPerHour = totalFees;
    this.metrics.uniqueUsersPerHour = users;

    // Console dashboard
    console.clear();
    console.log('🖥️ Fill Stream Dashboard');
    console.log('========================');
    console.log(`Fills/sec: ${this.metrics.fillsPerSecond.toFixed(2)}`);
    console.log(`Avg fill size: ${this.metrics.avgFillSize.toFixed(4)}`);
    console.log(`Hourly fees: ${this.metrics.totalFeesPerHour.toFixed(6)}`);
    console.log(`Active users: ${this.metrics.uniqueUsersPerHour.size}`);
    console.log(`Total processed: ${this.processor.globalStats.totalFills}`);
  }

  /**
   * Stop the refresh timer and detach the `fill` listener. Safe to call more
   * than once.
   */
  stop() {
    if (this.updateInterval) {
      clearInterval(this.updateInterval);
      this.updateInterval = null;
    }
    if (this.onFill && typeof this.processor.removeListener === 'function') {
      this.processor.removeListener('fill', this.onFill);
    }
    this.onFill = null;
  }
}
Best Practices
- Data Integrity: Validate fill data and handle missing or malformed fields gracefully
- Position Tracking: Maintain accurate position tracking for portfolio management
- Settlement Verification: Reconcile expected fills with actual executions
- Performance Monitoring: Track execution quality and fee optimization
- Error Handling: Implement robust error handling for stream interruptions
- Memory Management: Use bounded collections to prevent memory leaks in long-running processes
Current Limitations
- Historical Data: The request's timestamp parameter is not yet honored, so a stream always starts at the current time and cannot replay earlier fills
- Data Retention: Node maintains only 24 hours of historical fill data
- Order Lifecycle: Fill data is independent; order placement/cancellation requires separate tracking
- Settlement Finality: Fills represent execution but final settlement may have additional confirmation requirements
Need help? Contact our support team or check the Hyperliquid gRPC documentation.