Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/a2a/client/transports/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,29 @@ def _map_grpc_error(e: grpc.aio.AioRpcError) -> NoReturn:

# Use grpc_status to cleanly extract the rich Status from the call
status = rpc_status.from_call(cast('grpc.Call', e))
data = None

if status is not None:
exception_cls = None
for detail in status.details:
if detail.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
if detail.Is(error_details_pb2.BadRequest.DESCRIPTOR):
bad_request = error_details_pb2.BadRequest()
detail.Unpack(bad_request)
errors = [
{'field': v.field, 'message': v.description}
for v in bad_request.field_violations
]
data = {'errors': errors}
# Infer InvalidParamsError from BadRequest details
exception_cls = A2A_REASON_TO_ERROR.get('INVALID_PARAMS')
Comment thread
ishymko marked this conversation as resolved.
Outdated
elif detail.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
error_info = error_details_pb2.ErrorInfo()
detail.Unpack(error_info)

if error_info.domain == 'a2a-protocol.org':
exception_cls = A2A_REASON_TO_ERROR.get(error_info.reason)
if exception_cls:
raise exception_cls(status.message) from e

if exception_cls:
raise exception_cls(status.message, data=data) from e

raise A2AClientError(f'gRPC Error {e.code().name}: {e.details()}') from e

Expand Down
3 changes: 2 additions & 1 deletion src/a2a/client/transports/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,10 @@ def _create_jsonrpc_error(self, error_dict: dict[str, Any]) -> Exception:
"""Creates the appropriate A2AError from a JSON-RPC error dictionary."""
code = error_dict.get('code')
message = error_dict.get('message', str(error_dict))
data = error_dict.get('data')

if isinstance(code, int) and code in _JSON_RPC_ERROR_CODE_TO_A2A_ERROR:
return _JSON_RPC_ERROR_CODE_TO_A2A_ERROR[code](message)
return _JSON_RPC_ERROR_CODE_TO_A2A_ERROR[code](message, data=data)

# Fallback to general A2AClientError
return A2AClientError(f'JSON-RPC Error {code}: {message}')
Expand Down
15 changes: 14 additions & 1 deletion src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
InMemoryQueueManager,
QueueManager,
)
from a2a.server.request_handlers.request_handler import RequestHandler
from a2a.server.request_handlers.request_handler import (
RequestHandler,
validate_request_params,
)
Comment thread
ishymko marked this conversation as resolved.
from a2a.server.tasks import (
PushNotificationConfigStore,
PushNotificationEvent,
Expand Down Expand Up @@ -118,6 +121,7 @@ def __init__( # noqa: PLR0913
# asyncio tasks and to surface unexpected exceptions.
self._background_tasks = set()

@validate_request_params
async def on_get_task(
self,
params: GetTaskRequest,
Expand All @@ -133,6 +137,7 @@ async def on_get_task(

return apply_history_length(task, params)

@validate_request_params
async def on_list_tasks(
self,
params: ListTasksRequest,
Expand All @@ -154,6 +159,7 @@ async def on_list_tasks(

return page

@validate_request_params
async def on_cancel_task(
self,
params: CancelTaskRequest,
Expand Down Expand Up @@ -317,6 +323,7 @@ async def _send_push_notification_if_needed(
):
await self._push_sender.send_notification(task_id, event)

@validate_request_params
async def on_message_send(
self,
params: SendMessageRequest,
Expand Down Expand Up @@ -386,6 +393,7 @@ async def push_notification_callback(event: Event) -> None:

return result

@validate_request_params
async def on_message_send_stream(
self,
params: SendMessageRequest,
Expand Down Expand Up @@ -474,6 +482,7 @@ async def _cleanup_producer(
async with self._running_agents_lock:
self._running_agents.pop(task_id, None)

@validate_request_params
async def on_create_task_push_notification_config(
self,
params: TaskPushNotificationConfig,
Expand All @@ -499,6 +508,7 @@ async def on_create_task_push_notification_config(

return params

@validate_request_params
async def on_get_task_push_notification_config(
self,
params: GetTaskPushNotificationConfigRequest,
Expand Down Expand Up @@ -530,6 +540,7 @@ async def on_get_task_push_notification_config(

raise InternalError(message='Push notification config not found')

@validate_request_params
async def on_subscribe_to_task(
self,
params: SubscribeToTaskRequest,
Expand Down Expand Up @@ -572,6 +583,7 @@ async def on_subscribe_to_task(
async for event in result_aggregator.consume_and_emit(consumer):
yield event

@validate_request_params
async def on_list_task_push_notification_configs(
self,
params: ListTaskPushNotificationConfigsRequest,
Expand All @@ -597,6 +609,7 @@ async def on_list_task_push_notification_configs(
configs=push_notification_config_list
)

@validate_request_params
async def on_delete_task_push_notification_config(
self,
params: DeleteTaskPushNotificationConfigRequest,
Expand Down
21 changes: 17 additions & 4 deletions src/a2a/server/request_handlers/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,16 +438,29 @@ async def abort_context(
error.message if hasattr(error, 'message') else str(error)
)

# Create standard Status and pack the ErrorInfo
# Create standard Status
status = status_pb2.Status(code=status_code, message=error_msg)
detail = any_pb2.Any()
detail.Pack(error_info)
status.details.append(detail)

# Exclusive details based on error type:
if error.data and error.data.get('errors'):
bad_request = error_details_pb2.BadRequest()
for err_dict in error.data['errors']:
violation = bad_request.field_violations.add()
violation.field = err_dict.get('field', '')
violation.description = err_dict.get('message', '')
any_bad_request = any_pb2.Any()
any_bad_request.Pack(bad_request)
status.details.append(any_bad_request)
else:
detail = any_pb2.Any()
detail.Pack(error_info)
status.details.append(detail)

# Use grpc_status to safely generate standard trailing metadata
rich_status = rpc_status.to_status(status)

new_metadata: list[tuple[str, str | bytes]] = []

trailing = context.trailing_metadata()
if trailing:
for k, v in trailing:
Expand Down
1 change: 1 addition & 0 deletions src/a2a/server/request_handlers/jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def _build_error_response(
jsonrpc_error = model_class(
code=code,
message=str(error),
data=error.data,
)
else:
jsonrpc_error = JSONRPCInternalError(message=str(error))
Expand Down
43 changes: 42 additions & 1 deletion src/a2a/server/request_handlers/request_handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import functools
import inspect

from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Callable
from typing import Any

from google.protobuf.message import Message as ProtoMessage

from a2a.server.context import ServerCallContext
from a2a.server.events.event_queue import Event
Expand All @@ -19,6 +25,7 @@
TaskPushNotificationConfig,
)
from a2a.utils.errors import UnsupportedOperationError
from a2a.utils.proto_utils import validate_proto_required_fields


class RequestHandler(ABC):
Expand Down Expand Up @@ -218,3 +225,37 @@ async def on_delete_task_push_notification_config(
Returns:
None
"""


def validate_request_params(method: Callable) -> Callable:
"""Decorator for RequestHandler methods to validate required fields on incoming requests."""
if inspect.isasyncgenfunction(method):

@functools.wraps(method)
async def async_generator_wrapper(
self: RequestHandler,
params: ProtoMessage,
context: ServerCallContext,
*args: Any,
**kwargs: Any,
) -> AsyncGenerator:
if params is not None:
validate_proto_required_fields(params)
Comment thread
ishymko marked this conversation as resolved.
async for item in method(self, params, context, *args, **kwargs):
yield item

return async_generator_wrapper

@functools.wraps(method)
async def async_wrapper(
self: RequestHandler,
params: ProtoMessage,
context: ServerCallContext,
*args: Any,
**kwargs: Any,
) -> Any:
if params is not None:
validate_proto_required_fields(params)
Comment thread
ishymko marked this conversation as resolved.
return await method(self, params, context, *args, **kwargs)

return async_wrapper
3 changes: 2 additions & 1 deletion src/a2a/utils/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ class A2AError(Exception):
message: str = 'A2A Error'
data: dict | None = None

def __init__(self, message: str | None = None):
def __init__(self, message: str | None = None, data: dict | None = None):
if message:
self.message = message
self.data = data
Comment thread
ishymko marked this conversation as resolved.
super().__init__(self.message)


Expand Down
109 changes: 108 additions & 1 deletion src/a2a/utils/proto_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
This module provides helper functions for common proto type operations.
"""

from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, TypedDict

from google.api.field_behavior_pb2 import FieldBehavior, field_behavior
from google.protobuf.descriptor import FieldDescriptor
from google.protobuf.json_format import ParseDict
from google.protobuf.message import Message as ProtobufMessage

from a2a.utils.errors import InvalidParamsError


if TYPE_CHECKING:
from starlette.datastructures import QueryParams
Expand Down Expand Up @@ -189,3 +193,106 @@ def parse_params(params: QueryParams, message: ProtobufMessage) -> None:
processed[k] = parsed_val

ParseDict(processed, message, ignore_unknown_fields=True)


class ValidationDetail(TypedDict):
"""Structured validation error detail."""

field: str
message: str


def _check_required_field_violation(
msg: ProtobufMessage, field: FieldDescriptor
) -> ValidationDetail | None:
"""Check if a required field is missing or invalid."""
val = getattr(msg, field.name)
if field.is_repeated:
if not val:
return ValidationDetail(
field=field.name,
message='Field must contain at least one element.',
)
Comment thread
ishymko marked this conversation as resolved.
elif field.has_presence:
if not msg.HasField(field.name):
return ValidationDetail(
field=field.name, message='Field is required.'
)
elif val == field.default_value:
return ValidationDetail(field=field.name, message='Field is required.')
Comment thread
ishymko marked this conversation as resolved.
return None


def _append_nested_errors(
errors: list[ValidationDetail],
prefix: str,
sub_errs: list[ValidationDetail],
) -> None:
"""Format nested validation errors and append to errors list."""
for sub in sub_errs:
sub_field = sub['field']
errors.append(
ValidationDetail(
field=f'{prefix}.{sub_field}' if sub_field else prefix,
message=sub['message'],
)
Comment thread
ishymko marked this conversation as resolved.
)


def _recurse_validation(
msg: ProtobufMessage, field: FieldDescriptor
) -> list[ValidationDetail]:
"""Recurse validation for nested messages and map fields."""
errors: list[ValidationDetail] = []
if field.type != FieldDescriptor.TYPE_MESSAGE:
return errors

val = getattr(msg, field.name)
if not field.is_repeated:
if msg.HasField(field.name):
sub_errs = _validate_proto_required_fields_internal(val)
_append_nested_errors(errors, field.name, sub_errs)
elif field.message_type.GetOptions().map_entry:
for k, v in val.items():
if isinstance(v, ProtobufMessage):
sub_errs = _validate_proto_required_fields_internal(v)
_append_nested_errors(errors, f'{field.name}[{k}]', sub_errs)
Comment thread
ishymko marked this conversation as resolved.
else:
for i, item in enumerate(val):
sub_errs = _validate_proto_required_fields_internal(item)
_append_nested_errors(errors, f'{field.name}[{i}]', sub_errs)
return errors


def _validate_proto_required_fields_internal(
msg: ProtobufMessage,
) -> list[ValidationDetail]:
"""Internal validation that returns a list of error dictionaries."""
desc = msg.DESCRIPTOR
errors: list[ValidationDetail] = []

for field in desc.fields:
options = field.GetOptions()
if FieldBehavior.REQUIRED in options.Extensions[field_behavior]:
violation = _check_required_field_violation(msg, field)
if violation:
errors.append(violation)
errors.extend(_recurse_validation(msg, field))
return errors


def validate_proto_required_fields(msg: ProtobufMessage) -> None:
"""Validate that all fields marked as REQUIRED are present on the proto message.

Args:
msg: The Protobuf message to validate.

Raises:
InvalidParamsError: If a required field is missing or empty.
"""
errors = _validate_proto_required_fields_internal(msg)

if errors:
raise InvalidParamsError(
message='Validation failed', data={'errors': errors}
)
Loading
Loading