Real-time transaction streaming

Coming soon: To request early access or support for this feature, email support@dwellir.com.

Real-time transaction streaming delivers finalized Aptos transactions with millisecond-level latency after consensus, enabling responsive applications, trading systems, monitoring tools, and live dashboards. Because transactions are pushed to the client as they are finalized, updates arrive significantly faster than with polling-based approaches.

Overview

Real-time streaming subscribes to the head of the blockchain and delivers each transaction as soon as consensus finalizes it. This gives consistent low-latency updates without the overhead and timing variability of polling, which suits applications that need immediate awareness of on-chain state changes.

Basic Real-Time Subscription

TypeScript
import { TransactionStreamClient } from "./generated/aptos_stream";
import { credentials, Metadata } from "@grpc/grpc-js";

const client = new TransactionStreamClient(
  "stream.aptos.dwellir.com:443",
  credentials.createSsl()
);

const metadata = new Metadata();
metadata.add("authorization", `Bearer ${process.env.API_KEY}`);

// Subscribe to head of chain (no starting version = latest)
const request = {
  includeEvents: true,
  includeChanges: true
};

const stream = client.subscribe(request, metadata);

stream.on("data", (transaction) => {
  console.log(`New transaction: ${transaction.hash}`);
  console.log(`Version: ${transaction.version}`);
  console.log(`Timestamp: ${transaction.timestamp}`);

  // Process transaction in real-time
  // (processTransaction is your application's own handler, not shown here)
  processTransaction(transaction);
});

stream.on("error", (error) => {
  console.error("Stream error:", error);
  // Implement reconnection logic
});

stream.on("end", () => {
  console.log("Stream ended");
  // Reconnect
});

Real-World Use Cases

  1. Trading Bots: Execute arbitrage or market-making strategies based on millisecond-fresh DEX swap data and price movements.

  2. Live Dashboards: Display real-time protocol metrics, transaction volumes, gas prices, and network activity without refresh delays.

  3. Notification Systems: Send instant push notifications when users receive payments, when NFTs are transferred, or when other important events occur.

  4. Gaming Applications: Update game state, leaderboards, and player inventories immediately as transactions finalize on-chain.

  5. Security Monitoring: Detect suspicious patterns, unusual transactions, or potential attacks in real-time for immediate response.

  6. Price Oracles: Publish up-to-the-second price feeds by processing DEX trades and liquidity changes as they happen.

Best Practices

Handle Reconnections Gracefully: Implement automatic reconnection with exponential backoff when streams disconnect to maintain continuous coverage.

Track Last Processed Version: Persist the last successfully processed version to resume streams without gaps after restarts.

Implement Buffering: Buffer incoming transactions in a bounded queue so that processing spikes do not stall the stream, exhaust memory, or cause dropped connections.

Monitor Stream Health: Track message rates, latency, and error rates to detect and respond to degradation quickly.

Use Asynchronous Processing: Process transactions asynchronously to avoid blocking the stream and falling behind.

Set Appropriate Timeouts: Configure keepalive and timeout values to detect dead connections quickly.

Handle Duplicates: Implement idempotent processing since network issues may cause duplicate transaction deliveries.
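
The "Track Last Processed Version" and "Handle Duplicates" practices above can share one small piece of state. The following is a minimal sketch, not part of any Dwellir or Aptos SDK: `VersionCheckpoint` and its methods are hypothetical names, and the sketch assumes that versions arrive in increasing order on a single stream and fit in a JavaScript `number`. In a real deployment the stored version would be written to durable storage rather than kept in memory.

```typescript
// Hypothetical helper: remembers the highest version processed so far.
// Assumes versions increase monotonically and stay below
// Number.MAX_SAFE_INTEGER; switch to bigint or string if that may not hold.
class VersionCheckpoint {
  private lastVersion: number | null;

  constructor(initial: number | null = null) {
    this.lastVersion = initial;
  }

  // True if this version has not been processed yet.
  // False for a duplicate or an older, already-processed version.
  shouldProcess(version: number): boolean {
    return this.lastVersion === null || version > this.lastVersion;
  }

  // Call only after the transaction was processed successfully,
  // so a crash mid-processing causes a retry instead of a gap.
  commit(version: number): void {
    if (this.shouldProcess(version)) {
      this.lastVersion = version;
    }
  }

  // Version to request when resuming after a restart, or null to
  // start from the head of the chain.
  resumeFrom(): number | null {
    return this.lastVersion === null ? null : this.lastVersion + 1;
  }
}
```

A stream handler would call `shouldProcess(version)` before doing any work and `commit(version)` afterwards. On restart, `resumeFrom()` supplies the starting version for the new subscription.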

Event Filtering

TypeScript
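// Reuses `client` and `metadata` from the basic subscription example.
// The Transaction type is assumed to be exported by the generated code;
// the actual name depends on your code generator.
import { ClientReadableStream } from "@grpc/grpc-js";
import { Transaction } from "./generated/aptos_stream";

class RealTimeProcessor {
  private stream: ClientReadableStream<Transaction> | null = null;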

  async start() {
    const request = {
      includeEvents: true,
      // Optional: filter by transaction type
      transactionFilters: {
        userTransaction: true,
        genesisTransaction: false,
        blockMetadataTransaction: false,
        stateCheckpointTransaction: false
      }
    };

    this.stream = client.subscribe(request, metadata);

    this.stream.on("data", (tx) => {
      // Further filter by event type
      if (this.isRelevantTransaction(tx)) {
        this.processTransaction(tx);
      }
    });
  }

  private isRelevantTransaction(tx: Transaction): boolean {
    // Match on the entry function being called
    if (tx.payload?.function?.startsWith("0x1::coin::transfer")) {
      return true;
    }

    // Otherwise match on emitted events, so a transaction that calls a
    // different function but still moves coins is not skipped
    return (tx.events ?? []).some(event =>
      event.type.includes("WithdrawEvent") ||
      event.type.includes("DepositEvent")
    );
  }
}

Latency Optimization

TypeScript
class LowLatencyProcessor {
  private processingQueue: Transaction[] = [];
  private processing: boolean = false;

  onTransaction(tx: Transaction) {
    this.processingQueue.push(tx);

    if (!this.processing) {
      this.processBatch();
    }
  }

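  private async processBatch() {
    this.processing = true;

    try {
      while (this.processingQueue.length > 0) {
        const batch = this.processingQueue.splice(0, 100);

        // Process the batch concurrently; allSettled keeps one failed
        // transaction from rejecting the whole batch
        const results = await Promise.allSettled(
          batch.map(tx => this.processTransactionFast(tx))
        );
        for (const result of results) {
          if (result.status === "rejected") {
            console.error("Transaction processing failed:", result.reason);
          }
        }
      }
    } finally {
      // Always clear the flag so later transactions can start a new batch
      this.processing = false;
    }
  }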

  private async processTransactionFast(tx: Transaction) {
    // Minimal processing for lowest latency. extractEssentialData, fastWrite,
    // and emitEvent are application-defined helpers (not shown).
    const essential = this.extractEssentialData(tx);
    await this.fastWrite(essential);
    this.emitEvent(essential);
  }
}
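
The queue in the example above grows without limit if transactions arrive faster than they are processed. One way to bound it is a buffer with high and low watermarks that signals when the producer should pause and resume. This is a sketch under assumptions: `BoundedBuffer` is a hypothetical helper, and the watermark values are illustrative only.

```typescript
// Hypothetical bounded buffer with high/low watermarks.
class BoundedBuffer<T> {
  private items: T[] = [];
  private paused = false;
  private readonly highWaterMark: number;
  private readonly lowWaterMark: number;
  private readonly onPause: () => void;
  private readonly onResume: () => void;

  constructor(
    highWaterMark: number,
    lowWaterMark: number,
    onPause: () => void,
    onResume: () => void
  ) {
    this.highWaterMark = highWaterMark;
    this.lowWaterMark = lowWaterMark;
    this.onPause = onPause;
    this.onResume = onResume;
  }

  // Adds an item and asks the producer to pause once the buffer is full.
  push(item: T): void {
    this.items.push(item);
    if (!this.paused && this.items.length >= this.highWaterMark) {
      this.paused = true;
      this.onPause();
    }
  }

  // Removes up to `max` items and asks the producer to resume once
  // the buffer has drained to the low watermark.
  take(max: number): T[] {
    const batch = this.items.splice(0, max);
    if (this.paused && this.items.length <= this.lowWaterMark) {
      this.paused = false;
      this.onResume();
    }
    return batch;
  }

  get size(): number {
    return this.items.length;
  }
}
```

Because a gRPC server-streaming call in `@grpc/grpc-js` is a Node.js readable stream, the callbacks could be wired as `() => stream.pause()` and `() => stream.resume()`. Messages already in flight may still arrive after a pause, so the high watermark is a soft limit rather than a hard cap.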

Connection Management

TypeScript
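// Reuses `client`, `request`, and `metadata` from the basic subscription example.
// handleTransaction is your application's own handler (not shown).
class StreamManager {
  private stream: ClientReadableStream<Transaction> | null = null;
  private reconnectAttempts: number = 0;
  private maxReconnectAttempts: number = 10;
  private reconnecting: boolean = false; // Guards against duplicate reconnects
  private closed: boolean = false;       // Set by disconnect() to stop reconnecting

  async connect() {
    this.closed = false;

    try {
      this.stream = client.subscribe(request, metadata);

      this.stream.on("data", (tx) => {
        this.reconnectAttempts = 0; // Reset on successful data
        this.handleTransaction(tx);
      });

      this.stream.on("error", (error) => {
        console.error("Stream error:", error);
        this.reconnect();
      });

      this.stream.on("end", () => {
        console.log("Stream ended");
        this.reconnect();
      });

    } catch (error) {
      console.error("Connection error:", error);
      this.reconnect();
    }
  }

  private async reconnect() {
    // "error" and "end" can both fire for a single failure: reconnect only
    // once, and never after an intentional disconnect()
    if (this.closed || this.reconnecting) {
      return;
    }

    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }

    this.reconnecting = true;
    this.reconnectAttempts++;
    // Exponential backoff: 2s, 4s, 8s, ... capped at 30s
    const backoff = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);

    console.log(`Reconnecting in ${backoff}ms (attempt ${this.reconnectAttempts})`);

    await this.sleep(backoff);
    this.reconnecting = false;

    if (!this.closed) {
      await this.connect();
    }
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  disconnect() {
    // cancel() surfaces as an error on the stream; the flag prevents
    // that error from triggering a reconnect
    this.closed = true;

    if (this.stream) {
      this.stream.cancel();
      this.stream = null;
    }
  }
}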
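
The backoff in the example above is deterministic, so many clients that disconnect at the same moment will also retry at the same moment. A common variation, sketched here as a swapped-in technique rather than anything the service requires, is "full jitter": pick a random delay between zero and the exponential ceiling. `backoffDelay` is a hypothetical helper, and its default base and cap simply mirror the values used above.

```typescript
// Hypothetical helper: exponential backoff with full jitter.
// `random` is injectable so the function can be tested deterministically.
function backoffDelay(
  attempt: number,
  baseMs: number = 1000,
  capMs: number = 30000,
  random: () => number = Math.random
): number {
  // Exponential ceiling, capped so delays never grow without bound
  const ceiling = Math.min(capMs, baseMs * Math.pow(2, attempt));
  // Uniformly random delay in [0, ceiling)
  return Math.floor(random() * ceiling);
}
```

In a reconnect routine this would replace the fixed `Math.min(1000 * Math.pow(2, attempts), 30000)` calculation, trading a predictable delay for a spread of retry times across clients.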

Metrics and Monitoring

TypeScript
class StreamMetrics {
  private lastTransactionTime: number = Date.now();
  private transactionCount: number = 0;
  private latencies: number[] = [];

  recordTransaction(tx: Transaction, receivedAt: number) {
    this.transactionCount++;
    this.lastTransactionTime = receivedAt;

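    // Calculate latency (time from tx timestamp to receipt).
    // Assumes tx.timestamp is a value Date can parse (ISO string or epoch ms);
    // convert it first if your generated type uses another representation.
    const txTime = new Date(tx.timestamp).getTime();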
    const latency = receivedAt - txTime;
    this.latencies.push(latency);

    // Keep only recent latencies
    if (this.latencies.length > 1000) {
      this.latencies.shift();
    }
  }

  getMetrics() {
    // Avoid NaN (0 / 0) before any transaction has been recorded
    const avgLatency = this.latencies.length > 0
      ? this.latencies.reduce((a, b) => a + b, 0) / this.latencies.length
      : 0;
    const timeSinceLastTx = Date.now() - this.lastTransactionTime;

    return {
      totalProcessed: this.transactionCount,
      averageLatency: avgLatency.toFixed(0) + "ms",
      timeSinceLastTransaction: timeSinceLastTx + "ms",
      isHealthy: timeSinceLastTx < 5000 // Alert if > 5s without tx
    };
  }
}
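
An average can hide occasional slow deliveries, so latency is often also reported as percentiles. The function below is a small sketch using the nearest-rank method. `percentile` is a hypothetical helper that could be applied to a latency sample array like the one kept above, and it returns 0 for an empty sample as a convention of this sketch.

```typescript
// Hypothetical helper: nearest-rank percentile of a set of samples.
// `p` is a percentage between 0 and 100.
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) {
    return 0; // Convention for this sketch: no data reports as 0
  }
  // Sort a copy so the caller's array is left untouched
  const sorted = samples.slice().sort((a, b) => a - b);
  // Nearest-rank: the smallest value with at least p% of samples at or below it
  const rank = Math.ceil((p / 100) * sorted.length);
  const index = Math.min(sorted.length - 1, Math.max(0, rank - 1));
  return sorted[index];
}
```

Reporting `percentile(latencies, 50)` alongside `percentile(latencies, 99)` shows both typical and worst-case delivery delay. Any measured latency also includes clock skew between the chain timestamp and the local machine.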

Performance Considerations

  • Network Proximity: Deploy processors close to streaming endpoints to minimize network latency
  • Concurrent Processing: Use worker pools to process transactions in parallel without blocking the stream
  • Memory Management: Implement backpressure handling to prevent memory exhaustion during bursts
  • Database Optimization: Use connection pooling and bulk operations for database writes
  • Caching: Cache frequently accessed data to avoid database lookups during processing
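
The "Concurrent Processing" point above can be approximated in a single Node.js process by capping how many asynchronous handlers run at once. The sketch below is one possible approach, not a prescribed design: `mapWithConcurrency` is a hypothetical helper, and a pool of real worker threads would be needed for CPU-bound work.

```typescript
// Hypothetical helper: runs `worker` over `items` with at most `limit`
// calls in flight at any time. Results keep the order of `items`.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  worker: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;

  // Each runner repeatedly claims the next unprocessed index.
  // Claiming is safe without locks because JavaScript runs this
  // code on a single thread between awaits.
  async function runner(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index]);
    }
  }

  const runnerCount = Math.max(1, Math.min(limit, items.length));
  const runners: Promise<void>[] = [];
  for (let i = 0; i < runnerCount; i++) {
    runners.push(runner());
  }

  // Rejects with the first worker error, if any
  await Promise.all(runners);
  return results;
}
```

Compared with running `Promise.all` over an entire batch, the limit keeps the number of open database connections or outbound requests predictable during bursts.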