Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions pkg/tui/service/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,18 @@ func (s *Supervisor) subscribeWithRouting(ctx context.Context, a *app.App, sessi
a.SubscribeWith(ctx, send)
}

// isTopLevelStream tells the caller whether a stream lifecycle event was
// emitted by the runner's own top-level session, as opposed to a nested
// sub-session (e.g. transfer_task or a fork-mode run_skill) whose events are
// forwarded over the parent's event channel. Nested streams carry a
// SessionID that differs from the runner's, and they must neither toggle
// IsRunning nor clear a pending attention event on the parent runner.
//
// Events with an empty SessionID count as top-level, so emitters that omit
// the field keep working (the same convention handleTokenUsage follows).
// (#3217)
func isTopLevelStream(runnerID, evSessionID string) bool {
	switch evSessionID {
	case "", runnerID:
		// Either the emitter omitted the ID or it is the runner's own session.
		return true
	default:
		return false
	}
}

// handleRuntimeEvent updates runner state based on runtime events.
func (s *Supervisor) handleRuntimeEvent(sessionID string, msg tea.Msg) {
s.mu.Lock()
Expand All @@ -160,15 +172,19 @@ func (s *Supervisor) handleRuntimeEvent(sessionID string, msg tea.Msg) {

switch ev := msg.(type) {
case *runtime.StreamStartedEvent:
runner.IsRunning = true
runner.PendingEvent = nil // New stream supersedes any stale pending event
s.notifyTabsUpdated()
if isTopLevelStream(runner.ID, ev.SessionID) {
runner.IsRunning = true
runner.PendingEvent = nil // New top-level stream supersedes any stale pending event
s.notifyTabsUpdated()
}

case *runtime.StreamStoppedEvent:
runner.IsRunning = false
runner.PendingEvent = nil // Clear any pending attention event since stream ended
runner.NeedsAttn = false
s.notifyTabsUpdated()
if isTopLevelStream(runner.ID, ev.SessionID) {
runner.IsRunning = false
runner.PendingEvent = nil // Clear any pending attention event since the top-level stream ended
runner.NeedsAttn = false
s.notifyTabsUpdated()
}

case *runtime.SessionTitleEvent:
runner.Title = ev.Title
Expand Down
162 changes: 162 additions & 0 deletions pkg/tui/service/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/docker/docker-agent/pkg/runtime"
"github.com/docker/docker-agent/pkg/tools"
)

func newTestSupervisor(ids []string, activeID string) *Supervisor {
Expand Down Expand Up @@ -122,3 +126,161 @@ func TestSetPendingEvent_UnknownSession(t *testing.T) {

assert.Nil(t, s.runners["A"].PendingEvent, "unrelated runner is untouched")
}

// --- #3217: session-aware stream lifecycle tests ---

// TestIsTopLevelStream covers the isTopLevelStream helper directly.
func TestIsTopLevelStream(t *testing.T) {
tests := []struct {
runnerID string
evSessionID string
want bool
}{
{runnerID: "sess-A", evSessionID: "sess-A", want: true}, // exact match → top-level
{runnerID: "sess-A", evSessionID: "", want: true}, // empty → top-level (backward compat)
{runnerID: "sess-A", evSessionID: "child-B", want: false}, // different ID → nested
{runnerID: "sess-A", evSessionID: "sess-B", want: false}, // sibling ID → nested
}
for _, tc := range tests {
got := isTopLevelStream(tc.runnerID, tc.evSessionID)
assert.Equal(t, tc.want, got,
"isTopLevelStream(%q, %q)", tc.runnerID, tc.evSessionID)
}
}

// TestStreamStarted_SubSessionDoesNotDropPendingEvent checks that a
// StreamStartedEvent whose SessionID belongs to a child session (a nested
// sub-agent or fork-skill stream forwarded through the parent's event
// channel) leaves the parent runner's pending elicitation event, NeedsAttn,
// and IsRunning untouched. (#3217)
func TestStreamStarted_SubSessionDoesNotDropPendingEvent(t *testing.T) {
	const parentID = "sess-A"

	// sess-B is the active session, which makes sess-A a background runner.
	s := newTestSupervisor([]string{parentID, "sess-B"}, "sess-B")

	parent := s.runners[parentID]
	parent.PendingEvent = runtime.ElicitationRequest("confirm?", "form", nil, "", "eid-1", nil, "agent")
	parent.NeedsAttn = true
	parent.IsRunning = true // a top-level turn is already in progress

	// The event carries a SessionID other than the runner's: a nested stream.
	nestedStart := &runtime.StreamStartedEvent{
		Type:      "stream_started",
		SessionID: "child-xyz",
	}
	s.handleRuntimeEvent(parentID, nestedStart)

	// Look the runner up again so the assertions read the supervisor's state.
	got := s.runners[parentID]
	require.NotNil(t, got.PendingEvent,
		"nested StreamStarted must NOT clear the parent's pending elicitation")
	assert.True(t, got.NeedsAttn,
		"nested StreamStarted must NOT clear NeedsAttn")
	assert.True(t, got.IsRunning,
		"nested StreamStarted must NOT change IsRunning")
}

// TestStreamStopped_SubSessionDoesNotDropPendingEvent checks that a
// StreamStoppedEvent originating from a child session leaves the parent
// runner's pending event, NeedsAttn, and IsRunning as they were. (#3217)
func TestStreamStopped_SubSessionDoesNotDropPendingEvent(t *testing.T) {
	const parentID = "sess-A"

	s := newTestSupervisor([]string{parentID, "sess-B"}, "sess-B")

	parent := s.runners[parentID]
	parent.PendingEvent = runtime.ElicitationRequest("confirm?", "form", nil, "", "eid-2", nil, "agent")
	parent.NeedsAttn = true
	parent.IsRunning = true

	// The event carries a SessionID other than the runner's: a nested stream.
	nestedStop := &runtime.StreamStoppedEvent{
		Type:      "stream_stopped",
		SessionID: "child-xyz",
	}
	s.handleRuntimeEvent(parentID, nestedStop)

	// Look the runner up again so the assertions read the supervisor's state.
	got := s.runners[parentID]
	require.NotNil(t, got.PendingEvent,
		"nested StreamStopped must NOT clear the parent's pending elicitation")
	assert.True(t, got.NeedsAttn,
		"nested StreamStopped must NOT clear NeedsAttn")
	assert.True(t, got.IsRunning,
		"nested StreamStopped must NOT flip IsRunning to false while parent is still running")
}

// TestStreamStarted_TopLevelSupersedesStalePending checks that a
// StreamStartedEvent whose SessionID matches the runner still clears a stale
// pending event and marks the runner as running, i.e. the behaviour for
// top-level streams is preserved. (#3217)
func TestStreamStarted_TopLevelSupersedesStalePending(t *testing.T) {
	const id = "sess-A"

	s := newTestSupervisor([]string{id}, id)

	runner := s.runners[id]
	runner.PendingEvent = runtime.ElicitationRequest(
		"old?", "form", nil, "", "eid-stale", nil, "agent",
	)
	runner.IsRunning = false

	// A new top-level turn begins: the event's SessionID equals the runner ID.
	topLevelStart := &runtime.StreamStartedEvent{
		Type:      "stream_started",
		SessionID: id,
	}
	s.handleRuntimeEvent(id, topLevelStart)

	// Look the runner up again so the assertions read the supervisor's state.
	got := s.runners[id]
	assert.Nil(t, got.PendingEvent,
		"top-level StreamStarted must supersede any stale pending event")
	assert.True(t, got.IsRunning,
		"top-level StreamStarted must set IsRunning")
}

// TestStreamStopped_TopLevelClearsPendingAndNeedsAttn checks, for several
// kinds of pending event, that a StreamStoppedEvent whose SessionID matches
// the runner resets PendingEvent, NeedsAttn, and IsRunning. (#3217)
func TestStreamStopped_TopLevelClearsPendingAndNeedsAttn(t *testing.T) {
	type scenario struct {
		name    string
		pending any // tea.Msg
	}

	scenarios := []scenario{
		{
			name:    "elicitation pending",
			pending: runtime.ElicitationRequest("q?", "form", nil, "", "eid-3", nil, "agent"),
		},
		{
			name:    "tool confirmation pending",
			pending: runtime.ToolCallConfirmation(tools.ToolCall{}, tools.Tool{}, "agent"),
		},
		{
			name:    "max iterations pending",
			pending: runtime.MaxIterationsReached(10),
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			const id = "sess-A"

			s := newTestSupervisor([]string{id}, "sess-B")

			runner := s.runners[id]
			runner.PendingEvent = sc.pending
			runner.NeedsAttn = true
			runner.IsRunning = true

			// The event's SessionID equals the runner ID: a top-level stop.
			topLevelStop := &runtime.StreamStoppedEvent{
				Type:      "stream_stopped",
				SessionID: id,
			}
			s.handleRuntimeEvent(id, topLevelStop)

			// Look the runner up again so the assertions read the
			// supervisor's state.
			got := s.runners[id]
			assert.Nil(t, got.PendingEvent,
				"top-level StreamStopped must clear PendingEvent")
			assert.False(t, got.NeedsAttn,
				"top-level StreamStopped must clear NeedsAttn")
			assert.False(t, got.IsRunning,
				"top-level StreamStopped must clear IsRunning")
		})
	}
}

// TestStreamStarted_EmptySessionID_TreatedAsTopLevel checks that a
// StreamStartedEvent with an empty SessionID is handled as a top-level
// stream, keeping emitters that omit the field working. (#3217)
func TestStreamStarted_EmptySessionID_TreatedAsTopLevel(t *testing.T) {
	const id = "sess-A"

	s := newTestSupervisor([]string{id}, id)

	s.runners[id].PendingEvent = runtime.ElicitationRequest(
		"old?", "form", nil, "", "eid-old", nil, "agent",
	)

	// The emitter leaves SessionID unset (empty string).
	anonymousStart := &runtime.StreamStartedEvent{
		Type:      "stream_started",
		SessionID: "",
	}
	s.handleRuntimeEvent(id, anonymousStart)

	// Look the runner up again so the assertions read the supervisor's state.
	got := s.runners[id]
	assert.Nil(t, got.PendingEvent,
		"empty SessionID must be treated as top-level and supersede stale pending event")
	assert.True(t, got.IsRunning)
}
Loading