Coming soon: need support for this feature? Email support@dwellir.com for early access.
Historical Replay
Historical replay enables replaying transactions from any point in blockchain history, essential for backfilling indexes, auditing past events, rebuilding state, and analyzing historical patterns. This feature provides efficient access to the entire transaction history without running full archive nodes.
Overview#
The streaming API supports replaying transactions from genesis or any specific version forward, delivering historical data with the same performance and reliability as real-time streams. This enables applications to build complete indexes from scratch, verify historical states, or analyze long-term trends efficiently.
Starting from Specific Versions#
import { credentials, Metadata } from "@grpc/grpc-js";
import { TransactionStreamClient } from "./generated/aptos_stream";

const client = new TransactionStreamClient(
  "stream.aptos.dwellir.com:443",
  credentials.createSsl()
);

// Attach your API key to the request metadata (see Authentication)
const metadata = new Metadata();

// Replay from a specific version
const request = {
  startingVersion: 100000000n, // Start from version 100M
  includeEvents: true,
  includeChanges: true
};

const stream = client.subscribe(request, metadata);

stream.on("data", (transaction) => {
  console.log(`Processing historical transaction: ${transaction.version}`);
  // Process transaction
});
Genesis Replay#
// Replay from genesis (version 0)
const genesisRequest = {
  startingVersion: 0n,
  includeEvents: true,
  includeChanges: false // Optimize bandwidth if changes are not needed
};

const genesisStream = client.subscribe(genesisRequest, metadata);

let processedCount = 0;
const startTime = Date.now();

genesisStream.on("data", (transaction) => {
  processedCount++;
  if (processedCount % 10000 === 0) {
    const elapsed = (Date.now() - startTime) / 1000;
    const rate = processedCount / elapsed;
    console.log(`Processed ${processedCount} transactions (${rate.toFixed(0)} tx/s)`);
  }
});
Real-World Use Cases#
- Index Backfilling: Build complete indexes from genesis when launching new applications or adding new index types to existing systems.
- Data Migration: Migrate historical data from one database or schema to another by replaying transactions through updated processors.
- Historical Analysis: Analyze long-term trends, patterns, and statistics by processing years of blockchain history efficiently.
- Audit and Compliance: Replay specific time periods to audit transactions, verify compliance, or investigate historical events.
- State Reconstruction: Rebuild application state from scratch by replaying all relevant transactions to verify correctness or recover from corruption.
- Research and Analytics: Process complete blockchain history for academic research, market analysis, or protocol performance studies.
Best Practices#
- Checkpoint Regularly: Save progress frequently during historical replay so it can resume without reprocessing millions of transactions.
- Batch Processing: Process transactions in batches and commit to the database periodically rather than per transaction to maximize throughput.
- Resource Management: Monitor memory usage and implement buffering strategies to handle high-throughput replay without overwhelming downstream systems.
- Progress Tracking: Implement detailed progress tracking with estimated completion times to monitor long-running historical replays.
- Parallel Processing: Split historical ranges across multiple workers to parallelize processing and reduce total replay time.
- Validate Completeness: Track version gaps and verify continuous coverage to ensure no transactions are missed during replay (see the gap-check sketch after this list).
- Optimize Queries: Use efficient database bulk insert operations and disable unnecessary indexes during replay for maximum throughput.
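A minimal sketch of completeness validation, assuming transactions arrive in ascending version order on each stream; the GapTracker name and its wiring are illustrative, not part of the streaming API.

// Tracks the last version seen and records any gaps in the sequence
class GapTracker {
  private lastVersion: bigint | null = null;
  private gaps: Array<{ from: bigint; to: bigint }> = [];

  observe(version: bigint) {
    if (this.lastVersion !== null && version > this.lastVersion + 1n) {
      // Record the missing range so it can be re-requested later
      this.gaps.push({ from: this.lastVersion + 1n, to: version - 1n });
    }
    this.lastVersion = version;
  }

  report(): Array<{ from: bigint; to: bigint }> {
    return this.gaps;
  }
}

// Usage inside a stream handler:
// const tracker = new GapTracker();
// stream.on("data", (tx) => tracker.observe(tx.version));
// stream.on("end", () => console.log("Gaps:", tracker.report()));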
Batch Replay Implementation#
class HistoricalReplayProcessor {
  private batchSize: number = 1000;
  private buffer: Transaction[] = [];

  async replayRange(startVersion: bigint, endVersion: bigint): Promise<void> {
    const request = {
      startingVersion: startVersion,
      includeEvents: true
    };

    const stream = client.subscribe(request, metadata);

    // Resolve once the requested range has been fully processed
    return new Promise((resolve, reject) => {
      stream.on("data", async (transaction) => {
        // Stop at the end of the requested range
        if (transaction.version > endVersion) {
          stream.cancel();
          await this.flushBuffer();
          resolve();
          return;
        }

        this.buffer.push(transaction);

        // Write in batches rather than per transaction
        if (this.buffer.length >= this.batchSize) {
          const batch = this.buffer;
          this.buffer = [];
          await this.processBatch(batch);
        }
      });

      // Flush any partial batch if the stream ends before endVersion
      stream.on("end", async () => {
        await this.flushBuffer();
        resolve();
      });

      stream.on("error", (err) => reject(err));
    });
  }

  private async processBatch(transactions: Transaction[]) {
    const records = transactions.map(tx => this.transformTransaction(tx));

    // Bulk insert into the target table
    await this.database.bulkInsert("transactions", records);

    // Persist the last processed version as a checkpoint
    const lastVersion = transactions[transactions.length - 1].version;
    await this.saveCheckpoint(lastVersion);
  }

  private async flushBuffer() {
    if (this.buffer.length > 0) {
      await this.processBatch(this.buffer);
      this.buffer = [];
    }
  }
}
Progress Monitoring#
interface ReplayProgress {
  percent: string;
  processed: number;
  total: number;
  rate: string;
  eta: string;
}

class ReplayMonitor {
  private startVersion: bigint;
  private endVersion: bigint;
  private currentVersion: bigint;
  private startTime: number;

  constructor(start: bigint, end: bigint) {
    this.startVersion = start;
    this.endVersion = end;
    this.currentVersion = start;
    this.startTime = Date.now();
  }

  update(version: bigint) {
    this.currentVersion = version;
  }

  getProgress(): ReplayProgress {
    const total = Number(this.endVersion - this.startVersion);
    const processed = Number(this.currentVersion - this.startVersion);
    const percent = (processed / total) * 100;

    const elapsed = Date.now() - this.startTime;
    const rate = processed / (elapsed / 1000);
    const remaining = rate > 0 ? (total - processed) / rate : 0;

    return {
      percent: percent.toFixed(2),
      processed,
      total,
      rate: rate.toFixed(0),
      eta: new Date(Date.now() + remaining * 1000).toISOString()
    };
  }
}
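A short usage sketch wiring the monitor into a replay stream; the version range, logging interval, and stream variable follow the earlier examples and are illustrative.

const monitor = new ReplayMonitor(100000000n, 200000000n);

stream.on("data", (transaction) => {
  monitor.update(transaction.version);

  // Log progress roughly every 100k versions
  if (transaction.version % 100000n === 0n) {
    const p = monitor.getProgress();
    console.log(`${p.percent}% complete, ${p.rate} tx/s, ETA ${p.eta}`);
  }
});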
Parallel Replay Strategy#
async function parallelReplay(
  startVersion: bigint,
  endVersion: bigint,
  workerCount: number
) {
  const totalRange = endVersion - startVersion;
  const rangePerWorker = totalRange / BigInt(workerCount);

  const workers: Promise<void>[] = [];

  for (let i = 0; i < workerCount; i++) {
    const workerStart = startVersion + rangePerWorker * BigInt(i);
    // The last worker picks up any remainder from the integer division
    const workerEnd = i === workerCount - 1
      ? endVersion
      : workerStart + rangePerWorker;

    workers.push(replayRange(workerStart, workerEnd, i));
  }

  await Promise.all(workers);
}

async function replayRange(
  start: bigint,
  end: bigint,
  workerId: number
) {
  console.log(`Worker ${workerId}: Replaying ${start} to ${end}`);

  const processor = new HistoricalReplayProcessor();
  await processor.replayRange(start, end);

  console.log(`Worker ${workerId}: Complete`);
}
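A hypothetical invocation, assuming you want to backfill the first 100M versions across eight workers; the range and worker count are illustrative.

// Backfill versions 0 through 100,000,000 across 8 workers
await parallelReplay(0n, 100000000n, 8);

In practice the bottleneck is usually database write throughput, so adding workers beyond what the database can absorb yields diminishing returns.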
Recovery and Resumption#
class ReplayState {
  async saveCheckpoint(version: bigint, workerId: string) {
    await db.upsert("replay_checkpoints", {
      worker_id: workerId,
      last_version: version.toString(),
      updated_at: new Date()
    });
  }

  async loadCheckpoint(workerId: string): Promise<bigint | null> {
    const checkpoint = await db.findOne("replay_checkpoints", {
      worker_id: workerId
    });
    return checkpoint ? BigInt(checkpoint.last_version) : null;
  }

  async resumeReplay(startVersion: bigint, endVersion: bigint, workerId: string) {
    // Resume from the last checkpoint if one exists, otherwise start fresh
    const checkpoint = await this.loadCheckpoint(workerId);
    const resumeFrom = checkpoint ?? startVersion;

    console.log(`Worker ${workerId}: resuming from version ${resumeFrom}`);

    const processor = new HistoricalReplayProcessor();
    return processor.replayRange(resumeFrom, endVersion);
  }
}
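A brief usage sketch, assuming each worker is identified by a stable string ID so a restarted process picks up where it left off; the worker ID and range are illustrative.

const state = new ReplayState();

// On process start (or restart), continue the worker from its last checkpoint
await state.resumeReplay(0n, 100000000n, "worker-0");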
Performance Optimization#
- Disable Indexes: Drop or disable secondary indexes during bulk replay and rebuild them after completion (see the sketch after this list)
- Batch Commits: Commit database transactions in large batches (10k-100k records)
- Skip Validation: Disable expensive validations during replay since data is already validated
- Compression: Use compression for network transfer to reduce bandwidth usage
- SSD Storage: Use fast SSDs for database writes during high-throughput replay
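A minimal sketch of the index-disabling pattern, assuming a PostgreSQL target accessed through the pg client; the table, column, and index names are illustrative.

import { Pool } from "pg";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

async function replayWithIndexRebuild(start: bigint, end: bigint) {
  // Drop secondary indexes so bulk inserts avoid per-row index maintenance
  await pool.query("DROP INDEX IF EXISTS idx_transactions_sender");
  await pool.query("DROP INDEX IF EXISTS idx_transactions_timestamp");

  try {
    const processor = new HistoricalReplayProcessor();
    await processor.replayRange(start, end);
  } finally {
    // Rebuild the indexes once the bulk load is complete
    await pool.query(
      "CREATE INDEX IF NOT EXISTS idx_transactions_sender ON transactions (sender)"
    );
    await pool.query(
      "CREATE INDEX IF NOT EXISTS idx_transactions_timestamp ON transactions (inserted_at)"
    );
  }
}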
Related Concepts#
- Streaming Overview - Introduction to streaming
- Real-Time Streaming - Processing current transactions
- Custom Processors - Building replay processors
- Authentication - Securing replay streams