⚠️Blast API (blastapi.io) ends Oct 31. Migrate to Dwellir and skip Alchemy's expensive compute units.
Switch Today →
Skip to main content

StreamBlockFills

Stream continuous fill data for executed orders, providing real-time visibility into order execution and settlement.

When to Use This Method

StreamBlockFills is essential for:

  • Order Execution Tracking - Monitor fill rates and execution quality
  • Portfolio Management - Track position changes and settlement
  • Execution Analytics - Analyze slippage and execution performance
  • Trade Settlement - Confirm order fills and update balances

Method Signature

rpc StreamBlockFills(Timestamp) returns (stream BlockFills) {}

Request Message

message Timestamp {   
int64 timestamp = 1;
}

Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.

Response Stream

message BlockFills {
// JSON-encoded array containing user address and fill data
bytes data = 1;
}

Data Structure

The data field contains a JSON-encoded array with two elements:

[
"0xa4a6e0fd7528a6f5c6ccbb3240ba8a2f825446cf", // User wallet address
{ // Fill details object
"coin": "ENA", // Trading symbol/coin
"px": "0.81717", // Execution price
"sz": "24.0", // Fill size/amount
"side": "B", // Side: "B" (Buy) or "S" (Sell)
"time": 1757414494669, // Unix timestamp (milliseconds)
"startPosition": "-195643.0", // Position before fill
"dir": "Close Short", // Direction (Open Long/Short, Close Long/Short)
"closedPnl": "0.12396", // Realized PnL (if closing position)
"hash": "0xf4313130256dde67f5aa042b320bf40201da0015c060fd3a97f9dc82e461b852", // Transaction hash
"oid": 157341930362, // Order ID
"crossed": false, // Whether order crossed the spread
"fee": "-0.000392", // Trading fee (negative = fee paid)
"tid": 969007110865318, // Trade ID
"cloid": "0x00000000000000199520001057009638", // Client Order ID
"feeToken": "USDC", // Token used for fee payment
"twapId": null // TWAP order ID (if applicable)
}
]

Field Descriptions

FieldTypeDescription
User AddressstringEthereum wallet address of the trader
coinstringTrading pair symbol (e.g., "BTC", "ETH", "ENA")
pxstringExecution price as a decimal string
szstringFill size/amount as a decimal string
sidestring"B" for Buy, "S" for Sell
timenumberUnix timestamp in milliseconds
startPositionstringPosition size before this fill (negative = short)
dirstringTrade direction: "Open Long", "Open Short", "Close Long", "Close Short"
closedPnlstringRealized profit/loss if closing a position
hashstringOn-chain transaction hash
oidnumberUnique order identifier
crossedbooleanTrue if order crossed the bid-ask spread
feestringTrading fee (negative = paid, positive = rebate)
tidnumberUnique trade identifier
cloidstringClient-provided order ID (hex format)
feeTokenstringToken symbol used for fee payment
twapIdnumber/nullTWAP order ID if part of TWAP execution

Developer Insights

Understanding the Data Structure

The StreamBlockFills method provides critical real-time execution data with several important characteristics:

  1. User-Centric Format: Each fill message starts with the user's wallet address, making it easy to filter fills for specific accounts.

  2. Position Tracking: The startPosition and dir fields provide complete context for position management:

    • Track position evolution through startPosition
    • Understand trade intent through dir (Open/Close Long/Short)
    • Calculate net position changes accurately
  3. PnL Calculation: The closedPnl field provides realized profit/loss when closing positions, essential for:

    • Performance tracking
    • Tax reporting
    • Risk management
  4. Fee Analysis:

    • Fees are typically negative (indicating payment)
    • The feeToken field shows which token was used for fee payment
    • Important for calculating true execution costs
  5. Order Identification:

    • oid: Hyperliquid's internal order ID
    • tid: Unique trade identifier
    • cloid: Your client-provided order ID for matching with your system
    • hash: On-chain transaction hash for verification

Key Considerations for Implementation

Price and Size Handling

  • All numeric values (px, sz, fee, etc.) are provided as strings to preserve precision
  • Always use appropriate decimal libraries when processing these values
  • Be aware that positions can be negative (short positions)

Side vs Direction

  • side: Simple Buy (B) or Sell (S) indicator
  • dir: Provides context about position impact (Open Long, Close Short, etc.)
  • Use dir for accurate position tracking and side for order flow analysis

Time Synchronization

  • The time field uses millisecond timestamps
  • Ensure your system clock is synchronized for accurate latency measurements
  • Consider timezone handling for display purposes

Error Handling

  • Handle potential null values (e.g., twapId can be null)
  • Validate data types before processing
  • Implement reconnection logic for stream interruptions

Practical Use Cases

  1. Real-time Position Management: Combine startPosition, dir, and sz to maintain accurate position states
  2. Execution Quality Analysis: Use crossed field to identify aggressive vs passive fills
  3. Fee Optimization: Track fee rates across different order types and sizes
  4. PnL Tracking: Aggregate closedPnl values for comprehensive profit/loss reporting
  5. TWAP Monitoring: Filter by twapId to track TWAP order execution progress

Implementation Examples

package main

import (
"context"
"encoding/json"
"log"
"sync"
"time"

pb "your-project/hyperliquid_l1_gateway/v1"
"google.golang.org/grpc"
)

type Fill struct {
UserAddress string `json:"user_address"` // From array[0]
Coin string `json:"coin"` // Trading symbol
Price string `json:"px"` // Execution price
Size string `json:"sz"` // Fill size
Side string `json:"side"` // "B" or "S"
Time int64 `json:"time"` // Unix timestamp (ms)
StartPosition string `json:"startPosition"` // Position before fill
Direction string `json:"dir"` // Trade direction
ClosedPnl string `json:"closedPnl"` // Realized PnL
Hash string `json:"hash"` // Transaction hash
OrderID int64 `json:"oid"` // Order ID
Crossed bool `json:"crossed"` // Crossed spread
Fee string `json:"fee"` // Trading fee
TradeID int64 `json:"tid"` // Trade ID
ClientOrderID string `json:"cloid"` // Client order ID
FeeToken string `json:"feeToken"` // Fee token symbol
TwapID *int64 `json:"twapId"` // TWAP order ID
}

type ExecutionStats struct {
TotalFills int64
TotalVolume float64
TotalFees float64
AverageSlippage float64
FillRate float64 // Percentage of orders filled

// Per-user statistics
UserFills map[string]int64
UserVolume map[string]float64
UserFees map[string]float64

mutex sync.RWMutex
}

type FillStreamProcessor struct {
client pb.HyperliquidL1GatewayClient
stats *ExecutionStats
recentFills []Fill
maxHistory int

// Event handlers
fillHandlers []func(*Fill)

mutex sync.RWMutex
}

func NewFillStreamProcessor(client pb.HyperliquidL1GatewayClient) *FillStreamProcessor {
return &FillStreamProcessor{
client: client,
stats: &ExecutionStats{
UserFills: make(map[string]int64),
UserVolume: make(map[string]float64),
UserFees: make(map[string]float64),
},
recentFills: make([]Fill, 0),
maxHistory: 10000,
fillHandlers: make([]func(*Fill), 0),
}
}

func (fsp *FillStreamProcessor) StreamFills(ctx context.Context) error {
stream, err := fsp.client.StreamBlockFills(
ctx,
&pb.Timestamp{Timestamp: 0},
)
if err != nil {
return err
}

log.Println("Starting fill stream...")

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
blockFills, err := stream.Recv()
if err != nil {
log.Printf("Stream error: %v", err)
return err
}

if err := fsp.processBlockFills(blockFills.Data); err != nil {
log.Printf("Failed to process fills: %v", err)
continue
}
}
}
}

// Helper functions for parsing fill data
func getString(data map[string]interface{}, key string) string {
if val, ok := data[key]; ok && val != nil {
return fmt.Sprintf("%v", val)
}
return ""
}

func getFloat(data map[string]interface{}, key string) float64 {
if val, ok := data[key]; ok && val != nil {
if f, ok := val.(float64); ok {
return f
}
}
return 0
}

func getBool(data map[string]interface{}, key string) bool {
if val, ok := data[key]; ok && val != nil {
if b, ok := val.(bool); ok {
return b
}
}
return false
}

func (fsp *FillStreamProcessor) processBlockFills(data []byte) error {
var fillArray []json.RawMessage
if err := json.Unmarshal(data, &fillArray); err != nil {
return err
}

if len(fillArray) != 2 {
return fmt.Errorf("unexpected fill array length: %d", len(fillArray))
}

// Parse user address
var userAddress string
if err := json.Unmarshal(fillArray[0], &userAddress); err != nil {
return err
}

// Parse fill data
var fillData map[string]interface{}
if err := json.Unmarshal(fillArray[1], &fillData); err != nil {
return err
}

// Convert to Fill struct
fill := Fill{
UserAddress: userAddress,
Coin: getString(fillData, "coin"),
Price: getString(fillData, "px"),
Size: getString(fillData, "sz"),
Side: getString(fillData, "side"),
Time: int64(getFloat(fillData, "time")),
StartPosition: getString(fillData, "startPosition"),
Direction: getString(fillData, "dir"),
ClosedPnl: getString(fillData, "closedPnl"),
Hash: getString(fillData, "hash"),
OrderID: int64(getFloat(fillData, "oid")),
Crossed: getBool(fillData, "crossed"),
Fee: getString(fillData, "fee"),
TradeID: int64(getFloat(fillData, "tid")),
ClientOrderID: getString(fillData, "cloid"),
FeeToken: getString(fillData, "feeToken"),
}

if twapId := fillData["twapId"]; twapId != nil && twapId != nil {
tid := int64(getFloat(fillData, "twapId"))
fill.TwapID = &tid
}

log.Printf("Processing fill: %s %s %s @ %s",
userAddress[:8], fill.Coin, fill.Size, fill.Price)

fsp.mutex.Lock()
defer fsp.mutex.Unlock()

fsp.updateStats(&fill)
fsp.storeFill(&fill)
fsp.notifyHandlers(&fill)

return nil
}

func (fsp *FillStreamProcessor) updateStats(fill *Fill) {
fsp.stats.mutex.Lock()
defer fsp.stats.mutex.Unlock()

// Parse numeric values
size, _ := strconv.ParseFloat(fill.Size, 64)
price, _ := strconv.ParseFloat(fill.Price, 64)
fee, _ := strconv.ParseFloat(fill.Fee, 64)

// Update global stats
fsp.stats.TotalFills++
fsp.stats.TotalVolume += size
fsp.stats.TotalFees += math.Abs(fee) // Fees are negative when paid

// Update user-specific stats
user := fill.UserAddress
fsp.stats.UserFills[user]++
fsp.stats.UserVolume[user] += size
fsp.stats.UserFees[user] += math.Abs(fee)

// Log position changes
positionInfo := ""
if fill.Direction != "" {
positionInfo = fmt.Sprintf(" [%s from %s]", fill.Direction, fill.StartPosition)
}
if fill.ClosedPnl != "" && fill.ClosedPnl != "0" {
positionInfo += fmt.Sprintf(" PnL: %s", fill.ClosedPnl)
}

log.Printf("Fill: %s %s %s %s @ %s (Fee: %s %s)%s",
fill.UserAddress[:8], fill.Coin, fill.Side, fill.Size,
fill.Price, fill.Fee, fill.FeeToken, positionInfo)
}

func (fsp *FillStreamProcessor) storeFill(fill *Fill) {
fsp.recentFills = append(fsp.recentFills, *fill)

// Maintain history limit
if len(fsp.recentFills) > fsp.maxHistory {
fsp.recentFills = fsp.recentFills[len(fsp.recentFills)-fsp.maxHistory:]
}
}

func (fsp *FillStreamProcessor) AddFillHandler(handler func(*Fill)) {
fsp.fillHandlers = append(fsp.fillHandlers, handler)
}

func (fsp *FillStreamProcessor) notifyHandlers(fill *Fill) {
for _, handler := range fsp.fillHandlers {
go handler(fill) // Handle asynchronously
}
}

func (fsp *FillStreamProcessor) GetUserStats(userAddress string) map[string]interface{} {
fsp.stats.mutex.RLock()
defer fsp.stats.mutex.RUnlock()

return map[string]interface{}{
"fills": fsp.stats.UserFills[userAddress],
"volume": fsp.stats.UserVolume[userAddress],
"fees": fsp.stats.UserFees[userAddress],
"avg_size": fsp.calculateAvgFillSize(userAddress),
"avg_fee": fsp.calculateAvgFeePerFill(userAddress),
}
}

func (fsp *FillStreamProcessor) calculateAvgFillSize(userAddress string) float64 {
fsp.stats.mutex.RLock()
defer fsp.stats.mutex.RUnlock()

fills := fsp.stats.UserFills[userAddress]
volume := fsp.stats.UserVolume[userAddress]

if fills == 0 {
return 0
}

return volume / float64(fills)
}

func (fsp *FillStreamProcessor) calculateAvgFeePerFill(userAddress string) float64 {
fsp.stats.mutex.RLock()
defer fsp.stats.mutex.RUnlock()

fills := fsp.stats.UserFills[userAddress]
fees := fsp.stats.UserFees[userAddress]

if fills == 0 {
return 0
}

return fees / float64(fills)
}

func (fsp *FillStreamProcessor) GetGlobalStats() map[string]interface{} {
fsp.stats.mutex.RLock()
defer fsp.stats.mutex.RUnlock()

avgFillSize := float64(0)
if fsp.stats.TotalFills > 0 {
avgFillSize = fsp.stats.TotalVolume / float64(fsp.stats.TotalFills)
}

return map[string]interface{}{
"total_fills": fsp.stats.TotalFills,
"total_volume": fsp.stats.TotalVolume,
"total_fees": fsp.stats.TotalFees,
"avg_fill_size": avgFillSize,
"unique_users": len(fsp.stats.UserFills),
"avg_fee_per_fill": fsp.stats.TotalFees / float64(fsp.stats.TotalFills),
}
}

func (fsp *FillStreamProcessor) GetRecentFills(limit int) []Fill {
fsp.mutex.RLock()
defer fsp.mutex.RUnlock()

if limit > len(fsp.recentFills) {
limit = len(fsp.recentFills)
}

if limit == 0 {
return []Fill{}
}

start := len(fsp.recentFills) - limit
result := make([]Fill, limit)
copy(result, fsp.recentFills[start:])

return result
}

// Advanced analytics
func (fsp *FillStreamProcessor) CalculateExecutionQuality() map[string]float64 {
fsp.mutex.RLock()
defer fsp.mutex.RUnlock()

if len(fsp.recentFills) == 0 {
return map[string]float64{}
}

// Group fills by symbol for analysis
symbolFills := make(map[string][]Fill)
for _, fill := range fsp.recentFills {
symbolFills[fill.Symbol] = append(symbolFills[fill.Symbol], fill)
}

quality := make(map[string]float64)

for symbol, fills := range symbolFills {
if len(fills) < 2 {
continue
}

// Calculate price consistency (lower variance = better execution)
prices := make([]float64, len(fills))
for i, fill := range fills {
prices[i] = fill.FilledPrice
}

variance := calculateVariance(prices)
quality[symbol] = 1.0 / (1.0 + variance) // Higher score = better consistency
}

return quality
}

func calculateVariance(values []float64) float64 {
if len(values) == 0 {
return 0
}

mean := 0.0
for _, v := range values {
mean += v
}
mean /= float64(len(values))

variance := 0.0
for _, v := range values {
diff := v - mean
variance += diff * diff
}
variance /= float64(len(values))

return variance
}

func main() {
// Connect to Hyperliquid L1 Gateway
conn, err := grpc.Dial(
"<YOUR_HYPERLIQUID_ENDPOINT>", // Contact support@dwellir.com
grpc.WithInsecure(),
)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()

client := pb.NewHyperliquidL1GatewayClient(conn)
processor := NewFillStreamProcessor(client)

// Add custom fill handlers
processor.AddFillHandler(func(fill *Fill) {
// Large fill alert
if fill.FilledSize > 1000 {
log.Printf("🔥 Large fill: %s %.2f @ %.6f",
fill.Symbol, fill.FilledSize, fill.FilledPrice)
}
})

processor.AddFillHandler(func(fill *Fill) {
// High fee alert
feeRate := fill.Fee / (fill.FilledSize * fill.FilledPrice)
if feeRate > 0.001 { // 0.1% fee rate
log.Printf("💰 High fee: %s %.4f%% fee rate",
fill.Symbol, feeRate*100)
}
})

// Start streaming in background
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
if err := processor.StreamFills(ctx); err != nil {
log.Printf("Stream ended: %v", err)
}
}()

// Print analytics periodically
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
globalStats := processor.GetGlobalStats()
executionQuality := processor.CalculateExecutionQuality()

log.Println("=== FILL ANALYTICS ===")
log.Printf("Total fills: %v, Volume: %.2f, Fees: %.6f",
globalStats["total_fills"], globalStats["total_volume"],
globalStats["total_fees"])
log.Printf("Avg fill size: %.4f, Unique users: %v",
globalStats["avg_fill_size"], globalStats["unique_users"])

if len(executionQuality) > 0 {
log.Println("Execution Quality:")
for symbol, quality := range executionQuality {
log.Printf(" %s: %.4f", symbol, quality)
}
}
}
}
}

Common Use Cases

1. Portfolio Position Tracking

class PositionTracker:
def __init__(self, analyzer, user_address):
self.analyzer = analyzer
self.user_address = user_address
self.positions = defaultdict(float) # coin -> net position
self.realized_pnl = defaultdict(float) # coin -> total realized PnL

analyzer.add_fill_handler(self.track_position)

def track_position(self, fill: Fill):
if fill.user_address != self.user_address:
return

coin = fill.coin
size = float(fill.size)

# Track position changes based on direction
if fill.direction == 'Open Long':
self.positions[coin] += size
elif fill.direction == 'Open Short':
self.positions[coin] -= size
elif fill.direction == 'Close Long':
self.positions[coin] -= size
elif fill.direction == 'Close Short':
self.positions[coin] += size

# Track realized PnL
if fill.closed_pnl and fill.closed_pnl != '0':
self.realized_pnl[coin] += float(fill.closed_pnl)

print(f"Position update: {coin} {self.positions[coin]:+.6f} | Direction: {fill.direction} | PnL: {fill.closed_pnl}")

def get_current_positions(self):
return dict(self.positions)

def get_realized_pnl(self):
return dict(self.realized_pnl)

2. Execution Cost Analysis

class ExecutionCostAnalyzer {
constructor(processor) {
this.processor = processor;
this.executionData = new Map(); // orderId -> execution info

processor.on('fill', (fill) => {
this.analyzeExecution(fill);
});
}

analyzeExecution(fill) {
if (!this.executionData.has(fill.orderId)) {
this.executionData.set(fill.orderId, {
fills: [],
totalFilled: 0,
weightedPrice: 0,
totalFees: 0
});
}

const execution = this.executionData.get(fill.orderId);
execution.fills.push(fill);
execution.totalFilled += fill.filledSize;
execution.totalFees += fill.fee;

// Calculate weighted average price
const totalValue = execution.fills.reduce((sum, f) => sum + (f.filledPrice * f.filledSize), 0);
execution.weightedPrice = totalValue / execution.totalFilled;

// Calculate execution cost
const feeRate = execution.totalFees / (execution.totalFilled * execution.weightedPrice);

if (execution.fills.length === 1) {
console.log(`✅ Complete fill: ${fill.symbol} ${fill.filledSize.toFixed(6)} @ ${fill.filledPrice.toFixed(6)}`);
} else {
console.log(`📊 Partial fill ${execution.fills.length}: ${fill.symbol} VWAP: ${execution.weightedPrice.toFixed(6)}, Fee rate: ${(feeRate * 100).toFixed(3)}%`);
}
}
}

3. Settlement Reconciliation

type SettlementReconciler struct {
processor *FillStreamProcessor
expectedFills map[string]Fill // orderId -> expected fill
settledFills map[string]Fill // orderId -> actual fill
discrepancies []Discrepancy
}

type Discrepancy struct {
OrderID string
ExpectedSize float64
ActualSize float64
ExpectedPrice float64
ActualPrice float64
Difference float64
Timestamp time.Time
}

func (sr *SettlementReconciler) CheckSettlement(fill *Fill) {
expected, exists := sr.expectedFills[fill.OrderID]
if !exists {
log.Printf("⚠️ Unexpected fill: %s", fill.OrderID)
return
}

// Check for discrepancies
sizeDiff := abs(expected.FilledSize - fill.FilledSize)
priceDiff := abs(expected.FilledPrice - fill.FilledPrice)

if sizeDiff > 0.000001 || priceDiff > 0.000001 {
discrepancy := Discrepancy{
OrderID: fill.OrderID,
ExpectedSize: expected.FilledSize,
ActualSize: fill.FilledSize,
ExpectedPrice: expected.FilledPrice,
ActualPrice: fill.FilledPrice,
Difference: sizeDiff + priceDiff,
Timestamp: time.Now(),
}

sr.discrepancies = append(sr.discrepancies, discrepancy)
log.Printf("🔍 Settlement discrepancy detected: %+v", discrepancy)
} else {
log.Printf("✅ Settlement confirmed: %s", fill.OrderID)
}

sr.settledFills[fill.OrderID] = *fill
delete(sr.expectedFills, fill.OrderID)
}

4. Fee Optimization Tracking

class FeeOptimizationTracker:
def __init__(self, analyzer):
self.analyzer = analyzer
self.fee_tiers = {} # user -> current fee tier
self.volume_thresholds = [1000, 5000, 10000, 50000] # Example thresholds
self.fee_rates = [0.1, 0.08, 0.06, 0.04, 0.02] # Corresponding fee rates (%)

analyzer.add_fill_handler(self.track_fees)

def track_fees(self, fill: Fill):
user = fill.user_address

# Get current user stats
user_stats = self.analyzer.get_user_analytics(user)
if not user_stats:
return

# Determine current fee tier based on volume
volume = user_stats['total_volume']
tier = 0

for i, threshold in enumerate(self.volume_thresholds):
if volume >= threshold:
tier = i + 1
else:
break

self.fee_tiers[user] = tier

# Calculate expected vs actual fee
expected_rate = self.fee_rates[tier] / 100
expected_fee = fill.filled_size * fill.filled_price * expected_rate
actual_fee = fill.fee

if abs(actual_fee - expected_fee) > 0.000001:
print(f"💰 Fee discrepancy for {user[:8]}...{user[-4:]}: "
f"Expected {expected_fee:.6f}, Actual {actual_fee:.6f}")

def get_fee_optimization_report(self):
report = {}

for user, tier in self.fee_tiers.items():
user_stats = self.analyzer.get_user_analytics(user)
if user_stats:
next_threshold = (self.volume_thresholds[tier]
if tier < len(self.volume_thresholds)
else float('inf'))

volume_to_next_tier = max(0, next_threshold - user_stats['total_volume'])

report[user] = {
'current_tier': tier,
'current_fee_rate': self.fee_rates[tier],
'volume_to_next_tier': volume_to_next_tier,
'potential_savings': self.calculate_potential_savings(user_stats, tier)
}

return report

def calculate_potential_savings(self, user_stats, current_tier):
if current_tier >= len(self.fee_rates) - 1:
return 0

current_rate = self.fee_rates[current_tier] / 100
next_rate = self.fee_rates[current_tier + 1] / 100

# Estimate savings on current volume if they upgraded tier
avg_trade_value = user_stats['total_volume'] * user_stats['avg_fill_price']
return avg_trade_value * (current_rate - next_rate)

Performance Monitoring

1. Real-time Metrics Dashboard

class FillMetricsDashboard {
constructor(processor) {
this.processor = processor;
this.metrics = {
fillsPerSecond: 0,
avgFillSize: 0,
avgExecutionPrice: 0,
totalFeesPerHour: 0,
uniqueUsersPerHour: new Set()
};

this.updateInterval = setInterval(() => {
this.updateMetrics();
}, 1000);

processor.on('fill', (fill) => {
this.trackRealtimeMetrics(fill);
});
}

trackRealtimeMetrics(fill) {
this.metrics.uniqueUsersPerHour.add(fill.userAddress);

// Update running averages
const currentTime = Date.now();
// Implementation for real-time metric calculation
}

updateMetrics() {
const now = Date.now();
const recentFills = this.processor.recentFills.filter(
f => f.timestamp * 1000 > now - 3600000 // Last hour
);

if (recentFills.length > 0) {
this.metrics.fillsPerSecond = recentFills.length / 3600;
this.metrics.avgFillSize = recentFills.reduce((sum, f) => sum + f.filledSize, 0) / recentFills.length;
this.metrics.totalFeesPerHour = recentFills.reduce((sum, f) => sum + f.fee, 0);
}

// Console dashboard
console.clear();
console.log('🖥️ Fill Stream Dashboard');
console.log('========================');
console.log(`Fills/sec: ${this.metrics.fillsPerSecond.toFixed(2)}`);
console.log(`Avg fill size: ${this.metrics.avgFillSize.toFixed(4)}`);
console.log(`Hourly fees: ${this.metrics.totalFeesPerHour.toFixed(6)}`);
console.log(`Active users: ${this.metrics.uniqueUsersPerHour.size}`);
console.log(`Total processed: ${this.processor.globalStats.totalFills}`);
}

stop() {
if (this.updateInterval) {
clearInterval(this.updateInterval);
}
}
}

Best Practices

  1. Data Integrity: Validate fill data and handle missing or malformed fields gracefully
  2. Position Tracking: Maintain accurate position tracking for portfolio management
  3. Settlement Verification: Reconcile expected fills with actual executions
  4. Performance Monitoring: Track execution quality and fee optimization
  5. Error Handling: Implement robust error handling for stream interruptions
  6. Memory Management: Use bounded collections to prevent memory leaks in long-running processes

Current Limitations

  • Historical Data: Cannot stream from historical timestamps; only real-time data available
  • Data Retention: Node maintains only 24 hours of historical fill data
  • Order Lifecycle: Fill data is independent; order placement/cancellation requires separate tracking
  • Settlement Finality: Fills represent execution but final settlement may have additional confirmation requirements

Need help? Contact our support team or check the Hyperliquid gRPC documentation.