Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 83 additions & 4 deletions libs/server-sdk/src/data_systems/fdv2/conditions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <launchdarkly/async/timer.hpp>

#include <algorithm>
#include <utility>
#include <variant>

Expand Down Expand Up @@ -133,15 +134,81 @@ async::Future<IFDv2Condition::Type> MakeAggregateFuture(
} // namespace

Conditions::Conditions(std::vector<std::unique_ptr<IFDv2Condition>> conditions)
: conditions_(std::move(conditions)),
future_(MakeAggregateFuture(conditions_)) {}
: conditions_(std::move(conditions)), state_(std::make_shared<State>()) {
MakeAggregateFuture(conditions_)
.Then(
[weak_state = std::weak_ptr<State>(state_)](
IFDv2Condition::Type const& type) -> std::monostate {
auto state = weak_state.lock();
if (!state) {
return {};
}
std::vector<PendingEntry> drained;
{
std::lock_guard lock(state->mutex);
state->aggregate_result = type;
drained = std::move(state->pending);
}
for (auto& entry : drained) {
entry.promise.Resolve(type);
}
return {};
},
async::kInlineExecutor);
}

Conditions::~Conditions() {
Close();
}

async::Future<IFDv2Condition::Type> Conditions::GetFuture() const {
return future_;
async::Future<IFDv2Condition::Type> Conditions::GetFuture(
async::CancellationToken token) {
async::Promise<IFDv2Condition::Type> promise;
auto future = promise.GetFuture();
std::int64_t id;

{
std::lock_guard lock(state_->mutex);
if (state_->aggregate_result) {
promise.Resolve(*state_->aggregate_result);
return future;
}
id = state_->next_id++;
state_->pending.push_back({id, std::move(promise), nullptr});
}

// Construct cb outside the lock: if the token is already cancelled, the
// callback fires synchronously inside the ctor and needs to acquire
// state_->mutex.
auto cb = std::make_unique<async::CancellationCallback>(
token, [weak_state = std::weak_ptr<State>(state_), id]() {
auto state = weak_state.lock();
if (!state) {
return;
}
std::lock_guard lock(state->mutex);
state->pending.erase(
std::remove_if(
state->pending.begin(), state->pending.end(),
[id](PendingEntry const& e) { return e.id == id; }),
state->pending.end());
});

// Re-find the entry under the lock and attach cb. Between the push above
// and here, the aggregate may have fired and drained the vector, or cb's
// callback may have fired synchronously during construction and erased
// the entry. If gone, drop cb.
{
std::lock_guard lock(state_->mutex);
auto it =
std::find_if(state_->pending.begin(), state_->pending.end(),
[id](PendingEntry const& e) { return e.id == id; });
if (it != state_->pending.end()) {
it->cancel_cb = std::move(cb);
}
}

return future;
}

void Conditions::Inform(FDv2SourceResult const& result) {
Expand All @@ -154,6 +221,18 @@ void Conditions::Close() {
for (auto const& condition : conditions_) {
condition->Close();
}
// Resolve any promises still pending.
std::vector<PendingEntry> drained;
{
std::lock_guard lock(state_->mutex);
if (!state_->aggregate_result) {
state_->aggregate_result = IFDv2Condition::Type::kCancelled;
}
drained = std::move(state_->pending);
}
for (auto& entry : drained) {
entry.promise.Resolve(IFDv2Condition::Type::kCancelled);
}
}

} // namespace launchdarkly::server_side::data_systems
27 changes: 24 additions & 3 deletions libs/server-sdk/src/data_systems/fdv2/conditions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
#include <boost/asio/any_io_executor.hpp>

#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <vector>

namespace launchdarkly::server_side::data_systems {

Expand Down Expand Up @@ -136,7 +138,7 @@ class RecoveryConditionFactory final
* Aggregates a set of conditions into a single Future that resolves with the
* type of the first condition to fire. Inform() and Close() forward to every
* underlying condition. If constructed with no conditions, GetFuture()
* returns a Future that never resolves.
* returns a Future that never resolves (until Close).
*
* Thread-safe: GetFuture, Inform, and Close may be called from any thread.
*/
Expand All @@ -153,16 +155,35 @@ class Conditions final {
Conditions& operator=(Conditions const&) = delete;
Conditions& operator=(Conditions&&) = delete;

/**
* Returns a fresh Future that resolves with the type of the first
* condition to fire. The caller must cancel the source corresponding to
* `token` once the result is no longer needed, so that the per-call
* Promise (and its registered continuations) can be released.
*/
[[nodiscard]] async::Future<data_interfaces::IFDv2Condition::Type>
GetFuture() const;
GetFuture(async::CancellationToken token);

void Inform(data_interfaces::FDv2SourceResult const& result);

void Close();

private:
struct PendingEntry {
std::int64_t id;
async::Promise<data_interfaces::IFDv2Condition::Type> promise;
std::unique_ptr<async::CancellationCallback> cancel_cb;
};

struct State {
std::mutex mutex;
std::int64_t next_id = 0;
std::vector<PendingEntry> pending;
std::optional<data_interfaces::IFDv2Condition::Type> aggregate_result;
};

std::vector<std::unique_ptr<data_interfaces::IFDv2Condition>> conditions_;
async::Future<data_interfaces::IFDv2Condition::Type> future_;
std::shared_ptr<State> const state_;
};

} // namespace launchdarkly::server_side::data_systems
8 changes: 5 additions & 3 deletions libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,13 @@ void FDv2DataSystem::RunSynchronizerNext() {
return;
}
auto next_future = active_synchronizer_->Next(selector_);
auto cond_future = active_conditions_->GetFuture();
auto cond_cancel = std::make_shared<async::CancellationSource>();
auto cond_future = active_conditions_->GetFuture(cond_cancel->GetToken());
async::WhenAny(cond_future, next_future)
.Then(
[this, cond_future,
next_future](std::size_t const& idx) -> std::monostate {
[this, cond_future, next_future,
cond_cancel](std::size_t const& idx) -> std::monostate {
cond_cancel->Cancel();
if (idx == 0) {
OnConditionFired(*cond_future.GetResult());
} else {
Expand Down
11 changes: 7 additions & 4 deletions libs/server-sdk/tests/conditions_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ using namespace launchdarkly::server_side::data_interfaces;
using namespace launchdarkly::server_side::data_systems;
using namespace std::chrono_literals;

using launchdarkly::async::CancellationToken;

namespace {

// Holds an io_context running on a worker thread; the executor produced by
Expand Down Expand Up @@ -162,7 +164,7 @@ TEST(RecoveryConditionTest, CloseCancelsActiveTimerAndResolvesWithCancelled) {
TEST(ConditionsTest, EmptyAggregateNeverResolves) {
Conditions conditions({});

auto result = conditions.GetFuture().WaitForResult(50ms);
auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(50ms);

EXPECT_FALSE(result.has_value());
}
Expand All @@ -178,7 +180,7 @@ TEST(ConditionsTest, AggregateResolvesWithTypeOfFirstFiringCondition) {
/*timeout=*/100ms));
Conditions conditions(std::move(conds));

auto result = conditions.GetFuture().WaitForResult(1s);
auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(1s);

ASSERT_TRUE(result.has_value());
EXPECT_EQ(IFDv2Condition::Type::kRecovery, *result);
Expand All @@ -202,7 +204,7 @@ TEST(ConditionsTest, InformForwardsToAllUnderlyingConditions) {
/*status_code=*/0, "boom", std::chrono::system_clock::now()},
}});

auto result = conditions.GetFuture().WaitForResult(1s);
auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(1s);

ASSERT_TRUE(result.has_value());
EXPECT_EQ(IFDv2Condition::Type::kFallback, *result);
Expand All @@ -220,7 +222,8 @@ TEST(ConditionsTest, CloseForwardsToAllUnderlyingConditions) {

conditions.Close();

auto result = conditions.GetFuture().WaitForResult(200ms);
auto result =
conditions.GetFuture(CancellationToken{}).WaitForResult(200ms);
ASSERT_TRUE(result.has_value());
EXPECT_EQ(IFDv2Condition::Type::kCancelled, *result);
}
Loading