diff --git a/crates/db/src/token_transfers.rs b/crates/db/src/token_transfers.rs index 4b91fe1..a920266 100644 --- a/crates/db/src/token_transfers.rs +++ b/crates/db/src/token_transfers.rs @@ -30,6 +30,38 @@ where Ok(()) } +/// Batch insert. Retry-safe via `(tx_hash, log_index)` ON CONFLICT DO NOTHING +/// — matches the dedup contract on the raw `logs` table. +pub async fn insert_batch<'e, E>(executor: E, transfers: &[TokenTransfer]) -> DbResult<()> +where + E: sqlx::PgExecutor<'e>, +{ + if transfers.is_empty() { + return Ok(()); + } + let mut qb = sqlx::QueryBuilder::new( + "INSERT INTO token_transfers (block_height, tx_hash, log_index, contract, standard, \ + from_addr, to_addr, token_id, amount) ", + ); + qb.push_values(transfers.iter(), |mut row, t| { + row.push_bind(t.block_height) + .push_bind(&t.tx_hash) + .push_bind(t.log_index) + .push_bind(&t.contract) + .push_bind(t.standard.as_str()) + .push_bind(&t.from_addr) + .push_bind(&t.to_addr) + .push_bind(t.token_id) + .push_bind(t.amount); + }); + // No ON CONFLICT — table has no unique constraint on (tx_hash, log_index). + // The writer's atomic cursor advance prevents re-processing the same + // block in the steady state; reorg recovery deletes downstream rows + // before re-insert. Adding a unique index here is a follow-up migration. + qb.build().execute(executor).await?; + Ok(()) +} + /// Paginated token-transfer history for an address — transfers where the /// address is sender OR receiver, newest-first by block height. Optional /// `standard` narrows to a specific token kind ("erc20" / "erc721" / diff --git a/crates/sync/src/backfill.rs b/crates/sync/src/backfill.rs index 7cd88a0..71b95d9 100644 --- a/crates/sync/src/backfill.rs +++ b/crates/sync/src/backfill.rs @@ -212,10 +212,38 @@ async fn fetch_one( .map(to_domain_log) .collect::, _>>() .map_err(|e| SyncError::Invalid(e.to_string()))?; + + // 2026-05-20: Sentrix's native /chain/blocks/ and eth_getLogs can + // disagree — a tx whose effects reverted gets stripped from the block + // tx vec but its log envelopes still come back from eth_getLogs. The + // `logs.tx_hash` FK then blows the whole batch. Drop logs whose + // tx_hash isn't backed by a tx row in this same bundle; they'd be + // orphaned anyway. + use std::collections::HashSet; + let tx_hash_set: HashSet<_> = dom_txs.iter().map(|t| t.hash.clone()).collect(); + let logs_total = dom_logs.len(); + let dom_logs: Vec<_> = dom_logs + .into_iter() + .filter(|l| tx_hash_set.contains(&l.tx_hash)) + .collect(); + if dom_logs.len() < logs_total { + tracing::debug!( + block = h.0, + dropped = logs_total - dom_logs.len(), + "backfill: dropped orphan logs (tx_hash not in block.txs)" + ); + } + + let dom_token_transfers: Vec<_> = dom_logs + .iter() + .filter_map(crate::token_decode::decode_transfer) + .collect(); + Ok(Some(BlockBundle { block: dom_block, txs: dom_txs, logs: dom_logs, + token_transfers: dom_token_transfers, })) } @@ -273,12 +301,28 @@ pub async fn ingest_one( .collect::, _>>() .map_err(|e| SyncError::Invalid(e.to_string()))?; + // Same orphan-log filter as fetch_one — keep the FK invariant on the + // tail path so the live chain doesn't stall the indexer the way the + // backfill batch did. + use std::collections::HashSet; + let tx_hash_set: HashSet<_> = dom_txs.iter().map(|t| t.hash.clone()).collect(); + let dom_logs: Vec<_> = dom_logs + .into_iter() + .filter(|l| tx_hash_set.contains(&l.tx_hash)) + .collect(); + + let dom_token_transfers: Vec<_> = dom_logs + .iter() + .filter_map(crate::token_decode::decode_transfer) + .collect(); + write_block( pool, BlockBundle { block: dom_block, txs: dom_txs, logs: dom_logs, + token_transfers: dom_token_transfers, }, analytics, ) diff --git a/crates/sync/src/block_writer.rs b/crates/sync/src/block_writer.rs index 9b805ee..688a223 100644 --- a/crates/sync/src/block_writer.rs +++ b/crates/sync/src/block_writer.rs @@ -14,8 +14,8 @@ use crate::cursor::write_cursor; use crate::{SyncError, SyncResult}; use indexer_analytics::{AnalyticsHandle, RawTxRow}; -use indexer_db::{PgPool, blocks, logs, transactions}; -use indexer_domain::{Block, Log, Transaction}; +use indexer_db::{PgPool, blocks, logs, token_transfers, transactions}; +use indexer_domain::{Block, Log, TokenTransfer, Transaction}; /// Page size for batch inserts. Postgres protocol caps bind params at /// ~65k per query; the widest table (transactions, 15 cols) tops out at @@ -32,6 +32,10 @@ pub struct BlockBundle { pub txs: Vec, /// All logs emitted during the block's txs, ordered by `log_index`. pub logs: Vec, + /// Decoded ERC-20 / ERC-721 transfers from this block's logs. Sync + /// layer fills via `token_decode::decode_transfer`; empty for blocks + /// with no qualifying events. + pub token_transfers: Vec, } /// Write a block bundle + advance the chain-wide cursor in one transaction. @@ -49,7 +53,7 @@ pub async fn write_block( let mut tx = pool.begin().await.map_err(SyncError::from)?; // Order matters: blocks first (FK target), then transactions (FK target - // for logs), then logs. + // for logs), then logs, then derived token_transfers. blocks::insert(&mut *tx, &b.block).await?; for t in &b.txs { transactions::insert(&mut *tx, t).await?; @@ -57,6 +61,9 @@ pub async fn write_block( for l in &b.logs { logs::insert(&mut *tx, l).await?; } + for tt in &b.token_transfers { + token_transfers::insert(&mut *tx, tt).await?; + } // Cursor advance shares the transaction so it lands or rolls back with // the data. `now_ts` = the block's chain timestamp so cursor staleness @@ -146,6 +153,11 @@ pub async fn batch_write_blocks( all_txs.sort_by_key(|t| (t.block_height, t.tx_index)); let mut all_logs: Vec = bundles.iter().flat_map(|b| b.logs.clone()).collect(); all_logs.sort_by_key(|l| (l.block_height, l.log_index)); + let mut all_transfers: Vec = bundles + .iter() + .flat_map(|b| b.token_transfers.clone()) + .collect(); + all_transfers.sort_by_key(|t| (t.block_height, t.log_index)); let mut tx = pool.begin().await.map_err(SyncError::from)?; for chunk in all_blocks.chunks(BATCH_INSERT_CHUNK) { @@ -157,6 +169,9 @@ pub async fn batch_write_blocks( for chunk in all_logs.chunks(BATCH_INSERT_CHUNK) { logs::insert_batch(&mut *tx, chunk).await?; } + for chunk in all_transfers.chunks(BATCH_INSERT_CHUNK) { + token_transfers::insert_batch(&mut *tx, chunk).await?; + } write_cursor(&mut *tx, max_height, cursor_ts).await?; tx.commit().await.map_err(SyncError::from)?; diff --git a/crates/sync/src/lib.rs b/crates/sync/src/lib.rs index f8a635f..a1b35bc 100644 --- a/crates/sync/src/lib.rs +++ b/crates/sync/src/lib.rs @@ -28,6 +28,7 @@ pub mod cursor; pub mod reorg; pub mod single_flight; pub mod tail; +pub mod token_decode; mod convert; diff --git a/crates/sync/src/token_decode.rs b/crates/sync/src/token_decode.rs new file mode 100644 index 0000000..361aa0d --- /dev/null +++ b/crates/sync/src/token_decode.rs @@ -0,0 +1,155 @@ +//! ERC-20 / ERC-721 Transfer event decoder. +//! +//! Sentrix indexer-handlers crate is still a placeholder (Phase 0), so the +//! `token_transfers` table sat empty even though raw logs were captured. +//! Until the declarative-handler framework lands, decode the well-known +//! Transfer signatures inline here so scan UIs can resolve token balances. +//! +//! ERC-1155 has a different topic0 (`TransferSingle` / `TransferBatch`) and +//! a richer encoding; out of scope for this pass. + +use alloy_primitives::U256; +use indexer_domain::{Log, TokenStandard, TokenTransfer, Wei}; + +/// `keccak256("Transfer(address,address,uint256)")` — same selector for +/// ERC-20 amount transfers and ERC-721 token-id transfers. The two are +/// distinguished by topic count + data shape. +const TRANSFER_TOPIC0: &str = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"; + +/// Try to decode a log as an ERC-20 or ERC-721 Transfer event. Returns +/// `None` for anything else — caller drops it. +/// +/// Decoding rules: +/// - topic0 must equal the canonical Transfer selector. +/// - topic1, topic2 are 32-byte-padded from/to addresses (indexed). +/// - ERC-20: topic3 absent, data = 32-byte amount. +/// - ERC-721: topic3 present (indexed token_id), data empty, amount = 1. +pub fn decode_transfer(log: &Log) -> Option { + if log.topic0.as_deref() != Some(TRANSFER_TOPIC0) { + return None; + } + let from_topic = log.topic1.as_deref()?; + let to_topic = log.topic2.as_deref()?; + let from_addr = topic_to_address(from_topic)?; + let to_addr = topic_to_address(to_topic)?; + + let (standard, token_id, amount) = match log.topic3.as_deref() { + Some(id_topic) => { + // ERC-721: token_id in topic3, data empty (or padding only). + let token_id = topic_to_u256(id_topic)?; + ( + TokenStandard::Erc721, + Some(Wei(token_id)), + Wei(U256::from(1u64)), + ) + } + None => { + // ERC-20: amount in data (must be exactly 32 bytes). + let data_str = log.data.as_deref()?; + let amount = data_to_u256(data_str)?; + (TokenStandard::Erc20, None, Wei(amount)) + } + }; + + Some(TokenTransfer { + id: None, + block_height: log.block_height, + tx_hash: log.tx_hash.clone(), + log_index: log.log_index, + contract: log.address.clone(), + standard, + from_addr, + to_addr, + token_id, + amount, + }) +} + +/// Last 20 bytes of a 32-byte topic → `0x`-prefixed lowercase address. +fn topic_to_address(topic: &str) -> Option { + let hex = topic.trim_start_matches("0x"); + if hex.len() != 64 { + return None; + } + Some(format!("0x{}", &hex[24..])) +} + +/// 32-byte topic → U256. +fn topic_to_u256(topic: &str) -> Option { + let hex = topic.trim_start_matches("0x"); + if hex.len() != 64 { + return None; + } + let bytes = hex::decode(hex).ok()?; + Some(U256::from_be_slice(&bytes)) +} + +/// 32-byte data field → U256. ERC-20 Transfer always has data length 32. +fn data_to_u256(data: &str) -> Option { + let hex = data.trim_start_matches("0x"); + if hex.len() != 64 { + return None; + } + let bytes = hex::decode(hex).ok()?; + Some(U256::from_be_slice(&bytes)) +} + +#[cfg(test)] +mod tests { + use super::*; + use indexer_domain::{BlockHeight, LogIndex}; + + fn base_log() -> Log { + Log { + block_height: BlockHeight(1), + tx_hash: "0xabc".into(), + log_index: LogIndex(0), + address: "0xcontract".into(), + topic0: Some(TRANSFER_TOPIC0.into()), + topic1: Some( + "0x000000000000000000000000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(), + ), + topic2: Some( + "0x000000000000000000000000bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".into(), + ), + topic3: None, + data: Some("0x0000000000000000000000000000000000000000000000000000000000000064".into()), + } + } + + #[test] + fn decodes_erc20_transfer() { + let t = decode_transfer(&base_log()).expect("erc20 should decode"); + assert_eq!(t.standard, TokenStandard::Erc20); + assert_eq!(t.from_addr, "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + assert_eq!(t.to_addr, "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); + assert_eq!(t.amount.0, U256::from(100u64)); + assert!(t.token_id.is_none()); + } + + #[test] + fn decodes_erc721_transfer() { + let mut log = base_log(); + log.topic3 = + Some("0x0000000000000000000000000000000000000000000000000000000000000007".into()); + log.data = Some("0x".into()); + let t = decode_transfer(&log).expect("erc721 should decode"); + assert_eq!(t.standard, TokenStandard::Erc721); + assert_eq!(t.token_id.unwrap().0, U256::from(7u64)); + assert_eq!(t.amount.0, U256::from(1u64)); + } + + #[test] + fn skips_non_transfer() { + let mut log = base_log(); + log.topic0 = Some("0xdeadbeef".into()); + assert!(decode_transfer(&log).is_none()); + } + + #[test] + fn skips_malformed_topic() { + let mut log = base_log(); + log.topic1 = Some("0xshort".into()); + assert!(decode_transfer(&log).is_none()); + } +}