Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions apps/web/src/app/api/openrouter/[...path]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ export async function POST(request: NextRequest): Promise<NextResponseType<unkno
ttfbMs,
statusCode: response.status,
},
response.clone(),
response,
requestStartedAt
);
usageContext.status_code = response.status;
Expand Down Expand Up @@ -773,7 +773,7 @@ export async function POST(request: NextRequest): Promise<NextResponseType<unkno
accountForMicrodollarUsage(clonedReponse, usageContext, openrouterRequestSpan);

await handleRequestLogging({
clonedResponse: response.clone(),
response,
user: maybeUser,
organization_id: organizationId || null,
provider: effectiveProviderContext.provider.id,
Expand Down Expand Up @@ -801,6 +801,7 @@ export async function POST(request: NextRequest): Promise<NextResponseType<unkno
effectiveProviderContext.provider.id,
requestBodyParsed.kind
);

if (rewrittenResponse) {
return rewrittenResponse;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ export async function POST(request: NextRequest): Promise<NextResponseType<unkno
ttfbMs,
statusCode: response.status,
},
response.clone(),
response,
requestStartedAt
);

Expand Down
2 changes: 1 addition & 1 deletion apps/web/src/app/api/openrouter/embeddings/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ export async function POST(request: NextRequest): Promise<NextResponseType<unkno
ttfbMs,
statusCode: response.status,
},
response.clone(),
response,
requestStartedAt
);

Expand Down
14 changes: 8 additions & 6 deletions apps/web/src/lib/ai-gateway/handleRequestLogging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,25 @@ async function isLoggingEnabledForUser(
}

export async function handleRequestLogging(params: {
clonedResponse: Response;
response: Response;
user: User | null;
organization_id: string | null;
session_id: string | null;
provider: string;
model: string;
request: GatewayRequest;
}) {
const { clonedResponse, user, organization_id, session_id, provider, model, request } = params;
const { response, user, organization_id, session_id, provider, model, request } = params;
if (!(await isLoggingEnabledForUser(user, organization_id))) {
return;
}

const clonedResponse = response.clone();
after(async () => {
let response: string | undefined;
let responseText: string | undefined;
try {
response = await clonedResponse.text();
const error = detectToolCallArgumentErrors(response, request);
responseText = await clonedResponse.text();
const error = detectToolCallArgumentErrors(responseText, request);
const apiRequestLogId = await db
.insert(api_request_log)
.values({
Expand All @@ -55,7 +57,7 @@ export async function handleRequestLogging(params: {
model,
provider,
request: request.body,
response,
response: responseText,
error,
})
.returning({ id: api_request_log.id });
Expand Down
3 changes: 3 additions & 0 deletions apps/web/src/lib/ai-gateway/llm-proxy-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ export function getOutputHeaders(response: Response) {
}
outputHeaders.set('Content-Encoding', 'identity');
// Content-Encoding: identity is here because Vercel modifies encoding/compression and causes issues
if (outputHeaders.get('content-type')?.includes('text/event-stream')) {
outputHeaders.set('Cache-Control', 'no-cache, no-transform');
}

return outputHeaders;
}
Expand Down
3 changes: 2 additions & 1 deletion apps/web/src/lib/ai-gateway/o11y/api-metrics.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,13 @@ export function emitApiMetrics(params: ApiMetricsParams) {

export function emitApiMetricsForResponse(
params: Omit<ApiMetricsParams, 'clientSecret' | 'completeRequestMs'>,
responseToDrain: Response,
response: Response,
requestStartedAt: number
) {
if (!apiMetricsUrl) return;
if (!O11Y_KILO_GATEWAY_CLIENT_SECRET) return;

const responseToDrain = response.clone();
after(async () => {
let inferenceProvider: string | undefined;
try {
Expand Down
231 changes: 231 additions & 0 deletions apps/web/src/lib/rewriteModelResponse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,4 +313,235 @@ describe('rewriteFreeModelResponse_Responses', () => {
expect(sse).toContain('event: response.completed');
expect(dataPayloads(sse)).toContain('[DONE]');
});

// Sends a single `response.completed` event through the rewriter and reads the
// result with `text()`. The parsed payload is expected to carry the rewritten
// model name, and the response is expected to expose the SSE cache-control header.
test('emits byte chunks that can be consumed through the Response body API', async () => {
  const completedEvent =
    'event: response.completed\n' +
    'data: {"type":"response.completed","sequence_number":0,"response":{"model":"upstream-model","status":"completed"}}\n\n';

  const rewritten = await rewriteFreeModelResponse_Responses(
    sseResponse(completedEvent),
    REWRITTEN_MODEL
  );
  const bodyText = await rewritten.text();

  // Only the model field is expected to differ from what upstream sent.
  const expectedEvents = [
    {
      type: 'response.completed',
      sequence_number: 0,
      response: { model: REWRITTEN_MODEL, status: 'completed' },
    },
  ];
  expect(dataObjects(bodyText)).toEqual(expectedEvents);
  expect(rewritten.headers.get('cache-control')).toBe('no-cache, no-transform');
});

// The upstream body holds two events in one chunk; the second one has a data
// payload that is not JSON. The first read is expected to deliver only the
// first event, and the next read is expected to reject with a SyntaxError.
test('stops parsing a multi-event chunk when downstream backpressure applies', async () => {
  const twoEventChunk =
    'event: response.in_progress\n' +
    'data: {"type":"response.in_progress","sequence_number":0}\n\n' +
    'event: response.completed\n' +
    'data: not-json\n\n';

  const rewritten = await rewriteFreeModelResponse_Responses(
    sseResponse(twoEventChunk),
    REWRITTEN_MODEL
  );
  const bodyReader = rewritten.body?.getReader();
  if (!bodyReader) {
    throw new Error('Expected a response body');
  }

  const { done, value } = await bodyReader.read();
  expect(done).toBe(false);
  const firstChunkText = new TextDecoder().decode(value);
  expect(dataObjects(firstChunkText)).toEqual([
    { type: 'response.in_progress', sequence_number: 0 },
  ]);

  await expect(bodyReader.read()).rejects.toBeInstanceOf(SyntaxError);
});

// Feeds the rewriter one byte per pull, so the multi-byte "é" in "café" is
// split across chunks, with two short pauses along the way. All 58 events are
// expected to come out in order, with the delta text intact and the model
// rewritten on the final `response.completed` event.
test('preserves contiguous events across one-byte chunks and split UTF-8 code points', async () => {
  type ParsedEvent = {
    type: string;
    sequence_number: number;
    delta: string;
    response?: { model: string; status: string };
  };
  const eventCount = 58;
  const finalSequence = eventCount - 1;

  const encoded = new TextEncoder().encode(
    Array.from({ length: eventCount }, (_, sequenceNumber) => {
      const isFinal = sequenceNumber === finalSequence;
      const type = isFinal ? 'response.completed' : 'response.output_text.delta';
      const response = isFinal
        ? `,"response":{"model":"upstream-model","status":"completed"}`
        : '';
      return `event: ${type}\ndata: {"type":"${type}","sequence_number":${sequenceNumber},"delta":"café"${response}}\n\n`;
    }).join('')
  );

  // Byte offsets at which the source waits briefly before producing the next byte.
  const pauseOffsets = [7, Math.floor(encoded.length / 2)];
  let offset = 0;
  const byteStream = new ReadableStream<Uint8Array>({
    async pull(controller) {
      if (offset === encoded.length) {
        controller.close();
        return;
      }
      if (pauseOffsets.includes(offset)) {
        await new Promise(resolve => setTimeout(resolve, 1));
      }
      controller.enqueue(encoded.slice(offset, offset + 1));
      offset++;
    },
  });
  const upstream = new Response(byteStream, {
    headers: { 'content-type': 'text/event-stream' },
  });

  const rewritten = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL);
  const bodyText = await rewritten.text();
  const parsedEvents = dataObjects(bodyText) as Array<ParsedEvent>;

  expect(parsedEvents).toHaveLength(58);
  const expectedSequence = Array.from({ length: eventCount }, (_, index) => index);
  expect(parsedEvents.map(event => event.sequence_number)).toEqual(expectedSequence);
  expect(parsedEvents.every(event => event.delta === 'café')).toBe(true);
  expect(parsedEvents.at(-1)).toMatchObject({
    type: 'response.completed',
    sequence_number: 57,
    response: { model: REWRITTEN_MODEL, status: 'completed' },
  });
});

// Upstream sends a reasoning item with an empty summary followed by
// `response.completed`, and never sends a `[DONE]` payload. Both events are
// expected in the output, and no `[DONE]` payload is expected to appear.
test('forwards an empty reasoning item and a complete response without [DONE]', async () => {
  type ForwardedEvent = {
    type: string;
    item?: { type: string; summary: unknown[]; encrypted_content: string };
  };
  const upstreamBody =
    'event: response.output_item.done\n' +
    'data: {"type":"response.output_item.done","sequence_number":0,"item":{"type":"reasoning","summary":[],"encrypted_content":"encrypted"}}\n\n' +
    'event: response.completed\n' +
    'data: {"type":"response.completed","sequence_number":1,"response":{"model":"upstream-model","status":"completed"}}\n\n';

  const rewritten = await rewriteFreeModelResponse_Responses(
    sseResponse(upstreamBody),
    REWRITTEN_MODEL
  );
  const bodyText = await rewritten.text();
  const [reasoningDone, completed] = dataObjects(bodyText) as Array<ForwardedEvent>;

  expect(reasoningDone).toMatchObject({
    type: 'response.output_item.done',
    item: { type: 'reasoning', summary: [], encrypted_content: 'encrypted' },
  });
  expect(completed).toMatchObject({ type: 'response.completed' });
  expect(dataPayloads(bodyText)).not.toContain('[DONE]');
});

// The source stream yields one event on its first pull and errors on the
// second. Reading the rewritten body is expected to reject with that very
// error object rather than resolve.
test('propagates a source error instead of converting it to clean EOF', async () => {
  const sourceError = new Error('source failed before response.completed');
  const firstChunk = new TextEncoder().encode(
    'event: response.in_progress\ndata: {"type":"response.in_progress","sequence_number":0}\n\n'
  );
  let pulls = 0;
  const failingStream = new ReadableStream<Uint8Array>({
    pull(controller) {
      pulls += 1;
      if (pulls > 1) {
        controller.error(sourceError);
        return;
      }
      controller.enqueue(firstChunk);
    },
  });
  const upstream = new Response(failingStream, {
    headers: { 'content-type': 'text/event-stream' },
  });

  const rewritten = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL);

  await expect(rewritten.text()).rejects.toBe(sourceError);
});

// Cancels the rewritten body's reader after one read and expects the same
// reason object to reach the `cancel` callback of the upstream source.
test('propagates downstream cancellation to the upstream reader', async () => {
  const firstChunk = new TextEncoder().encode(
    'event: response.in_progress\ndata: {"type":"response.in_progress","sequence_number":0}\n\n'
  );
  let observedReason: unknown;
  const source = new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(firstChunk);
    },
    cancel(reason) {
      observedReason = reason;
    },
  });
  const upstream = new Response(source, {
    headers: { 'content-type': 'text/event-stream' },
  });

  const rewritten = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL);
  const bodyReader = rewritten.body?.getReader();
  expect(bodyReader).toBeDefined();
  await bodyReader?.read();

  const downstreamReason = new Error('downstream stopped');
  await bodyReader?.cancel(downstreamReason);
  // Yield one timer tick before inspecting what the upstream source recorded.
  await new Promise(resolve => setTimeout(resolve, 0));

  expect(observedReason).toBe(downstreamReason);
});

// The upstream body ends right after the data line, with no blank line closing
// the event. The event is still expected in the output, with the model rewritten.
test('dispatches a complete final event without a trailing blank delimiter', async () => {
  const unterminatedEvent =
    'event: response.completed\n' +
    'data: {"type":"response.completed","sequence_number":0,"response":{"model":"upstream-model","status":"completed"}}';

  const rewritten = await rewriteFreeModelResponse_Responses(
    sseResponse(unterminatedEvent),
    REWRITTEN_MODEL
  );
  const bodyText = await rewritten.text();

  const expectedEvents = [
    {
      type: 'response.completed',
      sequence_number: 0,
      response: { model: REWRITTEN_MODEL, status: 'completed' },
    },
  ];
  expect(dataObjects(bodyText)).toEqual(expectedEvents);
});

// Upstream sends a comment line, an unrecognized field, an event id, and a JSON
// payload split over two `data:` lines. The output is expected to contain the
// `: KILO PROCESSING` comment, the id and event lines, and a single parsed
// payload with the model rewritten.
test('preserves SSE event IDs while normalizing comments and multiline data', async () => {
  const upstreamBody =
    ': upstream heartbeat\n\n' +
    'unknown: ignored\n' +
    'id: event-57\n' +
    'event: response.completed\n' +
    'data: {"type":\n' +
    'data: "response.completed","response":{"model":"upstream-model","status":"completed"}}\n\n';

  const rewritten = await rewriteFreeModelResponse_Responses(
    sseResponse(upstreamBody),
    REWRITTEN_MODEL
  );
  const bodyText = await rewritten.text();

  const requiredFragments = [
    ': KILO PROCESSING\n\n',
    'id: event-57\n',
    'event: response.completed\n',
  ];
  for (const fragment of requiredFragments) {
    expect(bodyText).toContain(fragment);
  }
  expect(dataObjects(bodyText)).toEqual([
    {
      type: 'response.completed',
      response: { model: REWRITTEN_MODEL, status: 'completed' },
    },
  ]);
});

// Takes two clones of the upstream response before rewriting it, then reads
// all three bodies concurrently. The rewritten body is expected to contain both
// events in order, and each clone is expected to yield the untouched upstream text.
test('delivers the same complete response while concurrent clones are drained', async () => {
  const upstreamBody =
    'event: response.created\n' +
    'data: {"type":"response.created","sequence_number":0,"response":{"model":"upstream-model","status":"in_progress"}}\n\n' +
    'event: response.completed\n' +
    'data: {"type":"response.completed","sequence_number":1,"response":{"model":"upstream-model","status":"completed"}}\n\n';
  const upstream = sseResponse(upstreamBody);
  // Clones are taken before the rewriter is handed the original response.
  const usageClone = upstream.clone();
  const loggingClone = upstream.clone();

  const rewritten = await rewriteFreeModelResponse_Responses(upstream, REWRITTEN_MODEL);
  const [callerText, usageText, loggingText] = await Promise.all([
    rewritten.text(),
    usageClone.text(),
    loggingClone.text(),
  ]);

  const callerEvents = dataObjects(callerText) as Array<{ sequence_number: number }>;
  const callerSequence = callerEvents.map(event => event.sequence_number);
  expect(callerSequence).toEqual([0, 1]);
  expect(usageText).toBe(upstreamBody);
  expect(loggingText).toBe(upstreamBody);
});
});
Loading