Skip to content

Commit ad6cb36

Browse files
committed
fix: restart local-eval polling thread after fork (#77)
The polling thread that refreshes the local evaluation environment document is created in the parent process. Threads do not survive os.fork(), so pre-fork servers (gunicorn, uwsgi, multiprocessing) end up with worker processes whose Python Thread object is intact but whose underlying OS thread is dead. Workers then serve a frozen environment-document snapshot from the moment of fork for their entire lifetime. EnvironmentDataPollingManager now composes a threading.Thread instead of inheriting from it, and registers an os.register_at_fork hook so a fresh polling thread is started in each child. The hook also closes the parent's requests.Session so connection-pool sockets are not shared across processes. Stream and analytics threads are not yet covered; tracking separately.
1 parent 14fd76e commit ad6cb36

5 files changed

Lines changed: 245 additions & 12 deletions

File tree

flagsmith/flagsmith.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,6 @@ def _initialise_local_evaluation(self) -> None:
235235
EnvironmentDataPollingManager(
236236
main=self,
237237
refresh_interval_seconds=self.environment_refresh_interval_seconds,
238-
daemon=True,
239238
)
240239
)
241240

flagsmith/polling_manager.py

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import logging
4+
import os
45
import threading
56
import time
67
import typing
@@ -11,26 +12,64 @@
1112
logger = logging.getLogger(__name__)
1213

1314

14-
class EnvironmentDataPollingManager(threading.Thread):
15+
class EnvironmentDataPollingManager:
16+
"""Owns the worker thread that periodically refreshes the local
17+
evaluation environment document.
18+
19+
Composes (rather than extends) :class:`threading.Thread` so the
20+
worker can be replaced — most importantly after :func:`os.fork`,
21+
where threads do not survive into the child. We register an at-fork
22+
hook so a forked worker (gunicorn, uwsgi, multiprocessing) gets a
23+
freshly-started polling thread for its own PID. See issue #77.
24+
"""
25+
1526
def __init__(
1627
self,
17-
*args: typing.Any,
28+
*,
1829
main: Flagsmith,
1930
refresh_interval_seconds: typing.Union[int, float] = 10,
20-
**kwargs: typing.Any,
21-
):
22-
super(EnvironmentDataPollingManager, self).__init__(*args, **kwargs)
31+
) -> None:
32+
self._main = main
33+
self._refresh_interval_seconds = refresh_interval_seconds
2334
self._stop_event = threading.Event()
24-
self.main = main
25-
self.refresh_interval_seconds = refresh_interval_seconds
35+
self._thread = self._build_thread()
36+
self._at_fork_registered = False
2637

27-
def run(self) -> None:
38+
def _build_thread(self) -> threading.Thread:
39+
return threading.Thread(target=self._run, daemon=True)
40+
41+
def _run(self) -> None:
2842
while not self._stop_event.is_set():
29-
self.main.update_environment()
30-
time.sleep(self.refresh_interval_seconds)
43+
self._main.update_environment()
44+
time.sleep(self._refresh_interval_seconds)
45+
46+
def start(self) -> None:
47+
self._thread.start()
48+
if not self._at_fork_registered and hasattr(os, "register_at_fork"):
49+
os.register_at_fork(after_in_child=self._restart_after_fork)
50+
self._at_fork_registered = True
51+
52+
def _restart_after_fork(self) -> None:
53+
if self._thread.is_alive():
54+
return
55+
# Sockets in the parent's connection pool are inherited as
56+
# shared FDs across fork; reusing them would interleave bytes
57+
# between processes. Drop them so the new thread opens fresh.
58+
if session := getattr(self._main, "session", None):
59+
session.close()
60+
self._stop_event = threading.Event()
61+
self._thread = self._build_thread()
62+
self._thread.start()
3163

3264
def stop(self) -> None:
3365
self._stop_event.set()
3466

67+
def is_alive(self) -> bool:
68+
return self._thread.is_alive()
69+
70+
@property
71+
def ident(self) -> typing.Optional[int]:
72+
return self._thread.ident
73+
3574
def __del__(self) -> None:
3675
self._stop_event.set()

poetry.lock

Lines changed: 136 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pre-commit = { version = "^4.2.0", python = ">=3.9,<4" }
3030
responses = "^0.24.1"
3131
types-requests = "^2.32"
3232
pyfakefs = "^5.9.2"
33+
pytest-httpserver = {version = "^1.1.0", python = ">=3.10,<4"}
3334

3435
[tool.mypy]
3536
exclude = ["example/*"]

0 commit comments

Comments
 (0)