diff --git a/content/en/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md b/content/en/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md
index 33b5e90164..be3c847ba2 100644
--- a/content/en/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md
+++ b/content/en/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md
@@ -1,6 +1,6 @@
---
Description: ""
-date: "2026-05-17"
+date: "2026-05-22"
lastmod: ""
tags: []
title: Agent Cancel and TurnLoop Quick Start
@@ -14,9 +14,7 @@ A quick start guide for the two core features in Eino ADK: **Agent Cancel** and
All examples in this document use the following generic instantiations:
- `T = string` (the business item type pushed to TurnLoop)
-- `M = *schema.Message` (the Agent message type, i.e., the standard `Message`)
-
-ADK type aliases:
+- `M = *schema.Message` (the Agent message type, i.e., the standard `Message`) ADK type aliases:
```go
type Agent = TypedAgent[*schema.Message]
@@ -24,9 +22,7 @@ type AgentInput = TypedAgentInput[*schema.Message]
type AgentEvent = TypedAgentEvent[*schema.Message]
```
-When using `*schema.AgenticMessage`, simply replace `M` with the corresponding type—all API signatures are completely symmetric.
-
----
+## When using `*schema.AgenticMessage`, simply replace `M` with the corresponding type—all API signatures are completely symmetric.
## Part 1: Agent Cancel
@@ -130,8 +126,6 @@ Build a continuously running agent service: users send messages at any time, the
### Turn Lifecycle
-
-
### Basic Usage
```go
@@ -406,105 +400,199 @@ On normal exit (without saving a new checkpoint), TurnLoop will attempt to delet
## Part 4: Complete Example
-Simulates a chat service supporting priority scheduling, preemption, and checkpoint recovery:
+Simulates a chat service supporting priority scheduling, preemption, and checkpoint recovery. This example can be compiled and run directly (replace `myModel` with a real ChatModel implementation).
```go
package main
import (
- "context"
- "log"
- "strings"
- "time"
-
- "github.com/cloudwego/eino/adk"
- "github.com/cloudwego/eino/schema"
+ "context"
+ "fmt"
+ "log"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/cloudwego/eino/adk"
+ "github.com/cloudwego/eino/schema"
)
-func main() {
- ctx := context.Background()
- store := adk.NewInMemoryStore()
-
- cfg := adk.TurnLoopConfig[string, *schema.Message]{
- GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) {
- // Sort by priority, consume only the first item, keep the rest for subsequent turns
- sorted := sortByPriority(items)
- return &adk.GenInputResult[string, *schema.Message]{
- Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}},
- Consumed: sorted[:1],
- Remaining: sorted[1:], // Items not in either will be discarded
- }, nil
- },
-
- GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) {
- all := append(append(interruptedItems, unhandledItems...), newItems...)
- return &adk.GenResumeResult[string, *schema.Message]{
- Consumed: all[:1],
- Remaining: all[1:],
- }, nil
- },
-
- PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) {
- return buildAgent(consumed), nil
- },
-
- OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error {
- for {
- event, ok := events.Next()
- if !ok {
- break
- }
- // Detect preemption/stop signals for cleanup
- select {
- case <-tc.Preempted:
- log.Println("Preempted by higher priority message")
- case <-tc.Stopped:
- log.Printf("Service shutting down: %s", tc.StopCause())
- default:
- }
- if event.Err != nil {
- // Don't propagate CancelError, framework handles it automatically
- return event.Err
- }
- log.Printf("[%s] %s", event.AgentName, extractText(event))
- }
- return nil
- },
+// --- 1. Implement CheckPointStore interface ---
- Store: store,
- CheckpointID: "chat-session-001",
- }
+type InMemoryStore struct {
+ mu sync.Mutex
+ m map[string][]byte
+}
+
+func NewInMemoryStore() *InMemoryStore {
+ return &InMemoryStore{m: make(map[string][]byte)}
+}
+
+func (s *InMemoryStore) Get(_ context.Context, id string) ([]byte, bool, error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ data, ok := s.m[id]
+ return data, ok, nil
+}
- loop := adk.NewTurnLoop(cfg)
- loop.Push("Hello, help me check the weather")
- loop.Run(ctx)
-
- // Send urgent message to preempt after 1 second
- time.AfterFunc(1*time.Second, func() {
- loop.Push("Stop! Handle this urgent issue first",
- adk.WithPreempt[string, *schema.Message](adk.AnySafePoint),
- )
- })
-
- // Graceful shutdown after 5 seconds
- time.AfterFunc(5*time.Second, func() {
- loop.Stop(
- adk.WithGracefulTimeout(3*time.Second),
- adk.WithStopCause("service shutdown"),
- )
- })
-
- result := loop.Wait()
- log.Printf("Exit reason: %v", result.ExitReason)
- log.Printf("Unhandled messages: %v", result.UnhandledItems)
- log.Printf("Stop cause: %s", result.StopCause)
- log.Printf("checkpoint: attempted=%v, err=%v", result.CheckpointAttempted, result.CheckpointErr)
-
- // Next startup with the same cfg will automatically resume from checkpoint
+func (s *InMemoryStore) Set(_ context.Context, id string, data []byte) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.m[id] = data
+ return nil
+}
+
+// Optional: implement CheckPointDeleter to support automatic cleanup of expired checkpoints
+func (s *InMemoryStore) Delete(_ context.Context, id string) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ delete(s.m, id)
+ return nil
+}
+
+// --- 2. Implement a minimal Agent (use adk.NewChatModelAgent in production) ---
+
+type echoAgent struct{}
+
+func (a *echoAgent) Name(_ context.Context) string { return "EchoAgent" }
+func (a *echoAgent) Description(_ context.Context) string { return "echoes input" }
+
+func (a *echoAgent) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] {
+ iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]()
+ go func() {
+ defer gen.Close()
+ // Simulate time-consuming processing
+ select {
+ case <-time.After(500 * time.Millisecond):
+ case <-ctx.Done():
+ gen.Send(&adk.AgentEvent{Err: ctx.Err()})
+ return
+ }
+ // Return echo result
+ reply := "Echo: "
+ if len(input.Messages) > 0 {
+ reply += input.Messages[0].Content
+ }
+ gen.Send(&adk.AgentEvent{
+ AgentName: "EchoAgent",
+ Output: &adk.AgentOutput{
+ MessageOutput: &adk.MessageVariant{
+ Message: schema.AssistantMessage(reply, nil),
+ },
+ },
+ })
+ }()
+ return iter
+}
+
+// --- 3. Priority sorting helper function ---
+
+func sortByPriority(items []string) []string {
+ sorted := make([]string, len(items))
+ copy(sorted, items)
+ sort.SliceStable(sorted, func(i, j int) bool {
+ // Items starting with "!" are treated as high priority
+ return strings.HasPrefix(sorted[i], "!") && !strings.HasPrefix(sorted[j], "!")
+ })
+ return sorted
+}
+
+// --- 4. Main flow ---
+
+func main() {
+ ctx := context.Background()
+ store := NewInMemoryStore()
+ agent := &echoAgent{}
+
+ cfg := adk.TurnLoopConfig[string, *schema.Message]{
+ GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) {
+ // Sort by priority, consume only the first item, keep the rest for subsequent turns
+ sorted := sortByPriority(items)
+ return &adk.GenInputResult[string, *schema.Message]{
+ Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}},
+ Consumed: sorted[:1],
+ Remaining: sorted[1:],
+ }, nil
+ },
+
+ GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) {
+ all := append(append(interruptedItems, unhandledItems...), newItems...)
+ return &adk.GenResumeResult[string, *schema.Message]{
+ Consumed: all[:1],
+ Remaining: all[1:],
+ }, nil
+ },
+
+ PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) {
+ return agent, nil
+ },
+
+ OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error {
+ for {
+ event, ok := events.Next()
+ if !ok {
+ break
+ }
+ // Detect preemption/stop signals for cleanup
+ select {
+ case <-tc.Preempted:
+ log.Println("Preempted by higher priority message")
+ case <-tc.Stopped:
+ log.Printf("Service shutting down: %s", tc.StopCause())
+ default:
+ }
+ if event.Err != nil {
+ // Don't propagate CancelError, framework handles it automatically
+ return event.Err
+ }
+ if event.Output != nil && event.Output.MessageOutput != nil {
+ fmt.Printf("[%s] %s\n", event.AgentName, event.Output.MessageOutput.Message.Content)
+ }
+ }
+ return nil
+ },
+
+ Store: store,
+ CheckpointID: "session-123",
+ }
+
+ // First run
+ loop := adk.NewTurnLoop(cfg)
+ loop.Push("normal message")
+ loop.Push("low priority task")
+ loop.Run(ctx)
+
+ // Simulate pushing an urgent message after a delay (triggers preemption)
+ time.AfterFunc(200*time.Millisecond, func() {
+ accepted, ack := loop.Push("!urgent message",
+ adk.WithPreempt[string, *schema.Message](adk.AnySafePoint),
+ )
+ if accepted {
+ <-ack
+ log.Println("Preemption signal acknowledged")
+ }
+ })
+
+ // Graceful stop after 2 seconds
+ time.AfterFunc(2*time.Second, func() {
+ loop.Stop(
+ adk.WithGraceful(),
+ adk.WithStopCause("demo timeout"),
+ )
+ })
+
+ result := loop.Wait()
+ fmt.Printf("Exit reason: %v\n", result.ExitReason)
+ fmt.Printf("Stop cause: %s\n", result.StopCause)
+ fmt.Printf("checkpoint: attempted=%v, err=%v\n", result.CheckpointAttempted, result.CheckpointErr)
+
+ // Second run (same cfg, containing the same CheckpointID) will automatically resume from checkpoint
}
```
----
+> 💡
+> In production, replace `echoAgent` with `adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{...})`. The `CheckPointStore` implementation can use Redis / database or other persistence solutions.
## FAQ
diff --git a/content/zh/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md b/content/zh/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md
index 04a64e15da..62da3beaab 100644
--- a/content/zh/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md
+++ b/content/zh/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md
@@ -1,6 +1,6 @@
---
Description: ""
-date: "2026-05-17"
+date: "2026-05-22"
lastmod: ""
tags: []
title: Agent Cancel 与 TurnLoop 快速入门
@@ -14,9 +14,7 @@ Eino ADK 中 **Agent 取消** 和 **TurnLoop** 两项核心特性的快速入门
本文示例统一使用以下泛型实例化:
- `T = string`(推送给 TurnLoop 的业务项类型)
-- `M = *schema.Message`(Agent 消息类型,即标准 `Message`)
-
-ADK 中相关类型别名:
+- `M = *schema.Message`(Agent 消息类型,即标准 `Message`)ADK 中相关类型别名:
```go
type Agent = TypedAgent[*schema.Message]
@@ -24,9 +22,7 @@ type AgentInput = TypedAgentInput[*schema.Message]
type AgentEvent = TypedAgentEvent[*schema.Message]
```
-当需要使用 `*schema.AgenticMessage` 时,将 `M` 替换为对应类型即可,所有 API 签名完全对称。
-
----
+## 当需要使用 `*schema.AgenticMessage` 时,将 `M` 替换为对应类型即可,所有 API 签名完全对称。
## 第一部分:Agent 取消
@@ -130,8 +126,6 @@ for {
### Turn 生命周期
-
-
### 基本用法
```go
@@ -406,105 +400,199 @@ type CheckPointDeleter interface {
## 第四部分:完整示例
-模拟一个支持优先级调度、抢占和 checkpoint 恢复的聊天服务:
+模拟一个支持优先级调度、抢占和 checkpoint 恢复的聊天服务。本示例可直接编译运行(需替换 `myModel` 为真实 ChatModel 实现)。
```go
package main
import (
- "context"
- "log"
- "strings"
- "time"
-
- "github.com/cloudwego/eino/adk"
- "github.com/cloudwego/eino/schema"
+ "context"
+ "fmt"
+ "log"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/cloudwego/eino/adk"
+ "github.com/cloudwego/eino/schema"
)
-func main() {
- ctx := context.Background()
- store := adk.NewInMemoryStore()
-
- cfg := adk.TurnLoopConfig[string, *schema.Message]{
- GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) {
- // 按优先级排序后,只消费第一条,其余留给后续轮次
- sorted := sortByPriority(items)
- return &adk.GenInputResult[string, *schema.Message]{
- Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}},
- Consumed: sorted[:1],
- Remaining: sorted[1:], // 不在两者中的项目会被丢弃
- }, nil
- },
-
- GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) {
- all := append(append(interruptedItems, unhandledItems...), newItems...)
- return &adk.GenResumeResult[string, *schema.Message]{
- Consumed: all[:1],
- Remaining: all[1:],
- }, nil
- },
-
- PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) {
- return buildAgent(consumed), nil
- },
-
- OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error {
- for {
- event, ok := events.Next()
- if !ok {
- break
- }
- // 感知抢占/停止信号,做收尾处理
- select {
- case <-tc.Preempted:
- log.Println("被更高优先级消息抢占")
- case <-tc.Stopped:
- log.Printf("服务关停: %s", tc.StopCause())
- default:
- }
- if event.Err != nil {
- // 不传播 CancelError,框架自动处理
- return event.Err
- }
- log.Printf("[%s] %s", event.AgentName, extractText(event))
- }
- return nil
- },
+// --- 1. 实现 CheckPointStore 接口 ---
- Store: store,
- CheckpointID: "chat-session-001",
- }
+type InMemoryStore struct {
+ mu sync.Mutex
+ m map[string][]byte
+}
+
+func NewInMemoryStore() *InMemoryStore {
+ return &InMemoryStore{m: make(map[string][]byte)}
+}
+
+func (s *InMemoryStore) Get(_ context.Context, id string) ([]byte, bool, error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ data, ok := s.m[id]
+ return data, ok, nil
+}
- loop := adk.NewTurnLoop(cfg)
- loop.Push("你好,帮我查一下天气")
- loop.Run(ctx)
-
- // 1 秒后发送紧急消息抢占
- time.AfterFunc(1*time.Second, func() {
- loop.Push("停!先帮我处理这个紧急问题",
- adk.WithPreempt[string, *schema.Message](adk.AnySafePoint),
- )
- })
-
- // 5 秒后优雅关停
- time.AfterFunc(5*time.Second, func() {
- loop.Stop(
- adk.WithGracefulTimeout(3*time.Second),
- adk.WithStopCause("service shutdown"),
- )
- })
-
- result := loop.Wait()
- log.Printf("退出原因: %v", result.ExitReason)
- log.Printf("未处理消息: %v", result.UnhandledItems)
- log.Printf("停止原因: %s", result.StopCause)
- log.Printf("checkpoint: attempted=%v, err=%v", result.CheckpointAttempted, result.CheckpointErr)
-
- // 下次以相同 cfg 启动将自动从 checkpoint 恢复
+func (s *InMemoryStore) Set(_ context.Context, id string, data []byte) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.m[id] = data
+ return nil
+}
+
+// 可选:实现 CheckPointDeleter 以支持自动清理过期 checkpoint
+func (s *InMemoryStore) Delete(_ context.Context, id string) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ delete(s.m, id)
+ return nil
+}
+
+// --- 2. 实现一个最小 Agent(生产环境请使用 adk.NewChatModelAgent) ---
+
+type echoAgent struct{}
+
+func (a *echoAgent) Name(_ context.Context) string { return "EchoAgent" }
+func (a *echoAgent) Description(_ context.Context) string { return "echoes input" }
+
+func (a *echoAgent) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] {
+ iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]()
+ go func() {
+ defer gen.Close()
+ // 模拟耗时处理
+ select {
+ case <-time.After(500 * time.Millisecond):
+ case <-ctx.Done():
+ gen.Send(&adk.AgentEvent{Err: ctx.Err()})
+ return
+ }
+ // 返回 echo 结果
+ reply := "Echo: "
+ if len(input.Messages) > 0 {
+ reply += input.Messages[0].Content
+ }
+ gen.Send(&adk.AgentEvent{
+ AgentName: "EchoAgent",
+ Output: &adk.AgentOutput{
+ MessageOutput: &adk.MessageVariant{
+ Message: schema.AssistantMessage(reply, nil),
+ },
+ },
+ })
+ }()
+ return iter
+}
+
+// --- 3. 优先级排序辅助函数 ---
+
+func sortByPriority(items []string) []string {
+ sorted := make([]string, len(items))
+ copy(sorted, items)
+ sort.SliceStable(sorted, func(i, j int) bool {
+ // 以 "!" 开头的视为高优先级
+ return strings.HasPrefix(sorted[i], "!") && !strings.HasPrefix(sorted[j], "!")
+ })
+ return sorted
+}
+
+// --- 4. 主流程 ---
+
+func main() {
+ ctx := context.Background()
+ store := NewInMemoryStore()
+ agent := &echoAgent{}
+
+ cfg := adk.TurnLoopConfig[string, *schema.Message]{
+ GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) {
+ // 按优先级排序后,只消费第一条,其余留给后续轮次
+ sorted := sortByPriority(items)
+ return &adk.GenInputResult[string, *schema.Message]{
+ Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}},
+ Consumed: sorted[:1],
+ Remaining: sorted[1:],
+ }, nil
+ },
+
+ GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) {
+ all := append(append(interruptedItems, unhandledItems...), newItems...)
+ return &adk.GenResumeResult[string, *schema.Message]{
+ Consumed: all[:1],
+ Remaining: all[1:],
+ }, nil
+ },
+
+ PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) {
+ return agent, nil
+ },
+
+ OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error {
+ for {
+ event, ok := events.Next()
+ if !ok {
+ break
+ }
+ // 感知抢占/停止信号,做收尾处理
+ select {
+ case <-tc.Preempted:
+ log.Println("被更高优先级消息抢占")
+ case <-tc.Stopped:
+ log.Printf("服务关停: %s", tc.StopCause())
+ default:
+ }
+ if event.Err != nil {
+ // 不传播 CancelError,框架自动处理
+ return event.Err
+ }
+ if event.Output != nil && event.Output.MessageOutput != nil {
+ fmt.Printf("[%s] %s\n", event.AgentName, event.Output.MessageOutput.Message.Content)
+ }
+ }
+ return nil
+ },
+
+ Store: store,
+ CheckpointID: "session-123",
+ }
+
+ // 第一次运行
+ loop := adk.NewTurnLoop(cfg)
+ loop.Push("普通消息")
+ loop.Push("低优先级任务")
+ loop.Run(ctx)
+
+ // 模拟延迟后推入紧急消息(触发抢占)
+ time.AfterFunc(200*time.Millisecond, func() {
+ accepted, ack := loop.Push("!紧急消息",
+ adk.WithPreempt[string, *schema.Message](adk.AnySafePoint),
+ )
+ if accepted {
+ <-ack
+ log.Println("抢占信号已确认")
+ }
+ })
+
+ // 2 秒后优雅停止
+ time.AfterFunc(2*time.Second, func() {
+ loop.Stop(
+ adk.WithGraceful(),
+ adk.WithStopCause("demo timeout"),
+ )
+ })
+
+ result := loop.Wait()
+ fmt.Printf("退出原因: %v\n", result.ExitReason)
+ fmt.Printf("停止原因: %s\n", result.StopCause)
+ fmt.Printf("checkpoint: attempted=%v, err=%v\n", result.CheckpointAttempted, result.CheckpointErr)
+
+ // 第二次运行(相同 cfg,包含相同 CheckpointID)将自动从 checkpoint 恢复
}
```
----
+> 💡
+> 生产环境中,将 `echoAgent` 替换为 `adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{...})`。`CheckPointStore` 实现可使用 Redis / 数据库等持久化方案。
## 常见问题