StreamFills
Stream continuous fill data starting from a position, providing real-time access to order executions on Hyperliquid.
Full Code Examples
Clone our gRPC Code Examples Repository for complete, runnable implementations. See the copy-trading-bot for a production-ready example using StreamFills.
When to Use This Method#
StreamFills is essential for:
- Trade Monitoring - Track order executions in real-time
- Position Management - Monitor fills for active trading strategies
- Settlement Tracking - Verify order completions and partial fills
- Analytics & Reporting - Collect comprehensive trade execution data
Method Signature#
// StreamFills is a server-streaming RPC: the client sends one Position and
// receives a stream of BlockFills messages starting from that position.
rpc StreamFills(Position) returns (stream BlockFills) {}
Request Message#
// Position selects where a stream starts. At most one member of the oneof
// can be set at a time.
message Position {
// Leave all fields unset or zero to target the latest data.
oneof position {
int64 timestamp_ms = 1; // ms since Unix epoch, inclusive
int64 block_height = 2; // block height, inclusive
}
}
The Position message allows flexible stream starting points:
- timestamp_ms: Start streaming from a specific time (milliseconds since Unix epoch)
- block_height: Start streaming from a specific block height
- Empty/zero: Start streaming from the latest fills
Response Stream#
// BlockFills is one streamed message; its payload is raw JSON bytes that the
// client must decode itself.
message BlockFills {
// JSON-encoded object from "node_fills" or "node_fills_by_block".
bytes data = 1;
}
The data field contains a JSON-encoded fills object with:
- Fill execution details (price, size, side)
- Order identifiers (order ID, client order ID)
- Counterparty information
- Timestamp and block reference
Implementation Examples#
- Go
- Python
- Node.js
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strings"
pb "hyperliquid-grpc-client/api/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
// streamFills connects to the gRPC gateway named by the HYPERLIQUID_ENDPOINT
// environment variable over TLS, attaches the API_KEY environment variable to
// the call as "x-api-key" metadata, and prints every fill batch received from
// StreamFills until Recv returns an error.
//
// The process exits through log.Fatal when either environment variable is
// unset, when the client cannot be created, or when the stream cannot be
// started.
func streamFills() {
	endpoint := os.Getenv("HYPERLIQUID_ENDPOINT")
	apiKey := os.Getenv("API_KEY")

	// Fail fast with an actionable message instead of dialing an empty target
	// or sending an empty key.
	if endpoint == "" {
		log.Fatal("❌ HYPERLIQUID_ENDPOINT environment variable is required")
	}
	if apiKey == "" {
		log.Fatal("❌ API_KEY environment variable is required")
	}

	fmt.Println("🚀 Hyperliquid Go gRPC Client - Stream Fills")
	fmt.Println("=============================================")
	fmt.Printf("📡 Endpoint: %s\n\n", endpoint)

	// Set up TLS connection
	creds := credentials.NewTLS(nil)

	// Connection options
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(creds),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(150 * 1024 * 1024), // 150MB max
		),
	}

	// Connect to server
	fmt.Println("🔌 Connecting to gRPC server...")
	conn, err := grpc.NewClient(endpoint, opts...)
	if err != nil {
		log.Fatalf("❌ Failed to connect: %v", err)
	}
	defer conn.Close()

	// Create the client using generated code
	client := pb.NewHyperliquidL1GatewayClient(conn)
	// fmt.Print with an explicit "\n\n" emits the same bytes as the former
	// Println("...\n") without tripping the vet "redundant newline" check.
	fmt.Print("✅ Connected successfully!\n\n")

	// Create a cancellable context carrying the API key so the stream's
	// resources are released when this function returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ctx = metadata.AppendToOutgoingContext(ctx, "x-api-key", apiKey)

	// Create request - empty Position means latest/current fills
	request := &pb.Position{}

	fmt.Println("📥 Starting fills stream...")
	fmt.Print("Press Ctrl+C to stop streaming\n\n")

	// Start streaming fills
	stream, err := client.StreamFills(ctx, request)
	if err != nil {
		log.Fatalf("❌ Failed to start stream: %v", err)
	}

	// The separator never changes, so build it once instead of per batch.
	separator := "\n" + strings.Repeat("─", 50)

	fillCount := 0
	for {
		response, err := stream.Recv()
		if err != nil {
			fmt.Printf("❌ Stream ended: %v\n", err)
			break
		}

		fillCount++
		fmt.Printf("\n===== FILLS #%d =====\n", fillCount)
		fmt.Printf("📦 Response size: %d bytes\n", len(response.Data))

		// Process each fill
		processFills(response.Data, fillCount)
		fmt.Println(separator)
	}

	fmt.Printf("\n📊 Total fill batches received: %d\n", fillCount)
}
// processFills decodes one JSON payload and prints a human-readable preview.
//
// data is the raw JSON bytes of a single stream message and batchNum is the
// running batch counter shown in the header. When the payload is not valid
// JSON, the parse error and the start of the raw payload are printed instead.
// The pretty-printed form is capped at roughly 500 bytes; the cut is moved
// back to the nearest UTF-8 rune boundary so a multi-byte character is never
// split in half.
func processFills(data []byte, batchNum int) {
	const maxPreviewBytes = 500

	// Parse JSON
	var fills any
	if err := json.Unmarshal(data, &fills); err != nil {
		fmt.Printf("❌ Failed to parse JSON: %v\n", err)
		fmt.Printf("Raw data (first 200 bytes): %.200s\n", data)
		return
	}

	fmt.Printf("🔄 FILLS BATCH #%d DETAILS\n", batchNum)
	fmt.Println("=========================")

	// Pretty print the fills data
	prettyJSON, err := json.MarshalIndent(fills, "", " ")
	if err != nil {
		fmt.Printf("Data: %v\n", fills)
		return
	}

	// Limit output for readability.
	output := string(prettyJSON)
	if len(output) > maxPreviewBytes {
		cut := maxPreviewBytes
		// Bytes of the form 10xxxxxx are UTF-8 continuation bytes; step back
		// until the cut lands on the first byte of a rune.
		for cut > 0 && output[cut]&0xC0 == 0x80 {
			cut--
		}
		output = output[:cut] + "..."
	}
	fmt.Printf("%s\n", output)
}
// main is the program entry point; all work is delegated to streamFills.
func main() { streamFills() }
import grpc
import json
import signal
import sys
import os
from datetime import datetime
from dotenv import load_dotenv
import hyperliquid_pb2
import hyperliquid_pb2_grpc
# Load environment variables
load_dotenv()
def stream_fills():
    """Stream fill batches from the gRPC gateway and print each one.

    Reads the target from the HYPERLIQUID_ENDPOINT environment variable and
    the key from API_KEY, sending the key as ``x-api-key`` call metadata.
    Exits the process with status 1 when either variable is missing.

    Ctrl+C is handled through ``KeyboardInterrupt`` rather than a custom
    SIGINT handler, so the function leaves the process-wide signal state
    untouched and can also be called from a non-main thread.
    """
    endpoint = os.getenv('HYPERLIQUID_ENDPOINT')
    api_key = os.getenv('API_KEY')

    if not endpoint:
        print("Error: HYPERLIQUID_ENDPOINT environment variable is required.")
        print("Please create a .env file from .env.example and set your endpoint.")
        sys.exit(1)

    if not api_key:
        print("Error: API_KEY environment variable is required.")
        print("Please set your API key in the .env file.")
        sys.exit(1)

    print('🚀 Hyperliquid Python gRPC Client - Stream Fills')
    print('================================================')
    print(f'📡 Endpoint: {endpoint}\n')

    # Create SSL credentials
    credentials = grpc.ssl_channel_credentials()

    # Create channel with options
    options = [
        ('grpc.max_receive_message_length', 150 * 1024 * 1024),  # 150MB max
    ]

    # Prepare metadata with API key
    metadata = [('x-api-key', api_key)]

    print('🔌 Connecting to gRPC server...')
    with grpc.secure_channel(endpoint, credentials, options=options) as channel:
        # Create client stub
        client = hyperliquid_pb2_grpc.HyperliquidL1GatewayStub(channel)
        print('✅ Connected successfully!\n')

        # Create request - empty Position means latest/current fills
        request = hyperliquid_pb2.Position()

        print('📥 Starting fills stream...')
        print('Press Ctrl+C to stop streaming\n')

        fill_count = 0
        try:
            # Start streaming fills with metadata
            for response in client.StreamFills(request, metadata=metadata):
                fill_count += 1
                print(f'\n===== FILLS #{fill_count} =====')
                print(f'📦 Response size: {len(response.data)} bytes')

                # Process each fill batch
                process_fills(response.data, fill_count)
                print('\n' + '─' * 50)
        except grpc.RpcError as e:
            print(f'❌ Stream error: {e}')
        except KeyboardInterrupt:
            # Ctrl+C: fall through to the summary; the `with` block then
            # closes the channel.
            print('\n🛑 Stopping stream...')

        print(f'\n📊 Total fill batches received: {fill_count}')
def process_fills(data, batch_num):
    """Decode one JSON payload and print a preview of it.

    Args:
        data: UTF-8 encoded JSON bytes from a single stream message.
        batch_num: Running batch counter shown in the header line.

    The pretty-printed JSON is cut to 500 characters followed by ``...``.
    JSON syntax errors are reported together with the first 200 raw bytes;
    any other exception is reported generically. Nothing is raised.
    """
    try:
        # Decode the payload
        parsed = json.loads(data.decode('utf-8'))

        print(f'🔄 FILLS BATCH #{batch_num} DETAILS')
        print('=========================')

        # Render and cap the preview
        rendered = json.dumps(parsed, indent=2)
        preview = rendered if len(rendered) <= 500 else rendered[:500] + '...'
        print(preview)
    except json.JSONDecodeError as exc:
        print(f'❌ Failed to parse JSON: {exc}')
        print(f'Raw data (first 200 bytes): {data[:200]}')
    except Exception as exc:
        print(f'❌ Error processing fills: {exc}')
# Script entry point: start streaming only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    stream_fills()
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');
require('dotenv').config();
// Load the service definition from v2.proto, expected next to this script.
const PROTO_PATH = path.join(__dirname, 'v2.proto');

// Options handed to @grpc/proto-loader when parsing the .proto file.
const PROTO_LOADER_OPTIONS = {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
};

const packageDefinition = protoLoader.loadSync(PROTO_PATH, PROTO_LOADER_OPTIONS);

// Package object from which the service client constructor is taken.
const proto = grpc.loadPackageDefinition(packageDefinition);
// Main function
/**
 * Stream fill batches from the gRPC gateway and print each one.
 *
 * Reads the target from HYPERLIQUID_ENDPOINT and the key from API_KEY (sent
 * as `x-api-key` metadata); exits the process with status 1 when either is
 * missing.
 *
 * @returns {Promise<void>} Resolves once the stream has ended or failed. It
 *   never rejects for stream errors: they are logged, the batch total is
 *   printed, and the client is closed.
 */
async function streamFills() {
  const endpoint = process.env.HYPERLIQUID_ENDPOINT;
  const apiKey = process.env.API_KEY;

  if (!endpoint) {
    console.error('Error: HYPERLIQUID_ENDPOINT environment variable is required.');
    console.error('Please create a .env file from .env.example and set your endpoint.');
    process.exit(1);
  }

  if (!apiKey) {
    console.error('Error: API_KEY environment variable is required.');
    console.error('Please set your API key in the .env file.');
    process.exit(1);
  }

  console.log('🚀 Hyperliquid Node.js gRPC Client - Stream Fills');
  console.log('=================================================');
  console.log(`📡 Endpoint: ${endpoint}\n`);

  // Create metadata with API key
  const metadata = new grpc.Metadata();
  metadata.add('x-api-key', apiKey);

  // Create client with SSL credentials and 150MB message size limit
  const client = new proto.hyperliquid_l1_gateway.v2.HyperliquidL1Gateway(
    endpoint,
    grpc.credentials.createSsl(),
    {
      'grpc.max_receive_message_length': 150 * 1024 * 1024
    }
  );

  console.log('📥 Starting fills stream...');
  console.log('Press Ctrl+C to stop streaming\n');

  // Make the gRPC call - empty Position for latest
  const stream = client.StreamFills({}, metadata);

  let fillCount = 0;

  return new Promise((resolve) => {
    // 'error' and 'end' can both fire for one stream; print the summary and
    // release the channel exactly once.
    let finished = false;
    const finish = () => {
      if (finished) {
        return;
      }
      finished = true;
      console.log(`\n📊 Total fill batches received: ${fillCount}`);
      client.close();
      resolve();
    };

    stream.on('data', (data) => {
      fillCount++;
      try {
        const fills = JSON.parse(data.data);
        console.log(`\n===== FILLS #${fillCount} =====`);
        console.log(`📦 Response size: ${data.data.length} bytes`);

        // Process each fill batch
        processFills(fills, fillCount);
        console.log('\n' + '─'.repeat(50));
      } catch (error) {
        console.error(`❌ Failed to parse message #${fillCount}:`, error.message);
      }
    });

    stream.on('error', (error) => {
      console.error('❌ Stream error:', error.message);
      finish();
    });

    stream.on('end', () => {
      console.log('Stream ended');
      finish();
    });
  });
}
/**
 * Print a header and a size-capped, pretty-printed preview of one batch.
 *
 * @param {*} fills - Already-parsed JSON value for the batch.
 * @param {number} batchNum - Running batch counter shown in the header.
 */
function processFills(fills, batchNum) {
  const PREVIEW_LIMIT = 500;

  console.log(`🔄 FILLS BATCH #${batchNum} DETAILS`);
  console.log('=========================');

  // Render with 2-space indentation, then cap the preview length.
  const rendered = JSON.stringify(fills, null, 2);
  const preview = rendered.length > PREVIEW_LIMIT
    ? rendered.substring(0, PREVIEW_LIMIT) + '...'
    : rendered;
  console.log(preview);
}
// Run it. streamFills is async, so report a rejection and exit non-zero
// instead of leaving the promise unhandled.
streamFills().catch((error) => {
  console.error('❌ Fatal error:', error);
  process.exit(1);
});
Common Use Cases#
1. Trade Execution Tracker#
/**
 * Aggregates fills per order ID and keeps a size-weighted average price.
 *
 * Subscribes to the `fill` event of the given stream manager. Per-order
 * records live in `executedTrades` (a Map keyed by order ID) and hold
 * `orderId`, `fills`, `totalFilled`, `totalValue` and `avgPrice`.
 */
class TradeExecutionTracker {
  /**
   * @param {{on: Function}} streamManager - Emitter whose `fill` event
   *   delivers one fill object per call.
   */
  constructor(streamManager) {
    this.streamManager = streamManager;
    this.executedTrades = new Map();

    streamManager.on('fill', (fillData) => {
      this.processFill(fillData);
    });
  }

  /**
   * Fold one fill into the record for its order.
   *
   * @param {{oid: *, px: (string|number), sz: (string|number)}} fillData
   *   Fill whose `px` and `sz` must be parseable by parseFloat. Fills where
   *   either value is not a finite number are skipped with a warning so they
   *   cannot turn the running totals into NaN.
   */
  processFill(fillData) {
    // Track each fill by order ID
    const orderId = fillData.oid;
    const size = parseFloat(fillData.sz);
    const price = parseFloat(fillData.px);

    if (!Number.isFinite(size) || !Number.isFinite(price)) {
      console.warn(`Order ${orderId}: skipping fill with invalid px/sz`);
      return;
    }

    let trade = this.executedTrades.get(orderId);
    if (!trade) {
      trade = {
        orderId,
        fills: [],
        totalFilled: 0,
        totalValue: 0,
        avgPrice: 0
      };
      this.executedTrades.set(orderId, trade);
    }

    trade.fills.push(fillData);
    trade.totalFilled += size;

    // Keep a running notional total instead of re-summing every stored fill;
    // the additions happen in the same order, so the result is unchanged.
    trade.totalValue += price * size;

    // A zero total would divide by zero; report 0 until something is filled.
    trade.avgPrice = trade.totalFilled !== 0
      ? trade.totalValue / trade.totalFilled
      : 0;

    console.log(`Order ${orderId}: Filled ${trade.totalFilled} @ avg ${trade.avgPrice}`);
  }
}
2. Fill Rate Monitor#
class FillRateMonitor:
    """Counts fills per wall-clock minute and reports a recent average.

    Completed minutes are appended to ``fills_per_minute`` as
    ``{'minute': datetime, 'count': int}``; the minute still in progress is
    tracked in ``current_minute_fills`` and is not part of the average until
    a fill arrives in a later minute. Minutes without any fill produce no
    entry.
    """

    def __init__(self):
        # Completed minutes, oldest first.
        self.fills_per_minute = []
        # Fills seen so far in the minute identified by last_minute.
        self.current_minute_fills = 0
        # Start of the minute currently being counted; None before the first fill.
        self.last_minute = None

    def record_fill(self, fill_data, now=None):
        """Record a fill and track rates.

        Args:
            fill_data: The fill being recorded; only its arrival is counted.
            now: Optional ``datetime`` to use as the arrival time. Defaults
                to ``datetime.now()``.
        """
        from datetime import datetime

        if now is None:
            now = datetime.now()
        current_minute = now.replace(second=0, microsecond=0)

        if self.last_minute != current_minute:
            # A new minute started: archive the finished one, if any.
            if self.last_minute is not None:
                self.fills_per_minute.append({
                    'minute': self.last_minute,
                    'count': self.current_minute_fills
                })
            self.last_minute = current_minute
            self.current_minute_fills = 0

        self.current_minute_fills += 1

    def get_average_rate(self, minutes=5):
        """Get average fills per minute over the last N recorded minutes.

        Returns 0 when ``minutes`` is not positive or nothing has been
        archived yet.
        """
        # [-0:] would select the whole history and a negative value would
        # slice from the front, so reject non-positive windows explicitly.
        if minutes <= 0:
            return 0

        recent = self.fills_per_minute[-minutes:]
        if not recent:
            return 0

        return sum(r['count'] for r in recent) / len(recent)
3. Position Reconciliation#
// PositionReconciler keeps a running position per coin by applying fills
// received from the StreamFills RPC.
type PositionReconciler struct {
// client is the gateway client used to open the fills stream.
client pb.HyperliquidL1GatewayClient
// positions maps a coin name to its accumulated size: fills with side "B"
// add to it, every other side subtracts from it.
positions map[string]float64
}
// ReconcileFills opens a StreamFills stream with an empty Position (latest
// fills) and passes the payload of every received message to updatePosition.
//
// It blocks until the stream cannot be opened or Recv returns an error; in
// both cases the error is logged and the method returns, leaving the caller
// to decide whether to retry.
func (pr *PositionReconciler) ReconcileFills(ctx context.Context) {
	stream, err := pr.client.StreamFills(ctx, &pb.Position{})
	if err != nil {
		// Log and return instead of log.Fatal: a method should not terminate
		// the whole process, and this matches the Recv error path below.
		log.Printf("Failed to start fills stream: %v", err)
		return
	}

	for {
		msg, err := stream.Recv()
		if err != nil {
			log.Printf("Stream error: %v", err)
			return
		}
		pr.updatePosition(msg.Data)
	}
}
// updatePosition applies a single JSON-encoded fill to the tracked positions.
//
// The payload is expected to be a flat object with "coin", "side" and "sz"
// keys. NOTE(review): confirm this against the actual stream payload, which
// may wrap fills in an outer object. "sz" may be a JSON number or a numeric
// string. Side "B" increases the position; any other side decreases it.
// Payloads that cannot be decoded, have no coin, or carry an unparsable size
// are logged (where there is something to report) and skipped.
func (pr *PositionReconciler) updatePosition(data []byte) {
	var fill struct {
		Coin *string     `json:"coin"` // pointer distinguishes "missing" from ""
		Side string      `json:"side"`
		Size json.Number `json:"sz"` // json.Number accepts both 0.5 and "0.5"
	}
	if err := json.Unmarshal(data, &fill); err != nil {
		log.Printf("Skipping fill: cannot decode payload: %v", err)
		return
	}

	// Without a coin there is no position to update.
	if fill.Coin == nil {
		return
	}
	coin := *fill.Coin

	size, err := fill.Size.Float64()
	if err != nil {
		log.Printf("Skipping fill for %s: invalid size %q: %v", coin, fill.Size, err)
		return
	}

	// Writing to a nil map panics; allocate lazily so a zero-value
	// PositionReconciler is usable.
	if pr.positions == nil {
		pr.positions = make(map[string]float64)
	}

	if fill.Side == "B" {
		pr.positions[coin] += size
	} else {
		pr.positions[coin] -= size
	}

	log.Printf("Position %s: %f", coin, pr.positions[coin])
}
Error Handling and Reconnection#
/**
 * Runs a fill stream and restarts it with exponential backoff on failure.
 *
 * Subclasses (or instances) must provide `startStream()`, returning a promise
 * that settles when the stream is over: resolve for a clean end, reject for a
 * failure.
 */
class RobustFillStreamer {
  /**
   * @param {string} endpoint - Stored on the instance for use by startStream.
   * @param {{maxRetries?: number, retryDelay?: number}} [options]
   *   maxRetries: consecutive failures tolerated before giving up (default 5).
   *   retryDelay: first backoff delay in milliseconds (default 1000).
   */
  constructor(endpoint, options = {}) {
    const { maxRetries = 5, retryDelay = 1000 } = options;
    this.endpoint = endpoint;
    this.maxRetries = maxRetries;
    // Remember the configured starting delay so a successful run resets the
    // backoff to the same value the instance was created with.
    this.initialRetryDelay = retryDelay;
    this.retryDelay = retryDelay;
    this.currentRetries = 0;
  }

  /**
   * Open the stream. Must be overridden; the default always rejects.
   *
   * @returns {Promise<void>}
   */
  async startStream() {
    throw new Error('startStream() must be implemented by a subclass');
  }

  /**
   * Keep the stream running. A stream that ends cleanly resets the retry
   * state and is restarted immediately; a failure waits `retryDelay` ms and
   * doubles the delay for the next failure.
   *
   * @throws {Error} 'Max retry attempts exceeded' after `maxRetries`
   *   consecutive failures.
   */
  async startStreamWithRetry() {
    while (this.currentRetries < this.maxRetries) {
      try {
        await this.startStream();
        this.currentRetries = 0;
        this.retryDelay = this.initialRetryDelay;
      } catch (error) {
        this.currentRetries++;
        console.error(`Stream attempt ${this.currentRetries} failed:`, error.message);

        if (this.currentRetries >= this.maxRetries) {
          throw new Error('Max retry attempts exceeded');
        }

        // Exponential backoff
        await this.sleep(this.retryDelay);
        this.retryDelay *= 2;
      }
    }
  }

  /**
   * @param {number} ms - Milliseconds to wait.
   * @returns {Promise<void>} Resolves after the delay.
   */
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
Best Practices#
- Connection Management: Implement robust reconnection logic with exponential backoff
- Memory Management: Use bounded collections for storing recent fills to prevent memory leaks
- Performance: Process fills asynchronously to avoid blocking the stream
- Monitoring: Track stream health and fill rates
- Error Recovery: Handle various error types (network, parsing, processing) gracefully
- Resource Cleanup: Properly close streams and connections on shutdown
Current Limitations#
- Historical Data: Cannot stream from historical timestamps; only real-time streaming available
- Data Retention: Node maintains only 24 hours of historical fill data
- Backpressure: High-volume periods may require careful handling to avoid overwhelming downstream systems
Resources#
- GitHub: gRPC Code Examples - Complete working examples
- Copy Trading Bot - Uses `StreamFills` to mirror trades in real-time
Need help? Contact our support team or check the Hyperliquid gRPC documentation.