Skip to content

Commit fb4e9a1

Browse files
Isolate provider event handlers
Signed-off-by: Lucas-FManager <265058144+Lucas-FManager@users.noreply.github.com>
1 parent 5ff1cf0 commit fb4e9a1

3 files changed

Lines changed: 132 additions & 8 deletions

File tree

openfeature/_event_support.py

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import threading
44
import typing
55
from collections import defaultdict
6+
from concurrent.futures import ThreadPoolExecutor
7+
from logging import getLogger
68

79
from openfeature.event import (
810
EventDetails,
@@ -16,6 +18,9 @@
1618
from openfeature.client import OpenFeatureClient
1719

1820

21+
_logger = getLogger(__name__)
22+
_event_executor = ThreadPoolExecutor(thread_name_prefix="openfeature-event-handler")
23+
1924
_global_lock = threading.RLock()
2025
_global_handlers: dict[ProviderEvent, list[EventHandler]] = defaultdict(list)
2126

@@ -29,14 +34,22 @@ def run_client_handlers(
2934
client: OpenFeatureClient, event: ProviderEvent, details: EventDetails
3035
) -> None:
3136
with _client_lock:
32-
for handler in _client_handlers[client][event]:
33-
handler(details)
37+
handlers_by_event = _client_handlers.get(client)
38+
if handlers_by_event is None:
39+
return
40+
41+
handlers = tuple(handlers_by_event.get(event, ()))
42+
43+
for handler in handlers:
44+
_submit_handler(handler, details)
3445

3546

3647
def run_global_handlers(event: ProviderEvent, details: EventDetails) -> None:
3748
with _global_lock:
38-
for handler in _global_handlers[event]:
39-
handler(details)
49+
handlers = tuple(_global_handlers.get(event, ()))
50+
51+
for handler in handlers:
52+
_submit_handler(handler, details)
4053

4154

4255
def add_client_handler(
@@ -83,9 +96,12 @@ def run_handlers_for_provider(
8396
run_global_handlers(event, details)
8497
# run the handlers for clients associated to this provider
8598
with _client_lock:
86-
for client in _client_handlers:
87-
if client.provider == provider:
88-
run_client_handlers(client, event, details)
99+
clients = tuple(
100+
client for client in _client_handlers if client.provider == provider
101+
)
102+
103+
for client in clients:
104+
run_client_handlers(client, event, details)
89105

90106

91107
def _run_immediate_handler(
@@ -98,7 +114,20 @@ def _run_immediate_handler(
98114
ProviderStatus.STALE: ProviderEvent.PROVIDER_STALE,
99115
}
100116
if event == status_to_event.get(client.get_provider_status()):
101-
handler(EventDetails(provider_name=client.provider.get_metadata().name))
117+
_submit_handler(
118+
handler, EventDetails(provider_name=client.provider.get_metadata().name)
119+
)
120+
121+
122+
def _submit_handler(handler: EventHandler, details: EventDetails) -> None:
123+
_event_executor.submit(_run_handler, handler, details)
124+
125+
126+
def _run_handler(handler: EventHandler, details: EventDetails) -> None:
127+
try:
128+
handler(details)
129+
except Exception:
130+
_logger.exception("Unhandled exception in OpenFeature event handler")
102131

103132

104133
def clear() -> None:

tests/test_api.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
from unittest.mock import MagicMock
23

34
import pytest
@@ -30,6 +31,15 @@
3031
)
3132

3233

34+
def wait_for_mock_call(mock: MagicMock, timeout: float = 1.0) -> None:
35+
deadline = time.monotonic() + timeout
36+
while time.monotonic() < deadline:
37+
if mock.call_count:
38+
return
39+
40+
time.sleep(0.01)
41+
42+
3343
def test_should_not_raise_exception_with_noop_client():
3444
# Given
3545
# No provider has been set
@@ -293,6 +303,10 @@ def test_provider_events():
293303

294304
# Then
295305
# NOTE: provider_ready is called immediately after adding the handler
306+
wait_for_mock_call(spy.provider_ready)
307+
wait_for_mock_call(spy.provider_configuration_changed)
308+
wait_for_mock_call(spy.provider_error)
309+
wait_for_mock_call(spy.provider_stale)
296310
spy.provider_ready.assert_called_once()
297311
spy.provider_configuration_changed.assert_called_once_with(details)
298312
spy.provider_error.assert_called_once_with(details)
@@ -333,6 +347,7 @@ def test_handlers_attached_to_provider_already_in_associated_state_should_run_im
333347
add_handler(ProviderEvent.PROVIDER_READY, spy.provider_ready)
334348

335349
# Then
350+
wait_for_mock_call(spy.provider_ready)
336351
spy.provider_ready.assert_called_once()
337352

338353

@@ -342,12 +357,14 @@ def test_provider_ready_handlers_run_if_provider_initialize_function_terminates_
342357

343358
spy = MagicMock()
344359
add_handler(ProviderEvent.PROVIDER_READY, spy.provider_ready)
360+
wait_for_mock_call(spy.provider_ready)
345361
spy.reset_mock() # reset the mock to avoid counting the immediate call on subscribe
346362

347363
# When
348364
set_provider(provider)
349365

350366
# Then
367+
wait_for_mock_call(spy.provider_ready)
351368
spy.provider_ready.assert_called_once()
352369

353370

@@ -363,6 +380,7 @@ def test_provider_error_handlers_run_if_provider_initialize_function_terminates_
363380
set_provider(provider)
364381

365382
# Then
383+
wait_for_mock_call(spy.provider_error)
366384
spy.provider_error.assert_called_once()
367385

368386

tests/test_client.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import inspect
2+
import threading
23
import time
34
import types
45
import uuid
@@ -29,6 +30,15 @@
2930
from openfeature.transaction_context import ContextVarsTransactionContextPropagator
3031

3132

33+
def wait_for_mock_call(mock: MagicMock, timeout: float = 1.0) -> None:
34+
deadline = time.monotonic() + timeout
35+
while time.monotonic() < deadline:
36+
if mock.call_count:
37+
return
38+
39+
time.sleep(0.01)
40+
41+
3242
@pytest.mark.parametrize(
3343
"flag_type, default_value, get_method",
3444
(
@@ -467,6 +477,10 @@ def emit_all_events(provider):
467477

468478
# Then
469479
# NOTE: provider_ready is called immediately after adding the handler
480+
wait_for_mock_call(spy.provider_ready)
481+
wait_for_mock_call(spy.provider_configuration_changed)
482+
wait_for_mock_call(spy.provider_error)
483+
wait_for_mock_call(spy.provider_stale)
470484
spy.provider_ready.assert_called_once()
471485
spy.provider_configuration_changed.assert_called_once_with(details)
472486
spy.provider_error.assert_called_once_with(details)
@@ -525,6 +539,7 @@ def test_provider_event_late_binding():
525539
other_provider.emit_provider_configuration_changed(other_provider_details)
526540

527541
# Then
542+
wait_for_mock_call(spy.provider_configuration_changed)
528543
spy.provider_configuration_changed.assert_called_once_with(details)
529544

530545

@@ -545,6 +560,7 @@ def test_provider_event_handler_exception():
545560
)
546561

547562
# Then
563+
wait_for_mock_call(spy.provider_error)
548564
spy.provider_error.assert_called_once_with(
549565
EventDetails(
550566
flags_changed=None,
@@ -556,6 +572,67 @@ def test_provider_event_handler_exception():
556572
)
557573

558574

575+
def test_provider_event_handler_exception_does_not_stop_subsequent_handlers():
576+
# Given
577+
provider = NoOpProvider()
578+
set_provider(provider)
579+
580+
spy = MagicMock()
581+
handler_called = threading.Event()
582+
583+
def raising_handler(details):
584+
raise RuntimeError("handler failed")
585+
586+
def recording_handler(details):
587+
spy.provider_error(details)
588+
handler_called.set()
589+
590+
client = get_client()
591+
client.add_handler(ProviderEvent.PROVIDER_ERROR, raising_handler)
592+
client.add_handler(ProviderEvent.PROVIDER_ERROR, recording_handler)
593+
594+
details = ProviderEventDetails(error_code=ErrorCode.GENERAL, message="some_error")
595+
expected_details = EventDetails.from_provider_event_details(
596+
provider.get_metadata().name, details
597+
)
598+
599+
# When
600+
provider.emit_provider_error(details)
601+
602+
# Then
603+
assert handler_called.wait(timeout=1)
604+
spy.provider_error.assert_called_once_with(expected_details)
605+
606+
607+
def test_provider_event_handlers_do_not_block_emitter():
608+
# Given
609+
provider = NoOpProvider()
610+
set_provider(provider)
611+
612+
handler_started = threading.Event()
613+
release_handler = threading.Event()
614+
handler_finished = threading.Event()
615+
616+
def slow_handler(details):
617+
handler_started.set()
618+
release_handler.wait(timeout=1)
619+
handler_finished.set()
620+
621+
client = get_client()
622+
client.add_handler(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, slow_handler)
623+
624+
# When
625+
start_time = time.perf_counter()
626+
provider.emit_provider_configuration_changed(ProviderEventDetails())
627+
elapsed = time.perf_counter() - start_time
628+
629+
# Then
630+
assert handler_started.wait(timeout=1)
631+
assert elapsed < 0.2
632+
release_handler.set()
633+
assert handler_finished.wait(timeout=1)
634+
635+
559636
def test_client_handlers_thread_safety():
560637
provider = NoOpProvider()
561638
set_provider(provider)

0 commit comments

Comments
 (0)