From 8bb16c9529371d7d01ebbc7a8ca3f1fe831f4881 Mon Sep 17 00:00:00 2001 From: "shentong.martin" Date: Fri, 22 May 2026 11:03:57 +0800 Subject: [PATCH] docs(eino): update agent cancel and turnloop quickstart translation --- .../agent_cancel_and_turnloop_quickstart.md | 280 ++++++++++++------ .../agent_cancel_and_turnloop_quickstart.md | 280 ++++++++++++------ 2 files changed, 368 insertions(+), 192 deletions(-) diff --git a/content/en/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md b/content/en/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md index 33b5e90164..be3c847ba2 100644 --- a/content/en/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md +++ b/content/en/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md @@ -1,6 +1,6 @@ --- Description: "" -date: "2026-05-17" +date: "2026-05-22" lastmod: "" tags: [] title: Agent Cancel and TurnLoop Quick Start @@ -14,9 +14,7 @@ A quick start guide for the two core features in Eino ADK: **Agent Cancel** and All examples in this document use the following generic instantiations: - `T = string` (the business item type pushed to TurnLoop) -- `M = *schema.Message` (the Agent message type, i.e., the standard `Message`) - -ADK type aliases: +- `M = *schema.Message` (the Agent message type, i.e., the standard `Message`) ADK type aliases: ```go type Agent = TypedAgent[*schema.Message] @@ -24,9 +22,7 @@ type AgentInput = TypedAgentInput[*schema.Message] type AgentEvent = TypedAgentEvent[*schema.Message] ``` -When using `*schema.AgenticMessage`, simply replace `M` with the corresponding type—all API signatures are completely symmetric. - ---- +## When using `*schema.AgenticMessage`, simply replace `M` with the corresponding type—all API signatures are completely symmetric. ## Part 1: Agent Cancel @@ -130,8 +126,6 @@ Build a continuously running agent service: users send messages at any time, the ### Turn Lifecycle - - ### Basic Usage ```go @@ -406,105 +400,199 @@ On normal exit (without saving a new checkpoint), TurnLoop will attempt to delet ## Part 4: Complete Example -Simulates a chat service supporting priority scheduling, preemption, and checkpoint recovery: +Simulates a chat service supporting priority scheduling, preemption, and checkpoint recovery. This example can be compiled and run directly (replace `myModel` with a real ChatModel implementation). ```go package main import ( - "context" - "log" - "strings" - "time" - - "github.com/cloudwego/eino/adk" - "github.com/cloudwego/eino/schema" + "context" + "fmt" + "log" + "sort" + "strings" + "sync" + "time" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/schema" ) -func main() { - ctx := context.Background() - store := adk.NewInMemoryStore() - - cfg := adk.TurnLoopConfig[string, *schema.Message]{ - GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) { - // Sort by priority, consume only the first item, keep the rest for subsequent turns - sorted := sortByPriority(items) - return &adk.GenInputResult[string, *schema.Message]{ - Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}}, - Consumed: sorted[:1], - Remaining: sorted[1:], // Items not in either will be discarded - }, nil - }, - - GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) { - all := append(append(interruptedItems, unhandledItems...), newItems...) - return &adk.GenResumeResult[string, *schema.Message]{ - Consumed: all[:1], - Remaining: all[1:], - }, nil - }, - - PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) { - return buildAgent(consumed), nil - }, - - OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error { - for { - event, ok := events.Next() - if !ok { - break - } - // Detect preemption/stop signals for cleanup - select { - case <-tc.Preempted: - log.Println("Preempted by higher priority message") - case <-tc.Stopped: - log.Printf("Service shutting down: %s", tc.StopCause()) - default: - } - if event.Err != nil { - // Don't propagate CancelError, framework handles it automatically - return event.Err - } - log.Printf("[%s] %s", event.AgentName, extractText(event)) - } - return nil - }, +// --- 1. Implement CheckPointStore interface --- - Store: store, - CheckpointID: "chat-session-001", - } +type InMemoryStore struct { + mu sync.Mutex + m map[string][]byte +} + +func NewInMemoryStore() *InMemoryStore { + return &InMemoryStore{m: make(map[string][]byte)} +} + +func (s *InMemoryStore) Get(_ context.Context, id string) ([]byte, bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + data, ok := s.m[id] + return data, ok, nil +} - loop := adk.NewTurnLoop(cfg) - loop.Push("Hello, help me check the weather") - loop.Run(ctx) - - // Send urgent message to preempt after 1 second - time.AfterFunc(1*time.Second, func() { - loop.Push("Stop! Handle this urgent issue first", - adk.WithPreempt[string, *schema.Message](adk.AnySafePoint), - ) - }) - - // Graceful shutdown after 5 seconds - time.AfterFunc(5*time.Second, func() { - loop.Stop( - adk.WithGracefulTimeout(3*time.Second), - adk.WithStopCause("service shutdown"), - ) - }) - - result := loop.Wait() - log.Printf("Exit reason: %v", result.ExitReason) - log.Printf("Unhandled messages: %v", result.UnhandledItems) - log.Printf("Stop cause: %s", result.StopCause) - log.Printf("checkpoint: attempted=%v, err=%v", result.CheckpointAttempted, result.CheckpointErr) - - // Next startup with the same cfg will automatically resume from checkpoint +func (s *InMemoryStore) Set(_ context.Context, id string, data []byte) error { + s.mu.Lock() + defer s.mu.Unlock() + s.m[id] = data + return nil +} + +// Optional: implement CheckPointDeleter to support automatic cleanup of expired checkpoints +func (s *InMemoryStore) Delete(_ context.Context, id string) error { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.m, id) + return nil +} + +// --- 2. Implement a minimal Agent (use adk.NewChatModelAgent in production) --- + +type echoAgent struct{} + +func (a *echoAgent) Name(_ context.Context) string { return "EchoAgent" } +func (a *echoAgent) Description(_ context.Context) string { return "echoes input" } + +func (a *echoAgent) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] { + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + // Simulate time-consuming processing + select { + case <-time.After(500 * time.Millisecond): + case <-ctx.Done(): + gen.Send(&adk.AgentEvent{Err: ctx.Err()}) + return + } + // Return echo result + reply := "Echo: " + if len(input.Messages) > 0 { + reply += input.Messages[0].Content + } + gen.Send(&adk.AgentEvent{ + AgentName: "EchoAgent", + Output: &adk.AgentOutput{ + MessageOutput: &adk.MessageVariant{ + Message: schema.AssistantMessage(reply, nil), + }, + }, + }) + }() + return iter +} + +// --- 3. Priority sorting helper function --- + +func sortByPriority(items []string) []string { + sorted := make([]string, len(items)) + copy(sorted, items) + sort.SliceStable(sorted, func(i, j int) bool { + // Items starting with "!" are treated as high priority + return strings.HasPrefix(sorted[i], "!") && !strings.HasPrefix(sorted[j], "!") + }) + return sorted +} + +// --- 4. Main flow --- + +func main() { + ctx := context.Background() + store := NewInMemoryStore() + agent := &echoAgent{} + + cfg := adk.TurnLoopConfig[string, *schema.Message]{ + GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) { + // Sort by priority, consume only the first item, keep the rest for subsequent turns + sorted := sortByPriority(items) + return &adk.GenInputResult[string, *schema.Message]{ + Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}}, + Consumed: sorted[:1], + Remaining: sorted[1:], + }, nil + }, + + GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) { + all := append(append(interruptedItems, unhandledItems...), newItems...) + return &adk.GenResumeResult[string, *schema.Message]{ + Consumed: all[:1], + Remaining: all[1:], + }, nil + }, + + PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) { + return agent, nil + }, + + OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error { + for { + event, ok := events.Next() + if !ok { + break + } + // Detect preemption/stop signals for cleanup + select { + case <-tc.Preempted: + log.Println("Preempted by higher priority message") + case <-tc.Stopped: + log.Printf("Service shutting down: %s", tc.StopCause()) + default: + } + if event.Err != nil { + // Don't propagate CancelError, framework handles it automatically + return event.Err + } + if event.Output != nil && event.Output.MessageOutput != nil { + fmt.Printf("[%s] %s\n", event.AgentName, event.Output.MessageOutput.Message.Content) + } + } + return nil + }, + + Store: store, + CheckpointID: "session-123", + } + + // First run + loop := adk.NewTurnLoop(cfg) + loop.Push("normal message") + loop.Push("low priority task") + loop.Run(ctx) + + // Simulate pushing an urgent message after a delay (triggers preemption) + time.AfterFunc(200*time.Millisecond, func() { + accepted, ack := loop.Push("!urgent message", + adk.WithPreempt[string, *schema.Message](adk.AnySafePoint), + ) + if accepted { + <-ack + log.Println("Preemption signal acknowledged") + } + }) + + // Graceful stop after 2 seconds + time.AfterFunc(2*time.Second, func() { + loop.Stop( + adk.WithGraceful(), + adk.WithStopCause("demo timeout"), + ) + }) + + result := loop.Wait() + fmt.Printf("Exit reason: %v\n", result.ExitReason) + fmt.Printf("Stop cause: %s\n", result.StopCause) + fmt.Printf("checkpoint: attempted=%v, err=%v\n", result.CheckpointAttempted, result.CheckpointErr) + + // Second run (same cfg, containing the same CheckpointID) will automatically resume from checkpoint } ``` ---- +> 💡 +> In production, replace `echoAgent` with `adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{...})`. The `CheckPointStore` implementation can use Redis / database or other persistence solutions. ## FAQ diff --git a/content/zh/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md b/content/zh/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md index 04a64e15da..62da3beaab 100644 --- a/content/zh/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md +++ b/content/zh/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md @@ -1,6 +1,6 @@ --- Description: "" -date: "2026-05-17" +date: "2026-05-22" lastmod: "" tags: [] title: Agent Cancel 与 TurnLoop 快速入门 @@ -14,9 +14,7 @@ Eino ADK 中 **Agent 取消** 和 **TurnLoop** 两项核心特性的快速入门 本文示例统一使用以下泛型实例化: - `T = string`(推送给 TurnLoop 的业务项类型) -- `M = *schema.Message`(Agent 消息类型,即标准 `Message`) - -ADK 中相关类型别名: +- `M = *schema.Message`(Agent 消息类型,即标准 `Message`)ADK 中相关类型别名: ```go type Agent = TypedAgent[*schema.Message] @@ -24,9 +22,7 @@ type AgentInput = TypedAgentInput[*schema.Message] type AgentEvent = TypedAgentEvent[*schema.Message] ``` -当需要使用 `*schema.AgenticMessage` 时,将 `M` 替换为对应类型即可,所有 API 签名完全对称。 - ---- +## 当需要使用 `*schema.AgenticMessage` 时,将 `M` 替换为对应类型即可,所有 API 签名完全对称。 ## 第一部分:Agent 取消 @@ -130,8 +126,6 @@ for { ### Turn 生命周期 - - ### 基本用法 ```go @@ -406,105 +400,199 @@ type CheckPointDeleter interface { ## 第四部分:完整示例 -模拟一个支持优先级调度、抢占和 checkpoint 恢复的聊天服务: +模拟一个支持优先级调度、抢占和 checkpoint 恢复的聊天服务。本示例可直接编译运行(需替换 `myModel` 为真实 ChatModel 实现)。 ```go package main import ( - "context" - "log" - "strings" - "time" - - "github.com/cloudwego/eino/adk" - "github.com/cloudwego/eino/schema" + "context" + "fmt" + "log" + "sort" + "strings" + "sync" + "time" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/schema" ) -func main() { - ctx := context.Background() - store := adk.NewInMemoryStore() - - cfg := adk.TurnLoopConfig[string, *schema.Message]{ - GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) { - // 按优先级排序后,只消费第一条,其余留给后续轮次 - sorted := sortByPriority(items) - return &adk.GenInputResult[string, *schema.Message]{ - Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}}, - Consumed: sorted[:1], - Remaining: sorted[1:], // 不在两者中的项目会被丢弃 - }, nil - }, - - GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) { - all := append(append(interruptedItems, unhandledItems...), newItems...) - return &adk.GenResumeResult[string, *schema.Message]{ - Consumed: all[:1], - Remaining: all[1:], - }, nil - }, - - PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) { - return buildAgent(consumed), nil - }, - - OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error { - for { - event, ok := events.Next() - if !ok { - break - } - // 感知抢占/停止信号,做收尾处理 - select { - case <-tc.Preempted: - log.Println("被更高优先级消息抢占") - case <-tc.Stopped: - log.Printf("服务关停: %s", tc.StopCause()) - default: - } - if event.Err != nil { - // 不传播 CancelError,框架自动处理 - return event.Err - } - log.Printf("[%s] %s", event.AgentName, extractText(event)) - } - return nil - }, +// --- 1. 实现 CheckPointStore 接口 --- - Store: store, - CheckpointID: "chat-session-001", - } +type InMemoryStore struct { + mu sync.Mutex + m map[string][]byte +} + +func NewInMemoryStore() *InMemoryStore { + return &InMemoryStore{m: make(map[string][]byte)} +} + +func (s *InMemoryStore) Get(_ context.Context, id string) ([]byte, bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + data, ok := s.m[id] + return data, ok, nil +} - loop := adk.NewTurnLoop(cfg) - loop.Push("你好,帮我查一下天气") - loop.Run(ctx) - - // 1 秒后发送紧急消息抢占 - time.AfterFunc(1*time.Second, func() { - loop.Push("停!先帮我处理这个紧急问题", - adk.WithPreempt[string, *schema.Message](adk.AnySafePoint), - ) - }) - - // 5 秒后优雅关停 - time.AfterFunc(5*time.Second, func() { - loop.Stop( - adk.WithGracefulTimeout(3*time.Second), - adk.WithStopCause("service shutdown"), - ) - }) - - result := loop.Wait() - log.Printf("退出原因: %v", result.ExitReason) - log.Printf("未处理消息: %v", result.UnhandledItems) - log.Printf("停止原因: %s", result.StopCause) - log.Printf("checkpoint: attempted=%v, err=%v", result.CheckpointAttempted, result.CheckpointErr) - - // 下次以相同 cfg 启动将自动从 checkpoint 恢复 +func (s *InMemoryStore) Set(_ context.Context, id string, data []byte) error { + s.mu.Lock() + defer s.mu.Unlock() + s.m[id] = data + return nil +} + +// 可选:实现 CheckPointDeleter 以支持自动清理过期 checkpoint +func (s *InMemoryStore) Delete(_ context.Context, id string) error { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.m, id) + return nil +} + +// --- 2. 实现一个最小 Agent(生产环境请使用 adk.NewChatModelAgent) --- + +type echoAgent struct{} + +func (a *echoAgent) Name(_ context.Context) string { return "EchoAgent" } +func (a *echoAgent) Description(_ context.Context) string { return "echoes input" } + +func (a *echoAgent) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] { + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + // 模拟耗时处理 + select { + case <-time.After(500 * time.Millisecond): + case <-ctx.Done(): + gen.Send(&adk.AgentEvent{Err: ctx.Err()}) + return + } + // 返回 echo 结果 + reply := "Echo: " + if len(input.Messages) > 0 { + reply += input.Messages[0].Content + } + gen.Send(&adk.AgentEvent{ + AgentName: "EchoAgent", + Output: &adk.AgentOutput{ + MessageOutput: &adk.MessageVariant{ + Message: schema.AssistantMessage(reply, nil), + }, + }, + }) + }() + return iter +} + +// --- 3. 优先级排序辅助函数 --- + +func sortByPriority(items []string) []string { + sorted := make([]string, len(items)) + copy(sorted, items) + sort.SliceStable(sorted, func(i, j int) bool { + // 以 "!" 开头的视为高优先级 + return strings.HasPrefix(sorted[i], "!") && !strings.HasPrefix(sorted[j], "!") + }) + return sorted +} + +// --- 4. 主流程 --- + +func main() { + ctx := context.Background() + store := NewInMemoryStore() + agent := &echoAgent{} + + cfg := adk.TurnLoopConfig[string, *schema.Message]{ + GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) { + // 按优先级排序后,只消费第一条,其余留给后续轮次 + sorted := sortByPriority(items) + return &adk.GenInputResult[string, *schema.Message]{ + Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}}, + Consumed: sorted[:1], + Remaining: sorted[1:], + }, nil + }, + + GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) { + all := append(append(interruptedItems, unhandledItems...), newItems...) + return &adk.GenResumeResult[string, *schema.Message]{ + Consumed: all[:1], + Remaining: all[1:], + }, nil + }, + + PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) { + return agent, nil + }, + + OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error { + for { + event, ok := events.Next() + if !ok { + break + } + // 感知抢占/停止信号,做收尾处理 + select { + case <-tc.Preempted: + log.Println("被更高优先级消息抢占") + case <-tc.Stopped: + log.Printf("服务关停: %s", tc.StopCause()) + default: + } + if event.Err != nil { + // 不传播 CancelError,框架自动处理 + return event.Err + } + if event.Output != nil && event.Output.MessageOutput != nil { + fmt.Printf("[%s] %s\n", event.AgentName, event.Output.MessageOutput.Message.Content) + } + } + return nil + }, + + Store: store, + CheckpointID: "session-123", + } + + // 第一次运行 + loop := adk.NewTurnLoop(cfg) + loop.Push("普通消息") + loop.Push("低优先级任务") + loop.Run(ctx) + + // 模拟延迟后推入紧急消息(触发抢占) + time.AfterFunc(200*time.Millisecond, func() { + accepted, ack := loop.Push("!紧急消息", + adk.WithPreempt[string, *schema.Message](adk.AnySafePoint), + ) + if accepted { + <-ack + log.Println("抢占信号已确认") + } + }) + + // 2 秒后优雅停止 + time.AfterFunc(2*time.Second, func() { + loop.Stop( + adk.WithGraceful(), + adk.WithStopCause("demo timeout"), + ) + }) + + result := loop.Wait() + fmt.Printf("退出原因: %v\n", result.ExitReason) + fmt.Printf("停止原因: %s\n", result.StopCause) + fmt.Printf("checkpoint: attempted=%v, err=%v\n", result.CheckpointAttempted, result.CheckpointErr) + + // 第二次运行(相同 cfg,包含相同 CheckpointID)将自动从 checkpoint 恢复 } ``` ---- +> 💡 +> 生产环境中,将 `echoAgent` 替换为 `adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{...})`。`CheckPointStore` 实现可使用 Redis / 数据库等持久化方案。 ## 常见问题