-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathusage_recorder.py
More file actions
211 lines (192 loc) · 8.3 KB
/
Copy pathusage_recorder.py
File metadata and controls
211 lines (192 loc) · 8.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
"""用量记录器 — 封装 token 用量日志、定价计算与证据构建."""
from __future__ import annotations
import json
import logging
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from ..logging.db import TokenLogger
from ..pricing import PricingTable
from .usage_parser import UsageInfo
logger = logging.getLogger(__name__)
class UsageRecorder:
    """Record token usage, log priced model calls and build evidence rows.

    Wraps two optional collaborators: a ``TokenLogger`` used for persistence
    and a ``PricingTable`` used to compute the cost shown in the access log.
    When a collaborator is ``None`` the corresponding feature is skipped.
    """

    def __init__(
        self,
        token_logger: TokenLogger | None = None,
        pricing_table: PricingTable | None = None,
    ) -> None:
        """Store the optional collaborators.

        Args:
            token_logger: Sink for usage and evidence rows; ``None`` turns
                :meth:`record` and :meth:`set_session_title` into no-ops.
            pricing_table: Cost source for :meth:`log_model_call`; with
                ``None`` the cost is logged as ``-``.
        """
        self._token_logger = token_logger
        self._pricing_table = pricing_table

    def set_pricing_table(self, table: PricingTable) -> None:
        """Replace the pricing table used by :meth:`log_model_call`."""
        self._pricing_table = table

    async def set_session_title(self, session_key: str, title: str) -> None:
        """Set the title of a new session (delegated to the token logger)."""
        if self._token_logger is not None:
            await self._token_logger.set_session_title(session_key, title)

    # ── Usage info construction ───────────────────────────

    @staticmethod
    def build_usage_info(usage: dict[str, Any]) -> UsageInfo:
        """Build a ``UsageInfo`` from a usage dict.

        Missing keys *and* explicit ``None`` values fall back to ``0`` (token
        counts) or ``""`` (request id), so later ``%d`` formatting and ``> 0``
        comparisons on the result never see ``None``.

        Args:
            usage: Mapping that may contain ``input_tokens``,
                ``output_tokens``, ``cache_creation_tokens``,
                ``cache_read_tokens`` and ``request_id``.

        Returns:
            The populated ``UsageInfo``.
        """
        # At module level this name is only imported under TYPE_CHECKING, so
        # the runtime import happens here (presumably to avoid an import
        # cycle — confirm against the package layout).
        from .usage_parser import UsageInfo

        return UsageInfo(
            input_tokens=usage.get("input_tokens") or 0,
            output_tokens=usage.get("output_tokens") or 0,
            cache_creation_tokens=usage.get("cache_creation_tokens") or 0,
            cache_read_tokens=usage.get("cache_read_tokens") or 0,
            request_id=usage.get("request_id") or "",
        )

    # ── Model call logging ────────────────────────────────

    def log_model_call(
        self,
        *,
        vendor: str,
        model_requested: str,
        model_served: str,
        duration_ms: int,
        usage: UsageInfo,
    ) -> None:
        """Emit a DEBUG-level access-log line for a single model call.

        The line contains vendor, requested/served model, duration, the four
        token counters and the computed cost (``-`` when no pricing table is
        configured or the table returns ``None``).

        Args:
            vendor: Vendor name, also passed to the pricing table.
            model_requested: Model name the caller asked for.
            model_served: Model name that actually served the call.
            duration_ms: Call duration in milliseconds.
            usage: Token counters for the call.
        """
        # The cost is only ever used in the DEBUG line below, so skip the
        # pricing lookup entirely when that line would be discarded.
        if not logger.isEnabledFor(logging.DEBUG):
            return

        cost_str = "-"
        if self._pricing_table is not None:
            cost_value = self._pricing_table.compute_cost(
                vendor=vendor,
                model_served=model_served,
                input_tokens=usage.input_tokens,
                output_tokens=usage.output_tokens,
                cache_creation_tokens=usage.cache_creation_tokens,
                cache_read_tokens=usage.cache_read_tokens,
            )
            if cost_value is not None:
                cost_str = cost_value.format()

        logger.debug(
            "ModelCall: vendor=%s model_requested=%s model_served=%s "
            "duration=%dms tokens=[in:%d out:%d cache_create:%d cache_read:%d] cost=%s",
            vendor,
            model_requested,
            model_served,
            duration_ms,
            usage.input_tokens,
            usage.output_tokens,
            usage.cache_creation_tokens,
            usage.cache_read_tokens,
            cost_str,
        )

    # ── Persistence ───────────────────────────────────────

    @staticmethod
    def _dump_extra_usage(
        extra_usage: dict[str, Any] | None, *, vendor: str, operation: str
    ) -> str:
        """Serialize ``extra_usage`` to JSON, degrading to ``"{}"`` on failure."""
        if not extra_usage:
            return "{}"
        try:
            return json.dumps(
                extra_usage, ensure_ascii=False, sort_keys=True, default=str
            )
        except (TypeError, ValueError):
            # Defensive fallback: a serialization problem must never break the
            # main recording flow, so degrade to an empty object instead.
            logger.warning(
                "Failed to serialize extra_usage for vendor=%s operation=%s",
                vendor,
                operation,
                exc_info=True,
            )
            return "{}"

    async def record(
        self,
        vendor: str,
        model_requested: str,
        model_served: str,
        usage: UsageInfo,
        duration_ms: int,
        success: bool,
        failover: bool,
        failover_from: str | None = None,
        evidence_records: list[dict[str, Any]] | None = None,
        client_category: str = "cc",
        operation: str = "",
        endpoint: str = "",
        extra_usage: dict[str, Any] | None = None,
        session_key: str = "",
    ) -> None:
        """Persist one usage row (and optionally evidence rows) via the token logger.

        Does nothing when no token logger is configured.

        Args:
            vendor: Vendor name; also decides whether evidence is archived.
            model_requested: Model name the caller asked for.
            model_served: Model name that actually served the call.
            usage: Token counters and request id for the call.
            duration_ms: Call duration in milliseconds.
            success: Success flag, forwarded to the logger unchanged.
            failover: Failover flag, forwarded to the logger unchanged.
            failover_from: Forwarded to the logger unchanged (presumably the
                vendor that was failed over from).
            evidence_records: Keyword-argument dicts, each passed to
                ``log_evidence`` when the archiving policy allows it.
            client_category: Client category (``cc`` = Claude Code, ``api`` =
                native API passthrough). Defaults to ``cc`` so existing
                callers need no change.
            operation: Normalized operation name (``chat`` / ``embedding`` /
                ``generate_content`` ...).
            endpoint: Raw upstream path (``/v1/chat/completions`` ...), kept
                for multi-dimensional troubleshooting.
            extra_usage: Non-canonical token fields (Gemini thoughts / OpenAI
                reasoning, etc.), serialized into the ``extra_usage_json``
                column for later analysis or retroactive pricing.
            session_key: Session identifier, forwarded to the logger unchanged.
        """
        token_logger = self._token_logger
        if token_logger is None:
            return

        extra_usage_json = self._dump_extra_usage(
            extra_usage, vendor=vendor, operation=operation
        )

        await token_logger.log(
            vendor=vendor,
            model_requested=model_requested,
            model_served=model_served,
            input_tokens=usage.input_tokens,
            output_tokens=usage.output_tokens,
            cache_creation_tokens=usage.cache_creation_tokens,
            cache_read_tokens=usage.cache_read_tokens,
            duration_ms=duration_ms,
            success=success,
            failover=failover,
            failover_from=failover_from,
            request_id=usage.request_id,
            client_category=client_category,
            operation=operation,
            endpoint=endpoint,
            extra_usage_json=extra_usage_json,
            session_key=session_key,
        )

        if not evidence_records:
            return
        # Evidence logging is optional on the logger; skip if unsupported.
        if not hasattr(token_logger, "log_evidence"):
            return
        # Evidence archiving policy:
        # - existing copilot traffic keeps its original behavior (keeps the
        #   fields used by copilot alerting/auditing stable);
        # - native passthrough traffic (client_category='api') is archived in
        #   full, to allow later pricing of non-canonical tokens such as
        #   reasoning/audio and auditing of drift in model responses.
        if vendor != "copilot" and client_category != "api":
            return

        for evidence in evidence_records:
            await token_logger.log_evidence(**evidence)

    # ── Evidence record construction ──────────────────────

    @staticmethod
    def build_nonstream_evidence_records(
        *, vendor: str, model_served: str, usage: UsageInfo
    ) -> list[dict[str, Any]]:
        """Build the evidence row summarizing a non-streaming response's usage.

        Args:
            vendor: Vendor name; only ``"copilot"`` produces evidence.
            model_served: Model name that actually served the call.
            usage: Token counters and request id for the call.

        Returns:
            An empty list for non-copilot vendors, otherwise a single-element
            list holding the keyword arguments for one evidence row.
        """
        if vendor != "copilot":
            return []

        has_cache_creation = usage.cache_creation_tokens > 0
        has_cache_read = usage.cache_read_tokens > 0

        # Cache counters are only included when positive.
        raw_usage: dict[str, Any] = {
            "input_tokens": usage.input_tokens,
            "output_tokens": usage.output_tokens,
        }
        if has_cache_creation:
            raw_usage["cache_creation_input_tokens"] = usage.cache_creation_tokens
        if has_cache_read:
            raw_usage["cache_read_input_tokens"] = usage.cache_read_tokens

        # Maps each parsed field to the raw field it came from; an empty
        # string marks a cache field that is absent from ``raw_usage``.
        source_field_map = {
            "input_tokens": "input_tokens",
            "output_tokens": "output_tokens",
            "cache_creation_tokens": (
                "cache_creation_input_tokens" if has_cache_creation else ""
            ),
            "cache_read_tokens": (
                "cache_read_input_tokens" if has_cache_read else ""
            ),
        }

        return [
            {
                "vendor": vendor,
                "request_id": usage.request_id,
                "model_served": model_served,
                "evidence_kind": "nonstream_usage_summary",
                "raw_usage_json": json.dumps(
                    raw_usage, ensure_ascii=False, sort_keys=True
                ),
                "parsed_input_tokens": usage.input_tokens,
                "parsed_output_tokens": usage.output_tokens,
                "parsed_cache_creation_tokens": usage.cache_creation_tokens,
                "parsed_cache_read_tokens": usage.cache_read_tokens,
                "cache_signal_present": has_cache_creation or has_cache_read,
                "source_field_map_json": json.dumps(
                    source_field_map, ensure_ascii=False, sort_keys=True
                ),
            }
        ]