
Expired SSD-tiered key returned by GET/MGET while TTL reports -2 #6874

@shahyash2609

Description

Describe the bug
When a string key is tiered to SSD (via --tiered_prefix) and its TTL expires while an SSD read for that key is in flight, GET/MGET returns the key's value even though TTL correctly reports -2 (key not found / expired). The expired value leaks to the client.

To Reproduce

  1. Start Dragonfly with tiering and a memory limit that forces offloading
    ./dragonfly \
      --tiered_prefix=/mnt/ssd/df_tier \
      --maxmemory=4G \
      --tiered_offload_threshold=0.9 \
      --tiered_upload_threshold=0.25 \
      --cache_mode=true

  2. Fill ~80% of maxmemory so offloading stays active and uploads are suppressed
    (see repro script below)

  3. SET a key with a short TTL and a value large enough to be tiered (>= 64 B)
    SET mykey "XXX...X" PX 3000   # value: 3000 bytes of "X"

  4. Wait until the key is tiered (DEBUG OBJECT mykey shows spill_len:)

  5. Issue GET ~50 ms before expiry (SSD read starts, tx hop ends)
    GET mykey # returns value ← correct at this point

  6. Background expiry fires (every 10 ms, hz=100), key is deleted from prime table

  7. Query TTL after expiry — returns -2 (key gone)
    TTL mykey # -2 ← key is expired

  8. GET again — should return nil, but returns the value
    GET mykey # "XXX..." ← BUG: expired value returned

Repro script (reproduces the bug for ~24% of keys on the first iteration with the config above):

#!/usr/bin/env python3
"""
Reproduce DragonflyDB tiered-storage TTL race condition.
BUG: GET returns a value for a key that TTL reports as -2 (expired).
"""
import redis, threading, time, sys

HOST = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"
PORT = int(sys.argv[2]) if len(sys.argv) > 2 else 6379

VALUE_SIZE            = 3_000
BATCH_SIZE            = 500
TTL_MS                = 3_000
FIRE_BEFORE_EXPIRY_MS = 50

def make_client():
    return redis.Redis(host=HOST, port=PORT, socket_timeout=10,
                       socket_connect_timeout=5, decode_responses=False)

def get_tiered_entries(r):
    return int(r.info("all").get("tiered_entries", 0))

def is_key_tiered(r, key):
    try:
        out = r.execute_command("DEBUG", "OBJECT", key)
        return b"spill_len:" in out
    except Exception:
        return False

def fill_memory(r, maxmemory_bytes, offload_threshold, upload_threshold):
    fill_ratio = min(1.0 - min(offload_threshold, upload_threshold) + 0.05, 0.95)
    num_keys   = int(fill_ratio * maxmemory_bytes) // VALUE_SIZE
    pipe = r.pipeline(transaction=False)
    for i in range(num_keys):
        pipe.set(f"filler:{i}", b"F" * VALUE_SIZE)
        if (i + 1) % 5_000 == 0:
            pipe.execute(); pipe = r.pipeline(transaction=False)
    pipe.execute()
    deadline = time.monotonic() + 90
    while time.monotonic() < deadline:
        if get_tiered_entries(r) >= 1_000: return True
        time.sleep(2)
    return False

def write_test_keys(r, prefix, count, ttl_ms, value):
    pipe = r.pipeline(transaction=False)
    for i in range(count): pipe.set(f"{prefix}:{i}", value, px=ttl_ms)
    pipe.execute()

def wait_for_test_keys_tiered(r, prefix, count, timeout_s=40):
    sample = [f"{prefix}:{i}" for i in range(0, count, max(1, count // 20))]
    needed = max(1, int(len(sample) * 0.8))
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if sum(1 for k in sample if is_key_tiered(r, k)) >= needed: return True
        time.sleep(1)
    return False

def run_iteration(prefix, n, get_conns):
    r     = make_client()
    value = (f"TESTVAL{n}_" * 400)[:VALUE_SIZE].encode()  # exactly VALUE_SIZE bytes
    write_test_keys(r, prefix, BATCH_SIZE, TTL_MS, value)
    write_at = time.monotonic()
    if not wait_for_test_keys_tiered(r, prefix, BATCH_SIZE): return 0

    elapsed_ms = (time.monotonic() - write_at) * 1_000
    sleep_ms   = TTL_MS - elapsed_ms - FIRE_BEFORE_EXPIRY_MS
    if sleep_ms > 0: time.sleep(sleep_ms / 1_000)

    get_results = {}; lock = threading.Lock()
    def do_get(key, conn):
        val = conn.get(key)
        with lock: get_results[key] = val

    threads = [threading.Thread(target=do_get, args=(f"{prefix}:{i}", get_conns[i]),
                                daemon=True) for i in range(BATCH_SIZE)]
    for t in threads: t.start()
    for t in threads: t.join()

    remaining = (write_at + TTL_MS / 1_000) - time.monotonic() + 0.1
    if remaining > 0: time.sleep(remaining)

    keys = [f"{prefix}:{i}" for i in range(BATCH_SIZE)]
    pipe = r.pipeline(transaction=False)
    for k in keys: pipe.ttl(k)
    ttls = dict(zip(keys, pipe.execute()))

    bugs = [(k, get_results.get(k)) for k in keys
            if ttls[k] == -2 and get_results.get(k) is not None]
    for k, v in bugs[:5]:
        print(f"  BUG: key={k}  TTL=-2  GET={len(v)}B  preview={v[:40]}")
    return len(bugs)

def main():
    r = make_client()
    maxmem           = int(r.config_get("maxmemory").get("maxmemory", 0))
    offload_threshold = float(r.config_get("tiered_offload_threshold").get("tiered_offload_threshold", 0.5))
    upload_threshold  = float(r.config_get("tiered_upload_threshold").get("tiered_upload_threshold", 0.1))
    if not fill_memory(r, maxmem, offload_threshold, upload_threshold):
        sys.exit("Tiering not active — check --tiered_prefix and --maxmemory")
    get_conns = [make_client() for _ in range(BATCH_SIZE)]
    total = 0
    for it in range(1, 6):
        total += run_iteration(f"race:{it}", it, get_conns)
        if total >= 5: break
    print(f"\nBug reproduced {total} time(s)" if total else "Not reproduced")

if __name__ == "__main__":
    main()

Expected behavior
GET/MGET should return nil (null) for a key whose TTL has expired, consistent with what TTL reports. An expired key must never return a value.

Environment (please complete the following information):

  • OS: Ubuntu 20.04
  • Kernel: Linux 6.14.0-1021-gcp
  • Containerized: No (bare metal)
  • Dragonfly Version: v1.36.0
  • Storage: NVMe local SSD at --tiered_prefix path

Reproducible Code Snippet

The bug is a race condition in src/server/tiering/op_manager.cc, ProcessRead().

Timeline:
  T-50ms   GET fires → FindReadOnly: key valid → SSD read submitted to io_uring;
           transaction hop ends, shard thread is now free
  T+0ms    background heartbeat (every 10 ms, hz=100) runs on the shard thread:
           ExpireIfNeeded → key expired → PerformDeletionAtomic
           → TieredStorage::Delete → OpManager::DeleteOffloaded
           → sets ko.deleting = true on the pending read entry
  T+1ms    io_uring callback fires → ProcessRead():
           ko.deleting == true BUT the read callbacks are still invoked ← BUG
           → TResult resolves with the value from SSD
           → GET returns the expired value to the client
  T+100ms  TTL check → -2 (key absent from prime table)
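The interleaving above can be modeled with a small, self-contained sketch; `PendingRead`, `process_read_buggy`, and `process_read_fixed` are illustrative stand-ins, not Dragonfly's actual types:

```python
# Toy model of the race: callbacks attached to a pending SSD read fire even
# though the key was deleted (ko.deleting set) while the I/O was in flight.
class PendingRead:
    def __init__(self):
        self.deleting = False   # set by the delete path while the read is pending
        self.read_cbs = []      # waiters blocked on this read

def process_read_buggy(ko, value):
    # mirrors the unconditional callback loop in ProcessRead()
    for cb in ko.read_cbs:
        cb(value)               # delivered even when ko.deleting is True

def process_read_fixed(ko, value):
    if ko.deleting:
        for cb in ko.read_cbs:
            cb(None)            # cancellation -> waiter replies nil
    else:
        for cb in ko.read_cbs:
            cb(value)

results = []
ko = PendingRead()
ko.read_cbs.append(results.append)
ko.deleting = True              # expiry swept the key mid-read
process_read_buggy(ko, b"stale")
process_read_fixed(ko, b"stale")
print(results)                  # [b'stale', None]: buggy leaks, fixed returns nil
```

The toy model only captures the ordering; in the real code the callback argument is a decoder / error variant, not a raw value.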

Buggy code (src/server/tiering/op_manager.cc, ProcessRead(), ~line 154):

  if (page) {
      size_t offset = ko.segment.offset - info->segment.offset;
      ko.decoder->Initialize(page->substr(offset, ko.segment.length));
      for (auto& cb : ko.read_cbs)
          cb(&*ko.decoder);   // ← called unconditionally even when ko.deleting == true
  }

Proposed fix: when ko.deleting == true and the I/O succeeded, signal the callbacks with a cancellation error instead of delivering the data. The waiter (GetReplies::Send) should then return nil to the client.

  if (page) {
      size_t offset = ko.segment.offset - info->segment.offset;
      ko.decoder->Initialize(page->substr(offset, ko.segment.length));
      if (!ko.deleting) {
          for (auto& cb : ko.read_cbs)
              cb(&*ko.decoder);
      } else {
          // Key was deleted while SSD read was in-flight; unblock waiters with an error
          for (auto& cb : ko.read_cbs)
              cb(nonstd::make_unexpected(
                  std::make_error_code(std::errc::operation_canceled)));
      }
  }

Additionally, GetReplies::Send(TResultOrT&&) in src/server/string_family.cc (~line 543) should treat operation_canceled as a null reply rather than surfacing it as an error string:

  // current
  else {
      if (null_on_io_error_) rb->SendNull();
      else Send(iores.error().message());   // sends error text as bulk string — also wrong
  }

  // proposed
  else {
      if (null_on_io_error_ ||
          iores.error() == std::errc::operation_canceled)
          rb->SendNull();          // key expired mid-read → correct Redis null
      else
          rb->SendError(kTieredIoError);   // real I/O failure → proper Redis error
  }

Additional context

  • Only affects keys stored in tiered (SSD) storage (pv.IsExternal() == true). In-memory keys are not affected.
  • Hit rate is high: 123/500 keys (24.6%) in one iteration with --hz=100 and NVMe SSD, because the background expiry heartbeat fires every 10 ms while SSD I/O takes 0.1–2 ms — many in-flight reads overlap with an expiry cycle.
  • The ko.deleting flag was correctly introduced to prevent re-using the freed disk segment, but the read callbacks were never gated on it.
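As a rough plausibility check on that hit rate (numbers assumed from this report; not a measurement), a read of duration d ms starting at a uniformly random phase relative to a fixed 10 ms sweep straddles a sweep boundary with probability d/10:

```python
# Back-of-envelope overlap estimate under the assumptions above.
read_lo_ms, read_hi_ms = 0.1, 2.0   # observed NVMe read latency range
sweep_ms = 10.0                     # heartbeat period at --hz=100
mean_read_ms = (read_lo_ms + read_hi_ms) / 2
p_overlap = mean_read_ms / sweep_ms
print(round(p_overlap, 3))          # 0.105
```

This is only a light-load lower bound: with 500 simultaneous GETs queued on one device, read latencies stretch well past 2 ms, which is plausibly consistent with the higher observed 24.6%.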
