Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions crates/db/src/token_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,38 @@ where
Ok(())
}

/// Batch insert. Retry-safe via `(tx_hash, log_index)` ON CONFLICT DO NOTHING
/// — matches the dedup contract on the raw `logs` table.
Comment on lines +33 to +34
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Doc comment claims ON CONFLICT handling that doesn't exist.

The doc comment states "Retry-safe via (tx_hash, log_index) ON CONFLICT DO NOTHING" but line 57 explicitly notes "No ON CONFLICT — table has no unique constraint." This inconsistency could mislead future maintainers into assuming duplicate protection exists when it doesn't.

📝 Suggested fix
-/// Batch insert. Retry-safe via `(tx_hash, log_index)` ON CONFLICT DO NOTHING
-/// — matches the dedup contract on the raw `logs` table.
+/// Batch insert. Note: table lacks a unique constraint on `(tx_hash, log_index)`,
+/// so retry safety relies on the writer's atomic cursor advance preventing
+/// re-processing. Adding the unique index is a follow-up migration.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Batch insert. Retry-safe via `(tx_hash, log_index)` ON CONFLICT DO NOTHING
/// — matches the dedup contract on the raw `logs` table.
/// Batch insert. Note: table lacks a unique constraint on `(tx_hash, log_index)`,
/// so retry safety relies on the writer's atomic cursor advance preventing
/// re-processing. Adding the unique index is a follow-up migration.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/db/src/token_transfers.rs` around lines 33 - 34, The doc comment above
the batch insert claiming "Retry-safe via `(tx_hash, log_index)` ON CONFLICT DO
NOTHING" is incorrect; either implement the promised dedup behavior by adding a
unique constraint on (tx_hash, log_index) and using ON CONFLICT DO NOTHING in
the batch insert SQL, or update the doc comment to remove the ON CONFLICT claim
and clearly state that the table has no unique constraint (matching the note at
line 57). Locate the comment and the batch-insert logic in token_transfers.rs
(search for the doc string mentioning tx_hash, log_index and the batch insert
function) and make the change: if you choose the DB change, add the unique
constraint and adjust SQL to use ON CONFLICT; otherwise, revise the comment to
accurately reflect no conflict-handling.

pub async fn insert_batch<'e, E>(executor: E, transfers: &[TokenTransfer]) -> DbResult<()>
where
E: sqlx::PgExecutor<'e>,
{
if transfers.is_empty() {
return Ok(());
}
let mut qb = sqlx::QueryBuilder::new(
"INSERT INTO token_transfers (block_height, tx_hash, log_index, contract, standard, \
from_addr, to_addr, token_id, amount) ",
);
qb.push_values(transfers.iter(), |mut row, t| {
row.push_bind(t.block_height)
.push_bind(&t.tx_hash)
.push_bind(t.log_index)
.push_bind(&t.contract)
.push_bind(t.standard.as_str())
.push_bind(&t.from_addr)
.push_bind(&t.to_addr)
.push_bind(t.token_id)
.push_bind(t.amount);
});
// No ON CONFLICT — table has no unique constraint on (tx_hash, log_index).
// The writer's atomic cursor advance prevents re-processing the same
// block in the steady state; reorg recovery deletes downstream rows
// before re-insert. Adding a unique index here is a follow-up migration.
qb.build().execute(executor).await?;
Ok(())
}

/// Paginated token-transfer history for an address — transfers where the
/// address is sender OR receiver, newest-first by block height. Optional
/// `standard` narrows to a specific token kind ("erc20" / "erc721" /
Expand Down
44 changes: 44 additions & 0 deletions crates/sync/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,38 @@ async fn fetch_one(
.map(to_domain_log)
.collect::<Result<Vec<_>, _>>()
.map_err(|e| SyncError::Invalid(e.to_string()))?;

// 2026-05-20: Sentrix's native /chain/blocks/<n> and eth_getLogs can
// disagree — a tx whose effects reverted gets stripped from the block
// tx vec but its log envelopes still come back from eth_getLogs. The
// `logs.tx_hash` FK then blows the whole batch. Drop logs whose
// tx_hash isn't backed by a tx row in this same bundle; they'd be
// orphaned anyway.
use std::collections::HashSet;
let tx_hash_set: HashSet<_> = dom_txs.iter().map(|t| t.hash.clone()).collect();
let logs_total = dom_logs.len();
let dom_logs: Vec<_> = dom_logs
.into_iter()
.filter(|l| tx_hash_set.contains(&l.tx_hash))
.collect();
if dom_logs.len() < logs_total {
tracing::debug!(
block = h.0,
dropped = logs_total - dom_logs.len(),
"backfill: dropped orphan logs (tx_hash not in block.txs)"
);
}

let dom_token_transfers: Vec<_> = dom_logs
.iter()
.filter_map(crate::token_decode::decode_transfer)
.collect();

Ok(Some(BlockBundle {
block: dom_block,
txs: dom_txs,
logs: dom_logs,
token_transfers: dom_token_transfers,
}))
}

Expand Down Expand Up @@ -273,12 +301,28 @@ pub async fn ingest_one(
.collect::<Result<Vec<_>, _>>()
.map_err(|e| SyncError::Invalid(e.to_string()))?;

// Same orphan-log filter as fetch_one — keep the FK invariant on the
// tail path so the live chain doesn't stall the indexer the way the
// backfill batch did.
use std::collections::HashSet;
let tx_hash_set: HashSet<_> = dom_txs.iter().map(|t| t.hash.clone()).collect();
let dom_logs: Vec<_> = dom_logs
.into_iter()
.filter(|l| tx_hash_set.contains(&l.tx_hash))
.collect();

let dom_token_transfers: Vec<_> = dom_logs
.iter()
.filter_map(crate::token_decode::decode_transfer)
.collect();

write_block(
pool,
BlockBundle {
block: dom_block,
txs: dom_txs,
logs: dom_logs,
token_transfers: dom_token_transfers,
},
analytics,
)
Expand Down
21 changes: 18 additions & 3 deletions crates/sync/src/block_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
use crate::cursor::write_cursor;
use crate::{SyncError, SyncResult};
use indexer_analytics::{AnalyticsHandle, RawTxRow};
use indexer_db::{PgPool, blocks, logs, transactions};
use indexer_domain::{Block, Log, Transaction};
use indexer_db::{PgPool, blocks, logs, token_transfers, transactions};
use indexer_domain::{Block, Log, TokenTransfer, Transaction};

/// Page size for batch inserts. Postgres protocol caps bind params at
/// ~65k per query; the widest table (transactions, 15 cols) tops out at
Expand All @@ -32,6 +32,10 @@ pub struct BlockBundle {
pub txs: Vec<Transaction>,
/// All logs emitted during the block's txs, ordered by `log_index`.
pub logs: Vec<Log>,
/// Decoded ERC-20 / ERC-721 transfers from this block's logs. Sync
/// layer fills via `token_decode::decode_transfer`; empty for blocks
/// with no qualifying events.
pub token_transfers: Vec<TokenTransfer>,
}

/// Write a block bundle + advance the chain-wide cursor in one transaction.
Expand All @@ -49,14 +53,17 @@ pub async fn write_block(
let mut tx = pool.begin().await.map_err(SyncError::from)?;

// Order matters: blocks first (FK target), then transactions (FK target
// for logs), then logs.
// for logs), then logs, then derived token_transfers.
blocks::insert(&mut *tx, &b.block).await?;
for t in &b.txs {
transactions::insert(&mut *tx, t).await?;
}
for l in &b.logs {
logs::insert(&mut *tx, l).await?;
}
for tt in &b.token_transfers {
token_transfers::insert(&mut *tx, tt).await?;
}

// Cursor advance shares the transaction so it lands or rolls back with
// the data. `now_ts` = the block's chain timestamp so cursor staleness
Expand Down Expand Up @@ -146,6 +153,11 @@ pub async fn batch_write_blocks(
all_txs.sort_by_key(|t| (t.block_height, t.tx_index));
let mut all_logs: Vec<Log> = bundles.iter().flat_map(|b| b.logs.clone()).collect();
all_logs.sort_by_key(|l| (l.block_height, l.log_index));
let mut all_transfers: Vec<TokenTransfer> = bundles
.iter()
.flat_map(|b| b.token_transfers.clone())
.collect();
all_transfers.sort_by_key(|t| (t.block_height, t.log_index));

let mut tx = pool.begin().await.map_err(SyncError::from)?;
for chunk in all_blocks.chunks(BATCH_INSERT_CHUNK) {
Expand All @@ -157,6 +169,9 @@ pub async fn batch_write_blocks(
for chunk in all_logs.chunks(BATCH_INSERT_CHUNK) {
logs::insert_batch(&mut *tx, chunk).await?;
}
for chunk in all_transfers.chunks(BATCH_INSERT_CHUNK) {
token_transfers::insert_batch(&mut *tx, chunk).await?;
}
write_cursor(&mut *tx, max_height, cursor_ts).await?;
tx.commit().await.map_err(SyncError::from)?;

Expand Down
1 change: 1 addition & 0 deletions crates/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod cursor;
pub mod reorg;
pub mod single_flight;
pub mod tail;
pub mod token_decode;

mod convert;

Expand Down
155 changes: 155 additions & 0 deletions crates/sync/src/token_decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//! ERC-20 / ERC-721 Transfer event decoder.
//!
//! Sentrix indexer-handlers crate is still a placeholder (Phase 0), so the
//! `token_transfers` table sat empty even though raw logs were captured.
//! Until the declarative-handler framework lands, decode the well-known
//! Transfer signatures inline here so scan UIs can resolve token balances.
//!
//! ERC-1155 has a different topic0 (`TransferSingle` / `TransferBatch`) and
//! a richer encoding; out of scope for this pass.

use alloy_primitives::U256;
use indexer_domain::{Log, TokenStandard, TokenTransfer, Wei};

/// `keccak256("Transfer(address,address,uint256)")` — same selector for
/// ERC-20 amount transfers and ERC-721 token-id transfers. The two are
/// distinguished by topic count + data shape.
const TRANSFER_TOPIC0: &str = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";

/// Try to decode a log as an ERC-20 or ERC-721 Transfer event. Returns
/// `None` for anything else — caller drops it.
///
/// Decoding rules:
/// - topic0 must equal the canonical Transfer selector.
/// - topic1, topic2 are 32-byte-padded from/to addresses (indexed).
/// - ERC-20: topic3 absent, data = 32-byte amount.
/// - ERC-721: topic3 present (indexed token_id), data empty, amount = 1.
pub fn decode_transfer(log: &Log) -> Option<TokenTransfer> {
if log.topic0.as_deref() != Some(TRANSFER_TOPIC0) {
return None;
}
let from_topic = log.topic1.as_deref()?;
let to_topic = log.topic2.as_deref()?;
let from_addr = topic_to_address(from_topic)?;
let to_addr = topic_to_address(to_topic)?;

let (standard, token_id, amount) = match log.topic3.as_deref() {
Some(id_topic) => {
// ERC-721: token_id in topic3, data empty (or padding only).
let token_id = topic_to_u256(id_topic)?;
(
TokenStandard::Erc721,
Some(Wei(token_id)),
Wei(U256::from(1u64)),
)
}
None => {
// ERC-20: amount in data (must be exactly 32 bytes).
let data_str = log.data.as_deref()?;
let amount = data_to_u256(data_str)?;
(TokenStandard::Erc20, None, Wei(amount))
}
};

Some(TokenTransfer {
id: None,
block_height: log.block_height,
tx_hash: log.tx_hash.clone(),
log_index: log.log_index,
contract: log.address.clone(),
standard,
from_addr,
to_addr,
token_id,
amount,
})
}

/// Last 20 bytes of a 32-byte topic → `0x`-prefixed lowercase address.
fn topic_to_address(topic: &str) -> Option<String> {
let hex = topic.trim_start_matches("0x");
if hex.len() != 64 {
return None;
}
Some(format!("0x{}", &hex[24..]))
}

/// 32-byte topic → U256.
fn topic_to_u256(topic: &str) -> Option<U256> {
let hex = topic.trim_start_matches("0x");
if hex.len() != 64 {
return None;
}
let bytes = hex::decode(hex).ok()?;
Some(U256::from_be_slice(&bytes))
}

/// 32-byte data field → U256. ERC-20 Transfer always has data length 32.
fn data_to_u256(data: &str) -> Option<U256> {
let hex = data.trim_start_matches("0x");
if hex.len() != 64 {
return None;
}
let bytes = hex::decode(hex).ok()?;
Some(U256::from_be_slice(&bytes))
}

#[cfg(test)]
mod tests {
use super::*;
use indexer_domain::{BlockHeight, LogIndex};

fn base_log() -> Log {
Log {
block_height: BlockHeight(1),
tx_hash: "0xabc".into(),
log_index: LogIndex(0),
address: "0xcontract".into(),
topic0: Some(TRANSFER_TOPIC0.into()),
topic1: Some(
"0x000000000000000000000000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
),
topic2: Some(
"0x000000000000000000000000bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".into(),
),
topic3: None,
data: Some("0x0000000000000000000000000000000000000000000000000000000000000064".into()),
}
}

#[test]
fn decodes_erc20_transfer() {
let t = decode_transfer(&base_log()).expect("erc20 should decode");
assert_eq!(t.standard, TokenStandard::Erc20);
assert_eq!(t.from_addr, "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
assert_eq!(t.to_addr, "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
assert_eq!(t.amount.0, U256::from(100u64));
assert!(t.token_id.is_none());
}

#[test]
fn decodes_erc721_transfer() {
let mut log = base_log();
log.topic3 =
Some("0x0000000000000000000000000000000000000000000000000000000000000007".into());
log.data = Some("0x".into());
let t = decode_transfer(&log).expect("erc721 should decode");
assert_eq!(t.standard, TokenStandard::Erc721);
assert_eq!(t.token_id.unwrap().0, U256::from(7u64));
assert_eq!(t.amount.0, U256::from(1u64));
}

#[test]
fn skips_non_transfer() {
let mut log = base_log();
log.topic0 = Some("0xdeadbeef".into());
assert!(decode_transfer(&log).is_none());
}

#[test]
fn skips_malformed_topic() {
let mut log = base_log();
log.topic1 = Some("0xshort".into());
assert!(decode_transfer(&log).is_none());
}
}
Loading