From abc7caa9d279a72de98778d2a2825ff2e73d35f9 Mon Sep 17 00:00:00 2001 From: Djordje Lukic Date: Wed, 6 May 2026 14:48:56 +0200 Subject: [PATCH] fix(compactor): correct FirstKeptEntry mapping when prior summary exists When a session already contained a summary, the kept-tail boundary returned by mapToSessionIndex was wrong. It counted non-system items from sess.Messages[0], but the filtered chat-message list it was indexing into came from sess.GetMessages(a), which prepends a synthetic 'Session Summary: ...' user message and starts iterating items at the prior summary's FirstKeptEntry. Both effects shifted the count, so the new FirstKeptEntry landed far too early in the session - typically inside the conversation the prior summary already replaced. Replace nonSystemMessages + mapToSessionIndex with a single gatherCompactionInput that walks sess.Messages directly (mirroring session.buildSessionSummaryMessages), surfaces the synthetic Session Summary message when a prior summary exists, and returns a parallel slice of origin indices. firstKeptSessionIndex maps a split index back to a sess.Messages index via that parallel slice - correct by construction regardless of prior summaries. The synthetic message maps back to the prior summary's index so the prior summary item is preserved across the new compaction when the boundary lands on it. ComputeFirstKeptEntry no longer needs the *agent.Agent parameter; updated its single call site in session_compaction.go. Adds regression coverage in compactor_test.go for the prior-summary case (with and without a non-zero FirstKeptEntry on the prior summary), and replaces TestMapToSessionIndex with the equivalent TestGatherCompactionInput_NoPriorSummary. Signed-off-by: Djordje Lukic --- pkg/compaction/compaction_test.go | 84 +++++++++++++ pkg/runtime/compactor/compactor.go | 84 +++++++------ pkg/runtime/compactor/compactor_test.go | 150 ++++++++++++++++++++++-- pkg/runtime/session_compaction.go | 2 +- pkg/session/session.go | 78 ++++++++++++ pkg/session/session_race_test.go | 25 ++++ pkg/session/session_test.go | 98 ++++++++++++++++ 7 files changed, 464 insertions(+), 57 deletions(-) diff --git a/pkg/compaction/compaction_test.go b/pkg/compaction/compaction_test.go index 535727ce8..92f7d0f10 100644 --- a/pkg/compaction/compaction_test.go +++ b/pkg/compaction/compaction_test.go @@ -227,6 +227,90 @@ func TestSplitIndexForKeep(t *testing.T) { } } +// TestSplitIndexForKeep_NeverReturnsZeroForNonEmptyInput pins the +// invariant that for any non-empty messages slice, SplitIndexForKeep +// returns a value in [1, len(messages)] — never 0. +// +// This matters because the compactor's firstKeptSessionIndex maps the +// returned split index back to a sess.Messages position, and when a +// prior summary exists, sessIndices[0] points at the prior summary +// item rather than at the start of the prior kept-tail. If 0 were +// reachable, the new FirstKeptEntry would land on the prior summary +// item and the next reconstruction would skip the prior kept-tail +// (the synthetic-summary user message inserted by +// session.buildSessionSummaryMessages would still appear, but the +// kept conversation between the prior FirstKeptEntry and the prior +// summary index would be lost). +// +// The implementation makes 0 unreachable: lastValidBoundary is +// initialised to len(messages); the only update path sets it to i +// (the current index), but that update happens AFTER the overflow +// check at iteration i, so a lastValidBoundary of 0 set at i=0 is +// never returned — the loop exits and the function falls through to +// `return len(messages)`. The overflow-path return therefore yields +// values ≥ 1, and the no-overflow path yields len(messages). +func TestSplitIndexForKeep_NeverReturnsZeroForNonEmptyInput(t *testing.T) { + t.Parallel() + + msg := func(role chat.MessageRole, content string) chat.Message { + return chat.Message{Role: role, Content: content} + } + + cases := []struct { + name string + messages []chat.Message + maxTokens int64 + }{ + { + name: "single user message that fits", + messages: []chat.Message{msg(chat.MessageRoleUser, "hi")}, + maxTokens: 1_000_000, + }, + { + name: "single user message that overflows", + messages: []chat.Message{msg(chat.MessageRoleUser, strings.Repeat("x", 10_000))}, + maxTokens: 1, + }, + { + name: "first message is user/assistant and fits, rest overflows", + messages: []chat.Message{ + msg(chat.MessageRoleUser, "u0"), + msg(chat.MessageRoleAssistant, strings.Repeat("a", 40_000)), + msg(chat.MessageRoleUser, strings.Repeat("b", 40_000)), + }, + maxTokens: 5_000, + }, + { + name: "every message is user/assistant and everything fits (returns len)", + messages: []chat.Message{ + msg(chat.MessageRoleUser, "u0"), + msg(chat.MessageRoleAssistant, "a0"), + msg(chat.MessageRoleUser, "u1"), + }, + maxTokens: 1_000_000, + }, + { + name: "first message is the synthetic Session Summary user message (the prior-summary case)", + messages: []chat.Message{ + msg(chat.MessageRoleUser, "Session Summary: "+strings.Repeat("s", 80_000)), + msg(chat.MessageRoleUser, strings.Repeat("u", 40_000)), + msg(chat.MessageRoleAssistant, strings.Repeat("a", 40_000)), + }, + maxTokens: 5_000, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got := SplitIndexForKeep(tc.messages, tc.maxTokens) + assert.NotZero(t, got, "SplitIndexForKeep must not return 0 for non-empty input (would land FirstKeptEntry on the prior summary item and drop the prior kept-tail)") + assert.GreaterOrEqual(t, got, 1) + assert.LessOrEqual(t, got, len(tc.messages)) + }) + } +} + func TestFirstIndexInBudget(t *testing.T) { t.Parallel() diff --git a/pkg/runtime/compactor/compactor.go b/pkg/runtime/compactor/compactor.go index 721edd2d7..b40773d30 100644 --- a/pkg/runtime/compactor/compactor.go +++ b/pkg/runtime/compactor/compactor.go @@ -153,24 +153,35 @@ func RunLLM(ctx context.Context, args LLMArgs) (*Result, error) { // [maxKeepTokens] window. Used by the runtime when a hook supplies // its own summary so the kept-tail policy stays consistent across // the two strategies. -func ComputeFirstKeptEntry(sess *session.Session, a *agent.Agent) int { - return mapToSessionIndex(sess, compaction.SplitIndexForKeep(nonSystemMessages(sess, a), maxKeepTokens)) +func ComputeFirstKeptEntry(sess *session.Session) int { + messages, sessIndices := gatherCompactionInput(sess) + return firstKeptSessionIndex(sess, sessIndices, compaction.SplitIndexForKeep(messages, maxKeepTokens)) } -// nonSystemMessages returns the agent-visible messages in sess with -// the system entries filtered out. Both the LLM strategy (via -// [extractMessages]) and the hook-supplied path (via -// [ComputeFirstKeptEntry]) operate on this same shape, which is also -// what [compaction.SplitIndexForKeep] expects. -func nonSystemMessages(sess *session.Session, a *agent.Agent) []chat.Message { - var messages []chat.Message - for _, msg := range sess.GetMessages(a) { - if msg.Role == chat.MessageRoleSystem { - continue - } - messages = append(messages, msg) +// gatherCompactionInput is a thin wrapper around +// [session.Session.CompactionInput] that clears compaction-specific +// fields on the returned chat messages. +// +// Cost is per-message bookkeeping already accumulated into +// sess.TotalCost(); leaving it set would double-count when the +// summarization session reports its own TotalCost back through +// [Result.Cost]. CacheControl pins a provider cache checkpoint +// (Anthropic prompt caching, etc.); pinning it inside the +// summarization sub-call would associate the cache point with the +// throwaway compaction conversation rather than the parent session. +// +// The reconstruction work — surfacing a synthetic "Session Summary" +// message when a prior summary exists, picking the right start index +// past the prior summary, and tracking origin indices in sess.Messages +// — lives on Session itself so it can run under sess.mu.RLock and stay +// race-safe against concurrent AddMessage / ApplyCompaction calls. +func gatherCompactionInput(sess *session.Session) ([]chat.Message, []int) { + messages, sessIndices := sess.CompactionInput() + for i := range messages { + messages[i].Cost = 0 + messages[i].CacheControl = false } - return messages + return messages, sessIndices } // extractMessages returns the messages to send to the compaction @@ -188,23 +199,11 @@ func nonSystemMessages(sess *session.Session, a *agent.Agent) []chat.Message { // If the conversation tail itself doesn't fit in // (contextLimit − MaxSummaryTokens − prompt-overhead), older messages // are dropped from the front of the to-compact list to make room. -func extractMessages(sess *session.Session, a *agent.Agent, contextLimit int64, additionalPrompt string) ([]chat.Message, int) { - messages := nonSystemMessages(sess, a) - // Clear Cost and CacheControl on our local copy of the conversation. - // Cost is per-message bookkeeping that's already accumulated into - // sess.TotalCost(); leaving it set would double-count when the - // summarization session reports its own TotalCost back through the - // compactor.Result.Cost field. CacheControl pins a provider cache - // checkpoint (Anthropic prompt caching, etc.); pinning it inside the - // summarization sub-call would associate the cache point with the - // throwaway compaction conversation rather than the parent session. - for i := range messages { - messages[i].Cost = 0 - messages[i].CacheControl = false - } +func extractMessages(sess *session.Session, _ *agent.Agent, contextLimit int64, additionalPrompt string) ([]chat.Message, int) { + messages, sessIndices := gatherCompactionInput(sess) splitIdx := compaction.SplitIndexForKeep(messages, maxKeepTokens) - firstKeptEntry := mapToSessionIndex(sess, splitIdx) + firstKeptEntry := firstKeptSessionIndex(sess, sessIndices, splitIdx) messages = messages[:splitIdx] systemPromptMessage := chat.Message{ @@ -238,21 +237,18 @@ func extractMessages(sess *session.Session, a *agent.Agent, contextLimit int64, return messages, firstKeptEntry } -// mapToSessionIndex maps an index in the non-system-filtered message -// list (the form [extractMessages] operates on) back to an index in -// sess.Messages. Returns len(sess.Messages) when filteredIdx is past -// the end — i.e. "compact everything; keep nothing of the tail". -func mapToSessionIndex(sess *session.Session, filteredIdx int) int { - count := 0 - for i, item := range sess.Messages { - if item.IsMessage() && item.Message.Message.Role != chat.MessageRoleSystem { - if count == filteredIdx { - return i - } - count++ - } +// firstKeptSessionIndex translates a split index produced against the +// chat-message list returned by [gatherCompactionInput] back to an +// index in sess.Messages, suitable for the new summary's +// FirstKeptEntry. Out-of-range splits map to len(sess.Messages), +// matching the "compact everything; keep nothing of the tail" +// sentinel that session.buildSessionSummaryMessages handles by +// skipping the conversation loop. +func firstKeptSessionIndex(sess *session.Session, sessIndices []int, splitIdx int) int { + if splitIdx >= len(sessIndices) { + return len(sess.Messages) } - return len(sess.Messages) + return sessIndices[splitIdx] } // toItems wraps a flat slice of chat messages into session items so a diff --git a/pkg/runtime/compactor/compactor_test.go b/pkg/runtime/compactor/compactor_test.go index bfd638415..f85f0f6db 100644 --- a/pkg/runtime/compactor/compactor_test.go +++ b/pkg/runtime/compactor/compactor_test.go @@ -179,12 +179,10 @@ func TestExtractMessages_KeepsRecentMessages(t *testing.T) { func TestComputeFirstKeptEntry(t *testing.T) { t.Parallel() - a := agent.New("test", "") - t.Run("empty session returns 0", func(t *testing.T) { t.Parallel() sess := session.New() - assert.Equal(t, 0, ComputeFirstKeptEntry(sess, a)) + assert.Equal(t, 0, ComputeFirstKeptEntry(sess)) }) t.Run("short conversation: split at end (compact everything)", func(t *testing.T) { @@ -194,11 +192,11 @@ func TestComputeFirstKeptEntry(t *testing.T) { session.NewMessageItem(&session.Message{Message: chat.Message{Role: chat.MessageRoleUser, Content: "hi"}}), session.NewMessageItem(&session.Message{Message: chat.Message{Role: chat.MessageRoleAssistant, Content: "hello"}}), })) - assert.Equal(t, len(sess.Messages), ComputeFirstKeptEntry(sess, a)) + assert.Equal(t, len(sess.Messages), ComputeFirstKeptEntry(sess)) }) } -func TestMapToSessionIndex(t *testing.T) { +func TestGatherCompactionInput_NoPriorSummary(t *testing.T) { t.Parallel() sess := session.New(session.WithMessages([]session.Item{ @@ -209,13 +207,141 @@ func TestMapToSessionIndex(t *testing.T) { session.NewMessageItem(&session.Message{Message: chat.Message{Role: chat.MessageRoleUser, Content: "u2"}}), })) - // Filtered list (no system): [u1, a1, u2] → indices 0,1,2 - // Map back to sess.Messages indices: 1, 2, 4 - assert.Equal(t, 1, mapToSessionIndex(sess, 0)) - assert.Equal(t, 2, mapToSessionIndex(sess, 1)) - assert.Equal(t, 4, mapToSessionIndex(sess, 2)) - // Past the end: returns len(sess.Messages) - assert.Equal(t, len(sess.Messages), mapToSessionIndex(sess, 3)) + messages, sessIndices := gatherCompactionInput(sess) + require.Len(t, messages, 3) + assert.Equal(t, []int{1, 2, 4}, sessIndices) + + assert.Equal(t, 1, firstKeptSessionIndex(sess, sessIndices, 0)) + assert.Equal(t, 2, firstKeptSessionIndex(sess, sessIndices, 1)) + assert.Equal(t, 4, firstKeptSessionIndex(sess, sessIndices, 2)) + // Past the end: returns len(sess.Messages) (compact-everything sentinel). + assert.Equal(t, len(sess.Messages), firstKeptSessionIndex(sess, sessIndices, 3)) +} + +// TestGatherCompactionInput_WithPriorSummary pins the regression where +// an existing summary in the history made the runtime miscompute +// FirstKeptEntry: counting non-system items from index 0 ignores both +// the synthetic "Session Summary" message that surfaces at the head of +// the chat list and the prior summary's start offset, so the kept +// boundary lands far too early in the session. +func TestGatherCompactionInput_WithPriorSummary(t *testing.T) { + t.Parallel() + + newMsgItem := func(role chat.MessageRole, content string) session.Item { + return session.NewMessageItem(&session.Message{Message: chat.Message{Role: role, Content: content}}) + } + + // Session shape: + // [0..7] : pre-compaction conversation (already summarized). + // [8..9] : kept tail of the prior compaction (FirstKeptEntry=8). + // [10] : prior summary item. + // [11..14]: post-compaction conversation. + items := []session.Item{ + newMsgItem(chat.MessageRoleUser, "u0"), + newMsgItem(chat.MessageRoleAssistant, "a0"), + newMsgItem(chat.MessageRoleUser, "u1"), + newMsgItem(chat.MessageRoleAssistant, "a1"), + newMsgItem(chat.MessageRoleUser, "u2"), + newMsgItem(chat.MessageRoleAssistant, "a2"), + newMsgItem(chat.MessageRoleUser, "u3"), + newMsgItem(chat.MessageRoleAssistant, "a3"), + newMsgItem(chat.MessageRoleUser, "u4-kept"), + newMsgItem(chat.MessageRoleAssistant, "a4-kept"), + {Summary: "prior summary", FirstKeptEntry: 8}, + newMsgItem(chat.MessageRoleUser, "u5"), + newMsgItem(chat.MessageRoleAssistant, "a5"), + newMsgItem(chat.MessageRoleUser, "u6"), + newMsgItem(chat.MessageRoleAssistant, "a6"), + } + sess := session.New(session.WithMessages(items)) + + messages, sessIndices := gatherCompactionInput(sess) + + // Expected filtered list: + // [0]: synthetic Session Summary user message (origin: prior summary at idx 10) + // [1]: items[8] (kept-tail user) + // [2]: items[9] (kept-tail assistant) + // [3]: items[11] (post-summary user) + // [4]: items[12] (post-summary assistant) + // [5]: items[13] + // [6]: items[14] + require.Len(t, messages, 7) + assert.Equal(t, chat.MessageRoleUser, messages[0].Role) + assert.Contains(t, messages[0].Content, "Session Summary: prior summary") + assert.Equal(t, []int{10, 8, 9, 11, 12, 13, 14}, sessIndices) + + // A split that keeps the last two messages should map to items[13] + // (the user message at idx 13), not to items[5] which is what the + // old count-from-zero implementation produced. + assert.Equal(t, 13, firstKeptSessionIndex(sess, sessIndices, 5)) + + // A split that keeps the entire post-summary tail (everything from + // items[8] onwards including the prior summary) maps the synthetic + // message back to its originating summary index so the prior + // summary item is preserved across the new compaction. + assert.Equal(t, 10, firstKeptSessionIndex(sess, sessIndices, 0)) + + // Out-of-range split: compact everything, keep nothing. + assert.Equal(t, len(sess.Messages), firstKeptSessionIndex(sess, sessIndices, len(messages))) +} + +// TestFirstKeptSessionIndex_SplitZeroOnEmptyInputUsesSafeSentinel +// pins the only path through which splitIdx == 0 can reach +// firstKeptSessionIndex: an empty messages list (which only happens +// for a brand-new session with no prior summary). In that case +// sessIndices is also empty and the out-of-range branch returns +// len(sess.Messages), the "compact everything; keep nothing" sentinel +// that session.buildSessionSummaryMessages safely treats as no kept +// tail. +// +// This is the safety net behind the +// SplitIndexForKeep_NeverReturnsZeroForNonEmptyInput invariant: even +// if a future change accidentally let splitIdx==0 escape from a +// non-empty SplitIndexForKeep call, the bot's concern ("sessIndices[0] +// = lastSummaryIdx is returned, dropping the prior kept-tail in the +// next reconstruction") only triggers when sessIndices is non-empty +// AND splitIdx==0 — which the invariant rules out and this test pins +// the empty-input alternative for. +func TestFirstKeptSessionIndex_SplitZeroOnEmptyInputUsesSafeSentinel(t *testing.T) { + t.Parallel() + + sess := session.New() + var sessIndices []int + + // Empty input is the only legitimate way splitIdx==0 reaches + // firstKeptSessionIndex. Both branches (>= len(sessIndices) and + // the indexed lookup) must yield len(sess.Messages) here. + assert.Equal(t, len(sess.Messages), firstKeptSessionIndex(sess, sessIndices, 0)) +} + +// TestGatherCompactionInput_PriorSummaryWithoutFirstKeptEntry covers +// the case where a prior summary was applied as "compact everything, +// keep nothing" (FirstKeptEntry left at zero): the iteration must +// start strictly after the summary item, not from the top of the +// session. +func TestGatherCompactionInput_PriorSummaryWithoutFirstKeptEntry(t *testing.T) { + t.Parallel() + + newMsgItem := func(role chat.MessageRole, content string) session.Item { + return session.NewMessageItem(&session.Message{Message: chat.Message{Role: role, Content: content}}) + } + + items := []session.Item{ + newMsgItem(chat.MessageRoleUser, "old"), + newMsgItem(chat.MessageRoleAssistant, "old-reply"), + {Summary: "prior summary"}, + newMsgItem(chat.MessageRoleUser, "new"), + newMsgItem(chat.MessageRoleAssistant, "new-reply"), + } + sess := session.New(session.WithMessages(items)) + + messages, sessIndices := gatherCompactionInput(sess) + + // Filtered list: synthetic-summary, items[3], items[4]. + // items[0..1] are excluded because they were compacted into the + // prior summary and FirstKeptEntry is zero. + require.Len(t, messages, 3) + assert.Equal(t, []int{2, 3, 4}, sessIndices) } func TestRunLLM_DoesNotDuplicateSystemPrompt(t *testing.T) { diff --git a/pkg/runtime/session_compaction.go b/pkg/runtime/session_compaction.go index 8a9c33cd9..e9f5e822d 100644 --- a/pkg/runtime/session_compaction.go +++ b/pkg/runtime/session_compaction.go @@ -148,7 +148,7 @@ func summaryFromHook(sess *session.Session, a *agent.Agent, pre *hooks.Result) * "session_id", sess.ID, "agent", a.Name(), "summary_length", len(pre.Summary)) return &compactor.Result{ Summary: pre.Summary, - FirstKeptEntry: compactor.ComputeFirstKeptEntry(sess, a), + FirstKeptEntry: compactor.ComputeFirstKeptEntry(sess), // Estimate the summary's token count for session bookkeeping; // no LLM was called so Cost stays at the zero value. InputTokens: compaction.EstimateMessageTokens(&chat.Message{ diff --git a/pkg/session/session.go b/pkg/session/session.go index cd2585cbf..ff50ebdeb 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -910,6 +910,84 @@ func buildSessionSummaryMessages(items []Item) ([]chat.Message, int) { return messages, startIndex } +// CompactionInput returns the chat messages that the compactor should +// summarize together with their origin indices in s.Messages. The +// returned messages are independent copies safe for the caller to +// mutate (cloned via snapshotItems); the parallel sessIndices slice +// maps each entry back to its source item so the caller can compute a +// FirstKeptEntry that survives prior summaries in the history. +// +// When the session contains a prior summary, the result begins with a +// synthetic "Session Summary: ..." user message whose origin index is +// the prior summary item itself; subsequent entries are the prior +// kept-tail and the post-summary conversation, mirroring what +// buildSessionSummaryMessages produces for the runtime. System +// messages stored on the session are filtered out (the compactor +// supplies its own system/user prompt around this list). +// +// This method intentionally bypasses GetMessages's agent-level +// transformations — invariant system prompts, NumHistoryItems +// trimming, old-tool-content truncation, whitespace normalization, +// orphan-tool-call sanitization, and cache_control marking. None of +// those belong in compaction input: the compactor needs the full, +// untrimmed history (so the LLM can summarize what trimming would +// have hidden), supplies its own system/user prompt, and runs through +// a sub-runtime that re-applies sanitization on its own session. +// +// All work is performed under s.mu.RLock via snapshotItems, so this +// method is safe to call concurrently with AddMessage / ApplyCompaction +// on the same session. +func (s *Session) CompactionInput() ([]chat.Message, []int) { + items := s.snapshotItems() + + lastSummaryIndex := -1 + for i := len(items) - 1; i >= 0; i-- { + if items[i].Summary != "" { + lastSummaryIndex = i + break + } + } + + var ( + messages []chat.Message + sessIndices []int + ) + + if lastSummaryIndex >= 0 { + messages = append(messages, chat.Message{ + Role: chat.MessageRoleUser, + Content: "Session Summary: " + items[lastSummaryIndex].Summary, + CreatedAt: nowFn().Format(time.RFC3339), + }) + // The synthetic message stands in for the prior summary item; + // when this index lands inside the kept tail we want the + // summary item itself preserved so the next compaction round + // still sees it via buildSessionSummaryMessages. + sessIndices = append(sessIndices, lastSummaryIndex) + } + + startIndex := lastSummaryIndex + 1 + if lastSummaryIndex >= 0 { + kept := items[lastSummaryIndex].FirstKeptEntry + if kept > 0 && kept < lastSummaryIndex { + startIndex = kept + } + } + + for i := startIndex; i < len(items); i++ { + if !items[i].IsMessage() { + continue + } + msg := items[i].Message.Message + if msg.Role == chat.MessageRoleSystem { + continue + } + messages = append(messages, msg) + sessIndices = append(sessIndices, i) + } + return messages, sessIndices +} + func (s *Session) GetMessages(a *agent.Agent, extraSystemMessages ...chat.Message) []chat.Message { slog.Debug("Getting messages for agent", "agent", a.Name(), "session_id", s.ID) diff --git a/pkg/session/session_race_test.go b/pkg/session/session_race_test.go index 17ed004b3..705b36153 100644 --- a/pkg/session/session_race_test.go +++ b/pkg/session/session_race_test.go @@ -20,3 +20,28 @@ func TestAddMessageUsageRecordConcurrent(t *testing.T) { t.Errorf("expected 100 records, got %d", got) } } + +// TestCompactionInputConcurrent pins the data-race fix for the +// compactor: CompactionInput must read s.Messages under s.mu (via +// snapshotItems) so it stays safe against concurrent AddMessage and +// ApplyCompaction calls. Run with -race; without the lock the slice +// header read aliases the live backing array and the race detector +// flags the AddMessage append. +func TestCompactionInputConcurrent(t *testing.T) { + s := New() + var wg sync.WaitGroup + for range 100 { + wg.Go(func() { + s.AddMessage(&Message{Message: chat.Message{Role: chat.MessageRoleUser, Content: "u"}}) + }) + wg.Go(func() { + _, _ = s.CompactionInput() + }) + } + // One concurrent ApplyCompaction-shaped write to exercise the same + // lock from a writer that also bumps the cumulative token counts. + wg.Go(func() { + s.ApplyCompaction(0, 0, Item{Summary: "snap"}) + }) + wg.Wait() +} diff --git a/pkg/session/session_test.go b/pkg/session/session_test.go index 9212598e9..db46ba017 100644 --- a/pkg/session/session_test.go +++ b/pkg/session/session_test.go @@ -706,3 +706,101 @@ func TestNormalizeMessageContent(t *testing.T) { }) } } + +func TestCompactionInput(t *testing.T) { + t.Parallel() + + newMsg := func(role chat.MessageRole, content string) Item { + return NewMessageItem(&Message{Message: chat.Message{Role: role, Content: content}}) + } + + t.Run("empty session returns empty", func(t *testing.T) { + t.Parallel() + sess := New() + messages, sessIndices := sess.CompactionInput() + assert.Empty(t, messages) + assert.Empty(t, sessIndices) + }) + + t.Run("system messages on the session are filtered out", func(t *testing.T) { + t.Parallel() + sess := New(WithMessages([]Item{ + newMsg(chat.MessageRoleSystem, "sys"), + newMsg(chat.MessageRoleUser, "u1"), + newMsg(chat.MessageRoleAssistant, "a1"), + newMsg(chat.MessageRoleSystem, "sys2"), + newMsg(chat.MessageRoleUser, "u2"), + })) + + messages, sessIndices := sess.CompactionInput() + require.Len(t, messages, 3) + assert.Equal(t, []int{1, 2, 4}, sessIndices) + assert.Equal(t, "u1", messages[0].Content) + assert.Equal(t, "a1", messages[1].Content) + assert.Equal(t, "u2", messages[2].Content) + }) + + t.Run("prior summary surfaces synthetic message and starts at FirstKeptEntry", func(t *testing.T) { + t.Parallel() + items := []Item{ + newMsg(chat.MessageRoleUser, "u0"), + newMsg(chat.MessageRoleAssistant, "a0"), + newMsg(chat.MessageRoleUser, "u1-kept"), + newMsg(chat.MessageRoleAssistant, "a1-kept"), + {Summary: "prior summary", FirstKeptEntry: 2}, + newMsg(chat.MessageRoleUser, "u2"), + newMsg(chat.MessageRoleAssistant, "a2"), + } + sess := New(WithMessages(items)) + + messages, sessIndices := sess.CompactionInput() + + require.Len(t, messages, 5) + assert.Equal(t, chat.MessageRoleUser, messages[0].Role) + assert.Contains(t, messages[0].Content, "Session Summary: prior summary") + // The synthetic message maps back to the prior summary item; the + // kept-tail then resumes at the prior FirstKeptEntry, skipping + // the (non-message) summary item itself. + assert.Equal(t, []int{4, 2, 3, 5, 6}, sessIndices) + }) + + t.Run("prior summary without FirstKeptEntry starts strictly after the summary", func(t *testing.T) { + t.Parallel() + items := []Item{ + newMsg(chat.MessageRoleUser, "old"), + newMsg(chat.MessageRoleAssistant, "old-reply"), + {Summary: "prior summary"}, + newMsg(chat.MessageRoleUser, "new"), + newMsg(chat.MessageRoleAssistant, "new-reply"), + } + sess := New(WithMessages(items)) + + messages, sessIndices := sess.CompactionInput() + + require.Len(t, messages, 3) + assert.Equal(t, []int{2, 3, 4}, sessIndices) + }) + + t.Run("returned messages are independent copies safe to mutate", func(t *testing.T) { + t.Parallel() + sess := New(WithMessages([]Item{ + NewMessageItem(&Message{Message: chat.Message{ + Role: chat.MessageRoleUser, + Content: "hello", + Cost: 1.5, + CacheControl: true, + }}), + })) + + messages, _ := sess.CompactionInput() + require.Len(t, messages, 1) + + messages[0].Cost = 0 + messages[0].CacheControl = false + messages[0].Content = "mutated" + + assert.InDelta(t, 1.5, sess.Messages[0].Message.Message.Cost, 0) + assert.True(t, sess.Messages[0].Message.Message.CacheControl) + assert.Equal(t, "hello", sess.Messages[0].Message.Message.Content) + }) +}