Skip to content

Commit 4d76844

Browse files
authored
feat: Serve resource usage metrics (#58)
* feat(resource-usage): Add lightweight resource metrics server * fix(resource-usage): Handle None values for CPU count and usage percent * fix(resource-usage): Subtract inactive fie from current memory * refactor(resource-usage): Replace print statements with logging * refactor(resource-usage): Simplify parsing logic and improve early returns
1 parent 6de286f commit 4d76844

2 files changed

Lines changed: 288 additions & 0 deletions

File tree

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
#!/usr/bin/env python3
2+
"""Lightweight resource metrics server."""
3+
4+
import json
5+
import logging
6+
import os
7+
import threading
8+
import time
9+
from functools import partial
10+
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
11+
from pathlib import Path
12+
from typing import Any, Optional
13+
14+
try:
15+
import psutil
16+
17+
HAS_PSUTIL = True
18+
except ImportError:
19+
HAS_PSUTIL = False
20+
21+
22+
class ResourceMonitor:
23+
"""Monitors system resources via cgroups or psutil."""
24+
25+
def __init__(self, root_path: str = "/sys/fs/cgroup") -> None:
26+
self.root = Path(root_path)
27+
self.backend = self._detect_backend()
28+
self._last_cpu_sec: Optional[float] = None
29+
self._last_time: Optional[float] = None
30+
self._lock = threading.Lock()
31+
32+
def _detect_backend(self) -> str:
33+
if (self.root / "cgroup.controllers").exists():
34+
return "cgroupv2"
35+
if (self.root / "memory/memory.limit_in_bytes").exists():
36+
return "cgroupv1"
37+
if HAS_PSUTIL:
38+
return "psutil"
39+
return "none"
40+
41+
def _read_file(self, path: Path) -> Optional[str]:
42+
try:
43+
return path.read_text().strip()
44+
except (FileNotFoundError, PermissionError, OSError):
45+
return None
46+
47+
def get_memory(self) -> tuple[int, Optional[int]]:
48+
"""Returns (used_bytes, limit_bytes). Limit is None if unlimited."""
49+
if self.backend == "cgroupv2":
50+
current = self._parse_int(self.root / "memory.current", 0)
51+
inactive_file = self._parse_memory_stat("inactive_file")
52+
used = current - inactive_file
53+
limit_str = self._read_file(self.root / "memory.max")
54+
limit = self._parse_limit(limit_str, "max")
55+
return used, limit
56+
57+
if self.backend == "cgroupv1":
58+
used = self._parse_int(self.root / "memory/memory.usage_in_bytes", 0)
59+
limit_str = self._read_file(self.root / "memory/memory.limit_in_bytes")
60+
limit = self._parse_limit(limit_str, threshold=1_000_000_000_000_000)
61+
return used, limit
62+
63+
if self.backend == "psutil":
64+
proc = psutil.Process()
65+
return proc.memory_info().rss, psutil.virtual_memory().total
66+
67+
return 0, None
68+
69+
def get_cpu(self) -> tuple[Optional[float], Optional[float]]:
70+
"""Returns (usage_percent, limit_cores)."""
71+
usage_sec = self._get_cpu_seconds()
72+
percent = self._calc_percent(usage_sec)
73+
limit = self._get_cpu_limit()
74+
return percent, limit
75+
76+
def _parse_int(self, path: Path, default: int = 0) -> int:
77+
content = self._read_file(path)
78+
if not content:
79+
return default
80+
try:
81+
return int(content)
82+
except ValueError:
83+
return default
84+
85+
def _parse_limit(
86+
self,
87+
value: Optional[str],
88+
unlimited_marker: Optional[str] = None,
89+
threshold: Optional[int] = None,
90+
) -> Optional[int]:
91+
if not value:
92+
return None
93+
if unlimited_marker and value == unlimited_marker:
94+
return None
95+
try:
96+
parsed = int(value)
97+
if threshold and parsed >= threshold:
98+
return None
99+
return parsed
100+
except ValueError:
101+
return None
102+
103+
def _parse_memory_stat(self, key: str) -> int:
104+
"""Parse a value from memory.stat file."""
105+
content = self._read_file(self.root / "memory.stat")
106+
if not content:
107+
return 0
108+
for line in content.splitlines():
109+
if not line.startswith(key):
110+
continue
111+
parts = line.split()
112+
if len(parts) < 2:
113+
continue
114+
try:
115+
return int(parts[1])
116+
except ValueError:
117+
continue
118+
return 0
119+
120+
def _get_cpu_seconds(self) -> float:
121+
if self.backend == "cgroupv2":
122+
content = self._read_file(self.root / "cpu.stat")
123+
if not content:
124+
return 0.0
125+
for line in content.splitlines():
126+
if not line.startswith("usage_usec"):
127+
continue
128+
parts = line.split()
129+
if len(parts) >= 2:
130+
return int(parts[1]) / 1_000_000.0
131+
return 0.0
132+
133+
if self.backend == "cgroupv1":
134+
content = self._read_file(self.root / "cpuacct/cpuacct.usage")
135+
if not content:
136+
return 0.0
137+
return int(content) / 1_000_000_000.0
138+
139+
if self.backend == "psutil":
140+
times = psutil.Process().cpu_times()
141+
return times.user + times.system
142+
143+
return 0.0
144+
145+
def _get_cpu_limit(self) -> Optional[float]:
146+
if self.backend == "cgroupv2":
147+
content = self._read_file(self.root / "cpu.max")
148+
if not content:
149+
return None
150+
parts = content.split()
151+
if len(parts) < 2 or parts[0] == "max":
152+
return None
153+
try:
154+
return int(parts[0]) / int(parts[1])
155+
except (ValueError, ZeroDivisionError):
156+
return None
157+
158+
if self.backend == "cgroupv1":
159+
quota = self._parse_int(self.root / "cpu/cpu.cfs_quota_us", -1)
160+
period = self._parse_int(self.root / "cpu/cpu.cfs_period_us", 0)
161+
if quota <= 0 or period <= 0:
162+
return None
163+
return quota / period
164+
165+
if self.backend == "psutil":
166+
cpu_count = psutil.cpu_count(logical=True)
167+
return float(cpu_count) if cpu_count is not None else None
168+
169+
return None
170+
171+
def _calc_percent(self, current_sec: float) -> Optional[float]:
172+
now = time.monotonic()
173+
with self._lock:
174+
if self._last_cpu_sec is None or self._last_time is None:
175+
self._last_cpu_sec = current_sec
176+
self._last_time = now
177+
return None
178+
179+
time_delta = now - self._last_time
180+
cpu_delta = current_sec - self._last_cpu_sec
181+
self._last_cpu_sec = current_sec
182+
self._last_time = now
183+
184+
if time_delta <= 0:
185+
return 0.0
186+
return (cpu_delta / time_delta) * 100.0
187+
188+
189+
class MetricsHandler(BaseHTTPRequestHandler):
190+
"""HTTP handler for resource metrics."""
191+
192+
def __init__(self, monitor: ResourceMonitor, *args: Any, **kwargs: Any) -> None:
193+
self.monitor = monitor
194+
super().__init__(*args, **kwargs)
195+
196+
def do_GET(self) -> None:
197+
if self.path in ("/", "/resource-usage"):
198+
self._send_metrics()
199+
elif self.path == "/health":
200+
self._send_text(200, "ok")
201+
else:
202+
self.send_error(404)
203+
204+
def _send_metrics(self) -> None:
205+
mem_used, mem_limit = self.monitor.get_memory()
206+
cpu_percent, cpu_limit = self.monitor.get_cpu()
207+
208+
env_limit = os.environ.get("MEM_LIMIT")
209+
if env_limit:
210+
try:
211+
mem_limit = int(env_limit)
212+
except ValueError:
213+
pass
214+
215+
mem_util = None
216+
if mem_limit and mem_limit > 0:
217+
mem_util = round((mem_used / mem_limit) * 100, 2)
218+
219+
cpu_sat = None
220+
if cpu_percent is not None and cpu_limit:
221+
cpu_sat = round(cpu_percent / cpu_limit, 2)
222+
223+
data = {
224+
"meta": {"backend": self.monitor.backend, "timestamp": time.time()},
225+
"memory": {
226+
"used_bytes": mem_used,
227+
"limit_bytes": mem_limit,
228+
"usage_percent": mem_util,
229+
},
230+
"cpu": {
231+
"usage_percent": (
232+
round(cpu_percent, 2) if cpu_percent is not None else None
233+
),
234+
"limit_cores": cpu_limit,
235+
"saturation_percent": cpu_sat,
236+
},
237+
}
238+
self._send_json(200, data)
239+
240+
def _send_json(self, code: int, data: dict[str, Any]) -> None:
241+
body = json.dumps(data, indent=2).encode()
242+
self._send_response(code, body, "application/json")
243+
244+
def _send_text(self, code: int, text: str) -> None:
245+
self._send_response(code, text.encode(), "text/plain")
246+
247+
def _send_response(self, code: int, body: bytes, content_type: str) -> None:
248+
self.send_response(code)
249+
self.send_header("Content-Type", content_type)
250+
self.send_header("Content-Length", str(len(body)))
251+
self.send_header("X-Content-Type-Options", "nosniff")
252+
self.end_headers()
253+
self.wfile.write(body)
254+
255+
def log_message(self, msg_format: str, *args: Any) -> None:
256+
logging.info(f"{self.address_string()} - {msg_format % args}")
257+
258+
259+
def main() -> None:
260+
logging.basicConfig(
261+
level=logging.INFO,
262+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
263+
)
264+
265+
port = int(os.environ.get("RESOURCE_USAGE_METRICS_PORT", 9104))
266+
monitor = ResourceMonitor()
267+
monitor.get_cpu() # Initialize CPU baseline
268+
269+
handler = partial(MetricsHandler, monitor)
270+
server = ThreadingHTTPServer(("0.0.0.0", port), handler)
271+
272+
logging.info(f"Starting server on port {port} (backend: {monitor.backend})")
273+
274+
try:
275+
server.serve_forever()
276+
except KeyboardInterrupt:
277+
logging.info("Shutting down...")
278+
server.server_close()
279+
280+
281+
if __name__ == "__main__":
282+
main()

installer/__main__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,12 @@ def bootstrap():
382382
)
383383
all_actions.append(ExtraServerSpec(command=["python", prometheus_script]))
384384

385+
# Add resource usage server
386+
resource_usage_script = os.path.join(
387+
config_directory_path, "scripts", "resource_usage.py"
388+
)
389+
all_actions.append(ExtraServerSpec(command=["python", resource_usage_script]))
390+
385391
# Execute all actions via the unified registry
386392
from .module.executor import run_actions_in_installer_env
387393

0 commit comments

Comments
 (0)