Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions app/src/pages/Agents.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,40 @@ export default function Agents() {
if (d.connected && d.emails?.length) { setGmailConn(true); setGmailData(d.emails) }
}),
fetchGithubActivity().then(d => {
if (d.connected && d.items?.length) { setGithubConn(true); setGithubData(d.items) }
if (d.connected && d.items?.length) {
setGithubConn(true); setGithubData(d.items)
// Reset filters on the mock → real transition so a leftover filter
// doesn't strand the user on an empty view of the new dataset.
setGhCategory('all'); setGhRepo('all')
}
}),
]).finally(() => setFeedLoading(false))
}, [])

// Background auto-refresh — pulls fresh data from the backend cache every
// 120s so the feed reflects upstream changes without a manual reload.
// Pairs with the server-side feed_poller that keeps the cache warm; the
// two intervals are intentionally desynced (each browser starts its phase
// from page-mount, the backend from process boot — the 180s cache TTL
// absorbs the drift). We deliberately do NOT set feedLoading on tick to
// avoid the spinner flicker, and we do NOT reset filters so the user's
// active selection survives the swap.
useEffect(() => {
if (!isLoggedIn()) return
const id = setInterval(() => {
fetchSlackMessages().then(d => {
if (d.connected) { setSlackConn(true); setSlackData(d.messages) }
})
fetchGmailMessages().then(d => {
if (d.connected) { setGmailConn(true); setGmailData(d.emails) }
})
fetchGithubActivity().then(d => {
if (d.connected) { setGithubConn(true); setGithubData(d.items) }
})
}, 120_000)
return () => clearInterval(id)
}, [])

// OAuth redirect params — also flip the active tab to whichever tool the
// user just connected, so they land on its feed instead of the Slack default.
useEffect(() => {
Expand All @@ -197,7 +226,12 @@ export default function Agents() {
}
if (p.get('github_connected')) {
setTab('github')
fetchGithubActivity().then(d => { if (d.connected) { setGithubConn(true); setGithubData(d.items) } })
fetchGithubActivity().then(d => {
if (d.connected) {
setGithubConn(true); setGithubData(d.items)
setGhCategory('all'); setGhRepo('all')
}
})
}
if (p.toString()) window.history.replaceState({}, '', '/agents')
}, [])
Expand All @@ -215,9 +249,9 @@ export default function Agents() {
// ── GitHub filter state ──────────────────
const [ghCategory, setGhCategory] = useState<GhFilter>('all')
const [ghRepo, setGhRepo] = useState<'all' | string>('all')
// Reset filters when the underlying data refreshes — avoids stranding the
// user on a filter that matches nothing.
useEffect(() => { setGhCategory('all'); setGhRepo('all') }, [githubData])
// Filters reset at call sites where the dataset shape genuinely changes
// (mount fetch, OAuth callback, disconnect-to-mock). Auto-refresh ticks
// do NOT reset — the user keeps their active filter across the swap.

const filteredGithub = useMemo(() => githubData.filter(it => {
if (ghCategory !== 'all' && (it.category ?? 'other') !== ghCategory) return false
Expand Down Expand Up @@ -260,7 +294,12 @@ export default function Agents() {
await disconnectService(tab as 'slack' | 'gmail' | 'github')
if (tab === 'slack') { setSlackConn(false); setSlackData(MOCK_SLACK) }
if (tab === 'gmail') { setGmailConn(false); setGmailData(MOCK_GMAIL) }
if (tab === 'github') { setGithubConn(false); setGithubData(MOCK_GITHUB) }
if (tab === 'github') {
setGithubConn(false); setGithubData(MOCK_GITHUB)
// Disconnect swaps real data for mock — filter selections from the
// real dataset (real repo names) will not match anything in mock.
setGhCategory('all'); setGhRepo('all')
}
}

async function handlePostDigest() {
Expand Down
5 changes: 5 additions & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,8 @@ RATE_LIMIT_ENABLED=true
# Set to "false" to skip starting the Phase C PR watcher on app startup.
# Useful if you're iterating on routers and don't want background dispatches.
PR_WATCHER_ENABLED=true

# Set to "false" to skip starting the Signal Feed poller (keeps the Slack/
# Gmail/GitHub cache warm). Disable when you want feed fetches to always be
# live against the upstream APIs.
FEED_POLLER_ENABLED=true
28 changes: 19 additions & 9 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
users, agents, gateway, credentials, tasks, roles, auth, chat,
watched_repos,
)
from app.services.feed_poller import FeedPoller
from app.services.pr_watcher import PRWatcher

logging.basicConfig(
Expand All @@ -26,23 +27,32 @@

@asynccontextmanager
async def lifespan(app: FastAPI):
"""Start the PR watcher background task; cancel it cleanly on shutdown.
"""Start the platform's background workers and cancel them cleanly on
shutdown. Each worker is individually env-gated so the test suite (and
ad-hoc debug runs) can disable them; defaults are all enabled.

Disabled when PR_WATCHER_ENABLED=false (test suites, ad-hoc debug runs);
otherwise on by default.
Workers:
- PRWatcher — autonomous PR review trigger (PR_WATCHER_ENABLED)
- FeedPoller — keeps Signal Feed cache warm (FEED_POLLER_ENABLED)
"""
watcher_task: asyncio.Task | None = None
background_tasks: list[asyncio.Task] = []

if os.getenv("PR_WATCHER_ENABLED", "true").lower() != "false":
watcher = PRWatcher()
watcher_task = asyncio.create_task(watcher.run_forever())
background_tasks.append(asyncio.create_task(PRWatcher().run_forever()))
if os.getenv("FEED_POLLER_ENABLED", "true").lower() != "false":
background_tasks.append(asyncio.create_task(FeedPoller().run_forever()))

try:
yield
finally:
if watcher_task is not None:
watcher_task.cancel()
# Cancel all tasks first, then await each — order matters: if we
# awaited inside the cancel loop the second worker would run for a
# tick or two after the first was already gone.
for task in background_tasks:
task.cancel()
for task in background_tasks:
try:
await watcher_task
await task
except asyncio.CancelledError:
pass

Expand Down
15 changes: 15 additions & 0 deletions backend/app/models/credential.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ def list_by_user(user_id: str) -> list[dict]:
)
return result.data

@staticmethod
def list_active_services() -> list[dict]:
"""Return every (user_id, service) pair that currently has a credential.

Used by the Signal Feed poller to enumerate the work it needs to do
each tick. Returns minimal columns — the poller doesn't need the
token (the fetcher will re-fetch the credential anyway)."""
result = (
get_supabase()
.table(TABLE)
.select("user_id, service")
.execute()
)
return result.data

@staticmethod
def delete(user_id: str, service: str) -> bool:
result = (
Expand Down
93 changes: 93 additions & 0 deletions backend/app/services/feed_poller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Background poll loop that keeps the Signal Feed cache warm.

The poller is a single asyncio.Task owned by the FastAPI lifespan, mirroring
`pr_watcher.py`. Every POLL_INTERVAL seconds it walks every credential row,
calls the matching fetcher, and overwrites the cache entry. Users opening
the feed in the meantime read the warm cache in <100ms instead of paying
the 2-5s live-fetch cost.

The poller is write-only against `signal_feed_cache` — it never reads.
Reading would defeat its purpose (a hit would short-circuit the refresh
the poller exists to perform).

No startup-staleness gate (unlike pr_watcher), because warming the cache
on tick 1 produces no user-visible side effect — there's no review being
dispatched, no notification being sent. Worst case after a long downtime:
the first tick repopulates from-scratch, exactly as if every user had just
opened the page.
"""

import asyncio
import logging
from typing import Awaitable, Callable

from app.models.credential import CredentialModel
from app.services import feed_fetchers, signal_feed_cache

logger = logging.getLogger(__name__)

POLL_INTERVAL_SECONDS = 120

# Only these services produce a Signal Feed payload. A user can have other
# credentials (e.g. discord) — those rows are skipped, not errored.
FEED_FETCHERS: dict[str, Callable[[str], Awaitable[dict]]] = {
"slack": feed_fetchers.slack_messages,
"gmail": feed_fetchers.gmail_messages,
"github": feed_fetchers.github_activity,
}


class FeedPoller:
Comment on lines +38 to +40
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CLAUDE.md docs convention violationFeedPoller is a peer to PRWatcher but is not documented in CLAUDE.md's Architecture section.

CLAUDE.md L25–27 documents the PR watcher service (its behavior, env gate, and file path). The FeedPoller is an equivalent background worker wired into the same lifespan, but the Architecture section has no entry for it.

CLAUDE.md rule: "When behavior or setup changes, update the relevant md (README.md, LOCAL_SETUP.md, ROADMAP.md, this file) in the same change."

The PR updates backend/.env.example with FEED_POLLER_ENABLED (correct), but CLAUDE.md itself needs a matching architecture bullet, e.g.:

Signal Feed poller. FastAPI lifespan starts an asyncio poll loop (backend/app/services/feed_poller.py) that walks every credential row every 120s, calls the matching feed fetcher (Slack/Gmail/GitHub), and writes results to the in-memory cache. Gated by FEED_POLLER_ENABLED.

def __init__(self, poll_interval: float = POLL_INTERVAL_SECONDS):
self._poll_interval = poll_interval

async def run_forever(self) -> None:
"""Sleep-tick-sleep until cancelled by the lifespan shutdown."""
logger.info("feed_poller: started, polling every %ss", self._poll_interval)
try:
while True:
try:
await self.tick()
except Exception: # noqa: BLE001 — never let the loop die
logger.exception("feed_poller: tick crashed; continuing")
await asyncio.sleep(self._poll_interval)
except asyncio.CancelledError:
logger.info("feed_poller: shutting down")
raise

async def tick(self) -> None:
"""One pass over every (user, feed-service) credential row.

Error isolation is at the (user, service) granularity: a Slack
outage for one user must not skip Gmail for the same user, nor
any other user's feeds.
"""
rows = CredentialModel.list_active_services()
if not rows:
return

refreshed = 0
for row in rows:
user_id = row["user_id"]
service = row["service"]
fetcher = FEED_FETCHERS.get(service)
if fetcher is None:
continue # discord et al. — no feed surface
try:
payload = await fetcher(user_id)
except Exception: # noqa: BLE001 — isolate per (user, service)
logger.exception(
"feed_poller: fetch failed user=%s service=%s",
user_id, service,
)
continue

# Only cache successful fetches. A connected=False response
# means the credential vanished mid-tick (raced with disconnect)
# or the upstream rejected the token — don't paper over it.
if payload.get("connected"):
signal_feed_cache.set(user_id, service, payload)
refreshed += 1

if refreshed:
logger.info("feed_poller: refreshed %s cache entries", refreshed)
3 changes: 3 additions & 0 deletions backend/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
# The PR watcher would spin up a background poll loop the moment
# TestClient(app) enters its lifespan; disable it for tests.
"PR_WATCHER_ENABLED": "false",
# Same reasoning — the Signal Feed poller is a second background
# task started in the lifespan; tests should never hit live APIs.
"FEED_POLLER_ENABLED": "false",
}
)

Expand Down
Loading