
Building a Real-Time Hyperliquid Liquidation Tracker: From Stream to Dashboard in 200 Lines of Python

15th October 2025

Liquidations are one of the most critical events in DeFi trading. They signal market stress, volatility spikes, and can provide valuable insights into risk management and trading opportunities. In this tutorial, we'll build a real-time liquidation tracker for Hyperliquid using Dwellir's gRPC infrastructure, Python, and WebSockets.

By the end, you'll have a live dashboard that displays liquidation events as they happen on-chain—processing over 70 blocks per second with sub-second latency.

Why Track Liquidations?

Understanding liquidation patterns gives traders and developers insight into market stress, volatility spikes, and risk management.

The Architecture

Our liquidation tracker consists of three main components:

  1. gRPC Stream Client: Connects to Hyperliquid via Dwellir's high-performance gRPC endpoint
  2. WebSocket Server: Broadcasts liquidation events to connected web clients in real-time
  3. Frontend Dashboard: Displays liquidations as they occur with relevant details

Let's dive into the backend implementation.

Setting Up the gRPC Connection

First, we establish a secure connection to Hyperliquid's blockchain data through Dwellir's gRPC gateway:

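import grpc
import hyperliquid_pb2        # protobuf messages generated from the gateway's .proto file
import hyperliquid_pb2_grpc   # gRPC client stubs generated from the same .proto file

# endpoint and api_key are assumed to be defined earlier (for example, loaded from configuration)

# Setup gRPC connection
credentials = grpc.ssl_channel_credentials()  # TLS with the default root certificates
options = [('grpc.max_receive_message_length', 150 * 1024 * 1024)]  # accept messages up to 150 MB
metadata = [('x-api-key', api_key)]  # the API key is sent as call metadata

with grpc.secure_channel(endpoint, credentials, options=options) as channel:
    client = hyperliquid_pb2_grpc.HyperLiquidL1GatewayStub(channel)
    request = hyperliquid_pb2.Timestamp(timestamp=0)

    for response in client.StreamBlockFills(request, metadata=metadata):
        # Process each block fill
        liquidations = extract_liquidations(response.data)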

The StreamBlockFills method provides a continuous stream of all fills (trades) happening on Hyperliquid. This is where we'll find our liquidation events.
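The connection code uses `endpoint` and `api_key` without showing where they come from. One common approach, sketched here, is to read both from environment variables so the key never lands in source control. The variable names `HYPERLIQUID_ENDPOINT` and `API_KEY` and the `load_settings` helper are assumptions made for this example, not names taken from the tutorial's repository.

```python
import os
from typing import Mapping, NamedTuple


class GatewaySettings(NamedTuple):
    endpoint: str
    api_key: str


def load_settings(env: Mapping[str, str] = os.environ) -> GatewaySettings:
    """Read the gateway endpoint and API key from an environment-style mapping.

    HYPERLIQUID_ENDPOINT and API_KEY are hypothetical names chosen for this sketch.
    """
    required = ("HYPERLIQUID_ENDPOINT", "API_KEY")
    # Treat unset and empty values the same way: both count as missing.
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return GatewaySettings(endpoint=env["HYPERLIQUID_ENDPOINT"], api_key=env["API_KEY"])
```

Calling `load_settings()` once at startup fails fast with a clear message when a value is missing, rather than surfacing later as an authentication error on the stream.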

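Key Points:

    • The channel is created with grpc.ssl_channel_credentials(), so the connection is encrypted with TLS
    • grpc.max_receive_message_length is raised from gRPC's 4 MB default to 150 MB so that large block messages are not rejected
    • The API key travels as x-api-key metadata on the StreamBlockFills call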

Extracting Liquidation Events

Not every fill is a liquidation. We need to parse the incoming data and filter for liquidation-specific fields:

import json


def extract_liquidations(block_fills_data):
    """Extract liquidation events from block fills data"""
    liquidations = []

    # Parse the incoming data
    block_fills = json.loads(block_fills_data.decode('utf-8'))

    # Stream sends fills as [user_address, fill_data] pairs
    if isinstance(block_fills, list) and len(block_fills) == 2:
        user_address, fill_data = block_fills

        # Check if this fill has a liquidation
        if 'liquidation' in fill_data:
            liquidation_info = fill_data.get('liquidation', {})
            liquidated_user = liquidation_info.get('liquidatedUser', '')
            direction = fill_data.get('dir', '')

            # Only include closing positions where user matches liquidated user
            is_closing = 'close' in direction.lower()
            is_liquidated_user = user_address.lower() == liquidated_user.lower()

            if is_closing and is_liquidated_user:
                liquidation_event = {
                    'userAddress': user_address,
                    'coin': fill_data.get('coin'),
                    'price': fill_data.get('px'),
                    'size': fill_data.get('sz'),
                    'side': fill_data.get('side'),
                    'timestamp': fill_data.get('time'),
                    'direction': direction,
                    'closedPnl': fill_data.get('closedPnl', '0'),
                    'liquidation': liquidation_info
                }
                liquidations.append(liquidation_event)

    return liquidations

Understanding the Logic:

  1. Data Format: Each block fill arrives as a [user_address, fill_data] pair
  2. Liquidation Field: Only fills with a liquidation key are liquidation-related
  3. Filtering Logic: We specifically look for:
    • Closing positions: The dir field contains "Close Long" or "Close Short"
    • Victim match: The user_address matches liquidatedUser (this is the trader being liquidated, not the liquidator)
  4. Event Structure: We extract relevant fields like coin, price, size, P&L, and liquidation details

This filtering is crucial because liquidation events create multiple fills—one for the liquidated trader and others for the liquidators taking over the position. We only want to track the actual liquidation victim.
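To make the two checks concrete, here is a small sketch that applies them to hand-written sample fills. The helper name `is_victim_fill`, the addresses, and the payloads are hypothetical; they only mirror the fields the tutorial's code reads (`dir`, `liquidation`, `liquidatedUser`) and are not captured from the live stream.

```python
def is_victim_fill(user_address: str, fill: dict) -> bool:
    """Return True when a fill closes the liquidated trader's own position."""
    liquidation = fill.get("liquidation")
    if not liquidation:
        return False  # ordinary trade, not liquidation-related
    is_closing = "close" in fill.get("dir", "").lower()
    liquidated_user = liquidation.get("liquidatedUser", "")
    return is_closing and user_address.lower() == liquidated_user.lower()


# Hypothetical sample payloads, shaped like the fields the tutorial reads.
VICTIM = "0xaaaa"
LIQUIDATOR = "0xbbbb"
victim_fill = {"coin": "ETH", "dir": "Close Long", "liquidation": {"liquidatedUser": VICTIM}}
liquidator_fill = {"coin": "ETH", "dir": "Open Long", "liquidation": {"liquidatedUser": VICTIM}}
ordinary_fill = {"coin": "ETH", "dir": "Close Long"}

print(is_victim_fill(VICTIM, victim_fill))          # True: closing fill of the liquidated user
print(is_victim_fill(LIQUIDATOR, liquidator_fill))  # False: counterparty side of the liquidation
print(is_victim_fill(VICTIM, ordinary_fill))        # False: no liquidation key at all
```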

Broadcasting to WebSocket Clients

Once we identify a liquidation, we broadcast it to all connected web clients:

async def broadcast_liquidation(liquidation_data):
    """Broadcast liquidation event to all connected clients"""
    if not connected_clients:
        return

    message = json.dumps(liquidation_data)
    disconnected = set()

    # Iterate over a snapshot: clients may connect or disconnect while we await
    for ws in list(connected_clients):
        try:
            await ws.send_str(message)
        except Exception:
            disconnected.add(ws)

    # Remove disconnected clients
    connected_clients.difference_update(disconnected)

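This function:

    • Returns early when no clients are connected
    • Serializes the event to JSON once and sends the same message to every client
    • Collects clients whose send fails and removes them from connected_clients afterwards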
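The version above sends to clients one at a time, so a single slow client delays everyone behind it. A possible variation, sketched here under the assumption that each client exposes an async `send_str` method, sends to all clients concurrently with `asyncio.gather`. `broadcast_concurrently` and `FakeClient` are hypothetical names used only for this illustration.

```python
import asyncio
import json


async def broadcast_concurrently(clients: set, payload: dict) -> None:
    """Send payload to every client at once and drop the ones that fail."""
    if not clients:
        return
    message = json.dumps(payload)
    targets = list(clients)  # snapshot, so the set can change while we await
    results = await asyncio.gather(
        *(ws.send_str(message) for ws in targets),
        return_exceptions=True,  # collect failures instead of raising the first one
    )
    for ws, result in zip(targets, results):
        if isinstance(result, Exception):
            clients.discard(ws)


class FakeClient:
    """Stand-in for a WebSocket connection; only send_str is modelled."""

    def __init__(self, healthy: bool = True):
        self.healthy = healthy
        self.received = []

    async def send_str(self, message: str) -> None:
        if not self.healthy:
            raise ConnectionError("client went away")
        self.received.append(message)
```

With `return_exceptions=True`, one failed send does not cancel the others; the failed client is simply discarded after all sends have finished.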

The Main Event Loop

The core of our application processes blocks continuously:

for response in client.StreamBlockFills(request, metadata=metadata):
    block_count += 1

    # Check for liquidations
    liquidations = extract_liquidations(response.data)

    if liquidations:
        for liq in liquidations:
            liquidation_count += 1  # number each event individually
            print(f'🔥 LIQUIDATION #{liquidation_count}')
            print(f'   {liq["coin"]}: {liq["size"]} @ ${liq["price"]}')
            print(f'   P&L: ${liq["closedPnl"]} | {liq["direction"]}')
            await broadcast_liquidation(liq)

    # Yield control briefly so WebSocket handlers can run between blocks
    await asyncio.sleep(0.01)

This simple loop:

  1. Receives each block fill from the gRPC stream
  2. Extracts any liquidations
  3. Logs details to the console
  4. Broadcasts to connected clients
  5. Yields control to process WebSocket messages
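One caveat with this loop: a stream obtained from a synchronous `grpc.secure_channel` is a blocking iterator, so the event loop cannot serve WebSocket clients while the loop waits for the next message. A minimal sketch of one workaround, assuming you keep the synchronous client, is to fetch each item in a worker thread. `iterate_in_thread` is a hypothetical helper name, not part of grpc or of the tutorial's code; gRPC's `grpc.aio` module is another option.

```python
import asyncio

_SENTINEL = object()  # marks the end of the underlying iterator


async def iterate_in_thread(blocking_iterable):
    """Yield items from a blocking iterator without blocking the event loop."""
    loop = asyncio.get_running_loop()
    iterator = iter(blocking_iterable)
    while True:
        # next(iterator, _SENTINEL) runs in the default thread pool, so other
        # coroutines keep running while we wait for the next item.
        item = await loop.run_in_executor(None, next, iterator, _SENTINEL)
        if item is _SENTINEL:
            break
        yield item
```

Under this assumption the main loop would become `async for response in iterate_in_thread(client.StreamBlockFills(request, metadata=metadata)):`, and the explicit `asyncio.sleep(0.01)` would no longer be needed to give other tasks a turn.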

WebSocket Server Setup

We use aiohttp to create a lightweight WebSocket server:

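import aiohttp
from aiohttp import web

connected_clients = set()  # shared with broadcast_liquidation


async def handle_websocket(request):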
    """Handle WebSocket connections from frontend clients"""
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    connected_clients.add(ws)
    print(f'✅ New client connected. Total clients: {len(connected_clients)}')

    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'ping':
                    await ws.send_str('pong')
    finally:
        connected_clients.discard(ws)
        print(f'👋 Client disconnected. Total clients: {len(connected_clients)}')

    return ws

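The server:

    • Upgrades each incoming request to a WebSocket connection and adds it to connected_clients
    • Replies 'pong' to 'ping' text messages, which gives clients a simple keep-alive check
    • Removes the client in the finally block, so cleanup runs even if the connection fails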

Frontend Dashboard

The frontend is a simple HTML/JavaScript application that connects to the WebSocket server and displays liquidations in a clean, real-time interface.

The dashboard updates instantly as liquidations occur, providing a live view into Hyperliquid's liquidation activity.

Performance Characteristics

Our tracker keeps pace with the figures quoted at the start of this tutorial: over 70 blocks per second with sub-second latency.

This performance is made possible by Dwellir's optimized gRPC infrastructure.
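To check a blocks-per-second figure on your own machine, you could tick a small meter once per streamed response and print its reading periodically. The sketch below is one way to do that with a sliding time window; `RateMeter` is a hypothetical helper, not part of the tutorial's code, and its `clock` parameter exists only so the meter can be exercised without real waiting.

```python
import time
from collections import deque


class RateMeter:
    """Count events inside a sliding time window and report events per second."""

    def __init__(self, window_seconds: float = 1.0, clock=time.monotonic):
        self.window_seconds = window_seconds
        self._clock = clock
        self._stamps = deque()  # timestamps of recent events, oldest first

    def tick(self) -> None:
        """Record one event (for example, one streamed block)."""
        self._stamps.append(self._clock())

    def rate(self) -> float:
        """Return events per second over the most recent window."""
        cutoff = self._clock() - self.window_seconds
        # Discard timestamps that have fallen out of the window.
        while self._stamps and self._stamps[0] <= cutoff:
            self._stamps.popleft()
        return len(self._stamps) / self.window_seconds
```

In the main loop this would amount to calling `meter.tick()` next to `block_count += 1` and logging `meter.rate()` every few seconds.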

Why Use Dwellir's gRPC Service?

Building production-grade blockchain infrastructure is complex and resource-intensive. Dwellir's gRPC service provides:

🚀 Enterprise Performance

🔒 Reliability & Security

💰 Cost-Effective Scaling

🛠️ Developer Experience

Getting Started

Ready to build your own real-time trading applications? Here's how to get started:

  1. Sign up for Dwellir: Get your API key at dwellir.com
  2. Clone the repo: Full code available on GitHub
  3. Watch the tutorial: Step-by-step video walkthrough on our YouTube channel
  4. Read the docs: Comprehensive API documentation at dwellir.com/docs

Use Cases Beyond Liquidations

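This architecture isn't limited to liquidation tracking. You can adapt it to other applications that consume the same stream of fills, such as trading tools, analytics platforms, or data pipelines.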

Conclusion

Real-time blockchain data access opens up countless possibilities for DeFi applications. With Dwellir's gRPC infrastructure, you can focus on building innovative features rather than managing complex infrastructure.

The liquidation tracker we built demonstrates how just ~200 lines of Python can create a production-ready monitoring system that processes thousands of blockchain events per second.

Whether you're building trading tools, analytics platforms, or data pipelines, Dwellir provides the reliable, high-performance infrastructure you need to succeed.



Trading cryptocurrencies involves significant risk. This content is for educational purposes only and does not constitute financial advice.
