Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion backend/app/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import logging
import os
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI, Request
Expand All @@ -9,18 +11,47 @@
from slowapi.errors import RateLimitExceeded

from app.ratelimit import limiter
from app.routers import users, agents, gateway, credentials, tasks, roles, auth, chat
from app.routers import (
users, agents, gateway, credentials, tasks, roles, auth, chat,
watched_repos,
)
from app.services.pr_watcher import PRWatcher

logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO"),
format="%(asctime)s %(levelname)s %(name)s — %(message)s",
)
logger = logging.getLogger("agentos")


@asynccontextmanager
async def lifespan(app: FastAPI):
"""Start the PR watcher background task; cancel it cleanly on shutdown.

Disabled when PR_WATCHER_ENABLED=false (test suites, ad-hoc debug runs);
otherwise on by default.
"""
watcher_task: asyncio.Task | None = None
if os.getenv("PR_WATCHER_ENABLED", "true").lower() != "false":
watcher = PRWatcher()
watcher_task = asyncio.create_task(watcher.run_forever())

try:
yield
finally:
if watcher_task is not None:
watcher_task.cancel()
try:
await watcher_task
except asyncio.CancelledError:
pass


app = FastAPI(
title="AgentOS Platform",
description="Multi-tenant platform for hiring AI employees",
version="0.1.0",
lifespan=lifespan,
)

# ── Rate limiting ────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -70,6 +101,7 @@ async def unhandled_error_handler(request: Request, exc: Exception):
app.include_router(auth.router)
app.include_router(auth.compat_router)
app.include_router(chat.router)
app.include_router(watched_repos.router)


@app.get("/health")
Expand Down
18 changes: 18 additions & 0 deletions backend/app/models/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@ def list_by_user(user_id: str) -> list[dict]:
)
return result.data

@staticmethod
def list_running_by_role(role: str) -> list[dict]:
"""Return every running agent for a given role.

Used by the pr_watcher to enumerate Code Review Engineers it should
poll on behalf of. Filters on status='running' so stopped/errored
agents drop out of the watcher loop automatically.
"""
result = (
get_supabase()
.table(TABLE)
.select("*")
.eq("role", role)
.eq("status", "running")
.execute()
)
return result.data

@staticmethod
def update(agent_id: str, **fields) -> dict | None:
result = (
Expand Down
93 changes: 93 additions & 0 deletions backend/app/models/watched_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Repos a Code Review Engineer is subscribed to.

The platform's pr_watcher iterates rows in this table on a 120s cadence
and dispatches review tasks for unreviewed open PRs. Cross-agent uniqueness
on (user_id, owner, repo) means two agents under the same user cannot watch
the same repo — this prevents the double-review problem (#28).
"""

from app.database import get_supabase

TABLE = "watched_repos"


class WatchedRepoExists(Exception):
"""Raised when a (user_id, owner, repo) row already exists.

The unique constraint at the DB level is the real guard; this exception
lets the router translate the violation into a 409 with a clean message.
"""


class WatchedRepoModel:
@staticmethod
def create(agent_id: str, user_id: str, owner: str, repo: str) -> dict:
"""Subscribe an agent to a repo.

Raises WatchedRepoExists if another agent under the same user already
watches this (owner, repo). Surfaced as a 409 in the router.
"""
data = {
"agent_id": agent_id,
"user_id": user_id,
"owner": owner,
"repo": repo,
}
try:
result = get_supabase().table(TABLE).insert(data).execute()
except Exception as exc: # noqa: BLE001 — supabase wraps PG errors
# The unique constraint violation comes back as a postgres error
# with code 23505. The supabase client surfaces it as a generic
# exception; check the message for the conflict marker.
msg = str(exc)
if "duplicate key" in msg or "23505" in msg or "watched_repos" in msg.lower():
raise WatchedRepoExists(
f"This repo is already watched by another agent under user {user_id}"
) from exc
raise
return result.data[0]

@staticmethod
def list_by_agent(agent_id: str) -> list[dict]:
result = (
get_supabase()
.table(TABLE)
.select("*")
.eq("agent_id", agent_id)
.order("created_at", desc=True)
.execute()
)
return result.data

@staticmethod
def list_all() -> list[dict]:
"""Return every watched_repos row across every agent.

Used by the pr_watcher tick — one query per cycle, in-process group
by agent_id. The platform's hackathon scale is well within what a
full-table scan can handle; revisit if we ever cross ~10k rows.
"""
result = get_supabase().table(TABLE).select("*").execute()
return result.data

@staticmethod
def get_by_id(watched_id: str) -> dict | None:
result = (
get_supabase()
.table(TABLE)
.select("*")
.eq("id", watched_id)
.execute()
)
return result.data[0] if result.data else None

@staticmethod
def delete(watched_id: str) -> bool:
result = (
get_supabase()
.table(TABLE)
.delete()
.eq("id", watched_id)
.execute()
)
return len(result.data) > 0
77 changes: 77 additions & 0 deletions backend/app/routers/watched_repos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""User-side endpoints for subscribing agents to repos.

The pr_watcher reads these rows to decide which (agent, owner, repo)
combinations to poll on each tick. Cross-agent uniqueness is enforced
at the DB level so we surface the conflict here as a 409.
"""

from fastapi import APIRouter, Depends, HTTPException

from app.auth import get_current_user
from app.models.agent import AgentModel
from app.models.watched_repo import WatchedRepoExists, WatchedRepoModel
from app.schemas.watched_repo import WatchedRepoCreate, WatchedRepoResponse

router = APIRouter(prefix="/agents", tags=["watched-repos"])


def _resolve_owned_agent(agent_id: str, user_id: str) -> dict:
"""Fetch the agent and require it to belong to the caller.

404 hides the existence of agents owned by other users — same shape
as the existing /agents/{id} handler.
"""
agent = AgentModel.get_by_id(agent_id)
if not agent or agent["user_id"] != user_id:
raise HTTPException(404, "Agent not found")
return agent


@router.post(
"/{agent_id}/watched-repos",
response_model=WatchedRepoResponse,
status_code=201,
)
def subscribe_repo(
agent_id: str,
payload: WatchedRepoCreate,
user: dict = Depends(get_current_user),
):
_resolve_owned_agent(agent_id, user["id"])
try:
return WatchedRepoModel.create(
agent_id=agent_id,
user_id=user["id"],
owner=payload.owner,
repo=payload.repo,
)
except WatchedRepoExists as exc:
raise HTTPException(409, str(exc))


@router.get(
"/{agent_id}/watched-repos",
response_model=list[WatchedRepoResponse],
)
def list_watched_repos(
agent_id: str,
user: dict = Depends(get_current_user),
):
_resolve_owned_agent(agent_id, user["id"])
return WatchedRepoModel.list_by_agent(agent_id)


@router.delete(
"/{agent_id}/watched-repos/{watched_id}",
status_code=204,
)
def unsubscribe_repo(
agent_id: str,
watched_id: str,
user: dict = Depends(get_current_user),
):
_resolve_owned_agent(agent_id, user["id"])
row = WatchedRepoModel.get_by_id(watched_id)
if not row or row["agent_id"] != agent_id:
raise HTTPException(404, "Watched repo not found")
WatchedRepoModel.delete(watched_id)
17 changes: 17 additions & 0 deletions backend/app/schemas/watched_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from datetime import datetime

from pydantic import BaseModel


class WatchedRepoCreate(BaseModel):
owner: str
repo: str


class WatchedRepoResponse(BaseModel):
id: str
agent_id: str
user_id: str
owner: str
repo: str
created_at: datetime
Loading