From 98fbb8b0d057108d509eb7db49304d377f63c9c9 Mon Sep 17 00:00:00 2001 From: satyakwok Date: Tue, 26 May 2026 08:21:26 +0200 Subject: [PATCH] fix(indexer): unblock testnet backfill --- apps/indexer/Dockerfile | 2 +- apps/indexer/src/sync.ts | 39 ++++++++++++-- docker-compose.testnet.yml | 24 ++++++--- packages/chain/src/index.ts | 103 +++++++++++++++++++++++++++++++----- 4 files changed, 141 insertions(+), 27 deletions(-) diff --git a/apps/indexer/Dockerfile b/apps/indexer/Dockerfile index 2233cea..659f8ff 100644 --- a/apps/indexer/Dockerfile +++ b/apps/indexer/Dockerfile @@ -51,6 +51,6 @@ WORKDIR /app/apps/indexer EXPOSE 8082 HEALTHCHECK --interval=30s --timeout=5s --start-period=20s --retries=3 \ - CMD wget --no-verbose --tries=1 --spider http://127.0.0.1:8082/health || exit 1 + CMD wget --no-verbose --tries=1 --spider "http://127.0.0.1:${INDEXER_HEALTH_PORT:-8082}/health" || exit 1 CMD ["npx", "tsx", "src/index.ts"] diff --git a/apps/indexer/src/sync.ts b/apps/indexer/src/sync.ts index 0b9fffc..767ff19 100644 --- a/apps/indexer/src/sync.ts +++ b/apps/indexer/src/sync.ts @@ -23,7 +23,10 @@ import { tokenTransfers, transactions as txsTable, } from "@sentriscloud/indexer-db"; -import type { SentrixClient } from "@sentriscloud/indexer-chain"; +import { + BlockNotFoundError, + type SentrixClient, +} from "@sentriscloud/indexer-chain"; import { dispatch } from "./handlers/index.js"; interface SyncOnceArgs { @@ -49,7 +52,16 @@ export async function syncOnce(args: SyncOnceArgs): Promise { log.info({ from: start.toString(), to: end.toString() }, "backfill batch"); for (let h = start; h <= end; h++) { - await indexBlock({ db, chain, height: h, log }); + try { + await indexBlock({ db, chain, height: h, log }); + } catch (err) { + if (!(err instanceof BlockNotFoundError)) throw err; + log.warn( + { height: h.toString() }, + "block missing from RPC — advancing cursor", + ); + await advanceLastSynced(db, h); + } } return end; } @@ -120,7 +132,10 @@ export async function indexBlock(args: IndexBlockArgs) { const natives = await mapWithConcurrency( txEntries, TX_FETCH_CONCURRENCY, - async (e) => ({ entry: e, native: await chain.getNativeTransaction(e.hash) }), + async (e) => ({ + entry: e, + native: await chain.getNativeTransaction(e.hash), + }), ); // ── PHASE 2: build batch INSERT row arrays. @@ -294,7 +309,7 @@ export async function indexBlock(args: IndexBlockArgs) { .onConflictDoUpdate({ target: meta.key, set: { - value: sql`excluded.value`, + value: sql`GREATEST(${meta.value}::numeric, excluded.value::numeric)::text`, updatedAt: sql`excluded.updated_at`, }, }); @@ -313,3 +328,19 @@ async function readLastSynced(db: DbClient): Promise { return BigInt(rows[0].value); } +async function advanceLastSynced(db: DbClient, height: bigint): Promise { + await db + .insert(meta) + .values({ + key: "last_synced_height", + value: height.toString(), + updatedAt: BigInt(Math.floor(Date.now() / 1000)), + }) + .onConflictDoUpdate({ + target: meta.key, + set: { + value: sql`GREATEST(${meta.value}::numeric, excluded.value::numeric)::text`, + updatedAt: sql`excluded.updated_at`, + }, + }); +} diff --git a/docker-compose.testnet.yml b/docker-compose.testnet.yml index 27d9467..97742ce 100644 --- a/docker-compose.testnet.yml +++ b/docker-compose.testnet.yml @@ -37,10 +37,13 @@ services: depends_on: postgres: condition: service_healthy + network_mode: host environment: - INDEXER_DATABASE_URL: ${INDEXER_DATABASE_URL:?env required} + INDEXER_DATABASE_URL: postgres://${POSTGRES_USER:?env required}:${POSTGRES_PASSWORD:?env required}@127.0.0.1:5433/${POSTGRES_DB:?env required} INDEXER_NETWORK: testnet INDEXER_HEALTH_PORT: 8084 + INDEXER_RPC_HTTP_URL: http://127.0.0.1:9545/rpc + INDEXER_RPC_WS_URL: ws://127.0.0.1:9545/ws LOG_LEVEL: info # Bumped from default 50 — testnet currently 2.5M blocks ahead of the # backfill cursor; at 50/batch the catch-up ETA is ~70h. 500/batch @@ -48,13 +51,14 @@ services: # (each block fetch is independent and our retry429 wrapper handles # transient 429/502s anyway). INDEXER_BATCH_SIZE: "500" - ports: - - "127.0.0.1:8084:8084" - # Override the Dockerfile's built-in healthcheck — the image bakes - # `wget http://127.0.0.1:8082/health` (mainnet default) but the - # testnet stack runs the worker on 8084 via INDEXER_HEALTH_PORT. + # Keep an explicit compose healthcheck for testnet; the worker listens + # on 8084 via INDEXER_HEALTH_PORT. healthcheck: - test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://127.0.0.1:8084/health || exit 1"] + test: + [ + "CMD-SHELL", + "wget --no-verbose --tries=1 --spider http://127.0.0.1:8084/health || exit 1", + ] interval: 30s timeout: 5s start_period: 20s @@ -80,7 +84,11 @@ services: # Same Dockerfile-port-mismatch story as the worker — bake-time # default is 8081, testnet runs on 8083 via API_PORT env. healthcheck: - test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://127.0.0.1:8083/health || exit 1"] + test: + [ + "CMD-SHELL", + "wget --no-verbose --tries=1 --spider http://127.0.0.1:8083/health || exit 1", + ] interval: 30s timeout: 5s start_period: 15s diff --git a/packages/chain/src/index.ts b/packages/chain/src/index.ts index ca260f2..a7969a0 100644 --- a/packages/chain/src/index.ts +++ b/packages/chain/src/index.ts @@ -56,7 +56,8 @@ async function retry429(fn: () => Promise, attempts = 6): Promise { } catch (err) { lastErr = err; const msg = String((err as Error)?.message ?? ""); - if (!msg.includes("Status: 429") && !msg.includes("rate limit")) throw err; + if (!msg.includes("Status: 429") && !msg.includes("rate limit")) + throw err; // Exponential backoff: 0.5s → 1s → 2s → 4s → 8s → 16s. await new Promise((r) => setTimeout(r, delayMs)); delayMs = Math.min(delayMs * 2, 16_000); @@ -102,6 +103,37 @@ interface GrpcBlockHeader { hash: string; } +type RpcQuantity = `0x${string}`; +type RpcHash = `0x${string}`; + +interface RpcBlock { + baseFeePerGas?: RpcQuantity | null; + gasLimit?: RpcQuantity | null; + gasUsed?: RpcQuantity | null; + hash?: RpcHash | null; + miner?: `0x${string}` | null; + number?: RpcQuantity | null; + parentHash: RpcHash; + stateRoot?: RpcHash | null; + timestamp: RpcQuantity; + transactions: (RpcHash | { hash: RpcHash })[]; +} + +function quantityToBigInt(value: RpcQuantity | null | undefined): bigint { + return value == null ? 0n : BigInt(value); +} + +function bigintToQuantity(value: bigint): RpcQuantity { + return `0x${value.toString(16)}`; +} + +export class BlockNotFoundError extends Error { + constructor(readonly height: bigint) { + super(`Block at number "${height}" could not be found.`); + this.name = "BlockNotFoundError"; + } +} + /** * Tip watcher backed by the side-car gRPC `GetBlock {latest:true}`. Called * once per `intervalMs`; emits the new tip height whenever it advances. @@ -135,17 +167,17 @@ export interface NativeTransaction { block_index: number; block_timestamp: number; transaction: { - amount: number; // sentri — 1 SRX = 1e8 sentri + amount: number; // sentri — 1 SRX = 1e8 sentri chain_id: number; data: string; - fee: number; // sentri + fee: number; // sentri from_address: string; // 0x… hex OR 'COINBASE' sentinel nonce: number; public_key: string; signature: string; timestamp: number; to_address: string; - txid: string; // bare hex, NOT 0x-prefixed + txid: string; // bare hex, NOT 0x-prefixed }; } @@ -241,13 +273,19 @@ export class SentrixClient { // chain genuinely doesn't have the tx. let delayMs = 250; for (let attempt = 0; attempt < 4; attempt++) { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 10_000); try { - const r = await fetch(`${this.restBase}/transactions/${bareHash}`); + const r = await fetch(`${this.restBase}/transactions/${bareHash}`, { + signal: controller.signal, + }); if (r.status === 404) return null; if (r.ok) return (await r.json()) as NativeTransaction; // Non-2xx, non-404 → retry. Falls through to backoff below. } catch { // Network error / aborted fetch → retry. Falls through. + } finally { + clearTimeout(timeout); } if (attempt < 3) { await new Promise((r) => setTimeout(r, delayMs)); @@ -284,7 +322,9 @@ export class SentrixClient { // proto-loader returns `index` as string when longs:String. Hash is // { value: Buffer }. const idx = BigInt(resp.index); - const hash = Buffer.from(resp.hash?.value ?? new Uint8Array()).toString("hex"); + const hash = Buffer.from( + resp.hash?.value ?? new Uint8Array(), + ).toString("hex"); resolve({ index: idx, hash }); }, ); @@ -323,7 +363,10 @@ export class SentrixClient { | { kind: "block"; height: bigint; hash: string; latencyMs: number } | { kind: "lagged"; skipped: bigint }, ) => void, - opts: { onError?: (err: unknown) => void; onReconnect?: (attempt: number) => void } = {}, + opts: { + onError?: (err: unknown) => void; + onReconnect?: (attempt: number) => void; + } = {}, ): BlockStreamSub { let stopped = false; let backoffMs = 500; @@ -352,7 +395,9 @@ export class SentrixClient { onBlock({ kind: "block", height: BigInt(b.index), - hash: Buffer.from(b.hash?.value ?? new Uint8Array()).toString("hex"), + hash: Buffer.from(b.hash?.value ?? new Uint8Array()).toString( + "hex", + ), latencyMs, }); } else if (msg.lagged) { @@ -468,16 +513,46 @@ export class SentrixClient { * string entries (`typeof t === "string" → continue`), so the blocks * table populates fine and the transactions table stays empty until * the chain RPC is brought into spec OR the indexer reads tx via the - * REST `/transactions/` shape with a native-format adapter. See - * Sentriscloud/indexer issue tracker. + * REST `/transactions/` shape with a native-format adapter. Viem's + * `getBlock` parser can reject historical Sentrix block responses even + * when raw `eth_getBlockByNumber` succeeds, so this method intentionally + * stays on raw JSON-RPC and maps only the fields the indexer uses. */ async getBlock(height: bigint): Promise> { - return retry429(() => - this.http.getBlock({ blockNumber: height, includeTransactions: true }), - ); + const block = (await retry429(() => + this.http.request({ + method: "eth_getBlockByNumber", + params: [bigintToQuantity(height), true], + }), + )) as RpcBlock | null; + + if (!block) { + throw new BlockNotFoundError(height); + } + + return { + baseFeePerGas: + block.baseFeePerGas == null + ? null + : quantityToBigInt(block.baseFeePerGas), + gasLimit: quantityToBigInt(block.gasLimit), + gasUsed: quantityToBigInt(block.gasUsed), + hash: block.hash ?? null, + miner: block.miner ?? null, + number: block.number == null ? null : quantityToBigInt(block.number), + parentHash: block.parentHash, + stateRoot: block.stateRoot ?? null, + timestamp: quantityToBigInt(block.timestamp), + transactions: block.transactions.map((tx) => + typeof tx === "string" ? tx : { hash: tx.hash }, + ), + } as Block; } - async getLogsRange(fromBlock: bigint, toBlock: bigint): Promise { + async getLogsRange( + fromBlock: bigint, + toBlock: bigint, + ): Promise { return retry429(() => this.http.getLogs({ fromBlock, toBlock })); }