StreamFills

Stream continuous fill data starting from a position, providing real-time access to order executions on Hyperliquid.

Full Code Examples

Clone our gRPC Code Examples Repository for complete, runnable implementations. See the copy-trading-bot for a production-ready example using StreamFills.

When to Use This Method

StreamFills is essential for:

  • Trade Monitoring - Track order executions in real-time
  • Position Management - Monitor fills for active trading strategies
  • Settlement Tracking - Verify order completions and partial fills
  • Analytics & Reporting - Collect comprehensive trade execution data

Method Signature

rpc StreamFills(Position) returns (stream BlockFills) {}

Request Message

message Position {
  // Leave all fields unset or zero to target the latest data.
  oneof position {
    int64 timestamp_ms = 1; // ms since Unix epoch, inclusive
    int64 block_height = 2; // block height, inclusive
  }
}

The Position message allows flexible stream starting points:

  • timestamp_ms: Start streaming from a specific time (milliseconds since Unix epoch)
  • block_height: Start streaming from a specific block height
  • Empty/zero: Start streaming from the latest fills
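The three starting points can be sketched as a small helper that decides which Position field to set. This is a minimal illustration, not part of the API: `to_timestamp_ms` and `position_fields` are hypothetical names, and the returned dict is assumed to be passed as keyword arguments to whatever Position class your generated stubs provide.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("datetime must be timezone-aware")
    # Integer division of timedeltas avoids float rounding errors.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def position_fields(start_time: Optional[datetime] = None,
                    block_height: Optional[int] = None) -> dict:
    """Return the Position fields to set; an empty dict means 'start from latest'."""
    if start_time is not None and block_height is not None:
        # `position` is a oneof, so only one of the two fields may be set.
        raise ValueError("set timestamp_ms or block_height, not both")
    if start_time is not None:
        return {"timestamp_ms": to_timestamp_ms(start_time)}
    if block_height is not None:
        return {"block_height": block_height}
    return {}


print(position_fields())                        # latest fills
print(position_fields(block_height=676607012))  # from a block height
print(position_fields(
    start_time=datetime(2025, 7, 27, 8, 50, 10, 273000, tzinfo=timezone.utc)))
```

Rejecting a naive datetime up front avoids silently interpreting local time as UTC.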

Response Stream

message BlockFills {
  // JSON-encoded object from "node_fills" or "node_fills_by_block".
  bytes data = 1;
}

The data field contains a JSON-encoded fills object with:

  • Fill execution details (price, size, side)
  • Order identifiers (order ID, client order ID)
  • User address for each fill
  • Timestamp and block reference

Full Fill Spec

StreamFills emits a JSON payload matching Hyperliquid's node_fills format. Below is the exact structure.

Top-level keys (always present):

{
  "local_time": "2025-07-27T08:50:10.334741319", // ISO-8601 timestamp when node processed fill (nanosecond precision)
  "block_time": "2025-07-27T08:50:10.273720809", // ISO-8601 timestamp from block consensus
  "block_number": 676607012, // Block height containing this fill
  "events": [ // Array of [address, fill_data] pairs
    [
      "0x7839e2f2c375dd2935193f2736167514efff9916", // User address (40 hex chars, lowercase)
      {
        "coin": "BTC", // Trading pair symbol
        "px": "118136.0", // Fill price (string)
        "sz": "0.00009", // Fill size (string)
        "side": "B", // "B" (buy) or "A" (sell/ask)
        "time": 1753606210273, // Fill timestamp (ms since Unix epoch)
        "startPosition": "-1.41864", // Position size before fill (string)
        "dir": "Close Short", // Direction: "Open Long" | "Open Short" | "Close Long" | "Close Short"
        "closedPnl": "-0.003753", // Realized PnL from closing position (string, can be negative)
        "hash": "0xe7822040155eaa2e737e042854342401120052bbf063906ce8c8f3babe853a79", // Transaction hash (64 hex)
        "oid": 121670079265, // Order ID (numeric)
        "crossed": false, // Whether order crossed the spread
        "fee": "-0.000212", // Trading fee (string, negative = rebate)
        "tid": 161270588369408, // Trade ID (unique identifier)
        "cloid": "0x09367b9f8541c581f95b02aaf05f1508", // Client order ID (optional, 32 hex)
        "feeToken": "USDC", // Token used for fee payment
        "builder": "0x49ae63056b3a0be0b166813ee687309ab653c07c", // Builder address (optional)
        "builderFee": "0.005528" // Builder fee amount (optional, string)
      }
    ]
    // ... more [address, fill_data] pairs
  ]
}

Fill event entry (events[i]):

[
  "0x...", // user_address - the trader's address
  {
    // Fill details object
  }
]
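As a minimal sketch of how a consumer might unpack this structure, the snippet below decodes one BlockFills data payload and walks its [address, fill_data] pairs. The helper name `iter_fills` is hypothetical, and the sample payload is trimmed from the spec above; it assumes the bytes on the wire are plain JSON without the explanatory comments shown in the spec.

```python
import json


def iter_fills(data: bytes):
    """Yield (block_number, user_address, fill) for each event in one payload."""
    payload = json.loads(data)  # json.loads accepts UTF-8 bytes directly
    block_number = payload["block_number"]
    # Each event is a two-element array: [user_address, fill_details].
    for user_address, fill in payload.get("events", []):
        yield block_number, user_address, fill


# Sample payload trimmed from the spec; real payloads carry every fill field.
sample = json.dumps({
    "local_time": "2025-07-27T08:50:10.334741319",
    "block_time": "2025-07-27T08:50:10.273720809",
    "block_number": 676607012,
    "events": [
        ["0x7839e2f2c375dd2935193f2736167514efff9916",
         {"coin": "BTC", "px": "118136.0", "sz": "0.00009", "side": "B",
          "oid": 121670079265, "tid": 161270588369408}],
    ],
}).encode()

for block_number, user, fill in iter_fills(sample):
    print(block_number, user, fill["coin"], fill["side"], fill["px"], fill["sz"])
```

Yielding the block number with every fill keeps each record self-describing once it leaves the per-block loop.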

Fill details fields:

Field | Type | Description
coin | string | Trading pair symbol (e.g., "BTC", "ETH", "SOL")
px | string | Fill price
sz | string | Fill size
side | string | "B" for buy, "A" for sell
time | number | Fill timestamp in milliseconds since Unix epoch
startPosition | string | Position size before this fill
dir | string | Position direction: "Open Long", "Open Short", "Close Long", "Close Short"
closedPnl | string | Realized PnL if closing a position (can be negative)
hash | string | Transaction hash (64 hex characters)
oid | number | Order ID assigned by the system
crossed | boolean | Whether the order crossed the spread (taker vs maker)
fee | string | Trading fee (negative values indicate rebates)
tid | number | Unique trade identifier
cloid | string | Client order ID if provided (optional)
feeToken | string | Token used for fee payment
builder | string | Builder address if order was routed through a builder (optional)
builderFee | string | Fee paid to builder (optional)
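One way to work with these fields is to map each fill into a typed record, converting the string-encoded numbers to Decimal and leaving optional fields as None when absent. The `Fill` dataclass and `parse_fill` function below are hypothetical names for illustration; the field list follows the table above.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional


@dataclass(frozen=True)
class Fill:
    user: str
    coin: str
    px: Decimal
    sz: Decimal
    side: str
    time_ms: int
    start_position: Decimal
    direction: str
    closed_pnl: Decimal
    tx_hash: str
    oid: int
    crossed: bool
    fee: Decimal
    tid: int
    fee_token: str
    cloid: Optional[str] = None          # optional in the payload
    builder: Optional[str] = None        # optional in the payload
    builder_fee: Optional[Decimal] = None  # optional in the payload


def parse_fill(user: str, raw: dict) -> Fill:
    """Build a Fill from one [address, fill_data] pair."""
    builder_fee = raw.get("builderFee")
    return Fill(
        user=user, coin=raw["coin"],
        px=Decimal(raw["px"]), sz=Decimal(raw["sz"]),
        side=raw["side"], time_ms=raw["time"],
        start_position=Decimal(raw["startPosition"]),
        direction=raw["dir"], closed_pnl=Decimal(raw["closedPnl"]),
        tx_hash=raw["hash"], oid=raw["oid"], crossed=raw["crossed"],
        fee=Decimal(raw["fee"]), tid=raw["tid"], fee_token=raw["feeToken"],
        cloid=raw.get("cloid"), builder=raw.get("builder"),
        builder_fee=Decimal(builder_fee) if builder_fee is not None else None,
    )


raw = {
    "coin": "BTC", "px": "118136.0", "sz": "0.00009", "side": "B",
    "time": 1753606210273, "startPosition": "-1.41864", "dir": "Close Short",
    "closedPnl": "-0.003753",
    "hash": "0xe7822040155eaa2e737e042854342401120052bbf063906ce8c8f3babe853a79",
    "oid": 121670079265, "crossed": False, "fee": "-0.000212",
    "tid": 161270588369408, "feeToken": "USDC",
}
fill = parse_fill("0x7839e2f2c375dd2935193f2736167514efff9916", raw)
print(fill.coin, fill.px * fill.sz, fill.cloid)
```

Decimal is used instead of float so that prices, sizes, and fees keep the exact values sent in the payload.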

Guarantees and alignment:

  • events array contains all fills from the block for all users.
  • Each event pairs a user address with their fill details.
  • block_number corresponds to abci_block.round in StreamBlocks.
  • block_time aligns with abci_block.time in StreamBlocks.
  • Multiple fills per block are delivered in a single message.

Developer tips:

  • Use tid as the unique identifier for deduplication.
  • Track startPosition and dir to reconstruct position changes.
  • Sum closedPnl across fills to calculate realized PnL.
  • Normalize px, sz, fee, and closedPnl (strings) to numeric types as appropriate.
  • Handle optional fields (cloid, builder, builderFee) defensively.
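The tips above can be sketched together in a small tracker. `FillTracker` and `position_after` are hypothetical names. As a conservative assumption, the deduplication key is (user, tid) rather than tid alone, in case both sides of a trade report the same tid; adjust this if tid is unique per fill in your data. The seen-set is unbounded here and would need pruning in a long-running process.

```python
from collections import defaultdict
from decimal import Decimal


def position_after(fill: dict) -> Decimal:
    """Position size after the fill: startPosition plus or minus sz, by side."""
    size = Decimal(fill["sz"])
    signed = size if fill["side"] == "B" else -size
    return Decimal(fill["startPosition"]) + signed


class FillTracker:
    def __init__(self):
        self.seen = set()                     # (user, tid) keys already applied
        self.realized = defaultdict(Decimal)  # (user, coin) -> summed closedPnl

    def apply(self, user: str, fill: dict) -> bool:
        """Apply one fill; return False if it was a duplicate."""
        key = (user, fill["tid"])
        if key in self.seen:
            return False
        self.seen.add(key)
        self.realized[(user, fill["coin"])] += Decimal(fill["closedPnl"])
        return True


user = "0x7839e2f2c375dd2935193f2736167514efff9916"
fill = {"coin": "BTC", "sz": "0.00009", "side": "B", "tid": 161270588369408,
        "startPosition": "-1.41864", "closedPnl": "-0.003753"}

tracker = FillTracker()
print(tracker.apply(user, fill))   # first delivery is applied
print(tracker.apply(user, fill))   # redelivery is ignored
print(tracker.realized[(user, "BTC")], position_after(fill))
```

For the sample fill, a buy of 0.00009 against a short of -1.41864 leaves -1.41855, which matches its "Close Short" direction.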

Implementation Examples

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strings"

	pb "hyperliquid-grpc-client/api/v2"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
)

func streamFills() {
	endpoint := os.Getenv("HYPERLIQUID_ENDPOINT")
	apiKey := os.Getenv("API_KEY")

	fmt.Println("🚀 Hyperliquid Go gRPC Client - Stream Fills")
	fmt.Println("=============================================")
	fmt.Printf("📡 Endpoint: %s\n\n", endpoint)

	// Set up TLS connection
	creds := credentials.NewTLS(nil)

	// Connection options
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(creds),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(150 * 1024 * 1024), // 150MB max
		),
	}

	// Connect to server
	fmt.Println("🔌 Connecting to gRPC server...")
	conn, err := grpc.NewClient(endpoint, opts...)
	if err != nil {
		log.Fatalf("❌ Failed to connect: %v", err)
	}
	defer conn.Close()

	// Create the client using generated code
	client := pb.NewHyperliquidL1GatewayClient(conn)
	fmt.Println("✅ Connected successfully!\n")

	// Create context with API key
	ctx := metadata.AppendToOutgoingContext(context.Background(), "x-api-key", apiKey)

	// Create request - empty Position means latest/current fills
	request := &pb.Position{}

	fmt.Println("📥 Starting fills stream...")
	fmt.Println("Press Ctrl+C to stop streaming\n")

	// Start streaming fills
	stream, err := client.StreamFills(ctx, request)
	if err != nil {
		log.Fatalf("❌ Failed to start stream: %v", err)
	}

	fillCount := 0
	for {
		response, err := stream.Recv()
		if err != nil {
			fmt.Printf("❌ Stream ended: %v\n", err)
			break
		}

		fillCount++
		fmt.Printf("\n===== FILLS #%d =====\n", fillCount)
		fmt.Printf("📦 Response size: %d bytes\n", len(response.Data))

		// Process each fill
		processFills(response.Data, fillCount)

		fmt.Println("\n" + strings.Repeat("─", 50))
	}

	fmt.Printf("\n📊 Total fill batches received: %d\n", fillCount)
}

func processFills(data []byte, batchNum int) {
	// Parse JSON
	var fills interface{}
	if err := json.Unmarshal(data, &fills); err != nil {
		fmt.Printf("❌ Failed to parse JSON: %v\n", err)
		fmt.Printf("Raw data (first 200 bytes): %.200s\n", data)
		return
	}

	fmt.Printf("🔄 FILLS BATCH #%d DETAILS\n", batchNum)
	fmt.Println("=========================")

	// Pretty print the fills data
	prettyJSON, err := json.MarshalIndent(fills, "", " ")
	if err != nil {
		fmt.Printf("Data: %v\n", fills)
	} else {
		// Limit output to first 500 chars for readability
		output := string(prettyJSON)
		if len(output) > 500 {
			output = output[:500] + "..."
		}
		fmt.Printf("%s\n", output)
	}
}

func main() {
	streamFills()
}

Common Use Cases

1. Trade Execution Tracker

class TradeExecutionTracker {
  constructor(streamManager) {
    this.streamManager = streamManager;
    this.executedTrades = new Map();

    streamManager.on('fill', (fillData) => {
      this.processFill(fillData);
    });
  }

  processFill(fillData) {
    // Track each fill by order ID
    const orderId = fillData.oid;

    if (!this.executedTrades.has(orderId)) {
      this.executedTrades.set(orderId, {
        orderId,
        fills: [],
        totalFilled: 0,
        avgPrice: 0
      });
    }

    const trade = this.executedTrades.get(orderId);
    trade.fills.push(fillData);
    trade.totalFilled += parseFloat(fillData.sz);

    // Recalculate average price
    const totalValue = trade.fills.reduce(
      (sum, f) => sum + parseFloat(f.px) * parseFloat(f.sz), 0
    );
    trade.avgPrice = totalValue / trade.totalFilled;

    console.log(`Order ${orderId}: Filled ${trade.totalFilled} @ avg ${trade.avgPrice}`);
  }
}

2. Fill Rate Monitor

from datetime import datetime


class FillRateMonitor:
    def __init__(self):
        self.fills_per_minute = []
        self.current_minute_fills = 0
        self.last_minute = None

    def record_fill(self, fill_data):
        """Record a fill and track rates"""
        current_minute = datetime.now().replace(second=0, microsecond=0)

        if self.last_minute != current_minute:
            if self.last_minute is not None:
                self.fills_per_minute.append({
                    'minute': self.last_minute,
                    'count': self.current_minute_fills
                })
            self.last_minute = current_minute
            self.current_minute_fills = 0

        self.current_minute_fills += 1

    def get_average_rate(self, minutes=5):
        """Get average fills per minute over last N minutes"""
        recent = self.fills_per_minute[-minutes:]
        if not recent:
            return 0
        return sum(r['count'] for r in recent) / len(recent)

3. Position Reconciliation

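// Requires "strconv" in addition to the imports used in the example above.

type PositionReconciler struct {
	client    pb.HyperliquidL1GatewayClient
	positions map[string]float64 // net size per coin; must be initialized with make()
}

// blockFills mirrors the top-level payload; each event is an [address, fill] pair.
type blockFills struct {
	Events [][2]json.RawMessage `json:"events"`
}

// fillDetails holds the fields needed here. Note that sz is a JSON string.
type fillDetails struct {
	Coin string `json:"coin"`
	Sz   string `json:"sz"`
	Side string `json:"side"`
}

func (pr *PositionReconciler) ReconcileFills(ctx context.Context) {
	stream, err := pr.client.StreamFills(ctx, &pb.Position{})
	if err != nil {
		log.Fatal(err)
	}

	for {
		fill, err := stream.Recv()
		if err != nil {
			log.Printf("Stream error: %v", err)
			return
		}

		pr.updatePosition(fill.Data)
	}
}

func (pr *PositionReconciler) updatePosition(data []byte) {
	var block blockFills
	if err := json.Unmarshal(data, &block); err != nil {
		log.Printf("Failed to parse fills payload: %v", err)
		return
	}

	for _, event := range block.Events {
		// event[0] is the user address; filter on it to track a single account.
		var fill fillDetails
		if err := json.Unmarshal(event[1], &fill); err != nil {
			log.Printf("Failed to parse fill: %v", err)
			continue
		}

		size, err := strconv.ParseFloat(fill.Sz, 64)
		if err != nil {
			log.Printf("Invalid size %q: %v", fill.Sz, err)
			continue
		}

		if fill.Side == "B" {
			pr.positions[fill.Coin] += size
		} else {
			pr.positions[fill.Coin] -= size
		}

		log.Printf("Position %s: %f", fill.Coin, pr.positions[fill.Coin])
	}
}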

Error Handling and Reconnection

class RobustFillStreamer {
  constructor(endpoint) {
    this.endpoint = endpoint;
    this.maxRetries = 5;
    this.retryDelay = 1000;
    this.currentRetries = 0;
  }

  // startStream() is not defined in this snippet. Supply it (for example in a
  // subclass) so that it resolves when the stream ends and rejects on error.
  async startStreamWithRetry() {
    while (this.currentRetries < this.maxRetries) {
      try {
        await this.startStream();
        // The stream ended without an error: reset the backoff state and reconnect.
        this.currentRetries = 0;
        this.retryDelay = 1000;
      } catch (error) {
        this.currentRetries++;
        console.error(`Stream attempt ${this.currentRetries} failed:`, error.message);

        if (this.currentRetries >= this.maxRetries) {
          throw new Error('Max retry attempts exceeded');
        }

        // Exponential backoff
        await this.sleep(this.retryDelay);
        this.retryDelay *= 2;
      }
    }
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
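If the retry limit is raised, plain doubling can produce very long waits. The sketch below computes a capped exponential schedule, with optional randomization ("full jitter") so that many clients do not reconnect at the same instant. `backoff_delays` and its parameters are hypothetical and not part of any client library.

```python
import random


def backoff_delays(base_ms=1000, factor=2.0, cap_ms=30_000, max_retries=5,
                   jitter=False, rng=random.random):
    """Yield one delay in milliseconds per retry attempt.

    Without jitter the delay is min(cap_ms, base_ms * factor ** attempt).
    With jitter each delay is drawn uniformly from [0, that value).
    """
    for attempt in range(max_retries):
        capped = min(cap_ms, base_ms * factor ** attempt)
        yield rng() * capped if jitter else capped


print(list(backoff_delays(max_retries=6)))
```

With the defaults and six retries, the delays grow from 1 second to 16 seconds and the sixth is held at the 30-second cap.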

Best Practices

  1. Connection Management: Implement robust reconnection logic with exponential backoff
  2. Memory Management: Use bounded collections for storing recent fills to prevent memory leaks
  3. Performance: Process fills asynchronously to avoid blocking the stream
  4. Monitoring: Track stream health and fill rates
  5. Error Recovery: Handle various error types (network, parsing, processing) gracefully
  6. Resource Cleanup: Properly close streams and connections on shutdown
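For the memory-management point, a bounded collection can be as simple as a fixed-size deque that discards the oldest fill when full. `RecentFills` is a hypothetical helper, and the default capacity is an arbitrary placeholder to tune for your workload.

```python
from collections import deque


class RecentFills:
    """Keep only the most recent `capacity` fills in memory."""

    def __init__(self, capacity: int = 10_000):
        # deque(maxlen=...) drops the oldest item automatically when full.
        self._fills = deque(maxlen=capacity)

    def add(self, fill: dict) -> None:
        self._fills.append(fill)

    def latest(self, n: int) -> list:
        """Return up to n of the newest fills, oldest first."""
        if n <= 0:
            return []
        return list(self._fills)[-n:]

    def __len__(self) -> int:
        return len(self._fills)


recent = RecentFills(capacity=3)
for tid in range(5):
    recent.add({"tid": tid})
print(len(recent), [f["tid"] for f in recent.latest(2)])
```

After five inserts into a buffer of three, only the fills with tid 2, 3, and 4 remain.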

Current Limitations

  • Historical Data: Cannot stream from historical timestamps; only real-time streaming available
  • Data Retention: Node maintains only 24 hours of historical fill data
  • Backpressure: High-volume periods may require careful handling to avoid overwhelming downstream systems
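One possible way to handle backpressure is a bounded queue between the stream reader and a worker thread, so slow downstream processing never blocks the receive loop. The names `start_worker`, `offer`, and `stop_worker` are hypothetical. This sketch rejects new fills when the queue is full; whether to drop, block, or spill to disk is a policy choice that depends on your application.

```python
import queue
import threading

_STOP = object()  # sentinel that tells the worker to exit


def start_worker(handler, maxsize=1000):
    """Start a worker thread that passes queued items to handler."""
    fills = queue.Queue(maxsize=maxsize)

    def run():
        while True:
            item = fills.get()
            if item is _STOP:
                return
            handler(item)

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    return fills, worker


def offer(fills, item) -> bool:
    """Enqueue without blocking; return False if the queue is full."""
    try:
        fills.put_nowait(item)
        return True
    except queue.Full:
        return False


def stop_worker(fills, worker):
    fills.put(_STOP)  # blocks until there is room, then signals shutdown
    worker.join()


processed = []
fills, worker = start_worker(processed.append, maxsize=100)
for tid in range(5):
    offer(fills, {"tid": tid})
stop_worker(fills, worker)
print(len(processed))
```

Counting the False results from `offer` gives a simple signal that downstream processing is falling behind.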

Resources

Need help? Contact our support team or check the Hyperliquid gRPC documentation.