Skip to content

Commit cbacef0

Browse files
feat!: make set_provider non-blocking, add set_provider_and_wait (#595)
* feat!: make set_provider non-blocking, add set_provider_and_wait Signed-off-by: Jonathan Norris <jonathan.norris@dynatrace.com> * fix: ruff format signature collapse in api.py Signed-off-by: Jonathan Norris <jonathan.norris@dynatrace.com> * fix: use threading.Event in error event test to avoid flaky busy-wait Signed-off-by: Jonathan Norris <jonathan.norris@dynatrace.com> * fixup: pr feedback and additional checks Signed-off-by: Todd Baert <todd.baert@dynatrace.com> * fix: check active registration in stale-init guard, not _provider_status Signed-off-by: Jonathan Norris <jonathan.norris@dynatrace.com> * fixup: edge shutdown race Signed-off-by: Todd Baert <todd.baert@dynatrace.com> --------- Signed-off-by: Jonathan Norris <jonathan.norris@dynatrace.com> Signed-off-by: Todd Baert <todd.baert@dynatrace.com> Co-authored-by: Todd Baert <todd.baert@dynatrace.com>
1 parent 760d808 commit cbacef0

8 files changed

Lines changed: 486 additions & 70 deletions

File tree

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,15 @@ api.set_provider(NoOpProvider())
130130
open_feature_client = api.get_client()
131131
```
132132

133+
`set_provider()` is non-blocking: it registers the provider immediately and runs initialization in a background thread.
134+
Flag evaluations during the initialization window return the default value with a `PROVIDER_NOT_READY` error code.
135+
Use `set_provider_and_wait()` if you need to ensure the provider is ready before proceeding:
136+
137+
```python
138+
# blocks until the provider is initialized (or raises on failure)
139+
api.set_provider_and_wait(NoOpProvider())
140+
```
141+
133142
In some situations, it may be beneficial to register multiple providers in the same application.
134143
This is possible using [domains](#domains), which is covered in more detail below.
135144

openfeature/api.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
"remove_handler",
3434
"set_evaluation_context",
3535
"set_provider",
36+
"set_provider_and_wait",
3637
"set_transaction_context",
3738
"set_transaction_context_propagator",
3839
"shutdown",
@@ -52,6 +53,13 @@ def set_provider(provider: FeatureProvider, domain: str | None = None) -> None:
5253
provider_registry.set_provider(domain, provider)
5354

5455

56+
def set_provider_and_wait(provider: FeatureProvider, domain: str | None = None) -> None:
57+
if domain is None:
58+
provider_registry.set_default_provider(provider, wait_for_init=True)
59+
else:
60+
provider_registry.set_provider(domain, provider, wait_for_init=True)
61+
62+
5563
def clear_providers() -> None:
5664
provider_registry.clear_providers()
5765
_event_support.clear()

openfeature/provider/_registry.py

Lines changed: 145 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import threading
2+
13
from openfeature._event_support import run_handlers_for_provider
24
from openfeature.evaluation_context import EvaluationContext, get_evaluation_context
35
from openfeature.event import (
@@ -13,77 +15,144 @@ class ProviderRegistry:
1315
_default_provider: FeatureProvider
1416
_providers: dict[str, FeatureProvider]
1517
_provider_status: dict[FeatureProvider, ProviderStatus]
18+
_lock: threading.RLock
1619

1720
def __init__(self) -> None:
21+
self._lock = threading.RLock()
1822
self._default_provider = NoOpProvider()
1923
self._providers = {}
2024
self._provider_status = {
2125
self._default_provider: ProviderStatus.READY,
2226
}
2327

24-
def set_provider(self, domain: str, provider: FeatureProvider) -> None:
28+
def set_provider(
29+
self, domain: str, provider: FeatureProvider, wait_for_init: bool = False
30+
) -> None:
2531
if provider is None:
2632
raise GeneralError(error_message="No provider")
2733
if domain is None:
2834
raise GeneralError(error_message="No domain")
29-
providers = self._providers
30-
if domain in providers:
31-
old_provider = providers[domain]
32-
del providers[domain]
33-
if (
34-
old_provider != self._default_provider
35-
and old_provider not in providers.values()
36-
):
37-
self._shutdown_provider(old_provider)
38-
if provider != self._default_provider and provider not in providers.values():
39-
self._initialize_provider(provider)
40-
providers[domain] = provider
35+
36+
old_provider: FeatureProvider | None = None
37+
needs_init = False
38+
with self._lock:
39+
old_provider = self._providers.get(domain)
40+
self._providers[domain] = provider
41+
already_bound = provider is self._default_provider or any(
42+
p is provider for d, p in self._providers.items() if d != domain
43+
)
44+
if not already_bound:
45+
needs_init = True
46+
self._provider_status[provider] = ProviderStatus.NOT_READY
47+
48+
if needs_init:
49+
self._initialize_provider(provider, wait_for_init=wait_for_init)
50+
51+
# old-provider shutdown is always async so a hanging shutdown() cannot
52+
# block set_provider.
53+
if old_provider is not None and old_provider is not provider:
54+
self._shutdown_if_unused(old_provider)
4155

4256
def get_provider(self, domain: str | None) -> FeatureProvider:
4357
if domain is None:
4458
return self._default_provider
4559
return self._providers.get(domain, self._default_provider)
4660

47-
def set_default_provider(self, provider: FeatureProvider) -> None:
61+
def set_default_provider(
62+
self, provider: FeatureProvider, wait_for_init: bool = False
63+
) -> None:
4864
if provider is None:
4965
raise GeneralError(error_message="No provider")
50-
if (
51-
self._default_provider
52-
and self._default_provider not in self._providers.values()
53-
):
54-
self._shutdown_provider(self._default_provider)
55-
self._default_provider = provider
5666

57-
if self._default_provider not in self._providers.values():
58-
self._initialize_provider(provider)
67+
old_provider: FeatureProvider | None = None
68+
needs_init = False
69+
with self._lock:
70+
old_provider = self._default_provider
71+
self._default_provider = provider
72+
if (
73+
provider is not old_provider
74+
and provider not in self._providers.values()
75+
):
76+
needs_init = True
77+
self._provider_status[provider] = ProviderStatus.NOT_READY
78+
79+
if needs_init:
80+
self._initialize_provider(provider, wait_for_init=wait_for_init)
81+
82+
if old_provider is not None and old_provider is not provider:
83+
self._shutdown_if_unused(old_provider)
5984

6085
def get_default_provider(self) -> FeatureProvider:
6186
return self._default_provider
6287

6388
def clear_providers(self) -> None:
6489
self.shutdown()
65-
self._providers.clear()
66-
self._default_provider = NoOpProvider()
67-
self._provider_status = {
68-
self._default_provider: ProviderStatus.READY,
69-
}
90+
with self._lock:
91+
self._providers.clear()
92+
self._default_provider = NoOpProvider()
93+
self._provider_status = {
94+
self._default_provider: ProviderStatus.READY,
95+
}
7096

7197
def shutdown(self) -> None:
72-
for provider in {self._default_provider, *self._providers.values()}:
98+
with self._lock:
99+
providers = {self._default_provider, *self._providers.values()}
100+
101+
for provider in providers:
73102
self._shutdown_provider(provider)
74103

75104
def _get_evaluation_context(self) -> EvaluationContext:
76105
return get_evaluation_context()
77106

78-
def _initialize_provider(self, provider: FeatureProvider) -> None:
107+
def _initialize_provider(
108+
self, provider: FeatureProvider, wait_for_init: bool
109+
) -> None:
79110
provider.attach(self.dispatch_event)
111+
if not hasattr(provider, "initialize"):
112+
# nothing async to do; dispatch READY synchronously.
113+
self.dispatch_event(
114+
provider, ProviderEvent.PROVIDER_READY, ProviderEventDetails()
115+
)
116+
return
117+
if wait_for_init:
118+
self._run_initialize(provider, raise_on_error=True)
119+
return
120+
121+
thread = threading.Thread(
122+
target=self._run_initialize,
123+
args=(provider,),
124+
kwargs={"raise_on_error": False},
125+
daemon=True,
126+
)
127+
thread.start()
128+
129+
def _run_initialize(
130+
self, provider: FeatureProvider, raise_on_error: bool = False
131+
) -> None:
80132
try:
81-
if hasattr(provider, "initialize"):
82-
provider.initialize(self._get_evaluation_context())
133+
provider.initialize(self._get_evaluation_context())
134+
# stale init: provider was replaced/shut down during initialize(); drop event.
135+
# Check active registration, not _provider_status, since replaced providers
136+
# remain in _provider_status until async shutdown pops them.
137+
with self._lock:
138+
if (
139+
provider is not self._default_provider
140+
and provider not in self._providers.values()
141+
):
142+
return
83143
self.dispatch_event(
84144
provider, ProviderEvent.PROVIDER_READY, ProviderEventDetails()
85145
)
86146
except Exception as err:
147+
# stale init: provider was replaced/shut down during initialize(); drop event.
148+
# Check active registration, not _provider_status, since replaced providers
149+
# remain in _provider_status until async shutdown pops them.
150+
with self._lock:
151+
if (
152+
provider is not self._default_provider
153+
and provider not in self._providers.values()
154+
):
155+
return
87156
error_code = (
88157
err.error_code
89158
if isinstance(err, OpenFeatureError)
@@ -97,12 +166,42 @@ def _initialize_provider(self, provider: FeatureProvider) -> None:
97166
error_code=error_code,
98167
),
99168
)
100-
101-
def _shutdown_provider(self, provider: FeatureProvider) -> None:
169+
if raise_on_error:
170+
raise
171+
172+
def _shutdown_if_unused(self, provider: FeatureProvider) -> None:
173+
# only shut down if no longer referenced. shutdown runs on a daemon
174+
# thread so a hanging shutdown() cannot block the caller.
175+
with self._lock:
176+
if provider is self._default_provider:
177+
return
178+
if provider in self._providers.values():
179+
return
180+
181+
thread = threading.Thread(
182+
target=self._shutdown_provider,
183+
args=(provider,),
184+
kwargs={"abort_if_re_registered": True},
185+
daemon=True,
186+
)
187+
thread.start()
188+
189+
def _shutdown_provider(
190+
self, provider: FeatureProvider, abort_if_re_registered: bool = False
191+
) -> None:
102192
try:
103193
if hasattr(provider, "shutdown"):
104194
provider.shutdown()
105-
del self._provider_status[provider]
195+
# if provider is being re-registered, leave its status and event wiring intact
196+
if abort_if_re_registered:
197+
with self._lock:
198+
if (
199+
provider is self._default_provider
200+
or provider in self._providers.values()
201+
):
202+
return
203+
with self._lock:
204+
self._provider_status.pop(provider, None)
106205
except Exception as err:
107206
self.dispatch_event(
108207
provider,
@@ -132,17 +231,18 @@ def _update_provider_status(
132231
event: ProviderEvent,
133232
details: ProviderEventDetails,
134233
) -> None:
135-
if event == ProviderEvent.PROVIDER_READY:
136-
self._provider_status[provider] = ProviderStatus.READY
137-
elif event == ProviderEvent.PROVIDER_STALE:
138-
self._provider_status[provider] = ProviderStatus.STALE
139-
elif event == ProviderEvent.PROVIDER_ERROR:
140-
status = (
141-
ProviderStatus.FATAL
142-
if details.error_code == ErrorCode.PROVIDER_FATAL
143-
else ProviderStatus.ERROR
144-
)
145-
self._provider_status[provider] = status
234+
with self._lock:
235+
if event == ProviderEvent.PROVIDER_READY:
236+
self._provider_status[provider] = ProviderStatus.READY
237+
elif event == ProviderEvent.PROVIDER_STALE:
238+
self._provider_status[provider] = ProviderStatus.STALE
239+
elif event == ProviderEvent.PROVIDER_ERROR:
240+
status = (
241+
ProviderStatus.FATAL
242+
if details.error_code == ErrorCode.PROVIDER_FATAL
243+
else ProviderStatus.ERROR
244+
)
245+
self._provider_status[provider] = status
146246

147247

148248
provider_registry = ProviderRegistry()

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ def clear_providers():
1515

1616
@pytest.fixture()
1717
def no_op_provider_client():
18-
api.set_provider(NoOpProvider())
18+
api.set_provider_and_wait(NoOpProvider())
1919
return api.get_client()

tests/features/steps/metadata_steps.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
from behave import given, then
22

3-
from openfeature.api import get_client, set_provider
3+
from openfeature.api import get_client, set_provider_and_wait
44
from openfeature.provider.in_memory_provider import InMemoryProvider
55
from tests.features.data import IN_MEMORY_FLAGS
66

77

88
@given("a stable provider")
99
def step_impl_stable_provider(context):
10-
set_provider(InMemoryProvider(IN_MEMORY_FLAGS))
10+
set_provider_and_wait(InMemoryProvider(IN_MEMORY_FLAGS))
1111
context.client = get_client()
1212

1313

tests/features/steps/steps.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from behave import given, then, when
66

7-
from openfeature.api import get_client, set_provider
7+
from openfeature.api import get_client, set_provider_and_wait
88
from openfeature.client import OpenFeatureClient
99
from openfeature.evaluation_context import EvaluationContext
1010
from openfeature.exception import ErrorCode
@@ -28,13 +28,13 @@ def step_impl_resolved_should_be(context, flag_type, key, expected_reason):
2828

2929
@given("a provider is registered with cache disabled")
3030
def step_impl_provider_without_cache(context):
31-
set_provider(InMemoryProvider(IN_MEMORY_FLAGS))
31+
set_provider_and_wait(InMemoryProvider(IN_MEMORY_FLAGS))
3232
context.client = get_client()
3333

3434

3535
@given("a provider is registered")
3636
def step_impl_provider(context):
37-
set_provider(InMemoryProvider(IN_MEMORY_FLAGS))
37+
set_provider_and_wait(InMemoryProvider(IN_MEMORY_FLAGS))
3838
context.client = get_client()
3939

4040

0 commit comments

Comments
 (0)