StreamBlockFills
Stream continuous fill data for executed orders, providing real-time visibility into order execution and settlement.
When to Use This Method#
StreamBlockFills is essential for:
- Order Execution Tracking - Monitor fill rates and execution quality
- Portfolio Management - Track position changes and settlement
- Execution Analytics - Analyze slippage and execution performance
- Trade Settlement - Confirm order fills and update balances
Method Signature#
rpc StreamBlockFills(Timestamp) returns (stream BlockFills) {}
Request Message#
message Timestamp {
int64 timestamp = 1;
}
Current Limitation: The timestamp parameter is not yet implemented. Streams currently start from "now" and stream forward only, regardless of the timestamp value provided.
Response Stream#
message BlockFills {
// JSON-encoded array containing user address and fill data
bytes data = 1;
}
Data Structure#
The data field contains a JSON-encoded array with two elements:
[
"0xa4a6e0fd7528a6f5c6ccbb3240ba8a2f825446cf", // User wallet address
{ // Fill details object
"coin": "ENA", // Trading symbol/coin
"px": "0.81717", // Execution price
"sz": "24.0", // Fill size/amount
"side": "B", // Side: "B" (Buy) or "S" (Sell)
"time": 1757414494669, // Unix timestamp (milliseconds)
"startPosition": "-195643.0", // Position before fill
"dir": "Close Short", // Direction (Open Long/Short, Close Long/Short)
"closedPnl": "0.12396", // Realized PnL (if closing position)
"hash": "0xf4313130256dde67f5aa042b320bf40201da0015c060fd3a97f9dc82e461b852", // Transaction hash
"oid": 157341930362, // Order ID
"crossed": false, // Whether order crossed the spread
"fee": "-0.000392", // Trading fee (negative = fee paid)
"tid": 969007110865318, // Trade ID
"cloid": "0x00000000000000199520001057009638", // Client Order ID
"feeToken": "USDC", // Token used for fee payment
"twapId": null // TWAP order ID (if applicable)
}
]
Field Descriptions#
| Field | Type | Description |
|---|---|---|
| User Address | string | Ethereum wallet address of the trader |
| coin | string | Trading pair symbol (e.g., "BTC", "ETH", "ENA") |
| px | string | Execution price as a decimal string |
| sz | string | Fill size/amount as a decimal string |
| side | string | "B" for Buy, "S" for Sell |
| time | number | Unix timestamp in milliseconds |
| startPosition | string | Position size before this fill (negative = short) |
| dir | string | Trade direction: "Open Long", "Open Short", "Close Long", "Close Short" |
| closedPnl | string | Realized profit/loss if closing a position |
| hash | string | On-chain transaction hash |
| oid | number | Unique order identifier |
| crossed | boolean | True if order crossed the bid-ask spread |
| fee | string | Trading fee (negative = paid, positive = rebate) |
| tid | number | Unique trade identifier |
| cloid | string | Client-provided order ID (hex format) |
| feeToken | string | Token symbol used for fee payment |
| twapId | number/null | TWAP order ID if part of TWAP execution |
Developer Insights#
Understanding the Data Structure#
The StreamBlockFills method provides critical real-time execution data with several important characteristics:
-
User-Centric Format: Each fill message starts with the user's wallet address, making it easy to filter fills for specific accounts.
-
Position Tracking: The
startPositionanddirfields provide complete context for position management:- Track position evolution through
startPosition - Understand trade intent through
dir(Open/Close Long/Short) - Calculate net position changes accurately
- Track position evolution through
-
PnL Calculation: The
closedPnlfield provides realized profit/loss when closing positions, essential for:- Performance tracking
- Tax reporting
- Risk management
-
Fee Analysis:
- Fees are typically negative (indicating payment)
- The
feeTokenfield shows which token was used for fee payment - Important for calculating true execution costs
-
Order Identification:
oid: Hyperliquid's internal order IDtid: Unique trade identifiercloid: Your client-provided order ID for matching with your systemhash: On-chain transaction hash for verification
Key Considerations for Implementation#
Price and Size Handling#
- All numeric values (
px,sz,fee, etc.) are provided as strings to preserve precision - Always use appropriate decimal libraries when processing these values
- Be aware that positions can be negative (short positions)
Side vs Direction#
side: Simple Buy (B) or Sell (S) indicatordir: Provides context about position impact (Open Long, Close Short, etc.)- Use
dirfor accurate position tracking andsidefor order flow analysis
Time Synchronization#
- The
timefield uses millisecond timestamps - Ensure your system clock is synchronized for accurate latency measurements
- Consider timezone handling for display purposes
Error Handling#
- Handle potential null values (e.g.,
twapIdcan be null) - Validate data types before processing
- Implement reconnection logic for stream interruptions
Practical Use Cases#
- Real-time Position Management: Combine
startPosition,dir, andszto maintain accurate position states - Execution Quality Analysis: Use
crossedfield to identify aggressive vs passive fills - Fee Optimization: Track fee rates across different order types and sizes
- PnL Tracking: Aggregate
closedPnlvalues for comprehensive profit/loss reporting - TWAP Monitoring: Filter by
twapIdto track TWAP order execution progress
Implementation Examples#
- Go
- Python
- Node.js
package main
import (
"context"
"encoding/json"
"log"
"sync"
"time"
pb "your-project/hyperliquid_l1_gateway/v1"
"google.golang.org/grpc"
)
type Fill struct {
UserAddress string `json:"user_address"` // From array[0]
Coin string `json:"coin"` // Trading symbol
Price string `json:"px"` // Execution price
Size string `json:"sz"` // Fill size
Side string `json:"side"` // "B" or "S"
Time int64 `json:"time"` // Unix timestamp (ms)
StartPosition string `json:"startPosition"` // Position before fill
Direction string `json:"dir"` // Trade direction
ClosedPnl string `json:"closedPnl"` // Realized PnL
Hash string `json:"hash"` // Transaction hash
OrderID int64 `json:"oid"` // Order ID
Crossed bool `json:"crossed"` // Crossed spread
Fee string `json:"fee"` // Trading fee
TradeID int64 `json:"tid"` // Trade ID
ClientOrderID string `json:"cloid"` // Client order ID
FeeToken string `json:"feeToken"` // Fee token symbol
TwapID *int64 `json:"twapId"` // TWAP order ID
}
type ExecutionStats struct {
TotalFills int64
TotalVolume float64
TotalFees float64
AverageSlippage float64
FillRate float64 // Percentage of orders filled
// Per-user statistics
UserFills map[string]int64
UserVolume map[string]float64
UserFees map[string]float64
mutex sync.RWMutex
}
type FillStreamProcessor struct {
client pb.HyperliquidL1GatewayClient
stats *ExecutionStats
recentFills []Fill
maxHistory int
// Event handlers
fillHandlers []func(*Fill)
mutex sync.RWMutex
}
func NewFillStreamProcessor(client pb.HyperliquidL1GatewayClient) *FillStreamProcessor {
return &FillStreamProcessor{
client: client,
stats: &ExecutionStats{
UserFills: make(map[string]int64),
UserVolume: make(map[string]float64),
UserFees: make(map[string]float64),
},
recentFills: make([]Fill, 0),
maxHistory: 10000,
fillHandlers: make([]func(*Fill), 0),
}
}
func (fsp *FillStreamProcessor) StreamFills(ctx context.Context) error {
stream, err := fsp.client.StreamBlockFills(
ctx,
&pb.Timestamp{Timestamp: 0},
)
if err != nil {
return err
}
log.Println("Starting fill stream...")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
blockFills, err := stream.Recv()
if err != nil {
log.Printf("Stream error: %v", err)
return err
}
if err := fsp.processBlockFills(blockFills.Data); err != nil {
log.Printf("Failed to process fills: %v", err)
continue
}
}
}
}
// Helper functions for parsing fill data
func getString(data map[string]interface{}, key string) string {
if val, ok := data[key]; ok && val != nil {
return fmt.Sprintf("%v", val)
}
return ""
}
func getFloat(data map[string]interface{}, key string) float64 {
if val, ok := data[key]; ok && val != nil {
if f, ok := val.(float64); ok {
return f
}
}
return 0
}
func getBool(data map[string]interface{}, key string) bool {
if val, ok := data[key]; ok && val != nil {
if b, ok := val.(bool); ok {
return b
}
}
return false
}
func (fsp *FillStreamProcessor) processBlockFills(data []byte) error {
var fillArray []json.RawMessage
if err := json.Unmarshal(data, &fillArray); err != nil {
return err
}
if len(fillArray) != 2 {
return fmt.Errorf("unexpected fill array length: %d", len(fillArray))
}
// Parse user address
var userAddress string
if err := json.Unmarshal(fillArray[0], &userAddress); err != nil {
return err
}
// Parse fill data
var fillData map[string]interface{}
if err := json.Unmarshal(fillArray[1], &fillData); err != nil {
return err
}
// Convert to Fill struct
fill := Fill{
UserAddress: userAddress,
Coin: getString(fillData, "coin"),
Price: getString(fillData, "px"),
Size: getString(fillData, "sz"),
Side: getString(fillData, "side"),
Time: int64(getFloat(fillData, "time")),
StartPosition: getString(fillData, "startPosition"),
Direction: getString(fillData, "dir"),
ClosedPnl: getString(fillData, "closedPnl"),
Hash: getString(fillData, "hash"),
OrderID: int64(getFloat(fillData, "oid")),
Crossed: getBool(fillData, "crossed"),
Fee: getString(fillData, "fee"),
TradeID: int64(getFloat(fillData, "tid")),
ClientOrderID: getString(fillData, "cloid"),
FeeToken: getString(fillData, "feeToken"),
}
if twapId := fillData["twapId"]; twapId != nil && twapId != nil {
tid := int64(getFloat(fillData, "twapId"))
fill.TwapID = &tid
}
log.Printf("Processing fill: %s %s %s @ %s",
userAddress[:8], fill.Coin, fill.Size, fill.Price)
fsp.mutex.Lock()
defer fsp.mutex.Unlock()
fsp.updateStats(&fill)
fsp.storeFill(&fill)
fsp.notifyHandlers(&fill)
return nil
}
func (fsp *FillStreamProcessor) updateStats(fill *Fill) {
fsp.stats.mutex.Lock()
defer fsp.stats.mutex.Unlock()
// Parse numeric values
size, _ := strconv.ParseFloat(fill.Size, 64)
price, _ := strconv.ParseFloat(fill.Price, 64)
fee, _ := strconv.ParseFloat(fill.Fee, 64)
// Update global stats
fsp.stats.TotalFills++
fsp.stats.TotalVolume += size
fsp.stats.TotalFees += math.Abs(fee) // Fees are negative when paid
// Update user-specific stats
user := fill.UserAddress
fsp.stats.UserFills[user]++
fsp.stats.UserVolume[user] += size
fsp.stats.UserFees[user] += math.Abs(fee)
// Log position changes
positionInfo := ""
if fill.Direction != "" {
positionInfo = fmt.Sprintf(" [%s from %s]", fill.Direction, fill.StartPosition)
}
if fill.ClosedPnl != "" && fill.ClosedPnl != "0" {
positionInfo += fmt.Sprintf(" PnL: %s", fill.ClosedPnl)
}
log.Printf("Fill: %s %s %s %s @ %s (Fee: %s %s)%s",
fill.UserAddress[:8], fill.Coin, fill.Side, fill.Size,
fill.Price, fill.Fee, fill.FeeToken, positionInfo)
}
func (fsp *FillStreamProcessor) storeFill(fill *Fill) {
fsp.recentFills = append(fsp.recentFills, *fill)
// Maintain history limit
if len(fsp.recentFills) > fsp.maxHistory {
fsp.recentFills = fsp.recentFills[len(fsp.recentFills)-fsp.maxHistory:]
}
}
func (fsp *FillStreamProcessor) AddFillHandler(handler func(*Fill)) {
fsp.fillHandlers = append(fsp.fillHandlers, handler)
}
func (fsp *FillStreamProcessor) notifyHandlers(fill *Fill) {
for _, handler := range fsp.fillHandlers {
go handler(fill) // Handle asynchronously
}
}
func (fsp *FillStreamProcessor) GetUserStats(userAddress string) map[string]interface{} {
fsp.stats.mutex.RLock()
defer fsp.stats.mutex.RUnlock()
return map[string]interface{}{
"fills": fsp.stats.UserFills[userAddress],
"volume": fsp.stats.UserVolume[userAddress],
"fees": fsp.stats.UserFees[userAddress],
"avg_size": fsp.calculateAvgFillSize(userAddress),
"avg_fee": fsp.calculateAvgFeePerFill(userAddress),
}
}
func (fsp *FillStreamProcessor) calculateAvgFillSize(userAddress string) float64 {
fsp.stats.mutex.RLock()
defer fsp.stats.mutex.RUnlock()
fills := fsp.stats.UserFills[userAddress]
volume := fsp.stats.UserVolume[userAddress]
if fills == 0 {
return 0
}
return volume / float64(fills)
}
func (fsp *FillStreamProcessor) calculateAvgFeePerFill(userAddress string) float64 {
fsp.stats.mutex.RLock()
defer fsp.stats.mutex.RUnlock()
fills := fsp.stats.UserFills[userAddress]
fees := fsp.stats.UserFees[userAddress]
if fills == 0 {
return 0
}
return fees / float64(fills)
}
func (fsp *FillStreamProcessor) GetGlobalStats() map[string]interface{} {
fsp.stats.mutex.RLock()
defer fsp.stats.mutex.RUnlock()
avgFillSize := float64(0)
if fsp.stats.TotalFills > 0 {
avgFillSize = fsp.stats.TotalVolume / float64(fsp.stats.TotalFills)
}
return map[string]interface{}{
"total_fills": fsp.stats.TotalFills,
"total_volume": fsp.stats.TotalVolume,
"total_fees": fsp.stats.TotalFees,
"avg_fill_size": avgFillSize,
"unique_users": len(fsp.stats.UserFills),
"avg_fee_per_fill": fsp.stats.TotalFees / float64(fsp.stats.TotalFills),
}
}
func (fsp *FillStreamProcessor) GetRecentFills(limit int) []Fill {
fsp.mutex.RLock()
defer fsp.mutex.RUnlock()
if limit > len(fsp.recentFills) {
limit = len(fsp.recentFills)
}
if limit == 0 {
return []Fill{}
}
start := len(fsp.recentFills) - limit
result := make([]Fill, limit)
copy(result, fsp.recentFills[start:])
return result
}
// Advanced analytics
func (fsp *FillStreamProcessor) CalculateExecutionQuality() map[string]float64 {
fsp.mutex.RLock()
defer fsp.mutex.RUnlock()
if len(fsp.recentFills) == 0 {
return map[string]float64{}
}
// Group fills by symbol for analysis
symbolFills := make(map[string][]Fill)
for _, fill := range fsp.recentFills {
symbolFills[fill.Symbol] = append(symbolFills[fill.Symbol], fill)
}
quality := make(map[string]float64)
for symbol, fills := range symbolFills {
if len(fills) < 2 {
continue
}
// Calculate price consistency (lower variance = better execution)
prices := make([]float64, len(fills))
for i, fill := range fills {
prices[i] = fill.FilledPrice
}
variance := calculateVariance(prices)
quality[symbol] = 1.0 / (1.0 + variance) // Higher score = better consistency
}
return quality
}
func calculateVariance(values []float64) float64 {
if len(values) == 0 {
return 0
}
mean := 0.0
for _, v := range values {
mean += v
}
mean /= float64(len(values))
variance := 0.0
for _, v := range values {
diff := v - mean
variance += diff * diff
}
variance /= float64(len(values))
return variance
}
func main() {
// Connect to Hyperliquid L1 Gateway
conn, err := grpc.Dial(
"<YOUR_HYPERLIQUID_ENDPOINT>", // Contact support@dwellir.com
grpc.WithInsecure(),
)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewHyperliquidL1GatewayClient(conn)
processor := NewFillStreamProcessor(client)
// Add custom fill handlers
processor.AddFillHandler(func(fill *Fill) {
// Large fill alert
if fill.FilledSize > 1000 {
log.Printf("🔥 Large fill: %s %.2f @ %.6f",
fill.Symbol, fill.FilledSize, fill.FilledPrice)
}
})
processor.AddFillHandler(func(fill *Fill) {
// High fee alert
feeRate := fill.Fee / (fill.FilledSize * fill.FilledPrice)
if feeRate > 0.001 { // 0.1% fee rate
log.Printf("💰 High fee: %s %.4f%% fee rate",
fill.Symbol, feeRate*100)
}
})
// Start streaming in background
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
if err := processor.StreamFills(ctx); err != nil {
log.Printf("Stream ended: %v", err)
}
}()
// Print analytics periodically
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
globalStats := processor.GetGlobalStats()
executionQuality := processor.CalculateExecutionQuality()
log.Println("=== FILL ANALYTICS ===")
log.Printf("Total fills: %v, Volume: %.2f, Fees: %.6f",
globalStats["total_fills"], globalStats["total_volume"],
globalStats["total_fees"])
log.Printf("Avg fill size: %.4f, Unique users: %v",
globalStats["avg_fill_size"], globalStats["unique_users"])
if len(executionQuality) > 0 {
log.Println("Execution Quality:")
for symbol, quality := range executionQuality {
log.Printf(" %s: %.4f", symbol, quality)
}
}
}
}
}
import grpc
import json
import signal
import sys
import os
from datetime import datetime
from dotenv import load_dotenv
import hyperliquid_pb2
import hyperliquid_pb2_grpc
# Load environment variables
load_dotenv()
def stream_block_fills():
endpoint = os.getenv('HYPERLIQUID_ENDPOINT')
api_key = os.getenv('API_KEY')
if not endpoint:
print("Error: HYPERLIQUID_ENDPOINT environment variable is required.")
print("Please create a .env file from .env.example and set your endpoint.")
sys.exit(1)
if not api_key:
print("Error: API_KEY environment variable is required.")
print("Please set your API key in the .env file.")
sys.exit(1)
print('🚀 Hyperliquid Python gRPC Client - Stream Block Fills')
print('===================================================')
print(f'📡 Endpoint: {endpoint}\n')
# Create SSL credentials
credentials = grpc.ssl_channel_credentials()
# Create channel with options
options = [
('grpc.max_receive_message_length', 150 * 1024 * 1024), # 150MB max
]
# Prepare metadata with API key
metadata = [('x-api-key', api_key)]
print('🔌 Connecting to gRPC server...')
with grpc.secure_channel(endpoint, credentials, options=options) as channel:
# Create client stub
client = hyperliquid_pb2_grpc.HyperLiquidL1GatewayStub(channel)
print('✅ Connected successfully!\n')
# Create request - 0 means latest/current block fills
request = hyperliquid_pb2.Timestamp(timestamp=0)
print('📥 Starting block fills stream...')
print('Press Ctrl+C to stop streaming\n')
block_fills_count = 0
# Set up signal handler for graceful shutdown
def signal_handler(sig, frame):
print('\n🛑 Stopping stream...')
print(f'📊 Total block fills received: {block_fills_count}')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
try:
# Start streaming block fills with metadata
for response in client.StreamBlockFills(request, metadata=metadata):
block_fills_count += 1
print(f'\n===== BLOCK FILLS #{block_fills_count} =====')
print(f'📦 Response size: {len(response.data)} bytes')
# Process each block fills response
process_block_fills(response.data, block_fills_count)
print('\n' + '─' * 50)
except grpc.RpcError as e:
print(f'❌ Stream error: {e}')
except KeyboardInterrupt:
print('\n🛑 Stopping stream...')
print(f'\n📊 Total block fills received: {block_fills_count}')
def process_block_fills(data, block_fills_num):
try:
# Parse JSON
block_fills = json.loads(data.decode('utf-8'))
print(f'💰 BLOCK FILLS #{block_fills_num} DETAILS')
print('========================')
# Display block height if available
if 'height' in block_fills:
print(f'📏 Block Height: {block_fills["height"]}')
# Display timestamp
if 'time' in block_fills:
timestamp = block_fills['time']
if isinstance(timestamp, (int, float)):
# Convert from milliseconds to seconds if needed
if timestamp > 10**10: # Likely milliseconds
timestamp = timestamp / 1000
dt = datetime.fromtimestamp(timestamp)
print(f'⏰ Time: {dt.strftime("%Y-%m-%d %H:%M:%S UTC")}')
# Display fills data
if 'fills' in block_fills and isinstance(block_fills['fills'], list):
fills = block_fills['fills']
print(f'📋 Total Fills: {len(fills)}')
# Show first few fill details
max_fills = min(3, len(fills))
for i in range(max_fills):
fill = fills[i]
fill_info = f' • FILL {i + 1}: '
if isinstance(fill, dict):
if 'symbol' in fill:
fill_info += f'Symbol: {fill["symbol"]}'
if 'side' in fill:
fill_info += f', Side: {fill["side"]}'
if 'price' in fill:
fill_info += f', Price: {fill["price"]}'
if 'size' in fill:
fill_info += f', Size: {fill["size"]}'
if 'hash' in fill:
fill_info += f', Hash: {fill["hash"][:12]}...'
else:
fill_info += str(fill)
print(fill_info)
if len(fills) > max_fills:
print(f' ... and {len(fills) - max_fills} more fills')
# Display any other interesting fields
print('\n📊 Block Fills Summary:')
if isinstance(block_fills, dict):
for key, value in block_fills.items():
if key in ['height', 'time', 'fills']:
# Already displayed above
continue
# Display other fields
if isinstance(value, (dict, list)):
print(f'• {key}: {json.dumps(value, separators=(",", ":"))[:100]}...')
else:
print(f'• {key}: {value}')
elif isinstance(block_fills, list):
print(f'• Block fills is a list with {len(block_fills)} items')
# Display first few items if it's a simple list
if len(block_fills) > 0:
print(f'• First item type: {type(block_fills[0]).__name__}')
except json.JSONDecodeError as e:
print(f'❌ Failed to parse JSON: {e}')
print(f'Raw data (first 200 bytes): {data[:200]}')
except Exception as e:
print(f'❌ Error processing block fills: {e}')
if __name__ == '__main__':
stream_block_fills()
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');
require('dotenv').config();
// Load proto file
const PROTO_PATH = path.join(__dirname, 'hyperliquid.proto');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const proto = grpc.loadPackageDefinition(packageDefinition);
// Main function
async function streamBlockFills() {
const endpoint = process.env.HYPERLIQUID_ENDPOINT;
const apiKey = process.env.API_KEY;
if (!endpoint) {
console.error('Error: HYPERLIQUID_ENDPOINT environment variable is required.');
console.error('Please create a .env file from .env.example and set your endpoint.');
process.exit(1);
}
if (!apiKey) {
console.error('Error: API_KEY environment variable is required.');
console.error('Please set your API key in the .env file.');
process.exit(1);
}
console.log('🚀 Hyperliquid Node.js gRPC Client - Stream Block Fills');
console.log('======================================================');
console.log(`📡 Endpoint: ${endpoint}\n`);
// Create metadata with API key
const metadata = new grpc.Metadata();
metadata.add('x-api-key', apiKey);
// Create client with SSL credentials and 150MB message size limit
const client = new proto.hyperliquid_l1_gateway.v1.HyperLiquidL1Gateway(
endpoint,
grpc.credentials.createSsl(),
{
'grpc.max_receive_message_length': 150 * 1024 * 1024
}
);
console.log('📥 Starting block fills stream...');
console.log('Press Ctrl+C to stop streaming\n');
// Make the gRPC call
const stream = client.StreamBlockFills({ timestamp: 0 }, metadata);
let blockFillsCount = 0;
stream.on('data', (data) => {
blockFillsCount++;
try {
const blockFills = JSON.parse(data.data);
console.log(`\n===== BLOCK FILLS #${blockFillsCount} =====`);
console.log(`📦 Response size: ${data.data.length} bytes`);
// Process each block fills response
processBlockFills(blockFills, blockFillsCount);
console.log('\n' + '─'.repeat(50));
} catch (error) {
console.error(`❌ Failed to parse message #${blockFillsCount}:`, error.message);
}
});
stream.on('error', (error) => {
console.error('❌ Stream error:', error.message);
});
stream.on('end', () => {
console.log('Stream ended');
console.log(`\n📊 Total block fills received: ${blockFillsCount}`);
});
}
function processBlockFills(blockFills, blockFillsNum) {
console.log(`💰 BLOCK FILLS #${blockFillsNum} DETAILS`);
console.log('========================');
// Display block height if available
if (blockFills.height) {
console.log(`📏 Block Height: ${blockFills.height}`);
}
// Display timestamp
if (blockFills.time) {
let timestamp = blockFills.time;
// Convert from milliseconds to seconds if needed
if (timestamp > 10**10) { // Likely milliseconds
timestamp = timestamp / 1000;
}
const dt = new Date(timestamp * 1000);
console.log(`⏰ Time: ${dt.toISOString().replace('T', ' ').substring(0, 19)} UTC`);
}
// Display fills data
if (blockFills.fills && Array.isArray(blockFills.fills)) {
const fills = blockFills.fills;
console.log(`📋 Total Fills: ${fills.length}`);
// Show first few fill details
const maxFills = Math.min(3, fills.length);
for (let i = 0; i < maxFills; i++) {
const fill = fills[i];
let fillInfo = ` • FILL ${i + 1}: `;
if (typeof fill === 'object') {
if (fill.symbol) fillInfo += `Symbol: ${fill.symbol}`;
if (fill.side) fillInfo += `, Side: ${fill.side}`;
if (fill.price) fillInfo += `, Price: ${fill.price}`;
if (fill.size) fillInfo += `, Size: ${fill.size}`;
if (fill.hash) fillInfo += `, Hash: ${fill.hash.substring(0, 12)}...`;
} else {
fillInfo += JSON.stringify(fill);
}
console.log(fillInfo);
}
if (fills.length > maxFills) {
console.log(` ... and ${fills.length - maxFills} more fills`);
}
}
// Display any other interesting fields
console.log('\n📊 Block Fills Summary:');
for (const [key, value] of Object.entries(blockFills)) {
if (['height', 'time', 'fills'].includes(key)) {
// Already displayed above
continue;
}
// Display other fields
if (typeof value === 'object') {
const jsonStr = JSON.stringify(value);
console.log(`• ${key}: ${jsonStr.substring(0, 100)}...`);
} else {
console.log(`• ${key}: ${value}`);
}
}
}
// Run it
streamBlockFills();
Common Use Cases#
1. Portfolio Position Tracking#
class PositionTracker:
def __init__(self, analyzer, user_address):
self.analyzer = analyzer
self.user_address = user_address
self.positions = defaultdict(float) # coin -> net position
self.realized_pnl = defaultdict(float) # coin -> total realized PnL
analyzer.add_fill_handler(self.track_position)
def track_position(self, fill: Fill):
if fill.user_address != self.user_address:
return
coin = fill.coin
size = float(fill.size)
# Track position changes based on direction
if fill.direction == 'Open Long':
self.positions[coin] += size
elif fill.direction == 'Open Short':
self.positions[coin] -= size
elif fill.direction == 'Close Long':
self.positions[coin] -= size
elif fill.direction == 'Close Short':
self.positions[coin] += size
# Track realized PnL
if fill.closed_pnl and fill.closed_pnl != '0':
self.realized_pnl[coin] += float(fill.closed_pnl)
print(f"Position update: {coin} {self.positions[coin]:+.6f} | Direction: {fill.direction} | PnL: {fill.closed_pnl}")
def get_current_positions(self):
return dict(self.positions)
def get_realized_pnl(self):
return dict(self.realized_pnl)
2. Execution Cost Analysis#
class ExecutionCostAnalyzer {
constructor(processor) {
this.processor = processor;
this.executionData = new Map(); // orderId -> execution info
processor.on('fill', (fill) => {
this.analyzeExecution(fill);
});
}
analyzeExecution(fill) {
if (!this.executionData.has(fill.orderId)) {
this.executionData.set(fill.orderId, {
fills: [],
totalFilled: 0,
weightedPrice: 0,
totalFees: 0
});
}
const execution = this.executionData.get(fill.orderId);
execution.fills.push(fill);
execution.totalFilled += fill.filledSize;
execution.totalFees += fill.fee;
// Calculate weighted average price
const totalValue = execution.fills.reduce((sum, f) => sum + (f.filledPrice * f.filledSize), 0);
execution.weightedPrice = totalValue / execution.totalFilled;
// Calculate execution cost
const feeRate = execution.totalFees / (execution.totalFilled * execution.weightedPrice);
if (execution.fills.length === 1) {
console.log(`✅ Complete fill: ${fill.symbol} ${fill.filledSize.toFixed(6)} @ ${fill.filledPrice.toFixed(6)}`);
} else {
console.log(`📊 Partial fill ${execution.fills.length}: ${fill.symbol} VWAP: ${execution.weightedPrice.toFixed(6)}, Fee rate: ${(feeRate * 100).toFixed(3)}%`);
}
}
}
3. Settlement Reconciliation#
type SettlementReconciler struct {
processor *FillStreamProcessor
expectedFills map[string]Fill // orderId -> expected fill
settledFills map[string]Fill // orderId -> actual fill
discrepancies []Discrepancy
}
type Discrepancy struct {
OrderID string
ExpectedSize float64
ActualSize float64
ExpectedPrice float64
ActualPrice float64
Difference float64
Timestamp time.Time
}
func (sr *SettlementReconciler) CheckSettlement(fill *Fill) {
expected, exists := sr.expectedFills[fill.OrderID]
if !exists {
log.Printf("⚠️ Unexpected fill: %s", fill.OrderID)
return
}
// Check for discrepancies
sizeDiff := abs(expected.FilledSize - fill.FilledSize)
priceDiff := abs(expected.FilledPrice - fill.FilledPrice)
if sizeDiff > 0.000001 || priceDiff > 0.000001 {
discrepancy := Discrepancy{
OrderID: fill.OrderID,
ExpectedSize: expected.FilledSize,
ActualSize: fill.FilledSize,
ExpectedPrice: expected.FilledPrice,
ActualPrice: fill.FilledPrice,
Difference: sizeDiff + priceDiff,
Timestamp: time.Now(),
}
sr.discrepancies = append(sr.discrepancies, discrepancy)
log.Printf("🔍 Settlement discrepancy detected: %+v", discrepancy)
} else {
log.Printf("✅ Settlement confirmed: %s", fill.OrderID)
}
sr.settledFills[fill.OrderID] = *fill
delete(sr.expectedFills, fill.OrderID)
}
4. Fee Optimization Tracking#
class FeeOptimizationTracker:
def __init__(self, analyzer):
self.analyzer = analyzer
self.fee_tiers = {} # user -> current fee tier
self.volume_thresholds = [1000, 5000, 10000, 50000] # Example thresholds
self.fee_rates = [0.1, 0.08, 0.06, 0.04, 0.02] # Corresponding fee rates (%)
analyzer.add_fill_handler(self.track_fees)
def track_fees(self, fill: Fill):
user = fill.user_address
# Get current user stats
user_stats = self.analyzer.get_user_analytics(user)
if not user_stats:
return
# Determine current fee tier based on volume
volume = user_stats['total_volume']
tier = 0
for i, threshold in enumerate(self.volume_thresholds):
if volume >= threshold:
tier = i + 1
else:
break
self.fee_tiers[user] = tier
# Calculate expected vs actual fee
expected_rate = self.fee_rates[tier] / 100
expected_fee = fill.filled_size * fill.filled_price * expected_rate
actual_fee = fill.fee
if abs(actual_fee - expected_fee) > 0.000001:
print(f"💰 Fee discrepancy for {user[:8]}...{user[-4:]}: "
f"Expected {expected_fee:.6f}, Actual {actual_fee:.6f}")
def get_fee_optimization_report(self):
report = {}
for user, tier in self.fee_tiers.items():
user_stats = self.analyzer.get_user_analytics(user)
if user_stats:
next_threshold = (self.volume_thresholds[tier]
if tier < len(self.volume_thresholds)
else float('inf'))
volume_to_next_tier = max(0, next_threshold - user_stats['total_volume'])
report[user] = {
'current_tier': tier,
'current_fee_rate': self.fee_rates[tier],
'volume_to_next_tier': volume_to_next_tier,
'potential_savings': self.calculate_potential_savings(user_stats, tier)
}
return report
def calculate_potential_savings(self, user_stats, current_tier):
if current_tier >= len(self.fee_rates) - 1:
return 0
current_rate = self.fee_rates[current_tier] / 100
next_rate = self.fee_rates[current_tier + 1] / 100
# Estimate savings on current volume if they upgraded tier
avg_trade_value = user_stats['total_volume'] * user_stats['avg_fill_price']
return avg_trade_value * (current_rate - next_rate)
Performance Monitoring#
1. Real-time Metrics Dashboard#
class FillMetricsDashboard {
constructor(processor) {
this.processor = processor;
this.metrics = {
fillsPerSecond: 0,
avgFillSize: 0,
avgExecutionPrice: 0,
totalFeesPerHour: 0,
uniqueUsersPerHour: new Set()
};
this.updateInterval = setInterval(() => {
this.updateMetrics();
}, 1000);
processor.on('fill', (fill) => {
this.trackRealtimeMetrics(fill);
});
}
trackRealtimeMetrics(fill) {
this.metrics.uniqueUsersPerHour.add(fill.userAddress);
// Update running averages
const currentTime = Date.now();
// Implementation for real-time metric calculation
}
updateMetrics() {
const now = Date.now();
const recentFills = this.processor.recentFills.filter(
f => f.timestamp * 1000 > now - 3600000 // Last hour
);
if (recentFills.length > 0) {
this.metrics.fillsPerSecond = recentFills.length / 3600;
this.metrics.avgFillSize = recentFills.reduce((sum, f) => sum + f.filledSize, 0) / recentFills.length;
this.metrics.totalFeesPerHour = recentFills.reduce((sum, f) => sum + f.fee, 0);
}
// Console dashboard
console.clear();
console.log('🖥️ Fill Stream Dashboard');
console.log('========================');
console.log(`Fills/sec: ${this.metrics.fillsPerSecond.toFixed(2)}`);
console.log(`Avg fill size: ${this.metrics.avgFillSize.toFixed(4)}`);
console.log(`Hourly fees: ${this.metrics.totalFeesPerHour.toFixed(6)}`);
console.log(`Active users: ${this.metrics.uniqueUsersPerHour.size}`);
console.log(`Total processed: ${this.processor.globalStats.totalFills}`);
}
stop() {
if (this.updateInterval) {
clearInterval(this.updateInterval);
}
}
}
Best Practices#
- Data Integrity: Validate fill data and handle missing or malformed fields gracefully
- Position Tracking: Maintain accurate position tracking for portfolio management
- Settlement Verification: Reconcile expected fills with actual executions
- Performance Monitoring: Track execution quality and fee optimization
- Error Handling: Implement robust error handling for stream interruptions
- Memory Management: Use bounded collections to prevent memory leaks in long-running processes
Current Limitations#
- Historical Data: Cannot stream from historical timestamps; only real-time data available
- Data Retention: Node maintains only 24 hours of historical fill data
- Order Lifecycle: Fill data is independent; order placement/cancellation requires separate tracking
- Settlement Finality: Fills represent execution but final settlement may have additional confirmation requirements
Need help? Contact our support team or check the Hyperliquid gRPC documentation.