Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/indexer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ WORKDIR /app/apps/indexer
EXPOSE 8082

HEALTHCHECK --interval=30s --timeout=5s --start-period=20s --retries=3 \
CMD wget --no-verbose --tries=1 --spider http://127.0.0.1:8082/health || exit 1
CMD wget --no-verbose --tries=1 --spider "http://127.0.0.1:${INDEXER_HEALTH_PORT:-8082}/health" || exit 1

CMD ["npx", "tsx", "src/index.ts"]
39 changes: 35 additions & 4 deletions apps/indexer/src/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import {
tokenTransfers,
transactions as txsTable,
} from "@sentriscloud/indexer-db";
import type { SentrixClient } from "@sentriscloud/indexer-chain";
import {
BlockNotFoundError,
type SentrixClient,
} from "@sentriscloud/indexer-chain";
import { dispatch } from "./handlers/index.js";

interface SyncOnceArgs {
Expand All @@ -49,7 +52,16 @@ export async function syncOnce(args: SyncOnceArgs): Promise<bigint> {
log.info({ from: start.toString(), to: end.toString() }, "backfill batch");

for (let h = start; h <= end; h++) {
await indexBlock({ db, chain, height: h, log });
try {
await indexBlock({ db, chain, height: h, log });
} catch (err) {
if (!(err instanceof BlockNotFoundError)) throw err;
log.warn(
{ height: h.toString() },
"block missing from RPC — advancing cursor",
);
await advanceLastSynced(db, h);
}
}
return end;
}
Expand Down Expand Up @@ -120,7 +132,10 @@ export async function indexBlock(args: IndexBlockArgs) {
const natives = await mapWithConcurrency(
txEntries,
TX_FETCH_CONCURRENCY,
async (e) => ({ entry: e, native: await chain.getNativeTransaction(e.hash) }),
async (e) => ({
entry: e,
native: await chain.getNativeTransaction(e.hash),
}),
);

// ── PHASE 2: build batch INSERT row arrays.
Expand Down Expand Up @@ -294,7 +309,7 @@ export async function indexBlock(args: IndexBlockArgs) {
.onConflictDoUpdate({
target: meta.key,
set: {
value: sql`excluded.value`,
value: sql`GREATEST(${meta.value}::numeric, excluded.value::numeric)::text`,
updatedAt: sql`excluded.updated_at`,
},
});
Expand All @@ -313,3 +328,19 @@ async function readLastSynced(db: DbClient): Promise<bigint> {
return BigInt(rows[0].value);
}

async function advanceLastSynced(db: DbClient, height: bigint): Promise<void> {
await db
.insert(meta)
.values({
key: "last_synced_height",
value: height.toString(),
updatedAt: BigInt(Math.floor(Date.now() / 1000)),
})
.onConflictDoUpdate({
target: meta.key,
set: {
value: sql`GREATEST(${meta.value}::numeric, excluded.value::numeric)::text`,
updatedAt: sql`excluded.updated_at`,
},
});
}
24 changes: 16 additions & 8 deletions docker-compose.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,28 @@ services:
depends_on:
postgres:
condition: service_healthy
network_mode: host
environment:
INDEXER_DATABASE_URL: ${INDEXER_DATABASE_URL:?env required}
INDEXER_DATABASE_URL: postgres://${POSTGRES_USER:?env required}:${POSTGRES_PASSWORD:?env required}@127.0.0.1:5433/${POSTGRES_DB:?env required}
INDEXER_NETWORK: testnet
INDEXER_HEALTH_PORT: 8084
INDEXER_RPC_HTTP_URL: http://127.0.0.1:9545/rpc
INDEXER_RPC_WS_URL: ws://127.0.0.1:9545/ws
LOG_LEVEL: info
# Bumped from default 50 — testnet currently 2.5M blocks ahead of the
# backfill cursor; at 50/batch the catch-up ETA is ~70h. 500/batch
# tightens that to ~7h with no observed RPC pressure increase
# (each block fetch is independent and our retry429 wrapper handles
# transient 429/502s anyway).
INDEXER_BATCH_SIZE: "500"
ports:
- "127.0.0.1:8084:8084"
# Override the Dockerfile's built-in healthcheck — the image bakes
# `wget http://127.0.0.1:8082/health` (mainnet default) but the
# testnet stack runs the worker on 8084 via INDEXER_HEALTH_PORT.
# Keep an explicit compose healthcheck for testnet; the worker listens
# on 8084 via INDEXER_HEALTH_PORT.
healthcheck:
test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://127.0.0.1:8084/health || exit 1"]
test:
[
"CMD-SHELL",
"wget --no-verbose --tries=1 --spider http://127.0.0.1:8084/health || exit 1",
]
interval: 30s
timeout: 5s
start_period: 20s
Expand All @@ -80,7 +84,11 @@ services:
# Same Dockerfile-port-mismatch story as the worker — bake-time
# default is 8081, testnet runs on 8083 via API_PORT env.
healthcheck:
test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://127.0.0.1:8083/health || exit 1"]
test:
[
"CMD-SHELL",
"wget --no-verbose --tries=1 --spider http://127.0.0.1:8083/health || exit 1",
]
interval: 30s
timeout: 5s
start_period: 15s
Expand Down
103 changes: 89 additions & 14 deletions packages/chain/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ async function retry429<T>(fn: () => Promise<T>, attempts = 6): Promise<T> {
} catch (err) {
lastErr = err;
const msg = String((err as Error)?.message ?? "");
if (!msg.includes("Status: 429") && !msg.includes("rate limit")) throw err;
if (!msg.includes("Status: 429") && !msg.includes("rate limit"))
throw err;
// Exponential backoff: 0.5s → 1s → 2s → 4s → 8s → 16s.
await new Promise((r) => setTimeout(r, delayMs));
delayMs = Math.min(delayMs * 2, 16_000);
Expand Down Expand Up @@ -102,6 +103,37 @@ interface GrpcBlockHeader {
hash: string;
}

type RpcQuantity = `0x${string}`;
type RpcHash = `0x${string}`;

interface RpcBlock {
baseFeePerGas?: RpcQuantity | null;
gasLimit?: RpcQuantity | null;
gasUsed?: RpcQuantity | null;
hash?: RpcHash | null;
miner?: `0x${string}` | null;
number?: RpcQuantity | null;
parentHash: RpcHash;
stateRoot?: RpcHash | null;
timestamp: RpcQuantity;
transactions: (RpcHash | { hash: RpcHash })[];
}

function quantityToBigInt(value: RpcQuantity | null | undefined): bigint {
return value == null ? 0n : BigInt(value);
}

function bigintToQuantity(value: bigint): RpcQuantity {
return `0x${value.toString(16)}`;
}

export class BlockNotFoundError extends Error {
constructor(readonly height: bigint) {
super(`Block at number "${height}" could not be found.`);
this.name = "BlockNotFoundError";
}
}

/**
* Tip watcher backed by the side-car gRPC `GetBlock {latest:true}`. Called
* once per `intervalMs`; emits the new tip height whenever it advances.
Expand Down Expand Up @@ -135,17 +167,17 @@ export interface NativeTransaction {
block_index: number;
block_timestamp: number;
transaction: {
amount: number; // sentri — 1 SRX = 1e8 sentri
amount: number; // sentri — 1 SRX = 1e8 sentri
chain_id: number;
data: string;
fee: number; // sentri
fee: number; // sentri
from_address: string; // 0x… hex OR 'COINBASE' sentinel
nonce: number;
public_key: string;
signature: string;
timestamp: number;
to_address: string;
txid: string; // bare hex, NOT 0x-prefixed
txid: string; // bare hex, NOT 0x-prefixed
};
}

Expand Down Expand Up @@ -241,13 +273,19 @@ export class SentrixClient {
// chain genuinely doesn't have the tx.
let delayMs = 250;
for (let attempt = 0; attempt < 4; attempt++) {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 10_000);
try {
const r = await fetch(`${this.restBase}/transactions/${bareHash}`);
const r = await fetch(`${this.restBase}/transactions/${bareHash}`, {
signal: controller.signal,
});
if (r.status === 404) return null;
if (r.ok) return (await r.json()) as NativeTransaction;
// Non-2xx, non-404 → retry. Falls through to backoff below.
} catch {
// Network error / aborted fetch → retry. Falls through.
} finally {
clearTimeout(timeout);
}
if (attempt < 3) {
await new Promise((r) => setTimeout(r, delayMs));
Expand Down Expand Up @@ -284,7 +322,9 @@ export class SentrixClient {
// proto-loader returns `index` as string when longs:String. Hash is
// { value: Buffer }.
const idx = BigInt(resp.index);
const hash = Buffer.from(resp.hash?.value ?? new Uint8Array()).toString("hex");
const hash = Buffer.from(
resp.hash?.value ?? new Uint8Array(),
).toString("hex");
resolve({ index: idx, hash });
},
);
Expand Down Expand Up @@ -323,7 +363,10 @@ export class SentrixClient {
| { kind: "block"; height: bigint; hash: string; latencyMs: number }
| { kind: "lagged"; skipped: bigint },
) => void,
opts: { onError?: (err: unknown) => void; onReconnect?: (attempt: number) => void } = {},
opts: {
onError?: (err: unknown) => void;
onReconnect?: (attempt: number) => void;
} = {},
): BlockStreamSub {
let stopped = false;
let backoffMs = 500;
Expand Down Expand Up @@ -352,7 +395,9 @@ export class SentrixClient {
onBlock({
kind: "block",
height: BigInt(b.index),
hash: Buffer.from(b.hash?.value ?? new Uint8Array()).toString("hex"),
hash: Buffer.from(b.hash?.value ?? new Uint8Array()).toString(
"hex",
),
latencyMs,
});
} else if (msg.lagged) {
Expand Down Expand Up @@ -468,16 +513,46 @@ export class SentrixClient {
* string entries (`typeof t === "string" → continue`), so the blocks
* table populates fine and the transactions table stays empty until
* the chain RPC is brought into spec OR the indexer reads tx via the
* REST `/transactions/<hash>` shape with a native-format adapter. See
* Sentriscloud/indexer issue tracker.
* REST `/transactions/<hash>` shape with a native-format adapter. Viem's
* `getBlock` parser can reject historical Sentrix block responses even
* when raw `eth_getBlockByNumber` succeeds, so this method intentionally
* stays on raw JSON-RPC and maps only the fields the indexer uses.
*/
async getBlock(height: bigint): Promise<Block<bigint, true>> {
return retry429(() =>
this.http.getBlock({ blockNumber: height, includeTransactions: true }),
);
const block = (await retry429(() =>
this.http.request({
method: "eth_getBlockByNumber",
params: [bigintToQuantity(height), true],
}),
)) as RpcBlock | null;

if (!block) {
throw new BlockNotFoundError(height);
}

return {
baseFeePerGas:
block.baseFeePerGas == null
? null
: quantityToBigInt(block.baseFeePerGas),
gasLimit: quantityToBigInt(block.gasLimit),
gasUsed: quantityToBigInt(block.gasUsed),
hash: block.hash ?? null,
miner: block.miner ?? null,
number: block.number == null ? null : quantityToBigInt(block.number),
parentHash: block.parentHash,
stateRoot: block.stateRoot ?? null,
timestamp: quantityToBigInt(block.timestamp),
transactions: block.transactions.map((tx) =>
typeof tx === "string" ? tx : { hash: tx.hash },
),
} as Block<bigint, true>;
}

async getLogsRange(fromBlock: bigint, toBlock: bigint): Promise<GetLogsReturnType> {
async getLogsRange(
fromBlock: bigint,
toBlock: bigint,
): Promise<GetLogsReturnType> {
return retry429(() => this.http.getLogs({ fromBlock, toBlock }));
}

Expand Down
Loading