Skip to content

Commit a4cd243

Browse files
committed
node: Address change-shard review feedback
1 parent 4a72320 commit a4cd243

1 file changed

Lines changed: 40 additions & 132 deletions

File tree

node/src/manager/commands/chain.rs

Lines changed: 40 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use graph::{
2222
use graph_chain_ethereum::EthereumAdapter;
2323
use graph_chain_ethereum::EthereumAdapterTrait as _;
2424
use graph_chain_ethereum::chain::BlockFinality;
25-
use graph_store_postgres::AsyncPgConnection;
2625
use graph_store_postgres::BlockStore;
2726
use graph_store_postgres::ChainStore;
2827
use graph_store_postgres::PoolCoordinator;
@@ -238,59 +237,6 @@ pub async fn update_chain_genesis(
238237
Ok(())
239238
}
240239

241-
struct ChainSwapOutcome {
242-
reused_previous_backup: bool,
243-
allocated_chain: bool,
244-
}
245-
246-
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
247-
enum ExistingBackupDisposition<'a> {
248-
ProceedFresh,
249-
PromptReuse,
250-
AbortWrongShard(&'a str),
251-
}
252-
253-
fn backup_name(chain: &str, base: &str) -> String {
254-
format!("{chain}-{base}")
255-
}
256-
257-
fn suffixed_backup_name(backup_name: &str, suffix: usize) -> String {
258-
format!("{backup_name}-{suffix}")
259-
}
260-
261-
fn existing_backup_disposition<'a>(
262-
existing_backup_shard: Option<&'a str>,
263-
target_shard: &str,
264-
) -> ExistingBackupDisposition<'a> {
265-
match existing_backup_shard {
266-
None => ExistingBackupDisposition::ProceedFresh,
267-
Some(shard) if shard == target_shard => ExistingBackupDisposition::PromptReuse,
268-
Some(shard) => ExistingBackupDisposition::AbortWrongShard(shard),
269-
}
270-
}
271-
272-
async fn next_temporary_backup_name(
273-
conn: &mut AsyncPgConnection,
274-
chain: &str,
275-
base: &str,
276-
) -> Result<String, StoreError> {
277-
let backup_name = backup_name(chain, base);
278-
if find_chain(conn, &backup_name).await?.is_none() {
279-
return Ok(backup_name);
280-
}
281-
282-
let mut suffix = 1usize;
283-
284-
loop {
285-
let candidate = suffixed_backup_name(&backup_name, suffix);
286-
if find_chain(conn, &candidate).await?.is_none() {
287-
return Ok(candidate);
288-
}
289-
290-
suffix += 1;
291-
}
292-
}
293-
294240
pub async fn change_block_cache_shard(
295241
primary_store: ConnectionPool,
296242
store: BlockStore,
@@ -316,28 +262,33 @@ pub async fn change_block_cache_shard(
316262
.ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?;
317263
let ident = chain_store.chain_identifier().await?;
318264
let target_shard = Shard::new(shard.clone())?;
319-
let existing_backup_disposition = existing_backup_disposition(
320-
existing_backup.as_ref().map(|backup| backup.shard.as_str()),
321-
target_shard.as_str(),
322-
);
323-
let reuse_existing_backup = matches!(
324-
existing_backup_disposition,
325-
ExistingBackupDisposition::PromptReuse
326-
);
327265

328-
match existing_backup_disposition {
329-
ExistingBackupDisposition::ProceedFresh => {}
330-
ExistingBackupDisposition::AbortWrongShard(backup_shard) => {
266+
let reuse_existing_backup = match existing_backup.as_ref() {
267+
None => false,
268+
Some(backup) if backup.shard != target_shard => {
331269
bail!(
332270
"`{}` already exists on shard `{}`. Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`",
333271
canonical_backup_name,
334-
backup_shard,
272+
backup.shard,
335273
canonical_backup_name,
336274
chain_name,
337275
target_shard,
338276
);
339277
}
340-
ExistingBackupDisposition::PromptReuse => {
278+
Some(backup) => {
279+
let backup_ident = backup.network_identifier()?;
280+
if backup_ident != ident {
281+
bail!(
282+
"`{}` has a different chain identifier ({}) than `{}` ({}). Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`",
283+
canonical_backup_name,
284+
backup_ident,
285+
chain_name,
286+
ident,
287+
canonical_backup_name,
288+
chain_name,
289+
target_shard,
290+
);
291+
}
341292
let prompt = format!(
342293
"`{}` already exists on shard `{}` and will be reused as the active `{}` chain.\nProceed?",
343294
canonical_backup_name, target_shard, chain_name
@@ -349,8 +300,9 @@ pub async fn change_block_cache_shard(
349300
);
350301
return Ok(());
351302
}
303+
true
352304
}
353-
}
305+
};
354306

355307
let existing_backup_store = if reuse_existing_backup {
356308
store.chain_store(&canonical_backup_name).await
@@ -367,26 +319,32 @@ pub async fn change_block_cache_shard(
367319
Some(chain)
368320
};
369321

370-
let outcome = conn.transaction::<ChainSwapOutcome, StoreError, _>(|conn| {
322+
let temp_backup_name = format!("{chain_name}-old-temp");
323+
if reuse_existing_backup && find_chain(&mut conn, &temp_backup_name).await?.is_some() {
324+
bail!(
325+
"`{}` already exists. Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`",
326+
temp_backup_name,
327+
temp_backup_name,
328+
chain_name,
329+
target_shard,
330+
);
331+
}
332+
333+
conn.transaction::<(), StoreError, _>(|conn| {
371334
async {
372335
sql_query(
373336
"alter table deployment_schemas drop constraint deployment_schemas_network_fkey;",
374337
)
375338
.execute(conn).await?;
376339

377-
let temp_backup_name = if let Some(backup) = existing_backup.as_ref() {
378-
let temp_name = next_temporary_backup_name(conn, &chain_name, "old").await?;
379-
update_chain_name(conn, &backup.name, &temp_name).await?;
380-
Some(temp_name)
381-
} else {
382-
None
383-
};
340+
if let Some(backup) = existing_backup.as_ref() {
341+
update_chain_name(conn, &backup.name, &temp_backup_name).await?;
342+
}
384343

385344
update_chain_name(conn, &chain_name, &canonical_backup_name).await?;
386345

387346
if reuse_existing_backup {
388-
debug_assert!(temp_backup_name.is_some());
389-
update_chain_name(conn, temp_backup_name.as_ref().unwrap(), &chain_name).await?;
347+
update_chain_name(conn, &temp_backup_name, &chain_name).await?;
390348
} else {
391349
add_chain(conn, &chain_name, &target_shard, ident.clone()).await?;
392350
}
@@ -396,21 +354,13 @@ pub async fn change_block_cache_shard(
396354
)
397355
.execute(conn).await?;
398356

399-
Ok(ChainSwapOutcome {
400-
reused_previous_backup: reuse_existing_backup,
401-
allocated_chain: allocated_chain.is_some(),
402-
})
357+
Ok(())
403358
}.scope_boxed()
404359
}).await?;
405360

406-
let ChainSwapOutcome {
407-
reused_previous_backup,
408-
allocated_chain,
409-
} = outcome;
410-
411361
chain_store.update_name(&canonical_backup_name).await?;
412362

413-
if reused_previous_backup && let Some(backup_store) = existing_backup_store.as_ref() {
363+
if reuse_existing_backup && let Some(backup_store) = existing_backup_store.as_ref() {
414364
backup_store.update_name(&chain_name).await?;
415365
}
416366

@@ -420,14 +370,14 @@ pub async fn change_block_cache_shard(
420370
);
421371
println!("Latest backup recorded as `{}`", canonical_backup_name);
422372

423-
if reused_previous_backup {
373+
if reuse_existing_backup {
424374
println!(
425375
"Reused existing backup `{}` as the active `{}` chain",
426376
canonical_backup_name, chain_name
427377
);
428378
}
429379

430-
if allocated_chain {
380+
if allocated_chain.is_some() {
431381
println!(
432382
"Allocated new chain state for `{}` on shard {}",
433383
chain_name, shard
@@ -467,45 +417,3 @@ pub async fn ingest(
467417
}
468418
Ok(())
469419
}
470-
471-
#[cfg(test)]
472-
mod tests {
473-
use super::{
474-
ExistingBackupDisposition, backup_name, existing_backup_disposition, suffixed_backup_name,
475-
};
476-
477-
#[test]
478-
fn backup_name_uses_plain_name_first() {
479-
assert_eq!(backup_name("mainnet", "old"), "mainnet-old");
480-
}
481-
482-
#[test]
483-
fn suffixed_backup_name_adds_numeric_suffixes() {
484-
assert_eq!(suffixed_backup_name("mainnet-old", 1), "mainnet-old-1");
485-
assert_eq!(suffixed_backup_name("mainnet-old", 42), "mainnet-old-42");
486-
}
487-
488-
#[test]
489-
fn existing_backup_disposition_proceeds_when_backup_missing() {
490-
assert_eq!(
491-
existing_backup_disposition(None, "shard_b"),
492-
ExistingBackupDisposition::ProceedFresh
493-
);
494-
}
495-
496-
#[test]
497-
fn existing_backup_disposition_prompts_when_backup_matches_target_shard() {
498-
assert_eq!(
499-
existing_backup_disposition(Some("shard_b"), "shard_b"),
500-
ExistingBackupDisposition::PromptReuse
501-
);
502-
}
503-
504-
#[test]
505-
fn existing_backup_disposition_aborts_when_backup_is_on_another_shard() {
506-
assert_eq!(
507-
existing_backup_disposition(Some("shard_a"), "shard_b"),
508-
ExistingBackupDisposition::AbortWrongShard("shard_a")
509-
);
510-
}
511-
}

0 commit comments

Comments
 (0)