Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/4040.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`zarr.experimental.cache_store.CacheStore` now performs negative caching by default (`cache_missing=True`, opt-out). A full-key read that finds the key absent in the source store is remembered, so repeat reads of that absent key return immediately without a source round-trip — useful for sparse arrays where most chunks resolve to the fill value. Remembered misses respect `max_age_seconds` and are evicted when the key is written via `set`/`set_if_not_exists`. Negative-cache activity is reported as `negative_hits` in `cache_stats()` and `missing_keys` in `cache_info()`. Only full-key reads are affected (not byte-range reads or `exists`). Pass `cache_missing=False` to restore the previous behavior. Like the positive cache (unbounded when `max_size is None`), the negative cache is bounded only by `max_age_seconds`; set a finite TTL for scans over very large sparse key spaces.
110 changes: 108 additions & 2 deletions src/zarr/experimental/cache_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ class _CacheState:
hits: int = 0
misses: int = 0
evictions: int = 0
negative_hits: int = 0
key_insert_times: dict[_CacheEntryKey, float] = field(default_factory=dict)
range_cache: dict[str, dict[ByteRequest, Buffer]] = field(default_factory=dict)
# Negative cache: full keys known to be absent in the source store, mapped to their
# (monotonic) insertion time. Used to short-circuit repeat reads of absent keys.
# Entries carry no data, so they are kept out of the byte-size accounting above;
# staleness is bounded by ``max_age_seconds``.
missing_keys: dict[str, float] = field(default_factory=dict)


class CacheStore(WrapperStore[Store]):
Expand Down Expand Up @@ -62,6 +68,26 @@ class CacheStore(WrapperStore[Store]):
Note: Individual values larger than max_size will not be cached.
cache_set_data : bool, optional
Whether to cache data when it's written to the store. Default is True.
cache_missing : bool, optional
Whether to remember full-key misses (negative caching). When True, a full-key
``get`` that finds the key absent in the source store records that absence, so
subsequent ``get``s for the same key return ``None`` without a source round-trip.
This benefits repeated reads of sparse arrays (most chunks absent). Negative
entries respect ``max_age_seconds`` and are evicted when the key is written
(``set``/``set_if_not_exists``). Only full-key reads are affected (not byte-range
reads or ``exists``). Default is True.

Notes:

- With ``max_age_seconds="infinity"`` (the default) a remembered miss never
expires, so a key written to the source by another process stays invisible
through this cache. Pair ``cache_missing=True`` with a finite
``max_age_seconds`` if the source may be written concurrently.
- Like the positive cache (which is unbounded when ``max_size is None``), the
negative cache is bounded only by ``max_age_seconds``. With an infinite TTL,
a scan over a very large sparse key space will accumulate one small entry per
absent key. Set a finite ``max_age_seconds`` (or ``cache_missing=False``) for
such workloads.

Examples
--------
Expand Down Expand Up @@ -91,6 +117,7 @@ class CacheStore(WrapperStore[Store]):
max_age_seconds: int | Literal["infinity"]
max_size: int | None
cache_set_data: bool
cache_missing: bool
_state: _CacheState

def __init__(
Expand All @@ -101,6 +128,7 @@ def __init__(
max_age_seconds: int | str = "infinity",
max_size: int | None = None,
cache_set_data: bool = True,
cache_missing: bool = True,
) -> None:
super().__init__(store)

Expand All @@ -121,6 +149,7 @@ def __init__(
self.max_age_seconds = max_age_seconds
self.max_size = max_size
self.cache_set_data = cache_set_data
self.cache_missing = cache_missing
self._state = _CacheState()

def _with_store(self, store: Store) -> Self:
Expand All @@ -136,6 +165,7 @@ def with_read_only(self, read_only: bool = False) -> Self:
max_age_seconds=self.max_age_seconds,
max_size=self.max_size,
cache_set_data=self.cache_set_data,
cache_missing=self.cache_missing,
)
store._state = self._state
return store
Expand All @@ -151,6 +181,31 @@ def _is_key_fresh(self, entry_key: _CacheEntryKey) -> bool:
elapsed = now - self._state.key_insert_times.get(entry_key, 0)
return elapsed < self.max_age_seconds

def _is_missing_fresh(self, key: str) -> bool:
"""Check if a negative (missing-key) entry is still fresh.

Mirrors ``_is_key_fresh`` but reads the negative-cache insertion time.
"""
if self.max_age_seconds == "infinity":
return True
elapsed = time.monotonic() - self._state.missing_keys.get(key, 0.0)
return elapsed < self.max_age_seconds

def _record_missing(self, key: str) -> None:
"""Record *key* as known-missing (absent in the source store).

Must be called while holding ``self._state.lock``. Staleness is bounded by
``max_age_seconds`` via ``_is_missing_fresh``.
"""
self._state.missing_keys[key] = time.monotonic()

def _evict_missing(self, key: str) -> None:
"""Drop any negative entry for *key* (it is now present or being written).

Must be called while holding ``self._state.lock``.
"""
self._state.missing_keys.pop(key, None)

async def _accommodate_value(self, value_size: int) -> None:
"""Ensure there is enough space in the cache for a new value.

Expand Down Expand Up @@ -266,6 +321,10 @@ async def _cache_miss(
await self._cache.delete(key)
async with self._state.lock:
self._remove_from_tracking(key)
# The key is absent in the source: remember the miss so a repeat
# read can short-circuit without a source round-trip.
if self.cache_missing:
self._record_missing(key)
else:
entry_key: _CacheEntryKey = (key, byte_range)
async with self._state.lock:
Expand All @@ -279,6 +338,10 @@ async def _cache_miss(
if byte_range is None:
await self._cache.set(key, result)
await self._track_entry(key, result)
# A value now exists for this key: drop any stale negative entry.
if self.cache_missing:
async with self._state.lock:
self._evict_missing(key)
else:
entry_key = (key, byte_range)
self._state.range_cache.setdefault(key, {})[byte_range] = result
Expand Down Expand Up @@ -351,6 +414,16 @@ async def get(
Buffer | None
The retrieved data, or None if not found
"""
# Negative cache fast-path (full-key reads only): a fresh "known absent" record
# short-circuits to None without consulting the positive cache or the source.
# Checked here, before the positive-entry freshness gate, because a negative-only
# key has no positive entry and would otherwise be routed straight to the source.
if self.cache_missing and byte_range is None:
async with self._state.lock:
if key in self._state.missing_keys and self._is_missing_fresh(key):
self._state.negative_hits += 1
return None

entry_key: _CacheEntryKey = (key, byte_range) if byte_range is not None else key
if not self._is_key_fresh(entry_key):
return await self._get_no_cache(key, prototype, byte_range)
Expand All @@ -369,9 +442,12 @@ async def set(self, key: str, value: Buffer) -> None:
The data to store
"""
await super().set(key, value)
# Invalidate all cached byte-range entries (source data changed)
# Invalidate all cached byte-range entries (source data changed) and drop any
# negative entry — the key now has a value.
async with self._state.lock:
self._invalidate_range_entries(key)
if self.cache_missing:
self._evict_missing(key)
if self.cache_set_data:
await self._cache.set(key, value)
await self._track_entry(key, value)
Expand All @@ -380,6 +456,26 @@ async def set(self, key: str, value: Buffer) -> None:
async with self._state.lock:
self._remove_from_tracking(key)

async def set_if_not_exists(self, key: str, value: Buffer) -> None:
"""
Store data only if the key does not already exist in the source store.

Parameters
----------
key : str
The key to store under
value : Buffer
The data to store
"""
await super().set_if_not_exists(key, value)
# Whether or not the write happened, any negative entry is now unsafe: either
# we just wrote the key, or it already existed (so the record was already
# wrong). Evicting unconditionally is always safe. We do not populate the
# positive cache here — there is no guaranteed-fresh value to store.
if self.cache_missing:
async with self._state.lock:
self._evict_missing(key)

async def delete(self, key: str) -> None:
"""
Delete data from both the underlying store and cache.
Expand Down Expand Up @@ -407,18 +503,26 @@ def cache_info(self) -> dict[str, Any]:
"max_size": self.max_size,
"current_size": self._state.current_size,
"cache_set_data": self.cache_set_data,
"cache_missing": self.cache_missing,
"tracked_keys": len(self._state.key_insert_times),
"cached_keys": len(self._state.cache_order),
"missing_keys": len(self._state.missing_keys),
}

def cache_stats(self) -> dict[str, Any]:
"""Return cache performance statistics."""
"""Return cache performance statistics.

``hit_rate`` reflects positive-cache hits over positive lookups only; a
negative-cache hit (an absent key served from the negative cache) is reported
separately as ``negative_hits`` and is counted as neither a hit nor a miss.
"""
total_requests = self._state.hits + self._state.misses
hit_rate = self._state.hits / total_requests if total_requests > 0 else 0.0
return {
"hits": self._state.hits,
"misses": self._state.misses,
"evictions": self._state.evictions,
"negative_hits": self._state.negative_hits,
"total_requests": total_requests,
"hit_rate": hit_rate,
}
Expand All @@ -435,7 +539,9 @@ async def clear_cache(self) -> None:
self._state.cache_order.clear()
self._state.key_sizes.clear()
self._state.range_cache.clear()
self._state.missing_keys.clear()
self._state.current_size = 0
self._state.negative_hits = 0

def __repr__(self) -> str:
"""Return string representation of the cache store."""
Expand Down
130 changes: 130 additions & 0 deletions tests/test_experimental/test_cache_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,10 @@ async def test_cache_info(self, cached_store: CacheStore) -> None:
"max_size",
"current_size",
"cache_set_data",
"cache_missing",
"tracked_keys",
"cached_keys",
"missing_keys",
}
assert set(info.keys()) == expected_keys

Expand Down Expand Up @@ -1047,3 +1049,131 @@ async def test_delete_invalidates_cached_byte_ranges(self) -> None:
# Key is gone from source
result = await cached_store.get("key", proto)
assert result is None


class TestCacheStoreNegativeCaching:
"""Tests for opt-in negative (missing-key) caching (``cache_missing=True``)."""

async def test_basic(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""A second get of an absent key is served from the negative cache without a
source round-trip."""
source = MemoryStore()
cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True)
proto = default_buffer_prototype()

calls = {"n": 0}
orig_get = source.get

async def counting_get(*args: object, **kwargs: object) -> object:
calls["n"] += 1
return await orig_get(*args, **kwargs) # type: ignore[arg-type]

monkeypatch.setattr(source, "get", counting_get)

assert await cs.get("c/0", proto) is None
assert cs.cache_info()["missing_keys"] == 1
after_first = calls["n"]

assert await cs.get("c/0", proto) is None
assert calls["n"] == after_first # no further source access
assert cs.cache_stats()["negative_hits"] == 1

async def test_enabled_by_default(self) -> None:
"""Negative caching is on by default (opt-out)."""
cs = CacheStore(MemoryStore(), cache_store=MemoryStore())
proto = default_buffer_prototype()
assert cs.cache_missing is True
assert await cs.get("c/0", proto) is None
assert await cs.get("c/0", proto) is None
assert cs.cache_info()["missing_keys"] == 1
assert cs.cache_stats()["negative_hits"] == 1

async def test_can_be_disabled(self) -> None:
"""With ``cache_missing=False`` nothing is remembered."""
cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=False)
proto = default_buffer_prototype()
assert await cs.get("c/0", proto) is None
assert await cs.get("c/0", proto) is None
assert cs.cache_info()["missing_keys"] == 0
assert cs.cache_stats()["negative_hits"] == 0

async def test_evicted_on_set(self) -> None:
source = MemoryStore()
cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True)
proto = default_buffer_prototype()
assert await cs.get("c/0", proto) is None
assert cs.cache_info()["missing_keys"] == 1

await cs.set("c/0", CPUBuffer.from_bytes(b"value"))
assert cs.cache_info()["missing_keys"] == 0
result = await cs.get("c/0", proto)
assert result is not None
assert result.to_bytes() == b"value"

async def test_evicted_on_set_if_not_exists(self) -> None:
source = MemoryStore()
cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True)
proto = default_buffer_prototype()
assert await cs.get("c/0", proto) is None
assert cs.cache_info()["missing_keys"] == 1

await cs.set_if_not_exists("c/0", CPUBuffer.from_bytes(b"value"))
assert cs.cache_info()["missing_keys"] == 0
result = await cs.get("c/0", proto)
assert result is not None
assert result.to_bytes() == b"value"

async def test_respects_ttl(self) -> None:
"""A negative entry expires after ``max_age_seconds`` so a key written to the
source out-of-band becomes visible again."""
source = MemoryStore()
cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True, max_age_seconds=1)
proto = default_buffer_prototype()
assert await cs.get("c/0", proto) is None

# an external writer adds the key directly to the source store
await source.set("c/0", CPUBuffer.from_bytes(b"late"))

# before TTL: still reported missing from the negative cache
assert await cs.get("c/0", proto) is None
await asyncio.sleep(1.1)

# after TTL: the stale negative entry is bypassed, source is consulted
result = await cs.get("c/0", proto)
assert result is not None
assert result.to_bytes() == b"late"
assert cs.cache_info()["missing_keys"] == 0

async def test_byte_range_unaffected(self) -> None:
"""Byte-range misses do not populate the negative cache."""
cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=True)
proto = default_buffer_prototype()
assert await cs.get("c/0", proto, byte_range=RangeByteRequest(0, 4)) is None
assert cs.cache_info()["missing_keys"] == 0

async def test_stats_and_info(self) -> None:
"""``negative_hits``/``missing_keys``/``cache_missing`` are surfaced and the
positive ``hit_rate`` is unaffected by negative hits."""
source = MemoryStore()
cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True)
proto = default_buffer_prototype()

await cs.set("present", CPUBuffer.from_bytes(b"x"))
assert (await cs.get("present", proto)) is not None # positive hit
assert await cs.get("absent", proto) is None # records miss
assert await cs.get("absent", proto) is None # negative hit

info = cs.cache_info()
stats = cs.cache_stats()
assert info["cache_missing"] is True
assert info["missing_keys"] == 1
assert stats["negative_hits"] == 1
assert stats["hits"] == 1
assert stats["misses"] == 1 # negative hit counts as neither hit nor miss
assert stats["hit_rate"] == 0.5

async def test_delete_does_not_record(self) -> None:
"""Deleting a key does not create a negative entry (deletion != checked-absent)."""
cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=True)
await cs.delete("c/0")
assert cs.cache_info()["missing_keys"] == 0
Loading