
Backfill the Full Hyperliquid OHLCV Archive

Pull the full Hyperliquid OHLCV archive for one market and one interval from Dwellir's index endpoint with a resumable multi-process Python exporter that sustains thousands of requests per second.

Language: Python
Format: CSV
Protocol: REST

If you need the full OHLCV archive for one Hyperliquid market, the contract is straightforward: request one candle at a time and iterate the bucket-open timestamps yourself.

That is the intended archival path on api-hyperliquid-index.n.dwellir.com today. There is no separate bulk history route, and range-style query params do not change the response shape. In this guide, we show you how to build a resumable Python exporter that discovers the endpoint with the Dwellir CLI, spreads work across multiple worker processes over HTTP/2, respects your plan's request-rate ceiling, skips empty sparse gaps, and merges the results back into one ordered CSV.

The exporter is designed to sustain close to the Scale plan's 5000 RPS ceiling from a single workstation. In our measurements, it sustains around 4500–5000 useful RPS while backfilling one month of 1s data with zero errors.

Copy-Paste Prompt for Your Coding Tool

Paste this into Claude Code, Codex, Cursor, Windsurf, or another coding agent:

Text
Build me a resumable Python script that backfills the full Hyperliquid candle archive from Dwellir for one market and one interval at a time, using a multi-process HTTP/2 client that can sustain several thousand requests per second.

Requirements:
- First check whether the `dwellir` CLI is installed.
- If it is not installed, recommend installing it with one of these options:
  - `brew tap dwellir-public/homebrew-tap && brew install dwellir`
  - `curl -fsSL https://raw.githubusercontent.com/dwellir-public/cli/main/scripts/install.sh | sh`
- After installation, ask me to authenticate it by running `dwellir auth login`.
- Check authentication with `dwellir auth status`.
- Get an enabled API key with `dwellir keys list --toon`. If there are multiple good candidates, ask me which key to use.
- Discover the Hyperliquid Index endpoint with `dwellir endpoints search hyperliquid --ecosystem hyperliquid --network mainnet --key <name> --toon`.
- Use the Dwellir Hyperliquid REST candles endpoint at `api-hyperliquid-index.n.dwellir.com`.
- Export one market and one interval at a time.
- Use `httpx` with `http2=True` (not `requests`). A single Python asyncio process caps at around `1000 RPS` regardless of transport; HTTP/2 multi-process is what scales past that.
- Split the time range into N worker processes using `multiprocessing.spawn`. Each worker runs its own asyncio event loop and its own `httpx.AsyncClient(http2=True)` pool.
- Warm separate HTTP/2 connections per worker so they pick up distinct `DWSESSION` cookies from the platform's backend router.
- Within each worker, run many concurrent async tasks that share the worker's HTTP/2 connections; HTTP/2 multiplexing lets one connection carry many in-flight streams.
- Apply a token-bucket rate limiter per worker so the aggregate request rate matches the plan ceiling (Scale: `5000 RPS`, Growth: `500 RPS`, Developer: `100 RPS`, Free: `20 RPS`).
- For the Scale plan, 8 workers is a good default. For Free/Developer/Growth, a single worker is enough.
- Treat the data as sparse: if a bucket returns `404`, count it and skip it rather than fabricating a candle.
- The OHLCV archive starts at `2025-07-27T08:00:00Z`.
- Iterate bucket-open timestamps between a start and end time.
- Support `1s`, `1m`, and `5m`.
- Write per-shard CSVs with columns: `s, i, t, T, o, h, l, c, v, q, n, x`.
- Persist resume state per shard so the export can continue after an interruption.
- Make the resume logic safe if the process stops between writing a CSV row and updating the shard state files.
- Write the shard state files atomically so a partial write does not break the next run.
- Merge the shard outputs back into one CSV sorted by the candle open time `t` when the run completes.
- Show me how to run a short validation window first, then how to run a larger backfill for BTC `1m` and `1s`.

What You Will Learn

  • discover the Hyperliquid Index endpoint and your API key name with the Dwellir CLI
  • confirm the single-candle REST contract before starting a long backfill
  • export one market and one interval into a resumable CSV dataset
  • handle sparse 404 buckets without fabricating candles
  • run a multi-process HTTP/2 exporter that sustains close to 5000 RPS
  • resume a large archive download after an interruption

Prerequisites

  • Python 3.10+
  • the dwellir CLI installed and authenticated
  • httpx with the HTTP/2 extras installed
  • a Dwellir API key name you can use with dwellir endpoints search ... --key <name>

Check your setup and install httpx with HTTP/2 support:

Bash
python --version
dwellir auth status
dwellir keys list --toon
pip install 'httpx[http2]'

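If you do not have the CLI yet, install it with either Homebrew or the install script, then authenticate with dwellir auth login:

Bash
# Option 1: Homebrew
brew tap dwellir-public/homebrew-tap && brew install dwellir
# Option 2: install script
curl -fsSL https://raw.githubusercontent.com/dwellir-public/cli/main/scripts/install.sh | sh
dwellir auth login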

Discover the Endpoint and Verify the Contract

Start by listing your available keys, then ask the CLI to inject the key you want into the Hyperliquid Index endpoint URL:

Bash
dwellir keys list --toon
dwellir endpoints search hyperliquid --ecosystem hyperliquid --network mainnet --key YOUR_KEY_NAME --toon

You should see a Hyperliquid HyperCore Index entry with an HTTPS URL like:

Text
https://api-hyperliquid-index.n.dwellir.com/YOUR_API_KEY

Now verify the REST contract with a single known candle:

Bash
curl "https://api-hyperliquid-index.n.dwellir.com/YOUR_API_KEY/v1/candles?market=BTC&interval=1m&time=2026-03-30T23:00:00Z"

You should get one candle back:

JSON
{
  "s": "BTC",
  "i": "1m",
  "t": 1774911600000,
  "T": 1774911659999,
  "o": "66745",
  "h": "66746",
  "l": "66687",
  "c": "66687",
  "v": "4.78586",
  "q": "319316.665",
  "n": 256,
  "x": true
}
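If you script this check, the same expectations can be asserted in Python. The sketch below is ours, not part of Dwellir's tooling: check_candle is a hypothetical helper, and it assumes (from the sample above) that T equals t plus the interval length minus one millisecond and that prices arrive as decimal strings.

```python
import json
from decimal import Decimal

# Interval widths in milliseconds (assumption: matches the t/T spacing in the sample).
STEP_MS = {"1s": 1_000, "1m": 60_000, "5m": 300_000}
FIELDS = ("s", "i", "t", "T", "o", "h", "l", "c", "v", "q", "n", "x")


def check_candle(candle: dict, market: str, interval: str) -> list:
    """Return a list of problems with one candle; an empty list means it looks sane."""
    missing = [f for f in FIELDS if f not in candle]
    if missing:
        return [f"missing fields: {missing}"]
    problems = []
    step = STEP_MS[interval]
    if candle["s"] != market or candle["i"] != interval:
        problems.append("market/interval mismatch")
    if candle["t"] % step != 0:
        problems.append("open time is not aligned to the interval")
    if candle["T"] != candle["t"] + step - 1:
        problems.append("close time is not open time + interval - 1 ms")
    # Decimal keeps the string prices exact; low/high must bracket open/close.
    open_, high, low, close = (Decimal(candle[k]) for k in ("o", "h", "l", "c"))
    if not (low <= min(open_, close) and max(open_, close) <= high):
        problems.append("OHLC values are inconsistent")
    return problems


sample = json.loads(
    '{"s": "BTC", "i": "1m", "t": 1774911600000, "T": 1774911659999, "o": "66745",'
    ' "h": "66746", "l": "66687", "c": "66687", "v": "4.78586", "q": "319316.665",'
    ' "n": 256, "x": true}'
)
print(check_candle(sample, "BTC", "1m"))  # []
```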

Important: the current route is still one candle per response even if you add range-like params such as limit, bars, after, before, start, or end. For archival retrieval, iterate bucket-open timestamps and issue one request per bucket.
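In code, that contract reduces to generating every bucket-open time in your window and issuing one request per value. A minimal sketch, assuming an aligned start and an exclusive end; bucket_open_times is a hypothetical name, and each yielded string is what goes into the time query parameter.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator


def bucket_open_times(start: datetime, end: datetime, step: timedelta) -> Iterator[str]:
    """Yield the `time` query value for every bucket open in [start, end)."""
    ts = start
    while ts < end:
        # Same ISO-8601 UTC form as the curl example, e.g. 2026-03-30T23:00:00Z.
        yield ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        ts += step


start = datetime(2026, 3, 30, 22, 56, tzinfo=timezone.utc)
end = datetime(2026, 3, 30, 23, 0, tzinfo=timezone.utc)
for t in bucket_open_times(start, end, timedelta(minutes=1)):
    # One GET per bucket: /v1/candles?market=BTC&interval=1m&time=<t>
    print(t)
# prints 2026-03-30T22:56:00Z through 2026-03-30T22:59:00Z (four buckets)
```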

Why Multi-Process + HTTP/2

A single Python process backfilling this endpoint tops out at around 1000 useful RPS, whether it uses requests, aiohttp, or httpx. That cap is a per-process ceiling, not an endpoint throttle, an RTT bound, or a per-source-IP throttle.

Running the same workload across several processes scales past that cap. On the same workstation:

  • 1 process: about 1000–1300 RPS
  • 8 processes: about 5500 RPS aggregate
  • 12 processes: about 5400 RPS aggregate (diminishing returns above 8)

HTTP/2 matters too. In an apples-to-apples test of 8 parallel processes, httpx with http2=True sustained about 5500 RPS aggregate, while aiohttp over HTTP/1.1 reached about 3100 and threaded requests over HTTP/1.1 only about 2600. HTTP/2 stream multiplexing lets each TCP connection carry many in-flight requests, which is why fewer connections can reach higher aggregate throughput.

The exporter below therefore combines both: multiple worker processes, each running httpx with http2=True and discovering its own DWSESSION-pinned connections. That is what lets a single customer machine sit close to the Scale plan's 5000 RPS ceiling.

Use a Small Pool of Sticky HTTP/2 Connections Per Worker

Dwellir uses the DWSESSION cookie to keep a client pinned to one backend. That is useful for consistency, but it also means a single long-lived connection can spend an entire archive pull on one backend.

The exporter below takes a middle path:

  • it spawns a small number of worker processes
  • each worker warms a small pool of httpx.AsyncClient(http2=True) connections and keeps one per discovered DWSESSION backend when possible
  • each HTTP/2 connection carries many concurrent streams inside the worker
  • each worker gets one contiguous time shard of the archive
  • the shard CSVs are merged back into one ordered output file at the end

That keeps connection count low, spreads work across the available backends, and still lets one process sustain close to its per-process ceiling.
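The selection rule is easier to see with the network removed. In this sketch, pick_sticky_backends is a hypothetical helper: it takes the DWSESSION value each probe connection received (with httpx, response.cookies.get("DWSESSION")) and keeps the first connection per distinct backend, which mirrors what warm_clients does in the script below. Probes without a cookie are lumped together as one "unknown" backend.

```python
from typing import Optional, Sequence


def pick_sticky_backends(probe_cookies: Sequence[Optional[str]], desired: int) -> list:
    """Return indices of probe connections to keep: at most one per DWSESSION value."""
    seen = set()
    keep = []
    for idx, cookie in enumerate(probe_cookies):
        if len(keep) >= desired:
            break
        backend = cookie or "unknown"  # no cookie: treat as one anonymous backend
        if backend in seen:
            continue  # duplicate backend: this connection would be closed
        seen.add(backend)
        keep.append(idx)
    return keep


# Six probes landed on backends a, b, a, (none), c, (none); keep up to three.
print(pick_sticky_backends(["a", "b", "a", None, "c", None], desired=3))  # [0, 1, 3]
```

An httpx AsyncClient stores cookies from responses and sends them on later requests, which is what keeps each warmed client pinned to its backend; HTTP/2 multiplexing then supplies the concurrency on each one.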

Choose the Time Window You Want to Backfill

The exporter in this guide works on one market and one interval at a time.

Use these rules when you pick your window:

  • the intended archive floor is 2025-07-27T08:00:00Z
  • 1s data must be aligned to exact seconds
  • 1m data must be aligned to exact minutes
  • 5m data must be aligned to minutes 00, 05, 10, and so on
  • candles are sparse, so some buckets will return 404 and should be skipped
  • 404s are more common on 1s than on 1m or 5m but can appear on any interval
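The exporter below starts at --start and steps forward by the interval without rounding, so a misaligned start could make every request target a timestamp that is not a bucket open. A small pre-flight check helps. This sketch assumes buckets sit on the UTC epoch grid (which matches the 00/05/10 rule for 5m); is_aligned and align_up are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def is_aligned(ts: datetime, step: timedelta) -> bool:
    """True if ts is an exact bucket open for this interval."""
    return (ts - EPOCH) % step == timedelta(0)


def align_up(ts: datetime, step: timedelta) -> datetime:
    """Round ts up to the next bucket open (unchanged if already aligned)."""
    remainder = (ts - EPOCH) % step
    return ts if remainder == timedelta(0) else ts + (step - remainder)


step = timedelta(minutes=5)
start = datetime(2026, 3, 30, 23, 7, 30, tzinfo=timezone.utc)
print(is_aligned(start, step), align_up(start, step).isoformat())
# False 2026-03-30T23:10:00+00:00
```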

For rough planning, 30 days of one market is approximately:

| Interval | Theoretical buckets in 30 days |
|---|---|
| 1s | 2,592,000 |
| 1m | 43,200 |
| 5m | 8,640 |
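Those counts are just window length divided by interval length, so you can compute them for any window before a run. A sketch with a hypothetical bucket_count helper:

```python
from datetime import datetime, timedelta, timezone


def bucket_count(start: datetime, end: datetime, step: timedelta) -> int:
    """Bucket opens in [start, end): one request each, whether it hits or 404s."""
    return max(0, (end - start) // step)


start = datetime(2026, 3, 1, tzinfo=timezone.utc)
end = start + timedelta(days=30)
steps = {"1s": timedelta(seconds=1), "1m": timedelta(minutes=1), "5m": timedelta(minutes=5)}
for name, step in steps.items():
    print(name, f"{bucket_count(start, end, step):,}")
# 1s 2,592,000
# 1m 43,200
# 5m 8,640
```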

Build a Resumable Backfill Script

Save this as backfill_hyperliquid_ohlcv.py:

Python
"""Resumable Hyperliquid OHLCV backfill via the Dwellir Hyperliquid Index endpoint.

One market and one interval per run. Splits the time range into N worker
processes, each running an asyncio + HTTP/2 (httpx) loop with sticky-session
discovery for backend pinning. Writes one CSV part per shard plus an atomic
resume state file. Merges parts into one ordered CSV when the run completes.

Usage:

    python backfill_hyperliquid_ohlcv.py \\
        --api-key $DWELLIR_API_KEY \\
        --market BTC --interval 1m \\
        --start 2026-03-01T00:00:00Z --end 2026-04-01T00:00:00Z \\
        --output btc-1m.csv \\
        --rate 5000 --workers 8

Per-plan recommended settings:

    Free       --rate 20    --workers 1
    Developer  --rate 100   --workers 1
    Growth     --rate 500   --workers 1
    Scale      --rate 5000  --workers 8
"""
from __future__ import annotations

import argparse
import asyncio
import csv
import json
import multiprocessing as mp
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path

import httpx

ENDPOINT_HOST = "https://api-hyperliquid-index.n.dwellir.com"
ARCHIVE_FLOOR = "2025-07-27T08:00:00Z"

INTERVAL_STEP = {
    "1s": timedelta(seconds=1),
    "1m": timedelta(minutes=1),
    "5m": timedelta(minutes=5),
}

CSV_FIELDS = ["s", "i", "t", "T", "o", "h", "l", "c", "v", "q", "n", "x"]


def parse_utc(value: str) -> datetime:
    return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)


def format_utc(value: datetime) -> str:
    return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")


def split_range(start: datetime, end: datetime, step: timedelta, n_shards: int):
    total = max(1, int((end - start) / step))
    n_shards = max(1, min(n_shards, total))
    base, rem = divmod(total, n_shards)
    shards = []
    cursor = start
    for i in range(n_shards):
        size = base + (1 if i < rem else 0)
        shard_end = cursor + step * size
        if shard_end > end:
            shard_end = end
        shards.append((cursor, shard_end))
        cursor = shard_end
    return shards


def atomic_write(path: Path, payload: str):
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(payload)
    tmp.replace(path)


def write_state(path: Path, payload: dict):
    atomic_write(path, json.dumps(payload, indent=2, sort_keys=True) + "\n")


def load_state(path: Path):
    if not path.exists():
        return None
    try:
        return json.loads(path.read_text())
    except json.JSONDecodeError:
        return None


class TokenBucketAsync:
    def __init__(self, rate_per_second: float):
        self.rate = float(rate_per_second)
        self.capacity = max(self.rate, 1.0)
        self.tokens = self.capacity
        self.last = time.monotonic()

    async def acquire(self):
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= 1.0:
                self.tokens -= 1.0
                return
            await asyncio.sleep((1.0 - self.tokens) / self.rate)


async def warm_clients(base: str, market: str, interval: str, probe_time: datetime,
                       desired: int, max_attempts: int):
    seen = set()
    out = []
    for _ in range(max_attempts):
        if len(out) >= desired:
            break
        limits = httpx.Limits(max_connections=200, max_keepalive_connections=200)
        client = httpx.AsyncClient(http2=True, timeout=30.0, limits=limits)
        try:
            r = await client.get(
                f"{base}/v1/candles",
                params={"market": market, "interval": interval, "time": format_utc(probe_time)},
            )
        except Exception:
            await client.aclose()
            continue
        if r.status_code not in (200, 404):
            await client.aclose()
            continue
        backend = r.cookies.get("DWSESSION") or "unknown"
        if backend in seen:
            await client.aclose()
            continue
        seen.add(backend)
        out.append((backend, client))
    if not out:
        out.append(("unknown", httpx.AsyncClient(http2=True, timeout=30.0)))
    return out


@dataclass
class ShardResult:
    shard_index: int
    fetched: int
    skipped: int
    errors: int
    seconds: float
    sticky_backends: list


async def shard_worker(shard_index, base, market, interval, shard_start, shard_end,
                       rate_per_worker, sticky_per_worker, workers_per_client,
                       csv_path, state_path):
    """Process one time shard with crash-safe resume.

    Correctness invariants:
    - The state file's `next_time` is always backed by the highest *contiguous*
      completed bucket, never just the max in-flight completion. Async tasks
      complete out of order, so a max-watermark would skip holes on resume.
    - Candles reach the CSV before they are counted in state. Consumers only
      append to in-memory buffers; a periodic flusher writes the CSV first,
      then updates counters and advances the frontier, then writes state.
    - The merge step deduplicates by candle open time `t` to absorb the rare
      duplicate that can occur if the process is killed between flushing the
      CSV and writing the new state.
    """
    step = INTERVAL_STEP[interval]
    state = load_state(state_path)
    cursor = shard_start
    fetched = skipped = errors = 0
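    if state:
        # Refuse to resume from a state file written for a different run
        # (another market, interval, date range, or --workers count).
        expected = (market, interval, format_utc(shard_start), format_utc(shard_end))
        found = (state.get("market"), state.get("interval"),
                 state.get("range_start"), state.get("range_end"))
        if found != expected:
            raise RuntimeError(f"{state_path} belongs to another run ({found}); "
                               f"use a new --output or delete {state_path.parent}")
        cursor = parse_utc(state["next_time"])
        fetched = state.get("fetched", 0)
        skipped = state.get("skipped", 0)
        errors = state.get("errors", 0)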
    if cursor >= shard_end:
        return ShardResult(shard_index, fetched, skipped, errors, 0.0, [])

    clients = await warm_clients(base, market, interval, shard_end - step,
                                 sticky_per_worker, sticky_per_worker * 3)
    backends = [b for b, _ in clients]

    if not csv_path.exists() or csv_path.stat().st_size == 0:
        with csv_path.open("w", newline="") as f:
            csv.DictWriter(f, fieldnames=CSV_FIELDS).writeheader()

    queue: asyncio.Queue = asyncio.Queue()
    cur = cursor
    while cur < shard_end:
        queue.put_nowait(cur)
        cur += step
    if queue.qsize() == 0:
        for _, c in clients:
            await c.aclose()
        return ShardResult(shard_index, fetched, skipped, errors, 0.0, backends)

    limiter = None if rate_per_worker is None else TokenBucketAsync(float(rate_per_worker))

    write_buffer: list = []
    completion_buffer: list = []  # (ts, "ok" | "404" | "err")
    completed_kind: dict = {}  # ts -> kind, for completions past the frontier
    counters = {"fetched": fetched, "skipped": skipped, "errors": errors}
    frontier = cursor - step  # last contiguous-completed bucket
    stop_event = asyncio.Event()

    def write_checkpoint(*, completed: bool = False) -> None:
        write_state(state_path, {
            "shard_index": shard_index,
            "interval": interval, "market": market,
            "next_time": format_utc(shard_end if completed else frontier + step),
            "fetched": counters["fetched"],
            "skipped": counters["skipped"],
            "errors": counters["errors"],
            "range_start": format_utc(shard_start),
            "range_end": format_utc(shard_end),
            **({"completed": True} if completed else {}),
        })

    def flush_once() -> None:
        # Snapshot buffers atomically — these statements have no awaits, so
        # asyncio cannot interleave a consumer between the snapshot and the
        # clear, and a consumer's two buffer appends always land together.
        nonlocal frontier
        to_write = write_buffer[:]
        write_buffer.clear()
        to_complete = completion_buffer[:]
        completion_buffer.clear()
        if not to_write and not to_complete:
            return

        # Make candles durable BEFORE counting them — otherwise an interrupt
        # between the count and the disk write would silently lose data.
        if to_write:
            with csv_path.open("a", newline="") as f:
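                # extrasaction="ignore": an unexpected extra field in a response
                # must not make the whole flush fail and drop buffered rows.
                w = csv.DictWriter(f, fieldnames=CSV_FIELDS, extrasaction="ignore")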
                for row in to_write:
                    w.writerow(row)

        # Defer counting until the frontier advances through each timestamp,
        # so the persisted counters match exactly what the frontier covers.
        # Without this, an interrupt would persist counts for ahead-of-frontier
        # completions; resume would re-fetch and double-count them.
        for ts, kind in to_complete:
            completed_kind[ts] = kind

        next_ts = frontier + step
        while next_ts in completed_kind:
            kind = completed_kind.pop(next_ts)
            if kind == "ok":
                counters["fetched"] += 1
            elif kind == "404":
                counters["skipped"] += 1
            else:
                counters["errors"] += 1
            frontier = next_ts
            next_ts += step

        write_checkpoint()

    async def flush_loop():
        while not stop_event.is_set():
            try:
                await asyncio.wait_for(stop_event.wait(), timeout=1.0)
                break
            except asyncio.TimeoutError:
                pass
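            try:
                flush_once()
            except Exception as exc:
                # Surface checkpoint failures; a failed flush stalls the frontier.
                print(f"shard {shard_index}: flush failed: {exc!r}", file=sys.stderr)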

    async def consumer(client):
        url = f"{base}/v1/candles"
        while True:
            try:
                ts = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            if limiter is not None:
                await limiter.acquire()
            params = {"market": market, "interval": interval, "time": format_utc(ts)}
            try:
                r = await client.get(url, params=params)
                if r.status_code == 200:
                    try:
                        candle = r.json()
                        # These two appends MUST be back-to-back with no await
                        # between them so the flusher cannot snapshot a CSV
                        # candle without its matching completion entry.
                        write_buffer.append(candle)
                        completion_buffer.append((ts, "ok"))
                    except Exception:
                        completion_buffer.append((ts, "err"))
                elif r.status_code == 404:
                    completion_buffer.append((ts, "404"))
                else:
                    completion_buffer.append((ts, "err"))
            except Exception:
                completion_buffer.append((ts, "err"))

    t0 = time.monotonic()
    flusher = asyncio.create_task(flush_loop())
    tasks = []
    for _, c in clients:
        for _ in range(workers_per_client):
            tasks.append(asyncio.create_task(consumer(c)))
    try:
        await asyncio.gather(*tasks)
    finally:
        try:
            flush_once()
        except Exception:
            pass
        stop_event.set()
        try:
            await flusher
        except Exception:
            pass
        for _, c in clients:
            try:
                await c.aclose()
            except Exception:
                pass

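    # Mark the shard completed only if the contiguous frontier reached the end
    # (a failed flush can leave it short); otherwise keep the resumable frontier.
    write_checkpoint(completed=frontier + step >= shard_end)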

    return ShardResult(shard_index, counters["fetched"], counters["skipped"],
                      counters["errors"], time.monotonic() - t0, backends)


def shard_entry(queue_mp, shard_index, api_key, market, interval, shard_start_iso,
                shard_end_iso, rate_per_worker, sticky_per_worker, workers_per_client,
                csv_path, state_path):
    base = f"{ENDPOINT_HOST}/{api_key}"
    try:
        result = asyncio.run(shard_worker(
            shard_index, base, market, interval,
            parse_utc(shard_start_iso), parse_utc(shard_end_iso),
            rate_per_worker, sticky_per_worker, workers_per_client,
            Path(csv_path), Path(state_path),
        ))
        queue_mp.put(("ok", shard_index, {
            "fetched": result.fetched,
            "skipped": result.skipped,
            "errors": result.errors,
            "seconds": result.seconds,
            "sticky_backends": result.sticky_backends,
        }))
    except Exception as exc:
        queue_mp.put(("err", shard_index, repr(exc)))


def merge_parts(part_paths, final_output):
    """Merge per-shard CSVs into one file ordered by candle open time `t`.

    Shards write rows concurrently so they arrive unsorted. We collect the
    combined rows in memory, deduplicate by `t` (the rare duplicate can occur
    when a process is killed between flushing the CSV and writing the new
    state — the row is durable but the state still asks for it on resume),
    and sort before writing the final file. A month of `1s` with ~30%
    materialized is roughly 800k rows.
    """
    by_t: dict = {}
    for p in part_paths:
        if not p.exists():
            continue
        with p.open(newline="") as src:
            for row in csv.DictReader(src):
                by_t[int(row["t"])] = row
    rows = [by_t[k] for k in sorted(by_t)]
    tmp = final_output.with_name(final_output.name + ".tmp")
    with tmp.open("w", newline="") as out:
        w = csv.DictWriter(out, fieldnames=CSV_FIELDS)
        w.writeheader()
        w.writerows(rows)
    tmp.replace(final_output)


def discover_api_key(env_var="DWELLIR_API_KEY", key_name="support_bot"):
    val = os.environ.get(env_var)
    if val:
        return val
    import subprocess
    try:
        out = subprocess.check_output(["dwellir", "keys", "list", "--toon"], text=True)
    except (FileNotFoundError, subprocess.CalledProcessError):
        raise SystemExit(f"set {env_var} or install/auth the `dwellir` CLI")
    for line in out.splitlines():
        if f",{key_name}," in line and ",true," in line:
            return line.split(",", 1)[0].strip()
    raise SystemExit(f"no enabled key '{key_name}' via dwellir CLI; pass --api-key or set {env_var}")


def main():
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--api-key", default=None,
                        help="Dwellir API key UUID; defaults to $DWELLIR_API_KEY or `dwellir keys list`")
    parser.add_argument("--api-key-name", default="support_bot",
                        help="key name to look up via `dwellir keys list` if --api-key not given")
    parser.add_argument("--market", required=True)
    parser.add_argument("--interval", required=True, choices=sorted(INTERVAL_STEP))
    parser.add_argument("--start", required=True,
                        help=f"ISO-8601 UTC start (inclusive); archive floor is {ARCHIVE_FLOOR}")
    parser.add_argument("--end", required=True, help="ISO-8601 UTC end (exclusive)")
    parser.add_argument("--output", required=True, type=Path)
    parser.add_argument("--rate", type=int, default=5000,
                        help="aggregate request-rate cap (RPS); match your plan ceiling")
    parser.add_argument("--workers", type=int, default=8,
                        help="worker processes (Scale=8, Growth/Developer/Free=1)")
    parser.add_argument("--sticky-per-worker", type=int, default=4)
    parser.add_argument("--workers-per-client", type=int, default=128)
    args = parser.parse_args()

    api_key = args.api_key or discover_api_key(key_name=args.api_key_name)
    start = parse_utc(args.start)
    end = parse_utc(args.end)
    step = INTERVAL_STEP[args.interval]
    if end <= start:
        raise SystemExit("--end must be after --start")
    if end > datetime.now(tz=timezone.utc) - step:
        raise SystemExit(f"--end must be at most one bucket-step ({step}) before now")

    workers = max(1, args.workers)
    rate_per_worker = None if args.rate <= 0 else args.rate / workers
    shards = split_range(start, end, step, workers)

    parts_dir = args.output.with_suffix(args.output.suffix + ".parts")
    parts_dir.mkdir(parents=True, exist_ok=True)
    print(f"backfill: market={args.market} interval={args.interval} "
          f"buckets={int((end - start)/step)} shards={len(shards)} "
          f"rate={args.rate} workers={workers}", file=sys.stderr)

    ctx = mp.get_context("spawn")
    queue_mp = ctx.Queue()
    procs = []
    csv_paths = []
    for i, (s, e) in enumerate(shards):
        csv_path = parts_dir / f"shard-{i:03d}.csv"
        state_path = parts_dir / f"shard-{i:03d}.state.json"
        csv_paths.append(csv_path)
        p = ctx.Process(
            target=shard_entry,
            args=(queue_mp, i, api_key, args.market, args.interval,
                  format_utc(s), format_utc(e), rate_per_worker,
                  args.sticky_per_worker, args.workers_per_client,
                  str(csv_path), str(state_path)),
        )
        p.start()
        procs.append(p)

    t0 = time.monotonic()
    results = []
    failed = []
    for _ in range(len(procs)):
        tag, sid, payload = queue_mp.get()
        if tag == "ok":
            results.append((sid, payload))
            r = payload
            print(f"  shard {sid:>3} done: fetched={r['fetched']} skipped={r['skipped']} "
                  f"errors={r['errors']} {r['seconds']:.1f}s backends={r['sticky_backends']}",
                  file=sys.stderr)
        else:
            failed.append((sid, payload))
            print(f"  shard {sid:>3} ERROR: {payload}", file=sys.stderr)
    for p in procs:
        p.join(timeout=10.0)

    if failed:
        print(f"FAILED: {len(failed)} shard(s) errored — partial CSV in {parts_dir}",
              file=sys.stderr)
        sys.exit(1)

    total_fetched = sum(r["fetched"] for _, r in results)
    total_skipped = sum(r["skipped"] for _, r in results)
    total_errors = sum(r["errors"] for _, r in results)
    wall = time.monotonic() - t0
    useful = total_fetched + total_skipped

    merge_parts(csv_paths, args.output)
    print(f"merged {len(csv_paths)} shards -> {args.output}", file=sys.stderr)
    print(f"summary: fetched={total_fetched} skipped(404)={total_skipped} "
          f"errors={total_errors} wall={wall:.1f}s useful_rps={useful/max(wall,1e-9):.1f}",
          file=sys.stderr)


if __name__ == "__main__":
    main()

This script does six things that matter for fast archive retrieval:

  • splits the time range into one contiguous shard per worker process
  • warms a small pool of httpx HTTP/2 connections per worker, keeping one per discovered DWSESSION backend when possible
  • runs many concurrent async tasks inside each worker, sharing the worker's HTTP/2 connections via stream multiplexing
  • treats 404 as an empty sparse bucket
  • keeps one atomically written state file per shard so you can resume long runs, and deduplicates rows by candle open time at merge so buckets re-fetched after a resume do not appear twice
  • merges the shard CSVs back into one output file sorted by candle open time
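The resume rule inside shard_worker is worth isolating because async completions arrive out of order. This standalone sketch (a hypothetical advance_frontier, simplified from the loop in flush_once and without the counters) shows why the state file records the last contiguous bucket rather than the newest one:

```python
from datetime import datetime, timedelta, timezone


def advance_frontier(frontier: datetime, completed: dict, step: timedelta) -> datetime:
    """Move the last contiguous-completed bucket forward through `completed`.

    `completed` maps bucket-open time -> outcome ("ok", "404", "err"). Entries the
    frontier passes are removed; entries beyond a hole stay parked.
    """
    next_ts = frontier + step
    while next_ts in completed:
        completed.pop(next_ts)
        frontier = next_ts
        next_ts += step
    return frontier


step = timedelta(seconds=1)
b = [datetime(2026, 3, 30, 23, 0, s, tzinfo=timezone.utc) for s in range(5)]
parked = {b[2]: "ok", b[0]: "404", b[3]: "ok"}  # b[1] is still in flight
frontier = advance_frontier(b[0] - step, parked, step)
print(frontier == b[0], sorted(parked) == [b[2], b[3]])  # True True
```

If the process died at this point, the saved next_time would be b[1], so b[2] and b[3] would be requested again on resume even though their rows may already be in the part CSV. That is safe because merge_parts deduplicates by t.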

Run a Short Validation Window First

Before you launch a long archive job, validate the script on a bounded window using a single worker:

Bash
python backfill_hyperliquid_ohlcv.py \
  --api-key YOUR_API_KEY \
  --market BTC --interval 1m \
  --start 2026-03-30T22:56:00Z --end 2026-03-30T23:00:00Z \
  --output btc-1m-validation.csv \
  --rate 500 --workers 1

You should see output like:

Text
backfill: market=BTC interval=1m buckets=4 shards=1 rate=500 workers=1
  shard   0 done: fetched=4 skipped=0 errors=0 0.3s backends=['juju-...']
merged 1 shards -> btc-1m-validation.csv
summary: fetched=4 skipped(404)=0 errors=0 wall=3.5s useful_rps=1.1

The resulting CSV should start like this:

csv
s,i,t,T,o,h,l,c,v,q,n,x
BTC,1m,1774911360000,1774911419999,66737,66738,66731,66735,0.95317,63618.157,103,True
BTC,1m,1774911420000,1774911479999,66735,66748,66724,66747,2.80108,186911.222,130,True
BTC,1m,1774911480000,1774911539999,66748,66755,66747,66754,16.60528,1108386.895,174,True
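To check a finished export without eyeballing it, a short script can confirm the merged CSV is sorted, free of duplicate open times, and made of full-width buckets. A sketch; check_export is hypothetical and the millisecond widths are assumed from the t and T columns above.

```python
import csv
import io

# Assumed interval widths in milliseconds (T = t + width - 1 in the rows above).
STEP_MS = {"1s": 1_000, "1m": 60_000, "5m": 300_000}


def check_export(csv_text: str, interval: str) -> dict:
    """Summarize a merged export: row count, ordering, duplicates, bucket widths."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    opens = [int(r["t"]) for r in rows]
    step = STEP_MS[interval]
    return {
        "rows": len(rows),
        "sorted": opens == sorted(opens),
        "unique": len(set(opens)) == len(opens),
        "aligned": all(t % step == 0 for t in opens),
        "widths_ok": all(int(r["T"]) == int(r["t"]) + step - 1 for r in rows),
    }


# For a real run: check_export(Path("btc-1m-validation.csv").read_text(), "1m")
sample = """s,i,t,T,o,h,l,c,v,q,n,x
BTC,1m,1774911360000,1774911419999,66737,66738,66731,66735,0.95317,63618.157,103,True
BTC,1m,1774911420000,1774911479999,66735,66748,66724,66747,2.80108,186911.222,130,True
BTC,1m,1774911480000,1774911539999,66748,66755,66747,66754,16.60528,1108386.895,174,True
"""
print(check_export(sample, "1m"))
# {'rows': 3, 'sorted': True, 'unique': True, 'aligned': True, 'widths_ok': True}
```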

Backfill a Larger Archive

Once the validation window looks correct, extend the time range and pick a worker count that matches your plan.

Example: one month of BTC 1m candles on the Scale plan:

Bash
python backfill_hyperliquid_ohlcv.py \
  --api-key YOUR_API_KEY \
  --market BTC --interval 1m \
  --start 2026-03-01T00:00:00Z --end 2026-04-01T00:00:00Z \
  --output btc-1m-march-2026.csv \
  --rate 5000 --workers 8

Example: one month of BTC 1s candles on the Scale plan:

Bash
python backfill_hyperliquid_ohlcv.py \
  --api-key YOUR_API_KEY \
  --market BTC --interval 1s \
  --start 2026-03-01T00:00:00Z --end 2026-04-01T00:00:00Z \
  --output btc-1s-march-2026.csv \
  --rate 5000 --workers 8

Example: one hour of BTC 1s candles on the Growth plan:

Bash
python backfill_hyperliquid_ohlcv.py \
  --api-key YOUR_API_KEY \
  --market BTC --interval 1s \
  --start 2026-03-30T23:00:00Z --end 2026-03-31T00:00:00Z \
  --output btc-1s-2026-03-30T23.csv \
  --rate 500 --workers 1

In a sampled live check against the endpoint, a one-minute window of 1s buckets returned 50 candles and 10 sparse 404 gaps. That is normal: the archive is sparse by design. Note that 404s still count toward your plan rate even though no candle is materialized.
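Because the export keeps only materialized candles, you can reconstruct where the gaps are afterwards. A sketch with a hypothetical missing_buckets helper working on millisecond open times; it cannot tell a 404 bucket from one that errored, since neither writes a row.

```python
def missing_buckets(opens_ms, start_ms: int, end_ms: int, step_ms: int) -> list:
    """Bucket opens in [start_ms, end_ms) that have no candle in the export."""
    present = set(opens_ms)
    return [t for t in range(start_ms, end_ms, step_ms) if t not in present]


# Synthetic data: one minute of 1s buckets (2026-03-30T23:00Z), every sixth second empty.
start_ms = 1774911600000
opens = [start_ms + s * 1_000 for s in range(60) if s % 6 != 0]
gaps = missing_buckets(opens, start_ms, start_ms + 60_000, 1_000)
print(len(opens), len(gaps))  # 50 10
```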

Putting It All Together

The full archival workflow is:

  1. discover your API key name with dwellir keys list --toon
  2. confirm the index endpoint with dwellir endpoints search hyperliquid --ecosystem hyperliquid --network mainnet --key YOUR_KEY_NAME --toon
  3. validate one known candle with curl
  4. run a short backfill window with --workers 1
  5. scale to --workers 8 --rate 5000 for a Scale-plan run across the full date range
  6. if the job stops midway, rerun the same command: each shard resumes from its state file in the <output>.parts/ work directory (for example btc-1m.csv.parts/)
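To see how far an interrupted run got before you rerun it, you can read the state files directly. This sketch relies only on the keys the exporter writes (range_start, range_end, next_time, completed); shard_progress is a hypothetical helper.

```python
import json
from datetime import datetime
from pathlib import Path


def _parse_utc(value: str) -> datetime:
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def shard_progress(parts_dir: Path) -> list:
    """(state file name, fraction of the shard's range done, completed flag) per shard."""
    report = []
    for path in sorted(parts_dir.glob("shard-*.state.json")):
        state = json.loads(path.read_text())
        start = _parse_utc(state["range_start"])
        end = _parse_utc(state["range_end"])
        done = (_parse_utc(state["next_time"]) - start) / (end - start)
        report.append((path.name, round(done, 4), bool(state.get("completed"))))
    return report


parts = Path("btc-1s-march-2026.csv.parts")  # <output> + ".parts", as the exporter names it
if parts.is_dir():
    for name, fraction, completed in shard_progress(parts):
        print(f"{name}: {fraction:.1%}" + (" (completed)" if completed else ""))
```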

When the run is complete, you will have:

  • a CSV containing only materialized candles in the public s i t T o h l c v q n x schema, sorted by candle open time
  • a .parts/ work directory with one part CSV and one state file per shard
  • a repeatable pull process for any supported market slug and interval

Rough Extraction Times

For planning purposes, these are rough estimates for pulling 30 days of one market with the multi-process exporter in this guide. Each figure is the 30-day bucket count from the planning table above divided by the plan's request rate, assuming the exporter sustains close to that ceiling; we measured roughly 4500–5000 useful RPS on the Scale plan from a single workstation with --workers 8.

| Interval | Free (20 RPS) | Developer (100 RPS) | Growth (500 RPS) | Scale (5000 RPS) |
|---|---|---|---|---|
| 5m | about 7 minutes | about 1.5 minutes | about 20 seconds | about 3 seconds |
| 1m | about 36 minutes | about 7 minutes | about 1.5 minutes | about 10 seconds |
| 1s | daily cap dominates | about 7 hours | about 1.5 hours | about 10 minutes |

Treat these as rough extraction times, not guarantees. Actual runtime varies with archive sparsity, retry behavior, and the number of backends available behind the endpoint.

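For the Free plan, the 100,000 daily response limit dominates once the archive gets large enough. A month (30 days) of 1s is 2,592,000 bucket requests, one per bucket whether it returns a candle or a 404, so the pull needs about 26 days' worth of the daily limit. The daily cap, not the 20 RPS rate, is the real ceiling.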
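Every cell in the extraction-time table is bucket count divided by sustained request rate, with the Free plan's daily limit as an extra constraint. A rough sketch of that arithmetic; estimate_runtime_s is hypothetical, and it assumes the limiter holds the rate exactly and that the daily quota resets every 24 hours.

```python
import math
from typing import Optional


def estimate_runtime_s(buckets: int, rps: float, daily_cap: Optional[int] = None) -> float:
    """Lower-bound wall-clock seconds to issue `buckets` requests at a steady `rps`.

    With a daily cap, each exhausted quota costs a full day of waiting; the final
    partial quota only costs remainder / rps.
    """
    if daily_cap is None or buckets <= daily_cap:
        return buckets / rps
    full_days = math.ceil(buckets / daily_cap) - 1
    remainder = buckets - full_days * daily_cap
    return full_days * 86_400 + remainder / rps


month_1s = 30 * 86_400  # 2,592,000 one-second buckets
print(f"Growth: {estimate_runtime_s(month_1s, 500) / 3600:.2f} h")  # Growth: 1.44 h
print(f"Free: {estimate_runtime_s(month_1s, 20, daily_cap=100_000) / 86_400:.1f} days")
# Free: 25.1 days, spread across 26 daily quotas
```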

Going to Production

For long or repeated archive pulls, tighten the workflow before you automate it:

  • keep the API key in an environment variable or secret manager instead of shell history
  • split very large jobs into date partitions such as daily or weekly windows
  • match --workers and --rate to your plan: 8 and 5000 for Scale, 1 and your plan's rate for Free/Developer/Growth
  • write outputs to a durable location before merging them downstream
  • treat the CSV as sparse source data and densify later only if your analytics pipeline needs it
  • add retry and backoff logic around transient 5xx or network errors if you see a non-zero error count
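One way to add that retry logic is a small wrapper around the GET call. This is a sketch under our own assumptions, not Dwellir guidance: get_with_retry is hypothetical, it treats 429 and common 5xx codes as retryable (the guide itself only mentions transient 5xx), and it returns 200 and the normal sparse 404 immediately.

```python
import asyncio
import random

# Assumption: these statuses are transient; the guide only names "transient 5xx".
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


async def get_with_retry(client, url, params, *, attempts=4, base_delay=0.25,
                         retry_on=(Exception,)):
    """GET with exponential backoff and jitter on retryable statuses or exceptions.

    Any other status (including 200 and the sparse 404) returns immediately.
    After the last attempt, the final response is returned or the exception re-raised.
    """
    response = None
    for attempt in range(attempts):
        try:
            response = await client.get(url, params=params)
            if response.status_code not in RETRYABLE_STATUS:
                return response
        except retry_on:
            if attempt == attempts - 1:
                raise
        if attempt < attempts - 1:
            # 0.25 s, 0.5 s, 1 s, ... each scaled by a random factor in [0.5, 1.5).
            await asyncio.sleep(base_delay * 2 ** attempt * (0.5 + random.random()))
    return response
```

In the exporter you could call it from consumer in place of client.get, narrowing retry_on to (httpx.TransportError,). Retries still consume your plan's request rate, so the token bucket should ideally cover each attempt, not just the first.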

If you need a typed analytics format after the backfill is complete, convert the CSV to Parquet as a second step instead of changing the retrieval pattern.

Next Steps