From 24f46c644bafb24bd7ef879e3200fe6612d68219 Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Fri, 12 Jun 2026 10:02:25 +0200 Subject: [PATCH 01/13] add build option to enable MPI desync detection --- src/cmake/GeosxConfig.cmake | 1 + src/cmake/GeosxOptions.cmake | 2 ++ src/coreComponents/common/GeosxConfig.hpp.in | 3 +++ src/docs/doxygen/GeosxConfig.hpp | 3 +++ 4 files changed, 9 insertions(+) diff --git a/src/cmake/GeosxConfig.cmake b/src/cmake/GeosxConfig.cmake index 6fd1b36b3a5..d4d2ebbc84f 100644 --- a/src/cmake/GeosxConfig.cmake +++ b/src/cmake/GeosxConfig.cmake @@ -14,6 +14,7 @@ set( PREPROCESSOR_DEFINES BOUNDS_CHECK METIS MKL MPI + MPI_DESYNC_DETECTION PARMETIS PETSC PYGEOSX diff --git a/src/cmake/GeosxOptions.cmake b/src/cmake/GeosxOptions.cmake index cd076e6fad5..25274b14444 100644 --- a/src/cmake/GeosxOptions.cmake +++ b/src/cmake/GeosxOptions.cmake @@ -92,6 +92,8 @@ endif() option( ENABLE_MPI "" ON ) +option( ENABLE_MPI_DESYNC_DETECTION "" OFF ) + option( ENABLE_CUDA "" OFF ) option( ENABLE_HIP "" OFF ) diff --git a/src/coreComponents/common/GeosxConfig.hpp.in b/src/coreComponents/common/GeosxConfig.hpp.in index 8bfab94de92..5e38ad9fab3 100644 --- a/src/coreComponents/common/GeosxConfig.hpp.in +++ b/src/coreComponents/common/GeosxConfig.hpp.in @@ -44,6 +44,9 @@ /// Enables use of MPI (CMake option ENABLE_MPI) #cmakedefine GEOS_USE_MPI +/// Enables use of MPI (CMake option ENABLE_MPI_DESYNC_DETECTION) +#cmakedefine GEOS_USE_MPI_DESYNC_DETECTION + /// Enables use of OpenMP (CMake option ENABLE_OPENMP) #cmakedefine GEOS_USE_OPENMP diff --git a/src/docs/doxygen/GeosxConfig.hpp b/src/docs/doxygen/GeosxConfig.hpp index 424b6923190..0ad6cd12f85 100644 --- a/src/docs/doxygen/GeosxConfig.hpp +++ b/src/docs/doxygen/GeosxConfig.hpp @@ -44,6 +44,9 @@ /// Enables use of MPI (CMake option ENABLE_MPI) #define GEOS_USE_MPI +/// Enables detection of MPI desynchronization (CMake option ENABLE_MPI_DESYNC_DETECTION) +#define GEOS_USE_MPI_DESYNC_DETECTION + /// Enables use of OpenMP (CMake option ENABLE_OPENMP) #define GEOS_USE_OPENMP From 61701a3829c5fcd16819bc4679235f2a9d3afe05 Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Fri, 12 Jun 2026 10:04:21 +0200 Subject: [PATCH 02/13] add desync dectection for barrier() --- src/coreComponents/common/MpiWrapper.cpp | 24 ++++++++++++++++++++++++ src/coreComponents/common/MpiWrapper.hpp | 6 ++++++ 2 files changed, 30 insertions(+) diff --git a/src/coreComponents/common/MpiWrapper.cpp b/src/coreComponents/common/MpiWrapper.cpp index 78c9911d0a5..3e271ab4154 100644 --- a/src/coreComponents/common/MpiWrapper.cpp +++ b/src/coreComponents/common/MpiWrapper.cpp @@ -38,10 +38,34 @@ MPI_Comm MPI_COMM_GEOS; int MPI_COMM_GEOS = 0; #endif +void detectMPIDesync( MPI_Comm const & MPI_PARAM( comm ), int id ) +{ +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + int min_id = MpiWrapper::min( id ); + int max_id = MpiWrapper::max( id ); + + std::cout << "(detectMPIDesync) my_id = " << id << '\n'; + std::cout << "min_id = " << min_id << '\n'; + std::cout << "max_id = " << max_id << std::endl; + + if( min_id != max_id ) + { + std::cerr << "MPI desync detected" << std::endl; + MPI_Abort( comm, 1 ); + } +#endif +} + void MpiWrapper::barrier( MPI_Comm const & MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + int id = ++g_currentMpiOperationTag; MPI_Barrier( comm ); + detectMPIDesync( comm, id ); +#else + MPI_Barrier( comm ); +#endif #endif } diff --git a/src/coreComponents/common/MpiWrapper.hpp b/src/coreComponents/common/MpiWrapper.hpp index 30e65f896c0..157ef546058 100644 --- a/src/coreComponents/common/MpiWrapper.hpp +++ b/src/coreComponents/common/MpiWrapper.hpp @@ -815,6 +815,12 @@ struct MpiWrapper */ template< typename T > static int allReduce( T const * sendbuf, T * recvbuf, int count, MPI_Op op, MPI_Comm comm = MPI_COMM_GEOS ); + +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + /// Tag/counter of the latest MPI collective operation to detect MPI desynchronizations + inline static int g_currentMpiOperationTag = 0; +#endif + }; namespace internal From 71ac2895dded238b18912b74a4a8e6772f422d7d Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Fri, 12 Jun 2026 11:17:20 +0200 Subject: [PATCH 03/13] move detectMpiDesync in the .hpp --- src/coreComponents/common/MpiWrapper.cpp | 20 +------------------- src/coreComponents/common/MpiWrapper.hpp | 12 ++++++++++++ 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/src/coreComponents/common/MpiWrapper.cpp b/src/coreComponents/common/MpiWrapper.cpp index 3e271ab4154..5d148ab6362 100644 --- a/src/coreComponents/common/MpiWrapper.cpp +++ b/src/coreComponents/common/MpiWrapper.cpp @@ -38,31 +38,13 @@ MPI_Comm MPI_COMM_GEOS; int MPI_COMM_GEOS = 0; #endif -void detectMPIDesync( MPI_Comm const & MPI_PARAM( comm ), int id ) -{ -#ifdef GEOS_USE_MPI_DESYNC_DETECTION - int min_id = MpiWrapper::min( id ); - int max_id = MpiWrapper::max( id ); - - std::cout << "(detectMPIDesync) my_id = " << id << '\n'; - std::cout << "min_id = " << min_id << '\n'; - std::cout << "max_id = " << max_id << std::endl; - - if( min_id != max_id ) - { - std::cerr << "MPI desync detected" << std::endl; - MPI_Abort( comm, 1 ); - } -#endif -} - void MpiWrapper::barrier( MPI_Comm const & MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI #ifdef GEOS_USE_MPI_DESYNC_DETECTION int id = ++g_currentMpiOperationTag; MPI_Barrier( comm ); - detectMPIDesync( comm, id ); + detectMpiDesync( comm, id ); #else MPI_Barrier( comm ); #endif diff --git a/src/coreComponents/common/MpiWrapper.hpp b/src/coreComponents/common/MpiWrapper.hpp index 157ef546058..bc78aa88114 100644 --- a/src/coreComponents/common/MpiWrapper.hpp +++ b/src/coreComponents/common/MpiWrapper.hpp @@ -819,6 +819,18 @@ struct MpiWrapper #ifdef GEOS_USE_MPI_DESYNC_DETECTION /// Tag/counter of the latest MPI collective operation to detect MPI desynchronizations inline static int g_currentMpiOperationTag = 0; + + /** + * @brief Detects MPI desynchronization from MPI collective operations. + * @param[in] comm The MPI_Comm over which the gather operates. + * @param[in] id The current collective operation counter of this rank. + */ + inline static void detectMpiDesync( MPI_Comm const & MPI_PARAM( comm ), int id ) + { + int min_id = MpiWrapper::min( id ); + int max_id = MpiWrapper::max( id ); + if( min_id != max_id ) { MPI_Abort( comm, 1 ); } + } #endif }; From e3a406ef2f86c9dd4e5ba16b185847a9a75a1305 Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Fri, 12 Jun 2026 16:02:16 +0200 Subject: [PATCH 04/13] add desync detection to all collective operations --- src/coreComponents/common/MpiWrapper.cpp | 7 +- src/coreComponents/common/MpiWrapper.hpp | 147 ++++++++++++++++------- 2 files changed, 108 insertions(+), 46 deletions(-) diff --git a/src/coreComponents/common/MpiWrapper.cpp b/src/coreComponents/common/MpiWrapper.cpp index 5d148ab6362..7f8cb357d3f 100644 --- a/src/coreComponents/common/MpiWrapper.cpp +++ b/src/coreComponents/common/MpiWrapper.cpp @@ -41,12 +41,9 @@ int MPI_COMM_GEOS = 0; void MpiWrapper::barrier( MPI_Comm const & MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI -#ifdef GEOS_USE_MPI_DESYNC_DETECTION - int id = ++g_currentMpiOperationTag; - MPI_Barrier( comm ); - detectMpiDesync( comm, id ); -#else MPI_Barrier( comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); #endif #endif } diff --git a/src/coreComponents/common/MpiWrapper.hpp b/src/coreComponents/common/MpiWrapper.hpp index bc78aa88114..f690b3b34fb 100644 --- a/src/coreComponents/common/MpiWrapper.hpp +++ b/src/coreComponents/common/MpiWrapper.hpp @@ -1004,9 +1004,13 @@ int MpiWrapper::allgather( T_SEND const * const sendbuf, MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - return MPI_Allgather( sendbuf, sendcount, internal::getMpiType< T_SEND >(), - recvbuf, recvcount, internal::getMpiType< T_RECV >(), - comm ); + int ret = MPI_Allgather( sendbuf, sendcount, internal::getMpiType< T_SEND >(), + recvbuf, recvcount, internal::getMpiType< T_RECV >(), + comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else static_assert( std::is_same< T_SEND, T_RECV >::value, "MpiWrapper::allgather() for serial run requires send and receive buffers are of the same type" ); @@ -1025,9 +1029,13 @@ int MpiWrapper::allgatherv( T_SEND const * const sendbuf, MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - return MPI_Allgatherv( sendbuf, sendcount, internal::getMpiType< T_SEND >(), - recvbuf, recvcounts, displacements, internal::getMpiType< T_RECV >(), - comm ); + int ret = MPI_Allgatherv( sendbuf, sendcount, internal::getMpiType< T_SEND >(), + recvbuf, recvcounts, displacements, internal::getMpiType< T_RECV >(), + comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else static_assert( std::is_same< T_SEND, T_RECV >::value, "MpiWrapper::allgatherv() for serial run requires send and receive buffers are of the same type" ); @@ -1048,6 +1056,9 @@ void MpiWrapper::allGather( T const myValue, array1d< T > & allValues, MPI_Comm MPI_Datatype const MPI_TYPE = internal::getMpiType< T >(); MPI_Allgather( &myValue, 1, MPI_TYPE, allValues.data(), 1, MPI_TYPE, comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif #else allValues.resize( 1 ); @@ -1064,13 +1075,17 @@ int MpiWrapper::allGather( arrayView1d< T const > const & sendValues, #ifdef GEOS_USE_MPI int const mpiSize = commSize( comm ); allValues.resize( mpiSize * sendSize ); - return MPI_Allgather( sendValues.data(), - sendSize, - internal::getMpiType< T >(), - allValues.data(), - sendSize, - internal::getMpiType< T >(), - comm ); + int ret = MPI_Allgather( sendValues.data(), + sendSize, + internal::getMpiType< T >(), + allValues.data(), + sendSize, + internal::getMpiType< T >(), + comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else allValues.resize( sendSize ); @@ -1095,14 +1110,18 @@ int MpiWrapper::allGatherv( arrayView1d< T const > const & sendValues, array1d< int > displs( mpiSize + 1 ); std::partial_sum( counts.begin(), counts.end(), displs.begin() + 1 ); allValues.resize( displs.back() ); - return MPI_Allgatherv( sendValues.data(), - sendSize, - internal::getMpiType< T >(), - allValues.data(), - counts.data(), - displs.data(), - internal::getMpiType< T >(), - comm ); + int ret = MPI_Allgatherv( sendValues.data(), + sendSize, + internal::getMpiType< T >(), + allValues.data(), + counts.data(), + displs.data(), + internal::getMpiType< T >(), + comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else allValues.resize( sendSize ); @@ -1123,7 +1142,11 @@ int MpiWrapper::allReduce( T const * const sendbuf, { #ifdef GEOS_USE_MPI MPI_Datatype const mpiType = internal::getMpiType< T >(); - return MPI_Allreduce( sendbuf == recvbuf ? MPI_IN_PLACE : sendbuf, recvbuf, count, mpiType, op, comm ); + int ret = MPI_Allreduce( sendbuf == recvbuf ? MPI_IN_PLACE : sendbuf, recvbuf, count, mpiType, op, comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else if( sendbuf != recvbuf ) { @@ -1143,7 +1166,11 @@ int MpiWrapper::reduce( T const * const sendbuf, { #ifdef GEOS_USE_MPI MPI_Datatype const mpiType = internal::getMpiType< T >(); - return MPI_Reduce( sendbuf == recvbuf ? MPI_IN_PLACE : sendbuf, recvbuf, count, mpiType, op, root, comm ); + int ret = MPI_Reduce( sendbuf == recvbuf ? MPI_IN_PLACE : sendbuf, recvbuf, count, mpiType, op, root, comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else if( sendbuf != recvbuf ) { @@ -1161,7 +1188,11 @@ int MpiWrapper::scan( T const * const sendbuf, MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - return MPI_Scan( sendbuf, recvbuf, count, internal::getMpiType< T >(), op, comm ); + int ret = MPI_Scan( sendbuf, recvbuf, count, internal::getMpiType< T >(), op, comm ); +#ifdef GEOS_USE_MPI + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else memcpy( recvbuf, sendbuf, count*sizeof(T) ); return 0; @@ -1176,7 +1207,11 @@ int MpiWrapper::exscan( T const * const MPI_PARAM( sendbuf ), MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - return MPI_Exscan( sendbuf, recvbuf, count, internal::getMpiType< T >(), op, comm ); + int ret = MPI_Exscan( sendbuf, recvbuf, count, internal::getMpiType< T >(), op, comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else memset( recvbuf, 0, count*sizeof(T) ); return 0; @@ -1190,7 +1225,11 @@ int MpiWrapper::bcast( T * const MPI_PARAM( buffer ), MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - return MPI_Bcast( buffer, count, internal::getMpiType< T >(), root, comm ); + int ret = MPI_Bcast( buffer, count, internal::getMpiType< T >(), root, comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else return 0; #endif @@ -1202,6 +1241,9 @@ void MpiWrapper::broadcast( T & MPI_PARAM( value ), int MPI_PARAM( srcRank ), MP { #ifdef GEOS_USE_MPI MPI_Bcast( &value, 1, internal::getMpiType< T >(), srcRank, comm ); +// #ifdef GEOS_USE_MPI_DESYNC_DETECTION +// detectMpiDesync( comm, ++g_currentMpiOperationTag ); +// #endif #endif } @@ -1216,6 +1258,9 @@ void MpiWrapper::broadcast< string >( string & MPI_PARAM( value ), broadcast( size, srcRank, comm ); value.resize( size ); MPI_Bcast( const_cast< char * >( value.data() ), size, internal::getMpiType< char >(), srcRank, comm ); +// #ifdef GEOS_USE_MPI_DESYNC_DETECTION +// detectMpiDesync( comm, ++g_currentMpiOperationTag ); +// #endif #endif } @@ -1228,9 +1273,13 @@ int MpiWrapper::gather( TS const * const sendbuf, MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - return MPI_Gather( sendbuf, sendcount, internal::getMpiType< TS >(), - recvbuf, recvcount, internal::getMpiType< TR >(), - root, comm ); + int ret = MPI_Gather( sendbuf, sendcount, internal::getMpiType< TS >(), + recvbuf, recvcount, internal::getMpiType< TR >(), + root, comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else static_assert( std::is_same< TS, TR >::value, "MpiWrapper::gather() for serial run requires send and receive buffers are of the same type" ); @@ -1252,9 +1301,13 @@ int MpiWrapper::gather( T const & value, GEOS_ERROR_IF_LT_MSG( destValuesBuffer.size(), size_t( commSize() ), "Receive buffer is not large enough to contain the values to receive." ); #ifdef GEOS_USE_MPI - return MPI_Gather( &value, sizeof( T ), internal::getMpiType< uint8_t >(), - destValuesBuffer.data(), sizeof( T ), internal::getMpiType< uint8_t >(), - root, comm ); + int ret = MPI_Gather( &value, sizeof( T ), internal::getMpiType< uint8_t >(), + destValuesBuffer.data(), sizeof( T ), internal::getMpiType< uint8_t >(), + root, comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else memcpy( destValuesBuffer.data(), &value, sendBufferSize ); return 0; @@ -1271,9 +1324,13 @@ int MpiWrapper::gatherv( TS const * const sendbuf, MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - return MPI_Gatherv( sendbuf, sendcount, internal::getMpiType< TS >(), - recvbuf, recvcounts, displs, internal::getMpiType< TR >(), - root, comm ); + int ret = MPI_Gatherv( sendbuf, sendcount, internal::getMpiType< TS >(), + recvbuf, recvcounts, displs, internal::getMpiType< TR >(), + root, comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else static_assert( std::is_same< TS, TR >::value, "MpiWrapper::gather() for serial run requires send and receive buffers are of the same type" ); @@ -1295,9 +1352,13 @@ int MpiWrapper::scatter( TS const * const sendbuf, MPI_Comm MPI_PARAM( comm )) { #ifdef GEOS_USE_MPI - return MPI_Scatter( sendbuf, sendcount, internal::getMpiType< TS >(), - recvbuf, recvcount, internal::getMpiType< TR >(), - root, comm ); + int ret = MPI_Scatter( sendbuf, sendcount, internal::getMpiType< TS >(), + recvbuf, recvcount, internal::getMpiType< TR >(), + root, comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else static_assert( std::is_same< TS, TR >::value, "MpiWrapper::scatter() for serial run requires send and receive buffers are of the same type" ); @@ -1319,9 +1380,13 @@ int MpiWrapper::scatterv( TS const * const sendbuf, MPI_Comm MPI_PARAM( comm )) { #ifdef GEOS_USE_MPI - return MPI_Scatterv( sendbuf, sendcounts, displs, internal::getMpiType< TS >(), - recvbuf, recvcount, internal::getMpiType< TR >(), - root, comm ); + int ret = MPI_Scatterv( sendbuf, sendcounts, displs, internal::getMpiType< TS >(), + recvbuf, recvcount, internal::getMpiType< TR >(), + root, comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif + return ret; #else static_assert( std::is_same< TS, TR >::value, "MpiWrapper::scatterv() for serial run requires send and receive buffers are of the same type" ); From 72e65819bd79ebb8e7c7453c223705523ce63253 Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Fri, 12 Jun 2026 16:45:47 +0200 Subject: [PATCH 05/13] complete collective calls desync detection --- src/coreComponents/common/MpiWrapper.hpp | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/coreComponents/common/MpiWrapper.hpp b/src/coreComponents/common/MpiWrapper.hpp index f690b3b34fb..1c972e6dff4 100644 --- a/src/coreComponents/common/MpiWrapper.hpp +++ b/src/coreComponents/common/MpiWrapper.hpp @@ -827,9 +827,9 @@ struct MpiWrapper */ inline static void detectMpiDesync( MPI_Comm const & MPI_PARAM( comm ), int id ) { - int min_id = MpiWrapper::min( id ); - int max_id = MpiWrapper::max( id ); - if( min_id != max_id ) { MPI_Abort( comm, 1 ); } + int minId = id; MPI_Allreduce( MPI_IN_PLACE, &minId, 1, MPI_INT, MPI_MIN, comm ); + int maxId = id; MPI_Allreduce( MPI_IN_PLACE, &minId, 1, MPI_INT, MPI_MAX, comm ); + if( minId != maxId ) { MPI_Abort( comm, 1 ); } } #endif @@ -1189,7 +1189,7 @@ int MpiWrapper::scan( T const * const sendbuf, { #ifdef GEOS_USE_MPI int ret = MPI_Scan( sendbuf, recvbuf, count, internal::getMpiType< T >(), op, comm ); -#ifdef GEOS_USE_MPI +#ifdef GEOS_USE_MPI_DESYNC_DETECTION detectMpiDesync( comm, ++g_currentMpiOperationTag ); #endif return ret; @@ -1241,9 +1241,9 @@ void MpiWrapper::broadcast( T & MPI_PARAM( value ), int MPI_PARAM( srcRank ), MP { #ifdef GEOS_USE_MPI MPI_Bcast( &value, 1, internal::getMpiType< T >(), srcRank, comm ); -// #ifdef GEOS_USE_MPI_DESYNC_DETECTION -// detectMpiDesync( comm, ++g_currentMpiOperationTag ); -// #endif +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif #endif } @@ -1258,9 +1258,9 @@ void MpiWrapper::broadcast< string >( string & MPI_PARAM( value ), broadcast( size, srcRank, comm ); value.resize( size ); MPI_Bcast( const_cast< char * >( value.data() ), size, internal::getMpiType< char >(), srcRank, comm ); -// #ifdef GEOS_USE_MPI_DESYNC_DETECTION -// detectMpiDesync( comm, ++g_currentMpiOperationTag ); -// #endif +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif #endif } @@ -1653,6 +1653,9 @@ MpiWrapper::allReduce( PairType< FIRST, SECOND > const & localPair, MPI_Comm com auto const mpiOp = internal::getMpiPairReductionOp< FIRST, SECOND, OP >(); PairType< FIRST, SECOND > pair{ localPair.first, localPair.second }; MPI_Allreduce( MPI_IN_PLACE, &pair, 1, type, mpiOp, comm ); +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + detectMpiDesync( comm, ++g_currentMpiOperationTag ); +#endif return pair; #else return localPair; From a19e09eb767ceb3e4659c0a70c2b185bb7d96349 Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Fri, 12 Jun 2026 17:10:17 +0200 Subject: [PATCH 06/13] rename variables for readability --- src/coreComponents/common/MpiWrapper.cpp | 2 +- src/coreComponents/common/MpiWrapper.hpp | 44 ++++++++++++------------ 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/coreComponents/common/MpiWrapper.cpp b/src/coreComponents/common/MpiWrapper.cpp index 7f8cb357d3f..ea2179194f6 100644 --- a/src/coreComponents/common/MpiWrapper.cpp +++ b/src/coreComponents/common/MpiWrapper.cpp @@ -43,7 +43,7 @@ void MpiWrapper::barrier( MPI_Comm const & MPI_PARAM( comm ) ) #ifdef GEOS_USE_MPI MPI_Barrier( comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif #endif } diff --git a/src/coreComponents/common/MpiWrapper.hpp b/src/coreComponents/common/MpiWrapper.hpp index 1c972e6dff4..ff3719ad525 100644 --- a/src/coreComponents/common/MpiWrapper.hpp +++ b/src/coreComponents/common/MpiWrapper.hpp @@ -823,12 +823,12 @@ struct MpiWrapper /** * @brief Detects MPI desynchronization from MPI collective operations. * @param[in] comm The MPI_Comm over which the gather operates. - * @param[in] id The current collective operation counter of this rank. + * @param[in] operationId The current collective operation counter of this rank. */ - inline static void detectMpiDesync( MPI_Comm const & MPI_PARAM( comm ), int id ) + inline static void detectMpiDesync( MPI_Comm const & MPI_PARAM( comm ), int operationId ) { - int minId = id; MPI_Allreduce( MPI_IN_PLACE, &minId, 1, MPI_INT, MPI_MIN, comm ); - int maxId = id; MPI_Allreduce( MPI_IN_PLACE, &minId, 1, MPI_INT, MPI_MAX, comm ); + int minId = operationId; MPI_Allreduce( MPI_IN_PLACE, &minId, 1, MPI_INT, MPI_MIN, comm ); + int maxId = operationId; MPI_Allreduce( MPI_IN_PLACE, &minId, 1, MPI_INT, MPI_MAX, comm ); if( minId != maxId ) { MPI_Abort( comm, 1 ); } } #endif @@ -1008,7 +1008,7 @@ int MpiWrapper::allgather( T_SEND const * const sendbuf, recvbuf, recvcount, internal::getMpiType< T_RECV >(), comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1033,7 +1033,7 @@ int MpiWrapper::allgatherv( T_SEND const * const sendbuf, recvbuf, recvcounts, displacements, internal::getMpiType< T_RECV >(), comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1057,7 +1057,7 @@ void MpiWrapper::allGather( T const myValue, array1d< T > & allValues, MPI_Comm MPI_Allgather( &myValue, 1, MPI_TYPE, allValues.data(), 1, MPI_TYPE, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif #else @@ -1083,7 +1083,7 @@ int MpiWrapper::allGather( arrayView1d< T const > const & sendValues, internal::getMpiType< T >(), comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; @@ -1119,7 +1119,7 @@ int MpiWrapper::allGatherv( arrayView1d< T const > const & sendValues, internal::getMpiType< T >(), comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; @@ -1144,7 +1144,7 @@ int MpiWrapper::allReduce( T const * const sendbuf, MPI_Datatype const mpiType = internal::getMpiType< T >(); int ret = MPI_Allreduce( sendbuf == recvbuf ? MPI_IN_PLACE : sendbuf, recvbuf, count, mpiType, op, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1168,7 +1168,7 @@ int MpiWrapper::reduce( T const * const sendbuf, MPI_Datatype const mpiType = internal::getMpiType< T >(); int ret = MPI_Reduce( sendbuf == recvbuf ? MPI_IN_PLACE : sendbuf, recvbuf, count, mpiType, op, root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1190,7 +1190,7 @@ int MpiWrapper::scan( T const * const sendbuf, #ifdef GEOS_USE_MPI int ret = MPI_Scan( sendbuf, recvbuf, count, internal::getMpiType< T >(), op, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1209,7 +1209,7 @@ int MpiWrapper::exscan( T const * const MPI_PARAM( sendbuf ), #ifdef GEOS_USE_MPI int ret = MPI_Exscan( sendbuf, recvbuf, count, internal::getMpiType< T >(), op, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1227,7 +1227,7 @@ int MpiWrapper::bcast( T * const MPI_PARAM( buffer ), #ifdef GEOS_USE_MPI int ret = MPI_Bcast( buffer, count, internal::getMpiType< T >(), root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1242,7 +1242,7 @@ void MpiWrapper::broadcast( T & MPI_PARAM( value ), int MPI_PARAM( srcRank ), MP #ifdef GEOS_USE_MPI MPI_Bcast( &value, 1, internal::getMpiType< T >(), srcRank, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif #endif } @@ -1259,7 +1259,7 @@ void MpiWrapper::broadcast< string >( string & MPI_PARAM( value ), value.resize( size ); MPI_Bcast( const_cast< char * >( value.data() ), size, internal::getMpiType< char >(), srcRank, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif #endif } @@ -1277,7 +1277,7 @@ int MpiWrapper::gather( TS const * const sendbuf, recvbuf, recvcount, internal::getMpiType< TR >(), root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1305,7 +1305,7 @@ int MpiWrapper::gather( T const & value, destValuesBuffer.data(), sizeof( T ), internal::getMpiType< uint8_t >(), root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1328,7 +1328,7 @@ int MpiWrapper::gatherv( TS const * const sendbuf, recvbuf, recvcounts, displs, internal::getMpiType< TR >(), root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1356,7 +1356,7 @@ int MpiWrapper::scatter( TS const * const sendbuf, recvbuf, recvcount, internal::getMpiType< TR >(), root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1384,7 +1384,7 @@ int MpiWrapper::scatterv( TS const * const sendbuf, recvbuf, recvcount, internal::getMpiType< TR >(), root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return ret; #else @@ -1654,7 +1654,7 @@ MpiWrapper::allReduce( PairType< FIRST, SECOND > const & localPair, MPI_Comm com PairType< FIRST, SECOND > pair{ localPair.first, localPair.second }; MPI_Allreduce( MPI_IN_PLACE, &pair, 1, type, mpiOp, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_currentMpiOperationTag ); + detectMpiDesync( comm, ++g_collectiveOperationCounter ); #endif return pair; #else From c8816212d808923c7da3ba624eea8ae92724dff4 Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Fri, 12 Jun 2026 17:19:06 +0200 Subject: [PATCH 07/13] fix max Allreduce call --- src/coreComponents/common/MpiWrapper.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/coreComponents/common/MpiWrapper.hpp b/src/coreComponents/common/MpiWrapper.hpp index ff3719ad525..31c016797e4 100644 --- a/src/coreComponents/common/MpiWrapper.hpp +++ b/src/coreComponents/common/MpiWrapper.hpp @@ -828,7 +828,7 @@ struct MpiWrapper inline static void detectMpiDesync( MPI_Comm const & MPI_PARAM( comm ), int operationId ) { int minId = operationId; MPI_Allreduce( MPI_IN_PLACE, &minId, 1, MPI_INT, MPI_MIN, comm ); - int maxId = operationId; MPI_Allreduce( MPI_IN_PLACE, &minId, 1, MPI_INT, MPI_MAX, comm ); + int maxId = operationId; MPI_Allreduce( MPI_IN_PLACE, &maxId, 1, MPI_INT, MPI_MAX, comm ); if( minId != maxId ) { MPI_Abort( comm, 1 ); } } #endif From 27ec2ee20359e86db4f9cb6609caa8262526871f Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Fri, 12 Jun 2026 17:37:21 +0200 Subject: [PATCH 08/13] add MpiDesyncGuard RAII helper for readability --- src/coreComponents/common/MpiWrapper.cpp | 4 +- src/coreComponents/common/MpiWrapper.hpp | 183 ++++++++++++----------- 2 files changed, 98 insertions(+), 89 deletions(-) diff --git a/src/coreComponents/common/MpiWrapper.cpp b/src/coreComponents/common/MpiWrapper.cpp index ea2179194f6..11f22c3fc9c 100644 --- a/src/coreComponents/common/MpiWrapper.cpp +++ b/src/coreComponents/common/MpiWrapper.cpp @@ -41,10 +41,10 @@ int MPI_COMM_GEOS = 0; void MpiWrapper::barrier( MPI_Comm const & MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - MPI_Barrier( comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif + MPI_Barrier( comm ); #endif } diff --git a/src/coreComponents/common/MpiWrapper.hpp b/src/coreComponents/common/MpiWrapper.hpp index 31c016797e4..f86364b8744 100644 --- a/src/coreComponents/common/MpiWrapper.hpp +++ b/src/coreComponents/common/MpiWrapper.hpp @@ -821,7 +821,30 @@ struct MpiWrapper inline static int g_currentMpiOperationTag = 0; /** - * @brief Detects MPI desynchronization from MPI collective operations. + * @struct MpiDesyncGuard + * @brief RAII helper to detect MPI desynchronizations from MPI collective operations. + */ + struct MpiDesyncGuard + { + MPI_Comm const & m_comm; + int const m_operationId; + + explicit MpiDesyncGuard( MPI_Comm const & comm ) + : m_comm( comm ) + , m_operationId( ++g_collectiveOperationCounter ) + {} + + ~MpiDesyncGuard() + { + detectMpiDesync( m_comm, m_operationId ); + } + + MpiDesyncGuard( MpiDesyncGuard const & ) = delete; + MpiDesyncGuard & operator=( MpiDesyncGuard const & ) = delete; + }; + + /** + * @brief Detects MPI desynchronizations from MPI collective operations. * @param[in] comm The MPI_Comm over which the gather operates. * @param[in] operationId The current collective operation counter of this rank. */ @@ -1004,13 +1027,12 @@ int MpiWrapper::allgather( T_SEND const * const sendbuf, MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - int ret = MPI_Allgather( sendbuf, sendcount, internal::getMpiType< T_SEND >(), - recvbuf, recvcount, internal::getMpiType< T_RECV >(), - comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + return MPI_Allgather( sendbuf, sendcount, internal::getMpiType< T_SEND >(), + recvbuf, recvcount, internal::getMpiType< T_RECV >(), + comm ); #else static_assert( std::is_same< T_SEND, T_RECV >::value, "MpiWrapper::allgather() for serial run requires send and receive buffers are of the same type" ); @@ -1029,13 +1051,12 @@ int MpiWrapper::allgatherv( T_SEND const * const sendbuf, MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - int ret = MPI_Allgatherv( sendbuf, sendcount, internal::getMpiType< T_SEND >(), - recvbuf, recvcounts, displacements, internal::getMpiType< T_RECV >(), - comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + return MPI_Allgatherv( sendbuf, sendcount, internal::getMpiType< T_SEND >(), + recvbuf, recvcounts, displacements, internal::getMpiType< T_RECV >(), + comm ); #else static_assert( std::is_same< T_SEND, T_RECV >::value, "MpiWrapper::allgatherv() for serial run requires send and receive buffers are of the same type" ); @@ -1050,15 +1071,15 @@ template< typename T > void MpiWrapper::allGather( T const myValue, array1d< T > & allValues, MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + MpiDesyncGuard const mpiDesyncGuard( comm ); +#endif int const mpiSize = commSize( comm ); allValues.resize( mpiSize ); MPI_Datatype const MPI_TYPE = internal::getMpiType< T >(); MPI_Allgather( &myValue, 1, MPI_TYPE, allValues.data(), 1, MPI_TYPE, comm ); -#ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); -#endif #else allValues.resize( 1 ); @@ -1073,19 +1094,18 @@ int MpiWrapper::allGather( arrayView1d< T const > const & sendValues, { int const sendSize = LvArray::integerConversion< int >( sendValues.size() ); #ifdef GEOS_USE_MPI - int const mpiSize = commSize( comm ); - allValues.resize( mpiSize * sendSize ); - int ret = MPI_Allgather( sendValues.data(), - sendSize, - internal::getMpiType< T >(), - allValues.data(), - sendSize, - internal::getMpiType< T >(), - comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + int const mpiSize = commSize( comm ); + allValues.resize( mpiSize * sendSize ); + return MPI_Allgather( sendValues.data(), + sendSize, + internal::getMpiType< T >(), + allValues.data(), + sendSize, + internal::getMpiType< T >(), + comm ); #else allValues.resize( sendSize ); @@ -1104,24 +1124,23 @@ int MpiWrapper::allGatherv( arrayView1d< T const > const & sendValues, { int const sendSize = LvArray::integerConversion< int >( sendValues.size() ); #ifdef GEOS_USE_MPI +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + MpiDesyncGuard const mpiDesyncGuard( comm ); +#endif int const mpiSize = commSize( comm ); array1d< int > counts; allGather( sendSize, counts, comm ); array1d< int > displs( mpiSize + 1 ); std::partial_sum( counts.begin(), counts.end(), displs.begin() + 1 ); allValues.resize( displs.back() ); - int ret = MPI_Allgatherv( sendValues.data(), - sendSize, - internal::getMpiType< T >(), - allValues.data(), - counts.data(), - displs.data(), - internal::getMpiType< T >(), - comm ); -#ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); -#endif - return ret; + return MPI_Allgatherv( sendValues.data(), + sendSize, + internal::getMpiType< T >(), + allValues.data(), + counts.data(), + displs.data(), + internal::getMpiType< T >(), + comm ); #else allValues.resize( sendSize ); @@ -1141,12 +1160,11 @@ int MpiWrapper::allReduce( T const * const sendbuf, MPI_Comm const MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - MPI_Datatype const mpiType = internal::getMpiType< T >(); - int ret = MPI_Allreduce( sendbuf == recvbuf ? MPI_IN_PLACE : sendbuf, recvbuf, count, mpiType, op, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + MPI_Datatype const mpiType = internal::getMpiType< T >(); + return MPI_Allreduce( sendbuf == recvbuf ? MPI_IN_PLACE : sendbuf, recvbuf, count, mpiType, op, comm ); #else if( sendbuf != recvbuf ) { @@ -1165,12 +1183,11 @@ int MpiWrapper::reduce( T const * const sendbuf, MPI_Comm const MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - MPI_Datatype const mpiType = internal::getMpiType< T >(); - int ret = MPI_Reduce( sendbuf == recvbuf ? MPI_IN_PLACE : sendbuf, recvbuf, count, mpiType, op, root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + MPI_Datatype const mpiType = internal::getMpiType< T >(); + return MPI_Reduce( sendbuf == recvbuf ? MPI_IN_PLACE : sendbuf, recvbuf, count, mpiType, op, root, comm ); #else if( sendbuf != recvbuf ) { @@ -1188,11 +1205,10 @@ int MpiWrapper::scan( T const * const sendbuf, MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - int ret = MPI_Scan( sendbuf, recvbuf, count, internal::getMpiType< T >(), op, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + return MPI_Scan( sendbuf, recvbuf, count, internal::getMpiType< T >(), op, comm ); #else memcpy( recvbuf, sendbuf, count*sizeof(T) ); return 0; @@ -1207,11 +1223,10 @@ int MpiWrapper::exscan( T const * const MPI_PARAM( sendbuf ), MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - int ret = MPI_Exscan( sendbuf, recvbuf, count, internal::getMpiType< T >(), op, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + return MPI_Exscan( sendbuf, recvbuf, count, internal::getMpiType< T >(), op, comm ); #else memset( recvbuf, 0, count*sizeof(T) ); return 0; @@ -1225,11 +1240,10 @@ int MpiWrapper::bcast( T * const MPI_PARAM( buffer ), MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - int ret = MPI_Bcast( buffer, count, internal::getMpiType< T >(), root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + return MPI_Bcast( buffer, count, internal::getMpiType< T >(), root, comm ); #else return 0; #endif @@ -1240,10 +1254,10 @@ template< typename T > void MpiWrapper::broadcast( T & MPI_PARAM( value ), int MPI_PARAM( srcRank ), MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - MPI_Bcast( &value, 1, internal::getMpiType< T >(), srcRank, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif + MPI_Bcast( &value, 1, internal::getMpiType< T >(), srcRank, comm ); #endif } @@ -1254,13 +1268,13 @@ void MpiWrapper::broadcast< string >( string & MPI_PARAM( value ), MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + MpiDesyncGuard const mpiDesyncGuard( comm ); +#endif int size = LvArray::integerConversion< int >( value.size() ); broadcast( size, srcRank, comm ); value.resize( size ); MPI_Bcast( const_cast< char * >( value.data() ), size, internal::getMpiType< char >(), srcRank, comm ); -#ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); -#endif #endif } @@ -1273,13 +1287,12 @@ int MpiWrapper::gather( TS const * const sendbuf, MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - int ret = MPI_Gather( sendbuf, sendcount, internal::getMpiType< TS >(), - recvbuf, recvcount, internal::getMpiType< TR >(), - root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + return MPI_Gather( sendbuf, sendcount, internal::getMpiType< TS >(), + recvbuf, recvcount, internal::getMpiType< TR >(), + root, comm ); #else static_assert( std::is_same< TS, TR >::value, "MpiWrapper::gather() for serial run requires send and receive buffers are of the same type" ); @@ -1301,13 +1314,12 @@ int MpiWrapper::gather( T const & value, GEOS_ERROR_IF_LT_MSG( destValuesBuffer.size(), size_t( commSize() ), "Receive buffer is not large enough to contain the values to receive." ); #ifdef GEOS_USE_MPI - int ret = MPI_Gather( &value, sizeof( T ), internal::getMpiType< uint8_t >(), - destValuesBuffer.data(), sizeof( T ), internal::getMpiType< uint8_t >(), - root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + return MPI_Gather( &value, sizeof( T ), internal::getMpiType< uint8_t >(), + destValuesBuffer.data(), sizeof( T ), internal::getMpiType< uint8_t >(), + root, comm ); #else memcpy( destValuesBuffer.data(), &value, sendBufferSize ); return 0; @@ -1324,13 +1336,12 @@ int MpiWrapper::gatherv( TS const * const sendbuf, MPI_Comm MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI - int ret = MPI_Gatherv( sendbuf, sendcount, internal::getMpiType< TS >(), - recvbuf, recvcounts, displs, internal::getMpiType< TR >(), - root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + return MPI_Gatherv( sendbuf, sendcount, internal::getMpiType< TS >(), + recvbuf, recvcounts, displs, internal::getMpiType< TR >(), + root, comm ); #else static_assert( std::is_same< TS, TR >::value, "MpiWrapper::gather() for serial run requires send and receive buffers are of the same type" ); @@ -1352,13 +1363,12 @@ int MpiWrapper::scatter( TS const * const sendbuf, MPI_Comm MPI_PARAM( comm )) { #ifdef GEOS_USE_MPI - int ret = MPI_Scatter( sendbuf, sendcount, internal::getMpiType< TS >(), - recvbuf, recvcount, internal::getMpiType< TR >(), - root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + return MPI_Scatter( sendbuf, sendcount, internal::getMpiType< TS >(), + recvbuf, recvcount, internal::getMpiType< TR >(), + root, comm ); #else static_assert( std::is_same< TS, TR >::value, "MpiWrapper::scatter() for serial run requires send and receive buffers are of the same type" ); @@ -1380,13 +1390,12 @@ int MpiWrapper::scatterv( TS const * const sendbuf, MPI_Comm MPI_PARAM( comm )) { #ifdef GEOS_USE_MPI - int ret = MPI_Scatterv( sendbuf, sendcounts, displs, internal::getMpiType< TS >(), - recvbuf, recvcount, internal::getMpiType< TR >(), - root, comm ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); + MpiDesyncGuard const mpiDesyncGuard( comm ); #endif - return ret; + return MPI_Scatterv( sendbuf, sendcounts, displs, internal::getMpiType< TS >(), + recvbuf, recvcount, internal::getMpiType< TR >(), + root, comm ); #else static_assert( std::is_same< TS, TR >::value, "MpiWrapper::scatterv() for serial run requires send and receive buffers are of the same type" ); @@ -1649,13 +1658,13 @@ MpiWrapper::PairType< FIRST, SECOND > MpiWrapper::allReduce( PairType< FIRST, SECOND > const & localPair, MPI_Comm comm ) { #ifdef GEOS_USE_MPI +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + MpiDesyncGuard const mpiDesyncGuard( comm ); +#endif auto const type = internal::getMpiPairType< FIRST, SECOND >(); auto const mpiOp = internal::getMpiPairReductionOp< FIRST, SECOND, OP >(); PairType< FIRST, SECOND > pair{ localPair.first, localPair.second }; MPI_Allreduce( MPI_IN_PLACE, &pair, 1, type, mpiOp, comm ); -#ifdef GEOS_USE_MPI_DESYNC_DETECTION - detectMpiDesync( comm, ++g_collectiveOperationCounter ); -#endif return pair; #else return localPair; From f7cf49f838aa63990ed65d5eaa2e785b06c29efc Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Wed, 17 Jun 2026 11:29:10 +0200 Subject: [PATCH 09/13] fix global counter name --- src/coreComponents/common/MpiWrapper.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/coreComponents/common/MpiWrapper.hpp b/src/coreComponents/common/MpiWrapper.hpp index f86364b8744..f852f40e184 100644 --- a/src/coreComponents/common/MpiWrapper.hpp +++ b/src/coreComponents/common/MpiWrapper.hpp @@ -818,7 +818,7 @@ struct MpiWrapper #ifdef GEOS_USE_MPI_DESYNC_DETECTION /// Tag/counter of the latest MPI collective operation to detect MPI desynchronizations - inline static int g_currentMpiOperationTag = 0; + inline static int g_collectiveOperationCounter = 0; /** * @struct MpiDesyncGuard From e4753b7d563fc917c0ad98f3fce3e8dc0e9b0f9e Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Wed, 17 Jun 2026 11:29:23 +0200 Subject: [PATCH 10/13] replace standard MPI_* collective operations use with MpiWrapper ones --- src/coreComponents/common/logger/Logger.cpp | 3 +- src/coreComponents/events/EventManager.cpp | 4 +- src/coreComponents/events/HaltEvent.cpp | 5 +- src/coreComponents/events/PeriodicEvent.cpp | 13 ++--- .../physicsSolvers/FieldStatisticsBase.hpp | 2 +- .../fluidFlow/wells/WellSolverBase.cpp | 2 +- .../solidMechanics/SolidMechanicsMPM.cpp | 57 ++++--------------- 7 files changed, 23 insertions(+), 63 deletions(-) diff --git a/src/coreComponents/common/logger/Logger.cpp b/src/coreComponents/common/logger/Logger.cpp index 86611998ebc..5b36036e602 100644 --- a/src/coreComponents/common/logger/Logger.cpp +++ b/src/coreComponents/common/logger/Logger.cpp @@ -19,6 +19,7 @@ // Source includes #include "Logger.hpp" +#include "common/MpiWrapper.hpp" #include "common/Path.hpp" namespace geos @@ -53,7 +54,7 @@ void InitializeLogger( MPI_Comm mpi_comm, const std::string & rankOutputDir ) makeDirsForPath( rankOutputDir ); } - MPI_Barrier( mpi_comm ); + MpiWrapper::barrier( mpi_comm ); std::string outputFilePath = rankOutputDir + "/rank_" + internal::g_rankString + ".out"; internal::g_rankStream = new std::ofstream( outputFilePath ); } diff --git a/src/coreComponents/events/EventManager.cpp b/src/coreComponents/events/EventManager.cpp index 5e24698b008..cbe8a9b02f4 100644 --- a/src/coreComponents/events/EventManager.cpp +++ b/src/coreComponents/events/EventManager.cpp @@ -171,9 +171,7 @@ bool EventManager::run( DomainPartition & domain ) #ifdef GEOS_USE_MPI // Find the min dt across processes - real64 dt_global; - MPI_Allreduce( &m_dt, &dt_global, 1, MPI_DOUBLE, MPI_MIN, MPI_COMM_GEOS ); - m_dt = dt_global; + m_dt = MpiWrapper::min( m_dt, MPI_COMM_GEOS ); #endif } LogPart logPart( "TIMESTEP", MpiWrapper::commRank() == 0 ); diff --git a/src/coreComponents/events/HaltEvent.cpp b/src/coreComponents/events/HaltEvent.cpp index 93cc9d2d581..3d1016e80f5 100644 --- a/src/coreComponents/events/HaltEvent.cpp +++ b/src/coreComponents/events/HaltEvent.cpp @@ -14,6 +14,7 @@ */ #include "HaltEvent.hpp" +#include "common/MpiWrapper.hpp" #include /** @@ -67,9 +68,7 @@ void HaltEvent::estimateEventTiming( real64 const GEOS_UNUSED_PARAM( time ), // The timing for the ranks may differ slightly, so synchronize // TODO: Only do the communication when you are close to the end? #ifdef GEOS_USE_MPI - integer forecast_global; - MPI_Allreduce( &forecast, &forecast_global, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD ); - forecast = forecast_global; + forecast = MpiWrapper::min( forecast, MPI_COMM_WORLD ); #endif setForecast( forecast ); diff --git a/src/coreComponents/events/PeriodicEvent.cpp b/src/coreComponents/events/PeriodicEvent.cpp index 86d6c85bb81..e1e79337d67 100644 --- a/src/coreComponents/events/PeriodicEvent.cpp +++ b/src/coreComponents/events/PeriodicEvent.cpp @@ -20,6 +20,7 @@ #include "PeriodicEvent.hpp" #include "common/format/Format.hpp" +#include "common/MpiWrapper.hpp" #include "functions/FunctionManager.hpp" namespace geos @@ -173,27 +174,21 @@ void PeriodicEvent::checkOptionalFunctionThreshold( real64 const time, // Because the function applied to an object may differ by rank, synchronize // (Note: this shouldn't occur very often, since it is only called if the base forecast <= 0) #ifdef GEOS_USE_MPI - real64 result_global; switch( m_functionStatOption ) { case 0: { - MPI_Allreduce( &result, &result_global, 1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD ); - result = result_global; + result = MpiWrapper::min( result, MPI_COMM_WORLD ); break; } case 1: { - int nprocs; - MPI_Comm_size( MPI_COMM_WORLD, &nprocs ); - MPI_Allreduce( &result, &result_global, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD ); - result = result_global / nprocs; + result = MpiWrapper::sum( result, MPI_COMM_WORLD ) / MPI_Comm_size( MPI_COMM_WORLD ); break; } case 2: { - MPI_Allreduce( &result, &result_global, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD ); - result = result_global; + result = MpiWrapper::max( result, MPI_COMM_WORLD ); } } #endif diff --git a/src/coreComponents/physicsSolvers/FieldStatisticsBase.hpp b/src/coreComponents/physicsSolvers/FieldStatisticsBase.hpp index 13390fe314c..a7f6e82860e 100644 --- a/src/coreComponents/physicsSolvers/FieldStatisticsBase.hpp +++ b/src/coreComponents/physicsSolvers/FieldStatisticsBase.hpp @@ -99,7 +99,7 @@ class FieldStatisticsBase : public TaskBase makeDirsForPath( m_outputDir ); } // wait till the dir is created by rank 0 - MPI_Barrier( MPI_COMM_WORLD ); + MpiWrapper::barrier( MPI_COMM_WORLD ); } } diff --git a/src/coreComponents/physicsSolvers/fluidFlow/wells/WellSolverBase.cpp b/src/coreComponents/physicsSolvers/fluidFlow/wells/WellSolverBase.cpp index 060ff895e73..06e31b14f66 100644 --- a/src/coreComponents/physicsSolvers/fluidFlow/wells/WellSolverBase.cpp +++ b/src/coreComponents/physicsSolvers/fluidFlow/wells/WellSolverBase.cpp @@ -111,7 +111,7 @@ void WellSolverBase::postInputInitialization() makeDirsForPath( m_ratesOutputDir ); } // wait till the dir is created by rank 0 - MPI_Barrier( MPI_COMM_WORLD ); + MpiWrapper::barrier( MPI_COMM_WORLD ); } } diff --git a/src/coreComponents/physicsSolvers/solidMechanics/SolidMechanicsMPM.cpp b/src/coreComponents/physicsSolvers/solidMechanics/SolidMechanicsMPM.cpp index bf1814610e5..9611cc21f18 100644 --- a/src/coreComponents/physicsSolvers/solidMechanics/SolidMechanicsMPM.cpp +++ b/src/coreComponents/physicsSolvers/solidMechanics/SolidMechanicsMPM.cpp @@ -577,12 +577,12 @@ void SolidMechanicsMPM::initialize( NodeManager & nodeManager, BCTableSize = BCTable1D.size(); } - MPI_Bcast( &BCTableSize, 1, MPI_INT, 0, MPI_COMM_GEOS ); // Broadcast the size of BCTable1D to other processes + MpiWrapper::bcast( &BCTableSize, 1, 0, MPI_COMM_GEOS ); // Broadcast the size of BCTable1D to other processes if( rank != 0 ) // All processes except for root resize their versions of BCTable1D { BCTable1D.resize( BCTableSize ); } - MPI_Bcast( BCTable1D.data(), BCTableSize, MPI_DOUBLE, 0, MPI_COMM_GEOS ); // Broadcast BCTable1D to other processes + MpiWrapper::bcast( BCTable1D.data(), BCTableSize, 0, MPI_COMM_GEOS ); // Broadcast BCTable1D to other processes // Technically don't need to reshape BCTable1D into a 2D array, but it makes things more readable and should have little runtime penalty m_bcTable.resize( BCTableSize / 7, 7 ); // Initialize size of m_BCTable @@ -618,12 +618,12 @@ void SolidMechanicsMPM::initialize( NodeManager & nodeManager, FTableSize = FTable1D.size(); } - MPI_Bcast( &FTableSize, 1, MPI_INT, 0, MPI_COMM_GEOS ); // Broadcast the size of FTable1D to other processes + MpiWrapper::bcast( &FTableSize, 1, 0, MPI_COMM_GEOS ); // Broadcast the size of FTable1D to other processes if( rank != 0 ) // All processes except for root resize their versions of FTable1D { FTable1D.resize( FTableSize ); } - MPI_Bcast( FTable1D.data(), FTableSize, MPI_DOUBLE, 0, MPI_COMM_GEOS ); // Broadcast FTable1D to other processes + MpiWrapper::bcast( FTable1D.data(), FTableSize, 0, MPI_COMM_GEOS ); // Broadcast FTable1D to other processes // Techinically don't need to reshape FTable1D into a 2D array, but it makes things more readable and should have little runtime penalty m_fTable.resize( FTableSize / 4, 4 ); // Initialize size of m_fTable @@ -823,12 +823,7 @@ void SolidMechanicsMPM::initialize( NodeManager & nodeManager, { localMinMass = DBL_MAX; } - MPI_Allreduce( &localMinMass, - &globalMinMass, - 1, - MPI_DOUBLE, - MPI_MIN, - MPI_COMM_GEOS ); + globalMinMass = MpiWrapper::min( localMinMass, MPI_COMM_GEOS ); m_smallMass = fmin( globalMinMass * 1.0e-12, m_smallMass ); // Initialize deformation gradient and velocity gradient @@ -884,12 +879,7 @@ void SolidMechanicsMPM::initialize( NodeManager & nodeManager, } } } ); - MPI_Allreduce( &maxLocalGroupNumber, - &maxGlobalGroupNumber, - 1, - MPI_INT, - MPI_MAX, - MPI_COMM_GEOS ); + maxGlobalGroupNumber = MpiWrapper::max( maxLocalGroupNumber, MPI_COMM_GEOS ); // Number of contact groups m_numContactGroups = maxGlobalGroupNumber + 1; @@ -1465,12 +1455,7 @@ void SolidMechanicsMPM::applyEssentialBCs( const real64 dt, real64 globalFaceReactions[6]; for( int face = 0; face < 6; face++ ) { - MPI_Allreduce( &localFaceReactions[face], - &globalFaceReactions[face], - 1, - MPI_DOUBLE, - MPI_SUM, - MPI_COMM_GEOS ); + globalFaceReactions[face] = MpiWrapper::sum( localFaceReactions[face], MPI_COMM_GEOS ); } // Get end-of-step domain dimensions - note that m_domainExtent is updated later @@ -1956,7 +1941,7 @@ void SolidMechanicsMPM::solverProfiling( std::string label ) { if( m_solverProfiling >= 1 ) { - MPI_Barrier( MPI_COMM_GEOS ); + MpiWrapper::barrier( MPI_COMM_GEOS ); GEOS_LOG_RANK_IF( m_solverProfiling == 2, label ); m_profilingTimes.push_back( MPI_Wtime() ); m_profilingLabels.push_back( label ); @@ -2195,18 +2180,8 @@ void SolidMechanicsMPM::optimizeBinSort( ParticleManager & particleManager ) real64 globalWeightedMultiplier; int localNumberOfParticles = particleManager.getNumberOfParticles(); real64 localWeightedMultiplier = optimalMultiplier * localNumberOfParticles; - MPI_Allreduce( &localWeightedMultiplier, - &globalWeightedMultiplier, - 1, - MPI_DOUBLE, - MPI_SUM, - MPI_COMM_GEOS ); - MPI_Allreduce( &localNumberOfParticles, - &globalNumberOfParticles, - 1, - MPI_INT, - MPI_SUM, - MPI_COMM_GEOS ); + globalWeightedMultiplier = MpiWrapper::sum( localWeightedMultiplier, MPI_COMM_GEOS ); + globalNumberOfParticles = MpiWrapper::sum( localNumberOfParticles, MPI_COMM_GEOS ); // Set bin size multiplier m_binSizeMultiplier = std::max( (int) std::round( globalWeightedMultiplier / globalNumberOfParticles ), 1 ); @@ -2752,15 +2727,7 @@ void SolidMechanicsMPM::computeAndWriteBoxAverage( const real64 dt, // so file is directly plottable in excel as CSV or something. for( localIndex i = 0; i < 9; i++ ) { - real64 localSum = boxSums[i]; - real64 globalSum; - MPI_Allreduce( &localSum, - &globalSum, - 1, - MPI_DOUBLE, - MPI_SUM, - MPI_COMM_GEOS ); - boxSums[i] = globalSum; + boxSums[i] = MpiWrapper::sum( boxSums[i], MPI_COMM_GEOS ); } int rank; @@ -3332,7 +3299,7 @@ void SolidMechanicsMPM::printProfilingResults() } // Print out solver profiling - MPI_Barrier( MPI_COMM_GEOS ); + MpiWrapper::barrier( MPI_COMM_GEOS ); if( rank == 0 ) { std::cout << "---------------------------------------------" << std::endl; From 24f60b79ee91a58f3839a005dc964887467baa6c Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Thu, 18 Jun 2026 09:48:47 +0200 Subject: [PATCH 11/13] store stacktraces on desync detections --- src/coreComponents/common/MpiWrapper.cpp | 23 +++++++++++++++++++++++ src/coreComponents/common/MpiWrapper.hpp | 16 +++++++++------- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/src/coreComponents/common/MpiWrapper.cpp b/src/coreComponents/common/MpiWrapper.cpp index 11f22c3fc9c..b268879f551 100644 --- a/src/coreComponents/common/MpiWrapper.cpp +++ b/src/coreComponents/common/MpiWrapper.cpp @@ -18,6 +18,7 @@ */ #include "MpiWrapper.hpp" +#include "LvArray/src/system.hpp" #include #if defined(__clang__) @@ -38,6 +39,28 @@ MPI_Comm MPI_COMM_GEOS; int MPI_COMM_GEOS = 0; #endif +#ifdef GEOS_USE_MPI +#ifdef GEOS_USE_MPI_DESYNC_DETECTION +std::string g_currentStacktrace; +std::string g_lastSuccessfulStacktrace; + +void MpiWrapper::saveStackTrace() +{ + g_currentStacktrace = LvArray::system::stackTrace( true ); +} + +void MpiWrapper::detectMpiDesync( MPI_Comm const & MPI_PARAM( comm ), int operationId ) +{ + int minId = operationId; MPI_Allreduce( MPI_IN_PLACE, &minId, 1, MPI_INT, MPI_MIN, comm ); + int maxId = operationId; MPI_Allreduce( MPI_IN_PLACE, &maxId, 1, MPI_INT, MPI_MAX, comm ); + if( minId != maxId ) + { + MPI_Abort( comm, 1 ); + } + g_lastSuccessfulStacktrace = g_currentStacktrace; +} +#endif + void MpiWrapper::barrier( MPI_Comm const & MPI_PARAM( comm ) ) { #ifdef GEOS_USE_MPI diff --git a/src/coreComponents/common/MpiWrapper.hpp b/src/coreComponents/common/MpiWrapper.hpp index f852f40e184..73c78643c50 100644 --- a/src/coreComponents/common/MpiWrapper.hpp +++ b/src/coreComponents/common/MpiWrapper.hpp @@ -832,7 +832,9 @@ struct MpiWrapper explicit MpiDesyncGuard( MPI_Comm const & comm ) : m_comm( comm ) , m_operationId( ++g_collectiveOperationCounter ) - {} + { + saveStackTrace(); + } ~MpiDesyncGuard() { @@ -848,12 +850,12 @@ struct MpiWrapper * @param[in] comm The MPI_Comm over which the gather operates. * @param[in] operationId The current collective operation counter of this rank. */ - inline static void detectMpiDesync( MPI_Comm const & MPI_PARAM( comm ), int operationId ) - { - int minId = operationId; MPI_Allreduce( MPI_IN_PLACE, &minId, 1, MPI_INT, MPI_MIN, comm ); - int maxId = operationId; MPI_Allreduce( MPI_IN_PLACE, &maxId, 1, MPI_INT, MPI_MAX, comm ); - if( minId != maxId ) { MPI_Abort( comm, 1 ); } - } + static void detectMpiDesync( MPI_Comm const & MPI_PARAM( comm ), int operationId ); + + /** + * @brief Save the stack trace for desync diagnostics for MPI collective calls. + */ + static void saveStackTrace(); #endif }; From b2574efaf9e0222c49f536210a3b1bf4dc5c48df Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Fri, 19 Jun 2026 09:00:35 +0200 Subject: [PATCH 12/13] wip: use a timeout as desync detection method --- src/coreComponents/common/MpiWrapper.cpp | 1158 +++++++++++----------- src/coreComponents/common/MpiWrapper.hpp | 36 +- 2 files changed, 626 insertions(+), 568 deletions(-) diff --git a/src/coreComponents/common/MpiWrapper.cpp b/src/coreComponents/common/MpiWrapper.cpp index b268879f551..bcbf5e28052 100644 --- a/src/coreComponents/common/MpiWrapper.cpp +++ b/src/coreComponents/common/MpiWrapper.cpp @@ -1,554 +1,604 @@ -/* - * ------------------------------------------------------------------------------------------------------------ - * SPDX-License-Identifier: LGPL-2.1-only - * - * Copyright (c) 2016-2024 Lawrence Livermore National Security LLC - * Copyright (c) 2018-2024 TotalEnergies - * Copyright (c) 2018-2024 The Board of Trustees of the Leland Stanford Junior University - * Copyright (c) 2023-2024 Chevron - * Copyright (c) 2019- GEOS/GEOSX Contributors - * All rights reserved - * - * See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details. - * ------------------------------------------------------------------------------------------------------------ - */ - -/** - * @file MpiWrapper.cpp - */ - -#include "MpiWrapper.hpp" -#include "LvArray/src/system.hpp" -#include - -#if defined(__clang__) - #pragma clang diagnostic push - #pragma clang diagnostic ignored "-Wunused-parameter" -#elif defined(__GNUC__) - #pragma GCC diagnostic push - #pragma GCC diagnostic ignored "-Wunused-parameter" -#endif - - -namespace geos -{ - -#ifdef GEOS_USE_MPI -MPI_Comm MPI_COMM_GEOS; -#else -int MPI_COMM_GEOS = 0; -#endif - -#ifdef GEOS_USE_MPI -#ifdef GEOS_USE_MPI_DESYNC_DETECTION -std::string g_currentStacktrace; -std::string g_lastSuccessfulStacktrace; - -void MpiWrapper::saveStackTrace() -{ - g_currentStacktrace = LvArray::system::stackTrace( true ); -} - -void MpiWrapper::detectMpiDesync( MPI_Comm const & MPI_PARAM( comm ), int operationId ) -{ - int minId = operationId; MPI_Allreduce( MPI_IN_PLACE, &minId, 1, MPI_INT, MPI_MIN, comm ); - int maxId = operationId; MPI_Allreduce( MPI_IN_PLACE, &maxId, 1, MPI_INT, MPI_MAX, comm ); - if( minId != maxId ) - { - MPI_Abort( comm, 1 ); - } - g_lastSuccessfulStacktrace = g_currentStacktrace; -} -#endif - -void MpiWrapper::barrier( MPI_Comm const & MPI_PARAM( comm ) ) -{ -#ifdef GEOS_USE_MPI -#ifdef GEOS_USE_MPI_DESYNC_DETECTION - MpiDesyncGuard const mpiDesyncGuard( comm ); -#endif - MPI_Barrier( comm ); -#endif -} - -int MpiWrapper::cartCoords( MPI_Comm comm, int rank, int maxdims, int coords[] ) -{ -#ifdef GEOS_USE_MPI - return MPI_Cart_coords( comm, rank, maxdims, coords ); -#else - return 0; -#endif -} - -int MpiWrapper::cartCreate( MPI_Comm comm_old, int ndims, const int dims[], const int periods[], - int reorder, MPI_Comm * comm_cart ) -{ -#ifdef GEOS_USE_MPI - return MPI_Cart_create( comm_old, ndims, dims, periods, reorder, comm_cart ); -#else - return 0; -#endif -} - -int MpiWrapper::cartRank( MPI_Comm comm, const int coords[] ) -{ - int rank = 0; -#ifdef GEOS_USE_MPI - MPI_Cart_rank( comm, coords, &rank ); -#endif - return rank; -} - -void MpiWrapper::commFree( MPI_Comm & comm ) -{ -#ifdef GEOS_USE_MPI - MPI_CHECK_ERROR( MPI_Comm_free( &comm ) ); -#else -// comm = MPI_COMM_NULL; -#endif -} - -int MpiWrapper::commRank( MPI_Comm const & MPI_PARAM( comm ) ) -{ - int rank = 0; -#ifdef GEOS_USE_MPI - MPI_Comm_rank( comm, &rank ); -#endif - return rank; -} - -int MpiWrapper::commSize( MPI_Comm const & MPI_PARAM( comm ) ) -{ - int size = 1; -#ifdef GEOS_USE_MPI - MPI_Comm_size( comm, &size ); -#endif - return size; -} - -bool MpiWrapper::commCompare( MPI_Comm const & comm1, MPI_Comm const & comm2 ) -{ -#ifdef GEOS_USE_MPI - int result; - MPI_Comm_compare( comm1, comm2, &result ); - return result == MPI_IDENT || result == MPI_CONGRUENT; -#else - return comm1 == comm2; -#endif -} - -bool MpiWrapper::initialized() -{ -#ifdef GEOS_USE_MPI - int ret = false; - MPI_CHECK_ERROR( MPI_Initialized( &ret ) ); - return ret; -#else - return false; -#endif -} - -int MpiWrapper::init( int * argc, char * * * argv ) -{ -#ifdef GEOS_USE_MPI - return MPI_Init( argc, argv ); -#else - return 0; -#endif -} - -internal::ManagedResources & internal::getManagedResources() -{ - static ManagedResources instance; - return instance; -} - -void internal::ManagedResources::finalize() -{ - for( MPI_Op resource : m_mpiOps ) - { - MPI_CHECK_ERROR( MPI_Op_free( &resource ) ); - } - m_mpiOps.clear(); - - for( MPI_Datatype resource : m_mpiTypes ) - { - MPI_CHECK_ERROR( MPI_Type_free( &resource ) ); - } - m_mpiTypes.clear(); -} - -void MpiWrapper::finalize() -{ -#ifdef GEOS_USE_MPI - MpiWrapper::commFree( MPI_COMM_GEOS ); - internal::getManagedResources().finalize(); - MPI_CHECK_ERROR( MPI_Finalize() ); -#endif -} - - -MPI_Comm MpiWrapper::commDup( MPI_Comm const comm ) -{ -#ifdef GEOS_USE_MPI - MPI_Comm duplicate; - MPI_CHECK_ERROR( MPI_Comm_dup( comm, &duplicate ) ); - return duplicate; -#else - return comm; -#endif -} - -MPI_Comm MpiWrapper::commSplit( MPI_Comm const comm, int color, int key ) -{ -#ifdef GEOS_USE_MPI - MPI_Comm scomm; - MPI_CHECK_ERROR( MPI_Comm_split( comm, color, key, &scomm ) ); - return scomm; -#else - return comm; -#endif -} - -int MpiWrapper::test( MPI_Request * request, int * flag, MPI_Status * status ) -{ -#ifdef GEOS_USE_MPI - return MPI_Test( request, flag, status ); -#else - *flag = 0; - return 0; -#endif -} - -int MpiWrapper::testAny( int count, MPI_Request array_of_requests[], int * idx, int * flag, MPI_Status array_of_statuses[] ) -{ -#ifdef GEOS_USE_MPI - return MPI_Testany( count, array_of_requests, idx, flag, array_of_statuses ); -#else - *flag = 0; - return 0; -#endif -} - -int MpiWrapper::testSome( int count, MPI_Request array_of_requests[], int * outcount, int array_of_indices[], MPI_Status array_of_statuses[] ) -{ -#ifdef GEOS_USE_MPI - return MPI_Testsome( count, array_of_requests, outcount, array_of_indices, array_of_statuses ); -#else - *outcount = 0; - return 0; -#endif -} - -int MpiWrapper::testAll( int count, MPI_Request array_of_requests[], int * flag, MPI_Status array_of_statuses[] ) -{ -#ifdef GEOS_USE_MPI - return MPI_Testall( count, array_of_requests, flag, array_of_statuses ); -#else - *flag = 0; - return 0; -#endif -} - -int MpiWrapper::check( MPI_Request * request, int * flag, MPI_Status * status ) -{ -#ifdef GEOS_USE_MPI - return MPI_Request_get_status( *request, flag, status ); -#else - *flag = 0; - return 0; -#endif -} - -int MpiWrapper::checkAny( int count, MPI_Request array_of_requests[], int * idx, int * flag, MPI_Status array_of_statuses[] ) -{ -#ifdef GEOS_USE_MPI - bool found = false; - int flagCache = -1; - int rval = MPI_SUCCESS; - stdVector< int > rvals( count ); - for( int jdx = 0; jdx < count; ++jdx ) - { - *flag = 0; - rvals[ jdx ] = MPI_Request_get_status( array_of_requests[ jdx ], flag, &array_of_statuses[ jdx ] ); - if( *flag && !found ) - { - *idx = jdx; - flagCache = *flag; - } - if( rvals[ jdx ] != MPI_SUCCESS ) - { - rval = rvals[ jdx ]; - } - } - if( found ) - { - *flag = flagCache; - } - return rval; -#else - *flag = 0; - return 0; -#endif -} - -int MpiWrapper::checkAll( int count, MPI_Request array_of_requests[], int * flag, MPI_Status array_of_statuses[] ) -{ -#ifdef GEOS_USE_MPI - // assume all passing, any that don't pass set the flag to false - *flag = 1; - int rval = MPI_SUCCESS; - stdVector< int > rvals( count ); - int iFlag = 0; - for( int idx = 0; idx < count; ++idx ) - { - rvals[ idx ] = MPI_Request_get_status( array_of_requests[ idx ], &iFlag, &array_of_statuses[ idx ] ); - if( !iFlag ) - { - *flag = iFlag; - } - if( rvals[ idx ] != MPI_SUCCESS ) - { - rval = rvals[ idx ]; - } - } - return rval; -#else - *flag = 0; - return 0; -#endif -} - -int MpiWrapper::wait( MPI_Request * request, MPI_Status * status ) -{ -#ifdef GEOS_USE_MPI - return MPI_Wait( request, status ); -#else - return 0; -#endif -} - -int MpiWrapper::waitAny( int count, MPI_Request array_of_requests[], int * indx, MPI_Status array_of_statuses[] ) -{ -#ifdef GEOS_USE_MPI - return MPI_Waitany( count, array_of_requests, indx, array_of_statuses ); -#else - return 0; -#endif -} - -int MpiWrapper::waitSome( int count, MPI_Request array_of_requests[], int * outcount, int array_of_indices[], MPI_Status array_of_statuses[] ) -{ -#ifdef GEOS_USE_MPI - return MPI_Waitsome( count, array_of_requests, outcount, array_of_indices, array_of_statuses ); -#else - // *outcount = 0; - return 0; -#endif -} - -int MpiWrapper::waitAll( int count, MPI_Request array_of_requests[], MPI_Status array_of_statuses[] ) -{ -#ifdef GEOS_USE_MPI - return MPI_Waitall( count, array_of_requests, array_of_statuses ); -#else - return 0; -#endif -} - -double MpiWrapper::wtime( void ) -{ -#ifdef GEOS_USE_MPI - return MPI_Wtime( ); -#else - return 0; -#endif - -} - -int MpiWrapper::activeWaitAny( const int count, MPI_Request array_of_requests[], MPI_Status array_of_statuses[], std::function< MPI_Request ( int ) > func ) -{ - int cmp = 0; - while( cmp < count ) - { - int idx = 0; - int err = waitAny( count, array_of_requests, &idx, array_of_statuses ); - if( err != MPI_SUCCESS ) - return err; - if( idx != MPI_UNDEFINED ) // only if all(requests == MPI_REQUEST_NULL) - { - func( idx ); - } - cmp++; - } - return MPI_SUCCESS; -} - -int MpiWrapper::activeWaitSome( const int count, - MPI_Request array_of_requests[], - MPI_Status array_of_statuses[], - std::function< MPI_Request ( int ) > func ) -{ - int cmp = 0; - while( cmp < count ) - { - int rcvd = 0; - stdVector< int > indices( count, -1 ); - int err = waitSome( count, array_of_requests, &rcvd, &indices[0], array_of_statuses ); - if( err != MPI_SUCCESS ) - return err; - if( rcvd > 0 ) - { - for( int ii = 0; ii < rcvd; ++ii ) - { - if( indices[ii] != MPI_UNDEFINED ) - { - func( indices[ii] ); - } - } - } - cmp += rcvd; - } - return MPI_SUCCESS; -} - - -int MpiWrapper::activeWaitSomeCompletePhase( const int participants, - stdVector< std::tuple< MPI_Request *, MPI_Status *, std::function< MPI_Request ( int ) > > > const & phases ) -{ - const int num_phases = phases.size(); - int err = 0; - for( int phase = 0; phase < num_phases; ++phase ) - { - MPI_Request * const requests = std::get< 0 >( phases[phase] ); - MPI_Status * const statuses = std::get< 1 >( phases[phase] ); - std::function< MPI_Request ( int ) > func = std::get< 2 >( phases[phase] ); - if( requests!=nullptr ) - { - err = activeWaitSome( participants, - requests, - statuses, - func ); - } - else - { - for( int idx = 0; idx < participants; ++idx ) - { - func( idx ); - } - } - if( err != MPI_SUCCESS ) - break; - } - return err; -} - -int MpiWrapper::activeWaitOrderedCompletePhase( const int participants, - stdVector< std::tuple< MPI_Request *, MPI_Status *, std::function< MPI_Request ( int ) > > > const & phases ) -{ - const int num_phases = phases.size(); - for( int phase = 0; phase < num_phases; ++phase ) - { - MPI_Request * const requests = std::get< 0 >( phases[phase] ); - MPI_Status * const statuses = std::get< 1 >( phases[phase] ); - std::function< MPI_Request ( int ) > func = std::get< 2 >( phases[phase] ); - - for( int idx = 0; idx < participants; ++idx ) - { - if( requests!=nullptr ) - { - wait( &requests[idx], &statuses[idx] ); - } - func( idx ); - } - } - return MPI_SUCCESS; -} - -int MpiWrapper::nodeCommSize() -{ - // if not initialized then we guess there is no MPI. - if( !initialized() ) - return 1; - - int len; - std::array< char, MPI_MAX_PROCESSOR_NAME + 1 > hostname; - MPI_Get_processor_name( hostname.data(), &len ); - hostname[len] = '\0'; - int color = (int)std::hash< string >{} (hostname.data()); - if( color < 0 ) - color *= -1; - - /** - * Create intra-node communicator - */ - MPI_Comm nodeComm; - int nodeCommSize; - MPI_Comm_split( MPI_COMM_WORLD, color, -1, &nodeComm ); - MPI_Comm_size( nodeComm, &nodeCommSize ); - return nodeCommSize; -} - -namespace internal -{ - -template< typename FIRST, typename SECOND > -MPI_Datatype getMpiCustomPairType() -{ - static auto const createTypeHolder = [] () { - using PAIR_T = MpiWrapper::PairType< FIRST, SECOND >; - static_assert( std::is_standard_layout_v< PAIR_T > ); - static_assert( std::is_trivially_copyable_v< PAIR_T > ); - - MPI_Datatype types[2] = { getMpiType< FIRST >(), getMpiType< SECOND >() }; - MPI_Aint offsets[2] = { offsetof( PAIR_T, first ), offsetof( PAIR_T, second ) }; - int blocksCount[2] = { 1, 1 }; - - MPI_Datatype mpiType; - GEOS_ERROR_IF_NE( MPI_Type_create_struct( 2, blocksCount, offsets, types, &mpiType ), MPI_SUCCESS ); - GEOS_ERROR_IF_NE( MPI_Type_commit( &mpiType ), MPI_SUCCESS ); - // Resource registered to be destroyed at MpiWrapper::finalize(). - internal::getManagedResources().m_mpiTypes.emplace( mpiType ); - return mpiType; - }; - // Static storage to ensure the MPI operation is created only once and reused for all calls to this function. - static MPI_Datatype mpiType{ createTypeHolder() }; - return mpiType; -} - -template<> MPI_Datatype getMpiPairType< int, int >() -{ return MPI_2INT; } - -template<> MPI_Datatype getMpiPairType< long int, int >() -{ return MPI_LONG_INT; } - -template<> MPI_Datatype getMpiPairType< long int, long int >() -{ return getMpiCustomPairType< long int, long int >(); } - -template<> MPI_Datatype getMpiPairType< long long int, long long int >() -{ return getMpiCustomPairType< long long int, long long int >(); } - -template<> MPI_Datatype getMpiPairType< float, int >() -{ return MPI_FLOAT_INT; } - -template<> MPI_Datatype getMpiPairType< double, int >() -{ return MPI_DOUBLE_INT; } - -template<> MPI_Datatype getMpiPairType< double, long int >() -{ return getMpiCustomPairType< double, long int >(); } - -template<> MPI_Datatype getMpiPairType< double, long long int >() -{ return getMpiCustomPairType< double, long long int >(); } - -template<> MPI_Datatype getMpiPairType< double, double >() -{ return getMpiCustomPairType< double, double >(); } - -} /* namespace internal */ - -} /* namespace geos */ - -#if defined(__clang__) - #pragma clang diagnostic pop -#elif defined(__GNUC__) - #pragma GCC diagnostic pop -#endif +/* + * ------------------------------------------------------------------------------------------------------------ + * SPDX-License-Identifier: LGPL-2.1-only + * + * Copyright (c) 2016-2024 Lawrence Livermore National Security LLC + * Copyright (c) 2018-2024 TotalEnergies + * Copyright (c) 2018-2024 The Board of Trustees of the Leland Stanford Junior University + * Copyright (c) 2023-2024 Chevron + * Copyright (c) 2019- GEOS/GEOSX Contributors + * All rights reserved + * + * See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details. + * ------------------------------------------------------------------------------------------------------------ + */ + +/** + * @file MpiWrapper.cpp + */ + +#include "MpiWrapper.hpp" +#include "LvArray/src/system.hpp" +#include + +#if defined(__clang__) + #pragma clang diagnostic push + #pragma clang diagnostic ignored "-Wunused-parameter" +#elif defined(__GNUC__) + #pragma GCC diagnostic push + #pragma GCC diagnostic ignored "-Wunused-parameter" +#endif + + +namespace geos +{ + +#ifdef GEOS_USE_MPI +MPI_Comm MPI_COMM_GEOS; +#else +int MPI_COMM_GEOS = 0; +#endif + +#if defined( GEOS_USE_MPI ) && defined( GEOS_USE_MPI_DESYNC_DETECTION ) +std::string g_currentStacktrace; +std::string g_lastSuccessfulStacktrace; + +void MpiWrapper::saveStackTrace() +{ + g_currentStacktrace = LvArray::system::stackTrace( true ); +} + +void MpiWrapper::MpiDesyncGuard::failed() +{ + std::cerr << g_lastSuccessfulStacktrace << '\n' << g_currentStacktrace; + MPI_Abort( m_comm, 1 ); +} + +void MpiWrapper::MpiDesyncGuard::succeeded() +{ + // the MPI_Barrier did not hang + this->m_collectiveOperationSuccess = true; + g_lastSuccessfulStacktrace = g_currentStacktrace; +} + +void MpiWrapper::MpiDesyncGuard::timeout() +{ + std::this_thread::sleep_for( std::chrono::seconds( 10 ) ); + if( !this->m_collectiveOperationSuccess ) + { + failed(); + } +} + +// void MpiWrapper::MpiDesyncGuard::detectMpiDesync() +// { +// // start a timeout for the following MPI_Barrier +// std::thread timeout( &MpiWrapper::MpiDesyncGuard::timeout, this ); +// MPI_Barrier( this->m_comm ); +// succeeded(); +// } + +void MpiWrapper::MpiDesyncGuard::detectMpiDesync() +{ + // start a timeout for the following MPI_Barrier + std::thread timeout( [this]{ + std::this_thread::sleep_for( std::chrono::seconds( 10 ) ); + if( !this->m_collectiveOperationSuccess ) + { + failed(); + } + } ); + MPI_Barrier( this->m_comm ); + succeeded(); +} + +// void MpiWrapper::MpiDesyncGuard::detectMpiDesync() +// { +// // start a timeout for the following MPI_Barrier +// std::thread timeout( [this]{ +// std::this_thread::sleep_for( std::chrono::seconds( 10 ) ); +// if( !this->m_collectiveOperationSuccess ) +// { +// // (Timeout reached, desync detected) +// std::cerr << g_lastSuccessfulStacktrace << '\n' << g_currentStacktrace; +// MPI_Abort( m_comm, 1 ); +// } +// } ); +// MPI_Barrier( this->m_comm ); +// // Successful operation for this rank +// this->m_collectiveOperationSuccess = true; +// g_lastSuccessfulStacktrace = g_currentStacktrace; +// } +#endif + +void MpiWrapper::barrier( MPI_Comm const & MPI_PARAM( comm ) ) +{ +#ifdef GEOS_USE_MPI +#ifdef GEOS_USE_MPI_DESYNC_DETECTION + MpiDesyncGuard const mpiDesyncGuard( comm ); +#endif + MPI_Barrier( comm ); +#endif +} + +int MpiWrapper::cartCoords( MPI_Comm comm, int rank, int maxdims, int coords[] ) +{ +#ifdef GEOS_USE_MPI + return MPI_Cart_coords( comm, rank, maxdims, coords ); +#else + return 0; +#endif +} + +int MpiWrapper::cartCreate( MPI_Comm comm_old, int ndims, const int dims[], const int periods[], + int reorder, MPI_Comm * comm_cart ) +{ +#ifdef GEOS_USE_MPI + return MPI_Cart_create( comm_old, ndims, dims, periods, reorder, comm_cart ); +#else + return 0; +#endif +} + +int MpiWrapper::cartRank( MPI_Comm comm, const int coords[] ) +{ + int rank = 0; +#ifdef GEOS_USE_MPI + MPI_Cart_rank( comm, coords, &rank ); +#endif + return rank; +} + +void MpiWrapper::commFree( MPI_Comm & comm ) +{ +#ifdef GEOS_USE_MPI + MPI_CHECK_ERROR( MPI_Comm_free( &comm ) ); +#else +// comm = MPI_COMM_NULL; +#endif +} + +int MpiWrapper::commRank( MPI_Comm const & MPI_PARAM( comm ) ) +{ + int rank = 0; +#ifdef GEOS_USE_MPI + MPI_Comm_rank( comm, &rank ); +#endif + return rank; +} + +int MpiWrapper::commSize( MPI_Comm const & MPI_PARAM( comm ) ) +{ + int size = 1; +#ifdef GEOS_USE_MPI + MPI_Comm_size( comm, &size ); +#endif + return size; +} + +bool MpiWrapper::commCompare( MPI_Comm const & comm1, MPI_Comm const & comm2 ) +{ +#ifdef GEOS_USE_MPI + int result; + MPI_Comm_compare( comm1, comm2, &result ); + return result == MPI_IDENT || result == MPI_CONGRUENT; +#else + return comm1 == comm2; +#endif +} + +bool MpiWrapper::initialized() +{ +#ifdef GEOS_USE_MPI + int ret = false; + MPI_CHECK_ERROR( MPI_Initialized( &ret ) ); + return ret; +#else + return false; +#endif +} + +int MpiWrapper::init( int * argc, char * * * argv ) +{ +#ifdef GEOS_USE_MPI + return MPI_Init( argc, argv ); +#else + return 0; +#endif +} + +internal::ManagedResources & internal::getManagedResources() +{ + static ManagedResources instance; + return instance; +} + +void internal::ManagedResources::finalize() +{ + for( MPI_Op resource : m_mpiOps ) + { + MPI_CHECK_ERROR( MPI_Op_free( &resource ) ); + } + m_mpiOps.clear(); + + for( MPI_Datatype resource : m_mpiTypes ) + { + MPI_CHECK_ERROR( MPI_Type_free( &resource ) ); + } + m_mpiTypes.clear(); +} + +void MpiWrapper::finalize() +{ +#ifdef GEOS_USE_MPI + MpiWrapper::commFree( MPI_COMM_GEOS ); + internal::getManagedResources().finalize(); + MPI_CHECK_ERROR( MPI_Finalize() ); +#endif +} + + +MPI_Comm MpiWrapper::commDup( MPI_Comm const comm ) +{ +#ifdef GEOS_USE_MPI + MPI_Comm duplicate; + MPI_CHECK_ERROR( MPI_Comm_dup( comm, &duplicate ) ); + return duplicate; +#else + return comm; +#endif +} + +MPI_Comm MpiWrapper::commSplit( MPI_Comm const comm, int color, int key ) +{ +#ifdef GEOS_USE_MPI + MPI_Comm scomm; + MPI_CHECK_ERROR( MPI_Comm_split( comm, color, key, &scomm ) ); + return scomm; +#else + return comm; +#endif +} + +int MpiWrapper::test( MPI_Request * request, int * flag, MPI_Status * status ) +{ +#ifdef GEOS_USE_MPI + return MPI_Test( request, flag, status ); +#else + *flag = 0; + return 0; +#endif +} + +int MpiWrapper::testAny( int count, MPI_Request array_of_requests[], int * idx, int * flag, MPI_Status array_of_statuses[] ) +{ +#ifdef GEOS_USE_MPI + return MPI_Testany( count, array_of_requests, idx, flag, array_of_statuses ); +#else + *flag = 0; + return 0; +#endif +} + +int MpiWrapper::testSome( int count, MPI_Request array_of_requests[], int * outcount, int array_of_indices[], MPI_Status array_of_statuses[] ) +{ +#ifdef GEOS_USE_MPI + return MPI_Testsome( count, array_of_requests, outcount, array_of_indices, array_of_statuses ); +#else + *outcount = 0; + return 0; +#endif +} + +int MpiWrapper::testAll( int count, MPI_Request array_of_requests[], int * flag, MPI_Status array_of_statuses[] ) +{ +#ifdef GEOS_USE_MPI + return MPI_Testall( count, array_of_requests, flag, array_of_statuses ); +#else + *flag = 0; + return 0; +#endif +} + +int MpiWrapper::check( MPI_Request * request, int * flag, MPI_Status * status ) +{ +#ifdef GEOS_USE_MPI + return MPI_Request_get_status( *request, flag, status ); +#else + *flag = 0; + return 0; +#endif +} + +int MpiWrapper::checkAny( int count, MPI_Request array_of_requests[], int * idx, int * flag, MPI_Status array_of_statuses[] ) +{ +#ifdef GEOS_USE_MPI + bool found = false; + int flagCache = -1; + int rval = MPI_SUCCESS; + stdVector< int > rvals( count ); + for( int jdx = 0; jdx < count; ++jdx ) + { + *flag = 0; + rvals[ jdx ] = MPI_Request_get_status( array_of_requests[ jdx ], flag, &array_of_statuses[ jdx ] ); + if( *flag && !found ) + { + *idx = jdx; + flagCache = *flag; + } + if( rvals[ jdx ] != MPI_SUCCESS ) + { + rval = rvals[ jdx ]; + } + } + if( found ) + { + *flag = flagCache; + } + return rval; +#else + *flag = 0; + return 0; +#endif +} + +int MpiWrapper::checkAll( int count, MPI_Request array_of_requests[], int * flag, MPI_Status array_of_statuses[] ) +{ +#ifdef GEOS_USE_MPI + // assume all passing, any that don't pass set the flag to false + *flag = 1; + int rval = MPI_SUCCESS; + stdVector< int > rvals( count ); + int iFlag = 0; + for( int idx = 0; idx < count; ++idx ) + { + rvals[ idx ] = MPI_Request_get_status( array_of_requests[ idx ], &iFlag, &array_of_statuses[ idx ] ); + if( !iFlag ) + { + *flag = iFlag; + } + if( rvals[ idx ] != MPI_SUCCESS ) + { + rval = rvals[ idx ]; + } + } + return rval; +#else + *flag = 0; + return 0; +#endif +} + +int MpiWrapper::wait( MPI_Request * request, MPI_Status * status ) +{ +#ifdef GEOS_USE_MPI + return MPI_Wait( request, status ); +#else + return 0; +#endif +} + +int MpiWrapper::waitAny( int count, MPI_Request array_of_requests[], int * indx, MPI_Status array_of_statuses[] ) +{ +#ifdef GEOS_USE_MPI + return MPI_Waitany( count, array_of_requests, indx, array_of_statuses ); +#else + return 0; +#endif +} + +int MpiWrapper::waitSome( int count, MPI_Request array_of_requests[], int * outcount, int array_of_indices[], MPI_Status array_of_statuses[] ) +{ +#ifdef GEOS_USE_MPI + return MPI_Waitsome( count, array_of_requests, outcount, array_of_indices, array_of_statuses ); +#else + // *outcount = 0; + return 0; +#endif +} + +int MpiWrapper::waitAll( int count, MPI_Request array_of_requests[], MPI_Status array_of_statuses[] ) +{ +#ifdef GEOS_USE_MPI + return MPI_Waitall( count, array_of_requests, array_of_statuses ); +#else + return 0; +#endif +} + +double MpiWrapper::wtime( void ) +{ +#ifdef GEOS_USE_MPI + return MPI_Wtime( ); +#else + return 0; +#endif + +} + +int MpiWrapper::activeWaitAny( const int count, MPI_Request array_of_requests[], MPI_Status array_of_statuses[], std::function< MPI_Request ( int ) > func ) +{ + int cmp = 0; + while( cmp < count ) + { + int idx = 0; + int err = waitAny( count, array_of_requests, &idx, array_of_statuses ); + if( err != MPI_SUCCESS ) + return err; + if( idx != MPI_UNDEFINED ) // only if all(requests == MPI_REQUEST_NULL) + { + func( idx ); + } + cmp++; + } + return MPI_SUCCESS; +} + +int MpiWrapper::activeWaitSome( const int count, + MPI_Request array_of_requests[], + MPI_Status array_of_statuses[], + std::function< MPI_Request ( int ) > func ) +{ + int cmp = 0; + while( cmp < count ) + { + int rcvd = 0; + stdVector< int > indices( count, -1 ); + int err = waitSome( count, array_of_requests, &rcvd, &indices[0], array_of_statuses ); + if( err != MPI_SUCCESS ) + return err; + if( rcvd > 0 ) + { + for( int ii = 0; ii < rcvd; ++ii ) + { + if( indices[ii] != MPI_UNDEFINED ) + { + func( indices[ii] ); + } + } + } + cmp += rcvd; + } + return MPI_SUCCESS; +} + + +int MpiWrapper::activeWaitSomeCompletePhase( const int participants, + stdVector< std::tuple< MPI_Request *, MPI_Status *, std::function< MPI_Request ( int ) > > > const & phases ) +{ + const int num_phases = phases.size(); + int err = 0; + for( int phase = 0; phase < num_phases; ++phase ) + { + MPI_Request * const requests = std::get< 0 >( phases[phase] ); + MPI_Status * const statuses = std::get< 1 >( phases[phase] ); + std::function< MPI_Request ( int ) > func = std::get< 2 >( phases[phase] ); + if( requests!=nullptr ) + { + err = activeWaitSome( participants, + requests, + statuses, + func ); + } + else + { + for( int idx = 0; idx < participants; ++idx ) + { + func( idx ); + } + } + if( err != MPI_SUCCESS ) + break; + } + return err; +} + +int MpiWrapper::activeWaitOrderedCompletePhase( const int participants, + stdVector< std::tuple< MPI_Request *, MPI_Status *, std::function< MPI_Request ( int ) > > > const & phases ) +{ + const int num_phases = phases.size(); + for( int phase = 0; phase < num_phases; ++phase ) + { + MPI_Request * const requests = std::get< 0 >( phases[phase] ); + MPI_Status * const statuses = std::get< 1 >( phases[phase] ); + std::function< MPI_Request ( int ) > func = std::get< 2 >( phases[phase] ); + + for( int idx = 0; idx < participants; ++idx ) + { + if( requests!=nullptr ) + { + wait( &requests[idx], &statuses[idx] ); + } + func( idx ); + } + } + return MPI_SUCCESS; +} + +int MpiWrapper::nodeCommSize() +{ + // if not initialized then we guess there is no MPI. + if( !initialized() ) + return 1; + + int len; + std::array< char, MPI_MAX_PROCESSOR_NAME + 1 > hostname; + MPI_Get_processor_name( hostname.data(), &len ); + hostname[len] = '\0'; + int color = (int)std::hash< string >{} (hostname.data()); + if( color < 0 ) + color *= -1; + + /** + * Create intra-node communicator + */ + MPI_Comm nodeComm; + int nodeCommSize; + MPI_Comm_split( MPI_COMM_WORLD, color, -1, &nodeComm ); + MPI_Comm_size( nodeComm, &nodeCommSize ); + return nodeCommSize; +} + +namespace internal +{ + +template< typename FIRST, typename SECOND > +MPI_Datatype getMpiCustomPairType() +{ + static auto const createTypeHolder = [] () { + using PAIR_T = MpiWrapper::PairType< FIRST, SECOND >; + static_assert( std::is_standard_layout_v< PAIR_T > ); + static_assert( std::is_trivially_copyable_v< PAIR_T > ); + + MPI_Datatype types[2] = { getMpiType< FIRST >(), getMpiType< SECOND >() }; + MPI_Aint offsets[2] = { offsetof( PAIR_T, first ), offsetof( PAIR_T, second ) }; + int blocksCount[2] = { 1, 1 }; + + MPI_Datatype mpiType; + GEOS_ERROR_IF_NE( MPI_Type_create_struct( 2, blocksCount, offsets, types, &mpiType ), MPI_SUCCESS ); + GEOS_ERROR_IF_NE( MPI_Type_commit( &mpiType ), MPI_SUCCESS ); + // Resource registered to be destroyed at MpiWrapper::finalize(). + internal::getManagedResources().m_mpiTypes.emplace( mpiType ); + return mpiType; + }; + // Static storage to ensure the MPI operation is created only once and reused for all calls to this function. + static MPI_Datatype mpiType{ createTypeHolder() }; + return mpiType; +} + +template<> MPI_Datatype getMpiPairType< int, int >() +{ return MPI_2INT; } + +template<> MPI_Datatype getMpiPairType< long int, int >() +{ return MPI_LONG_INT; } + +template<> MPI_Datatype getMpiPairType< long int, long int >() +{ return getMpiCustomPairType< long int, long int >(); } + +template<> MPI_Datatype getMpiPairType< long long int, long long int >() +{ return getMpiCustomPairType< long long int, long long int >(); } + +template<> MPI_Datatype getMpiPairType< float, int >() +{ return MPI_FLOAT_INT; } + +template<> MPI_Datatype getMpiPairType< double, int >() +{ return MPI_DOUBLE_INT; } + +template<> MPI_Datatype getMpiPairType< double, long int >() +{ return getMpiCustomPairType< double, long int >(); } + +template<> MPI_Datatype getMpiPairType< double, long long int >() +{ return getMpiCustomPairType< double, long long int >(); } + +template<> MPI_Datatype getMpiPairType< double, double >() +{ return getMpiCustomPairType< double, double >(); } + +} /* namespace internal */ + +} /* namespace geos */ + +#if defined(__clang__) + #pragma clang diagnostic pop +#elif defined(__GNUC__) + #pragma GCC diagnostic pop +#endif diff --git a/src/coreComponents/common/MpiWrapper.hpp b/src/coreComponents/common/MpiWrapper.hpp index 73c78643c50..a660f1a1e3f 100644 --- a/src/coreComponents/common/MpiWrapper.hpp +++ b/src/coreComponents/common/MpiWrapper.hpp @@ -26,6 +26,10 @@ #include +// TODO move in cpp +#include +#include + #if defined(GEOS_USE_MPI) #include #define MPI_PARAM( x ) x @@ -817,9 +821,6 @@ struct MpiWrapper static int allReduce( T const * sendbuf, T * recvbuf, int count, MPI_Op op, MPI_Comm comm = MPI_COMM_GEOS ); #ifdef GEOS_USE_MPI_DESYNC_DETECTION - /// Tag/counter of the latest MPI collective operation to detect MPI desynchronizations - inline static int g_collectiveOperationCounter = 0; - /** * @struct MpiDesyncGuard * @brief RAII helper to detect MPI desynchronizations from MPI collective operations. @@ -827,31 +828,38 @@ struct MpiWrapper struct MpiDesyncGuard { MPI_Comm const & m_comm; - int const m_operationId; + bool m_collectiveOperationSuccess = false; explicit MpiDesyncGuard( MPI_Comm const & comm ) : m_comm( comm ) - , m_operationId( ++g_collectiveOperationCounter ) { - saveStackTrace(); + saveStackTrace(); // Here every rank saves a stacktrace TODO modify } ~MpiDesyncGuard() { - detectMpiDesync( m_comm, m_operationId ); + detectMpiDesync(); } + /** + * @brief Detects MPI desynchronizations from MPI collective operations. + */ + void detectMpiDesync(); + + /** + * TODO + */ + void timeout(); + + // TODO rename TODO is it useful? + void failed(); + // TODO rename TODO is it useful? + void succeeded(); + MpiDesyncGuard( MpiDesyncGuard const & ) = delete; MpiDesyncGuard & operator=( MpiDesyncGuard const & ) = delete; }; - /** - * @brief Detects MPI desynchronizations from MPI collective operations. - * @param[in] comm The MPI_Comm over which the gather operates. - * @param[in] operationId The current collective operation counter of this rank. - */ - static void detectMpiDesync( MPI_Comm const & MPI_PARAM( comm ), int operationId ); - /** * @brief Save the stack trace for desync diagnostics for MPI collective calls. */ From 067d70f6736669e983dd87ea09469722aa871c8f Mon Sep 17 00:00:00 2001 From: kdrienCG Date: Mon, 22 Jun 2026 10:48:01 +0200 Subject: [PATCH 13/13] wip: use a timeout as desync detection method --- src/coreComponents/common/MpiWrapper.cpp | 73 +++++++++--------------- src/coreComponents/common/MpiWrapper.hpp | 17 +++--- 2 files changed, 36 insertions(+), 54 deletions(-) diff --git a/src/coreComponents/common/MpiWrapper.cpp b/src/coreComponents/common/MpiWrapper.cpp index bcbf5e28052..d9d6b1569b8 100644 --- a/src/coreComponents/common/MpiWrapper.cpp +++ b/src/coreComponents/common/MpiWrapper.cpp @@ -19,7 +19,10 @@ #include "MpiWrapper.hpp" #include "LvArray/src/system.hpp" +#include "common/logger/Logger.hpp" #include +#include +#include #if defined(__clang__) #pragma clang diagnostic push @@ -50,65 +53,45 @@ void MpiWrapper::saveStackTrace() void MpiWrapper::MpiDesyncGuard::failed() { - std::cerr << g_lastSuccessfulStacktrace << '\n' << g_currentStacktrace; + GEOS_LOG_RANK_0( GEOS_FMT( "MPI desync detected: rank {) timed out\n" + "{}\n" + "Last successful stacktrace:\n" + "{}", + g_currentStacktrace, g_lastSuccessfulStacktrace ) ); MPI_Abort( m_comm, 1 ); } void MpiWrapper::MpiDesyncGuard::succeeded() { - // the MPI_Barrier did not hang - this->m_collectiveOperationSuccess = true; + m_collectiveOperationSuccess.store( true, std::memory_order_release ); g_lastSuccessfulStacktrace = g_currentStacktrace; } -void MpiWrapper::MpiDesyncGuard::timeout() +void MpiWrapper::MpiDesyncGuard::detectMpiDesync() { - std::this_thread::sleep_for( std::chrono::seconds( 10 ) ); - if( !this->m_collectiveOperationSuccess ) - { - failed(); - } -} + MPI_Request request; + MPI_Ibarrier( m_comm, &request ); -// void MpiWrapper::MpiDesyncGuard::detectMpiDesync() -// { -// // start a timeout for the following MPI_Barrier -// std::thread timeout( &MpiWrapper::MpiDesyncGuard::timeout, this ); -// MPI_Barrier( this->m_comm ); -// succeeded(); -// } + int flag = 0; + double start = MpiWrapper::wtime(); + while( true ) + { + MpiWrapper::test( &request, &flag, MPI_STATUS_IGNORE ); + if( flag ) + { + succeeded(); + return; + } -void MpiWrapper::MpiDesyncGuard::detectMpiDesync() -{ - // start a timeout for the following MPI_Barrier - std::thread timeout( [this]{ - std::this_thread::sleep_for( std::chrono::seconds( 10 ) ); - if( !this->m_collectiveOperationSuccess ) + if( MpiWrapper::wtime() - start > 10 ) { failed(); + return; } - } ); - MPI_Barrier( this->m_comm ); - succeeded(); -} - -// void MpiWrapper::MpiDesyncGuard::detectMpiDesync() -// { -// // start a timeout for the following MPI_Barrier -// std::thread timeout( [this]{ -// std::this_thread::sleep_for( std::chrono::seconds( 10 ) ); -// if( !this->m_collectiveOperationSuccess ) -// { -// // (Timeout reached, desync detected) -// std::cerr << g_lastSuccessfulStacktrace << '\n' << g_currentStacktrace; -// MPI_Abort( m_comm, 1 ); -// } -// } ); -// MPI_Barrier( this->m_comm ); -// // Successful operation for this rank -// this->m_collectiveOperationSuccess = true; -// g_lastSuccessfulStacktrace = g_currentStacktrace; -// } + + std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) ); + } +} #endif void MpiWrapper::barrier( MPI_Comm const & MPI_PARAM( comm ) ) diff --git a/src/coreComponents/common/MpiWrapper.hpp b/src/coreComponents/common/MpiWrapper.hpp index a660f1a1e3f..bbde1726a06 100644 --- a/src/coreComponents/common/MpiWrapper.hpp +++ b/src/coreComponents/common/MpiWrapper.hpp @@ -26,9 +26,7 @@ #include -// TODO move in cpp -#include -#include +#include #if defined(GEOS_USE_MPI) #include @@ -828,7 +826,7 @@ struct MpiWrapper struct MpiDesyncGuard { MPI_Comm const & m_comm; - bool m_collectiveOperationSuccess = false; + std::atomic m_collectiveOperationSuccess{ false }; explicit MpiDesyncGuard( MPI_Comm const & comm ) : m_comm( comm ) @@ -847,13 +845,14 @@ struct MpiWrapper void detectMpiDesync(); /** - * TODO + * @brief Method ran when a desynchronization is detected. + * TODO rename TODO is it useful? */ - void timeout(); - - // TODO rename TODO is it useful? void failed(); - // TODO rename TODO is it useful? + /** + * @brief Method ran when no desynchronizations are detected. + * TODO rename TODO is it useful? + */ void succeeded(); MpiDesyncGuard( MpiDesyncGuard const & ) = delete;