Skip to content

Commit 08415a2

Browse files
authored
Fix disconnected stream retention (#683)
1 parent 1b0b1fd commit 08415a2

3 files changed

Lines changed: 60 additions & 3 deletions

File tree

web/src/app/api/v1/chat/completions/_post.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -874,6 +874,9 @@ export async function postChatCompletions(params: {
874874

875875
// Log detailed error information for debugging
876876
const errorDetails = openrouterError?.toJSON()
877+
const shouldRecordMessages = freebuffAccessTier !== 'limited'
878+
const { messages: _messages, ...bodyWithoutMessages } = body
879+
const telemetryBody = shouldRecordMessages ? body : bodyWithoutMessages
877880
const providerLabel = siliconflowError
878881
? 'SiliconFlow'
879882
: opencodeZenError
@@ -901,7 +904,9 @@ export async function postChatCompletions(params: {
901904
messageCount: Array.isArray(typedBody.messages)
902905
? typedBody.messages.length
903906
: 0,
904-
messages: typedBody.messages,
907+
...(shouldRecordMessages
908+
? { messages: typedBody.messages }
909+
: { messagesOmitted: true, accessTier: freebuffAccessTier }),
905910
providerStatusCode: (
906911
openrouterError ??
907912
fireworksError ??
@@ -935,7 +940,7 @@ export async function postChatCompletions(params: {
935940
userId,
936941
properties: {
937942
error: error instanceof Error ? error.message : 'Unknown error',
938-
body,
943+
body: telemetryBody,
939944
agentId,
940945
streaming: bodyStream,
941946
},

web/src/llm-api/deepseek.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ export function isDeepSeekModel(model: string): boolean {
7474
return DEEPSEEK_ROUTED_MODELS.has(model)
7575
}
7676

77+
function isDeepSeekV4FlashModel(model: string): boolean {
78+
return (
79+
model === deepseekModels.deepseekV4Flash ||
80+
model === deepseekModels.deepseekV4FlashDirect
81+
)
82+
}
83+
7784
function getDeepSeekPricing(model: string): DeepSeekPricing {
7885
const entry = DEEPSEEK_MODELS[model]
7986
if (!entry) {
@@ -279,6 +286,7 @@ export async function handleDeepSeekStream({
279286
body,
280287
logger,
281288
})
289+
const skipDisconnectedBilling = isDeepSeekV4FlashModel(body.model)
282290

283291
const response = await createDeepSeekRequest({ body, originalModel, fetch })
284292

@@ -392,13 +400,26 @@ export async function handleDeepSeekStream({
392400
cancel() {
393401
clearInterval(heartbeatInterval)
394402
clientDisconnected = true
403+
if (skipDisconnectedBilling) {
404+
reader
405+
.cancel('client disconnected from DeepSeek V4 Flash stream')
406+
.catch((error) => {
407+
logger.warn(
408+
{ error },
409+
'Failed to cancel disconnected DeepSeek V4 Flash stream',
410+
)
411+
})
412+
}
395413
logger.warn(
396414
{
397415
clientDisconnected,
398416
responseTextLength: state.responseText.length,
399417
reasoningTextLength: state.reasoningText.length,
418+
skippedBilling: skipDisconnectedBilling,
400419
},
401-
'Client cancelled stream, continuing DeepSeek consumption for billing',
420+
skipDisconnectedBilling
421+
? 'Client cancelled DeepSeek V4 Flash stream, ending without billing'
422+
: 'Client cancelled stream, continuing DeepSeek consumption for billing',
402423
)
403424
},
404425
})

web/src/llm-api/openrouter.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type StreamState = {
4040
// endpoint. OR finalizes generation records asynchronously; 500ms is enough
4141
// in practice and keeps the delay off the client response path.
4242
const GENERATION_LOOKUP_DELAY_MS = 500
43+
const DISCONNECTED_STREAM_DRAIN_TIMEOUT_MS = 2 * 60 * 1000
4344

4445
// Extended timeout for deep-thinking models (e.g., gpt-5) that can take
4546
// a long time to start streaming.
@@ -363,6 +364,7 @@ export async function handleOpenRouterStream({
363364
billed: false,
364365
}
365366
let clientDisconnected = false
367+
let disconnectedStreamDrainTimeout: NodeJS.Timeout | null = null
366368

367369
// Runs once on any stream-exit path. If we didn't bill through the normal
368370
// path (stream ended without a usage chunk, got a provider error chunk,
@@ -488,12 +490,41 @@ export async function handleOpenRouterStream({
488490
}
489491
await ensureBilled()
490492
} finally {
493+
if (disconnectedStreamDrainTimeout) {
494+
clearTimeout(disconnectedStreamDrainTimeout)
495+
}
491496
clearInterval(heartbeatInterval)
492497
}
493498
},
494499
cancel() {
495500
clearInterval(heartbeatInterval)
496501
clientDisconnected = true
502+
disconnectedStreamDrainTimeout = setTimeout(() => {
503+
const stateSummary = {
504+
clientDisconnected,
505+
responseTextLength: state.responseText.length,
506+
reasoningTextLength: state.reasoningText.length,
507+
generationId: state.generationId,
508+
billed: state.billed,
509+
}
510+
if (!state.billed && !state.generationId) {
511+
logger.warn(
512+
stateSummary,
513+
'Disconnected OpenRouter stream exceeded drain timeout before fallback billing was possible; continuing to drain',
514+
)
515+
return
516+
}
517+
logger.warn(
518+
stateSummary,
519+
'Cancelling disconnected OpenRouter stream after drain timeout',
520+
)
521+
reader.cancel('client disconnected drain timeout').catch((error) => {
522+
logger.warn(
523+
{ error },
524+
'Failed to cancel disconnected OpenRouter stream',
525+
)
526+
})
527+
}, DISCONNECTED_STREAM_DRAIN_TIMEOUT_MS)
497528
// Log truncated state to prevent OOM during logging (state can be up to 2MB)
498529
logger.warn(
499530
{

0 commit comments

Comments
 (0)