Skip to content

perf(shard): dispatch-path telemetry for shard routing observability#84

Merged
TinDang97 merged 13 commits into
mainfrom
perf/dispatch-telemetry
Apr 22, 2026
Merged

perf(shard): dispatch-path telemetry for shard routing observability#84
TinDang97 merged 13 commits into
mainfrom
perf/dispatch-telemetry

Conversation

@TinDang97
Copy link
Copy Markdown
Collaborator

@TinDang97 TinDang97 commented Apr 22, 2026

Summary

Foundation for dispatch-layer optimization: adds a moon_dispatch_path_total Prometheus counter with four labels that classifies every command by which routing path it took through the sharded connection handlers. Before reshaping ShardMessage or fusing the waker relay (next steps in the Moon-vs-Dragonfly architecture work), we need ground-truth data on what fraction of production traffic actually hits the SPSC slow path.

Scope of this PR: the final two commits (1518a1e, 360915e) are the telemetry work. The 9 commits preceding them are the unpushed Phase 176 handler-module refactor stack, included because the telemetry commits depend structurally on the split directory modules (handler_sharded/, handler_monoio/) those refactors created. They cannot be cleanly separated.

Four new counter labels

path= Meaning Hot?
local_inline SIMD fast path in blocking.rs::try_inline_dispatch — the hottest local branch 🔥 hottest
local Standard local branch (if is_local { ... }) for writes and non-inlined reads 🔥
cross_read_fast Cross-shard READ via RwLock shared-read — no SPSC message
cross_spsc Cross-shard WRITE, deferred into remote_groups and shipped as PipelineBatchSlotted 🐌 the slow path

Ratio cross_spsc / Σ is the key signal for dispatch-layer decisions.

Overhead

  • Each helper is #[inline] with an early-return on !METRICS_INITIALIZED.
  • &'static str labels → the metrics crate skips per-call allocation.
  • local_inline is incremented once per dispatch loop with the batch count (not per command).

Verified on macOS (arm64, monoio, release build, shards=4, p=1)

Experiment Result
10K random-key SETs, shards=4 local 2612 (26%) + cross_spsc 7389 (74%) — matches theoretical 25/75 split exactly
10K random-key GETs, shards=4 local_inline 2534 (25%) + cross_read_fast 7466 (75%) — zero SPSC for reads ✅
10K SETs+10K GETs, shards=1 local_inline 10000 + local 10001 (SETs+PING); zero cross-shard counters ✅

All counters sum exactly to driven traffic — no overcount, no undercount.

Finding during verification

The first iteration (1518a1e) had only 3 labels and missed ~25% of local GET traffic because try_inline_dispatch_loop is a SIMD fast path that intentionally skips standard metrics recording (see comment at blocking.rs:1115). The follow-up commit 360915e adds local_inline to close that gap — documented in the fully-accounting EXP B/C verification above.

Test plan

  • cargo check (default + runtime-tokio,jemalloc) — clean
  • cargo clippy -- -D warnings (both feature sets) — no warnings
  • cargo test admin::metrics_setup::tests::dispatch_path_counters_no_op_before_init — passes
  • cargo test shard::dispatch — 14 passed, no regressions
  • cargo fmt --check on all touched files
  • macOS arm64 benchmark — counter math closes exactly on every experiment
  • Linux CI — will verify both runtime feature sets
  • Deploy to a representative env + scrape moon_dispatch_path_total for 24h before deciding whether to land Step 1 (HotShardMessage 512→64B split)

Next steps gated on data from this PR

With the cross_spsc rate known, decide on:

  1. Step 1 — split ShardMessage into hot (≤64B) and cold variants if cross_spsc / Σ > 30% on real workloads
  2. Step 3 — fuse pending_wakers relay (loom-verified)
  3. Step 5 — prefetch on SPSC drain

Summary by CodeRabbit

Release Notes

  • New Features

    • Added alternative monoio-based connection handler for improved I/O performance options.
    • Implemented full-text search (FT.*) command support for vector and text search operations.
    • Added support for transaction (TXN.) and temporal (TEMPORAL.) commands with cross-store transaction management.
    • Added workspace (WS.) and message queue (MQ.) command support for data isolation and durable messaging.
  • Improvements

    • Enhanced WAL v3 durability management by separating buffered writes from fsync operations for flexible persistence control.
    • Improved pub/sub command handling with enhanced subscriber mode and pattern subscription support.

Split handler_sharded mod.rs (3040 LOC) into handler_sharded/
directory with 7 subfiles, each under 1500 LOC:

  dispatch.rs  464 LOC — CLIENT subcommands, CONFIG, SLOWLOG,
                          REPLICAOF/REPLCONF, INFO, READONLY,
                          BGSAVE/SAVE/LASTSAVE/BGREWRITEAOF,
                          cross-shard KEYS/SCAN/DBSIZE
  ft.rs        705 LOC — FT.* vector/text search commands
  mod.rs      1409 LOC — connection loop skeleton, HandlerResult,
                          AUTH gate, routing, cleanup
  pubsub.rs    529 LOC — subscriber loop, SUBSCRIBE/PSUBSCRIBE,
                          PUBLISH, PUBSUB introspection
  read.rs        1 LOC — placeholder (routing stays in mod.rs)
  txn.rs       258 LOC — TXN.BEGIN/COMMIT/ABORT, TEMPORAL
  write.rs     717 LOC — WS.*, MQ.*, MULTI/EXEC/DISCARD, GRAPH.*

Pure refactor — zero functional change, identical test results on
both runtime-tokio and runtime-monoio feature sets.

Addresses: HYG-07
author: Tin Dang
…txn.rs

Convert handler_monoio.rs (4206 LOC) into handler_monoio/ directory
module with 7 subfiles. Extract TXN.BEGIN/COMMIT/ABORT and
TEMPORAL.SNAPSHOT_AT/INVALIDATE into txn.rs (258 LOC) using the
same pub(super) bool-return pattern from handler_sharded (Plan 01).

- handler_monoio.rs -> handler_monoio/mod.rs (directory module)
- txn.rs extracted (258 LOC, identical logic to handler_sharded/txn.rs)
- Placeholder files: dispatch.rs, ft.rs, pubsub.rs, read.rs, write.rs
- Resolved mod pubsub / crate::pubsub name collision (fully-qualified paths)
- Removed unused temporal/transaction imports from mod.rs
- MonoioHandlerResult enum stays in mod.rs (external path preserved)
- pending_wakers relay stays in mod.rs per D-10

Addresses: HYG-07
author: Tin Dang
Extract FT.* vector/text search commands (853 LOC) into ft.rs and
pubsub command handlers (PUBLISH, UNSUBSCRIBE, PUBSUB introspection)
into pubsub.rs (170 LOC). Same pub(super) bool-return pattern as
handler_sharded (Plan 01).

- ft.rs: 880 LOC — all FT.* commands (multi-shard scatter-gather +
  single-shard fast paths), frame.clone() for Arc broadcast
- pubsub.rs: 170 LOC — PUBLISH, UNSUBSCRIBE/PUNSUBSCRIBE (no-op),
  PUBSUB CHANNELS/NUMSUB/NUMPAT introspection
- Subscriber-mode select loop + SUBSCRIBE/PSUBSCRIBE entry stay in
  mod.rs (monoio ownership I/O tightly coupled to loop state)
- Removed unused resolve_ft_search_as_of_lsn import from mod.rs
- mod.rs: 3038 LOC (further extraction in Task 3)

Addresses: HYG-07
author: Tin Dang
Extract dispatch.rs (1061 LOC) and expand pubsub.rs (170→294 LOC)
from handler_monoio/mod.rs (2319→1487 LOC). Fix unreachable code
in write.rs. All 7 files now under 1500 LOC.

Phase 176 complete — both handler_sharded (3436 LOC) and
handler_monoio (4206 LOC) split into directory modules.
Zero functional change, 3083 tests pass on both feature sets.

Addresses: HYG-07
author: Tin Dang
WAL v3 flush_if_needed() was calling flush_sync() which includes
fdatasync on every 4KB buffer fill. On macOS F_FULLFSYNC costs ~2ms,
making every batch of ~16 SET commands block on disk I/O.

Split flush_sync into flush_write (write to page cache) + sync_data
(fdatasync). flush_if_needed now calls flush_write only, matching
WAL v2's pattern. The 1s timer still calls flush_sync for durability.

Result: SET p=16 with persistence ON improved from 194K → 449K ops/s
(2.3x speedup). SET p=64 improved from 775K → 1.36M ops/s (1.75x).

author: Tin Dang
When disk-offload is enabled (default), every write was appended to
BOTH WAL v2 and WAL v3 — two write() syscalls per SPSC drain batch.
WAL v3 supersedes v2 (per-record LSN + CRC32C), so v2 is now skipped
entirely when v3 is active. Recovery already prefers v3 and falls back
to v2 only when v3 has 0 commands.

Result (macOS, 4 shards, persistence ON vs Redis AOF everysec):
  SET p=1:  25K → 141K (5.6x faster, 0.81x Redis)
  SET p=16: 434K → 1.62M (3.7x faster, 2.10x Redis)
  SET p=64: 1.36M → 2.36M (1.7x faster, 2.17x Redis)

author: Tin Dang
…ility

Adds a three-way Prometheus counter, `moon_dispatch_path_total{path=...}`,
that records how each command is routed through the sharded connection
handlers:

  - `local`            executed on the connection's own shard
  - `cross_read_fast`  cross-shard read via RwLock shared-read fast path
                       (no SPSC message, no cross-thread scheduling hop)
  - `cross_spsc`       deferred into `remote_groups` and shipped as a
                       `PipelineBatchSlotted` message (the slow path
                       flagged by the CLAUDE.md `--shards 1` gotcha)

Both handler variants are instrumented at the existing routing decision
points — `if is_local`, the cross-shard read fast path, and the
`remote_groups.entry(target).or_default().push(...)` site:

  - src/server/conn/handler_sharded/mod.rs  (3 sites)
  - src/server/conn/handler_monoio/mod.rs   (3 sites)

Why this first, not the payload shrink:

  Before reshaping `ShardMessage` (512-byte capped enum) or fusing the
  `pending_wakers` relay, we need ground truth on what fraction of
  production traffic actually hits the SPSC slow path. Under random
  keys + 4 shards + write-heavy workload the upper bound is ~75%, but
  read-heavy workloads already dodge SPSC entirely via the existing
  cross-shard read fast path. Without this counter, any dispatch-layer
  rewrite is speculation.

Overhead is near-zero on the hot path: each helper is `#[inline]`,
early-returns on `!METRICS_INITIALIZED`, and uses `&'static str`
labels so the `metrics` crate does not re-allocate per call.

Hygiene: includes a 3-line rustfmt fix to `src/shard/event_loop.rs`
that was introduced by fed2ba7 and failing `cargo fmt --check` in CI.

Validation
  - cargo check (default + runtime-tokio,jemalloc)         clean
  - cargo clippy -- -D warnings (both feature sets)        clean
  - cargo test admin::metrics_setup::tests::dispatch_path  1 passed
  - cargo test shard::dispatch                             14 passed
  - cargo fmt --check on all touched files                 clean

Next step gated on data: collect a day of ratio metrics from a real
workload, then decide whether the `HotShardMessage` split (Step 1 of
the Dragonfly-comparison plan) is worth the blast radius.

author: Tin Dang
Closes a blind spot discovered during macOS verification of 1518a1e:
the `try_inline_dispatch_loop` SIMD fast path in
`src/server/conn/blocking.rs` processes GET (and gated SET) commands
before they reach the standard frame-by-frame routing branch at
`handler_monoio/mod.rs:831`, so the three counters added in the prior
commit never saw ~25% of local GET traffic on a 4-shard setup.

Adds a fourth `path` label — `local_inline` — incremented once per
dispatch loop with the batch count returned by `try_inline_dispatch_loop`,
keeping the per-command hot path untouched.

Validation on macOS (arm64, monoio, shards=4, pipeline=1):

  EXP B  10K random-key GETs
    cross_read_fast  7466   (74.66%)
    local_inline     2534   (25.34%)
    ———————————————————————————
    Σ               10000   (full accounting — was 7467/10000 before)

  EXP C  shards=1 sanity (10K SET + 10K GET)
    local_inline    10000   (all GETs take inline path)
    local           10001   (all SETs + initial PING)
    cross_read_fast     0
    cross_spsc          0

Test `dispatch_path_counters_no_op_before_init` is extended to exercise
the new helper, including the count==0 short-circuit.

author: Tin Dang
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 22, 2026

Warning

Rate limit exceeded

@TinDang97 has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 29 minutes and 7 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 29 minutes and 7 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d0c9b2b6-30c6-4d7b-8347-cc2221a69803

📥 Commits

Reviewing files that changed from the base of the PR and between 360915e and 972643a.

📒 Files selected for processing (4)
  • CHANGELOG.md
  • src/command/vector_search/ft_text_search.rs
  • src/server/conn/handler_monoio/ft.rs
  • tests/pipeline_auto_index.rs
📝 Walkthrough

Walkthrough

This PR introduces two comprehensive connection handler modules (handler_monoio and handler_sharded) implementing full RESP command dispatch pipelines with auth, clustering, pub/sub, transactions, and persistence integration. It adds metrics helpers for dispatch tracking and refactors WAL v3 persistence with split write/sync semantics for improved durability control.

Changes

Cohort / File(s) Summary
Submodule & Configuration
.planning
Updated submodule reference commit hash.
Metrics Infrastructure
src/admin/metrics_setup.rs
Added four Prometheus counter helper functions for dispatch-path routing metrics (record_dispatch_local, record_dispatch_cross_read_fastpath, record_dispatch_cross_spsc, record_dispatch_local_inline) with initialization checks and smoke test coverage.
Graph Command Processing
src/command/graph/graph_read.rs
Simplified control flow in json_to_graph_value's Object branch by removing match arm wrapper and directly returning converted executor values.
WAL v3 Persistence Layer
src/persistence/wal_v3/segment.rs, src/shard/event_loop.rs, src/shard/spsc_handler.rs, src/shard/timers.rs
Split buffered WAL flushing into non-durable write-only and durable sync paths; added sync_data() for separate fsync enforcement; updated initialization to skip WAL v2 when disk-offload is enabled; made WAL append mutually-exclusive (v3-first); clarified v3 flush documentation.
Monoio Connection Handler Module
src/server/conn/handler_monoio/mod.rs, src/server/conn/handler_monoio/dispatch.rs, src/server/conn/handler_monoio/pubsub.rs, src/server/conn/handler_monoio/ft.rs, src/server/conn/handler_monoio/txn.rs, src/server/conn/handler_monoio/write.rs, src/server/conn/handler_monoio/read.rs
Implemented complete monoio async runtime connection handler with RESP frame parsing, auth gating, clustered routing, pub/sub (subscribe/publish/introspection), full-text search dispatch, transactions (TXN./TEMPORAL.), workspace (WS.) and message queue (MQ.) commands, blocking operations, multi/EXEC/DISCARD, and cross-shard coordination with response aggregation.
Sharded Connection Handler Module
src/server/conn/handler_sharded/mod.rs, src/server/conn/handler_sharded/dispatch.rs, src/server/conn/handler_sharded/pubsub.rs, src/server/conn/handler_sharded/ft.rs, src/server/conn/handler_sharded/txn.rs, src/server/conn/handler_sharded/write.rs, src/server/conn/handler_sharded/read.rs
Implemented Tokio-based sharded connection handler with subscriber mode loop, command dispatch pipeline, connection affinity/migration logic, client registry integration, pub/sub coordination, cross-shard aggregation for SCAN/KEYS/DBSIZE, and transaction/temporal/workspace/MQ/GRAPH command support.
Connection Infrastructure
src/server/conn/mod.rs
Added lint attribute to util module re-export.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Handler as Handler<br/>(monoio/sharded)
    participant AuthGate as Auth Gate
    participant CommandDispatcher as Command<br/>Dispatcher
    participant Coordinator as Cross-Shard<br/>Coordinator
    participant Shard as Local Shard

    Client->>Handler: Connect + RESP Stream
    Handler->>Handler: Register in client registry
    Handler->>Handler: Initialize per-connection state
    
    loop Read/Parse/Dispatch Loop
        Client->>Handler: Send RESP Frame
        Handler->>Handler: Parse frame to command
        Handler->>AuthGate: Check AUTH/HELLO/QUIT
        AuthGate-->>Handler: AuthGateResult (Consumed|Authenticated|NotAuth)
        
        alt Authentication required
            AuthGate->>Handler: Reject command
        else Command consumed
            Handler-->>Client: Send response
        else Proceed to dispatch
            Handler->>CommandDispatcher: Route command
            CommandDispatcher->>CommandDispatcher: Check: Local vs Remote?
            
            alt Local execution (KEYS, CONFIG, INFO, etc.)
                CommandDispatcher->>Shard: Execute with DB guard
                Shard-->>CommandDispatcher: Result
            else Cross-shard command (SCAN, DBSIZE, etc.)
                CommandDispatcher->>Coordinator: Scatter/Broadcast to shards
                Coordinator->>Shard: Dispatch to all shards
                Shard-->>Coordinator: Collect results
                Coordinator-->>CommandDispatcher: Aggregated response
            else Remote shard routing (by key)
                CommandDispatcher->>Coordinator: Send to target shard via SPSC
                Coordinator-->>CommandDispatcher: Deferred response slot
            end
            
            CommandDispatcher->>Handler: Push response frame(s)
        end
    end
    
    Handler->>Handler: RESP-encode accumulated responses
    Handler->>Client: Write batch of frames
    Handler->>Handler: Update client registry state
    
    Client->>Handler: Close connection
    Handler->>Handler: Cleanup: abort TXN, pubsub unsubscribe, affinity removal
    Handler->>Handler: Deregister from client registry
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Suggested labels

enhancement

Suggested reviewers

  • pilotspacex-byte

Poem

🐰 Two handlers unite, monoio and sharded in flight,
Dispatching commands through the night, with pubsub broadcast so bright!
Transactions bloom, workspaces zoom, WAL splits sync from write—
The rabbit hops through routing tables, making all queries alight! ✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title 'perf(shard): dispatch-path telemetry for shard routing observability' clearly and specifically summarizes the main change: adding dispatch-path telemetry for observability.
Description check ✅ Passed The description covers the summary, detailed implementation notes, performance impact, verification results, test plan, and next steps; all required sections are present and well-documented.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch perf/dispatch-telemetry

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 20

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

🟡 Minor comments (5)
src/persistence/wal_v3/segment.rs-134-186 (1)

134-186: ⚠️ Potential issue | 🟡 Minor

Minor: unused sync_data and stale flush_if_needed doc reference.

The write/sync split is clean and the durability contract is preserved. Two small nits:

  1. pub fn sync_data has no callers across the codebase — persistence_tick, event_loop, and the 1s timer (sync_wal_v3) all invoke flush_sync. If it's intended for a future caller, that's fine; otherwise consider removing dead public API.
  2. The doc on flush_if_needed (lines 169-170) says durable sync deferred to the 1s timer (sync_data), but the 1s timer calls flush_sync, not sync_data. Update the doc to reference flush_sync.
📝 Proposed doc fix
-    /// Flush if buffer exceeds a threshold — write only, no fsync.
-    ///
-    /// Matches WAL v2 pattern: frequent writes to OS page cache,
-    /// durable sync deferred to the 1s timer (`sync_data`).
+    /// Flush if buffer exceeds a threshold — write only, no fsync.
+    ///
+    /// Matches WAL v2 pattern: frequent writes to OS page cache,
+    /// durable sync deferred to the 1s timer via `flush_sync`
+    /// (which writes remaining buffered bytes and then fsyncs).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/wal_v3/segment.rs` around lines 134 - 186, Remove or justify
the unused public API `sync_data`: search for the function by name `sync_data`
and either delete it (and adjust visibility of related helpers) if there are no
intended callers, or add a TODO comment explaining its future use; ensure
callers use `flush_sync` where durability is required. Also update the
`flush_if_needed` docstring to reference `flush_sync` (not `sync_data`) and
mention that durable sync is deferred to the 1s timer which calls `flush_sync`;
refer to `flush_if_needed`, `flush_write`, `flush_sync`, and the timer callers
(`persistence_tick`, `event_loop`, `sync_wal_v3`) when making these changes.
src/admin/metrics_setup.rs-924-935 (1)

924-935: ⚠️ Potential issue | 🟡 Minor

Avoid asserting process-global metrics state in this test.

METRICS_INITIALIZED is shared across the whole test binary, so this can fail if another test initializes metrics first. Keep the smoke test order-independent.

Suggested fix
     #[test]
     fn dispatch_path_counters_no_op_before_init() {
-        // METRICS_INITIALIZED starts false; all three helpers must early-return.
-        // We just assert they do not panic. The absence of a global recorder
-        // means counter!() would otherwise be a no-op, but the guard is what
-        // we actually care about: no string allocation, no label churn.
-        assert!(!METRICS_INITIALIZED.load(Ordering::Relaxed));
+        // These helpers must be safe to call regardless of whether another
+        // test has already initialized the global recorder.
         record_dispatch_local();
         record_dispatch_cross_read_fastpath();
         record_dispatch_cross_spsc();
-        record_dispatch_local_inline(0); // count == 0 must short-circuit even when init
+        record_dispatch_local_inline(0);
         record_dispatch_local_inline(7);
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/admin/metrics_setup.rs` around lines 924 - 935, Remove the brittle
global-state assertion from the test dispatch_path_counters_no_op_before_init:
do not read or assert METRICS_INITIALIZED; instead simply call the helper
functions (record_dispatch_local, record_dispatch_cross_read_fastpath,
record_dispatch_cross_spsc, record_dispatch_local_inline) to ensure they do not
panic regardless of global recorder state. Update the test body by deleting the
line that checks METRICS_INITIALIZED.load(...) and keep the remaining calls
(including both record_dispatch_local_inline(0) and
record_dispatch_local_inline(7)) so the test remains order-independent.
src/server/conn/handler_monoio/ft.rs-153-165 (1)

153-165: ⚠️ Potential issue | 🟡 Minor

Avoid unwrap() in the FT.SEARCH dispatch path.

The is_text check implies query_bytes is present, but dispatch code should keep the invariant explicit. As per coding guidelines, src/**/*.rs: “No unwrap() or expect() in library code outside tests.”

Suggested fix
-                let query_str = query_bytes.unwrap();
+                let Some(query_str) = query_bytes else {
+                    responses.push(Frame::Error(Bytes::from_static(b"ERR invalid query")));
+                    return true;
+                };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio/ft.rs` around lines 153 - 165, The FT.SEARCH
branch currently calls query_bytes.unwrap(); replace that unwrap with an
explicit check (e.g., if let Some(query_str) = query_bytes { ... } or match
query_bytes) and if None push an appropriate Frame::Error (similar to the
index_name error) and return true to preserve the dispatch invariant; update the
assignment to query_str to use the matched value so there is no unwrap/expect in
the FT.SEARCH handling (referencing is_text, query_bytes, index_name, responses
and Frame::Error in ft.rs).
src/server/conn/handler_sharded/ft.rs-157-170 (1)

157-170: ⚠️ Potential issue | 🟡 Minor

Avoid unwrap() in the FT.SEARCH dispatch path.

The is_text invariant makes this safe today, but library dispatch paths should not use unwrap(). As per coding guidelines, src/**/*.rs: “No unwrap() or expect() in library code outside tests.”

Suggested fix
-                #[allow(clippy::unwrap_used)] // query_bytes is Some when is_text is true
-                let query_str = query_bytes.unwrap();
+                let Some(query_str) = query_bytes else {
+                    responses.push(Frame::Error(Bytes::from_static(b"ERR invalid query")));
+                    return true;
+                };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded/ft.rs` around lines 157 - 170, The code uses
query_bytes.unwrap() in the FT.SEARCH dispatch path (inside the is_text branch)
which must be removed; replace the unwrap with a safe extraction like matching
or if-let on query_bytes and return the same error response if it's None.
Specifically, update the block that defines query_str (referencing is_text,
query_bytes and the variable query_str) to handle the None case without
panicking and preserve the existing error response behavior so the function
returns true on invalid/missing query bytes.
src/server/conn/handler_monoio/write.rs-493-520 (1)

493-520: ⚠️ Potential issue | 🟡 Minor

MQ.ACK swallows real xack errors as "0 acked".

Lines 515 and 519 return Frame::Integer(0) for both Err(_) from xack and a missing stream/group. That makes genuine failures indistinguishable from benign "nothing to ack" responses, and no WAL record is emitted either — clients cannot detect, retry, or alert on a real ack failure.

Consider differentiating: return Frame::Error (or a typed error) on Err(_), and Frame::Integer(0) only for the no-op case. At a minimum, log the error branch.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio/write.rs` around lines 493 - 520, The xack
error branch currently swallows real errors by returning Frame::Integer(0);
change the error handling so that failures from stream.xack and from
db_guard.get_stream_mut return a Frame::Error with the error details (or at
least a descriptive error string) instead of Integer(0), keep Frame::Integer(0)
only for the benign Ok(None) "nothing to ack" case, and ensure WAL records
(ctx.shard_databases.wal_append) are only emitted after a successful xack; also
log the underlying error (include the error from stream.xack and from
get_stream_mut) to aid debugging and keep references to effective_key,
stream.xack, get_stream_mut, ctx.shard_databases.wal_append and msg_ids when
locating the code to change.
🧹 Nitpick comments (2)
src/server/conn/handler_monoio/write.rs (2)

53-56: Use ctx.cached_clock for created_at instead of SystemTime::now().

ctx.cached_clock.ms() is already used elsewhere in this file (e.g. line 339). Calling SystemTime::now() here bypasses the shard-cached-timestamp invariant and introduces an extra syscall per WS.CREATE. Also, unwrap_or_default() silently produces created_at = 0 if the system clock is before the Unix epoch; returning an error (or using the cached clock) would be more predictable.

As per coding guidelines: "Performance invariants: never call Instant::now() per-key (use shard-cached timestamp)." The same reasoning applies to SystemTime::now() when a monotonic shard-cached ms source exists.

♻️ Proposed change
-                let created_at = std::time::SystemTime::now()
-                    .duration_since(std::time::UNIX_EPOCH)
-                    .unwrap_or_default()
-                    .as_millis() as i64;
+                let created_at = ctx.cached_clock.ms() as i64;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio/write.rs` around lines 53 - 56, Replace the
ad-hoc SystemTime call used to compute created_at with the shard-cached
timestamp: use ctx.cached_clock.ms() instead of
std::time::SystemTime::now()...duration_since(...).unwrap_or_default(); update
the assignment to let created_at = ctx.cached_clock.ms() as i64 (or otherwise
convert to i64 consistently with surrounding code) so the code honors the
shard-cached-timestamp invariant and avoids the extra syscall and silent zero on
epoch error; locate this change around the created_at binding in the write
handler (created_at, ctx.cached_clock.ms()) and ensure types match callers.

208-213: Prefer is_some_and over map_or(false, …) for idiomatic Rust on MSRV 1.94 / edition 2024.

The pattern map_or(false, |reg| reg.get(&ws_id).is_some()) is flagged by clippy::unnecessary_map_or (enabled by default, enforced as -D warnings in CI). Use is_some_and instead, which is available in MSRV 1.94 (stabilized in 1.70).

♻️ Proposed change
-                            let found = {
-                                let guard = ctx.shard_databases.workspace_registry(ctx.shard_id);
-                                guard
-                                    .as_ref()
-                                    .map_or(false, |reg| reg.get(&ws_id).is_some())
-                            };
+                            let found = {
+                                let guard = ctx.shard_databases.workspace_registry(ctx.shard_id);
+                                guard.as_ref().is_some_and(|reg| reg.get(&ws_id).is_some())
+                            };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio/write.rs` around lines 208 - 213, The code
computes `found` by using map_or(false, |reg| reg.get(&ws_id).is_some()) which
triggers clippy::unnecessary_map_or; replace that pattern with the more
idiomatic is_some_and: call ctx.shard_databases.workspace_registry(ctx.shard_id)
into the guard and then use guard.as_ref().is_some_and(|reg|
reg.get(&ws_id).is_some()) so `found` becomes true only when the registry exists
and contains ws_id; update the expression around the `found` binding in write.rs
accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/server/conn/handler_monoio/dispatch.rs`:
- Around line 576-604: The handler currently skips re-registering the client in
ctx.tracking_table when CLIENT TRACKING ON is called a second time because
conn.tracking_rx is Some; to fix, after updating conn.tracking_state (and even
if conn.tracking_rx.is_some()), obtain the existing invalidation sender from
conn.tracking_state.invalidation_tx (clone it), borrow ctx.tracking_table and
call register_client(client_id, tx_clone), then apply any new redirect via
set_redirect(client_id, target) and re-register any prefixes via
register_prefix(client_id, prefix.clone(), config_parsed.noloop) so
REDIRECT/PREFIX changes take effect without requiring TRACKING OFF; use the same
symbol names (parse_tracking_args, conn.tracking_state, conn.tracking_rx,
conn.tracking_state.invalidation_tx,
ctx.tracking_table.register_client/set_redirect/register_prefix, client_id,
config_parsed) to locate and implement the change.

In `@src/server/conn/handler_monoio/ft.rs`:
- Around line 366-456: The monoio multi-shard handlers for FT.INFO, FT.COMPACT,
and FT.CONFIG currently run locally against ctx.shard_id and return early;
change each of those branches to call broadcast_vector_command like
FT.CREATE/FT.DROPINDEX do (use the same signature with ctx, cmd, cmd_args) so
the command is dispatched to all shards instead of a single shard; locate the
blocks that call crate::command::vector_search::ft_info, ft_compact, and
ft_config and replace them with broadcast_vector_command(ctx, cmd, cmd_args, /*
same options used by other broadcasts */), pushing the broadcast result into
responses and returning true.

In `@src/server/conn/handler_monoio/mod.rs`:
- Around line 588-699: ACL enforcement is being run too late
(dispatch::try_enforce_acl), allowing handlers like dispatch::try_handle_eval,
dispatch::try_handle_evalsha, dispatch::try_handle_script,
dispatch::try_handle_config, dispatch::try_handle_replicaof,
dispatch::try_handle_info and the pubsub handlers to consume commands before ACL
checks; move the call to dispatch::try_enforce_acl earlier in the request
dispatch sequence (e.g., immediately after authentication/hello handling and
before any executable-command handlers and pubsub handling) so that
dispatch::try_enforce_acl(cmd, cmd_args, &mut conn, ctx, &peer_addr, &mut
responses) runs before calling the listed try_handle_* and pubsub::try_handle_*
functions.
- Around line 656-790: Several command handlers (e.g.
pubsub::try_handle_publish, pubsub::try_handle_subscribe_entry,
pubsub::try_handle_unsubscribe, pubsub::try_handle_pubsub_introspection,
dispatch::try_handle_persistence, dispatch::try_enforce_acl,
dispatch::try_handle_client_admin, dispatch::try_handle_functions,
txn::try_handle_txn_begin/try_handle_txn_commit/try_handle_txn_abort,
txn::try_handle_temporal_snapshot_at/try_handle_temporal_invalidate,
write::try_handle_ws_command, write::try_handle_mq_command,
write::try_handle_multi_exec, dispatch::try_handle_blocking and the MULTI queue
branch) return or continue before any dispatch-path metric is recorded; add
calls to the existing recording helpers (record_dispatch_local,
record_dispatch_cross_read_fastpath, or record_dispatch_cross_spsc as
appropriate) at each early exit/continue/return/break so every consumed command
increments moon_dispatch_path_total with the correct label (choose the same
label logic used after routing), ensuring you insert the metric call immediately
before each continue/return/break in those handler outcome branches.
- Around line 472-518: The inlined-path can produce responses that remain
buffered when try_inline_dispatch_loop returns >0 but read_buf still contains an
incomplete command; fix by flushing write_buf whenever inlined > 0 before
falling through or looping: after the try_inline_dispatch_loop call check if
inlined > 0 and if write_buf is not empty perform the same
stream.write_all(data).await handling (and break/return on error) so responses
are sent immediately (use the existing write_buf split/freeze and error
handling), then only decide whether to continue based on read_buf.is_empty();
reference try_inline_dispatch_loop, write_buf, read_buf, and stream.write_all to
locate where to add this flush.
- Around line 1239-1257: The current code builds PipelineBatch messages using
the live conn.selected_db which can change before the batch is sent; modify the
grouping so each remote batch preserves the DB index it was queued with: either
include the selected_db captured from conn when building each group's entries
and use that captured value when constructing ShardMessage::PipelineBatch, or
split remote_groups by (target, db_index) so entries is already per-DB; update
the tuple types/keys where remote_groups is constructed and where entries are
mapped (see remote_groups, entries, oneshot_futures, and
ShardMessage::PipelineBatch) to carry/consume the per-entry db_index instead of
reading conn.selected_db at send time.

In `@src/server/conn/handler_monoio/pubsub.rs`:
- Around line 38-40: The code currently uses unwrap() when obtaining the ACL
RwLock (ctx.acl_table.read().unwrap()) and elsewhere; replace these unwraps with
non-panicking handling: attempt to read the lock (ctx.acl_table.read()), and if
it returns Err (poisoned/unavailable) convert that into sending a Frame::Error
response rather than panicking; similarly avoid unwrapping pubsub_tx — clone and
use the cloned sender via .clone() and handle any send errors gracefully (map
send failures to Frame::Error or log and continue). Update all locations that
reference acl_guard and any pubsub_tx unwraps (including the other block around
lines 165–180) to follow this pattern so library code never calls
unwrap()/expect().
- Around line 161-223: Introduce a local flag (e.g., let mut registered =
false;) before the for arg in cmd_args loop and set registered = true right
after a successful ctx.pubsub_registry.write().subscribe(...) or
.psubscribe(...) call (or immediately after conn.subscription_count += 1) so we
know a subscription was actually registered; after flushing write_buf, only
return SubscribeResult::Subscribed if registered is true, otherwise return a
non-subscribed result (add/choose an appropriate enum variant such as
SubscribeResult::NoSubscriptions or SubscribeResult::NotSubscribed) so the
connection does not enter subscriber mode when no subscriptions were accepted;
update any callers/tests accordingly.

In `@src/server/conn/handler_monoio/write.rs`:
- Around line 433-455: The current code xacks entries via
stream.xack(&group_name, &dlq_ack_ids) before ensuring DLQ delivery, which can
lose messages if db_guard.get_or_create_stream(&dlq_key) fails; change the
ordering in the MQ.POP path so you first call
db_guard.get_or_create_stream(&dlq_key), add each entry to the returned
dlq_stream (handle and surface any errors from dlq_stream.add/next_auto_id), and
only if all DLQ inserts succeed call stream.xack(&group_name, &dlq_ack_ids); on
DLQ creation/insert failure do not xack, and propagate or log an error (or
increment a metric) instead of silently swallowing the Err(_) branch. Ensure you
reference and protect dlq_entries, dlq_ack_ids, stream.xack,
db_guard.get_or_create_stream, and dlq_stream.add in the new flow.

In `@src/server/conn/handler_sharded/dispatch.rs`:
- Around line 421-435: The current check uses rs.try_read() and silently skips
READONLY enforcement when try_read() fails; change to fail-closed by treating a
failed try_read as if the node is a Replica: either use the blocking rs.read()
to obtain rs_guard or, if you must keep try_read(), add an Err branch that
pushes the same Frame::Error (same message) and returns true when try_read()
fails and metadata::is_write(cmd) is true; ensure you still check rs_guard.role
(replica) before returning the error so the enforcement around ctx.repl_state,
rs.try_read()/rs.read(), rs_guard.role, metadata::is_write, and Frame::Error is
preserved.
- Around line 64-89: The CLIENT TRACKING handling only updates
ctx.tracking_table when conn.tracking_rx.is_none(), so subsequent CLIENT
TRACKING ON calls won’t update registrations; move or duplicate the
tracking_table update logic so that after parse_tracking_args succeeds and
conn.tracking_state is updated (even when conn.tracking_rx.is_some()), you still
call ctx.tracking_table.borrow_mut().register_client(client_id, tx or existing
tx), set_redirect(client_id, target) if provided, and register_prefix(client_id,
prefix, noloop) for each prefix; locate this logic around the existing
parse_tracking_args match and the conn.tracking_rx check (functions/fields:
parse_tracking_args, conn.tracking_state, conn.tracking_rx,
ctx.tracking_table.register_client, set_redirect, register_prefix) and ensure
you use the existing stored sender when conn.tracking_rx is Some.

In `@src/server/conn/handler_sharded/mod.rs`:
- Around line 1214-1219: The pipeline batching currently uses the first entry's
selected_db (batch_db) for all commands, causing later commands to run in the
wrong DB if selected_db changes; update the batching logic that builds
ShardMessage::PipelineBatchSlotted to split or key remote_groups by (target,
db_index) instead of only target, or flush and start a new batch whenever an
entry's selected_db differs from the current batch_db. Locate the code that
reads entries, the batch_db calculation, and the creation of
ShardMessage::PipelineBatchSlotted and ensure grouping uses each entry's
selected_db (or groups by (target, selected_db)) so each pipeline batch sent to
the shard carries the correct db_index for all commands.
- Around line 756-856: The telemetry counter moon_dispatch_path_total is not
incremented for several pre-routing handlers that return early; update each of
these code paths to record the dispatch-path metric before returning/continuing:
add an increment to moon_dispatch_path_total (with the same label set used at
the later metric sites) inside txn::try_handle_txn_begin /
txn::try_handle_txn_commit / txn::try_handle_txn_abort returns, temporal
handlers (txn::try_handle_temporal_snapshot_at,
txn::try_handle_temporal_invalidate), workspace/multi handlers
(write::try_handle_ws_command, write::try_handle_mq_command,
write::try_handle_multi_exec), the blocking-command branch that calls
handle_blocking_command, and pubsub handlers (pubsub::try_handle_publish, the
branches returned from pubsub::try_handle_subscribe,
pubsub::try_handle_unsubscribe, and pubsub::try_handle_pubsub_introspection);
ensure you place the metric increment immediately before each
continue/break/return so those early exits are counted and use the same metric
labels (shard, local/cross, etc.) as the later sites.
- Around line 474-682: The ACL check must run before any executable command
paths (currently EVAL/EVALSHA and SCRIPT run first); move the block that calls
conn.acl_skip_allowed(), reads ctx.acl_table and invokes
acl_guard.check_command_permission / check_key_permission (and the subsequent
conn.acl_log push + responses.push Frame::Error behavior) to immediately after
the AUTH/HELLO/ACL handling (i.e. after conn_cmd::auth_acl, conn_cmd::hello_acl,
and crate::command::acl::handle_acl) and before the EVAL/EVALSHA and SCRIPT
handlers; keep the exact permission logic and timestamping, keep using
conn.acl_skip_allowed(), ctx.acl_table, check_command_permission,
check_key_permission, and metadata::is_write(cmd) so behavior is unchanged
except for ordering.
- Around line 123-181: The migration failure paths currently drop the connection
(SPSC full retry exhaustion or into_std error) but never call
crate::admin::metrics_setup::record_connection_closed(), leaking the
active-connection metric; add a call to record_connection_closed() in the
Err(returned_msg) branch after any ShardMessage::MigrateConnection fd cleanup
and warning (the block handling pending/OwnedFd cleanup and tracing::warn), and
also add record_connection_closed() in the Err(e) branch that logs "migration
into_std failed" so the metric is decremented whenever into_std or SPSC
migration ultimately loses the connection.
- Around line 1157-1163: The cross-shard read fast-path currently triggers for
any non-write command (metadata::is_write) even if the command isn't supported
by the read dispatcher; update the condition that chooses the fast-path in the
handler_sharded code (the block that calls
crate::admin::metrics_setup::record_dispatch_cross_read_fastpath(),
ctx.shard_databases.read_db(...), and dispatch_read(...)) to also require
is_dispatch_read_supported(cmd) (the same guard used in handler_monoio) so
unsupported reads fall back to the normal dispatcher; ensure you import or
reference the same is_dispatch_read_supported function and only take the fast
path when it returns true.

In `@src/server/conn/handler_sharded/pubsub.rs`:
- Around line 156-163: The unsubscribe handling currently always decrements
conn.subscription_count and calls unpropagate_subscription even when the
subscriber wasn't actually removed; change the logic in the UNSUBSCRIBE loop
(and mirror the same fix in the PUNSUBSCRIBE block) to check the return value of
ctx.pubsub_registry.write().unsubscribe (or whatever unsubscribe API returns)
and only when it indicates a removal: decrement conn.subscription_count, call
unpropagate_subscription(&ctx.all_remote_sub_maps, &ch, ctx.shard_id,
ctx.num_shards, false), and then serialize/send
pubsub::unsubscribe_response(&ch, conn.subscription_count); if unsubscribe did
not remove the subscriber, skip the decrement/unpropagate and still send the
appropriate response/count without mutating state.
- Around line 210-216: The RESET handler currently clears only local entries
(ctx.pubsub_registry.write().unsubscribe_all / punsubscribe_all) which discards
the removed channel/pattern lists and leaves all_remote_sub_maps stale; change
RESET so it captures the sets returned by unsubscribe_all(conn.subscriber_id)
and punsubscribe_all(conn.subscriber_id) (or adjust those methods to return the
removed channels/patterns), then call the existing propagation/unpropagation
routine (the code path used on normal unsubscribe/punsubscribe) to notify remote
shards to remove this subscriber from all_remote_sub_maps for those
channels/patterns, then set conn.subscription_count = 0 and write the RESET
reply as before.

In `@src/server/conn/handler_sharded/write.rs`:
- Around line 662-710: The code currently drains graph WAL into wal_records and
immediately calls ctx.shard_databases.wal_append even when conn.active_cross_txn
exists, which can make uncommitted graph mutations durable; change the logic so
that if conn.active_cross_txn.is_some() you do NOT call wal_append here but
instead store/defer wal_records on the active transaction (e.g., add to txn via
a new method like txn.defer_wal_records or txn.append_transactional_wal) or mark
them as transactional WAL tied to txn_id so they are only appended on TXN.COMMIT
(or discarded on TXN.ABORT); keep existing txn.record_graph/record_graph_undo
usage for rollback intents and ensure ctx.shard_databases.wal_append is only
invoked for non-transactional cases (when conn.active_cross_txn is None) or
during the commit path that consumes the deferred records.
- Around line 112-125: The current WS.DROP cleanup only scans logical DB 0 (in
write.rs) and may leave workspace-prefixed keys in other DBs; update the block
that uses ctx.shard_databases.write_db(ctx.shard_id, 0) to iterate over all
logical DB indices for this shard (e.g., 0..N or via a provided API on
ctx.shard_databases) and perform the same prefix scan/remove in each DB: for
each db index call ctx.shard_databases.write_db(ctx.shard_id, db_index), build
the same prefix from ws_id.as_hex(), collect keys_to_delete and remove them from
that DB so workspace keys are dropped across all logical DBs.

---

Minor comments:
In `@src/admin/metrics_setup.rs`:
- Around line 924-935: Remove the brittle global-state assertion from the test
dispatch_path_counters_no_op_before_init: do not read or assert
METRICS_INITIALIZED; instead simply call the helper functions
(record_dispatch_local, record_dispatch_cross_read_fastpath,
record_dispatch_cross_spsc, record_dispatch_local_inline) to ensure they do not
panic regardless of global recorder state. Update the test body by deleting the
line that checks METRICS_INITIALIZED.load(...) and keep the remaining calls
(including both record_dispatch_local_inline(0) and
record_dispatch_local_inline(7)) so the test remains order-independent.

In `@src/persistence/wal_v3/segment.rs`:
- Around line 134-186: Remove or justify the unused public API `sync_data`:
search for the function by name `sync_data` and either delete it (and adjust
visibility of related helpers) if there are no intended callers, or add a TODO
comment explaining its future use; ensure callers use `flush_sync` where
durability is required. Also update the `flush_if_needed` docstring to reference
`flush_sync` (not `sync_data`) and mention that durable sync is deferred to the
1s timer which calls `flush_sync`; refer to `flush_if_needed`, `flush_write`,
`flush_sync`, and the timer callers (`persistence_tick`, `event_loop`,
`sync_wal_v3`) when making these changes.

In `@src/server/conn/handler_monoio/ft.rs`:
- Around line 153-165: The FT.SEARCH branch currently calls
query_bytes.unwrap(); replace that unwrap with an explicit check (e.g., if let
Some(query_str) = query_bytes { ... } or match query_bytes) and if None push an
appropriate Frame::Error (similar to the index_name error) and return true to
preserve the dispatch invariant; update the assignment to query_str to use the
matched value so there is no unwrap/expect in the FT.SEARCH handling
(referencing is_text, query_bytes, index_name, responses and Frame::Error in
ft.rs).

In `@src/server/conn/handler_monoio/write.rs`:
- Around line 493-520: The xack error branch currently swallows real errors by
returning Frame::Integer(0); change the error handling so that failures from
stream.xack and from db_guard.get_stream_mut return a Frame::Error with the
error details (or at least a descriptive error string) instead of Integer(0),
keep Frame::Integer(0) only for the benign Ok(None) "nothing to ack" case, and
ensure WAL records (ctx.shard_databases.wal_append) are only emitted after a
successful xack; also log the underlying error (include the error from
stream.xack and from get_stream_mut) to aid debugging and keep references to
effective_key, stream.xack, get_stream_mut, ctx.shard_databases.wal_append and
msg_ids when locating the code to change.

In `@src/server/conn/handler_sharded/ft.rs`:
- Around line 157-170: The code uses query_bytes.unwrap() in the FT.SEARCH
dispatch path (inside the is_text branch) which must be removed; replace the
unwrap with a safe extraction like matching or if-let on query_bytes and return
the same error response if it's None. Specifically, update the block that
defines query_str (referencing is_text, query_bytes and the variable query_str)
to handle the None case without panicking and preserve the existing error
response behavior so the function returns true on invalid/missing query bytes.

---

Nitpick comments:
In `@src/server/conn/handler_monoio/write.rs`:
- Around line 53-56: Replace the ad-hoc SystemTime call used to compute
created_at with the shard-cached timestamp: use ctx.cached_clock.ms() instead of
std::time::SystemTime::now()...duration_since(...).unwrap_or_default(); update
the assignment to let created_at = ctx.cached_clock.ms() as i64 (or otherwise
convert to i64 consistently with surrounding code) so the code honors the
shard-cached-timestamp invariant and avoids the extra syscall and silent zero on
epoch error; locate this change around the created_at binding in the write
handler (created_at, ctx.cached_clock.ms()) and ensure types match callers.
- Around line 208-213: The code computes `found` by using map_or(false, |reg|
reg.get(&ws_id).is_some()) which triggers clippy::unnecessary_map_or; replace
that pattern with the more idiomatic is_some_and: call
ctx.shard_databases.workspace_registry(ctx.shard_id) into the guard and then use
guard.as_ref().is_some_and(|reg| reg.get(&ws_id).is_some()) so `found` becomes
true only when the registry exists and contains ws_id; update the expression
around the `found` binding in write.rs accordingly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 56319e2f-56ef-4b39-b942-941fceaf1f25

📥 Commits

Reviewing files that changed from the base of the PR and between f4fcd5a and 360915e.

📒 Files selected for processing (24)
  • .planning
  • src/admin/metrics_setup.rs
  • src/command/graph/graph_read.rs
  • src/persistence/wal_v3/segment.rs
  • src/server/conn/handler_monoio.rs
  • src/server/conn/handler_monoio/dispatch.rs
  • src/server/conn/handler_monoio/ft.rs
  • src/server/conn/handler_monoio/mod.rs
  • src/server/conn/handler_monoio/pubsub.rs
  • src/server/conn/handler_monoio/read.rs
  • src/server/conn/handler_monoio/txn.rs
  • src/server/conn/handler_monoio/write.rs
  • src/server/conn/handler_sharded.rs
  • src/server/conn/handler_sharded/dispatch.rs
  • src/server/conn/handler_sharded/ft.rs
  • src/server/conn/handler_sharded/mod.rs
  • src/server/conn/handler_sharded/pubsub.rs
  • src/server/conn/handler_sharded/read.rs
  • src/server/conn/handler_sharded/txn.rs
  • src/server/conn/handler_sharded/write.rs
  • src/server/conn/mod.rs
  • src/shard/event_loop.rs
  • src/shard/spsc_handler.rs
  • src/shard/timers.rs

Comment on lines +576 to +604
if sub_bytes.eq_ignore_ascii_case(b"TRACKING") {
match crate::command::client::parse_tracking_args(cmd_args) {
Ok(config_parsed) => {
if config_parsed.enable {
conn.tracking_state.enabled = true;
conn.tracking_state.bcast = config_parsed.bcast;
conn.tracking_state.noloop = config_parsed.noloop;
conn.tracking_state.optin = config_parsed.optin;
conn.tracking_state.optout = config_parsed.optout;

if conn.tracking_rx.is_none() {
let (tx, rx) = channel::mpsc_bounded::<Frame>(256);
conn.tracking_state.invalidation_tx = Some(tx.clone());
conn.tracking_rx = Some(rx);

let mut table = ctx.tracking_table.borrow_mut();
table.register_client(client_id, tx);
if let Some(target) = config_parsed.redirect {
table.set_redirect(client_id, target);
}
for prefix in &config_parsed.prefixes {
table.register_prefix(
client_id,
prefix.clone(),
config_parsed.noloop,
);
}
}
responses.push(Frame::SimpleString(Bytes::from_static(b"OK")));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Re-register tracking state on repeated CLIENT TRACKING ON.

When tracking is already enabled, this updates conn.tracking_state but skips table registration because tracking_rx is already Some. New REDIRECT or PREFIX options are ignored, leaving invalidation routing stale until the client turns tracking off.

Suggested fix
                         if config_parsed.enable {
+                            ctx.tracking_table.borrow_mut().untrack_all(client_id);
                             conn.tracking_state.enabled = true;
                             conn.tracking_state.bcast = config_parsed.bcast;
                             conn.tracking_state.noloop = config_parsed.noloop;
                             conn.tracking_state.optin = config_parsed.optin;
                             conn.tracking_state.optout = config_parsed.optout;
 
                             if conn.tracking_rx.is_none() {
                                 let (tx, rx) = channel::mpsc_bounded::<Frame>(256);
                                 conn.tracking_state.invalidation_tx = Some(tx.clone());
                                 conn.tracking_rx = Some(rx);
+                            }
 
-                                let mut table = ctx.tracking_table.borrow_mut();
-                                table.register_client(client_id, tx);
+                            if let Some(tx) = conn.tracking_state.invalidation_tx.clone() {
+                                let mut table = ctx.tracking_table.borrow_mut();
+                                table.register_client(client_id, tx);
                                 if let Some(target) = config_parsed.redirect {
                                     table.set_redirect(client_id, target);
                                 }
                                 for prefix in &config_parsed.prefixes {
                                     table.register_prefix(
@@
                                         config_parsed.noloop,
                                     );
                                 }
                             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio/dispatch.rs` around lines 576 - 604, The
handler currently skips re-registering the client in ctx.tracking_table when
CLIENT TRACKING ON is called a second time because conn.tracking_rx is Some; to
fix, after updating conn.tracking_state (and even if
conn.tracking_rx.is_some()), obtain the existing invalidation sender from
conn.tracking_state.invalidation_tx (clone it), borrow ctx.tracking_table and
call register_client(client_id, tx_clone), then apply any new redirect via
set_redirect(client_id, target) and re-register any prefixes via
register_prefix(client_id, prefix.clone(), config_parsed.noloop) so
REDIRECT/PREFIX changes take effect without requiring TRACKING OFF; use the same
symbol names (parse_tracking_args, conn.tracking_state, conn.tracking_rx,
conn.tracking_state.invalidation_tx,
ctx.tracking_table.register_client/set_redirect/register_prefix, client_id,
config_parsed) to locate and implement the change.

Comment on lines +366 to +456
if cmd.eq_ignore_ascii_case(b"FT.INFO") {
let response = {
let vs = ctx.shard_databases.vector_store(ctx.shard_id);
let ts = ctx.shard_databases.text_store(ctx.shard_id);
crate::command::vector_search::ft_info(&vs, &ts, cmd_args)
};
responses.push(response);
return true;
}
if cmd.eq_ignore_ascii_case(b"FT._LIST") {
let response = {
let vs = ctx.shard_databases.vector_store(ctx.shard_id);
crate::command::vector_search::ft_list(&vs)
};
responses.push(response);
return true;
}
if cmd.eq_ignore_ascii_case(b"FT.COMPACT") {
let response = {
let mut vs = ctx.shard_databases.vector_store(ctx.shard_id);
let mut ts = ctx.shard_databases.text_store(ctx.shard_id);
crate::command::vector_search::ft_compact(&mut vs, &mut ts, cmd_args)
};
responses.push(response);
return true;
}
if cmd.eq_ignore_ascii_case(b"FT.CACHESEARCH") {
let response = {
let mut vs = ctx.shard_databases.vector_store(ctx.shard_id);
crate::command::vector_search::cache_search::ft_cachesearch(&mut vs, cmd_args)
};
responses.push(response);
return true;
}
if cmd.eq_ignore_ascii_case(b"FT.CONFIG") {
let response = {
let mut vs = ctx.shard_databases.vector_store(ctx.shard_id);
let mut ts = ctx.shard_databases.text_store(ctx.shard_id);
crate::command::vector_search::ft_config(&mut vs, &mut ts, cmd_args)
};
responses.push(response);
return true;
}
// FT.RECOMMEND, FT.NAVIGATE, FT.EXPAND need db/graph — dispatch locally
if cmd.eq_ignore_ascii_case(b"FT.RECOMMEND")
|| cmd.eq_ignore_ascii_case(b"FT.NAVIGATE")
|| cmd.eq_ignore_ascii_case(b"FT.EXPAND")
{
let response = {
let sdb = &ctx.shard_databases;
let mut vs = sdb.vector_store(ctx.shard_id);
if cmd.eq_ignore_ascii_case(b"FT.RECOMMEND") {
let mut db_guard = sdb.write_db(ctx.shard_id, 0);
crate::command::vector_search::recommend::ft_recommend(
&mut vs,
cmd_args,
Some(&mut *db_guard),
)
} else if cmd.eq_ignore_ascii_case(b"FT.NAVIGATE") {
#[cfg(feature = "graph")]
{
let graph_guard = sdb.graph_store_read(ctx.shard_id);
crate::command::vector_search::navigate::ft_navigate(
&mut vs,
Some(&graph_guard),
cmd_args,
None,
)
}
#[cfg(not(feature = "graph"))]
{
Frame::Error(Bytes::from_static(
b"ERR FT.NAVIGATE requires graph feature",
))
}
} else {
// FT.EXPAND
#[cfg(feature = "graph")]
{
let graph_guard = sdb.graph_store_read(ctx.shard_id);
crate::command::vector_search::ft_expand(&graph_guard, cmd_args)
}
#[cfg(not(feature = "graph"))]
{
Frame::Error(Bytes::from_static(b"ERR FT.EXPAND requires graph feature"))
}
}
};
responses.push(response);
return true;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Compare multi-shard FT.* routing in monoio vs sharded handlers.

rg -n -C4 'cmd\.eq_ignore_ascii_case\(b"FT\.INFO"\)|broadcast_vector_command|FT\.COMPACT|FT\.CONFIG' \
  --glob 'src/server/conn/handler_*/*.rs'

Repository: pilotspace/moon

Length of output: 8579


🏁 Script executed:

# Check the structure around broadcast_vector_command in sharded handler
cat -n src/server/conn/handler_sharded/ft.rs | sed -n '340,370p'

Repository: pilotspace/moon

Length of output: 1307


🏁 Script executed:

# Check what happens after broadcast_vector_command in sharded handler
cat -n src/server/conn/handler_sharded/ft.rs | sed -n '358,380p'

Repository: pilotspace/moon

Length of output: 1062


🏁 Script executed:

# Check the full command dispatch structure in monoio handler to see where broadcasts should occur
cat -n src/server/conn/handler_monoio/ft.rs | sed -n '200,280p'

Repository: pilotspace/moon

Length of output: 5040


🏁 Script executed:

# Look for any broadcast calls in monoio handler for FT management commands
rg -n 'broadcast_vector_command|broadcast' src/server/conn/handler_monoio/ft.rs

Repository: pilotspace/moon

Length of output: 142


🏁 Script executed:

# See the full command dispatch in sharded handler before broadcast_vector_command
cat -n src/server/conn/handler_sharded/ft.rs | sed -n '30,360p'

Repository: pilotspace/moon

Length of output: 18756


🏁 Script executed:

# Check if FT.INFO, COMPACT, CONFIG are handled BEFORE broadcast in sharded
rg -n 'FT\.INFO|FT\.COMPACT|FT\.CONFIG' src/server/conn/handler_sharded/ft.rs | head -20

Repository: pilotspace/moon

Length of output: 324


🏁 Script executed:

# Look at the broadcast_vector_command implementation to understand what it does
rg -n 'fn broadcast_vector_command|impl.*broadcast' src/shard/coordinator.rs | head -10

Repository: pilotspace/moon

Length of output: 101


🏁 Script executed:

# Look at the broadcast_vector_command implementation
cat -n src/shard/coordinator.rs | sed -n '829,900p'

Repository: pilotspace/moon

Length of output: 3127


🏁 Script executed:

# Check if broadcast_vector_command handles FT.INFO, COMPACT, CONFIG locally
cat -n src/shard/coordinator.rs | sed -n '829,950p'

Repository: pilotspace/moon

Length of output: 5793


🏁 Script executed:

# Check the end of broadcast_vector_command to see the final fallback handler
rg -n 'fn broadcast_vector_command' -A 100 src/shard/coordinator.rs | tail -60

Repository: pilotspace/moon

Length of output: 2553


🏁 Script executed:

# Check if monoio handler is conditional on single-shard mode
rg -n 'handler_monoio|single_shard|num_shards == 1' src/server/conn/*.rs | head -20

Repository: pilotspace/moon

Length of output: 297


🏁 Script executed:

# Check the comment at the top of monoio handler to understand its purpose
head -50 src/server/conn/handler_monoio/ft.rs

Repository: pilotspace/moon

Length of output: 2077


🏁 Script executed:

# Verify if monoio handler is used in multi-shard contexts
rg -n 'use.*handler_monoio|handler_monoio::' src/ | head -10

Repository: pilotspace/moon

Length of output: 451


🏁 Script executed:

# Check what happens after the FT.CONFIG handler in monoio (around line 400)
cat -n src/server/conn/handler_monoio/ft.rs | sed -n '400,450p'

Repository: pilotspace/moon

Length of output: 2791


🏁 Script executed:

# Look for any fallback broadcast or final catch-all in monoio after line 410
cat -n src/server/conn/handler_monoio/ft.rs | sed -n '410,470p'

Repository: pilotspace/moon

Length of output: 3259


🏁 Script executed:

# Check what sharded handler does with FT.INFO - is it before or after broadcast?
cat -n src/server/conn/handler_sharded/ft.rs | sed -n '1,370p' | grep -A5 -B5 'FT.INFO\|broadcast_vector_command'

Repository: pilotspace/moon

Length of output: 947


🏁 Script executed:

# Look at the exact behavior in handler_sharded to see if FT.INFO is in broadcast or before it
rg -n 'if cmd.eq_ignore_ascii_case.*FT\.INFO' src/server/conn/handler_sharded/ft.rs -B50 | tail -60

Repository: pilotspace/moon

Length of output: 2498


🏁 Script executed:

# Look at the full context after line 457 in monoio to see if there's a fallback broadcast
cat -n src/server/conn/handler_monoio/ft.rs | sed -n '456,480p'

Repository: pilotspace/moon

Length of output: 1445


🏁 Script executed:

# Check if monoio is supposed to have a fallback broadcast like sharded does
rg -n 'broadcast_vector_command' src/server/conn/handler_monoio/ft.rs

Repository: pilotspace/moon

Length of output: 142


🏁 Script executed:

# Examine the structure: compare lines 1-30 of both handlers
diff <(head -30 src/server/conn/handler_monoio/ft.rs) <(head -30 src/server/conn/handler_sharded/ft.rs)

Repository: pilotspace/moon

Length of output: 146


🏁 Script executed:

# Confirm the exact line count of monoio handler's multi-shard section
wc -l src/server/conn/handler_monoio/ft.rs

Repository: pilotspace/moon

Length of output: 99


🏁 Script executed:

# Check if there are any comments explaining why FT.INFO/COMPACT/CONFIG are local-only
rg -n 'FT\.INFO|FT\.COMPACT|FT\.CONFIG' src/server/conn/handler_monoio/ft.rs -B3 -A1

Repository: pilotspace/moon

Length of output: 1750


🏁 Script executed:

# Verify that sharded handler's dispatch_vector_command (in spsc_handler) handles these commands
rg -n 'fn dispatch_vector_command' -A 20 src/shard/spsc_handler.rs | head -40

Repository: pilotspace/moon

Length of output: 1074


🏁 Script executed:

# Confirm dispatch_vector_command handles FT.INFO, COMPACT, CONFIG
rg -n 'FT\.INFO|FT\.COMPACT|FT\.CONFIG' src/shard/spsc_handler.rs

Repository: pilotspace/moon

Length of output: 237


🏁 Script executed:

# Get a complete picture: where do monoio and sharded differ in multi-shard path?
cat -n src/server/conn/handler_monoio/ft.rs | sed -n '25,50p'

Repository: pilotspace/moon

Length of output: 1384


🏁 Script executed:

# Check if there are any comments explaining the design choice for monoio's FT.INFO handling
sed -n '360,410p' src/server/conn/handler_monoio/ft.rs

Repository: pilotspace/moon

Length of output: 2183


🏁 Script executed:

# Compare with sharded handler structure - does it have explicit comment about fallback?
sed -n '350,370p' src/server/conn/handler_sharded/ft.rs

Repository: pilotspace/moon

Length of output: 707


🏁 Script executed:

# Final check: are there any TODOs or FIXMEs related to this in monoio?
rg -n 'TODO|FIXME|BUG' src/server/conn/handler_monoio/ft.rs | head -20

Repository: pilotspace/moon

Length of output: 41


Broadcast FT.INFO, FT.COMPACT, FT.CONFIG to all shards in monoio multi-shard mode.

In the monoio handler's multi-shard path (lines 366–408), FT.INFO, FT.COMPACT, FT.CONFIG are handled locally against ctx.shard_id only and return early. The sharded handler broadcasts these commands via broadcast_vector_command to all shards. This causes:

  • FT.INFO to report only the current shard's state instead of aggregated cluster state
  • FT.COMPACT to compact only one shard instead of all shards
  • FT.CONFIG to configure only one shard instead of all shards

Replace the local handlers for these commands with calls to broadcast_vector_command (like FT.CREATE/FT.DROPINDEX at line 354) to maintain consistency with the sharded handler.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio/ft.rs` around lines 366 - 456, The monoio
multi-shard handlers for FT.INFO, FT.COMPACT, and FT.CONFIG currently run
locally against ctx.shard_id and return early; change each of those branches to
call broadcast_vector_command like FT.CREATE/FT.DROPINDEX do (use the same
signature with ctx, cmd, cmd_args) so the command is dispatched to all shards
instead of a single shard; locate the blocks that call
crate::command::vector_search::ft_info, ft_compact, and ft_config and replace
them with broadcast_vector_command(ctx, cmd, cmd_args, /* same options used by
other broadcasts */), pushing the broadcast result into responses and returning
true.

Comment on lines +472 to +518
let inlined = try_inline_dispatch_loop(
&mut read_buf,
&mut write_buf,
&ctx.shard_databases,
ctx.shard_id,
conn.selected_db,
&ctx.aof_tx,
ctx.cached_clock.ms(),
ctx.num_shards,
can_inline_writes,
&ctx.runtime_config,
);
crate::admin::metrics_setup::record_dispatch_local_inline(inlined as u64);
if inlined > 0 && read_buf.is_empty() {
// All commands were inlined -- flush write_buf and continue
if !write_buf.is_empty() {
let data = write_buf.split().freeze();
let (result, _): (std::io::Result<usize>, bytes::Bytes) =
stream.write_all(data).await;
if result.is_err() {
break;
}
}
continue;
}
// If read_buf still has data, fall through to normal Frame parsing
// for remaining commands. Inlined responses are already in write_buf.
}

// Parse all complete frames from the read buffer (reuse pre-allocated Vec, cap at 1024)
frames.clear();
loop {
match codec.decode_frame(&mut read_buf) {
Ok(Some(frame)) => {
frames.push(frame);
if frames.len() >= 1024 {
break;
}
}
Ok(None) => break,
Err(_) => return (MonoioHandlerResult::Done, None), // parse error, close connection
}
}

if frames.is_empty() {
continue;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Flush inline responses before waiting for more input.

If try_inline_dispatch_loop writes a response and leaves an incomplete next command in read_buf, frames.is_empty() hits continue and the completed inline response remains buffered indefinitely until the client sends more bytes.

Proposed fix
         if frames.is_empty() {
+            if !write_buf.is_empty() {
+                let data = write_buf.split().freeze();
+                let (result, _): (std::io::Result<usize>, bytes::Bytes) =
+                    stream.write_all(data).await;
+                if result.is_err() {
+                    break;
+                }
+            }
             continue;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio/mod.rs` around lines 472 - 518, The
inlined-path can produce responses that remain buffered when
try_inline_dispatch_loop returns >0 but read_buf still contains an incomplete
command; fix by flushing write_buf whenever inlined > 0 before falling through
or looping: after the try_inline_dispatch_loop call check if inlined > 0 and if
write_buf is not empty perform the same stream.write_all(data).await handling
(and break/return on error) so responses are sent immediately (use the existing
write_buf split/freeze and error handling), then only decide whether to continue
based on read_buf.is_empty(); reference try_inline_dispatch_loop, write_buf,
read_buf, and stream.write_all to locate where to add this flush.

Comment on lines +588 to +699
// --- Connection-level commands (dispatched to dispatch.rs) ---
if dispatch::try_handle_cluster(cmd, cmd_args, ctx, &mut responses) {
continue;
}
if dispatch::try_handle_evalsha(cmd, cmd_args, &conn, ctx, &mut responses) {
continue;
}
if dispatch::try_handle_eval(cmd, cmd_args, &conn, ctx, &mut responses) {
continue;
}
if dispatch::try_handle_script(cmd, cmd_args, ctx, &mut responses) {
continue;
}
if dispatch::try_handle_cluster_routing(cmd, cmd_args, &mut conn, ctx, &mut responses) {
continue;
}
if dispatch::try_handle_auth(
cmd,
cmd_args,
&mut conn,
ctx,
&peer_addr,
&mut auth_delay_ms,
&mut responses,
) {
continue;
}
if dispatch::try_handle_hello(
cmd,
cmd_args,
&mut conn,
ctx,
client_id,
&peer_addr,
&mut auth_delay_ms,
&mut responses,
) {
continue;
}
if dispatch::try_handle_acl(cmd, cmd_args, &mut conn, ctx, &peer_addr, &mut responses) {
continue;
}
if dispatch::try_handle_config(cmd, cmd_args, ctx, &mut responses) {
continue;
}
if dispatch::try_handle_replicaof(cmd, cmd_args, ctx, &mut responses) {
continue;
}
if dispatch::try_handle_replconf(cmd, cmd_args, &mut responses) {
continue;
}
if dispatch::try_handle_info(cmd, cmd_args, &conn, ctx, &mut responses) {
continue;
}
if dispatch::try_enforce_readonly(cmd, ctx, &mut responses) {
continue;
}
// CLIENT early (ID, SETNAME, GETNAME, TRACKING) -- admin subcmds fall through to ACL gate
if dispatch::try_handle_client_early(
cmd,
cmd_args,
client_id,
&mut conn,
ctx,
&mut responses,
) {
continue;
}
// --- Pub/sub commands ---
if pubsub::try_handle_publish(
cmd,
cmd_args,
&conn,
ctx,
&mut responses,
&mut publish_batches,
) {
continue;
}
match pubsub::try_handle_subscribe_entry(
cmd,
cmd_args,
&mut conn,
ctx,
&peer_addr,
&mut responses,
&mut codec,
&mut write_buf,
&mut stream,
)
.await
{
pubsub::SubscribeResult::NotSubscribe => {}
pubsub::SubscribeResult::ArgError => continue,
pubsub::SubscribeResult::Subscribed => break,
pubsub::SubscribeResult::WriteError => return (MonoioHandlerResult::Done, None),
}
if pubsub::try_handle_unsubscribe(cmd, &mut responses) {
continue;
}
if pubsub::try_handle_pubsub_introspection(cmd, cmd_args, ctx, &mut responses) {
continue;
}
// --- Persistence + ACL gate + CLIENT admin + Functions ---
if dispatch::try_handle_persistence(cmd, ctx, &mut responses) {
continue;
}
if dispatch::try_enforce_acl(cmd, cmd_args, &mut conn, ctx, &peer_addr, &mut responses)
{
continue;
}
if dispatch::try_handle_client_admin(cmd, cmd_args, client_id, &conn, &mut responses) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Move ACL enforcement before executable command handlers.

EVAL, EVALSHA, SCRIPT, CONFIG, REPLICAOF, INFO, pub/sub, and other handlers can consume commands before try_enforce_acl runs at Line 695. Authenticated users with restricted ACLs can therefore bypass command/key checks for these paths.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio/mod.rs` around lines 588 - 699, ACL
enforcement is being run too late (dispatch::try_enforce_acl), allowing handlers
like dispatch::try_handle_eval, dispatch::try_handle_evalsha,
dispatch::try_handle_script, dispatch::try_handle_config,
dispatch::try_handle_replicaof, dispatch::try_handle_info and the pubsub
handlers to consume commands before ACL checks; move the call to
dispatch::try_enforce_acl earlier in the request dispatch sequence (e.g.,
immediately after authentication/hello handling and before any
executable-command handlers and pubsub handling) so that
dispatch::try_enforce_acl(cmd, cmd_args, &mut conn, ctx, &peer_addr, &mut
responses) runs before calling the listed try_handle_* and pubsub::try_handle_*
functions.

Comment on lines +656 to +790
// --- Pub/sub commands ---
if pubsub::try_handle_publish(
cmd,
cmd_args,
&conn,
ctx,
&mut responses,
&mut publish_batches,
) {
continue;
}
match pubsub::try_handle_subscribe_entry(
cmd,
cmd_args,
&mut conn,
ctx,
&peer_addr,
&mut responses,
&mut codec,
&mut write_buf,
&mut stream,
)
.await
{
pubsub::SubscribeResult::NotSubscribe => {}
pubsub::SubscribeResult::ArgError => continue,
pubsub::SubscribeResult::Subscribed => break,
pubsub::SubscribeResult::WriteError => return (MonoioHandlerResult::Done, None),
}
if pubsub::try_handle_unsubscribe(cmd, &mut responses) {
continue;
}
if pubsub::try_handle_pubsub_introspection(cmd, cmd_args, ctx, &mut responses) {
continue;
}
// --- Persistence + ACL gate + CLIENT admin + Functions ---
if dispatch::try_handle_persistence(cmd, ctx, &mut responses) {
continue;
}
if dispatch::try_enforce_acl(cmd, cmd_args, &mut conn, ctx, &peer_addr, &mut responses)
{
continue;
}
if dispatch::try_handle_client_admin(cmd, cmd_args, client_id, &conn, &mut responses) {
continue;
}
if dispatch::try_handle_functions(
cmd,
cmd_args,
&conn,
ctx,
&func_registry,
&mut responses,
) {
continue;
}

// --- TXN.BEGIN / TXN.COMMIT / TXN.ABORT ---
if txn::try_handle_txn_begin(cmd, cmd_args, &mut conn, ctx, &mut responses) {
continue;
}
if txn::try_handle_txn_commit(cmd, cmd_args, &mut conn, ctx, &mut responses) {
continue;
}
if txn::try_handle_txn_abort(cmd, cmd_args, &mut conn, ctx, &mut responses) {
continue;
}

// --- TEMPORAL.SNAPSHOT_AT / TEMPORAL.INVALIDATE ---
if txn::try_handle_temporal_snapshot_at(cmd, cmd_args, ctx, &mut responses) {
continue;
}
if txn::try_handle_temporal_invalidate(cmd, cmd_args, ctx, &mut responses) {
continue;
}

// --- WS.* ---
if write::try_handle_ws_command(cmd, cmd_args, &mut conn, ctx, &mut responses) {
continue;
}

// --- MQ.* ---
if write::try_handle_mq_command(cmd, cmd_args, &mut conn, ctx, &mut responses) {
continue;
}

// --- MULTI / EXEC / DISCARD ---
if write::try_handle_multi_exec(cmd, &mut conn, ctx, &mut responses) {
continue;
}

// --- Workspace key prefix injection ---
// MUST happen before key_to_shard() so the {ws_id} hash tag determines
// shard routing. This is the ONLY code path where workspace prefixing
// occurs (WS-07, WS-12). All subsequent dispatch uses cmd_args (shadowed).
let rewritten = conn
.workspace_id
.as_ref()
.map(|ws_id| workspace_rewrite_args(cmd, cmd_args, ws_id));
let cmd_args: &[Frame] = rewritten.as_deref().unwrap_or(cmd_args);

// --- BLOCKING COMMANDS ---
match dispatch::try_handle_blocking(
cmd,
cmd_args,
&mut conn,
ctx,
&mut responses,
&mut codec,
&mut write_buf,
&mut stream,
&shutdown,
)
.await
{
dispatch::BlockingResult::NotBlocking => {}
dispatch::BlockingResult::Queued => continue,
dispatch::BlockingResult::Handled => break,
dispatch::BlockingResult::WriteError => return (MonoioHandlerResult::Done, None),
}

// --- MULTI queue mode: queue commands when in transaction ---
if conn.in_multi {
conn.command_queue.push(frame);
responses.push(Frame::SimpleString(Bytes::from_static(b"QUEUED")));
continue;
}

// --- Cross-shard aggregation commands: KEYS, SCAN, DBSIZE + multi-key ---
if dispatch::try_handle_cross_shard_commands(cmd, cmd_args, &conn, ctx, &mut responses)
.await
{
continue;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Record a dispatch-path label for commands consumed before routing.

These handlers continue before the record_dispatch_local, record_dispatch_cross_read_fastpath, or record_dispatch_cross_spsc calls. If moon_dispatch_path_total is intended to classify every command, commands like PUBLISH, SUBSCRIBE, TXN.*, TEMPORAL.*, WS.*, MQ.*, and blocking commands are missing from the denominator.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio/mod.rs` around lines 656 - 790, Several
command handlers (e.g. pubsub::try_handle_publish,
pubsub::try_handle_subscribe_entry, pubsub::try_handle_unsubscribe,
pubsub::try_handle_pubsub_introspection, dispatch::try_handle_persistence,
dispatch::try_enforce_acl, dispatch::try_handle_client_admin,
dispatch::try_handle_functions,
txn::try_handle_txn_begin/try_handle_txn_commit/try_handle_txn_abort,
txn::try_handle_temporal_snapshot_at/try_handle_temporal_invalidate,
write::try_handle_ws_command, write::try_handle_mq_command,
write::try_handle_multi_exec, dispatch::try_handle_blocking and the MULTI queue
branch) return or continue before any dispatch-path metric is recorded; add
calls to the existing recording helpers (record_dispatch_local,
record_dispatch_cross_read_fastpath, or record_dispatch_cross_spsc as
appropriate) at each early exit/continue/return/break so every consumed command
increments moon_dispatch_path_total with the correct label (choose the same
label logic used after routing), ensuring you insert the metric call immediately
before each continue/return/break in those handler outcome branches.

Comment on lines +1214 to +1219
// Use the db_index captured with the first command (all commands in a
// pipeline batch targeting the same shard share the same db_index).
let batch_db = entries.first().map(|(_, _, _, _, db)| *db).unwrap_or(conn.selected_db);
let (meta, commands): (Vec<(usize, Option<Bytes>, Bytes)>, Vec<std::sync::Arc<Frame>>) =
entries.into_iter().map(|(idx, arc_frame, aof, cmd, _db)| ((idx, aof, cmd), arc_frame)).unzip();
let msg = ShardMessage::PipelineBatchSlotted { db_index: batch_db, commands, response_slot: crate::shard::dispatch::ResponseSlotPtr(slot_ptr) };
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Split remote batches by DB index.

The entries store each command’s selected_db, but the batch sends only the first entry’s DB. A pipeline that changes DB between two remote commands targeting the same shard can execute later commands against the wrong DB. Key remote_groups by (target, db_index) or flush when selected_db changes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded/mod.rs` around lines 1214 - 1219, The
pipeline batching currently uses the first entry's selected_db (batch_db) for
all commands, causing later commands to run in the wrong DB if selected_db
changes; update the batching logic that builds
ShardMessage::PipelineBatchSlotted to split or key remote_groups by (target,
db_index) instead of only target, or flush and start a new batch whenever an
entry's selected_db differs from the current batch_db. Locate the code that
reads entries, the batch_db calculation, and the creation of
ShardMessage::PipelineBatchSlotted and ensure grouping uses each entry's
selected_db (or groups by (target, selected_db)) so each pipeline batch sent to
the shard carries the correct db_index for all commands.

Comment on lines +156 to +163
for arg in cmd_args {
if let Some(ch) = extract_bytes(arg) {
{ ctx.pubsub_registry.write().unsubscribe(ch.as_ref(), conn.subscriber_id); }
conn.subscription_count = conn.subscription_count.saturating_sub(1);
unpropagate_subscription(&ctx.all_remote_sub_maps, &ch, ctx.shard_id, ctx.num_shards, false);
write_buf.clear();
crate::protocol::serialize(&pubsub::unsubscribe_response(&ch, conn.subscription_count), write_buf);
if stream.write_all(write_buf).await.is_err() { sub_break = true; break; }
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Only decrement/unpropagate when a subscription was actually removed.

UNSUBSCRIBE channel and PUNSUBSCRIBE pattern currently decrement conn.subscription_count and update remote maps even if this subscriber was not registered for that channel/pattern. That can undercount subscriptions and remove remote fanout metadata for other valid subscriptions.

Suggested direction
-                                            { ctx.pubsub_registry.write().unsubscribe(ch.as_ref(), conn.subscriber_id); }
-                                            conn.subscription_count = conn.subscription_count.saturating_sub(1);
-                                            unpropagate_subscription(&ctx.all_remote_sub_maps, &ch, ctx.shard_id, ctx.num_shards, false);
+                                            let before = conn.subscription_count;
+                                            {
+                                                ctx.pubsub_registry
+                                                    .write()
+                                                    .unsubscribe(ch.as_ref(), conn.subscriber_id);
+                                            }
+                                            conn.subscription_count = ctx
+                                                .pubsub_registry
+                                                .read()
+                                                .total_subscription_count(conn.subscriber_id);
+                                            if conn.subscription_count < before {
+                                                unpropagate_subscription(&ctx.all_remote_sub_maps, &ch, ctx.shard_id, ctx.num_shards, false);
+                                            }

Apply the same pattern to PUNSUBSCRIBE.

Also applies to: 185-192

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded/pubsub.rs` around lines 156 - 163, The
unsubscribe handling currently always decrements conn.subscription_count and
calls unpropagate_subscription even when the subscriber wasn't actually removed;
change the logic in the UNSUBSCRIBE loop (and mirror the same fix in the
PUNSUBSCRIBE block) to check the return value of
ctx.pubsub_registry.write().unsubscribe (or whatever unsubscribe API returns)
and only when it indicates a removal: decrement conn.subscription_count, call
unpropagate_subscription(&ctx.all_remote_sub_maps, &ch, ctx.shard_id,
ctx.num_shards, false), and then serialize/send
pubsub::unsubscribe_response(&ch, conn.subscription_count); if unsubscribe did
not remove the subscriber, skip the decrement/unpropagate and still send the
appropriate response/count without mutating state.

Comment on lines +210 to +216
} else if cmd.eq_ignore_ascii_case(b"RESET") {
{ ctx.pubsub_registry.write().unsubscribe_all(conn.subscriber_id); }
{ ctx.pubsub_registry.write().punsubscribe_all(conn.subscriber_id); }
conn.subscription_count = 0;
write_buf.clear();
crate::protocol::serialize(&Frame::SimpleString(Bytes::from_static(b"RESET")), write_buf);
if stream.write_all(write_buf).await.is_err() { sub_break = true; break; }
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Unpropagate subscriptions on RESET.

RESET clears the local registry but discards the removed channels/patterns, leaving all_remote_sub_maps stale. Since disconnect cleanup later calls unsubscribe_all again and gets empty sets, remote shards can keep routing PUBLISH fanout to this shard after the client reset.

Proposed fix
-                                { ctx.pubsub_registry.write().unsubscribe_all(conn.subscriber_id); }
-                                { ctx.pubsub_registry.write().punsubscribe_all(conn.subscriber_id); }
+                                let removed_channels = {
+                                    ctx.pubsub_registry.write().unsubscribe_all(conn.subscriber_id)
+                                };
+                                let removed_patterns = {
+                                    ctx.pubsub_registry.write().punsubscribe_all(conn.subscriber_id)
+                                };
+                                for ch in &removed_channels {
+                                    unpropagate_subscription(
+                                        &ctx.all_remote_sub_maps,
+                                        ch,
+                                        ctx.shard_id,
+                                        ctx.num_shards,
+                                        false,
+                                    );
+                                }
+                                for pat in &removed_patterns {
+                                    unpropagate_subscription(
+                                        &ctx.all_remote_sub_maps,
+                                        pat,
+                                        ctx.shard_id,
+                                        ctx.num_shards,
+                                        true,
+                                    );
+                                }
+                                if let Ok(addr) = peer_addr.parse::<std::net::SocketAddr>() {
+                                    ctx.pubsub_affinity.write().remove(&addr.ip());
+                                }
                                 conn.subscription_count = 0;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded/pubsub.rs` around lines 210 - 216, The RESET
handler currently clears only local entries
(ctx.pubsub_registry.write().unsubscribe_all / punsubscribe_all) which discards
the removed channel/pattern lists and leaves all_remote_sub_maps stale; change
RESET so it captures the sets returned by unsubscribe_all(conn.subscriber_id)
and punsubscribe_all(conn.subscriber_id) (or adjust those methods to return the
removed channels/patterns), then call the existing propagation/unpropagation
routine (the code path used on normal unsubscribe/punsubscribe) to notify remote
shards to remove this subscriber from all_remote_sub_maps for those
channels/patterns, then set conn.subscription_count = 0 and write the RESET
reply as before.

Comment on lines +112 to +125
// Best-effort cleanup: delete all KV keys with ws
// prefix (WS-03).
{
let prefix = format!("{{{}}}:", ws_id.as_hex());
let mut db_guard = ctx.shard_databases.write_db(ctx.shard_id, 0);
let keys_to_delete: Vec<Vec<u8>> = db_guard
.keys()
.filter(|k| k.as_bytes().starts_with(prefix.as_bytes()))
.map(|k| k.as_bytes().to_vec())
.collect();
for key in &keys_to_delete {
db_guard.remove(key);
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Drop workspace keys from every logical DB.

WS.DROP removes the workspace metadata but only scans DB 0. Workspace-bound clients can write prefixed keys in other selected DBs, leaving data behind after the workspace is dropped.

Proposed fix
-                        {
-                            let prefix = format!("{{{}}}:", ws_id.as_hex());
-                            let mut db_guard = ctx.shard_databases.write_db(ctx.shard_id, 0);
-                            let keys_to_delete: Vec<Vec<u8>> = db_guard
-                                .keys()
-                                .filter(|k| k.as_bytes().starts_with(prefix.as_bytes()))
-                                .map(|k| k.as_bytes().to_vec())
-                                .collect();
-                            for key in &keys_to_delete {
-                                db_guard.remove(key);
-                            }
-                        }
+                        {
+                            let prefix = format!("{{{}}}:", ws_id.as_hex());
+                            for db_index in 0..ctx.shard_databases.db_count() {
+                                let mut db_guard =
+                                    ctx.shard_databases.write_db(ctx.shard_id, db_index);
+                                let keys_to_delete: Vec<Vec<u8>> = db_guard
+                                    .keys()
+                                    .filter(|k| k.as_bytes().starts_with(prefix.as_bytes()))
+                                    .map(|k| k.as_bytes().to_vec())
+                                    .collect();
+                                for key in &keys_to_delete {
+                                    db_guard.remove(key);
+                                }
+                            }
+                        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Best-effort cleanup: delete all KV keys with ws
// prefix (WS-03).
{
let prefix = format!("{{{}}}:", ws_id.as_hex());
let mut db_guard = ctx.shard_databases.write_db(ctx.shard_id, 0);
let keys_to_delete: Vec<Vec<u8>> = db_guard
.keys()
.filter(|k| k.as_bytes().starts_with(prefix.as_bytes()))
.map(|k| k.as_bytes().to_vec())
.collect();
for key in &keys_to_delete {
db_guard.remove(key);
}
}
// Best-effort cleanup: delete all KV keys with ws
// prefix (WS-03).
{
let prefix = format!("{{{}}}:", ws_id.as_hex());
for db_index in 0..ctx.shard_databases.db_count() {
let mut db_guard =
ctx.shard_databases.write_db(ctx.shard_id, db_index);
let keys_to_delete: Vec<Vec<u8>> = db_guard
.keys()
.filter(|k| k.as_bytes().starts_with(prefix.as_bytes()))
.map(|k| k.as_bytes().to_vec())
.collect();
for key in &keys_to_delete {
db_guard.remove(key);
}
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded/write.rs` around lines 112 - 125, The current
WS.DROP cleanup only scans logical DB 0 (in write.rs) and may leave
workspace-prefixed keys in other DBs; update the block that uses
ctx.shard_databases.write_db(ctx.shard_id, 0) to iterate over all logical DB
indices for this shard (e.g., 0..N or via a provided API on ctx.shard_databases)
and perform the same prefix scan/remove in each DB: for each db index call
ctx.shard_databases.write_db(ctx.shard_id, db_index), build the same prefix from
ws_id.as_hex(), collect keys_to_delete and remove them from that DB so workspace
keys are dropped across all logical DBs.

Comment on lines +662 to +710
let (response, wal_records, cypher_intents, cypher_undo_ops) =
if crate::command::graph::is_graph_write_cmd(cmd)
|| (cmd.eq_ignore_ascii_case(b"GRAPH.QUERY")
&& crate::command::graph::is_cypher_write_query(cmd_args))
{
let mut gs = ctx.shard_databases.graph_store_write(ctx.shard_id);
let (resp, cypher_intents, undo_ops) = if cmd.eq_ignore_ascii_case(b"GRAPH.QUERY") {
crate::command::graph::graph_query_or_write(&mut gs, cmd_args)
} else {
(
crate::command::graph::dispatch_graph_write(&mut gs, cmd, cmd_args),
Vec::new(),
Vec::new(),
)
};
let records = gs.drain_wal();
(resp, records, cypher_intents, undo_ops)
} else {
let gs = ctx.shard_databases.graph_store_read(ctx.shard_id);
let resp = crate::command::graph::dispatch_graph_read(&gs, cmd, cmd_args);
(resp, Vec::new(), Vec::new(), Vec::new())
};
// Phase 166: record graph intent for TXN rollback.
if let Some(txn) = conn.active_cross_txn.as_mut() {
let is_node = cmd.eq_ignore_ascii_case(b"GRAPH.ADDNODE");
let is_edge = cmd.eq_ignore_ascii_case(b"GRAPH.ADDEDGE");
if is_node || is_edge {
if let Frame::Integer(id) = &response {
if let Some(gname) = cmd_args.first().and_then(|f| extract_bytes(f)) {
txn.record_graph(*id as u64, is_node, gname);
}
}
}
if !cypher_intents.is_empty() {
if let Some(gname) = cmd_args.first().and_then(|f| extract_bytes(f)) {
for intent in &cypher_intents {
txn.record_graph(intent.entity_id, intent.is_node, gname.clone());
}
}
}
// Phase 174 FIX-01: push undo ops for SET/DELETE/MERGE rollback.
for undo_op in cypher_undo_ops {
txn.record_graph_undo(undo_op);
}
}
for record in wal_records {
ctx.shard_databases
.wal_append(ctx.shard_id, bytes::Bytes::from(record));
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Do not append graph WAL for uncommitted cross-store transactions.

When conn.active_cross_txn is present, graph writes record rollback intents but still drain and append graph WAL immediately. A crash before TXN.ABORT or before a failed transaction completes can replay uncommitted graph mutations as durable state. Defer these WAL records until TXN.COMMIT or encode them as transactional WAL tied to txn_id.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded/write.rs` around lines 662 - 710, The code
currently drains graph WAL into wal_records and immediately calls
ctx.shard_databases.wal_append even when conn.active_cross_txn exists, which can
make uncommitted graph mutations durable; change the logic so that if
conn.active_cross_txn.is_some() you do NOT call wal_append here but instead
store/defer wal_records on the active transaction (e.g., add to txn via a new
method like txn.defer_wal_records or txn.append_transactional_wal) or mark them
as transactional WAL tied to txn_id so they are only appended on TXN.COMMIT (or
discarded on TXN.ABORT); keep existing txn.record_graph/record_graph_undo usage
for rollback intents and ensure ctx.shard_databases.wal_append is only invoked
for non-transactional cases (when conn.active_cross_txn is None) or during the
commit path that consumes the deferred records.

PR #84 CI inherited two pre-existing failures from origin/main that
block the dispatch-telemetry merge:

  - Check (both OS): `pipeline_hset_fires_vector_and_text_auto_index`
    panics with "ResponseError: invalid KNN query syntax" when run
    under `--no-default-features --features runtime-tokio,jemalloc`.
    Root cause: the test file is gated only on `runtime-tokio`, but
    the code it exercises (text FT.SEARCH fast path in
    `handler_monoio/ft.rs:503` and `handler_sharded` equivalent) is
    `#[cfg(feature = "text-index")]`. Without text-index compiled in,
    text queries like `@name:corpus` fall through to the KNN-only
    parser and return that error.

  - Lint (audit-unwrap.sh): 4 un-annotated unwrap/expect calls in
    FT hot-path modules exceeded the baseline of 0.

Fixes
  - `tests/pipeline_auto_index.rs`: tighten the outer cfg to
    `#![cfg(all(feature = "runtime-tokio", feature = "text-index"))]`
    so the file compiles to zero tests under tokio-only, matching
    the actual runtime feature surface it targets.

  - `src/command/vector_search/ft_text_search.rs`: add inline
    `#[allow(clippy::unwrap_used)]` to the three `summarize_opts` /
    `highlight_opts` unwraps in `apply_post_processing`. Each branch
    is gated by a `do_summarize` / `do_highlight` flag that is itself
    derived from `Option::map_or(false, _)`, so reaching the unwrap
    implies `Some`. Invariants documented inline.

  - `src/server/conn/handler_monoio/ft.rs`: same pattern — the
    `query_bytes.unwrap()` at line 165 is reached only inside
    `if is_text { ... }`, where `is_text` was computed as
    `query_bytes.as_ref().map_or(false, _)`.

Validation
  - `bash scripts/audit-unwrap.sh`                              0 (was 4)
  - `cargo test pipeline_auto_index ... tokio+text-index`       1 passed
  - `cargo test pipeline_auto_index ... tokio-only`             0 tests (file skipped)
  - `cargo clippy` default + runtime-tokio,jemalloc             no warnings
  - `cargo fmt --check` on touched files                        clean

Both failures reproduce cleanly on pristine origin/main (verified via
`git worktree add /tmp/moon-main-check origin/main`), confirming this
is pre-existing breakage the PR #84 stack happens to inherit — not a
regression caused by the telemetry or Phase 176 refactor commits.

author: Tin Dang
Adds Phase 177 dispatch-observability entry (new moon_dispatch_path_total
counter and its four labels) plus the two CI hygiene fixes that unblock
this PR. Satisfies the CI changelog guard for PR #84.

author: Tin Dang
@TinDang97 TinDang97 merged commit 972643a into main Apr 22, 2026
5 of 8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant