Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/multio/server/Dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ Dispatcher::Dispatcher(const config::ComponentConfiguration& compConf, eckit::Qu

util::FailureHandlerResponse Dispatcher::handleFailure(util::OnDispatchError t, const util::FailureContext& c,
util::DefaultFailureState&) const {
if (t == util::OnDispatchError::Recover) {
// Log and continue: do not interrupt the queue, otherwise the model side
// hangs/aborts during close_connections.
return util::FailureHandlerResponse::Ignore;
}
queue_.interrupt(c.eptr);
return util::FailureHandlerResponse::Rethrow;
};
Expand Down
4 changes: 2 additions & 2 deletions src/multio/server/Dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ struct DispatcherFailureTraits {
using OnErrorType = util::OnDispatchError;
using FailureOptions = util::DefaultFailureOptions;
using FailureState = util::DefaultFailureState;
using TagSequence = util::integer_sequence<OnErrorType, OnErrorType::Propagate>;
using TagSequence = util::integer_sequence<OnErrorType, OnErrorType::Propagate, OnErrorType::Recover>;
static inline std::optional<OnErrorType> parse(const std::string& str) {
return util::parseErrorTag<OnErrorType, TagSequence>(str);
}
static inline OnErrorType defaultOnErrorTag() { return OnErrorType::Propagate; };
static inline OnErrorType defaultOnErrorTag() { return OnErrorType::Recover; };
static inline std::string configKey() { return std::string("on-dispatch-error"); };
static inline FailureOptions parseFailureOptions(const eckit::Configuration& conf) {
return util::parseDefaultFailureOptions(conf);
Expand Down
108 changes: 67 additions & 41 deletions src/multio/server/Listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ Listener::~Listener() = default;

util::FailureHandlerResponse Listener::handleFailure(util::OnReceiveError t, const util::FailureContext& c,
util::DefaultFailureState&) const {
if (t == util::OnReceiveError::Recover) {
// Log and continue: do not interrupt the queue nor abort other transports,
// otherwise the model side hangs/aborts during close_connections.
return util::FailureHandlerResponse::Ignore;
}
msgQueue_.interrupt(c.eptr);
transport::TransportRegistry::instance().abortAll(c.eptr);

Expand All @@ -129,48 +134,69 @@ void Listener::start() {

withFailureHandling([&]() {
do {
Message msg = transport_.receive();

switch (msg.tag()) {
case Message::Tag::Open:
connections_.insert(msg.source());
++openedCount_;
LOG_DEBUG_LIB(LibMultio)
<< "*** OPENING connection to " << msg.source() << ": client count = " << clientCount_
<< ", opened count = " << openedCount_ << ", active connections = " << connections_.size()
<< std::endl;
break;

case Message::Tag::Close:
connections_.erase(connections_.find(msg.source()));
LOG_DEBUG_LIB(LibMultio)
<< "*** CLOSING connection to " << msg.source() << ": client count = " << clientCount_
<< ", opened count = " << openedCount_ << ", active connections = " << connections_.size()
<< std::endl;
break;

case Message::Tag::Synchronization: {
checkConnection(msg.source());
LOG_DEBUG_LIB(LibMultio) << "*** SYNCHRONIZATION received from " << msg.source() << std::endl;
msgQueue_.emplace(std::move(msg));
break;
// Wrap each iteration in its own failure-handling scope so that a
// parse/transport error on one message logs and continues with the
// next one, instead of exiting the loop and leaving peer clients
// blocked in MPI_Wait inside closeConnections().
try {
Message msg = transport_.receive();

switch (msg.tag()) {
case Message::Tag::Open:
connections_.insert(msg.source());
++openedCount_;
LOG_DEBUG_LIB(LibMultio)
<< "*** OPENING connection to " << msg.source() << ": client count = " << clientCount_
<< ", opened count = " << openedCount_ << ", active connections = " << connections_.size()
<< std::endl;
break;

case Message::Tag::Close:
connections_.erase(connections_.find(msg.source()));
LOG_DEBUG_LIB(LibMultio)
<< "*** CLOSING connection to " << msg.source() << ": client count = " << clientCount_
<< ", opened count = " << openedCount_ << ", active connections = " << connections_.size()
<< std::endl;
break;

case Message::Tag::Synchronization: {
checkConnection(msg.source());
LOG_DEBUG_LIB(LibMultio) << "*** SYNCHRONIZATION received from " << msg.source() << std::endl;
msgQueue_.emplace(std::move(msg));
break;
}

case Message::Tag::Domain:
case Message::Tag::Parametrization:
case Message::Tag::Mask:
case Message::Tag::Notification:
case Message::Tag::Flush:
case Message::Tag::Field:
checkConnection(msg.source());
LOG_DEBUG_LIB(LibMultio) << "*** Message received: " << msg << std::endl;
msgQueue_.emplace(std::move(msg));
break;

default:
std::ostringstream oss;
oss << "Unhandled message: " << msg << std::endl;
throw eckit::SeriousBug(oss.str());
}

case Message::Tag::Domain:
case Message::Tag::Parametrization:
case Message::Tag::Mask:
case Message::Tag::Notification:
case Message::Tag::Flush:
case Message::Tag::Field:
checkConnection(msg.source());
LOG_DEBUG_LIB(LibMultio) << "*** Message received: " << msg << std::endl;
msgQueue_.emplace(std::move(msg));
break;

default:
std::ostringstream oss;
oss << "Unhandled message: " << msg << std::endl;
throw eckit::SeriousBug(oss.str());
}
catch (...) {
if (this->parsedOnErrTag_ != util::OnReceiveError::Recover) {
throw;
}
// Log via the FailureAware machinery (Ignore branch) and
// continue with the next message. Do NOT touch the
// connection counters: that corrupts tracking for unrelated
// peers and causes spurious "Connection ... not open" errors.
// With Recover, the surviving Close messages on the wire are
// what eventually drains openedCount_/connections_ and lets
// the loop exit cleanly.
util::DefaultFailureState st;
(void)handleFailure(util::OnReceiveError::Recover,
util::FailureContext{std::current_exception(), "Listener receive failed"}, st);
}
} while (moreConnections() && msgQueue_.checkInterrupt());
});
Expand Down
4 changes: 2 additions & 2 deletions src/multio/server/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ struct ReceiverFailureTraits {
using OnErrorType = util::OnReceiveError;
using FailureOptions = util::DefaultFailureOptions;
using FailureState = util::DefaultFailureState;
using TagSequence = util::integer_sequence<OnErrorType, OnErrorType::Propagate>;
using TagSequence = util::integer_sequence<OnErrorType, OnErrorType::Propagate, OnErrorType::Recover>;
static inline std::optional<OnErrorType> parse(const std::string& str) {
return util::parseErrorTag<OnErrorType, TagSequence>(str);
}
static inline OnErrorType defaultOnErrorTag() { return OnErrorType::Propagate; };
static inline OnErrorType defaultOnErrorTag() { return OnErrorType::Recover; };
static inline std::string configKey() { return std::string("on-receive-error"); };
static inline FailureOptions parseFailureOptions(const eckit::Configuration& conf) {
return util::parseDefaultFailureOptions(conf);
Expand Down
27 changes: 25 additions & 2 deletions src/multio/transport/MpiTransport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,35 @@ Message MpiTransport::receive() {
ReceivedBuffer streamArgs;
streamQueue_.pop(streamArgs);
if (streamArgs.buffer) {
// RAII: ensure the buffer is always released back to the pool,
// even if a decodeMessage call below throws. Otherwise a single
// bad message would permanently leak a buffer slot and eventually
// stall the transport.
struct ReleaseGuard {
decltype(streamArgs.buffer) buf;
~ReleaseGuard() {
if (buf) {
buf->status.store(BufferStatus::available, std::memory_order_release);
}
}
} releaseGuard{streamArgs.buffer};

eckit::ResizableMemoryStream strm{streamArgs.buffer->content};
while (strm.position() < streamArgs.size) {
util::ScopedTiming decodeTiming{statistics_.decodeTiming_};
auto msg = decodeMessage(strm);
msgPack_.push(std::move(msg));
try {
auto msg = decodeMessage(strm);
msgPack_.push(std::move(msg));
}
catch (...) {
// A corrupt message would leave the stream cursor at an
// unknown position; we cannot safely keep decoding the
// remainder of the buffer. Rethrow so the listener's
// on-error policy (recover / propagate) decides what to do.
throw;
}
}
releaseGuard.buf = nullptr;
streamArgs.buffer->status.store(BufferStatus::available, std::memory_order_release);
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/multio/util/FailureHandling.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ std::string eckit::Translator<OnReceiveError, std::string>::operator()(OnReceive
switch (tag) {
case OnReceiveError::Propagate:
return std::string("propagate");
case OnReceiveError::Recover:
return std::string("recover");
default:
throw eckit::SeriousBug("Unknown OnReceiveError tag", Here());
}
Expand All @@ -66,6 +68,8 @@ std::string eckit::Translator<OnDispatchError, std::string>::operator()(OnDispat
switch (tag) {
case OnDispatchError::Propagate:
return std::string("propagate");
case OnDispatchError::Recover:
return std::string("recover");
default:
throw eckit::SeriousBug("Unknown OnDispatchError tag", Here());
}
Expand Down
27 changes: 24 additions & 3 deletions src/multio/util/FailureHandling.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ enum class OnActionError : unsigned

enum class OnReceiveError : unsigned
{
Propagate = PROPAGATE_ERROR
Propagate = PROPAGATE_ERROR,
Recover = RECOVER_ERROR
};
enum class OnDispatchError : unsigned
{
Propagate = PROPAGATE_ERROR
Propagate = PROPAGATE_ERROR,
Recover = RECOVER_ERROR
};


Expand Down Expand Up @@ -208,7 +210,7 @@ void printFailureContext(std::ostream& out, const FailureContext& c);

template <typename ComponentFailureTraits>
class FailureAware {
private:
protected:
using OnErrorType = typename ComponentFailureTraits::OnErrorType;
using FailureOptions = typename ComponentFailureTraits::FailureOptions;
using FailureState = typename ComponentFailureTraits::FailureState;
Expand All @@ -219,6 +221,10 @@ class FailureAware {

public:
FailureAware(const config::ComponentConfiguration& compConf) : peerTag_{compConf.multioConfig().localPeerTag()} {
// Seed with the trait-supplied default so that, when the YAML key is
// absent, parsedOnErrTag_ does not silently fall back to the
// value-initialised enum (= Propagate = 0) and ignore the trait.
parsedOnErrTag_ = ComponentFailureTraits::defaultOnErrorTag();
if (compConf.parsedConfig().has(ComponentFailureTraits::configKey())) {
auto unparsedOnErrTagMaybe = ([&]() {
try {
Expand Down Expand Up @@ -299,11 +305,26 @@ class FailureAware {
callable();
}
catch (...) {
// Recover the inner exception's what() so that the log shows
// the real cause instead of the default "unknown" context
// produced by the no-arg withFailureHandling overload.
std::string innerWhat;
try {
std::rethrow_exception(std::current_exception());
}
catch (const std::exception& _ie) {
innerWhat = _ie.what();
}
catch (...) {
innerWhat = "<non-std::exception thrown>";
}

std::ostringstream oss;
oss << "FailureAware<" << ComponentFailureTraits::componentName() << "> with behaviour \""
<< eckit::translate<std::string>(parsedOnErrTag_) << "\" on "
<< eckit::translate<std::string>(peerTag_) << " for context: [" << std::endl
<< contextString() << std::endl
<< "Inner exception: " << innerWhat << std::endl
<< "]";

FailureContext fctx{std::current_exception(), oss.str()};
Expand Down
Loading