From 2b2e9da2c1a3ba3166adb4f64eb0d7fd82e04d37 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Wed, 17 Jun 2026 13:26:22 -0700 Subject: [PATCH 1/3] fix: stop accumulating WhenAny continuations on the Conditions aggregate --- .../src/data_systems/fdv2/conditions.cpp | 67 +++++++++++++++++-- .../src/data_systems/fdv2/conditions.hpp | 27 +++++++- .../data_systems/fdv2/fdv2_data_system.cpp | 8 ++- libs/server-sdk/tests/conditions_test.cpp | 11 +-- 4 files changed, 99 insertions(+), 14 deletions(-) diff --git a/libs/server-sdk/src/data_systems/fdv2/conditions.cpp b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp index 2d013c497..73a5ded2a 100644 --- a/libs/server-sdk/src/data_systems/fdv2/conditions.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp @@ -2,6 +2,7 @@ #include +#include #include #include @@ -133,15 +134,73 @@ async::Future MakeAggregateFuture( } // namespace Conditions::Conditions(std::vector> conditions) - : conditions_(std::move(conditions)), - future_(MakeAggregateFuture(conditions_)) {} + : conditions_(std::move(conditions)), state_(std::make_shared()) { + MakeAggregateFuture(conditions_) + .Then( + [state = + state_](IFDv2Condition::Type const& type) -> std::monostate { + std::vector drained; + { + std::lock_guard lock(state->mutex); + state->aggregate_result = type; + drained = std::move(state->pending); + } + for (auto& entry : drained) { + entry.promise.Resolve(type); + } + return {}; + }, + async::kInlineExecutor); +} Conditions::~Conditions() { Close(); } -async::Future Conditions::GetFuture() const { - return future_; +async::Future Conditions::GetFuture( + async::CancellationToken token) { + async::Promise promise; + auto future = promise.GetFuture(); + std::int64_t id; + + { + std::lock_guard lock(state_->mutex); + if (state_->aggregate_result) { + promise.Resolve(*state_->aggregate_result); + return future; + } + id = state_->next_id++; + state_->pending.push_back({id, std::move(promise), nullptr}); + } + + // Construct cb outside the lock: if the token is already cancelled, the + // callback fires synchronously inside the ctor and needs to acquire + // state_->mutex. + auto cb = std::make_unique( + token, [state = state_, id]() { + std::lock_guard lock(state->mutex); + state->pending.erase( + std::remove_if( + state->pending.begin(), state->pending.end(), + [id](PendingEntry const& e) { return e.id == id; }), + state->pending.end()); + }); + + // Re-find the entry under the lock and attach cb. Between the push above + // and here, the aggregate may have fired and drained the vector, or cb's + // callback may have fired synchronously during construction and erased + // the entry. If gone, drop cb. + { + std::lock_guard lock(state_->mutex); + auto it = + std::find_if(state_->pending.begin(), state_->pending.end(), + [id](PendingEntry const& e) { return e.id == id; }); + if (it != state_->pending.end()) { + it->cancel_cb = std::move(cb); + } + } + + return future; } void Conditions::Inform(FDv2SourceResult const& result) { diff --git a/libs/server-sdk/src/data_systems/fdv2/conditions.hpp b/libs/server-sdk/src/data_systems/fdv2/conditions.hpp index f230e9a07..70a3518f7 100644 --- a/libs/server-sdk/src/data_systems/fdv2/conditions.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/conditions.hpp @@ -8,9 +8,11 @@ #include #include +#include #include #include #include +#include namespace launchdarkly::server_side::data_systems { @@ -136,7 +138,7 @@ class RecoveryConditionFactory final * Aggregates a set of conditions into a single Future that resolves with the * type of the first condition to fire. Inform() and Close() forward to every * underlying condition. If constructed with no conditions, GetFuture() - * returns a Future that never resolves. + * returns a Future that never resolves (until Close). * * Thread-safe: GetFuture, Inform, and Close may be called from any thread. */ @@ -153,16 +155,35 @@ class Conditions final { Conditions& operator=(Conditions const&) = delete; Conditions& operator=(Conditions&&) = delete; + /** + * Returns a fresh Future that resolves with the type of the first + * condition to fire. The caller must cancel the source corresponding to + * `token` once the result is no longer needed, so that the per-call + * Promise (and its registered continuations) can be released. + */ [[nodiscard]] async::Future - GetFuture() const; + GetFuture(async::CancellationToken token); void Inform(data_interfaces::FDv2SourceResult const& result); void Close(); private: + struct PendingEntry { + std::int64_t id; + async::Promise promise; + std::unique_ptr cancel_cb; + }; + + struct State { + std::mutex mutex; + std::int64_t next_id = 0; + std::vector pending; + std::optional aggregate_result; + }; + std::vector> conditions_; - async::Future future_; + std::shared_ptr const state_; }; } // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp index fcb980c3c..354fb8850 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp @@ -274,11 +274,13 @@ void FDv2DataSystem::RunSynchronizerNext() { return; } auto next_future = active_synchronizer_->Next(selector_); - auto cond_future = active_conditions_->GetFuture(); + auto cond_cancel = std::make_shared(); + auto cond_future = active_conditions_->GetFuture(cond_cancel->GetToken()); async::WhenAny(cond_future, next_future) .Then( - [this, cond_future, - next_future](std::size_t const& idx) -> std::monostate { + [this, cond_future, next_future, + cond_cancel](std::size_t const& idx) -> std::monostate { + cond_cancel->Cancel(); if (idx == 0) { OnConditionFired(*cond_future.GetResult()); } else { diff --git a/libs/server-sdk/tests/conditions_test.cpp b/libs/server-sdk/tests/conditions_test.cpp index 4a9976055..487833cb7 100644 --- a/libs/server-sdk/tests/conditions_test.cpp +++ b/libs/server-sdk/tests/conditions_test.cpp @@ -12,6 +12,8 @@ using namespace launchdarkly::server_side::data_interfaces; using namespace launchdarkly::server_side::data_systems; using namespace std::chrono_literals; +using launchdarkly::async::CancellationToken; + namespace { // Holds an io_context running on a worker thread; the executor produced by @@ -162,7 +164,7 @@ TEST(RecoveryConditionTest, CloseCancelsActiveTimerAndResolvesWithCancelled) { TEST(ConditionsTest, EmptyAggregateNeverResolves) { Conditions conditions({}); - auto result = conditions.GetFuture().WaitForResult(50ms); + auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(50ms); EXPECT_FALSE(result.has_value()); } @@ -178,7 +180,7 @@ TEST(ConditionsTest, AggregateResolvesWithTypeOfFirstFiringCondition) { /*timeout=*/100ms)); Conditions conditions(std::move(conds)); - auto result = conditions.GetFuture().WaitForResult(1s); + auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(1s); ASSERT_TRUE(result.has_value()); EXPECT_EQ(IFDv2Condition::Type::kRecovery, *result); @@ -202,7 +204,7 @@ TEST(ConditionsTest, InformForwardsToAllUnderlyingConditions) { /*status_code=*/0, "boom", std::chrono::system_clock::now()}, }}); - auto result = conditions.GetFuture().WaitForResult(1s); + auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(1s); ASSERT_TRUE(result.has_value()); EXPECT_EQ(IFDv2Condition::Type::kFallback, *result); @@ -220,7 +222,8 @@ TEST(ConditionsTest, CloseForwardsToAllUnderlyingConditions) { conditions.Close(); - auto result = conditions.GetFuture().WaitForResult(200ms); + auto result = + conditions.GetFuture(CancellationToken{}).WaitForResult(200ms); ASSERT_TRUE(result.has_value()); EXPECT_EQ(IFDv2Condition::Type::kCancelled, *result); } From 34b3a6ce91452d30f1b85fbe14dd805ed83e7dc4 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Wed, 17 Jun 2026 15:23:27 -0700 Subject: [PATCH 2/3] fix: resolve pending Conditions promises on Close --- libs/server-sdk/src/data_systems/fdv2/conditions.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/libs/server-sdk/src/data_systems/fdv2/conditions.cpp b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp index 73a5ded2a..1d6747561 100644 --- a/libs/server-sdk/src/data_systems/fdv2/conditions.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp @@ -213,6 +213,18 @@ void Conditions::Close() { for (auto const& condition : conditions_) { condition->Close(); } + // Resolve any promises still pending. + std::vector drained; + { + std::lock_guard lock(state_->mutex); + if (!state_->aggregate_result) { + state_->aggregate_result = IFDv2Condition::Type::kCancelled; + } + drained = std::move(state_->pending); + } + for (auto& entry : drained) { + entry.promise.Resolve(IFDv2Condition::Type::kCancelled); + } } } // namespace launchdarkly::server_side::data_systems From 404e9a1fe2707826f7a0f2ad6688beb94a755e2a Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Wed, 17 Jun 2026 16:32:59 -0700 Subject: [PATCH 3/3] refactor: capture weak_ptr to State in Conditions callbacks --- .../src/data_systems/fdv2/conditions.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/libs/server-sdk/src/data_systems/fdv2/conditions.cpp b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp index 1d6747561..01350eb93 100644 --- a/libs/server-sdk/src/data_systems/fdv2/conditions.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp @@ -137,8 +137,12 @@ Conditions::Conditions(std::vector> conditions) : conditions_(std::move(conditions)), state_(std::make_shared()) { MakeAggregateFuture(conditions_) .Then( - [state = - state_](IFDv2Condition::Type const& type) -> std::monostate { + [weak_state = std::weak_ptr(state_)]( + IFDv2Condition::Type const& type) -> std::monostate { + auto state = weak_state.lock(); + if (!state) { + return {}; + } std::vector drained; { std::lock_guard lock(state->mutex); @@ -177,7 +181,11 @@ async::Future Conditions::GetFuture( // callback fires synchronously inside the ctor and needs to acquire // state_->mutex. auto cb = std::make_unique( - token, [state = state_, id]() { + token, [weak_state = std::weak_ptr(state_), id]() { + auto state = weak_state.lock(); + if (!state) { + return; + } std::lock_guard lock(state->mutex); state->pending.erase( std::remove_if(