Skip to content

Commit c5439e8

Browse files
committed
node: Fix duplicate key constraint when reverting chain shard changes
- Implement robust backup naming system to avoid conflicts - Add support for reusing existing backups when appropriate - Preserve previous backups with unique names - Add comprehensive logging for backup operations Fixes #6196
1 parent 78e94cd commit c5439e8

1 file changed

Lines changed: 131 additions & 16 deletions

File tree

node/src/manager/commands/chain.rs

Lines changed: 131 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use graph::{
2222
use graph_chain_ethereum::EthereumAdapter;
2323
use graph_chain_ethereum::EthereumAdapterTrait as _;
2424
use graph_chain_ethereum::chain::BlockFinality;
25+
use graph_store_postgres::AsyncPgConnection;
2526
use graph_store_postgres::BlockStore;
2627
use graph_store_postgres::ChainStore;
2728
use graph_store_postgres::PoolCoordinator;
@@ -236,6 +237,43 @@ pub async fn update_chain_genesis(
236237
Ok(())
237238
}
238239

240+
struct ChainSwapOutcome {
241+
latest_backup_name: String,
242+
previous_backup_final_name: Option<String>,
243+
reused_previous_backup: bool,
244+
allocated_chain: bool,
245+
}
246+
247+
fn backup_name(chain: &str, base: &str) -> String {
248+
format!("{chain}-{base}")
249+
}
250+
251+
fn suffixed_backup_name(backup_name: &str, suffix: usize) -> String {
252+
format!("{backup_name}-{suffix}")
253+
}
254+
255+
async fn next_backup_name(
256+
conn: &mut AsyncPgConnection,
257+
chain: &str,
258+
base: &str,
259+
) -> Result<String, StoreError> {
260+
let backup_name = backup_name(chain, base);
261+
if find_chain(conn, &backup_name).await?.is_none() {
262+
return Ok(backup_name);
263+
}
264+
265+
let mut suffix = 1usize;
266+
267+
loop {
268+
let candidate = suffixed_backup_name(&backup_name, suffix);
269+
if find_chain(conn, &candidate).await?.is_none() {
270+
return Ok(candidate);
271+
}
272+
273+
suffix += 1;
274+
}
275+
}
276+
239277
pub async fn change_block_cache_shard(
240278
primary_store: ConnectionPool,
241279
store: BlockStore,
@@ -250,51 +288,112 @@ pub async fn change_block_cache_shard(
250288
.await?
251289
.ok_or_else(|| anyhow!("unknown chain: {}", chain_name))?;
252290
let old_shard = chain.shard;
291+
let canonical_backup_name = format!("{chain_name}-old");
292+
293+
let existing_backup = find_chain(&mut conn, &canonical_backup_name).await?;
294+
let existing_backup_store = store.chain_store(&canonical_backup_name).await;
253295

254296
println!("Current shard: {}", old_shard);
255297

256298
let chain_store = store
257299
.chain_store(&chain_name)
258300
.await
259301
.ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?;
260-
let new_name = format!("{}-old", &chain_name);
261302
let ident = chain_store.chain_identifier().await?;
303+
let target_shard = Shard::new(shard.clone())?;
304+
let reuse_existing_backup = existing_backup
305+
.as_ref()
306+
.map(|backup| backup.shard.as_str() == target_shard.as_str())
307+
.unwrap_or(false);
308+
309+
let allocated_chain = if reuse_existing_backup {
310+
None
311+
} else {
312+
let chain =
313+
BlockStore::allocate_chain(&mut conn, &chain_name, &target_shard, &ident).await?;
314+
store.add_chain_store(&chain, true).await?;
315+
Some(chain)
316+
};
262317

263-
conn.transaction::<(), StoreError, _>(|conn| {
318+
let outcome = conn.transaction::<ChainSwapOutcome, StoreError, _>(|conn| {
264319
async {
265-
let shard = Shard::new(shard.to_string())?;
266-
267-
let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident).await?;
268-
269-
store.add_chain_store(&chain, true).await?;
270-
271-
// Drop the foreign key constraint on deployment_schemas
272320
sql_query(
273321
"alter table deployment_schemas drop constraint deployment_schemas_network_fkey;",
274322
)
275323
.execute(conn).await?;
276324

277-
// Update the current chain name to chain-old
278-
update_chain_name(conn, &chain_name, &new_name).await?;
325+
let previous_backup_final_name = if let Some(backup) = existing_backup.as_ref() {
326+
let temp_name = next_backup_name(conn, &chain_name, "old").await?;
327+
update_chain_name(conn, &backup.name, &temp_name).await?;
328+
329+
if reuse_existing_backup {
330+
update_chain_name(conn, &temp_name, &chain_name).await?;
331+
Some(chain_name.clone())
332+
} else {
333+
Some(temp_name)
334+
}
335+
} else {
336+
None
337+
};
279338

280-
// Create a new chain with the name in the destination shard
281-
let _ = add_chain(conn, &chain_name, &shard, ident).await?;
339+
let latest_backup_name = next_backup_name(conn, &chain_name, "old").await?;
340+
update_chain_name(conn, &chain_name, &latest_backup_name).await?;
341+
342+
if !reuse_existing_backup {
343+
add_chain(conn, &chain_name, &target_shard, ident.clone()).await?;
344+
}
282345

283-
// Re-add the foreign key constraint
284346
sql_query(
285347
"alter table deployment_schemas add constraint deployment_schemas_network_fkey foreign key (network) references chains(name);",
286348
)
287349
.execute(conn).await?;
288-
Ok(())
350+
351+
Ok(ChainSwapOutcome {
352+
latest_backup_name,
353+
previous_backup_final_name,
354+
reused_previous_backup: reuse_existing_backup,
355+
allocated_chain: allocated_chain.is_some(),
356+
})
289357
}.scope_boxed()
290358
}).await?;
291359

292-
chain_store.update_name(&new_name).await?;
360+
let ChainSwapOutcome {
361+
latest_backup_name,
362+
previous_backup_final_name,
363+
reused_previous_backup,
364+
allocated_chain,
365+
} = outcome;
366+
367+
chain_store.update_name(&latest_backup_name).await?;
368+
369+
if let (Some(backup_store), Some(final_name)) = (
370+
existing_backup_store.as_ref(),
371+
previous_backup_final_name.as_ref(),
372+
) {
373+
backup_store.update_name(final_name).await?;
374+
}
293375

294376
println!(
295377
"Changed block cache shard for {} from {} to {}",
296378
chain_name, old_shard, shard
297379
);
380+
println!("Latest backup recorded as `{}`", latest_backup_name);
381+
382+
if reused_previous_backup {
383+
println!(
384+
"Reused existing backup `{}` as the active `{}` chain",
385+
canonical_backup_name, chain_name
386+
);
387+
} else if let Some(ref preserved) = previous_backup_final_name {
388+
println!("Preserved earlier backup as `{}`", preserved);
389+
}
390+
391+
if allocated_chain {
392+
println!(
393+
"Allocated new chain state for `{}` on shard {}",
394+
chain_name, shard
395+
);
396+
}
298397

299398
Ok(())
300399
}
@@ -329,3 +428,19 @@ pub async fn ingest(
329428
}
330429
Ok(())
331430
}
431+
432+
#[cfg(test)]
433+
mod tests {
434+
use super::{backup_name, suffixed_backup_name};
435+
436+
#[test]
437+
fn backup_name_uses_plain_name_first() {
438+
assert_eq!(backup_name("mainnet", "old"), "mainnet-old");
439+
}
440+
441+
#[test]
442+
fn suffixed_backup_name_adds_numeric_suffixes() {
443+
assert_eq!(suffixed_backup_name("mainnet-old", 1), "mainnet-old-1");
444+
assert_eq!(suffixed_backup_name("mainnet-old", 42), "mainnet-old-42");
445+
}
446+
}

0 commit comments

Comments
 (0)