From 3b2d7b418e33ccd48b2f1e1b09e083274653e976 Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Sun, 24 May 2026 11:11:34 -0700 Subject: [PATCH] feat: autonomous PR-watcher (Phase C) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Background asyncio loop in the platform's FastAPI lifespan that walks running code-review-engineer agents × their watched_repos, fetches open PRs from GitHub, dedups against reviewed_prs, and dispatches a review task — the last remaining piece of the Code Review Engineer epic. - watched_repos table (#28: unique(user_id, owner, repo) prevents two agents under the same user from watching the same repo and posting duplicate reviews) - WatchedRepoModel translates the PG unique-constraint violation into WatchedRepoExists so the router can return 409 - watched_repos router: POST/GET/DELETE /agents/{id}/watched-repos under user-side X-Api-Key auth - PRWatcher service: 120s cadence, per-(agent,repo) error isolation, 409-from-sidecar treated as "agent busy, defer to next tick", startup-only 30-min staleness gate to avoid backlog-flooding on platform restart - main.py lifespan starts/cancels the watcher; PR_WATCHER_ENABLED=false opt-out for tests and ad-hoc debug runs - 17 new tests (125 → 142, all passing) Co-Authored-By: Claude Opus 4.7 --- backend/app/main.py | 34 ++- backend/app/models/agent.py | 18 ++ backend/app/models/watched_repo.py | 93 ++++++ backend/app/routers/watched_repos.py | 77 +++++ backend/app/schemas/watched_repo.py | 17 ++ backend/app/services/pr_watcher.py | 174 +++++++++++ .../migrations/004_code_review_engineer.sql | 25 ++ backend/migrations/schema.sql | 16 ++ backend/tests/conftest.py | 3 + backend/tests/test_pr_watcher.py | 269 ++++++++++++++++++ backend/tests/test_watched_repos.py | 156 ++++++++++ 11 files changed, 881 insertions(+), 1 deletion(-) create mode 100644 backend/app/models/watched_repo.py create mode 100644 backend/app/routers/watched_repos.py create mode 100644 backend/app/schemas/watched_repo.py create mode 100644 backend/app/services/pr_watcher.py create mode 100644 backend/tests/test_pr_watcher.py create mode 100644 backend/tests/test_watched_repos.py diff --git a/backend/app/main.py b/backend/app/main.py index 5b1efa0..e9ce113 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,5 +1,7 @@ +import asyncio import logging import os +from contextlib import asynccontextmanager import httpx from fastapi import FastAPI, Request @@ -9,7 +11,11 @@ from slowapi.errors import RateLimitExceeded from app.ratelimit import limiter -from app.routers import users, agents, gateway, credentials, tasks, roles, auth, chat +from app.routers import ( + users, agents, gateway, credentials, tasks, roles, auth, chat, + watched_repos, +) +from app.services.pr_watcher import PRWatcher logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO"), @@ -17,10 +23,35 @@ ) logger = logging.getLogger("agentos") + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Start the PR watcher background task; cancel it cleanly on shutdown. + + Disabled when PR_WATCHER_ENABLED=false (test suites, ad-hoc debug runs); + otherwise on by default. + """ + watcher_task: asyncio.Task | None = None + if os.getenv("PR_WATCHER_ENABLED", "true").lower() != "false": + watcher = PRWatcher() + watcher_task = asyncio.create_task(watcher.run_forever()) + + try: + yield + finally: + if watcher_task is not None: + watcher_task.cancel() + try: + await watcher_task + except asyncio.CancelledError: + pass + + app = FastAPI( title="AgentOS Platform", description="Multi-tenant platform for hiring AI employees", version="0.1.0", + lifespan=lifespan, ) # ── Rate limiting ──────────────────────────────────────────────────────────── @@ -70,6 +101,7 @@ async def unhandled_error_handler(request: Request, exc: Exception): app.include_router(auth.router) app.include_router(auth.compat_router) app.include_router(chat.router) +app.include_router(watched_repos.router) @app.get("/health") diff --git a/backend/app/models/agent.py b/backend/app/models/agent.py index 38ff7a2..9ba55ca 100644 --- a/backend/app/models/agent.py +++ b/backend/app/models/agent.py @@ -50,6 +50,24 @@ def list_by_user(user_id: str) -> list[dict]: ) return result.data + @staticmethod + def list_running_by_role(role: str) -> list[dict]: + """Return every running agent for a given role. + + Used by the pr_watcher to enumerate Code Review Engineers it should + poll on behalf of. Filters on status='running' so stopped/errored + agents drop out of the watcher loop automatically. + """ + result = ( + get_supabase() + .table(TABLE) + .select("*") + .eq("role", role) + .eq("status", "running") + .execute() + ) + return result.data + @staticmethod def update(agent_id: str, **fields) -> dict | None: result = ( diff --git a/backend/app/models/watched_repo.py b/backend/app/models/watched_repo.py new file mode 100644 index 0000000..0930f29 --- /dev/null +++ b/backend/app/models/watched_repo.py @@ -0,0 +1,93 @@ +"""Repos a Code Review Engineer is subscribed to. + +The platform's pr_watcher iterates rows in this table on a 120s cadence +and dispatches review tasks for unreviewed open PRs. Cross-agent uniqueness +on (user_id, owner, repo) means two agents under the same user cannot watch +the same repo — this prevents the double-review problem (#28). +""" + +from app.database import get_supabase + +TABLE = "watched_repos" + + +class WatchedRepoExists(Exception): + """Raised when a (user_id, owner, repo) row already exists. + + The unique constraint at the DB level is the real guard; this exception + lets the router translate the violation into a 409 with a clean message. + """ + + +class WatchedRepoModel: + @staticmethod + def create(agent_id: str, user_id: str, owner: str, repo: str) -> dict: + """Subscribe an agent to a repo. + + Raises WatchedRepoExists if another agent under the same user already + watches this (owner, repo). Surfaced as a 409 in the router. + """ + data = { + "agent_id": agent_id, + "user_id": user_id, + "owner": owner, + "repo": repo, + } + try: + result = get_supabase().table(TABLE).insert(data).execute() + except Exception as exc: # noqa: BLE001 — supabase wraps PG errors + # The unique constraint violation comes back as a postgres error + # with code 23505. The supabase client surfaces it as a generic + # exception; check the message for the conflict marker. + msg = str(exc) + if "duplicate key" in msg or "23505" in msg or "watched_repos" in msg.lower(): + raise WatchedRepoExists( + f"This repo is already watched by another agent under user {user_id}" + ) from exc + raise + return result.data[0] + + @staticmethod + def list_by_agent(agent_id: str) -> list[dict]: + result = ( + get_supabase() + .table(TABLE) + .select("*") + .eq("agent_id", agent_id) + .order("created_at", desc=True) + .execute() + ) + return result.data + + @staticmethod + def list_all() -> list[dict]: + """Return every watched_repos row across every agent. + + Used by the pr_watcher tick — one query per cycle, in-process group + by agent_id. The platform's hackathon scale is well within what a + full-table scan can handle; revisit if we ever cross ~10k rows. + """ + result = get_supabase().table(TABLE).select("*").execute() + return result.data + + @staticmethod + def get_by_id(watched_id: str) -> dict | None: + result = ( + get_supabase() + .table(TABLE) + .select("*") + .eq("id", watched_id) + .execute() + ) + return result.data[0] if result.data else None + + @staticmethod + def delete(watched_id: str) -> bool: + result = ( + get_supabase() + .table(TABLE) + .delete() + .eq("id", watched_id) + .execute() + ) + return len(result.data) > 0 diff --git a/backend/app/routers/watched_repos.py b/backend/app/routers/watched_repos.py new file mode 100644 index 0000000..19cf06b --- /dev/null +++ b/backend/app/routers/watched_repos.py @@ -0,0 +1,77 @@ +"""User-side endpoints for subscribing agents to repos. + +The pr_watcher reads these rows to decide which (agent, owner, repo) +combinations to poll on each tick. Cross-agent uniqueness is enforced +at the DB level so we surface the conflict here as a 409. +""" + +from fastapi import APIRouter, Depends, HTTPException + +from app.auth import get_current_user +from app.models.agent import AgentModel +from app.models.watched_repo import WatchedRepoExists, WatchedRepoModel +from app.schemas.watched_repo import WatchedRepoCreate, WatchedRepoResponse + +router = APIRouter(prefix="/agents", tags=["watched-repos"]) + + +def _resolve_owned_agent(agent_id: str, user_id: str) -> dict: + """Fetch the agent and require it to belong to the caller. + + 404 hides the existence of agents owned by other users — same shape + as the existing /agents/{id} handler. + """ + agent = AgentModel.get_by_id(agent_id) + if not agent or agent["user_id"] != user_id: + raise HTTPException(404, "Agent not found") + return agent + + +@router.post( + "/{agent_id}/watched-repos", + response_model=WatchedRepoResponse, + status_code=201, +) +def subscribe_repo( + agent_id: str, + payload: WatchedRepoCreate, + user: dict = Depends(get_current_user), +): + _resolve_owned_agent(agent_id, user["id"]) + try: + return WatchedRepoModel.create( + agent_id=agent_id, + user_id=user["id"], + owner=payload.owner, + repo=payload.repo, + ) + except WatchedRepoExists as exc: + raise HTTPException(409, str(exc)) + + +@router.get( + "/{agent_id}/watched-repos", + response_model=list[WatchedRepoResponse], +) +def list_watched_repos( + agent_id: str, + user: dict = Depends(get_current_user), +): + _resolve_owned_agent(agent_id, user["id"]) + return WatchedRepoModel.list_by_agent(agent_id) + + +@router.delete( + "/{agent_id}/watched-repos/{watched_id}", + status_code=204, +) +def unsubscribe_repo( + agent_id: str, + watched_id: str, + user: dict = Depends(get_current_user), +): + _resolve_owned_agent(agent_id, user["id"]) + row = WatchedRepoModel.get_by_id(watched_id) + if not row or row["agent_id"] != agent_id: + raise HTTPException(404, "Watched repo not found") + WatchedRepoModel.delete(watched_id) diff --git a/backend/app/schemas/watched_repo.py b/backend/app/schemas/watched_repo.py new file mode 100644 index 0000000..dce08cb --- /dev/null +++ b/backend/app/schemas/watched_repo.py @@ -0,0 +1,17 @@ +from datetime import datetime + +from pydantic import BaseModel + + +class WatchedRepoCreate(BaseModel): + owner: str + repo: str + + +class WatchedRepoResponse(BaseModel): + id: str + agent_id: str + user_id: str + owner: str + repo: str + created_at: datetime diff --git a/backend/app/services/pr_watcher.py b/backend/app/services/pr_watcher.py new file mode 100644 index 0000000..50728c4 --- /dev/null +++ b/backend/app/services/pr_watcher.py @@ -0,0 +1,174 @@ +"""Background poll loop that drives autonomous PR reviews. + +The watcher is a single asyncio.Task owned by the FastAPI lifespan. Every +POLL_INTERVAL seconds it walks the (running code-review-engineer agents) × +(their watched_repos) cross-product, asks GitHub for the list of open PRs +on each repo, dedupes against reviewed_prs, and dispatches a review task +for each unreviewed PR. The agent itself decides to call the github-pr-review +skill from inside its container; the watcher just triggers. + +Dedup is intentionally tied to the review skill successfully posting to +POST /gateway/github/review (which writes the reviewed_prs row server-side +in Phase D). If the LLM never actually submits the review, the PR stays +"unreviewed" and we'll dispatch again next tick. The watcher is the +trigger, not the bookkeeper. + +The 30-minute staleness gate is startup-only: on the first tick after the +platform boots, ignore PRs older than 30 min so we don't backlog-review a +queue that accumulated while the platform was down. Steady-state ticks +have no time gate. +""" + +import asyncio +import logging +from datetime import datetime, timedelta, timezone + +import httpx + +from app.models.agent import AgentModel +from app.models.reviewed_pr import ReviewedPRModel +from app.models.watched_repo import WatchedRepoModel +from app.services.dispatcher import Dispatcher +from app.services.gateway import GatewayService + +logger = logging.getLogger(__name__) + +POLL_INTERVAL_SECONDS = 120 +STARTUP_STALENESS_WINDOW = timedelta(minutes=30) +WATCHED_ROLE = "code-review-engineer" + + +def _parse_iso8601(s: str) -> datetime: + """Parse a GitHub-shaped ISO timestamp ('...Z') into an aware datetime.""" + if s.endswith("Z"): + s = s[:-1] + "+00:00" + return datetime.fromisoformat(s) + + +class PRWatcher: + def __init__( + self, + dispatcher: Dispatcher | None = None, + poll_interval: float = POLL_INTERVAL_SECONDS, + ): + self._dispatcher = dispatcher or Dispatcher() + self._poll_interval = poll_interval + self._started_at: datetime | None = None + self._first_tick_done = False + + async def run_forever(self) -> None: + """Sleep-tick-sleep until cancelled by the lifespan shutdown.""" + self._started_at = datetime.now(timezone.utc) + self._first_tick_done = False + logger.info("pr_watcher: started, polling every %ss", self._poll_interval) + try: + while True: + try: + await self.tick() + except Exception: # noqa: BLE001 — never let the loop die + logger.exception("pr_watcher: tick crashed; continuing") + self._first_tick_done = True + await asyncio.sleep(self._poll_interval) + except asyncio.CancelledError: + logger.info("pr_watcher: shutting down") + raise + + async def tick(self) -> None: + """One pass over (running agents) × (their watched_repos). + + Error isolation is at the (agent, repo) granularity: one repo + failing the GitHub call must not skip the rest. + """ + agents = AgentModel.list_running_by_role(WATCHED_ROLE) + if not agents: + return + + watched = WatchedRepoModel.list_all() + # Group rows by agent_id so we make at most one pass per (agent, repo). + by_agent: dict[str, list[dict]] = {} + for row in watched: + by_agent.setdefault(row["agent_id"], []).append(row) + + for agent in agents: + agent_repos = by_agent.get(agent["id"], []) + for row in agent_repos: + try: + await self._review_repo(agent, row["owner"], row["repo"]) + except Exception: # noqa: BLE001 — isolate per-repo failures + logger.exception( + "pr_watcher: failure for agent=%s repo=%s/%s", + agent["id"], row["owner"], row["repo"], + ) + + async def _review_repo(self, agent: dict, owner: str, repo: str) -> None: + """Fetch open PRs and dispatch a review for any that are unreviewed.""" + resp = await GatewayService.list_pull_requests( + user_id=agent["user_id"], owner=owner, repo=repo + ) + prs = resp.get("data") or [] + if not isinstance(prs, list): + # GitHub returns an error object (dict) on auth/rate-limit failures. + logger.warning( + "pr_watcher: list_pull_requests returned non-list for %s/%s: %s", + owner, repo, resp, + ) + return + + cutoff = ( + (self._started_at or datetime.now(timezone.utc)) - STARTUP_STALENESS_WINDOW + if not self._first_tick_done + else None + ) + + for pr in prs: + pr_number = pr.get("number") + if pr_number is None: + continue + + if cutoff is not None: + created_raw = pr.get("created_at") + if created_raw: + try: + created = _parse_iso8601(created_raw) + except ValueError: + created = None + if created is not None and created < cutoff: + continue + + if ReviewedPRModel.exists(agent["id"], owner, repo, pr_number): + continue + + await self._dispatch_review(agent, owner, repo, pr_number) + + async def _dispatch_review( + self, agent: dict, owner: str, repo: str, pr_number: int + ) -> None: + """Send the review instruction. 409 from the sidecar = agent busy, try next tick.""" + instruction = ( + f"A new pull request is open at {owner}/{repo}#{pr_number}. " + "Use the github-pr-review skill to fetch the diff, review it, " + "and submit your review via the platform gateway." + ) + try: + await self._dispatcher.dispatch_task( + agent_id=agent["id"], + instruction=instruction, + metadata={ + "trigger": "pr_watcher", + "owner": owner, + "repo": repo, + "pr_number": pr_number, + }, + ) + logger.info( + "pr_watcher: dispatched review agent=%s repo=%s/%s pr=#%s", + agent["id"], owner, repo, pr_number, + ) + except httpx.HTTPStatusError as exc: + if exc.response.status_code == 409: + logger.info( + "pr_watcher: agent=%s busy, deferring %s/%s#%s", + agent["id"], owner, repo, pr_number, + ) + return + raise diff --git a/backend/migrations/004_code_review_engineer.sql b/backend/migrations/004_code_review_engineer.sql index 590daff..583aefa 100644 --- a/backend/migrations/004_code_review_engineer.sql +++ b/backend/migrations/004_code_review_engineer.sql @@ -53,15 +53,40 @@ create table if not exists reviewed_prs ( create index if not exists idx_reviewed_prs_agent_id on reviewed_prs(agent_id); +-- ── Phase C — autonomous PR review ─────────────────────────────────────────── +-- Repos each agent is subscribed to. The platform's pr_watcher polls these on +-- a 120s cadence and dispatches review tasks for unreviewed open PRs. +-- +-- unique(user_id, owner, repo) prevents two agents owned by the same user +-- from watching the same repo (#28): without it, both would dispatch a +-- review of the same PR, doubling token cost and posting duplicate reviews. +-- user_id is duplicated from agents.user_id so the constraint can sit on +-- this row directly without a join. +create table if not exists watched_repos ( + id uuid primary key default gen_random_uuid(), + agent_id uuid references agents(id) on delete cascade not null, + user_id uuid references users(id) on delete cascade not null, + owner text not null, + repo text not null, + created_at timestamptz default now(), + unique(user_id, owner, repo) +); + +create index if not exists idx_watched_repos_agent_id on watched_repos(agent_id); +create index if not exists idx_watched_repos_user_id on watched_repos(user_id); + -- RLS — same closed-by-default pattern as the existing tables. alter table agent_memory enable row level security; alter table agent_action_log enable row level security; alter table reviewed_prs enable row level security; +alter table watched_repos enable row level security; drop policy if exists "Service role full access on agent_memory" on agent_memory; drop policy if exists "Service role full access on agent_action_log" on agent_action_log; drop policy if exists "Service role full access on reviewed_prs" on reviewed_prs; +drop policy if exists "Service role full access on watched_repos" on watched_repos; create policy "Service role full access on agent_memory" on agent_memory for all using (true); create policy "Service role full access on agent_action_log" on agent_action_log for all using (true); create policy "Service role full access on reviewed_prs" on reviewed_prs for all using (true); +create policy "Service role full access on watched_repos" on watched_repos for all using (true); diff --git a/backend/migrations/schema.sql b/backend/migrations/schema.sql index f80cab0..aa4daab 100644 --- a/backend/migrations/schema.sql +++ b/backend/migrations/schema.sql @@ -82,6 +82,19 @@ create table if not exists reviewed_prs ( create index if not exists idx_reviewed_prs_agent_id on reviewed_prs(agent_id); +create table if not exists watched_repos ( + id uuid primary key default gen_random_uuid(), + agent_id uuid references agents(id) on delete cascade not null, + user_id uuid references users(id) on delete cascade not null, + owner text not null, + repo text not null, + created_at timestamptz default now(), + unique(user_id, owner, repo) +); + +create index if not exists idx_watched_repos_agent_id on watched_repos(agent_id); +create index if not exists idx_watched_repos_user_id on watched_repos(user_id); + -- ── Row Level Security ─────────────────────────────────────────────────────── -- The backend uses the service-role key, which bypasses RLS. These policies -- keep anon/authenticated access closed by default. @@ -91,6 +104,7 @@ alter table credentials enable row level security; alter table agent_memory enable row level security; alter table agent_action_log enable row level security; alter table reviewed_prs enable row level security; +alter table watched_repos enable row level security; drop policy if exists "Service role full access on users" on users; drop policy if exists "Service role full access on agents" on agents; @@ -98,6 +112,7 @@ drop policy if exists "Service role full access on credentials" on credentials; drop policy if exists "Service role full access on agent_memory" on agent_memory; drop policy if exists "Service role full access on agent_action_log" on agent_action_log; drop policy if exists "Service role full access on reviewed_prs" on reviewed_prs; +drop policy if exists "Service role full access on watched_repos" on watched_repos; create policy "Service role full access on users" on users for all using (true); create policy "Service role full access on agents" on agents for all using (true); @@ -105,3 +120,4 @@ create policy "Service role full access on credentials" on credentials for all u create policy "Service role full access on agent_memory" on agent_memory for all using (true); create policy "Service role full access on agent_action_log" on agent_action_log for all using (true); create policy "Service role full access on reviewed_prs" on reviewed_prs for all using (true); +create policy "Service role full access on watched_repos" on watched_repos for all using (true); diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 0567e22..1b26d45 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -24,6 +24,9 @@ "PLATFORM_GATEWAY_URL": "http://host.docker.internal:8000/gateway", # Keep rate limits out of the way of the test suite. "RATE_LIMIT_ENABLED": "false", + # The PR watcher would spin up a background poll loop the moment + # TestClient(app) enters its lifespan; disable it for tests. + "PR_WATCHER_ENABLED": "false", } ) diff --git a/backend/tests/test_pr_watcher.py b/backend/tests/test_pr_watcher.py new file mode 100644 index 0000000..5d5a5b4 --- /dev/null +++ b/backend/tests/test_pr_watcher.py @@ -0,0 +1,269 @@ +"""Tests for the PR watcher service. + +The watcher walks (running code-review-engineer agents) × (their watched_repos), +asks GitHub for open PRs, dedups vs reviewed_prs, and dispatches a review task. +These tests run a single tick at a time — the asyncio.sleep loop is exercised +only briefly in the lifecycle test. +""" + +import asyncio +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx +import pytest + +from tests.conftest import _make_agent + + +def _running_agent(**overrides): + return _make_agent(role="code-review-engineer", status="running", **overrides) + + +def _watched_row(**overrides): + base = { + "id": "wr-1", + "agent_id": "agent-001", + "user_id": "user-001", + "owner": "octocat", + "repo": "hello-world", + "created_at": "2025-01-01T00:00:00+00:00", + } + base.update(overrides) + return base + + +def _pr(number: int, created_at: str | None = None): + return {"number": number, "created_at": created_at or "2099-01-01T00:00:00Z"} + + +class TestTick: + @pytest.mark.asyncio + async def test_tick_dispatches_review_for_unreviewed_pr(self): + from app.services.pr_watcher import PRWatcher + + agent = _running_agent() + with patch("app.services.pr_watcher.AgentModel") as mock_agents, \ + patch("app.services.pr_watcher.WatchedRepoModel") as mock_repos, \ + patch("app.services.pr_watcher.GatewayService") as mock_gw, \ + patch("app.services.pr_watcher.ReviewedPRModel") as mock_rev: + mock_agents.list_running_by_role.return_value = [agent] + mock_repos.list_all.return_value = [_watched_row()] + mock_gw.list_pull_requests = AsyncMock( + return_value={"status": 200, "data": [_pr(42)]} + ) + mock_rev.exists.return_value = False + + dispatcher = MagicMock() + dispatcher.dispatch_task = AsyncMock(return_value={"accepted": True}) + watcher = PRWatcher(dispatcher=dispatcher) + # Mark first tick as done so the 30-min gate doesn't run. + watcher._first_tick_done = True + await watcher.tick() + + dispatcher.dispatch_task.assert_awaited_once() + kwargs = dispatcher.dispatch_task.call_args.kwargs + assert kwargs["agent_id"] == agent["id"] + assert "octocat/hello-world#42" in kwargs["instruction"] + assert kwargs["metadata"]["pr_number"] == 42 + + @pytest.mark.asyncio + async def test_tick_skips_already_reviewed_pr(self): + from app.services.pr_watcher import PRWatcher + + with patch("app.services.pr_watcher.AgentModel") as mock_agents, \ + patch("app.services.pr_watcher.WatchedRepoModel") as mock_repos, \ + patch("app.services.pr_watcher.GatewayService") as mock_gw, \ + patch("app.services.pr_watcher.ReviewedPRModel") as mock_rev: + mock_agents.list_running_by_role.return_value = [_running_agent()] + mock_repos.list_all.return_value = [_watched_row()] + mock_gw.list_pull_requests = AsyncMock( + return_value={"status": 200, "data": [_pr(42)]} + ) + mock_rev.exists.return_value = True + + dispatcher = MagicMock() + dispatcher.dispatch_task = AsyncMock() + watcher = PRWatcher(dispatcher=dispatcher) + watcher._first_tick_done = True + await watcher.tick() + + dispatcher.dispatch_task.assert_not_awaited() + + @pytest.mark.asyncio + async def test_tick_treats_409_as_deferred_not_fatal(self): + """A 409 from the sidecar means the agent is busy — skip and try next tick.""" + from app.services.pr_watcher import PRWatcher + + with patch("app.services.pr_watcher.AgentModel") as mock_agents, \ + patch("app.services.pr_watcher.WatchedRepoModel") as mock_repos, \ + patch("app.services.pr_watcher.GatewayService") as mock_gw, \ + patch("app.services.pr_watcher.ReviewedPRModel") as mock_rev: + mock_agents.list_running_by_role.return_value = [_running_agent()] + mock_repos.list_all.return_value = [_watched_row()] + mock_gw.list_pull_requests = AsyncMock( + return_value={"status": 200, "data": [_pr(42)]} + ) + mock_rev.exists.return_value = False + + busy_response = MagicMock() + busy_response.status_code = 409 + dispatcher = MagicMock() + dispatcher.dispatch_task = AsyncMock( + side_effect=httpx.HTTPStatusError( + "busy", request=MagicMock(), response=busy_response + ) + ) + watcher = PRWatcher(dispatcher=dispatcher) + watcher._first_tick_done = True + # Should not raise. + await watcher.tick() + + # We don't write reviewed_prs on a deferred dispatch — that only + # happens server-side when the review skill actually fires. The + # PR remains a candidate for the next tick. + # No exception bubbled up: the watcher swallowed the 409. + + @pytest.mark.asyncio + async def test_tick_isolates_failures_per_repo(self): + """One repo's GitHub call failing must not stop the rest.""" + from app.services.pr_watcher import PRWatcher + + agent = _running_agent() + repo_bad = _watched_row(id="wr-bad", repo="broken") + repo_good = _watched_row(id="wr-good", repo="ok") + + # First call (broken) raises, second (ok) returns a PR. + call_results = [ + RuntimeError("github API down"), + {"status": 200, "data": [_pr(7)]}, + ] + + async def fake_list_prs(**kwargs): + outcome = call_results.pop(0) + if isinstance(outcome, Exception): + raise outcome + return outcome + + with patch("app.services.pr_watcher.AgentModel") as mock_agents, \ + patch("app.services.pr_watcher.WatchedRepoModel") as mock_repos, \ + patch("app.services.pr_watcher.GatewayService") as mock_gw, \ + patch("app.services.pr_watcher.ReviewedPRModel") as mock_rev: + mock_agents.list_running_by_role.return_value = [agent] + mock_repos.list_all.return_value = [repo_bad, repo_good] + mock_gw.list_pull_requests = AsyncMock(side_effect=fake_list_prs) + mock_rev.exists.return_value = False + + dispatcher = MagicMock() + dispatcher.dispatch_task = AsyncMock(return_value={"accepted": True}) + watcher = PRWatcher(dispatcher=dispatcher) + watcher._first_tick_done = True + await watcher.tick() + + # The good repo's PR still got dispatched despite the broken one + # crashing first. + dispatcher.dispatch_task.assert_awaited_once() + kwargs = dispatcher.dispatch_task.call_args.kwargs + assert kwargs["metadata"]["repo"] == "ok" + + @pytest.mark.asyncio + async def test_first_tick_ignores_stale_prs(self): + """On the first tick after boot, PRs older than 30 min are skipped + so the platform doesn't backlog-review accumulated history.""" + from app.services.pr_watcher import PRWatcher + + stale = _pr(1, created_at=( + datetime.now(timezone.utc) - timedelta(hours=2) + ).isoformat().replace("+00:00", "Z")) + fresh = _pr(2, created_at=( + datetime.now(timezone.utc) - timedelta(minutes=5) + ).isoformat().replace("+00:00", "Z")) + + with patch("app.services.pr_watcher.AgentModel") as mock_agents, \ + patch("app.services.pr_watcher.WatchedRepoModel") as mock_repos, \ + patch("app.services.pr_watcher.GatewayService") as mock_gw, \ + patch("app.services.pr_watcher.ReviewedPRModel") as mock_rev: + mock_agents.list_running_by_role.return_value = [_running_agent()] + mock_repos.list_all.return_value = [_watched_row()] + mock_gw.list_pull_requests = AsyncMock( + return_value={"status": 200, "data": [stale, fresh]} + ) + mock_rev.exists.return_value = False + + dispatcher = MagicMock() + dispatcher.dispatch_task = AsyncMock(return_value={"accepted": True}) + watcher = PRWatcher(dispatcher=dispatcher) + watcher._started_at = datetime.now(timezone.utc) + # leave _first_tick_done as False — this is the boot tick + await watcher.tick() + + # Only the fresh PR was dispatched. + assert dispatcher.dispatch_task.await_count == 1 + kwargs = dispatcher.dispatch_task.call_args.kwargs + assert kwargs["metadata"]["pr_number"] == 2 + + @pytest.mark.asyncio + async def test_steady_state_tick_reviews_old_prs(self): + """After the first tick, age doesn't matter — anything new gets reviewed.""" + from app.services.pr_watcher import PRWatcher + + old = _pr(1, created_at=( + datetime.now(timezone.utc) - timedelta(hours=24) + ).isoformat().replace("+00:00", "Z")) + + with patch("app.services.pr_watcher.AgentModel") as mock_agents, \ + patch("app.services.pr_watcher.WatchedRepoModel") as mock_repos, \ + patch("app.services.pr_watcher.GatewayService") as mock_gw, \ + patch("app.services.pr_watcher.ReviewedPRModel") as mock_rev: + mock_agents.list_running_by_role.return_value = [_running_agent()] + mock_repos.list_all.return_value = [_watched_row()] + mock_gw.list_pull_requests = AsyncMock( + return_value={"status": 200, "data": [old]} + ) + mock_rev.exists.return_value = False + + dispatcher = MagicMock() + dispatcher.dispatch_task = AsyncMock(return_value={"accepted": True}) + watcher = PRWatcher(dispatcher=dispatcher) + watcher._first_tick_done = True # past the boot gate + await watcher.tick() + + dispatcher.dispatch_task.assert_awaited_once() + + @pytest.mark.asyncio + async def test_no_running_agents_short_circuits(self): + """If no Code Review Engineer is running, we don't even query watched_repos.""" + from app.services.pr_watcher import PRWatcher + + with patch("app.services.pr_watcher.AgentModel") as mock_agents, \ + patch("app.services.pr_watcher.WatchedRepoModel") as mock_repos: + mock_agents.list_running_by_role.return_value = [] + mock_repos.list_all = MagicMock() + + dispatcher = MagicMock() + dispatcher.dispatch_task = AsyncMock() + watcher = PRWatcher(dispatcher=dispatcher) + watcher._first_tick_done = True + await watcher.tick() + + mock_repos.list_all.assert_not_called() + dispatcher.dispatch_task.assert_not_awaited() + + +class TestRunForever: + @pytest.mark.asyncio + async def test_run_forever_cancels_cleanly(self): + """A cancelled task should raise CancelledError out of run_forever.""" + from app.services.pr_watcher import PRWatcher + + watcher = PRWatcher(poll_interval=0.01) + # Replace tick with a no-op so the loop spins fast. + watcher.tick = AsyncMock(return_value=None) + + task = asyncio.create_task(watcher.run_forever()) + await asyncio.sleep(0.05) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + assert watcher.tick.await_count >= 1 diff --git a/backend/tests/test_watched_repos.py b/backend/tests/test_watched_repos.py new file mode 100644 index 0000000..0b32663 --- /dev/null +++ b/backend/tests/test_watched_repos.py @@ -0,0 +1,156 @@ +"""Tests for the /agents/{id}/watched-repos router and WatchedRepoModel.""" + +from unittest.mock import patch + +from tests.conftest import _make_agent + + +def _make_watched(**overrides): + base = { + "id": "wr-001", + "agent_id": "agent-001", + "user_id": "user-001", + "owner": "octocat", + "repo": "hello-world", + "created_at": "2025-01-01T00:00:00+00:00", + } + base.update(overrides) + return base + + +class TestSubscribeRepo: + def test_subscribe_repo_success(self, authed_client): + client, user, fake_sb = authed_client + agent = _make_agent(role="code-review-engineer") + fake_sb.get_table("agents").set_select_result([agent]) + + with patch("app.routers.watched_repos.WatchedRepoModel.create") as mock_create: + mock_create.return_value = _make_watched() + resp = client.post( + f"/agents/{agent['id']}/watched-repos", + json={"owner": "octocat", "repo": "hello-world"}, + ) + + assert resp.status_code == 201 + assert resp.json()["owner"] == "octocat" + + def test_subscribe_repo_returns_409_on_cross_agent_conflict(self, authed_client): + """Two agents under the same user cannot watch the same repo (#28).""" + from app.models.watched_repo import WatchedRepoExists + + client, user, fake_sb = authed_client + agent = _make_agent(role="code-review-engineer") + fake_sb.get_table("agents").set_select_result([agent]) + + with patch("app.routers.watched_repos.WatchedRepoModel.create") as mock_create: + mock_create.side_effect = WatchedRepoExists("already watched") + resp = client.post( + f"/agents/{agent['id']}/watched-repos", + json={"owner": "octocat", "repo": "hello-world"}, + ) + + assert resp.status_code == 409 + assert "already watched" in resp.json()["detail"] + + def test_subscribe_repo_404_when_agent_not_owned(self, authed_client): + client, user, fake_sb = authed_client + # Agent owned by someone else. + other_agent = _make_agent(user_id="user-other") + fake_sb.get_table("agents").set_select_result([other_agent]) + + resp = client.post( + f"/agents/{other_agent['id']}/watched-repos", + json={"owner": "octocat", "repo": "hello-world"}, + ) + assert resp.status_code == 404 + + def test_subscribe_repo_requires_auth(self, client): + resp = client.post( + "/agents/agent-001/watched-repos", + json={"owner": "octocat", "repo": "hello-world"}, + ) + assert resp.status_code == 401 + + +class TestListWatchedRepos: + def test_list_success(self, authed_client): + client, user, fake_sb = authed_client + agent = _make_agent(role="code-review-engineer") + fake_sb.get_table("agents").set_select_result([agent]) + + with patch("app.routers.watched_repos.WatchedRepoModel.list_by_agent") as mock_list: + mock_list.return_value = [ + _make_watched(), + _make_watched(id="wr-002", repo="other"), + ] + resp = client.get(f"/agents/{agent['id']}/watched-repos") + + assert resp.status_code == 200 + assert len(resp.json()) == 2 + + +class TestUnsubscribeRepo: + def test_unsubscribe_success(self, authed_client): + client, user, fake_sb = authed_client + agent = _make_agent(role="code-review-engineer") + fake_sb.get_table("agents").set_select_result([agent]) + watched = _make_watched() + + with patch("app.routers.watched_repos.WatchedRepoModel.get_by_id") as mock_get, \ + patch("app.routers.watched_repos.WatchedRepoModel.delete") as mock_del: + mock_get.return_value = watched + mock_del.return_value = True + resp = client.delete( + f"/agents/{agent['id']}/watched-repos/{watched['id']}" + ) + + assert resp.status_code == 204 + mock_del.assert_called_once_with(watched["id"]) + + def test_unsubscribe_404_when_row_belongs_to_other_agent(self, authed_client): + """The watched_id must belong to this agent, not just exist.""" + client, user, fake_sb = authed_client + agent = _make_agent(role="code-review-engineer") + fake_sb.get_table("agents").set_select_result([agent]) + # Row from a different agent. + watched = _make_watched(agent_id="agent-other") + + with patch("app.routers.watched_repos.WatchedRepoModel.get_by_id") as mock_get: + mock_get.return_value = watched + resp = client.delete( + f"/agents/{agent['id']}/watched-repos/{watched['id']}" + ) + + assert resp.status_code == 404 + + +class TestWatchedRepoModelConflictTranslation: + """The model translates the PG unique-constraint violation into + WatchedRepoExists so the router can return 409.""" + + def test_duplicate_key_becomes_exists(self, fake_supabase): + from app.models.watched_repo import WatchedRepoExists, WatchedRepoModel + + table = fake_supabase.get_table("watched_repos") + table.mock.insert.return_value.execute.side_effect = Exception( + 'duplicate key value violates unique constraint "watched_repos_user_id_owner_repo_key"' + ) + + try: + WatchedRepoModel.create("agent-001", "user-001", "octocat", "hello") + except WatchedRepoExists: + return + raise AssertionError("expected WatchedRepoExists") + + def test_other_error_propagates(self, fake_supabase): + from app.models.watched_repo import WatchedRepoModel + + table = fake_supabase.get_table("watched_repos") + table.mock.insert.return_value.execute.side_effect = RuntimeError("connection refused") + + try: + WatchedRepoModel.create("agent-001", "user-001", "octocat", "hello") + except RuntimeError as exc: + assert "connection refused" in str(exc) + return + raise AssertionError("expected RuntimeError to propagate")