Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/itk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ jobs:
run: bash run_itk.sh
working-directory: itk
env:
A2A_SAMPLES_REVISION: itk-v.021-alpha
A2A_ITK_REVISION: main
38 changes: 38 additions & 0 deletions .github/workflows/nightly.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: Nightly ITK

on:
schedule:
- cron: '0 2 * * *' # 2:00 AM UTC daily
workflow_dispatch: # Allow manual execution

permissions:
contents: write

jobs:
nightly:
name: Nightly ITK Run
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v6

- name: Install uv
uses: astral-sh/setup-uv@v7

- name: Run Nightly ITK Tests
run: bash run_itk.sh
working-directory: itk
env:
A2A_ITK_REVISION: main
ITK_NIGHTLY_RUN: "True"

- name: Upload Results to Rolling Release
uses: softprops/action-gh-release@v2
with:
tag_name: "nightly-metrics"
prerelease: true
files: |
itk/itk_python.json
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Comment thread
kdziedzic70 marked this conversation as resolved.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ docker-compose.yaml
docs/ai/ai_learnings.md

# ITK Integration Test Artifacts
itk/a2a-samples/
itk/a2a-itk/
itk/pyproto/
itk/instruction.proto
itk/logs/
6 changes: 3 additions & 3 deletions itk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ podman system migrate

### 1. Set Environment Variable

You must set the `A2A_SAMPLES_REVISION` environment variable to specify which revision of the `a2a-samples` repository to use for testing. This can be a branch name, tag, or commit hash.
You must set the `A2A_ITK_REVISION` environment variable to specify which revision of the `a2a-itk` repository to use for testing. This can be a branch name, tag, or commit hash.

Example:
```
export A2A_SAMPLES_REVISION=itk-v.021-alpha
export A2A_ITK_REVISION=main
```

### 2. Execute Tests
Expand All @@ -48,7 +48,7 @@ Run the test script from this directory:
```

The script will:
- Clone `a2a-samples` (if not already present).
- Clone `a2a-itk` (if not already present).
- Checkout the specified revision.
- Build the ITK service Docker image.
- Run the tests and output results.
Expand Down
58 changes: 47 additions & 11 deletions itk/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from a2a.client.errors import A2AClientError
from a2a.compat.v0_3 import a2a_v0_3_pb2_grpc
from a2a.compat.v0_3.grpc_handler import CompatGrpcHandler
from a2a.compat.v0_3.types import Role as LegacyRole
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.routes import (
Expand All @@ -34,7 +35,7 @@
InMemoryPushNotificationConfigStore,
)
from a2a.server.tasks.inmemory_task_store import InMemoryTaskStore
from a2a.types import a2a_pb2_grpc
from a2a.types import Role, a2a_pb2_grpc
from a2a.types.a2a_pb2 import (
AgentCapabilities,
AgentCard,
Expand Down Expand Up @@ -100,9 +101,22 @@ def extract_instruction(
continue
else:
return inst

return None


def _get_text_from_part(part: Any) -> str | None:
"""Safely extracts text string from a Part object supporting protobuf, pydantic, and raw dict."""
if not part:
return None
if hasattr(part, 'HasField') and part.HasField('text'):
return part.text
root = getattr(part, 'root', part)
if isinstance(root, dict):
return root.get('text')
return getattr(root, 'text', None)


def _extract_text_from_event(event: Any) -> list[str]:
"""Extracts text parts from an event's message."""
if isinstance(event, tuple):
Expand All @@ -128,7 +142,7 @@ def _extract_text_from_event(event: Any) -> list[str]:
return results


async def _handle_call_agent_with_resubscribe(
async def _handle_call_agent_with_resubscribe( # noqa: PLR0912, PLR0915
client: Client, request: SendMessageRequest
) -> list[str]:
"""Handles the send-disconnect-resubscribe flow."""
Expand All @@ -154,9 +168,32 @@ async def _handle_call_agent_with_resubscribe(
finished = False
async for event in resub_agen:
logger.info('Event after re-subscribe: %s', event)
if hasattr(event, 'HasField') and event.HasField('task'):
if isinstance(event, Task):
task_obj = event
elif hasattr(event, 'HasField') and event.HasField('task'):
task_obj = event.task

if task_obj and hasattr(task_obj, 'history'):
for msg in task_obj.history:
if msg.role in (
Role.ROLE_AGENT,
LegacyRole.agent,
'ROLE_AGENT',
):
for part in msg.parts:
text = _get_text_from_part(part)
if text and 'task-finished' in text:
logger.info(
'Found task-finished in history, breaking loop!'
)
results.append(text.replace('task-finished', ''))
finished = True
break
if finished:
break
if finished:
break

extracted_text = _extract_text_from_event(event)
for text in extracted_text:
processed_text = text.replace('task-finished', '')
Expand All @@ -171,14 +208,13 @@ async def _handle_call_agent_with_resubscribe(
if not results and task_obj and hasattr(task_obj, 'history'):
logger.info('Results empty after loop, reading from history.')
for msg in task_obj.history:
# Check stringified role to support protobuf enums (2 for ROLE_AGENT in v0.3 and v1.0)
# as well as string descriptors from dict/JSON forms.
if str(msg.role) in {'2', 'ROLE_AGENT', 'agent'}:
results.extend(
part.text.replace('task-finished', '')
for part in msg.parts
if part.text
)
# Check role using SDK schemas for v1.0 (protobuf enum Role.ROLE_AGENT)
# and v0.3 (pydantic enum LegacyRole.agent), as well as string forms.
if msg.role in (Role.ROLE_AGENT, LegacyRole.agent, 'ROLE_AGENT'):
for part in msg.parts:
text = _get_text_from_part(part)
if text:
results.append(text.replace('task-finished', ''))

if not finished:
logger.info('Canceling task %s after retrieval.', task_id)
Expand Down
127 changes: 35 additions & 92 deletions itk/run_itk.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ cleanup() {
docker stop itk-service > /dev/null 2>&1 || true
docker rm itk-service > /dev/null 2>&1 || true
docker rmi itk_service > /dev/null 2>&1 || true
rm -rf a2a-samples > /dev/null 2>&1 || true
rm -rf a2a-itk > /dev/null 2>&1 || true
rm -rf pyproto > /dev/null 2>&1 || true
rm -f instruction.proto > /dev/null 2>&1 || true
echo "Done. Final exit code: $RESULT"
Expand All @@ -23,24 +23,24 @@ cleanup() {
# Register cleanup function to run on script exit
trap cleanup EXIT

# 1. Pull a2a-samples and checkout revision
: "${A2A_SAMPLES_REVISION:?A2A_SAMPLES_REVISION environment variable must be set}"
# 1. Pull a2a-itk and checkout revision
: "${A2A_ITK_REVISION:?A2A_ITK_REVISION environment variable must be set}"

if [ ! -d "a2a-samples" ]; then
git clone https://github.com/a2aproject/a2a-samples.git a2a-samples
if [ ! -d "a2a-itk" ]; then
git clone https://github.com/a2aproject/a2a-itk.git a2a-itk
fi
cd a2a-samples
cd a2a-itk
git fetch origin
git checkout "$A2A_SAMPLES_REVISION"
git checkout "$A2A_ITK_REVISION"

# Only pull if it's a branch (not a detached HEAD)
if git symbolic-ref -q HEAD > /dev/null; then
git pull origin "$A2A_SAMPLES_REVISION"
git pull origin "$A2A_ITK_REVISION"
fi
cd ..

# 2. Copy instruction.proto from a2a-samples
cp a2a-samples/itk/protos/instruction.proto ./instruction.proto
# 2. Copy instruction.proto from a2a-itk
cp a2a-itk/protos/instruction.proto ./instruction.proto

# 3. Build pyproto library
mkdir -p pyproto
Expand All @@ -54,9 +54,9 @@ uv run --with grpcio-tools python -m grpc_tools.protoc \
# Fix imports in generated file
sed -i 's/^import instruction_pb2 as instruction__pb2/from . import instruction_pb2 as instruction__pb2/' pyproto/instruction_pb2_grpc.py

# 4. Build jit itk_service docker image from root of a2a-samples/itk
# We run docker build from the itk directory inside a2a-samples
docker build -t itk_service a2a-samples/itk
# 4. Build jit itk_service docker image from root of a2a-itk
# We run docker build from the root directory of a2a-itk
docker build -t itk_service a2a-itk

# 5. Start docker service
# Mounting a2a-python as repo and itk as current agent
Expand Down Expand Up @@ -109,86 +109,28 @@ if ! curl -s http://127.0.0.1:8000/ > /dev/null; then
exit 1
fi

echo "ITK Service is up! Sending compatibility test request..."
SCENARIO_FILE="scenarios.json"
if [ "${ITK_NIGHTLY_RUN^^}" = "TRUE" ]; then
SCENARIO_FILE="scenarios_full.json"
fi

echo "ITK Service is up! Sending compatibility test request using $SCENARIO_FILE..."
RESPONSE=$(curl -s -X POST http://127.0.0.1:8000/run \
-H "Content-Type: application/json" \
-d '{
"tests": [
{
"name": "Star Topology (Full) - JSONRPC & GRPC",
"sdks": ["current", "python_v10", "python_v03", "go_v10", "go_v03"],
"traversal": "euler",
"edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"],
"protocols": ["jsonrpc", "grpc"],
"behavior": "send_message"
},
{
"name": "Star Topology (No Go v03) - HTTP_JSON",
"sdks": ["current", "python_v10", "python_v03", "go_v10"],
"traversal": "euler",
"edges": ["0->1", "0->2", "0->3", "1->0", "2->0", "3->0"],
"protocols": ["http_json"],
"behavior": "send_message"
},
{
"name": "Star Topology (Full) - JSONRPC & GRPC (Streaming)",
"sdks": ["current", "python_v10", "python_v03", "go_v10", "go_v03"],
"traversal": "euler",
"edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"],
"protocols": ["jsonrpc", "grpc"],
"streaming": true,
"behavior": "send_message"
},
{
"name": "Star Topology (No Go v03) - HTTP_JSON (Streaming)",
"sdks": ["current", "python_v10", "python_v03", "go_v10"],
"traversal": "euler",
"edges": ["0->1", "0->2", "0->3", "1->0", "2->0", "3->0"],
"protocols": ["http_json"],
"streaming": true,
"behavior": "send_message"
},
{
"name": "Push Notification Test - JSONRPC & GRPC",
"sdks": ["current", "python_v10", "python_v03", "go_v03"],
"traversal": "euler",
"edges": ["0->1", "0->2", "0->3", "1->0", "2->0", "3->0"],
"protocols": ["jsonrpc", "grpc"],
"behavior": "push_notification"
},
{
"name": "Push Notification Test - HTTP_JSON",
"sdks": ["current", "python_v10", "python_v03"],
"traversal": "euler",
"edges": ["0->1", "0->2", "1->0", "2->0"],
"protocols": ["http_json"],
"behavior": "push_notification"
},
{
"name": "Resubscribe Test - JSONRPC",
"sdks": ["current", "python_v10", "python_v03", "go_v10", "go_v03"],
"traversal": "euler",
"edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"],
"protocols": ["jsonrpc"],
"streaming": true,
"behavior": "resubscribe"
},
{
"name": "Resubscribe Test - Python & Go Non-JSONRPC Protocols",
"sdks": ["current", "python_v10", "python_v03", "go_v10"],
"traversal": "euler",
"edges": ["0->1", "0->2", "0->3", "1->0", "2->0", "3->0"],
"protocols": ["grpc", "http_json"],
"streaming": true,
"behavior": "resubscribe"
}
]
}')

echo "--------------------------------------------------------"
echo "ITK TEST RESULTS:"
echo "--------------------------------------------------------"
echo "$RESPONSE" | python3 -c "
-d "@$SCENARIO_FILE")

if [ "${ITK_NIGHTLY_RUN^^}" = "TRUE" ]; then
echo "Nightly run detected. Saving raw results and running process_results.py..."
echo "$RESPONSE" > raw_results.json
python3 a2a-itk/scripts/process_results.py \
--history_output_file itk_python.json \
--history_url https://github.com/a2aproject/a2a-python/releases/download/nightly-metrics/itk_python.json
RESULT=$?
else
echo "--------------------------------------------------------"
echo "ITK TEST RESULTS:"
echo "--------------------------------------------------------"
echo "$RESPONSE" | python3 -c "
import sys, json
try:
data = json.load(sys.stdin)
Expand All @@ -206,7 +148,8 @@ except Exception as e:
print(f'Raw response: {data if \"data\" in locals() else \"no data\"}')
sys.exit(1)
"
RESULT=$?
RESULT=$?
fi
set -e

if [ $RESULT -ne 0 ]; then
Expand Down
Loading
Loading