1010
1111from __future__ import annotations
1212
13+ import contextlib
1314import datetime
1415import logging
1516import os
17+ import selectors
1618import subprocess
1719import sys
20+ import time
1821import typing as t
1922from collections .abc import Iterable , Mapping , MutableMapping , Sequence
2023
@@ -147,6 +150,7 @@ def run(
147150 log_in_real_time : bool = False ,
148151 check_returncode : bool = True ,
149152 callback : ProgressCallbackProtocol | None = None ,
153+ timeout : float | None = None ,
150154) -> str :
151155 """Run a command.
152156
@@ -186,6 +190,13 @@ def progress_cb(output, timestamp):
186190 sys.stdout.flush()
187191 run(['git', 'pull'], callback=progress_cb)
188192
193+ timeout : float, optional
194+ Seconds to wait before terminating the subprocess. When the deadline is
195+ exceeded the process is sent ``SIGTERM`` (then ``SIGKILL`` after a short
196+ grace period) and :class:`libvcs.exc.CommandTimeoutError` is raised with
197+ any output collected so far. ``None`` (default) disables the deadline and
198+ preserves the legacy behaviour of blocking until the process exits.
199+
189200 Upcoming changes
190201 ----------------
191202 When minimum python >= 3.10, pipesize: int = -1 will be added after umask.
@@ -240,28 +251,49 @@ def progress_cb(output: t.AnyStr, timestamp: datetime.datetime) -> None:
240251 # connected to a pseudo-TTY, which would require significant changes
241252 # to how subprocess execution is handled.
242253
243- while code is None :
244- code = proc .poll ()
254+ timeout_stdout : bytes | None = None
255+ timeout_stderr : bytes | None = None
256+ if timeout is None :
257+ while code is None :
258+ code = proc .poll ()
245259
246- if callback and callable (callback ) and proc .stderr is not None :
247- line = console_to_str (proc .stderr .read (128 ))
248- if line :
249- callback (output = line , timestamp = datetime .datetime .now ())
260+ if callback and callable (callback ) and proc .stderr is not None :
261+ line = console_to_str (proc .stderr .read (128 ))
262+ if line :
263+ callback (output = line , timestamp = datetime .datetime .now ())
264+ else :
265+ code , timeout_stdout , timeout_stderr = _wait_with_deadline (
266+ proc ,
267+ deadline = time .monotonic () + timeout ,
268+ timeout = timeout ,
269+ callback = callback ,
270+ cmd = _stringify_command (normalized_args ),
271+ )
250272 if callback and callable (callback ):
251273 callback (output = "\r " , timestamp = datetime .datetime .now ())
252274
253275 if proc .stdout is not None :
276+ stdout_lines : list [bytes ] = (
277+ timeout_stdout .split (b"\n " )
278+ if timeout_stdout is not None
279+ else proc .stdout .readlines ()
280+ )
254281 lines : t .Iterable [bytes ] = filter (
255282 None ,
256- (line .strip () for line in proc . stdout . readlines () ),
283+ (line .strip () for line in stdout_lines ),
257284 )
258285 all_output = console_to_str (b"\n " .join (lines ))
259286 else :
260287 all_output = ""
261288 if code and proc .stderr is not None :
289+ stderr_raw : list [bytes ] = (
290+ timeout_stderr .split (b"\n " )
291+ if timeout_stderr is not None
292+ else proc .stderr .readlines ()
293+ )
262294 stderr_lines : t .Iterable [bytes ] = filter (
263295 None ,
264- (line .strip () for line in proc . stderr . readlines () ),
296+ (line .strip () for line in stderr_raw ),
265297 )
266298 all_output = console_to_str (b"" .join (stderr_lines ))
267299 output = "" .join (all_output )
@@ -272,3 +304,218 @@ def progress_cb(output: t.AnyStr, timestamp: datetime.datetime) -> None:
272304 cmd = _stringify_command (normalized_args ),
273305 )
274306 return output
307+
308+
#: Grace period, in seconds, granted after ``terminate()`` before escalating
#: to ``kill()``. The same bound is reused for the reap wait after ``kill()``.
_TIMEOUT_KILL_GRACE_SECONDS = 0.5

#: Upper bound, in seconds, on each ``selectors.select()`` wait (or the
#: fallback ``time.sleep()`` when no stream is selectable) inside the deadline
#: loop, so process exit and the deadline are re-checked at least this often.
_TIMEOUT_POLL_INTERVAL_SECONDS = 0.1
314+
315+
def _wait_with_deadline(
    proc: subprocess.Popen[bytes],
    *,
    deadline: float,
    timeout: float,
    callback: ProgressCallbackProtocol | None,
    cmd: str | list[str],
) -> tuple[int, bytes | None, bytes | None]:
    """Wait for ``proc`` to exit, enforcing a wall-clock deadline.

    Drains both ``stdout`` and ``stderr`` concurrently so a child that fills
    either kernel pipe buffer cannot deadlock waiting for its output to be
    read. Uses :mod:`selectors` to wake when either stream is readable or
    when the per-iteration poll interval expires -- whichever comes first --
    and re-checks process exit and the deadline on every wake.

    When the deadline is exceeded the subprocess is terminated and
    :class:`libvcs.exc.CommandTimeoutError` is raised with the bytes captured
    before the timeout. Otherwise the captured stdout/stderr are returned to
    the caller alongside the exit code so they can be reused without
    re-reading the now-drained pipes.

    Parameters
    ----------
    proc : subprocess.Popen[bytes]
        Running child process whose pipes (if any) are drained.
    deadline : float
        Absolute :func:`time.monotonic` value after which the child is
        terminated.
    timeout : float
        The caller's original timeout in seconds; used only for the log
        message and the raised exception.
    callback : ProgressCallbackProtocol, optional
        Invoked with each decoded ``stderr`` chunk as it is read.
    cmd : str | list[str]
        Command rendering used for logging and the raised exception.

    Returns
    -------
    tuple[int, bytes | None, bytes | None]
        ``(returncode, stdout_bytes, stderr_bytes)``. A byte buffer is
        ``None`` when the corresponding pipe is absent or could not be put
        into non-blocking mode (``os.set_blocking`` unavailable or failing)
        -- in that case the caller should fall back to reading the pipe
        directly.

    Raises
    ------
    libvcs.exc.CommandTimeoutError
        When ``deadline`` passes before the child exits.

    Notes
    -----
    The progress ``callback`` is invoked for ``stderr`` chunks only.
    ``stdout`` is drained into the returned buffer to prevent the child from
    blocking on a full pipe, but its chunks are not forwarded to the callback.
    Output drained *after* a timeout is captured for the exception message
    but is not forwarded to the callback either.
    """
    sel = selectors.DefaultSelector()
    buffers: dict[t.IO[bytes], list[bytes]] = {}
    registered: set[t.IO[bytes]] = set()
    fds_to_restore: list[int] = []
    notify = callback if callback is not None and callable(callback) else None

    def _record(source: t.IO[bytes], data: bytes) -> None:
        """Buffer ``data`` and forward ``stderr`` chunks to the callback."""
        buffers[source].append(data)
        if notify is not None and source is proc.stderr:
            notify(
                output=console_to_str(data),
                timestamp=datetime.datetime.now(),
            )

    code: int | None = None
    # Setup lives inside the ``try`` so that a failure while registering a
    # stream still restores blocking mode and closes the selector.
    try:
        for stream in (proc.stdout, proc.stderr):
            if stream is None:
                continue
            # Non-blocking reads so the selector loop never stalls on a short
            # read and a chatty child cannot fill a pipe buffer and wedge.
            try:
                fd = stream.fileno()
                os.set_blocking(fd, False)
            except (AttributeError, OSError, ValueError):
                # ``AttributeError``: ``os.set_blocking`` does not exist on
                # Windows before Python 3.12. Leave the stream untracked so
                # the caller falls back to reading it directly.
                continue
            fds_to_restore.append(fd)
            sel.register(stream, selectors.EVENT_READ)
            buffers[stream] = []
            registered.add(stream)

        while True:
            code = proc.poll()
            if code is not None:
                # Final drain: data written between the last ``select()``
                # wake and the child's exit would otherwise be lost. Only
                # streams still registered (non-blocking, not yet at EOF)
                # are drained.
                for stream in registered:
                    trailing = _drain_stream(stream)
                    if trailing:
                        _record(stream, trailing)
                break

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                # ``vcs_exit_code`` deliberately omitted here:
                # ``proc.returncode`` is still ``None`` because the child has
                # not been signalled yet.
                logger.warning(
                    "subprocess deadline exceeded after %.3gs",
                    timeout,
                    extra={"vcs_cmd": _format_cmd_for_log(cmd)},
                )
                _terminate_process(proc, cmd)
                for stream in registered:
                    trailing = _drain_stream(stream)
                    if trailing:
                        buffers[stream].append(trailing)
                stdout_data = _join_buffer(buffers, proc.stdout)
                stderr_data = _join_buffer(buffers, proc.stderr)
                message = console_to_str((stdout_data or b"") + (stderr_data or b""))
                raise exc.CommandTimeoutError(
                    output=message,
                    returncode=proc.returncode,
                    cmd=cmd,
                    timeout=timeout,
                )

            wait = min(_TIMEOUT_POLL_INTERVAL_SECONDS, remaining)
            if not registered:
                # No streams to select on. Yield the CPU explicitly instead
                # of busy-looping until the deadline or process exit.
                time.sleep(wait)
                continue

            # NOTE(review): on Windows ``select()`` only supports sockets;
            # confirm behaviour there when ``os.set_blocking`` succeeds on
            # pipes (Python 3.12+).
            for key, _mask in sel.select(timeout=wait):
                stream = t.cast("t.IO[bytes]", key.fileobj)
                try:
                    chunk = stream.read(128)
                except BlockingIOError:
                    # Would block: nothing to read *yet*. This is not EOF.
                    chunk = None
                except OSError:
                    # Unrecoverable read error: stop watching, like EOF.
                    chunk = b""
                if chunk is None:
                    # A non-blocking read with no data available returns
                    # ``None``. Keep the stream registered so its later
                    # output is still drained.
                    continue
                if not chunk:
                    # EOF from the child closing the pipe. Stop selecting on
                    # it so the loop doesn't spin on an always-readable fd.
                    sel.unregister(stream)
                    registered.discard(stream)
                    continue
                _record(stream, chunk)
    finally:
        # Restore blocking mode so any subsequent read by the caller behaves
        # as expected; ignore failures (fd already closed, unsupported fd).
        for fd in fds_to_restore:
            with contextlib.suppress(OSError, ValueError):
                os.set_blocking(fd, True)
        sel.close()

    return code, _join_buffer(buffers, proc.stdout), _join_buffer(buffers, proc.stderr)
466+
467+
468+ def _join_buffer (
469+ buffers : dict [t .IO [bytes ], list [bytes ]],
470+ stream : t .IO [bytes ] | None ,
471+ ) -> bytes | None :
472+ """Concatenate captured chunks for ``stream``, or ``None`` if not tracked."""
473+ if stream is None or stream not in buffers :
474+ return None
475+ return b"" .join (buffers [stream ])
476+
477+
478+ def _terminate_process (proc : subprocess .Popen [bytes ], cmd : str | list [str ]) -> None :
479+ """Terminate ``proc`` gracefully, falling back to ``kill`` on the grace."""
480+ if proc .poll () is not None :
481+ return
482+ try :
483+ proc .terminate ()
484+ except (OSError , ProcessLookupError ):
485+ return
486+ try :
487+ proc .wait (timeout = _TIMEOUT_KILL_GRACE_SECONDS )
488+ except subprocess .TimeoutExpired :
489+ logger .debug (
490+ "subprocess sigkill escalated after sigterm grace expired" ,
491+ extra = {"vcs_cmd" : _format_cmd_for_log (cmd )},
492+ )
493+ with contextlib .suppress (OSError , ProcessLookupError ):
494+ proc .kill ()
495+ # If the child is still unreachable after SIGKILL, bail rather than
496+ # block forever -- we've already signalled the user-facing timeout.
497+ with contextlib .suppress (subprocess .TimeoutExpired ):
498+ proc .wait (timeout = _TIMEOUT_KILL_GRACE_SECONDS )
499+ if proc .poll () is None :
500+ logger .warning (
501+ "subprocess sigkill did not reap; child may be leaked" ,
502+ extra = {"vcs_cmd" : _format_cmd_for_log (cmd )},
503+ )
504+
505+
506+ def _format_cmd_for_log (cmd : str | list [str ]) -> str :
507+ """Render ``cmd`` as a flat string for the ``vcs_cmd`` log extra."""
508+ if isinstance (cmd , list ):
509+ return " " .join (cmd )
510+ return cmd
511+
512+
513+ def _drain_stream (stream : t .IO [bytes ] | None ) -> bytes :
514+ """Best-effort read of any remaining bytes from a subprocess pipe."""
515+ if stream is None :
516+ return b""
517+ try :
518+ data = stream .read () or b""
519+ except (BlockingIOError , OSError , ValueError ):
520+ data = b""
521+ return data
0 commit comments