diff --git a/osism/commands/reconciler.py b/osism/commands/reconciler.py index d559d8e6..4f3fdbd4 100644 --- a/osism/commands/reconciler.py +++ b/osism/commands/reconciler.py @@ -64,6 +64,7 @@ def take_action(self, parsed_args): logger.error( f"Timeout while waiting for further output of task {t.task_id} (sync inventory)" ) + return 1 else: logger.info( f"Task {t.task_id} (sync inventory) is running in background. No more output." diff --git a/osism/tasks/reconciler.py b/osism/tasks/reconciler.py index 1764c8f0..8cab5f1a 100644 --- a/osism/tasks/reconciler.py +++ b/osism/tasks/reconciler.py @@ -5,6 +5,7 @@ import subprocess from celery import Celery +from celery.exceptions import MaxRetriesExceededError, Retry from loguru import logger from osism import settings, utils from osism.tasks import Config @@ -12,6 +13,11 @@ app = Celery("reconciler") app.config_from_object(Config) +LOCK_RETRY_MAX_RETRIES = 5 +LOCK_RETRY_DELAY = 5 +LOCK_TIMEOUT_RC = 1 +RECONCILER_LOCK_KEY = "lock_osism_tasks_reconciler_execution" + @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): @@ -24,52 +30,132 @@ def setup_periodic_tasks(sender, **kwargs): ) -@app.task(bind=True, name="osism.tasks.reconciler.run") -def run(self, publish=True): - # Check if tasks are locked before execution - utils.check_task_lock_and_exit() +def _push_task_output_best_effort(task_id, line): + try: + utils.push_task_output(task_id, line) + except Exception: + logger.exception(f"Failed to publish output for reconciler task {task_id}") - lock = utils.create_redlock( - key="lock_osism_tasks_reconciler_run", - auto_release_time=60, - ) - if lock.acquire(timeout=20): - logger.info("RUN /run.sh") +def _finish_task_output_best_effort(task_id, rc): + try: + utils.finish_task_output(task_id, rc=rc) + except Exception: + logger.exception(f"Failed to finish output for reconciler task {task_id}") - env = os.environ.copy() - p = subprocess.Popen( +def _publish_failure_best_effort(task_id, exc): + _push_task_output_best_effort(task_id, f"Reconciler failed: {exc}\n") + _finish_task_output_best_effort(task_id, 1) + + +def _release_lock_best_effort(lock): + from pottery import ReleaseUnlockedLock + + try: + lock.release() + except ReleaseUnlockedLock: + logger.warning( + "Lock auto-released before explicit release (auto_release_time exceeded)" + ) + except Exception: + logger.exception("Failed to release reconciler lock") + + +def _terminate_process_best_effort(process): + try: + if process.poll() is None: + process.kill() + process.wait() + except Exception: + logger.exception("Failed to terminate reconciler subprocess") + + +def _retry_after_lock_timeout(task, publish): + if publish and task.request.retries < LOCK_RETRY_MAX_RETRIES: + _push_task_output_best_effort( + task.request.id, + f"Reconciler busy; retrying lock acquisition in {LOCK_RETRY_DELAY}s\n", + ) + + try: + raise task.retry(countdown=LOCK_RETRY_DELAY) + except MaxRetriesExceededError: + message = ( + "Reconciler lock could not be acquired after " + f"{LOCK_RETRY_MAX_RETRIES + 1} attempts\n" + ) + logger.error(message.rstrip()) + if publish: + _push_task_output_best_effort(task.request.id, message) + _finish_task_output_best_effort(task.request.id, LOCK_TIMEOUT_RC) + raise + + +def _execute_reconciler(task, publish): + lock = None + lock_acquired = False + process = None + + try: + utils.check_task_lock_and_exit() + + lock = utils.create_redlock( + key=RECONCILER_LOCK_KEY, + auto_release_time=60, + ) + + if not lock.acquire(timeout=20): + return _retry_after_lock_timeout(task, publish) + + lock_acquired = True + logger.info("RUN /run.sh") + + process = subprocess.Popen( "/run.sh", shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - env=env, + env=os.environ.copy(), ) if publish: - for line in io.TextIOWrapper(p.stdout, encoding="utf-8"): - utils.push_task_output(self.request.id, line) + for line in io.TextIOWrapper(process.stdout, encoding="utf-8"): + utils.push_task_output(task.request.id, line) - rc = p.wait(timeout=60) + rc = process.wait(timeout=60) if publish: - utils.finish_task_output(self.request.id, rc=rc) - - from pottery import ReleaseUnlockedLock - - try: - lock.release() - except ReleaseUnlockedLock: - logger.warning( - "Lock auto-released before explicit release (auto_release_time exceeded)" - ) + utils.finish_task_output(task.request.id, rc=rc) + + return rc + except (Retry, MaxRetriesExceededError): + raise + except BaseException as exc: + if process is not None: + _terminate_process_best_effort(process) + logger.exception(f"Reconciler task {task.request.id} failed") + if publish: + _publish_failure_best_effort(task.request.id, exc) + raise + finally: + if lock_acquired: + _release_lock_best_effort(lock) + + +@app.task( + bind=True, + name="osism.tasks.reconciler.run", + max_retries=LOCK_RETRY_MAX_RETRIES, +) +def run(self, publish=True): + return _execute_reconciler(self, publish) @app.task(bind=True, name="osism.tasks.reconciler.run_on_change") def run_on_change(self): lock = utils.create_redlock( - key="lock_osism_tasks_reconciler_run_on_change", + key=RECONCILER_LOCK_KEY, auto_release_time=60, ) diff --git a/tests/unit/commands/test_reconciler.py b/tests/unit/commands/test_reconciler.py new file mode 100644 index 00000000..356148ed --- /dev/null +++ b/tests/unit/commands/test_reconciler.py @@ -0,0 +1,22 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for the ``osism reconciler`` commands.""" + +from unittest.mock import MagicMock, patch + +from osism.commands import reconciler + + +def test_sync_returns_nonzero_on_task_timeout(): + cmd = reconciler.Sync(MagicMock(), MagicMock()) + parsed_args = cmd.get_parser("test").parse_args([]) + + with patch("osism.commands.reconciler.utils.check_task_lock_and_exit"), patch( + "osism.tasks.reconciler.run.delay", return_value=MagicMock() + ), patch( + "osism.commands.reconciler.utils.fetch_task_output", + side_effect=TimeoutError, + ): + result = cmd.take_action(parsed_args) + + assert result == 1 diff --git a/tests/unit/tasks/test_reconciler.py b/tests/unit/tasks/test_reconciler.py new file mode 100644 index 00000000..133bd105 --- /dev/null +++ b/tests/unit/tasks/test_reconciler.py @@ -0,0 +1,212 @@ +# SPDX-License-Identifier: Apache-2.0 + +from subprocess import TimeoutExpired +from unittest.mock import MagicMock + +import pytest +from celery.exceptions import MaxRetriesExceededError, Retry + +from osism.tasks import reconciler + + +def _task(*, retries=0, retry_exception=None): + task = MagicMock() + task.request.id = "task-1" + task.request.retries = retries + task.retry.side_effect = retry_exception or Retry() + return task + + +def test_lock_timeout_retries_same_task_with_status(mocker): + task = _task() + push = mocker.patch("osism.tasks.reconciler.utils.push_task_output") + finish = mocker.patch("osism.tasks.reconciler.utils.finish_task_output") + + with pytest.raises(Retry): + reconciler._retry_after_lock_timeout(task, publish=True) + + push.assert_called_once_with( + "task-1", + f"Reconciler busy; retrying lock acquisition in {reconciler.LOCK_RETRY_DELAY}s\n", + ) + finish.assert_not_called() + task.retry.assert_called_once_with(countdown=reconciler.LOCK_RETRY_DELAY) + + +def test_retry_status_publication_failure_does_not_prevent_retry(mocker): + task = _task() + mocker.patch( + "osism.tasks.reconciler.utils.push_task_output", + side_effect=RuntimeError("redis unavailable"), + ) + with pytest.raises(Retry): + reconciler._retry_after_lock_timeout(task, publish=True) + + task.retry.assert_called_once_with(countdown=reconciler.LOCK_RETRY_DELAY) + + +def test_lock_retry_exhaustion_publishes_failure_and_raises(mocker): + task = _task( + retries=reconciler.LOCK_RETRY_MAX_RETRIES, + retry_exception=MaxRetriesExceededError("exhausted"), + ) + push = mocker.patch("osism.tasks.reconciler.utils.push_task_output") + finish = mocker.patch("osism.tasks.reconciler.utils.finish_task_output") + with pytest.raises(MaxRetriesExceededError): + reconciler._retry_after_lock_timeout(task, publish=True) + + push.assert_called_once() + assert "could not be acquired" in push.call_args.args[1] + finish.assert_called_once_with("task-1", rc=reconciler.LOCK_TIMEOUT_RC) + + +def test_publish_false_retries_without_stream_output(mocker): + task = _task() + push = mocker.patch("osism.tasks.reconciler.utils.push_task_output") + finish = mocker.patch("osism.tasks.reconciler.utils.finish_task_output") + with pytest.raises(Retry): + reconciler._retry_after_lock_timeout(task, publish=False) + + push.assert_not_called() + finish.assert_not_called() + + +def test_publish_false_exhaustion_raises_without_stream_output(mocker): + task = _task(retry_exception=MaxRetriesExceededError("exhausted")) + push = mocker.patch("osism.tasks.reconciler.utils.push_task_output") + finish = mocker.patch("osism.tasks.reconciler.utils.finish_task_output") + with pytest.raises(MaxRetriesExceededError): + reconciler._retry_after_lock_timeout(task, publish=False) + + push.assert_not_called() + finish.assert_not_called() + + +def test_execute_reconciler_publishes_success_and_releases_lock(mocker): + task = _task() + lock = MagicMock() + lock.acquire.return_value = True + process = MagicMock() + process.stdout = [b"line\n"] + process.wait.return_value = 0 + mocker.patch("osism.tasks.reconciler.utils.check_task_lock_and_exit") + mocker.patch("osism.tasks.reconciler.utils.create_redlock", return_value=lock) + mocker.patch("osism.tasks.reconciler.subprocess.Popen", return_value=process) + mocker.patch("osism.tasks.reconciler.io.TextIOWrapper", return_value=["line\n"]) + push = mocker.patch("osism.tasks.reconciler.utils.push_task_output") + finish = mocker.patch("osism.tasks.reconciler.utils.finish_task_output") + + result = reconciler._execute_reconciler(task, publish=True) + + assert result == 0 + push.assert_called_once_with("task-1", "line\n") + finish.assert_called_once_with("task-1", rc=0) + lock.release.assert_called_once_with() + + +def test_execute_reconciler_popen_failure_publishes_and_releases(mocker): + task = _task() + lock = MagicMock() + lock.acquire.return_value = True + mocker.patch("osism.tasks.reconciler.utils.check_task_lock_and_exit") + mocker.patch("osism.tasks.reconciler.utils.create_redlock", return_value=lock) + mocker.patch( + "osism.tasks.reconciler.subprocess.Popen", + side_effect=OSError("cannot start"), + ) + push = mocker.patch("osism.tasks.reconciler.utils.push_task_output") + finish = mocker.patch("osism.tasks.reconciler.utils.finish_task_output") + + with pytest.raises(OSError, match="cannot start"): + reconciler._execute_reconciler(task, publish=True) + + assert "cannot start" in push.call_args.args[1] + finish.assert_called_once_with("task-1", rc=1) + lock.release.assert_called_once_with() + + +def test_execute_reconciler_timeout_kills_process(mocker): + task = _task() + lock = MagicMock() + lock.acquire.return_value = True + process = MagicMock() + process.stdout = [] + process.wait.side_effect = [TimeoutExpired("/run.sh", 60), 0] + process.poll.return_value = None + mocker.patch("osism.tasks.reconciler.utils.check_task_lock_and_exit") + mocker.patch("osism.tasks.reconciler.utils.create_redlock", return_value=lock) + mocker.patch("osism.tasks.reconciler.subprocess.Popen", return_value=process) + mocker.patch("osism.tasks.reconciler.io.TextIOWrapper", return_value=[]) + mocker.patch("osism.tasks.reconciler.utils.push_task_output") + finish = mocker.patch("osism.tasks.reconciler.utils.finish_task_output") + + with pytest.raises(TimeoutExpired): + reconciler._execute_reconciler(task, publish=True) + + process.kill.assert_called_once_with() + finish.assert_called_once_with("task-1", rc=1) + lock.release.assert_called_once_with() + + +def test_execute_reconciler_task_lock_system_exit_publishes_failure(mocker): + task = _task() + mocker.patch( + "osism.tasks.reconciler.utils.check_task_lock_and_exit", + side_effect=SystemExit(1), + ) + create_lock = mocker.patch("osism.tasks.reconciler.utils.create_redlock") + finish = mocker.patch("osism.tasks.reconciler.utils.finish_task_output") + + with pytest.raises(SystemExit): + reconciler._execute_reconciler(task, publish=True) + + create_lock.assert_not_called() + finish.assert_called_once_with("task-1", rc=1) + + +def test_execute_reconciler_does_not_convert_retry_to_failure(mocker): + task = _task() + lock = MagicMock() + lock.acquire.return_value = False + mocker.patch("osism.tasks.reconciler.utils.check_task_lock_and_exit") + mocker.patch("osism.tasks.reconciler.utils.create_redlock", return_value=lock) + mocker.patch( + "osism.tasks.reconciler._retry_after_lock_timeout", + side_effect=Retry(), + ) + finish = mocker.patch("osism.tasks.reconciler.utils.finish_task_output") + + with pytest.raises(Retry): + reconciler._execute_reconciler(task, publish=True) + + finish.assert_not_called() + lock.release.assert_not_called() + + +def test_explicit_and_periodic_runs_use_shared_lock(mocker): + task = _task() + lock = MagicMock() + lock.acquire.return_value = False + create_lock = mocker.patch( + "osism.tasks.reconciler.utils.create_redlock", + return_value=lock, + ) + mocker.patch("osism.tasks.reconciler.utils.check_task_lock_and_exit") + mocker.patch( + "osism.tasks.reconciler._retry_after_lock_timeout", + side_effect=Retry(), + ) + + with pytest.raises(Retry): + reconciler._execute_reconciler(task, publish=True) + reconciler.run_on_change.run() + + execution_locks = [ + item + for item in create_lock.call_args_list + if item.kwargs.get("auto_release_time") == 60 + ] + assert len(execution_locks) == 2 + assert all( + item.kwargs["key"] == reconciler.RECONCILER_LOCK_KEY for item in execution_locks + )