-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmetrics.py
More file actions
168 lines (142 loc) · 4.86 KB
/
metrics.py
File metadata and controls
168 lines (142 loc) · 4.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""
Simvue Server Metrics
=====================
Contains a class for remotely connecting to Simvue metrics, or defining
a new set of metrics given relevant arguments.
"""
import http
import typing
import json
import pydantic
from .base import SimvueObject
from simvue.models import MetricSet
from simvue.api.request import get as sv_get, get_json_from_response
try:
from typing import Self
except ImportError:
from typing_extensions import Self
__all__ = ["Metrics"]
class Metrics(SimvueObject):
"""Class for retrieving metrics stored on the server."""
def __init__(
self,
_read_only: bool = True,
_local: bool = False,
**kwargs,
) -> None:
"""Initialise a Metrics object instance."""
self._label = "metric"
super().__init__(_read_only=_read_only, _local=_local, **kwargs)
self._run_id = self._staging.get("run")
self._is_set = True
@classmethod
@pydantic.validate_call
def new(
cls, *, run: str, metrics: list[MetricSet], offline: bool = False, **kwargs
) -> Self:
"""Create a new Metrics entry on the Simvue server.
Parameters
----------
run: str
identifier for the run to attach metrics to.
metrics: list[MetricSet]
set of metrics to attach to run.
offline: bool, optional
whether to create in offline mode, default is False.
Returns
-------
Metrics
metrics object
"""
return Metrics(
run=run,
metrics=[metric.model_dump() for metric in metrics],
_read_only=False,
_offline=offline,
)
@classmethod
@pydantic.validate_call
def get(
cls,
metrics: list[str],
xaxis: typing.Literal["timestamp", "step", "time"],
runs: list[str],
*,
count: pydantic.PositiveInt | None = None,
offset: pydantic.PositiveInt | None = None,
**kwargs,
) -> typing.Generator[dict[str, dict[str, list[dict[str, float]]]], None, None]:
"""Retrieve metrics from the server for a given set of runs.
Parameters
----------
metrics: list[str]
name of metrics to retrieve.
xaxis : Literal["step", "time", "timestamp"]
the x-axis type
* step - enumeration.
* time - time in seconds.
* timestamp - time stamp.
runs : list[str]
list of runs to return metrics for.
count : int | None, optional
limit result count.
offset : int | None, optional
index offset for count.
Yields
------
dict[str, dict[str, list[dict[str, float]]]
metric set object containing metrics for run.
"""
yield from cls._get_all_objects(
offset=offset,
metrics=json.dumps(metrics),
runs=json.dumps(runs),
xaxis=xaxis,
count=count,
**kwargs,
)
@pydantic.validate_call
def span(self, run_ids: list[str]) -> dict[str, int | float]:
"""Returns the metrics span for the given runs"""
_url = self._base_url / "span"
_response = sv_get(url=f"{_url}", headers=self._headers, json=run_ids)
return get_json_from_response(
response=_response,
expected_status=[http.HTTPStatus.OK],
scenario="Retrieving metric spans",
)
@pydantic.validate_call
def names(self, run_ids: list[str]) -> list[str]:
"""Returns the metric names for the given runs"""
_url = self._base_url / "names"
_response = sv_get(
url=f"{_url}", headers=self._headers, params={"runs": json.dumps(run_ids)}
)
return get_json_from_response(
response=_response,
expected_status=[http.HTTPStatus.OK],
scenario="Retrieving metric names",
expected_type=list,
)
def _post_single(self, **kwargs) -> dict[str, typing.Any]:
return super()._post_single(is_json=False, **kwargs)
def delete(self, **kwargs) -> dict[str, typing.Any]:
"""Metrics cannot be deleted"""
raise NotImplementedError("Cannot delete metric set")
def on_reconnect(self, id_mapping: dict[str, str]):
"""Action performed when mode switched from offline to online.
Parameters
----------
id_mapping : dict[str, str]
mapping from offline to online identifier.
"""
if online_run_id := id_mapping.get(self._staging["run"]):
self._staging["run"] = online_run_id
def to_dict(self) -> dict[str, typing.Any]:
"""Convert metrics object to dictionary.
Returns
-------
dict[str, Any]
dictionary representation of metrics object.
"""
return self._staging