-
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathbinaries.py
More file actions
92 lines (75 loc) · 3.18 KB
/
binaries.py
File metadata and controls
92 lines (75 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""Binaries router for the Memory Tracker API."""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
import logging
from .. import schemas, crud
from ..database import get_database
from ..logging_config import get_logger
# Router collecting the binary endpoints defined in this module; routes
# registered on it are mounted under the "/api" prefix with the "binaries" tag.
router = APIRouter(prefix="/api", tags=["binaries"])
@router.get("/binaries", response_model=List[schemas.Binary])
async def get_binaries(db: AsyncSession = Depends(get_database)):
    """Return all binaries.

    Args:
        db: Async database session injected via ``get_database``.

    Returns:
        A list of ``schemas.Binary`` objects, one for each record returned by
        ``crud.get_binaries``.

    Raises:
        HTTPException: 500 with detail "Failed to fetch binaries" when loading
            or converting the records raises any exception.
    """
    logger = get_logger("api.binaries")
    logger.info("Fetching all binaries")
    try:
        binaries = await crud.get_binaries(db)
        logger.info(
            "Successfully retrieved binaries", extra={"count": len(binaries)}
        )
        return [
            schemas.Binary(
                id=binary.id,
                name=binary.name,
                flags=binary.flags,
                description=binary.description,
                color=binary.color,
                icon=binary.icon,
                display_order=binary.display_order,
            )
            for binary in binaries
        ]
    except Exception as e:
        # exc_info=True records the traceback alongside the message; logging
        # only str(e) makes the failure impossible to locate afterwards.
        logger.error(
            "Failed to fetch binaries", extra={"error": str(e)}, exc_info=True
        )
        # Chain the original exception so the root cause stays attached to
        # the HTTP error instead of being reported as an unrelated failure.
        raise HTTPException(
            status_code=500, detail="Failed to fetch binaries"
        ) from e
@router.get("/binaries/{binary_id}", response_model=schemas.Binary)
async def get_binary(binary_id: str, db: AsyncSession = Depends(get_database)):
    """Return a single binary looked up by its identifier.

    Args:
        binary_id: Identifier taken from the URL path.
        db: Async database session injected via ``get_database``.

    Returns:
        A ``schemas.Binary`` built from the record returned by
        ``crud.get_binary_by_id``.

    Raises:
        HTTPException: 404 with detail "Binary not found" when the lookup
            returns ``None``.
    """
    # Use the same named project logger as the list endpoint so both binary
    # routes emit their records through one logger.
    logger = get_logger("api.binaries")
    # Pass values as logging arguments rather than pre-formatting them, so the
    # message is only built when the record is actually emitted.
    logger.info("Fetching binary: %s", binary_id)
    binary = await crud.get_binary_by_id(db, binary_id=binary_id)
    if binary is None:
        logger.warning("Binary not found: %s", binary_id)
        raise HTTPException(status_code=404, detail="Binary not found")
    logger.info(
        "Found binary: %s with %d flags", binary.name, len(binary.flags)
    )
    return schemas.Binary(
        id=binary.id,
        name=binary.name,
        flags=binary.flags,
        description=binary.description,
        color=binary.color,
        icon=binary.icon,
        display_order=binary.display_order,
    )
@router.get("/binaries/{binary_id}/environments", response_model=List[dict])
async def get_environments_for_binary(
    binary_id: str, db: AsyncSession = Depends(get_database)
):
    """Return the environments reported for the given binary.

    Args:
        binary_id: Identifier taken from the URL path.
        db: Async database session injected via ``get_database``.

    Returns:
        Whatever ``crud.get_environments_for_binary`` yields for
        ``binary_id``; the route declares the response as a list of dicts.

    Raises:
        HTTPException: 404 with detail "Binary not found" when
            ``crud.get_binary_by_id`` returns ``None``.
    """
    # The binary must exist before its environments are queried.
    existing = await crud.get_binary_by_id(db, binary_id=binary_id)
    if existing is not None:
        return await crud.get_environments_for_binary(db, binary_id=binary_id)
    raise HTTPException(status_code=404, detail="Binary not found")
@router.get(
    "/binaries/{binary_id}/environments/{environment_id}/commits",
    response_model=List[dict],
)
async def get_commits_for_binary_and_environment(
    binary_id: str, environment_id: str, db: AsyncSession = Depends(get_database)
):
    """Return the commits reported for a binary/environment pair.

    Args:
        binary_id: Binary identifier taken from the URL path.
        environment_id: Environment identifier taken from the URL path.
        db: Async database session injected via ``get_database``.

    Returns:
        Whatever ``crud.get_commits_for_binary_and_environment`` yields for
        the pair; the route declares the response as a list of dicts.

    Raises:
        HTTPException: 404 with detail "Binary not found" when the binary
            lookup returns ``None``, or 404 with detail "Environment not
            found" when the environment lookup returns ``None``.
    """
    # Validate the binary first, then the environment, so a request with two
    # unknown ids reports the missing binary.
    if (await crud.get_binary_by_id(db, binary_id=binary_id)) is None:
        raise HTTPException(status_code=404, detail="Binary not found")

    if (
        await crud.get_environment_by_id(db, environment_id=environment_id)
    ) is None:
        raise HTTPException(status_code=404, detail="Environment not found")

    return await crud.get_commits_for_binary_and_environment(
        db, binary_id=binary_id, environment_id=environment_id
    )