Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 113 additions & 87 deletions .github/workflows/orchestrate.yml

Large diffs are not rendered by default.

313 changes: 153 additions & 160 deletions agentic_ai/agents/agent_framework/multi_agent/HANDOFF_README.md

Large diffs are not rendered by default.

1,008 changes: 404 additions & 604 deletions agentic_ai/agents/agent_framework/multi_agent/handoff_multi_domain_agent.py

Large diffs are not rendered by default.

119 changes: 78 additions & 41 deletions agentic_ai/agents/agent_framework/multi_agent/magentic_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
MagenticPlanReviewRequest,
MagenticPlanReviewResponse,
)
from agent_framework.azure import AzureOpenAIChatClient # type: ignore[import]
from agent_framework.openai import OpenAIChatClient

from agents.base_agent import BaseAgent, ToolCallTrackingMixin
from agents.agent_framework.utils import create_filtered_tool_list
Expand All @@ -42,11 +42,11 @@ def __init__(self, backing_store: Dict[str, Any]) -> None:
self._async_lock = asyncio.Lock()
self._sync_lock = ThreadLock()

async def save_checkpoint(self, checkpoint: WorkflowCheckpoint) -> str:
async def save(self, checkpoint: WorkflowCheckpoint) -> str:
async with self._async_lock:
self._checkpoints[checkpoint.checkpoint_id] = checkpoint.to_dict()
self._backing["latest_checkpoint"] = checkpoint.checkpoint_id
self._backing["workflow_id"] = checkpoint.workflow_id
self._backing["workflow_name"] = checkpoint.workflow_name

if len(self._checkpoints) > self._RETENTION:
sorted_ids = sorted(
Expand All @@ -57,34 +57,39 @@ async def save_checkpoint(self, checkpoint: WorkflowCheckpoint) -> str:
self._checkpoints.pop(checkpoint_id, None)
return checkpoint.checkpoint_id

async def load_checkpoint(self, checkpoint_id: str) -> WorkflowCheckpoint | None:
async def load(self, checkpoint_id: str) -> WorkflowCheckpoint | None:
async with self._async_lock:
data = self._checkpoints.get(checkpoint_id)
if not data:
return None
return WorkflowCheckpoint.from_dict(data)

async def list_checkpoint_ids(self, workflow_id: str | None = None) -> List[str]:
async def list_checkpoint_ids(self, *, workflow_name: str) -> List[str]:
async with self._async_lock:
if workflow_id is None:
return list(self._checkpoints.keys())
return [cid for cid, data in self._checkpoints.items() if data.get("workflow_id") == workflow_id]
return [cid for cid, data in self._checkpoints.items() if data.get("workflow_name") == workflow_name]

async def list_checkpoints(self, workflow_id: str | None = None) -> List[WorkflowCheckpoint]:
async def list_checkpoints(self, *, workflow_name: str) -> List[WorkflowCheckpoint]:
async with self._async_lock:
if workflow_id is None:
ids = list(self._checkpoints.keys())
else:
ids = [cid for cid, data in self._checkpoints.items() if data.get("workflow_id") == workflow_id]
ids = [cid for cid, data in self._checkpoints.items() if data.get("workflow_name") == workflow_name]
return [WorkflowCheckpoint.from_dict(self._checkpoints[cid]) for cid in ids]

async def delete_checkpoint(self, checkpoint_id: str) -> bool:
async def delete(self, checkpoint_id: str) -> bool:
async with self._async_lock:
removed = self._checkpoints.pop(checkpoint_id, None)
if removed and self._backing.get("latest_checkpoint") == checkpoint_id:
self._backing.pop("latest_checkpoint", None)
return removed is not None

async def get_latest(self, *, workflow_name: str) -> WorkflowCheckpoint | None:
async with self._async_lock:
latest_id = self._backing.get("latest_checkpoint")
if not latest_id:
return None
data = self._checkpoints.get(latest_id)
if not data or data.get("workflow_name") != workflow_name:
return None
return WorkflowCheckpoint.from_dict(data)

@property
def latest_checkpoint_id(self) -> str | None:
with self._sync_lock:
Expand Down Expand Up @@ -214,8 +219,8 @@ def __init__(
logger.warning(
"[AgentFramework-Magentic] Ignoring checkpoint storage override because it does not implement CheckpointStorage."
)
self._participant_client: Optional[AzureOpenAIChatClient] = None
self._manager_client: Optional[AzureOpenAIChatClient] = None
self._participant_client: Optional[OpenAIChatClient] = None
self._manager_client: Optional[OpenAIChatClient] = None
self._workflow_event_logging_enabled = bool(self._config.get("log_workflow_events", False))
self._enable_plan_review = bool(self._config.get("enable_plan_review", False))
self._manager_instructions = self._config.get(
Expand Down Expand Up @@ -353,32 +358,32 @@ async def _maybe_create_tools(self, headers: Dict[str, str]) -> List[MCPStreamab

return None

def _get_participant_client(self) -> AzureOpenAIChatClient:
def _get_participant_client(self) -> OpenAIChatClient:
if self._participant_client is None:
self._participant_client = self._build_chat_client()
return self._participant_client

def _get_manager_client(self) -> AzureOpenAIChatClient:
def _get_manager_client(self) -> OpenAIChatClient:
if self._manager_client is None:
self._manager_client = self._build_chat_client()
return self._manager_client

def _build_chat_client(self) -> AzureOpenAIChatClient:
def _build_chat_client(self) -> OpenAIChatClient:
# Use API key if available, otherwise use credential-based authentication
if self.azure_openai_key:
logger.info("[AgentFramework-Magentic] Using API key authentication for Azure OpenAI")
return AzureOpenAIChatClient(
return OpenAIChatClient(
api_key=self.azure_openai_key,
deployment_name=self.azure_deployment,
endpoint=self.azure_openai_endpoint,
model=self.azure_deployment,
azure_endpoint=self.azure_openai_endpoint,
api_version=self.api_version,
)
elif self.azure_credential:
logger.info("[AgentFramework-Magentic] Using managed identity authentication for Azure OpenAI")
return AzureOpenAIChatClient(
return OpenAIChatClient(
credential=self.azure_credential,
deployment_name=self.azure_deployment,
endpoint=self.azure_openai_endpoint,
model=self.azure_deployment,
azure_endpoint=self.azure_openai_endpoint,
api_version=self.api_version,
)
else:
Expand Down Expand Up @@ -436,8 +441,8 @@ async def _resume_previous_run(

async def _build_workflow(
self,
participant_client: AzureOpenAIChatClient,
manager_client: AzureOpenAIChatClient,
participant_client: OpenAIChatClient,
manager_client: OpenAIChatClient,
tools: List[MCPStreamableHTTPTool] | None,
checkpoint_storage: CheckpointStorage,
) -> Any:
Expand Down Expand Up @@ -471,7 +476,7 @@ async def _build_workflow(

async def _create_participants(
self,
participant_client: AzureOpenAIChatClient,
participant_client: OpenAIChatClient,
tools: Iterable[MCPStreamableHTTPTool] | None,
) -> Dict[str, FrameworkAgent]:
# Get base MCP tool (connect once, filter per agent)
Expand Down Expand Up @@ -695,7 +700,7 @@ async def _run_workflow(
async def _process_workflow_event(self, event: WorkflowEvent) -> None:
"""Process workflow events and stream to WebSocket clients.

In 1.0.0rc1, all events are WorkflowEvent with a .type field:
In agent-framework 1.2.x, all events are WorkflowEvent with a .type field:
- 'magentic_orchestrator': plan, replan, progress ledger updates
- 'request_info': plan review requests
- 'data': streaming tokens from participant agents
Expand Down Expand Up @@ -913,8 +918,8 @@ def _coerce_checkpoint_storage(self, candidate: Any) -> Optional[CheckpointStora
return None

required_methods = [
"save_checkpoint",
"load_checkpoint",
"save",
"load",
]

for method_name in required_methods:
Expand Down Expand Up @@ -1070,14 +1075,16 @@ async def _purge_checkpoint_storage(self, storage: CheckpointStorage) -> None:
except Exception as exc:
logger.debug("clear_all failed: %s", exc)

# Fallback: list and delete individually
# Fallback: list and delete individually using the 1.2.x CheckpointStorage protocol.
# ``list_checkpoint_ids`` now requires a keyword-only ``workflow_name``.
list_fn = getattr(storage, "list_checkpoint_ids", None)
delete_fn = getattr(storage, "delete_checkpoint", None)
delete_fn = getattr(storage, "delete", None)
if not (callable(list_fn) and callable(delete_fn)):
return

try:
checkpoint_ids = await self._call_maybe_async(list_fn)
workflow_name = self._workflow_name_for_storage(storage)
checkpoint_ids = await self._call_maybe_async(list_fn, workflow_name=workflow_name) if workflow_name else []
if checkpoint_ids:
for checkpoint_id in checkpoint_ids:
try:
Expand All @@ -1087,9 +1094,22 @@ async def _purge_checkpoint_storage(self, storage: CheckpointStorage) -> None:
except Exception as exc:
logger.debug("Unable to enumerate checkpoints: %s", exc)

@staticmethod
def _workflow_name_for_storage(storage: CheckpointStorage) -> str | None:
"""Best-effort lookup of the active workflow name for a storage instance.

The DictCheckpointStorage shipped with this module records the workflow
name on every ``save()``; for other storages we cannot infer it.
"""
backing = getattr(storage, "_backing", None)
if isinstance(backing, dict):
return backing.get("workflow_name")
return None

async def _get_latest_checkpoint_id(self, storage: CheckpointStorage) -> Optional[str]:
"""Get the most recent checkpoint ID from storage."""
# Try latest_checkpoint_id property/method first
# Try latest_checkpoint_id property/method first (nonstandard convenience
# exposed by the in-process DictCheckpointStorage in this module).
latest_id_attr = getattr(storage, "latest_checkpoint_id", None)
if callable(latest_id_attr):
try:
Expand All @@ -1101,11 +1121,28 @@ async def _get_latest_checkpoint_id(self, storage: CheckpointStorage) -> Optiona
elif isinstance(latest_id_attr, str):
return latest_id_attr

# Try list_checkpoints and get latest
# Best-effort: the 1.2.x ``CheckpointStorage`` protocol requires a
# keyword-only ``workflow_name`` on ``get_latest`` / ``list_checkpoints``
# / ``list_checkpoint_ids``. Without one we cannot call those methods.
workflow_name = self._workflow_name_for_storage(storage)

# Try the 1.2.x ``get_latest`` shortcut.
get_latest_fn = getattr(storage, "get_latest", None)
if callable(get_latest_fn) and workflow_name:
try:
latest = await self._call_maybe_async(get_latest_fn, workflow_name=workflow_name)
if latest is not None:
checkpoint_id = getattr(latest, "checkpoint_id", None)
if isinstance(checkpoint_id, str):
return checkpoint_id
except Exception:
pass

# Try list_checkpoints and pick the most recent entry.
list_checkpoints_fn = getattr(storage, "list_checkpoints", None)
if callable(list_checkpoints_fn):
if callable(list_checkpoints_fn) and workflow_name:
try:
checkpoints = await self._call_maybe_async(list_checkpoints_fn)
checkpoints = await self._call_maybe_async(list_checkpoints_fn, workflow_name=workflow_name)
if checkpoints:
latest = max(checkpoints, key=lambda cp: (
getattr(cp, "timestamp", ""),
Expand All @@ -1115,11 +1152,11 @@ async def _get_latest_checkpoint_id(self, storage: CheckpointStorage) -> Optiona
except Exception:
pass

# Fallback: list checkpoint IDs and return last
# Fallback: list checkpoint IDs and return last.
list_ids_fn = getattr(storage, "list_checkpoint_ids", None)
if callable(list_ids_fn):
if callable(list_ids_fn) and workflow_name:
try:
checkpoint_ids = await self._call_maybe_async(list_ids_fn)
checkpoint_ids = await self._call_maybe_async(list_ids_fn, workflow_name=workflow_name)
if checkpoint_ids:
return checkpoint_ids[-1]
except Exception:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from typing import Any, Dict, List

from agent_framework import Agent as FrameworkAgent, AgentSession, ChatOptions, MCPStreamableHTTPTool
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.openai import OpenAIChatClient

from agents.base_agent import BaseAgent, ToolCallTrackingMixin

Expand Down Expand Up @@ -88,16 +88,16 @@ async def _setup_agents(self) -> None:

# Create chat client
client_kwargs = {
"deployment_name": self.azure_deployment,
"endpoint": self.azure_openai_endpoint,
"model": self.azure_deployment,
"azure_endpoint": self.azure_openai_endpoint,
"api_version": self.api_version,
}
if self.azure_openai_key:
client_kwargs["api_key"] = self.azure_openai_key
else:
client_kwargs["credential"] = self.azure_credential

chat_client = AzureOpenAIChatClient(**client_kwargs)
chat_client = OpenAIChatClient(**client_kwargs)

# Create MCP tools
tools = await self._create_mcp_tools()
Expand Down
14 changes: 7 additions & 7 deletions agentic_ai/agents/agent_framework/single_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any, Dict, List

from agent_framework import Agent as FrameworkAgent, AgentSession, ChatOptions, MCPStreamableHTTPTool
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.openai import OpenAIChatClient

from agents.base_agent import BaseAgent, ToolCallTrackingMixin

Expand Down Expand Up @@ -55,18 +55,18 @@ async def _setup_single_agent(self) -> None:

# Use API key if available, otherwise use credential-based authentication
if has_api_key:
chat_client = AzureOpenAIChatClient(
chat_client = OpenAIChatClient(
api_key=self.azure_openai_key,
deployment_name=self.azure_deployment,
endpoint=self.azure_openai_endpoint,
model=self.azure_deployment,
azure_endpoint=self.azure_openai_endpoint,
api_version=self.api_version,
)
logger.info("[AgentFramework] Using API key authentication for Azure OpenAI")
else:
chat_client = AzureOpenAIChatClient(
chat_client = OpenAIChatClient(
credential=self.azure_credential,
deployment_name=self.azure_deployment,
endpoint=self.azure_openai_endpoint,
model=self.azure_deployment,
azure_endpoint=self.azure_openai_endpoint,
api_version=self.api_version,
)
logger.info("[AgentFramework] Using managed identity authentication for Azure OpenAI")
Expand Down
3 changes: 1 addition & 2 deletions agentic_ai/agents/mcp_agent_demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,7 @@ uv run python workflow_group_chat.py

| Package | Purpose |
|---------|---------|
| [agent-framework-core](https://github.com/microsoft/agent-framework) | Microsoft Agent Framework — agents, tools, MCP client |
| [agent-framework-orchestrations](https://github.com/microsoft/agent-framework) | GroupChatBuilder for multi-agent workflows |
| [agent-framework](https://github.com/microsoft/agent-framework) | Microsoft Agent Framework 1.2.1 — unified umbrella package (agents, tools, MCP client, orchestrations including the native `HandoffBuilder`, `GroupChatBuilder`, etc.) |
| [fastmcp](https://github.com/jlowin/fastmcp) | PrefectHQ FastMCP v3 — stateful MCP server with session support |
| [langgraph](https://github.com/langchain-ai/langgraph) | Stateful agent graphs with MemorySaver |
| [langchain-openai](https://github.com/langchain-ai/langchain) | Azure OpenAI integration for LangGraph |
Expand Down
8 changes: 4 additions & 4 deletions agentic_ai/agents/mcp_agent_demo/mcp_client_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
load_dotenv(env_path)

from agent_framework import Agent, MCPStreamableHTTPTool
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
Expand All @@ -49,10 +49,10 @@ async def main() -> None:
print(f"✅ Connected! Tools available from MCP server: {[t.name for t in mcp_tool.functions]}")

# 2) Create a local Coordinator agent that wraps the MCP tool
client = AzureOpenAIChatClient(
client = OpenAIChatClient(
api_key=api_key,
endpoint=endpoint,
deployment_name=deployment,
azure_endpoint=endpoint,
model=deployment,
api_version=api_version,
)

Expand Down
8 changes: 4 additions & 4 deletions agentic_ai/agents/mcp_agent_demo/mcp_client_hybrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
load_dotenv(env_path)

from agent_framework import Agent, MCPStreamableHTTPTool
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.openai import OpenAIChatClient

# ── The same raw EDR alert used in the typed-contracts demo ─────────────────

Expand Down Expand Up @@ -118,10 +118,10 @@ async def main() -> None:
print(f" ✅ Connected — {len(tool_names)} tools: {', '.join(tool_names)}")
print()

client = AzureOpenAIChatClient(
client = OpenAIChatClient(
api_key=api_key,
endpoint=endpoint,
deployment_name=deployment,
azure_endpoint=endpoint,
model=deployment,
api_version=api_version,
)

Expand Down
8 changes: 4 additions & 4 deletions agentic_ai/agents/mcp_agent_demo/mcp_client_stateful.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
load_dotenv(env_path)

from agent_framework import Agent, MCPStreamableHTTPTool
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
Expand Down Expand Up @@ -61,10 +61,10 @@ async def main() -> None:
print(f"✅ Connected! Tools available: {tool_names}")

# ── Create the local coordinator agent ──────────────────────────
client = AzureOpenAIChatClient(
client = OpenAIChatClient(
api_key=api_key,
endpoint=endpoint,
deployment_name=deployment,
azure_endpoint=endpoint,
model=deployment,
api_version=api_version,
)

Expand Down
Loading
Loading