diff --git a/openfeature/_event_support.py b/openfeature/_event_support.py index 00557ead..3928be3e 100644 --- a/openfeature/_event_support.py +++ b/openfeature/_event_support.py @@ -1,8 +1,11 @@ from __future__ import annotations +import atexit import threading import typing from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor +from logging import getLogger from openfeature.event import ( EventDetails, @@ -16,6 +19,10 @@ from openfeature.client import OpenFeatureClient +logger = getLogger("openfeature") +_event_executor = ThreadPoolExecutor(thread_name_prefix="openfeature-event-handler") +atexit.register(_event_executor.shutdown, wait=True) + _global_lock = threading.RLock() _global_handlers: dict[ProviderEvent, list[EventHandler]] = defaultdict(list) @@ -29,14 +36,22 @@ def run_client_handlers( client: OpenFeatureClient, event: ProviderEvent, details: EventDetails ) -> None: with _client_lock: - for handler in _client_handlers[client][event]: - handler(details) + handlers_by_event = _client_handlers.get(client) + if handlers_by_event is None: + return + + handlers = tuple(handlers_by_event.get(event, ())) + + for handler in handlers: + _submit_handler(handler, details) def run_global_handlers(event: ProviderEvent, details: EventDetails) -> None: with _global_lock: - for handler in _global_handlers[event]: - handler(details) + handlers = tuple(_global_handlers.get(event, ())) + + for handler in handlers: + _submit_handler(handler, details) def add_client_handler( @@ -83,9 +98,12 @@ def run_handlers_for_provider( run_global_handlers(event, details) # run the handlers for clients associated to this provider with _client_lock: - for client in _client_handlers: - if client.provider == provider: - run_client_handlers(client, event, details) + clients = tuple( + client for client in _client_handlers if client.provider == provider + ) + + for client in clients: + run_client_handlers(client, event, details) def _run_immediate_handler( @@ -98,7 +116,20 @@ def _run_immediate_handler( ProviderStatus.STALE: ProviderEvent.PROVIDER_STALE, } if event == status_to_event.get(client.get_provider_status()): - handler(EventDetails(provider_name=client.provider.get_metadata().name)) + _submit_handler( + handler, EventDetails(provider_name=client.provider.get_metadata().name) + ) + + +def _submit_handler(handler: EventHandler, details: EventDetails) -> None: + _event_executor.submit(_run_handler, handler, details) + + +def _run_handler(handler: EventHandler, details: EventDetails) -> None: + try: + handler(details) + except Exception: + logger.exception("Unhandled exception in OpenFeature event handler") def clear() -> None: diff --git a/tests/test_api.py b/tests/test_api.py index b7945cbb..cdb077fe 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,4 +1,5 @@ import threading +import time from unittest.mock import MagicMock import pytest @@ -32,6 +33,15 @@ ) +def wait_for_mock_call(mock: MagicMock, timeout: float = 1.0) -> None: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if mock.call_count: + return + + time.sleep(0.01) + + def test_should_not_raise_exception_with_noop_client(): # Given # No provider has been set @@ -295,6 +305,10 @@ def test_provider_events(): # Then # NOTE: provider_ready is called immediately after adding the handler + wait_for_mock_call(spy.provider_ready) + wait_for_mock_call(spy.provider_configuration_changed) + wait_for_mock_call(spy.provider_error) + wait_for_mock_call(spy.provider_stale) spy.provider_ready.assert_called_once() spy.provider_configuration_changed.assert_called_once_with(details) spy.provider_error.assert_called_once_with(details) @@ -335,6 +349,7 @@ def test_handlers_attached_to_provider_already_in_associated_state_should_run_im add_handler(ProviderEvent.PROVIDER_READY, spy.provider_ready) # Then + wait_for_mock_call(spy.provider_ready) spy.provider_ready.assert_called_once() @@ -344,12 +359,14 @@ def test_provider_ready_handlers_run_if_provider_initialize_function_terminates_ spy = MagicMock() add_handler(ProviderEvent.PROVIDER_READY, spy.provider_ready) + wait_for_mock_call(spy.provider_ready) spy.reset_mock() # reset the mock to avoid counting the immediate call on subscribe # When set_provider_and_wait(provider) # Then + wait_for_mock_call(spy.provider_ready) spy.provider_ready.assert_called_once() @@ -366,6 +383,7 @@ def test_provider_error_handlers_run_if_provider_initialize_function_terminates_ set_provider_and_wait(provider) # Then + wait_for_mock_call(spy.provider_error) spy.provider_error.assert_called_once() diff --git a/tests/test_client.py b/tests/test_client.py index 25819d4d..63453c99 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,4 +1,5 @@ import inspect +import threading import time import types import uuid @@ -7,7 +8,7 @@ import pytest -from openfeature import api +from openfeature import _event_support, api from openfeature.api import ( add_hooks, clear_hooks, @@ -29,6 +30,15 @@ from openfeature.transaction_context import ContextVarsTransactionContextPropagator +def wait_for_mock_call(mock: MagicMock, timeout: float = 1.0) -> None: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if mock.call_count: + return + + time.sleep(0.01) + + @pytest.mark.parametrize( "flag_type, default_value, get_method", ( @@ -467,6 +477,10 @@ def emit_all_events(provider): # Then # NOTE: provider_ready is called immediately after adding the handler + wait_for_mock_call(spy.provider_ready) + wait_for_mock_call(spy.provider_configuration_changed) + wait_for_mock_call(spy.provider_error) + wait_for_mock_call(spy.provider_stale) spy.provider_ready.assert_called_once() spy.provider_configuration_changed.assert_called_once_with(details) spy.provider_error.assert_called_once_with(details) @@ -525,9 +539,25 @@ def test_provider_event_late_binding(): other_provider.emit_provider_configuration_changed(other_provider_details) # Then + wait_for_mock_call(spy.provider_configuration_changed) spy.provider_configuration_changed.assert_called_once_with(details) +def test_run_client_handlers_without_registered_handlers_is_noop(): + provider = NoOpProvider() + set_provider(provider) + client = get_client("client-without-handlers") + details = EventDetails(provider_name=provider.get_metadata().name) + + assert client not in _event_support._client_handlers + + _event_support.run_client_handlers( + client, ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details + ) + + assert client not in _event_support._client_handlers + + # Requirement 5.1.4, Requirement 5.1.5 def test_provider_event_handler_exception(): # Given @@ -545,6 +575,7 @@ def test_provider_event_handler_exception(): ) # Then + wait_for_mock_call(spy.provider_error) spy.provider_error.assert_called_once_with( EventDetails( flags_changed=None, @@ -556,6 +587,68 @@ def test_provider_event_handler_exception(): ) +def test_provider_event_handler_exception_does_not_stop_subsequent_handlers(): + # Given + provider = NoOpProvider() + set_provider(provider) + + spy = MagicMock() + handler_called = threading.Event() + + raising_handler = MagicMock(side_effect=RuntimeError("handler failed")) + + def recording_handler(details): + spy.provider_error(details) + handler_called.set() + + client = get_client() + client.add_handler(ProviderEvent.PROVIDER_ERROR, raising_handler) + client.add_handler(ProviderEvent.PROVIDER_ERROR, recording_handler) + + details = ProviderEventDetails(error_code=ErrorCode.GENERAL, message="some_error") + expected_details = EventDetails.from_provider_event_details( + provider.get_metadata().name, details + ) + + # When + provider.emit_provider_error(details) + + # Then + assert handler_called.wait(timeout=1) + raising_handler.assert_called_once_with(expected_details) + spy.provider_error.assert_called_once_with(expected_details) + + +def test_provider_event_handlers_do_not_block_emitter(): + # Given + provider = NoOpProvider() + set_provider(provider) + + handler_started = threading.Event() + release_handler = threading.Event() + handler_finished = threading.Event() + + def slow_handler(details): + handler_started.set() + release_handler.wait(timeout=1) + handler_finished.set() + + client = get_client() + client.add_handler(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, slow_handler) + + # When + start_time = time.perf_counter() + provider.emit_provider_configuration_changed(ProviderEventDetails()) + elapsed = time.perf_counter() - start_time + + # Then + assert handler_started.wait(timeout=1) + # emit must return well before the handler's blocking wait (1s) would finish + assert elapsed < 0.5 + release_handler.set() + assert handler_finished.wait(timeout=1) + + def test_client_handlers_thread_safety(): provider = NoOpProvider() set_provider(provider)