StreamFills - Real-time Fill Streaming

Stream fill data from the Hyperliquid L1 Gateway over gRPC, starting from a chosen position, to monitor order executions on Hyperliquid in real time.

Full Code Examples

Clone our gRPC Code Examples Repository for complete, runnable implementations. See the copy-trading-bot for a production-ready example using StreamFills.

When to Use This Method

StreamFills is essential for:

  • Trade Monitoring - Track order executions in real-time
  • Position Management - Monitor fills for active trading strategies
  • Settlement Tracking - Verify order completions and partial fills
  • Analytics & Reporting - Collect comprehensive trade execution data

Method Signature

protobuf
rpc StreamFills(Position) returns (stream BlockFills) {}

Request Message

protobuf
message Position {
  // Leave all fields unset or zero to target the latest data.
  oneof position {
    int64 timestamp_ms = 1; // ms since Unix epoch, inclusive
    int64 block_height = 2; // block height, inclusive
  }
}

The Position message allows flexible stream starting points:

  • timestamp_ms: Start streaming from a specific time (milliseconds since Unix epoch)
  • block_height: Start streaming from a specific block height
  • Empty/zero: Start streaming from the latest fills
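The three starting points can be sketched as a small helper that enforces the oneof rule before the request is built. This is an illustration only: `position_kwargs` is a hypothetical name, and the generated class is assumed to be `hyperliquid_pb2.Position`, as in the Python example further down. Whether the server serves a given historical start is subject to the Current Limitations section.

```python
from typing import Optional


def position_kwargs(timestamp_ms: Optional[int] = None,
                    block_height: Optional[int] = None) -> dict:
    """Build keyword arguments for the Position request (hypothetical helper).

    The two fields share a oneof, so at most one may be set.
    An empty dict means "start from the latest fills".
    """
    if timestamp_ms is not None and block_height is not None:
        raise ValueError("set timestamp_ms or block_height, not both")
    if timestamp_ms is not None:
        return {"timestamp_ms": int(timestamp_ms)}
    if block_height is not None:
        return {"block_height": int(block_height)}
    return {}


# With the generated stubs available, the request would be built as:
#   request = hyperliquid_pb2.Position(**position_kwargs(block_height=676607012))
print(position_kwargs())
print(position_kwargs(timestamp_ms=1753606210273))
print(position_kwargs(block_height=676607012))
```

Keeping the validation outside the protobuf class makes the "not both" rule explicit; setting a second oneof field on a protobuf message would otherwise silently clear the first.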

Response Stream

protobuf
message BlockFills {
  // JSON-encoded object from "node_fills" or "node_fills_by_block".
  bytes data = 1;
}

The data field contains a JSON-encoded fills object with:

  • Fill execution details (price, size, side)
  • Order identifiers (order ID, client order ID)
  • Counterparty information
  • Timestamp and block reference

Full Fill Spec

StreamFills emits a JSON payload matching Hyperliquid's node_fills format. Below is the exact structure.

Top-level keys (always present):

jsonc
{
  "local_time": "2025-07-27T08:50:10.334741319",   // ISO-8601 timestamp when node processed fill (nanosecond precision)
  "block_time": "2025-07-27T08:50:10.273720809",   // ISO-8601 timestamp from block consensus
  "block_number": 676607012,                        // Block height containing this fill
  "events": [                                       // Array of [address, fill_data] pairs
    [
      "0x7839e2f2c375dd2935193f2736167514efff9916",  // User address (40 hex chars, lowercase)
      {
        "coin": "BTC",                              // Trading pair symbol
        "px": "118136.0",                           // Fill price (string)
        "sz": "0.00009",                            // Fill size (string)
        "side": "B",                                // "B" (buy) or "A" (sell/ask)
        "time": 1753606210273,                      // Fill timestamp (ms since Unix epoch)
        "startPosition": "-1.41864",                // Position size before fill (string)
        "dir": "Close Short",                       // Direction: "Open Long" | "Open Short" | "Close Long" | "Close Short"
        "closedPnl": "-0.003753",                   // Realized PnL from closing position (string, can be negative)
        "hash": "0xe7822040155eaa2e737e042854342401120052bbf063906ce8c8f3babe853a79",  // Transaction hash (64 hex)
        "oid": 121670079265,                        // Order ID (numeric)
        "crossed": false,                           // Whether order crossed the spread
        "fee": "-0.000212",                         // Trading fee (string, negative = rebate)
        "tid": 161270588369408,                     // Trade ID (unique identifier)
        "cloid": "0x09367b9f8541c581f95b02aaf05f1508",  // Client order ID (optional, 32 hex)
        "feeToken": "USDC",                         // Token used for fee payment
        "builder": "0x49ae63056b3a0be0b166813ee687309ab653c07c",  // Builder address (optional)
        "builderFee": "0.005528"                    // Builder fee amount (optional, string)
      }
    ]
    // ... more [address, fill_data] pairs
  ]
}
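The two top-level timestamps carry nanosecond precision, which is more than Python's `datetime` can hold, so one option is to convert them to integer nanoseconds. The sketch below assumes the strings are UTC (they carry no offset; the sample's `time` value of 1753606210273 ms agrees with `block_time` when read as UTC). The helper names are hypothetical.

```python
from datetime import datetime, timezone


def to_epoch_ns(stamp: str) -> int:
    """Convert e.g. '2025-07-27T08:50:10.334741319' to ns since the Unix epoch.

    Assumes the timestamp is UTC, since the string carries no offset.
    """
    whole, _, frac = stamp.partition(".")
    dt = datetime.strptime(whole, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
    # Pad or cut the fractional part to exactly nine digits (nanoseconds).
    nanos = int(frac.ljust(9, "0")[:9]) if frac else 0
    return int(dt.timestamp()) * 1_000_000_000 + nanos


def node_latency_ms(message: dict) -> float:
    """Milliseconds between block consensus time and local node processing."""
    delta_ns = to_epoch_ns(message["local_time"]) - to_epoch_ns(message["block_time"])
    return delta_ns / 1_000_000


sample = {
    "local_time": "2025-07-27T08:50:10.334741319",
    "block_time": "2025-07-27T08:50:10.273720809",
}
print(f"node processed the block about {node_latency_ms(sample):.1f} ms after consensus")
```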

Fill event entry (events[i]):

jsonc
[
  "0x...",  // user_address - the trader's address
  {
    // Fill details object
  }
]
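Because each entry is a two-element array rather than an object, it has to be unpacked by position. A minimal sketch of walking the `events` array and optionally keeping only one user's fills follows; `iter_fills` is a hypothetical helper, and the second address in the sample payload is made up.

```python
import json
from typing import Iterator, Optional, Tuple


def iter_fills(raw: bytes, user: Optional[str] = None) -> Iterator[Tuple[str, dict]]:
    """Yield (address, fill) pairs from one BlockFills.data payload.

    If `user` is given, only that address's fills are yielded.
    """
    message = json.loads(raw)
    wanted = user.lower() if user else None
    for event in message.get("events", []):
        # Skip anything that is not a two-element [address, fill] pair.
        if not isinstance(event, list) or len(event) != 2:
            continue
        address, fill = event
        if wanted is not None and address.lower() != wanted:
            continue
        yield address, fill


payload = json.dumps({
    "block_number": 676607012,
    "events": [
        ["0x7839e2f2c375dd2935193f2736167514efff9916", {"coin": "BTC", "sz": "0.00009"}],
        # Made-up second entry, for illustration only.
        ["0x0000000000000000000000000000000000000001", {"coin": "ETH", "sz": "1.5"}],
    ],
}).encode("utf-8")

for address, fill in iter_fills(payload, user="0x7839E2F2C375DD2935193F2736167514EFFF9916"):
    print(address, fill["coin"], fill["sz"])
```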

Fill details fields:

| Field | Type | Description |
| --- | --- | --- |
| coin | string | Trading pair symbol (e.g., "BTC", "ETH", "SOL") |
| px | string | Fill price |
| sz | string | Fill size |
| side | string | "B" for buy, "A" for sell |
| time | number | Fill timestamp in milliseconds since Unix epoch |
| startPosition | string | Position size before this fill |
| dir | string | Position direction: "Open Long", "Open Short", "Close Long", "Close Short" |
| closedPnl | string | Realized PnL if closing a position (can be negative) |
| hash | string | Transaction hash (64 hex characters) |
| oid | number | Order ID assigned by the system |
| crossed | boolean | Whether the order crossed the spread (taker vs maker) |
| fee | string | Trading fee (negative values indicate rebates) |
| tid | number | Unique trade identifier |
| cloid | string | Client order ID if provided (optional) |
| feeToken | string | Token used for fee payment |
| builder | string | Builder address if order was routed through a builder (optional) |
| builderFee | string | Fee paid to builder (optional) |
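One way to work with these fields is to map them onto a typed record, converting the string-encoded numbers to `Decimal` so that prices and sizes are not rounded through binary floats. The `Fill` dataclass and `parse_fill` below are hypothetical names, and the sketch assumes every non-optional field is present, as the spec above states.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional


@dataclass(frozen=True)
class Fill:
    coin: str
    px: Decimal
    sz: Decimal
    side: str
    time: int
    start_position: Decimal
    direction: str
    closed_pnl: Decimal
    tx_hash: str
    oid: int
    crossed: bool
    fee: Decimal
    tid: int
    fee_token: str
    cloid: Optional[str] = None          # optional in the payload
    builder: Optional[str] = None        # optional in the payload
    builder_fee: Optional[Decimal] = None  # optional in the payload


def parse_fill(raw: dict) -> Fill:
    """Convert one fill details object into a typed record."""
    builder_fee = raw.get("builderFee")
    return Fill(
        coin=raw["coin"],
        px=Decimal(raw["px"]),
        sz=Decimal(raw["sz"]),
        side=raw["side"],
        time=int(raw["time"]),
        start_position=Decimal(raw["startPosition"]),
        direction=raw["dir"],
        closed_pnl=Decimal(raw["closedPnl"]),
        tx_hash=raw["hash"],
        oid=int(raw["oid"]),
        crossed=bool(raw["crossed"]),
        fee=Decimal(raw["fee"]),
        tid=int(raw["tid"]),
        fee_token=raw["feeToken"],
        cloid=raw.get("cloid"),
        builder=raw.get("builder"),
        builder_fee=Decimal(builder_fee) if builder_fee is not None else None,
    )


# Values taken from the sample payload above, with the optional fields left out.
fill = parse_fill({
    "coin": "BTC", "px": "118136.0", "sz": "0.00009", "side": "B",
    "time": 1753606210273, "startPosition": "-1.41864", "dir": "Close Short",
    "closedPnl": "-0.003753",
    "hash": "0xe7822040155eaa2e737e042854342401120052bbf063906ce8c8f3babe853a79",
    "oid": 121670079265, "crossed": False, "fee": "-0.000212",
    "tid": 161270588369408, "feeToken": "USDC",
})
print(fill.coin, fill.side, "notional:", fill.px * fill.sz)
```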

Guarantees and alignment:

  • events array contains all fills from the block for all users.
  • Each event pairs a user address with their fill details.
  • block_number corresponds to abci_block.round in StreamBlocks.
  • block_time aligns with abci_block.time in StreamBlocks.
  • Multiple fills per block are delivered in a single message.

Developer tips:

  • Use tid as the unique identifier for deduplication.
  • Track startPosition and dir to reconstruct position changes.
  • Sum closedPnl across fills to calculate realized PnL.
  • Normalize px, sz, fee, and closedPnl (strings) to numeric types as appropriate.
  • Handle optional fields (cloid, builder, builderFee) defensively.
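The deduplication, PnL, and position tips can be sketched together as a small ledger. Two assumptions are made here and are not confirmed by the spec above: the dedup key pairs the user address with `tid`, as a cautious choice in case both sides of a trade report the same `tid`, and the position after a fill is taken to be `startPosition` plus the signed size. `FillLedger` and `position_after` are hypothetical names.

```python
from collections import OrderedDict
from decimal import Decimal


class FillLedger:
    """Deduplicates fills and accumulates realized PnL per (address, coin)."""

    def __init__(self, max_seen: int = 100_000):
        self._seen = OrderedDict()   # (address, tid) -> None, oldest first
        self._max_seen = max_seen
        self.realized_pnl = {}       # (address, coin) -> Decimal

    def apply(self, address: str, fill: dict) -> bool:
        """Record a fill; return False if it was already processed."""
        key = (address, fill["tid"])
        if key in self._seen:
            return False
        self._seen[key] = None
        if len(self._seen) > self._max_seen:
            self._seen.popitem(last=False)   # forget the oldest key
        book = (address, fill["coin"])
        pnl = Decimal(fill.get("closedPnl", "0"))
        self.realized_pnl[book] = self.realized_pnl.get(book, Decimal("0")) + pnl
        return True


def position_after(fill: dict) -> Decimal:
    """Assumed position after the fill: startPosition plus signed size."""
    size = Decimal(fill["sz"])
    signed = size if fill["side"] == "B" else -size
    return Decimal(fill["startPosition"]) + signed


user = "0x7839e2f2c375dd2935193f2736167514efff9916"
sample_fill = {
    "coin": "BTC", "sz": "0.00009", "side": "B", "tid": 161270588369408,
    "startPosition": "-1.41864", "closedPnl": "-0.003753",
}
ledger = FillLedger()
print(ledger.apply(user, sample_fill))   # first delivery is recorded
print(ledger.apply(user, sample_fill))   # replayed delivery is ignored
print(ledger.realized_pnl[(user, "BTC")], position_after(sample_fill))
```

Bounding the set of seen keys keeps memory flat on a long-running stream, at the cost of not recognising a duplicate that arrives after more than `max_seen` newer fills.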

Implementation Examples

Go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strings"

	pb "hyperliquid-grpc-client/api/v2"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
)

func streamFills() {
	endpoint := os.Getenv("HYPERLIQUID_ENDPOINT")
	apiKey := os.Getenv("API_KEY")

	fmt.Println("🚀 Hyperliquid Go gRPC Client - Stream Fills")
	fmt.Println("=============================================")
	fmt.Printf("📡 Endpoint: %s\n\n", endpoint)

	// Set up TLS connection
	creds := credentials.NewTLS(nil)

	// Connection options
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(creds),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(150 * 1024 * 1024), // 150MB max
		),
	}

	// Connect to server
	fmt.Println("🔌 Connecting to gRPC server...")
	conn, err := grpc.NewClient(endpoint, opts...)
	if err != nil {
		log.Fatalf("❌ Failed to connect: %v", err)
	}
	defer conn.Close()

	// Create the client using generated code
	client := pb.NewHyperliquidL1GatewayClient(conn)
	fmt.Println("✅ Connected successfully!\n")

	// Create context with API key
	ctx := metadata.AppendToOutgoingContext(context.Background(), "x-api-key", apiKey)

	// Create request - empty Position means latest/current fills
	request := &pb.Position{}

	fmt.Println("📥 Starting fills stream...")
	fmt.Println("Press Ctrl+C to stop streaming\n")

	// Start streaming fills
	stream, err := client.StreamFills(ctx, request)
	if err != nil {
		log.Fatalf("❌ Failed to start stream: %v", err)
	}

	fillCount := 0
	for {
		response, err := stream.Recv()
		if err != nil {
			fmt.Printf("❌ Stream ended: %v\n", err)
			break
		}

		fillCount++
		fmt.Printf("\n===== FILLS #%d =====\n", fillCount)
		fmt.Printf("📦 Response size: %d bytes\n", len(response.Data))

		// Process each fill
		processFills(response.Data, fillCount)

		fmt.Println("\n" + strings.Repeat("-", 50))
	}

	fmt.Printf("\n📊 Total fill batches received: %d\n", fillCount)
}

func processFills(data []byte, batchNum int) {
	// Parse JSON
	var fills interface{}
	if err := json.Unmarshal(data, &fills); err != nil {
		fmt.Printf("❌ Failed to parse JSON: %v\n", err)
		fmt.Printf("Raw data (first 200 bytes): %.200s\n", data)
		return
	}

	fmt.Printf("🔄 FILLS BATCH #%d DETAILS\n", batchNum)
	fmt.Println("=========================")

	// Pretty print the fills data
	prettyJSON, err := json.MarshalIndent(fills, "", "  ")
	if err != nil {
		fmt.Printf("Data: %v\n", fills)
	} else {
		// Limit output to first 500 chars for readability
		output := string(prettyJSON)
		if len(output) > 500 {
			output = output[:500] + "..."
		}
		fmt.Printf("%s\n", output)
	}
}

func main() {
	streamFills()
}
Python
import grpc
import json
import signal
import sys
import os
from dotenv import load_dotenv
import hyperliquid_pb2
import hyperliquid_pb2_grpc

# Load environment variables
load_dotenv()

def stream_fills():
    endpoint = os.getenv('HYPERLIQUID_ENDPOINT')
    api_key = os.getenv('API_KEY')

    if not endpoint:
        print("Error: HYPERLIQUID_ENDPOINT environment variable is required.")
        print("Please create a .env file from .env.example and set your endpoint.")
        sys.exit(1)

    if not api_key:
        print("Error: API_KEY environment variable is required.")
        print("Please set your API key in the .env file.")
        sys.exit(1)

    print('🚀 Hyperliquid Python gRPC Client - Stream Fills')
    print('================================================')
    print(f'📡 Endpoint: {endpoint}\n')

    # Create SSL credentials
    credentials = grpc.ssl_channel_credentials()

    # Create channel with options
    options = [
        ('grpc.max_receive_message_length', 150 * 1024 * 1024),  # 150MB max
    ]

    # Prepare metadata with API key
    metadata = [('x-api-key', api_key)]

    print('🔌 Connecting to gRPC server...')
    with grpc.secure_channel(endpoint, credentials, options=options) as channel:
        # Create client stub
        client = hyperliquid_pb2_grpc.HyperliquidL1GatewayStub(channel)
        print('✅ Connected successfully!\n')

        # Create request - empty Position means latest/current fills
        request = hyperliquid_pb2.Position()

        print('📥 Starting fills stream...')
        print('Press Ctrl+C to stop streaming\n')

        fill_count = 0

        # Set up signal handler for graceful shutdown
        def signal_handler(sig, frame):
            print('\n🛑 Stopping stream...')
            print(f'📊 Total fill batches received: {fill_count}')
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)

        try:
            # Start streaming fills with metadata
            for response in client.StreamFills(request, metadata=metadata):
                fill_count += 1
                print(f'\n===== FILLS #{fill_count} =====')
                print(f'📦 Response size: {len(response.data)} bytes')

                # Process each fill batch
                process_fills(response.data, fill_count)

                print('\n' + '-' * 50)

        except grpc.RpcError as e:
            print(f'❌ Stream error: {e}')
        except KeyboardInterrupt:
            print('\n🛑 Stopping stream...')

        print(f'\n📊 Total fill batches received: {fill_count}')

def process_fills(data, batch_num):
    try:
        # Parse JSON
        fills = json.loads(data.decode('utf-8'))

        print(f'🔄 FILLS BATCH #{batch_num} DETAILS')
        print('=========================')

        # Pretty print (limited output)
        output = json.dumps(fills, indent=2)
        if len(output) > 500:
            output = output[:500] + '...'
        print(output)

    except json.JSONDecodeError as e:
        print(f'❌ Failed to parse JSON: {e}')
        print(f'Raw data (first 200 bytes): {data[:200]}')
    except Exception as e:
        print(f'❌ Error processing fills: {e}')

if __name__ == '__main__':
    stream_fills()
JavaScript
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');
require('dotenv').config();

// Load proto file
const PROTO_PATH = path.join(__dirname, 'v2.proto');

const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
});

const proto = grpc.loadPackageDefinition(packageDefinition);

// Main function
async function streamFills() {
    const endpoint = process.env.HYPERLIQUID_ENDPOINT;
    const apiKey = process.env.API_KEY;

    if (!endpoint) {
        console.error('Error: HYPERLIQUID_ENDPOINT environment variable is required.');
        console.error('Please create a .env file from .env.example and set your endpoint.');
        process.exit(1);
    }

    if (!apiKey) {
        console.error('Error: API_KEY environment variable is required.');
        console.error('Please set your API key in the .env file.');
        process.exit(1);
    }

    console.log('🚀 Hyperliquid Node.js gRPC Client - Stream Fills');
    console.log('=================================================');
    console.log(`📡 Endpoint: ${endpoint}\n`);

    // Create metadata with API key
    const metadata = new grpc.Metadata();
    metadata.add('x-api-key', apiKey);

    // Create client with SSL credentials and 150MB message size limit
    const client = new proto.hyperliquid_l1_gateway.v2.HyperliquidL1Gateway(
        endpoint,
        grpc.credentials.createSsl(),
        {
            'grpc.max_receive_message_length': 150 * 1024 * 1024
        }
    );

    console.log('📥 Starting fills stream...');
    console.log('Press Ctrl+C to stop streaming\n');

    // Make the gRPC call - empty Position for latest
    const stream = client.StreamFills({}, metadata);

    let fillCount = 0;

    stream.on('data', (data) => {
        fillCount++;

        try {
            const fills = JSON.parse(data.data);
            console.log(`\n===== FILLS #${fillCount} =====`);
            console.log(`📦 Response size: ${data.data.length} bytes`);

            // Process each fill batch
            processFills(fills, fillCount);

            console.log('\n' + '-'.repeat(50));

        } catch (error) {
            console.error(`❌ Failed to parse message #${fillCount}:`, error.message);
        }
    });

    stream.on('error', (error) => {
        console.error('❌ Stream error:', error.message);
    });

    stream.on('end', () => {
        console.log('Stream ended');
        console.log(`\n📊 Total fill batches received: ${fillCount}`);
    });
}

function processFills(fills, batchNum) {
    console.log(`🔄 FILLS BATCH #${batchNum} DETAILS`);
    console.log('=========================');

    // Pretty print (limited output)
    let output = JSON.stringify(fills, null, 2);
    if (output.length > 500) {
        output = output.substring(0, 500) + '...';
    }
    console.log(output);
}

// Run it
streamFills();

Common Use Cases

1. Trade Execution Tracker

JavaScript
class TradeExecutionTracker {
    constructor(streamManager) {
        this.streamManager = streamManager;
        this.executedTrades = new Map();

        streamManager.on('fill', (fillData) => {
            this.processFill(fillData);
        });
    }

    processFill(fillData) {
        // Track each fill by order ID
        const orderId = fillData.oid;

        if (!this.executedTrades.has(orderId)) {
            this.executedTrades.set(orderId, {
                orderId,
                fills: [],
                totalFilled: 0,
                avgPrice: 0
            });
        }

        const trade = this.executedTrades.get(orderId);
        trade.fills.push(fillData);
        trade.totalFilled += parseFloat(fillData.sz);

        // Recalculate average price
        const totalValue = trade.fills.reduce(
            (sum, f) => sum + parseFloat(f.px) * parseFloat(f.sz), 0
        );
        trade.avgPrice = totalValue / trade.totalFilled;

        console.log(`Order ${orderId}: Filled ${trade.totalFilled} @ avg ${trade.avgPrice}`);
    }
}

2. Fill Rate Monitor

Python
class FillRateMonitor:
    def __init__(self):
        self.fills_per_minute = []
        self.current_minute_fills = 0
        self.last_minute = None

    def record_fill(self, fill_data):
        """Record a fill and track rates"""
        from datetime import datetime

        current_minute = datetime.now().replace(second=0, microsecond=0)

        if self.last_minute != current_minute:
            if self.last_minute is not None:
                self.fills_per_minute.append({
                    'minute': self.last_minute,
                    'count': self.current_minute_fills
                })
            self.last_minute = current_minute
            self.current_minute_fills = 0

        self.current_minute_fills += 1

    def get_average_rate(self, minutes=5):
        """Get average fills per minute over last N minutes"""
        recent = self.fills_per_minute[-minutes:]
        if not recent:
            return 0
        return sum(r['count'] for r in recent) / len(recent)

3. Position Reconciliation

Go
type PositionReconciler struct {
    client pb.HyperliquidL1GatewayClient
    positions map[string]float64
}

func (pr *PositionReconciler) ReconcileFills(ctx context.Context) {
    stream, err := pr.client.StreamFills(ctx, &pb.Position{})
    if err != nil {
        log.Fatal(err)
    }

    for {
        fill, err := stream.Recv()
        if err != nil {
            log.Printf("Stream error: %v", err)
            return
        }

        pr.updatePosition(fill.Data)
    }
}

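// Requires "strconv" in addition to the imports used in the example above.
func (pr *PositionReconciler) updatePosition(data []byte) {
    // Each message covers one block: "events" holds [address, fill] pairs.
    var block struct {
        Events [][]json.RawMessage `json:"events"`
    }
    if err := json.Unmarshal(data, &block); err != nil {
        log.Printf("Failed to parse fills: %v", err)
        return
    }

    for _, event := range block.Events {
        if len(event) != 2 {
            continue
        }
        var fill struct {
            Coin string `json:"coin"`
            Sz   string `json:"sz"`
            Side string `json:"side"`
        }
        if err := json.Unmarshal(event[1], &fill); err != nil {
            continue
        }
        // sz is a JSON string, so parse it instead of asserting float64.
        size, err := strconv.ParseFloat(fill.Sz, 64)
        if err != nil {
            continue
        }
        // Positions are keyed by coin only; filter on event[0] to track one account.
        if fill.Side == "B" {
            pr.positions[fill.Coin] += size
        } else {
            pr.positions[fill.Coin] -= size
        }
        log.Printf("Position %s: %f", fill.Coin, pr.positions[fill.Coin])
    }
}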

Error Handling and Reconnection

JavaScript
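class RobustFillStreamer {
    // createStream: caller-supplied function that opens a new StreamFills call
    //   and returns the stream, e.g. () => client.StreamFills({}, metadata)
    // onFills: caller-supplied callback invoked with each BlockFills message
    constructor(endpoint, createStream, onFills) {
        this.endpoint = endpoint;
        this.createStream = createStream;
        this.onFills = onFills;
        this.maxRetries = 5;
        this.retryDelay = 1000;
        this.currentRetries = 0;
    }

    // Resolves when the stream ends cleanly; rejects on a stream error.
    startStream() {
        return new Promise((resolve, reject) => {
            const stream = this.createStream();
            stream.on('data', (message) => {
                // Data is flowing again, so reset the backoff state
                this.currentRetries = 0;
                this.retryDelay = 1000;
                this.onFills(message);
            });
            stream.on('error', reject);
            stream.on('end', resolve);
        });
    }

    async startStreamWithRetry() {
        while (this.currentRetries < this.maxRetries) {
            try {
                await this.startStream();
                // Stream ended without an error: reset backoff and reconnect
                this.currentRetries = 0;
                this.retryDelay = 1000;
            } catch (error) {
                this.currentRetries++;
                console.error(`Stream attempt ${this.currentRetries} failed:`, error.message);

                if (this.currentRetries >= this.maxRetries) {
                    throw new Error('Max retry attempts exceeded');
                }

                // Exponential backoff
                await this.sleep(this.retryDelay);
                this.retryDelay *= 2;
            }
        }
    }

    sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }
}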
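The same retry-with-exponential-backoff idea can be sketched in Python. `run_with_retry` and its parameters are hypothetical; `open_stream` stands for any caller-supplied function that starts a new StreamFills call and returns an iterable of messages, and the demo below uses a fake stream that fails twice so the behaviour can be seen without a server. Unlike the loop above, this sketch stops when the stream ends cleanly.

```python
import time
from typing import Callable, Iterable


def run_with_retry(open_stream: Callable[[], Iterable],
                   handle: Callable[[object], None],
                   max_retries: int = 5,
                   base_delay: float = 1.0,
                   sleep: Callable[[float], None] = time.sleep) -> None:
    """Consume a stream, reopening it with exponential backoff on failure."""
    failures = 0
    delay = base_delay
    while True:
        try:
            for message in open_stream():
                handle(message)
                failures, delay = 0, base_delay   # healthy again: reset backoff
            return                                # stream ended cleanly
        except Exception as exc:                  # with gRPC, catch grpc.RpcError
            failures += 1
            if failures >= max_retries:
                raise RuntimeError("max retry attempts exceeded") from exc
            sleep(delay)
            delay *= 2                            # exponential backoff


# Demo with a fake stream that fails twice before delivering two messages.
attempts = {"count": 0}


def flaky_stream():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated network failure")
    return iter(["batch-1", "batch-2"])


received, waits = [], []
run_with_retry(flaky_stream, received.append, sleep=waits.append)
print(received, waits)
```

Passing `sleep` as a parameter keeps the function testable without real delays; in production the default `time.sleep` applies.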

Best Practices

  1. Connection Management: Implement robust reconnection logic with exponential backoff
  2. Memory Management: Use bounded collections for storing recent fills to prevent memory leaks
  3. Performance: Process fills asynchronously to avoid blocking the stream
  4. Monitoring: Track stream health and fill rates
  5. Error Recovery: Handle various error types (network, parsing, processing) gracefully
  6. Resource Cleanup: Properly close streams and connections on shutdown
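Points 2, 3, and 6 can be sketched with the standard library alone: a bounded queue hands messages to a worker thread so the stream loop is not blocked by slow processing, a `deque` with `maxlen` keeps only the most recent messages, and `close()` drains and joins the worker. `FillWorker` is a hypothetical name and the sizes are arbitrary placeholders.

```python
import queue
import threading
from collections import deque

_STOP = object()  # sentinel that tells the worker to exit


class FillWorker:
    """Processes messages on a background thread with bounded memory."""

    def __init__(self, process, max_pending=1000, keep_recent=500):
        self._queue = queue.Queue(maxsize=max_pending)
        self.recent = deque(maxlen=keep_recent)   # oldest entries drop off
        self._process = process
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, message):
        """Called from the stream loop; blocks only when the queue is full."""
        self._queue.put(message)

    def close(self):
        """Process everything already queued, then stop the worker."""
        self._queue.put(_STOP)
        self._thread.join()

    def _run(self):
        while True:
            message = self._queue.get()
            if message is _STOP:
                return
            self.recent.append(message)
            try:
                self._process(message)
            except Exception as exc:
                # A failure on one message should not stop the worker.
                print(f"processing failed: {exc}")


results = []
worker = FillWorker(results.append, max_pending=10, keep_recent=2)
for batch in ("batch-1", "batch-2", "batch-3"):
    worker.submit(batch)
worker.close()
print(results, list(worker.recent))
```

When the queue is full, `submit` blocks the reader rather than growing memory, which is one simple response to high-volume periods; dropping or sampling messages instead is a design choice that depends on the downstream system.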

Current Limitations

  • Historical Data: Cannot stream from historical timestamps; only real-time streaming available
  • Data Retention: Node maintains only 24 hours of historical fill data
  • Backpressure: High-volume periods may require careful handling to avoid overwhelming downstream systems

Resources

Need help? Contact our support team or check the Hyperliquid gRPC documentation.