1+ import time
2+ import redis
3+ from redis .exceptions import ConnectionError , TimeoutError
4+ import logging
5+
6+ logger = logging .getLogger (__name__ )
7+ import random
8+
9+ class _RedisWithRetry :
10+ """Lightweight proxy that wraps a redis.Redis instance.
11+
12+ Any callable attribute (e.g. get, set, blpop, xadd …) is executed with a
13+ retry loop that catches *ConnectionError* and *TimeoutError* from redis‑py
14+ and re‑issues the call after an exponential back‑off (base_delay × backoff^n)
15+ plus a small random jitter.
16+ """
17+
18+ RETRYABLE = (ConnectionError , TimeoutError )
19+
20+ def __init__ (self , client : redis .Redis , * ,
21+ max_attempts : int = 5 ,
22+ base_delay : float = 0.5 ,
23+ backoff : float = 2.0 ,
24+ jitter : float = 0.3 ):
25+ self ._client = client
26+ self ._max_attempts = max_attempts
27+ self ._base_delay = base_delay
28+ self ._backoff = backoff
29+ self ._jitter = jitter
30+
31+ # Forward non‑callable attrs (e.g. "connection_pool") directly ──────────
32+ def __getattr__ (self , name ):
33+ attr = getattr (self ._client , name )
34+ if not callable (attr ):
35+ return attr
36+
37+ # Wrap callable with retry loop
38+ def _wrapped (* args , ** kwargs ):
39+ attempt = 0
40+ delay = self ._base_delay
41+ while True :
42+ try :
43+ return attr (* args , ** kwargs )
44+ except self .RETRYABLE as exc :
45+ attempt += 1
46+ if attempt >= self ._max_attempts :
47+ logger .error (
48+ f"Redis command '{ name } ' failed after { attempt } attempts: { exc } " )
49+ raise
50+ sleep_for = delay + random .uniform (0 , self ._jitter )
51+ logger .warning (
52+ f"Redis '{ name } ' failed ({ exc .__class__ .__name__ } : { exc } ). "
53+ f"Retrying in { sleep_for :.2f} s (attempt { attempt } /{ self ._max_attempts } )" )
54+ time .sleep (sleep_for )
55+ delay *= self ._backoff
56+ return _wrapped
0 commit comments