Skip to content

Commit 03e544a

Browse files
committed
fix(run[timeout]) Add timeout to legacy subprocess runner
why: Callers like vcspull sync need a wall-clock deadline on VCS subprocesses so that one unresponsive repo (credential prompt, stalled network, or a command like `git remote show origin` that pipes thousands of lines through the busy-poll loop) cannot freeze a whole batch. The legacy run() path had no timeout support at all, making that class of hang impossible to recover from.

what:
- Add a `timeout: float | None = None` kwarg to libvcs._internal.run.run.
- Introduce _wait_with_deadline, which uses selectors to avoid blocking stderr reads and enforces the deadline without busy-polling.
- On timeout, send SIGTERM to the child, escalate to SIGKILL after a short grace period, drain remaining output, and raise exc.CommandTimeoutError with the partial output so callers can diagnose what the process was doing.
- timeout=None preserves the pre-existing code path byte-for-byte, so no existing caller changes behaviour.
- Tests cover: default/None behaviour, deadline firing within a bounded wall clock, partial stderr capture, and child reaping (no zombies).
1 parent 1e1d2ff commit 03e544a

2 files changed

Lines changed: 227 additions & 7 deletions

File tree

src/libvcs/_internal/run.py

Lines changed: 131 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@
1010

1111
from __future__ import annotations
1212

13+
import contextlib
1314
import datetime
1415
import logging
1516
import os
17+
import selectors
1618
import subprocess
1719
import sys
20+
import time
1821
import typing as t
1922
from collections.abc import Iterable, Mapping, MutableMapping, Sequence
2023

@@ -147,6 +150,7 @@ def run(
147150
log_in_real_time: bool = False,
148151
check_returncode: bool = True,
149152
callback: ProgressCallbackProtocol | None = None,
153+
timeout: float | None = None,
150154
) -> str:
151155
"""Run a command.
152156
@@ -186,6 +190,13 @@ def progress_cb(output, timestamp):
186190
sys.stdout.flush()
187191
run(['git', 'pull'], callback=progress_cb)
188192
193+
timeout : float, optional
194+
Seconds to wait before terminating the subprocess. When the deadline is
195+
exceeded the process is sent ``SIGTERM`` (then ``SIGKILL`` after a short
196+
grace period) and :class:`libvcs.exc.CommandTimeoutError` is raised with
197+
any output collected so far. ``None`` (default) disables the deadline and
198+
preserves the legacy behaviour of blocking until the process exits.
199+
189200
Upcoming changes
190201
----------------
191202
When minimum python >= 3.10, pipesize: int = -1 will be added after umask.
@@ -240,13 +251,21 @@ def progress_cb(output: t.AnyStr, timestamp: datetime.datetime) -> None:
240251
# connected to a pseudo-TTY, which would require significant changes
241252
# to how subprocess execution is handled.
242253

243-
while code is None:
244-
code = proc.poll()
254+
if timeout is None:
255+
while code is None:
256+
code = proc.poll()
245257

246-
if callback and callable(callback) and proc.stderr is not None:
247-
line = console_to_str(proc.stderr.read(128))
248-
if line:
249-
callback(output=line, timestamp=datetime.datetime.now())
258+
if callback and callable(callback) and proc.stderr is not None:
259+
line = console_to_str(proc.stderr.read(128))
260+
if line:
261+
callback(output=line, timestamp=datetime.datetime.now())
262+
else:
263+
code = _wait_with_deadline(
264+
proc,
265+
deadline=time.monotonic() + timeout,
266+
callback=callback,
267+
cmd=_stringify_command(normalized_args),
268+
)
250269
if callback and callable(callback):
251270
callback(output="\r", timestamp=datetime.datetime.now())
252271

@@ -272,3 +291,109 @@ def progress_cb(output: t.AnyStr, timestamp: datetime.datetime) -> None:
272291
cmd=_stringify_command(normalized_args),
273292
)
274293
return output
294+
295+
296+
#: Seconds to wait for the child to exit after ``terminate()`` before
#: escalating to ``kill()``; the same bound is applied again after ``kill()``.
297+
_TIMEOUT_KILL_GRACE_SECONDS = 0.5
298+
299+
#: Upper bound, in seconds, on a single ``selectors.select()`` wait inside the
#: deadline loop, so the child's exit status is re-polled at least this often.
300+
_TIMEOUT_POLL_INTERVAL_SECONDS = 0.1
301+
302+
303+
def _wait_with_deadline(
    proc: subprocess.Popen[bytes],
    *,
    deadline: float,
    callback: ProgressCallbackProtocol | None,
    cmd: str | list[str],
) -> int:
    """Wait for ``proc`` to exit, raising once ``deadline`` has passed.

    While waiting, stderr is read in chunks of up to 128 bytes and each chunk
    is forwarded to ``callback``. The loop never spins: it sleeps either in
    ``selectors.select()`` (while stderr is open) or in ``proc.wait()`` (when
    there is no stderr pipe, or once the child has closed it), for at most
    ``_TIMEOUT_POLL_INTERVAL_SECONDS`` per iteration.

    Parameters
    ----------
    proc : subprocess.Popen
        Child process to wait on. If it has a stderr pipe, the pipe is
        switched to non-blocking mode and is left that way on return.
    deadline : float
        Absolute cutoff, expressed on the :func:`time.monotonic` clock.
    callback : ProgressCallbackProtocol, optional
        Called with ``output`` (the decoded stderr chunk) and ``timestamp``
        for every non-empty chunk read from stderr.
    cmd : str or list of str
        Command description attached to the timeout error.

    Returns
    -------
    int
        Exit code reported by ``proc.poll()``.

    Raises
    ------
    libvcs.exc.CommandTimeoutError
        When the deadline passes first. The process is terminated and the
        error's ``output`` holds the stderr chunks read so far plus whatever
        could still be drained from stderr and stdout.
    """
    stderr_stream = proc.stderr
    partial_chunks: list[bytes] = []

    # The context manager closes the selector even if registration fails.
    with selectors.DefaultSelector() as sel:
        # ``watched`` is the stream currently registered with the selector;
        # ``None`` means there is nothing to select on.
        watched: t.IO[bytes] | None = None
        if stderr_stream is not None:
            # Non-blocking reads so the loop never stalls on a short read
            # from a stuck child (e.g. git waiting on network or credentials).
            try:
                os.set_blocking(stderr_stream.fileno(), False)
            except (AttributeError, OSError, ValueError):
                # ``os.set_blocking`` is missing on some platforms; without
                # it a read could block, so stderr is ignored entirely.
                stderr_stream = None
            else:
                sel.register(stderr_stream, selectors.EVENT_READ)
                watched = stderr_stream

        while True:
            code = proc.poll()
            if code is not None:
                return code

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                _terminate_process(proc)
                drained = _drain_stream(stderr_stream) + _drain_stream(proc.stdout)
                message = console_to_str(b"".join(partial_chunks) + drained)
                raise exc.CommandTimeoutError(
                    output=message,
                    returncode=proc.returncode,
                    cmd=cmd,
                )

            wait = min(_TIMEOUT_POLL_INTERVAL_SECONDS, remaining)

            if watched is None:
                # Nothing to select on: sleep inside ``wait()`` instead of
                # re-polling in a tight loop. The next iteration picks up the
                # exit code or the expired deadline.
                with contextlib.suppress(subprocess.TimeoutExpired):
                    proc.wait(timeout=wait)
                continue

            for key, _mask in sel.select(timeout=wait):
                if watched is None or key.fileobj is not watched:
                    continue
                try:
                    chunk = watched.read(128)
                except BlockingIOError:
                    # Spurious wake-up: nothing to read right now.
                    chunk = None
                except OSError:
                    # Unreadable pipe: treat like EOF so we stop polling it.
                    chunk = b""
                if chunk is None:
                    continue
                if not chunk:
                    # EOF. A closed pipe stays "readable" forever, so keeping
                    # it registered would turn the loop into a busy-spin while
                    # the child is still running.
                    sel.unregister(watched)
                    watched = None
                    continue
                partial_chunks.append(chunk)
                if callback is not None and callable(callback):
                    callback(
                        output=console_to_str(chunk),
                        timestamp=datetime.datetime.now(),
                    )
371+
372+
def _terminate_process(proc: subprocess.Popen[bytes]) -> None:
    """Stop ``proc``: ``terminate()`` first, ``kill()`` if it outlives the grace.

    Does nothing when the process has already exited or when ``terminate()``
    itself fails. Each wait is bounded by ``_TIMEOUT_KILL_GRACE_SECONDS``, so
    this function may return while the child is still not reaped.
    """
    already_exited = proc.poll() is not None
    if already_exited:
        return

    try:
        proc.terminate()
    except OSError:  # also covers ProcessLookupError
        return

    grace = _TIMEOUT_KILL_GRACE_SECONDS
    try:
        proc.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        pass
    else:
        # The child honoured the terminate request within the grace period.
        return

    try:
        proc.kill()
    except OSError:
        pass

    # If the child is still unreachable after the kill, give up rather than
    # block forever -- the caller is about to report the timeout anyway.
    try:
        proc.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        pass
390+
391+
def _drain_stream(stream: t.IO[bytes] | None) -> bytes:
    """Return the bytes currently readable from ``stream`` without blocking.

    The stream's descriptor is switched to non-blocking mode first. A plain
    ``read()`` on a blocking pipe only returns once every holder of the write
    end has closed it, so a surviving grandchild of a killed process could
    otherwise stall the caller indefinitely.

    Parameters
    ----------
    stream : IO[bytes], optional
        Pipe (or other binary stream) to read from. ``None`` yields ``b""``.

    Returns
    -------
    bytes
        Whatever could be read; ``b""`` when nothing is available or the read
        fails.
    """
    if stream is None:
        return b""

    # Best effort: streams without a real descriptor (or platforms without
    # ``os.set_blocking``) fall back to a plain ``read()``.
    with contextlib.suppress(AttributeError, OSError, ValueError):
        os.set_blocking(stream.fileno(), False)

    try:
        # A non-blocking read with nothing available returns ``None``.
        return stream.read() or b""
    except (OSError, ValueError):  # OSError also covers BlockingIOError
        return b""

tests/_internal/test_run.py

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,15 @@
33
from __future__ import annotations
44

55
import pathlib
6+
import subprocess
7+
import sys
8+
import time
9+
import typing as t
610

7-
from libvcs._internal.run import _normalize_command_args
11+
import pytest
12+
13+
from libvcs import exc
14+
from libvcs._internal.run import _normalize_command_args, run
815

916

1017
def test_normalize_command_args_keeps_scalar_string() -> None:
@@ -28,3 +35,91 @@ def test_normalize_command_args_coerces_pathlike() -> None:
2835

2936
assert _normalize_command_args(path) == ["example"]
3037
assert _normalize_command_args([path, "status"]) == ["example", "status"]
38+
39+
40+
def test_run_without_timeout_matches_legacy_behavior() -> None:
    """Calling ``run`` without ``timeout`` still returns the command output."""
    command = [sys.executable, "-c", "print('hello'); print('world')"]

    result = run(command)

    for expected in ("hello", "world"):
        assert expected in result
46+
47+
48+
def test_run_raises_command_timeout_when_deadline_exceeded() -> None:
    """``run`` gives up on a 10 s sleeper shortly after a 0.3 s ``timeout``."""
    sleeper = [sys.executable, "-c", "import time; time.sleep(10)"]

    began = time.monotonic()
    with pytest.raises(exc.CommandTimeoutError) as raised:
        run(sleeper, timeout=0.3)
    elapsed = time.monotonic() - began

    # The upper bound proves the deadline really fired; falling through to the
    # unbounded legacy wait would take the full ten seconds.
    assert elapsed < 2.5, f"timeout took too long: {elapsed:.2f}s"
    # The timeout error must remain catchable as a plain ``CommandError``.
    assert isinstance(raised.value, exc.CommandError)
64+
65+
66+
def test_run_timeout_captures_partial_stderr_output() -> None:
    """Stderr written before the deadline ends up on the raised error."""
    # The child writes one line to stderr, flushes, then outlives the timeout.
    statements = [
        "import sys, time",
        "sys.stderr.write('first\\n')",
        "sys.stderr.flush()",
        "time.sleep(10)",
    ]
    command = [sys.executable, "-c", ";".join(statements)]

    with pytest.raises(exc.CommandTimeoutError) as raised:
        run(command, timeout=0.5, log_in_real_time=True)

    # Whatever the process emitted before being killed must be available in
    # ``output`` so callers can diagnose what it was doing.
    assert "first" in raised.value.output
86+
87+
88+
def test_run_timeout_reaps_child_process(monkeypatch: pytest.MonkeyPatch) -> None:
    """A timed-out child has been waited on by the time the error is raised."""
    spawned: list[t.Any] = []
    real_popen = subprocess.Popen

    def _recording_popen(*args: t.Any, **kwargs: t.Any) -> t.Any:
        proc = real_popen(*args, **kwargs)
        spawned.append(proc)
        return proc

    # ``monkeypatch`` restores ``subprocess.Popen`` on teardown even when the
    # test fails, so the patch cannot leak into other tests. The wrapper only
    # records the Popen object ``run`` creates.
    monkeypatch.setattr(subprocess, "Popen", _recording_popen)

    with pytest.raises(exc.CommandTimeoutError):
        run([sys.executable, "-c", "import time; time.sleep(10)"], timeout=0.3)

    assert spawned, "run() did not spawn its child through subprocess.Popen"
    # ``returncode`` is only populated once the child has been waited on, so
    # a non-None value shows the timeout branch both stopped and reaped it.
    assert spawned[-1].returncode is not None
114+
115+
116+
def test_run_without_timeout_completes_successfully() -> None:
    """Regression guard: a quick command returns its output without hanging."""
    command = [sys.executable, "-c", "print('ok')"]

    assert "ok" in run(command)
120+
121+
122+
def test_run_timeout_none_is_the_default() -> None:
    """``timeout`` defaults to ``None``, and an explicit ``None`` still runs."""
    import inspect

    # Check the declared default itself; passing ``timeout=None`` explicitly
    # would pass no matter what the default was.
    assert inspect.signature(run).parameters["timeout"].default is None

    output = run([sys.executable, "-c", "print('default')"], timeout=None)
    assert "default" in output

0 commit comments

Comments
 (0)