perf(cubestore): reduce metastore RPC fan-out during partitioning#11095
perf(cubestore): reduce metastore RPC fan-out during partitioning#11095waralexrom wants to merge 8 commits into
Conversation
|
Claude finished @waralexrom's task in 3m 45s —— View job Code Review — perf(cubestore): reduce metastore RPC fan-out during partitioningVerdict: LGTM with minor non-blocking notes. 0 high · 0 medium · 5 low (all posted inline) Details — checklist, findings, and rationale
OverallClean, well-scoped change. The flag-gated approach ( The four real wins:
Inline notes (all low / non-blocking)
Things I checked and didn't flag
|
| if self.config.metastore_batch_rpc() { | ||
| // Create all child chunks in one metastore write, then upload their files. The | ||
| // chunks are inactive until the swap below, so creating them before the uploads | ||
| // matches the per-item path's visibility. | ||
| let mut specs = Vec::new(); | ||
| let mut spec_files = Vec::new(); | ||
| for w in written { | ||
| if w.num_rows == 0 { | ||
| let _ = tokio::fs::remove_file(&w.file).await; | ||
| continue; | ||
| } | ||
| let child = &children[w.child_index]; | ||
| specs.push(Chunk::new( | ||
| child.get_id(), | ||
| w.num_rows, | ||
| Some(Row::new(w.min)), | ||
| Some(Row::new(w.max)), | ||
| false, | ||
| ) | ||
| .await?; | ||
| let remote = ChunkStore::chunk_file_name(chunk.clone()); | ||
| let file_size = self.remote_fs.upload_file(w.file, remote).await?; | ||
| new_chunk_ids.push((chunk.get_id(), Some(file_size))); | ||
| )); | ||
| spec_files.push(w.file); | ||
| } | ||
| if !specs.is_empty() { | ||
| let chunks = self.meta_store.insert_chunks(specs).await?; | ||
| for (file, chunk) in spec_files.into_iter().zip(chunks) { | ||
| let remote = ChunkStore::chunk_file_name(chunk.clone()); | ||
| let file_size = self.remote_fs.upload_file(file, remote).await?; | ||
| new_chunk_ids.push((chunk.get_id(), Some(file_size))); | ||
| } | ||
| } | ||
| } else { | ||
| for w in written { |
There was a problem hiding this comment.
Failure semantics differ subtly between the two branches: on the per-item path, if upload of chunk k fails, chunks 0..k exist as metastore rows with uploaded=false and k+1.. have no rows at all. On the batched path, all N chunk rows are created upfront, so a mid-stream upload failure leaves the full N rows as uploaded=false orphans (vs. k orphans before).
These should be cleaned up by the same uploaded-false sweep that handled the smaller orphan set previously, so this is not a correctness bug — but it does widen the failure blast radius for one job. Worth confirming there is no codepath that interprets "an uploaded=false chunk row exists" as "data was at least partially written" before merging.
| async fn create_partitions( | ||
| &self, | ||
| partitions: Vec<Partition>, | ||
| ) -> Result<Vec<IdRow<Partition>>, CubeError> { | ||
| self.write_operation("create_partitions", move |db_ref, batch_pipe| { | ||
| let table = PartitionRocksTable::new(db_ref.clone()); | ||
| let mut result = Vec::with_capacity(partitions.len()); | ||
| for partition in partitions { | ||
| result.push(table.insert(partition, batch_pipe)?); | ||
| } | ||
| Ok(result) | ||
| }) | ||
| .await | ||
| } |
There was a problem hiding this comment.
Consider adding #[tracing::instrument(level = "trace", skip(self, partitions))] to match create_partition above — keeps tracing parity between the two paths when toggling the flag during incident triage. Same suggestion applies to get_active_partitions_for_indexes below.
|
Claude finished @waralexrom's task in 1m 16s —— View job Re-review after syncVerdict: LGTM — no new findings on the 2 commits since the prior review. Details — what changed, what got resolved, what's still open
What changed since the prior review
Threads resolved
Threads left open (still apply, not addressed in this push)
Neither blocks the merge. |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## cubestore-chunk-repartition-speed-up #11095 +/- ##
=====================================================================
Coverage 58.50% 58.50%
=====================================================================
Files 216 216
Lines 17270 17270
Branches 3524 3524
=====================================================================
Hits 10103 10103
Misses 6652 6652
Partials 515 515
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
The disk-space scan single-flight on main waited up to a hardcoded 10s for the compute lock. While a worker waits, its metastore RPC connection stays open, growing the simultaneous-open-connection footprint and the window for a reset. Cap it via CUBESTORE_DISK_SPACE_LOCK_WAIT_MS (default 1000ms, in milliseconds so sub-second values work) and fall back to the existing fail-open path on timeout.
add_chunk_columns and post_process_columns each re-fetched the table over the metastore RPC for every chunk they produced, even though the table is the same for the whole import/repartition job and immutable for its duration. On a wide table this is one redundant read RPC per chunk, each a separate worker->main connection feeding the connect-per-RPC storm. Thread the table down instead: build_index_chunks loads it once before the index loop, partition_rows loads it once for the single-index path, and add_persistent_chunk loads it once. compact() now reuses the table already returned by get_partition_for_compaction instead of re-fetching it.
…oning job partition_rows_for_index ran check_node_disk_space for every partition it wrote to, each a separate get_used_disk_space_out_of_queue metastore RPC. The check resolves to a per-node total, so checking each distinct target node once is enough and the answer is identical. Track checked nodes in a set and skip repeats, cutting the per-partition RPC fan-out to one per node.
… flag During a partitioning job build_index_chunks fetched active partitions with one get_active_partitions_by_index_id RPC per index. Add a batched get_active_partitions_for_indexes(Vec<u64>) that returns them for all indexes in a single metastore round-trip, gated behind CUBESTORE_METASTORE_BATCH_RPC (default off). When the flag is off the per-index path is unchanged. partition_rows_for_index takes the active partitions as an optional preset; a missing map entry in batch mode is an internal error rather than a silent empty set (which would trip the corrupt-data path). Tests cover the metastore method against the per-index method and the flag-on path through build_index_chunks with multiple indexes.
…flag compact() and the multi-partition split both created child partitions one create_partition RPC at a time inside a loop. Add create_partitions(Vec) that inserts them all in one metastore write transaction, gated behind CUBESTORE_METASTORE_BATCH_RPC. This cuts N write RPCs / N write-lock acquisitions to one and makes the creation all-or-nothing, removing the orphan-partition window if a mid-loop insert failed. When the flag is off the per-item loop is unchanged. MultiSplit carries the flag from the compaction config. Tested at the metastore level and through the compaction split path with the flag on.
…nd the flag merge_chunk_group_into_children created one child chunk per written file with a create_chunk RPC inside a loop. When CUBESTORE_METASTORE_BATCH_RPC is on, build all child chunk specs and create them in one insert_chunks write, then upload their files; the chunks are inactive until the swap, so create-before-upload matches the per-item visibility and any chunk left behind by a failed upload is swept by the existing remove_inactive_not_uploaded_chunks reaper. The per-item path is unchanged when the flag is off. This is the repartition cascade that wide imports trigger, so it cuts a large share of the create_chunk write RPCs. Reuses the existing insert_chunks metastore method. Covered by repartition_chunk_range_merges_only_range run with the flag on.
get_active_partitions_for_indexes returned HashMap<u64, _>, but the metastore RPC serializes with flexbuffers, which rejects non-string map keys (KeyMustBeString) — it failed only over a real worker->main RPC, surfacing as "Connection closed unexpectedly" during imports with the flag on, while the node-local unit tests passed. Return a positional Vec<Vec<IdRow<Partition>>> aligned with the requested index ids instead. build_index_chunks zips by position and validates the length; a length mismatch is an internal error rather than a silent empty set. Full cubestore lib suite passes with the flag forced on.
Address review: hoist a disk_check_enabled flag so the per-partition node-name build + dedup set are skipped entirely when max_disk_space_per_worker is 0 (the common case), and guard the batched active-partition positional pairing with a debug_assert that each entry belongs to the index at the same position.
b34cf83 to
c47cdb4
Compare
|
Claude finished @waralexrom's task in 43s —— View job Re-review after force-pushVerdict: LGTM — no new findings. Branch was force-pushed (commits Details — what changed, what's still open
What changed since the last re-reviewThe branch was rebased — Threads still open (re-affirmed)
Neither blocks merge. Threads resolved (still resolved)
· · Branch |
Summary
Cuts the worker→main metastore RPC fan-out during import/repartition/compaction jobs. Each metastore method is a one-shot TCP round-trip (and, for writes, a RocksDB write-lock acquisition); under wide imports these pile up into a connection storm against the main node — the suspected mechanism behind the recent import
Connection reset by peerincident. This PR removes redundant reads, dedups per-node checks, and batches the per-index / per-child metastore calls behind a feature flag.Stacked on #11088 (
cubestore-chunk-repartition-speed-up) — review/merge that first; the base of this PR is that branch.Changes
get_table_by_idper chunk (and reuse the table already returned byget_partition_for_compaction).get_used_disk_space_out_of_queueis resolved once per distinct target node instead of once per partition written.CUBESTORE_DISK_SPACE_LOCK_WAIT_MS(default 1000ms, sub-second allowed) — bounds how long a worker's RPC connection stays open waiting on the scan lock; was a hardcoded 10s.CUBESTORE_METASTORE_BATCH_RPC(default off):get_active_partitions_for_indexes— active partitions for all of a job's indexes in one read (I→1).create_partitions— child partitions created in one write transaction in compaction split and multi-split (N→1, all-or-nothing).create_chunks— range-repartition child chunks created via the existinginsert_chunksin one write (the wide-import repartition cascade).With the flag off, every path is byte-equivalent to the previous behavior.
Testing
cargo test -p cubestore --lib: green with the flag off, and 282/282 green with the flag forced on (temporaryConfig::testflip, reverted).HashMap<u64, _>, which the flexbuffers metastore RPC can't serialize (non-string map keys); fixed by returning a positionalVec<Vec<…>>. Node-local unit tests missed it; the sql tests (real router+worker) caught it.partition_data, compaction split, and range repartition).🤖 Generated with Claude Code