Skip to content

perf(cubestore): reduce metastore RPC fan-out during partitioning#11095

Open
waralexrom wants to merge 8 commits into
cubestore-chunk-repartition-speed-upfrom
cubestore-network-investigation
Open

perf(cubestore): reduce metastore RPC fan-out during partitioning#11095
waralexrom wants to merge 8 commits into
cubestore-chunk-repartition-speed-upfrom
cubestore-network-investigation

Conversation

@waralexrom

Copy link
Copy Markdown
Member

Summary

Cuts the worker→main metastore RPC fan-out during import/repartition/compaction jobs. Each metastore method is a one-shot TCP round-trip (and, for writes, a RocksDB write-lock acquisition); under wide imports these pile up into a connection storm against the main node — the suspected mechanism behind the recent import Connection reset by peer incident. This PR removes redundant reads, dedups per-node checks, and batches the per-index / per-child metastore calls behind a feature flag.

Stacked on #11088 (cubestore-chunk-repartition-speed-up) — review/merge that first; the base of this PR is that branch.

Changes

  • Load the table once per partitioning job instead of re-fetching get_table_by_id per chunk (and reuse the table already returned by get_partition_for_compaction).
  • Dedup the disk-space check across a job: get_used_disk_space_out_of_queue is resolved once per distinct target node instead of once per partition written.
  • Configurable disk-space single-flight lock wait via CUBESTORE_DISK_SPACE_LOCK_WAIT_MS (default 1000ms, sub-second allowed) — bounds how long a worker's RPC connection stays open waiting on the scan lock; was a hardcoded 10s.
  • Batched metastore RPCs behind CUBESTORE_METASTORE_BATCH_RPC (default off):
    • get_active_partitions_for_indexes — active partitions for all of a job's indexes in one read (I→1).
    • create_partitions — child partitions created in one write transaction in compaction split and multi-split (N→1, all-or-nothing).
    • create_chunks — range-repartition child chunks created via the existing insert_chunks in one write (the wide-import repartition cascade).

With the flag off, every path is byte-equivalent to the previous behavior.

Testing

  • cargo test -p cubestore --lib: green with the flag off, and 282/282 green with the flag forced on (temporary Config::test flip, reverted).
  • The full-suite-with-flag run caught a real bug: the batched active-partition fetch first returned a HashMap<u64, _>, which the flexbuffers metastore RPC can't serialize (non-string map keys); fixed by returning a positional Vec<Vec<…>>. Node-local unit tests missed it; the sql tests (real router+worker) caught it.
  • Per-batch correctness covered by new tests (metastore-level equivalence vs the per-item methods + flag-on paths through partition_data, compaction split, and range repartition).

🤖 Generated with Claude Code

@waralexrom waralexrom requested a review from a team as a code owner June 16, 2026 16:37
@github-actions github-actions Bot added cube store Issues relating to Cube Store rust Pull requests that update Rust code labels Jun 16, 2026
@claude

claude Bot commented Jun 16, 2026

Copy link
Copy Markdown
Contributor

Claude finished @waralexrom's task in 3m 45s —— View job


Code Review — perf(cubestore): reduce metastore RPC fan-out during partitioning

Verdict: LGTM with minor non-blocking notes. 0 high · 0 medium · 5 low (all posted inline)

Details — checklist, findings, and rationale
  • Fetch PR diff vs origin/cubestore-chunk-repartition-speed-up
  • Check existing review threads (none from prior runs)
  • Review config/mod.rs (flag + lock-wait knob)
  • Review metastore/mod.rs (create_partitions, get_active_partitions_for_indexes)
  • Review store/compaction.rs (table reuse, batched child creation)
  • Review store/mod.rs (table threading, disk-space dedup, batched insert_chunks)
  • Post inline notes

Overall

Clean, well-scoped change. The flag-gated approach (CUBESTORE_METASTORE_BATCH_RPC, default off) makes this safe to land and roll out incrementally — with the flag off, every path is functionally identical to the prior code (verified by reading the diff). Test coverage hits both branches: per-item paths in the existing tests, batched paths in two new tests (partition_data_with_batch_rpc, repartition_chunk_range_merges_only_range with the flag forced on) plus metastore-level equivalence tests. The PR description's note about the flexbuffers/HashMap<u64,_> bug found by the SQL test suite is a good signal that the test plan worked.

The four real wins:

  1. Reusing table from get_partition_for_compaction (compaction.rs:704) — removes a guaranteed-redundant get_table_by_id per compaction job.
  2. Loading table once per partition_data call (store/mod.rs:2972) and threading it through partition_rows_for_index / post_process_columns / add_chunk_columns — removes O(indexes × partitions) get_table_by_id RPCs.
  3. Disk-space check dedup (store/mod.rs:2727+) — converts O(partitions) get_used_disk_space_out_of_queue RPCs to O(distinct nodes), which on a single-router cluster is 1.
  4. Batched writes behind the flag — N→1 round-trips and one RocksDB write-lock acquisition for the per-index/per-child fan-outs.

Inline notes (all low / non-blocking)

  1. store/mod.rs L2751–2754node_name_by_partition and HashSet insert run for every partition even when max_disk_space_per_worker == 0; consider hoisting an early-out. Also check_node_disk_space recomputes the node name internally.
  2. store/mod.rs L1187–1217 — Batched insert_chunks widens the orphan-row blast radius on upload failure (N rows vs k). Worth confirming the uploaded-false sweep treats batch orphans the same as the existing single orphans.
  3. store/mod.rs L3005–3008std::mem::take(&mut all[i]) is correct only because indexes is a unique, ordered set. A debug_assert! matching the fetched ids against the input ids would catch regressions cheaply.
  4. config/mod.rs L1853–1858 — Verify the arg order on env_parse_duration("CUBESTORE_DISK_SPACE_LOCK_WAIT_MS", 1000, Some(60_000), None) actually permits sub-second values as the PR description claims (the parameter naming is ambiguous from the call site).
  5. metastore/mod.rs L2774+ / L3631+ — Add #[tracing::instrument] on create_partitions and get_active_partitions_for_indexes to keep tracing parity with their per-item counterparts during incident triage.

Things I checked and didn't flag

  • create_partitions vs create_partition semantics: both call PartitionRocksTable::new(db_ref).insert(...) inside write_operation. The only difference is the op label and that all inserts share one batch_pipe. No swap-tracker / lifecycle hooks are bypassed.
  • get_active_partitions_for_indexes on an unknown index id returns Vec::new(), matching get_active_partitions_by_index_id (covered by the new test).
  • writer_props(&IdRow<Table>) already took a reference, so the table-by-ref threading is zero-friction.
  • The if !in_memory && checked_nodes.insert(...) short-circuit is correct — insert is not called on the in-memory path.
  • Config::test flip used for the full-suite-with-flag-on run is reverted (config defaults to false).
· [Branch](https://github.com/cube-js/cube/tree/cubestore-network-investigation)

Comment thread rust/cubestore/cubestore/src/store/mod.rs
Comment on lines +1187 to +1217
if self.config.metastore_batch_rpc() {
// Create all child chunks in one metastore write, then upload their files. The
// chunks are inactive until the swap below, so creating them before the uploads
// matches the per-item path's visibility.
let mut specs = Vec::new();
let mut spec_files = Vec::new();
for w in written {
if w.num_rows == 0 {
let _ = tokio::fs::remove_file(&w.file).await;
continue;
}
let child = &children[w.child_index];
specs.push(Chunk::new(
child.get_id(),
w.num_rows,
Some(Row::new(w.min)),
Some(Row::new(w.max)),
false,
)
.await?;
let remote = ChunkStore::chunk_file_name(chunk.clone());
let file_size = self.remote_fs.upload_file(w.file, remote).await?;
new_chunk_ids.push((chunk.get_id(), Some(file_size)));
));
spec_files.push(w.file);
}
if !specs.is_empty() {
let chunks = self.meta_store.insert_chunks(specs).await?;
for (file, chunk) in spec_files.into_iter().zip(chunks) {
let remote = ChunkStore::chunk_file_name(chunk.clone());
let file_size = self.remote_fs.upload_file(file, remote).await?;
new_chunk_ids.push((chunk.get_id(), Some(file_size)));
}
}
} else {
for w in written {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failure semantics differ subtly between the two branches: on the per-item path, if upload of chunk k fails, chunks 0..k exist as metastore rows with uploaded=false and k+1.. have no rows at all. On the batched path, all N chunk rows are created upfront, so a mid-stream upload failure leaves the full N rows as uploaded=false orphans (vs. k orphans before).

These should be cleaned up by the same uploaded-false sweep that handled the smaller orphan set previously, so this is not a correctness bug — but it does widen the failure blast radius for one job. Worth confirming there is no codepath that interprets "an uploaded=false chunk row exists" as "data was at least partially written" before merging.

Comment thread rust/cubestore/cubestore/src/store/mod.rs
Comment thread rust/cubestore/cubestore/src/config/mod.rs
Comment on lines +2774 to +2787
async fn create_partitions(
&self,
partitions: Vec<Partition>,
) -> Result<Vec<IdRow<Partition>>, CubeError> {
self.write_operation("create_partitions", move |db_ref, batch_pipe| {
let table = PartitionRocksTable::new(db_ref.clone());
let mut result = Vec::with_capacity(partitions.len());
for partition in partitions {
result.push(table.insert(partition, batch_pipe)?);
}
Ok(result)
})
.await
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding #[tracing::instrument(level = "trace", skip(self, partitions))] to match create_partition above — keeps tracing parity between the two paths when toggling the flag during incident triage. Same suggestion applies to get_active_partitions_for_indexes below.

@claude

claude Bot commented Jun 16, 2026

Copy link
Copy Markdown
Contributor

Claude finished @waralexrom's task in 1m 16s —— View job


Re-review after sync

Verdict: LGTM — no new findings on the 2 commits since the prior review.

Details — what changed, what got resolved, what's still open
  • Pulled the two new commits (b34cf83, f68cc47)
  • Verified prior review notes against the new state
  • Resolved threads that were addressed
  • Confirmed remaining open threads still apply

What changed since the prior review

  • f68cc47 — fixes the HashMap<u64, _> flexbuffers serialization bug already called out in the PR description. get_active_partitions_for_indexes now returns Vec<Vec<IdRow<Partition>>> aligned positionally with the requested index ids. Length is validated; on mismatch returns an internal error rather than silently degrading.
  • b34cf83 — addresses two prior review notes:
    • hoists disk_check_enabled = !in_memory && max_disk_space_per_worker() != 0 so the per-partition node-name build + HashSet insert are skipped entirely when the limit is disabled (the common case);
    • adds a debug_assert! that each batched-fetch entry's partitions belong to the index at the same position — closes the positional-pairing footgun.

Threads resolved

  • store/mod.rs:2756 — disk-space dedup hoisting when limit disabled
  • store/mod.rs:3021debug_assert! for positional pairing
  • config/mod.rs:1858 — confirmed env_parse_duration(name, default, max, min) signature (verified at config/mod.rs:1274); the call (..., 1000, Some(60_000), None) is default=1000ms, max=60s, no min → sub-second IS allowed as the PR description states. No code change needed.

Threads left open (still apply, not addressed in this push)

  • store/mod.rs:1217 — batched insert_chunks widens the uploaded-false orphan blast radius on mid-stream upload failure. Not a correctness bug if the sweep treats batch orphans identically; flagged for confirmation before merging.
  • metastore/mod.rs:2787#[tracing::instrument] parity on create_partitions / get_active_partitions_for_indexes for incident triage.

Neither blocks the merge.

· [Branch](https://github.com/cube-js/cube/tree/cubestore-network-investigation)

@codecov

codecov Bot commented Jun 16, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 58.50%. Comparing base (c1dafa6) to head (c47cdb4).

Additional details and impacted files
@@                          Coverage Diff                          @@
##           cubestore-chunk-repartition-speed-up   #11095   +/-   ##
=====================================================================
  Coverage                                 58.50%   58.50%           
=====================================================================
  Files                                       216      216           
  Lines                                     17270    17270           
  Branches                                   3524     3524           
=====================================================================
  Hits                                      10103    10103           
  Misses                                     6652     6652           
  Partials                                    515      515           
Flag Coverage Δ
cube-backend 58.50% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

The disk-space scan single-flight on main waited up to a hardcoded 10s for
the compute lock. While a worker waits, its metastore RPC connection stays
open, growing the simultaneous-open-connection footprint and the window for
a reset. Cap it via CUBESTORE_DISK_SPACE_LOCK_WAIT_MS (default 1000ms, in
milliseconds so sub-second values work) and fall back to the existing
fail-open path on timeout.
add_chunk_columns and post_process_columns each re-fetched the table over the
metastore RPC for every chunk they produced, even though the table is the same
for the whole import/repartition job and immutable for its duration. On a wide
table this is one redundant read RPC per chunk, each a separate worker->main
connection feeding the connect-per-RPC storm.

Thread the table down instead: build_index_chunks loads it once before the
index loop, partition_rows loads it once for the single-index path, and
add_persistent_chunk loads it once. compact() now reuses the table already
returned by get_partition_for_compaction instead of re-fetching it.
…oning job

partition_rows_for_index ran check_node_disk_space for every partition it
wrote to, each a separate get_used_disk_space_out_of_queue metastore RPC. The
check resolves to a per-node total, so checking each distinct target node once
is enough and the answer is identical. Track checked nodes in a set and skip
repeats, cutting the per-partition RPC fan-out to one per node.
… flag

During a partitioning job build_index_chunks fetched active partitions with
one get_active_partitions_by_index_id RPC per index. Add a batched
get_active_partitions_for_indexes(Vec<u64>) that returns them for all indexes
in a single metastore round-trip, gated behind CUBESTORE_METASTORE_BATCH_RPC
(default off). When the flag is off the per-index path is unchanged.

partition_rows_for_index takes the active partitions as an optional preset; a
missing map entry in batch mode is an internal error rather than a silent
empty set (which would trip the corrupt-data path). Tests cover the metastore
method against the per-index method and the flag-on path through
build_index_chunks with multiple indexes.
…flag

compact() and the multi-partition split both created child partitions one
create_partition RPC at a time inside a loop. Add create_partitions(Vec) that
inserts them all in one metastore write transaction, gated behind
CUBESTORE_METASTORE_BATCH_RPC. This cuts N write RPCs / N write-lock
acquisitions to one and makes the creation all-or-nothing, removing the
orphan-partition window if a mid-loop insert failed.

When the flag is off the per-item loop is unchanged. MultiSplit carries the
flag from the compaction config. Tested at the metastore level and through the
compaction split path with the flag on.
…nd the flag

merge_chunk_group_into_children created one child chunk per written file with a
create_chunk RPC inside a loop. When CUBESTORE_METASTORE_BATCH_RPC is on, build
all child chunk specs and create them in one insert_chunks write, then upload
their files; the chunks are inactive until the swap, so create-before-upload
matches the per-item visibility and any chunk left behind by a failed upload is
swept by the existing remove_inactive_not_uploaded_chunks reaper. The per-item
path is unchanged when the flag is off.

This is the repartition cascade that wide imports trigger, so it cuts a large
share of the create_chunk write RPCs. Reuses the existing insert_chunks
metastore method. Covered by repartition_chunk_range_merges_only_range run with
the flag on.
get_active_partitions_for_indexes returned HashMap<u64, _>, but the metastore
RPC serializes with flexbuffers, which rejects non-string map keys
(KeyMustBeString) — it failed only over a real worker->main RPC, surfacing as
"Connection closed unexpectedly" during imports with the flag on, while the
node-local unit tests passed.

Return a positional Vec<Vec<IdRow<Partition>>> aligned with the requested index
ids instead. build_index_chunks zips by position and validates the length;
a length mismatch is an internal error rather than a silent empty set. Full
cubestore lib suite passes with the flag forced on.
Address review: hoist a disk_check_enabled flag so the per-partition node-name
build + dedup set are skipped entirely when max_disk_space_per_worker is 0 (the
common case), and guard the batched active-partition positional pairing with a
debug_assert that each entry belongs to the index at the same position.
@waralexrom waralexrom force-pushed the cubestore-network-investigation branch from b34cf83 to c47cdb4 Compare June 16, 2026 18:05
@claude

claude Bot commented Jun 16, 2026

Copy link
Copy Markdown
Contributor

Claude finished @waralexrom's task in 43s —— View job


Re-review after force-push

Verdict: LGTM — no new findings. Branch was force-pushed (commits b34cf83/f68cc47c47cdb4/f0f0df3) but the substantive changes match what the previous re-review already covered.

Details — what changed, what's still open
  • Pulled the new tip
  • Diffed the commit set against the prior reviewed set
  • Cross-checked open review threads against the current code

What changed since the last re-review

The branch was rebased — b34cf83 (disk-space-disabled hoist + positional debug_assert!) and f68cc47 (flexbuffers-safe Vec<Vec<…>> return) are now c47cdb4 and f0f0df3 respectively. Commit subjects and diffs match; nothing materially new to review.

Threads still open (re-affirmed)

  • store/mod.rs:1220 — batched insert_chunks widens the uploaded-false orphan blast radius on mid-stream upload failure. Needs confirmation that the sweep treats batch orphans identically; not a correctness bug otherwise.
  • metastore/mod.rs:2787#[tracing::instrument] parity on create_partitions / get_active_partitions_for_indexes for incident triage.

Neither blocks merge.

Threads resolved (still resolved)

  • store/mod.rs:2759 — disk-space dedup hoisting when limit disabled
  • store/mod.rs:3024debug_assert! for positional pairing
  • config/mod.rs:1872env_parse_duration signature confirmed (default, max, min)

· · Branch

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

cube store Issues relating to Cube Store rust Pull requests that update Rust code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants