
StreamBlockFills

Stream continuous fill data for executed orders, providing real-time visibility into order execution and settlement.

When to Use This Method

StreamBlockFills is essential for:

  • Order Execution Tracking - Monitor fill rates and execution quality
  • Portfolio Management - Track position changes and settlement
  • Execution Analytics - Analyze slippage and execution performance
  • Trade Settlement - Confirm order fills and update balances

Method Signature

rpc StreamBlockFills(Timestamp) returns (stream BlockFills) {}

Request Message

message Timestamp {
  int64 timestamp = 1;
}

Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.
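
Because the timestamp value is currently ignored, passing 0 is sufficient. Below is a minimal sketch of opening the stream; it assumes the generated pb package and a connected HyperliquidL1GatewayClient, as shown in the full example under Implementation Examples.

func streamFromNow(ctx context.Context, client pb.HyperliquidL1GatewayClient) error {
    // The server ignores the timestamp for now, so 0 behaves the same as any
    // other value: the stream starts at the current block and moves forward.
    stream, err := client.StreamBlockFills(ctx, &pb.Timestamp{Timestamp: 0})
    if err != nil {
        return err
    }

    for {
        blockFills, err := stream.Recv()
        if err != nil {
            return err // stream interrupted; callers should reconnect
        }
        log.Printf("received %d bytes of block fill data", len(blockFills.Data))
    }
}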

Response Stream

message BlockFills {
  // JSON-encoded object conforming to files of
  // Hyperliquid data dir "node_fills"
  bytes data = 1;
}

The data field contains JSON-encoded fill information including:

  • Fill execution details (filled size, execution price)
  • Order references and matching information
  • User accounts and position updates
  • Settlement timestamps and block references
  • Fee calculations and distributions
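
Since the exact field layout follows the node's "node_fills" file format, a defensive first pass can decode the payload generically before mapping it onto typed structs; the field names used in the typed Go example below are assumptions about that format. A minimal decoding sketch (standalone helper, requires encoding/json and fmt):

func decodeFills(data []byte) (map[string]interface{}, error) {
    // Decode into generic JSON first; the concrete schema follows the
    // "node_fills" file format, so inspect it before committing to structs.
    var payload map[string]interface{}
    if err := json.Unmarshal(data, &payload); err != nil {
        return nil, fmt.Errorf("malformed fill payload: %w", err)
    }
    return payload, nil
}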

Implementation Examples

package main

import (
    "context"
    "encoding/json"
    "log"
    "sync"
    "time"

    pb "your-project/hyperliquid_l1_gateway/v1"
    "google.golang.org/grpc"
)

type Fill struct {
    ID          string  `json:"id"`
    OrderID     string  `json:"order_id"`
    UserAddress string  `json:"user_address"`
    Symbol      string  `json:"symbol"`
    Side        string  `json:"side"`
    FilledSize  float64 `json:"filled_size"`
    FilledPrice float64 `json:"filled_price"`
    Fee         float64 `json:"fee"`
    FeeAsset    string  `json:"fee_asset"`
    Timestamp   int64   `json:"timestamp"`
    BlockHeight int64   `json:"block_height"`
    BlockHash   string  `json:"block_hash"`
    SequenceNum int64   `json:"sequence_num"`
}

type ExecutionStats struct {
    TotalFills      int64
    TotalVolume     float64
    TotalFees       float64
    AverageSlippage float64
    FillRate        float64 // Percentage of orders filled

    // Per-user statistics
    UserFills  map[string]int64
    UserVolume map[string]float64
    UserFees   map[string]float64

    mutex sync.RWMutex
}

type FillStreamProcessor struct {
    client      pb.HyperliquidL1GatewayClient
    stats       *ExecutionStats
    recentFills []Fill
    maxHistory  int

    // Event handlers
    fillHandlers []func(*Fill)

    mutex sync.RWMutex
}

func NewFillStreamProcessor(client pb.HyperliquidL1GatewayClient) *FillStreamProcessor {
    return &FillStreamProcessor{
        client: client,
        stats: &ExecutionStats{
            UserFills:  make(map[string]int64),
            UserVolume: make(map[string]float64),
            UserFees:   make(map[string]float64),
        },
        recentFills:  make([]Fill, 0),
        maxHistory:   10000,
        fillHandlers: make([]func(*Fill), 0),
    }
}

func (fsp *FillStreamProcessor) StreamFills(ctx context.Context) error {
    stream, err := fsp.client.StreamBlockFills(
        ctx,
        &pb.Timestamp{Timestamp: 0},
    )
    if err != nil {
        return err
    }

    log.Println("Starting fill stream...")

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            blockFills, err := stream.Recv()
            if err != nil {
                log.Printf("Stream error: %v", err)
                return err
            }

            if err := fsp.processBlockFills(blockFills.Data); err != nil {
                log.Printf("Failed to process fills: %v", err)
                continue
            }
        }
    }
}

func (fsp *FillStreamProcessor) processBlockFills(data []byte) error {
    var fillsData struct {
        Fills []Fill `json:"fills"`
        Block struct {
            Height    int64  `json:"height"`
            Hash      string `json:"hash"`
            Timestamp int64  `json:"timestamp"`
        } `json:"block"`
    }

    if err := json.Unmarshal(data, &fillsData); err != nil {
        return err
    }

    log.Printf("Processing %d fills from block %d",
        len(fillsData.Fills), fillsData.Block.Height)

    fsp.mutex.Lock()
    defer fsp.mutex.Unlock()

    // Process each fill
    for _, fill := range fillsData.Fills {
        fill.BlockHeight = fillsData.Block.Height
        fill.BlockHash = fillsData.Block.Hash

        fsp.updateStats(&fill)
        fsp.storeFill(&fill)
        fsp.notifyHandlers(&fill)
    }

    return nil
}

func (fsp *FillStreamProcessor) updateStats(fill *Fill) {
    fsp.stats.mutex.Lock()
    defer fsp.stats.mutex.Unlock()

    // Update global stats
    fsp.stats.TotalFills++
    fsp.stats.TotalVolume += fill.FilledSize
    fsp.stats.TotalFees += fill.Fee

    // Update user-specific stats
    user := fill.UserAddress
    fsp.stats.UserFills[user]++
    fsp.stats.UserVolume[user] += fill.FilledSize
    fsp.stats.UserFees[user] += fill.Fee

    log.Printf("Fill: %s %s %.6f @ %.6f (Fee: %.6f %s)",
        fill.UserAddress, fill.Symbol, fill.FilledSize,
        fill.FilledPrice, fill.Fee, fill.FeeAsset)
}

func (fsp *FillStreamProcessor) storeFill(fill *Fill) {
    fsp.recentFills = append(fsp.recentFills, *fill)

    // Maintain history limit
    if len(fsp.recentFills) > fsp.maxHistory {
        fsp.recentFills = fsp.recentFills[len(fsp.recentFills)-fsp.maxHistory:]
    }
}

func (fsp *FillStreamProcessor) AddFillHandler(handler func(*Fill)) {
    fsp.fillHandlers = append(fsp.fillHandlers, handler)
}

func (fsp *FillStreamProcessor) notifyHandlers(fill *Fill) {
    for _, handler := range fsp.fillHandlers {
        go handler(fill) // Handle asynchronously
    }
}

func (fsp *FillStreamProcessor) GetUserStats(userAddress string) map[string]interface{} {
    fsp.stats.mutex.RLock()
    defer fsp.stats.mutex.RUnlock()

    return map[string]interface{}{
        "fills":    fsp.stats.UserFills[userAddress],
        "volume":   fsp.stats.UserVolume[userAddress],
        "fees":     fsp.stats.UserFees[userAddress],
        "avg_size": fsp.calculateAvgFillSize(userAddress),
        "avg_fee":  fsp.calculateAvgFeePerFill(userAddress),
    }
}

// calculateAvgFillSize assumes the caller already holds stats.mutex (read);
// re-acquiring the read lock here could deadlock if a writer is waiting.
func (fsp *FillStreamProcessor) calculateAvgFillSize(userAddress string) float64 {
    fills := fsp.stats.UserFills[userAddress]
    volume := fsp.stats.UserVolume[userAddress]

    if fills == 0 {
        return 0
    }

    return volume / float64(fills)
}

// calculateAvgFeePerFill assumes the caller already holds stats.mutex (read).
func (fsp *FillStreamProcessor) calculateAvgFeePerFill(userAddress string) float64 {
    fills := fsp.stats.UserFills[userAddress]
    fees := fsp.stats.UserFees[userAddress]

    if fills == 0 {
        return 0
    }

    return fees / float64(fills)
}

func (fsp *FillStreamProcessor) GetGlobalStats() map[string]interface{} {
    fsp.stats.mutex.RLock()
    defer fsp.stats.mutex.RUnlock()

    avgFillSize := float64(0)
    avgFeePerFill := float64(0)
    if fsp.stats.TotalFills > 0 {
        avgFillSize = fsp.stats.TotalVolume / float64(fsp.stats.TotalFills)
        avgFeePerFill = fsp.stats.TotalFees / float64(fsp.stats.TotalFills)
    }

    return map[string]interface{}{
        "total_fills":      fsp.stats.TotalFills,
        "total_volume":     fsp.stats.TotalVolume,
        "total_fees":       fsp.stats.TotalFees,
        "avg_fill_size":    avgFillSize,
        "unique_users":     len(fsp.stats.UserFills),
        "avg_fee_per_fill": avgFeePerFill,
    }
}

func (fsp *FillStreamProcessor) GetRecentFills(limit int) []Fill {
    fsp.mutex.RLock()
    defer fsp.mutex.RUnlock()

    if limit > len(fsp.recentFills) {
        limit = len(fsp.recentFills)
    }

    if limit == 0 {
        return []Fill{}
    }

    start := len(fsp.recentFills) - limit
    result := make([]Fill, limit)
    copy(result, fsp.recentFills[start:])

    return result
}

// Advanced analytics
func (fsp *FillStreamProcessor) CalculateExecutionQuality() map[string]float64 {
    fsp.mutex.RLock()
    defer fsp.mutex.RUnlock()

    if len(fsp.recentFills) == 0 {
        return map[string]float64{}
    }

    // Group fills by symbol for analysis
    symbolFills := make(map[string][]Fill)
    for _, fill := range fsp.recentFills {
        symbolFills[fill.Symbol] = append(symbolFills[fill.Symbol], fill)
    }

    quality := make(map[string]float64)

    for symbol, fills := range symbolFills {
        if len(fills) < 2 {
            continue
        }

        // Calculate price consistency (lower variance = better execution)
        prices := make([]float64, len(fills))
        for i, fill := range fills {
            prices[i] = fill.FilledPrice
        }

        variance := calculateVariance(prices)
        quality[symbol] = 1.0 / (1.0 + variance) // Higher score = better consistency
    }

    return quality
}

func calculateVariance(values []float64) float64 {
    if len(values) == 0 {
        return 0
    }

    mean := 0.0
    for _, v := range values {
        mean += v
    }
    mean /= float64(len(values))

    variance := 0.0
    for _, v := range values {
        diff := v - mean
        variance += diff * diff
    }
    variance /= float64(len(values))

    return variance
}

func main() {
    // Connect to Hyperliquid L1 Gateway
    conn, err := grpc.Dial(
        "<YOUR_HYPERLIQUID_ENDPOINT>", // Contact support@dwellir.com
        grpc.WithInsecure(),
    )
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewHyperliquidL1GatewayClient(conn)
    processor := NewFillStreamProcessor(client)

    // Add custom fill handlers
    processor.AddFillHandler(func(fill *Fill) {
        // Large fill alert
        if fill.FilledSize > 1000 {
            log.Printf("🔥 Large fill: %s %.2f @ %.6f",
                fill.Symbol, fill.FilledSize, fill.FilledPrice)
        }
    })

    processor.AddFillHandler(func(fill *Fill) {
        // High fee alert
        feeRate := fill.Fee / (fill.FilledSize * fill.FilledPrice)
        if feeRate > 0.001 { // 0.1% fee rate
            log.Printf("💰 High fee: %s %.4f%% fee rate",
                fill.Symbol, feeRate*100)
        }
    })

    // Start streaming in background
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        if err := processor.StreamFills(ctx); err != nil {
            log.Printf("Stream ended: %v", err)
        }
    }()

    // Print analytics periodically
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            globalStats := processor.GetGlobalStats()
            executionQuality := processor.CalculateExecutionQuality()

            log.Println("=== FILL ANALYTICS ===")
            log.Printf("Total fills: %v, Volume: %.2f, Fees: %.6f",
                globalStats["total_fills"], globalStats["total_volume"],
                globalStats["total_fees"])
            log.Printf("Avg fill size: %.4f, Unique users: %v",
                globalStats["avg_fill_size"], globalStats["unique_users"])

            if len(executionQuality) > 0 {
                log.Println("Execution Quality:")
                for symbol, quality := range executionQuality {
                    log.Printf("  %s: %.4f", symbol, quality)
                }
            }
        }
    }
}

Common Use Cases

1. Portfolio Position Tracking

from collections import defaultdict

class PositionTracker:
    def __init__(self, analyzer, user_address):
        self.analyzer = analyzer
        self.user_address = user_address
        self.positions = defaultdict(float)  # symbol -> net position

        analyzer.add_fill_handler(self.track_position)

    def track_position(self, fill: Fill):
        if fill.user_address != self.user_address:
            return

        # Update position based on fill
        symbol = fill.symbol
        size_change = fill.filled_size

        if fill.side == 'sell':
            size_change = -size_change

        self.positions[symbol] += size_change

        print(f"Position update: {symbol} {self.positions[symbol]:+.6f}")

    def get_current_positions(self):
        return dict(self.positions)

2. Execution Cost Analysis

class ExecutionCostAnalyzer {
  constructor(processor) {
    this.processor = processor;
    this.executionData = new Map(); // orderId -> execution info

    processor.on('fill', (fill) => {
      this.analyzeExecution(fill);
    });
  }

  analyzeExecution(fill) {
    if (!this.executionData.has(fill.orderId)) {
      this.executionData.set(fill.orderId, {
        fills: [],
        totalFilled: 0,
        weightedPrice: 0,
        totalFees: 0
      });
    }

    const execution = this.executionData.get(fill.orderId);
    execution.fills.push(fill);
    execution.totalFilled += fill.filledSize;
    execution.totalFees += fill.fee;

    // Calculate weighted average price
    const totalValue = execution.fills.reduce((sum, f) => sum + (f.filledPrice * f.filledSize), 0);
    execution.weightedPrice = totalValue / execution.totalFilled;

    // Calculate execution cost
    const feeRate = execution.totalFees / (execution.totalFilled * execution.weightedPrice);

    if (execution.fills.length === 1) {
      console.log(`✅ Complete fill: ${fill.symbol} ${fill.filledSize.toFixed(6)} @ ${fill.filledPrice.toFixed(6)}`);
    } else {
      console.log(`📊 Partial fill ${execution.fills.length}: ${fill.symbol} VWAP: ${execution.weightedPrice.toFixed(6)}, Fee rate: ${(feeRate * 100).toFixed(3)}%`);
    }
  }
}

3. Settlement Reconciliation

type SettlementReconciler struct {
    processor     *FillStreamProcessor
    expectedFills map[string]Fill // orderId -> expected fill
    settledFills  map[string]Fill // orderId -> actual fill
    discrepancies []Discrepancy
}

type Discrepancy struct {
    OrderID       string
    ExpectedSize  float64
    ActualSize    float64
    ExpectedPrice float64
    ActualPrice   float64
    Difference    float64
    Timestamp     time.Time
}

func (sr *SettlementReconciler) CheckSettlement(fill *Fill) {
    expected, exists := sr.expectedFills[fill.OrderID]
    if !exists {
        log.Printf("⚠️ Unexpected fill: %s", fill.OrderID)
        return
    }

    // Check for discrepancies
    sizeDiff := math.Abs(expected.FilledSize - fill.FilledSize)
    priceDiff := math.Abs(expected.FilledPrice - fill.FilledPrice)

    if sizeDiff > 0.000001 || priceDiff > 0.000001 {
        discrepancy := Discrepancy{
            OrderID:       fill.OrderID,
            ExpectedSize:  expected.FilledSize,
            ActualSize:    fill.FilledSize,
            ExpectedPrice: expected.FilledPrice,
            ActualPrice:   fill.FilledPrice,
            Difference:    sizeDiff + priceDiff,
            Timestamp:     time.Now(),
        }

        sr.discrepancies = append(sr.discrepancies, discrepancy)
        log.Printf("🔍 Settlement discrepancy detected: %+v", discrepancy)
    } else {
        log.Printf("✅ Settlement confirmed: %s", fill.OrderID)
    }

    sr.settledFills[fill.OrderID] = *fill
    delete(sr.expectedFills, fill.OrderID)
}

4. Fee Optimization Tracking

class FeeOptimizationTracker:
    def __init__(self, analyzer):
        self.analyzer = analyzer
        self.fee_tiers = {}  # user -> current fee tier
        self.volume_thresholds = [1000, 5000, 10000, 50000]  # Example thresholds
        self.fee_rates = [0.1, 0.08, 0.06, 0.04, 0.02]  # Corresponding fee rates (%)

        analyzer.add_fill_handler(self.track_fees)

    def track_fees(self, fill: Fill):
        user = fill.user_address

        # Get current user stats
        user_stats = self.analyzer.get_user_analytics(user)
        if not user_stats:
            return

        # Determine current fee tier based on volume
        volume = user_stats['total_volume']
        tier = 0

        for i, threshold in enumerate(self.volume_thresholds):
            if volume >= threshold:
                tier = i + 1
            else:
                break

        self.fee_tiers[user] = tier

        # Calculate expected vs actual fee
        expected_rate = self.fee_rates[tier] / 100
        expected_fee = fill.filled_size * fill.filled_price * expected_rate
        actual_fee = fill.fee

        if abs(actual_fee - expected_fee) > 0.000001:
            print(f"💰 Fee discrepancy for {user[:8]}...{user[-4:]}: "
                  f"Expected {expected_fee:.6f}, Actual {actual_fee:.6f}")

    def get_fee_optimization_report(self):
        report = {}

        for user, tier in self.fee_tiers.items():
            user_stats = self.analyzer.get_user_analytics(user)
            if user_stats:
                next_threshold = (self.volume_thresholds[tier]
                                  if tier < len(self.volume_thresholds)
                                  else float('inf'))

                volume_to_next_tier = max(0, next_threshold - user_stats['total_volume'])

                report[user] = {
                    'current_tier': tier,
                    'current_fee_rate': self.fee_rates[tier],
                    'volume_to_next_tier': volume_to_next_tier,
                    'potential_savings': self.calculate_potential_savings(user_stats, tier)
                }

        return report

    def calculate_potential_savings(self, user_stats, current_tier):
        if current_tier >= len(self.fee_rates) - 1:
            return 0

        current_rate = self.fee_rates[current_tier] / 100
        next_rate = self.fee_rates[current_tier + 1] / 100

        # Estimate savings on current volume if they upgraded tier
        avg_trade_value = user_stats['total_volume'] * user_stats['avg_fill_price']
        return avg_trade_value * (current_rate - next_rate)

Performance Monitoring

1. Real-time Metrics Dashboard

class FillMetricsDashboard {
  constructor(processor) {
    this.processor = processor;
    this.metrics = {
      fillsPerSecond: 0,
      avgFillSize: 0,
      avgExecutionPrice: 0,
      totalFeesPerHour: 0,
      uniqueUsersPerHour: new Set()
    };

    this.updateInterval = setInterval(() => {
      this.updateMetrics();
    }, 1000);

    processor.on('fill', (fill) => {
      this.trackRealtimeMetrics(fill);
    });
  }

  trackRealtimeMetrics(fill) {
    this.metrics.uniqueUsersPerHour.add(fill.userAddress);

    // Update running averages
    const currentTime = Date.now();
    // Implementation for real-time metric calculation
  }

  updateMetrics() {
    const now = Date.now();
    const recentFills = this.processor.recentFills.filter(
      f => f.timestamp * 1000 > now - 3600000 // Last hour
    );

    if (recentFills.length > 0) {
      this.metrics.fillsPerSecond = recentFills.length / 3600;
      this.metrics.avgFillSize = recentFills.reduce((sum, f) => sum + f.filledSize, 0) / recentFills.length;
      this.metrics.totalFeesPerHour = recentFills.reduce((sum, f) => sum + f.fee, 0);
    }

    // Console dashboard
    console.clear();
    console.log('🖥️ Fill Stream Dashboard');
    console.log('========================');
    console.log(`Fills/sec: ${this.metrics.fillsPerSecond.toFixed(2)}`);
    console.log(`Avg fill size: ${this.metrics.avgFillSize.toFixed(4)}`);
    console.log(`Hourly fees: ${this.metrics.totalFeesPerHour.toFixed(6)}`);
    console.log(`Active users: ${this.metrics.uniqueUsersPerHour.size}`);
    console.log(`Total processed: ${this.processor.globalStats.totalFills}`);
  }

  stop() {
    if (this.updateInterval) {
      clearInterval(this.updateInterval);
    }
  }
}

Best Practices

  1. Data Integrity: Validate fill data and handle missing or malformed fields gracefully
  2. Position Tracking: Maintain accurate position tracking for portfolio management
  3. Settlement Verification: Reconcile expected fills with actual executions
  4. Performance Monitoring: Track execution quality and fee optimization
  5. Error Handling: Implement robust error handling for stream interruptions (a reconnection sketch follows this list)
  6. Memory Management: Use bounded collections to prevent memory leaks in long-running processes
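
For point 5, a simple reconnect loop with exponential backoff is usually enough for transient stream interruptions. A minimal sketch, reusing the FillStreamProcessor from the implementation example above (the 30-second cap is an arbitrary choice):

// runWithReconnect restarts the fill stream after transient errors,
// backing off exponentially up to a 30-second cap.
func runWithReconnect(ctx context.Context, processor *FillStreamProcessor) {
    backoff := time.Second
    for {
        err := processor.StreamFills(ctx)
        if ctx.Err() != nil {
            return // shutdown requested
        }
        log.Printf("fill stream ended (%v), reconnecting in %s", err, backoff)
        select {
        case <-time.After(backoff):
        case <-ctx.Done():
            return
        }
        if backoff < 30*time.Second {
            backoff *= 2
        }
    }
}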

Current Limitations

  • Historical Data: Cannot stream from historical timestamps; only real-time data available
  • Data Retention: Node maintains only 24 hours of historical fill data
  • Order Lifecycle: Fill data is independent; order placement/cancellation requires separate tracking
  • Settlement Finality: Fills represent execution but final settlement may have additional confirmation requirements

Need help? Contact our support team or check the Hyperliquid gRPC documentation.