-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathcore.py
More file actions
99 lines (84 loc) · 3.27 KB
/
core.py
File metadata and controls
99 lines (84 loc) · 3.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""Compatibility re-exports for historical imports.
The project now keeps implementation in dedicated modules mirroring the
agent-client-protocol Rust structure, but external callers may still import
from ``acp.core``. Keep the surface API stable by forwarding to the new homes.
"""
from __future__ import annotations
from typing import Any
from .agent.connection import AgentSideConnection
from .client.connection import ClientSideConnection
from .connection import Connection, JsonValue, MethodHandler
from .exceptions import ErrorCode, RequestError
from .interfaces import Agent, Client
# Public surface of this compatibility module. Every name listed here is either
# imported at the top of this file or defined further down in it, so callers
# that still import from this module keep working.
__all__ = [
    "DEFAULT_STDIO_BUFFER_LIMIT_BYTES",
    "Agent",
    "AgentSideConnection",
    "Client",
    "ClientSideConnection",
    "Connection",
    "ErrorCode",
    "JsonValue",
    "MethodHandler",
    "RequestError",
    "connect_to_agent",
    "run_agent",
]
# Stream buffer limit, in bytes, used for agent/client data transfer over
# stdio: 50 MiB. The default that stdio_streams would otherwise apply is only
# 64KB, which is too small for multimodal use-cases.
DEFAULT_STDIO_BUFFER_LIMIT_BYTES = 50 * 1024**2
async def run_agent(
    agent: Agent,
    input_stream: Any = None,
    output_stream: Any = None,
    *,
    use_unstable_protocol: bool = False,
    stdio_buffer_limit_bytes: int = DEFAULT_STDIO_BUFFER_LIMIT_BYTES,
    **connection_kwargs: Any,
) -> None:
    """Run an ACP agent over the given input/output streams.

    This is a convenience function that creates an :class:`AgentSideConnection`
    and then awaits its ``listen()`` coroutine to process incoming messages.

    Args:
        agent: The agent implementation to run.
        input_stream: The (client) input stream to write to. Omit it together
            with ``output_stream`` to use the process's standard I/O streams.
        output_stream: The (client) output stream to read from. Omit it
            together with ``input_stream`` to use the process's standard I/O
            streams.
        use_unstable_protocol: Whether to enable unstable protocol features.
        stdio_buffer_limit_bytes: Buffer limit, in bytes, passed as ``limit``
            to ``stdio_streams``. Only used when both streams are omitted;
            ignored when the caller supplies its own streams.
        **connection_kwargs: Additional keyword arguments to pass to the
            :class:`AgentSideConnection` constructor.

    Raises:
        TypeError: If exactly one of ``input_stream`` / ``output_stream`` is
            provided. The streams must be given as a pair or both omitted.
    """
    from .stdio import stdio_streams

    # The stdio fallback replaces both streams at once, so a half-specified
    # pair cannot be completed; fail here with a clear message instead of
    # handing ``None`` to the connection.
    if (input_stream is None) != (output_stream is None):
        raise TypeError(
            "run_agent() requires input_stream and output_stream to be provided "
            "together, or both omitted to use stdio"
        )

    if input_stream is None:
        # stdio_streams yields the stream to read from first and the stream to
        # write to second, hence the swapped assignment order.
        output_stream, input_stream = await stdio_streams(limit=stdio_buffer_limit_bytes)

    # Built with listening=False and then listened on explicitly, so this
    # coroutine itself awaits the listen loop.
    # NOTE(review): presumably listening=True would start a listener inside
    # the constructor -- confirm against AgentSideConnection.
    conn = AgentSideConnection(
        agent,
        input_stream,
        output_stream,
        listening=False,
        use_unstable_protocol=use_unstable_protocol,
        **connection_kwargs,
    )
    await conn.listen()
def connect_to_agent(
    client: Client,
    input_stream: Any,
    output_stream: Any,
    *,
    use_unstable_protocol: bool = False,
    **connection_kwargs: Any,
) -> ClientSideConnection:
    """Build a :class:`ClientSideConnection` to an ACP agent.

    Thin convenience wrapper: all arguments are forwarded unchanged to the
    :class:`ClientSideConnection` constructor.

    Args:
        client: The client implementation to use.
        input_stream: The (agent) input stream to write to. Required; this
            function supplies no default.
        output_stream: The (agent) output stream to read from. Required; this
            function supplies no default.
        use_unstable_protocol: Whether to enable unstable protocol features.
        **connection_kwargs: Additional keyword arguments to pass to the
            :class:`ClientSideConnection` constructor.

    Returns:
        The newly constructed :class:`ClientSideConnection`.
    """
    connection = ClientSideConnection(
        client,
        input_stream,
        output_stream,
        use_unstable_protocol=use_unstable_protocol,
        **connection_kwargs,
    )
    return connection