From 17c2298f9eb4473f25d144dd978cf0004d675d5f Mon Sep 17 00:00:00 2001 From: Lucian Anton Date: Mon, 1 Jun 2026 16:54:23 +0200 Subject: [PATCH] fix found by Claude for IFS crash at the last step --- src/multio/server/Dispatcher.cc | 5 ++ src/multio/server/Dispatcher.h | 4 +- src/multio/server/Listener.cc | 108 +++++++++++++++++---------- src/multio/server/Listener.h | 4 +- src/multio/transport/MpiTransport.cc | 27 ++++++- src/multio/util/FailureHandling.cc | 4 + src/multio/util/FailureHandling.h | 27 ++++++- 7 files changed, 129 insertions(+), 50 deletions(-) diff --git a/src/multio/server/Dispatcher.cc b/src/multio/server/Dispatcher.cc index c04fed05c..68e56b020 100644 --- a/src/multio/server/Dispatcher.cc +++ b/src/multio/server/Dispatcher.cc @@ -30,6 +30,11 @@ Dispatcher::Dispatcher(const config::ComponentConfiguration& compConf, eckit::Qu util::FailureHandlerResponse Dispatcher::handleFailure(util::OnDispatchError t, const util::FailureContext& c, util::DefaultFailureState&) const { + if (t == util::OnDispatchError::Recover) { + // Log and continue: do not interrupt the queue, otherwise the model side + // hangs/aborts during close_connections. + return util::FailureHandlerResponse::Ignore; + } queue_.interrupt(c.eptr); return util::FailureHandlerResponse::Rethrow; }; diff --git a/src/multio/server/Dispatcher.h b/src/multio/server/Dispatcher.h index 148c58cf3..1a9368765 100644 --- a/src/multio/server/Dispatcher.h +++ b/src/multio/server/Dispatcher.h @@ -47,11 +47,11 @@ struct DispatcherFailureTraits { using OnErrorType = util::OnDispatchError; using FailureOptions = util::DefaultFailureOptions; using FailureState = util::DefaultFailureState; - using TagSequence = util::integer_sequence; + using TagSequence = util::integer_sequence; static inline std::optional parse(const std::string& str) { return util::parseErrorTag(str); } - static inline OnErrorType defaultOnErrorTag() { return OnErrorType::Propagate; }; + static inline OnErrorType defaultOnErrorTag() { return OnErrorType::Recover; }; static inline std::string configKey() { return std::string("on-dispatch-error"); }; static inline FailureOptions parseFailureOptions(const eckit::Configuration& conf) { return util::parseDefaultFailureOptions(conf); diff --git a/src/multio/server/Listener.cc b/src/multio/server/Listener.cc index 28167f7bc..4139f262f 100644 --- a/src/multio/server/Listener.cc +++ b/src/multio/server/Listener.cc @@ -112,6 +112,11 @@ Listener::~Listener() = default; util::FailureHandlerResponse Listener::handleFailure(util::OnReceiveError t, const util::FailureContext& c, util::DefaultFailureState&) const { + if (t == util::OnReceiveError::Recover) { + // Log and continue: do not interrupt the queue nor abort other transports, + // otherwise the model side hangs/aborts during close_connections. + return util::FailureHandlerResponse::Ignore; + } msgQueue_.interrupt(c.eptr); transport::TransportRegistry::instance().abortAll(c.eptr); @@ -129,48 +134,69 @@ void Listener::start() { withFailureHandling([&]() { do { - Message msg = transport_.receive(); - - switch (msg.tag()) { - case Message::Tag::Open: - connections_.insert(msg.source()); - ++openedCount_; - LOG_DEBUG_LIB(LibMultio) - << "*** OPENING connection to " << msg.source() << ": client count = " << clientCount_ - << ", opened count = " << openedCount_ << ", active connections = " << connections_.size() - << std::endl; - break; - - case Message::Tag::Close: - connections_.erase(connections_.find(msg.source())); - LOG_DEBUG_LIB(LibMultio) - << "*** CLOSING connection to " << msg.source() << ": client count = " << clientCount_ - << ", opened count = " << openedCount_ << ", active connections = " << connections_.size() - << std::endl; - break; - - case Message::Tag::Synchronization: { - checkConnection(msg.source()); - LOG_DEBUG_LIB(LibMultio) << "*** SYNCHRONIZATION received from " << msg.source() << std::endl; - msgQueue_.emplace(std::move(msg)); - break; + // Wrap each iteration in its own failure-handling scope so that a + // parse/transport error on one message logs and continues with the + // next one, instead of exiting the loop and leaving peer clients + // blocked in MPI_Wait inside closeConnections(). + try { + Message msg = transport_.receive(); + + switch (msg.tag()) { + case Message::Tag::Open: + connections_.insert(msg.source()); + ++openedCount_; + LOG_DEBUG_LIB(LibMultio) + << "*** OPENING connection to " << msg.source() << ": client count = " << clientCount_ + << ", opened count = " << openedCount_ << ", active connections = " << connections_.size() + << std::endl; + break; + + case Message::Tag::Close: + connections_.erase(connections_.find(msg.source())); + LOG_DEBUG_LIB(LibMultio) + << "*** CLOSING connection to " << msg.source() << ": client count = " << clientCount_ + << ", opened count = " << openedCount_ << ", active connections = " << connections_.size() + << std::endl; + break; + + case Message::Tag::Synchronization: { + checkConnection(msg.source()); + LOG_DEBUG_LIB(LibMultio) << "*** SYNCHRONIZATION received from " << msg.source() << std::endl; + msgQueue_.emplace(std::move(msg)); + break; + } + + case Message::Tag::Domain: + case Message::Tag::Parametrization: + case Message::Tag::Mask: + case Message::Tag::Notification: + case Message::Tag::Flush: + case Message::Tag::Field: + checkConnection(msg.source()); + LOG_DEBUG_LIB(LibMultio) << "*** Message received: " << msg << std::endl; + msgQueue_.emplace(std::move(msg)); + break; + + default: + std::ostringstream oss; + oss << "Unhandled message: " << msg << std::endl; + throw eckit::SeriousBug(oss.str()); } - - case Message::Tag::Domain: - case Message::Tag::Parametrization: - case Message::Tag::Mask: - case Message::Tag::Notification: - case Message::Tag::Flush: - case Message::Tag::Field: - checkConnection(msg.source()); - LOG_DEBUG_LIB(LibMultio) << "*** Message received: " << msg << std::endl; - msgQueue_.emplace(std::move(msg)); - break; - - default: - std::ostringstream oss; - oss << "Unhandled message: " << msg << std::endl; - throw eckit::SeriousBug(oss.str()); + } + catch (...) { + if (this->parsedOnErrTag_ != util::OnReceiveError::Recover) { + throw; + } + // Log via the FailureAware machinery (Ignore branch) and + // continue with the next message. Do NOT touch the + // connection counters: that corrupts tracking for unrelated + // peers and causes spurious "Connection ... not open" errors. + // With Recover, the surviving Close messages on the wire are + // what eventually drains openedCount_/connections_ and lets + // the loop exit cleanly. + util::DefaultFailureState st; + (void)handleFailure(util::OnReceiveError::Recover, + util::FailureContext{std::current_exception(), "Listener receive failed"}, st); } } while (moreConnections() && msgQueue_.checkInterrupt()); }); diff --git a/src/multio/server/Listener.h b/src/multio/server/Listener.h index cfc7ac95f..1eedf5d1c 100644 --- a/src/multio/server/Listener.h +++ b/src/multio/server/Listener.h @@ -45,11 +45,11 @@ struct ReceiverFailureTraits { using OnErrorType = util::OnReceiveError; using FailureOptions = util::DefaultFailureOptions; using FailureState = util::DefaultFailureState; - using TagSequence = util::integer_sequence; + using TagSequence = util::integer_sequence; static inline std::optional parse(const std::string& str) { return util::parseErrorTag(str); } - static inline OnErrorType defaultOnErrorTag() { return OnErrorType::Propagate; }; + static inline OnErrorType defaultOnErrorTag() { return OnErrorType::Recover; }; static inline std::string configKey() { return std::string("on-receive-error"); }; static inline FailureOptions parseFailureOptions(const eckit::Configuration& conf) { return util::parseDefaultFailureOptions(conf); diff --git a/src/multio/transport/MpiTransport.cc b/src/multio/transport/MpiTransport.cc index 5803edba2..19ca4aa40 100644 --- a/src/multio/transport/MpiTransport.cc +++ b/src/multio/transport/MpiTransport.cc @@ -295,12 +295,35 @@ Message MpiTransport::receive() { ReceivedBuffer streamArgs; streamQueue_.pop(streamArgs); if (streamArgs.buffer) { + // RAII: ensure the buffer is always released back to the pool, + // even if a decodeMessage call below throws. Otherwise a single + // bad message would permanently leak a buffer slot and eventually + // stall the transport. + struct ReleaseGuard { + decltype(streamArgs.buffer) buf; + ~ReleaseGuard() { + if (buf) { + buf->status.store(BufferStatus::available, std::memory_order_release); + } + } + } releaseGuard{streamArgs.buffer}; + eckit::ResizableMemoryStream strm{streamArgs.buffer->content}; while (strm.position() < streamArgs.size) { util::ScopedTiming decodeTiming{statistics_.decodeTiming_}; - auto msg = decodeMessage(strm); - msgPack_.push(std::move(msg)); + try { + auto msg = decodeMessage(strm); + msgPack_.push(std::move(msg)); + } + catch (...) { + // A corrupt message would leave the stream cursor at an + // unknown position; we cannot safely keep decoding the + // remainder of the buffer. Rethrow so the listener's + // on-error policy (recover / propagate) decides what to do. + throw; + } } + releaseGuard.buf = nullptr; streamArgs.buffer->status.store(BufferStatus::available, std::memory_order_release); } } diff --git a/src/multio/util/FailureHandling.cc b/src/multio/util/FailureHandling.cc index f2e0268aa..041a96ff8 100644 --- a/src/multio/util/FailureHandling.cc +++ b/src/multio/util/FailureHandling.cc @@ -57,6 +57,8 @@ std::string eckit::Translator::operator()(OnReceive switch (tag) { case OnReceiveError::Propagate: return std::string("propagate"); + case OnReceiveError::Recover: + return std::string("recover"); default: throw eckit::SeriousBug("Unknown OnReceiveError tag", Here()); } @@ -66,6 +68,8 @@ std::string eckit::Translator::operator()(OnDispat switch (tag) { case OnDispatchError::Propagate: return std::string("propagate"); + case OnDispatchError::Recover: + return std::string("recover"); default: throw eckit::SeriousBug("Unknown OnDispatchError tag", Here()); } diff --git a/src/multio/util/FailureHandling.h b/src/multio/util/FailureHandling.h index 694f79b67..cc4052454 100644 --- a/src/multio/util/FailureHandling.h +++ b/src/multio/util/FailureHandling.h @@ -73,11 +73,13 @@ enum class OnActionError : unsigned enum class OnReceiveError : unsigned { - Propagate = PROPAGATE_ERROR + Propagate = PROPAGATE_ERROR, + Recover = RECOVER_ERROR }; enum class OnDispatchError : unsigned { - Propagate = PROPAGATE_ERROR + Propagate = PROPAGATE_ERROR, + Recover = RECOVER_ERROR }; @@ -208,7 +210,7 @@ void printFailureContext(std::ostream& out, const FailureContext& c); template class FailureAware { -private: +protected: using OnErrorType = typename ComponentFailureTraits::OnErrorType; using FailureOptions = typename ComponentFailureTraits::FailureOptions; using FailureState = typename ComponentFailureTraits::FailureState; @@ -219,6 +221,10 @@ class FailureAware { public: FailureAware(const config::ComponentConfiguration& compConf) : peerTag_{compConf.multioConfig().localPeerTag()} { + // Seed with the trait-supplied default so that, when the YAML key is + // absent, parsedOnErrTag_ does not silently fall back to the + // value-initialised enum (= Propagate = 0) and ignore the trait. + parsedOnErrTag_ = ComponentFailureTraits::defaultOnErrorTag(); if (compConf.parsedConfig().has(ComponentFailureTraits::configKey())) { auto unparsedOnErrTagMaybe = ([&]() { try { @@ -299,11 +305,26 @@ class FailureAware { callable(); } catch (...) { + // Recover the inner exception's what() so that the log shows + // the real cause instead of the default "unknown" context + // produced by the no-arg withFailureHandling overload. + std::string innerWhat; + try { + std::rethrow_exception(std::current_exception()); + } + catch (const std::exception& _ie) { + innerWhat = _ie.what(); + } + catch (...) { + innerWhat = ""; + } + std::ostringstream oss; oss << "FailureAware<" << ComponentFailureTraits::componentName() << "> with behaviour \"" << eckit::translate(parsedOnErrTag_) << "\" on " << eckit::translate(peerTag_) << " for context: [" << std::endl << contextString() << std::endl + << "Inner exception: " << innerWhat << std::endl << "]"; FailureContext fctx{std::current_exception(), oss.str()};