diff --git a/cpp/src/arrow/json/reader_test.cc b/cpp/src/arrow/json/reader_test.cc
index a52626413d68..b23d6050716a 100644
--- a/cpp/src/arrow/json/reader_test.cc
+++ b/cpp/src/arrow/json/reader_test.cc
@@ -290,6 +290,39 @@ TEST(ReaderTest, MultipleChunksParallel) {
   AssertTablesEqual(*serial, *threaded);
 }
 
+// Regression test for intermittent threading crashes on MinGW.
+// Run this test multiple times manually to stress-test:
+//   while build/debug/arrow-json-test \
+//     --gtest_filter=ReaderTest.MultipleChunksParallelRegression; do :; done
+// See https://github.com/apache/arrow/issues/49272
+
+// Helper used by multiple-chunk parallel tests to build the JSON input,
+// configure read options and read the resulting table.
+static Result<std::shared_ptr<Table>> ReadMultipleChunksParallelTable(
+    int64_t count, bool use_threads, const ParseOptions& parse_options) {
+  ReadOptions read_options;
+  read_options.block_size = static_cast<int32_t>(count / 2);
+  read_options.use_threads = use_threads;
+
+  std::string json;
+  json.reserve(static_cast<size_t>(count) * 16);  // rough reserve to avoid reallocations
+  for (int64_t i = 0; i < count; ++i) {
+    json += "{\"a\":" + std::to_string(i) + "}\n";
+  }
+
+  return ReadToTable(std::move(json), read_options, parse_options);
+}
+
+TEST(ReaderTest, MultipleChunksParallelRegression) {
+  int64_t count = 1 << 10;
+  ParseOptions parse_options;
+  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+
+  ASSERT_OK_AND_ASSIGN(auto table, ReadMultipleChunksParallelTable(
+                                       count, /*use_threads=*/true, parse_options));
+  ASSERT_EQ(table->num_rows(), count);
+}
+
 TEST(ReaderTest, ListArrayWithFewValues) {
   // ARROW-7647
   ParseOptions parse_options;
diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc
index bf107006f8bc..0f6356eef463 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -31,6 +31,7 @@
 #include "arrow/util/io_util.h"
 #include "arrow/util/logging_internal.h"
 #include "arrow/util/mutex.h"
+#include "arrow/util/windows_compatibility.h"
 
 #include "arrow/util/tracing_internal.h"
 
@@ -630,9 +631,58 @@ void ThreadPool::CollectFinishedWorkersUnlocked() {
   state_->finished_workers_.clear();
 }
 
+// MinGW's __emutls implementation for C++ thread_local has known race conditions
+// during thread creation that can cause segfaults. Use native Win32 TLS instead.
+// See https://github.com/apache/arrow/issues/49272
+#  ifdef __MINGW32__
+
+namespace {
+DWORD GetPoolTlsIndex() {
+  static DWORD index = [] {
+    DWORD i = TlsAlloc();
+    if (i == TLS_OUT_OF_INDEXES) {
+      ARROW_LOG(FATAL) << "TlsAlloc failed for thread pool TLS: "
+                       << WinErrorMessage(GetLastError());
+    }
+    return i;
+  }();
+  return index;
+}
+}  // namespace
+
+static ThreadPool* GetCurrentThreadPool() {
+  // Preserve the caller's last-error value while also detecting TLS failures.
+  DWORD original_error = GetLastError();
+  // Ensure a successful TlsGetValue() leaves GetLastError() == 0.
+  SetLastError(0);
+  auto* pool = static_cast<ThreadPool*>(TlsGetValue(GetPoolTlsIndex()));
+  DWORD tls_error = GetLastError();
+  if (tls_error != 0) {
+    // Restore the original error before logging a fatal TLS failure.
+    SetLastError(original_error);
+    ARROW_LOG(FATAL) << "TlsGetValue failed for thread pool TLS: "
+                     << WinErrorMessage(tls_error);
+  }
+  // No TLS error: restore the caller's last-error value and return the pool.
+  SetLastError(original_error);
+  return pool;
+}
+
+static void SetCurrentThreadPool(ThreadPool* pool) {
+  BOOL ok = TlsSetValue(GetPoolTlsIndex(), pool);
+  if (!ok) {
+    ARROW_LOG(FATAL) << "TlsSetValue failed for thread pool TLS: "
+                     << WinErrorMessage(GetLastError());
+  }
+}
+#  else
 thread_local ThreadPool* current_thread_pool_ = nullptr;
 
-bool ThreadPool::OwnsThisThread() { return current_thread_pool_ == this; }
+static ThreadPool* GetCurrentThreadPool() { return current_thread_pool_; }
+static void SetCurrentThreadPool(ThreadPool* pool) { current_thread_pool_ = pool; }
+#  endif
+
+bool ThreadPool::OwnsThisThread() { return GetCurrentThreadPool() == this; }
 
 void ThreadPool::LaunchWorkersUnlocked(int threads) {
   std::shared_ptr<State> state = sp_state_;
@@ -641,7 +691,7 @@ void ThreadPool::LaunchWorkersUnlocked(int threads) {
     state_->workers_.emplace_back();
     auto it = --(state_->workers_.end());
     *it = std::thread([this, state, it] {
-      current_thread_pool_ = this;
+      SetCurrentThreadPool(this);
       WorkerLoop(state, it);
     });
   }
diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc
index 45441fa32165..c1391c8be883 100644
--- a/cpp/src/arrow/util/thread_pool_test.cc
+++ b/cpp/src/arrow/util/thread_pool_test.cc
@@ -44,6 +44,7 @@
 #include "arrow/util/macros.h"
 #include "arrow/util/test_common.h"
 #include "arrow/util/thread_pool.h"
+#include "arrow/util/windows_compatibility.h"
 
 namespace arrow {
 namespace internal {
@@ -660,6 +661,34 @@ TEST_F(TestThreadPool, OwnsCurrentThread) {
   ASSERT_FALSE(one_failed);
 }
 
+#ifdef _WIN32
+TEST_F(TestThreadPool, OwnsThisThreadPreservesLastError) {
+#  ifndef ARROW_ENABLE_THREADING
+  GTEST_SKIP() << "Test requires threading support";
+#  endif
+  auto pool = this->MakeThreadPool(4);
+
+  // Verify from outside the pool: OwnsThisThread() must not clobber
+  // the calling thread's last-error state.
+  ::SetLastError(ERROR_FILE_NOT_FOUND);
+  ASSERT_FALSE(pool->OwnsThisThread());
+  ASSERT_EQ(::GetLastError(), static_cast<DWORD>(ERROR_FILE_NOT_FOUND));
+
+  // Verify from inside a pool thread.
+  std::atomic<bool> error_preserved{true};
+  ASSERT_OK(pool->Spawn([&] {
+    ::SetLastError(ERROR_ACCESS_DENIED);
+    ASSERT_TRUE(pool->OwnsThisThread());
+    if (::GetLastError() != ERROR_ACCESS_DENIED) {
+      error_preserved = false;
+    }
+  }));
+
+  ASSERT_OK(pool->Shutdown());
+  ASSERT_TRUE(error_preserved.load());
+}
+#endif
+
 TEST_F(TestThreadPool, StressSpawnThreaded) {
 #ifndef ARROW_ENABLE_THREADING
   GTEST_SKIP() << "Test requires threading support";