Skip to content

Commit 334940b

Browse files
committed
node: Prompt before reusing change-shard backup
1 parent a15a4fc commit 334940b

1 file changed

Lines changed: 100 additions & 35 deletions

File tree

node/src/manager/commands/chain.rs

Lines changed: 100 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use graph_store_postgres::find_chain;
3333
use graph_store_postgres::update_chain_name;
3434
use graph_store_postgres::{ConnectionPool, command_support::catalog::block_store};
3535

36+
use crate::manager::prompt::prompt_for_confirmation;
3637
use crate::network_setup::Networks;
3738

3839
pub async fn list(primary: ConnectionPool, store: BlockStore) -> Result<(), Error> {
@@ -238,12 +239,17 @@ pub async fn update_chain_genesis(
238239
}
239240

240241
struct ChainSwapOutcome {
241-
latest_backup_name: String,
242-
previous_backup_final_name: Option<String>,
243242
reused_previous_backup: bool,
244243
allocated_chain: bool,
245244
}
246245

246+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
247+
enum ExistingBackupDisposition<'a> {
248+
ProceedFresh,
249+
PromptReuse,
250+
AbortWrongShard(&'a str),
251+
}
252+
247253
fn backup_name(chain: &str, base: &str) -> String {
248254
format!("{chain}-{base}")
249255
}
@@ -252,7 +258,18 @@ fn suffixed_backup_name(backup_name: &str, suffix: usize) -> String {
252258
format!("{backup_name}-{suffix}")
253259
}
254260

255-
async fn next_backup_name(
261+
fn existing_backup_disposition<'a>(
262+
existing_backup_shard: Option<&'a str>,
263+
target_shard: &str,
264+
) -> ExistingBackupDisposition<'a> {
265+
match existing_backup_shard {
266+
None => ExistingBackupDisposition::ProceedFresh,
267+
Some(shard) if shard == target_shard => ExistingBackupDisposition::PromptReuse,
268+
Some(shard) => ExistingBackupDisposition::AbortWrongShard(shard),
269+
}
270+
}
271+
272+
async fn next_temporary_backup_name(
256273
conn: &mut AsyncPgConnection,
257274
chain: &str,
258275
base: &str,
@@ -289,9 +306,7 @@ pub async fn change_block_cache_shard(
289306
.ok_or_else(|| anyhow!("unknown chain: {}", chain_name))?;
290307
let old_shard = chain.shard;
291308
let canonical_backup_name = format!("{chain_name}-old");
292-
293309
let existing_backup = find_chain(&mut conn, &canonical_backup_name).await?;
294-
let existing_backup_store = store.chain_store(&canonical_backup_name).await;
295310

296311
println!("Current shard: {}", old_shard);
297312

@@ -301,10 +316,47 @@ pub async fn change_block_cache_shard(
301316
.ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?;
302317
let ident = chain_store.chain_identifier().await?;
303318
let target_shard = Shard::new(shard.clone())?;
304-
let reuse_existing_backup = existing_backup
305-
.as_ref()
306-
.map(|backup| backup.shard.as_str() == target_shard.as_str())
307-
.unwrap_or(false);
319+
let existing_backup_disposition = existing_backup_disposition(
320+
existing_backup.as_ref().map(|backup| backup.shard.as_str()),
321+
target_shard.as_str(),
322+
);
323+
let reuse_existing_backup = matches!(
324+
existing_backup_disposition,
325+
ExistingBackupDisposition::PromptReuse
326+
);
327+
328+
match existing_backup_disposition {
329+
ExistingBackupDisposition::ProceedFresh => {}
330+
ExistingBackupDisposition::AbortWrongShard(backup_shard) => {
331+
bail!(
332+
"`{}` already exists on shard `{}`. Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`",
333+
canonical_backup_name,
334+
backup_shard,
335+
canonical_backup_name,
336+
chain_name,
337+
target_shard,
338+
);
339+
}
340+
ExistingBackupDisposition::PromptReuse => {
341+
let prompt = format!(
342+
"`{}` already exists on shard `{}` and will be reused as the active `{}` chain.\nProceed?",
343+
canonical_backup_name, target_shard, chain_name
344+
);
345+
if !prompt_for_confirmation(&prompt)? {
346+
println!(
347+
"Aborting. Remove `{}` with `graphman chain remove {}` if you want to create a fresh cache on shard `{}`.",
348+
canonical_backup_name, canonical_backup_name, target_shard
349+
);
350+
return Ok(());
351+
}
352+
}
353+
}
354+
355+
let existing_backup_store = if reuse_existing_backup {
356+
store.chain_store(&canonical_backup_name).await
357+
} else {
358+
None
359+
};
308360

309361
let allocated_chain = if reuse_existing_backup {
310362
None
@@ -322,24 +374,20 @@ pub async fn change_block_cache_shard(
322374
)
323375
.execute(conn).await?;
324376

325-
let previous_backup_final_name = if let Some(backup) = existing_backup.as_ref() {
326-
let temp_name = next_backup_name(conn, &chain_name, "old").await?;
377+
let temp_backup_name = if let Some(backup) = existing_backup.as_ref() {
378+
let temp_name = next_temporary_backup_name(conn, &chain_name, "old").await?;
327379
update_chain_name(conn, &backup.name, &temp_name).await?;
328-
329-
if reuse_existing_backup {
330-
update_chain_name(conn, &temp_name, &chain_name).await?;
331-
Some(chain_name.clone())
332-
} else {
333-
Some(temp_name)
334-
}
380+
Some(temp_name)
335381
} else {
336382
None
337383
};
338384

339-
let latest_backup_name = next_backup_name(conn, &chain_name, "old").await?;
340-
update_chain_name(conn, &chain_name, &latest_backup_name).await?;
385+
update_chain_name(conn, &chain_name, &canonical_backup_name).await?;
341386

342-
if !reuse_existing_backup {
387+
if reuse_existing_backup {
388+
debug_assert!(temp_backup_name.is_some());
389+
update_chain_name(conn, temp_backup_name.as_ref().unwrap(), &chain_name).await?;
390+
} else {
343391
add_chain(conn, &chain_name, &target_shard, ident.clone()).await?;
344392
}
345393

@@ -349,43 +397,34 @@ pub async fn change_block_cache_shard(
349397
.execute(conn).await?;
350398

351399
Ok(ChainSwapOutcome {
352-
latest_backup_name,
353-
previous_backup_final_name,
354400
reused_previous_backup: reuse_existing_backup,
355401
allocated_chain: allocated_chain.is_some(),
356402
})
357403
}.scope_boxed()
358404
}).await?;
359405

360406
let ChainSwapOutcome {
361-
latest_backup_name,
362-
previous_backup_final_name,
363407
reused_previous_backup,
364408
allocated_chain,
365409
} = outcome;
366410

367-
chain_store.update_name(&latest_backup_name).await?;
411+
chain_store.update_name(&canonical_backup_name).await?;
368412

369-
if let (Some(backup_store), Some(final_name)) = (
370-
existing_backup_store.as_ref(),
371-
previous_backup_final_name.as_ref(),
372-
) {
373-
backup_store.update_name(final_name).await?;
413+
if reused_previous_backup && let Some(backup_store) = existing_backup_store.as_ref() {
414+
backup_store.update_name(&chain_name).await?;
374415
}
375416

376417
println!(
377418
"Changed block cache shard for {} from {} to {}",
378419
chain_name, old_shard, shard
379420
);
380-
println!("Latest backup recorded as `{}`", latest_backup_name);
421+
println!("Latest backup recorded as `{}`", canonical_backup_name);
381422

382423
if reused_previous_backup {
383424
println!(
384425
"Reused existing backup `{}` as the active `{}` chain",
385426
canonical_backup_name, chain_name
386427
);
387-
} else if let Some(ref preserved) = previous_backup_final_name {
388-
println!("Preserved earlier backup as `{}`", preserved);
389428
}
390429

391430
if allocated_chain {
@@ -431,7 +470,9 @@ pub async fn ingest(
431470

432471
#[cfg(test)]
433472
mod tests {
434-
use super::{backup_name, suffixed_backup_name};
473+
use super::{
474+
ExistingBackupDisposition, backup_name, existing_backup_disposition, suffixed_backup_name,
475+
};
435476

436477
#[test]
437478
fn backup_name_uses_plain_name_first() {
@@ -443,4 +484,28 @@ mod tests {
443484
assert_eq!(suffixed_backup_name("mainnet-old", 1), "mainnet-old-1");
444485
assert_eq!(suffixed_backup_name("mainnet-old", 42), "mainnet-old-42");
445486
}
487+
488+
#[test]
489+
fn existing_backup_disposition_proceeds_when_backup_missing() {
490+
assert_eq!(
491+
existing_backup_disposition(None, "shard_b"),
492+
ExistingBackupDisposition::ProceedFresh
493+
);
494+
}
495+
496+
#[test]
497+
fn existing_backup_disposition_prompts_when_backup_matches_target_shard() {
498+
assert_eq!(
499+
existing_backup_disposition(Some("shard_b"), "shard_b"),
500+
ExistingBackupDisposition::PromptReuse
501+
);
502+
}
503+
504+
#[test]
505+
fn existing_backup_disposition_aborts_when_backup_is_on_another_shard() {
506+
assert_eq!(
507+
existing_backup_disposition(Some("shard_a"), "shard_b"),
508+
ExistingBackupDisposition::AbortWrongShard("shard_a")
509+
);
510+
}
446511
}

0 commit comments

Comments
 (0)