Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions prometheus_client/values.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@
from .mmap_dict import mmap_key, MmapedDict


_multi_process_cleanups = []


def close_all_multiprocess_files():
for cleanup in _multi_process_cleanups:
cleanup()
_multi_process_cleanups.clear()


class MutexValue:
"""A float protected by a mutex."""

Expand Down Expand Up @@ -52,6 +61,13 @@ def MultiProcessValue(process_identifier=os.getpid):
# This avoids the need to also have mutexes in __MmapDict.
lock = Lock()

def cleanup():
for f in files.values():
f.close()
files.clear()
values.clear()
_multi_process_cleanups.append(cleanup)

class MmapedValue:
"""A float protected by a mutex backed by a per-process mmaped file."""

Expand Down Expand Up @@ -122,6 +138,14 @@ def get_exemplar(self):
# TODO: Implement exemplars for multiprocess mode.
return None

@classmethod
def close_all_files(cls):
with lock:
for f in files.values():
f.close()
files.clear()
values.clear()

return MmapedValue


Expand Down
27 changes: 16 additions & 11 deletions tests/test_asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,32 @@ def setUp(self):
# Setup ASGI scope
self.scope = {}
setup_testing_defaults(self.scope)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.communicator = None

def tearDown(self):
if self.communicator:
asyncio.new_event_loop().run_until_complete(
self.loop.run_until_complete(
self.communicator.wait()
)
self.loop.close()

def seed_app(self, app):
self.communicator = ApplicationCommunicator(app, self.scope)
async def _init():
self.communicator = ApplicationCommunicator(app, self.scope)
self.loop.run_until_complete(_init())

def send_input(self, payload):
asyncio.new_event_loop().run_until_complete(
self.loop.run_until_complete(
self.communicator.send_input(payload)
)

def send_default_request(self):
self.send_input({"type": "http.request", "body": b""})

def get_output(self):
output = asyncio.new_event_loop().run_until_complete(
output = self.loop.run_until_complete(
self.communicator.receive_output(0)
)
return output
Expand Down Expand Up @@ -148,9 +153,9 @@ def test_gzip(self):
increments = 2
self.increment_metrics(metric_name, help_text, increments)
app = make_asgi_app(self.registry)
self.seed_app(app)
# Send input with gzip header.
self.scope["headers"] = [(b"accept-encoding", b"gzip")]
self.seed_app(app)
self.send_input({"type": "http.request", "body": b""})
# Assert outputs are compressed.
outputs = self.get_all_output()
Expand All @@ -164,9 +169,9 @@ def test_gzip_disabled(self):
self.increment_metrics(metric_name, help_text, increments)
# Disable compression explicitly.
app = make_asgi_app(self.registry, disable_compression=True)
self.seed_app(app)
# Send input with gzip header.
self.scope["headers"] = [(b"accept-encoding", b"gzip")]
self.seed_app(app)
self.send_input({"type": "http.request", "body": b""})
# Assert outputs are not compressed.
outputs = self.get_all_output()
Expand All @@ -175,8 +180,8 @@ def test_gzip_disabled(self):
def test_openmetrics_encoding(self):
"""Response content type is application/openmetrics-text when appropriate Accept header is in request"""
app = make_asgi_app(self.registry)
self.seed_app(app)
self.scope["headers"] = [(b"Accept", b"application/openmetrics-text; version=1.0.0")]
self.seed_app(app)
self.send_input({"type": "http.request", "body": b""})

content_type = self.get_response_header_value('Content-Type').split(";")[0]
Expand Down Expand Up @@ -204,8 +209,8 @@ def test_qs_parsing(self):
self.increment_metrics(*m)

for i_1 in range(len(metrics)):
self.seed_app(app)
self.scope['query_string'] = f"name[]={metrics[i_1][0]}_total".encode("utf-8")
self.seed_app(app)
self.send_default_request()

outputs = self.get_all_output()
Expand All @@ -220,7 +225,7 @@ def test_qs_parsing(self):

self.assert_not_metrics(output, *metrics[i_2])

asyncio.new_event_loop().run_until_complete(
self.loop.run_until_complete(
self.communicator.wait()
)

Expand All @@ -237,8 +242,8 @@ def test_qs_parsing_multi(self):
for m in metrics:
self.increment_metrics(*m)

self.seed_app(app)
self.scope['query_string'] = "&".join([f"name[]={m[0]}_total" for m in metrics[0:2]]).encode("utf-8")
self.seed_app(app)
self.send_default_request()

outputs = self.get_all_output()
Expand All @@ -249,6 +254,6 @@ def test_qs_parsing_multi(self):
self.assert_metrics(output, *metrics[1])
self.assert_not_metrics(output, *metrics[2])

asyncio.new_event_loop().run_until_complete(
self.loop.run_until_complete(
self.communicator.wait()
)
22 changes: 18 additions & 4 deletions tests/test_multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,26 @@ def setUp(self):
def tearDown(self):
os.environ.pop('prometheus_multiproc_dir', None)
os.environ.pop('PROMETHEUS_MULTIPROC_DIR', None)
values.close_all_multiprocess_files()
values.ValueClass = MutexValue
shutil.rmtree(self.tempdir)

def test_deprecation_warning(self):
os.environ['prometheus_multiproc_dir'] = self.tempdir
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
values.ValueClass = get_value_class()
registry = CollectorRegistry()
collector = MultiProcessCollector(registry)
Counter('c', 'help', registry=None)

assert os.environ['PROMETHEUS_MULTIPROC_DIR'] == self.tempdir
assert len(w) == 1
assert issubclass(w[-1].category, DeprecationWarning)
assert "PROMETHEUS_MULTIPROC_DIR" in str(w[-1].message)
if os.name != 'nt':
assert len(w) == 1
assert issubclass(w[-1].category, DeprecationWarning)
assert "PROMETHEUS_MULTIPROC_DIR" in str(w[-1].message)
else:
assert len(w) == 0

def test_mark_process_dead_respects_lowercase(self):
os.environ['prometheus_multiproc_dir'] = self.tempdir
Expand All @@ -61,8 +66,9 @@ def _value_class(self):

def tearDown(self):
del os.environ['PROMETHEUS_MULTIPROC_DIR']
shutil.rmtree(self.tempdir)
values.close_all_multiprocess_files()
values.ValueClass = MutexValue
shutil.rmtree(self.tempdir)

def test_counter_adds(self):
c1 = Counter('c', 'help', registry=None)
Expand Down Expand Up @@ -119,6 +125,7 @@ def test_gauge_liveall(self):
g2.set(2)
self.assertEqual(1, self.registry.get_sample_value('g', {'pid': '123'}))
self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'}))
values.close_all_multiprocess_files()
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(None, self.registry.get_sample_value('g', {'pid': '123'}))
self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'}))
Expand All @@ -140,6 +147,7 @@ def test_gauge_livemin(self):
g1.set(1)
g2.set(2)
self.assertEqual(1, self.registry.get_sample_value('g'))
values.close_all_multiprocess_files()
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(2, self.registry.get_sample_value('g'))

Expand All @@ -160,6 +168,7 @@ def test_gauge_livemax(self):
g1.set(2)
g2.set(1)
self.assertEqual(2, self.registry.get_sample_value('g'))
values.close_all_multiprocess_files()
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(1, self.registry.get_sample_value('g'))

Expand All @@ -171,6 +180,7 @@ def test_gauge_sum(self):
g1.set(1)
g2.set(2)
self.assertEqual(3, self.registry.get_sample_value('g'))
values.close_all_multiprocess_files()
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(3, self.registry.get_sample_value('g'))

Expand All @@ -182,6 +192,7 @@ def test_gauge_livesum(self):
g1.set(1)
g2.set(2)
self.assertEqual(3, self.registry.get_sample_value('g'))
values.close_all_multiprocess_files()
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(2, self.registry.get_sample_value('g'))

Expand All @@ -192,6 +203,7 @@ def test_gauge_mostrecent(self):
g2.set(2)
g1.set(1)
self.assertEqual(1, self.registry.get_sample_value('g'))
values.close_all_multiprocess_files()
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(1, self.registry.get_sample_value('g'))

Expand All @@ -202,6 +214,7 @@ def test_gauge_livemostrecent(self):
g2.set(2)
g1.set(1)
self.assertEqual(1, self.registry.get_sample_value('g'))
values.close_all_multiprocess_files()
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(2, self.registry.get_sample_value('g'))

Expand Down Expand Up @@ -626,6 +639,7 @@ def test_corruption_detected(self):
list(self.d.read_all_values())

def tearDown(self):
self.d.close()
os.unlink(self.tempfile)


Expand Down
21 changes: 14 additions & 7 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,15 @@ def collect(self):
self.assertEqual(text.encode('utf-8'), generate_latest(registry, ALLOWUTF8))


def test_benchmark_text_string_to_metric_families(benchmark):
text = """# HELP go_gc_duration_seconds A summary of the GC invocation durations.
try:
import pytest_benchmark
HAS_BENCHMARK = True
except ImportError:
HAS_BENCHMARK = False

if HAS_BENCHMARK:
def test_benchmark_text_string_to_metric_families(benchmark):
text = """# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.013300656000000001
go_gc_duration_seconds{quantile="0.25"} 0.013638736
Expand Down Expand Up @@ -422,11 +429,11 @@ def test_benchmark_text_string_to_metric_families(benchmark):
hist_sum 2
"""

@benchmark
def _():
# We need to convert the generator to a full list in order to
# accurately measure the time to yield everything.
return list(text_string_to_metric_families(text))
@benchmark
def _():
# We need to convert the generator to a full list in order to
# accurately measure the time to yield everything.
return list(text_string_to_metric_families(text))


if __name__ == '__main__':
Expand Down