Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions c/experimental/stf/include/cccl/c/experimental/stf/stf.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ typedef struct stf_exec_place_opaque_t* stf_exec_place_handle;
//! \brief Opaque handle to a \c data_place.
typedef struct stf_data_place_opaque_t* stf_data_place_handle;

//! \brief Opaque handle to an \c exec_place_resources registry.
//!
//! Handles returned by stf_exec_place_resources_create() are owned by the
//! caller and must be released with stf_exec_place_resources_destroy().
//! Handles returned by stf_ctx_get_place_resources() do not own the context
//! resources, but the handle itself is a separately allocated wrapper and
//! must still be released with stf_exec_place_resources_destroy() to avoid
//! leaking it.
typedef struct stf_exec_place_resources_opaque_t* stf_exec_place_resources_handle;

//! \brief 4D position (coordinates) for partition mapping.
//! Layout matches C++ pos4 for use as partition function arguments/result.
typedef struct stf_pos4
Expand Down Expand Up @@ -170,6 +179,27 @@ stf_exec_place_grid_create(const stf_exec_place_handle* places, size_t count, co
//! \brief Same as stf_exec_place_destroy (grids are exec_place handles).
void stf_exec_place_grid_destroy(stf_exec_place_handle grid);

//! \brief Create a fresh exec_place_resources registry for standalone place-layer use.
//!
//! The registry lazily creates and owns stream pools for places used with
//! stf_exec_place_pick_stream(). Destroying it releases every stream it owns.
//!
//! \return An owning handle; release it with stf_exec_place_resources_destroy().
//! NOTE(review): presumably NULL is returned when allocation fails (callers in
//! the tests check for non-NULL) -- confirm against the implementation.
stf_exec_place_resources_handle stf_exec_place_resources_create(void);

//! \brief Release a handle obtained from stf_exec_place_resources_create() or
//! stf_ctx_get_place_resources().
//!
//! For handles returned by stf_exec_place_resources_create(), this destroys
//! the registry together with the handle.
//! For handles returned by stf_ctx_get_place_resources(), this releases only
//! the C handle wrapper and leaves the context-owned resources untouched.
//!
//! \param h Handle to release. \p h may be NULL, in which case the call does
//! nothing. The handle must not be used after this call returns.
void stf_exec_place_resources_destroy(stf_exec_place_resources_handle h);

//! \brief Pick a CUDA stream for \p h from the pools owned by \p res.
//!
//! \p for_computation is a hint: non-zero requests a compute stream, zero
//! requests a data-transfer stream. The returned stream is owned by \p res and
//! remains valid until \p res is destroyed, or until the owning context is
//! finalized for a borrowed registry.
//!
//! \param res Registry handle to pick the stream from; must not be NULL.
//! \param h Execution place the stream is requested for; must not be NULL.
//! \param for_computation Non-zero for a compute stream, zero for a
//! data-transfer stream.
//! \return The selected stream. The caller must not destroy it.
CUstream stf_exec_place_pick_stream(stf_exec_place_resources_handle res, stf_exec_place_handle h, int for_computation);

//! \brief Host (CPU/pinned) data placement.
stf_data_place_handle stf_data_place_host(void);

Expand Down Expand Up @@ -337,6 +367,14 @@ stf_ctx_handle stf_ctx_create_graph(void);

void stf_ctx_finalize(stf_ctx_handle ctx);

//! \brief Borrow the per-place stream-pool registry embedded in \p ctx.
//!
//! The returned handle refers to resources that remain valid until
//! stf_ctx_finalize(ctx). Release the handle with
//! stf_exec_place_resources_destroy(); doing so does not destroy the
//! context-owned resources.
//!
//! Each call allocates a new handle wrapper, so every returned handle needs
//! its own matching stf_exec_place_resources_destroy() call.
//!
//! \param ctx Context to borrow the registry from; must not be NULL.
//! \return A non-owning handle to the registry of \p ctx.
//! NOTE(review): presumably NULL when the wrapper allocation fails -- confirm
//! against the implementation.
stf_exec_place_resources_handle stf_ctx_get_place_resources(stf_ctx_handle ctx);

//!
//! \brief Get synchronization fence for context
//!
Expand Down
65 changes: 64 additions & 1 deletion c/experimental/stf/src/stf.cu
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@

using namespace cuda::experimental::stf;

// Concrete definition behind the C-level stf_exec_place_resources_handle.
//
// The wrapper pairs a registry pointer with two ownership flags that
// stf_exec_place_resources_destroy() consults. The struct is initialised
// positionally ({resources, owns_resources, owns_handle}), so the member
// order must not change.
struct stf_exec_place_resources_opaque_t
{
// Registry this handle refers to; either owned by the wrapper or borrowed.
exec_place_resources* resources;
// True when destroying the handle must also delete `resources`.
bool owns_resources;
// True when destroying the handle must also delete the wrapper itself.
bool owns_handle;
};

namespace
{
static_assert(sizeof(pos4) == sizeof(stf_pos4), "pos4 and stf_pos4 must have identical layout for C/C++ interop");
Expand Down Expand Up @@ -83,6 +90,10 @@ template <class P>
{
return static_cast<stf_data_place_handle>(opaque_bits);
}
else if constexpr (::std::is_same_v<P, exec_place_resources>)
{
static_assert(stf_dependent_false_v<P>, "use to_place_resources_opaque for exec_place_resources handles");
}
else if constexpr (::std::is_same_v<P, context>)
{
return static_cast<stf_ctx_handle>(opaque_bits);
Expand Down Expand Up @@ -117,7 +128,7 @@ template <class P>
template <class Opaque>
[[nodiscard]] auto* from_opaque_const(Opaque* h) noexcept
{
static_assert(!is_complete_v<Opaque>);
static_assert(!is_complete_v<Opaque> || ::std::is_same_v<Opaque*, stf_exec_place_resources_handle>);
const void* const opaque_bits = static_cast<const void*>(h);

if constexpr (::std::is_same_v<Opaque*, stf_exec_place_handle>)
Expand All @@ -128,6 +139,10 @@ template <class Opaque>
{
return static_cast<const data_place*>(opaque_bits);
}
else if constexpr (::std::is_same_v<Opaque*, stf_exec_place_resources_handle>)
{
return static_cast<const stf_exec_place_resources_opaque_t*>(opaque_bits)->resources;
}
else if constexpr (::std::is_same_v<Opaque*, stf_ctx_handle>)
{
return static_cast<const context*>(opaque_bits);
Expand Down Expand Up @@ -264,6 +279,45 @@ void stf_exec_place_grid_destroy(stf_exec_place_handle grid)
stf_exec_place_destroy(grid);
}

// Builds a standalone registry together with the C wrapper that owns it.
// Both allocations run inside stf_try_allocate.
stf_exec_place_resources_handle stf_exec_place_resources_create(void)
{
  return stf_try_allocate([] {
    // Step 1: the registry itself.
    exec_place_resources* const registry = new exec_place_resources{};

    // Step 2: the wrapper that takes ownership of the registry.
    stf_exec_place_resources_opaque_t* wrapper = nullptr;
    try
    {
      wrapper = new stf_exec_place_resources_opaque_t{registry, /*owns_resources=*/true, /*owns_handle=*/true};
    }
    catch (...)
    {
      // No wrapper exists to free the registry later, so release it here
      // before propagating the failure.
      delete registry;
      throw;
    }
    return wrapper;
  });
}

// Releases a registry handle. A null handle is accepted and ignored.
// Depending on the ownership flags stored in the wrapper, this deletes the
// registry, the wrapper, both, or neither.
void stf_exec_place_resources_destroy(stf_exec_place_resources_handle h)
{
  if (h != nullptr)
  {
    // Read the wrapper flag up front so the registry teardown and the wrapper
    // teardown are two clearly separated steps.
    const bool release_wrapper = h->owns_handle;

    if (h->owns_resources)
    {
      delete h->resources;
    }

    if (release_wrapper)
    {
      delete h;
    }
  }
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Picks a stream for place `h` using the pools of registry `res`.
// A non-zero `for_computation` is forwarded as `true`, zero as `false`.
CUstream stf_exec_place_pick_stream(stf_exec_place_resources_handle res, stf_exec_place_handle h, int for_computation)
{
  _CCCL_ASSERT(res != nullptr, "exec_place_resources handle must not be null");
  _CCCL_ASSERT(h != nullptr, "exec_place handle must not be null");

  const bool want_compute_stream = (for_computation != 0);
  auto& registry                 = *res->resources;
  const auto picked              = from_opaque(h)->pick_stream(registry, want_compute_stream);

  // Hand the stream back through the driver-API stream type used by the C API.
  return reinterpret_cast<CUstream>(picked);
}

stf_data_place_handle stf_data_place_host(void)
{
return to_opaque(stf_try_allocate([] {
Expand Down Expand Up @@ -362,6 +416,15 @@ void stf_ctx_finalize(stf_ctx_handle ctx)
delete context_ptr;
}

// Wraps the registry embedded in `ctx` in a freshly allocated, non-owning
// handle. The wrapper is owned by the caller; the registry is not.
stf_exec_place_resources_handle stf_ctx_get_place_resources(stf_ctx_handle ctx)
{
  _CCCL_ASSERT(ctx != nullptr, "context handle must not be null");
  auto* const owner = from_opaque(ctx);

  return stf_try_allocate([owner] {
    // The lookup stays inside the callable so it runs under stf_try_allocate,
    // like the wrapper allocation.
    auto* const borrowed_registry = &owner->async_resources().get_place_resources();
    return new stf_exec_place_resources_opaque_t{borrowed_registry, /*owns_resources=*/false, /*owns_handle=*/true};
  });
}

cudaStream_t stf_fence(stf_ctx_handle ctx)
{
_CCCL_ASSERT(ctx != nullptr, "context handle must not be null");
Expand Down
58 changes: 58 additions & 0 deletions c/experimental/stf/test/test_places.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,61 @@ C2H_TEST("composite data place with stf_exec_place_grid_create (vector of places
REQUIRE(X[i] == static_cast<float>(i));
}
}

C2H_TEST("exec_place_pick_stream standalone resources", "[places][stream]")
{
  // A registry created and owned by the test, independent of any context.
  stf_exec_place_resources_handle registry = stf_exec_place_resources_create();
  REQUIRE(registry != nullptr);

  stf_exec_place_handle device_place = stf_exec_place_current_device();
  REQUIRE(device_place != nullptr);

  // Request a compute stream and check that it can be synchronized.
  CUstream compute_stream = stf_exec_place_pick_stream(registry, device_place, /*for_computation=*/1);
  REQUIRE(compute_stream != nullptr);
  const cudaError_t sync_status = cudaStreamSynchronize(reinterpret_cast<cudaStream_t>(compute_stream));
  REQUIRE(sync_status == cudaSuccess);

  // Tear down: the place first, then the registry that owns the stream.
  stf_exec_place_destroy(device_place);
  stf_exec_place_resources_destroy(registry);
}

C2H_TEST("exec_place resources are independent", "[places][stream]")
{
  // Two separately created registries queried with the same place.
  stf_exec_place_resources_handle first_registry  = stf_exec_place_resources_create();
  stf_exec_place_resources_handle second_registry = stf_exec_place_resources_create();
  REQUIRE(first_registry != nullptr);
  REQUIRE(second_registry != nullptr);

  stf_exec_place_handle device_place = stf_exec_place_current_device();
  REQUIRE(device_place != nullptr);

  CUstream first_stream  = stf_exec_place_pick_stream(first_registry, device_place, /*for_computation=*/1);
  CUstream second_stream = stf_exec_place_pick_stream(second_registry, device_place, /*for_computation=*/1);
  REQUIRE(first_stream != nullptr);
  REQUIRE(second_stream != nullptr);

  // Each registry is expected to hand out its own stream.
  const bool streams_differ = (first_stream != second_stream);
  REQUIRE(streams_differ);

  // Tear down in reverse creation order.
  stf_exec_place_destroy(device_place);
  stf_exec_place_resources_destroy(second_registry);
  stf_exec_place_resources_destroy(first_registry);
}

C2H_TEST("exec_place_pick_stream borrowed context resources", "[places][stream][ctx]")
{
  stf_ctx_handle context_handle = stf_ctx_create();
  REQUIRE(context_handle != nullptr);

  // Borrow the registry embedded in the context instead of creating one.
  stf_exec_place_resources_handle borrowed_registry = stf_ctx_get_place_resources(context_handle);
  REQUIRE(borrowed_registry != nullptr);

  stf_exec_place_handle device_place = stf_exec_place_current_device();
  REQUIRE(device_place != nullptr);

  CUstream compute_stream = stf_exec_place_pick_stream(borrowed_registry, device_place, /*for_computation=*/1);
  REQUIRE(compute_stream != nullptr);
  const cudaError_t sync_status = cudaStreamSynchronize(reinterpret_cast<cudaStream_t>(compute_stream));
  REQUIRE(sync_status == cudaSuccess);

  // Releasing the borrowed handle happens before the context is finalized.
  stf_exec_place_resources_destroy(borrowed_registry);

  stf_exec_place_destroy(device_place);
  stf_ctx_finalize(context_handle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ public:
return true;
}

stream_pool& get_stream_pool(bool) const override
stream_pool& get_stream_pool(bool, exec_place_resources&, const exec_place&) const override
{
// User-stream places carry their own single-stream pool and intentionally
// ignore the registry.
return dummy_pool_;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,11 @@ public:
return "green_ctx(id=" + ::std::to_string(get_cuda_context_id(g_ctx_)) + " dev=" + ::std::to_string(devid_) + ")";
}

stream_pool& get_stream_pool(bool) const override
stream_pool& get_stream_pool(bool, exec_place_resources&, const exec_place&) const override
{
// Green-context places carry their own pool (constructed from the
// green_ctx_view) and bypass the registry. The user is responsible for
// keeping the underlying CUgreenCtx alive while the pool is in use.
return pool_;
}

Expand Down
134 changes: 134 additions & 0 deletions cudax/include/cuda/experimental/__places/exec_place_resources.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//===----------------------------------------------------------------------===//
//
// Part of CUDASTF in CUDA C++ Core Libraries,
// under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES.
//
//===----------------------------------------------------------------------===//

/**
* @file
* @brief Standalone per-place stream-pool registry.
*
* `exec_place_resources` owns a `{compute, data}` `stream_pool` slot for every
* pooled place it is queried with. Slots are created lazily on first use and
* destroyed with the registry. The registry depends only on `stream_pool.cuh`
* and a forward declaration of `exec_place`; it can be embedded in any
* resource container (e.g. `async_resources_handle`) without pulling in STF.
*
* Keys are `exec_place::impl*` pointers. Pooled implementations (`device(N)`,
* `host()`) live as process-wide singleton impls, so pointer identity matches
* place identity for them. Self-contained implementations (`cuda_stream`,
* green-context, grid) override `get_stream_pool` and never reach the
* registry.
*/

#pragma once

#include <cuda/__cccl_config>

#if defined(_CCCL_IMPLICIT_SYSTEM_HEADER_GCC)
# pragma GCC system_header
#elif defined(_CCCL_IMPLICIT_SYSTEM_HEADER_CLANG)
# pragma clang system_header
#elif defined(_CCCL_IMPLICIT_SYSTEM_HEADER_MSVC)
# pragma system_header
#endif // no system header

#include <cuda/experimental/__places/stream_pool.cuh>

#include <mutex>
#include <unordered_map>

namespace cuda::experimental::places
{
/**
* @brief Default size of each per-place stream pool created by the registry.
*
* `exec_place::impl::pool_size` and `data_pool_size` are aliases to these
* values so `places.cuh` can keep its public surface unchanged.
*/
inline constexpr ::std::size_t exec_place_default_pool_size = 4;
inline constexpr ::std::size_t exec_place_default_data_pool_size = 4;

/**
* @brief A registry of per-place stream pools keyed by `exec_place::impl*`.
*
* For every distinct pooled impl pointer the registry is queried with, it
* owns one `{compute, data}` pair of `stream_pool`s, created lazily on first
* lookup with sizes `exec_place_default_pool_size` /
* `exec_place_default_data_pool_size`.
*
* The map itself is mutex-guarded. The mutex is only held across the
* find/insert into the map; subsequent stream creation (which happens lazily
* inside `stream_pool::next`) runs outside the lock, so contention is limited
* to slow-path task submission.
*
* Lifetime: each entry's pool is owned by the registry. Destroying the
* registry destroys every pool it has created (and their cached
* `cudaStream_t` handles). Consequently, a registry must not outlive the
* CUDA primary context(s) of the devices it has cached streams for; with
* this design, registries are typically embedded in an
* `async_resources_handle` and share the lifetime of the owning STF context.
*
* Caveats for externally-owned places:
* - User-stream places (`exec_place::cuda_stream(s)`) carry their own
* single-stream pool and never participate in the registry.
* - Green-context places carry their own pool (constructed from the
* `green_ctx_view`) and also bypass the registry. The user must keep the
* underlying `CUgreenCtx` alive as long as the place is used.
*/
class exec_place_resources
{
public:
  /**
   * @brief The `{compute, data}` pair of stream pools kept for one place.
   *
   * Both pools are sized from `exec_place_default_pool_size` and
   * `exec_place_default_data_pool_size` respectively.
   */
  struct per_place_pools
  {
    per_place_pools()
        : compute(exec_place_default_pool_size)
        , data(exec_place_default_data_pool_size)
    {}

    stream_pool compute;
    stream_pool data;
  };

  exec_place_resources() = default;

  // The registry hands out references to its entries, so it is neither
  // copyable nor movable.
  exec_place_resources(const exec_place_resources&)            = delete;
  exec_place_resources& operator=(const exec_place_resources&) = delete;
  exec_place_resources(exec_place_resources&&)                 = delete;
  exec_place_resources& operator=(exec_place_resources&&)      = delete;

  /**
   * @brief Look up (or lazily create) the `{compute, data}` pool slot for the
   * supplied impl pointer.
   *
   * Thread-safe: the mutex is held only across the lookup/insert. The returned
   * reference is stable for the lifetime of the registry (`std::unordered_map`
   * preserves node addresses across rehashes).
   *
   * @param impl_key Pointer used as the identity of the place; it is only
   * compared and hashed, never dereferenced here.
   * @return Reference to the pool pair associated with `impl_key`.
   */
  [[nodiscard]] per_place_pools& get(const void* impl_key)
  {
    ::std::lock_guard<::std::mutex> lock(mtx_);
    // try_emplace performs a single lookup and default-constructs the entry in
    // place only when the key is absent, so no temporary per_place_pools is
    // built and moved into the map.
    return map_.try_emplace(impl_key).first->second;
  }

  /// @brief Number of per-place entries currently cached. Mainly for tests.
  [[nodiscard]] ::std::size_t size() const
  {
    ::std::lock_guard<::std::mutex> lock(mtx_);
    return map_.size();
  }

private:
  // Guards map_; mutable so that size() can lock it from a const context.
  mutable ::std::mutex mtx_;
  ::std::unordered_map<const void*, per_place_pools> map_;
};
} // namespace cuda::experimental::places
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private:
sub_places.push_back(mv(place));
return;
}
auto& pool = scalar_place.get_stream_pool(true);
auto& pool = scalar_place.get_stream_pool(true, handle.get_place_resources());
for (size_t i = 0; i < pool.size(); i++)
{
sub_places.push_back(exec_place::cuda_stream(pool.next(scalar_place)));
Expand Down
Loading
Loading