Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
Description: ""
date: "2026-05-17"
date: "2026-05-22"
lastmod: ""
tags: []
title: Agent Cancel and TurnLoop Quick Start
Expand All @@ -14,19 +14,15 @@ A quick start guide for the two core features in Eino ADK: **Agent Cancel** and
All examples in this document use the following generic instantiations:

- `T = string` (the business item type pushed to TurnLoop)
- `M = *schema.Message` (the Agent message type, i.e., the standard `Message`)

ADK type aliases:
- `M = *schema.Message` (the Agent message type, i.e., the standard `Message`) ADK type aliases:

```go
type Agent = TypedAgent[*schema.Message]
type AgentInput = TypedAgentInput[*schema.Message]
type AgentEvent = TypedAgentEvent[*schema.Message]
```

When using `*schema.AgenticMessage`, simply replace `M` with the corresponding type—all API signatures are completely symmetric.

---
## When using `*schema.AgenticMessage`, simply replace `M` with the corresponding type—all API signatures are completely symmetric.

## Part 1: Agent Cancel

Expand Down Expand Up @@ -130,8 +126,6 @@ Build a continuously running agent service: users send messages at any time, the

### Turn Lifecycle

<a href="/img/eino/XrWqwC669hGGoibW1q3c2ToTnvf.png" target="_blank"><img src="/img/eino/XrWqwC669hGGoibW1q3c2ToTnvf.png" width="100%" /></a>

### Basic Usage

```go
Expand Down Expand Up @@ -406,105 +400,199 @@ On normal exit (without saving a new checkpoint), TurnLoop will attempt to delet

## Part 4: Complete Example

Simulates a chat service supporting priority scheduling, preemption, and checkpoint recovery:
Simulates a chat service supporting priority scheduling, preemption, and checkpoint recovery. This example can be compiled and run directly (replace `myModel` with a real ChatModel implementation).

```go
package main

import (
"context"
"log"
"strings"
"time"

"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
"context"
"fmt"
"log"
"sort"
"strings"
"sync"
"time"

"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
)

func main() {
ctx := context.Background()
store := adk.NewInMemoryStore()

cfg := adk.TurnLoopConfig[string, *schema.Message]{
GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) {
// Sort by priority, consume only the first item, keep the rest for subsequent turns
sorted := sortByPriority(items)
return &adk.GenInputResult[string, *schema.Message]{
Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}},
Consumed: sorted[:1],
Remaining: sorted[1:], // Items not in either will be discarded
}, nil
},

GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) {
all := append(append(interruptedItems, unhandledItems...), newItems...)
return &adk.GenResumeResult[string, *schema.Message]{
Consumed: all[:1],
Remaining: all[1:],
}, nil
},

PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) {
return buildAgent(consumed), nil
},

OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error {
for {
event, ok := events.Next()
if !ok {
break
}
// Detect preemption/stop signals for cleanup
select {
case <-tc.Preempted:
log.Println("Preempted by higher priority message")
case <-tc.Stopped:
log.Printf("Service shutting down: %s", tc.StopCause())
default:
}
if event.Err != nil {
// Don't propagate CancelError, framework handles it automatically
return event.Err
}
log.Printf("[%s] %s", event.AgentName, extractText(event))
}
return nil
},
// --- 1. Implement CheckPointStore interface ---

Store: store,
CheckpointID: "chat-session-001",
}
type InMemoryStore struct {
mu sync.Mutex
m map[string][]byte
}

func NewInMemoryStore() *InMemoryStore {
return &InMemoryStore{m: make(map[string][]byte)}
}

func (s *InMemoryStore) Get(_ context.Context, id string) ([]byte, bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
data, ok := s.m[id]
return data, ok, nil
}

loop := adk.NewTurnLoop(cfg)
loop.Push("Hello, help me check the weather")
loop.Run(ctx)

// Send urgent message to preempt after 1 second
time.AfterFunc(1*time.Second, func() {
loop.Push("Stop! Handle this urgent issue first",
adk.WithPreempt[string, *schema.Message](adk.AnySafePoint),
)
})

// Graceful shutdown after 5 seconds
time.AfterFunc(5*time.Second, func() {
loop.Stop(
adk.WithGracefulTimeout(3*time.Second),
adk.WithStopCause("service shutdown"),
)
})

result := loop.Wait()
log.Printf("Exit reason: %v", result.ExitReason)
log.Printf("Unhandled messages: %v", result.UnhandledItems)
log.Printf("Stop cause: %s", result.StopCause)
log.Printf("checkpoint: attempted=%v, err=%v", result.CheckpointAttempted, result.CheckpointErr)

// Next startup with the same cfg will automatically resume from checkpoint
func (s *InMemoryStore) Set(_ context.Context, id string, data []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
s.m[id] = data
return nil
}

// Optional: implement CheckPointDeleter to support automatic cleanup of expired checkpoints
func (s *InMemoryStore) Delete(_ context.Context, id string) error {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.m, id)
return nil
}

// --- 2. Implement a minimal Agent (use adk.NewChatModelAgent in production) ---

type echoAgent struct{}

func (a *echoAgent) Name(_ context.Context) string { return "EchoAgent" }
func (a *echoAgent) Description(_ context.Context) string { return "echoes input" }

func (a *echoAgent) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] {
iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]()
go func() {
defer gen.Close()
// Simulate time-consuming processing
select {
case <-time.After(500 * time.Millisecond):
case <-ctx.Done():
gen.Send(&adk.AgentEvent{Err: ctx.Err()})
return
}
// Return echo result
reply := "Echo: "
if len(input.Messages) > 0 {
reply += input.Messages[0].Content
}
gen.Send(&adk.AgentEvent{
AgentName: "EchoAgent",
Output: &adk.AgentOutput{
MessageOutput: &adk.MessageVariant{
Message: schema.AssistantMessage(reply, nil),
},
},
})
}()
return iter
}

// --- 3. Priority sorting helper function ---

func sortByPriority(items []string) []string {
sorted := make([]string, len(items))
copy(sorted, items)
sort.SliceStable(sorted, func(i, j int) bool {
// Items starting with "!" are treated as high priority
return strings.HasPrefix(sorted[i], "!") && !strings.HasPrefix(sorted[j], "!")
})
return sorted
}

// --- 4. Main flow ---

func main() {
ctx := context.Background()
store := NewInMemoryStore()
agent := &echoAgent{}

cfg := adk.TurnLoopConfig[string, *schema.Message]{
GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) {
// Sort by priority, consume only the first item, keep the rest for subsequent turns
sorted := sortByPriority(items)
return &adk.GenInputResult[string, *schema.Message]{
Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}},
Consumed: sorted[:1],
Remaining: sorted[1:],
}, nil
},

GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) {
all := append(append(interruptedItems, unhandledItems...), newItems...)
return &adk.GenResumeResult[string, *schema.Message]{
Consumed: all[:1],
Remaining: all[1:],
}, nil
},

PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) {
return agent, nil
},

OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error {
for {
event, ok := events.Next()
if !ok {
break
}
// Detect preemption/stop signals for cleanup
select {
case <-tc.Preempted:
log.Println("Preempted by higher priority message")
case <-tc.Stopped:
log.Printf("Service shutting down: %s", tc.StopCause())
default:
}
if event.Err != nil {
// Don't propagate CancelError, framework handles it automatically
return event.Err
}
if event.Output != nil && event.Output.MessageOutput != nil {
fmt.Printf("[%s] %s\n", event.AgentName, event.Output.MessageOutput.Message.Content)
}
}
return nil
},

Store: store,
CheckpointID: "session-123",
}

// First run
loop := adk.NewTurnLoop(cfg)
loop.Push("normal message")
loop.Push("low priority task")
loop.Run(ctx)

// Simulate pushing an urgent message after a delay (triggers preemption)
time.AfterFunc(200*time.Millisecond, func() {
accepted, ack := loop.Push("!urgent message",
adk.WithPreempt[string, *schema.Message](adk.AnySafePoint),
)
if accepted {
<-ack
log.Println("Preemption signal acknowledged")
}
})

// Graceful stop after 2 seconds
time.AfterFunc(2*time.Second, func() {
loop.Stop(
adk.WithGraceful(),
adk.WithStopCause("demo timeout"),
)
})

result := loop.Wait()
fmt.Printf("Exit reason: %v\n", result.ExitReason)
fmt.Printf("Stop cause: %s\n", result.StopCause)
fmt.Printf("checkpoint: attempted=%v, err=%v\n", result.CheckpointAttempted, result.CheckpointErr)

// Second run (same cfg, containing the same CheckpointID) will automatically resume from checkpoint
}
```

---
> 💡
> In production, replace `echoAgent` with `adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{...})`. The `CheckPointStore` implementation can use Redis / database or other persistence solutions.

## FAQ

Expand Down
Loading
Loading