Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 30 additions & 38 deletions src/google/adk/telemetry/_instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from . import _metrics
from . import tracing
from ..events import event as event_lib
from ._schema_version import resolve_schema_version
from ._schema_version import SCHEMA_VERSION_SEMCONV_ALIGNED

if TYPE_CHECKING:
from ..agents.base_agent import BaseAgent
Expand All @@ -41,52 +43,42 @@
logger = logging.getLogger("google_adk." + __name__)


def _get_elapsed_s(
span: trace.Span | tracing.GenerateContentSpan | None,
fallback_start: float,
) -> float:
"""Guarantees consistent time source for duration calculation.

Note: This must be called with an ended span.

Args:
span (trace.Span | tracing.GenerateContentSpan | None): The ended span to
extract duration from.
fallback_start (float): Fallback start time in seconds (monotonic).

Returns:
float: Elapsed duration in seconds.
"""
if span is None:
return time.monotonic() - fallback_start

span = span.span if hasattr(span, "span") else span
start_ns = getattr(span, "start_time", None)
end_ns = getattr(span, "end_time", None)

if isinstance(start_ns, int) and isinstance(end_ns, int):
return (end_ns - start_ns) / 1e9 # Convert ns to s

# Fallback if span times are missing
return time.monotonic() - fallback_start


@contextlib.contextmanager
def record_invocation(
entrypoint_node: BaseNode | None,
conversation_id: str,
) -> Iterator[None]:
"""Top-level ``invocation`` span for a runner invocation.
"""Top-level invocation span for a runner invocation.

Schema v1 emits the legacy ``invocation`` span. Schema v2 replaces it with an
entrypoint ``invoke_workflow {entrypoint}`` span (entrypoint = root agent or
root node name), which omits the ``gen_ai.workflow.nested`` attribute, and a
``gen_ai.invoke_workflow.duration`` metric -- unless the entrypoint is itself
a workflow, in which case its own node span is the entrypoint
``invoke_workflow`` span and we avoid double-emitting it here.

Args:
entrypoint_node: The runner's root agent/node.
conversation_id: Session/conversation id.
conversation_id: Session/conversation id (stamped on the v2 span).

Yields:
Nothing; the span is active for the duration of the block.
Nothing; the span (if any) is active for the duration of the block.
"""
del entrypoint_node, conversation_id # Unused until schema v2 lands.
with tracing.tracer.start_as_current_span("invocation"):
if resolve_schema_version() < SCHEMA_VERSION_SEMCONV_ALIGNED:
with tracing.tracer.start_as_current_span("invocation"):
yield
return

from . import node_tracing
from ..workflow._workflow import Workflow

if isinstance(entrypoint_node, Workflow):
# The workflow's own node span is the entrypoint `invoke_workflow` span.
yield
return

entrypoint_name = entrypoint_node.name if entrypoint_node else ""
with node_tracing._use_invoke_workflow_span(entrypoint_name, conversation_id):
yield


Expand Down Expand Up @@ -152,7 +144,7 @@ async def record_agent_invocation(
finally:
_record_agent_metrics(
agent.name,
_get_elapsed_s(span, start_time),
_metrics.get_elapsed_s(span, start_time),
getattr(ctx, "user_content", None),
getattr(getattr(ctx, "session", None), "events", []),
caught_error,
Expand Down Expand Up @@ -198,7 +190,7 @@ async def record_tool_execution(
tool_name=tool.name,
tool_type=tool.__class__.__name__,
agent_name=agent.name,
elapsed_s=_get_elapsed_s(span, start_time),
elapsed_s=_metrics.get_elapsed_s(span, start_time),
error=caught_error,
)
except Exception: # pylint: disable=broad-exception-caught
Expand Down Expand Up @@ -227,7 +219,7 @@ async def record_inference_telemetry(
finally:
inference_error = sys.exc_info()[1]
agent = invocation_context.agent
elapsed_s = _get_elapsed_s(tel_ctx.span, start_time)
elapsed_s = _metrics.get_elapsed_s(tel_ctx.span, start_time)
try:
if agent is not None and tracing._should_emit_native_telemetry(agent):
_metrics.record_client_operation_duration(
Expand Down
61 changes: 61 additions & 0 deletions src/google/adk/telemetry/_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from __future__ import annotations

import logging
import time
from typing import TYPE_CHECKING

from google.adk import version
Expand All @@ -30,6 +31,10 @@
from google.adk.events.event import Event
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from opentelemetry.trace import Span
from opentelemetry.util.types import AttributeValue

from .tracing import GenerateContentSpan

logger = logging.getLogger("google_adk." + __name__)

Expand Down Expand Up @@ -61,6 +66,11 @@
409.6,
],
)
_workflow_invocation_duration = meter.create_histogram(
"gen_ai.invoke_workflow.duration",
unit="s",
description="Duration of workflow invocations.",
)
_tool_execution_duration = meter.create_histogram(
"gen_ai.tool.execution.duration",
unit="s",
Expand Down Expand Up @@ -160,6 +170,27 @@ def record_agent_invocation_duration(
_agent_invocation_duration.record(elapsed_s, attributes=attrs)


def record_workflow_invocation_duration(
*,
workflow_name: str,
elapsed_s: float,
nested: bool,
error: BaseException | None = None,
) -> None:
"""Records the duration of a workflow invocation."""
attrs: dict[str, AttributeValue] = {
gen_ai_attributes.GEN_AI_OPERATION_NAME: "invoke_workflow",
}
# Root workflow omits the attribute entirely; only nested ones emit it.
if nested:
attrs["gen_ai.workflow.nested"] = True
if error is not None:
attrs[error_attributes.ERROR_TYPE] = type(error).__name__
if workflow_name:
attrs["gen_ai.workflow.name"] = workflow_name
_workflow_invocation_duration.record(elapsed_s, attributes=attrs)


def record_agent_request_size(
agent_name: str, user_content: types.Content | None
):
Expand Down Expand Up @@ -303,3 +334,33 @@ def _get_content_size(

def _get_provider_name() -> str:
return tracing._guess_gemini_system_name()


def get_elapsed_s(
span: Span | GenerateContentSpan | None,
fallback_start: float,
) -> float:
"""Guarantees consistent time source for duration calculation.

Note: This must be called with an ended span.

Args:
span (trace.Span | tracing.GenerateContentSpan | None): The ended span to
extract duration from.
fallback_start (float): Fallback start time in seconds (monotonic).

Returns:
float: Elapsed duration in seconds.
"""
if span is None:
return time.monotonic() - fallback_start

span = span.span if hasattr(span, "span") else span
start_ns = getattr(span, "start_time", None)
end_ns = getattr(span, "end_time", None)

if isinstance(start_ns, int) and isinstance(end_ns, int):
return (end_ns - start_ns) / 1e9 # Convert ns to s

# Fallback if span times are missing
return time.monotonic() - fallback_start
Loading
Loading