Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions openfeature/_event_support.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from __future__ import annotations

import atexit
import threading
import typing
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from logging import getLogger

from openfeature.event import (
EventDetails,
Expand All @@ -16,6 +19,10 @@
from openfeature.client import OpenFeatureClient


logger = getLogger("openfeature")
_event_executor = ThreadPoolExecutor(thread_name_prefix="openfeature-event-handler")
Comment thread
gruebel marked this conversation as resolved.
atexit.register(_event_executor.shutdown, wait=True)

_global_lock = threading.RLock()
_global_handlers: dict[ProviderEvent, list[EventHandler]] = defaultdict(list)

Expand All @@ -29,14 +36,22 @@ def run_client_handlers(
client: OpenFeatureClient, event: ProviderEvent, details: EventDetails
) -> None:
with _client_lock:
for handler in _client_handlers[client][event]:
handler(details)
handlers_by_event = _client_handlers.get(client)
if handlers_by_event is None:
return

handlers = tuple(handlers_by_event.get(event, ()))

for handler in handlers:
_submit_handler(handler, details)


def run_global_handlers(event: ProviderEvent, details: EventDetails) -> None:
with _global_lock:
for handler in _global_handlers[event]:
handler(details)
handlers = tuple(_global_handlers.get(event, ()))

for handler in handlers:
_submit_handler(handler, details)


def add_client_handler(
Expand Down Expand Up @@ -83,9 +98,12 @@ def run_handlers_for_provider(
run_global_handlers(event, details)
# run the handlers for clients associated to this provider
with _client_lock:
for client in _client_handlers:
if client.provider == provider:
run_client_handlers(client, event, details)
clients = tuple(
client for client in _client_handlers if client.provider == provider
)

for client in clients:
run_client_handlers(client, event, details)


def _run_immediate_handler(
Expand All @@ -98,7 +116,20 @@ def _run_immediate_handler(
ProviderStatus.STALE: ProviderEvent.PROVIDER_STALE,
}
if event == status_to_event.get(client.get_provider_status()):
handler(EventDetails(provider_name=client.provider.get_metadata().name))
_submit_handler(
handler, EventDetails(provider_name=client.provider.get_metadata().name)
)


def _submit_handler(handler: EventHandler, details: EventDetails) -> None:
_event_executor.submit(_run_handler, handler, details)


def _run_handler(handler: EventHandler, details: EventDetails) -> None:
try:
handler(details)
except Exception:
logger.exception("Unhandled exception in OpenFeature event handler")


def clear() -> None:
Expand Down
18 changes: 18 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import threading
import time
from unittest.mock import MagicMock

import pytest
Expand Down Expand Up @@ -32,6 +33,15 @@
)


def wait_for_mock_call(mock: MagicMock, timeout: float = 1.0) -> None:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if mock.call_count:
return

time.sleep(0.01)


def test_should_not_raise_exception_with_noop_client():
# Given
# No provider has been set
Expand Down Expand Up @@ -295,6 +305,10 @@ def test_provider_events():

# Then
# NOTE: provider_ready is called immediately after adding the handler
wait_for_mock_call(spy.provider_ready)
wait_for_mock_call(spy.provider_configuration_changed)
wait_for_mock_call(spy.provider_error)
wait_for_mock_call(spy.provider_stale)
spy.provider_ready.assert_called_once()
spy.provider_configuration_changed.assert_called_once_with(details)
spy.provider_error.assert_called_once_with(details)
Expand Down Expand Up @@ -335,6 +349,7 @@ def test_handlers_attached_to_provider_already_in_associated_state_should_run_im
add_handler(ProviderEvent.PROVIDER_READY, spy.provider_ready)

# Then
wait_for_mock_call(spy.provider_ready)
spy.provider_ready.assert_called_once()


Expand All @@ -344,12 +359,14 @@ def test_provider_ready_handlers_run_if_provider_initialize_function_terminates_

spy = MagicMock()
add_handler(ProviderEvent.PROVIDER_READY, spy.provider_ready)
wait_for_mock_call(spy.provider_ready)
spy.reset_mock() # reset the mock to avoid counting the immediate call on subscribe

# When
set_provider_and_wait(provider)

# Then
wait_for_mock_call(spy.provider_ready)
spy.provider_ready.assert_called_once()


Expand All @@ -366,6 +383,7 @@ def test_provider_error_handlers_run_if_provider_initialize_function_terminates_
set_provider_and_wait(provider)

# Then
wait_for_mock_call(spy.provider_error)
spy.provider_error.assert_called_once()


Expand Down
95 changes: 94 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import inspect
import threading
import time
import types
import uuid
Expand All @@ -7,7 +8,7 @@

import pytest

from openfeature import api
from openfeature import _event_support, api
from openfeature.api import (
add_hooks,
clear_hooks,
Expand All @@ -29,6 +30,15 @@
from openfeature.transaction_context import ContextVarsTransactionContextPropagator


def wait_for_mock_call(mock: MagicMock, timeout: float = 1.0) -> None:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if mock.call_count:
return

time.sleep(0.01)


@pytest.mark.parametrize(
"flag_type, default_value, get_method",
(
Expand Down Expand Up @@ -467,6 +477,10 @@ def emit_all_events(provider):

# Then
# NOTE: provider_ready is called immediately after adding the handler
wait_for_mock_call(spy.provider_ready)
wait_for_mock_call(spy.provider_configuration_changed)
wait_for_mock_call(spy.provider_error)
wait_for_mock_call(spy.provider_stale)
spy.provider_ready.assert_called_once()
spy.provider_configuration_changed.assert_called_once_with(details)
spy.provider_error.assert_called_once_with(details)
Expand Down Expand Up @@ -525,9 +539,25 @@ def test_provider_event_late_binding():
other_provider.emit_provider_configuration_changed(other_provider_details)

# Then
wait_for_mock_call(spy.provider_configuration_changed)
spy.provider_configuration_changed.assert_called_once_with(details)


def test_run_client_handlers_without_registered_handlers_is_noop():
provider = NoOpProvider()
set_provider(provider)
client = get_client("client-without-handlers")
details = EventDetails(provider_name=provider.get_metadata().name)

assert client not in _event_support._client_handlers

_event_support.run_client_handlers(
client, ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details
)

assert client not in _event_support._client_handlers


# Requirement 5.1.4, Requirement 5.1.5
def test_provider_event_handler_exception():
# Given
Expand All @@ -545,6 +575,7 @@ def test_provider_event_handler_exception():
)

# Then
wait_for_mock_call(spy.provider_error)
spy.provider_error.assert_called_once_with(
EventDetails(
flags_changed=None,
Expand All @@ -556,6 +587,68 @@ def test_provider_event_handler_exception():
)


def test_provider_event_handler_exception_does_not_stop_subsequent_handlers():
# Given
provider = NoOpProvider()
set_provider(provider)

spy = MagicMock()
handler_called = threading.Event()

raising_handler = MagicMock(side_effect=RuntimeError("handler failed"))

def recording_handler(details):
spy.provider_error(details)
handler_called.set()

client = get_client()
client.add_handler(ProviderEvent.PROVIDER_ERROR, raising_handler)
client.add_handler(ProviderEvent.PROVIDER_ERROR, recording_handler)

details = ProviderEventDetails(error_code=ErrorCode.GENERAL, message="some_error")
expected_details = EventDetails.from_provider_event_details(
provider.get_metadata().name, details
)

# When
provider.emit_provider_error(details)

# Then
assert handler_called.wait(timeout=1)
Comment thread
gruebel marked this conversation as resolved.
raising_handler.assert_called_once_with(expected_details)
spy.provider_error.assert_called_once_with(expected_details)


def test_provider_event_handlers_do_not_block_emitter():
# Given
provider = NoOpProvider()
set_provider(provider)

handler_started = threading.Event()
release_handler = threading.Event()
handler_finished = threading.Event()

def slow_handler(details):
handler_started.set()
release_handler.wait(timeout=1)
handler_finished.set()

client = get_client()
client.add_handler(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, slow_handler)

# When
start_time = time.perf_counter()
provider.emit_provider_configuration_changed(ProviderEventDetails())
elapsed = time.perf_counter() - start_time

# Then
assert handler_started.wait(timeout=1)
# emit must return well before the handler's blocking wait (1s) would finish
assert elapsed < 0.5
release_handler.set()
assert handler_finished.wait(timeout=1)


def test_client_handlers_thread_safety():
provider = NoOpProvider()
set_provider(provider)
Expand Down
Loading