diff --git a/benchmarks/benchmark_lib.sh b/benchmarks/benchmark_lib.sh index 95e063a3d5..28f5dd50b7 100644 --- a/benchmarks/benchmark_lib.sh +++ b/benchmarks/benchmark_lib.sh @@ -694,6 +694,714 @@ setup_eval_context() { export EVAL_MAX_MODEL_LEN } +# ------------------------------ +# SpeedBench acceptance-length eval helpers +# ------------------------------ + +_prometheus_metric_values_from_text() { + local name="$1" + awk -v name="$name" ' + /^#/ { next } + { + metric = $1 + sub(/\{.*/, "", metric) + if (metric == name && $NF ~ /^-?([0-9]+(\.[0-9]*)?|\.[0-9]+)([eE][-+]?[0-9]+)?$/) { + print $NF + found = 1 + } + } + END { + if (!found) { + exit 1 + } + } + ' +} + +_prometheus_metric_values_url() { + local url="$1" + local name="$2" + local metrics + metrics=$(curl -fsS --max-time "${SPEEDBENCH_METRICS_CURL_TIMEOUT:-10}" "$url" 2>/dev/null) || return 1 + _prometheus_metric_values_from_text "$name" <<< "$metrics" +} + +_prometheus_metric_sum_url() { + local url="$1" + local name="$2" + local values + values=$(_prometheus_metric_values_url "$url" "$name") || return 1 + awk ' + { sum += $1; found = 1 } + END { + if (!found) { + exit 1 + } + printf "%.10f\n", sum + } + ' <<< "$values" +} + +_speedbench_normalize_metrics_url() { + local endpoint="$1" + endpoint="${endpoint%,}" + endpoint="${endpoint%/}" + [[ -z "$endpoint" ]] && return 0 + + if [[ "$endpoint" =~ ^https?:// ]]; then + if [[ "$endpoint" == */metrics || "$endpoint" == */metrics\?* ]]; then + echo "$endpoint" + else + echo "${endpoint}/metrics" + fi + elif [[ "$endpoint" =~ ^[0-9]+$ ]]; then + echo "http://0.0.0.0:${endpoint}/metrics" + elif [[ "$endpoint" =~ ^:[0-9]+$ ]]; then + echo "http://0.0.0.0${endpoint}/metrics" + elif [[ "$endpoint" == */metrics || "$endpoint" == */metrics\?* ]]; then + echo "http://${endpoint}" + else + echo "http://${endpoint}/metrics" + fi +} + +_speedbench_metric_urls() { + local port="$1" + local raw="${SPEEDBENCH_DECODE_METRICS_URLS:-${SPEEDBENCH_METRICS_URLS:-}}" + local endpoint + + if [[ -n "$raw" ]]; then + for endpoint in ${raw//,/ }; do + _speedbench_normalize_metrics_url "$endpoint" + done + return 0 + fi + + raw="${SPEEDBENCH_METRICS_PORTS:-}" + if [[ -n "$raw" ]]; then + for endpoint in ${raw//,/ }; do + _speedbench_normalize_metrics_url "$endpoint" + done + return 0 + fi + + echo "http://0.0.0.0:${port}/metrics" +} + +_speedbench_metric_sum() { + local port="$1" + local name="$2" + local url value + local total="0" + local found=0 + + while IFS= read -r url; do + [[ -z "$url" ]] && continue + value=$(_prometheus_metric_sum_url "$url" "$name" 2>/dev/null || true) + if [[ -n "$value" ]]; then + total=$(awk -v a="$total" -v b="$value" 'BEGIN { printf "%.10f", a + b }') + found=1 + fi + done < <(_speedbench_metric_urls "$port") + + [[ "$found" -eq 1 ]] || return 1 + awk -v total="$total" 'BEGIN { printf "%.10f\n", total }' +} + +_speedbench_metric_avg() { + local port="$1" + local name="$2" + local url value + local total="0" + local count=0 + + while IFS= read -r url; do + [[ -z "$url" ]] && continue + while IFS= read -r value; do + [[ -z "$value" ]] && continue + total=$(awk -v a="$total" -v b="$value" 'BEGIN { printf "%.10f", a + b }') + count=$((count + 1)) + done < <(_prometheus_metric_values_url "$url" "$name" 2>/dev/null || true) + done < <(_speedbench_metric_urls "$port") + + [[ "$count" -gt 0 ]] || return 1 + awk -v total="$total" -v count="$count" 'BEGIN { printf "%.10f\n", total / count }' +} + +_speedbench_metric_endpoint_count() { + local port="$1" + local url count=0 + while IFS= read -r url; do + [[ -n "$url" ]] && count=$((count + 1)) + done < <(_speedbench_metric_urls "$port") + echo "$count" +} + +_speedbench_trtllm_json_metrics_urls() { + local port="$1" + local raw="${SPEEDBENCH_TRTLLM_JSON_METRICS_URLS:-}" + local endpoint url + + if [[ -n "$raw" ]]; then + for endpoint in ${raw//,/ }; do + _speedbench_normalize_metrics_url "$endpoint" + done + return 0 + fi + + while IFS= read -r url; do + [[ -z "$url" ]] && continue + echo "$url" | sed -E 's#/prometheus/metrics([?].*)?$#/metrics#' + done < <(_speedbench_metric_urls "$port") +} + +_speedbench_trtllm_json_spec_metrics() { + local port="$1" + local mtp="$2" + local urls=() + local url + + while IFS= read -r url; do + [[ -n "$url" ]] && urls+=("$url") + done < <(_speedbench_trtllm_json_metrics_urls "$port") + + [[ "${#urls[@]}" -gt 0 ]] || return 1 + + python3 - "$mtp" "${urls[@]}" <<'PY' +import json +import os +import sys +import urllib.request + + +def number(value, default=0.0): + try: + if value is None: + return default + return float(value) + except (TypeError, ValueError): + return default + + +def stats_from_payload(payload): + if isinstance(payload, list): + return payload + if isinstance(payload, dict): + return [payload] + return [] + + +try: + mtp = float(sys.argv[1]) +except (IndexError, ValueError): + mtp = 0.0 + +timeout = float(os.environ.get("SPEEDBENCH_METRICS_CURL_TIMEOUT", "10")) +total_draft = 0.0 +total_accepted = 0.0 +total_requests = 0.0 +weighted_acceptance_length = 0.0 +unweighted_acceptance_length = 0.0 +unweighted_count = 0 +used_endpoints = 0 + +for url in sys.argv[2:]: + try: + with urllib.request.urlopen(url, timeout=timeout) as response: + payload = json.load(response) + except Exception as exc: # noqa: BLE001 - diagnostics for CI logs + print(f"SpeedBench AL eval: TRT-LLM JSON metrics fetch failed for {url}: {exc}", file=sys.stderr) + continue + + endpoint_had_spec = False + for stat in stats_from_payload(payload): + if not isinstance(stat, dict): + continue + spec = stat.get("specDecodingStats") + if not isinstance(spec, dict): + continue + + draft = number(spec.get("numDraftTokens")) + if draft <= 0: + continue + + accepted = number(spec.get("numAcceptedTokens")) + requests = number(spec.get("numRequestsWithDraftTokens")) + acceptance_length = number(spec.get("acceptanceLength"), default=-1.0) + + total_draft += draft + total_accepted += accepted + endpoint_had_spec = True + + if acceptance_length > 0: + if requests > 0: + total_requests += requests + weighted_acceptance_length += acceptance_length * requests + else: + unweighted_acceptance_length += acceptance_length + unweighted_count += 1 + + if endpoint_had_spec: + used_endpoints += 1 + +if total_requests > 0: + acceptance_length = weighted_acceptance_length / total_requests +elif unweighted_count > 0: + acceptance_length = unweighted_acceptance_length / unweighted_count +elif total_draft > 0 and mtp > 0: + acceptance_length = 1.0 + (total_accepted / (total_draft / mtp)) +else: + sys.exit(1) + +verify_steps = round(total_draft / mtp) if total_draft > 0 and mtp > 0 else 0 +print( + f"{acceptance_length:.4f}\t" + f"{int(round(total_accepted))}\t" + f"{int(verify_steps)}\t" + f"{int(round(total_draft))}\t" + f"{used_endpoints}" +) +PY +} + +_speedbench_trtllm_server_log_metrics() { + local mtp="$1" + local start_offset="${2:-0}" + local log_path="${SPEEDBENCH_TRTLLM_SERVER_LOG:-${SERVER_LOG:-}}" + + [[ -n "$log_path" && -f "$log_path" ]] || return 1 + + python3 "$(pwd)/utils/evals/trtllm_speedbench_al_from_log.py" \ + --log "$log_path" \ + --num-speculative-tokens "$mtp" \ + --start-offset "$start_offset" +} + +_speedbench_metric_delta() { + local before="$1" + local after="$2" + [[ -n "$before" && -n "$after" ]] || return 1 + awk -v before="$before" -v after="$after" ' + BEGIN { + delta = after - before + if (delta < 0) { + delta = after + } + printf "%.10f\n", delta + } + ' +} + +_speedbench_round_metric() { + local value="$1" + [[ -n "$value" ]] || return 1 + awk -v value="$value" 'BEGIN { printf "%.0f\n", value }' +} + +_speedbench_metrics_framework() { + local fw="${SPEEDBENCH_METRICS_FRAMEWORK:-${FRAMEWORK:-vllm}}" + fw="${fw,,}" + if [[ "$fw" == "dynamo" ]]; then + local inner="${SPEEDBENCH_DYNAMO_BACKEND_FRAMEWORK:-${DYNAMO_BACKEND_FRAMEWORK:-${DYNAMO_BACKEND:-}}}" + [[ -n "$inner" ]] && fw="dynamo-${inner,,}" + fi + + case "$fw" in + vllm|dynamo-vllm) + echo "vllm" + ;; + sglang|dynamo-sglang) + echo "sglang" + ;; + trt|trtllm|tensorrt-llm|tensorrt_llm|dynamo-trt|dynamo-trtllm|dynamo-tensorrt-llm|dynamo-tensorrt_llm) + echo "trtllm" + ;; + *) + echo "$fw" + ;; + esac +} + +_speedbench_metric_source_base() { + local framework="$1" + local configured="${SPEEDBENCH_METRICS_FRAMEWORK:-${FRAMEWORK:-$framework}}" + configured="${configured,,}" + if [[ "$configured" == dynamo* ]]; then + echo "dynamo-${framework}-prometheus" + else + echo "${framework}-prometheus" + fi +} + +_speedbench_spec_counter_metric() { + local framework="$1" + local kind="$2" + case "${framework}:${kind}" in + vllm:accepted) + echo "vllm:spec_decode_num_accepted_tokens_total" + ;; + vllm:proposed) + echo "vllm:spec_decode_num_draft_tokens_total" + ;; + vllm:verify) + echo "vllm:spec_decode_num_drafts_total" + ;; + trtllm:accepted) + echo "trtllm_spec_decode_num_accepted_tokens_total" + ;; + trtllm:proposed) + echo "trtllm_spec_decode_num_draft_tokens_total" + ;; + sglang:verify) + echo "sglang:spec_verify_calls_total" + ;; + sglang:completion) + echo "sglang:generation_tokens_total" + ;; + *) + return 1 + ;; + esac +} + +_speedbench_spec_gauge_metric() { + local framework="$1" + local kind="$2" + case "${framework}:${kind}" in + trtllm:acceptance_length) + echo "trtllm_spec_decode_acceptance_length" + ;; + sglang:draft_tokens_per_step) + echo "sglang:spec_num_draft_tokens" + ;; + *) + return 1 + ;; + esac +} + +_speedbench_spec_counter_sum() { + local framework="$1" + local port="$2" + local kind="$3" + local metric + metric=$(_speedbench_spec_counter_metric "$framework" "$kind") || return 1 + _speedbench_metric_sum "$port" "$metric" +} + +_speedbench_spec_gauge_avg() { + local framework="$1" + local port="$2" + local kind="$3" + local metric + metric=$(_speedbench_spec_gauge_metric "$framework" "$kind") || return 1 + _speedbench_metric_avg "$port" "$metric" +} + +_speedbench_reference_yaml() { + if [[ -n "${SPEEDBENCH_REFERENCE_YAML:-}" ]]; then + echo "$SPEEDBENCH_REFERENCE_YAML" + return 0 + fi + + case "${MODEL_PREFIX:-}" in + dsv4) + echo "golden_al_distribution/dsv4_mtp.yaml" + ;; + *) + return 1 + ;; + esac +} + +_speedbench_write_eval_result() { + local output="$1" + local mode="$2" + local mtp="$3" + local al="${4:-}" + local accepted="${5:-}" + local verify_steps="${6:-}" + local proposed_drafts="${7:-}" + local framework="${8:-${SPEEDBENCH_METRICS_FRAMEWORK:-${FRAMEWORK:-}}}" + local metric_source="${9:-}" + local error="${10:-}" + local speedbench_model="${MODEL_NAME:-${MODEL:-}}" + local reference + reference=$(_speedbench_reference_yaml 2>/dev/null || true) + + local record_cmd=( + python3 "$(pwd)/utils/evals/speedbench_al.py" + record + --output "$output" + --reference-yaml "$reference" + --model "$speedbench_model" + --model-prefix "${MODEL_PREFIX:-}" + --thinking-mode "$mode" + --num-speculative-tokens "$mtp" + --category "coding" + --output-len "4096" + --temperature "1.0" + --threshold-ratio "0.95" + --max-threshold-ratio "1.05" + ) + if [[ -n "$framework" ]]; then + record_cmd+=(--framework "$framework") + fi + if [[ -n "$metric_source" ]]; then + record_cmd+=(--metric-source "$metric_source") + fi + if [[ -n "$al" ]]; then + record_cmd+=(--acceptance-length "$al") + fi + if [[ -n "$accepted" ]]; then + record_cmd+=(--accepted-tokens "$accepted") + fi + if [[ -n "$verify_steps" ]]; then + record_cmd+=(--draft-tokens "$verify_steps") + record_cmd+=(--verify-steps "$verify_steps") + fi + if [[ -n "$proposed_drafts" ]]; then + record_cmd+=(--proposed-draft-tokens "$proposed_drafts") + fi + if [[ -n "$error" ]]; then + record_cmd+=(--error "$error") + fi + "${record_cmd[@]}" || true +} + +_speedbench_reference_available() { + local mode="$1" + local mtp="$2" + local reference + local speedbench_model="${MODEL_NAME:-${MODEL:-}}" + reference=$(_speedbench_reference_yaml) || return 1 + [[ -f "$reference" ]] || return 1 + python3 "$(pwd)/utils/evals/speedbench_al.py" resolve \ + --reference-yaml "$reference" \ + --model "$speedbench_model" \ + --model-prefix "${MODEL_PREFIX:-}" \ + --thinking-mode "$mode" \ + --num-speculative-tokens "$mtp" \ + --threshold-ratio "0.95" \ + --max-threshold-ratio "1.05" >/dev/null +} + +_speedbench_prepare_dataset() { + local speedbench_dir="$1" + if [[ -f "$speedbench_dir/qualitative.jsonl" ]]; then + return 0 + fi + mkdir -p "$speedbench_dir" + python3 -m pip install -q datasets tiktoken + curl -LsSf https://raw.githubusercontent.com/NVIDIA-NeMo/Skills/refs/heads/main/nemo_skills/dataset/speed-bench/prepare.py \ + | python3 - --config qualitative --output_dir "$speedbench_dir" + [[ -f "$speedbench_dir/qualitative.jsonl" ]] +} + +run_speedbench_al_eval() { + local port="${PORT:-8888}" + while [[ $# -gt 0 ]]; do + case $1 in + --port) port="$2"; shift 2 ;; + *) + if [[ $# -gt 1 && "$2" != --* ]]; then + shift 2 + else + shift + fi + ;; + esac + done + + local mtp="${SPEEDBENCH_NUM_SPEC_TOKENS:-${NUM_SPEC_TOKENS:-${SPECULATIVE_DRAFT_TOKENS:-2}}}" + local default_thinking_mode="off" + if [[ "${MODEL_PREFIX:-}" == "dsv4" ]]; then + default_thinking_mode="on" + fi + local mode="$default_thinking_mode" + + if [[ "${SPEC_DECODING:-none}" != "mtp" ]]; then + echo "SpeedBench AL eval: skipping non-MTP config (SPEC_DECODING=${SPEC_DECODING:-none})" + return 0 + fi + + if [[ -z "${EVAL_RESULT_DIR:-}" ]]; then + EVAL_RESULT_DIR="$(mktemp -d /tmp/eval_out-XXXXXX)" + export EVAL_RESULT_DIR + fi + + local output="${EVAL_RESULT_DIR}/results_speedbench_al_${mode}_mtp${mtp}.json" + local metrics_framework result_framework metric_source_base metrics_endpoint_count + metrics_framework=$(_speedbench_metrics_framework) + result_framework="${SPEEDBENCH_METRICS_FRAMEWORK:-${FRAMEWORK:-$metrics_framework}}" + metric_source_base=$(_speedbench_metric_source_base "$metrics_framework") + if [[ "$metrics_framework" == "trtllm" && -z "${SPEEDBENCH_DECODE_METRICS_URLS:-}${SPEEDBENCH_METRICS_URLS:-}${SPEEDBENCH_METRICS_PORTS:-}" ]]; then + export SPEEDBENCH_METRICS_URLS="http://0.0.0.0:${port}/prometheus/metrics" + fi + metrics_endpoint_count=$(_speedbench_metric_endpoint_count "$port") + + case "$metrics_framework" in + vllm|sglang|trtllm) + ;; + *) + echo "SpeedBench AL eval: unsupported speculative metrics framework=${metrics_framework}" >&2 + _speedbench_write_eval_result "$output" "$mode" "$mtp" "" "" "" "" "$result_framework" "$metric_source_base" "Unsupported speculative metrics framework: ${metrics_framework}" + return 0 + ;; + esac + + echo "SpeedBench AL eval: metrics framework=${metrics_framework}, endpoints=${metrics_endpoint_count}" + + local speedbench_dir="${SPEEDBENCH_DIR:-$(pwd)/speed_bench_data}" + if ! _speedbench_prepare_dataset "$speedbench_dir"; then + echo "SpeedBench AL eval: SpeedBench dataset download failed" >&2 + _speedbench_write_eval_result "$output" "$mode" "$mtp" "" "" "" "" "$result_framework" "$metric_source_base" "SpeedBench dataset download failed" + return 0 + fi + + if ! _speedbench_reference_available "$mode" "$mtp"; then + echo "SpeedBench AL eval: no reference for mode=${mode} mtp=${mtp}" >&2 + _speedbench_write_eval_result "$output" "$mode" "$mtp" "" "" "" "" "$result_framework" "$metric_source_base" "No SpeedBench AL reference for this eval cell" + return 0 + fi + + local thinking_kwargs='{"thinking": true, "reasoning_effort": "high"}' + + local accepted_before="" proposed_before="" verify_before="" completion_before="" + accepted_before=$(_speedbench_spec_counter_sum "$metrics_framework" "$port" "accepted" 2>/dev/null || true) + proposed_before=$(_speedbench_spec_counter_sum "$metrics_framework" "$port" "proposed" 2>/dev/null || true) + verify_before=$(_speedbench_spec_counter_sum "$metrics_framework" "$port" "verify" 2>/dev/null || true) + completion_before=$(_speedbench_spec_counter_sum "$metrics_framework" "$port" "completion" 2>/dev/null || true) + accepted_before="${accepted_before:-0}" + proposed_before="${proposed_before:-0}" + verify_before="${verify_before:-0}" + completion_before="${completion_before:-0}" + + local trt_server_log_offset="0" + if [[ "$metrics_framework" == "trtllm" ]]; then + local trt_server_log="${SPEEDBENCH_TRTLLM_SERVER_LOG:-${SERVER_LOG:-}}" + if [[ -n "$trt_server_log" && -f "$trt_server_log" ]]; then + trt_server_log_offset=$(wc -c < "$trt_server_log" 2>/dev/null || true) + trt_server_log_offset="${trt_server_log_offset//[!0-9]/}" + trt_server_log_offset="${trt_server_log_offset:-0}" + fi + fi + + local bench_rc=0 + local speedbench_model="${MODEL_NAME:-${MODEL:-}}" + echo "SpeedBench AL eval: running mode=${mode} mtp=${mtp}" + export OPENAI_API_KEY="${OPENAI_API_KEY:-EMPTY}" + local client_cmd=( + python3 "$(pwd)/utils/evals/speedbench_client.py" + --model "$speedbench_model" + --base-url "http://0.0.0.0:${port}" + --dataset-path "$speedbench_dir" + --category coding + --output-len 4096 + --temperature 1.0 + --thinking-mode "$mode" + --timeout "${SPEEDBENCH_CLIENT_TIMEOUT:-1800}" + --retries "${SPEEDBENCH_CLIENT_RETRIES:-2}" + ) + if [[ -n "${SPEEDBENCH_CLIENT_ENDPOINT:-}" ]]; then + client_cmd+=(--endpoint "$SPEEDBENCH_CLIENT_ENDPOINT") + elif [[ "${MODEL_PREFIX:-}" == "dsv4" ]]; then + client_cmd+=(--endpoint completions) + fi + if [[ "$mode" == "on" ]]; then + client_cmd+=(--thinking-kwargs "$thinking_kwargs") + fi + if [[ "${MODEL_PREFIX:-}" == "dsv4" ]]; then + client_cmd+=(--dsv4) + fi + "${client_cmd[@]}" || bench_rc=$? + if [[ "$bench_rc" -ne 0 ]]; then + echo "SpeedBench AL eval: client failed with exit code ${bench_rc}" >&2 + _speedbench_write_eval_result "$output" "$mode" "$mtp" "" "" "" "" "$result_framework" "$metric_source_base" "SpeedBench client failed with exit code ${bench_rc}" + return 0 + fi + + local accepted_after="" proposed_after="" verify_after="" completion_after="" + local al="" delta_acc="" delta_proposed="" delta_verify="" delta_completion="" metric_source="" + accepted_after=$(_speedbench_spec_counter_sum "$metrics_framework" "$port" "accepted" 2>/dev/null || true) + proposed_after=$(_speedbench_spec_counter_sum "$metrics_framework" "$port" "proposed" 2>/dev/null || true) + verify_after=$(_speedbench_spec_counter_sum "$metrics_framework" "$port" "verify" 2>/dev/null || true) + completion_after=$(_speedbench_spec_counter_sum "$metrics_framework" "$port" "completion" 2>/dev/null || true) + + if [[ -n "$accepted_after" ]]; then + delta_acc=$(_speedbench_round_metric "$(_speedbench_metric_delta "$accepted_before" "$accepted_after")") + fi + if [[ -n "$proposed_after" ]]; then + delta_proposed=$(_speedbench_round_metric "$(_speedbench_metric_delta "$proposed_before" "$proposed_after")") + fi + if [[ -n "$verify_after" ]]; then + delta_verify=$(_speedbench_round_metric "$(_speedbench_metric_delta "$verify_before" "$verify_after")") + fi + if [[ -n "$completion_after" ]]; then + delta_completion=$(_speedbench_round_metric "$(_speedbench_metric_delta "$completion_before" "$completion_after")") + fi + + if [[ "$metrics_framework" == "vllm" && -n "$delta_acc" && -n "$delta_verify" && "$delta_verify" -gt 0 ]]; then + al=$(awk -v accepted="$delta_acc" -v verify="$delta_verify" 'BEGIN { printf "%.4f", 1 + (accepted / verify) }') + metric_source="${metric_source_base}-counters-endpoints${metrics_endpoint_count}" + elif [[ "$metrics_framework" == "trtllm" ]]; then + al=$(_speedbench_spec_gauge_avg "$metrics_framework" "$port" "acceptance_length" 2>/dev/null | awk '{ printf "%.4f", $1 }' || true) + if [[ -n "$al" ]]; then + metric_source="${metric_source_base}-gauge-endpoints${metrics_endpoint_count}" + if [[ -n "$delta_acc" || -n "$delta_proposed" ]]; then + metric_source="${metric_source}+token-counters" + fi + else + local trt_json_metrics="" trt_json_endpoints="" + trt_json_metrics=$(_speedbench_trtllm_json_spec_metrics "$port" "$mtp" || true) + if [[ -n "$trt_json_metrics" ]]; then + IFS=$'\t' read -r al delta_acc delta_verify delta_proposed trt_json_endpoints <<< "$trt_json_metrics" + metric_source="trtllm-json-iteration-stats-endpoints${trt_json_endpoints}" + fi + if [[ -z "$al" ]]; then + local trt_log_metrics="" trt_log_samples="" + trt_log_metrics=$(_speedbench_trtllm_server_log_metrics "$mtp" "$trt_server_log_offset" || true) + if [[ -n "$trt_log_metrics" ]]; then + IFS=$'\t' read -r al delta_acc delta_verify delta_proposed trt_log_samples <<< "$trt_log_metrics" + metric_source="trtllm-server-log-generation-tokens-samples${trt_log_samples}" + fi + fi + fi + elif [[ "$metrics_framework" == "sglang" ]]; then + local draft_depth="" + if [[ -n "$delta_completion" && "$delta_completion" -gt 0 && -n "$delta_verify" && "$delta_verify" -gt 0 ]]; then + al=$(awk -v completion="$delta_completion" -v verify="$delta_verify" 'BEGIN { printf "%.4f", completion / verify }') + delta_acc=$(_speedbench_round_metric "$(awk -v completion="$delta_completion" -v verify="$delta_verify" 'BEGIN { value = completion - verify; if (value < 0) value = 0; printf "%.10f\n", value }')") + draft_depth=$(_speedbench_spec_gauge_avg "$metrics_framework" "$port" "draft_tokens_per_step" 2>/dev/null || true) + if [[ -n "$draft_depth" ]]; then + delta_proposed=$(_speedbench_round_metric "$(awk -v verify="$delta_verify" -v depth="$draft_depth" 'BEGIN { value = verify * (depth - 1); if (value < 0) value = 0; printf "%.10f\n", value }')") + elif [[ -n "$mtp" ]]; then + delta_proposed=$(_speedbench_round_metric "$(awk -v verify="$delta_verify" -v mtp="$mtp" 'BEGIN { value = verify * mtp; if (value < 0) value = 0; printf "%.10f\n", value }')") + fi + metric_source="${metric_source_base}-generation-counter+verify-counter-endpoints${metrics_endpoint_count}" + fi + if [[ -n "$delta_verify" && "$delta_verify" -gt 0 ]]; then + if [[ -z "$draft_depth" ]]; then + draft_depth=$(_speedbench_spec_gauge_avg "$metrics_framework" "$port" "draft_tokens_per_step" 2>/dev/null || true) + fi + if [[ -n "$draft_depth" && -z "$delta_proposed" ]]; then + delta_proposed=$(_speedbench_round_metric "$(awk -v verify="$delta_verify" -v depth="$draft_depth" 'BEGIN { value = verify * (depth - 1); if (value < 0) value = 0; printf "%.10f\n", value }')") + fi + fi + fi + + if [[ -z "$al" ]]; then + echo "SpeedBench AL eval: could not collect speculative acceptance metrics from server" >&2 + local metric_error="Could not collect speculative acceptance metrics from server" + if [[ "${FRAMEWORK:-}" == dynamo* && -z "${SPEEDBENCH_DECODE_METRICS_URLS:-}${SPEEDBENCH_METRICS_URLS:-}${SPEEDBENCH_METRICS_PORTS:-}" ]]; then + metric_error="${metric_error}; for Dynamo/disagg set SPEEDBENCH_DECODE_METRICS_URLS or SPEEDBENCH_METRICS_PORTS to decode-worker /metrics endpoints" + fi + _speedbench_write_eval_result "$output" "$mode" "$mtp" "" "$delta_acc" "$delta_verify" "$delta_proposed" "$result_framework" "$metric_source_base" "$metric_error" + else + _speedbench_write_eval_result "$output" "$mode" "$mtp" "$al" "$delta_acc" "$delta_verify" "$delta_proposed" "$result_framework" "$metric_source" + fi +} + run_lm_eval() { local port="${PORT:-8888}" local tasks_dir="${EVAL_TASKS_DIR:-utils/evals/gsm8k.yaml}" @@ -1002,6 +1710,11 @@ run_eval() { return 1 fi + # Batched lm-eval artifacts are staged in PWD. Run SpeedBench once per + # live engine and place its compact result alongside those artifacts. + export EVAL_RESULT_DIR="$(pwd)" + run_speedbench_al_eval "${forwarded[@]}" || true + local eval_conc results_dir eval_rc stage_rc local completed_concs=() local failed_concs=() @@ -1050,6 +1763,7 @@ run_eval() { fi local eval_rc=0 + run_speedbench_al_eval "${forwarded[@]}" || true case "$framework" in lm-eval|lm_eval) run_lm_eval "${forwarded[@]}" || eval_rc=$? ;; *) echo "Unknown framework '${framework}'"; eval_rc=1 ;; diff --git a/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b200_vllm_mtp.sh b/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b200_vllm_mtp.sh index 6846223e8e..0f4eeb8600 100755 --- a/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b200_vllm_mtp.sh +++ b/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b200_vllm_mtp.sh @@ -65,6 +65,7 @@ fi # use 2 speculative tokens for all configs for now NUM_SPEC_TOKENS=2 +export SPEEDBENCH_NUM_SPEC_TOKENS="$NUM_SPEC_TOKENS" # Start GPU monitoring (power, temperature, clocks every second) start_gpu_monitor diff --git a/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b300_sglang_mtp.sh b/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b300_sglang_mtp.sh index 9c8bab961b..388194ddd3 100755 --- a/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b300_sglang_mtp.sh +++ b/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b300_sglang_mtp.sh @@ -128,6 +128,7 @@ PYTHONNOUSERSITE=1 sglang serve \ --model-path $MODEL_PATH --served-model-name $MODEL \ --host 0.0.0.0 \ --port $PORT \ + --enable-metrics \ --trust-remote-code \ --tp $TP \ --ep-size $EP_SIZE \ diff --git a/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b300_vllm_mtp.sh b/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b300_vllm_mtp.sh index a5e7dd28cb..c2a3741250 100755 --- a/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b300_vllm_mtp.sh +++ b/benchmarks/single_node/fixed_seq_len/dsv4_fp4_b300_vllm_mtp.sh @@ -66,6 +66,7 @@ fi # use 2 speculative tokens for all configs for now NUM_SPEC_TOKENS=2 +export SPEEDBENCH_NUM_SPEC_TOKENS="$NUM_SPEC_TOKENS" start_gpu_monitor diff --git a/runners/launch_b200-dgxc.sh b/runners/launch_b200-dgxc.sh index f10e0f4ea4..e35aea023b 100644 --- a/runners/launch_b200-dgxc.sh +++ b/runners/launch_b200-dgxc.sh @@ -374,6 +374,9 @@ EOF # Collect eval results if eval was requested if [[ "${RUN_EVAL:-false}" == "true" || "${EVAL_ONLY:-false}" == "true" ]]; then EVAL_DIR="$LOGS_DIR/eval_results" + if [[ "${FRAMEWORK:-}" == dynamo* && "${SPEC_DECODING:-none}" == "mtp" ]]; then + bash "$GITHUB_WORKSPACE/utils/evals/write_dynamo_speedbench_al_from_logs.sh" "$LOGS_DIR" "$GITHUB_WORKSPACE" + fi if [ -d "$EVAL_DIR" ]; then echo "Extracting eval results from $EVAL_DIR" shopt -s nullglob diff --git a/runners/launch_gb200-nv.sh b/runners/launch_gb200-nv.sh index 065ff7da70..ac9842312b 100755 --- a/runners/launch_gb200-nv.sh +++ b/runners/launch_gb200-nv.sh @@ -551,6 +551,9 @@ fi # Collect eval results if eval was requested if [[ "${RUN_EVAL:-false}" == "true" || "${EVAL_ONLY:-false}" == "true" ]]; then EVAL_DIR="$LOGS_DIR/eval_results" + if [[ "${FRAMEWORK:-}" == dynamo* && "${SPEC_DECODING:-none}" == "mtp" ]]; then + bash "$GITHUB_WORKSPACE/utils/evals/write_dynamo_speedbench_al_from_logs.sh" "$LOGS_DIR" "$GITHUB_WORKSPACE" + fi if [ -d "$EVAL_DIR" ]; then echo "Extracting eval results from $EVAL_DIR" shopt -s nullglob diff --git a/runners/launch_gb300-nv.sh b/runners/launch_gb300-nv.sh index 6766155001..cdd2c4f1c1 100644 --- a/runners/launch_gb300-nv.sh +++ b/runners/launch_gb300-nv.sh @@ -436,6 +436,9 @@ fi # Collect eval results if eval was requested if [[ "${RUN_EVAL:-false}" == "true" || "${EVAL_ONLY:-false}" == "true" ]]; then EVAL_DIR="$LOGS_DIR/eval_results" + if [[ "${FRAMEWORK:-}" == dynamo* && "${SPEC_DECODING:-none}" == "mtp" ]]; then + bash "$GITHUB_WORKSPACE/utils/evals/write_dynamo_speedbench_al_from_logs.sh" "$LOGS_DIR" "$GITHUB_WORKSPACE" + fi if [ -d "$EVAL_DIR" ]; then echo "Extracting eval results from $EVAL_DIR" shopt -s nullglob diff --git a/utils/collect_eval_results.py b/utils/collect_eval_results.py index 194fa4acba..a33b2f1b5f 100644 --- a/utils/collect_eval_results.py +++ b/utils/collect_eval_results.py @@ -81,10 +81,20 @@ def detect_lm_eval_jsons(d: Path, batched: bool = False) -> List[Path]: return [latest_by_conc[conc] for conc in sorted(latest_by_conc)] -def detect_eval_jsons(d: Path) -> Tuple[Optional[Path], Optional[Path]]: - """Return the latest legacy lm-eval JSON and deprecated second slot.""" +def detect_speedbench_jsons(d: Path) -> List[Path]: + """Return compact SpeedBench AL result JSONs from one artifact directory.""" + paths = [] + for path in d.glob('results*.json'): + data = load_json(path) + if isinstance(data, dict) and 'speedbench_al_eval_version' in data: + paths.append(path) + return sorted(paths) + + +def detect_eval_jsons(d: Path) -> Tuple[Optional[Path], List[Path]]: + """Return the latest legacy lm-eval JSON and all SpeedBench AL JSONs.""" lm_paths = detect_lm_eval_jsons(d) - return (lm_paths[0] if lm_paths else None), None + return (lm_paths[0] if lm_paths else None), detect_speedbench_jsons(d) def extract_lm_metrics(json_path: Path) -> List[Dict[str, Any]]: @@ -171,6 +181,38 @@ def get_val_se(filter_name: str) -> Tuple[Optional[float], Optional[float]]: return extracted +def extract_speedbench_al_metrics(json_path: Path) -> List[Dict[str, Any]]: + """Extract a compact SpeedBench AL result as an eval metric row.""" + data = load_json(json_path) or {} + if 'speedbench_al_eval_version' not in data: + return [] + + mode = data.get('thinking_mode', 'unknown') + mtp = data.get('num_speculative_tokens', 'unknown') + return [{ + 'metric_type': 'speedbench_al', + 'task': 'speedbench_al', + 'task_label': f"speedbench_al/{mode}/mtp{mtp}", + 'acceptance_length': data.get('acceptance_length'), + 'reference_acceptance_length': data.get('reference_acceptance_length'), + 'min_acceptance_length': data.get('min_acceptance_length'), + 'max_acceptance_length': data.get('max_acceptance_length'), + 'threshold_ratio': data.get('threshold_ratio'), + 'max_threshold_ratio': data.get('max_threshold_ratio'), + 'thinking_mode': mode, + 'num_speculative_tokens': mtp, + 'speedbench_framework': data.get('framework'), + 'speedbench_metric_source': data.get('metric_source'), + 'speedbench_accepted_tokens': data.get('accepted_tokens'), + 'speedbench_verify_steps': data.get('verify_steps', data.get('draft_tokens')), + 'speedbench_proposed_draft_tokens': data.get('proposed_draft_tokens'), + 'passed': data.get('passed'), + 'error': data.get('error'), + 'model': data.get('model'), + 'source': str(json_path), + }] + + def pct(x: Any) -> str: """Format value as percentage.""" try: @@ -250,7 +292,7 @@ def build_row(meta: Dict[str, Any], m: Dict[str, Any]) -> Dict[str, Any]: 'dp_attention': str(dp_attention).lower(), 'prefill_dp_attention': str(prefill_dp_attention).lower(), 'decode_dp_attention': str(decode_dp_attention).lower(), - 'task': m.get('task', 'unknown'), + 'task': m.get('task_label') or m.get('task', 'unknown'), 'em_strict': m.get('strict'), 'em_strict_se': m.get('strict_se'), 'em_flexible': m.get('flex'), @@ -260,7 +302,25 @@ def build_row(meta: Dict[str, Any], m: Dict[str, Any]) -> Dict[str, Any]: } # Add universal score field (primary metric for unified comparison) - if m.get('strict') is not None: + if m.get('metric_type') == 'speedbench_al': + row['score'] = m.get('acceptance_length') + row['score_name'] = 'acceptance_length' + row['score_se'] = None + row['speedbench_reference_acceptance_length'] = m.get('reference_acceptance_length') + row['speedbench_min_acceptance_length'] = m.get('min_acceptance_length') + row['speedbench_max_acceptance_length'] = m.get('max_acceptance_length') + row['speedbench_threshold_ratio'] = m.get('threshold_ratio') + row['speedbench_max_threshold_ratio'] = m.get('max_threshold_ratio') + row['speedbench_thinking_mode'] = m.get('thinking_mode') + row['speedbench_num_speculative_tokens'] = m.get('num_speculative_tokens') + row['speedbench_framework'] = m.get('speedbench_framework') + row['speedbench_metric_source'] = m.get('speedbench_metric_source') + row['speedbench_accepted_tokens'] = m.get('speedbench_accepted_tokens') + row['speedbench_verify_steps'] = m.get('speedbench_verify_steps') + row['speedbench_proposed_draft_tokens'] = m.get('speedbench_proposed_draft_tokens') + row['speedbench_passed'] = m.get('passed') + row['speedbench_error'] = m.get('error') + elif m.get('strict') is not None: row['score'] = m.get('strict') row['score_name'] = 'em_strict' row['score_se'] = m.get('strict_se') @@ -276,6 +336,28 @@ def build_row(meta: Dict[str, Any], m: Dict[str, Any]) -> Dict[str, Any]: return row +def score_cell(row: Dict[str, Any]) -> str: + """Format the primary score for lm-eval and non-percentage eval rows.""" + if row.get('score_name') == 'acceptance_length': + score = row.get('score') + minimum = row.get('speedbench_min_acceptance_length') + maximum = row.get('speedbench_max_acceptance_length') + passed = row.get('speedbench_passed') + if score is None: + return 'FAIL' + try: + status = 'PASS' if passed else 'FAIL' + if minimum is None or maximum is None: + return f"{float(score):.2f} ({status})" + return ( + f"{float(score):.2f} in " + f"[{float(minimum):.2f}, {float(maximum):.2f}] ({status})" + ) + except Exception: + return str(score) + return f"{pct(row['score'])}{se(row['score_se'])}" + + def collect_eval_rows(root: Path) -> List[Dict[str, Any]]: """Collect logical eval rows, expanding batched artifacts by concurrency.""" rows: List[Dict[str, Any]] = [] @@ -302,6 +384,10 @@ def collect_eval_rows(root: Path) -> List[Dict[str, Any]]: metrics_list = extract_lm_metrics(lm_path) for metrics in metrics_list: rows.append(build_row(row_meta, metrics)) + + for speedbench_path in detect_speedbench_jsons(d): + for metrics in extract_speedbench_al_metrics(speedbench_path): + rows.append(build_row(meta, metrics)) return rows @@ -376,7 +462,7 @@ def main(): r['conc'], r['dp_attention'], r['task'], - f"{pct(r['score'])}{se(r['score_se'])}", + score_cell(r), f"{pct(r['em_strict'])}{se(r['em_strict_se'])}", f"{pct(r['em_flexible'])}{se(r['em_flexible_se'])}", r['n_eff'] or '', @@ -414,7 +500,7 @@ def main(): r['decode_num_workers'], r['conc'], r['task'], - f"{pct(r['score'])}{se(r['score_se'])}", + score_cell(r), f"{pct(r['em_strict'])}{se(r['em_strict_se'])}", f"{pct(r['em_flexible'])}{se(r['em_flexible_se'])}", r['n_eff'] or '', diff --git a/utils/evals/dynamo_speedbench_al_from_logs.py b/utils/evals/dynamo_speedbench_al_from_logs.py new file mode 100644 index 0000000000..e2135953e9 --- /dev/null +++ b/utils/evals/dynamo_speedbench_al_from_logs.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python3 +"""Build a SpeedBench AL result from Dynamo decode-worker spec logs.""" + +from __future__ import annotations + +import argparse +import re +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable + +from speedbench_al import build_result, cmd_record + + +SPEC_LINE_RE = re.compile( + r"SpecDecoding metrics:\s*" + r"Mean acceptance length:\s*(?P[0-9]+(?:\.[0-9]+)?)" + r".*?" + r"Accepted:\s*(?P[0-9]+)\s*tokens,\s*" + r"Drafted:\s*(?P[0-9]+)\s*tokens" +) +SGLANG_ACCEPT_LINE_RE = re.compile( + r"\baccept len:\s*(?P[0-9]+(?:\.[0-9]+)?)" + r"\s*,\s*accept rate:\s*(?P[0-9]+(?:\.[0-9]+)?)" +) +WORKER_RE = re.compile(r"_decode_w(?P[0-9]+)\.out$") + + +@dataclass(frozen=True) +class LogMetrics: + path: Path + worker: str + samples: int + acceptance_length_samples: int + acceptance_length_total: float + accepted_tokens: int + proposed_draft_tokens: int + + @property + def has_counter_metrics(self) -> bool: + return self.proposed_draft_tokens > 0 + + +@dataclass(frozen=True) +class AggregatedMetrics: + workers: int + samples: int + accepted_tokens: int | None + proposed_draft_tokens: int | None + verify_steps: int | None + acceptance_length: float + has_counter_metrics: bool + selected_logs: tuple[Path, ...] + + +def _decode_log_files(logs_dir: Path) -> Iterable[Path]: + if not logs_dir.is_dir(): + return [] + return sorted(logs_dir.rglob("*_decode_w*.out")) + + +def parse_decode_log(path: Path) -> LogMetrics | None: + match = WORKER_RE.search(path.name) + if not match: + return None + + samples = 0 + acceptance_length_samples = 0 + acceptance_length_total = 0.0 + accepted = 0 + drafted = 0 + try: + lines = path.read_text(errors="ignore").splitlines() + except OSError: + return None + + for line in lines: + parsed = SPEC_LINE_RE.search(line) + if not parsed: + sglang_parsed = SGLANG_ACCEPT_LINE_RE.search(line) + if not sglang_parsed: + continue + samples += 1 + acceptance_length_samples += 1 + acceptance_length_total += float(sglang_parsed.group("al")) + continue + samples += 1 + accepted += int(parsed.group("accepted")) + drafted += int(parsed.group("drafted")) + + if samples == 0 or (drafted <= 0 and acceptance_length_samples == 0): + return None + + return LogMetrics( + path=path, + worker=match.group("worker"), + samples=samples, + acceptance_length_samples=acceptance_length_samples, + acceptance_length_total=acceptance_length_total, + accepted_tokens=accepted, + proposed_draft_tokens=drafted, + ) + + +def select_decode_worker_logs(logs_dir: Path) -> list[LogMetrics]: + by_worker: dict[str, LogMetrics] = {} + for path in _decode_log_files(logs_dir): + metrics = parse_decode_log(path) + if metrics is None: + continue + current = by_worker.get(metrics.worker) + if current is None: + by_worker[metrics.worker] = metrics + continue + if ( + metrics.has_counter_metrics, + metrics.samples, + metrics.proposed_draft_tokens, + metrics.acceptance_length_samples, + ) > ( + current.has_counter_metrics, + current.samples, + current.proposed_draft_tokens, + current.acceptance_length_samples, + ): + by_worker[metrics.worker] = metrics + return [by_worker[k] for k in sorted(by_worker, key=int)] + + +def aggregate_log_metrics(logs_dir: Path, mtp: int) -> AggregatedMetrics | None: + if mtp <= 0: + raise ValueError("mtp must be positive") + + selected = select_decode_worker_logs(logs_dir) + if not selected: + return None + + counter_logs = [item for item in selected if item.has_counter_metrics] + if counter_logs: + accepted = sum(item.accepted_tokens for item in counter_logs) + proposed = sum(item.proposed_draft_tokens for item in counter_logs) + samples = sum(item.samples for item in counter_logs) + verify_steps = round(proposed / mtp) + acceptance_length = 1.0 + (accepted / (proposed / mtp)) + + return AggregatedMetrics( + workers=len(counter_logs), + samples=samples, + accepted_tokens=accepted, + proposed_draft_tokens=proposed, + verify_steps=verify_steps, + acceptance_length=acceptance_length, + has_counter_metrics=True, + selected_logs=tuple(item.path for item in counter_logs), + ) + + al_logs = [item for item in selected if item.acceptance_length_samples > 0] + al_samples = sum(item.acceptance_length_samples for item in al_logs) + if al_samples <= 0: + return None + acceptance_length = ( + sum(item.acceptance_length_total for item in al_logs) / al_samples + ) + + return AggregatedMetrics( + workers=len(al_logs), + samples=al_samples, + accepted_tokens=None, + proposed_draft_tokens=None, + verify_steps=None, + acceptance_length=acceptance_length, + has_counter_metrics=False, + selected_logs=tuple(item.path for item in al_logs), + ) + + +def _record_args(args: argparse.Namespace, metrics: AggregatedMetrics | None) -> argparse.Namespace: + record = argparse.Namespace( + output=args.output, + reference_yaml=args.reference_yaml, + model=args.model, + model_prefix=args.model_prefix, + thinking_mode=args.thinking_mode, + num_speculative_tokens=args.num_speculative_tokens, + category=args.category, + output_len=args.output_len, + temperature=args.temperature, + threshold_ratio=args.threshold_ratio, + max_threshold_ratio=args.max_threshold_ratio, + framework=args.framework, + metric_source=args.metric_source, + acceptance_length=None, + accepted_tokens=None, + draft_tokens=None, + verify_steps=None, + proposed_draft_tokens=None, + error=None, + exit_status=False, + ) + if metrics is None: + record.error = ( + "Could not parse Dynamo speculative acceptance metrics from decode-worker logs" + ) + return record + + metric_source = args.metric_source + if ( + not metrics.has_counter_metrics + and metric_source.endswith("-decode-log-counters") + ): + metric_source = metric_source[: -len("counters")] + "accept-length" + record.metric_source = f"{metric_source}-workers{metrics.workers}-samples{metrics.samples}" + record.acceptance_length = f"{metrics.acceptance_length:.4f}" + if metrics.has_counter_metrics: + record.accepted_tokens = str(metrics.accepted_tokens) + record.verify_steps = str(metrics.verify_steps) + record.draft_tokens = str(metrics.verify_steps) + record.proposed_draft_tokens = str(metrics.proposed_draft_tokens) + return record + + +def cmd_from_logs(args: argparse.Namespace) -> int: + metrics = aggregate_log_metrics(Path(args.logs_dir), args.num_speculative_tokens) + record_args = _record_args(args, metrics) + result = build_result(record_args) + rc = cmd_record(record_args) + + if metrics is not None: + print("Dynamo SpeedBench AL log aggregation:") + for path in metrics.selected_logs: + print(f" selected {path}") + if not result.get("passed"): + return 0 + return rc + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--logs-dir", required=True) + parser.add_argument("--output", required=True) + parser.add_argument("--reference-yaml", required=True) + parser.add_argument("--model", required=True) + parser.add_argument("--model-prefix", default="") + parser.add_argument("--thinking-mode", required=True) + parser.add_argument("--num-speculative-tokens", type=int, required=True) + parser.add_argument("--category", default="coding") + parser.add_argument("--output-len", type=int, default=4096) + parser.add_argument("--temperature", type=float, default=1.0) + parser.add_argument("--threshold-ratio", type=float, default=0.95) + parser.add_argument("--max-threshold-ratio", type=float, default=1.05) + parser.add_argument("--framework", default="dynamo") + parser.add_argument("--metric-source", default="dynamo-decode-log-counters") + parser.set_defaults(func=cmd_from_logs) + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + try: + return args.func(args) + except Exception as exc: # noqa: BLE001 - CLI should record a concise failure + print(f"ERROR: {exc}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/utils/evals/speedbench_al.py b/utils/evals/speedbench_al.py new file mode 100644 index 0000000000..94a17c6132 --- /dev/null +++ b/utils/evals/speedbench_al.py @@ -0,0 +1,343 @@ +#!/usr/bin/env python3 +"""SpeedBench acceptance-length reference and result helpers.""" + +from __future__ import annotations + +import argparse +import json +import re +import sys +from pathlib import Path +from typing import Any + + +MODEL_PREFIX_ALIASES = { + "dsv4": "deepseek-v4-pro", +} + +DEFAULT_MIN_THRESHOLD_RATIO = 0.95 +DEFAULT_MAX_THRESHOLD_RATIO = 1.05 + + +def scaled_threshold(reference: float, ratio: float) -> float: + """Scale an AL reference without leaking binary float noise into boundaries.""" + return round(reference * ratio, 10) + + +def _parse_scalar(value: str) -> Any: + value = value.strip() + if value in {"", "null", "None", "~"}: + return None + if value in {"N/A", "NA", "n/a", "na"}: + return None + if (value.startswith('"') and value.endswith('"')) or ( + value.startswith("'") and value.endswith("'") + ): + return value[1:-1] + try: + if re.match(r"^-?\d+$", value): + return int(value) + return float(value) + except ValueError: + return value + + +def _load_simple_reference_yaml(path: Path) -> dict[str, Any]: + """Parse the simple nested mapping emitted by the SpeedBench AL workflow.""" + data: dict[str, Any] = {} + current_model: str | None = None + current_mode: str | None = None + + for raw_line in path.read_text().splitlines(): + line = raw_line.split("#", 1)[0].rstrip() + if not line.strip(): + continue + indent = len(raw_line) - len(raw_line.lstrip(" ")) + if ":" not in line: + continue + key, value = line.strip().split(":", 1) + key = key.strip().strip("'\"") + value = value.strip() + + if indent == 0: + current_model = key + data.setdefault(current_model, {}) + current_mode = None + elif indent == 2 and current_model is not None: + current_mode = key + data[current_model].setdefault(current_mode, {}) + elif indent == 4 and current_model is not None and current_mode is not None: + data[current_model][current_mode][key] = _parse_scalar(value) + + return data + + +def load_reference(path: Path) -> dict[str, Any]: + try: + import yaml # type: ignore + except ImportError: + return _load_simple_reference_yaml(path) + + with path.open() as f: + loaded = yaml.safe_load(f) or {} + if not isinstance(loaded, dict): + raise ValueError(f"{path} must contain a mapping at the top level") + return loaded + + +def normalize_key(value: str) -> str: + value = value.strip().split("/")[-1].lower() + value = value.replace("_", "-") + value = re.sub(r"[^a-z0-9.+-]+", "-", value) + return value.strip("-") + + +def model_candidates(model: str, model_prefix: str | None = None) -> list[str]: + candidates: list[str] = [] + if model_prefix: + prefix = normalize_key(model_prefix) + candidates.append(MODEL_PREFIX_ALIASES.get(prefix, prefix)) + if model: + normalized = normalize_key(model) + candidates.append(MODEL_PREFIX_ALIASES.get(normalized, normalized)) + seen = set() + out = [] + for candidate in candidates: + if candidate and candidate not in seen: + out.append(candidate) + seen.add(candidate) + return out + + +def normalize_mode(thinking_mode: str) -> str: + mode = thinking_mode.strip().lower().replace("-", "_") + if mode == "on": + return "thinking_on" + if mode == "off": + return "thinking_off" + raise ValueError("SpeedBench thinking mode must be 'on' or 'off'") + + +def lookup_reference( + reference: dict[str, Any], + model: str, + model_prefix: str | None, + thinking_mode: str, + num_speculative_tokens: int, +) -> tuple[str, str, float]: + normalized_reference = {normalize_key(str(k)): v for k, v in reference.items()} + mode_key = normalize_mode(thinking_mode) + token_key = str(num_speculative_tokens) + + for candidate in model_candidates(model, model_prefix): + model_block = normalized_reference.get(candidate) + if not isinstance(model_block, dict): + continue + mode_block = model_block.get(mode_key) + if not isinstance(mode_block, dict): + continue + value = mode_block.get(num_speculative_tokens, mode_block.get(token_key)) + if value is None: + continue + try: + return candidate, mode_key, float(value) + except (TypeError, ValueError): + continue + + candidates = ", ".join(model_candidates(model, model_prefix)) or "" + raise KeyError( + "No SpeedBench AL reference for " + f"model candidates [{candidates}], mode {mode_key}, MTP {num_speculative_tokens}" + ) + + +def _optional_float(value: str | None) -> float | None: + if value in {None, "", "None", "null", "N/A"}: + return None + return float(value) + + +def _optional_int(value: str | None) -> int | None: + if value in {None, "", "None", "null", "N/A"}: + return None + return int(float(value)) + + +def build_result(args: argparse.Namespace) -> dict[str, Any]: + reference_al: float | None = None + min_acceptance_length: float | None = None + max_acceptance_length: float | None = None + model_key: str | None = None + mode_key = normalize_mode(args.thinking_mode) + error: str | None = args.error + + if args.reference_yaml: + reference_path = Path(args.reference_yaml) + if reference_path.exists(): + try: + model_key, mode_key, reference_al = lookup_reference( + load_reference(reference_path), + args.model, + args.model_prefix, + args.thinking_mode, + args.num_speculative_tokens, + ) + min_acceptance_length = scaled_threshold( + reference_al, args.threshold_ratio + ) + max_acceptance_length = scaled_threshold( + reference_al, args.max_threshold_ratio + ) + except Exception as exc: # noqa: BLE001 - recorded for CI artifacts + error = error or str(exc) + else: + error = error or f"Reference YAML not found: {reference_path}" + + acceptance_length = _optional_float(args.acceptance_length) + accepted_tokens = _optional_int(args.accepted_tokens) + draft_tokens = _optional_int(args.draft_tokens) + verify_steps = _optional_int(getattr(args, "verify_steps", None)) + proposed_draft_tokens = _optional_int(getattr(args, "proposed_draft_tokens", None)) + if verify_steps is None: + verify_steps = draft_tokens + passed = ( + error is None + and acceptance_length is not None + and min_acceptance_length is not None + and max_acceptance_length is not None + and acceptance_length >= min_acceptance_length + and acceptance_length <= max_acceptance_length + ) + + result = { + "speedbench_al_eval_version": 1, + "task": "speedbench_al", + "model": args.model, + "model_key": model_key, + "model_prefix": args.model_prefix, + "thinking_mode": mode_key, + "num_speculative_tokens": args.num_speculative_tokens, + "category": args.category, + "output_len": args.output_len, + "temperature": args.temperature, + "framework": getattr(args, "framework", ""), + "metric_source": getattr(args, "metric_source", ""), + "acceptance_length": acceptance_length, + "accepted_tokens": accepted_tokens, + "verify_steps": verify_steps, + "proposed_draft_tokens": proposed_draft_tokens, + "draft_tokens": draft_tokens, + "reference_acceptance_length": reference_al, + "threshold_ratio": args.threshold_ratio, + "max_threshold_ratio": args.max_threshold_ratio, + "min_acceptance_length": min_acceptance_length, + "max_acceptance_length": max_acceptance_length, + "passed": passed, + } + if error: + result["error"] = error + return result + + +def cmd_resolve(args: argparse.Namespace) -> int: + model_key, mode_key, reference_al = lookup_reference( + load_reference(Path(args.reference_yaml)), + args.model, + args.model_prefix, + args.thinking_mode, + args.num_speculative_tokens, + ) + payload = { + "model_key": model_key, + "thinking_mode": mode_key, + "num_speculative_tokens": args.num_speculative_tokens, + "reference_acceptance_length": reference_al, + "threshold_ratio": args.threshold_ratio, + "max_threshold_ratio": args.max_threshold_ratio, + "min_acceptance_length": scaled_threshold(reference_al, args.threshold_ratio), + "max_acceptance_length": scaled_threshold( + reference_al, args.max_threshold_ratio + ), + } + print(json.dumps(payload, sort_keys=True)) + return 0 + + +def cmd_record(args: argparse.Namespace) -> int: + result = build_result(args) + output = Path(args.output) + output.write_text(json.dumps(result, indent=2, sort_keys=True) + "\n") + status = "PASS" if result["passed"] else "FAIL" + actual = result.get("acceptance_length") + minimum = result.get("min_acceptance_length") + maximum = result.get("max_acceptance_length") + print( + f"{status}: SpeedBench AL {actual} " + f"(range [{minimum}, {maximum}], mode {result['thinking_mode']}, " + f"mtp {result['num_speculative_tokens']})" + ) + if args.exit_status and not result["passed"]: + return 1 + return 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description=__doc__) + subparsers = parser.add_subparsers(dest="command", required=True) + + resolve = subparsers.add_parser("resolve", help="Resolve a reference AL cell") + resolve.add_argument("--reference-yaml", required=True) + resolve.add_argument("--model", required=True) + resolve.add_argument("--model-prefix", default="") + resolve.add_argument("--thinking-mode", required=True) + resolve.add_argument("--num-speculative-tokens", type=int, required=True) + resolve.add_argument( + "--threshold-ratio", type=float, default=DEFAULT_MIN_THRESHOLD_RATIO + ) + resolve.add_argument( + "--max-threshold-ratio", type=float, default=DEFAULT_MAX_THRESHOLD_RATIO + ) + resolve.set_defaults(func=cmd_resolve) + + record = subparsers.add_parser("record", help="Write a compact AL eval result") + record.add_argument("--output", required=True) + record.add_argument("--reference-yaml", default="") + record.add_argument("--model", required=True) + record.add_argument("--model-prefix", default="") + record.add_argument("--thinking-mode", required=True) + record.add_argument("--num-speculative-tokens", type=int, required=True) + record.add_argument("--category", default="coding") + record.add_argument("--output-len", type=int, default=4096) + record.add_argument("--temperature", type=float, default=1.0) + record.add_argument( + "--threshold-ratio", type=float, default=DEFAULT_MIN_THRESHOLD_RATIO + ) + record.add_argument( + "--max-threshold-ratio", type=float, default=DEFAULT_MAX_THRESHOLD_RATIO + ) + record.add_argument("--framework", default="") + record.add_argument("--metric-source", default="") + record.add_argument("--acceptance-length", default=None) + record.add_argument("--accepted-tokens", default=None) + record.add_argument("--draft-tokens", default=None) + record.add_argument("--verify-steps", default=None) + record.add_argument("--proposed-draft-tokens", default=None) + record.add_argument("--error", default=None) + record.add_argument("--exit-status", action="store_true") + record.set_defaults(func=cmd_record) + + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + try: + return args.func(args) + except Exception as exc: # noqa: BLE001 - CLI should print concise failures + print(f"ERROR: {exc}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/utils/evals/speedbench_client.py b/utils/evals/speedbench_client.py new file mode 100644 index 0000000000..5ad869dd90 --- /dev/null +++ b/utils/evals/speedbench_client.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python3 +"""Small OpenAI-compatible client for SpeedBench AL eval load. + +This intentionally avoids importing vLLM benchmark code so the eval can run in +TensorRT-LLM and SGLang runtime images. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +from pathlib import Path +from typing import Any +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen + + +def _load_dsv4_encoder(): + bench_serving_dir = Path(__file__).resolve().parents[1] / "bench_serving" + sys.path.insert(0, str(bench_serving_dir)) + from encoding_dsv4 import encode_messages # type: ignore + + return encode_messages + + +def _load_speedbench_requests( + dataset_path: Path, + category: str, + num_prompts: int, +) -> list[list[dict[str, Any]]]: + jsonl_path = dataset_path / "qualitative.jsonl" + if not jsonl_path.is_file(): + raise FileNotFoundError(f"missing SpeedBench JSONL: {jsonl_path}") + + requests: list[list[dict[str, Any]]] = [] + with jsonl_path.open(encoding="utf-8") as handle: + for line in handle: + if not line.strip(): + continue + row = json.loads(line) + if category and row.get("category") != category: + continue + messages = row.get("messages") + if not isinstance(messages, list) or not messages: + continue + requests.append(messages) + if num_prompts > 0 and len(requests) >= num_prompts: + break + + if not requests: + raise ValueError(f"no SpeedBench prompts found for category={category!r}") + return requests + + +def _json_post( + url: str, + payload: dict[str, Any], + timeout: int, + retries: int, +) -> dict[str, Any]: + body = json.dumps(payload).encode("utf-8") + headers = {"Content-Type": "application/json"} + api_key = os.environ.get("OPENAI_API_KEY") + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + last_error: Exception | None = None + for attempt in range(retries + 1): + request = Request(url, data=body, headers=headers, method="POST") + try: + with urlopen(request, timeout=timeout) as response: + raw = response.read().decode("utf-8") + return json.loads(raw) if raw else {} + except HTTPError as exc: + detail = exc.read().decode("utf-8", errors="replace") + message = f"HTTP {exc.code} from {url}: {detail[:1000]}" + last_error = RuntimeError(message) + if exc.code < 500: + break + except URLError as exc: + last_error = exc + except TimeoutError as exc: + last_error = exc + + if attempt < retries: + time.sleep(min(2**attempt, 10)) + + assert last_error is not None + raise last_error + + +def _chat_payload( + messages: list[dict[str, Any]], + model: str, + output_len: int, + temperature: float, + thinking_mode: str, + thinking_kwargs: dict[str, Any], +) -> dict[str, Any]: + payload: dict[str, Any] = { + "model": model, + "messages": messages, + "max_tokens": output_len, + "temperature": temperature, + "stream": False, + } + if thinking_mode == "on" and thinking_kwargs: + payload["chat_template_kwargs"] = thinking_kwargs + if "reasoning_effort" in thinking_kwargs: + payload["reasoning_effort"] = thinking_kwargs["reasoning_effort"] + return payload + + +def _completion_payload( + messages: list[dict[str, Any]], + model: str, + output_len: int, + temperature: float, + thinking_mode: str, + thinking_kwargs: dict[str, Any], + dsv4: bool, +) -> dict[str, Any]: + if dsv4: + encode_messages = _load_dsv4_encoder() + prompt = encode_messages( + messages, + thinking_mode="thinking" if thinking_mode == "on" else "chat", + reasoning_effort=thinking_kwargs.get("reasoning_effort"), + ) + else: + first = messages[0] + prompt = first.get("content", "") if isinstance(first, dict) else str(first) + + return { + "model": model, + "prompt": prompt, + "max_tokens": output_len, + "temperature": temperature, + "stream": False, + } + + +def run(args: argparse.Namespace) -> int: + dataset_path = Path(args.dataset_path) + prompts = _load_speedbench_requests(dataset_path, args.category, args.num_prompts) + base_url = args.base_url.rstrip("/") + chat_url = f"{base_url}/v1/chat/completions" + completions_url = f"{base_url}/v1/completions" + thinking_kwargs = json.loads(args.thinking_kwargs) if args.thinking_kwargs else {} + + failures = 0 + resolved_endpoint = args.endpoint + for index, messages in enumerate(prompts, start=1): + endpoint_attempts = ["chat", "completions"] if resolved_endpoint == "auto" else [resolved_endpoint] + last_error: Exception | None = None + success = False + for endpoint in endpoint_attempts: + if endpoint == "completions": + payload = _completion_payload( + messages, + args.model, + args.output_len, + args.temperature, + args.thinking_mode, + thinking_kwargs, + args.dsv4, + ) + url = completions_url + else: + payload = _chat_payload( + messages, + args.model, + args.output_len, + args.temperature, + args.thinking_mode, + thinking_kwargs, + ) + url = chat_url + + try: + _json_post(url, payload, timeout=args.timeout, retries=args.retries) + except Exception as exc: + last_error = exc + if resolved_endpoint == "auto" and endpoint == "chat": + print( + "SpeedBench client chat endpoint failed; trying completions " + f"fallback: {exc}", + file=sys.stderr, + ) + continue + break + else: + if resolved_endpoint == "auto": + resolved_endpoint = endpoint + print( + f"SpeedBench client request {index}/{len(prompts)} " + f"completed via {endpoint}", + flush=True, + ) + success = True + break + + if success: + continue + + if last_error is None: + last_error = RuntimeError("no SpeedBench endpoint attempts were made") + failures += 1 + print( + f"SpeedBench client request {index}/{len(prompts)} failed: {last_error}", + file=sys.stderr, + ) + if failures > args.max_failures: + return 1 + + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--model", required=True) + parser.add_argument("--base-url", required=True) + parser.add_argument("--dataset-path", required=True) + parser.add_argument("--category", default="coding") + parser.add_argument("--output-len", type=int, default=4096) + parser.add_argument("--temperature", type=float, default=1.0) + parser.add_argument("--thinking-mode", choices=["on", "off"], default="off") + parser.add_argument("--thinking-kwargs", default="") + parser.add_argument("--endpoint", choices=["auto", "chat", "completions"], default="auto") + parser.add_argument("--num-prompts", type=int, default=-1) + parser.add_argument("--timeout", type=int, default=1800) + parser.add_argument("--retries", type=int, default=2) + parser.add_argument("--max-failures", type=int, default=0) + parser.add_argument("--dsv4", action="store_true") + return run(parser.parse_args()) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/utils/evals/trtllm_speedbench_al_from_log.py b/utils/evals/trtllm_speedbench_al_from_log.py new file mode 100644 index 0000000000..c63a933915 --- /dev/null +++ b/utils/evals/trtllm_speedbench_al_from_log.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +"""Parse TRT-LLM iteration logs into SpeedBench AL counters.""" + +from __future__ import annotations + +import argparse +import math +import re +import sys +from dataclasses import dataclass +from pathlib import Path + + +GEN_TOKENS_RE = re.compile(r"'num_generation_tokens':\s*(?P[0-9]+)") +ITER_LINE_RE = re.compile(r"\biter\s*=\s*[0-9]+,.*\bstates\s*=") +METRICS_GET_RE = re.compile(r'GET\s+/(?:prometheus/)?metrics\b') + + +@dataclass(frozen=True) +class TrtLogMetrics: + samples: int + accepted_tokens: int + proposed_draft_tokens: int + verify_steps: int + generated_tokens: int + + @property + def acceptance_length(self) -> float: + return self.generated_tokens / self.verify_steps + + +def _read_log_suffix(path: Path, start_offset: int) -> list[str]: + with path.open("rb") as f: + if start_offset > 0: + f.seek(start_offset) + return f.read().decode(errors="ignore").splitlines() + + +def parse_trtllm_iteration_log( + path: Path, + mtp: int, + start_offset: int = 0, + stop_at_metrics_get: bool = True, +) -> TrtLogMetrics | None: + if mtp <= 0: + raise ValueError("mtp must be positive") + if not path.is_file(): + return None + + samples = 0 + accepted = 0 + proposed = 0 + verify_steps = 0 + generated = 0 + max_tokens_per_step = mtp + 1 + + for line in _read_log_suffix(path, start_offset): + if samples and stop_at_metrics_get and METRICS_GET_RE.search(line): + break + if not ITER_LINE_RE.search(line): + continue + match = GEN_TOKENS_RE.search(line) + if not match: + continue + + gen_tokens = int(match.group("tokens")) + if gen_tokens <= 0: + continue + + # SpeedBench AL is issued at max-concurrency=1 today, where each + # generation iteration is one verification step. Keep a batched fallback + # for postmortem logs by assuming no step can emit more than mtp + 1 + # tokens per active request. + steps = max(1, math.ceil(gen_tokens / max_tokens_per_step)) + samples += 1 + verify_steps += steps + generated += gen_tokens + accepted += max(gen_tokens - steps, 0) + proposed += steps * mtp + + if samples == 0 or verify_steps <= 0: + return None + + return TrtLogMetrics( + samples=samples, + accepted_tokens=accepted, + proposed_draft_tokens=proposed, + verify_steps=verify_steps, + generated_tokens=generated, + ) + + +def cmd_tsv(args: argparse.Namespace) -> int: + metrics = parse_trtllm_iteration_log( + Path(args.log), + args.num_speculative_tokens, + start_offset=max(args.start_offset, 0), + stop_at_metrics_get=not args.no_stop_at_metrics_get, + ) + if metrics is None: + return 1 + + print( + f"{metrics.acceptance_length:.4f}\t" + f"{metrics.accepted_tokens}\t" + f"{metrics.verify_steps}\t" + f"{metrics.proposed_draft_tokens}\t" + f"{metrics.samples}" + ) + return 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--log", required=True) + parser.add_argument("--num-speculative-tokens", type=int, required=True) + parser.add_argument("--start-offset", type=int, default=0) + parser.add_argument( + "--no-stop-at-metrics-get", + action="store_true", + help="Do not stop parsing at the next /metrics request after samples appear.", + ) + parser.set_defaults(func=cmd_tsv) + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + try: + return args.func(args) + except Exception as exc: # noqa: BLE001 - CLI should return concise diagnostics + print(f"ERROR: {exc}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/utils/evals/validate_scores.py b/utils/evals/validate_scores.py index 1fff4178e0..4722174a79 100644 --- a/utils/evals/validate_scores.py +++ b/utils/evals/validate_scores.py @@ -190,6 +190,51 @@ def validate_batch_manifest( return errors +def validate_speedbench_al(data: dict, source: str) -> tuple[bool, int]: + """Validate a compact SpeedBench AL result JSON.""" + if "speedbench_al_eval_version" not in data: + return False, 0 + + actual = data.get("acceptance_length") + minimum = data.get("min_acceptance_length") + maximum = data.get("max_acceptance_length") + passed = data.get("passed") + label = ( + f"{data.get('task', 'speedbench_al')} " + f"{data.get('thinking_mode', 'unknown')} " + f"mtp{data.get('num_speculative_tokens', 'unknown')}" + ) + + values_are_numeric = all( + isinstance(value, (int, float)) for value in (actual, minimum, maximum) + ) + within_range = values_are_numeric and minimum <= actual <= maximum + + if passed is True and within_range: + print( + f"PASS: {label} AL = {float(actual):.4f} " + f"(range [{float(minimum):.4f}, {float(maximum):.4f}])" + ) + return True, 1 + + if values_are_numeric: + if actual < minimum: + comparison = "below" + elif actual > maximum: + comparison = "above" + else: + comparison = "marked failed" + print( + f"FAIL: {label} AL = {actual:.4f} ({comparison}; " + f"expected [{minimum:.4f}, {maximum:.4f}])", + file=sys.stderr, + ) + else: + error = data.get("error", "missing acceptance length or validation bounds") + print(f"FAIL: {label} in {source}: {error}", file=sys.stderr) + return False, 1 + + def main() -> int: # CI merges this script's stdout and stderr into a single log. When stdout # is a pipe it is block-buffered by default and only flushes at exit, which @@ -302,6 +347,14 @@ def main() -> int: conc_label = f"[conc={match.group(1)}] " if match else "" with open(f) as fh: data = json.load(fh) + + speedbench_ok, speedbench_checked = validate_speedbench_al(data, f) + if speedbench_checked: + checked += speedbench_checked + if not speedbench_ok: + failed = True + continue + for task, metrics in data.get("results", {}).items(): min_score, source = resolve_threshold(config, prefix, task, args.min_score) for name, val in metrics.items(): diff --git a/utils/evals/write_dynamo_speedbench_al_from_logs.sh b/utils/evals/write_dynamo_speedbench_al_from_logs.sh new file mode 100644 index 0000000000..d47d428d48 --- /dev/null +++ b/utils/evals/write_dynamo_speedbench_al_from_logs.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash + +set -u + +logs_dir="${1:-}" +workspace="${2:-${GITHUB_WORKSPACE:-$(pwd)}}" + +if [[ -z "$logs_dir" ]]; then + echo "Dynamo SpeedBench AL: missing logs directory argument" >&2 + exit 0 +fi + +if [[ "${FRAMEWORK:-}" != dynamo* || "${SPEC_DECODING:-none}" != "mtp" ]]; then + echo "Dynamo SpeedBench AL: skipping FRAMEWORK=${FRAMEWORK:-unknown} SPEC_DECODING=${SPEC_DECODING:-none}" + exit 0 +fi + +mtp="${SPEEDBENCH_NUM_SPEC_TOKENS:-${NUM_SPEC_TOKENS:-${SPECULATIVE_DRAFT_TOKENS:-}}}" +if [[ -z "$mtp" && -n "${CONFIG_FILE:-}" ]]; then + config_path="${CONFIG_FILE%%:*}" + if [[ -f "$config_path" ]]; then + mtp="$(sed -n 's/.*num_speculative_tokens[^0-9]*\([0-9][0-9]*\).*/\1/p' "$config_path" | head -1)" + fi +fi +mtp="${mtp:-2}" + +mode="${SPEEDBENCH_THINKING_MODE:-}" +if [[ -z "$mode" ]]; then + if [[ "${MODEL_PREFIX:-}" == "dsv4" ]]; then + mode="on" + else + mode="off" + fi +fi + +model_name="${MODEL_NAME:-${MODEL:-}}" +if [[ -z "$model_name" ]]; then + model_name="${SERVED_MODEL_NAME:-unknown}" +fi + +reference_yaml="${SPEEDBENCH_REFERENCE_YAML:-}" +if [[ -z "$reference_yaml" ]]; then + case "${MODEL_PREFIX:-}" in + dsv4) + reference_yaml="${workspace}/golden_al_distribution/dsv4_mtp.yaml" + ;; + *) + echo "Dynamo SpeedBench AL: no golden AL file for MODEL_PREFIX=${MODEL_PREFIX:-unknown}" + exit 0 + ;; + esac +fi + +output="${workspace}/results_speedbench_al_${mode}_mtp${mtp}.json" +metric_source="dynamo-decode-log-counters" +if [[ -n "${FRAMEWORK:-}" ]]; then + metric_source="${FRAMEWORK}-decode-log-counters" +fi + +echo "Dynamo SpeedBench AL: parsing decode logs from $logs_dir" +python3 "${workspace}/utils/evals/dynamo_speedbench_al_from_logs.py" \ + --logs-dir "$logs_dir" \ + --output "$output" \ + --reference-yaml "$reference_yaml" \ + --model "$model_name" \ + --model-prefix "${MODEL_PREFIX:-}" \ + --thinking-mode "$mode" \ + --num-speculative-tokens "$mtp" \ + --framework "${FRAMEWORK:-dynamo}" \ + --metric-source "$metric_source" || true