From c558fabdea109a58d863c138de940b051bea9b2c Mon Sep 17 00:00:00 2001 From: Ethan Date: Fri, 8 May 2026 18:07:21 -0500 Subject: [PATCH 1/5] *Adds ZK aggregation service options *Adds runtime flags to configure delegated ZK aggregation, including RPC access, credentials, worker limits, reward thresholds, claim strategy, and dry-run support. *Validates aggregation settings at startup and logs the active setup so operators can enable the worker more safely and predictably. *Covers the new arguments with parsing tests to reduce CLI regressions. --- Cargo.lock | 39 +++++ Cargo.toml | 1 + crates/miner-cli/src/main.rs | 162 ++++++++++++++++++++- crates/miner-service/Cargo.toml | 3 + crates/miner-service/src/lib.rs | 15 ++ crates/miner-service/src/zk_aggregation.rs | 114 +++++++++++++++ 6 files changed, 332 insertions(+), 2 deletions(-) create mode 100644 crates/miner-service/src/zk_aggregation.rs diff --git a/Cargo.lock b/Cargo.lock index 893ddf5..655ae89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -633,6 +633,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "fastrand" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -1399,6 +1405,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.8.1" @@ -1519,6 +1531,7 @@ dependencies = [ "quantus-miner-api", "quinn", "rustls 0.21.12", + "tempfile", "tokio", ] @@ -2515,6 +2528,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.21.12" @@ -2884,6 +2910,19 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tempfile" +version = "3.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" +dependencies = [ + "fastrand", + "getrandom 0.4.2", + "once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "termcolor" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index fa14b13..54917e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0.132", default-features = false } thiserror = "1" tokio = { version = "1.36", features = ["full"] } +tempfile = "3.8" warp = "0.3" [profile.release] diff --git a/crates/miner-cli/src/main.rs b/crates/miner-cli/src/main.rs index 51778f0..3afb6ce 100644 --- a/crates/miner-cli/src/main.rs +++ b/crates/miner-cli/src/main.rs @@ -1,8 +1,13 @@ -use clap::{Parser, Subcommand}; +use clap::{Parser, Subcommand, ValueEnum}; use engine_cpu::{AtomicBoolCancelCheck, EngineRange, MinerEngine}; -use miner_service::{run, ServiceConfig}; +use miner_service::{ + run, + zk_aggregation::{ClaimStrategy, ZkAggregationConfig}, + ServiceConfig, +}; use primitive_types::U512; use rand::RngCore; +use std::path::PathBuf; use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::thread; @@ -12,6 +17,21 @@ use std::time::{Duration, Instant}; const DEFAULT_GPU_BATCH_SIZE: u64 = 1_000_000; const DEFAULT_CPU_BATCH_SIZE: u64 = 10_000; +#[derive(Clone, Copy, Debug, PartialEq, Eq, ValueEnum)] +enum CliClaimStrategy { + Oldest, + RewardDensity, +} + +impl From for ClaimStrategy { + fn from(value: CliClaimStrategy) -> Self { + match value { + CliClaimStrategy::Oldest => ClaimStrategy::Oldest, + CliClaimStrategy::RewardDensity => ClaimStrategy::RewardDensity, + } + } +} + #[derive(Subcommand, Debug)] enum Command { /// Run the mining service @@ -47,6 +67,63 @@ enum Command { /// Enable verbose logging #[arg(short, long, env = "MINER_VERBOSE")] verbose: bool, + + /// Enable delegated ZK L1 aggregation worker + #[arg(long = "enable-zk-aggregation", env = "MINER_ENABLE_ZK_AGGREGATION")] + enable_zk_aggregation: bool, + + /// Chain RPC endpoint used by the ZK aggregation watcher + #[arg( + long = "node-rpc", + env = "MINER_NODE_RPC", + default_value = "ws://127.0.0.1:9944" + )] + node_rpc: String, + + /// Aggregation account address + #[arg(long = "aggregation-account", env = "MINER_AGGREGATION_ACCOUNT")] + aggregation_account: Option, + + /// Aggregation signing key or keystore path + #[arg(long = "aggregation-key", env = "MINER_AGGREGATION_KEY")] + aggregation_key: Option, + + /// Directory containing generated ZK proving/verifier artifacts + #[arg(long = "zk-bins-dir", env = "MINER_ZK_BINS_DIR")] + zk_bins_dir: Option, + + /// Number of dedicated ZK aggregation workers + #[arg(long = "zk-workers", env = "MINER_ZK_WORKERS", default_value_t = 1)] + zk_workers: usize, + + /// Maximum active bonded ZK aggregation jobs + #[arg( + long = "max-active-zk-jobs", + env = "MINER_MAX_ACTIVE_ZK_JOBS", + default_value_t = 1 + )] + max_active_zk_jobs: usize, + + /// Minimum aggregation reward required before claiming a bundle + #[arg( + long = "min-aggregation-reward", + env = "MINER_MIN_AGGREGATION_REWARD", + default_value_t = 0 + )] + min_aggregation_reward: u128, + + /// Bundle claiming strategy + #[arg( + long = "claim-strategy", + env = "MINER_CLAIM_STRATEGY", + value_enum, + default_value = "oldest" + )] + claim_strategy: CliClaimStrategy, + + /// Validate opportunities without claiming or proving + #[arg(long = "dry-run-zk-aggregation", env = "MINER_DRY_RUN_ZK_AGGREGATION")] + dry_run_zk_aggregation: bool, }, /// Run a quick benchmark of the mining engines @@ -104,6 +181,16 @@ async fn main() { cpu_batch_size, metrics_port, verbose, + enable_zk_aggregation, + node_rpc, + aggregation_account, + aggregation_key, + zk_bins_dir, + zk_workers, + max_active_zk_jobs, + min_aggregation_reward, + claim_strategy, + dry_run_zk_aggregation, } => { init_logger(verbose); @@ -125,6 +212,17 @@ async fn main() { gpu_devices, gpu_batch_size, cpu_batch_size, + zk_aggregation: enable_zk_aggregation.then_some(ZkAggregationConfig { + node_rpc, + aggregation_account, + aggregation_key, + zk_bins_dir, + workers: zk_workers, + max_active_jobs: max_active_zk_jobs, + min_aggregation_reward, + claim_strategy: claim_strategy.into(), + dry_run: dry_run_zk_aggregation, + }), }; if let Err(e) = run(config).await { @@ -335,3 +433,63 @@ fn format_hash_rate(rate: f64) -> String { format!("{:.0}", rate) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn serve_parses_zk_aggregation_flags() { + let args = Args::try_parse_from([ + "quantus-miner", + "serve", + "--enable-zk-aggregation", + "--node-rpc", + "ws://127.0.0.1:9944", + "--aggregation-account", + "alice", + "--aggregation-key", + "test-key", + "--zk-bins-dir", + "/tmp/zk-bins", + "--zk-workers", + "2", + "--max-active-zk-jobs", + "3", + "--min-aggregation-reward", + "42", + "--claim-strategy", + "reward-density", + "--dry-run-zk-aggregation", + ]) + .expect("serve args should parse"); + + let Some(Command::Serve { + enable_zk_aggregation, + node_rpc, + aggregation_account, + aggregation_key, + zk_bins_dir, + zk_workers, + max_active_zk_jobs, + min_aggregation_reward, + claim_strategy, + dry_run_zk_aggregation, + .. + }) = args.command + else { + panic!("expected serve command"); + }; + + assert!(enable_zk_aggregation); + assert_eq!(node_rpc, "ws://127.0.0.1:9944"); + assert_eq!(aggregation_account.as_deref(), Some("alice")); + assert_eq!(aggregation_key.as_deref(), Some("test-key")); + assert_eq!(zk_bins_dir, Some(PathBuf::from("/tmp/zk-bins"))); + assert_eq!(zk_workers, 2); + assert_eq!(max_active_zk_jobs, 3); + assert_eq!(min_aggregation_reward, 42); + assert_eq!(claim_strategy, CliClaimStrategy::RewardDensity); + assert!(dry_run_zk_aggregation); + } +} diff --git a/crates/miner-service/Cargo.toml b/crates/miner-service/Cargo.toml index b0baca7..e89d768 100644 --- a/crates/miner-service/Cargo.toml +++ b/crates/miner-service/Cargo.toml @@ -32,3 +32,6 @@ pow-core = { path = "../pow-core" } engine-cpu = { path = "../engine-cpu", optional = true } engine-gpu = { path = "../engine-gpu" } metrics = { path = "../metrics" } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/crates/miner-service/src/lib.rs b/crates/miner-service/src/lib.rs index a815fae..978ccf9 100644 --- a/crates/miner-service/src/lib.rs +++ b/crates/miner-service/src/lib.rs @@ -9,6 +9,7 @@ #![forbid(unsafe_code)] pub mod quic; +pub mod zk_aggregation; use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; use engine_cpu::{EngineCandidate, EngineRange, JobIdCancelCheck, MinerEngine}; @@ -31,6 +32,8 @@ pub struct ServiceConfig { pub gpu_batch_size: u64, /// CPU batch size in hashes pub cpu_batch_size: u64, + /// Optional delegated ZK aggregation worker configuration. + pub zk_aggregation: Option, } /// Engine type for tracking metrics per compute type. @@ -458,6 +461,18 @@ pub fn resolve_gpu_configuration( /// Start the miner service with the given configuration. pub async fn run(config: ServiceConfig) -> anyhow::Result<()> { + if let Some(ref zk_config) = config.zk_aggregation { + zk_config.validate()?; + log::info!( + "ZK aggregation enabled: rpc={}, workers={}, max_active_jobs={}, strategy={:?}, dry_run={}", + zk_config.node_rpc, + zk_config.workers, + zk_config.max_active_jobs, + zk_config.claim_strategy, + zk_config.dry_run + ); + } + // Detect effective CPU count let effective_cpus = num_cpus::get().max(1); diff --git a/crates/miner-service/src/zk_aggregation.rs b/crates/miner-service/src/zk_aggregation.rs new file mode 100644 index 0000000..cec62fd --- /dev/null +++ b/crates/miner-service/src/zk_aggregation.rs @@ -0,0 +1,114 @@ +use std::path::PathBuf; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ClaimStrategy { + Oldest, + RewardDensity, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ZkAggregationConfig { + pub node_rpc: String, + pub aggregation_account: Option, + pub aggregation_key: Option, + pub zk_bins_dir: Option, + pub workers: usize, + pub max_active_jobs: usize, + pub min_aggregation_reward: u128, + pub claim_strategy: ClaimStrategy, + pub dry_run: bool, +} + +impl ZkAggregationConfig { + pub fn validate(&self) -> anyhow::Result<()> { + if self.workers == 0 { + anyhow::bail!("--zk-workers must be greater than zero when ZK aggregation is enabled"); + } + if self.max_active_jobs == 0 { + anyhow::bail!( + "--max-active-zk-jobs must be greater than zero when ZK aggregation is enabled" + ); + } + let Some(zk_bins_dir) = &self.zk_bins_dir else { + anyhow::bail!("--zk-bins-dir is required when ZK aggregation is enabled"); + }; + if !zk_bins_dir.exists() { + anyhow::bail!( + "ZK bins directory does not exist: {}", + zk_bins_dir.display() + ); + } + Ok(()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct AggregationJob { + pub bundle_id: [u8; 32], + pub aggregator_address: [u8; 32], + pub candidate_proofs: Vec>, + pub ordered_candidate_ids: Vec<[u8; 32]>, + pub deadline_block: u32, +} + +#[derive(Debug)] +pub struct AggregationWorkerPool { + config: ZkAggregationConfig, +} + +impl AggregationWorkerPool { + pub fn new(config: ZkAggregationConfig) -> anyhow::Result { + config.validate()?; + Ok(Self { config }) + } + + pub fn max_active_jobs(&self) -> usize { + self.config.max_active_jobs + } + + pub fn should_cancel_for_new_pow_job(&self) -> bool { + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn config_validation_rejects_missing_zk_bins() { + let config = ZkAggregationConfig { + node_rpc: "ws://127.0.0.1:9944".to_string(), + aggregation_account: None, + aggregation_key: None, + zk_bins_dir: None, + workers: 1, + max_active_jobs: 1, + min_aggregation_reward: 0, + claim_strategy: ClaimStrategy::Oldest, + dry_run: true, + }; + + let err = config.validate().unwrap_err(); + assert!(err.to_string().contains("--zk-bins-dir is required")); + } + + #[test] + fn worker_pool_does_not_cancel_on_pow_job() { + let dir = tempfile::tempdir().unwrap(); + let config = ZkAggregationConfig { + node_rpc: "ws://127.0.0.1:9944".to_string(), + aggregation_account: None, + aggregation_key: None, + zk_bins_dir: Some(dir.path().to_path_buf()), + workers: 1, + max_active_jobs: 1, + min_aggregation_reward: 0, + claim_strategy: ClaimStrategy::Oldest, + dry_run: true, + }; + + let pool = AggregationWorkerPool::new(config).unwrap(); + assert!(!pool.should_cancel_for_new_pow_job()); + } +} From 8b7ec0129b3a72c75f06d87452792c53379d23c1 Mon Sep 17 00:00:00 2001 From: Ethan Date: Sat, 9 May 2026 16:45:07 -0500 Subject: [PATCH 2/5] *Adds delegated ZK aggregation flow *Enables end-to-end delegated aggregation so workers can validate pending proofs, choose profitable batches, claim bundles, generate L1 proofs, and submit results. *Improves safety by checking required proving artifacts, validating public inputs and duplicate nullifiers locally, and requiring a miner bond outside dry runs. *Updates the CLI, docs, and tests to cover the new worker configuration, selection strategies, and proving path. --- Cargo.lock | 119 ++- Cargo.toml | 4 + README.md | 11 + crates/miner-cli/src/main.rs | 14 + crates/miner-service/Cargo.toml | 4 + crates/miner-service/src/zk_aggregation.rs | 948 ++++++++++++++++++++- 6 files changed, 1065 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 655ae89..c7694cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,17 @@ dependencies = [ "libloading", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -468,6 +479,12 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -1518,6 +1535,7 @@ name = "miner-service" version = "3.1.0" dependencies = [ "anyhow", + "async-trait", "crossbeam-channel", "engine-cpu", "engine-gpu", @@ -1528,6 +1546,9 @@ dependencies = [ "num_cpus", "pow-core", "primitive-types 0.13.1", + "qp-plonky2", + "qp-wormhole-aggregator", + "qp-wormhole-verifier", "quantus-miner-api", "quinn", "rustls 0.21.12", @@ -1713,6 +1734,10 @@ name = "once_cell" version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +dependencies = [ + "critical-section", + "portable-atomic", +] [[package]] name = "once_cell_polyfill" @@ -2114,18 +2139,20 @@ dependencies = [ [[package]] name = "qp-plonky2" -version = "1.1.5" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "593bccf15b8e2f9eb904ef4010f68b81ddcceb70aaf90116ce29ec09d7578dd4" +checksum = "a1d18516ef5ecd81ddcccb6beacdfe1578f44e4f05ccb0890998afcd3b87d01f" dependencies = [ "ahash", "anyhow", + "critical-section", "getrandom 0.2.17", "hashbrown 0.14.5", "itertools 0.11.0", "keccak-hash", "log", "num", + "once_cell", "p3-field", "p3-goldilocks", "p3-poseidon2", @@ -2146,9 +2173,9 @@ dependencies = [ [[package]] name = "qp-plonky2-core" -version = "1.1.5" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7d30fabfd90e359640f2371c8b3e9b377d215f7dcf4e61da1f38776c5b84540" +checksum = "0ad9961c2e2f6aca563eefd902697b0d21e9807c2c3b719ba1d4488bb03383c4" dependencies = [ "ahash", "anyhow", @@ -2172,9 +2199,9 @@ dependencies = [ [[package]] name = "qp-plonky2-field" -version = "1.1.5" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20c9f8259bf4f220b1d81001458cc6c09a1372f2b3e8dac2fb489a66230385c3" +checksum = "ecc7089b7ae09ef8fe4889353d2f2be7dffe6d87edfb5b03f9553ca0a1ca55da" dependencies = [ "anyhow", "itertools 0.11.0", @@ -2189,17 +2216,19 @@ dependencies = [ [[package]] name = "qp-plonky2-verifier" -version = "1.1.5" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd0eb89fd3cc40c4b25be95399635957d416406328169ba939db989c0444f364" +checksum = "eb155f950f3df3c0cf5fce846290f19c6b7a059fbefd9bdf9011fd6f01f84e79" dependencies = [ "ahash", "anyhow", + "critical-section", "hashbrown 0.14.5", "itertools 0.11.0", "keccak-hash", "log", "num", + "once_cell", "p3-field", "p3-goldilocks", "p3-poseidon2", @@ -2240,6 +2269,80 @@ dependencies = [ "rand_chacha 0.9.0", ] +[[package]] +name = "qp-wormhole-aggregator" +version = "2.0.1" +source = "git+https://github.com/Quantus-Network/qp-zk-circuits?rev=982fd4de36019c108169fedcff348003a06ad465#982fd4de36019c108169fedcff348003a06ad465" +dependencies = [ + "anyhow", + "hex", + "qp-plonky2", + "qp-wormhole-circuit", + "qp-wormhole-inputs", + "qp-wormhole-prover", + "qp-zk-circuits-common", + "rand 0.8.5", + "serde", + "serde_json", +] + +[[package]] +name = "qp-wormhole-circuit" +version = "2.0.1" +source = "git+https://github.com/Quantus-Network/qp-zk-circuits?rev=982fd4de36019c108169fedcff348003a06ad465#982fd4de36019c108169fedcff348003a06ad465" +dependencies = [ + "anyhow", + "hex", + "qp-plonky2", + "qp-wormhole-inputs", + "qp-zk-circuits-common", +] + +[[package]] +name = "qp-wormhole-inputs" +version = "2.0.1" +source = "git+https://github.com/Quantus-Network/qp-zk-circuits?rev=982fd4de36019c108169fedcff348003a06ad465#982fd4de36019c108169fedcff348003a06ad465" +dependencies = [ + "anyhow", +] + +[[package]] +name = "qp-wormhole-prover" +version = "2.0.1" +source = "git+https://github.com/Quantus-Network/qp-zk-circuits?rev=982fd4de36019c108169fedcff348003a06ad465#982fd4de36019c108169fedcff348003a06ad465" +dependencies = [ + "anyhow", + "qp-plonky2", + "qp-wormhole-circuit", + "qp-wormhole-inputs", + "qp-zk-circuits-common", +] + +[[package]] +name = "qp-wormhole-verifier" +version = "2.0.1" +source = "git+https://github.com/Quantus-Network/qp-zk-circuits?rev=982fd4de36019c108169fedcff348003a06ad465#982fd4de36019c108169fedcff348003a06ad465" +dependencies = [ + "anyhow", + "qp-plonky2-verifier", + "qp-wormhole-inputs", +] + +[[package]] +name = "qp-zk-circuits-common" +version = "2.0.1" +source = "git+https://github.com/Quantus-Network/qp-zk-circuits?rev=982fd4de36019c108169fedcff348003a06ad465#982fd4de36019c108169fedcff348003a06ad465" +dependencies = [ + "anyhow", + "hex", + "qp-plonky2", + "qp-poseidon-constants", + "qp-poseidon-core", + "qp-wormhole-inputs", + "rand 0.8.5", + "serde", +] + [[package]] name = "qpow-math" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 54917e5..f697451 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ version = "3.1.0" [workspace.dependencies] anyhow = "1" +async-trait = "0.1" clap = { version = "4.5", features = ["derive", "env"] } codec = { version = "3.6.12", default-features = false, package = "parity-scale-codec", features = ["derive", "std"] } crossbeam-channel = "0.5" @@ -30,6 +31,9 @@ num-traits = "0.2" num_cpus = "1.16" primitive-types = { version = "0.13.1", default-features = false } qp-poseidon-core = { version = "1.4.0", default-features = false } +qp-plonky2 = { version = "1.4.1", default-features = false, features = ["std"] } +qp-wormhole-aggregator = { version = "2.0.1", git = "https://github.com/Quantus-Network/qp-zk-circuits", rev = "982fd4de36019c108169fedcff348003a06ad465", default-features = false, features = ["std"] } +qp-wormhole-verifier = { version = "2.0.1", git = "https://github.com/Quantus-Network/qp-zk-circuits", rev = "982fd4de36019c108169fedcff348003a06ad465", default-features = false, features = ["std"] } qpow-math = { git = "https://github.com/Quantus-Network/chain.git", tag = "v0.4.11-star-dust", package = "qpow-math", default-features = false } quantus-miner-api = { git = "https://github.com/Quantus-Network/chain.git", tag = "v0.4.11-star-dust" } rand = { version = "0.8.5", default-features = false } diff --git a/README.md b/README.md index 836dfc7..f6837c6 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,17 @@ cargo build -p miner-cli --release The binary will be available at `target/release/quantus-miner`. +## Delegated ZK Aggregation + +The delegated L1 aggregation worker depends on the same `qp-zk-circuits` revision as the chain +runtime. + +Any command that generates ZK proofs must run in release mode (otherwise proving time will take several minutes instead of seconds): + +```bash +cargo test --release -p miner-service zk_aggregation_prove -- --nocapture +``` + ## Running ```bash diff --git a/crates/miner-cli/src/main.rs b/crates/miner-cli/src/main.rs index 3afb6ce..af2b639 100644 --- a/crates/miner-cli/src/main.rs +++ b/crates/miner-cli/src/main.rs @@ -112,6 +112,14 @@ enum Command { )] min_aggregation_reward: u128, + /// Miner bond amount reserved when claiming a ZK aggregation bundle + #[arg( + long = "zk-miner-bond", + env = "MINER_ZK_MINER_BOND", + default_value_t = 0 + )] + zk_miner_bond: u128, + /// Bundle claiming strategy #[arg( long = "claim-strategy", @@ -189,6 +197,7 @@ async fn main() { zk_workers, max_active_zk_jobs, min_aggregation_reward, + zk_miner_bond, claim_strategy, dry_run_zk_aggregation, } => { @@ -220,6 +229,7 @@ async fn main() { workers: zk_workers, max_active_jobs: max_active_zk_jobs, min_aggregation_reward, + miner_bond: zk_miner_bond, claim_strategy: claim_strategy.into(), dry_run: dry_run_zk_aggregation, }), @@ -458,6 +468,8 @@ mod tests { "3", "--min-aggregation-reward", "42", + "--zk-miner-bond", + "50", "--claim-strategy", "reward-density", "--dry-run-zk-aggregation", @@ -473,6 +485,7 @@ mod tests { zk_workers, max_active_zk_jobs, min_aggregation_reward, + zk_miner_bond, claim_strategy, dry_run_zk_aggregation, .. @@ -489,6 +502,7 @@ mod tests { assert_eq!(zk_workers, 2); assert_eq!(max_active_zk_jobs, 3); assert_eq!(min_aggregation_reward, 42); + assert_eq!(zk_miner_bond, 50); assert_eq!(claim_strategy, CliClaimStrategy::RewardDensity); assert!(dry_run_zk_aggregation); } diff --git a/crates/miner-service/Cargo.toml b/crates/miner-service/Cargo.toml index e89d768..0c81ecd 100644 --- a/crates/miner-service/Cargo.toml +++ b/crates/miner-service/Cargo.toml @@ -18,6 +18,10 @@ log = { workspace = true } crossbeam-channel = { workspace = true } num_cpus = { workspace = true } anyhow = { workspace = true } +async-trait = { workspace = true } +qp-plonky2 = { workspace = true } +qp-wormhole-aggregator = { workspace = true } +qp-wormhole-verifier = { workspace = true } # QUIC transport quinn = "0.10" diff --git a/crates/miner-service/src/zk_aggregation.rs b/crates/miner-service/src/zk_aggregation.rs index cec62fd..1e76b28 100644 --- a/crates/miner-service/src/zk_aggregation.rs +++ b/crates/miner-service/src/zk_aggregation.rs @@ -1,4 +1,26 @@ -use std::path::PathBuf; +use anyhow::{anyhow, bail, Context}; +use async_trait::async_trait; +use plonky2::plonk::proof::ProofWithPublicInputs as ProverProofWithPublicInputs; +use qp_wormhole_aggregator::aggregator::{AggregationBackend, CircuitType, Layer1Aggregator}; +use qp_wormhole_verifier::{ + parse_aggregated_public_inputs, AggregatedPublicCircuitInputs, BytesDigest, + ProofWithPublicInputs as VerifierProofWithPublicInputs, PublicInputsByAccount, + WormholeVerifier, C, D, F, +}; +use std::{ + cmp::Ordering, + collections::BTreeSet, + path::{Path, PathBuf}, +}; + +pub const REQUIRED_ZK_ARTIFACTS: &[&str] = &[ + "aggregated_common.bin", + "aggregated_verifier.bin", + "layer1_common.bin", + "layer1_prover.bin", + "layer1_verifier.bin", + "config.json", +]; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum ClaimStrategy { @@ -15,42 +37,284 @@ pub struct ZkAggregationConfig { pub workers: usize, pub max_active_jobs: usize, pub min_aggregation_reward: u128, + pub miner_bond: u128, pub claim_strategy: ClaimStrategy, pub dry_run: bool, } impl ZkAggregationConfig { pub fn validate(&self) -> anyhow::Result<()> { + if self.node_rpc.trim().is_empty() { + bail!("--node-rpc must not be empty when ZK aggregation is enabled"); + } if self.workers == 0 { - anyhow::bail!("--zk-workers must be greater than zero when ZK aggregation is enabled"); + bail!("--zk-workers must be greater than zero when ZK aggregation is enabled"); } if self.max_active_jobs == 0 { - anyhow::bail!( - "--max-active-zk-jobs must be greater than zero when ZK aggregation is enabled" + bail!("--max-active-zk-jobs must be greater than zero when ZK aggregation is enabled"); + } + if !self.dry_run && self.miner_bond == 0 { + bail!( + "--zk-miner-bond must be greater than zero unless --dry-run-zk-aggregation is set" ); } + let Some(zk_bins_dir) = &self.zk_bins_dir else { - anyhow::bail!("--zk-bins-dir is required when ZK aggregation is enabled"); + bail!("--zk-bins-dir is required when ZK aggregation is enabled"); }; - if !zk_bins_dir.exists() { - anyhow::bail!( - "ZK bins directory does not exist: {}", - zk_bins_dir.display() - ); - } - Ok(()) + validate_zk_bins_dir(zk_bins_dir) + } +} + +pub fn validate_zk_bins_dir(path: &Path) -> anyhow::Result<()> { + if !path.exists() { + bail!("ZK bins directory does not exist: {}", path.display()); + } + if !path.is_dir() { + bail!("ZK bins path is not a directory: {}", path.display()); + } + + let missing = REQUIRED_ZK_ARTIFACTS + .iter() + .filter(|file| !path.join(file).is_file()) + .copied() + .collect::>(); + if !missing.is_empty() { + bail!( + "ZK bins directory {} is missing required artifact(s): {}", + path.display(), + missing.join(", ") + ); + } + + Ok(()) +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct BundleGroupKey { + pub circuit_id: [u8; 32], + pub public_input_layout_version: u32, + pub num_leaf_proofs: u32, + pub num_layer0_proofs: u32, + pub asset_id: u32, + pub volume_fee_bps: u32, + pub block_hash: [u8; 32], + pub block_number: u32, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ExitSlotSummary { + pub summed_output_amount: u32, + pub exit_account: [u8; 32], +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct L0CandidateSummary { + pub candidate_id: [u8; 32], + pub group_key: BundleGroupKey, + pub submitted_at: u32, + pub expires_at: u32, + pub aggregation_tip: u128, + pub estimated_reward: u128, + pub estimated_proving_cost: u64, + pub nullifiers: Vec<[u8; 32]>, + pub exit_summary: Vec, +} + +impl L0CandidateSummary { + fn reward_for_selection(&self) -> u128 { + self.estimated_reward.max(self.aggregation_tip) + } + + fn proving_cost_for_selection(&self) -> u64 { + self.estimated_proving_cost.max(1) } } #[derive(Clone, Debug, PartialEq, Eq)] -pub struct AggregationJob { +pub struct ClaimedBundle { pub bundle_id: [u8; 32], - pub aggregator_address: [u8; 32], - pub candidate_proofs: Vec>, + pub group_key: BundleGroupKey, pub ordered_candidate_ids: Vec<[u8; 32]>, + pub aggregator_address: [u8; 32], pub deadline_block: u32, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CandidateBatch { + pub group: BundleGroupKey, + pub candidates: Vec, + pub total_reward: u128, + pub estimated_proving_cost: u64, + pub oldest_submitted_at: u32, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ValidatedL0Candidate { + pub candidate_id: [u8; 32], + pub nullifiers: Vec<[u8; 32]>, + pub inputs: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum AggregationRunOutcome { + NoOpportunity, + DryRunValidated { + group: BundleGroupKey, + candidate_count: usize, + total_reward: u128, + }, + Submitted { + bundle_id: [u8; 32], + proof_len: usize, + }, +} + +#[async_trait] +pub trait AggregationChainClient: Send + Sync { + async fn current_block(&self) -> anyhow::Result; + async fn pending_groups(&self) -> anyhow::Result>; + async fn pending_candidates( + &self, + group: &BundleGroupKey, + ) -> anyhow::Result>; + async fn fetch_candidate_proof(&self, candidate_id: [u8; 32]) -> anyhow::Result>; + async fn register_aggregator_if_needed( + &self, + config: &ZkAggregationConfig, + ) -> anyhow::Result<()>; + async fn claim_bundle( + &self, + group: BundleGroupKey, + miner_bond: u128, + ) -> anyhow::Result; + async fn fetch_bundle(&self, bundle_id: [u8; 32]) -> anyhow::Result; + async fn submit_l1_aggregate( + &self, + bundle_id: [u8; 32], + proof_bytes: Vec, + ) -> anyhow::Result<()>; +} + +pub trait L0ProofValidator: Send + Sync { + fn validate_candidate( + &self, + group: &BundleGroupKey, + candidate: &L0CandidateSummary, + proof_bytes: &[u8], + ) -> anyhow::Result; +} + +#[async_trait] +pub trait L1ProofGenerator: Send + Sync { + async fn prove_l1( + &self, + bundle: &ClaimedBundle, + candidate_proofs: Vec>, + ) -> anyhow::Result>; +} + +#[derive(Debug)] +pub struct LocalL0ProofValidator { + verifier: WormholeVerifier, +} + +impl LocalL0ProofValidator { + pub fn new_from_bins_dir(bins_dir: &Path) -> anyhow::Result { + validate_zk_bins_dir(bins_dir)?; + let verifier = WormholeVerifier::new_from_files( + &bins_dir.join("aggregated_verifier.bin"), + &bins_dir.join("aggregated_common.bin"), + ) + .context("failed to load layer-0 aggregate verifier artifacts")?; + + Ok(Self { verifier }) + } +} + +impl L0ProofValidator for LocalL0ProofValidator { + fn validate_candidate( + &self, + group: &BundleGroupKey, + candidate: &L0CandidateSummary, + proof_bytes: &[u8], + ) -> anyhow::Result { + if candidate.group_key != *group { + bail!("candidate group does not match selected bundle group"); + } + + let proof = VerifierProofWithPublicInputs::::from_bytes( + proof_bytes.to_vec(), + &self.verifier.circuit_data.common, + ) + .map_err(|err| anyhow!("failed to deserialize L0 aggregate proof: {}", err))?; + let inputs = parse_aggregated_public_inputs(&proof) + .context("failed to parse L0 aggregate public inputs")?; + validate_l0_public_inputs_against_group(group, &inputs)?; + self.verifier + .verify_ref(&proof) + .context("local L0 aggregate proof verification failed")?; + + let nullifiers = digest_vec_to_arrays(&inputs.nullifiers)?; + ensure_no_duplicate_nullifiers(&nullifiers)?; + + Ok(ValidatedL0Candidate { + candidate_id: candidate.candidate_id, + nullifiers, + inputs: Some(inputs), + }) + } +} + +#[derive(Clone, Debug)] +pub struct ZkBinsL1ProofGenerator { + bins_dir: PathBuf, +} + +impl ZkBinsL1ProofGenerator { + pub fn new(bins_dir: PathBuf) -> anyhow::Result { + validate_zk_bins_dir(&bins_dir)?; + Ok(Self { bins_dir }) + } +} + +#[async_trait] +impl L1ProofGenerator for ZkBinsL1ProofGenerator { + async fn prove_l1( + &self, + bundle: &ClaimedBundle, + candidate_proofs: Vec>, + ) -> anyhow::Result> { + let mut aggregator = Layer1Aggregator::new( + &self.bins_dir, + BytesDigest::new_unchecked(bundle.aggregator_address), + ) + .context("failed to load layer-1 aggregation prover")?; + if candidate_proofs.len() != aggregator.batch_size() { + bail!( + "claimed bundle has {} candidate proof(s), but L1 prover expects {}", + candidate_proofs.len(), + aggregator.batch_size() + ); + } + + let layer0_common = aggregator + .load_common_data(CircuitType::Leaf) + .context("failed to load layer-0 common data")?; + for proof_bytes in candidate_proofs { + let proof = + ProverProofWithPublicInputs::::from_bytes(proof_bytes, &layer0_common) + .map_err(|err| anyhow!("failed to deserialize claimed L0 proof: {}", err))?; + aggregator + .push_proof(proof) + .context("failed to enqueue L0 proof for layer-1 aggregation")?; + } + + let proof = aggregator.aggregate().context("layer-1 proving failed")?; + Ok(proof.to_bytes()) + } +} + #[derive(Debug)] pub struct AggregationWorkerPool { config: ZkAggregationConfig, @@ -69,11 +333,463 @@ impl AggregationWorkerPool { pub fn should_cancel_for_new_pow_job(&self) -> bool { false } + + pub async fn next_candidate_batch( + &self, + client: &C, + ) -> anyhow::Result> + where + C: AggregationChainClient, + { + let current_block = client.current_block().await?; + let mut batches = Vec::new(); + + for group in client.pending_groups().await? { + let required = group.num_layer0_proofs as usize; + if required == 0 { + continue; + } + + let mut candidates = client.pending_candidates(&group).await?; + candidates.retain(|candidate| { + candidate.group_key == group && candidate.expires_at > current_block + }); + candidates.sort_by_key(|candidate| candidate.submitted_at); + if candidates.len() < required { + continue; + } + + let selected = candidates.into_iter().take(required).collect::>(); + let total_reward = selected + .iter() + .map(L0CandidateSummary::reward_for_selection) + .sum::(); + if total_reward < self.config.min_aggregation_reward { + continue; + } + let estimated_proving_cost = selected + .iter() + .map(L0CandidateSummary::proving_cost_for_selection) + .sum::() + .max(1); + let oldest_submitted_at = selected + .iter() + .map(|candidate| candidate.submitted_at) + .min() + .unwrap_or(u32::MAX); + + batches.push(CandidateBatch { + group, + candidates: selected, + total_reward, + estimated_proving_cost, + oldest_submitted_at, + }); + } + + Ok(select_batch(&batches, self.config.claim_strategy).cloned()) + } + + pub async fn fetch_and_validate_candidate_batch( + &self, + client: &C, + validator: &V, + batch: &CandidateBatch, + ) -> anyhow::Result>> + where + C: AggregationChainClient, + V: L0ProofValidator, + { + let mut proof_bytes = Vec::with_capacity(batch.candidates.len()); + let mut validated = Vec::with_capacity(batch.candidates.len()); + + for candidate in &batch.candidates { + let bytes = client.fetch_candidate_proof(candidate.candidate_id).await?; + let validated_candidate = + validator.validate_candidate(&batch.group, candidate, &bytes)?; + proof_bytes.push(bytes); + validated.push(validated_candidate); + } + + let nullifiers = validated + .iter() + .flat_map(|candidate| candidate.nullifiers.iter().copied()) + .collect::>(); + ensure_no_duplicate_nullifiers(&nullifiers)?; + + Ok(proof_bytes) + } + + pub async fn run_once( + &self, + client: &C, + validator: &V, + prover: &P, + ) -> anyhow::Result + where + C: AggregationChainClient, + V: L0ProofValidator, + P: L1ProofGenerator, + { + let Some(batch) = self.next_candidate_batch(client).await? else { + return Ok(AggregationRunOutcome::NoOpportunity); + }; + + let _validated_proofs = self + .fetch_and_validate_candidate_batch(client, validator, &batch) + .await?; + + if self.config.dry_run { + return Ok(AggregationRunOutcome::DryRunValidated { + group: batch.group, + candidate_count: batch.candidates.len(), + total_reward: batch.total_reward, + }); + } + + client.register_aggregator_if_needed(&self.config).await?; + let claimed = client + .claim_bundle(batch.group.clone(), self.config.miner_bond) + .await?; + let claimed = client.fetch_bundle(claimed.bundle_id).await?; + + let mut claimed_proofs = Vec::with_capacity(claimed.ordered_candidate_ids.len()); + for candidate_id in &claimed.ordered_candidate_ids { + claimed_proofs.push(client.fetch_candidate_proof(*candidate_id).await?); + } + + let l1_proof = prover.prove_l1(&claimed, claimed_proofs).await?; + let proof_len = l1_proof.len(); + client + .submit_l1_aggregate(claimed.bundle_id, l1_proof) + .await?; + let _observed = client.fetch_bundle(claimed.bundle_id).await?; + + Ok(AggregationRunOutcome::Submitted { + bundle_id: claimed.bundle_id, + proof_len, + }) + } +} + +fn select_batch<'a>( + batches: &'a [CandidateBatch], + strategy: ClaimStrategy, +) -> Option<&'a CandidateBatch> { + match strategy { + ClaimStrategy::Oldest => batches.iter().min_by_key(|batch| batch.oldest_submitted_at), + ClaimStrategy::RewardDensity => batches.iter().max_by(|left, right| { + compare_reward_density(left, right) + .then_with(|| right.oldest_submitted_at.cmp(&left.oldest_submitted_at)) + }), + } +} + +fn compare_reward_density(left: &CandidateBatch, right: &CandidateBatch) -> Ordering { + let left_score = left + .total_reward + .saturating_mul(right.estimated_proving_cost as u128); + let right_score = right + .total_reward + .saturating_mul(left.estimated_proving_cost as u128); + left_score.cmp(&right_score) +} + +pub fn validate_l0_public_inputs_against_group( + group: &BundleGroupKey, + inputs: &AggregatedPublicCircuitInputs, +) -> anyhow::Result<()> { + if inputs.asset_id != group.asset_id { + bail!("candidate asset_id does not match bundle group"); + } + if inputs.volume_fee_bps != group.volume_fee_bps { + bail!("candidate volume_fee_bps does not match bundle group"); + } + if digest_to_array(&inputs.block_data.block_hash)? != group.block_hash { + bail!("candidate block_hash does not match bundle group"); + } + if inputs.block_data.block_number != group.block_number { + bail!("candidate block_number does not match bundle group"); + } + Ok(()) +} + +pub fn ensure_no_duplicate_nullifiers(nullifiers: &[[u8; 32]]) -> anyhow::Result<()> { + let mut seen = BTreeSet::new(); + for nullifier in nullifiers { + if !seen.insert(*nullifier) { + bail!("duplicate nullifier in candidate batch"); + } + } + Ok(()) +} + +fn digest_to_array(digest: &BytesDigest) -> anyhow::Result<[u8; 32]> { + digest + .as_ref() + .try_into() + .map_err(|_| anyhow!("digest has invalid length")) +} + +fn digest_vec_to_arrays(digests: &[BytesDigest]) -> anyhow::Result> { + digests.iter().map(digest_to_array).collect() +} + +#[allow(dead_code)] +fn exits_from_public_inputs(inputs: &[PublicInputsByAccount]) -> Vec { + inputs + .iter() + .filter_map(|exit| { + Some(ExitSlotSummary { + summed_output_amount: exit.summed_output_amount, + exit_account: digest_to_array(&exit.exit_account).ok()?, + }) + }) + .collect() } #[cfg(test)] mod tests { use super::*; + use std::{ + collections::BTreeMap, + fs, + sync::{Arc, Mutex}, + }; + + fn write_required_artifacts(dir: &Path) { + for file in REQUIRED_ZK_ARTIFACTS { + fs::write(dir.join(file), b"fixture").unwrap(); + } + } + + fn test_config(dir: &Path, dry_run: bool, strategy: ClaimStrategy) -> ZkAggregationConfig { + ZkAggregationConfig { + node_rpc: "ws://127.0.0.1:9944".to_string(), + aggregation_account: Some("aggregator".to_string()), + aggregation_key: Some("key".to_string()), + zk_bins_dir: Some(dir.to_path_buf()), + workers: 1, + max_active_jobs: 1, + min_aggregation_reward: 0, + miner_bond: if dry_run { 0 } else { 50 }, + claim_strategy: strategy, + dry_run, + } + } + + fn group(id: u8, submitted_block: u32) -> BundleGroupKey { + let mut circuit_id = [0u8; 32]; + circuit_id[0] = id; + let mut block_hash = [0u8; 32]; + block_hash[0] = id; + BundleGroupKey { + circuit_id, + public_input_layout_version: 1, + num_leaf_proofs: 16, + num_layer0_proofs: 1, + asset_id: 0, + volume_fee_bps: 10, + block_hash, + block_number: submitted_block, + } + } + + fn candidate( + id: u8, + group: BundleGroupKey, + submitted_at: u32, + reward: u128, + ) -> L0CandidateSummary { + let mut candidate_id = [0u8; 32]; + candidate_id[0] = id; + let mut nullifier = [0u8; 32]; + nullifier[0] = id; + L0CandidateSummary { + candidate_id, + group_key: group, + submitted_at, + expires_at: 100, + aggregation_tip: reward, + estimated_reward: reward, + estimated_proving_cost: 1, + nullifiers: vec![nullifier], + exit_summary: Vec::new(), + } + } + + #[derive(Default)] + struct MockState { + current_block: u32, + groups: Vec, + candidates: BTreeMap>, + proofs: BTreeMap<[u8; 32], Vec>, + claimed_bundle: Option, + registered_count: usize, + claim_count: usize, + submit_count: usize, + submitted_proof: Option>, + } + + #[derive(Clone, Default)] + struct MockChainClient { + state: Arc>, + } + + impl MockChainClient { + fn with_group(group: BundleGroupKey, candidates: Vec) -> Self { + let client = Self::default(); + { + let mut state = client.state.lock().unwrap(); + state.current_block = 1; + state.groups.push(group.clone()); + for candidate in &candidates { + state + .proofs + .insert(candidate.candidate_id, vec![candidate.candidate_id[0]]); + } + state.candidates.insert(group, candidates); + } + client + } + + fn counters(&self) -> (usize, usize, usize) { + let state = self.state.lock().unwrap(); + ( + state.registered_count, + state.claim_count, + state.submit_count, + ) + } + } + + #[async_trait] + impl AggregationChainClient for MockChainClient { + async fn current_block(&self) -> anyhow::Result { + Ok(self.state.lock().unwrap().current_block) + } + + async fn pending_groups(&self) -> anyhow::Result> { + Ok(self.state.lock().unwrap().groups.clone()) + } + + async fn pending_candidates( + &self, + group: &BundleGroupKey, + ) -> anyhow::Result> { + Ok(self + .state + .lock() + .unwrap() + .candidates + .get(group) + .cloned() + .unwrap_or_default()) + } + + async fn fetch_candidate_proof(&self, candidate_id: [u8; 32]) -> anyhow::Result> { + self.state + .lock() + .unwrap() + .proofs + .get(&candidate_id) + .cloned() + .ok_or_else(|| anyhow!("missing candidate proof")) + } + + async fn register_aggregator_if_needed( + &self, + _config: &ZkAggregationConfig, + ) -> anyhow::Result<()> { + self.state.lock().unwrap().registered_count += 1; + Ok(()) + } + + async fn claim_bundle( + &self, + group: BundleGroupKey, + _miner_bond: u128, + ) -> anyhow::Result { + let candidates = self + .state + .lock() + .unwrap() + .candidates + .get(&group) + .cloned() + .unwrap_or_default(); + let mut bundle_id = [0u8; 32]; + bundle_id[0] = 9; + let bundle = ClaimedBundle { + bundle_id, + group_key: group, + ordered_candidate_ids: candidates + .iter() + .map(|candidate| candidate.candidate_id) + .collect(), + aggregator_address: [2u8; 32], + deadline_block: 50, + }; + let mut state = self.state.lock().unwrap(); + state.claim_count += 1; + state.claimed_bundle = Some(bundle.clone()); + Ok(bundle) + } + + async fn fetch_bundle(&self, _bundle_id: [u8; 32]) -> anyhow::Result { + self.state + .lock() + .unwrap() + .claimed_bundle + .clone() + .ok_or_else(|| anyhow!("missing claimed bundle")) + } + + async fn submit_l1_aggregate( + &self, + _bundle_id: [u8; 32], + proof_bytes: Vec, + ) -> anyhow::Result<()> { + let mut state = self.state.lock().unwrap(); + state.submit_count += 1; + state.submitted_proof = Some(proof_bytes); + Ok(()) + } + } + + struct SummaryProofValidator; + + impl L0ProofValidator for SummaryProofValidator { + fn validate_candidate( + &self, + group: &BundleGroupKey, + candidate: &L0CandidateSummary, + _proof_bytes: &[u8], + ) -> anyhow::Result { + if candidate.group_key != *group { + bail!("candidate group does not match selected bundle group"); + } + ensure_no_duplicate_nullifiers(&candidate.nullifiers)?; + Ok(ValidatedL0Candidate { + candidate_id: candidate.candidate_id, + nullifiers: candidate.nullifiers.clone(), + inputs: None, + }) + } + } + + struct StaticProofGenerator; + + #[async_trait] + impl L1ProofGenerator for StaticProofGenerator { + async fn prove_l1( + &self, + _bundle: &ClaimedBundle, + _candidate_proofs: Vec>, + ) -> anyhow::Result> { + Ok(vec![1, 2, 3]) + } + } #[test] fn config_validation_rejects_missing_zk_bins() { @@ -85,6 +801,7 @@ mod tests { workers: 1, max_active_jobs: 1, min_aggregation_reward: 0, + miner_bond: 0, claim_strategy: ClaimStrategy::Oldest, dry_run: true, }; @@ -93,22 +810,199 @@ mod tests { assert!(err.to_string().contains("--zk-bins-dir is required")); } + #[test] + fn config_validation_requires_artifacts() { + let dir = tempfile::tempdir().unwrap(); + let err = test_config(dir.path(), true, ClaimStrategy::Oldest) + .validate() + .unwrap_err(); + assert!(err.to_string().contains("missing required artifact")); + } + #[test] fn worker_pool_does_not_cancel_on_pow_job() { let dir = tempfile::tempdir().unwrap(); - let config = ZkAggregationConfig { - node_rpc: "ws://127.0.0.1:9944".to_string(), - aggregation_account: None, - aggregation_key: None, - zk_bins_dir: Some(dir.path().to_path_buf()), - workers: 1, - max_active_jobs: 1, - min_aggregation_reward: 0, - claim_strategy: ClaimStrategy::Oldest, - dry_run: true, + write_required_artifacts(dir.path()); + let pool = AggregationWorkerPool::new(test_config(dir.path(), true, ClaimStrategy::Oldest)) + .unwrap(); + assert!(!pool.should_cancel_for_new_pow_job()); + } + + #[tokio::test] + async fn dry_run_does_not_submit_claim() { + let dir = tempfile::tempdir().unwrap(); + write_required_artifacts(dir.path()); + let group = group(1, 1); + let client = + MockChainClient::with_group(group.clone(), vec![candidate(1, group.clone(), 3, 10)]); + let pool = AggregationWorkerPool::new(test_config(dir.path(), true, ClaimStrategy::Oldest)) + .unwrap(); + + let outcome = pool + .run_once(&client, &SummaryProofValidator, &StaticProofGenerator) + .await + .unwrap(); + + assert!(matches!( + outcome, + AggregationRunOutcome::DryRunValidated { + candidate_count: 1, + .. + } + )); + assert_eq!(client.counters(), (0, 0, 0)); + } + + #[tokio::test] + async fn claim_strategy_selects_oldest_group() { + let dir = tempfile::tempdir().unwrap(); + write_required_artifacts(dir.path()); + let old_group = group(1, 1); + let new_group = group(2, 2); + let client = MockChainClient::default(); + { + let mut state = client.state.lock().unwrap(); + state.current_block = 1; + state.groups = vec![new_group.clone(), old_group.clone()]; + state.candidates.insert( + old_group.clone(), + vec![candidate(1, old_group.clone(), 2, 10)], + ); + state.candidates.insert( + new_group.clone(), + vec![candidate(2, new_group.clone(), 20, 1000)], + ); + state.proofs.insert([1u8; 32], vec![1]); + state.proofs.insert([2u8; 32], vec![2]); + } + let pool = AggregationWorkerPool::new(test_config(dir.path(), true, ClaimStrategy::Oldest)) + .unwrap(); + + let batch = pool.next_candidate_batch(&client).await.unwrap().unwrap(); + assert_eq!(batch.group, old_group); + } + + #[tokio::test] + async fn reward_density_strategy_selects_best_reward_per_cost() { + let dir = tempfile::tempdir().unwrap(); + write_required_artifacts(dir.path()); + let dense_group = group(1, 1); + let high_reward_group = group(2, 2); + let mut dense_candidate = candidate(1, dense_group.clone(), 10, 50); + dense_candidate.estimated_proving_cost = 1; + let mut costly_candidate = candidate(2, high_reward_group.clone(), 2, 100); + costly_candidate.estimated_proving_cost = 10; + let client = MockChainClient::default(); + { + let mut state = client.state.lock().unwrap(); + state.current_block = 1; + state.groups = vec![high_reward_group.clone(), dense_group.clone()]; + state + .candidates + .insert(dense_group.clone(), vec![dense_candidate]); + state + .candidates + .insert(high_reward_group.clone(), vec![costly_candidate]); + } + let pool = + AggregationWorkerPool::new(test_config(dir.path(), true, ClaimStrategy::RewardDensity)) + .unwrap(); + + let batch = pool.next_candidate_batch(&client).await.unwrap().unwrap(); + assert_eq!(batch.group, dense_group); + } + + #[test] + fn local_validation_rejects_incompatible_candidates() { + let group = group(1, 1); + let inputs = AggregatedPublicCircuitInputs { + num_unique_exits: 0, + asset_id: 99, + volume_fee_bps: 10, + block_data: qp_wormhole_verifier::BlockData { + block_hash: BytesDigest::new_unchecked(group.block_hash), + block_number: group.block_number, + }, + account_data: Vec::new(), + nullifiers: Vec::new(), }; - let pool = AggregationWorkerPool::new(config).unwrap(); - assert!(!pool.should_cancel_for_new_pow_job()); + let err = validate_l0_public_inputs_against_group(&group, &inputs).unwrap_err(); + assert!(err.to_string().contains("asset_id")); + } + + #[tokio::test] + async fn local_validation_rejects_duplicate_nullifiers() { + let dir = tempfile::tempdir().unwrap(); + write_required_artifacts(dir.path()); + let group = group(1, 1); + let mut candidate = candidate(1, group.clone(), 3, 10); + candidate.nullifiers = vec![[7u8; 32], [7u8; 32]]; + let client = MockChainClient::with_group(group.clone(), vec![candidate]); + let pool = AggregationWorkerPool::new(test_config(dir.path(), true, ClaimStrategy::Oldest)) + .unwrap(); + + let err = pool + .run_once(&client, &SummaryProofValidator, &StaticProofGenerator) + .await + .unwrap_err(); + assert!(err.to_string().contains("duplicate nullifier")); + } + + #[tokio::test] + async fn mocked_worker_processes_claimed_bundle_end_to_end() { + let dir = tempfile::tempdir().unwrap(); + write_required_artifacts(dir.path()); + let group = group(1, 1); + let client = + MockChainClient::with_group(group.clone(), vec![candidate(1, group.clone(), 3, 10)]); + let pool = + AggregationWorkerPool::new(test_config(dir.path(), false, ClaimStrategy::Oldest)) + .unwrap(); + + let outcome = pool + .run_once(&client, &SummaryProofValidator, &StaticProofGenerator) + .await + .unwrap(); + + assert!(matches!( + outcome, + AggregationRunOutcome::Submitted { proof_len: 3, .. } + )); + assert_eq!(client.counters(), (1, 1, 1)); + } + + #[test] + fn zk_aggregation_prove_generates_l1_proof_from_fixture_when_configured() { + let Ok(bins_dir) = std::env::var("ZK_AGGREGATION_TEST_BINS_DIR") else { + eprintln!("skipping proving test: ZK_AGGREGATION_TEST_BINS_DIR is not set"); + return; + }; + let Ok(l0_proof_path) = std::env::var("ZK_AGGREGATION_TEST_L0_PROOF") else { + eprintln!("skipping proving test: ZK_AGGREGATION_TEST_L0_PROOF is not set"); + return; + }; + + let proof_hex = fs::read_to_string(l0_proof_path).unwrap(); + let proof_bytes = hex::decode(proof_hex.trim()).unwrap(); + let group = group(1, 1); + let bundle = ClaimedBundle { + bundle_id: [9u8; 32], + group_key: group, + ordered_candidate_ids: vec![[1u8; 32]], + aggregator_address: { + let mut address = [0u8; 32]; + address[..8].copy_from_slice(&2u64.to_le_bytes()); + address + }, + deadline_block: 50, + }; + let prover = ZkBinsL1ProofGenerator::new(PathBuf::from(bins_dir)).unwrap(); + let proof = tokio::runtime::Runtime::new() + .unwrap() + .block_on(prover.prove_l1(&bundle, vec![proof_bytes])) + .unwrap(); + + assert!(!proof.is_empty()); } } From f76423a868545031cabd2c0bc699a9ab7ff0af56 Mon Sep 17 00:00:00 2001 From: Ethan Date: Sun, 10 May 2026 20:34:24 -0500 Subject: [PATCH 3/5] *Hardens ZK aggregation signing and proof checks *Requires a secure keystore path and account for live aggregation, while still allowing dry runs, to prevent weak signing setup and reduce secret exposure in logs. *Revalidates claimed proofs after claiming and verifies aggregate proofs locally before submission so tampered inputs and invalid aggregates are caught before they reach the network. --- Cargo.toml | 4 +- crates/miner-cli/src/main.rs | 21 +- crates/miner-service/src/lib.rs | 2 +- crates/miner-service/src/zk_aggregation.rs | 326 ++++++++++++++++++++- docs/delegated-aggregation-miner.md | 70 +++++ docs/key-management.md | 51 ++++ 6 files changed, 452 insertions(+), 22 deletions(-) create mode 100644 docs/delegated-aggregation-miner.md create mode 100644 docs/key-management.md diff --git a/Cargo.toml b/Cargo.toml index f697451..daea046 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,8 +30,8 @@ num-bigint = { version = "0.4", features = ["rand"] } num-traits = "0.2" num_cpus = "1.16" primitive-types = { version = "0.13.1", default-features = false } -qp-poseidon-core = { version = "1.4.0", default-features = false } qp-plonky2 = { version = "1.4.1", default-features = false, features = ["std"] } +qp-poseidon-core = { version = "1.4.0", default-features = false } qp-wormhole-aggregator = { version = "2.0.1", git = "https://github.com/Quantus-Network/qp-zk-circuits", rev = "982fd4de36019c108169fedcff348003a06ad465", default-features = false, features = ["std"] } qp-wormhole-verifier = { version = "2.0.1", git = "https://github.com/Quantus-Network/qp-zk-circuits", rev = "982fd4de36019c108169fedcff348003a06ad465", default-features = false, features = ["std"] } qpow-math = { git = "https://github.com/Quantus-Network/chain.git", tag = "v0.4.11-star-dust", package = "qpow-math", default-features = false } @@ -39,9 +39,9 @@ quantus-miner-api = { git = "https://github.com/Quantus-Network/chain.git", tag rand = { version = "0.8.5", default-features = false } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0.132", default-features = false } +tempfile = "3.8" thiserror = "1" tokio = { version = "1.36", features = ["full"] } -tempfile = "3.8" warp = "0.3" [profile.release] diff --git a/crates/miner-cli/src/main.rs b/crates/miner-cli/src/main.rs index af2b639..13ad6be 100644 --- a/crates/miner-cli/src/main.rs +++ b/crates/miner-cli/src/main.rs @@ -84,9 +84,9 @@ enum Command { #[arg(long = "aggregation-account", env = "MINER_AGGREGATION_ACCOUNT")] aggregation_account: Option, - /// Aggregation signing key or keystore path - #[arg(long = "aggregation-key", env = "MINER_AGGREGATION_KEY")] - aggregation_key: Option, + /// Secure signer/keystore path for delegated aggregation signing + #[arg(long = "aggregation-keystore", env = "MINER_AGGREGATION_KEYSTORE")] + aggregation_keystore: Option, /// Directory containing generated ZK proving/verifier artifacts #[arg(long = "zk-bins-dir", env = "MINER_ZK_BINS_DIR")] @@ -192,7 +192,7 @@ async fn main() { enable_zk_aggregation, node_rpc, aggregation_account, - aggregation_key, + aggregation_keystore, zk_bins_dir, zk_workers, max_active_zk_jobs, @@ -224,7 +224,7 @@ async fn main() { zk_aggregation: enable_zk_aggregation.then_some(ZkAggregationConfig { node_rpc, aggregation_account, - aggregation_key, + aggregation_keystore, zk_bins_dir, workers: zk_workers, max_active_jobs: max_active_zk_jobs, @@ -458,8 +458,8 @@ mod tests { "ws://127.0.0.1:9944", "--aggregation-account", "alice", - "--aggregation-key", - "test-key", + "--aggregation-keystore", + "/tmp/miner-keystore", "--zk-bins-dir", "/tmp/zk-bins", "--zk-workers", @@ -480,7 +480,7 @@ mod tests { enable_zk_aggregation, node_rpc, aggregation_account, - aggregation_key, + aggregation_keystore, zk_bins_dir, zk_workers, max_active_zk_jobs, @@ -497,7 +497,10 @@ mod tests { assert!(enable_zk_aggregation); assert_eq!(node_rpc, "ws://127.0.0.1:9944"); assert_eq!(aggregation_account.as_deref(), Some("alice")); - assert_eq!(aggregation_key.as_deref(), Some("test-key")); + assert_eq!( + aggregation_keystore, + Some(PathBuf::from("/tmp/miner-keystore")) + ); assert_eq!(zk_bins_dir, Some(PathBuf::from("/tmp/zk-bins"))); assert_eq!(zk_workers, 2); assert_eq!(max_active_zk_jobs, 3); diff --git a/crates/miner-service/src/lib.rs b/crates/miner-service/src/lib.rs index 978ccf9..0d329a3 100644 --- a/crates/miner-service/src/lib.rs +++ b/crates/miner-service/src/lib.rs @@ -465,7 +465,7 @@ pub async fn run(config: ServiceConfig) -> anyhow::Result<()> { zk_config.validate()?; log::info!( "ZK aggregation enabled: rpc={}, workers={}, max_active_jobs={}, strategy={:?}, dry_run={}", - zk_config.node_rpc, + zk_aggregation::redact_rpc_url(&zk_config.node_rpc), zk_config.workers, zk_config.max_active_jobs, zk_config.claim_strategy, diff --git a/crates/miner-service/src/zk_aggregation.rs b/crates/miner-service/src/zk_aggregation.rs index 1e76b28..ca29682 100644 --- a/crates/miner-service/src/zk_aggregation.rs +++ b/crates/miner-service/src/zk_aggregation.rs @@ -9,7 +9,8 @@ use qp_wormhole_verifier::{ }; use std::{ cmp::Ordering, - collections::BTreeSet, + collections::{BTreeMap, BTreeSet}, + fmt, path::{Path, PathBuf}, }; @@ -28,11 +29,11 @@ pub enum ClaimStrategy { RewardDensity, } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, PartialEq, Eq)] pub struct ZkAggregationConfig { pub node_rpc: String, pub aggregation_account: Option, - pub aggregation_key: Option, + pub aggregation_keystore: Option, pub zk_bins_dir: Option, pub workers: usize, pub max_active_jobs: usize, @@ -42,6 +43,23 @@ pub struct ZkAggregationConfig { pub dry_run: bool, } +impl fmt::Debug for ZkAggregationConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ZkAggregationConfig") + .field("node_rpc", &redact_rpc_url(&self.node_rpc)) + .field("aggregation_account", &self.aggregation_account) + .field("aggregation_keystore", &self.aggregation_keystore) + .field("zk_bins_dir", &self.zk_bins_dir) + .field("workers", &self.workers) + .field("max_active_jobs", &self.max_active_jobs) + .field("min_aggregation_reward", &self.min_aggregation_reward) + .field("miner_bond", &self.miner_bond) + .field("claim_strategy", &self.claim_strategy) + .field("dry_run", &self.dry_run) + .finish() + } +} + impl ZkAggregationConfig { pub fn validate(&self) -> anyhow::Result<()> { if self.node_rpc.trim().is_empty() { @@ -58,6 +76,20 @@ impl ZkAggregationConfig { "--zk-miner-bond must be greater than zero unless --dry-run-zk-aggregation is set" ); } + if !self.dry_run { + match self.aggregation_account.as_deref().map(str::trim) { + Some(account) if !account.is_empty() => {} + _ => bail!( + "--aggregation-account is required unless --dry-run-zk-aggregation is set" + ), + } + let Some(keystore_path) = &self.aggregation_keystore else { + bail!("--aggregation-keystore is required unless --dry-run-zk-aggregation is set"); + }; + validate_keystore_path(keystore_path)?; + } else if let Some(keystore_path) = &self.aggregation_keystore { + validate_keystore_path(keystore_path)?; + } let Some(zk_bins_dir) = &self.zk_bins_dir else { bail!("--zk-bins-dir is required when ZK aggregation is enabled"); @@ -66,6 +98,78 @@ impl ZkAggregationConfig { } } +pub fn redact_rpc_url(url: &str) -> String { + let Some(scheme_end) = url.find("://") else { + return redact_authority(url); + }; + let (scheme, rest_with_sep) = url.split_at(scheme_end); + let rest = &rest_with_sep[3..]; + match rest.find('@') { + Some(at) => { + let after_auth = &rest[at + 1..]; + let redacted_auth = if rest[..at].contains(':') { + "***:***" + } else { + "***" + }; + format!("{scheme}://{redacted_auth}@{after_auth}") + } + None => url.to_string(), + } +} + +fn redact_authority(value: &str) -> String { + match value.find('@') { + Some(at) if value[..at].contains(':') => format!("***:***@{}", &value[at + 1..]), + Some(at) => format!("***@{}", &value[at + 1..]), + None => value.to_string(), + } +} + +pub fn validate_keystore_path(path: &Path) -> anyhow::Result<()> { + if !path.exists() { + bail!( + "aggregation keystore path does not exist: {}", + path.display() + ); + } + let metadata = path.metadata().with_context(|| { + format!( + "failed to read aggregation keystore metadata: {}", + path.display() + ) + })?; + if !metadata.is_file() && !metadata.is_dir() { + bail!( + "aggregation keystore path must be a file or directory: {}", + path.display() + ); + } + validate_keystore_permissions(path, &metadata) +} + +#[cfg(unix)] +fn validate_keystore_permissions(path: &Path, metadata: &std::fs::Metadata) -> anyhow::Result<()> { + use std::os::unix::fs::PermissionsExt; + + let mode = metadata.permissions().mode(); + if mode & 0o077 != 0 { + bail!( + "aggregation keystore path permissions are too broad for {}: expected no group/other permissions", + path.display() + ); + } + Ok(()) +} + +#[cfg(not(unix))] +fn validate_keystore_permissions( + _path: &Path, + _metadata: &std::fs::Metadata, +) -> anyhow::Result<()> { + Ok(()) +} + pub fn validate_zk_bins_dir(path: &Path) -> anyhow::Result<()> { if !path.exists() { bail!("ZK bins directory does not exist: {}", path.display()); @@ -212,6 +316,8 @@ pub trait L1ProofGenerator: Send + Sync { bundle: &ClaimedBundle, candidate_proofs: Vec>, ) -> anyhow::Result>; + + fn verify_l1(&self, proof_bytes: &[u8]) -> anyhow::Result<()>; } #[derive(Debug)] @@ -311,8 +417,27 @@ impl L1ProofGenerator for ZkBinsL1ProofGenerator { } let proof = aggregator.aggregate().context("layer-1 proving failed")?; + aggregator + .verify(proof.clone()) + .context("local layer-1 proof verification failed")?; Ok(proof.to_bytes()) } + + fn verify_l1(&self, proof_bytes: &[u8]) -> anyhow::Result<()> { + let aggregator = Layer1Aggregator::new(&self.bins_dir, BytesDigest::new_unchecked([0; 32])) + .context("failed to load layer-1 verifier")?; + let layer1_common = aggregator + .load_common_data(CircuitType::Root) + .context("failed to load layer-1 common data")?; + let proof = ProverProofWithPublicInputs::::from_bytes( + proof_bytes.to_vec(), + &layer1_common, + ) + .map_err(|err| anyhow!("failed to deserialize L1 aggregate proof: {}", err))?; + aggregator + .verify(proof) + .context("local layer-1 proof verification failed") + } } #[derive(Debug)] @@ -453,12 +578,36 @@ impl AggregationWorkerPool { .await?; let claimed = client.fetch_bundle(claimed.bundle_id).await?; + let selected_by_id = batch + .candidates + .iter() + .map(|candidate| (candidate.candidate_id, candidate)) + .collect::>(); let mut claimed_proofs = Vec::with_capacity(claimed.ordered_candidate_ids.len()); + let mut claimed_validated = Vec::with_capacity(claimed.ordered_candidate_ids.len()); for candidate_id in &claimed.ordered_candidate_ids { - claimed_proofs.push(client.fetch_candidate_proof(*candidate_id).await?); + let candidate = selected_by_id.get(candidate_id).ok_or_else(|| { + anyhow!( + "claimed bundle contains candidate {:02x?} that was not selected pre-claim", + candidate_id + ) + })?; + let bytes = client.fetch_candidate_proof(*candidate_id).await?; + let validated_candidate = + validator.validate_candidate(&claimed.group_key, candidate, &bytes)?; + claimed_proofs.push(bytes); + claimed_validated.push(validated_candidate); } + let claimed_nullifiers = claimed_validated + .iter() + .flat_map(|candidate| candidate.nullifiers.iter().copied()) + .collect::>(); + ensure_no_duplicate_nullifiers(&claimed_nullifiers)?; let l1_proof = prover.prove_l1(&claimed, claimed_proofs).await?; + prover + .verify_l1(&l1_proof) + .context("local L1 aggregate proof verification failed before submission")?; let proof_len = l1_proof.len(); client .submit_l1_aggregate(claimed.bundle_id, l1_proof) @@ -563,11 +712,22 @@ mod tests { } } + fn write_secure_keystore(dir: &Path) -> PathBuf { + let keystore = dir.join("aggregation-keystore"); + fs::create_dir_all(&keystore).unwrap(); + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + fs::set_permissions(&keystore, fs::Permissions::from_mode(0o700)).unwrap(); + } + keystore + } + fn test_config(dir: &Path, dry_run: bool, strategy: ClaimStrategy) -> ZkAggregationConfig { ZkAggregationConfig { node_rpc: "ws://127.0.0.1:9944".to_string(), aggregation_account: Some("aggregator".to_string()), - aggregation_key: Some("key".to_string()), + aggregation_keystore: (!dry_run).then(|| write_secure_keystore(dir)), zk_bins_dir: Some(dir.to_path_buf()), workers: 1, max_active_jobs: 1, @@ -624,6 +784,7 @@ mod tests { groups: Vec, candidates: BTreeMap>, proofs: BTreeMap<[u8; 32], Vec>, + post_claim_proofs: BTreeMap<[u8; 32], Vec>, claimed_bundle: Option, registered_count: usize, claim_count: usize, @@ -688,9 +849,13 @@ mod tests { } async fn fetch_candidate_proof(&self, candidate_id: [u8; 32]) -> anyhow::Result> { - self.state - .lock() - .unwrap() + let state = self.state.lock().unwrap(); + if state.claim_count > 0 { + if let Some(proof) = state.post_claim_proofs.get(&candidate_id) { + return Ok(proof.clone()); + } + } + state .proofs .get(&candidate_id) .cloned() @@ -764,11 +929,14 @@ mod tests { &self, group: &BundleGroupKey, candidate: &L0CandidateSummary, - _proof_bytes: &[u8], + proof_bytes: &[u8], ) -> anyhow::Result { if candidate.group_key != *group { bail!("candidate group does not match selected bundle group"); } + if proof_bytes.first().copied() != Some(candidate.candidate_id[0]) { + bail!("candidate proof bytes do not match candidate id"); + } ensure_no_duplicate_nullifiers(&candidate.nullifiers)?; Ok(ValidatedL0Candidate { candidate_id: candidate.candidate_id, @@ -789,6 +957,31 @@ mod tests { ) -> anyhow::Result> { Ok(vec![1, 2, 3]) } + + fn verify_l1(&self, proof_bytes: &[u8]) -> anyhow::Result<()> { + if proof_bytes == [1, 2, 3] { + Ok(()) + } else { + bail!("mock L1 proof failed local verification") + } + } + } + + struct UnverifiedProofGenerator; + + #[async_trait] + impl L1ProofGenerator for UnverifiedProofGenerator { + async fn prove_l1( + &self, + _bundle: &ClaimedBundle, + _candidate_proofs: Vec>, + ) -> anyhow::Result> { + Ok(vec![9, 9, 9]) + } + + fn verify_l1(&self, _proof_bytes: &[u8]) -> anyhow::Result<()> { + bail!("mock L1 proof failed local verification") + } } #[test] @@ -796,7 +989,7 @@ mod tests { let config = ZkAggregationConfig { node_rpc: "ws://127.0.0.1:9944".to_string(), aggregation_account: None, - aggregation_key: None, + aggregation_keystore: None, zk_bins_dir: None, workers: 1, max_active_jobs: 1, @@ -819,6 +1012,70 @@ mod tests { assert!(err.to_string().contains("missing required artifact")); } + #[test] + fn dry_run_config_allows_missing_signing_credentials() { + let dir = tempfile::tempdir().unwrap(); + write_required_artifacts(dir.path()); + let config = ZkAggregationConfig { + node_rpc: "ws://127.0.0.1:9944".to_string(), + aggregation_account: None, + aggregation_keystore: None, + zk_bins_dir: Some(dir.path().to_path_buf()), + workers: 1, + max_active_jobs: 1, + min_aggregation_reward: 0, + miner_bond: 0, + claim_strategy: ClaimStrategy::Oldest, + dry_run: true, + }; + + config.validate().unwrap(); + } + + #[test] + fn non_dry_run_config_requires_account_and_keystore() { + let dir = tempfile::tempdir().unwrap(); + write_required_artifacts(dir.path()); + let mut config = test_config(dir.path(), false, ClaimStrategy::Oldest); + config.aggregation_account = None; + let err = config.validate().unwrap_err(); + assert!(err + .to_string() + .contains("--aggregation-account is required")); + + let mut config = test_config(dir.path(), false, ClaimStrategy::Oldest); + config.aggregation_keystore = None; + let err = config.validate().unwrap_err(); + assert!(err + .to_string() + .contains("--aggregation-keystore is required")); + } + + #[test] + fn rpc_url_redaction_hides_credentials() { + assert_eq!( + redact_rpc_url("wss://user:token@example.com/path"), + "wss://***:***@example.com/path" + ); + assert_eq!( + redact_rpc_url("https://token@example.com"), + "https://***@example.com" + ); + assert_eq!(redact_rpc_url("ws://127.0.0.1:9944"), "ws://127.0.0.1:9944"); + } + + #[test] + fn config_debug_redacts_rpc_credentials() { + let dir = tempfile::tempdir().unwrap(); + write_required_artifacts(dir.path()); + let mut config = test_config(dir.path(), false, ClaimStrategy::Oldest); + config.node_rpc = "wss://user:token@example.com/path".to_string(); + + let debug = format!("{config:?}"); + assert!(debug.contains("wss://***:***@example.com/path")); + assert!(!debug.contains("user:token")); + } + #[test] fn worker_pool_does_not_cancel_on_pow_job() { let dir = tempfile::tempdir().unwrap(); @@ -972,6 +1229,55 @@ mod tests { assert_eq!(client.counters(), (1, 1, 1)); } + #[tokio::test] + async fn worker_revalidates_exact_claimed_proofs_after_claim() { + let dir = tempfile::tempdir().unwrap(); + write_required_artifacts(dir.path()); + let group = group(1, 1); + let candidate = candidate(1, group.clone(), 3, 10); + let candidate_id = candidate.candidate_id; + let client = MockChainClient::with_group(group.clone(), vec![candidate]); + { + let mut state = client.state.lock().unwrap(); + state.post_claim_proofs.insert(candidate_id, vec![0]); + } + let pool = + AggregationWorkerPool::new(test_config(dir.path(), false, ClaimStrategy::Oldest)) + .unwrap(); + + let err = pool + .run_once(&client, &SummaryProofValidator, &StaticProofGenerator) + .await + .unwrap_err(); + + assert!(err + .to_string() + .contains("candidate proof bytes do not match")); + assert_eq!(client.counters(), (1, 1, 0)); + } + + #[tokio::test] + async fn worker_does_not_submit_l1_proof_that_fails_local_verification() { + let dir = tempfile::tempdir().unwrap(); + write_required_artifacts(dir.path()); + let group = group(1, 1); + let client = + MockChainClient::with_group(group.clone(), vec![candidate(1, group.clone(), 3, 10)]); + let pool = + AggregationWorkerPool::new(test_config(dir.path(), false, ClaimStrategy::Oldest)) + .unwrap(); + + let err = pool + .run_once(&client, &SummaryProofValidator, &UnverifiedProofGenerator) + .await + .unwrap_err(); + + assert!(err + .to_string() + .contains("local L1 aggregate proof verification failed")); + assert_eq!(client.counters(), (1, 1, 0)); + } + #[test] fn zk_aggregation_prove_generates_l1_proof_from_fixture_when_configured() { let Ok(bins_dir) = std::env::var("ZK_AGGREGATION_TEST_BINS_DIR") else { diff --git a/docs/delegated-aggregation-miner.md b/docs/delegated-aggregation-miner.md new file mode 100644 index 0000000..14ac5cd --- /dev/null +++ b/docs/delegated-aggregation-miner.md @@ -0,0 +1,70 @@ +# Delegated Aggregation Miner Guide + +The miner delegated aggregation worker watches for compatible L0 Wormhole +candidates, claims a bundle, validates the exact claimed proofs, generates an +L1 aggregate proof, verifies that proof locally, and submits it to the chain. + +## Configuration + +Dry-run mode is allowed without signing credentials: + +```bash +quantus-miner serve \ + --enable-zk-aggregation \ + --dry-run-zk-aggregation \ + --node-rpc ws://127.0.0.1:9944 \ + --zk-bins-dir ../qp-zk-circuits/generated-bins +``` + +Non-dry-run mode requires an aggregation account and secure keystore path: + +```bash +quantus-miner serve \ + --enable-zk-aggregation \ + --node-rpc ws://127.0.0.1:9944 \ + --aggregation-account \ + --aggregation-keystore /secure/path/aggregation-keystore \ + --zk-bins-dir ../qp-zk-circuits/generated-bins \ + --zk-miner-bond 1000000000000 \ + --min-aggregation-reward 0 +``` + +Do not pass raw signing key material through CLI arguments or environment +variables in non-dry-run mode. + +## Worker Flow + +1. Select compatible pending L0 candidates. +2. Validate candidate proof bytes before claim. +3. Claim a bundle and lock nullifiers on chain. +4. Fetch the exact claimed bundle. +5. Re-fetch and revalidate the exact claimed candidate proofs. +6. Generate the L1 aggregate proof. +7. Locally verify the L1 aggregate proof. +8. Submit the L1 aggregate to chain. +9. Retry or let the runtime timeout/challenge path clean up failures. + +The worker must never submit an L1 proof that fails local verification. + +## Operational Notes + +- Use `--min-aggregation-reward` to avoid uneconomical bundles. +- Use `--max-active-zk-jobs` conservatively until the E2E restart/timeout tests + pass on a live devnet. +- RPC URLs with embedded credentials are redacted in debug/log output, but avoid + credential-bearing URLs where possible. +- The `--zk-bins-dir` directory must contain matching L0 and L1 artifacts for + the same circuits revision used by the chain runtime. + +## Troubleshooting + +- Missing artifact error: regenerate `generated-bins`, including `layer1_*` + artifacts. +- Keystore rejected: ensure the path exists, is a file or directory, and on + Unix has no group/other permissions. +- Worker skips a batch: check reward threshold, candidate compatibility, and + active job limits. +- Claim succeeds but submit does not: inspect local proof validation and L1 + verification errors; do not bypass verification. +- Restart mid-bundle: the runtime timeout path must release locks if the miner + cannot resume and settle. diff --git a/docs/key-management.md b/docs/key-management.md new file mode 100644 index 0000000..6f299af --- /dev/null +++ b/docs/key-management.md @@ -0,0 +1,51 @@ +# Miner Key Management + +Delegated aggregation signing must use a secure signer, keystore, or provider. +Raw private key CLI arguments and raw private key environment variables are not +accepted for non-dry-run delegated aggregation. + +## Requirements + +- Non-dry-run mode requires `--aggregation-account`. +- Non-dry-run mode requires `--aggregation-keystore`. +- Dry-run mode may omit signing credentials. +- If a keystore path is supplied, it must exist and be a file or directory. +- On Unix, the keystore path must not be readable, writable, or executable by + group or other users. + +Example Unix setup: + +```bash +install -d -m 700 /secure/path/aggregation-keystore +quantus-miner serve \ + --enable-zk-aggregation \ + --aggregation-account \ + --aggregation-keystore /secure/path/aggregation-keystore \ + --node-rpc ws://127.0.0.1:9944 \ + --zk-bins-dir ../qp-zk-circuits/generated-bins \ + --zk-miner-bond 1000000000000 +``` + +## Secret Handling + +- Do not commit keystores. +- Do not put signing keys in shell history. +- Do not pass raw signing keys through process args. +- Do not print credential-bearing RPC URLs in support bundles. +- Prefer isolated operator accounts funded only for delegated aggregation bond + and fee needs. + +## Rotation + +1. Register or fund the replacement aggregation account. +2. Start a miner with the replacement keystore in dry-run mode. +3. Stop new claims from the old miner. +4. Let old active bundles settle or timeout. +5. Start non-dry-run mode for the replacement account. + +## Incident Response + +- If a signing key is suspected compromised, stop the miner immediately. +- Let active bundle locks resolve via settlement, timeout, or challenge. +- Rotate to a new aggregation account. +- Review reward and slash events around the incident window. From 10dcdda3bb95978379734112e08b0773f7552747 Mon Sep 17 00:00:00 2001 From: Ethan Date: Sun, 10 May 2026 21:25:19 -0500 Subject: [PATCH 4/5] *fix clippy --- crates/miner-service/src/zk_aggregation.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/miner-service/src/zk_aggregation.rs b/crates/miner-service/src/zk_aggregation.rs index ca29682..0959085 100644 --- a/crates/miner-service/src/zk_aggregation.rs +++ b/crates/miner-service/src/zk_aggregation.rs @@ -621,10 +621,10 @@ impl AggregationWorkerPool { } } -fn select_batch<'a>( - batches: &'a [CandidateBatch], +fn select_batch( + batches: &[CandidateBatch], strategy: ClaimStrategy, -) -> Option<&'a CandidateBatch> { +) -> Option<&CandidateBatch> { match strategy { ClaimStrategy::Oldest => batches.iter().min_by_key(|batch| batch.oldest_submitted_at), ClaimStrategy::RewardDensity => batches.iter().max_by(|left, right| { From 2604eff49a931cf34afffb2bf38b9379e9b95e1d Mon Sep 17 00:00:00 2001 From: Ethan Date: Sun, 10 May 2026 22:51:08 -0500 Subject: [PATCH 5/5] *fmt --- crates/miner-service/src/zk_aggregation.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/miner-service/src/zk_aggregation.rs b/crates/miner-service/src/zk_aggregation.rs index 0959085..750e91c 100644 --- a/crates/miner-service/src/zk_aggregation.rs +++ b/crates/miner-service/src/zk_aggregation.rs @@ -621,10 +621,7 @@ impl AggregationWorkerPool { } } -fn select_batch( - batches: &[CandidateBatch], - strategy: ClaimStrategy, -) -> Option<&CandidateBatch> { +fn select_batch(batches: &[CandidateBatch], strategy: ClaimStrategy) -> Option<&CandidateBatch> { match strategy { ClaimStrategy::Oldest => batches.iter().min_by_key(|batch| batch.oldest_submitted_at), ClaimStrategy::RewardDensity => batches.iter().max_by(|left, right| {