Skip to content

Refactor STREAM_FORWARDING (#98) into client_req_stream API #104

Closed
filimonov wants to merge 4 commits into
ClickHouse:streamfrom
filimonov:client-req-stream-on-stream
Closed

Refactor STREAM_FORWARDING (#98) into client_req_stream API #104
filimonov wants to merge 4 commits into
ClickHouse:streamfrom
filimonov:client-req-stream-on-stream

Conversation

@filimonov
Copy link
Copy Markdown

@filimonov filimonov commented Apr 17, 2026

It's a PR to #98 :)

Summary

Replace the ad-hoc "recipe in a comment" contract that Keeper followed to forward pipelined client requests to the leader with a typed NuRaft abstraction: raft_server::open_client_req_stream() returns a client_req_stream that is single-channel, ordered, fail-closed, and term-fenced to the current leader. The local fast path delegates to append_entries_ext; the remote path owns one rpc_client and sends client_request messages with the STREAM_FORWARDING wire flag.

Along the way, fixes a real back-compat bug on the server side and tightens several latent failure modes that the old hand-rolled code had.

Fixes and improvements vs. baseline

correctness fixes

  1. expected_term_ gating on the wire flag. Before this change, any non-zero client_request.term was treated as a term fence on the leader, even for callers that didn't opt into STREAM_FORWARDING. That broke any external raft_server_handler::process_req user whose client_request happened to carry a term. The check is now strictly gated on the STREAM_FORWARDING wire flag.

  2. Empty-batch no-op. Old Keeper code, if called with an empty requests_for_sessions, forwarded an empty batch to append_entries_ext, which returns accepted=false, OK — old code then treated that as fatal and broke the stream. The stream now short-circuits empty batches.

  3. Local-path exception safety. If append_entries_ext throws synchronously (e.g. store_log_entry mid-batch fail), the old code let the exception propagate without latching the Keeper is_broken flag, so subsequent isBroken() checks returned false on an already-broken stream. The new path latches broken in a try/catch before rethrowing.

  4. Remote-path send-throw + double-completion. If rpc_client::send threw synchronously after partially queuing the handler, both the handler and the catch block could complete the cmd_result. A one-shot CAS latch now guarantees exactly one completion wins.

  5. Meta-callback teardown race. The STREAM_FORWARDING teardown-on-reject decision is now computed inside on_resp_ready after write_resp_meta_ runs. Without this, a meta callback that mutates resp->accepted_ could desync the teardown from the serialized response and leave the session open on a rejection.

API hygiene / robustness

  1. Typed API. raft_server::open_client_req_streamclient_req_stream. Keeper no longer hand-builds req_msg, flips STREAM_FORWARDING_REQUEST, or reaches into asio_service::create_client / get_config()->get_server()->get_endpoint().

  2. async_replication precondition checked at open time. Old code silently serialized behind client_req_timeout_ in sync mode, collapsing throughput with no diagnostic.

  3. streaming_mode_ precondition checked via new rpc_client::supports_pipelining() virtual (default false; asio_rpc_client overrides to return streaming_mode_). Without it, a second in-flight append used to hit the busy-socket assertion in asio_rpc_client::register_req_send.

  4. Lifetime safety. Stream holds wptr<raft_server>; a destroyed server makes the stream permanently broken rather than dereferencing stale state. In-flight completion callbacks share a ptr<shared_state> so they can still latch broken after the stream is destroyed.

  5. Correct src on forwarded req_msg. Old Keeper used src=0, which is a legal server id and confused peer-id-sensitive cb_func hooks. The stream now stamps the local server id.

  6. Diagnostic factory return. open_client_req_stream takes an optional cmd_result_code* out_err so callers can log the specific reason the stream could not be opened (CANCELLED / NOT_LEADER / FAILED). Keeper uses this for log lines.

Invariant / maintainability

  1. Structural no-gap invariant in handle_cli_req. New reject_cli_req helper is the sole funnel for accepted=false responses, all of them in the pre-store phase. A three-phase banner (validate / append+store / completion) makes the structural rule visible, and a phase-3 runtime guard (p_ft + assert(false)) catches any future regression that would silently break STREAM_FORWARDING's close-on-reject correctness.

  2. supports_pipelining() transport capability. New virtual on rpc_client; alternate transports can cleanly opt in.

  3. Docs moved to docs/stream_forwarding.md. Semantics, preconditions, wire-flag mechanics, no-gap invariant proof sketch, and an Operator Runbook for rolling upgrades are in a discoverable file instead of a 40-line comment in asio_service.cxx.

  4. Umbrella include. nuraft.hxx now exposes client_req_stream so consumers get the full API via the standard entry point.

  5. Reduced Keeper ↔ NuRaft coupling. Keeper went from ~8 direct NuRaft internal symbols (req_msg, msg_type, log_val_type, STREAM_FORWARDING_REQUEST, asio_service::create_client, get_config(), srv_config::get_endpoint(), rpc_client::send) down to one: raft_instance->open_client_req_stream(timeout_ms, &err).

  6. Unit tests. Three deterministic tests cover previously-uncovered behaviors: async_replication refusal with reason-code check, local happy path, and empty-batch no-op.

Consciously not fixed (documented scope)

  • Rolling-upgrade silent degrade. Against a peer that doesn't understand STREAM_FORWARDING the flag is silently ignored and the no-gap guarantee doesn't hold during the upgrade window. A per-feature ACK bit was considered and explicitly declined (would reinvent capability negotiation per-feature, wrong abstraction for NuRaft).

    Feature follows the same deployment discipline as async_replication did: precondition + operator runbook, not runtime enforcement. The correct long-term answer is a general peer-capability handshake, outside the scope of this change.

  • cb_func::ReturnNull combined with stream forwarding. Documented as unsupported. PreAppendLogLeader / AppendLogs / AppendLogFailed ReturnNull mid-batch always closed the session
    with a stored prefix — latent corruption on retry. Original NuRaft ABI retained as-is for this PR; fixing it cleanly needs a version bump/CHANGELOG path that this fork does not expose.

  • Mid-batch store_log_entry throw. The stored prefix will replicate; the stream breaks. No layer can fix this — callers need state-machine-level dedup if they want exactly-once on retry.

Verification

  • NuRaft unit tests: client_req_stream_test 3/3, raft_server_test 21/21.
  • ClickHouse build clean.
  • test_keeper_multinode_simple integration suite: 5/5.

Keeper consumer

That makes src/Coordination/KeeperAppendStream.cpp look like that:

#include <Coordination/KeeperAppendStream.h>

#if USE_NURAFT

#include <Coordination/KeeperServer.h>
#include <Coordination/CoordinationSettings.h>

namespace DB
{

namespace CoordinationSetting
{
    extern const CoordinationSettingsMilliseconds operation_timeout_ms;
}

KeeperAppendStream::KeeperAppendStream(KeeperServer * server_)
    : server(server_)
    , log(getLogger("KeeperAppendStream"))
{}

namespace
{
    const char * codeToString(nuraft::cmd_result_code code)
    {
        switch (code)
        {
            case nuraft::cmd_result_code::OK: return "OK";
            case nuraft::cmd_result_code::CANCELLED: return "CANCELLED (async_replication disabled)";
            case nuraft::cmd_result_code::NOT_LEADER: return "NOT_LEADER (no current leader or raft state uninitialized)";
            case nuraft::cmd_result_code::FAILED: return "FAILED (leader not in config, rpc_client creation failed, or transport does not support pipelining)";
            default: return "unexpected code";
        }
    }
}

bool KeeperAppendStream::isBroken() const
{
    return is_broken->load() || (stream && stream->is_broken());
}

void KeeperAppendStream::markAsBroken()
{
    is_broken->store(true);
}

void KeeperAppendStream::putRequestBatch(const KeeperRequestsForSessions & requests_for_sessions, std::function<void(bool)> callback)
{
    auto fail = [&]
    {
        is_broken->store(true);
        if (callback)
            callback(false);
    };

    if (isBroken())
    {
        fail();
        return;
    }

    if (!stream)
    {
        const uint64_t send_timeout_ms = server->keeper_context->getCoordinationSettings()[CoordinationSetting::operation_timeout_ms].totalMilliseconds();
        nuraft::cmd_result_code err = nuraft::cmd_result_code::OK;
        stream = server->raft_instance->open_client_req_stream(send_timeout_ms, &err);
        if (!stream)
        {
            LOG_WARNING(log, "Failed to open client_req_stream: {}", codeToString(err));
            fail();
            return;
        }
    }

    std::vector<nuraft::ptr<nuraft::buffer>> entries;
    entries.reserve(requests_for_sessions.size());
    for (const auto & request_for_session : requests_for_sessions)
        entries.push_back(IKeeperStateMachine::getZooKeeperLogEntry(request_for_session));

    auto res = stream->append(std::move(entries));
    res->when_ready(
        [is_broken_ = is_broken, callback](nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & r, nuraft::ptr<std::exception> &)
        {
            const bool accepted = r.get_accepted() && r.get_result_code() == nuraft::cmd_result_code::OK;
            if (!accepted)
                is_broken_->store(true);
            if (callback)
                callback(accepted);
        });
}

}

#endif

al13n321 and others added 4 commits April 13, 2026 13:40
Replace the recipe-in-a-comment contract (callers hand-building
client_request messages, managing term+flag and rpc_client themselves)
with a typed NuRaft abstraction: raft_server::open_client_req_stream()
returns a client_req_stream fenced to the current leader's term.

Also fix a back-compat regression in asio_service.cxx: expected_term_ was
being applied to any non-zero client_request.term, not just when
STREAM_FORWARDING was set. Now gated on the flag.

Details:
  - client_req_stream: is_broken(), append() -> cmd_result<ptr<buffer>>,
    fail-fast contract when already broken. Requires async_replication;
    refused otherwise, since pipelining has no throughput benefit in sync
    mode.
  - Local path: zero-cost pass-through to append_entries_ext.
  - Remote path: owns one rpc_client, sends req_msg with
    STREAM_FORWARDING_REQUEST, adapts the handler into a cmd_result.
  - handle_cli_req: reject_cli_req helper for every accepted=false path,
    routed above the store loop; three-phase banner makes the no-gap
    invariant structural rather than comment-only. cb_func::ReturnNull
    semantics are NOT changed — PreAppendLogLeader / AppendLogs retain
    their original "return nullptr cancels the request" contract.
    Combining stream forwarding with hooks that use ReturnNull is
    documented as unsupported (see docs/stream_forwarding.md); note that
    AppendLogs returning nullptr was always latent corruption (batch
    stored + transport error to caller + retry = duplicate commit), but
    fixing that is out of scope for this refactor and deferred to a
    future version-bumped change.
  - rpc_session: STREAM_FORWARDING teardown is decided inside
    on_resp_ready AFTER write_resp_meta_ runs, so a meta-callback that
    mutates accepted cannot desync the teardown decision from the
    serialized response. No regression test added for the meta-callback
    race — covering it needs a real asio harness with a registered meta
    callback, which the fake-network unit-test infrastructure does not
    offer.
  - Drop dead conn_attempts_ / reconnect guard in rpc_client.
  - Long recipe comment moved to docs/stream_forwarding.md.

Verified via ClickHouse build and test_keeper_multinode_simple (5/5 pass).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Member

@al13n321 al13n321 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the api / contract good enough for an "official" nuraft feature? It's stripped down to just what is needed for KeeperRequestDispatcher2, for maximum simplicity.

  • The caller must check is_broken() frequently. I guess most users would want a callback instead (people usually don't have a busy loop re-checking everything every 100us?). But it would be nontrivial to provide a callback because it'd require a new (?) mechanism to notify streams of term change.
  • Tracking in-flight requests is up to the caller. Commit info is not propagated. The user has to have some whole separate mechanism for correlating in-flight requests with committed entries. Which probably has to pretty much look like KeeperRequestDispatcher2. Maybe it would be more appropriate to use streams with async replication turned off, but with pipelined appends similar to #101 , so that replies from leader are actually meaningful? Then streams would be usable without the commit callback dance... ah, no, that wouldn't guarantee that the committed entry was applied on the current follower yet. So maybe that + make the stream wait for commit before reporting success.
  • Errors are not differentiated, the outcome of append is either "ok (... kinda ok, see previous item)" or "stream broke lol". Error responses are not sent from leader.
  • User has to implement retry backoff for when stream breaks.

Feels like too many footguns for a user-facing production feature.

It may make sense for someone to spend the time to implement this feature in a more friendly and robust way (with much more code and hence higher probability of bugs and more testing). I don't want to be that someone :) . Until that happens, maybe it should stay a clickhouse-only thing instead of a very half-baked nuraft thing? Or is it better to add an unfriendly implementation and leave it to someone else to improve it? Idk. At least the doc should be made very clear about the limitations instead of being a marketing copy.

@antonio2368 , plz be the judge :)

@@ -0,0 +1,102 @@
/************************************************************************
Copyright 2024-present ClickHouse, Inc.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Huh, how does copyright work for open source contributions and forks? If we upstream this, will nuraft end up with some kind of mixed copyright? Does the license allow that? What exactly is the purpose of these comments? I wonder what would happen if we just have the comment say that this comment itself comes WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND regarding its accuracy or legal status :P )

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It short: it ok, and may stay like that. There are no issues even if it will be upstreamed :) They can add own copyrights nearby. The licence explicitly allows others to use code written by you

Example of how it may look:
https://github.com/llvm/llvm-project/blob/da6ca203a677f51764cc9dfa1bbc9660a5c199e3/polly/lib/External/isl/isl_aff.c#L2-L9

WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND regarding its accuracy or legal status

I'm not sure if legal departments will share that sense of humor :)

Comment thread src/asio_service.cxx
bs.put_i32(resp->get_result_code());
}

// Teardown decision for STREAM_FORWARDING. Computed here, after
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stopping here is too late, we may have already received and processed the next message oh I see, the assumption is that if leader rejects an append it'll reject all subsequent appends with the same term. No transient rejections. A slightly stronger assumption than before: the if ((flags_ & STREAM_FORWARDING) && !resp->get_accepted()) thing above allowed transient synchronous rejections without breaking no-gap.

But late rejection through write_resp_meta_ should be considered incompatible with STREAM_FORWARDING, we may have already received and processed the next message.

I'd remove all this and revert to the if ((flags_ & STREAM_FORWARDING) && !resp->get_accepted()) thing above.

* CANCELLED — async_replication disabled on the cluster config
* NOT_LEADER — no current leader or term==0
* FAILED — leader not in cluster config, rpc_client creation
* failed, or transport does not support pipelining
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should KeeperDispatcher2 do if transport doesn't support pipelining? I think we should:

  • allow creating a stream anyway,
  • make the stream notice that transport doesn't support pipelining, and keep track of whether a request is in flight (just another atomic next to broken_),
  • add method is_ready() to it to indicate whether append can be called right now; if transport supports pipelining, is_ready is always true, otherwise it's true if there's no request in flight,
  • make KeeperDispatcher2's loop check is_ready (and busy-wait for it just like for everything else).

Comment thread src/client_req_stream.cxx

bool client_req_stream::is_broken() const {
if (state_->broken_.load(std::memory_order_acquire)) return true;
auto srv = state_->srv_.lock();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment says this function is just 2 atomic loads, but this adds something like +1 atomic CAS and +1 atomic dec (miiiight be contended). Maybe use a non-weak pointer here? (Maybe add a non-weak pointer outside state_, or maybe use a plain pointer and say that client_req_stream is not allowed to outlive the raft_server.)

Comment thread src/client_req_stream.cxx
* observation, safe from any thread, cheap (two atomic loads).
*/
bool is_broken() const;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need mark_as_broken(). KeeperAppendStream has markAsBroken(), and KeeperRequestDispatcher2 uses it, and it would be mildly inconvenient to add another atomic+check in KeeperRequestDispatcher2 if it weren't available (actually for that particular call site it wouldn't be a big deal to go without it, but still seems more natural to have it).

Copy link
Copy Markdown
Author

@filimonov filimonov Apr 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's odd. If owner of the stream thinks that the stream is bad - it should just destroy it and create a new one, why should it keep communicating with a broken stream, saying 'hey stream, bad news for you - i think you're broken' :)

Comment thread src/client_req_stream.cxx
Comment on lines +140 to +143
// One-shot latch so a partially-queued send can't let both the handler
// and the catch block complete `res`.
auto completed = cs_new<std::atomic<bool>>(false);
auto try_complete =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: send isn't even supposed to throw. Drop this and just set state_->broken_ and rethrow, and let the caller deal with ambiguity about whether the callback will be called or not (clickhouse can probably just std::abort).

Comment on lines +67 to +70
* `accepted=false` with a non-OK `result_code`. Callers should
* distinguish `CANCELLED` (client-side short-circuit when already
* broken, no I/O) from anything else (leader's verbatim code on
* remote rejection, or `FAILED` on transport exception).
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should they distinguish it? Neither of the existing callers care.

"leader's verbatim code on remote rejection" - no, leader doesn't send response on error in STREAM_FORWARDING mode. This function doesn't meaningfully differentiate errors, there are just two outcomes: "ok, here's the log_idx from leader (but the entry is not committed, it may still get lost)", or "error of some kind, idk". Just list those outcomes in the comment.

(This and the required polling of is_broken make this whole feature quite user unfriendly. It feels weird to have it presented as a real nuraft feature.)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Most of nuraft code just return nulls + write something to log.

Comment thread src/client_req_stream.cxx
Comment on lines +156 to +174
ptr<std::exception> ret_err =
err ? std::static_pointer_cast<std::exception>(err) : nullptr;
cmd_result_code code = cmd_result_code::OK;

if (err || !resp) {
state->broken_.store(true, std::memory_order_release);
code = cmd_result_code::FAILED;
} else if (!resp->get_accepted()) {
state->broken_.store(true, std::memory_order_release);
code = resp->get_result_code();
// Invariant: accepted=false always carries non-OK
// (reject_cli_req). OK here means protocol violation.
if (code == cmd_result_code::OK) code = cmd_result_code::FAILED;
} else {
res->accept();
ret_buf = resp->get_ctx();
}

try_complete(*completed, ret_buf, ret_err, code);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This amount of code seems to oversell how much meaningful error differentiation is possible here. Just do if (err || !resp || !resp->get_accepted() || resp->get_result_code() != cmd_result_code::OK) { broken_ and FAILED } else { get_ctx() and OK }?

Or at least: in the else, need to propagate resp->get_result_code()? I don't think accepted necessarily means success.

Comment thread src/client_req_stream.cxx
Comment on lines +199 to +202
if (!cfg || !cfg->is_async_replication()) {
p_wn("open_client_req_stream: refused - async_replication is disabled");
return fail(cmd_result_code::CANCELLED);
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allow this, it doesn't break anything, we want KeeperRequestDispatcher2 to work in this case.

@filimonov
Copy link
Copy Markdown
Author

filimonov commented Apr 18, 2026

Is the api / contract good enough for an "official" nuraft feature?

I actually was not planning to push it up, make it official etc. (i don't care).

The problem that PR tries to solve: Keeper code get 'polluted' / poisoned with many nuraft internals. That tight coupling may become a big headache in case of eventual upgrades of nuraft or other changes in nuraft code.

All nuraft-low level things should live closer to other nuraft code (in the same submodule at least). When you see nuraft code in context of other nuraft code - you know how it's used, how to review it / make it robust etc. Eventually you can transform it into a real feature. When you see some nuraft internals in ClickHouse repo you can't really review that / follow that.

I'm ok with not 'overselling' it (actually my plan was just to move it 'as is' from keeper code to nuraft, but with AI it went a bit wider :)

@al13n321 al13n321 force-pushed the stream branch 2 times, most recently from e644e20 to 608a804 Compare April 20, 2026 02:43
@al13n321
Copy link
Copy Markdown
Member

Moved a simplified version of this into #98

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants