Skip to content

iamsinghrajat/async-cache

Repository files navigation

async-cache

info:In-memory application layer cache
https://static.pepy.tech/personalized-badge/async-cache?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads

Installation

pip install async-cache

See full documentation at https://async-cache.readthedocs.io/

Core Usage: Function API for Microservices

Use AsyncCache for flexible caching:

from cache import AsyncCache

cache = AsyncCache(maxsize=1000, default_ttl=300)  # TTL in seconds

async def get_data(key):
    return await cache.get(
        key,
        loader=lambda: db_query(key),  # auto-caches on miss
    )

# Warmup hot keys at startup
await cache.warmup({"hot:key": lambda: preload_hot()})

# Metrics for observability
print(cache.get_metrics())  # hits, misses, size, hit_rate

Key Features & Examples

Thundering Herd Protection

Prevents duplicate work under concurrent load (e.g., popular keys). Without it, 100 misses = 100 DB hits; with it, = 1.

cache = AsyncCache()
async def loader():
    return await db_query()  # expensive
# 100 concurrent -> 1 loader call
results = await asyncio.gather(*[cache.get('key', loader=loader) for _ in range(100)])
DataLoader-Style Batching

Groups concurrent gets into one batch call (reduces DB load; configurable window/size).

async def batch_loader(keys):
    # one DB query for batch
    return {k: await db_batch_query(k) for k in keys}
# auto-groups within 5ms window
await asyncio.gather(
    cache.get(1, batch_loader=batch_loader),
    cache.get(2, batch_loader=batch_loader)
)
Cache Warmup

Preload at startup to avoid cold misses.

await cache.warmup({
    "user:1": lambda: load_user(1),
    "config:global": lambda: load_config(),
})
Metrics

Observability for hit rate, size, etc. (global or per-function).

metrics = cache.get_metrics()  # or func.get_metrics()
# {'hits': 950, 'misses': 50, 'size': 200, 'hit_rate': 0.95}
# Use for Prometheus/monitoring
TTL & Invalidation

Per-key control + size-based eviction.

await cache.set('key', value, ttl=60)  # override
await cache.delete('key')  # or func.invalidate_cache(args)
cache.clear()

Decorator Convenience

For simple/readable code (uses core API under the hood):

from cache import AsyncLRU, AsyncTTL

@AsyncLRU(maxsize=128)
async def func(*args):
    ...

@AsyncTTL(time_to_live=60, skip_args=1)  # e.g. skip 'self'
async def method(self, arg):
    ...

Agent Cache — AI Agent-Aware Caching

AgentCache extends async-cache into an AI-agent-aware caching layer with tool execution caching, resource-based invalidation, session scoping, and loop detection.

from agent_cache import AgentCache, AgentCacheInvalidator, AgentCacheSession

Read Tool Caching

Cache results of read-only agent tools with resource tagging and TTL:

@AgentCache(resource="cart", scope="global", ttl=60)
async def get_cart(user_id):
    return await db.fetch_cart(user_id)

# First call fetches from DB; subsequent calls return cached result
cart = await get_cart("user_1")
cart = await get_cart("user_1")  # cache hit — no DB call

Write/Mutation Invalidation

Automatically invalidate related cached reads when mutations occur:

@AgentCacheInvalidator(resource="cart", scope="global")
async def add_to_cart(user_id, item):
    await db.add_item(user_id, item)

await get_cart("user_1")       # cached
await add_to_cart("user_1", "laptop")  # invalidates all "cart" cache entries
await get_cart("user_1")       # re-fetched from DB

Session Scoping

Isolate cache state per agent session. Session-scoped caches are cleared when the session ends — different agents or users never share stale data:

@AgentCache(resource="ticket", scope="session")
async def get_ticket(ticket_id):
    return await api.get_ticket(ticket_id)

async with AgentCacheSession(session_id="agent-A") as session:
    await get_ticket("T-100")  # fetched and cached for this session
    await get_ticket("T-100")  # cache hit

# Session ended — cache cleared. A new session starts fresh.

Loop Detection

Detect and halt infinite agent tool loops — oscillations, recursive cycles, retry storms:

async with AgentCacheSession(
    loop_detection=True,
    max_tool_repeats=5,       # max identical (tool+args) calls
    max_execution_depth=50,   # max total tool calls
    on_loop="raise",          # "raise", "warn", or "short_circuit"
) as session:
    # If the agent enters A -> B -> A -> B, raises AgentLoopDetectedError
    # If a tool is called > 5 times with same args, raises
    # If total calls exceed 50, raises

Observability

Expose metrics at session and global level:

from agent_cache import get_metrics

async with AgentCacheSession() as session:
    # ... agent workflow ...
    print(session.get_metrics())
    # {'hits': 12, 'misses': 5, 'invalidations': 2, 'loop_detections': 0, 'hit_rate': 0.706}

print(get_metrics())  # global aggregate

Real-World Use Cases

1. E-commerce Shopping Agent

An AI agent assists users with online shopping: browsing products, managing carts, and checking out. Without caching, the agent redundantly fetches the same cart on every step. Without invalidation, stale cart data causes incorrect pricing.

@AgentCache(resource="cart", scope="global", ttl=120)
async def get_cart(user_id):
    return await shop_api.get_cart(user_id)

@AgentCache(resource="product", scope="global", ttl=300)
async def get_product(product_id):
    return await shop_api.get_product(product_id)

@AgentCacheInvalidator(resource="cart", scope="global")
async def add_to_cart(user_id, product_id):
    return await shop_api.add_to_cart(user_id, product_id)

@AgentCacheInvalidator(resource="cart", scope="global")
async def checkout(user_id):
    return await shop_api.checkout(user_id)

async with AgentCacheSession(loop_detection=False) as session:
    cart = await get_cart("user_1")        # DB fetch
    product = await get_product("laptop")  # DB fetch
    await add_to_cart("user_1", "laptop")  # invalidates cart cache
    cart = await get_cart("user_1")        # re-fetched (correct total)
    await checkout("user_1")               # invalidates cart cache

2. Customer Support Agent (Session-Scoped)

Multiple support agents handle tickets concurrently. Each agent's session caches are isolated — Agent A updating a ticket doesn't serve stale data to Agent B.

@AgentCache(resource="ticket", scope="session")
async def get_ticket(ticket_id):
    return await support_api.get_ticket(ticket_id)

@AgentCacheInvalidator(resource="ticket", scope="session")
async def update_ticket(ticket_id, status):
    return await support_api.update_ticket(ticket_id, status)

# Agent A
async with AgentCacheSession(session_id="agent-A"):
    ticket = await get_ticket("T-100")          # fetched
    await update_ticket("T-100", "in_progress") # invalidates session cache
    ticket = await get_ticket("T-100")          # re-fetched

# Agent B — independent session, no stale data from Agent A
async with AgentCacheSession(session_id="agent-B"):
    ticket = await get_ticket("T-100")  # fresh fetch

3. Research Agent — Preventing Search Loops

A research agent searches the web and synthesizes results. Without loop detection, a confused agent can endlessly repeat the same search or oscillate between search and summarize.

@AgentCache(resource="search", scope="global", ttl=600)
async def search_web(query):
    return await search_api.search(query)

@AgentCache(resource="page", scope="global", ttl=600)
async def fetch_page(url):
    return await http.get(url)

async with AgentCacheSession(
    max_tool_repeats=3,
    max_execution_depth=20,
    on_loop="raise",
) as session:
    results = await search_web("async python caching")
    results = await search_web("async python caching")  # cache hit
    # If the agent calls search_web("async python caching") 4 times
    # → AgentLoopDetectedError raised, halting the runaway loop

4. Inventory Management Agent — Oscillation Detection

An inventory agent checks stock and updates quantities. A buggy planning loop can oscillate between checking and updating endlessly.

@AgentCache(resource="stock", scope="global")
async def check_stock(item_id):
    return await inventory_api.get_stock(item_id)

@AgentCacheInvalidator(resource="stock", scope="global")
async def update_stock(item_id, delta):
    return await inventory_api.adjust(item_id, delta)

async with AgentCacheSession(max_tool_repeats=100, on_loop="raise") as session:
    await check_stock("SKU-42")
    await update_stock("SKU-42", -1)
    await check_stock("SKU-42")
    await update_stock("SKU-42", -1)
    # → AgentLoopDetectedError: Cycle detected: check_stock -> update_stock

5. Multi-API Orchestrator — Retry Storm Detection

An agent calling flaky external APIs can get stuck retrying. Loop detection catches retry storms before they exhaust rate limits or budgets.

@AgentCacheInvalidator(resource="payment", scope="global")
async def charge_payment(order_id, amount):
    return await payment_api.charge(order_id, amount)

async with AgentCacheSession(max_tool_repeats=3, on_loop="raise") as session:
    for attempt in range(10):
        try:
            await charge_payment("ORD-1", 99.99)
            break
        except PaymentError:
            continue  # retry
    # After 4th attempt → AgentLoopDetectedError stops the retry storm

Testing

Run the full test suite:

python -m unittest discover tests -v

A local test dashboard is also available for interactive testing:

python demo/app.py  # Runs on http://localhost:5001

Use it to verify caching behavior, metrics, and concurrent load handling.