Coming soon: this feature is not yet generally available. Email support@dwellir.com if you want early access.

Custom Processors

Custom processors transform raw blockchain transactions into application-specific data models, enabling efficient real-time indexing and analytics. Building a custom processor lets you extract exactly the information your application needs while filtering out irrelevant data, producing an optimized database tailored to your use case.

Overview

A custom processor subscribes to transaction streams, decodes relevant transactions and events, transforms the data into your application's schema, and writes it to your database. This approach is more efficient than polling APIs or running full nodes, providing millisecond-latency updates with minimal infrastructure overhead.

Processor Architecture

import { EventEmitter } from "events";
import type { ClientReadableStream } from "@grpc/grpc-js";

// Transaction and Database are application-level types: Transaction models a
// decoded Aptos transaction, Database wraps your storage layer. Helper methods
// not shown here (subscribeToTransactions, reconnect, decodeTransaction,
// transformToModel, updateIndexes, handleError) are covered in later sections
// or are specific to your application.
class AptosStreamProcessor extends EventEmitter {
  private stream!: ClientReadableStream<Transaction>;

  constructor(private database: Database) {
    super();
  }

  async start() {
    this.stream = this.subscribeToTransactions();

    this.stream.on("data", async (transaction: Transaction) => {
      try {
        await this.processTransaction(transaction);
      } catch (error) {
        console.error("Processing error:", error);
        this.handleError(error as Error, transaction);
      }
    });

    this.stream.on("error", (error: Error) => {
      console.error("Stream error:", error);
      this.reconnect();
    });

    this.stream.on("end", () => {
      console.log("Stream ended");
      this.reconnect();
    });
  }

  private async processTransaction(tx: Transaction) {
    // Filter transactions by type
    if (!this.shouldProcess(tx)) return;

    // Decode transaction payload
    const decoded = this.decodeTransaction(tx);

    // Transform to application model
    const model = this.transformToModel(decoded);

    // Write to database
    await this.database.insert(model);

    // Update secondary indexes
    await this.updateIndexes(model);

    // Emit events for real-time features (this is why the class extends EventEmitter)
    this.emit("transaction", model);
  }

  private shouldProcess(tx: Transaction): boolean {
    // Filter logic - only process relevant transactions
    return (
      tx.type === "user_transaction" &&
      (tx.payload?.function?.startsWith("0x1::coin::transfer") ?? false)
    );
  }
}

Real-World Use Cases

  1. NFT Marketplace Indexing: Process mint, transfer, and sale events in real-time to keep marketplace listings and analytics up to date without polling (a filter sketch follows this list).

  2. DeFi Protocol Analytics: Track swap events, liquidity changes, and yield updates across multiple protocols for portfolio dashboards and price feeds.

  3. Wallet Transaction History: Index all transactions for user addresses into optimized databases for instant transaction history queries.

  4. Gaming State Management: Process game move events and state changes to maintain real-time leaderboards and player inventories.

  5. Compliance Monitoring: Scan transaction streams for patterns matching compliance rules, flagging suspicious activities in real-time.

  6. Price Oracle Updates: Extract DEX swap data to compute and publish price feeds with sub-second latency for DeFi applications.
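
As a concrete sketch for the first use case, the filter below keeps only the marketplace events an indexer cares about. The module address and event names are placeholders for whatever marketplace contract you actually index:

// Hypothetical event filter for an NFT marketplace processor. Replace the
// module address and event names with those of your marketplace contract.
const MARKETPLACE_EVENT_TYPES = new Set([
  "0xMARKET::marketplace::ListEvent",
  "0xMARKET::marketplace::SaleEvent",
  "0xMARKET::marketplace::DelistEvent",
]);

function isMarketplaceEvent(rawEventType: string): boolean {
  return MARKETPLACE_EVENT_TYPES.has(rawEventType);
}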

Best Practices

Idempotent Processing: Design processors to handle duplicate events safely since streaming guarantees at-least-once delivery, not exactly-once.
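
A minimal sketch of one way to get idempotence, assuming your database exposes an upsert (insert-or-update) primitive like the one used in the State Management section: key each row on the transaction version and event index, so a re-delivered event overwrites the same row instead of creating a duplicate.

// Idempotent write sketch: (transaction_version, event_index) uniquely
// identifies an event, assuming a unique constraint on those two columns.
async function writeEventIdempotently(
  database: Database,
  version: bigint,
  eventIndex: number,
  decoded: DecodedEvent
) {
  await database.upsert("decoded_events", {
    transaction_version: version.toString(),
    event_index: eventIndex,
    event_type: decoded.type,
    address: decoded.address,
    amount: decoded.data.amount.toString(),
  });
}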

Checkpoint Progress: Persistently track the last processed transaction version to enable resumption after restarts without reprocessing (see State Management below).

Batch Database Writes: Buffer decoded events and write in batches to reduce database load and improve throughput.
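
A minimal buffering sketch, assuming a bulk-insert method (insertMany here is a placeholder for your database's batch write):

// Buffer rows and flush when the buffer fills or a timer fires, whichever
// comes first. Checkpoints should only advance past rows that have been flushed.
class WriteBuffer<T> {
  private buffer: T[] = [];

  constructor(
    private database: Database,
    private table: string,
    private maxSize = 500,
    private maxDelayMs = 1000
  ) {
    setInterval(() => this.flush().catch(console.error), this.maxDelayMs);
  }

  async add(row: T) {
    this.buffer.push(row);
    if (this.buffer.length >= this.maxSize) await this.flush();
  }

  async flush() {
    if (this.buffer.length === 0) return;
    const rows = this.buffer.splice(0); // take ownership of the batch atomically
    await this.database.insertMany(this.table, rows); // placeholder bulk-insert API
  }
}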

Handle Reorgs Carefully: Although rare on Aptos, implement logic to handle chain reorganizations if processing near the chain head.
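
One hedged approach, assuming derived rows are keyed by transaction version: on a rollback signal, delete everything above the fork point and rewind the checkpoint before resuming (deleteAboveVersion is a placeholder for your storage layer's conditional delete).

// Rollback sketch: drop derived rows produced after the fork point.
// The caller should also rewind the checkpoint to `version` before resuming.
async function rollbackTo(database: Database, version: bigint) {
  // deleteAboveVersion is a placeholder for a conditional delete such as
  // DELETE FROM decoded_events WHERE transaction_version > $1
  await database.deleteAboveVersion("decoded_events", version);
}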

Monitor Performance: Track processing latency, throughput, and error rates to detect bottlenecks and degradations early.

Implement Circuit Breakers: Automatically pause processing when error rates exceed thresholds to prevent cascading failures.
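
A minimal breaker sketch; the thresholds are illustrative, not a production policy:

// Trip after maxErrors failures within a rolling one-minute window, then
// automatically close again after a cool-down period.
class CircuitBreaker {
  private errors = 0;
  private tripped = false;

  constructor(private maxErrors = 50, private cooldownMs = 30_000) {
    setInterval(() => (this.errors = 0), 60_000); // reset the rolling window
  }

  recordError() {
    this.errors++;
    if (this.errors >= this.maxErrors && !this.tripped) {
      this.tripped = true;
      setTimeout(() => {
        this.tripped = false; // allow traffic again after the cool-down
        this.errors = 0;
      }, this.cooldownMs);
    }
  }

  get isOpen(): boolean {
    return this.tripped; // callers pause stream consumption while open
  }
}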

Schema Versioning: Plan for schema evolution by using versioned data models, so processors can be upgraded without downtime.
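
For example, stamping every row with the schema version that produced it lets old and new processor versions coexist during a rolling upgrade. The row shape below is illustrative:

// Versioned row sketch: readers can branch on schema_version while both
// processor versions are writing during an upgrade.
const SCHEMA_VERSION = 2;

interface TransferRowV2 {
  schema_version: number;
  transaction_version: string;
  address: string;
  amount: string;
  coin_type: string; // illustrative field introduced in this schema version
}

function toRow(decoded: DecodedEvent, txVersion: bigint): TransferRowV2 {
  return {
    schema_version: SCHEMA_VERSION,
    transaction_version: txVersion.toString(),
    address: decoded.address,
    amount: decoded.data.amount.toString(),
    coin_type: decoded.data.coinType,
  };
}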

Event Decoding

interface DecodedEvent {
  type: string;
  data: any;
  address: string;
  sequence: bigint;
}

// Event is the raw on-chain event shape exposed by the stream; decodeEvent
// normalizes the types this processor cares about and returns null for the rest.
function decodeEvent(event: Event): DecodedEvent | null {
  const eventType = event.type.name;

  switch (eventType) {
    case "0x1::coin::WithdrawEvent":
      return {
        type: "coin_withdraw",
        data: {
          amount: BigInt(event.data.amount),
          coinType: event.type.typeArgs[0],
        },
        address: event.key.accountAddress,
        sequence: BigInt(event.sequenceNumber),
      };

    case "0x1::coin::DepositEvent":
      return {
        type: "coin_deposit",
        data: {
          amount: BigInt(event.data.amount),
          coinType: event.type.typeArgs[0],
        },
        address: event.key.accountAddress,
        sequence: BigInt(event.sequenceNumber),
      };

    default:
      // Unrecognized event types are skipped by the caller
      return null;
  }
}
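
In practice the decoder runs over every event in a transaction and drops the types it does not recognize; this usage sketch assumes tx.events holds the raw event list:

// Decode all events in a transaction, filtering out unrecognized types.
function decodeTransactionEvents(tx: Transaction): DecodedEvent[] {
  return (tx.events ?? [])
    .map(decodeEvent)
    .filter((e): e is DecodedEvent => e !== null);
}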

State Management

class ProcessorState {
  private currentVersion: bigint = 0n;
  private checkpointInterval = 100;

  constructor(private database: Database) {}

  async saveCheckpoint() {
    await this.database.upsert("processor_state", {
      name: "main_processor",
      last_version: this.currentVersion.toString(),
      timestamp: new Date(),
    });
  }

  async loadCheckpoint(): Promise<bigint> {
    const state = await this.database.findOne("processor_state", {
      name: "main_processor",
    });

    return state ? BigInt(state.last_version) : 0n;
  }

  async updateVersion(version: bigint) {
    this.currentVersion = version;

    // Checkpoint every `checkpointInterval` versions instead of on every transaction
    if (version % BigInt(this.checkpointInterval) === 0n) {
      await this.saveCheckpoint();
    }
  }
}
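
On startup, the processor loads its last checkpoint and resumes one version past it so the checkpointed transaction is not replayed. How the start version is passed to the stream depends on your client; subscribeFromVersion below is a hypothetical call standing in for the starting-version parameter of your transaction stream request:

// Resume sketch: compute the first version to process after a restart.
async function resumeVersion(state: ProcessorState): Promise<bigint> {
  const last = await state.loadCheckpoint();
  return last === 0n ? 0n : last + 1n; // 0n means a fresh start
}

// Usage (hypothetical client call):
// const startVersion = await resumeVersion(state);
// subscribeFromVersion(startVersion);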

Error Handling

// The dead-letter queue and the retry target are injected so the handler
// stays decoupled from the processor and the storage layer.
class ProcessorErrorHandler {
  private errorCounts: Map<string, number> = new Map();
  private maxRetries = 3;

  constructor(
    private deadLetterQueue: DeadLetterQueue,
    private processTransaction: (tx: Transaction) => Promise<void>
  ) {}

  async handleProcessingError(error: Error, tx: Transaction) {
    const txHash = tx.hash;
    const count = (this.errorCounts.get(txHash) || 0) + 1;

    if (count >= this.maxRetries) {
      // Give up and park the transaction in the dead-letter queue for inspection
      await this.deadLetterQueue.push({
        transaction: tx,
        error: error.message,
        attempts: count,
        timestamp: new Date(),
      });

      this.errorCounts.delete(txHash);
      return;
    }

    this.errorCounts.set(txHash, count);

    // Exponential backoff: 2s, 4s, 8s, ...
    await this.sleep(Math.pow(2, count) * 1000);

    // Retry processing
    await this.processTransaction(tx);
  }

  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}

Performance Optimization

  • Parallel Processing: Process independent transactions concurrently using worker pools (a concurrency sketch follows this list)
  • Database Connection Pooling: Reuse database connections to reduce overhead
  • Selective Decoding: Only decode fields you need rather than full transaction payloads
  • Caching: Cache frequently accessed data (token metadata, account info) to reduce database queries
  • Compression: Compress large payloads before storage to save space and I/O
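
A minimal bounded-concurrency sketch for the first bullet; handler is expected to catch its own errors (see Error Handling above), and because completion order is no longer version order, checkpoints should only advance past the lowest fully processed version:

// Process up to `limit` independent transactions at once.
async function processConcurrently(
  txs: Transaction[],
  limit: number,
  handler: (tx: Transaction) => Promise<void>
): Promise<void> {
  const inFlight = new Set<Promise<void>>();
  for (const tx of txs) {
    const p: Promise<void> = handler(tx).finally(() => inFlight.delete(p));
    inFlight.add(p);
    if (inFlight.size >= limit) {
      await Promise.race(inFlight); // wait for a slot to free up
    }
  }
  await Promise.all(inFlight); // drain remaining work
}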

Monitoring Metrics

class ProcessorMetrics {
  private startedAt: number = Date.now(); // for computing throughput
  transactionsProcessed: number = 0;
  eventsDecoded: number = 0;
  processingLatency: number[] = [];
  errorCount: number = 0;

  recordProcessing(startTime: number) {
    this.transactionsProcessed++;
    this.processingLatency.push(Date.now() - startTime);

    // Keep only the most recent 1000 latency samples
    if (this.processingLatency.length > 1000) {
      this.processingLatency.shift();
    }
  }

  getMetrics() {
    const elapsedSeconds = (Date.now() - this.startedAt) / 1000;
    return {
      total: this.transactionsProcessed,
      errors: this.errorCount,
      avgLatency:
        this.processingLatency.length > 0
          ? this.processingLatency.reduce((a, b) => a + b, 0) /
            this.processingLatency.length
          : 0,
      // Transactions per second since the processor started
      throughput: elapsedSeconds > 0 ? this.transactionsProcessed / elapsedSeconds : 0,
    };
  }
}
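
A simple way to surface these numbers is to log a snapshot on a fixed interval; a real deployment would export them to a metrics system such as Prometheus instead:

// Report a metrics snapshot every 10 seconds.
const metrics = new ProcessorMetrics();
setInterval(() => {
  console.log("processor metrics:", metrics.getMetrics());
}, 10_000);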