Coming soon: Need support for this? Email support@dwellir.com if you want early access.
Real-Time Streaming
Real-time transaction streaming provides millisecond-latency access to finalized Aptos transactions as they occur on-chain, enabling responsive applications, trading systems, monitoring tools, and live dashboards. This feature delivers transactions with minimal delay after consensus, significantly faster than polling-based approaches.
Overview
Real-time streaming subscribes to the head of the blockchain, delivering transactions immediately as they are finalized by consensus. This approach provides consistent low-latency updates without the overhead and inconsistency of polling, making it ideal for applications that require immediate awareness of on-chain state changes.
Basic Real-Time Subscription
import { TransactionStreamClient } from "./generated/aptos_stream";
import { credentials, Metadata } from "@grpc/grpc-js";

const client = new TransactionStreamClient(
  "stream.aptos.dwellir.com:443",
  credentials.createSsl()
);

const metadata = new Metadata();
metadata.add("authorization", `Bearer ${process.env.API_KEY}`);

// Subscribe to the head of the chain (no starting version = latest)
const request = {
  includeEvents: true,
  includeChanges: true
};

const stream = client.subscribe(request, metadata);

stream.on("data", (transaction) => {
  console.log(`New transaction: ${transaction.hash}`);
  console.log(`Version: ${transaction.version}`);
  console.log(`Timestamp: ${transaction.timestamp}`);

  // Process the transaction in real time
  processTransaction(transaction);
});

stream.on("error", (error) => {
  console.error("Stream error:", error);
  // Implement reconnection logic (see Connection Management below)
});

stream.on("end", () => {
  console.log("Stream ended");
  // Reconnect (see Connection Management below)
});
Real-World Use Cases
- Trading Bots: Execute arbitrage or market-making strategies based on millisecond-fresh DEX swap data and price movements.
- Live Dashboards: Display real-time protocol metrics, transaction volumes, gas prices, and network activity without refresh delays.
- Notification Systems: Send instant push notifications when users receive payments, NFTs are transferred, or other important events occur (a minimal filter sketch follows this list).
- Gaming Applications: Update game state, leaderboards, and player inventories immediately as transactions finalize on-chain.
- Security Monitoring: Detect suspicious patterns, unusual transactions, or potential attacks in real time for immediate response.
- Price Oracles: Publish up-to-the-second price feeds by processing DEX trades and liquidity changes as they happen.
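As a sketch of the notification use case, the handler below attaches to the stream created in the basic example, matches coin deposit events, and hands them to a placeholder sendPushNotification helper (a hypothetical function standing in for your push provider). The event field names (events, type, data) mirror the examples on this page; adjust them to your generated protobuf types.

// Hypothetical notification hook for the stream from the basic example.
// sendPushNotification is a placeholder for APNs, FCM, email, webhooks, ...
async function sendPushNotification(payload: { version: string; hash: string; event: any }) {
  console.log("notify:", payload);
}

stream.on("data", async (tx) => {
  // Match coin deposits; field names follow the shapes used on this page
  const deposits = (tx.events ?? []).filter(
    (event: any) => event.type?.includes("0x1::coin::DepositEvent")
  );

  for (const event of deposits) {
    // Deciding *which* user to notify (e.g. matching the event's account
    // address against your user table) is application-specific.
    await sendPushNotification({ version: tx.version, hash: tx.hash, event });
  }
});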
Best Practices
- Handle Reconnections Gracefully: Implement automatic reconnection with exponential backoff when streams disconnect to maintain continuous coverage.
- Track Last Processed Version: Persist the last successfully processed version so streams can resume without gaps after restarts (see the checkpointing sketch after this list, which also covers duplicate handling).
- Implement Buffering: Buffer incoming transactions during processing spikes to prevent backpressure and dropped connections.
- Monitor Stream Health: Track message rates, latency, and error rates to detect and respond to degradation quickly.
- Use Asynchronous Processing: Process transactions asynchronously to avoid blocking the stream and falling behind.
- Set Appropriate Timeouts: Configure keepalive and timeout values to detect dead connections quickly.
- Handle Duplicates: Implement idempotent processing, since network issues may cause duplicate transaction deliveries.
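A minimal checkpointing sketch combining the version-tracking and duplicate-handling practices, reusing client, metadata, and processTransaction from the basic example. The checkpoint is a local file here; a database row or key-value entry works the same way. The startingVersion field is an assumption based on the basic example's "no starting version = latest" comment, so verify the field name and numeric type against your generated request type.

import { promises as fs } from "fs";

// Persist the last processed version and resume from the next one on restart.
// Re-delivered versions become no-ops, giving idempotent processing for free.
const CHECKPOINT_FILE = "./last_version.txt";

async function loadCheckpoint(): Promise<bigint | null> {
  try {
    return BigInt((await fs.readFile(CHECKPOINT_FILE, "utf8")).trim());
  } catch {
    return null; // first run: no checkpoint yet, start at the head of the chain
  }
}

async function saveCheckpoint(version: bigint): Promise<void> {
  await fs.writeFile(CHECKPOINT_FILE, version.toString());
}

async function startFromCheckpoint() {
  let last = await loadCheckpoint();
  const request = last === null
    ? { includeEvents: true }
    // Assumed field name; string form here, your types may expect a number or Long
    : { includeEvents: true, startingVersion: (last + 1n).toString() };

  const stream = client.subscribe(request, metadata);
  stream.on("data", async (tx) => {
    const version = BigInt(tx.version);
    if (last !== null && version <= last) return; // duplicate delivery: skip

    await processTransaction(tx);  // your handler (see the basic example)
    await saveCheckpoint(version); // persist only after processing succeeds
    last = version;
  });
}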
Event Filtering
class RealTimeProcessor {
  // `client` and `metadata` are the gRPC client and auth metadata created in
  // the basic subscription example above.
  private stream: ClientReadableStream<Transaction>;

  async start() {
    const request = {
      includeEvents: true,
      // Optional: filter by transaction type
      transactionFilters: {
        userTransaction: true,
        genesisTransaction: false,
        blockMetadataTransaction: false,
        stateCheckpointTransaction: false
      }
    };

    this.stream = client.subscribe(request, metadata);
    this.stream.on("data", (tx) => {
      // Further filter by event type
      if (this.isRelevantTransaction(tx)) {
        this.processTransaction(tx);
      }
    });
  }

  private isRelevantTransaction(tx: Transaction): boolean {
    // Filter by function calls
    if (tx.payload?.function) {
      return tx.payload.function.startsWith("0x1::coin::transfer");
    }

    // Filter by events
    if (tx.events) {
      return tx.events.some(event =>
        event.type.includes("WithdrawEvent") ||
        event.type.includes("DepositEvent")
      );
    }

    return false;
  }
}
Latency Optimization
class LowLatencyProcessor {
  private processingQueue: Transaction[] = [];
  private processing: boolean = false;

  onTransaction(tx: Transaction) {
    this.processingQueue.push(tx);
    if (!this.processing) {
      this.processBatch();
    }
  }

  private async processBatch() {
    this.processing = true;
    while (this.processingQueue.length > 0) {
      const batch = this.processingQueue.splice(0, 100);
      // Process the batch in parallel
      await Promise.all(
        batch.map(tx => this.processTransactionFast(tx))
      );
    }
    this.processing = false;
  }

  private async processTransactionFast(tx: Transaction) {
    // Minimal processing for lowest latency; extractEssentialData, fastWrite,
    // and emitEvent are application-specific
    const essential = this.extractEssentialData(tx);
    await this.fastWrite(essential);
    this.emitEvent(essential);
  }
}
Connection Management
class StreamManager {
  // `client`, `request`, and `metadata` are the objects created in the
  // basic subscription example above.
  private stream: ClientReadableStream<Transaction> | null = null;
  private reconnectAttempts: number = 0;
  private maxReconnectAttempts: number = 10;

  async connect() {
    try {
      this.stream = client.subscribe(request, metadata);

      this.stream.on("data", (tx) => {
        this.reconnectAttempts = 0; // Reset on successful data
        this.handleTransaction(tx);
      });

      this.stream.on("error", (error) => {
        console.error("Stream error:", error);
        this.reconnect();
      });

      this.stream.on("end", () => {
        console.log("Stream ended");
        this.reconnect();
      });
    } catch (error) {
      console.error("Connection error:", error);
      this.reconnect();
    }
  }

  private async reconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }

    this.reconnectAttempts++;
    const backoff = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
    console.log(`Reconnecting in ${backoff}ms (attempt ${this.reconnectAttempts})`);

    await this.sleep(backoff);
    await this.connect();
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  disconnect() {
    if (this.stream) {
      this.stream.cancel();
      this.stream = null;
    }
  }
}
Metrics and Monitoring
class StreamMetrics {
  private lastTransactionTime: number = Date.now();
  private transactionCount: number = 0;
  private latencies: number[] = [];

  recordTransaction(tx: Transaction, receivedAt: number) {
    this.transactionCount++;
    this.lastTransactionTime = receivedAt;

    // Calculate latency (time from tx timestamp to receipt)
    const txTime = new Date(tx.timestamp).getTime();
    const latency = receivedAt - txTime;
    this.latencies.push(latency);

    // Keep only recent latencies
    if (this.latencies.length > 1000) {
      this.latencies.shift();
    }
  }

  getMetrics() {
    // Guard against division by zero before any transactions arrive
    const avgLatency = this.latencies.length > 0
      ? this.latencies.reduce((a, b) => a + b, 0) / this.latencies.length
      : 0;
    const timeSinceLastTx = Date.now() - this.lastTransactionTime;

    return {
      totalProcessed: this.transactionCount,
      averageLatency: avgLatency.toFixed(0) + "ms",
      timeSinceLastTransaction: timeSinceLastTx + "ms",
      isHealthy: timeSinceLastTx < 5000 // Alert if > 5s without a transaction
    };
  }
}
Performance Considerations
- Network Proximity: Deploy processors close to streaming endpoints to minimize network latency
- Concurrent Processing: Use worker pools to process transactions in parallel without blocking the stream
- Memory Management: Implement backpressure handling to prevent memory exhaustion during bursts (see the bounded-buffer sketch after this list)
- Database Optimization: Use connection pooling and bulk operations for database writes
- Caching: Cache frequently accessed data to avoid database lookups during processing
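A bounded-buffer sketch of the backpressure point, reusing stream and processTransaction from the basic example. It assumes the generated stream behaves like a Node.js object-mode Readable (as @grpc/grpc-js client streams do), so pause() and resume() provide flow control while the local backlog drains; the thresholds are placeholder values to tune for your workload.

// Cap the in-memory backlog and pause the gRPC stream when the consumer
// falls behind; resume once the backlog drains below a lower watermark.
const MAX_BUFFERED = 10_000;   // tune to your memory budget
const RESUME_BELOW = 1_000;    // hysteresis so pause/resume does not thrash

const buffer: Transaction[] = [];
let paused = false;

stream.on("data", (tx: Transaction) => {
  buffer.push(tx);
  if (buffer.length >= MAX_BUFFERED && !paused) {
    stream.pause();            // stop reading until the backlog drains
    paused = true;
  }
});

// Drain loop: a worker pool could replace this single consumer
(async function drain() {
  while (true) {
    const tx = buffer.shift();
    if (!tx) {
      await new Promise(resolve => setTimeout(resolve, 10)); // idle wait
      continue;
    }
    await processTransaction(tx); // your handler (see the basic example)
    if (paused && buffer.length <= RESUME_BELOW) {
      stream.resume();
      paused = false;
    }
  }
})();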
Related Concepts
- Streaming Overview - Introduction to streaming
- Historical Replay - Processing past transactions
- Custom Processors - Building stream processors
- Authentication - Securing stream connections