Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions cpp/src/arrow/json/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,39 @@ TEST(ReaderTest, MultipleChunksParallel) {
AssertTablesEqual(*serial, *threaded);
}

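// Regression test (ReaderTest.MultipleChunksParallelRegression, defined below)
// for intermittent threading crashes on MinGW.
// Because the crash is intermittent, a single run may not reproduce it; to
// stress-test manually, run the test in a loop until it fails:
// while build/debug/arrow-json-test \
// --gtest_filter=ReaderTest.MultipleChunksParallelRegression; do :; done
// See https://github.com/apache/arrow/issues/49272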

// Shared helper for the multiple-chunk parallel tests.
//
// Generates `count` newline-terminated JSON objects of the form {"a":<i>}
// (i = 0 .. count-1) and reads them with ReadToTable().
//
// - count:         number of JSON rows to generate.
// - use_threads:   forwarded to ReadOptions::use_threads.
// - parse_options: forwarded unchanged to ReadToTable().
//
// Returns whatever ReadToTable() returns (the table, or an error).
static Result<std::shared_ptr<Table>> ReadMultipleChunksParallelTable(
    int64_t count, bool use_threads, const ParseOptions& parse_options) {
  // Build the input document up front.
  std::string payload;
  // Rough per-row estimate so the string does not reallocate repeatedly.
  payload.reserve(static_cast<size_t>(count) * 16);
  int64_t row = 0;
  while (row < count) {
    payload.append("{\"a\":").append(std::to_string(row)).append("}\n");
    ++row;
  }

  ReadOptions options;
  options.use_threads = use_threads;
  // count / 2 is much smaller than the generated text (every row is at least
  // 8 characters long); presumably this forces the reader to split the input
  // into several chunks -- confirm against ReadOptions::block_size semantics.
  options.block_size = static_cast<int>(count / 2);

  return ReadToTable(std::move(payload), options, parse_options);
}

TEST(ReaderTest, MultipleChunksParallelRegression) {
int64_t count = 1 << 10;
ParseOptions parse_options;
parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;

ASSERT_OK_AND_ASSIGN(auto table, ReadMultipleChunksParallelTable(
count, /*use_threads=*/true, parse_options));
ASSERT_EQ(table->num_rows(), count);
Comment on lines +316 to +323
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description mentions adding a stress test that runs the parallel JSON read 20 times, but this new test only executes the path once and relies on a manual while-loop for stress. Either update the test to perform the intended repeated iterations (and potentially rename accordingly), or update the PR description/comments so they match the actual behavior.

Copilot uses AI. Check for mistakes.
}

TEST(ReaderTest, ListArrayWithFewValues) {
// ARROW-7647
ParseOptions parse_options;
Expand Down
54 changes: 52 additions & 2 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "arrow/util/io_util.h"
#include "arrow/util/logging_internal.h"
#include "arrow/util/mutex.h"
#include "arrow/util/windows_compatibility.h"

#include "arrow/util/tracing_internal.h"

Expand Down Expand Up @@ -630,9 +631,58 @@ void ThreadPool::CollectFinishedWorkersUnlocked() {
state_->finished_workers_.clear();
}

// MinGW's __emutls implementation for C++ thread_local has known race conditions
// during thread creation that can cause segfaults. Use native Win32 TLS instead.
// See https://github.com/apache/arrow/issues/49272
# ifdef __MINGW32__

namespace {
// Returns the Win32 TLS slot index used to store the current thread's
// ThreadPool pointer. The slot is allocated with TlsAlloc() the first time
// this function is called and the same index is returned on every later call.
// A TlsAlloc() failure is reported through ARROW_LOG(FATAL).
DWORD GetPoolTlsIndex() {
  static const DWORD slot_index = [] {
    const DWORD allocated = TlsAlloc();
    if (allocated != TLS_OUT_OF_INDEXES) {
      return allocated;
    }
    ARROW_LOG(FATAL) << "TlsAlloc failed for thread pool TLS: "
                     << WinErrorMessage(GetLastError());
    return allocated;
  }();
  return slot_index;
}
}  // namespace

// Returns the ThreadPool pointer stored in the calling thread's TLS slot.
//
// The caller's Win32 last-error value is saved on entry and restored before
// returning, so this lookup is invisible to code that inspects GetLastError().
// A non-zero last-error after TlsGetValue() is treated as a TLS failure and
// reported through ARROW_LOG(FATAL); a null slot value on its own is not.
static ThreadPool* GetCurrentThreadPool() {
  // Remember what the caller had in last-error so it can be put back.
  const DWORD saved_error = GetLastError();
  // Start from a clean slate so any non-zero value seen below comes from the
  // TLS lookup itself.
  SetLastError(0);
  void* const slot_value = TlsGetValue(GetPoolTlsIndex());
  const DWORD lookup_error = GetLastError();
  if (lookup_error != 0) {
    // Put the caller's value back before reporting the fatal failure.
    SetLastError(saved_error);
    ARROW_LOG(FATAL) << "TlsGetValue failed for thread pool TLS: "
                     << WinErrorMessage(lookup_error);
  }
  // Success path: hand the caller's last-error value back untouched.
  SetLastError(saved_error);
  return static_cast<ThreadPool*>(slot_value);
}

// Stores `pool` in the calling thread's TLS slot (see GetPoolTlsIndex()).
// If TlsSetValue() reports failure, the Win32 error is logged through
// ARROW_LOG(FATAL).
static void SetCurrentThreadPool(ThreadPool* pool) {
  if (TlsSetValue(GetPoolTlsIndex(), pool)) {
    return;
  }
  ARROW_LOG(FATAL) << "TlsSetValue failed for thread pool TLS: "
                   << WinErrorMessage(GetLastError());
}
# else
thread_local ThreadPool* current_thread_pool_ = nullptr;

bool ThreadPool::OwnsThisThread() { return current_thread_pool_ == this; }
static ThreadPool* GetCurrentThreadPool() { return current_thread_pool_; }
static void SetCurrentThreadPool(ThreadPool* pool) { current_thread_pool_ = pool; }
# endif

bool ThreadPool::OwnsThisThread() { return GetCurrentThreadPool() == this; }

void ThreadPool::LaunchWorkersUnlocked(int threads) {
std::shared_ptr<State> state = sp_state_;
Expand All @@ -641,7 +691,7 @@ void ThreadPool::LaunchWorkersUnlocked(int threads) {
state_->workers_.emplace_back();
auto it = --(state_->workers_.end());
*it = std::thread([this, state, it] {
current_thread_pool_ = this;
SetCurrentThreadPool(this);
WorkerLoop(state, it);
});
}
Expand Down
29 changes: 29 additions & 0 deletions cpp/src/arrow/util/thread_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "arrow/util/macros.h"
#include "arrow/util/test_common.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/windows_compatibility.h"

namespace arrow {
namespace internal {
Expand Down Expand Up @@ -660,6 +661,34 @@ TEST_F(TestThreadPool, OwnsCurrentThread) {
ASSERT_FALSE(one_failed);
}

#ifdef _WIN32
TEST_F(TestThreadPool, OwnsThisThreadPreservesLastError) {
# ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Test requires threading support";
# endif
auto pool = this->MakeThreadPool(4);

// Verify from outside the pool: OwnsThisThread() must not clobber
// the calling thread's last-error state.
::SetLastError(ERROR_FILE_NOT_FOUND);
ASSERT_FALSE(pool->OwnsThisThread());
ASSERT_EQ(::GetLastError(), static_cast<DWORD>(ERROR_FILE_NOT_FOUND));

// Verify from inside a pool thread.
std::atomic<bool> error_preserved{true};
ASSERT_OK(pool->Spawn([&] {
::SetLastError(ERROR_ACCESS_DENIED);
ASSERT_TRUE(pool->OwnsThisThread());
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid using gtest fatal assertions (ASSERT_*) inside the spawned worker thread. If the assertion fires it will be reported from a background thread and can be harder to diagnose/debug; it’s also unnecessary here since the test already uses an atomic to report failures back to the main thread. Prefer replacing the ASSERT_TRUE call with a normal conditional that flips the atomic, then assert on the atomic after Shutdown().

Suggested change
ASSERT_TRUE(pool->OwnsThisThread());
if (!pool->OwnsThisThread()) {
error_preserved = false;
return;
}

Copilot uses AI. Check for mistakes.
if (::GetLastError() != ERROR_ACCESS_DENIED) {
error_preserved = false;
}
}));

ASSERT_OK(pool->Shutdown());
ASSERT_TRUE(error_preserved.load());
}
#endif

TEST_F(TestThreadPool, StressSpawnThreaded) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Test requires threading support";
Expand Down
Loading