diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 1636cd0d..19244660 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -24,8 +24,15 @@ jobs: tox-env: py313 - python-version: '3.14' tox-env: py314 + # Python 3.15 is a pre-release and currently unsupported by + # typing_extensions (no released version survives + # `from typing_extensions import *` on 3.15 because + # no_type_check_decorator is still listed in __all__ after its + # removal from typing), which breaks the python_utils import. + # Failures are advisory until upstream catches up. - python-version: '3.15-dev' tox-env: py315 + experimental: true - python-version: '3.14' tox-env: docs - python-version: '3.14' @@ -44,4 +51,8 @@ jobs: run: | python -m pip install --upgrade pip tox - name: Test with tox + # Step-level continue-on-error keeps the job green for + # experimental (pre-release Python) environments while still + # showing the failing step in the logs + continue-on-error: ${{ matrix.experimental || false }} run: tox -e ${{ matrix.tox-env }} diff --git a/examples.py b/examples.py index b07cf86f..eb2953d2 100644 --- a/examples.py +++ b/examples.py @@ -85,6 +85,11 @@ def do_something(bar): # Increment one of the progress bars at random multibar[bar_label].increment() + # The multibar context manager waits for all bars to finish on + # exit, so finish them explicitly + for bar_label in bar_labels: + multibar[bar_label].finish() + @example def multiple_bars_line_offset_example() -> None: diff --git a/progressbar/__main__.py b/progressbar/__main__.py index b4b4e9a9..59e4117c 100644 --- a/progressbar/__main__.py +++ b/progressbar/__main__.py @@ -342,8 +342,8 @@ def main(argv: list[str] | None = None) -> None: # noqa: C901 # Initialize the progress bar bar = progressbar.ProgressBar( - # widgets=widgets, - max_value=total_size or None, + widgets=widgets, + max_value=total_size if filesize_available else None, max_error=False, ) @@ -354,12 +354,20 @@ def main(argv: list[str] | None = None) -> None: # noqa: C901 total_transferred = 0 bar.start() - with contextlib.suppress(KeyboardInterrupt): + with contextlib.suppress(KeyboardInterrupt, BrokenPipeError): for input_path in input_paths: if isinstance(input_path, pathlib.Path): - input_stream = stack.enter_context( - input_path.open('r' if args.line_mode else 'rb') - ) + if args.line_mode: + # newline='' disables universal-newline + # translation so the byte count matches the file + # size for CRLF files as well + input_stream = stack.enter_context( + input_path.open('r', newline=''), + ) + else: + input_stream = stack.enter_context( + input_path.open('rb'), + ) else: input_stream = input_path @@ -374,7 +382,18 @@ def main(argv: list[str] | None = None) -> None: # noqa: C901 break output_stream.write(data) - total_transferred += len(data) + if isinstance(data, str): + # The total size is measured in bytes, so progress + # must be tracked in bytes as well + encoding = ( + getattr(input_stream, 'encoding', None) or 'utf-8' + ) + total_transferred += len( + data.encode(encoding, errors='replace'), + ) + else: + total_transferred += len(data) + bar.update(total_transferred) bar.finish(dirty=True) @@ -386,8 +405,14 @@ def _get_output_stream( stack: contextlib.ExitStack, ) -> typing.IO[typing.Any]: if output and output != '-': - mode = 'w' if line_mode else 'wb' - return stack.enter_context(open(output, mode)) # noqa: SIM115 + if line_mode: + # newline='' passes the data through without newline + # translation, mirroring the input handling + return stack.enter_context( + open(output, 'w', newline=''), # noqa: SIM115 + ) + + return stack.enter_context(open(output, 'wb')) # noqa: SIM115 elif line_mode: return sys.stdout else: diff --git a/progressbar/algorithms.py b/progressbar/algorithms.py index c0cb7a1f..8dd2cf89 100644 --- a/progressbar/algorithms.py +++ b/progressbar/algorithms.py @@ -27,10 +27,15 @@ class ExponentialMovingAverage(SmoothingAlgorithm): def __init__(self, alpha: float = 0.5) -> None: self.alpha = alpha - self.value = 0 + self.value: float | None = None def update(self, new_value: float, elapsed: timedelta) -> float: - self.value = self.alpha * new_value + (1 - self.alpha) * self.value + if self.value is None: + # Seed with the first observation instead of biasing towards 0 + self.value = new_value + else: + self.value = self.alpha * new_value + (1 - self.alpha) * self.value + return self.value @@ -43,10 +48,15 @@ class DoubleExponentialMovingAverage(SmoothingAlgorithm): def __init__(self, alpha: float = 0.5) -> None: self.alpha = alpha - self.ema1 = 0 - self.ema2 = 0 + self.ema1: float | None = None + self.ema2: float | None = None def update(self, new_value: float, elapsed: timedelta) -> float: - self.ema1 = self.alpha * new_value + (1 - self.alpha) * self.ema1 - self.ema2 = self.alpha * self.ema1 + (1 - self.alpha) * self.ema2 + if self.ema1 is None or self.ema2 is None: + # Seed with the first observation instead of biasing towards 0 + self.ema1 = self.ema2 = new_value + else: + self.ema1 = self.alpha * new_value + (1 - self.alpha) * self.ema1 + self.ema2 = self.alpha * self.ema1 + (1 - self.alpha) * self.ema2 + return 2 * self.ema1 - self.ema2 diff --git a/progressbar/bar.py b/progressbar/bar.py index c3493708..3784770f 100644 --- a/progressbar/bar.py +++ b/progressbar/bar.py @@ -11,6 +11,7 @@ import timeit import typing import warnings +import weakref from copy import deepcopy from datetime import datetime from types import FrameType @@ -132,10 +133,12 @@ def finish(self): # pragma: no cover def __del__(self): if not self._finished and self._started: # pragma: no cover # We're not using contextlib.suppress here because during teardown - # contextlib is not available anymore. + # contextlib is not available anymore. Any exception can occur + # here during interpreter shutdown (closed streams, partially + # torn down modules), so we suppress all of them. try: # noqa: SIM105 self.finish() - except AttributeError: + except Exception: # noqa: BLE001, S110 pass def __getstate__(self): @@ -385,6 +388,62 @@ def _to_unicode(cls, args: typing.Any): yield converters.to_unicode(arg) +class _ResizeRegistry: + """ + Shared SIGWINCH handling for all resizable progressbars. + + A single signal handler dispatches to every live bar. The original + handler is saved when the first bar registers and restored when the + last one unregisters, so overlapping bars can finish in any order + without leaving a dangling handler installed. + """ + + bars: typing.ClassVar[weakref.WeakSet[ResizableMixin]] = weakref.WeakSet() + previous_handler: typing.ClassVar[typing.Any] = None + + @classmethod + def install(cls, bar: ResizableMixin) -> None: + import signal + + if not hasattr(signal, 'SIGWINCH'): # pragma: no cover + # Not available on Windows + return + + if not cls.bars: + cls.previous_handler = signal.getsignal( + signal.SIGWINCH # type: ignore[attr-defined] + ) + signal.signal( + signal.SIGWINCH, # type: ignore[attr-defined] + cls.handle_resize, + ) + + cls.bars.add(bar) + + @classmethod + def uninstall(cls, bar: ResizableMixin) -> None: + import signal + + if not hasattr(signal, 'SIGWINCH'): # pragma: no cover + # Not available on Windows + return + + cls.bars.discard(bar) + if not cls.bars: + signal.signal( + signal.SIGWINCH, # type: ignore[attr-defined] + cls.previous_handler, + ) + cls.previous_handler = None + + @classmethod + def handle_resize( + cls, signum: int | None = None, frame: None | FrameType = None + ) -> None: + for bar in list(cls.bars): + bar._handle_resize(signum, frame) + + class ResizableMixin(ProgressBarMixinBase): def __init__(self, term_width: int | None = None, **kwargs: typing.Any): ProgressBarMixinBase.__init__(self, **kwargs) @@ -395,15 +454,7 @@ def __init__(self, term_width: int | None = None, **kwargs: typing.Any): else: # pragma: no cover with contextlib.suppress(Exception): self._handle_resize() - import signal - - self._prev_handle = signal.getsignal( - signal.SIGWINCH # type: ignore - ) - signal.signal( - signal.SIGWINCH, - self._handle_resize, # type: ignore - ) + _ResizeRegistry.install(self) self.signal_set = True def _handle_resize( @@ -417,12 +468,8 @@ def finish(self): # pragma: no cover ProgressBarMixinBase.finish(self) if self.signal_set: with contextlib.suppress(Exception): - import signal - - signal.signal( - signal.SIGWINCH, - self._prev_handle, # type: ignore - ) + _ResizeRegistry.uninstall(self) + self.signal_set = False class StdRedirectMixin(DefaultFdMixin): @@ -686,6 +733,8 @@ def init(self): self.end_time = None self.extra = dict() self._last_update_timer = timeit.default_timer() + self._started = False + self._finished = False @property def percentage(self) -> float | None: @@ -749,7 +798,7 @@ def data(self) -> types.Dict[str, types.Any]: - `minutes_elapsed`: The minutes since the bar started modulo 60 - `hours_elapsed`: The hours since the bar started modulo 24 - - `days_elapsed`: The hours since the bar started + - `days_elapsed`: The days since the bar started - `time_elapsed`: The raw elapsed `datetime.timedelta` object - `percentage`: Percentage as a float or `None` if no max_value is available @@ -788,8 +837,8 @@ def data(self) -> types.Dict[str, types.Any]: minutes_elapsed=(elapsed.seconds / 60) % 60, # The hours since the bar started modulo 24 hours_elapsed=(elapsed.seconds / (60 * 60)) % 24, - # The hours since the bar started - days_elapsed=(elapsed.seconds / (60 * 60 * 24)), + # The days since the bar started + days_elapsed=(elapsed.total_seconds() / (60 * 60 * 24)), # The raw elapsed `datetime.timedelta` object time_elapsed=elapsed, # Percentage as a float or `None` if no max_value is available @@ -954,7 +1003,7 @@ def _update_variables(self, kwargs): if key not in self.variables: raise TypeError( 'update() got an unexpected variable name as argument ' - '{key!r}', + f'{key!r}', ) elif self.variables[key] != value_: self.variables[key] = kwargs[key] @@ -1080,6 +1129,11 @@ def finish(self, end: str = '\n', dirty: bool = False): dirty (bool): When True the progressbar kept the current state and won't be set to 100 percent """ + if self._finished: + # Finishing twice would corrupt the global stream-wrapping + # state, so extra calls are no-ops + return + if not dirty: self.end_time = datetime.now() self.update(self.max_value, force=True) diff --git a/progressbar/base.py b/progressbar/base.py index 48edf18f..6c80c19d 100644 --- a/progressbar/base.py +++ b/progressbar/base.py @@ -1,6 +1,5 @@ from __future__ import annotations -import typing from typing import IO, TextIO @@ -9,12 +8,6 @@ class FalseMeta(type): def __bool__(cls) -> bool: # pragma: no cover return False - @classmethod - def __cmp__(cls, other: typing.Any) -> int: # pragma: no cover - return -1 - - __nonzero__ = __bool__ - class UnknownLength(metaclass=FalseMeta): pass diff --git a/progressbar/env.py b/progressbar/env.py index d2faae02..14d92dfc 100644 --- a/progressbar/env.py +++ b/progressbar/env.py @@ -92,6 +92,11 @@ def from_env(cls) -> ColorSupport: support = max(cls.XTERM_256, support) elif value == 'xterm': support = max(cls.XTERM, support) + elif env_flag(variable, default=False): + # Generic truthy flags such as `FORCE_COLOR=1` enable + # color support but don't specify the depth; assume full + # color support analogous to the Jupyter handling above. + return cls.XTERM_TRUECOLOR return support diff --git a/progressbar/multi.py b/progressbar/multi.py index 934798c3..fabd1b2d 100644 --- a/progressbar/multi.py +++ b/progressbar/multi.py @@ -74,10 +74,10 @@ class MultiBar(dict[str, bar.ProgressBar]): _previous_output: list[str] _finished_at: dict[bar.ProgressBar, float] _labeled: set[bar.ProgressBar] - _print_lock: threading.RLock = threading.RLock() - _thread: threading.Thread | None = None - _thread_finished: threading.Event = threading.Event() - _thread_closed: threading.Event = threading.Event() + _print_lock: threading.RLock + _thread: threading.Thread | None + _thread_finished: threading.Event + _thread_closed: threading.Event def __init__( self, @@ -125,6 +125,10 @@ def __init__( self._finished_at = {} self._previous_output = [] self._buffer = io.StringIO() + self._print_lock = threading.RLock() + self._thread = None + self._thread_finished = threading.Event() + self._thread_closed = threading.Event() super().__init__(bars or {}) @@ -172,7 +176,7 @@ def _label_bar(self, bar: bar.ProgressBar) -> None: self._labeled.add(bar) bar.widgets.insert(0, self.label_format.format(label=bar.label)) - if self.append_label and bar not in self._labeled: # pragma: no branch + if self.append_label: # pragma: no branch self._labeled.add(bar) bar.widgets.append(self.label_format.format(label=bar.label)) @@ -330,9 +334,14 @@ def print( self.flush() def flush(self) -> None: - self.fd.write(self._buffer.getvalue()) - self._buffer.truncate(0) - self.fd.flush() + # The fd write happens under the lock as well so concurrent + # print()/render() calls cannot interleave their output + with self._print_lock: + value = self._buffer.getvalue() + self._buffer.seek(0) + self._buffer.truncate(0) + self.fd.write(value) + self.fd.flush() def run(self, join: bool = True) -> None: """ @@ -346,7 +355,7 @@ def run(self, join: bool = True) -> None: if join or self._thread_closed.is_set(): # If the thread is closed, we need to check if the progressbars # have finished. If they have, we can exit the loop - for bar_ in self.values(): # pragma: no cover + for bar_ in list(self.values()): # pragma: no cover if not bar_.finished(): break else: @@ -357,26 +366,31 @@ def run(self, join: bool = True) -> None: def start(self) -> None: assert not self._thread, 'Multibar already started' - self._thread_closed.set() - self._thread = threading.Thread(target=self.run, args=(False,)) + self._thread_finished.clear() + self._thread_closed.clear() + self._thread = threading.Thread( + target=self.run, + args=(False,), + daemon=True, + ) self._thread.start() def join(self, timeout: float | None = None) -> None: if self._thread is not None: self._thread_closed.set() self._thread.join(timeout=timeout) - self._thread = None + if not self._thread.is_alive(): + self._thread = None def stop(self, timeout: float | None = None): self._thread_finished.set() self.join(timeout=timeout) def get_sorted_bars(self): - return sorted( - self.values(), - key=self.sort_keyfunc, - reverse=self.sort_reverse, - ) + # Materialize the values into a list first so other threads can + # add or remove bars while we are sorting and rendering + bars = list(self.values()) + return sorted(bars, key=self.sort_keyfunc, reverse=self.sort_reverse) def __enter__(self): self.start() @@ -388,4 +402,9 @@ def __exit__( exc_value: BaseException | None, traceback: types.TracebackType | None, ) -> bool | None: - self.join() + if exc_type is None: + self.join() + else: + # Don't wait for unfinished progressbars when an exception is + # propagating; that would block forever + self.stop() diff --git a/progressbar/terminal/base.py b/progressbar/terminal/base.py index e1f9543c..2d299f4e 100644 --- a/progressbar/terminal/base.py +++ b/progressbar/terminal/base.py @@ -341,8 +341,8 @@ def from_rgb(cls, rgb: RGB) -> HSL: def interpolate(self, end: HSL, step: float) -> HSL: return HSL( self.hue + (end.hue - self.hue) * step, - self.lightness + (end.lightness - self.lightness) * step, self.saturation + (end.saturation - self.saturation) * step, + self.lightness + (end.lightness - self.lightness) * step, ) diff --git a/progressbar/terminal/os_specific/posix.py b/progressbar/terminal/os_specific/posix.py index 34819983..ee873dcb 100644 --- a/progressbar/terminal/os_specific/posix.py +++ b/progressbar/terminal/os_specific/posix.py @@ -4,6 +4,10 @@ def getch() -> str: + if not sys.stdin.isatty(): + # Raw mode is unavailable (and unnecessary) without a tty + return sys.stdin.read(1) + fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) # type: ignore try: diff --git a/progressbar/terminal/os_specific/windows.py b/progressbar/terminal/os_specific/windows.py index 8d1f3f4b..9afd031c 100644 --- a/progressbar/terminal/os_specific/windows.py +++ b/progressbar/terminal/os_specific/windows.py @@ -25,6 +25,18 @@ _STD_INPUT_HANDLE = _DWORD(-10) _STD_OUTPUT_HANDLE = _DWORD(-11) +# GetStdHandle returns INVALID_HANDLE_VALUE (-1) when no console is +# attached (piped output, pythonw, services) +_INVALID_HANDLE_VALUE = _HANDLE(-1).value +# The EventType of a KEY_EVENT_RECORD in an INPUT_RECORD +_KEY_EVENT = 0x0001 + + +def _valid_handle(handle) -> bool: + # Handles may be plain ints (from a HANDLE restype) or ctypes + # instances; normalize before comparing + value = getattr(handle, 'value', handle) + return value is not None and value != _INVALID_HANDLE_VALUE class WindowsConsoleModeFlags(enum.IntFlag): @@ -48,25 +60,33 @@ def __str__(self) -> str: return f'{self.name} (0x{self.value:04X})' +# Explicit argtypes are required: without them ctypes passes arguments +# as 32-bit C ints, silently truncating 64-bit HANDLE values _GetConsoleMode = _kernel32.GetConsoleMode +_GetConsoleMode.argtypes = (_HANDLE, ctypes.POINTER(_DWORD)) _GetConsoleMode.restype = _BOOL _SetConsoleMode = _kernel32.SetConsoleMode +_SetConsoleMode.argtypes = (_HANDLE, _DWORD) _SetConsoleMode.restype = _BOOL _GetStdHandle = _kernel32.GetStdHandle +_GetStdHandle.argtypes = (_DWORD,) _GetStdHandle.restype = _HANDLE -_ReadConsoleInput = _kernel32.ReadConsoleInputA -_ReadConsoleInput.restype = _BOOL +_SetConsoleTextAttribute = _kernel32.SetConsoleTextAttribute +_SetConsoleTextAttribute.argtypes = (_HANDLE, _WORD) +_SetConsoleTextAttribute.restype = _BOOL _h_console_input = _GetStdHandle(_STD_INPUT_HANDLE) _input_mode = _DWORD() -_GetConsoleMode(_HANDLE(_h_console_input), ctypes.byref(_input_mode)) +if _valid_handle(_h_console_input): + _GetConsoleMode(_HANDLE(_h_console_input), ctypes.byref(_input_mode)) _h_console_output = _GetStdHandle(_STD_OUTPUT_HANDLE) _output_mode = _DWORD() -_GetConsoleMode(_HANDLE(_h_console_output), ctypes.byref(_output_mode)) +if _valid_handle(_h_console_output): + _GetConsoleMode(_HANDLE(_h_console_output), ctypes.byref(_output_mode)) class _COORD(ctypes.Structure): @@ -121,17 +141,34 @@ class _Event(ctypes.Union): _fields_ = (('EventType', _WORD), ('Event', _Event)) +_ReadConsoleInput = _kernel32.ReadConsoleInputA +_ReadConsoleInput.argtypes = ( + _HANDLE, + ctypes.POINTER(_INPUT_RECORD), + _DWORD, + ctypes.POINTER(_DWORD), +) +_ReadConsoleInput.restype = _BOOL + + def reset_console_mode() -> None: - _SetConsoleMode(_HANDLE(_h_console_input), _DWORD(_input_mode.value)) - _SetConsoleMode(_HANDLE(_h_console_output), _DWORD(_output_mode.value)) + if _valid_handle(_h_console_input): + _SetConsoleMode(_HANDLE(_h_console_input), _DWORD(_input_mode.value)) + + if _valid_handle(_h_console_output): + _SetConsoleMode(_HANDLE(_h_console_output), _DWORD(_output_mode.value)) def set_console_mode() -> bool: - mode = ( - _input_mode.value - | WindowsConsoleModeFlags.ENABLE_VIRTUAL_TERMINAL_INPUT - ) - _SetConsoleMode(_HANDLE(_h_console_input), _DWORD(mode)) + if not _valid_handle(_h_console_output): + return False + + if _valid_handle(_h_console_input): + mode = ( + _input_mode.value + | WindowsConsoleModeFlags.ENABLE_VIRTUAL_TERMINAL_INPUT + ) + _SetConsoleMode(_HANDLE(_h_console_input), _DWORD(mode)) mode = ( _output_mode.value @@ -146,7 +183,8 @@ def get_console_mode() -> int: def set_text_color(color) -> None: - _kernel32.SetConsoleTextAttribute(_h_console_output, color) + if _valid_handle(_h_console_output): + _SetConsoleTextAttribute(_HANDLE(_h_console_output), _WORD(color)) def print_color(text, color) -> None: @@ -156,19 +194,36 @@ def print_color(text, color) -> None: def getch(): + if not _valid_handle(_h_console_input): + return None + lp_buffer = (_INPUT_RECORD * 2)() n_length = _DWORD(2) lp_number_of_events_read = _DWORD() - _ReadConsoleInput( + if not _ReadConsoleInput( _HANDLE(_h_console_input), lp_buffer, n_length, ctypes.byref(lp_number_of_events_read), - ) - - char = lp_buffer[1].Event.KeyEvent.uChar.AsciiChar.decode('ascii') - if char == '\x00': + ): return None - return char + # Only the records that were actually read contain valid data. The + # Event field is a union, so the KeyEvent member may only be read + # for KEY_EVENT records, and non-ASCII keys must not crash the + # decode. + for i in range(min(lp_number_of_events_read.value, len(lp_buffer))): + record = lp_buffer[i] + if record.EventType != _KEY_EVENT: + continue + + key_event = record.Event.KeyEvent + if not key_event.bKeyDown: + continue + + char = key_event.uChar.AsciiChar.decode('ascii', errors='replace') + if char != '\x00': + return char + + return None diff --git a/progressbar/terminal/stream.py b/progressbar/terminal/stream.py index e3064b0b..04429e47 100644 --- a/progressbar/terminal/stream.py +++ b/progressbar/terminal/stream.py @@ -19,7 +19,7 @@ def fileno(self) -> int: return self.stream.fileno() def flush(self) -> None: - pass + self.stream.flush() def isatty(self) -> bool: return self.stream.isatty() @@ -83,6 +83,7 @@ def __init__( super().__init__(stream) def write(self, data: str) -> int: + written = len(data) data = data.rstrip('\n') # Move the cursor up self.stream.write(self.UP * self.lines) @@ -94,7 +95,9 @@ def write(self, data: str) -> int: self.stream.write(self.DOWN * self.lines) self.flush() - return len(data) + # Return the length of the original data; callers use this to + # detect short writes + return written class LastLineStream(TextIOOutputWrapper): diff --git a/progressbar/utils.py b/progressbar/utils.py index fb0c72b6..4a77da7a 100644 --- a/progressbar/utils.py +++ b/progressbar/utils.py @@ -154,10 +154,13 @@ def flush(self) -> None: def _flush(self) -> None: if value := self.buffer.getvalue(): self.flush() - self.target.write(value) + # Clear the buffer before writing so a failed write cannot + # cause the same data to be written again by the next flush self.buffer.seek(0) self.buffer.truncate(0) self.needs_clear = False + if not self.target.closed: + self.target.write(value) # when explicitly flushing, always flush the target as well self.flush_target() @@ -337,15 +340,23 @@ def unwrap_stdout(self) -> None: if self.wrapped_stdout > 1: self.wrapped_stdout -= 1 else: - sys.stdout = self.original_stdout + # Also reset our own reference so needs_clear() and + # update_capturing() don't act on a stale wrapper + self.stdout = sys.stdout = self.original_stdout self.wrapped_stdout = 0 + if not self.wrapped_stderr: + self.unwrap_excepthook() def unwrap_stderr(self) -> None: if self.wrapped_stderr > 1: self.wrapped_stderr -= 1 else: - sys.stderr = self.original_stderr + # Also reset our own reference so needs_clear() and + # update_capturing() don't act on a stale wrapper + self.stderr = sys.stderr = self.original_stderr self.wrapped_stderr = 0 + if not self.wrapped_stdout: + self.unwrap_excepthook() def needs_clear(self) -> bool: # pragma: no cover stdout_needs_clear = getattr(self.stdout, 'needs_clear', False) @@ -356,8 +367,8 @@ def flush(self) -> None: if self.wrapped_stdout and isinstance(self.stdout, WrappingIO): try: self.stdout._flush() - except io.UnsupportedOperation: # pragma: no cover - self.wrapped_stdout = False + except io.UnsupportedOperation: + self.wrapped_stdout = 0 logger.warning( 'Disabling stdout redirection, %r is not seekable', sys.stdout, @@ -367,7 +378,7 @@ def flush(self) -> None: try: self.stderr._flush() except io.UnsupportedOperation: # pragma: no cover - self.wrapped_stderr = False + self.wrapped_stderr = 0 logger.warning( 'Disabling stderr redirection, %r is not seekable', sys.stderr, diff --git a/progressbar/widgets.py b/progressbar/widgets.py index 82b5b0c6..5d00b5c2 100644 --- a/progressbar/widgets.py +++ b/progressbar/widgets.py @@ -92,7 +92,18 @@ def _marker(progress, data, width): progress.max_value is not base.UnknownLength and progress.max_value > 0 ): - length = int(progress.value / progress.max_value * width) + # The fill length is based on the progress relative to + # min_value; the max() guards against a zero range and the + # min() keeps the marker within the allotted width when the + # value exceeds max_value (with max_error=False) + length = min( + width, + int( + (progress.value - progress.min_value) + / max(progress.max_value - progress.min_value, 1e-6) + * width, + ), + ) return marker * length else: return marker @@ -463,12 +474,7 @@ def __call__( if isinstance(self.samples, datetime.timedelta): minimum_time = progress.last_update_time - self.samples - minimum_value = sample_values[-1] - while ( - sample_times[2:] - and minimum_time > sample_times[1] - and minimum_value > sample_values[1] - ): + while sample_times[2:] and minimum_time > sample_times[1]: sample_times.pop(0) sample_values.pop(0) elif len(sample_times) > self.samples: @@ -532,7 +538,9 @@ def __call__( ): """Updates the widget to show the ETA or total time when finished.""" if value is None: - value = data['value'] + # The per-item rate must be based on the progress relative to + # min_value, not the raw value + value = data['value'] - progress.min_value if elapsed is None: elapsed = data['time_elapsed'] @@ -677,7 +685,9 @@ def __call__( elapsed=None, ): if value is None: # pragma: no branch - value = data['value'] + # The per-item rate must be based on the progress relative to + # min_value, not the raw value + value = data['value'] - progress.min_value if elapsed is None: # pragma: no branch elapsed = data['time_elapsed'] @@ -777,10 +787,11 @@ def __call__( scaled = power = 0 data['unit'] = self.unit - if power == 0 and scaled < 0.1: - if scaled > 0: - scaled = 1 / scaled - data['scaled'] = scaled + if power == 0 and 0 < scaled < 0.1: + # Slow transfers are shown as seconds per unit instead. Note + # that this is only done when there is actual data; before the + # first data arrives the regular format is used. + data['scaled'] = 1 / scaled data['prefix'] = self.prefixes[0] return FormatWidgetMixin.__call__( self, @@ -1258,9 +1269,13 @@ def get_values(self, progress: ProgressBarMixinBase, data: Data): ranges = [0.0] * len(self.markers) for value in data['variables'][self.name] or []: if not isinstance(value, (int, float)): - # Progress is (value, max) + # Progress is (value, max). A zero maximum means the total + # is not known (yet), so no progress can be shown. progress_value, progress_max = value - value = float(progress_value) / float(progress_max) + if progress_max: + value = float(progress_value) / float(progress_max) + else: + value = 0.0 if not 0 <= value <= 1: raise ValueError( @@ -1611,11 +1626,19 @@ def __call__( marker = bg_color.bg(marker) self.job_markers.append(marker) + # Drop the oldest markers when they no longer fit the + # available width + while ( + len(self.job_markers) > 1 + and progress.custom_len(''.join(self.job_markers)) > width + ): + self.job_markers.pop(0) + marker = ''.join(self.job_markers) width -= progress.custom_len(marker) fill = converters.to_unicode(self.fill(progress, data, width)) - fill = self._apply_colors(fill * width, data) + fill = self._apply_colors(fill * max(width, 0), data) if self.fill_left: # pragma: no branch marker += fill diff --git a/tests/test_algorithms.py b/tests/test_algorithms.py index a6cc6467..4ce1364b 100644 --- a/tests/test_algorithms.py +++ b/tests/test_algorithms.py @@ -8,7 +8,7 @@ def test_ema_initialization() -> None: ema = algorithms.ExponentialMovingAverage() assert ema.alpha == 0.5 - assert ema.value == 0 + assert ema.value is None @pytest.mark.parametrize( @@ -25,7 +25,10 @@ def test_ema_initialization() -> None: ], ) def test_ema_update(alpha, new_value: float, expected) -> None: + # The first update seeds the average, so blending starts from an + # explicit zero observation: alpha * new_value + (1 - alpha) * 0 ema = algorithms.ExponentialMovingAverage(alpha) + ema.update(0, timedelta(seconds=1)) result = ema.update(new_value, timedelta(seconds=1)) assert result == expected @@ -33,8 +36,8 @@ def test_ema_update(alpha, new_value: float, expected) -> None: def test_dema_initialization() -> None: dema = algorithms.DoubleExponentialMovingAverage() assert dema.alpha == 0.5 - assert dema.ema1 == 0 - assert dema.ema2 == 0 + assert dema.ema1 is None + assert dema.ema2 is None @pytest.mark.parametrize( @@ -50,9 +53,28 @@ def test_dema_initialization() -> None: ], ) def test_dema_update(alpha, new_value: float, expected) -> None: + # Seeded with an explicit zero observation, a single update yields + # ema1 = alpha * v, ema2 = alpha^2 * v, so the result is + # alpha * v * (2 - alpha) which matches the historical values dema = algorithms.DoubleExponentialMovingAverage(alpha) + dema.update(0, timedelta(seconds=1)) result = dema.update(new_value, timedelta(seconds=1)) - assert result == expected + assert result == pytest.approx(expected) # Additional test functions can be added here as needed. + + +def test_ema_seeds_from_first_value() -> None: + # Regression: B8 - the average started at 0, biasing early values + # toward zero instead of the first observation. + ema = algorithms.ExponentialMovingAverage(0.5) + assert ema.update(100, timedelta(seconds=1)) == 100 + assert ema.update(50, timedelta(seconds=1)) == 75 + + +def test_dema_seeds_from_first_value() -> None: + # Regression: B8 - same zero bias for the double EMA. + dema = algorithms.DoubleExponentialMovingAverage(0.5) + assert dema.update(100, timedelta(seconds=1)) == 100 + assert dema.update(50, timedelta(seconds=1)) == 62.5 diff --git a/tests/test_color.py b/tests/test_color.py index 4a368af4..3c0f5fb4 100644 --- a/tests/test_color.py +++ b/tests/test_color.py @@ -410,3 +410,23 @@ def test_ansi_color(monkeypatch) -> None: def test_sgr_call() -> None: assert progressbar.terminal.encircled('test') == '\x1b[52mtest\x1b[54m' + + +def test_hsl_interpolate_preserves_components() -> None: + # Regression: C1 - interpolate() swapped the saturation and lightness + # arguments, corrupting every HSL gradient blend. + start_color = terminal.HSL(0, 100, 25) + end_color = terminal.HSL(0, 100, 75) + + assert start_color.interpolate(end_color, 0.5) == terminal.HSL(0, 100, 50) + + +@pytest.mark.parametrize('value', ['1', 'true', 'on']) +def test_color_support_force_color_flag(monkeypatch, value) -> None: + # Regression: C8 - the conventional FORCE_COLOR=1 left color support + # at NONE because only depth-style values were recognised. + if os.name == 'nt': + monkeypatch.setattr(os, 'name', 'posix') + + monkeypatch.setenv('FORCE_COLOR', value) + assert env.ColorSupport.from_env() == env.ColorSupport.XTERM_TRUECOLOR diff --git a/tests/test_data_transfer_bar.py b/tests/test_data_transfer_bar.py index 7e5cfcce..e8f5c577 100644 --- a/tests/test_data_transfer_bar.py +++ b/tests/test_data_transfer_bar.py @@ -1,3 +1,5 @@ +import io + import progressbar from progressbar import DataTransferBar @@ -14,3 +16,16 @@ def test_unknown_length() -> None: for i in range(50): dtb.update(i) dtb.finish() + + +def test_file_transfer_speed_before_any_data() -> None: + # Regression: B6 - before any data was transferred the widget + # rendered '0.0 s/B' using the inverse format. + widget = progressbar.FileTransferSpeed() + bar = progressbar.ProgressBar( + max_value=10, widgets=[widget], fd=io.StringIO(), term_width=60 + ) + bar.start() + output = widget(bar, bar.data()) + assert 's/' not in output + bar.finish(dirty=True) diff --git a/tests/test_failure.py b/tests/test_failure.py index 5953284b..299f6ff4 100644 --- a/tests/test_failure.py +++ b/tests/test_failure.py @@ -138,3 +138,11 @@ def test_increment() -> None: bar = progressbar.ProgressBar(max_value=10) bar.increment() del bar + + +def test_unexpected_update_keyword_arg_message() -> None: + # Regression: A3 - the error message contained the literal text + # '{key!r}' because the string was not an f-string. + bar = progressbar.ProgressBar(max_value=10) + with pytest.raises(TypeError, match='foo'): + bar.update(1, foo=10) diff --git a/tests/test_job_status.py b/tests/test_job_status.py index 778b6ce3..d4770908 100644 --- a/tests/test_job_status.py +++ b/tests/test_job_status.py @@ -1,8 +1,10 @@ +import io import time import pytest import progressbar +from progressbar import utils @pytest.mark.parametrize( @@ -20,3 +22,27 @@ def test_status(status) -> None: for _ in range(5): bar.increment(status=status, force=True) time.sleep(0.1) + + +def test_job_status_bar_does_not_overflow_width() -> None: + # Regression: B4 - accumulated job markers made the rendered output + # wider than the allotted width. + widget = progressbar.widgets.JobStatusBar('status') + bar = progressbar.ProgressBar( + widgets=[widget], + variables={'status': None}, + max_value=100, + fd=io.StringIO(), + term_width=60, + ) + bar.start() + data = bar.data() + data['variables'] = {'status': True} + + width = 5 + output = '' + for _ in range(10): + output = widget(bar, data, width=width) + + assert utils.len_color(output) <= width + bar.finish(dirty=True) diff --git a/tests/test_monitor_progress.py b/tests/test_monitor_progress.py index 6f0f148f..6f883487 100644 --- a/tests/test_monitor_progress.py +++ b/tests/test_monitor_progress.py @@ -200,7 +200,6 @@ def test_line_breaks(testdir) -> None: ' 60%|################################ |', ' 80%|########################################### |', '100%|######################################################|', - '100%|######################################################|', ), ) @@ -224,8 +223,6 @@ def test_no_line_breaks(testdir) -> None: ' 60%|################################ |', ' 80%|########################################### |', '100%|######################################################|', - '', - '100%|######################################################|', ] @@ -248,8 +245,6 @@ def test_percentage_label_bar(testdir) -> None: '|###########################60%#### |', '|###########################80%################ |', '|###########################100%###########################|', - '', - '|###########################100%###########################|', ] @@ -272,8 +267,6 @@ def test_granular_bar(testdir) -> None: '|OOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOo |', '|OOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO. |', '|OOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO|', - '', - '|OOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO|', ] @@ -287,10 +280,10 @@ def test_colors(testdir) -> None: testdir.makepyfile(_create_script(enable_colors=True, **kwargs)), ) pprint.pprint(result.stderr.lines, width=70) - assert result.stderr.lines == ['\x1b[92mgreen\x1b[0m'] * 3 + assert result.stderr.lines == ['\x1b[92mgreen\x1b[0m'] * 2 result = testdir.runpython( testdir.makepyfile(_create_script(enable_colors=False, **kwargs)), ) pprint.pprint(result.stderr.lines, width=70) - assert result.stderr.lines == ['green'] * 3 + assert result.stderr.lines == ['green'] * 2 diff --git a/tests/test_multibar.py b/tests/test_multibar.py index 84484200..daf55a17 100644 --- a/tests/test_multibar.py +++ b/tests/test_multibar.py @@ -1,3 +1,5 @@ +import contextlib +import io import random import threading import time @@ -137,6 +139,9 @@ def test_multibar_show_finished() -> None: bar.update(i) time.sleep(SLEEP) + # The context manager waits for all bars to finish + bar.finish() + multibar.render(force=True) @@ -185,8 +190,10 @@ def print_sometimes(bar, probability): for i in range(5): multibar.print(f'{i}', flush=False) - multibar.update(force=True, flush=False) - multibar.update(force=True, flush=True) + # Note: MultiBar inherits from dict, so update() would be + # dict.update and insert bogus entries; render() is intended here + multibar.render(force=True, flush=False) + multibar.render(force=True, flush=True) def test_multibar_no_format() -> None: @@ -243,7 +250,142 @@ def test_multibar_threads() -> None: time.sleep(0.1) bar.update(3) time.sleep(0.1) - multibar.join() + # join() waits until all bars have finished, so finish first bar.finish() multibar.join() + multibar.join() multibar.render(force=True) + + +def test_multibar_instances_do_not_share_thread_state() -> None: + # Regression: D1 - thread primitives were class attributes shared + # between all MultiBar instances. + multibar_a = progressbar.MultiBar(fd=io.StringIO()) + multibar_b = progressbar.MultiBar(fd=io.StringIO()) + + assert multibar_a._thread_finished is not multibar_b._thread_finished + assert multibar_a._thread_closed is not multibar_b._thread_closed + assert multibar_a._print_lock is not multibar_b._print_lock + + +def test_multibar_stop_does_not_poison_new_instances() -> None: + # Regression: D1 - stop() set a class-level Event, killing the render + # loop of every MultiBar created afterwards. + multibar = progressbar.MultiBar(fd=io.StringIO()) + multibar.start() + multibar.stop(timeout=5) + + fresh = progressbar.MultiBar(fd=io.StringIO()) + assert not fresh._thread_finished.is_set() + + +def test_multibar_start_keeps_render_thread_alive() -> None: + # Regression: D6 - start() called _thread_closed.set() instead of + # clearing it, so an empty multibar's render thread exited before + # any bars could be added. + multibar = progressbar.MultiBar(fd=io.StringIO()) + multibar.start() + try: + assert not multibar._thread_closed.is_set() + assert multibar._thread is not None + multibar._thread.join(timeout=0.5) + assert multibar._thread.is_alive() + finally: + multibar.stop(timeout=5) + + +def test_multibar_flush_does_not_emit_nul_bytes() -> None: + # Regression: D3 - flush() truncated the buffer without seeking back, + # so later writes padded the gap with NUL characters. + fd = io.StringIO() + multibar = progressbar.MultiBar(fd=fd) + multibar.print('hello') + multibar.print('world') + + assert '\x00' not in fd.getvalue() + + +def test_multibar_prepend_and_append_label() -> None: + # Regression: D7 - the append_label branch was unreachable when + # prepend_label was enabled as well. + multibar = progressbar.MultiBar( + prepend_label=True, + append_label=True, + fd=io.StringIO(), + ) + bar = progressbar.ProgressBar( + max_value=N, + widgets=['x'], + fd=io.StringIO(), + ) + multibar['job'] = bar + multibar._label_bar(bar) + + assert str(bar.widgets[0]).startswith('job') + assert str(bar.widgets[-1]).startswith('job') + + +def test_multibar_join_timeout_keeps_thread_reference() -> None: + # Regression: D8 - join(timeout) dropped the thread reference even + # when the thread was still running. + multibar = progressbar.MultiBar(fd=io.StringIO()) + assert multibar['unfinished'] is not None # creates an unfinished bar + multibar.start() + try: + multibar.join(timeout=0.01) + assert multibar._thread is not None + assert multibar._thread.is_alive() + finally: + multibar.stop(timeout=5) + + +def test_multibar_exception_in_context_exits_promptly() -> None: + # Regression: D4 - an exception inside `with MultiBar()` hung forever + # in __exit__ because join() waited for bars that never finish. + holder: dict[str, progressbar.MultiBar] = {} + + def scenario() -> None: + multibar = holder['multibar'] = progressbar.MultiBar( + fd=io.StringIO(), + ) + # Pre-fix the event is shared class state which other tests may + # have set; post-fix this only touches this instance. + multibar._thread_finished.clear() + # The bar must exist before the render thread starts so the + # thread observes an unfinished bar. + multibar['a'].update(0) + with contextlib.suppress(RuntimeError), multibar: + raise RuntimeError('boom') + + worker = threading.Thread(target=scenario, daemon=True) + worker.start() + worker.join(timeout=5) + try: + assert not worker.is_alive(), '__exit__ hung on unfinished bars' + finally: + # Unstick the render thread regardless of the outcome + holder['multibar']._thread_finished.set() + + +def test_multibar_concurrent_mutation() -> None: + # Regression: D2 - the render thread iterated self.values() without a + # snapshot while other threads add/remove bars. + errors: list[threading.ExceptHookArgs] = [] + original_excepthook = threading.excepthook + threading.excepthook = errors.append + multibar = progressbar.MultiBar(fd=io.StringIO()) + # Pre-fix the event is shared class state which other tests may have + # set; post-fix this only touches this instance. + multibar._thread_finished.clear() + assert multibar['keep'] is not None # creates an unfinished bar + multibar.start() + try: + for i in range(300): + assert multibar[f'bar {i}'] is not None + del multibar[f'bar {i}'] + finally: + multibar.stop(timeout=5) + threading.excepthook = original_excepthook + + assert not errors + assert not multibar._thread or not multibar._thread.is_alive() diff --git a/tests/test_os_specific.py b/tests/test_os_specific.py new file mode 100644 index 00000000..92792d89 --- /dev/null +++ b/tests/test_os_specific.py @@ -0,0 +1,17 @@ +import io +import os +import sys + +import pytest + +if os.name == 'nt': + pytest.skip('POSIX-only tests', allow_module_level=True) + +from progressbar.terminal import os_specific + + +def test_getch_with_non_tty_stdin(monkeypatch) -> None: + # Regression: E6 - getch() crashed with termios.error (or + # io.UnsupportedOperation) when stdin was not a tty. + monkeypatch.setattr(sys, 'stdin', io.StringIO('x')) + assert os_specific.getch() == 'x' diff --git a/tests/test_progressbar.py b/tests/test_progressbar.py index 23270a46..4ba826f4 100644 --- a/tests/test_progressbar.py +++ b/tests/test_progressbar.py @@ -1,11 +1,17 @@ import contextlib +import gc +import io import os +import signal +import sys import time +from datetime import timedelta import original_examples # type: ignore import pytest import progressbar +from progressbar import utils # Import hack to allow for parallel Tox try: @@ -77,3 +83,99 @@ def test_negative_maximum() -> None: progressbar.ProgressBar(max_value=-1) as progress, ): progress.start() + + +def test_elapsed_data_spans_days() -> None: + # Regression: A1 - days_elapsed was computed from timedelta.seconds, + # which only contains the sub-day component. + bar = progressbar.ProgressBar( + max_value=10, fd=io.StringIO(), term_width=60 + ) + bar.start() + bar.start_time -= timedelta(days=2, hours=3, minutes=4) + data = bar.data() + + expected_days = 2 + (3 * 3600 + 4 * 60) / 86400 + assert data['days_elapsed'] == pytest.approx(expected_days, abs=0.01) + + +def test_restart_after_finish_writes_final_newline() -> None: + # Regression: A2 - init() did not reset _finished, so a reused bar + # never wrote its final newline (and never flushed) again. + bar = progressbar.ProgressBar( + max_value=5, fd=io.StringIO(), term_width=60, line_breaks=False + ) + bar.start() + bar.update(5) + bar.finish() + assert bar.fd.getvalue().endswith('\n') + + bar.fd = io.StringIO() + bar.start() + assert not bar._finished + bar.update(5) + bar.finish() + assert bar.fd.getvalue().endswith('\n') + + +def test_repeated_finish_keeps_capturing_balanced() -> None: + # Regression: A2 - every finish() call decremented the global + # capturing counter, even when the bar was already finished. + baseline = utils.streams.capturing + try: + bar = progressbar.ProgressBar( + max_value=5, fd=io.StringIO(), term_width=60 + ) + bar.start() + bar.update(5) + bar.finish() + bar.finish() + assert utils.streams.capturing == baseline + finally: + utils.streams.capturing = baseline + + +def test_del_suppresses_finish_errors(monkeypatch) -> None: + # Regression: A4 - __del__ only suppressed AttributeError; any other + # exception from finish() leaked out of the finalizer (reported via + # sys.unraisablehook during garbage collection). + class ExplodingIO(io.StringIO): + def write(self, value: str) -> int: + raise ValueError('I/O operation on closed file') + + unraisable: list[object] = [] + monkeypatch.setattr(sys, 'unraisablehook', unraisable.append) + + bar = progressbar.ProgressBar(max_value=5, fd=io.StringIO(), term_width=60) + bar.start() + bar.fd = ExplodingIO() + del bar + gc.collect() + + assert not unraisable + + +@pytest.mark.skipif(os.name == 'nt', reason='SIGWINCH is POSIX-only') +def test_sigwinch_restored_with_overlapping_bars() -> None: + # Regression: A5 - with two live bars, finishing them in creation + # order left a dangling handler installed. + original = signal.getsignal(signal.SIGWINCH) + try: + bar1 = progressbar.ProgressBar(max_value=5, fd=io.StringIO()) + bar1.start() + bar2 = progressbar.ProgressBar(max_value=5, fd=io.StringIO()) + bar2.start() + + # A resize signal is dispatched to all live bars + signal.raise_signal(signal.SIGWINCH) + assert isinstance(bar1.term_width, int) + assert isinstance(bar2.term_width, int) + + bar1.update(5) + bar1.finish() + bar2.update(5) + bar2.finish() + + assert signal.getsignal(signal.SIGWINCH) is original + finally: + signal.signal(signal.SIGWINCH, original) diff --git a/tests/test_progressbar_command.py b/tests/test_progressbar_command.py index 3dd82d60..a277f8ca 100644 --- a/tests/test_progressbar_command.py +++ b/tests/test_progressbar_command.py @@ -2,6 +2,7 @@ import pytest +import progressbar import progressbar.__main__ as main @@ -142,3 +143,66 @@ def test_main_bytes_output(monkeypatch, tmp_path) -> None: def test_missing_input(tmp_path) -> None: with pytest.raises(SystemExit): main.main([str(tmp_path / 'output')]) + + +@pytest.fixture +def recorded_bars(monkeypatch): + created = [] + + class RecordingProgressBar(progressbar.ProgressBar): + def __init__(self, **kwargs) -> None: + created.append(self) + self.init_kwargs = kwargs + super().__init__(**kwargs) + + monkeypatch.setattr(main.progressbar, 'ProgressBar', RecordingProgressBar) + return created + + +def test_main_passes_widgets(tmp_path, recorded_bars) -> None: + # Regression: E2 - the configured widgets were built but never passed + # to the progress bar. + file = tmp_path / 'data.bin' + file.write_bytes(b'x' * 1024) + main.main([str(file), '-o', str(tmp_path / 'out.bin')]) + + assert recorded_bars + assert recorded_bars[0].init_kwargs.get('widgets') + + +def test_main_line_mode_counts_bytes(tmp_path, recorded_bars) -> None: + # Regression: E1 - line mode counted characters while the maximum was + # measured in bytes, so multi-byte content never reached 100%. + file = tmp_path / 'data.txt' + file.write_text(('é' * 99 + '\n') * 5, encoding='utf-8') + size = file.stat().st_size + + main.main(['-l', str(file), '-o', str(tmp_path / 'out.txt')]) + + assert recorded_bars[0].value == size + + +def test_main_broken_pipe(tmp_path, monkeypatch) -> None: + # Regression: E3 - an early-closing downstream pipe raised an + # unhandled BrokenPipeError. + file = tmp_path / 'data.bin' + file.write_bytes(b'x' * 1024) + + class BrokenPipeIO(io.BytesIO): + def write(self, data) -> int: + raise BrokenPipeError + + monkeypatch.setattr( + main, '_get_output_stream', lambda *args: BrokenPipeIO() + ) + main.main([str(file)]) # must not raise + + +def test_main_empty_file_has_known_size(tmp_path, recorded_bars) -> None: + # Regression: E8 - a zero-byte input flipped the bar into + # unknown-length mode although the file size was known. + file = tmp_path / 'empty.bin' + file.write_bytes(b'') + main.main([str(file), '-o', str(tmp_path / 'out.bin')]) + + assert recorded_bars[0].init_kwargs.get('max_value') == 0 diff --git a/tests/test_samples.py b/tests/test_samples.py index 2881fac0..36ce4ad5 100644 --- a/tests/test_samples.py +++ b/tests/test_samples.py @@ -110,3 +110,20 @@ def test_timedelta_no_update() -> None: bar.update(3) assert samples_widget(bar, None, True) == (timedelta(0, 1), 1) assert samples_widget(bar, None, False)[1] == [1, 2] + + +def test_timedelta_samples_evicted_when_value_stalls() -> None: + # Regression: B7 - eviction of expired samples additionally required + # the value to increase, so a stalled bar grew its window unboundedly. + samples_widget = widgets.SamplesMixin(samples=timedelta(seconds=2)) + bar = progressbar.ProgressBar(widgets=[samples_widget]) + samples_widget.INTERVAL = timedelta(0) + start = datetime(2000, 1, 1) + + bar.value = 1 + for i in range(10): + bar.last_update_time = start + timedelta(seconds=i) + samples_widget(bar, None) + + sample_times = samples_widget.get_sample_times(bar, None) + assert sample_times[-1] - sample_times[0] <= timedelta(seconds=3) diff --git a/tests/test_speed.py b/tests/test_speed.py index 4f53639e..928b9d0c 100644 --- a/tests/test_speed.py +++ b/tests/test_speed.py @@ -6,7 +6,9 @@ @pytest.mark.parametrize( 'total_seconds_elapsed,value,expected', [ - (1, 0, ' 0.0 s/B'), + # Zero progress means no data yet, so the regular format is used + # instead of the inverse (seconds per unit) format + (1, 0, ' 0.0 B/s'), (1, 0.01, '100.0 s/B'), (1, 0.1, ' 0.1 B/s'), (1, 1, ' 1.0 B/s'), diff --git a/tests/test_stream.py b/tests/test_stream.py index e32bbd5c..194310c2 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -161,3 +161,23 @@ def test_last_line_stream_methods() -> None: # Test close method stream.close() + + +def test_line_offset_stream_wrapper_write_length_and_flush() -> None: + # Regression: C5/C6 - write() returned the newline-stripped length + # and flush() never reached the wrapped stream. + class CountingIO(io.StringIO): + def __init__(self) -> None: + super().__init__() + self.flushes = 0 + + def flush(self) -> None: + self.flushes += 1 + super().flush() + + target = CountingIO() + wrapper = progressbar.LineOffsetStreamWrapper(lines=2, stream=target) + + written = wrapper.write('hello\n') + assert written == 6 + assert target.flushes >= 1 diff --git a/tests/test_utils.py b/tests/test_utils.py index e347acdb..fd8ab866 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,9 +1,12 @@ +import contextlib import io +import sys import pytest import progressbar import progressbar.env +from progressbar import utils @pytest.mark.parametrize( @@ -112,3 +115,90 @@ def raise_error(): fd.isatty = raise_error assert not progressbar.env.is_ansi_terminal(fd) + + +def test_stream_wrapper_unwrap_restores_excepthook() -> None: + # Regression: C7 - unwrap_stdout/unwrap_stderr left the custom + # excepthook installed forever. + wrapper = utils.StreamWrapper() + hook_before = sys.excepthook + wrapper.wrap_stdout() + try: + wrapper.unwrap_stdout() + assert sys.excepthook is hook_before + + # With both streams wrapped, the hook is only restored once the + # last stream is unwrapped + wrapper.wrap_stdout() + wrapper.wrap_stderr() + wrapper.unwrap_stdout() + # Bound methods are recreated on attribute access, so compare + # with == instead of `is` + assert sys.excepthook == wrapper.excepthook + wrapper.unwrap_stderr() + assert sys.excepthook is hook_before + + # Same in reverse order: stderr first, then stdout + wrapper.wrap_stdout() + wrapper.wrap_stderr() + wrapper.unwrap_stderr() + assert sys.excepthook == wrapper.excepthook + wrapper.unwrap_stdout() + assert sys.excepthook is hook_before + finally: + sys.excepthook = wrapper.original_excepthook + sys.stdout = wrapper.original_stdout + sys.stderr = wrapper.original_stderr + + +def test_stream_wrapper_flush_unsupported_keeps_int_counter() -> None: + # Regression: C2 - the unsupported-operation handler assigned False + # to the int wrap counter. + class UnsupportedIO(io.StringIO): + def write(self, value: str) -> int: + raise io.UnsupportedOperation('write') + + wrapper = utils.StreamWrapper() + wrapper.stdout = utils.WrappingIO(UnsupportedIO()) + wrapper.stdout.buffer.write('x') + wrapper.wrapped_stdout = 1 + wrapper.flush() + + assert wrapper.wrapped_stdout == 0 + assert type(wrapper.wrapped_stdout) is int + + +def test_wrapping_io_flush_does_not_duplicate_after_error() -> None: + # Regression: C3 - a failed target.write() left the buffer intact, so + # the next flush wrote the same data again. + class FlakyIO(io.StringIO): + def __init__(self) -> None: + super().__init__() + self.fail_once = True + + def write(self, value: str) -> int: + result = super().write(value) + if self.fail_once: + self.fail_once = False + raise OSError('disk full') + return result + + target = FlakyIO() + wrapped = utils.WrappingIO(target) + wrapped.buffer.write('hello') + with pytest.raises(OSError): + wrapped._flush() + with contextlib.suppress(OSError): + wrapped._flush() + + assert target.getvalue().count('hello') == 1 + + +def test_wrapping_io_flush_with_closed_target() -> None: + # Regression: C4 - flushing into an already closed target (e.g. from + # the atexit hook at interpreter shutdown) raised ValueError. + target = io.StringIO() + wrapped = utils.WrappingIO(target) + wrapped.buffer.write('data') + target.close() + wrapped._flush() # must not raise diff --git a/tests/test_widgets.py b/tests/test_widgets.py index 7ab3d88e..1017eb4b 100644 --- a/tests/test_widgets.py +++ b/tests/test_widgets.py @@ -1,6 +1,8 @@ from __future__ import annotations +import io import time +from datetime import timedelta import pytest @@ -204,3 +206,44 @@ def test_all_widgets_max_width(max_width, term_width) -> None: assert widget == '' else: assert widget != '' + + +def test_eta_respects_min_value() -> None: + # Regression: B3 - the items/second rate divided by the raw value + # instead of the progress relative to min_value. + bar = progressbar.ProgressBar( + min_value=50, max_value=100, fd=io.StringIO(), term_width=60 + ) + bar.start() + bar.update(75) + bar.start_time -= timedelta(seconds=30) + data = bar.data() + progressbar.ETA()(bar, data) + + # 25 of 50 items done in 30 seconds -> 30 seconds remaining + assert data['eta_seconds'] == pytest.approx(30, rel=0.05) + + +def test_multi_progress_bar_zero_total() -> None: + # Regression: B5 - a (value, 0) tuple raised ZeroDivisionError. + widget = progressbar.MultiProgressBar('jobs') + bar = progressbar.ProgressBar( + widgets=[widget], max_value=10, fd=io.StringIO(), term_width=60 + ) + ranges = widget.get_values(bar, {'variables': {'jobs': [(3, 0)]}}) + assert sum(ranges) > 0 + + +def test_bar_widget_respects_min_value() -> None: + # Regression: B9 - the fill width was computed from the raw value, so + # a bar at 0% progress with min_value > 0 rendered partially full. + bar = progressbar.ProgressBar( + min_value=50, + max_value=100, + widgets=[progressbar.Bar()], + fd=io.StringIO(), + term_width=60, + ) + bar.start() + assert '#' not in bar.fd.getvalue() + bar.finish(dirty=True) diff --git a/tests/test_windows.py b/tests/test_windows.py index 4c95fae4..5823f62a 100644 --- a/tests/test_windows.py +++ b/tests/test_windows.py @@ -88,3 +88,30 @@ def main() -> int: if __name__ == '__main__': main() + + +def test_kernel32_argtypes() -> None: + # Regression: E4 - missing argtypes silently truncated 64-bit HANDLE + # values to 32-bit C ints. + from progressbar.terminal.os_specific import windows + + assert windows._GetConsoleMode.argtypes is not None + assert windows._SetConsoleMode.argtypes is not None + assert windows._GetStdHandle.argtypes is not None + assert windows._ReadConsoleInput.argtypes is not None + + +def test_getch_reads_first_event(monkeypatch) -> None: + # Regression: E5 - getch() unconditionally decoded the second buffer + # entry, ignoring how many events were actually read. + from progressbar.terminal.os_specific import windows + + def fake_read_console_input(handle, buffer, length, events_read): + buffer[0].EventType = 1 # KEY_EVENT + buffer[0].Event.KeyEvent.bKeyDown = True + buffer[0].Event.KeyEvent.uChar.AsciiChar = b'a' + events_read._obj.value = 1 + return 1 + + monkeypatch.setattr(windows, '_ReadConsoleInput', fake_read_console_input) + assert windows.getch() == 'a'