diff --git a/vortex-cuda/src/arrow/canonical.rs b/vortex-cuda/src/arrow/canonical.rs index 109e343e424..7a6e2a664b8 100644 --- a/vortex-cuda/src/arrow/canonical.rs +++ b/vortex-cuda/src/arrow/canonical.rs @@ -764,7 +764,7 @@ fn gather_binary_values( /// /// Returns `None` for the buffer when Arrow can omit validity because all rows are valid. /// -/// Returned buffers use zeroed 4-byte padding so cuDF's word-sized mask reads stay in bounds. +/// Returned buffers use zeroed cuDF-sized padding so mask reads stay in bounds. /// Bits at positions `>= len + arrow_offset` within the final data byte are unspecified, as /// Arrow permits. pub(super) async fn export_arrow_validity_buffer( @@ -798,14 +798,11 @@ pub(super) async fn export_arrow_validity_buffer( let bitmap = ctx.ensure_on_device(bits).await?; // ArrowDeviceArray uses ArrowArray layout with its buffers being device pointers. // - // Validity is one bit per row, addressed via the Arrow array offset. Reuse the bitmap - // when Vortex's validity offset already matches Arrow's; otherwise repack on the GPU - // so row i is at Arrow bit `arrow_offset + i`. - let bitmap = if meta.offset() == arrow_offset { - bitmap - } else { - repack_arrow_validity_buffer(&bitmap, meta.offset(), len, arrow_offset, ctx)? - }; + // Validity is one bit per row, addressed via the Arrow array offset. Repack on the GPU + // so row i is at Arrow bit `arrow_offset + i` and the backing allocation has the + // zeroed cuDF-sized padding expected by Arrow Device consumers. + let bitmap = + repack_arrow_validity_buffer(&bitmap, meta.offset(), len, arrow_offset, ctx)?; // Keep nullable exports self-describing for consumers that require exact null counts. let null_count = count_arrow_validity_nulls(&bitmap, len, arrow_offset, ctx)?; Ok((Some(bitmap), null_count)) @@ -813,6 +810,14 @@ pub(super) async fn export_arrow_validity_buffer( } } +/// Minimum backing allocation quantum for Arrow validity buffers handed to cuDF. +/// +/// Arrow exposes only the logical bitmap byte length, but cuDF imports null masks into 64-byte +/// padded buffers and its kernels may read through that padded extent. Vortex therefore keeps the +/// exported `BufferHandle` sliced to Arrow's logical length while zero-padding the underlying CUDA +/// allocation to this boundary. +const CUDF_VALIDITY_BUFFER_PADDING: usize = 64; + /// Return the byte length needed for `len` validity bits at the given bit offset. fn validity_bitmap_byte_len(len: usize, arrow_offset: usize) -> VortexResult { Ok(len @@ -821,12 +826,25 @@ fn validity_bitmap_byte_len(len: usize, arrow_offset: usize) -> VortexResult usize { + if byte_len == 0 { + 1 + } else { + byte_len.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING) + } +} + /// Allocate a zeroed device buffer with cuDF-safe padding for Arrow validity masks. fn device_zeroed_byte_buffer( byte_len: usize, ctx: &mut CudaExecutionCtx, ) -> VortexResult { - let allocation_len = byte_len.next_multiple_of(size_of::()).max(1); + let allocation_len = validity_bitmap_allocation_byte_len(byte_len); let mut buffer = ctx.device_alloc::(allocation_len)?; ctx.stream() .memset_zeros(&mut buffer) @@ -894,8 +912,8 @@ pub fn count_arrow_validity_nulls( /// /// Vortex bitmaps may start at any bit offset. Arrow exposes only a byte-addressed validity buffer /// plus an array offset, so sliced compact exports need a GPU rewrite when either side has a -/// bit-level offset. The kernel writes the output one 64-bit word at a time, funnel-shifting two -/// adjacent input words, so the allocation is padded to whole words (zeroed by the edge masks). +/// bit-level offset. The output handle keeps Arrow's logical byte length, while the backing +/// allocation is zero-padded to cuDF's mask allocation size for consumers that read full masks. pub fn repack_arrow_validity_buffer( input_buffer: &BufferHandle, input_offset: usize, @@ -904,7 +922,13 @@ pub fn repack_arrow_validity_buffer( ctx: &mut CudaExecutionCtx, ) -> VortexResult { let output_bytes = validity_bitmap_byte_len(len, arrow_offset)?; + // The CUDA kernel writes the bitmap as u64 words, so round the logical byte length up to the + // number of words that cover the exported Arrow bytes. let output_words = output_bytes.div_ceil(size_of::()); + // `device_alloc::` takes a word count, while the padding policy is expressed in bytes. + // Round up so the padded byte allocation is fully represented by whole u64 words. + let allocation_words = + validity_bitmap_allocation_byte_len(output_bytes).div_ceil(size_of::()); // The kernel loads the input bitmap as 64-bit words. if !input_buffer @@ -914,7 +938,12 @@ pub fn repack_arrow_validity_buffer( vortex_bail!("Arrow validity repack requires an 8-byte aligned device buffer"); } - let output = ctx.device_alloc::(output_words.max(1))?; + let mut output = ctx.device_alloc::(allocation_words.max(1))?; + // The repack kernel writes only the logical bitmap words. Zero the whole backing allocation so + // cuDF's padded mask reads see invalid rows, not uninitialized CUDA memory. + ctx.stream() + .memset_zeros(&mut output) + .map_err(|err| vortex_err!("Failed to zero Arrow validity buffer padding: {err}"))?; let output_device = CudaDeviceBuffer::new(output); if output_words > 0 { @@ -1337,6 +1366,7 @@ mod tests { use crate::arrow::ArrowDeviceArray; use crate::arrow::DeviceArrayExt; use crate::arrow::PrivateData; + use crate::arrow::canonical::CUDF_VALIDITY_BUFFER_PADDING; use crate::arrow::canonical::export_arrow_validity_buffer; use crate::arrow::canonical::repack_arrow_validity_buffer; use crate::device_buffer::cuda_backing_allocation; @@ -2955,7 +2985,43 @@ mod tests { let backing_bytes = backing.to_host_sync(); assert_eq!( backing_bytes.len(), - output_bytes.next_multiple_of(size_of::()) + output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING) + ); + assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0)); + + Ok(()) + } + + #[crate::test] + async fn test_export_validity_buffer_pads_matching_offset() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session()) + .vortex_expect("failed to create execution context"); + + let len = 3; + let arrow_offset = 0; + let (buffer, null_count) = export_arrow_validity_buffer( + Validity::from(BitBuffer::from_iter([true, false, true])), + len, + arrow_offset, + &mut ctx, + ) + .await?; + ctx.synchronize_stream()?; + + assert_eq!(null_count, 1); + let buffer = buffer.vortex_expect("nullable validity should export a null buffer"); + let output_bytes = (len + arrow_offset).div_ceil(8); + assert_eq!(buffer.len(), output_bytes); + let actual = BitBuffer::new(buffer.to_host_sync(), len + arrow_offset) + .iter() + .collect::>(); + assert_eq!(actual, [true, false, true]); + + let backing = cuda_backing_allocation(&buffer)?; + let backing_bytes = backing.to_host_sync(); + assert_eq!( + backing_bytes.len(), + output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING) ); assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0)); @@ -2983,6 +3049,11 @@ mod tests { let bytes = buffer.to_host_sync(); assert_eq!(bytes.len(), (len + arrow_offset).div_ceil(8)); assert!(bytes.iter().all(|byte| *byte == 0)); + let backing = cuda_backing_allocation(&buffer)?; + assert_eq!( + backing.len(), + bytes.len().next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING) + ); Ok(()) } diff --git a/vortex-python-cuda/README.md b/vortex-python-cuda/README.md new file mode 100644 index 00000000000..4871dd34c68 --- /dev/null +++ b/vortex-python-cuda/README.md @@ -0,0 +1,61 @@ +# vortex-data-cuda + +CUDA extension for [Vortex](https://vortex.dev). Exports a `vortex.Array` to +[RAPIDS cuDF](https://docs.rapids.ai/api/cudf/stable/) or any +[Arrow C Device](https://arrow.apache.org/docs/format/CDeviceDataInterface.html) consumer, on the +GPU. Imported as `vortex_cuda`. + +## Install + +```bash +pip install vortex-data vortex-data-cuda # versions must match; CUDA device required +``` + +`to_cudf` also needs RAPIDS `cudf` and `pylibcudf` in the environment. + +## Export to cuDF + +`to_cudf` converts via the Arrow C Device interface: struct arrays become a `cudf.DataFrame`, +everything else a `cudf.Series`. Importing `vortex_cuda` installs it as `vortex.Array.to_cudf`. + +```python +import vortex, vortex_cuda +import pyarrow as pa + +s = vortex.array([1, None, 3]).to_cudf() # -> cudf.Series +df = vortex_cuda.to_cudf( # struct -> cudf.DataFrame + vortex.Array.from_arrow(pa.table({"x": [1, None, 3], "y": [4.0, 5.0, 6.0]})) +) +``` + +Buffers are imported zero-copy; host arrays are moved to the GPU as part of the export. cuDF keeps +shared ownership for the lifetime of the result and any view derived from it, so no extra +bookkeeping is needed. + +Signature: `to_cudf(obj, *, fallback="error")`. Only `fallback="error"` is supported +(`NotImplementedError` otherwise); raises `TypeError` for a non-`vortex.Array`, `RuntimeError` +without a CUDA device, `ImportError` if cuDF/pylibcudf are missing. + +## Export an Arrow C Device array + +`vortex.Array` exposes the standard `__arrow_c_device_array__` protocol (installed when CUDA is +available), so any Arrow-C-Device consumer can ingest it zero-copy: + +```python +import vortex, vortex_cuda, pylibcudf + +array = vortex.array([1, None, 3]) +column = pylibcudf.Column.from_arrow(array) # via the protocol + +schema_capsule, device_array_capsule = vortex_cuda.export_device_array(array) # raw capsules +``` + +`export_device_array` returns `PyCapsule`s named `"arrow_schema"` and `"arrow_device_array"`. The +consumer owns the exported structs and runs the Arrow release callbacks when done (libcudf does +this automatically); Vortex's device buffers stay alive until then. + +## Notes + +- Integer, float, bool, and string arrays (incl. nullable) and structs are supported; nulls are + preserved. +- A CUDA device is required; there is no CPU fallback. diff --git a/vortex-python-cuda/python/vortex_cuda/__init__.py b/vortex-python-cuda/python/vortex_cuda/__init__.py index c5d1610724e..0ef2b811d8f 100644 --- a/vortex-python-cuda/python/vortex_cuda/__init__.py +++ b/vortex-python-cuda/python/vortex_cuda/__init__.py @@ -2,11 +2,107 @@ # SPDX-FileCopyrightText: Copyright the Vortex contributors # pyright: reportMissingModuleSource=false, reportPrivateUsage=false +import importlib +from typing import Protocol, cast + from . import _lib +# Private debug hooks used by CUDA bridge tests. _debug_array_metadata_dtype = _lib._debug_array_metadata_dtype _debug_array_metadata_display_values = _lib._debug_array_metadata_display_values +_debug_arrow_device_array_capsule_summary = _lib._debug_arrow_device_array_capsule_summary +_debug_consume_arrow_device_array_capsules = _lib._debug_consume_arrow_device_array_capsules cuda_available = _lib.cuda_available export_device_array = _lib.export_device_array -__all__ = ["cuda_available", "export_device_array"] + +class _FromArrow(Protocol): + def from_arrow(self, obj: object) -> object: ... + + +class _FromPylibcudf(Protocol): + def from_pylibcudf(self, obj: object) -> object: ... + + +class _DataFrameFromPylibcudf(Protocol): + def from_pylibcudf(self, obj: object, metadata: dict[str, object] | None = None) -> object: ... + + +class _PylibcudfModule(Protocol): + Column: _FromArrow + Table: _FromArrow + + +class _CudfModule(Protocol): + Series: _FromPylibcudf + DataFrame: _DataFrameFromPylibcudf + + +_SUPPORTED_FALLBACKS = frozenset({"error"}) + + +def _Array_to_cudf(self: object, *, fallback: str = "error") -> object: + return to_cudf(self, fallback=fallback) + + +def _Array___arrow_c_device_array__( + self: object, + requested_schema: object | None = None, + **kwargs: object, +) -> tuple[object, object]: + return export_device_array(self, requested_schema, **kwargs) + + +def _install_vortex_array_methods() -> None: + import vortex + + setattr(vortex.Array, "to_cudf", _Array_to_cudf) + if cuda_available(): + setattr(vortex.Array, "__arrow_c_device_array__", _Array___arrow_c_device_array__) + + +def _import_cudf_modules() -> tuple[_CudfModule, _PylibcudfModule]: + try: + cudf = importlib.import_module("cudf") + pylibcudf = importlib.import_module("pylibcudf") + except ImportError as err: + raise ImportError("vortex_cuda.to_cudf requires RAPIDS cuDF and pylibcudf to be installed") from err + return cast(_CudfModule, cast(object, cudf)), cast(_PylibcudfModule, cast(object, pylibcudf)) + + +def to_cudf(obj: object, *, fallback: str = "error") -> object: + """Convert a Vortex array to a cuDF object through the Arrow Device interface. + + pylibcudf imports the exported Arrow Device array zero-copy and keeps shared ownership of + Vortex's device buffers (via libcudf's ``arrow_column``) for the lifetime of the returned + cuDF object and any view derived from it, so no extra keepalive is required here. + + ``fallback`` is reserved for future policy choices. The initial implementation + supports only ``fallback="error"`` and never falls back to host Arrow conversion. + """ + if fallback not in _SUPPORTED_FALLBACKS: + raise NotImplementedError("vortex_cuda.to_cudf currently supports only fallback='error'") + + import vortex + + if not isinstance(obj, vortex.Array): + raise TypeError(f"vortex_cuda.to_cudf expected a vortex.Array, got {type(obj).__name__}") + + if not cuda_available(): + raise RuntimeError("CUDA is not available; vortex_cuda.to_cudf requires a CUDA device") + + cudf, pylibcudf = _import_cudf_modules() + + dtype = obj.dtype + if isinstance(dtype, vortex.StructDType): + table = pylibcudf.Table.from_arrow(obj) + return cudf.DataFrame.from_pylibcudf(table, metadata={"columns": dtype.names()}) + + column = pylibcudf.Column.from_arrow(obj) + return cudf.Series.from_pylibcudf(column) + + +_install_vortex_array_methods() + + +__all__ = ["cuda_available", "export_device_array", "to_cudf"] diff --git a/vortex-python-cuda/python/vortex_cuda/_lib.pyi b/vortex-python-cuda/python/vortex_cuda/_lib.pyi index dad7628363d..51b4e100fe7 100644 --- a/vortex-python-cuda/python/vortex_cuda/_lib.pyi +++ b/vortex-python-cuda/python/vortex_cuda/_lib.pyi @@ -3,6 +3,10 @@ def _debug_array_metadata_dtype(array: object) -> str: ... def _debug_array_metadata_display_values(array: object) -> str: ... +def _debug_arrow_device_array_capsule_summary(schema: object, device_array: object) -> dict[str, object]: ... +def _debug_consume_arrow_device_array_capsules( + schema: object, device_array: object +) -> tuple[bool, bool, bool, bool, bool, bool]: ... def cuda_available() -> bool: ... def export_device_array( array: object, requested_schema: object | None = None, **kwargs: object diff --git a/vortex-python-cuda/src/lib.rs b/vortex-python-cuda/src/lib.rs index f93198cceb4..97bd3548c0f 100644 --- a/vortex-python-cuda/src/lib.rs +++ b/vortex-python-cuda/src/lib.rs @@ -461,6 +461,84 @@ fn release_exported(exported: &mut ArrowDeviceArrayWithSchema) { release_device_array(&mut exported.array); } +/// Return non-owning details from Arrow Device capsules for Python-side smoke consumers. +#[pyfunction] +fn _debug_arrow_device_array_capsule_summary<'py>( + py: Python<'py>, + schema: Bound<'py, PyCapsule>, + device_array: Bound<'py, PyCapsule>, +) -> PyResult> { + let schema = unsafe { + schema + .pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))? + .cast::() + .as_ref() + }; + let device_array = unsafe { + device_array + .pointer_checked(Some(ARROW_DEVICE_ARRAY_CAPSULE_NAME))? + .cast::() + .as_ref() + }; + + let summary = PyDict::new(py); + summary.set_item("schema_live", schema.release.is_some())?; + summary.set_item("array_live", device_array.array.release.is_some())?; + summary.set_item("is_cuda", device_array.device_type == ARROW_DEVICE_CUDA)?; + summary.set_item("device_type", device_array.device_type)?; + summary.set_item("device_id", device_array.device_id)?; + summary.set_item("length", device_array.array.length)?; + summary.set_item("null_count", device_array.array.null_count)?; + summary.set_item("n_buffers", device_array.array.n_buffers)?; + summary.set_item("n_children", device_array.array.n_children)?; + Ok(summary) +} + +/// Simulate a Python Arrow Device consumer taking ownership from the returned capsules. +#[pyfunction] +fn _debug_consume_arrow_device_array_capsules( + schema: Bound<'_, PyCapsule>, + device_array: Bound<'_, PyCapsule>, +) -> PyResult<(bool, bool, bool, bool, bool, bool)> { + let mut schema_ptr = schema + .pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))? + .cast::(); + let mut device_array_ptr = device_array + .pointer_checked(Some(ARROW_DEVICE_ARRAY_CAPSULE_NAME))? + .cast::(); + + let schema_ref = unsafe { schema_ptr.as_mut() }; + let device_array_ref = unsafe { device_array_ptr.as_mut() }; + let schema_had_release = schema_ref.release.is_some(); + let array_had_release = device_array_ref.array.release.is_some(); + + release_schema(schema_ref); + release_device_array(device_array_ref); + + let schema_release_cleared = schema_ref.release.is_none(); + let array_release_cleared = device_array_ref.array.release.is_none(); + + set_capsule_name(&schema, USED_ARROW_SCHEMA_CAPSULE_NAME)?; + set_capsule_name(&device_array, USED_ARROW_DEVICE_ARRAY_CAPSULE_NAME)?; + + Ok(( + schema_had_release, + array_had_release, + schema_release_cleared, + array_release_cleared, + schema.is_valid_checked(Some(USED_ARROW_SCHEMA_CAPSULE_NAME)), + device_array.is_valid_checked(Some(USED_ARROW_DEVICE_ARRAY_CAPSULE_NAME)), + )) +} + +fn set_capsule_name(capsule: &Bound<'_, PyCapsule>, name: &CStr) -> PyResult<()> { + let result = unsafe { ffi::PyCapsule_SetName(capsule.as_ptr(), name.as_ptr()) }; + if result != 0 { + return Err(PyErr::fetch(capsule.py())); + } + Ok(()) +} + fn schema_capsule<'py>( py: Python<'py>, schema: FFI_ArrowSchema, @@ -573,6 +651,14 @@ fn _lib(m: &Bound) -> PyResult<()> { m.add_function(wrap_pyfunction!(cuda_available, m)?)?; m.add_function(wrap_pyfunction!(_debug_array_metadata_dtype, m)?)?; m.add_function(wrap_pyfunction!(_debug_array_metadata_display_values, m)?)?; + m.add_function(wrap_pyfunction!( + _debug_arrow_device_array_capsule_summary, + m + )?)?; + m.add_function(wrap_pyfunction!( + _debug_consume_arrow_device_array_capsules, + m + )?)?; m.add_function(wrap_pyfunction!(export_device_array, m)?)?; Ok(()) } diff --git a/vortex-python-cuda/test/test_cuda.py b/vortex-python-cuda/test/test_cuda.py index de269164317..e5db2727b39 100644 --- a/vortex-python-cuda/test/test_cuda.py +++ b/vortex-python-cuda/test/test_cuda.py @@ -1,15 +1,58 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors +import gc +import sys import tomllib +import types +from collections.abc import Iterable from pathlib import Path -from typing import cast +from typing import Protocol, cast +import pytest import vortex_cuda import vortex +class _ArrowDeviceArrayExportable(Protocol): + def __arrow_c_device_array__( + self, + requested_schema: object | None = None, + **kwargs: object, + ) -> tuple[object, object]: ... + + +class _CudfArray(Protocol): + def to_cudf(self, *, fallback: str = "error") -> object: ... + + +class _ArrowPyList(Protocol): + def to_pylist(self) -> list[object]: ... + + +class _CudfSeries(Protocol): + @property + def dtype(self) -> object: ... + + def to_arrow(self) -> _ArrowPyList: ... + + +class _CudfDTypes(Protocol): + def items(self) -> Iterable[tuple[object, object]]: ... + + +class _CudfDataFrame(Protocol): + @property + def columns(self) -> Iterable[object]: ... + + @property + def dtypes(self) -> _CudfDTypes: ... + + def __len__(self) -> int: ... + def __getitem__(self, key: str) -> _CudfSeries: ... + + def workspace_version() -> str: workspace_pyproject = tomllib.loads((Path(__file__).parents[2] / "Cargo.toml").read_text()) return cast(str, workspace_pyproject["workspace"]["package"]["version"]) @@ -27,3 +70,210 @@ def test_extension_exact_pins_base_package(): pyproject = tomllib.loads((Path(__file__).parents[1] / "pyproject.toml").read_text()) assert pyproject["project"]["dependencies"] == [f"vortex-data=={workspace_version()}"] + + +def test_to_cudf_is_exported(): + assert "to_cudf" in vortex_cuda.__all__ + + +def test_import_installs_array_to_cudf(monkeypatch: pytest.MonkeyPatch): + array = vortex.Array.from_range(range(0, 3)) + calls: list[tuple[object, str]] = [] + + def fake_to_cudf(obj: object, *, fallback: str = "error") -> object: + calls.append((obj, fallback)) + return "cudf-object" + + monkeypatch.setattr(vortex_cuda, "to_cudf", fake_to_cudf) + + assert cast(_CudfArray, cast(object, array)).to_cudf(fallback="error") == "cudf-object" + assert calls == [(array, "error")] + + +def test_import_installs_array_arrow_c_device_array_on_cuda(monkeypatch: pytest.MonkeyPatch): + array = vortex.Array.from_range(range(0, 3)) + + if not vortex_cuda.cuda_available(): + assert not hasattr(array, "__arrow_c_device_array__") + return + + calls: list[tuple[object, object | None, dict[str, object]]] = [] + + def fake_export_device_array( + exported_array: object, + requested_schema: object | None = None, + **kwargs: object, + ) -> tuple[object, object]: + calls.append((exported_array, requested_schema, kwargs)) + return "schema", "device_array" + + monkeypatch.setattr(vortex_cuda, "export_device_array", fake_export_device_array) + + exportable = cast(_ArrowDeviceArrayExportable, cast(object, array)) + assert exportable.__arrow_c_device_array__("requested", future=None) == ("schema", "device_array") + assert calls == [(array, "requested", {"future": None})] + + +def test_to_cudf_rejects_unsupported_fallback(): + with pytest.raises(NotImplementedError, match="fallback='error'"): + _ = vortex_cuda.to_cudf(vortex.Array.from_range(range(0, 3)), fallback="host") + + +def test_to_cudf_rejects_non_vortex_array(): + with pytest.raises(TypeError, match="vortex.Array"): + _ = vortex_cuda.to_cudf(object()) + + +def test_to_cudf_cuda_unavailable_rejects_without_importing_cudf(monkeypatch: pytest.MonkeyPatch): + def fail_import_cudf_modules() -> tuple[object, object]: + raise AssertionError("unexpected import of cuDF modules") + + monkeypatch.setattr(vortex_cuda, "cuda_available", lambda: False) + monkeypatch.setattr("vortex_cuda._import_cudf_modules", fail_import_cudf_modules) + + with pytest.raises(RuntimeError, match="CUDA"): + _ = vortex_cuda.to_cudf(vortex.Array.from_range(range(0, 3))) + + +def test_to_cudf_non_struct_uses_pylibcudf_column_from_arrow(monkeypatch: pytest.MonkeyPatch): + array = vortex.Array.from_range(range(0, 3)) + fake_column = object() + + class FakeSeries: + pass + + fake_series = FakeSeries() + + class FakePylibcudfColumn: + @staticmethod + def from_arrow(obj: object) -> object: + assert obj is array + return fake_column + + class FakePylibcudfTable: + @staticmethod + def from_arrow(_obj: object) -> object: + raise AssertionError("non-struct array should not import through pylibcudf.Table") + + class FakeCudfSeries: + @staticmethod + def from_pylibcudf(column: object) -> object: + assert column is fake_column + return fake_series + + monkeypatch.setattr(vortex_cuda, "cuda_available", lambda: True) + monkeypatch.setitem( + sys.modules, + "pylibcudf", + types.SimpleNamespace(Column=FakePylibcudfColumn, Table=FakePylibcudfTable), + ) + monkeypatch.setitem(sys.modules, "cudf", types.SimpleNamespace(Series=FakeCudfSeries)) + + assert vortex_cuda.to_cudf(array) is fake_series + + +def test_to_cudf_struct_uses_pylibcudf_table_from_arrow(monkeypatch: pytest.MonkeyPatch): + import pyarrow as pa + + array = vortex.Array.from_arrow(pa.table({"x": [1, None, 3], "y": [4.0, 5.0, 6.0]})) + fake_table = object() + + class FakeDataFrame: + pass + + fake_dataframe = FakeDataFrame() + metadata_calls: list[dict[str, object] | None] = [] + + class FakePylibcudfColumn: + @staticmethod + def from_arrow(_obj: object) -> object: + raise AssertionError("struct array should not import through pylibcudf.Column") + + class FakePylibcudfTable: + @staticmethod + def from_arrow(obj: object) -> object: + assert obj is array + return fake_table + + class FakeCudfDataFrame: + @staticmethod + def from_pylibcudf(table: object, metadata: dict[str, object] | None = None) -> object: + assert table is fake_table + metadata_calls.append(metadata) + return fake_dataframe + + monkeypatch.setattr(vortex_cuda, "cuda_available", lambda: True) + monkeypatch.setitem( + sys.modules, + "pylibcudf", + types.SimpleNamespace(Column=FakePylibcudfColumn, Table=FakePylibcudfTable), + ) + monkeypatch.setitem(sys.modules, "cudf", types.SimpleNamespace(DataFrame=FakeCudfDataFrame)) + + assert vortex_cuda.to_cudf(array) is fake_dataframe + assert metadata_calls == [{"columns": ["x", "y"]}] + + +def test_to_cudf_real_cudf_smoke(): + cudf_module = cast(object, pytest.importorskip("cudf")) + pylibcudf_module = cast(object, pytest.importorskip("pylibcudf")) + assert cudf_module is not None + assert pylibcudf_module is not None + + if not vortex_cuda.cuda_available(): + pytest.skip("CUDA device is not available") + + import pyarrow as pa + + int_series = cast( + _CudfSeries, + cast(_CudfArray, cast(object, vortex.array([1, 2, 3]))).to_cudf(), + ) + assert type(int_series).__name__ == "Series" + assert int_series.to_arrow().to_pylist() == [1, 2, 3] + + nullable_int_series = cast( + _CudfSeries, + cast(_CudfArray, cast(object, vortex.array([1, None, 3]))).to_cudf(), + ) + assert type(nullable_int_series).__name__ == "Series" + assert nullable_int_series.to_arrow().to_pylist() == [1, None, 3] + assert "" in repr(nullable_int_series) + + nullable_bool_series = cast( + _CudfSeries, + cast(_CudfArray, cast(object, vortex.array([True, None, False]))).to_cudf(), + ) + assert type(nullable_bool_series).__name__ == "Series" + assert nullable_bool_series.to_arrow().to_pylist() == [True, None, False] + assert "" in repr(nullable_bool_series) + + string_series = cast( + _CudfSeries, + cast( + _CudfArray, + cast(object, vortex.array(["alpha", "beta", "gamma"])), + ).to_cudf(), + ) + assert type(string_series).__name__ == "Series" + assert string_series.to_arrow().to_pylist() == ["alpha", "beta", "gamma"] + + struct_array = vortex.Array.from_arrow(pa.table({"x": [1, None, 3], "y": [4.0, 5.0, 6.0]})) + dataframe = cast( + _CudfDataFrame, + cast(_CudfArray, cast(object, struct_array)).to_cudf(), + ) + assert type(dataframe).__name__ == "DataFrame" + assert list(dataframe.columns) == ["x", "y"] + assert len(dataframe) == 3 + assert "" in repr(dataframe) + assert {name: str(dtype) for name, dtype in dataframe.dtypes.items()} == { + "x": "int64", + "y": "float64", + } + + x_series = dataframe["x"] + del dataframe + _ = gc.collect() + assert x_series.to_arrow().to_pylist() == [1, None, 3] + assert "" in repr(x_series) diff --git a/vortex-python-cuda/test/test_native_bridge.py b/vortex-python-cuda/test/test_native_bridge.py index c98e70a11d5..960467ea25c 100644 --- a/vortex-python-cuda/test/test_native_bridge.py +++ b/vortex-python-cuda/test/test_native_bridge.py @@ -2,12 +2,38 @@ # SPDX-FileCopyrightText: Copyright the Vortex contributors # pyright: reportPrivateUsage=false +import gc + import pytest import vortex_cuda import vortex +def _require_cuda() -> None: + if not vortex_cuda.cuda_available(): + pytest.skip("CUDA device is not available") + + +def _assert_exported_device_array( + array: object, *, length: int, null_count: int, n_children: int +) -> tuple[object, object]: + schema, device_array = vortex_cuda.export_device_array(array) + summary = vortex_cuda._debug_arrow_device_array_capsule_summary(schema, device_array) + + assert summary["schema_live"] is True + assert summary["array_live"] is True + assert summary["is_cuda"] is True + assert summary["length"] == length + assert summary["null_count"] == null_count + assert summary["n_children"] == n_children + n_buffers = summary["n_buffers"] + assert isinstance(n_buffers, int) + assert n_buffers >= 0 + + return schema, device_array + + def test_debug_array_metadata_dtype_reads_base_vortex_array(): array = vortex.Array.from_range(range(0, 3)) @@ -64,3 +90,66 @@ def test_export_device_array_returns_capsules_or_clean_cuda_error(): schema, device_array = vortex_cuda.export_device_array(array) assert type(schema).__name__ == "PyCapsule" assert type(device_array).__name__ == "PyCapsule" + + +def test_arrow_device_export_primitive_array(): + _require_cuda() + + _ = _assert_exported_device_array(vortex.array([1, 2, 3]), length=3, null_count=0, n_children=0) + + +def test_arrow_device_export_nullable_primitive_array(): + _require_cuda() + + _ = _assert_exported_device_array(vortex.array([1, None, 3]), length=3, null_count=1, n_children=0) + + +def test_arrow_device_export_nullable_bool_array(): + _require_cuda() + + _ = _assert_exported_device_array(vortex.array([True, None, False]), length=3, null_count=1, n_children=0) + + +def test_arrow_device_export_string_array(): + _require_cuda() + + _ = _assert_exported_device_array( + vortex.array(["alpha", "beta", "a longer string that should use the varbin data buffer"]), + length=3, + null_count=0, + n_children=0, + ) + + +def test_arrow_device_export_struct_array(): + import pyarrow as pa + + _require_cuda() + + arrow_table = pa.table({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]}) + struct_array = vortex.Array.from_arrow( + pa.StructArray.from_arrays( # pyright: ignore[reportUnknownMemberType] + [arrow_table.column("a").combine_chunks(), arrow_table.column("b").combine_chunks()], + names=["a", "b"], + ) + ) + + _ = _assert_exported_device_array(struct_array, length=3, null_count=0, n_children=2) + + +def test_arrow_device_capsules_drop_unconsumed(): + _require_cuda() + + schema, device_array = _assert_exported_device_array(vortex.array([1, 2, 3]), length=3, null_count=0, n_children=0) + del schema, device_array + _ = gc.collect() + + +def test_arrow_device_capsules_consumer_release_and_used_names(): + _require_cuda() + + schema, device_array = _assert_exported_device_array(vortex.array([1, 2, 3]), length=3, null_count=0, n_children=0) + consume_result = vortex_cuda._debug_consume_arrow_device_array_capsules(schema, device_array) + assert consume_result == (True, True, True, True, True, True) + del schema, device_array + _ = gc.collect()