Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/critical/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def pytest_runtest_setup(item):
or "memcached_backend" in item.nodeid
or "secure_env_fallback" in item.nodeid
or "local_cache_works" in item.nodeid
or "backpressure_load_control" in item.nodeid
)
if skip_redis:
# Remove autouse redis fixtures for tests that don't need Redis
Expand Down
48 changes: 34 additions & 14 deletions tests/critical/test_backpressure_load_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pytest

from cachekit.reliability.load_control import BackpressureController
from tests.utils.timing_helper import ThreadGate, TimingHelper

pytestmark = pytest.mark.critical

Expand All @@ -38,7 +39,7 @@ def slow_operation():
active_count["value"] += 1
max_observed["value"] = max(max_observed["value"], active_count["value"])

time.sleep(0.1) # Hold permit for 100ms
time.sleep(0.1) # Hold permit for 100ms (simulates real work)

with lock:
active_count["value"] -= 1
Expand All @@ -61,14 +62,17 @@ def test_request_rejected_when_queue_full(self):
timeout=10.0, # Long timeout (we won't wait)
)

gate = ThreadGate()

# Block the single execution slot
lock = threading.Lock()
lock.acquire() # Held until we release it
execution_lock = threading.Lock()
execution_lock.acquire() # Held until we release it

def blocking_operation():
"""Operation that blocks until lock is released."""
with controller.acquire():
with lock: # This will block
gate.signal("acquired")
with execution_lock: # This will block
pass

def quick_operation():
Expand All @@ -79,7 +83,7 @@ def quick_operation():
# Start the blocking operation in a thread
blocking_thread = threading.Thread(target=blocking_operation)
blocking_thread.start()
time.sleep(0.05) # Let it acquire the execution permit
gate.wait("acquired")

# Now the execution slot is occupied, queue should start filling
# Try to launch more operations than queue_size
Expand All @@ -103,7 +107,7 @@ def try_operation():
t.join(timeout=1.0)

# Release the blocking operation
lock.release()
execution_lock.release()
blocking_thread.join(timeout=1.0)

# Should have rejected the operations beyond queue capacity
Expand All @@ -117,19 +121,22 @@ def test_fast_fail_when_overloaded(self):
timeout=0.01, # Very short timeout for fast fail
)

gate = ThreadGate()

# Occupy the execution slot
execution_lock = threading.Lock()
execution_lock.acquire()

def blocking_op():
with controller.acquire():
gate.signal("acquired")
with execution_lock:
pass

# Start blocking operation
thread = threading.Thread(target=blocking_op)
thread.start()
time.sleep(0.05) # Let it acquire
gate.wait("acquired")

# Now queue is full and execution is blocked
# New request should fail fast
Expand Down Expand Up @@ -176,7 +183,12 @@ def queued_operation():
for t in threads:
t.start()

time.sleep(0.1) # Let them queue up
# Wait for threads to queue up (deterministic polling)
TimingHelper.wait_for_condition(
lambda: controller.queue_depth > 0,
timeout=2.0,
message="Should have queued requests",
)

# Queue depth should be >0 (threads are waiting)
current_depth = controller.queue_depth
Expand All @@ -199,18 +211,21 @@ def test_rejected_count_increments_correctly(self):
timeout=0.01,
)

gate = ThreadGate()

# Block the execution slot
execution_lock = threading.Lock()
execution_lock.acquire()

def blocking_op():
with controller.acquire():
gate.signal("acquired")
with execution_lock:
pass

thread = threading.Thread(target=blocking_op)
thread.start()
time.sleep(0.05)
gate.wait("acquired")

initial_rejected = controller.rejected_count

Expand Down Expand Up @@ -326,13 +341,15 @@ def queued_op():
for t in threads:
t.start()

time.sleep(0.1) # Let them queue
# Wait for threads to queue (deterministic polling)
TimingHelper.wait_for_condition(
lambda: controller.queue_depth > 0,
timeout=2.0,
message="Should have queued requests",
)

stats = controller.get_stats()
assert stats["queue_depth"] > 0, "Should show queued requests"
# Health should be False if queue_depth >= 80% of queue_size
# 90 operations, 10 concurrent = 80 in queue -> 80% threshold
# This might be True or False depending on exact timing

# Cleanup
execution_lock.release()
Expand Down Expand Up @@ -402,18 +419,21 @@ def test_reset_stats_clears_rejected_count(self):
"""CRITICAL: reset_stats() properly clears monitoring counters."""
controller = BackpressureController(max_concurrent=1, queue_size=1, timeout=0.01)

gate = ThreadGate()

# Block execution
execution_lock = threading.Lock()
execution_lock.acquire()

def blocking_op():
with controller.acquire():
gate.signal("acquired")
with execution_lock:
pass

thread = threading.Thread(target=blocking_op)
thread.start()
time.sleep(0.05)
gate.wait("acquired")

# Generate rejections
from cachekit.backends.errors import BackendError
Expand Down
Loading
Loading