From eff4bed0af6d0ea136cf0e5ef576ef11fcb87715 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Wed, 25 Mar 2026 15:55:15 -0300 Subject: [PATCH] feat(closes OPEN-9557): extend promote to resolve fields from function outputs Co-Authored-By: Claude Opus 4.6 (1M context) --- examples/tracing/promote_trace_columns.py | 171 ++++++++++++++++++ src/openlayer/lib/tracing/tracer.py | 156 +++++++++++++--- tests/test_tracing_core.py | 207 ++++++++++++++++++++++ 3 files changed, 512 insertions(+), 22 deletions(-) create mode 100644 examples/tracing/promote_trace_columns.py diff --git a/examples/tracing/promote_trace_columns.py b/examples/tracing/promote_trace_columns.py new file mode 100644 index 00000000..b4a9f29e --- /dev/null +++ b/examples/tracing/promote_trace_columns.py @@ -0,0 +1,171 @@ +""" +Example: Promoting inputs and outputs to top-level trace columns. + +The `promote` parameter on @trace() lets you surface function inputs *and* +output fields as top-level columns in the trace data, so you can create +Openlayer tests against them (e.g. "agent_tool_call_count < 10"). + +Keys are resolved from **inputs first**, then from the **output** (dict, +Pydantic model, or dataclass). Use a list to keep original names, or a dict +to alias them and avoid collisions between parent and child steps. +""" + +import dataclasses +import os +from typing import Any, Dict, List + +from pydantic import BaseModel + +os.environ["OPENLAYER_API_KEY"] = "your-api-key-here" +os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = "your-pipeline-id-here" + +from openlayer.lib import trace +from openlayer.lib.tracing import tracer + + +# --------------------------------------------------------------------------- +# 1. Promote from a Pydantic model output +# --------------------------------------------------------------------------- + +class AgentResult(BaseModel): + answer: str + tool_call_count: int + tool_names: List[str] + + +@trace(promote={ + "user_query": "agent_input_query", # from input + "tool_call_count": "agent_tool_calls", # from output + "tool_names": "agent_tools", # from output +}) +def run_agent(user_query: str) -> AgentResult: + """Simulates an agent that uses tools to answer a question. + + The trace data will include three top-level columns: + - agent_input_query (from the `user_query` input) + - agent_tool_calls (from the Pydantic output's `tool_call_count`) + - agent_tools (from the Pydantic output's `tool_names`) + """ + # ... agent logic would go here ... + return AgentResult( + answer="Paris is the capital of France.", + tool_call_count=2, + tool_names=["web_search", "summarize"], + ) + + +# --------------------------------------------------------------------------- +# 2. Promote from a dict output (list form -- no aliasing) +# --------------------------------------------------------------------------- + +@trace(promote=["score", "confidence"]) +def evaluate(text: str) -> Dict[str, Any]: + """Evaluates text quality. `score` and `confidence` become top-level columns.""" + return {"score": 0.95, "confidence": 0.87, "explanation": "Well-structured."} + + +# --------------------------------------------------------------------------- +# 3. Promote from a dataclass output +# --------------------------------------------------------------------------- + +@dataclasses.dataclass +class RetrievalResult: + documents: List[str] + doc_count: int + avg_relevance: float + + +@trace(promote={"doc_count": "retrieval_doc_count", "avg_relevance": "retrieval_relevance"}) +def retrieve(query: str) -> RetrievalResult: + """Retrieves relevant documents. Promotes doc_count and avg_relevance.""" + return RetrievalResult( + documents=["doc_a", "doc_b", "doc_c"], + doc_count=3, + avg_relevance=0.82, + ) + + +# --------------------------------------------------------------------------- +# 4. Nested traces -- child steps promote to the same top-level row +# --------------------------------------------------------------------------- + + +class ToolResult(BaseModel): + tool_call_count: int + tool_names: List[str] + result: str + + +@trace(promote={"tool_call_count": "child_tool_calls", "tool_names": "child_tools"}) +def inner_agent_step(task: str) -> ToolResult: + """A child step whose output fields are promoted to the parent trace. + + Even though this is a nested step, `promote` writes to the shared Trace + object, so `child_tool_calls` and `child_tools` become top-level columns. + """ + return ToolResult( + tool_call_count=5, + tool_names=["search", "calculator", "code_exec", "summarize", "translate"], + result=f"Completed: {task}", + ) + + +@trace(promote={"user_query": "input_query"}) +def orchestrator(user_query: str) -> str: + """Parent function that delegates to a child step. + + After execution the trace will have top-level columns from *both* levels: + - input_query (parent input) + - child_tool_calls (child output) + - child_tools (child output) + """ + step1 = inner_agent_step("look up facts") + step2 = inner_agent_step("summarize findings") + return f"{step1.result} | {step2.result}" + + +# --------------------------------------------------------------------------- + +def main(): + print("=== 1. Promote from Pydantic output ===") + result = run_agent("What is the capital of France?") + print(f" Answer: {result.answer}") + print(f" Tool calls: {result.tool_call_count}") + print() + + print("=== 2. Promote from dict output (list form) ===") + scores = evaluate("The quick brown fox.") + print(f" Score: {scores['score']}, Confidence: {scores['confidence']}") + print() + + print("=== 3. Promote from dataclass output ===") + docs = retrieve("machine learning basics") + print(f" Retrieved {docs.doc_count} docs, avg relevance: {docs.avg_relevance}") + print() + + print("=== 4. Nested traces -- child promote to parent row ===") + captured_trace = None + + @trace(promote={"user_query": "input_query"}) + def traced_orchestrator(user_query: str) -> str: + nonlocal captured_trace + step1 = inner_agent_step("look up facts") + step2 = inner_agent_step("summarize findings") + captured_trace = tracer.get_current_trace() + return f"{step1.result} | {step2.result}" + + out = traced_orchestrator("hello world") + print(f" Result: {out}") + print(f" Trace metadata: {captured_trace.metadata}") + assert captured_trace.metadata["input_query"] == "hello world", "parent input promoted" + assert captured_trace.metadata["child_tool_calls"] == 5, "child output promoted" + assert "search" in captured_trace.metadata["child_tools"], "child list output promoted" + print(" All promoted columns verified!") + print() + + print("Check your Openlayer dashboard -- promoted fields appear as top-level") + print("columns you can write tests against (e.g. child_tool_calls < 10).") + + +if __name__ == "__main__": + main() diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index 319ebbf3..baea887f 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -3,6 +3,7 @@ import asyncio import atexit import contextvars +import dataclasses import inspect import json import logging @@ -532,15 +533,20 @@ def trace( Parameters ---------- promote : list of str or dict mapping str to str, optional - Kwarg names whose values should be surfaced as top-level columns in the - trace data. Pass a list to use the original kwarg names as column names, + Names whose values should be surfaced as top-level columns in the + trace data. Keys are resolved from function **inputs** first; any + unresolved keys are then looked up in the function **output** (dict, + Pydantic model, or dataclass). Pass a list to keep original names, or a dict to alias them:: - # List form – uses original kwarg names - @tracer.trace(promote=["tool_call_count", "user_query"]) + # List form – uses original names + @tracer.trace(promote=["user_query", "tool_call_count"]) - # Dict form – maps kwarg_name -> column_name - @tracer.trace(promote={"user_query": "agent_input_query"}) + # Dict form – maps name -> column_name + @tracer.trace(promote={ + "user_query": "agent_input_query", # from input + "tool_call_count": "agent_tool_calls", # from output + }) Examples -------- @@ -601,6 +607,7 @@ def __init__(self): self._token = None self._output_chunks = [] self._trace_initialized = False + self._unresolved_promote = {} self._captured_context = ( None # Capture context for ASGI compatibility ) @@ -628,7 +635,9 @@ def __next__(self): context_kwarg=context_kwarg, question_kwarg=question_kwarg, ) - _apply_promote_kwargs(self._inputs, promote) + self._unresolved_promote = _apply_promote_kwargs( + self._inputs, promote + ) self._trace_initialized = True try: @@ -642,6 +651,13 @@ def __next__(self): # Use captured context to ensure we have access to the trace output = _join_output_chunks(self._output_chunks) if self._captured_context: + # Run promote inside captured context so + # update_current_trace can find the trace ContextVar + self._captured_context.run( + _apply_promote_output, + output, + self._unresolved_promote, + ) self._captured_context.run( _finalize_sync_generator_step, step=self._step, @@ -654,6 +670,9 @@ def __next__(self): on_flush_failure=on_flush_failure, ) else: + _apply_promote_output( + output, self._unresolved_promote + ) _finalize_sync_generator_step( step=self._step, token=self._token, @@ -724,7 +743,9 @@ def wrapper(*func_args, **func_kwargs): context_kwarg=context_kwarg, question_kwarg=question_kwarg, ) - _apply_promote_kwargs(original_inputs, promote) + unresolved_promote = _apply_promote_kwargs( + original_inputs, promote + ) # Apply input guardrails modified_inputs, input_guardrail_metadata = ( @@ -766,6 +787,9 @@ def wrapper(*func_args, **func_kwargs): else: output = func(*func_args, **func_kwargs) + # Promote unresolved keys from output + _apply_promote_output(output, unresolved_promote) + # Apply output guardrails (skip if function was skipped) if ( hasattr(modified_inputs, "__class__") @@ -841,15 +865,20 @@ def trace_async( Parameters ---------- promote : list of str or dict mapping str to str, optional - Kwarg names whose values should be surfaced as top-level columns in the - trace data. Pass a list to use the original kwarg names as column names, + Names whose values should be surfaced as top-level columns in the + trace data. Keys are resolved from function **inputs** first; any + unresolved keys are then looked up in the function **output** (dict, + Pydantic model, or dataclass). Pass a list to keep original names, or a dict to alias them:: - # List form – uses original kwarg names - @tracer.trace_async(promote=["job_id", "user_query"]) + # List form – uses original names + @tracer.trace_async(promote=["job_id", "tool_call_count"]) - # Dict form – maps kwarg_name -> column_name - @tracer.trace_async(promote={"user_query": "agent_input_query"}) + # Dict form – maps name -> column_name + @tracer.trace_async(promote={ + "user_query": "agent_input_query", # from input + "tool_call_count": "agent_tool_calls", # from output + }) Examples -------- @@ -892,6 +921,7 @@ def __init__(self): self._token = None self._output_chunks = [] self._trace_initialized = False + self._unresolved_promote = {} def __aiter__(self): return self @@ -916,7 +946,9 @@ async def __anext__(self): context_kwarg=context_kwarg, question_kwarg=question_kwarg, ) - _apply_promote_kwargs(self._inputs, promote) + self._unresolved_promote = _apply_promote_kwargs( + self._inputs, promote + ) self._trace_initialized = True try: @@ -926,6 +958,9 @@ async def __anext__(self): except StopAsyncIteration: # Finalize trace when generator is exhausted output = _join_output_chunks(self._output_chunks) + _apply_promote_output( + output, self._unresolved_promote + ) _finalize_async_generator_step( step=self._step, token=self._token, @@ -968,6 +1003,7 @@ async def async_function_wrapper(*func_args, **func_kwargs): ) as step: output = exception = None guardrail_metadata = {} + unresolved_promote = {} try: # Apply promote / input guardrails if provided @@ -980,7 +1016,9 @@ async def async_function_wrapper(*func_args, **func_kwargs): context_kwarg=context_kwarg, question_kwarg=question_kwarg, ) - _apply_promote_kwargs(inputs, promote) + unresolved_promote = _apply_promote_kwargs( + inputs, promote + ) if guardrails: # Process inputs through guardrails @@ -1030,6 +1068,9 @@ async def async_function_wrapper(*func_args, **func_kwargs): _log_step_exception(step, exc) raise exc + # Promote unresolved keys from output + _apply_promote_output(output, unresolved_promote) + # Apply output guardrails if provided if guardrails and output is not None: try: @@ -1080,6 +1121,7 @@ def sync_wrapper(*func_args, **func_kwargs): ) as step: output = exception = None guardrail_metadata = {} + unresolved_promote = {} try: # Apply promote / input guardrails if provided if promote or guardrails: @@ -1091,7 +1133,9 @@ def sync_wrapper(*func_args, **func_kwargs): context_kwarg=context_kwarg, question_kwarg=question_kwarg, ) - _apply_promote_kwargs(inputs, promote) + unresolved_promote = _apply_promote_kwargs( + inputs, promote + ) if guardrails: # Process inputs through guardrails @@ -1137,6 +1181,9 @@ def sync_wrapper(*func_args, **func_kwargs): _log_step_exception(step, exc) exception = exc + # Promote unresolved keys from output + _apply_promote_output(output, unresolved_promote) + # Apply output guardrails if provided if guardrails and output is not None: try: @@ -1869,21 +1916,86 @@ def _extract_function_inputs( def _apply_promote_kwargs( inputs: dict, promote: Optional[Union[List[str], Dict[str, str]]], -) -> None: - """Promote selected function kwargs to trace-level columns.""" +) -> Dict[str, str]: + """Promote selected function kwargs to trace-level columns. + + Returns a mapping of unresolved keys (field_name -> column_name) that were + not found in the function inputs, so they can be tried against the output. + """ if not promote: - return + return {} mapping: Dict[str, str] = ( {k: k for k in promote} if isinstance(promote, list) else promote ) resolved: Dict[str, Any] = {} + unresolved: Dict[str, str] = {} for kwarg_name, column_name in mapping.items(): if kwarg_name in inputs: resolved[column_name] = inputs[kwarg_name] + else: + unresolved[kwarg_name] = column_name + if resolved: + update_current_trace(**resolved) + return unresolved + + +def _apply_promote_output( + output: Any, + unresolved: Dict[str, str], +) -> None: + """Promote fields from a function's output to trace-level columns. + + Parameters + ---------- + output : Any + The function return value. Supported types: dict, Pydantic model, + dataclass. Other types are skipped with a warning. + unresolved : dict + Mapping of field_name -> column_name for keys that were not found + in the function inputs (returned by ``_apply_promote_kwargs``). + """ + if not unresolved or output is None: + return + + # Convert output to a dict + output_dict: Optional[Dict[str, Any]] = None + if isinstance(output, dict): + output_dict = output + elif hasattr(output, "model_dump") and callable(output.model_dump): + # Pydantic v2 + try: + output_dict = output.model_dump() + except Exception: + logger.warning("promote: failed to call model_dump() on output.") + elif hasattr(output, "dict") and callable(output.dict): + # Pydantic v1 + try: + output_dict = output.dict() + except Exception: + logger.warning("promote: failed to call dict() on output.") + elif dataclasses.is_dataclass(output) and not isinstance(output, type): + try: + output_dict = dataclasses.asdict(output) + except Exception: + logger.warning("promote: failed to convert dataclass output to dict.") + + if output_dict is None: + for field_name, column_name in unresolved.items(): + logger.warning( + "promote: key `%s` not found in inputs and output is not a " + "dict, Pydantic model, or dataclass.", + field_name, + ) + return + + resolved: Dict[str, Any] = {} + for field_name, column_name in unresolved.items(): + if field_name in output_dict: + resolved[column_name] = utils.json_serialize(output_dict[field_name]) else: logger.warning( - "promote: kwarg `%s` not found in inputs of the current function.", - kwarg_name, + "promote: key `%s` not found in function inputs or output.", + field_name, ) if resolved: update_current_trace(**resolved) diff --git a/tests/test_tracing_core.py b/tests/test_tracing_core.py index 7ebdda3d..99e2b18a 100644 --- a/tests/test_tracing_core.py +++ b/tests/test_tracing_core.py @@ -605,3 +605,210 @@ def outer_function() -> None: assert inner_step.metadata is not None assert "Exceptions" in inner_step.metadata assert "Inner error" in inner_step.metadata["Exceptions"] + + +class TestPromoteOutput: + """Test promote parameter with output field extraction.""" + + def setup_method(self) -> None: + tracer._configured_api_key = None + tracer._configured_pipeline_id = None + tracer._configured_base_url = None + tracer._client = None + + def teardown_method(self) -> None: + tracer._configured_api_key = None + tracer._configured_pipeline_id = None + tracer._configured_base_url = None + tracer._client = None + + @patch.object(tracer, "_publish", False) + def test_promote_input_kwargs_only(self) -> None: + """Promote with input-only keys still works (regression).""" + captured_trace = None + + @tracer.trace(promote={"query": "user_query"}) + def my_func(query: str) -> str: + nonlocal captured_trace + captured_trace = tracer.get_current_trace() + return "answer" + + my_func("hello") + assert captured_trace is not None + assert captured_trace.metadata["user_query"] == "hello" + + @patch.object(tracer, "_publish", False) + def test_promote_output_from_dict(self) -> None: + """Promote fields from a dict output.""" + captured_trace = None + + @tracer.trace(promote={"tool_count": "agent_tool_count"}) + def my_func() -> Dict[str, Any]: + nonlocal captured_trace + captured_trace = tracer.get_current_trace() + return {"tool_count": 5, "answer": "result"} + + result = my_func() + assert result == {"tool_count": 5, "answer": "result"} + assert captured_trace is not None + assert captured_trace.metadata["agent_tool_count"] == 5 + + @patch.object(tracer, "_publish", False) + def test_promote_output_from_pydantic_model(self) -> None: + """Promote fields from a Pydantic model output.""" + pytest.importorskip("pydantic") + from pydantic import BaseModel + + class AgentResult(BaseModel): + answer: str + tool_call_count: int + tool_names: List[str] + + captured_trace = None + + @tracer.trace(promote={ + "tool_call_count": "agent_tool_calls", + "tool_names": "agent_tools", + }) + def my_func() -> AgentResult: + nonlocal captured_trace + captured_trace = tracer.get_current_trace() + return AgentResult( + answer="Paris", + tool_call_count=3, + tool_names=["search", "lookup", "summarize"], + ) + + result = my_func() + assert result.answer == "Paris" + assert captured_trace is not None + assert captured_trace.metadata["agent_tool_calls"] == 3 + assert captured_trace.metadata["agent_tools"] == [ + "search", "lookup", "summarize" + ] + + @patch.object(tracer, "_publish", False) + def test_promote_output_from_dataclass(self) -> None: + """Promote fields from a dataclass output.""" + import dataclasses + + @dataclasses.dataclass + class EvalResult: + score: float + confidence: float + details: str + + captured_trace = None + + @tracer.trace(promote=["score", "confidence"]) + def my_func() -> EvalResult: + nonlocal captured_trace + captured_trace = tracer.get_current_trace() + return EvalResult(score=0.95, confidence=0.87, details="good") + + result = my_func() + assert result.score == 0.95 + assert captured_trace is not None + assert captured_trace.metadata["score"] == 0.95 + assert captured_trace.metadata["confidence"] == 0.87 + # "details" was NOT promoted + assert "details" not in captured_trace.metadata + + @patch.object(tracer, "_publish", False) + def test_promote_mixed_inputs_and_outputs(self) -> None: + """Promote some keys from inputs, others from output.""" + captured_trace = None + + @tracer.trace(promote={ + "user_query": "input_query", + "tool_count": "agent_tool_count", + }) + def my_func(user_query: str) -> Dict[str, Any]: + nonlocal captured_trace + captured_trace = tracer.get_current_trace() + return {"tool_count": 3, "answer": "result"} + + my_func("what is AI?") + assert captured_trace is not None + # "user_query" resolved from inputs + assert captured_trace.metadata["input_query"] == "what is AI?" + # "tool_count" resolved from output + assert captured_trace.metadata["agent_tool_count"] == 3 + + @patch.object(tracer, "_publish", False) + def test_promote_missing_key_warns(self) -> None: + """Keys not found in inputs or outputs should log a warning.""" + + @tracer.trace(promote={"nonexistent": "col"}) + def my_func() -> Dict[str, Any]: + return {"answer": "result"} + + with patch("openlayer.lib.tracing.tracer.logger") as mock_logger: + my_func() + # Should warn about missing key + mock_logger.warning.assert_called() + warning_args = mock_logger.warning.call_args_list + assert any("nonexistent" in str(call) for call in warning_args) + + @patch.object(tracer, "_publish", False) + def test_promote_output_list_form(self) -> None: + """Promote output fields using list form (no aliasing).""" + captured_trace = None + + @tracer.trace(promote=["score"]) + def my_func() -> Dict[str, Any]: + nonlocal captured_trace + captured_trace = tracer.get_current_trace() + return {"score": 0.95, "details": "good"} + + my_func() + assert captured_trace is not None + assert captured_trace.metadata["score"] == 0.95 + + @patch.object(tracer, "_publish", False) + def test_promote_output_nested_trace(self) -> None: + """Child steps can promote output fields to the shared trace.""" + captured_trace = None + + @tracer.trace(promote={"tool_count": "child_tool_count"}) + def child_func() -> Dict[str, Any]: + return {"tool_count": 7, "answer": "inner"} + + @tracer.trace(promote={"user_query": "input_query"}) + def parent_func(user_query: str) -> str: + nonlocal captured_trace + result = child_func() + captured_trace = tracer.get_current_trace() + return result["answer"] + + parent_func("hello") + assert captured_trace is not None + # Parent promoted from inputs + assert captured_trace.metadata["input_query"] == "hello" + # Child promoted from output + assert captured_trace.metadata["child_tool_count"] == 7 + + @patch.object(tracer, "_publish", False) + def test_promote_output_async(self) -> None: + """Promote output fields from an async function.""" + captured_trace = None + + @tracer.trace_async(promote={"score": "eval_score"}) + async def my_func() -> Dict[str, Any]: + nonlocal captured_trace + captured_trace = tracer.get_current_trace() + return {"score": 0.9, "details": "good"} + + asyncio.run(my_func()) + assert captured_trace is not None + assert captured_trace.metadata["eval_score"] == 0.9 + + @patch.object(tracer, "_publish", False) + def test_promote_output_none_is_safe(self) -> None: + """Promote should not crash when output is None.""" + @tracer.trace(promote={"tool_count": "tc"}) + def my_func(tool_count: int) -> None: + return None + + # "tool_count" exists in inputs, so it should be promoted from there + my_func(tool_count=5)