perf(cubestore): faster repartition (prefetch, per-partition merge, range jobs) #11088
waralexrom wants to merge 11 commits into
b09e8e6 to 36caefe
Claude finished @waralexrom's task in 4m 0s. Code Review

Claude finished @waralexrom's task in 1m 0s. Code Review: follow-up on
| Prior finding | Fix in 2f67894 | Verdict |
|---|---|---|
| Medium: `compute_repartition_children` deactivates table on empty children (race) | `store/mod.rs:1150`: early `if children.is_empty()` returns `CubeError::internal(...)` for scheduler retry; tiling check only runs when children are present | ✅ Correct. Deactivation now only fires on the genuine "children present but don't tile" corruption case. |
| Low: periodic `all_inactive_partitions_to_repartition()` in GC hot path | `scheduler/mod.rs`: new `draining_repartition_parents()` with 5s-TTL `Mutex<Option<(Instant, Arc<HashSet<u64>>)>>` memo. Inline comment explains staleness is bounded by `not_used_timeout`. | ✅ Correct. Cache is per-scheduler; a thundering-herd refresh would just do redundant metastore reads (harmless). |
| Low: prefetch bills unknown `file_size` at 1 byte | `store/mod.rs:1006`: `.unwrap_or(capacity as u64)` so an unknown chunk takes the full budget and serializes against any other. | ✅ Correct, and the right side of conservative: degrades to fully serialized prefetch on missing metadata rather than overshooting the on-disk budget. |
| Low: no clamp on `max_rows` / `max_files` (0 silently degrades) | `cluster/mod.rs:957`: both clamped with `.max(1)` and an inline note that the sane range is >= 2. | ✅ Correct. Loop now always advances; the comment correctly flags that 1 still degrades quality (just doesn't break). |
Things considered and dismissed
- Cache refresh race: two concurrent callers can both miss and both refresh. That costs an extra metastore call and has no correctness impact.
- Unknown-`file_size` serialization cost: a missing `file_size` blocks all other prefetches; in practice persisted chunks have a size, so this is a safety net, not a hot path.
- `children.is_empty()` retry loop: if children are genuinely missing (not a race), the job will retry forever, but the scheduler's existing job-failure backoff handles this, and the alternative (deactivating a healthy table) is strictly worse.
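
For illustration, the short-TTL memo discussed in the review could look roughly like the sketch below. `TtlMemo` and `get_or_refresh` are hypothetical names, not the PR's code; only the `Mutex<Option<(Instant, Arc<HashSet<u64>>)>>` shape comes from the review. The lock is deliberately not held across the reload, which is why two concurrent callers can both miss and both refresh.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical TTL memo for a set of partition ids (illustrative only).
pub struct TtlMemo {
    ttl: Duration,
    slot: Mutex<Option<(Instant, Arc<HashSet<u64>>)>>,
}

impl TtlMemo {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, slot: Mutex::new(None) }
    }

    /// Returns the cached set while it is fresh; otherwise calls `load`
    /// (standing in for the metastore read) and stores the result.
    pub fn get_or_refresh<F>(&self, load: F) -> Arc<HashSet<u64>>
    where
        F: FnOnce() -> HashSet<u64>,
    {
        {
            let guard = self.slot.lock().unwrap();
            if let Some((at, set)) = &*guard {
                if at.elapsed() < self.ttl {
                    return Arc::clone(set);
                }
            }
        } // Lock released here: concurrent callers may both reload.
        let fresh = Arc::new(load());
        *self.slot.lock().unwrap() = Some((Instant::now(), Arc::clone(&fresh)));
        fresh
    }
}
```

A redundant reload only costs one extra read, which matches the "no correctness impact" reasoning above.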
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## master #11088 +/- ##
=======================================
Coverage 58.50% 58.50%
=======================================
Files 216 216
Lines 17270 17270
Branches 3524 3524
=======================================
Hits 10103 10103
Misses 6652 6652
Partials 515 515
Claude finished @waralexrom's task in 50s. Code Review: follow-up on

Claude finished @waralexrom's task in 37s. Code Review: follow-up on
Add an opt-in sequential prefetch to the batch repartition loop. A producer downloads upcoming chunk parquets (anchor last, others by ascending id) into the local cache while the consumer repartitions the current chunk, so the download latency is hidden behind processing.

A byte-budget semaphore bounds how much fetched-but-unprocessed data sits on local disk: each chunk holds permits worth its file size from before its download until the consumer finishes it. Chunks prefetched past the time budget stay local and warm the follow-up job, which lands on the same node by partition.

download_file is idempotent and dedups in-flight downloads, so the consumer's repartition_chunk just hits the warm file. The producer makes no metastore calls; the chunk list is still read once.

Controlled by CUBESTORE_REPARTITION_PREFETCH_BUDGET (accepts size suffixes, e.g. 512MB); None (unset) or 0 disables prefetching. Added env_optparse_size to keep None distinct from an explicit value.
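
A minimal sketch of the byte-budget idea in this commit, using only the standard library. `ByteBudget` and its methods are hypothetical names, and clamping an oversized request to the capacity is an assumption made here so a single large file cannot block forever; the commit itself only states that a semaphore bounds fetched-but-unprocessed bytes.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical byte budget: reservations wait until they fit under `capacity`.
pub struct ByteBudget {
    capacity: u64,
    in_use: Mutex<u64>,
    freed: Condvar,
}

impl ByteBudget {
    pub fn new(capacity: u64) -> Self {
        Self { capacity, in_use: Mutex::new(0), freed: Condvar::new() }
    }

    /// Producer side: reserve before starting a download. Blocks until the
    /// bytes fit. Returns the number of permits actually taken
    /// (assumption: requests are clamped to the capacity).
    pub fn acquire(&self, bytes: u64) -> u64 {
        let want = bytes.min(self.capacity);
        let mut used = self.in_use.lock().unwrap();
        while *used + want > self.capacity {
            used = self.freed.wait(used).unwrap();
        }
        *used += want;
        want
    }

    /// Non-blocking variant: `None` means the budget is currently exhausted.
    pub fn try_acquire(&self, bytes: u64) -> Option<u64> {
        let want = bytes.min(self.capacity);
        let mut used = self.in_use.lock().unwrap();
        if *used + want > self.capacity {
            return None;
        }
        *used += want;
        Some(want)
    }

    /// Consumer side: give the permits back once the chunk is processed.
    pub fn release(&self, permits: u64) {
        let mut used = self.in_use.lock().unwrap();
        *used = (*used).saturating_sub(permits);
        self.freed.notify_all();
    }
}
```

The permits are held from before the download until the consumer finishes the chunk, so the sum of reserved bytes is an upper bound on prefetched data sitting on disk.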
Add an opt-in merge path for repartitioning an inactive parent's persisted chunks. Instead of splitting each chunk independently into the active children, the parent's chunks are merged k-way (in groups of up to a configurable cap) and the sorted stream is split directly into the children at the wal-split limit in one streaming pass, producing full-size chunks and avoiding the per-chunk fragment fan-out plus the compaction that would later merge them.

Each group commits with a single atomic swap_chunks; a group whose source was already repartitioned by a racing job is skipped (its new chunks stay inactive and are GC'd). A fully empty group deactivates its sources directly. Children must tile the parent's range exactly, otherwise the table is deactivated as on the per-chunk path. The anchor is processed last so it keeps holding the job dedup key until the run finishes or yields on the time budget.

Reuses the compaction streaming writer machinery via a new write_chunks_split_into_children that cuts files on child boundaries and the row-count limit.

Controlled by CUBESTORE_REPARTITION_MERGE_MAX_INPUT_FILES (Option<usize>); None / 0 / 1 disable it, Some(m >= 2) caps each group at m input chunks. Default off.
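
The merge-then-split idea can be sketched on plain integer keys as below. This is an in-memory toy, not the streaming writer: `merge_sorted` and `split_into_children` are hypothetical names, rows are reduced to a single `u64` sort key, and each child is assumed to be described by an exclusive upper bound with the last child unbounded.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// K-way merge of individually sorted inputs into one sorted vector.
pub fn merge_sorted(inputs: &[Vec<u64>]) -> Vec<u64> {
    // Min-heap of (key, input index, position within that input).
    let mut heap = BinaryHeap::new();
    for (i, input) in inputs.iter().enumerate() {
        if let Some(&first) = input.first() {
            heap.push(Reverse((first, i, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((key, i, pos))) = heap.pop() {
        out.push(key);
        if let Some(&next) = inputs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

/// Splits a sorted stream into (child index, rows) chunks, cutting a new
/// chunk on every child boundary and whenever `max_rows` is reached.
/// `child_upper[c]` is the exclusive upper bound of child `c` (assumption).
pub fn split_into_children(
    sorted: &[u64],
    child_upper: &[Option<u64>],
    max_rows: usize,
) -> Vec<(usize, Vec<u64>)> {
    let max_rows = max_rows.max(1);
    let mut chunks: Vec<(usize, Vec<u64>)> = Vec::new();
    let mut child = 0;
    for &key in sorted {
        // Advance to the child containing `key`; the last child catches the rest.
        while child + 1 < child_upper.len()
            && child_upper[child].map_or(false, |upper| key >= upper)
        {
            child += 1;
        }
        let start_new = match chunks.last() {
            Some((c, rows)) => *c != child || rows.len() >= max_rows,
            None => true,
        };
        if start_new {
            chunks.push((child, Vec::new()));
        }
        chunks.last_mut().unwrap().1.push(key);
    }
    chunks
}
```

Because the input is already globally sorted, each child receives its rows in one pass and chunks are cut only at a boundary or at the row limit, which is what avoids the small per-chunk fragments.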
…ector

Add a third repartition strategy that slices an inactive parent's persisted chunks into RepartitionRange jobs at schedule time. Slicing walks all chunks (active and inactive) ordered by id and cuts a range once it reaches the row or chunk-count cap, so the [start, end] bounds stay pinned to chunk ids and a re-slice reproduces them; the end is carried as job data, not the dedup key, so a tail that extends the trailing range dedups on its start instead of spawning a second job. Each range runs as one atomic swap on the worker chosen by the hash of its bounds, restoring cross-worker parallelism. A GC gate keeps an inactive parent's chunks until it fully drains so slicing stays stable.

Replace the ad-hoc flags with a single CUBESTORE_REPARTITION_STRATEGY selector (per_chunk default, per_partition, range); an unknown value logs a warning and falls back to per_chunk. The merge caps (CUBESTORE_REPARTITION_MERGE_MAX_INPUT_FILES default 50, CUBESTORE_REPARTITION_MERGE_MAX_ROWS default 4000000) become plain caps with defaults. The per-partition merge core is shared between the in-job loop and the range handler.

JobType::RepartitionRange deserializes as an unknown variant on binaries that predate it, so it is only safe behind the skip-unknown-jobs handling; enable the strategy per-deployment.
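
As a rough sketch of the slicing rule described in this commit (walk chunks by id, cut once a cap is reached), assuming a hypothetical `ChunkInfo` with just an id and a row count:

```rust
/// Hypothetical minimal view of a chunk for slicing purposes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChunkInfo {
    pub id: u64,
    pub rows: u64,
}

/// Slices chunks (already sorted by id) into inclusive `(start, end)` id
/// ranges, cutting a range once it reaches the row cap or the file cap.
pub fn slice_ranges(chunks: &[ChunkInfo], max_rows: u64, max_files: usize) -> Vec<(u64, u64)> {
    // A cap of 0 would otherwise never let a range accept a chunk sensibly.
    let max_rows = max_rows.max(1);
    let max_files = max_files.max(1);
    let mut ranges = Vec::new();
    let mut start: Option<u64> = None;
    let mut rows = 0u64;
    let mut files = 0usize;
    for chunk in chunks {
        let range_start = *start.get_or_insert(chunk.id);
        rows += chunk.rows;
        files += 1;
        if rows >= max_rows || files >= max_files {
            ranges.push((range_start, chunk.id));
            start = None;
            rows = 0;
            files = 0;
        }
    }
    // Trailing range that has not hit a cap yet.
    if let (Some(range_start), Some(last)) = (start, chunks.last()) {
        ranges.push((range_start, last.id));
    }
    ranges
}
```

With five chunks of 10 rows and a 25-row cap this yields `(1, 3)` and `(4, 5)`; appending a sixth chunk and re-slicing yields `(1, 3)` and `(4, 6)`. The start of the trailing range is unchanged, which is why deduplicating on the start keeps a late tail chunk from spawning a second job.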
- compute_repartition_children: an empty children set is treated as a transient topology read and returns an error to retry, instead of deactivating the table; deactivation now only happens when children are present but do not tile the parent's range (genuine corruption).
- Cache the draining-parents set (inactive parents with active chunks) with a short TTL so the GC loop does not re-scan the chunk table every cycle when the range strategy is enabled.
- Prefetch: bill a chunk with no recorded file_size at the full budget instead of ~free, so an unknown-size fetch cannot overshoot the on-disk budget.
- Clamp the range slicing caps to >= 1 so a misconfigured 0 does not break the slicing loop before adding a chunk.
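
Two of the fixes in this commit reduce to small decision rules, sketched below. `ChildrenOutcome`, `classify_children` and `prefetch_permits` are hypothetical names used only for illustration; the `unwrap_or(capacity as u64)` expression is the one quoted in the review.

```rust
/// Hypothetical outcome of inspecting an inactive parent's children.
#[derive(Debug, PartialEq, Eq)]
pub enum ChildrenOutcome {
    /// No children visible yet: treated as a transient read, retry the job.
    RetryLater,
    /// Children exist but do not tile the parent's range: genuine corruption.
    DeactivateTable,
    /// Children exist and tile the range: go ahead with the repartition.
    Proceed,
}

pub fn classify_children(child_count: usize, tiles_parent: bool) -> ChildrenOutcome {
    if child_count == 0 {
        return ChildrenOutcome::RetryLater;
    }
    if tiles_parent {
        ChildrenOutcome::Proceed
    } else {
        ChildrenOutcome::DeactivateTable
    }
}

/// Permits to reserve for a prefetch: a chunk without a recorded size is
/// billed at the whole budget, so it serializes against every other fetch.
pub fn prefetch_permits(file_size: Option<u64>, capacity: usize) -> u64 {
    file_size.unwrap_or(capacity as u64)
}
```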
…e dedup

- repartition_partition_chunks_merged now groups chunks by max_rows OR max_input_files (whichever is hit first), matching how the range strategy slices its jobs, instead of grouping purely by chunk count.
- Comment the last-row/aggregate dedup in merge_chunk_group_into_children: it is the same merge_chunks compaction uses; for unique-key tables the sort key ends with the seq column so the latest version wins, and group order only breaks ties between rows with an identical (unique key, seq).
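
The "latest version wins" behaviour for unique-key tables can be illustrated with a toy last-row dedup. The `Row` type and `last_row_dedup` are hypothetical; the only property taken from the commit is that rows arrive sorted by the unique key followed by the seq column.

```rust
/// Hypothetical row: a unique key, a sequence number, and a payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Row {
    pub key: u64,
    pub seq: u64,
    pub value: &'static str,
}

/// Keeps the last row of every key. The input must already be sorted by
/// (key, seq), so the last row per key is the one with the highest seq.
pub fn last_row_dedup(sorted: &[Row]) -> Vec<Row> {
    let mut out: Vec<Row> = Vec::new();
    for row in sorted {
        if let Some(prev) = out.last_mut() {
            if prev.key == row.key {
                // Same key seen again later in sort order: overwrite.
                *prev = row.clone();
                continue;
            }
        }
        out.push(row.clone());
    }
    out
}
```

The output can have fewer rows than the input, which is the reason the row-count check on the swap has to be relaxed for such tables.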
… strategy

repartition_partition_chunks_yields_on_budget and repartition_partition_chunks_prefetch_drains assert per-chunk-path behavior, so set the strategy explicitly instead of inheriting the default. This keeps them valid regardless of the configured strategy (mirrors how the merge/range tests pin theirs).
…urrent download in merge

Remove CUBESTORE_BATCH_REPARTITION / batch_repartition_enabled. The repartition strategy (per_chunk / per_partition / range) is now the sole selector:
- per_chunk: one job per chunk -> repartition_chunk
- per_partition: one anchor job per partition -> merge
- range: RepartitionRange jobs -> merge

The job handler and scheduler dispatch on the strategy directly. The dead per_chunk+batch hybrid (per-chunk loop and its producer/consumer prefetch) is gone, along with its two now-obsolete tests.

Prefetch is reworked: drop the byte-budget producer/consumer and CUBESTORE_REPARTITION_PREFETCH_BUDGET; add a plain bool CUBESTORE_REPARTITION_CONCURRENT_DOWNLOAD (default off) that downloads a merge group's chunk parquets concurrently before building the merge inputs. It applies to both per_partition and range; the group is already bounded by repartition_merge_max_input_files and the pool by download_concurrency, so no extra budget is needed.
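
A sketch of how a single strategy selector with a safe fallback might be parsed. `RepartitionStrategy` and `parse_strategy` are hypothetical names; trimming and lowercasing the value are assumptions, while the three accepted values and the warn-and-fall-back behaviour come from the commits above.

```rust
/// Hypothetical enum mirroring the three documented strategy values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RepartitionStrategy {
    PerChunk,
    PerPartition,
    Range,
}

/// Parses the raw value of the strategy variable. Unset means the default;
/// an unknown value logs a warning and falls back to `PerChunk`.
pub fn parse_strategy(raw: Option<&str>) -> RepartitionStrategy {
    // Assumption: surrounding whitespace and letter case are ignored.
    let normalized = raw.map(|s| s.trim().to_ascii_lowercase());
    match normalized.as_deref() {
        None | Some("per_chunk") => RepartitionStrategy::PerChunk,
        Some("per_partition") => RepartitionStrategy::PerPartition,
        Some("range") => RepartitionStrategy::Range,
        Some(other) => {
            eprintln!("unknown repartition strategy {:?}, using per_chunk", other);
            RepartitionStrategy::PerChunk
        }
    }
}
```

A caller would pass something like `std::env::var("CUBESTORE_REPARTITION_STRATEGY").ok().as_deref()` and then dispatch on the returned variant.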
18b967e to 1ed20eb
Claude finished @waralexrom's task in 1m 40s. Code Review: follow-up on
The merge-based repartition path (per_partition / range strategies) commits its new chunks with swap_chunks, which enforces that the activated row count equals the deactivated row count. merge_chunks aggregates (aggregate indexes) and last-row-dedups (unique-key tables) the source group, so it legitimately emits fewer rows than it consumed. The checked swap then rejected the commit with "Deactivated row count (..) doesn't match activated row count (..) during swap", failing RepartitionRange / per-partition jobs.

Commit with swap_chunks_without_check instead, matching how compaction commits its dedup'd merges. Adds a unit test on an aggregate index that reproduces the exact failure before the fix.

Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
…dedup

Adds range- and per-partition-strategy SQL tests on an aggregate-index table whose chunks share dimension keys across inserts. The repartition merge groups those rows by the sort key, so the swap activates fewer rows than it deactivates - the production RepartitionRange row-count-mismatch scenario. Without the unchecked-swap fix the repartition jobs never drain the inactive parents; with it the data stays consistent (sum(m) and per-key sums conserved).

Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
…apses rows

The merge-based repartition path committed every group with the unchecked swap. That is required only for aggregate indexes and unique-key tables, where merge_chunks aggregates / last-row-dedups and the activated row count is legitimately smaller. For a plain regular index the merge is a pure sort-merge that conserves rows, so commit it through the checked swap_chunks to keep the row-count integrity guard.
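
The rule from the last three commits (unchecked swap only where the merge can collapse rows) can be sketched as follows. `IndexKind`, `merge_may_collapse_rows` and `validate_swap` are hypothetical names; the error text is the one quoted in the earlier commit message.

```rust
/// Hypothetical index classification for the purpose of this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexKind {
    Regular,
    Aggregate,
}

/// Aggregate indexes and unique-key tables may emit fewer rows than they read.
pub fn merge_may_collapse_rows(index: IndexKind, has_unique_key: bool) -> bool {
    index == IndexKind::Aggregate || has_unique_key
}

/// Row-count guard applied before committing a swap. When the merge may
/// collapse rows, the guard is skipped (the "without check" path).
pub fn validate_swap(
    deactivated_rows: u64,
    activated_rows: u64,
    may_collapse: bool,
) -> Result<(), String> {
    if may_collapse || deactivated_rows == activated_rows {
        return Ok(());
    }
    Err(format!(
        "Deactivated row count ({}) doesn't match activated row count ({}) during swap",
        deactivated_rows, activated_rows
    ))
}
```

For a regular index the merge is a pure sort-merge, so a mismatch there still indicates a real problem and is reported.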
Claude finished @waralexrom's task in 1m 12s. Code Review: follow-up on
compute_repartition_children deactivated the table as corrupt whenever the inactive parent's active children did not tile its range exactly. That check compared data-extent bounds (min_val/max_val), but a split sets the first child's lower bound to its first segment's data min rather than the parent's lower bound, so for normal multi-level topologies the children legitimately leave gaps and the outer edges don't match the parent. The check fired in production on RepartitionRange jobs and deactivated healthy pre-aggregation tables.

The streaming split never drops rows (the first child is the low catch-all, the last the high one) and the legacy per-chunk path performed no such check, so the strict tiling guard was wrong. Replace it with an optional non-overlap check, gated behind CUBESTORE_REPARTITION_CHECK_OVERLAPPING_CHILDREN and off by default: gaps are benign, only overlapping children are genuine corruption.
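
A sketch of a non-overlap check that tolerates gaps, as described in this commit. It assumes, purely for illustration, that each child is a half-open range `[min, max)` over `u64` keys with `None` meaning unbounded; the real bounds are row values and their exact inclusivity is not stated here. `find_overlap` is a hypothetical name.

```rust
/// Child range as `(min, max)`: inclusive lower bound, exclusive upper bound,
/// `None` meaning unbounded on that side (assumption for this sketch).
pub type Bounds = (Option<u64>, Option<u64>);

/// Returns the indexes of the first overlapping pair, or `None` if the
/// children are pairwise disjoint. Gaps between children are accepted.
pub fn find_overlap(children: &[Bounds]) -> Option<(usize, usize)> {
    let mut order: Vec<usize> = (0..children.len()).collect();
    // `Option` orders `None` before `Some`, so unbounded lower bounds go first.
    order.sort_by_key(|&i| children[i].0);
    for pair in order.windows(2) {
        let (a, b) = (pair[0], pair[1]);
        let overlaps = match (children[a].1, children[b].0) {
            // `a` has no upper bound, so it reaches into `b`.
            (None, _) => true,
            // `b` has no lower bound; `a` sorts before it, so neither does `a`.
            (Some(_), None) => true,
            // Half-open ranges overlap only if `b` starts before `a` ends.
            (Some(a_max), Some(b_min)) => b_min < a_max,
        };
        if overlaps {
            return Some((a, b));
        }
    }
    None
}
```

Checking adjacent pairs after sorting by lower bound is sufficient: if no neighbour overlaps, no other pair can.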
Claude finished @waralexrom's task in 1m 5s. Code Review: follow-up on
Summary
Speeds up repartition of an inactive parent's persisted chunks — the dominant cost in large CSV imports. Adds three opt-in levers behind a single strategy selector; default behavior is unchanged.
Changes
- Prefetch (`CUBESTORE_REPARTITION_PREFETCH_BUDGET`): a sequential producer downloads upcoming chunks (anchor last) while the current one is processed, hiding download latency behind processing. A byte-budget semaphore bounds fetched-but-unprocessed data on disk; idempotent/deduped downloads make it race-safe.
- Strategy selector (`CUBESTORE_REPARTITION_STRATEGY`): `per_chunk` (default), `per_partition`, `range`. An unknown value logs a warning and falls back to `per_chunk`.
- `per_partition`: one job streams the partition's chunks through a k-way merge in groups and splits the sorted stream into the active children at the wal-split limit, producing full-size chunks in one pass (reuses the compaction streaming-writer machinery; no materialization).
- `range`: schedule-time slicing into `RepartitionRange` jobs, each merging an inclusive `[start, end]` chunk-id range as one atomic swap on the worker chosen by the hash of its bounds, which restores cross-worker parallelism. Slicing walks all chunks (active + inactive) so ranges stay id-pinned; the range end is job data, not the dedup key, so a late "tail" chunk dedups on its start instead of spawning duplicate jobs. A GC gate keeps an inactive parent's chunks until it fully drains so slicing stays stable.
- `CUBESTORE_REPARTITION_MERGE_MAX_INPUT_FILES` (default 50) and `CUBESTORE_REPARTITION_MERGE_MAX_ROWS` (default 4000000) bound merge fan-in / group size.

Testing
- `repartition_multi_node_consistency` (in-process / cluster / multi-process) verified green with `per_partition` and `range` enabled.
- `cargo check` clean on `cubestore` and `cubestore-sql-tests`; `cargo fmt` applied.

Rollout
`JobType::RepartitionRange` deserializes as an unknown variant on binaries that predate it, so `range` is only safe alongside the skip-unknown-jobs handling. All strategies default off (`per_chunk`); enable per-deployment.