From 0770a2c9d4a4d1baa005e35a32cb9a9db1c49ca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Catriel=20M=C3=BCller?= Date: Mon, 8 Jun 2026 18:01:12 -0300 Subject: [PATCH 1/2] fix(ai-gateway): make SSE rewriting stream-safe --- .../src/app/api/openrouter/[...path]/route.ts | 5 +- .../openrouter/audio/transcriptions/route.ts | 2 +- .../app/api/openrouter/embeddings/route.ts | 2 +- .../lib/ai-gateway/handleRequestLogging.ts | 14 +- .../src/lib/ai-gateway/llm-proxy-helpers.ts | 3 + .../lib/ai-gateway/o11y/api-metrics.server.ts | 3 +- apps/web/src/lib/rewriteModelResponse.test.ts | 211 +++++++++++++ apps/web/src/lib/rewriteModelResponse.ts | 293 ++++++++---------- 8 files changed, 360 insertions(+), 173 deletions(-) diff --git a/apps/web/src/app/api/openrouter/[...path]/route.ts b/apps/web/src/app/api/openrouter/[...path]/route.ts index 8a0c54bb63..c164dbb716 100644 --- a/apps/web/src/app/api/openrouter/[...path]/route.ts +++ b/apps/web/src/app/api/openrouter/[...path]/route.ts @@ -720,7 +720,7 @@ export async function POST(request: NextRequest): Promise { - let response: string | undefined; + let responseText: string | undefined; try { - response = await clonedResponse.text(); - const error = detectToolCallArgumentErrors(response, request); + responseText = await clonedResponse.text(); + const error = detectToolCallArgumentErrors(responseText, request); const apiRequestLogId = await db .insert(api_request_log) .values({ @@ -55,7 +57,7 @@ export async function handleRequestLogging(params: { model, provider, request: request.body, - response, + response: responseText, error, }) .returning({ id: api_request_log.id }); diff --git a/apps/web/src/lib/ai-gateway/llm-proxy-helpers.ts b/apps/web/src/lib/ai-gateway/llm-proxy-helpers.ts index 14ae12088d..2261e49c2e 100644 --- a/apps/web/src/lib/ai-gateway/llm-proxy-helpers.ts +++ b/apps/web/src/lib/ai-gateway/llm-proxy-helpers.ts @@ -290,6 +290,9 @@ export function getOutputHeaders(response: Response) { } outputHeaders.set('Content-Encoding', 'identity'); // Content-Encoding: identity is here because Vercel modifies encoding/compression and causes issues + if (outputHeaders.get('content-type')?.includes('text/event-stream')) { + outputHeaders.set('Cache-Control', 'no-cache, no-transform'); + } return outputHeaders; } diff --git a/apps/web/src/lib/ai-gateway/o11y/api-metrics.server.ts b/apps/web/src/lib/ai-gateway/o11y/api-metrics.server.ts index 8f8a41ac13..f4612a40b3 100644 --- a/apps/web/src/lib/ai-gateway/o11y/api-metrics.server.ts +++ b/apps/web/src/lib/ai-gateway/o11y/api-metrics.server.ts @@ -202,12 +202,13 @@ export function emitApiMetrics(params: ApiMetricsParams) { export function emitApiMetricsForResponse( params: Omit, - responseToDrain: Response, + response: Response, requestStartedAt: number ) { if (!apiMetricsUrl) return; if (!O11Y_KILO_GATEWAY_CLIENT_SECRET) return; + const responseToDrain = response.clone(); after(async () => { let inferenceProvider: string | undefined; try { diff --git a/apps/web/src/lib/rewriteModelResponse.test.ts b/apps/web/src/lib/rewriteModelResponse.test.ts index 7dc69f99c4..ded8ebbdee 100644 --- a/apps/web/src/lib/rewriteModelResponse.test.ts +++ b/apps/web/src/lib/rewriteModelResponse.test.ts @@ -313,4 +313,215 @@ describe('rewriteFreeModelResponse_Responses', () => { expect(sse).toContain('event: response.completed'); expect(dataPayloads(sse)).toContain('[DONE]'); }); + + test('emits byte chunks that can be consumed through the Response body API', async () => { + const upstream = sseResponse( + 'event: response.completed\n' + + 'data: {"type":"response.completed","sequence_number":0,"response":{"model":"upstream-model","status":"completed"}}\n\n' + ); + + const result = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL); + const output = await result.text(); + + expect(dataObjects(output)).toEqual([ + { + type: 'response.completed', + sequence_number: 0, + response: { model: REWRITTEN_MODEL, status: 'completed' }, + }, + ]); + expect(result.headers.get('cache-control')).toBe('no-cache, no-transform'); + }); + + test('preserves contiguous events across one-byte chunks and split UTF-8 code points', async () => { + const events = Array.from({ length: 58 }, (_, sequenceNumber) => { + const type = sequenceNumber === 57 ? 'response.completed' : 'response.output_text.delta'; + const response = + sequenceNumber === 57 ? `,"response":{"model":"upstream-model","status":"completed"}` : ''; + return `event: ${type}\ndata: {"type":"${type}","sequence_number":${sequenceNumber},"delta":"café"${response}}\n\n`; + }).join(''); + const bytes = new TextEncoder().encode(events); + let chunkIndex = 0; + const stream = new ReadableStream({ + async pull(controller) { + if (chunkIndex === bytes.length) { + controller.close(); + return; + } + if (chunkIndex === 7 || chunkIndex === Math.floor(bytes.length / 2)) { + await new Promise(resolve => setTimeout(resolve, 1)); + } + controller.enqueue(bytes.slice(chunkIndex, chunkIndex + 1)); + chunkIndex++; + }, + }); + const upstream = new Response(stream, { + headers: { 'content-type': 'text/event-stream' }, + }); + + const result = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL); + const output = await result.text(); + const parsed = dataObjects(output) as Array<{ + type: string; + sequence_number: number; + delta: string; + response?: { model: string; status: string }; + }>; + + expect(parsed).toHaveLength(58); + expect(parsed.map(event => event.sequence_number)).toEqual( + Array.from({ length: 58 }, (_, index) => index) + ); + expect(parsed.every(event => event.delta === 'café')).toBe(true); + expect(parsed.at(-1)).toMatchObject({ + type: 'response.completed', + sequence_number: 57, + response: { model: REWRITTEN_MODEL, status: 'completed' }, + }); + }); + + test('forwards an empty reasoning item and a complete response without [DONE]', async () => { + const upstream = sseResponse( + 'event: response.output_item.done\n' + + 'data: {"type":"response.output_item.done","sequence_number":0,"item":{"type":"reasoning","summary":[],"encrypted_content":"encrypted"}}\n\n' + + 'event: response.completed\n' + + 'data: {"type":"response.completed","sequence_number":1,"response":{"model":"upstream-model","status":"completed"}}\n\n' + ); + + const result = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL); + const output = await result.text(); + const events = dataObjects(output) as Array<{ + type: string; + item?: { type: string; summary: unknown[]; encrypted_content: string }; + }>; + + expect(events[0]).toMatchObject({ + type: 'response.output_item.done', + item: { type: 'reasoning', summary: [], encrypted_content: 'encrypted' }, + }); + expect(events[1]).toMatchObject({ type: 'response.completed' }); + expect(dataPayloads(output)).not.toContain('[DONE]'); + }); + + test('propagates a source error instead of converting it to clean EOF', async () => { + const sourceError = new Error('source failed before response.completed'); + let pullCount = 0; + const stream = new ReadableStream({ + pull(controller) { + if (pullCount++ === 0) { + controller.enqueue( + new TextEncoder().encode( + 'event: response.in_progress\ndata: {"type":"response.in_progress","sequence_number":0}\n\n' + ) + ); + return; + } + controller.error(sourceError); + }, + }); + const upstream = new Response(stream, { + headers: { 'content-type': 'text/event-stream' }, + }); + + const result = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL); + + await expect(result.text()).rejects.toBe(sourceError); + }); + + test('propagates downstream cancellation to the upstream reader', async () => { + let cancelReason: unknown; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode( + 'event: response.in_progress\ndata: {"type":"response.in_progress","sequence_number":0}\n\n' + ) + ); + }, + cancel(reason) { + cancelReason = reason; + }, + }); + const upstream = new Response(stream, { + headers: { 'content-type': 'text/event-stream' }, + }); + const result = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL); + const reader = result.body?.getReader(); + expect(reader).toBeDefined(); + await reader?.read(); + + const reason = new Error('downstream stopped'); + await reader?.cancel(reason); + await new Promise(resolve => setTimeout(resolve, 0)); + + expect(cancelReason).toBe(reason); + }); + + test('dispatches a complete final event without a trailing blank delimiter', async () => { + const upstream = sseResponse( + 'event: response.completed\n' + + 'data: {"type":"response.completed","sequence_number":0,"response":{"model":"upstream-model","status":"completed"}}' + ); + + const result = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL); + const output = await result.text(); + + expect(dataObjects(output)).toEqual([ + { + type: 'response.completed', + sequence_number: 0, + response: { model: REWRITTEN_MODEL, status: 'completed' }, + }, + ]); + }); + + test('preserves SSE event IDs while normalizing comments and multiline data', async () => { + const upstream = sseResponse( + ': upstream heartbeat\n\n' + + 'unknown: ignored\n' + + 'id: event-57\n' + + 'event: response.completed\n' + + 'data: {"type":\n' + + 'data: "response.completed","response":{"model":"upstream-model","status":"completed"}}\n\n' + ); + + const result = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL); + const output = await result.text(); + + expect(output).toContain(': KILO PROCESSING\n\n'); + expect(output).toContain('id: event-57\n'); + expect(output).toContain('event: response.completed\n'); + expect(dataObjects(output)).toEqual([ + { + type: 'response.completed', + response: { model: REWRITTEN_MODEL, status: 'completed' }, + }, + ]); + }); + + test('delivers the same complete response while concurrent clones are drained', async () => { + const body = + 'event: response.created\n' + + 'data: {"type":"response.created","sequence_number":0,"response":{"model":"upstream-model","status":"in_progress"}}\n\n' + + 'event: response.completed\n' + + 'data: {"type":"response.completed","sequence_number":1,"response":{"model":"upstream-model","status":"completed"}}\n\n'; + const upstream = sseResponse(body); + const usageClone = upstream.clone(); + const loggingClone = upstream.clone(); + + const result = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL); + const [callerOutput, usageOutput, loggingOutput] = await Promise.all([ + result.text(), + usageClone.text(), + loggingClone.text(), + ]); + + expect( + (dataObjects(callerOutput) as Array<{ sequence_number: number }>).map( + event => event.sequence_number + ) + ).toEqual([0, 1]); + expect(usageOutput).toBe(body); + expect(loggingOutput).toBe(body); + }); }); diff --git a/apps/web/src/lib/rewriteModelResponse.ts b/apps/web/src/lib/rewriteModelResponse.ts index d7b4ada0f7..abe529e029 100644 --- a/apps/web/src/lib/rewriteModelResponse.ts +++ b/apps/web/src/lib/rewriteModelResponse.ts @@ -21,6 +21,98 @@ function rewriteUsage(usage: OpenRouterUsage) { } } +function createRewrittenSseStream( + body: ReadableStream | null, + rewriteJson: (json: T) => void +): ReadableStream { + const reader = body?.getReader() ?? null; + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + let outputController: ReadableStreamDefaultController | null = null; + let doneReceived = false; + let cancelled = false; + let finished = false; + let released = false; + let enqueueCount = 0; + + const release = () => { + if (!reader || released) return; + released = true; + reader.releaseLock(); + }; + const enqueue = (value: string) => { + if (cancelled || !outputController) return; + outputController.enqueue(encoder.encode(value)); + enqueueCount++; + }; + const parser = createParser({ + onEvent(event: EventSourceMessage) { + if (event.data === '[DONE]') { + doneReceived = true; + return; + } + + const json = JSON.parse(event.data) as T; + rewriteJson(json); + const idLine = event.id === undefined ? '' : `id: ${event.id}\n`; + const eventLine = event.event ? `event: ${event.event}\n` : ''; + enqueue(`${idLine}${eventLine}data: ${JSON.stringify(json)}\n\n`); + }, + onComment() { + enqueue(': KILO PROCESSING\n\n'); + }, + }); + + return new ReadableStream({ + start(controller) { + outputController = controller; + if (!reader) { + finished = true; + controller.close(); + } + }, + async pull(controller) { + if (!reader || finished || cancelled) return; + + try { + const enqueueCountBeforePull = enqueueCount; + while (!finished && !cancelled && enqueueCount === enqueueCountBeforePull) { + const { done, value } = await reader.read(); + if (cancelled) return; + if (!done) { + parser.feed(decoder.decode(value, { stream: true })); + continue; + } + + finished = true; + const finalText = decoder.decode(); + if (finalText) parser.feed(finalText); + // Be permissive at EOF and dispatch a complete final data line even + // when the upstream omitted its trailing blank SSE delimiter. + parser.feed('\n\n'); + if (doneReceived) enqueue('data: [DONE]\n\n'); + release(); + controller.close(); + } + } catch (error) { + finished = true; + release(); + if (!cancelled) controller.error(error); + } + }, + cancel(reason) { + cancelled = true; + finished = true; + if (reader) { + void reader + .cancel(reason) + .catch(() => undefined) + .finally(release); + } + }, + }); +} + export async function rewriteFreeModelResponse_ChatCompletions(response: Response, model: string) { const headers = getOutputHeaders(response); @@ -55,67 +147,25 @@ export async function rewriteFreeModelResponse_ChatCompletions(response: Respons }); } - const stream = new ReadableStream({ - async start(controller) { - const reader = response.body?.getReader(); - if (!reader) { - controller.close(); - return; - } - - let doneReceived = false; - const parser = createParser({ - onEvent(event: EventSourceMessage) { - if (event.data === '[DONE]') { - doneReceived = true; - return; - } - const json = JSON.parse(event.data) as ChatCompletionChunk; - if (json.model) { - json.model = model; - } - - const delta = json.choices?.[0]?.delta; - if (delta) { - // Some APIs set null here, which is not accepted by OpenCode - if (delta?.role === null) { - delete delta.role; - } - } - - if (!json.choices) { - // Some APIs leave this out when returning usage, which is not accepted by OpenCode - json.choices = []; - } + const stream = createRewrittenSseStream(response.body, json => { + if (json.model) { + json.model = model; + } - if (json.usage) { - rewriteUsage(json.usage); - } + const delta = json.choices?.[0]?.delta; + if (delta?.role === null) { + // Some APIs set null here, which is not accepted by OpenCode + delete delta.role; + } - const eventLine = event.event ? 'event: ' + event.event + '\n' : ''; - controller.enqueue(eventLine + 'data: ' + JSON.stringify(json) + '\n\n'); - }, - onComment() { - controller.enqueue(': KILO PROCESSING\n\n'); - }, - }); + if (!json.choices) { + // Some APIs leave this out when returning usage, which is not accepted by OpenCode + json.choices = []; + } - const decoder = new TextDecoder(); - while (true) { - const { done, value } = await reader.read(); - if (done) { - // Flush any event left buffered when the stream ends without a - // trailing blank line, so its data isn't silently dropped. - parser.reset({ consume: true }); - if (doneReceived) { - controller.enqueue('data: [DONE]\n\n'); - } - controller.close(); - break; - } - parser.feed(decoder.decode(value, { stream: true })); - } - }, + if (json.usage) { + rewriteUsage(json.usage); + } }); return new NextResponse(stream, { @@ -179,67 +229,25 @@ export async function rewriteFreeModelResponse_Messages(response: Response, mode }); } - const stream = new ReadableStream({ - async start(controller) { - const reader = response.body?.getReader(); - if (!reader) { - controller.close(); - return; + const stream = createRewrittenSseStream< + MessagesApiMessageStart | MessagesApiMessageDelta | Anthropic.Messages.MessageStreamEvent + >(response.body, json => { + if (json.type === 'message_start') { + const event = json as MessagesApiMessageStart; + if (event.message.model) { + event.message.model = model; } + if (event.message.usage) { + rewriteMessagesUsage(event.message.usage); + } + } - let doneReceived = false; - const parser = createParser({ - onEvent(event: EventSourceMessage) { - if (event.data === '[DONE]') { - doneReceived = true; - return; - } - const json = JSON.parse(event.data) as - | MessagesApiMessageStart - | MessagesApiMessageDelta - | Anthropic.Messages.MessageStreamEvent; - - if (json.type === 'message_start') { - const e = json as MessagesApiMessageStart; - if (e.message.model) { - e.message.model = model; - } - if (e.message.usage) { - rewriteMessagesUsage(e.message.usage); - } - } - - if (json.type === 'message_delta') { - const e = json as MessagesApiMessageDelta; - if (e.usage) { - rewriteMessagesUsage(e.usage); - } - } - - const eventLine = event.event ? 'event: ' + event.event + '\n' : ''; - controller.enqueue(eventLine + 'data: ' + JSON.stringify(json) + '\n\n'); - }, - onComment() { - controller.enqueue(': KILO PROCESSING\n\n'); - }, - }); - - const decoder = new TextDecoder(); - while (true) { - const { done, value } = await reader.read(); - if (done) { - // Flush any event left buffered when the stream ends without a - // trailing blank line, so its data isn't silently dropped. - parser.reset({ consume: true }); - if (doneReceived) { - controller.enqueue('data: [DONE]\n\n'); - } - controller.close(); - break; - } - parser.feed(decoder.decode(value, { stream: true })); + if (json.type === 'message_delta') { + const event = json as MessagesApiMessageDelta; + if (event.usage) { + rewriteMessagesUsage(event.usage); } - }, + } }); return new NextResponse(stream, { @@ -285,54 +293,15 @@ export async function rewriteFreeModelResponse_Responses(response: Response, mod }); } - const stream = new ReadableStream({ - async start(controller) { - const reader = response.body?.getReader(); - if (!reader) { - controller.close(); - return; + const stream = createRewrittenSseStream(response.body, json => { + if (json.response) { + if (json.response.model) { + json.response.model = model; } - - let doneReceived = false; - const parser = createParser({ - onEvent(event: EventSourceMessage) { - if (event.data === '[DONE]') { - doneReceived = true; - return; - } - const json = JSON.parse(event.data) as ResponsesApiEvent; - if (json.response) { - if (json.response.model) { - json.response.model = model; - } - if (json.response.usage) { - rewriteUsage(json.response.usage); - } - } - const eventLine = event.event ? 'event: ' + event.event + '\n' : ''; - controller.enqueue(eventLine + 'data: ' + JSON.stringify(json) + '\n\n'); - }, - onComment() { - controller.enqueue(': KILO PROCESSING\n\n'); - }, - }); - - const decoder = new TextDecoder(); - while (true) { - const { done, value } = await reader.read(); - if (done) { - // Flush any event left buffered when the stream ends without a - // trailing blank line, so its data isn't silently dropped. - parser.reset({ consume: true }); - if (doneReceived) { - controller.enqueue('data: [DONE]\n\n'); - } - controller.close(); - break; - } - parser.feed(decoder.decode(value, { stream: true })); + if (json.response.usage) { + rewriteUsage(json.response.usage); } - }, + } }); return new NextResponse(stream, { From add112a6e646d4d15222b6191a328db6a23717df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Catriel=20M=C3=BCller?= Date: Mon, 8 Jun 2026 18:15:20 -0300 Subject: [PATCH 2/2] fix(ai-gateway): honor SSE rewrite backpressure --- apps/web/src/lib/rewriteModelResponse.test.ts | 20 +++++ apps/web/src/lib/rewriteModelResponse.ts | 82 ++++++++++++++----- 2 files changed, 80 insertions(+), 22 deletions(-) diff --git a/apps/web/src/lib/rewriteModelResponse.test.ts b/apps/web/src/lib/rewriteModelResponse.test.ts index ded8ebbdee..f5fb9293a0 100644 --- a/apps/web/src/lib/rewriteModelResponse.test.ts +++ b/apps/web/src/lib/rewriteModelResponse.test.ts @@ -333,6 +333,26 @@ describe('rewriteFreeModelResponse_Responses', () => { expect(result.headers.get('cache-control')).toBe('no-cache, no-transform'); }); + test('stops parsing a multi-event chunk when downstream backpressure applies', async () => { + const upstream = sseResponse( + 'event: response.in_progress\n' + + 'data: {"type":"response.in_progress","sequence_number":0}\n\n' + + 'event: response.completed\n' + + 'data: not-json\n\n' + ); + + const result = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL); + const reader = result.body?.getReader(); + if (!reader) throw new Error('Expected a response body'); + + const firstRead = await reader.read(); + expect(firstRead.done).toBe(false); + expect(dataObjects(new TextDecoder().decode(firstRead.value))).toEqual([ + { type: 'response.in_progress', sequence_number: 0 }, + ]); + await expect(reader.read()).rejects.toBeInstanceOf(SyntaxError); + }); + test('preserves contiguous events across one-byte chunks and split UTF-8 code points', async () => { const events = Array.from({ length: 58 }, (_, sequenceNumber) => { const type = sequenceNumber === 57 ? 'response.completed' : 'response.output_text.delta'; diff --git a/apps/web/src/lib/rewriteModelResponse.ts b/apps/web/src/lib/rewriteModelResponse.ts index abe529e029..bab330601b 100644 --- a/apps/web/src/lib/rewriteModelResponse.ts +++ b/apps/web/src/lib/rewriteModelResponse.ts @@ -28,22 +28,40 @@ function createRewrittenSseStream( const reader = body?.getReader() ?? null; const encoder = new TextEncoder(); const decoder = new TextDecoder(); - let outputController: ReadableStreamDefaultController | null = null; + const queuedOutput: Uint8Array[] = []; + let decodedInput = ''; let doneReceived = false; + let doneOutputQueued = false; + let upstreamDone = false; let cancelled = false; let finished = false; let released = false; - let enqueueCount = 0; const release = () => { if (!reader || released) return; released = true; reader.releaseLock(); }; - const enqueue = (value: string) => { - if (cancelled || !outputController) return; - outputController.enqueue(encoder.encode(value)); - enqueueCount++; + const queueOutput = (value: string) => { + if (!cancelled) queuedOutput.push(encoder.encode(value)); + }; + const takeNextInputLine = (): string | null => { + for (let index = 0; index < decodedInput.length; index++) { + const character = decodedInput[index]; + if (character === '\n') { + const line = decodedInput.slice(0, index + 1); + decodedInput = decodedInput.slice(index + 1); + return line; + } + if (character !== '\r') continue; + if (index + 1 === decodedInput.length && !upstreamDone) return null; + + const lineEnd = decodedInput[index + 1] === '\n' ? index + 2 : index + 1; + const line = decodedInput.slice(0, lineEnd); + decodedInput = decodedInput.slice(lineEnd); + return line; + } + return null; }; const parser = createParser({ onEvent(event: EventSourceMessage) { @@ -56,16 +74,15 @@ function createRewrittenSseStream( rewriteJson(json); const idLine = event.id === undefined ? '' : `id: ${event.id}\n`; const eventLine = event.event ? `event: ${event.event}\n` : ''; - enqueue(`${idLine}${eventLine}data: ${JSON.stringify(json)}\n\n`); + queueOutput(`${idLine}${eventLine}data: ${JSON.stringify(json)}\n\n`); }, onComment() { - enqueue(': KILO PROCESSING\n\n'); + queueOutput(': KILO PROCESSING\n\n'); }, }); return new ReadableStream({ start(controller) { - outputController = controller; if (!reader) { finished = true; controller.close(); @@ -75,27 +92,46 @@ function createRewrittenSseStream( if (!reader || finished || cancelled) return; try { - const enqueueCountBeforePull = enqueueCount; - while (!finished && !cancelled && enqueueCount === enqueueCountBeforePull) { - const { done, value } = await reader.read(); - if (cancelled) return; - if (!done) { - parser.feed(decoder.decode(value, { stream: true })); + while (queuedOutput.length === 0 && !finished && !cancelled) { + const nextLine = takeNextInputLine(); + if (nextLine !== null) { + parser.feed(nextLine); + continue; + } + + if (!upstreamDone) { + const { done, value } = await reader.read(); + if (cancelled) return; + if (!done) { + decodedInput += decoder.decode(value, { stream: true }); + continue; + } + + upstreamDone = true; + decodedInput += decoder.decode(); + // Be permissive at EOF and dispatch a complete final data line even + // when the upstream omitted its trailing blank SSE delimiter. + decodedInput += '\n\n'; + release(); + continue; + } + + if (doneReceived && !doneOutputQueued) { + doneOutputQueued = true; + queueOutput('data: [DONE]\n\n'); continue; } finished = true; - const finalText = decoder.decode(); - if (finalText) parser.feed(finalText); - // Be permissive at EOF and dispatch a complete final data line even - // when the upstream omitted its trailing blank SSE delimiter. - parser.feed('\n\n'); - if (doneReceived) enqueue('data: [DONE]\n\n'); - release(); controller.close(); } + + const nextOutput = queuedOutput.shift(); + if (nextOutput) controller.enqueue(nextOutput); } catch (error) { finished = true; + queuedOutput.length = 0; + decodedInput = ''; release(); if (!cancelled) controller.error(error); } @@ -103,6 +139,8 @@ function createRewrittenSseStream( cancel(reason) { cancelled = true; finished = true; + queuedOutput.length = 0; + decodedInput = ''; if (reader) { void reader .cancel(reason)