Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
a75604a
feat: define `PreparedWrite` and `SupportsChunkPacking` data structures
d-v-b Apr 7, 2026
a072c31
Merge branch 'main' of https://github.com/zarr-developers/zarr-python…
d-v-b Apr 7, 2026
47a407f
feat: new codec pipeline that uses sync path
d-v-b Apr 7, 2026
3c27e49
feat: complete second codecpipeline
d-v-b Apr 8, 2026
9b834a4
Merge branch 'main' of https://github.com/zarr-developers/zarr-python…
d-v-b Apr 8, 2026
c731cf2
fix: handle rectilinear chunks
d-v-b Apr 8, 2026
9e25150
Merge branch 'main' of https://github.com/zarr-developers/zarr-python…
d-v-b Apr 8, 2026
ae0580c
fixup
d-v-b Apr 9, 2026
0effe4d
feat: SupportsSetRange protocol + sync byte-range writes
d-v-b Apr 17, 2026
7f45aba
feat: add sync codec methods to V2 and numcodecs codecs
d-v-b Apr 17, 2026
9b26f90
feat: SyncCodecPipeline — synchronous codec pipeline with per-chunk p…
d-v-b Apr 17, 2026
a48f4f7
feat: partial-shard write support in ShardingCodec
d-v-b Apr 17, 2026
fba975e
test: codec invariants + pipeline parity matrix
d-v-b Apr 17, 2026
1be5563
chore: gitignore local agent/planning notes
d-v-b Apr 17, 2026
985716b
Merge branch 'main' into perf/prepared-write-v2
d-v-b Apr 29, 2026
65d98a1
Merge branch 'main' into perf/prepared-write-v2
d-v-b Apr 30, 2026
3f3e7ea
Merge branch 'perf/prepared-write-v2' of https://github.com/d-v-b/zar…
d-v-b Apr 30, 2026
8b23d22
chore: remove unused PreparedWrite and SupportsChunkCodec
d-v-b May 1, 2026
cd3c14b
chore: remove stale phased-pipeline test files
d-v-b May 1, 2026
88eac8f
refactor: rename SyncCodecPipeline to FusedCodecPipeline
d-v-b May 1, 2026
c3d11d0
refactor: lift _merge_chunk_array to module level
d-v-b May 1, 2026
71f0d32
refactor: extract _async_read_fallback to module level
d-v-b May 1, 2026
f463035
refactor: extract _async_write_fallback to module level
d-v-b May 1, 2026
621361a
test(bench): parametrize test_e2e over both codec pipelines
d-v-b May 1, 2026
fb812c4
test(bench): parametrize test_e2e over a synthetic latency dimension
d-v-b May 1, 2026
d2e08de
Merge branch 'main' of https://github.com/zarr-developers/zarr-python…
d-v-b May 1, 2026
241282a
chore: restore deleted comments in V2Codec._decode_sync
d-v-b May 1, 2026
0324a85
docs: explain non-obvious behaviors in sharding sync methods
d-v-b May 1, 2026
2f97362
docs: convert RST inline literals to Markdown-style backticks
d-v-b May 1, 2026
c9c8c26
perf: memoize encoded inner chunk for scalar complete-shard writes (#…
d-v-b May 30, 2026
5c536bb
merge: integrate upstream/main (subchunk write orders #3826, partial-…
d-v-b Jun 4, 2026
3b4707a
perf+fix: bulk whole-shard read + repair _ShardIndex construction pos…
d-v-b Jun 4, 2026
5ae0d88
fix: FusedCodecPipeline honors subchunk_write_order + coalesced parti…
d-v-b Jun 5, 2026
fed58c0
fix: address roborev review (job 222) — Fused sharding correctness
d-v-b Jun 5, 2026
11bba96
Merge branch 'main' into perf/prepared-write-v2
d-v-b Jun 5, 2026
7656383
docs: correct FusedCodecPipeline framing — sync scheduling, not IO/co…
d-v-b Jun 5, 2026
b7b9570
Merge remote-tracking branch 'origin/perf/prepared-write-v2' into per…
d-v-b Jun 5, 2026
16c932d
refactor: stop hard-coding assumptions about the 'unordered' write order
d-v-b Jun 5, 2026
efb4b36
feat: make FusedCodecPipeline the default codec pipeline
d-v-b Jun 5, 2026
0814ffd
fix: ShardingCodec inner pipeline follows the configured default, not…
d-v-b Jun 5, 2026
071f87b
fix: Fused async decode/encode must evolve codec specs (HIGH-2) + sha…
d-v-b Jun 5, 2026
6f12b42
test: dedupe codec-pipeline tests against the shared CodecPipelineTes…
d-v-b Jun 5, 2026
7d606f2
test: unify the create/write/read suite tests into one Scenario-param…
d-v-b Jun 5, 2026
4cc328a
test: prune test_fused_pipeline.py to its irreducible Fused-specific …
d-v-b Jun 5, 2026
9acdb3a
test: dissolve test_codec_invariants.py, redistributing by subject
d-v-b Jun 5, 2026
35255fa
test: drop redundant read-parity matrix, move partial-read coverage t…
d-v-b Jun 5, 2026
9855060
test: rename test_sync_codec_pipeline -> test_chunk_transform; drop c…
d-v-b Jun 5, 2026
cccab40
feat: remove byte-range-write support pending store-interface decision
d-v-b Jun 5, 2026
79e0896
refactor: remove dead _get_default_chunk_spec helper
d-v-b Jun 5, 2026
100a69a
docs: correct ShardingCodec._encode_sync docstring re: write order
d-v-b Jun 5, 2026
89a92c7
refactor: remove unused ShardingCodec._load_shard_index wrapper
d-v-b Jun 5, 2026
bba8382
docs: drop stale set_range_sync mention from FusedCodecPipeline docst…
d-v-b Jun 5, 2026
b8e8950
docs: use plain single backticks in docstrings, not RST double-backticks
d-v-b Jun 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,9 @@ tests/.hypothesis
zarr/version.py
zarr.egg-info/

# Local agent / planning notes (not versioned)
.claude/
CLAUDE.md
docs/superpowers/
# zarr-metadata package lockfile (a library, not an app)
packages/zarr-metadata/uv.lock
1 change: 1 addition & 0 deletions changes/PLACEHOLDER-fused-default.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`FusedCodecPipeline` is now the default codec pipeline. It runs codec compute synchronously and in bulk (avoiding the per-chunk async scheduling overhead of `BatchedCodecPipeline`), giving large speedups for sharded arrays (up to ~24x writes / ~14x reads on many-chunks-per-shard layouts, more with compression) and no regressions on compute-bound workloads. The previous behavior is available by setting `zarr.config.set({"codec_pipeline.path": "zarr.core.codec_pipeline.BatchedCodecPipeline"})`.
67 changes: 67 additions & 0 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,73 @@ async def get_ranges(
):
yield group

def get_ranges_sync(
self,
key: str,
byte_ranges: Sequence[ByteRequest | None],
*,
prototype: BufferPrototype,
max_gap_bytes: int = 1 << 20, # 1 MiB
max_coalesced_bytes: int = 16 << 20, # 16 MiB
) -> Sequence[tuple[int, Buffer | None]]:
"""Synchronous, coalescing counterpart of `get_ranges`.

Plans merged fetches with the same `coalesce_ranges` policy as the async
path, then issues one synchronous `get_sync` per merged group (or per
uncoalescable request) and slices results back into per-input buffers.
Used by the sync codec pipeline's partial-shard reads so they get the
same byte-range coalescing as the async path, without an event loop.

Returns a list of `(input_index, Buffer | None)`. Raises
`BaseExceptionGroup` containing a `FileNotFoundError` if the key is
absent (matching `get_ranges`), so callers can handle a deleted shard
uniformly across the sync and async paths.

Requires the store to implement `get_sync` (`SupportsGetSync`).
"""
from zarr.core._coalesce import coalesce_ranges

if not isinstance(self, SupportsGetSync):
raise TypeError(f"{type(self).__name__} does not support synchronous reads")

groups, uncoalescable = coalesce_ranges(
byte_ranges, max_gap_bytes=max_gap_bytes, max_coalesced_bytes=max_coalesced_bytes
)
results: list[tuple[int, Buffer | None]] = []
errors: list[BaseException] = []

def _get(req: ByteRequest | None) -> Buffer | None:
return self.get_sync(key, prototype=prototype, byte_range=req)

for idx, req in uncoalescable:
buf = _get(req)
if buf is None:
errors.append(FileNotFoundError(key))
else:
results.append((idx, buf))

for members in groups:
if len(members) == 1:
solo_idx, solo_req = members[0]
buf = _get(solo_req)
if buf is None:
errors.append(FileNotFoundError(key))
else:
results.append((solo_idx, buf))
continue
start = members[0][1].start
end = max(r.end for _, r in members)
big = _get(RangeByteRequest(start, end))
if big is None:
errors.append(FileNotFoundError(key))
continue
for member_idx, r in members:
results.append((member_idx, big[r.start - start : r.end - start]))

if errors:
raise BaseExceptionGroup("chunk read failed", errors)
return results

async def getsize(self, key: str) -> int:
"""
Return the size, in bytes, of a value in a Store.
Expand Down
26 changes: 20 additions & 6 deletions src/zarr/codecs/_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ class V2Codec(ArrayBytesCodec):

is_fixed_size = False

async def _decode_single(
def _decode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
cdata = chunk_bytes.as_array_like()
# decompress
if self.compressor:
chunk = await asyncio.to_thread(self.compressor.decode, cdata)
chunk = self.compressor.decode(cdata)
else:
chunk = cdata

# apply filters
if self.filters:
for f in reversed(self.filters):
chunk = await asyncio.to_thread(f.decode, chunk)
chunk = f.decode(chunk)

# view as numpy array with correct dtype
chunk = ensure_ndarray_like(chunk)
Expand Down Expand Up @@ -70,7 +70,7 @@ async def _decode_single(

return get_ndbuffer_class().from_ndarray_like(chunk)

async def _encode_single(
def _encode_sync(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
Expand All @@ -83,18 +83,32 @@ async def _encode_single(
# apply filters
if self.filters:
for f in self.filters:
chunk = await asyncio.to_thread(f.encode, chunk)
chunk = f.encode(chunk)
# check object encoding
if ensure_ndarray_like(chunk).dtype == object:
raise RuntimeError("cannot write object array without object codec")

# compress
if self.compressor:
cdata = await asyncio.to_thread(self.compressor.encode, chunk)
cdata = self.compressor.encode(chunk)
else:
cdata = chunk
cdata = ensure_bytes(cdata)
return chunk_spec.prototype.buffer.from_bytes(cdata)

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)

async def _encode_single(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
return await asyncio.to_thread(self._encode_sync, chunk_array, chunk_spec)

def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
raise NotImplementedError
50 changes: 30 additions & 20 deletions src/zarr/codecs/numcodecs/_codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
if TYPE_CHECKING:
from zarr.abc.numcodec import Numcodec
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, BufferPrototype, NDBuffer
from zarr.core.buffer import Buffer, NDBuffer

CODEC_PREFIX = "numcodecs."

Expand Down Expand Up @@ -134,53 +134,63 @@ class _NumcodecsBytesBytesCodec(_NumcodecsCodec, BytesBytesCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
return await asyncio.to_thread(
as_numpy_array_wrapper,
self._codec.decode,
chunk_data,
chunk_spec.prototype,
)
def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
return as_numpy_array_wrapper(self._codec.decode, chunk_data, chunk_spec.prototype)

def _encode(self, chunk_data: Buffer, prototype: BufferPrototype) -> Buffer:
def _encode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
encoded = self._codec.encode(chunk_data.as_array_like())
if isinstance(encoded, np.ndarray): # Required for checksum codecs
return prototype.buffer.from_bytes(encoded.tobytes())
return prototype.buffer.from_bytes(encoded)
return chunk_spec.prototype.buffer.from_bytes(encoded.tobytes())
return chunk_spec.prototype.buffer.from_bytes(encoded)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)

async def _encode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
return await asyncio.to_thread(self._encode, chunk_data, chunk_spec.prototype)
return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)


class _NumcodecsArrayArrayCodec(_NumcodecsCodec, ArrayArrayCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

async def _decode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
def _decode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
chunk_ndarray = chunk_data.as_ndarray_like()
out = await asyncio.to_thread(self._codec.decode, chunk_ndarray)
out = self._codec.decode(chunk_ndarray)
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out.reshape(chunk_spec.shape))

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
chunk_ndarray = chunk_data.as_ndarray_like()
out = await asyncio.to_thread(self._codec.encode, chunk_ndarray)
out = self._codec.encode(chunk_ndarray)
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out)

async def _decode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)


class _NumcodecsArrayBytesCodec(_NumcodecsCodec, ArrayBytesCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
chunk_bytes = chunk_data.to_bytes()
out = await asyncio.to_thread(self._codec.decode, chunk_bytes)
out = self._codec.decode(chunk_bytes)
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out.reshape(chunk_spec.shape))

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
chunk_ndarray = chunk_data.as_ndarray_like()
out = await asyncio.to_thread(self._codec.encode, chunk_ndarray)
out = self._codec.encode(chunk_ndarray)
return chunk_spec.prototype.buffer.from_bytes(out)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)


# bytes-to-bytes codecs
class Blosc(_NumcodecsBytesBytesCodec, codec_name="blosc"):
Expand Down
Loading
Loading