⚠️Blast API (blastapi.io) ends Oct 31. Migrate to Dwellir and skip Alchemy's expensive compute units.
Switch Today →
Skip to main content

Coming soon: Need support for this? Email support@dwellir.com if you want early access

Real-Time Streaming

Real-time transaction streaming provides millisecond-latency access to finalized Aptos transactions as they occur on-chain, enabling responsive applications, trading systems, monitoring tools, and live dashboards. This feature delivers transactions with minimal delay after consensus, significantly faster than polling-based approaches.

Overview#

Real-time streaming subscribes to the head of the blockchain, delivering transactions immediately as they are finalized by consensus. This approach provides consistent low-latency updates without the overhead and inconsistency of polling, ideal for applications requiring immediate awareness of on-chain state changes.

Basic Real-Time Subscription#

import { TransactionStreamClient } from "./generated/aptos_stream";
import { credentials, Metadata } from "@grpc/grpc-js";

const client = new TransactionStreamClient(
"stream.aptos.dwellir.com:443",
credentials.createSsl()
);

const metadata = new Metadata();
metadata.add("authorization", `Bearer ${process.env.API_KEY}`);

// Subscribe to head of chain (no starting version = latest)
const request = {
includeEvents: true,
includeChanges: true
};

const stream = client.subscribe(request, metadata);

stream.on("data", (transaction) => {
console.log(`New transaction: ${transaction.hash}`);
console.log(`Version: ${transaction.version}`);
console.log(`Timestamp: ${transaction.timestamp}`);

// Process transaction in real-time
processTransaction(transaction);
});

stream.on("error", (error) => {
console.error("Stream error:", error);
// Implement reconnection logic
});

stream.on("end", () => {
console.log("Stream ended");
// Reconnect
});

Real-World Use Cases#

  1. Trading Bots: Execute arbitrage or market-making strategies based on millisecond-fresh DEX swap data and price movements.

  2. Live Dashboards: Display real-time protocol metrics, transaction volumes, gas prices, and network activity without refresh delays.

  3. Notification Systems: Send instant push notifications when users receive payments, NFTs transfer, or important events occur.

  4. Gaming Applications: Update game state, leaderboards, and player inventories immediately as transactions finalize on-chain.

  5. Security Monitoring: Detect suspicious patterns, unusual transactions, or potential attacks in real-time for immediate response.

  6. Price Oracles: Publish up-to-the-second price feeds by processing DEX trades and liquidity changes as they happen.

Best Practices#

Handle Reconnections Gracefully: Implement automatic reconnection with exponential backoff when streams disconnect to maintain continuous coverage.

Track Last Processed Version: Persist the last successfully processed version to resume streams without gaps after restarts.

Implement Buffering: Buffer incoming transactions during processing spikes to prevent backpressure and dropped connections.

Monitor Stream Health: Track message rates, latency, and error rates to detect and respond to degradation quickly.

Use Asynchronous Processing: Process transactions asynchronously to avoid blocking the stream and falling behind.

Set Appropriate Timeouts: Configure keepalive and timeout values to detect dead connections quickly.

Handle Duplicates: Implement idempotent processing since network issues may cause duplicate transaction deliveries.

Event Filtering#

class RealTimeProcessor {
private stream: ClientReadableStream;

async start() {
const request = {
includeEvents: true,
// Optional: filter by transaction type
transactionFilters: {
userTransaction: true,
genesisTransaction: false,
blockMetadataTransaction: false,
stateCheckpointTransaction: false
}
};

this.stream = client.subscribe(request, metadata);

this.stream.on("data", (tx) => {
// Further filter by event type
if (this.isRelevantTransaction(tx)) {
this.processTransaction(tx);
}
});
}

private isRelevantTransaction(tx: Transaction): boolean {
// Filter by function calls
if (tx.payload?.function) {
return tx.payload.function.startsWith("0x1::coin::transfer");
}

// Filter by events
if (tx.events) {
return tx.events.some(event =>
event.type.includes("WithdrawEvent") ||
event.type.includes("DepositEvent")
);
}

return false;
}
}

Latency Optimization#

class LowLatencyProcessor {
private processingQueue: Transaction[] = [];
private processing: boolean = false;

onTransaction(tx: Transaction) {
this.processingQueue.push(tx);

if (!this.processing) {
this.processBatch();
}
}

private async processBatch() {
this.processing = true;

while (this.processingQueue.length > 0) {
const batch = this.processingQueue.splice(0, 100);

// Process batch in parallel
await Promise.all(
batch.map(tx => this.processTransactionFast(tx))
);
}

this.processing = false;
}

private async processTransactionFast(tx: Transaction) {
// Minimal processing for lowest latency
const essential = this.extractEssentialData(tx);
await this.fastWrite(essential);
this.emitEvent(essential);
}
}

Connection Management#

class StreamManager {
private stream: ClientReadableStream | null = null;
private reconnectAttempts: number = 0;
private maxReconnectAttempts: number = 10;

async connect() {
try {
this.stream = client.subscribe(request, metadata);

this.stream.on("data", (tx) => {
this.reconnectAttempts = 0; // Reset on successful data
this.handleTransaction(tx);
});

this.stream.on("error", (error) => {
console.error("Stream error:", error);
this.reconnect();
});

this.stream.on("end", () => {
console.log("Stream ended");
this.reconnect();
});

} catch (error) {
console.error("Connection error:", error);
this.reconnect();
}
}

private async reconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error("Max reconnection attempts reached");
return;
}

this.reconnectAttempts++;
const backoff = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);

console.log(`Reconnecting in ${backoff}ms (attempt ${this.reconnectAttempts})`);

await this.sleep(backoff);
await this.connect();
}

private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}

disconnect() {
if (this.stream) {
this.stream.cancel();
this.stream = null;
}
}
}

Metrics and Monitoring#

class StreamMetrics {
private lastTransactionTime: number = Date.now();
private transactionCount: number = 0;
private latencies: number[] = [];

recordTransaction(tx: Transaction, receivedAt: number) {
this.transactionCount++;
this.lastTransactionTime = receivedAt;

// Calculate latency (time from tx timestamp to receipt)
const txTime = new Date(tx.timestamp).getTime();
const latency = receivedAt - txTime;
this.latencies.push(latency);

// Keep only recent latencies
if (this.latencies.length > 1000) {
this.latencies.shift();
}
}

getMetrics() {
const avgLatency = this.latencies.reduce((a, b) => a + b, 0) / this.latencies.length;
const timeSinceLastTx = Date.now() - this.lastTransactionTime;

return {
totalProcessed: this.transactionCount,
averageLatency: avgLatency.toFixed(0) + "ms",
timeSinceLastTransaction: timeSinceLastTx + "ms",
isHealthy: timeSinceLastTx < 5000 // Alert if > 5s without tx
};
}
}

Performance Considerations#

  • Network Proximity: Deploy processors close to streaming endpoints to minimize network latency
  • Concurrent Processing: Use worker pools to process transactions in parallel without blocking the stream
  • Memory Management: Implement backpressure handling to prevent memory exhaustion during bursts
  • Database Optimization: Use connection pooling and bulk operations for database writes
  • Caching: Cache frequently accessed data to avoid database lookups during processing