diff --git a/packages/db/src/schema.ts b/packages/db/src/schema.ts index 1a15a85ac1..fbf5400ce6 100644 --- a/packages/db/src/schema.ts +++ b/packages/db/src/schema.ts @@ -4361,6 +4361,7 @@ export type CloudAgentSessionRunFailureCode = | 'assistant_error' | 'wrapper_error_after_activity' | 'missing_assistant_reply' + | 'payment_required' | 'user_interrupt' | 'container_shutdown' | 'system_interrupt' diff --git a/packages/worker-utils/package.json b/packages/worker-utils/package.json index a8cb9ea1b3..1f8620ac9e 100644 --- a/packages/worker-utils/package.json +++ b/packages/worker-utils/package.json @@ -23,7 +23,8 @@ "./security-auto-analysis-policy": "./src/security-auto-analysis-policy.ts", "./security-remediation-policy": "./src/security-remediation-policy.ts", "./security-notification-policy": "./src/security-notification-policy.ts", - "./dependabot-dismissal-target": "./src/dependabot-dismissal-target.ts" + "./dependabot-dismissal-target": "./src/dependabot-dismissal-target.ts", + "./client-error": "./src/client-error.ts" }, "scripts": { "test": "vitest run", diff --git a/packages/worker-utils/src/client-error.test.ts b/packages/worker-utils/src/client-error.test.ts new file mode 100644 index 0000000000..95f6b2ef4d --- /dev/null +++ b/packages/worker-utils/src/client-error.test.ts @@ -0,0 +1,26 @@ +import { describe, expect, it } from 'vitest'; + +import { ClientErrorSchema, PublicErrorCodeSchema } from './client-error.js'; + +describe('ClientErrorSchema', () => { + it('accepts the public client error wire contract', () => { + expect( + ClientErrorSchema.parse({ + code: 'PENDING_QUEUE_FULL', + message: 'Queue is full', + retryable: true, + }) + ).toEqual({ + code: 'PENDING_QUEUE_FULL', + message: 'Queue is full', + retryable: true, + }); + }); + + it.each(['', 'lowercase', '_PRIVATE', '9INVALID', 'HAS-DASH'])( + 'rejects invalid code %j', + code => { + expect(PublicErrorCodeSchema.safeParse(code).success).toBe(false); + } + ); +}); diff --git a/packages/worker-utils/src/client-error.ts b/packages/worker-utils/src/client-error.ts new file mode 100644 index 0000000000..8117f2e98e --- /dev/null +++ b/packages/worker-utils/src/client-error.ts @@ -0,0 +1,11 @@ +import { z } from 'zod'; + +export const PublicErrorCodeSchema = z.string().regex(/^[A-Z][A-Z0-9_]*$/); + +export const ClientErrorSchema = z.object({ + code: PublicErrorCodeSchema, + message: z.string(), + retryable: z.boolean(), +}); + +export type ClientError = z.infer; diff --git a/packages/worker-utils/src/cloud-agent-failure.ts b/packages/worker-utils/src/cloud-agent-failure.ts index f3d05c3659..23e08ffc1d 100644 --- a/packages/worker-utils/src/cloud-agent-failure.ts +++ b/packages/worker-utils/src/cloud-agent-failure.ts @@ -27,6 +27,7 @@ export const CLOUD_AGENT_FAILURE_CODES = [ 'assistant_error', 'wrapper_error_after_activity', 'missing_assistant_reply', + 'payment_required', 'user_interrupt', 'container_shutdown', 'system_interrupt', diff --git a/services/cloud-agent-next/src/callbacks/types.ts b/services/cloud-agent-next/src/callbacks/types.ts index 24e32d3e50..b187434017 100644 --- a/services/cloud-agent-next/src/callbacks/types.ts +++ b/services/cloud-agent-next/src/callbacks/types.ts @@ -1,3 +1,5 @@ +import type { CloudAgentFailureStage } from '@kilocode/worker-utils/cloud-agent-failure'; +import type { ClientError } from '@kilocode/worker-utils/client-error'; import type { SafeFailureProjection } from '../session/safe-failure-projection.js'; export type CallbackTarget = { @@ -20,6 +22,8 @@ export type ExecutionCallbackPayload = { status: 'completed' | 'failed' | 'interrupted'; errorMessage?: string; failure?: SafeFailureProjection; + failureStage?: CloudAgentFailureStage; + clientError?: ClientError; /** Present when errorMessage was shortened to fit the callback queue. */ errorMessageTruncation?: CallbackTextTruncation; lastSeenBranch?: string; diff --git a/services/cloud-agent-next/src/middleware/auth.test.ts b/services/cloud-agent-next/src/middleware/auth.test.ts new file mode 100644 index 0000000000..1588097625 --- /dev/null +++ b/services/cloud-agent-next/src/middleware/auth.test.ts @@ -0,0 +1,64 @@ +import { Hono } from 'hono'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import type { HonoContext } from '../hono-context.js'; +import type { Env } from '../types.js'; + +vi.mock('../auth.js', () => ({ + validateKiloToken: vi.fn(), +})); + +vi.mock('../logger.js', () => { + const logger = { + setTags: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + withFields: vi.fn(), + }; + logger.withFields.mockReturnValue(logger); + return { + logger, + withLogTags: async (_tags: unknown, fn: () => Promise) => fn(), + WithLogTags: + () => + ( + _target: unknown, + _propertyKey: string, + descriptor: PropertyDescriptor + ): PropertyDescriptor => + descriptor, + }; +}); + +const { authMiddleware } = await import('./auth.js'); +const { validateKiloToken } = await import('../auth.js'); + +describe('authMiddleware', () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.mocked(validateKiloToken).mockResolvedValue({ success: false, error: 'Invalid token' }); + }); + + it('returns a non-retryable unauthorized client error without changing message or path', async () => { + const app = new Hono(); + app.use('/trpc/*', authMiddleware); + app.post('/trpc/:procedure', c => c.json({ ok: true })); + + const response = await app.fetch( + new Request('https://worker.test/trpc/send', { method: 'POST' }), + { NEXTAUTH_SECRET: 'secret' } as Env + ); + const body: any = await response.json(); + + expect(response.status).toBe(401); + expect(body.error.message).toBe('Invalid token'); + expect(body.error.data).toMatchObject({ + path: 'send', + clientError: { + code: 'UNAUTHORIZED', + message: 'Invalid token', + retryable: false, + }, + }); + }); +}); diff --git a/services/cloud-agent-next/src/middleware/balance.test.ts b/services/cloud-agent-next/src/middleware/balance.test.ts index a47a7d9207..f9beeb80cd 100644 --- a/services/cloud-agent-next/src/middleware/balance.test.ts +++ b/services/cloud-agent-next/src/middleware/balance.test.ts @@ -58,6 +58,40 @@ describe('balanceMiddleware', () => { ); } + it('returns non-retryable clientError for insufficient credits', async () => { + vi.mocked(validateBalanceOnly).mockResolvedValue({ + success: false, + status: 402, + message: 'Insufficient credits', + }); + + const response = await postTrpc('start', {}); + const body: any = await response.json(); + + expect(body.error.data.clientError).toEqual({ + code: 'PAYMENT_REQUIRED', + message: 'Insufficient credits', + retryable: false, + }); + }); + + it('returns retryable clientError for balance infrastructure failures', async () => { + vi.mocked(validateBalanceOnly).mockResolvedValue({ + success: false, + status: 500, + message: 'Failed to verify balance', + }); + + const response = await postTrpc('start', {}); + const body: any = await response.json(); + + expect(body.error.data.clientError).toEqual({ + code: 'INTERNAL_SERVER_ERROR', + message: 'Failed to verify balance', + retryable: true, + }); + }); + it('uses nested start organization context for balance validation', async () => { const orgId = '11111111-2222-3333-4444-555555555555'; diff --git a/services/cloud-agent-next/src/persistence/CloudAgentSession.ts b/services/cloud-agent-next/src/persistence/CloudAgentSession.ts index 7e97bf3e0b..3c4e9fb5b9 100644 --- a/services/cloud-agent-next/src/persistence/CloudAgentSession.ts +++ b/services/cloud-agent-next/src/persistence/CloudAgentSession.ts @@ -15,6 +15,7 @@ import { import { readProfileBundle, type SessionProfileBundle } from '../session-profile.js'; import { fitCallbackJobToQueueLimit } from '../callbacks/queue-payload.js'; import type { CallbackJob, CallbackTarget } from '../callbacks/index.js'; +import { projectTerminalClientError } from '../session/terminal-error-projector.js'; import { drizzle } from 'drizzle-orm/durable-sqlite'; import { logger } from '../logger.js'; import { BUILTIN_AGENT_MODES, Limits } from '../schema.js'; @@ -345,6 +346,9 @@ export class CloudAgentSession extends DurableObject { executionId: execution.executionId, status, errorMessage: error, + ...(status === 'completed' + ? {} + : { clientError: projectTerminalClientError({ status, error }) }), lastSeenBranch: metadata.repository?.upstreamBranch, kiloSessionId: metadata.auth.kiloSessionId, gateResult, diff --git a/services/cloud-agent-next/src/router/auth.test.ts b/services/cloud-agent-next/src/router/auth.test.ts new file mode 100644 index 0000000000..5f60dd340f --- /dev/null +++ b/services/cloud-agent-next/src/router/auth.test.ts @@ -0,0 +1,95 @@ +import { describe, expect, it } from 'vitest'; +import { TRPCError } from '@trpc/server'; +import { fetchRequestHandler } from '@trpc/server/adapters/fetch'; + +import type { TRPCContext } from '../types.js'; +import { t } from './auth.js'; + +const testRouter = t.router({ + invalid: t.procedure.query(() => { + throw new TRPCError({ code: 'BAD_REQUEST', message: 'Invalid request' }); + }), + unavailable: t.procedure.query(() => { + throw new TRPCError({ + code: 'SERVICE_UNAVAILABLE', + message: 'Sandbox unavailable', + cause: { error: 'SANDBOX_CONNECT_FAILED', retryable: true }, + }); + }), + internal: t.procedure.query(() => { + throw new TRPCError({ code: 'INTERNAL_SERVER_ERROR', message: 'Internal failure' }); + }), +}); + +async function requestProcedure(procedure: 'invalid' | 'unavailable' | 'internal') { + return fetchRequestHandler({ + endpoint: '/trpc', + req: new Request(`http://localhost/trpc/${procedure}`), + router: testRouter, + createContext: () => ({}) as TRPCContext, + }); +} + +describe('tRPC client error formatter', () => { + it('adds a non-retryable client error to known request failures', async () => { + const response = await requestProcedure('invalid'); + + await expect(response.json()).resolves.toMatchObject({ + error: { + message: 'Invalid request', + data: { + code: 'BAD_REQUEST', + httpStatus: 400, + path: 'invalid', + clientError: { + code: 'BAD_REQUEST', + message: 'Invalid request', + retryable: false, + }, + }, + }, + }); + }); + + it('preserves explicit legacy retry fields beside the client error', async () => { + const response = await requestProcedure('unavailable'); + + await expect(response.json()).resolves.toMatchObject({ + error: { + message: 'Sandbox unavailable', + data: { + code: 'SERVICE_UNAVAILABLE', + httpStatus: 503, + path: 'unavailable', + error: 'SANDBOX_CONNECT_FAILED', + retryable: true, + clientError: { + code: 'SANDBOX_CONNECT_FAILED', + message: 'Sandbox unavailable', + retryable: true, + }, + }, + }, + }); + }); + + it('defaults generic internal failures to retryable', async () => { + const response = await requestProcedure('internal'); + + await expect(response.json()).resolves.toMatchObject({ + error: { + message: 'Internal failure', + data: { + code: 'INTERNAL_SERVER_ERROR', + httpStatus: 500, + path: 'internal', + clientError: { + code: 'INTERNAL_SERVER_ERROR', + message: 'Internal failure', + retryable: true, + }, + }, + }, + }); + }); +}); diff --git a/services/cloud-agent-next/src/router/auth.ts b/services/cloud-agent-next/src/router/auth.ts index d310f76fb6..6cf8e8035b 100644 --- a/services/cloud-agent-next/src/router/auth.ts +++ b/services/cloud-agent-next/src/router/auth.ts @@ -1,34 +1,15 @@ import { initTRPC, TRPCError } from '@trpc/server'; import { timingSafeEqual } from '@kilocode/encryption'; import type { TRPCContext } from '../types.js'; - -/** - * Type for error cause data that should be surfaced in the response. - * Used for structured 409 Conflict and 503 Retryable errors. - */ -type ErrorCauseData = { - error?: string; - message?: string; - retryable?: boolean; -}; +import { projectTrpcErrorData } from '../trpc-error.js'; // Initialize tRPC with context and error formatter export const t = initTRPC.context().create({ errorFormatter({ shape, error }) { - // Surface cause data in the response for specific error types - const causeData = error.cause as ErrorCauseData | undefined; - if (causeData && typeof causeData === 'object') { - return { - ...shape, - data: { - ...shape.data, - // Include structured error info from cause - ...(causeData.error && { error: causeData.error }), - ...(causeData.retryable !== undefined && { retryable: causeData.retryable }), - }, - }; - } - return shape; + return { + ...shape, + data: projectTrpcErrorData(shape.data, shape.message, error.cause), + }; }, }); diff --git a/services/cloud-agent-next/src/router/schemas.test.ts b/services/cloud-agent-next/src/router/schemas.test.ts index 2cfb745aa1..9b20c435c3 100644 --- a/services/cloud-agent-next/src/router/schemas.test.ts +++ b/services/cloud-agent-next/src/router/schemas.test.ts @@ -387,9 +387,31 @@ describe('getMessageResult contract', () => { acceptedAt: 3, terminalAt: 4, completionSource: 'wrapper_failure', - failure: { stage: 'agent_activity', code: 'assistant_error', attempts: 2 }, + failure: { + stage: 'agent_activity', + code: 'assistant_error', + attempts: 2, + retryable: false, + }, + }).success + ).toBe(true); + }); + + it('requires retryability on failed and interrupted failure details', () => { + expect( + GetMessageResultOutput.safeParse({ + ...baseOutput, + status: 'interrupted', + failure: { retryable: true }, }).success ).toBe(true); + expect( + GetMessageResultOutput.safeParse({ + ...baseOutput, + status: 'failed', + failure: { code: 'assistant_error' }, + }).success + ).toBe(false); }); it('fails closed on contradictory lifecycle result fields', () => { @@ -397,7 +419,7 @@ describe('getMessageResult contract', () => { { ...baseOutput, status: 'queued', acceptedAt: 2 }, { ...baseOutput, status: 'queued', terminalAt: 2 }, { ...baseOutput, status: 'running', completionSource: 'assistant_message_event' }, - { ...baseOutput, status: 'queued', failure: { attempts: 1 } }, + { ...baseOutput, status: 'queued', failure: { attempts: 1, retryable: true } }, { ...baseOutput, status: 'failed', assistant: { messageId: 'assistant_1', text: 'wrong' } }, { ...baseOutput, status: 'interrupted', gateResult: 'fail' }, ]) { diff --git a/services/cloud-agent-next/src/router/schemas.ts b/services/cloud-agent-next/src/router/schemas.ts index 8e91d6e8f7..03e7428737 100644 --- a/services/cloud-agent-next/src/router/schemas.ts +++ b/services/cloud-agent-next/src/router/schemas.ts @@ -870,7 +870,7 @@ export const GetMessageResultOutput = z acceptedAt: z.number().optional(), terminalAt: z.number().optional(), completionSource: SessionMessageCompletionSourceSchema.optional(), - failure: SafeFailureProjectionSchema.optional(), + failure: SafeFailureProjectionSchema.extend({ retryable: z.boolean() }).optional(), gateResult: z.enum(['pass', 'fail']).optional(), assistant: z .object({ diff --git a/services/cloud-agent-next/src/session/message-result.test.ts b/services/cloud-agent-next/src/session/message-result.test.ts index d0daba7bc1..6c969aa8ed 100644 --- a/services/cloud-agent-next/src/session/message-result.test.ts +++ b/services/cloud-agent-next/src/session/message-result.test.ts @@ -246,8 +246,35 @@ describe('resolveSessionMessageResult', () => { subtype: 'git_clone_timeout', attempts: 2, message: 'Repository clone timed out: Clone exceeded the safe deadline', + retryable: true, }, }, }); }); + + it('projects retryable failure details for legacy terminal state without classification', async () => { + const storage = createFakeStorage(); + await putSessionMessageState( + storage, + lifecycleState(messageA, { + status: 'interrupted', + terminalAt: 4, + completionSource: 'interrupt', + error: 'private raw error', + }) + ); + + await expect(resolveSessionMessageResult(storage, messageA)).resolves.toEqual({ + type: 'found', + result: { + messageId: messageA, + status: 'interrupted', + createdAt: 1, + queuedAt: 1, + terminalAt: 4, + completionSource: 'interrupt', + failure: { retryable: true }, + }, + }); + }); }); diff --git a/services/cloud-agent-next/src/session/message-result.ts b/services/cloud-agent-next/src/session/message-result.ts index a4c0c9be79..b79590ae6c 100644 --- a/services/cloud-agent-next/src/session/message-result.ts +++ b/services/cloud-agent-next/src/session/message-result.ts @@ -10,6 +10,7 @@ import { type SessionMessageStorage, } from './session-message-state.js'; import { projectSafeFailure, type SafeFailureProjection } from './safe-failure-projection.js'; +import { isTerminalFailureRetryable } from './terminal-error-projector.js'; export type SafeMessageResult = { messageId: string; @@ -19,7 +20,7 @@ export type SafeMessageResult = { acceptedAt?: number; terminalAt?: number; completionSource?: SessionMessageCompletionSource; - failure?: SafeFailureProjection; + failure?: SafeFailureProjection & { retryable: boolean }; gateResult?: 'pass' | 'fail'; }; @@ -45,8 +46,19 @@ type ResolvedSessionMessageResult = type MessageResultStorage = SessionMessageStorage & SessionQueueStorage; +function projectFailure(state: SessionMessageState): SafeMessageResult['failure'] { + if (state.status !== 'failed' && state.status !== 'interrupted') return undefined; + return { + ...projectSafeFailure(state), + retryable: isTerminalFailureRetryable({ + failureStage: state.failureStage, + failureCode: state.failureCode, + }), + }; +} + function projectLifecycleState(state: SessionMessageState): ResolvedSessionMessageResult { - const failure = projectSafeFailure(state); + const failure = projectFailure(state); const assistantLookup: AssistantLookup | undefined = state.status === 'completed' && state.assistantMessageId ? { diff --git a/services/cloud-agent-next/src/session/message-settlement-outbox.test.ts b/services/cloud-agent-next/src/session/message-settlement-outbox.test.ts index d7bcea7990..0dd8ff86e0 100644 --- a/services/cloud-agent-next/src/session/message-settlement-outbox.test.ts +++ b/services/cloud-agent-next/src/session/message-settlement-outbox.test.ts @@ -617,10 +617,33 @@ describe('MessageSettlementOutbox', () => { attempts: 2, message: 'Repository clone timed out: Clone exceeded the safe deadline', }, + failureStage: 'pre_dispatch', + clientError: { + code: 'WORKSPACE_SETUP_FAILED', + message: 'Repository clone timed out: Clone exceeded the safe deadline', + retryable: true, + }, }); expect(JSON.stringify(harness.callbackJobs[0])).not.toContain('token=secret'); }); + it('omits clientError from completed callback jobs', async () => { + const harness = createHarness(); + await putSessionMessageState( + harness.storage, + acceptedMessageState(firstMessageId, { url: 'https://example.com/completed' }) + ); + + await harness.outbox.terminalizeSessionMessageOnce(firstMessageId, { + kind: 'completed', + completionSource: 'assistant_message_event', + }); + + expect(harness.callbackJobs).toHaveLength(1); + expect(harness.callbackJobs[0].payload.clientError).toBeUndefined(); + expect(harness.callbackJobs[0].payload.failureStage).toBeUndefined(); + }); + it('finalizes a terminal wrapper-run callback while the next run remains pending', async () => { const harness = createHarness(); await putSessionMessageState( diff --git a/services/cloud-agent-next/src/session/message-settlement-outbox.ts b/services/cloud-agent-next/src/session/message-settlement-outbox.ts index 3dded0c3b8..640490407d 100644 --- a/services/cloud-agent-next/src/session/message-settlement-outbox.ts +++ b/services/cloud-agent-next/src/session/message-settlement-outbox.ts @@ -19,6 +19,7 @@ import { type TerminalizeParams, } from './session-message-state.js'; import { projectSafeFailure, type SafeFailureProjection } from './safe-failure-projection.js'; +import { projectTerminalClientError } from './terminal-error-projector.js'; import type { AssistantMessagePart, LatestAssistantMessage } from './types.js'; const CURRENT_IDLE_BATCH_CALLBACK_KEY = 'idle_batch_callback_current'; @@ -449,6 +450,17 @@ export function createMessageSettlementOutbox( status, errorMessage: legacyErrorMessage, failure, + ...(status === 'completed' + ? {} + : { + failureStage: state.failureStage, + clientError: projectTerminalClientError({ + status, + failureStage: state.failureStage, + failureCode: state.failureCode, + error: failure?.message ?? legacyErrorMessage, + }), + }), lastSeenBranch: metadata?.repository?.upstreamBranch, kiloSessionId: metadata?.auth.kiloSessionId, gateResult: state.gateResult, diff --git a/services/cloud-agent-next/src/session/queue-message.test.ts b/services/cloud-agent-next/src/session/queue-message.test.ts index cc4adc9515..ece53dc395 100644 --- a/services/cloud-agent-next/src/session/queue-message.test.ts +++ b/services/cloud-agent-next/src/session/queue-message.test.ts @@ -210,30 +210,57 @@ describe('queueMessage', () => { ).rejects.toMatchObject({ code: 'BAD_REQUEST', message: 'nope' }); }); - it('maps PENDING_QUEUE_FULL to TOO_MANY_REQUESTS', async () => { + it('maps PENDING_QUEUE_FULL to a retryable TOO_MANY_REQUESTS error', async () => { const { stub } = makeDoStub({ success: false, code: 'PENDING_QUEUE_FULL', error: 'full' }); - await expect( - queueMessage( - { - cloudAgentSessionId: 'agent_x' as SessionId, - turn: { type: 'prompt', prompt: 'x' }, - }, - { env: makeEnv(stub) as Env, userId: 'u' } - ) - ).rejects.toMatchObject({ code: 'TOO_MANY_REQUESTS', message: 'full' }); + const error = await queueMessage( + { + cloudAgentSessionId: 'agent_x' as SessionId, + turn: { type: 'prompt', prompt: 'x' }, + }, + { env: makeEnv(stub) as Env, userId: 'u' } + ).catch(error => error); + + expect(error).toMatchObject({ + code: 'TOO_MANY_REQUESTS', + message: 'full', + cause: { error: 'PENDING_QUEUE_FULL', retryable: true }, + }); }); - it('maps INTERNAL to 500', async () => { + it('maps INTERNAL to an explicitly retryable 500 error', async () => { const { stub } = makeDoStub({ success: false, code: 'INTERNAL', error: 'boom' }); - await expect( - queueMessage( - { - cloudAgentSessionId: 'agent_x' as SessionId, - turn: { type: 'prompt', prompt: 'x' }, - }, - { env: makeEnv(stub) as Env, userId: 'u' } - ) - ).rejects.toMatchObject({ code: 'INTERNAL_SERVER_ERROR', message: 'boom' }); + const error = await queueMessage( + { + cloudAgentSessionId: 'agent_x' as SessionId, + turn: { type: 'prompt', prompt: 'x' }, + }, + { env: makeEnv(stub) as Env, userId: 'u' } + ).catch(error => error); + + expect(error).toMatchObject({ + code: 'INTERNAL_SERVER_ERROR', + message: 'boom', + cause: { error: 'INTERNAL', retryable: true }, + }); + }); + + it.each([ + ['NOT_FOUND', 'NOT_FOUND'], + ['BAD_REQUEST', 'BAD_REQUEST'], + ] as const)('marks permanent %s admission errors non-retryable', async (resultCode, trpcCode) => { + const { stub } = makeDoStub({ success: false, code: resultCode, error: 'permanent' }); + const error = await queueMessage( + { + cloudAgentSessionId: 'agent_x' as SessionId, + turn: { type: 'prompt', prompt: 'x' }, + }, + { env: makeEnv(stub) as Env, userId: 'u' } + ).catch(error => error); + + expect(error).toMatchObject({ + code: trpcCode, + cause: { error: resultCode, retryable: false }, + }); }); it('maps retryable SANDBOX_CONNECT_FAILED to SERVICE_UNAVAILABLE with retryable cause', async () => { diff --git a/services/cloud-agent-next/src/session/queue-message.ts b/services/cloud-agent-next/src/session/queue-message.ts index 2a3acbe955..e42a07e063 100644 --- a/services/cloud-agent-next/src/session/queue-message.ts +++ b/services/cloud-agent-next/src/session/queue-message.ts @@ -35,37 +35,38 @@ function isRetryableCode(code: string): code is RetryableResultCode { return RETRYABLE_CODES.includes(code as RetryableResultCode); } -type NonRetryableCode = Exclude< - Extract['code'], - RetryableResultCode ->; +type AdmissionFailureCode = Extract['code']; +type NonTransientExecutionCode = Exclude; type TRPCCodeName = ConstructorParameters[0]['code']; -const PERMANENT_CODE_TO_TRPC: Record = { +const ADMISSION_CODE_TO_TRPC: Record = { NOT_FOUND: 'NOT_FOUND', BAD_REQUEST: 'BAD_REQUEST', PENDING_QUEUE_FULL: 'TOO_MANY_REQUESTS', INTERNAL: 'INTERNAL_SERVER_ERROR', }; +function isAdmissionFailureRetryable(code: AdmissionFailureCode): boolean { + return isRetryableCode(code) || code === 'PENDING_QUEUE_FULL' || code === 'INTERNAL'; +} + export function throwAdmissionError( result: Extract ): never { - if (isRetryableCode(result.code)) { - throw new TRPCError({ - code: 'SERVICE_UNAVAILABLE', + const explicitlyRetryable = isAdmissionFailureRetryable(result.code); + const code = isRetryableCode(result.code) + ? 'SERVICE_UNAVAILABLE' + : (ADMISSION_CODE_TO_TRPC[result.code] ?? 'INTERNAL_SERVER_ERROR'); + throw new TRPCError({ + code, + message: result.error, + cause: { + error: result.code, message: result.error, - cause: { - error: result.code, - message: result.error, - retryable: true, - }, - }); - } - - const code = PERMANENT_CODE_TO_TRPC[result.code] ?? 'INTERNAL_SERVER_ERROR'; - throw new TRPCError({ code, message: result.error }); + retryable: explicitlyRetryable, + }, + }); } export type QueueMessageInput = { @@ -170,7 +171,7 @@ export async function queueMessage( sessionId, userId: ctx.userId, resultCode: result.code, - retryable: isRetryableCode(result.code), + retryable: isAdmissionFailureRetryable(result.code), }) .warn('Cloud-agent Durable Object rejected message admission request'); throwAdmissionError(result); diff --git a/services/cloud-agent-next/src/session/safe-failure-projection.ts b/services/cloud-agent-next/src/session/safe-failure-projection.ts index 9ca3851535..52e604b295 100644 --- a/services/cloud-agent-next/src/session/safe-failure-projection.ts +++ b/services/cloud-agent-next/src/session/safe-failure-projection.ts @@ -38,6 +38,7 @@ const GENERIC_FAILURE_MESSAGES = { assistant_error: 'Assistant request failed', wrapper_error_after_activity: 'Agent wrapper failed while processing the message', missing_assistant_reply: 'No assistant reply was produced', + payment_required: 'Assistant request failed: insufficient credits', user_interrupt: 'The message was interrupted by the user', container_shutdown: 'The agent container shut down', system_interrupt: 'The message was interrupted', diff --git a/services/cloud-agent-next/src/session/terminal-error-projector.test.ts b/services/cloud-agent-next/src/session/terminal-error-projector.test.ts new file mode 100644 index 0000000000..f5f7459830 --- /dev/null +++ b/services/cloud-agent-next/src/session/terminal-error-projector.test.ts @@ -0,0 +1,77 @@ +import { describe, expect, it } from 'vitest'; +import { projectTerminalClientError } from './terminal-error-projector.js'; + +describe('projectTerminalClientError', () => { + it('keeps runtime failures retryable after agent activity', () => { + expect( + projectTerminalClientError({ + status: 'failed', + failureStage: 'agent_activity', + failureCode: 'wrapper_error_after_activity', + error: 'Provider stopped after editing files', + }) + ).toEqual({ + code: 'WRAPPER_ERROR_AFTER_ACTIVITY', + message: 'Provider stopped after editing files', + retryable: true, + }); + }); + + it.each([ + 'invalid_delivery_request', + 'session_metadata_missing', + 'model_missing', + 'payment_required', + 'user_interrupt', + ] as const)('classifies %s as non-retryable regardless of stage', failureCode => { + expect( + projectTerminalClientError({ + status: 'failed', + failureStage: 'pre_dispatch', + failureCode, + }).retryable + ).toBe(false); + }); + + it.each(['assistant_error', 'wrapper_error_after_activity'] as const)( + 'classifies %s as retryable after agent activity', + failureCode => { + expect( + projectTerminalClientError({ + status: 'failed', + failureStage: 'agent_activity', + failureCode, + }).retryable + ).toBe(true); + } + ); + + it('defaults missing and unknown classifications to retryable', () => { + expect(projectTerminalClientError({ status: 'failed' })).toEqual({ + code: 'EXECUTION_FAILED', + message: 'Execution failed', + retryable: true, + }); + expect( + projectTerminalClientError({ + status: 'interrupted', + failureStage: 'unknown', + failureCode: 'unclassified', + }) + ).toEqual({ + code: 'UNCLASSIFIED', + message: 'Execution interrupted', + retryable: true, + }); + }); + + it('does not classify errors from their message text', () => { + expect( + projectTerminalClientError({ status: 'failed', error: 'assistant_error user_interrupt' }) + ).toEqual({ + code: 'EXECUTION_FAILED', + message: 'assistant_error user_interrupt', + retryable: true, + }); + }); +}); diff --git a/services/cloud-agent-next/src/session/terminal-error-projector.ts b/services/cloud-agent-next/src/session/terminal-error-projector.ts new file mode 100644 index 0000000000..7d0b0c9773 --- /dev/null +++ b/services/cloud-agent-next/src/session/terminal-error-projector.ts @@ -0,0 +1,40 @@ +import type { ClientError } from '@kilocode/worker-utils/client-error'; +import type { + SessionMessageFailureCode, + SessionMessageFailureStage, +} from './session-message-state.js'; + +const NON_RETRYABLE_FAILURE_CODES = new Set([ + 'invalid_delivery_request', + 'session_metadata_missing', + 'model_missing', + 'payment_required', + 'user_interrupt', +]); + +type TerminalFailureClassification = { + failureStage?: SessionMessageFailureStage; + failureCode?: SessionMessageFailureCode; +}; + +type TerminalErrorProjectionInput = TerminalFailureClassification & { + status: 'failed' | 'interrupted'; + error?: string; +}; + +export function isTerminalFailureRetryable(input: TerminalFailureClassification): boolean { + return input.failureCode === undefined || !NON_RETRYABLE_FAILURE_CODES.has(input.failureCode); +} + +export function projectTerminalClientError(input: TerminalErrorProjectionInput): ClientError { + const fallback = + input.status === 'failed' + ? { code: 'EXECUTION_FAILED', message: 'Execution failed' } + : { code: 'EXECUTION_INTERRUPTED', message: 'Execution interrupted' }; + + return { + code: input.failureCode?.toUpperCase() ?? fallback.code, + message: input.error || fallback.message, + retryable: isTerminalFailureRetryable(input), + }; +} diff --git a/services/cloud-agent-next/src/session/wrapper-supervisor.test.ts b/services/cloud-agent-next/src/session/wrapper-supervisor.test.ts index 05530266d6..3c608855f6 100644 --- a/services/cloud-agent-next/src/session/wrapper-supervisor.test.ts +++ b/services/cloud-agent-next/src/session/wrapper-supervisor.test.ts @@ -736,6 +736,30 @@ describe('WrapperSupervisor', () => { } ); + it.each([ + { failureCode: 'payment_required' as const, error: 'Insufficient credits' }, + { failureCode: 'model_missing' as const, error: 'Model not found' }, + ])('preserves $failureCode after agent activity', async failure => { + const harness = createHarness([liveRuntimeState(), OWNED_WRAPPER_LEASE]); + await putSessionMessageState(harness.storage, { + ...acceptedMessage(), + agentActivityObservedAt: 9_000, + }); + + await harness.supervisor.onTerminalEvent({ + wrapperRunId: WRAPPER_RUN_ID, + status: 'failed', + errorSource: 'assistant', + ...failure, + }); + + await expect(getSessionMessageState(harness.storage, MESSAGE_ID)).resolves.toMatchObject({ + status: 'failed', + failureStage: 'agent_activity', + failureCode: failure.failureCode, + }); + }); + it('persists physical stop obligation before reading messages for a failed terminal event', async () => { const storageRef: { current?: MemoryStorage } = {}; let observedStopBeforeEffects = false; diff --git a/services/cloud-agent-next/src/session/wrapper-supervisor.ts b/services/cloud-agent-next/src/session/wrapper-supervisor.ts index 5869fe919e..86714056fd 100644 --- a/services/cloud-agent-next/src/session/wrapper-supervisor.ts +++ b/services/cloud-agent-next/src/session/wrapper-supervisor.ts @@ -18,6 +18,7 @@ import { type SessionMessageState, type SessionMessageStorage, } from './session-message-state.js'; +import type { WrapperTerminalFailureCode } from '../shared/protocol.js'; import type { LatestAssistantMessage } from './types.js'; import { clearCurrentWrapperRuntimeFailureState, @@ -90,6 +91,7 @@ export type WrapperTerminalEvent = { error?: string; errorSource?: 'assistant'; interruptionSource?: 'container_shutdown'; + failureCode?: WrapperTerminalFailureCode; gateResult?: 'pass' | 'fail'; messageIds?: string[]; }; @@ -926,8 +928,16 @@ export function createWrapperSupervisor( } async function onTerminalEvent(params: WrapperTerminalEvent): Promise { - const { wrapperRunId, status, error, errorSource, interruptionSource, gateResult, messageIds } = - params; + const { + wrapperRunId, + status, + error, + errorSource, + interruptionSource, + failureCode: terminalFailureCode, + gateResult, + messageIds, + } = params; const sessionId = getSessionIdForLogs(); const state = await getWrapperRuntimeState(storage); if ( @@ -971,7 +981,7 @@ export function createWrapperSupervisor( error: error ?? 'Assistant request failed', completionSource: 'wrapper_failure', failureStage: 'agent_activity', - failureCode: 'assistant_error', + failureCode: terminalFailureCode ?? 'assistant_error', safeFailureMessage: classifyAssistantFailureMessage(error), }); continue; @@ -984,9 +994,9 @@ export function createWrapperSupervisor( error: error ?? 'Wrapper error', completionSource: 'wrapper_failure', failureStage: activityObserved ? 'agent_activity' : 'post_dispatch_no_activity', - failureCode: activityObserved - ? 'wrapper_error_after_activity' - : 'wrapper_error_before_activity', + failureCode: + terminalFailureCode ?? + (activityObserved ? 'wrapper_error_after_activity' : 'wrapper_error_before_activity'), }); continue; } diff --git a/services/cloud-agent-next/src/shared/protocol.ts b/services/cloud-agent-next/src/shared/protocol.ts index cae3a92484..cdc690df59 100644 --- a/services/cloud-agent-next/src/shared/protocol.ts +++ b/services/cloud-agent-next/src/shared/protocol.ts @@ -19,7 +19,7 @@ export type StreamEventType = | 'status' // Status message (e.g., "Auto-committing...") | 'heartbeat' // Keep-alive during idle periods | 'pong' // Response to ping command from DO - | 'error' // Error occurred { error: string, fatal: boolean, errorSource?: 'assistant' } + | 'error' // Error occurred { error: string, fatal: boolean, errorSource?: 'assistant', failureCode?: WrapperTerminalFailureCode } | 'interrupted' // User/signal interrupt { reason?: string, interruptionSource?: 'container_shutdown' } | 'complete' // Execution finished { exitCode, currentBranch?, messageIds } | 'wrapper_finalizing' // Wrapper sealed the current run batch before post-processing @@ -51,6 +51,9 @@ export type IngestEvent = { data: unknown; }; +export const WrapperTerminalFailureCodes = ['payment_required', 'model_missing'] as const; +export type WrapperTerminalFailureCode = (typeof WrapperTerminalFailureCodes)[number]; + /** * Commands sent from DO to wrapper via /ingest WebSocket. */ diff --git a/services/cloud-agent-next/src/telemetry/queue-reports.test.ts b/services/cloud-agent-next/src/telemetry/queue-reports.test.ts index 3a92a6a447..3fb2afb8f3 100644 --- a/services/cloud-agent-next/src/telemetry/queue-reports.test.ts +++ b/services/cloud-agent-next/src/telemetry/queue-reports.test.ts @@ -80,6 +80,28 @@ describe('Cloud Agent report emitter', () => { expect(JSON.stringify(reports)).not.toContain('model/test'); }); + it.each([ + ['agent_activity', 'payment_required', 'assistant_error'], + ['agent_activity', 'model_missing', 'assistant_error'], + ['post_dispatch_no_activity', 'payment_required', 'wrapper_error_before_activity'], + ['post_dispatch_no_activity', 'model_missing', 'wrapper_error_before_activity'], + ] as const)( + 'maps %s/%s to persisted failure code %s', + async (failureStage, failureCode, expectedFailureCode) => { + const reports: CloudAgentQueueReport[] = []; + await emitRunStateReport({ + queue: { send: async report => void reports.push(report) }, + cloudAgentSessionId: 'agent_report', + state: { ...state, failureStage, failureCode }, + }); + + expect(reports[0]?.run).toMatchObject({ + failureStage, + failureCode: expectedFailureCode, + }); + } + ); + it.each(WORKSPACE_FAILURE_DIAGNOSTICS)( 'emits the allowlisted diagnostic for workspace subtype %s', async (failureSubtype, expectedDiagnostic) => { diff --git a/services/cloud-agent-next/src/telemetry/queue-reports.ts b/services/cloud-agent-next/src/telemetry/queue-reports.ts index 727600b23b..e3716c6d88 100644 --- a/services/cloud-agent-next/src/telemetry/queue-reports.ts +++ b/services/cloud-agent-next/src/telemetry/queue-reports.ts @@ -51,6 +51,15 @@ function timestamp(value: number): string { return new Date(value).toISOString(); } +function persistedFailureCode(state: SessionMessageState): SessionMessageState['failureCode'] { + if (state.failureCode !== 'model_missing' && state.failureCode !== 'payment_required') { + return state.failureCode; + } + if (state.failureStage === 'agent_activity') return 'assistant_error'; + if (state.failureStage === 'post_dispatch_no_activity') return 'wrapper_error_before_activity'; + return state.failureCode; +} + function isKnownInsufficientCreditFailure(state: SessionMessageState): boolean { if ( state.failureCode !== 'assistant_error' && @@ -124,7 +133,8 @@ export async function emitRunStateReport(params: { const { state } = params; const observedDispatchAcceptedAt = state.dispatchAcceptanceKind === 'observed' ? state.acceptedAt : undefined; - const diagnostic = diagnosticForFailedRun(state); + const failureCode = persistedFailureCode(state); + const diagnostic = diagnosticForFailedRun({ ...state, failureCode }); const report: CloudAgentRunStateReport = { version: 1, type: 'run.state', @@ -143,7 +153,7 @@ export async function emitRunStateReport(params: { : { agentActivityObservedAt: timestamp(state.agentActivityObservedAt) }), ...(state.terminalAt === undefined ? {} : { terminalAt: timestamp(state.terminalAt) }), ...(state.failureStage === undefined ? {} : { failureStage: state.failureStage }), - ...(state.failureCode === undefined ? {} : { failureCode: state.failureCode }), + ...(failureCode === undefined ? {} : { failureCode }), ...(diagnostic === undefined ? {} : { diagnostic }), }, }; diff --git a/services/cloud-agent-next/src/trpc-error.test.ts b/services/cloud-agent-next/src/trpc-error.test.ts new file mode 100644 index 0000000000..174f803e6d --- /dev/null +++ b/services/cloud-agent-next/src/trpc-error.test.ts @@ -0,0 +1,143 @@ +import { describe, expect, it } from 'vitest'; +import { TRPC_ERROR_CODES_BY_KEY } from '@trpc/server/rpc'; + +import { buildTrpcErrorResponse, createClientError, projectTrpcErrorData } from './trpc-error.js'; + +describe('createClientError', () => { + it.each([ + 'PARSE_ERROR', + 'BAD_REQUEST', + 'UNAUTHORIZED', + 'PAYMENT_REQUIRED', + 'FORBIDDEN', + 'NOT_FOUND', + 'METHOD_NOT_SUPPORTED', + 'NOT_IMPLEMENTED', + 'CONFLICT', + 'PRECONDITION_FAILED', + 'PAYLOAD_TOO_LARGE', + 'UNSUPPORTED_MEDIA_TYPE', + 'UNPROCESSABLE_CONTENT', + ])('classifies %s as non-retryable', code => { + expect(createClientError(code, 'public message')).toEqual({ + code, + message: 'public message', + retryable: false, + }); + }); + + it.each([ + 'TOO_MANY_REQUESTS', + 'TIMEOUT', + 'CLIENT_CLOSED_REQUEST', + 'INTERNAL_SERVER_ERROR', + 'SERVICE_UNAVAILABLE', + 'FUTURE_ERROR', + ])('classifies %s as retryable', code => { + expect(createClientError(code, 'public message').retryable).toBe(true); + }); + + it('uses a valid explicit legacy cause without replacing the tRPC message', () => { + expect( + projectTrpcErrorData( + { code: 'SERVICE_UNAVAILABLE', httpStatus: 503, path: 'send' }, + 'Admission failed', + { error: 'SANDBOX_CONNECT_FAILED', message: 'internal detail', retryable: true } + ) + ).toEqual({ + code: 'SERVICE_UNAVAILABLE', + httpStatus: 503, + path: 'send', + error: 'SANDBOX_CONNECT_FAILED', + retryable: true, + clientError: { + code: 'SANDBOX_CONNECT_FAILED', + message: 'Admission failed', + retryable: true, + }, + }); + }); + + it('keeps known non-retryable codes non-retryable despite an inconsistent cause', () => { + expect( + projectTrpcErrorData({ code: 'BAD_REQUEST', httpStatus: 400 }, 'Invalid request', { + error: 'BAD_REQUEST', + retryable: true, + }) + ).toMatchObject({ + error: 'BAD_REQUEST', + retryable: false, + clientError: { + code: 'BAD_REQUEST', + message: 'Invalid request', + retryable: false, + }, + }); + }); + + it.each([ + null, + 'failure', + { error: 'lowercase', retryable: false }, + { error: 'VALID_CODE' }, + { error: 'VALID_CODE', retryable: 'yes' }, + { error: 'VALID_CODE', retryable: true, message: 42 }, + { arbitrary: 'exception detail' }, + ])('ignores malformed cause %j', cause => { + expect( + projectTrpcErrorData( + { code: 'INTERNAL_SERVER_ERROR', httpStatus: 500 }, + 'Safe message', + cause + ) + ).toEqual({ + code: 'INTERNAL_SERVER_ERROR', + httpStatus: 500, + clientError: { + code: 'INTERNAL_SERVER_ERROR', + message: 'Safe message', + retryable: true, + }, + }); + }); +}); + +describe('buildTrpcErrorResponse', () => { + it.each([ + [405, 'METHOD_NOT_SUPPORTED', false], + [408, 'TIMEOUT', true], + [409, 'CONFLICT', false], + [412, 'PRECONDITION_FAILED', false], + [413, 'PAYLOAD_TOO_LARGE', false], + [415, 'UNSUPPORTED_MEDIA_TYPE', false], + [422, 'UNPROCESSABLE_CONTENT', false], + [429, 'TOO_MANY_REQUESTS', true], + [499, 'CLIENT_CLOSED_REQUEST', true], + [501, 'NOT_IMPLEMENTED', false], + [503, 'SERVICE_UNAVAILABLE', true], + ] as const)( + 'maps HTTP %i to %s without changing envelope metadata', + async (status, code, retryable) => { + const response = buildTrpcErrorResponse(status, 'Original message', 'send'); + const body = await response.json(); + + expect(response.status).toBe(status); + expect(body).toEqual({ + error: { + message: 'Original message', + code: TRPC_ERROR_CODES_BY_KEY[code], + data: { + code, + httpStatus: status, + path: 'send', + clientError: { + code, + message: 'Original message', + retryable, + }, + }, + }, + }); + } + ); +}); diff --git a/services/cloud-agent-next/src/trpc-error.ts b/services/cloud-agent-next/src/trpc-error.ts index d0113ae143..10fa69f42c 100644 --- a/services/cloud-agent-next/src/trpc-error.ts +++ b/services/cloud-agent-next/src/trpc-error.ts @@ -1,33 +1,105 @@ +import { PublicErrorCodeSchema, type ClientError } from '@kilocode/worker-utils/client-error'; import { TRPC_ERROR_CODES_BY_KEY } from '@trpc/server/rpc'; +const NON_RETRYABLE_CODES = new Set([ + 'PARSE_ERROR', + 'BAD_REQUEST', + 'UNAUTHORIZED', + 'PAYMENT_REQUIRED', + 'FORBIDDEN', + 'NOT_FOUND', + 'METHOD_NOT_SUPPORTED', + 'NOT_IMPLEMENTED', + 'CONFLICT', + 'PRECONDITION_FAILED', + 'PAYLOAD_TOO_LARGE', + 'UNSUPPORTED_MEDIA_TYPE', + 'UNPROCESSABLE_CONTENT', +]); + +const STATUS_TO_TRPC_CODE = { + 400: 'BAD_REQUEST', + 401: 'UNAUTHORIZED', + 402: 'PAYMENT_REQUIRED', + 403: 'FORBIDDEN', + 404: 'NOT_FOUND', + 405: 'METHOD_NOT_SUPPORTED', + 408: 'TIMEOUT', + 409: 'CONFLICT', + 412: 'PRECONDITION_FAILED', + 413: 'PAYLOAD_TOO_LARGE', + 415: 'UNSUPPORTED_MEDIA_TYPE', + 422: 'UNPROCESSABLE_CONTENT', + 429: 'TOO_MANY_REQUESTS', + 499: 'CLIENT_CLOSED_REQUEST', + 501: 'NOT_IMPLEMENTED', + 503: 'SERVICE_UNAVAILABLE', +} satisfies Partial>; + +type TrpcErrorData = { + code: string; + httpStatus: number; + path?: string; + [key: string]: unknown; +}; + +type ExplicitLegacyCause = { + error: string; + retryable: boolean; +}; + +function parseExplicitLegacyCause(cause: unknown): ExplicitLegacyCause | undefined { + if (!cause || typeof cause !== 'object') return undefined; + if (!('error' in cause) || !('retryable' in cause)) return undefined; + const parsedError = PublicErrorCodeSchema.safeParse(cause.error); + if (!parsedError.success || typeof cause.retryable !== 'boolean') { + return undefined; + } + if ('message' in cause && cause.message !== undefined && typeof cause.message !== 'string') { + return undefined; + } + return { error: parsedError.data, retryable: cause.retryable }; +} + +export function createClientError(code: string, message: string, retryable?: boolean): ClientError { + return { + code, + message, + retryable: NON_RETRYABLE_CODES.has(code) ? false : (retryable ?? true), + }; +} + +export function projectTrpcErrorData( + data: TrpcErrorData, + message: string, + cause?: unknown +): TrpcErrorData & { clientError: ClientError } { + const explicitCause = parseExplicitLegacyCause(cause); + if (explicitCause) { + const clientError = createClientError(explicitCause.error, message, explicitCause.retryable); + return { + ...data, + error: explicitCause.error, + retryable: clientError.retryable, + clientError, + }; + } + return { + ...data, + clientError: createClientError(data.code, message), + }; +} + export function buildTrpcErrorResponse(status: number, message: string, path?: string): Response { - const code = (() => { - switch (status) { - case 400: - return 'BAD_REQUEST'; - case 401: - return 'UNAUTHORIZED'; - case 402: - return 'PAYMENT_REQUIRED'; - case 403: - return 'FORBIDDEN'; - case 404: - return 'NOT_FOUND'; - default: - return 'INTERNAL_SERVER_ERROR'; - } - })(); + const code = + STATUS_TO_TRPC_CODE[status as keyof typeof STATUS_TO_TRPC_CODE] ?? 'INTERNAL_SERVER_ERROR'; return new Response( JSON.stringify({ error: { message, code: TRPC_ERROR_CODES_BY_KEY[code], - data: { - code, - httpStatus: status, - path, - }, + data: projectTrpcErrorData({ code, httpStatus: status, path }, message), }, }), { diff --git a/services/cloud-agent-next/src/websocket/ingest.test.ts b/services/cloud-agent-next/src/websocket/ingest.test.ts index 22530aac49..b54e6ca9ff 100644 --- a/services/cloud-agent-next/src/websocket/ingest.test.ts +++ b/services/cloud-agent-next/src/websocket/ingest.test.ts @@ -1228,6 +1228,32 @@ describe('createIngestHandler', () => { expect(doContext.wrapperSupervisor.observeFinalizing).toHaveBeenCalledWith(WRAPPER_RUN_ID); }); + it.each([ + { failureCode: 'payment_required' as const, error: 'Insufficient credits' }, + { failureCode: 'model_missing' as const, error: 'Model not found' }, + ])('forwards $failureCode wrapper failures to the session coordinator', async failure => { + const doContext = createNewPathDOContext(); + const handler = createIngestHandler( + createFakeState(), + createFakeEventQueries(), + SESSION_ID, + vi.fn(), + doContext + ); + const ws = createFakeWebSocket(makeNewPathAttachment()); + + await handler.handleIngestMessage( + ws, + makeStreamMessage('error', { fatal: true, ...failure }) + ); + + expect(doContext.wrapperSupervisor.onTerminalEvent).toHaveBeenCalledWith({ + wrapperRunId: WRAPPER_RUN_ID, + status: 'failed', + ...failure, + }); + }); + it('does NOT terminalize on wrapper complete event (new path)', async () => { const state = createFakeState(); const doContext = createNewPathDOContext(); diff --git a/services/cloud-agent-next/src/websocket/ingest.ts b/services/cloud-agent-next/src/websocket/ingest.ts index 0e656ccb60..48be2b414f 100644 --- a/services/cloud-agent-next/src/websocket/ingest.ts +++ b/services/cloud-agent-next/src/websocket/ingest.ts @@ -24,7 +24,12 @@ import { handleCommandsAvailable, extractEntityId, } from '../session/ingest-handlers/index.js'; -import type { CompleteEventData, KilocodeEventData, CloudStatusData } from '../shared/protocol.js'; +import { + WrapperTerminalFailureCodes, + type CompleteEventData, + type KilocodeEventData, + type CloudStatusData, +} from '../shared/protocol.js'; import type { SlashCommandInfo } from '../shared/slash-commands.js'; import { logger } from '../logger.js'; import type { WrapperSupervisor, WrapperTerminalEvent } from '../session/wrapper-supervisor.js'; @@ -63,6 +68,7 @@ const errorEventSchema = z.object({ error: z.string().optional(), message: z.string().optional(), errorSource: z.literal('assistant').optional(), + failureCode: z.enum(WrapperTerminalFailureCodes).optional(), }); const cloudMessageCompletedEventSchema = z.object({ @@ -839,6 +845,7 @@ export function createIngestHandler( status: 'failed', error: fatalMessage, errorSource: errorData.errorSource, + failureCode: errorData.failureCode, }); logger .withFields({ diff --git a/services/cloud-agent-next/test/integration/session/legacy-callback-enqueue.test.ts b/services/cloud-agent-next/test/integration/session/legacy-callback-enqueue.test.ts index 7579869552..9d627f2669 100644 --- a/services/cloud-agent-next/test/integration/session/legacy-callback-enqueue.test.ts +++ b/services/cloud-agent-next/test/integration/session/legacy-callback-enqueue.test.ts @@ -65,4 +65,52 @@ describe('legacy execution callback enqueue', () => { status: 'completed', }); }); + + it('adds retryable fallback client errors without parsing legacy error text', async () => { + const userId = 'user_legacy_callback_error'; + const sessionId = 'agent_legacy_callback_error'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const jobs = await runInDurableObject(stub, async instance => { + const sentCallbackJobs: CallbackJob[] = []; + installCallbackQueue(instance, async job => { + sentCallbackJobs.push(job); + }); + await registerReadySession(instance, { + sessionId, + userId, + prompt: 'prepared prompt', + mode: 'code', + model: 'test-model', + kiloSessionId: '55555555-5555-4555-8555-555555555555', + kilocodeToken: 'token-callback-error', + callbackTarget: { url: 'https://example.com/callback' }, + }); + await instance.addExecution({ + executionId: 'exc_legacy_callback_error', + mode: 'code', + streamingMode: 'websocket', + ingestToken: 'exc_legacy_callback_error', + }); + await instance.updateExecutionStatus({ + executionId: 'exc_legacy_callback_error', + status: 'failed', + error: 'assistant_error must not control classification', + }); + return sentCallbackJobs; + }); + + expect(jobs[0].payload).toMatchObject({ + executionId: 'exc_legacy_callback_error', + status: 'failed', + errorMessage: 'assistant_error must not control classification', + clientError: { + code: 'EXECUTION_FAILED', + message: 'assistant_error must not control classification', + retryable: true, + }, + }); + }); }); diff --git a/services/cloud-agent-next/test/unit/wrapper/reconnection.test.ts b/services/cloud-agent-next/test/unit/wrapper/reconnection.test.ts index a21f5ce652..db8e48aa89 100644 --- a/services/cloud-agent-next/test/unit/wrapper/reconnection.test.ts +++ b/services/cloud-agent-next/test/unit/wrapper/reconnection.test.ts @@ -1118,7 +1118,8 @@ describe('ingest WS reconnection', () => { ); expect(sessionErrors).toHaveLength(1); expect(callbacks.onTerminalError).toHaveBeenCalledWith({ - error: 'Model not found: kilo/does-not-exist.', + code: 'model_missing', + message: 'Model not found: kilo/does-not-exist.', errorSource: 'assistant', }); }); @@ -1148,7 +1149,7 @@ describe('ingest WS reconnection', () => { await vi.advanceTimersByTimeAsync(0); expect(callbacks.onTerminalError).toHaveBeenCalledWith({ - error: errorMessage, + message: errorMessage, errorSource: 'assistant', }); } @@ -1459,7 +1460,8 @@ describe('ingest WS reconnection', () => { expect(callbacks.onTerminalError).toHaveBeenCalledTimes(terminal ? 1 : 0); if (terminal) { expect(callbacks.onTerminalError).toHaveBeenCalledWith({ - error: 'Insufficient credits', + ...(eventType === 'usage_limit_exceeded' ? {} : { code: 'payment_required' }), + message: 'Insufficient credits', errorSource: 'assistant', }); } diff --git a/services/cloud-agent-next/wrapper/src/connection.ts b/services/cloud-agent-next/wrapper/src/connection.ts index 12f20e444b..c4451dadea 100644 --- a/services/cloud-agent-next/wrapper/src/connection.ts +++ b/services/cloud-agent-next/wrapper/src/connection.ts @@ -10,7 +10,11 @@ */ import type { WrapperState } from './state.js'; -import type { IngestEvent, WrapperCommand } from '../../src/shared/protocol.js'; +import type { + IngestEvent, + WrapperCommand, + WrapperTerminalFailureCode, +} from '../../src/shared/protocol.js'; import { trimPayload } from '../../src/shared/trim-payload.js'; import { logToFile } from './utils.js'; import type { KiloEvent, WrapperKiloClient } from './kilo-api.js'; @@ -162,14 +166,15 @@ export type ConnectionConfig = { kiloClient: WrapperKiloClient; }; -export type AssistantTerminalError = { - error: string; +export type WrapperTerminalFailure = { + code?: WrapperTerminalFailureCode; + message: string; errorSource: 'assistant'; }; export type ConnectionCallbacks = { /** Called when a terminal assistant request error is detected */ - onTerminalError: (error: AssistantTerminalError) => void; + onTerminalError: (failure: WrapperTerminalFailure) => void; /** Called when a command is received from DO */ onCommand: (cmd: WrapperCommand) => void; /** Called when the connection unexpectedly closes */ @@ -739,46 +744,62 @@ export function createConnectionManager( } /** - * Check if an event represents a terminal error (payment/billing/quota/model resolution). + * Classify terminal errors that require changed model or account state. */ - function isTerminalError(eventType: string, properties: Record): boolean { + function getTerminalFailure( + eventType: string, + properties: Record + ): WrapperTerminalFailure | undefined { const eventSessionID = typeof properties.sessionID === 'string' ? properties.sessionID : undefined; if (eventSessionID && eventSessionID !== state.currentSession?.kiloSessionId) { - return false; + return undefined; } if ( eventType === 'session.error' && properties.sessionID !== state.currentSession?.kiloSessionId ) { - return false; + return undefined; } - if ( - eventType === 'payment_required' || - eventType === 'insufficient_funds' || - eventType === 'usage_limit_exceeded' - ) { - return true; - } - const error = properties.error; - if (error) { - const errorStr = typeof error === 'string' ? error : JSON.stringify(error); - const normalizedError = errorStr.toLowerCase(); - if ( - normalizedError.includes('payment') || - normalizedError.includes('credit') || - normalizedError.includes('balance') || - normalizedError.includes('quota') || - (eventType === 'session.error' && - (normalizedError.includes('usage_limit_exceeded') || - normalizedError.includes('too many requests') || - normalizedError.includes('model not found'))) - ) { - return true; + let code: WrapperTerminalFailureCode | undefined; + if (eventType === 'payment_required' || eventType === 'insufficient_funds') { + code = 'payment_required'; + } else if (eventType !== 'usage_limit_exceeded') { + const error = properties.error; + if (error) { + const errorStr = typeof error === 'string' ? error : JSON.stringify(error); + const normalizedError = errorStr.toLowerCase(); + if (eventType === 'session.error' && normalizedError.includes('model not found')) { + code = 'model_missing'; + } else if ( + normalizedError.includes('payment') || + normalizedError.includes('credit') || + normalizedError.includes('balance') || + normalizedError.includes('quota') + ) { + code = 'payment_required'; + } } } - return false; + + const isExplicitAssistantFailure = + eventType === 'usage_limit_exceeded' || + (eventType === 'session.error' && + (() => { + const normalizedError = JSON.stringify(properties.error ?? '').toLowerCase(); + return ( + normalizedError.includes('usage_limit_exceeded') || + normalizedError.includes('too many requests') + ); + })()); + if (!code && !isExplicitAssistantFailure) return undefined; + + return { + ...(code ? { code } : {}), + message: getTerminalErrorText(eventType, properties), + errorSource: 'assistant', + }; } function getTerminalErrorText(eventType: string, properties: Record): string { @@ -1024,11 +1045,9 @@ export function createConnectionManager( } // Terminal error detection - if (isTerminalError(eventType, properties)) { - callbacks.onTerminalError({ - error: getTerminalErrorText(eventType, properties), - errorSource: 'assistant', - }); + const terminalFailure = getTerminalFailure(eventType, properties); + if (terminalFailure) { + callbacks.onTerminalError(terminalFailure); return; } diff --git a/services/cloud-agent-next/wrapper/src/main.ts b/services/cloud-agent-next/wrapper/src/main.ts index 82a6765cc5..7a312612f7 100644 --- a/services/cloud-agent-next/wrapper/src/main.ts +++ b/services/cloud-agent-next/wrapper/src/main.ts @@ -392,11 +392,16 @@ async function main() { state, { kiloClient: nextKiloClient }, { - onTerminalError: terminalError => { - logToFile(`terminal error: ${terminalError.error}`); + onTerminalError: failure => { + logToFile(`terminal error: ${failure.message}`); state.sendToIngest({ streamEventType: 'error', - data: { ...terminalError, fatal: true }, + data: { + error: failure.message, + errorSource: failure.errorSource, + fatal: true, + ...(failure.code ? { failureCode: failure.code } : {}), + }, timestamp: new Date().toISOString(), }); const session = state.currentSession;