Skip to main content

StreamFills

Stream continuous fill data starting from a position, providing real-time access to order executions on Hyperliquid.

Full Code Examples

Clone our gRPC Code Examples Repository for complete, runnable implementations. See the copy-trading-bot for a production-ready example using StreamFills.

When to Use This Method#

StreamFills is essential for:

  • Trade Monitoring - Track order executions in real-time
  • Position Management - Monitor fills for active trading strategies
  • Settlement Tracking - Verify order completions and partial fills
  • Analytics & Reporting - Collect comprehensive trade execution data

Method Signature#

rpc StreamFills(Position) returns (stream BlockFills) {}

Request Message#

message Position {
// Leave all fields unset or zero to target the latest data.
oneof position {
int64 timestamp_ms = 1; // ms since Unix epoch, inclusive
int64 block_height = 2; // block height, inclusive
}
}

The Position message allows flexible stream starting points:

  • timestamp_ms: Start streaming from a specific time (milliseconds since Unix epoch)
  • block_height: Start streaming from a specific block height
  • Empty/zero: Start streaming from the latest fills

Response Stream#

message BlockFills {
// JSON-encoded object from "node_fills" or "node_fills_by_block".
bytes data = 1;
}

The data field contains a JSON-encoded fills object with:

  • Fill execution details (price, size, side)
  • Order identifiers (order ID, client order ID)
  • Counterparty information
  • Timestamp and block reference

Implementation Examples#

package main

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strings"

pb "hyperliquid-grpc-client/api/v2"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)

func streamFills() {
endpoint := os.Getenv("HYPERLIQUID_ENDPOINT")
apiKey := os.Getenv("API_KEY")

fmt.Println("🚀 Hyperliquid Go gRPC Client - Stream Fills")
fmt.Println("=============================================")
fmt.Printf("📡 Endpoint: %s\n\n", endpoint)

// Set up TLS connection
creds := credentials.NewTLS(nil)

// Connection options
opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(150 * 1024 * 1024), // 150MB max
),
}

// Connect to server
fmt.Println("🔌 Connecting to gRPC server...")
conn, err := grpc.NewClient(endpoint, opts...)
if err != nil {
log.Fatalf("❌ Failed to connect: %v", err)
}
defer conn.Close()

// Create the client using generated code
client := pb.NewHyperliquidL1GatewayClient(conn)
fmt.Println("✅ Connected successfully!\n")

// Create context with API key
ctx := metadata.AppendToOutgoingContext(context.Background(), "x-api-key", apiKey)

// Create request - empty Position means latest/current fills
request := &pb.Position{}

fmt.Println("📥 Starting fills stream...")
fmt.Println("Press Ctrl+C to stop streaming\n")

// Start streaming fills
stream, err := client.StreamFills(ctx, request)
if err != nil {
log.Fatalf("❌ Failed to start stream: %v", err)
}

fillCount := 0
for {
response, err := stream.Recv()
if err != nil {
fmt.Printf("❌ Stream ended: %v\n", err)
break
}

fillCount++
fmt.Printf("\n===== FILLS #%d =====\n", fillCount)
fmt.Printf("📦 Response size: %d bytes\n", len(response.Data))

// Process each fill
processFills(response.Data, fillCount)

fmt.Println("\n" + strings.Repeat("─", 50))
}

fmt.Printf("\n📊 Total fill batches received: %d\n", fillCount)
}

func processFills(data []byte, batchNum int) {
// Parse JSON
var fills interface{}
if err := json.Unmarshal(data, &fills); err != nil {
fmt.Printf("❌ Failed to parse JSON: %v\n", err)
fmt.Printf("Raw data (first 200 bytes): %.200s\n", data)
return
}

fmt.Printf("🔄 FILLS BATCH #%d DETAILS\n", batchNum)
fmt.Println("=========================")

// Pretty print the fills data
prettyJSON, err := json.MarshalIndent(fills, "", " ")
if err != nil {
fmt.Printf("Data: %v\n", fills)
} else {
// Limit output to first 500 chars for readability
output := string(prettyJSON)
if len(output) > 500 {
output = output[:500] + "..."
}
fmt.Printf("%s\n", output)
}
}

func main() {
streamFills()
}

Common Use Cases#

1. Trade Execution Tracker#

class TradeExecutionTracker {
constructor(streamManager) {
this.streamManager = streamManager;
this.executedTrades = new Map();

streamManager.on('fill', (fillData) => {
this.processFill(fillData);
});
}

processFill(fillData) {
// Track each fill by order ID
const orderId = fillData.oid;

if (!this.executedTrades.has(orderId)) {
this.executedTrades.set(orderId, {
orderId,
fills: [],
totalFilled: 0,
avgPrice: 0
});
}

const trade = this.executedTrades.get(orderId);
trade.fills.push(fillData);
trade.totalFilled += parseFloat(fillData.sz);

// Recalculate average price
const totalValue = trade.fills.reduce(
(sum, f) => sum + parseFloat(f.px) * parseFloat(f.sz), 0
);
trade.avgPrice = totalValue / trade.totalFilled;

console.log(`Order ${orderId}: Filled ${trade.totalFilled} @ avg ${trade.avgPrice}`);
}
}

2. Fill Rate Monitor#

class FillRateMonitor:
def __init__(self):
self.fills_per_minute = []
self.current_minute_fills = 0
self.last_minute = None

def record_fill(self, fill_data):
"""Record a fill and track rates"""
from datetime import datetime

current_minute = datetime.now().replace(second=0, microsecond=0)

if self.last_minute != current_minute:
if self.last_minute is not None:
self.fills_per_minute.append({
'minute': self.last_minute,
'count': self.current_minute_fills
})
self.last_minute = current_minute
self.current_minute_fills = 0

self.current_minute_fills += 1

def get_average_rate(self, minutes=5):
"""Get average fills per minute over last N minutes"""
recent = self.fills_per_minute[-minutes:]
if not recent:
return 0
return sum(r['count'] for r in recent) / len(recent)

3. Position Reconciliation#

type PositionReconciler struct {
client pb.HyperliquidL1GatewayClient
positions map[string]float64
}

func (pr *PositionReconciler) ReconcileFills(ctx context.Context) {
stream, err := pr.client.StreamFills(ctx, &pb.Position{})
if err != nil {
log.Fatal(err)
}

for {
fill, err := stream.Recv()
if err != nil {
log.Printf("Stream error: %v", err)
return
}

pr.updatePosition(fill.Data)
}
}

func (pr *PositionReconciler) updatePosition(data []byte) {
var fillData map[string]interface{}
json.Unmarshal(data, &fillData)

// Extract fill details and update position
if coin, ok := fillData["coin"].(string); ok {
size, _ := fillData["sz"].(float64)
side, _ := fillData["side"].(string)

if side == "B" {
pr.positions[coin] += size
} else {
pr.positions[coin] -= size
}

log.Printf("Position %s: %f", coin, pr.positions[coin])
}
}

Error Handling and Reconnection#

class RobustFillStreamer {
constructor(endpoint) {
this.endpoint = endpoint;
this.maxRetries = 5;
this.retryDelay = 1000;
this.currentRetries = 0;
}

async startStreamWithRetry() {
while (this.currentRetries < this.maxRetries) {
try {
await this.startStream();
this.currentRetries = 0;
this.retryDelay = 1000;
} catch (error) {
this.currentRetries++;
console.error(`Stream attempt ${this.currentRetries} failed:`, error.message);

if (this.currentRetries >= this.maxRetries) {
throw new Error('Max retry attempts exceeded');
}

// Exponential backoff
await this.sleep(this.retryDelay);
this.retryDelay *= 2;
}
}
}

sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}

Best Practices#

  1. Connection Management: Implement robust reconnection logic with exponential backoff
  2. Memory Management: Use bounded collections for storing recent fills to prevent memory leaks
  3. Performance: Process fills asynchronously to avoid blocking the stream
  4. Monitoring: Track stream health and fill rates
  5. Error Recovery: Handle various error types (network, parsing, processing) gracefully
  6. Resource Cleanup: Properly close streams and connections on shutdown

Current Limitations#

  • Historical Data: Cannot stream from historical timestamps; only real-time streaming available
  • Data Retention: Node maintains only 24 hours of historical fill data
  • Backpressure: High-volume periods may require careful handling to avoid overwhelming downstream systems

Resources#

Need help? Contact our support team or check the Hyperliquid gRPC documentation.