Skip to content

Commit 23a5964

Browse files
committed
Added test for handling stray output in headerPrefixPipe and refactored header parsing logic to skip non-header lines
1 parent 6f51456 commit 23a5964

2 files changed

Lines changed: 42 additions & 35 deletions

File tree

internal/execution/supervisor/adapter_rpc_pipe.go

Lines changed: 22 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -48,53 +48,40 @@ func (h *headerPrefixPipe) Read(p []byte) (int, error) {
4848
h.reader = bufio.NewReader(h.stdio)
4949
}
5050

51-
// read headers
52-
headers := ""
51+
// Scan lines until we find Content-Length:, skipping any stray output.
52+
var contentLength int
5353
for {
5454
line, err := h.reader.ReadString('\n')
5555
if err != nil {
5656
return 0, err
5757
}
58-
59-
headers += line
60-
61-
// Detect the end of headers with double CRLF
62-
if strings.HasSuffix(headers, "\r\n\r\n") {
63-
break
58+
line = strings.TrimRight(line, "\r\n")
59+
if !strings.HasPrefix(line, "Content-Length:") {
60+
continue
61+
}
62+
parts := strings.SplitN(line, ":", 2)
63+
if len(parts) != 2 {
64+
return 0, fmt.Errorf("malformed Content-Length header")
6465
}
66+
v, err := strconv.Atoi(strings.TrimSpace(parts[1]))
67+
if err != nil {
68+
return 0, fmt.Errorf("invalid Content-Length value: %s", parts[1])
69+
}
70+
contentLength = v
71+
break
6572
}
66-
headers = strings.TrimSpace(headers)
67-
68-
// get content-length value
69-
var contentLength int
70-
lines := strings.Split(headers, "\r\n")
71-
for _, line := range lines {
72-
line = strings.TrimSpace(line)
73-
74-
if strings.HasPrefix(line, "Content-Length:") {
75-
parts := strings.SplitN(line, ":", 2)
76-
if len(parts) != 2 {
77-
return 0, fmt.Errorf("malformed Content-Length header")
78-
}
7973

80-
lengthStr := strings.TrimSpace(parts[1])
81-
82-
if lengthValue, err := strconv.Atoi(lengthStr); err != nil {
83-
return 0, fmt.Errorf("invalid Content-Length value: %s", lengthStr)
84-
} else {
85-
contentLength = lengthValue
86-
}
87-
88-
// found the content-length
74+
// Drain remaining header lines until the blank separator.
75+
for {
76+
line, err := h.reader.ReadString('\n')
77+
if err != nil {
78+
return 0, err
79+
}
80+
if strings.TrimRight(line, "\r\n") == "" {
8981
break
90-
9182
}
9283
}
9384

94-
if contentLength == 0 {
95-
return 0, fmt.Errorf("Content-Length header not found or zero")
96-
}
97-
9885
// Read exactly contentLength bytes
9986
content := make([]byte, contentLength)
10087
n, err := io.ReadFull(h.reader, content)

internal/execution/supervisor/adapter_rpc_pipe_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,26 @@ func TestHeaderPrefixPipe_MultipleMessages(t *testing.T) {
107107
assert.Equal(t, msg2, readBuf[:n])
108108
}
109109

110+
func TestHeaderPrefixPipe_SkipsStrayOutputBeforeContentLength(t *testing.T) {
111+
buf := newRwc()
112+
pipe := &headerPrefixPipe{stdio: buf}
113+
114+
msg := []byte(`{"jsonrpc":"2.0","id":1,"result":"ok"}`)
115+
116+
// Stray lines an evaluation function might emit on stdout before the
117+
// server loop starts (e.g. model-loading logs, library init messages).
118+
stray := "Loading model...\nReady.\n"
119+
header := fmt.Sprintf("Content-Length: %d\r\n\r\n", len(msg))
120+
buf.(*rwc).Buffer.Write([]byte(stray))
121+
buf.(*rwc).Buffer.Write([]byte(header))
122+
buf.(*rwc).Buffer.Write(msg)
123+
124+
readBuf := make([]byte, 512)
125+
n, err := pipe.Read(readBuf)
126+
require.NoError(t, err)
127+
assert.Equal(t, msg, readBuf[:n])
128+
}
129+
110130
func TestHeaderPrefixPipe_ConcurrentReadWrite(t *testing.T) {
111131
// Use two separate pipes: one for the write direction, one for the read
112132
// direction. This mirrors the real pipe topology where read and write are

0 commit comments

Comments
 (0)