From 475fabacee987a801344b52f9b1d6f3b932ae960 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Thu, 14 May 2026 21:11:19 -0700 Subject: [PATCH 1/2] geotiff: extract read_geotiff_dask to _backends/dask.py (#1886) Step 8 of #1813's multi-PR refactor of __init__.py. Pure code motion; no public API change. Moved into a new xrspatial/geotiff/_backends/dask.py: - read_geotiff_dask: dask read entry point (~350 lines), with the HTTP/fsspec COG metadata prefetch, MinIsWhite nodata-inversion detection, window clipping, max_pixels guard, and 50k-task graph cap intact. - _delayed_read_window: the per-chunk reader closure (~100 lines) that fans out via dask.delayed. Function bodies unchanged except for adjusted relative imports (``._X`` -> ``.._X``). Two helpers still living in __init__.py (``read_vrt`` and ``_read_geo_info``) are lazy-imported inside the function bodies to avoid a circular import; later PRs in #1813 will move them out and the lazy imports can become static. Promotes the matching lazy import on the other side: _backends/gpu.py's ``from .. import read_geotiff_dask`` (added for the same circular- import reason in #1895) becomes a static top-of-module ``from .dask import read_geotiff_dask`` now that the target is at a sibling path. Test changes ============ - test_dask_no_op_astype_1624.py monkeypatches ``_read_to_array``; the target moves from ``xrspatial.geotiff`` to ``xrspatial.geotiff._backends.dask`` so the spy still intercepts the per-chunk worker. Same pattern as the GPU spy update in #1895. - Restored the ``_apply_nodata_mask_gpu`` re-export from __init__.py (with a ``# noqa: F401`` marker). PR #1895's review-fix commit dropped this re-import after concluding it had no internal callers; it overlooked five test imports across ``test_nodata_nan_int_1774`` and ``test_nodata_out_of_range_1581`` that exercise the helper directly. Restoring the re-export fixes 6 tests that were failing on main since #1895 merged. Diff stats ========== - __init__.py: 2962 -> 2504 (-458 lines), plus 1 re-import line. - New _backends/dask.py: 481 lines. Verification: pixel-parity matrix from #1889 (192 cells), runtime identity (9), dask int nodata chunks (#1597), attrs parity (#1548), dask no-op astype (#1624), gpu chunks out-of-core (#1876), full geotiff suite minus the 8 pre-existing main failures -- 2862/2862 pass. Refs #1813. --- xrspatial/geotiff/__init__.py | 458 +---------------- xrspatial/geotiff/_backends/dask.py | 481 ++++++++++++++++++ xrspatial/geotiff/_backends/gpu.py | 6 +- .../tests/test_dask_no_op_astype_1624.py | 10 +- 4 files changed, 492 insertions(+), 463 deletions(-) create mode 100644 xrspatial/geotiff/_backends/dask.py diff --git a/xrspatial/geotiff/__init__.py b/xrspatial/geotiff/__init__.py index 7ad72b8f..1c643d48 100644 --- a/xrspatial/geotiff/__init__.py +++ b/xrspatial/geotiff/__init__.py @@ -68,7 +68,11 @@ _populate_attrs_from_geo_info, _resolve_nodata_attr, ) -from ._backends._gpu_helpers import _is_gpu_data +from ._backends._gpu_helpers import ( # noqa: F401 -- _apply_nodata_mask_gpu re-export + _apply_nodata_mask_gpu, + _is_gpu_data, +) +from ._backends.dask import read_geotiff_dask from ._backends.gpu import read_geotiff_gpu from ._crs import _resolve_crs_to_wkt, _wkt_to_epsg from ._reader import read_to_array as _read_to_array @@ -1330,458 +1334,6 @@ def _write_vrt_tiled(data, vrt_path, *, crs=None, nodata=None, _write_vrt_fn(vrt_path, tile_paths, relative=True, nodata=nodata) -def read_geotiff_dask(source: str, *, - dtype: str | np.dtype | None = None, - chunks: int | tuple = 512, - overview_level: int | None = None, - window: tuple | None = None, - band: int | None = None, - max_pixels: int | None = None, - name: str | None = None) -> xr.DataArray: - """Read a GeoTIFF as a dask-backed DataArray for out-of-core processing. - - Each chunk is loaded lazily via windowed reads. - - Parameters - ---------- - source : str - File path. - dtype : str, numpy.dtype, or None - Cast each chunk to this dtype after reading. None keeps the - file's native dtype. Float-to-int casts raise ValueError. - chunks : int or (row_chunk, col_chunk) tuple - Chunk size in pixels. Default 512. - overview_level : int or None - Overview level (0 = full resolution). - window : tuple or None - ``(row_start, col_start, row_stop, col_stop)`` to restrict - chunking to a sub-region of the file. Chunks are laid out - relative to the window origin. None reads the full raster. - band : int or None - Zero-based band index. None returns all bands (3D for - multi-band files, 2D for single-band). Selecting a single band - produces a 2D DataArray. - max_pixels : int or None - Maximum allowed pixel count (width * height * samples) for the - windowed region. None uses the reader default (~1 billion). - The cap is checked once up-front against the lazy region; each - chunk task also re-checks against ``max_pixels`` so windowed - reads stay bounded even when ``read_to_array`` is invoked - directly. - name : str or None - Name for the DataArray. - - Returns - ------- - xr.DataArray - Dask-backed DataArray with y/x coordinates. - """ - import dask.array as da - - from ._reader import _coerce_path - - source = _coerce_path(source) - - # Reject non-positive chunk sizes up front. ``chunks=0`` and negative - # values otherwise propagate into dask chunk math (``range(0, N, 0)`` - # ValueError, or empty chunk grids) with no indication that ``chunks`` - # was the problem. Shared with ``read_geotiff_gpu`` / ``read_vrt`` via - # ``_validate_chunks_arg`` so all three entry points emit the same - # error format (#1752 / #1776). ``allow_none=False`` (the default) - # rejects ``chunks=None`` with the same ValueError; this entry point - # requires a concrete chunk size since the chunk-unpacking math below - # would otherwise fail with a confusing TypeError. - chunks = _validate_chunks_arg(chunks) - - # ``open_geotiff`` already routes ``.vrt`` to ``read_vrt`` before - # reaching here, so this branch is only hit when ``read_geotiff_dask`` - # is called directly with a VRT path. Keep it as a defensive fallback - # rather than letting the windowed-read path try to parse VRT XML as - # TIFF bytes. ``read_vrt`` is the single source of truth for VRT. - if isinstance(source, str) and source.lower().endswith('.vrt'): - return read_vrt( - source, dtype=dtype, window=window, band=band, name=name, - chunks=chunks, max_pixels=max_pixels, - ) - - # P5: HTTP COG sources used to fire one IFD/header GET per chunk - # task. Parse metadata once here so every delayed task can reuse it. - # The same prefetch path also covers fsspec URIs (s3://, gs://, ...); - # ``_parse_cog_http_meta`` only needs a ``read_range``-having source, - # and ``_CloudSource`` satisfies that contract. Going through it - # bounds metadata reads to ``MAX_HTTP_HEADER_BYTES`` instead of - # fetching the whole remote object up front. See PR #1755 review. - is_http = ( - isinstance(source, str) - and source.startswith(('http://', 'https://')) - ) - from ._reader import _is_fsspec_uri - is_fsspec = isinstance(source, str) and _is_fsspec_uri(source) - http_meta = None - http_meta_key = None - if is_http or is_fsspec: - import dask - from ._reader import _parse_cog_http_meta - if is_http: - from ._reader import _HTTPSource - _src = _HTTPSource(source) - else: - from ._reader import _CloudSource - _src = _CloudSource(source) - try: - http_header, http_ifd, http_geo, _ = _parse_cog_http_meta( - _src, overview_level=overview_level) - finally: - _src.close() - http_meta = (http_header, http_ifd) - if http_ifd.orientation != 1: - raise ValueError( - f"Orientation tag (274) is {http_ifd.orientation}; " - f"dask-chunked reads (chunks=...) are not supported for " - f"non-default orientation on remote GeoTIFF sources. Read " - f"the full array first, then slice/chunk it." - ) - # Wrap the parsed metadata in a single dask Delayed so every - # window task takes it as a graph input, not a Python closure. - # Without this, the (TIFFHeader, IFD) pair -- which can carry - # multi-million-entry TileOffsets/TileByteCounts tuples on - # large COGs -- would be embedded in each chunk task and - # serialised N times under distributed/process schedulers. - http_meta_key = dask.delayed(http_meta, pure=True) - geo_info = http_geo - full_h = http_ifd.height - full_w = http_ifd.width - from ._dtypes import resolve_bits_per_sample, tiff_dtype_to_numpy - bps = resolve_bits_per_sample(http_ifd.bits_per_sample) - file_dtype = tiff_dtype_to_numpy(bps, http_ifd.sample_format) - n_bands = ( - http_ifd.samples_per_pixel - if http_ifd.samples_per_pixel > 1 else 0 - ) - # Stash IFD photometric for the MinIsWhite nodata-inversion check below. - geo_info._ifd_photometric = http_ifd.photometric - geo_info._ifd_samples_per_pixel = http_ifd.samples_per_pixel - else: - # Metadata-only read: O(1) memory via mmap, no pixel decompression - geo_info, full_h, full_w, file_dtype, n_bands = _read_geo_info( - source, overview_level=overview_level) - nodata = geo_info.nodata - nodata_attr = nodata # original sentinel preserved for attrs['nodata'] - # When the source is MinIsWhite (photometric == 0, samples_per_pixel == 1), - # the per-chunk reader inverts pixel values before this closure's nodata - # mask runs. Track the inverted sentinel so the mask compares against the - # post-inversion value, not the original (#1809). - if nodata is not None: - _phm = getattr(geo_info, '_ifd_photometric', None) - _spp = getattr(geo_info, '_ifd_samples_per_pixel', None) - if _phm == 0 and _spp == 1: - if file_dtype.kind == 'u' and np.isfinite(nodata) and \ - float(nodata).is_integer(): - vi = int(nodata) - info = np.iinfo(file_dtype) - if info.min <= vi <= info.max: - nodata = info.max - vi - elif file_dtype.kind == 'f' and not np.isnan(nodata): - nodata = -float(nodata) - - # Nodata masking promotes integer arrays to float64 (for NaN). - # Validate against the effective dtype, not the raw file dtype. - # An out-of-range sentinel (e.g. uint16 file + nodata=-9999) is a - # no-op for masking and leaves the file dtype unchanged. A - # non-finite sentinel ("NaN" / "Inf" GDAL_NODATA strings) cannot - # match an integer pixel either and is short-circuited via the - # ``np.isfinite`` gate so the ``int(...)`` cast never sees NaN - # (#1774). A fractional sentinel (e.g. ``"3.5"`` on a ``uint16`` - # file) also cannot match an integer pixel and ``int(3.5)`` would - # truncate to 3, silently flagging a real pixel value as nodata; - # gate on ``float(nodata).is_integer()`` as well so fractional - # tags stay on the no-op path. The try/except keeps callers that - # pass an exotic ``nodata`` type (e.g. complex) on the no-op path - # rather than surfacing an opaque error here. - effective_dtype = file_dtype - if (nodata is not None - and file_dtype.kind in ('u', 'i') - and np.isfinite(nodata) - and float(nodata).is_integer()): - try: - _nd_int = int(nodata) - _info = np.iinfo(file_dtype) - if _info.min <= _nd_int <= _info.max: - effective_dtype = np.dtype('float64') - except (TypeError, ValueError): - pass - - if dtype is not None: - target_dtype = np.dtype(dtype) - _validate_dtype_cast(effective_dtype, target_dtype) - else: - target_dtype = effective_dtype - - # Window clipping: restrict the lazy region to the requested - # sub-rectangle. ``read_to_array`` already accepts ``window=`` per - # chunk; we only need to remap the chunk grid so its origin moves to - # ``(win_r0, win_c0)`` and its extent shrinks to the window. - win_r0 = win_c0 = 0 - if window is not None: - win_r0, win_c0, win_r1, win_c1 = window - if (win_r0 < 0 or win_c0 < 0 - or win_r1 > full_h or win_c1 > full_w - or win_r0 >= win_r1 or win_c0 >= win_c1): - raise ValueError( - f"window={window} is outside the source extent " - f"({full_h}x{full_w}) or has non-positive size.") - # Mirror the eager-path windowed coord computation in open_geotiff, - # including the ``has_georef=False`` shortcut to integer pixel - # coords for non-georef files (#1710). - coords = _coords_from_geo_info( - geo_info, win_r1 - win_r0, win_c1 - win_c0, window=window, - ) - full_h = win_r1 - win_r0 - full_w = win_c1 - win_c0 - else: - coords = _geo_to_coords(geo_info, full_h, full_w) - - if band is not None: - # Reject ``bool`` and ``np.bool_`` up front; ``isinstance(True, int)`` - # is True in Python so ``True < n_bands`` evaluates without raising - # and silently reads band 1. ``np.bool_`` is not a subclass of - # ``bool`` so it needs its own check to match the VRT path's - # rejection. See #1786. - if isinstance(band, (bool, np.bool_)): - raise ValueError( - f"band must be a non-negative int, got {band!r}") - if n_bands == 0: - if band != 0: - raise IndexError( - f"band={band} requested on a single-band file.") - elif not 0 <= band < n_bands: - raise IndexError( - f"band={band} out of range for {n_bands}-band file.") - - # Up-front pixel-count guard against the windowed extent. Chunk - # tasks re-check via read_to_array's own ``max_pixels`` (which we - # forward through ``_delayed_read_window``), but catching an - # oversized request before any task is scheduled saves the caller - # from a misleading "tile size exceeds max_pixels" error in a - # chunk that happens to align with the file's tile grid. - # ``max_pixels=None`` substitutes the module default to match the - # eager (``read_to_array``) and VRT chunked paths. Without the - # substitution the guard would skip entirely on ``None`` and a - # caller could build a lazy graph over a region far larger than the - # documented safety cap. See issue #1838. - from ._reader import MAX_PIXELS_DEFAULT as _MAX_PIXELS_DEFAULT - effective_max_pixels = (max_pixels if max_pixels is not None - else _MAX_PIXELS_DEFAULT) - eff_bands = (1 if band is not None - else (n_bands if n_bands > 0 else 1)) - if full_h * full_w * eff_bands > effective_max_pixels: - raise ValueError( - f"Requested region {full_h}x{full_w}x{eff_bands} " - f"exceeds max_pixels={effective_max_pixels:,}.") - - if name is None: - import os - name = os.path.splitext(os.path.basename(source))[0] - - attrs = {} - _populate_attrs_from_geo_info(attrs, geo_info, window=window) - if nodata_attr is not None: - attrs['nodata'] = nodata_attr - - if isinstance(chunks, int): - ch_h = ch_w = chunks - else: - ch_h, ch_w = chunks - - # Graph-size guard. Each chunk becomes a delayed task whose Python graph - # entry retains ~1KB. At very large chunk counts the graph itself OOMs - # the driver before any read executes (30TB at chunks=256 => ~500M tasks - # => ~500GB graph on host). Refuse anything past the cap and ask the - # caller to pick a chunk size, rather than silently rescaling -- the - # rescaled chunks may not align with the user's downstream pipeline. - _MAX_DASK_CHUNKS = 50_000 - n_chunks = ((full_h + ch_h - 1) // ch_h) * ((full_w + ch_w - 1) // ch_w) - if n_chunks > _MAX_DASK_CHUNKS: - import math - scale = math.sqrt(n_chunks / _MAX_DASK_CHUNKS) - suggested_h = int(math.ceil(ch_h * scale)) - suggested_w = int(math.ceil(ch_w * scale)) - raise ValueError( - f"read_geotiff_dask: chunks=({ch_h}, {ch_w}) on a " - f"{full_h}x{full_w} image would produce {n_chunks:,} dask " - f"tasks, exceeding the {_MAX_DASK_CHUNKS:,}-task cap. Pass a " - f"larger chunks=... value explicitly (e.g. chunks=" - f"({suggested_h}, {suggested_w}) keeps the task count under " - "the cap)." - ) - - # Build dask array from delayed windowed reads - rows = list(range(0, full_h, ch_h)) - cols = list(range(0, full_w, ch_w)) - - # For multi-band, each window read returns (h, w, bands); for single-band (h, w) - # read_to_array with band=0 extracts a single band, band=None returns all - band_arg = band # None => all bands (or 2D for single-band file) - - # When ``band`` is set, each chunk reads a 2D slice -- collapse the - # output dims so the returned DataArray is 2D regardless of file band - # count. - out_has_band_axis = band is None and n_bands > 0 - - dask_rows = [] - for r0 in rows: - r1 = min(r0 + ch_h, full_h) - dask_cols = [] - for c0 in cols: - c1 = min(c0 + ch_w, full_w) - if out_has_band_axis: - block_shape = (r1 - r0, c1 - c0, n_bands) - else: - block_shape = (r1 - r0, c1 - c0) - # Translate window-relative chunk coords back to file-relative - # coords for ``read_to_array``. ``win_r0`` / ``win_c0`` are 0 - # when no window was requested. - # Always thread ``target_dtype`` so each delayed chunk lands - # in the same dtype that the dask array declared. Without - # this, an integer raster with an in-range nodata sentinel - # would have ``effective_dtype=float64`` declared on the - # dask graph but per-chunk arrays only promoted when a - # chunk happened to contain a sentinel pixel. Dask - # concatenation then preallocates from the first chunk's - # actual dtype (uint16), silently casting later float64 - # chunks back to int and converting their NaNs to 0. See - # issue #1597. - block = da.from_delayed( - _delayed_read_window(source, - r0 + win_r0, c0 + win_c0, - r1 + win_r0, c1 + win_c0, - overview_level, nodata, - band_arg, - target_dtype=target_dtype, - http_meta_key=http_meta_key, - max_pixels=max_pixels), - shape=block_shape, - dtype=target_dtype, - ) - dask_cols.append(block) - dask_rows.append(da.concatenate(dask_cols, axis=1)) - - dask_arr = da.concatenate(dask_rows, axis=0) - - if out_has_band_axis: - dims = ['y', 'x', 'band'] - coords['band'] = np.arange(n_bands) - else: - dims = ['y', 'x'] - - return xr.DataArray( - dask_arr, dims=dims, coords=coords, name=name, attrs=attrs, - ) - - -def _delayed_read_window(source, r0, c0, r1, c1, overview_level, nodata, - band, *, target_dtype=None, http_meta_key=None, - max_pixels=None): - """Dask-delayed function to read a single window. - - *http_meta_key* is an optional ``Delayed[(TIFFHeader, IFD)]`` parsed - once by :func:`read_geotiff_dask` and wrapped via ``dask.delayed``. - Passing it as a function argument (rather than a closure capture) - makes the metadata a single graph input that all window tasks - depend on, so distributed/process schedulers serialise it once - instead of once per chunk. For local sources it is ``None``. - """ - import dask - - @dask.delayed - def _read(http_meta): - # The prefetched-metadata fast path covers both HTTP COGs and - # fsspec-addressable remotes (s3://, gs://, az://, memory://, ...). - # Both source classes expose ``read_range``, which is all - # ``_fetch_decode_cog_http_tiles`` needs. - _is_http_src = isinstance(source, str) and source.startswith( - ('http://', 'https://')) - _is_fsspec_src = False - if http_meta is not None and isinstance(source, str) and \ - not _is_http_src: - from ._reader import _is_fsspec_uri as _ifs - _is_fsspec_src = _ifs(source) - if http_meta is not None and (_is_http_src or _is_fsspec_src): - from ._reader import ( - _fetch_decode_cog_http_tiles, - MAX_PIXELS_DEFAULT, - _apply_photometric_miniswhite, - ) - header, ifd = http_meta - if _is_http_src: - from ._reader import _HTTPSource - src = _HTTPSource(source) - else: - from ._reader import _CloudSource - src = _CloudSource(source) - try: - arr = _fetch_decode_cog_http_tiles( - src, header, ifd, - max_pixels=(max_pixels if max_pixels is not None - else MAX_PIXELS_DEFAULT), - window=(r0, c0, r1, c1)) - finally: - src.close() - if (arr.ndim == 3 and ifd.samples_per_pixel > 1 - and band is not None): - arr = arr[:, :, band] - arr = _apply_photometric_miniswhite(arr, ifd) - else: - _r2a_kwargs = {} - if max_pixels is not None: - _r2a_kwargs['max_pixels'] = max_pixels - arr, _ = _read_to_array(source, window=(r0, c0, r1, c1), - overview_level=overview_level, - band=band, **_r2a_kwargs) - if nodata is not None: - # ``arr`` was just decoded by ``_fetch_decode_cog_http_tiles`` - # or ``read_to_array``; both return freshly-allocated buffers - # that nothing else references, so the in-place sentinel - # rewrite is safe. Skip the defensive ``arr.copy()`` to - # avoid a peak-memory doubler on every dask chunk. - if arr.dtype.kind == 'f' and not np.isnan(nodata): - arr[arr == arr.dtype.type(nodata)] = np.nan - elif (arr.dtype.kind in ('u', 'i') - and np.isfinite(nodata) - and float(nodata).is_integer()): - # Out-of-range sentinels (e.g. uint16 + nodata=-9999) - # cannot match any pixel; skip the cast that would - # otherwise raise OverflowError and leave arr unchanged. - # Non-finite sentinels ("NaN" / "Inf" GDAL_NODATA strings) - # also cannot match an integer pixel and would raise - # ValueError on ``int(nodata)``; the ``np.isfinite`` gate - # mirrors ``_resolve_masked_fill`` in ``_reader.py`` - # (#1774). Fractional sentinels (e.g. ``"3.5"`` on a - # ``uint16`` file) also cannot match an integer pixel and - # ``int(3.5)`` would truncate to 3 and silently mask - # pixel value 3; the ``float(nodata).is_integer()`` gate - # short-circuits them too. - nodata_int = int(nodata) - info = np.iinfo(arr.dtype) - if info.min <= nodata_int <= info.max: - mask = arr == arr.dtype.type(nodata_int) - if mask.any(): - arr = arr.astype(np.float64) - arr[mask] = np.nan - if target_dtype is not None and arr.dtype != target_dtype: - # Skip the cast when dtype already matches. ``numpy.astype`` - # defaults to ``copy=True`` and would otherwise allocate a - # full chunk-sized buffer and memcpy on every read just to - # land in the same dtype the array already has. The int-> - # float64 promotion above (sentinel-hit branch) keeps the - # contract that every chunk lands in the dask-declared - # dtype; this guard only elides no-op casts. See #1624. - arr = arr.astype(target_dtype) - return arr - return _read(http_meta_key) - - - def write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray, path: str | BinaryIO, *, crs: int | str | None = None, diff --git a/xrspatial/geotiff/_backends/dask.py b/xrspatial/geotiff/_backends/dask.py new file mode 100644 index 00000000..0c4207d9 --- /dev/null +++ b/xrspatial/geotiff/_backends/dask.py @@ -0,0 +1,481 @@ +"""Dask read backend: ``read_geotiff_dask`` and ``_delayed_read_window``. + +Step 8 of issue #1813. With validators (#1882), attrs helpers (#1883), +and runtime sentinels (#1880) already extracted, the dask entry-point +body and its delayed-read helper move cleanly into this module. The +``read_vrt`` and ``_read_geo_info`` calls inside ``read_geotiff_dask`` +still target ``__init__.py``; both are lazy-imported inside the +function body to avoid a circular import (``__init__.py`` re-exports +``read_geotiff_dask`` from here). +""" +from __future__ import annotations + +import numpy as np +import xarray as xr + +from .._attrs import _populate_attrs_from_geo_info +from .._coords import ( + coords_from_geo_info as _coords_from_geo_info, + geo_to_coords as _geo_to_coords, +) +from .._reader import read_to_array as _read_to_array +from .._validation import _validate_chunks_arg, _validate_dtype_cast + + +def read_geotiff_dask(source: str, *, + dtype: str | np.dtype | None = None, + chunks: int | tuple = 512, + overview_level: int | None = None, + window: tuple | None = None, + band: int | None = None, + max_pixels: int | None = None, + name: str | None = None) -> xr.DataArray: + """Read a GeoTIFF as a dask-backed DataArray for out-of-core processing. + + Each chunk is loaded lazily via windowed reads. + + Parameters + ---------- + source : str + File path. + dtype : str, numpy.dtype, or None + Cast each chunk to this dtype after reading. None keeps the + file's native dtype. Float-to-int casts raise ValueError. + chunks : int or (row_chunk, col_chunk) tuple + Chunk size in pixels. Default 512. + overview_level : int or None + Overview level (0 = full resolution). + window : tuple or None + ``(row_start, col_start, row_stop, col_stop)`` to restrict + chunking to a sub-region of the file. Chunks are laid out + relative to the window origin. None reads the full raster. + band : int or None + Zero-based band index. None returns all bands (3D for + multi-band files, 2D for single-band). Selecting a single band + produces a 2D DataArray. + max_pixels : int or None + Maximum allowed pixel count (width * height * samples) for the + windowed region. None uses the reader default (~1 billion). + The cap is checked once up-front against the lazy region; each + chunk task also re-checks against ``max_pixels`` so windowed + reads stay bounded even when ``read_to_array`` is invoked + directly. + name : str or None + Name for the DataArray. + + Returns + ------- + xr.DataArray + Dask-backed DataArray with y/x coordinates. + """ + import dask.array as da + + from .._reader import _coerce_path + + source = _coerce_path(source) + + # Reject non-positive chunk sizes up front. ``chunks=0`` and negative + # values otherwise propagate into dask chunk math (``range(0, N, 0)`` + # ValueError, or empty chunk grids) with no indication that ``chunks`` + # was the problem. Shared with ``read_geotiff_gpu`` / ``read_vrt`` via + # ``_validate_chunks_arg`` so all three entry points emit the same + # error format (#1752 / #1776). ``allow_none=False`` (the default) + # rejects ``chunks=None`` with the same ValueError; this entry point + # requires a concrete chunk size since the chunk-unpacking math below + # would otherwise fail with a confusing TypeError. + chunks = _validate_chunks_arg(chunks) + + # ``open_geotiff`` already routes ``.vrt`` to ``read_vrt`` before + # reaching here, so this branch is only hit when ``read_geotiff_dask`` + # is called directly with a VRT path. Keep it as a defensive fallback + # rather than letting the windowed-read path try to parse VRT XML as + # TIFF bytes. ``read_vrt`` is the single source of truth for VRT. + if isinstance(source, str) and source.lower().endswith('.vrt'): + # Lazy import: ``read_vrt`` still lives in ``xrspatial.geotiff`` + # (extracted in a later step of #1813). The package re-exports + # this function so a static ``from .. import read_vrt`` would + # create a circular import at module load time. + from .. import read_vrt + return read_vrt( + source, dtype=dtype, window=window, band=band, name=name, + chunks=chunks, max_pixels=max_pixels, + ) + + # P5: HTTP COG sources used to fire one IFD/header GET per chunk + # task. Parse metadata once here so every delayed task can reuse it. + # The same prefetch path also covers fsspec URIs (s3://, gs://, ...); + # ``_parse_cog_http_meta`` only needs a ``read_range``-having source, + # and ``_CloudSource`` satisfies that contract. Going through it + # bounds metadata reads to ``MAX_HTTP_HEADER_BYTES`` instead of + # fetching the whole remote object up front. See PR #1755 review. + is_http = ( + isinstance(source, str) + and source.startswith(('http://', 'https://')) + ) + from .._reader import _is_fsspec_uri + is_fsspec = isinstance(source, str) and _is_fsspec_uri(source) + http_meta = None + http_meta_key = None + if is_http or is_fsspec: + import dask + from .._reader import _parse_cog_http_meta + if is_http: + from .._reader import _HTTPSource + _src = _HTTPSource(source) + else: + from .._reader import _CloudSource + _src = _CloudSource(source) + try: + http_header, http_ifd, http_geo, _ = _parse_cog_http_meta( + _src, overview_level=overview_level) + finally: + _src.close() + http_meta = (http_header, http_ifd) + if http_ifd.orientation != 1: + raise ValueError( + f"Orientation tag (274) is {http_ifd.orientation}; " + f"dask-chunked reads (chunks=...) are not supported for " + f"non-default orientation on remote GeoTIFF sources. Read " + f"the full array first, then slice/chunk it." + ) + # Wrap the parsed metadata in a single dask Delayed so every + # window task takes it as a graph input, not a Python closure. + # Without this, the (TIFFHeader, IFD) pair -- which can carry + # multi-million-entry TileOffsets/TileByteCounts tuples on + # large COGs -- would be embedded in each chunk task and + # serialised N times under distributed/process schedulers. + http_meta_key = dask.delayed(http_meta, pure=True) + geo_info = http_geo + full_h = http_ifd.height + full_w = http_ifd.width + from .._dtypes import resolve_bits_per_sample, tiff_dtype_to_numpy + bps = resolve_bits_per_sample(http_ifd.bits_per_sample) + file_dtype = tiff_dtype_to_numpy(bps, http_ifd.sample_format) + n_bands = ( + http_ifd.samples_per_pixel + if http_ifd.samples_per_pixel > 1 else 0 + ) + # Stash IFD photometric for the MinIsWhite nodata-inversion check below. + geo_info._ifd_photometric = http_ifd.photometric + geo_info._ifd_samples_per_pixel = http_ifd.samples_per_pixel + else: + # Metadata-only read: O(1) memory via mmap, no pixel decompression. + # Lazy import for the same circular-import reason as ``read_vrt`` + # above: ``_read_geo_info`` still lives in ``xrspatial.geotiff``. + from .. import _read_geo_info + geo_info, full_h, full_w, file_dtype, n_bands = _read_geo_info( + source, overview_level=overview_level) + nodata = geo_info.nodata + nodata_attr = nodata # original sentinel preserved for attrs['nodata'] + # When the source is MinIsWhite (photometric == 0, samples_per_pixel == 1), + # the per-chunk reader inverts pixel values before this closure's nodata + # mask runs. Track the inverted sentinel so the mask compares against the + # post-inversion value, not the original (#1809). + if nodata is not None: + _phm = getattr(geo_info, '_ifd_photometric', None) + _spp = getattr(geo_info, '_ifd_samples_per_pixel', None) + if _phm == 0 and _spp == 1: + if file_dtype.kind == 'u' and np.isfinite(nodata) and \ + float(nodata).is_integer(): + vi = int(nodata) + info = np.iinfo(file_dtype) + if info.min <= vi <= info.max: + nodata = info.max - vi + elif file_dtype.kind == 'f' and not np.isnan(nodata): + nodata = -float(nodata) + + # Nodata masking promotes integer arrays to float64 (for NaN). + # Validate against the effective dtype, not the raw file dtype. + # An out-of-range sentinel (e.g. uint16 file + nodata=-9999) is a + # no-op for masking and leaves the file dtype unchanged. A + # non-finite sentinel ("NaN" / "Inf" GDAL_NODATA strings) cannot + # match an integer pixel either and is short-circuited via the + # ``np.isfinite`` gate so the ``int(...)`` cast never sees NaN + # (#1774). A fractional sentinel (e.g. ``"3.5"`` on a ``uint16`` + # file) also cannot match an integer pixel and ``int(3.5)`` would + # truncate to 3, silently flagging a real pixel value as nodata; + # gate on ``float(nodata).is_integer()`` as well so fractional + # tags stay on the no-op path. The try/except keeps callers that + # pass an exotic ``nodata`` type (e.g. complex) on the no-op path + # rather than surfacing an opaque error here. + effective_dtype = file_dtype + if (nodata is not None + and file_dtype.kind in ('u', 'i') + and np.isfinite(nodata) + and float(nodata).is_integer()): + try: + _nd_int = int(nodata) + _info = np.iinfo(file_dtype) + if _info.min <= _nd_int <= _info.max: + effective_dtype = np.dtype('float64') + except (TypeError, ValueError): + pass + + if dtype is not None: + target_dtype = np.dtype(dtype) + _validate_dtype_cast(effective_dtype, target_dtype) + else: + target_dtype = effective_dtype + + # Window clipping: restrict the lazy region to the requested + # sub-rectangle. ``read_to_array`` already accepts ``window=`` per + # chunk; we only need to remap the chunk grid so its origin moves to + # ``(win_r0, win_c0)`` and its extent shrinks to the window. + win_r0 = win_c0 = 0 + if window is not None: + win_r0, win_c0, win_r1, win_c1 = window + if (win_r0 < 0 or win_c0 < 0 + or win_r1 > full_h or win_c1 > full_w + or win_r0 >= win_r1 or win_c0 >= win_c1): + raise ValueError( + f"window={window} is outside the source extent " + f"({full_h}x{full_w}) or has non-positive size.") + # Mirror the eager-path windowed coord computation in open_geotiff, + # including the ``has_georef=False`` shortcut to integer pixel + # coords for non-georef files (#1710). + coords = _coords_from_geo_info( + geo_info, win_r1 - win_r0, win_c1 - win_c0, window=window, + ) + full_h = win_r1 - win_r0 + full_w = win_c1 - win_c0 + else: + coords = _geo_to_coords(geo_info, full_h, full_w) + + if band is not None: + # Reject ``bool`` and ``np.bool_`` up front; ``isinstance(True, int)`` + # is True in Python so ``True < n_bands`` evaluates without raising + # and silently reads band 1. ``np.bool_`` is not a subclass of + # ``bool`` so it needs its own check to match the VRT path's + # rejection. See #1786. + if isinstance(band, (bool, np.bool_)): + raise ValueError( + f"band must be a non-negative int, got {band!r}") + if n_bands == 0: + if band != 0: + raise IndexError( + f"band={band} requested on a single-band file.") + elif not 0 <= band < n_bands: + raise IndexError( + f"band={band} out of range for {n_bands}-band file.") + + # Up-front pixel-count guard against the windowed extent. Chunk + # tasks re-check via read_to_array's own ``max_pixels`` (which we + # forward through ``_delayed_read_window``), but catching an + # oversized request before any task is scheduled saves the caller + # from a misleading "tile size exceeds max_pixels" error in a + # chunk that happens to align with the file's tile grid. + # ``max_pixels=None`` substitutes the module default to match the + # eager (``read_to_array``) and VRT chunked paths. Without the + # substitution the guard would skip entirely on ``None`` and a + # caller could build a lazy graph over a region far larger than the + # documented safety cap. See issue #1838. + from .._reader import MAX_PIXELS_DEFAULT as _MAX_PIXELS_DEFAULT + effective_max_pixels = (max_pixels if max_pixels is not None + else _MAX_PIXELS_DEFAULT) + eff_bands = (1 if band is not None + else (n_bands if n_bands > 0 else 1)) + if full_h * full_w * eff_bands > effective_max_pixels: + raise ValueError( + f"Requested region {full_h}x{full_w}x{eff_bands} " + f"exceeds max_pixels={effective_max_pixels:,}.") + + if name is None: + import os + name = os.path.splitext(os.path.basename(source))[0] + + attrs = {} + _populate_attrs_from_geo_info(attrs, geo_info, window=window) + if nodata_attr is not None: + attrs['nodata'] = nodata_attr + + if isinstance(chunks, int): + ch_h = ch_w = chunks + else: + ch_h, ch_w = chunks + + # Graph-size guard. Each chunk becomes a delayed task whose Python graph + # entry retains ~1KB. At very large chunk counts the graph itself OOMs + # the driver before any read executes (30TB at chunks=256 => ~500M tasks + # => ~500GB graph on host). Refuse anything past the cap and ask the + # caller to pick a chunk size, rather than silently rescaling -- the + # rescaled chunks may not align with the user's downstream pipeline. + _MAX_DASK_CHUNKS = 50_000 + n_chunks = ((full_h + ch_h - 1) // ch_h) * ((full_w + ch_w - 1) // ch_w) + if n_chunks > _MAX_DASK_CHUNKS: + import math + scale = math.sqrt(n_chunks / _MAX_DASK_CHUNKS) + suggested_h = int(math.ceil(ch_h * scale)) + suggested_w = int(math.ceil(ch_w * scale)) + raise ValueError( + f"read_geotiff_dask: chunks=({ch_h}, {ch_w}) on a " + f"{full_h}x{full_w} image would produce {n_chunks:,} dask " + f"tasks, exceeding the {_MAX_DASK_CHUNKS:,}-task cap. Pass a " + f"larger chunks=... value explicitly (e.g. chunks=" + f"({suggested_h}, {suggested_w}) keeps the task count under " + "the cap)." + ) + + # Build dask array from delayed windowed reads + rows = list(range(0, full_h, ch_h)) + cols = list(range(0, full_w, ch_w)) + + # For multi-band, each window read returns (h, w, bands); for single-band (h, w) + # read_to_array with band=0 extracts a single band, band=None returns all + band_arg = band # None => all bands (or 2D for single-band file) + + # When ``band`` is set, each chunk reads a 2D slice -- collapse the + # output dims so the returned DataArray is 2D regardless of file band + # count. + out_has_band_axis = band is None and n_bands > 0 + + dask_rows = [] + for r0 in rows: + r1 = min(r0 + ch_h, full_h) + dask_cols = [] + for c0 in cols: + c1 = min(c0 + ch_w, full_w) + if out_has_band_axis: + block_shape = (r1 - r0, c1 - c0, n_bands) + else: + block_shape = (r1 - r0, c1 - c0) + # Translate window-relative chunk coords back to file-relative + # coords for ``read_to_array``. ``win_r0`` / ``win_c0`` are 0 + # when no window was requested. + # Always thread ``target_dtype`` so each delayed chunk lands + # in the same dtype that the dask array declared. Without + # this, an integer raster with an in-range nodata sentinel + # would have ``effective_dtype=float64`` declared on the + # dask graph but per-chunk arrays only promoted when a + # chunk happened to contain a sentinel pixel. Dask + # concatenation then preallocates from the first chunk's + # actual dtype (uint16), silently casting later float64 + # chunks back to int and converting their NaNs to 0. See + # issue #1597. + block = da.from_delayed( + _delayed_read_window(source, + r0 + win_r0, c0 + win_c0, + r1 + win_r0, c1 + win_c0, + overview_level, nodata, + band_arg, + target_dtype=target_dtype, + http_meta_key=http_meta_key, + max_pixels=max_pixels), + shape=block_shape, + dtype=target_dtype, + ) + dask_cols.append(block) + dask_rows.append(da.concatenate(dask_cols, axis=1)) + + dask_arr = da.concatenate(dask_rows, axis=0) + + if out_has_band_axis: + dims = ['y', 'x', 'band'] + coords['band'] = np.arange(n_bands) + else: + dims = ['y', 'x'] + + return xr.DataArray( + dask_arr, dims=dims, coords=coords, name=name, attrs=attrs, + ) + + +def _delayed_read_window(source, r0, c0, r1, c1, overview_level, nodata, + band, *, target_dtype=None, http_meta_key=None, + max_pixels=None): + """Dask-delayed function to read a single window. + + *http_meta_key* is an optional ``Delayed[(TIFFHeader, IFD)]`` parsed + once by :func:`read_geotiff_dask` and wrapped via ``dask.delayed``. + Passing it as a function argument (rather than a closure capture) + makes the metadata a single graph input that all window tasks + depend on, so distributed/process schedulers serialise it once + instead of once per chunk. For local sources it is ``None``. + """ + import dask + + @dask.delayed + def _read(http_meta): + # The prefetched-metadata fast path covers both HTTP COGs and + # fsspec-addressable remotes (s3://, gs://, az://, memory://, ...). + # Both source classes expose ``read_range``, which is all + # ``_fetch_decode_cog_http_tiles`` needs. + _is_http_src = isinstance(source, str) and source.startswith( + ('http://', 'https://')) + _is_fsspec_src = False + if http_meta is not None and isinstance(source, str) and \ + not _is_http_src: + from .._reader import _is_fsspec_uri as _ifs + _is_fsspec_src = _ifs(source) + if http_meta is not None and (_is_http_src or _is_fsspec_src): + from .._reader import ( + _fetch_decode_cog_http_tiles, + MAX_PIXELS_DEFAULT, + _apply_photometric_miniswhite, + ) + header, ifd = http_meta + if _is_http_src: + from .._reader import _HTTPSource + src = _HTTPSource(source) + else: + from .._reader import _CloudSource + src = _CloudSource(source) + try: + arr = _fetch_decode_cog_http_tiles( + src, header, ifd, + max_pixels=(max_pixels if max_pixels is not None + else MAX_PIXELS_DEFAULT), + window=(r0, c0, r1, c1)) + finally: + src.close() + if (arr.ndim == 3 and ifd.samples_per_pixel > 1 + and band is not None): + arr = arr[:, :, band] + arr = _apply_photometric_miniswhite(arr, ifd) + else: + _r2a_kwargs = {} + if max_pixels is not None: + _r2a_kwargs['max_pixels'] = max_pixels + arr, _ = _read_to_array(source, window=(r0, c0, r1, c1), + overview_level=overview_level, + band=band, **_r2a_kwargs) + if nodata is not None: + # ``arr`` was just decoded by ``_fetch_decode_cog_http_tiles`` + # or ``read_to_array``; both return freshly-allocated buffers + # that nothing else references, so the in-place sentinel + # rewrite is safe. Skip the defensive ``arr.copy()`` to + # avoid a peak-memory doubler on every dask chunk. + if arr.dtype.kind == 'f' and not np.isnan(nodata): + arr[arr == arr.dtype.type(nodata)] = np.nan + elif (arr.dtype.kind in ('u', 'i') + and np.isfinite(nodata) + and float(nodata).is_integer()): + # Out-of-range sentinels (e.g. uint16 + nodata=-9999) + # cannot match any pixel; skip the cast that would + # otherwise raise OverflowError and leave arr unchanged. + # Non-finite sentinels ("NaN" / "Inf" GDAL_NODATA strings) + # also cannot match an integer pixel and would raise + # ValueError on ``int(nodata)``; the ``np.isfinite`` gate + # mirrors ``_resolve_masked_fill`` in ``_reader.py`` + # (#1774). Fractional sentinels (e.g. ``"3.5"`` on a + # ``uint16`` file) also cannot match an integer pixel and + # ``int(3.5)`` would truncate to 3 and silently mask + # pixel value 3; the ``float(nodata).is_integer()`` gate + # short-circuits them too. + nodata_int = int(nodata) + info = np.iinfo(arr.dtype) + if info.min <= nodata_int <= info.max: + mask = arr == arr.dtype.type(nodata_int) + if mask.any(): + arr = arr.astype(np.float64) + arr[mask] = np.nan + if target_dtype is not None and arr.dtype != target_dtype: + # Skip the cast when dtype already matches. ``numpy.astype`` + # defaults to ``copy=True`` and would otherwise allocate a + # full chunk-sized buffer and memcpy on every read just to + # land in the same dtype the array already has. The int-> + # float64 promotion above (sentinel-hit branch) keeps the + # contract that every chunk lands in the dask-declared + # dtype; this guard only elides no-op casts. See #1624. + arr = arr.astype(target_dtype) + return arr + return _read(http_meta_key) diff --git a/xrspatial/geotiff/_backends/gpu.py b/xrspatial/geotiff/_backends/gpu.py index 84ab09a3..40a3d636 100644 --- a/xrspatial/geotiff/_backends/gpu.py +++ b/xrspatial/geotiff/_backends/gpu.py @@ -34,6 +34,7 @@ _gpu_apply_window_band, _gpu_decode_single_band_tiles, ) +from .dask import read_geotiff_dask def read_geotiff_gpu(source: str, *, @@ -861,11 +862,6 @@ def _read_geotiff_gpu_chunked(source, *, dtype, chunks, overview_level, # for (the CPU path re-parses metadata anyway). pass - # Lazy import to avoid a circular dependency with ``__init__.py``, - # which re-exports ``read_geotiff_gpu`` from this module. By the - # time this line executes both modules are fully loaded. - from .. import read_geotiff_dask - cpu_da = read_geotiff_dask( source, dtype=dtype, chunks=chunks, overview_level=overview_level, window=window, band=band, diff --git a/xrspatial/geotiff/tests/test_dask_no_op_astype_1624.py b/xrspatial/geotiff/tests/test_dask_no_op_astype_1624.py index 34724186..76178a9d 100644 --- a/xrspatial/geotiff/tests/test_dask_no_op_astype_1624.py +++ b/xrspatial/geotiff/tests/test_dask_no_op_astype_1624.py @@ -73,7 +73,7 @@ def test_astype_skipped_when_dtypes_match(float32_no_nodata_tif, monkeypatch): chunk triggers one same-dtype astype. With the fix, none do. """ from xrspatial.geotiff import _reader as reader_mod - import xrspatial.geotiff as gt + from xrspatial.geotiff._backends import dask as gt path, _ = float32_no_nodata_tif @@ -105,10 +105,10 @@ def wrapped_r2a(*args, **kwargs): return tracked, meta # ``read_geotiff_dask``'s per-chunk worker calls the alias - # ``_read_to_array`` bound in ``xrspatial.geotiff``. Patch that - # binding; patching ``_reader.read_to_array`` would not affect the - # already-imported alias. See issue #1708 for why ``read_to_array`` - # is internal. + # ``_read_to_array`` bound in ``xrspatial.geotiff._backends.dask`` + # (since #1886). Patch that binding; patching + # ``_reader.read_to_array`` would not affect the already-imported + # alias. See issue #1708 for why ``read_to_array`` is internal. monkeypatch.setattr(gt, '_read_to_array', wrapped_r2a) dk = read_geotiff_dask(path, chunks=4) From 85c6fae5d48212ff663ce223dd045a2670580ad0 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Thu, 14 May 2026 21:16:56 -0700 Subject: [PATCH 2/2] geotiff: address Copilot review on _backends/dask.py extraction (#1897) Two cleanups Copilot flagged: 1. ``_backends/gpu.py`` module docstring still described the ``read_geotiff_dask`` import as lazy. The import was promoted to a static top-of-module ``from .dask import read_geotiff_dask`` in this PR; updated the docstring to reflect the current strategy and note why the original lazy form existed. 2. ``__init__.py`` had the ``# noqa: F401`` applied to a multi-name import, suppressing lint signal for ``_is_gpu_data`` as well as the intentional ``_apply_nodata_mask_gpu`` re-export. Split into two imports so the ``noqa`` marks only the re-export line. ``_is_gpu_data`` keeps full lint coverage; a future genuinely-unused import on it would surface normally. --- xrspatial/geotiff/__init__.py | 7 +++---- xrspatial/geotiff/_backends/gpu.py | 10 +++++++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/xrspatial/geotiff/__init__.py b/xrspatial/geotiff/__init__.py index 1c643d48..d5a0a892 100644 --- a/xrspatial/geotiff/__init__.py +++ b/xrspatial/geotiff/__init__.py @@ -68,10 +68,9 @@ _populate_attrs_from_geo_info, _resolve_nodata_attr, ) -from ._backends._gpu_helpers import ( # noqa: F401 -- _apply_nodata_mask_gpu re-export - _apply_nodata_mask_gpu, - _is_gpu_data, -) +from ._backends._gpu_helpers import _is_gpu_data +# Re-export only; called by xrspatial/geotiff/tests/test_nodata_*.py. +from ._backends._gpu_helpers import _apply_nodata_mask_gpu # noqa: F401 from ._backends.dask import read_geotiff_dask from ._backends.gpu import read_geotiff_gpu from ._crs import _resolve_crs_to_wkt, _wkt_to_epsg diff --git a/xrspatial/geotiff/_backends/gpu.py b/xrspatial/geotiff/_backends/gpu.py index 40a3d636..3ee5607c 100644 --- a/xrspatial/geotiff/_backends/gpu.py +++ b/xrspatial/geotiff/_backends/gpu.py @@ -4,9 +4,13 @@ already extracted (#1884), validators in ``_validation`` (#1882), attrs helpers in ``_attrs`` (#1883), and sentinels in ``_runtime`` (#1880), moving the entry-point body is a near-mechanical lift; the body stays -unchanged except for adjusted relative imports and a lazy -``read_geotiff_dask`` import that avoids a circular import with -``__init__.py``. +unchanged except for adjusted relative imports. + +``_read_geotiff_gpu_chunked`` calls into ``read_geotiff_dask`` for its +CPU-decode fallback path. That import was originally lazy because +``read_geotiff_dask`` still lived in ``__init__.py``; once it moved +to a sibling module in #1886, the import was promoted to a static +top-of-module ``from .dask import read_geotiff_dask``. """ from __future__ import annotations