Skip to content

Commit b5735ab

Browse files
authored
fix(batcher): Reset lock and flusher in child after fork (#6163)
Apply the same fork-safety fix introduced in #6148 (for the monitor) to all batchers (log, metrics, span).

If `os.fork()` runs while another thread holds `Batcher._lock`, the child inherits the lock in its locked state. The thread that held it does not exist in the child, so the lock can never be released and `_ensure_thread` deadlocks forever.

Register an after-fork hook via `os.register_at_fork` that, in the child, replaces `_lock` with a fresh lock and resets `_flusher` / `_flusher_pid` together with the rest of the thread-related state (the buffers, `_running`, `_active` and `_flush_event`). The hook holds only a `weakref.WeakMethod` to the batcher's reset method, so it does not keep the instance alive.

Each batcher (log, metrics, span) gets a regression test that acquires the lock in the parent, forks, and asserts in the child that the lock object was replaced, the new lock is not held, and `_flusher` / `_flusher_pid` and the other reset attributes are back to their initial values. The tests are skipped on Windows and on Python builds without `os.fork` or `os.register_at_fork`.

Fixes PY-2391
Fixes #6149
1 parent fc3eab4 commit b5735ab

5 files changed

Lines changed: 230 additions & 1 deletion

File tree

sentry_sdk/_batcher.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import threading
44
from datetime import datetime, timezone
55
from typing import TYPE_CHECKING, TypeVar, Generic
6+
import weakref
67

78
from sentry_sdk.utils import format_timestamp
89
from sentry_sdk.envelope import Envelope, Item, PayloadRef
@@ -38,6 +39,26 @@ def __init__(
3839
self._flusher: "Optional[threading.Thread]" = None
3940
self._flusher_pid: "Optional[int]" = None
4041

42+
# See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
43+
if hasattr(os, "register_at_fork"):
44+
weak_reset = weakref.WeakMethod(self._reset_thread_state)
45+
46+
def _reset_in_child() -> None:
47+
method = weak_reset()
48+
if method is not None:
49+
method()
50+
51+
os.register_at_fork(after_in_child=_reset_in_child)
52+
53+
def _reset_thread_state(self) -> None:
54+
self._buffer = []
55+
self._running = True
56+
self._lock = threading.Lock()
57+
self._active = threading.local()
58+
self._flush_event = threading.Event()
59+
self._flusher = None
60+
self._flusher_pid = None
61+
4162
def _ensure_thread(self) -> bool:
4263
"""For forking processes we might need to restart this thread.
4364
This ensures that our process actually has that thread running.

sentry_sdk/_span_batcher.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
import threading
21
from collections import defaultdict
32
from datetime import datetime, timezone
3+
import os
4+
import threading
45
from typing import TYPE_CHECKING
6+
import weakref
57

68
from sentry_sdk._batcher import Batcher
79
from sentry_sdk.envelope import Envelope, Item, PayloadRef
@@ -50,6 +52,30 @@ def __init__(
5052
self._flusher: "Optional[threading.Thread]" = None
5153
self._flusher_pid: "Optional[int]" = None
5254

55+
# See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
56+
if hasattr(os, "register_at_fork"):
57+
weak_reset = weakref.WeakMethod(self._reset_thread_state)
58+
59+
def _reset_in_child() -> None:
60+
method = weak_reset()
61+
if method is not None:
62+
method()
63+
64+
os.register_at_fork(after_in_child=_reset_in_child)
65+
66+
def _reset_thread_state(self) -> None:
67+
self._span_buffer = defaultdict(list)
68+
self._running_size = defaultdict(lambda: 0)
69+
self._running = True
70+
71+
self._lock = threading.Lock()
72+
self._active = threading.local()
73+
74+
self._flush_event = threading.Event()
75+
76+
self._flusher = None
77+
self._flusher_pid = None
78+
5379
def add(self, span: "StreamedSpan") -> None:
5480
# Bail out if the current thread is already executing batcher code.
5581
# This prevents deadlocks when code running inside the batcher (e.g.

tests/test_logs.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
import os
34
import sys
45
import time
56
from typing import List, Any, Mapping, Union
@@ -837,3 +838,60 @@ def add_to_envelope_with_reentrant_add(envelope):
837838
assert reentrant_add_called
838839
# If the re-entrancy guard didn't work, this test would hang and it'd
839840
# eventually be timed out by pytest-timeout
841+
842+
843+
@pytest.mark.skipif(
844+
sys.platform == "win32"
845+
or not hasattr(os, "fork")
846+
or not hasattr(os, "register_at_fork"),
847+
reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
848+
)
849+
def test_log_batcher_lock_reset_in_child_after_fork(sentry_init):
850+
"""Regression test for the LogBatcher fork-deadlock fix.
851+
852+
If os.fork() runs while another thread holds LogBatcher._lock, the
853+
child inherits the lock locked. The holding thread does not exist in
854+
the child, so the lock can never be released and _ensure_thread
855+
deadlocks forever. The after-fork hook must replace the lock with a
856+
fresh one in the child and reset
857+
_flusher / _flusher_pid / _buffer / _active / _flush_event / _running.
858+
"""
859+
sentry_init(enable_logs=True)
860+
batcher = sentry_sdk.get_client().log_batcher
861+
assert batcher is not None
862+
863+
original_lock = batcher._lock
864+
original_lock.acquire()
865+
866+
batcher._buffer.append(object())
867+
batcher._active.flag = True
868+
batcher._flush_event.set()
869+
batcher._running = False
870+
871+
pid = os.fork()
872+
if pid == 0:
873+
replaced = batcher._lock is not original_lock
874+
unheld = batcher._lock.acquire(blocking=False)
875+
876+
flusher_reset = batcher._flusher is None and batcher._flusher_pid is None
877+
buffer_reset = len(batcher._buffer) == 0
878+
active_reset = not getattr(batcher._active, "flag", False)
879+
880+
event_reset = not batcher._flush_event.is_set()
881+
running_reset = batcher._running is True
882+
883+
os._exit(
884+
0
885+
if replaced
886+
and unheld
887+
and flusher_reset
888+
and buffer_reset
889+
and active_reset
890+
and event_reset
891+
and running_reset
892+
else 1
893+
)
894+
895+
original_lock.release()
896+
_, status = os.waitpid(pid, 0)
897+
assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0

tests/test_metrics.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import os
2+
import sys
13
from typing import List
24
from unittest import mock
35

6+
import pytest
47

58
import sentry_sdk
69
from sentry_sdk import get_client
@@ -514,3 +517,59 @@ def before_send_metric(metric, _):
514517
)
515518

516519
get_client().flush()
520+
521+
522+
@pytest.mark.skipif(
523+
sys.platform == "win32"
524+
or not hasattr(os, "fork")
525+
or not hasattr(os, "register_at_fork"),
526+
reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
527+
)
528+
def test_metrics_batcher_lock_reset_in_child_after_fork(sentry_init):
529+
"""Regression test for the MetricsBatcher fork-deadlock fix.
530+
531+
If os.fork() runs while another thread holds MetricsBatcher._lock,
532+
the child inherits the lock locked. The holding thread does not
533+
exist in the child, so the lock can never be released and
534+
_ensure_thread deadlocks forever. The after-fork hook must replace
535+
the lock with a fresh one in the child and reset
536+
_flusher / _flusher_pid / _buffer / _active / _flush_event / _running.
537+
"""
538+
sentry_init()
539+
batcher = sentry_sdk.get_client().metrics_batcher
540+
assert batcher is not None
541+
542+
original_lock = batcher._lock
543+
original_lock.acquire()
544+
545+
batcher._buffer.append(object())
546+
batcher._active.flag = True
547+
batcher._flush_event.set()
548+
batcher._running = False
549+
550+
pid = os.fork()
551+
if pid == 0:
552+
replaced = batcher._lock is not original_lock
553+
unheld = batcher._lock.acquire(blocking=False)
554+
555+
flusher_reset = batcher._flusher is None and batcher._flusher_pid is None
556+
buffer_reset = len(batcher._buffer) == 0
557+
active_reset = not getattr(batcher._active, "flag", False)
558+
event_reset = not batcher._flush_event.is_set()
559+
running_reset = batcher._running is True
560+
561+
os._exit(
562+
0
563+
if replaced
564+
and unheld
565+
and flusher_reset
566+
and buffer_reset
567+
and active_reset
568+
and event_reset
569+
and running_reset
570+
else 1
571+
)
572+
573+
original_lock.release()
574+
_, status = os.waitpid(pid, 0)
575+
assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0

tests/tracing/test_span_streaming.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import os
23
import re
34
import sys
45
import time
@@ -1554,3 +1555,67 @@ def test_transport_format(sentry_init, capture_envelopes):
15541555
}
15551556
]
15561557
}
1558+
1559+
1560+
@pytest.mark.skipif(
1561+
sys.platform == "win32"
1562+
or not hasattr(os, "fork")
1563+
or not hasattr(os, "register_at_fork"),
1564+
reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
1565+
)
1566+
def test_span_batcher_lock_reset_in_child_after_fork(sentry_init):
1567+
"""Regression test for the SpanBatcher fork-deadlock fix.
1568+
1569+
If os.fork() runs while another thread holds SpanBatcher._lock, the
1570+
child inherits the lock locked. The holding thread does not exist in
1571+
the child, so the lock can never be released and _ensure_thread
1572+
deadlocks forever. The after-fork hook must replace the lock with a
1573+
fresh one in the child and reset
1574+
_flusher / _flusher_pid / _span_buffer / _running_size / _active /
1575+
_flush_event / _running.
1576+
"""
1577+
sentry_init(
1578+
traces_sample_rate=1.0,
1579+
_experiments={"trace_lifecycle": "stream"},
1580+
)
1581+
batcher = sentry_sdk.get_client().span_batcher
1582+
assert batcher is not None
1583+
1584+
original_lock = batcher._lock
1585+
original_lock.acquire()
1586+
1587+
batcher._span_buffer["test-trace-id"].append(object())
1588+
batcher._running_size["test-trace-id"] = 42
1589+
batcher._active.flag = True
1590+
batcher._flush_event.set()
1591+
batcher._running = False
1592+
1593+
pid = os.fork()
1594+
if pid == 0:
1595+
replaced = batcher._lock is not original_lock
1596+
unheld = batcher._lock.acquire(blocking=False)
1597+
1598+
flusher_reset = batcher._flusher is None and batcher._flusher_pid is None
1599+
span_buffer_reset = len(batcher._span_buffer) == 0
1600+
running_size_reset = len(batcher._running_size) == 0
1601+
1602+
active_reset = not getattr(batcher._active, "flag", False)
1603+
event_reset = not batcher._flush_event.is_set()
1604+
running_reset = batcher._running is True
1605+
1606+
os._exit(
1607+
0
1608+
if replaced
1609+
and unheld
1610+
and flusher_reset
1611+
and span_buffer_reset
1612+
and running_size_reset
1613+
and active_reset
1614+
and event_reset
1615+
and running_reset
1616+
else 1
1617+
)
1618+
1619+
original_lock.release()
1620+
_, status = os.waitpid(pid, 0)
1621+
assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0

0 commit comments

Comments
 (0)