diff --git a/agentic/agent.py b/agentic/agent.py index 57d66e5..f236a36 100644 --- a/agentic/agent.py +++ b/agentic/agent.py @@ -4,13 +4,33 @@ from datetime import datetime, timezone from strands import Agent from strands.models import BedrockModel -from tools import estimate_image_cost, check_wallet_balance, make_payment, generate_image, analyze_content_monetization, IMAGE_STORAGE +from tools import estimate_image_cost, check_wallet_balance, make_payment, generate_image, analyze_content_monetization, get_session_storage from memory_hook import MemoryHook, MEMORY_ID import os import logging # Configure logging logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def validate_config(): + """Fail fast if required environment variables are missing or invalid.""" + required = ['GATEWAY_URL', 'SELLER_WALLET', 'CDP_API_KEY_ID', 'CDP_API_KEY_SECRET'] + missing = [k for k in required if not os.getenv(k)] + if missing: + raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}") + + from web3_provider import get_web3 + try: + w3 = get_web3() + if not w3.is_connected(): + raise RuntimeError("RPC endpoint unreachable — check WEB3_PROVIDER_URL") + except Exception as e: + raise RuntimeError(f"Web3 connectivity check failed: {e}") from e + + +validate_config() app = FastAPI(title="Content Monetization Agent", version="1.0.0") @@ -54,35 +74,29 @@ class InvocationResponse(BaseModel): @app.post("/invocations", response_model=InvocationResponse) async def invoke_agent(request: InvocationRequest): - import logging - logger = logging.getLogger(__name__) - try: logger.info(f"📥 [REQUEST] Session:{request.session_id} | Prompt:{request.input.get('prompt', '')[:100]}") - + user_message = request.input.get("prompt", "") if not user_message: raise HTTPException( status_code=400, detail="No prompt found in input. Please provide a 'prompt' key." ) - + # Set session ID for memory isolation session_id = request.session_id or request.input.get("session_id", "default") agent.state.session_id = session_id - + logger.info(f"🤖 [AGENT_START] Session:{session_id} | Message:{user_message[:100]}") result = agent(user_message) logger.info(f"💬 [AGENT_RESPONSE] Session:{session_id} | Response:{str(result.message)[:300]}...") - - # Extract images from global storage (images are returned to user) - images = {} - for image_id, image_data in IMAGE_STORAGE.items(): - images[image_id] = image_data - - # Clear storage after extraction - IMAGE_STORAGE.clear() - + + # Extract images from session-scoped storage to prevent cross-session leakage + storage = get_session_storage(session_id) + images = dict(storage.image_storage) + storage.image_storage.clear() + response = { "message": result.message, "timestamp": datetime.now(timezone.utc).isoformat(), diff --git a/agentic/lambda/seller.js b/agentic/lambda/seller.js index 4691771..4467daa 100644 --- a/agentic/lambda/seller.js +++ b/agentic/lambda/seller.js @@ -1,10 +1,10 @@ import { Hono } from 'hono'; import { handle } from 'hono/aws-lambda'; import https from 'https'; +import { DynamoDBClient, PutItemCommand, GetItemCommand, UpdateItemCommand } from '@aws-sdk/client-dynamodb'; const app = new Hono(); -// x402 Configuration const X402_CONFIG = { facilitatorUrl: 'https://x402.org/facilitator', usdcBase: '0x036CbD53842c5426634e7929541eC2318f3dCF7e', @@ -12,288 +12,212 @@ const X402_CONFIG = { scheme: 'exact' }; -// Idempotency cache - for production, persist to DynamoDB for multi-instance scalability -const processedPayments = new Map(); +// DynamoDB-backed nonce store — shared across all Lambda instances so replay +// protection holds even when the function scales horizontally. +// Falls back to an in-process Map when the env var is absent (local dev / tests). +const dynamodb = process.env.NONCE_TABLE_NAME + ? new DynamoDBClient({ region: process.env.AWS_REGION || 'us-east-1' }) + : null; + +const localNonceCache = new Map(); + +const MAX_PAYMENT_AGE_SEC = 300; + +async function markNoncePending(nonce, paymentPayload, paymentRequirements) { + const ttl = Math.floor(Date.now() / 1000) + 3600; + if (dynamodb) { + await dynamodb.send(new PutItemCommand({ + TableName: process.env.NONCE_TABLE_NAME, + Item: { + nonce: { S: nonce }, + status: { S: 'pending' }, + paymentPayload: { S: JSON.stringify(paymentPayload) }, + paymentRequirements: { S: JSON.stringify(paymentRequirements) }, + createdAt: { N: String(Math.floor(Date.now() / 1000)) }, + ttl: { N: String(ttl) }, + }, + ConditionExpression: 'attribute_not_exists(nonce)', + })); + } else { + if (localNonceCache.has(nonce)) throw new Error('Duplicate nonce'); + localNonceCache.set(nonce, { status: 'pending', paymentPayload, paymentRequirements, ts: Date.now() }); + } +} + +async function getNonceEntry(nonce) { + if (dynamodb) { + const res = await dynamodb.send(new GetItemCommand({ + TableName: process.env.NONCE_TABLE_NAME, + Key: { nonce: { S: nonce } }, + })); + if (!res.Item) return null; + return { + status: res.Item.status.S, + paymentPayload: JSON.parse(res.Item.paymentPayload.S), + paymentRequirements: JSON.parse(res.Item.paymentRequirements.S), + }; + } else { + return localNonceCache.get(nonce) || null; + } +} + +async function markNonceSettled(nonce) { + if (dynamodb) { + await dynamodb.send(new UpdateItemCommand({ + TableName: process.env.NONCE_TABLE_NAME, + Key: { nonce: { S: nonce } }, + UpdateExpression: 'SET #s = :settled', + ExpressionAttributeNames: { '#s': 'status' }, + ExpressionAttributeValues: { ':settled': { S: 'settled' } }, + })); + } else { + const entry = localNonceCache.get(nonce); + if (entry) entry.status = 'settled'; + } +} -// CORS middleware app.use('*', async (c, next) => { c.header('Access-Control-Allow-Origin', '*'); c.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'); c.header('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-PAYMENT, PAYMENT-SIGNATURE'); c.header('Access-Control-Expose-Headers', 'PAYMENT-REQUIRED, PAYMENT-RESPONSE'); - - if (c.req.method === 'OPTIONS') { - return c.text('', 200); - } - + if (c.req.method === 'OPTIONS') return c.text('', 200); await next(); }); -// JWT generation for CDP facilitator (mainnet use) -// Currently using x402.org facilitator for Base Sepolia testnet which requires no authentication. -// For mainnet deployment with CDP facilitator, uncomment this function and update verify/settle -// functions to use CDP API endpoints (https://api.cdp.coinbase.com/platform/v2/x402/*) -// with JWT authentication in the Authorization header. -/* -const { generateJwt } = require('@coinbase/cdp-sdk/auth'); - -const generateCDPJWT = async (requestMethod, requestPath) => { - const keyName = process.env.CDP_API_KEY_NAME; - const keySecret = process.env.CDP_API_KEY_SECRET; - - if (!keyName || !keySecret) { - throw new Error('CDP API credentials not configured'); - } - - return await generateJwt({ - apiKeyId: keyName, - apiKeySecret: keySecret, - requestMethod: requestMethod, - requestHost: 'api.cdp.coinbase.com', - requestPath: requestPath, - expiresIn: 120 - }); -}; -*/ - -// Verify payment with x402.org facilitator const verifyPayment = async (paymentPayload, paymentRequirements) => { const requestBody = { x402Version: 1, - paymentPayload: { - x402Version: 1, - scheme: X402_CONFIG.scheme, - network: X402_CONFIG.network, - payload: paymentPayload - }, + paymentPayload: { x402Version: 1, scheme: X402_CONFIG.scheme, network: X402_CONFIG.network, payload: paymentPayload }, paymentRequirements }; - - console.log('=== VERIFY REQUEST ==='); const bodyString = JSON.stringify(requestBody); - return new Promise((resolve, reject) => { const makeRequest = (url) => { const parsedUrl = new URL(url); const req = https.request({ - hostname: parsedUrl.hostname, - port: parsedUrl.port || 443, - path: parsedUrl.pathname, - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Content-Length': Buffer.byteLength(bodyString) - } + hostname: parsedUrl.hostname, port: parsedUrl.port || 443, path: parsedUrl.pathname, + method: 'POST', headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(bodyString) } }, (res) => { - // Handle redirects - if (res.statusCode >= 300 && res.statusCode < 400 && res.headers.location) { - console.log(`Redirecting to: ${res.headers.location}`); - return makeRequest(res.headers.location); - } - + if (res.statusCode >= 300 && res.statusCode < 400 && res.headers.location) return makeRequest(res.headers.location); let data = ''; res.on('data', (chunk) => data += chunk); - res.on('end', () => { - console.log('Verify response status:', res.statusCode); - try { - const result = JSON.parse(data); - console.log('Is valid:', result.isValid); - if (result.invalidReason) console.log('Invalid reason:', result.invalidReason); - resolve(result); - } catch (e) { - reject(new Error(`Failed to parse response: ${data}`)); - } - }); + res.on('end', () => { try { resolve(JSON.parse(data)); } catch (e) { reject(new Error(`Failed to parse: ${data}`)); } }); }); - req.on('error', (e) => reject(e)); req.write(bodyString); req.end(); }; - makeRequest('https://x402.org/facilitator/verify'); }); }; -// Settle payment with x402.org facilitator const settlePayment = async (paymentPayload, paymentRequirements) => { const requestBody = { x402Version: 1, - paymentPayload: { - x402Version: 1, - scheme: X402_CONFIG.scheme, - network: X402_CONFIG.network, - payload: paymentPayload - }, + paymentPayload: { x402Version: 1, scheme: X402_CONFIG.scheme, network: X402_CONFIG.network, payload: paymentPayload }, paymentRequirements }; - - console.log('=== SETTLE REQUEST ==='); const bodyString = JSON.stringify(requestBody); - return new Promise((resolve, reject) => { const makeRequest = (url) => { const parsedUrl = new URL(url); const req = https.request({ - hostname: parsedUrl.hostname, - port: parsedUrl.port || 443, - path: parsedUrl.pathname, - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Content-Length': Buffer.byteLength(bodyString) - } + hostname: parsedUrl.hostname, port: parsedUrl.port || 443, path: parsedUrl.pathname, + method: 'POST', headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(bodyString) } }, (res) => { - // Handle redirects - if (res.statusCode >= 300 && res.statusCode < 400 && res.headers.location) { - console.log(`Redirecting to: ${res.headers.location}`); - return makeRequest(res.headers.location); - } - + if (res.statusCode >= 300 && res.statusCode < 400 && res.headers.location) return makeRequest(res.headers.location); let data = ''; res.on('data', (chunk) => data += chunk); - res.on('end', () => { - console.log('Settle response status:', res.statusCode); - try { - const result = JSON.parse(data); - resolve(result); - } catch (e) { - reject(new Error(`Failed to parse response: ${data}`)); - } - }); + res.on('end', () => { try { resolve(JSON.parse(data)); } catch (e) { reject(new Error(`Failed to parse: ${data}`)); } }); }); - req.on('error', (e) => reject(e)); req.write(bodyString); req.end(); }; - makeRequest('https://x402.org/facilitator/settle'); }); }; -// x402 compliant payment middleware for /generate_image route app.use('/generate_image', async (c, next) => { try { const body = await c.req.json(); const { request_id, prompt, price } = body; - - // Use provided price or default estimate (in USDC wei) - const estimatedCost = price || '20000'; // ~$0.02 default - - // Check for PAYMENT-SIGNATURE header (x402 v2 standard) or X-PAYMENT (legacy) + const estimatedCost = price || '20000'; const paymentHeader = c.req.header('PAYMENT-SIGNATURE') || c.req.header('X-PAYMENT'); - + if (!paymentHeader) { const sellerWallet = process.env.SELLER_WALLET; const paymentRequirements = { - scheme: X402_CONFIG.scheme, - network: X402_CONFIG.network, + scheme: X402_CONFIG.scheme, network: X402_CONFIG.network, maxAmountRequired: String(estimatedCost), resource: `${(process.env.GATEWAY_URL || 'https://example.com').replace(/\/$/, '')}/generate_image`, - description: 'AI image generation with Nova Canvas', - mimeType: 'application/json', + description: 'AI image generation with Nova Canvas', mimeType: 'application/json', outputSchema: { status: 'string', request_id: 'string', message: 'string' }, - payTo: sellerWallet, - asset: X402_CONFIG.usdcBase, - maxTimeoutSeconds: 300, - extra: { - name: 'USDC', - version: '2', - chainId: 84532 - } + payTo: sellerWallet, asset: X402_CONFIG.usdcBase, maxTimeoutSeconds: 300, + extra: { name: 'USDC', version: '2', chainId: 84532 } }; - const x402Response = { - x402Version: 1, - accepts: [paymentRequirements], - error: 'Payment required' - }; - console.log('=== 402 RESPONSE ==='); - console.log(JSON.stringify(x402Response, null, 2)); - return c.json(x402Response, 402); + return c.json({ x402Version: 1, accepts: [paymentRequirements], error: 'Payment required' }, 402); } - - // Parse payment payload (base64 encoded) + let paymentPayload; try { - const decoded = Buffer.from(paymentHeader, 'base64').toString('utf-8'); - paymentPayload = JSON.parse(decoded); - console.log('=== PAYMENT RECEIVED ==='); - console.log('Payload:', JSON.stringify(paymentPayload, null, 2)); + paymentPayload = JSON.parse(Buffer.from(paymentHeader, 'base64').toString('utf-8')); } catch (error) { - console.log('Parse error:', error.message); return c.json({ error: 'Invalid payment payload' }, 400); } - - // Extract authorization (handle both formats) + const authorization = paymentPayload.payload?.authorization || paymentPayload.authorization; const authorizedValue = authorization?.value; - if (!authorizedValue) { - console.log('Missing authorization value'); - return c.json({ error: 'Missing authorization value' }, 400); + if (!authorizedValue) return c.json({ error: 'Missing authorization value' }, 400); + + // Reject expired or stale signatures + const validBefore = Number(authorization?.validBefore || 0); + const nowSec = Math.floor(Date.now() / 1000); + if (validBefore > 0 && nowSec > validBefore) { + return c.json({ error: 'payment_expired', reason: 'Payment signature has expired' }, 402); + } + const validAfter = Number(authorization?.validAfter || 0); + if ((nowSec - validAfter) > MAX_PAYMENT_AGE_SEC) { + return c.json({ error: 'payment_expired', reason: `Signature older than ${MAX_PAYMENT_AGE_SEC} seconds` }, 402); } - - // Idempotency check using nonce + + // Idempotency check — DynamoDB-backed across Lambda instances const nonce = authorization?.nonce; - if (nonce && processedPayments.has(nonce)) { - return c.json({ error: 'Payment already processed' }, 409); + if (nonce) { + const existing = await getNonceEntry(nonce); + if (existing) return c.json({ error: 'Payment already processed' }, 409); } - - // Create payment requirements using EXACT value from authorization + const sellerWallet = process.env.SELLER_WALLET; const paymentRequirements = { - scheme: X402_CONFIG.scheme, - network: X402_CONFIG.network, + scheme: X402_CONFIG.scheme, network: X402_CONFIG.network, maxAmountRequired: authorizedValue, resource: `${(process.env.GATEWAY_URL || 'https://example.com').replace(/\/$/, '')}/generate_image`, - description: 'AI image generation with Nova Canvas', - mimeType: 'application/json', + description: 'AI image generation with Nova Canvas', mimeType: 'application/json', outputSchema: { status: 'string', request_id: 'string', message: 'string' }, - payTo: sellerWallet, - asset: X402_CONFIG.usdcBase, - maxTimeoutSeconds: 300, - extra: { - name: 'USDC', - version: '2', - chainId: 84532 - } + payTo: sellerWallet, asset: X402_CONFIG.usdcBase, maxTimeoutSeconds: 300, + extra: { name: 'USDC', version: '2', chainId: 84532 } }; - - console.log('=== PAYMENT VERIFICATION ==='); - console.log('Payment requirements:', JSON.stringify(paymentRequirements, null, 2)); - console.log('Payment payload from client:', JSON.stringify(paymentPayload, null, 2)); - console.log('Authorization value:', authorizedValue); - console.log('Seller wallet:', sellerWallet); - console.log('Asset (USDC):', X402_CONFIG.usdcBase); - - // Verify payment with x402.org facilitator (do NOT settle yet - settle after content delivery per x402 spec) + const verification = await verifyPayment(paymentPayload.payload || paymentPayload, paymentRequirements); if (!verification.isValid) { - return c.json({ - error: 'Payment verification failed', - reason: verification.invalidReason - }, 402); + return c.json({ error: 'Payment verification failed', reason: verification.invalidReason }, 402); } - - console.log('Payment verified successfully!'); - - // Store payment data for deferred settlement via /settle endpoint - c.set('paymentPayload', paymentPayload); - c.set('paymentRequirements', paymentRequirements); - c.set('nonce', nonce); - - // Mark nonce as pending (prevents replay while awaiting settlement) + + // Persist nonce atomically — ConditionalCheckFailedException means concurrent replay if (nonce) { - processedPayments.set(nonce, { timestamp: Date.now(), status: 'pending', paymentPayload: paymentPayload.payload || paymentPayload, paymentRequirements }); - - // Clean up stale pending entries (never settled, e.g. Bedrock failure) and old settled entries - const oneHourAgo = Date.now() - 3600000; - for (const [n, entry] of processedPayments.entries()) { - const ts = typeof entry === 'object' ? entry.timestamp : entry; - if (ts < oneHourAgo) processedPayments.delete(n); + try { + await markNoncePending(nonce, paymentPayload.payload || paymentPayload, paymentRequirements); + } catch (err) { + if (err.name === 'ConditionalCheckFailedException') return c.json({ error: 'Payment already processed' }, 409); + throw err; } } - + + c.set('paymentPayload', paymentPayload); + c.set('paymentRequirements', paymentRequirements); + c.set('nonce', nonce); await next(); } catch (error) { console.error('Payment middleware error:', error); @@ -301,79 +225,43 @@ app.use('/generate_image', async (c, next) => { } }); -// Protected generate_image endpoint - payment verified, content can proceed app.post('/generate_image', async (c) => { try { const body = await c.req.json(); const nonce = c.get('nonce'); - - return c.json({ - status: 'payment_verified', - request_id: body.request_id, - message: 'Payment verified - proceed with image generation', - nonce: nonce || null - }); + return c.json({ status: 'payment_verified', request_id: body.request_id, message: 'Payment verified - proceed with image generation', nonce: nonce || null }); } catch (error) { - console.error('Generate error:', error); - return c.json({ - status: 'error', - error: error.message - }, 500); + return c.json({ status: 'error', error: error.message }, 500); } }); -// x402 spec: settle after content delivery (fair billing - only charge on success) app.post('/settle', async (c) => { try { const body = await c.req.json(); const { nonce } = body; - - if (!nonce) { - return c.json({ error: 'Missing nonce' }, 400); - } - - // Look up pending payment data by nonce - const pendingPayment = processedPayments.get(nonce); + if (!nonce) return c.json({ error: 'Missing nonce' }, 400); + + const pendingPayment = await getNonceEntry(nonce); if (!pendingPayment || pendingPayment.status !== 'pending') { return c.json({ error: 'No pending payment found for nonce' }, 404); } - + const { paymentPayload, paymentRequirements } = pendingPayment; - let transactionHash = null; try { const settlement = await settlePayment(paymentPayload, paymentRequirements); - if (settlement.success) { - console.log('Payment settled successfully'); - console.log('Transaction:', settlement.transaction); - transactionHash = settlement.transaction; - } else { - console.log('Settlement failed (testnet expected):', settlement.errorReason); - } + if (settlement.success) transactionHash = settlement.transaction; } catch (error) { console.log('Settlement error (testnet expected):', error.message); } - - // Mark as settled and clean old entries - processedPayments.set(nonce, { timestamp: Date.now(), status: 'settled' }); - const oneHourAgo = Date.now() - 3600000; - for (const [n, entry] of processedPayments.entries()) { - const ts = typeof entry === 'object' ? entry.timestamp : entry; - if (ts < oneHourAgo) processedPayments.delete(n); - } - - return c.json({ - status: 'settled', - transaction_hash: transactionHash - }); + + await markNonceSettled(nonce); + return c.json({ status: 'settled', transaction_hash: transactionHash }); } catch (error) { - console.error('Settlement error:', error); return c.json({ error: error.message }, 500); } }); -app.get('/health', (c) => { - return c.json({ status: 'healthy' }); -}); +app.get('/health', (c) => c.json({ status: 'healthy' })); export const handler = handle(app); diff --git a/agentic/tools.py b/agentic/tools.py index 9fa80b8..985941d 100644 --- a/agentic/tools.py +++ b/agentic/tools.py @@ -15,7 +15,7 @@ bedrock_runtime = boto3.client('bedrock-runtime', region_name=os.getenv('AWS_REGION', 'us-east-1')) -# Session-level storage - will be managed per session +# Session-level storage - managed per session to prevent cross-session data leakage # authorize_check: User consent layer before x402 automatic payment # Enables natural language approval ("yes, proceed") with optional AgentKit spending allowances class SessionStorage: @@ -26,21 +26,18 @@ def __init__(self): self.current_request_id = None # Track current request_id self.current_cost = None # Track current cost -# Global fallback for backward compatibility -IMAGE_STORAGE = {} -AUTHORIZE_CHECK = {} # Consent gate: x402 handles payment automatically after user authorizes -AUTH_VERIFIED = set() def get_session_storage(session_id="default"): - """Get or create session-specific storage""" + """Get or create session-specific storage.""" if not hasattr(get_session_storage, '_sessions'): get_session_storage._sessions = {} - + if session_id not in get_session_storage._sessions: get_session_storage._sessions[session_id] = SessionStorage() - + return get_session_storage._sessions[session_id] + # Seller wallet address SELLER_WALLET = os.getenv('SELLER_WALLET') @@ -54,21 +51,21 @@ def get_session_storage(session_id="default"): def estimate_image_cost(prompt: str, session_id: str = "default") -> str: """ Estimate the cost to generate an image with Nova Canvas. - + Args: prompt: Description of the image to generate - + Returns: Cost estimate in USDC with request_id """ storage = get_session_storage(session_id) - + # Check if there's already an active unauthorized request if storage.current_request_id and storage.current_request_id in storage.authorize_check: existing = storage.authorize_check[storage.current_request_id] if not existing['auth']: return f"Active request exists. Cost: {existing['cost']:.4f} USDC. Use make_payment() to proceed." - + # Nova Canvas: 1024x1024 standard = $0.04 estimate = estimate_cost(prompt, 'nova-canvas', resolution='1024x1024', quality='standard') request_id = str(uuid.uuid4()) @@ -81,15 +78,13 @@ def estimate_image_cost(prompt: str, session_id: str = "default") -> str: # Store current request_id and cost in session storage.current_request_id = request_id storage.current_cost = cost_usd - # Also update global for backward compatibility - AUTHORIZE_CHECK[request_id] = storage.authorize_check[request_id] return f"REQUEST_ID:{request_id}|COST:{cost_usd:.4f}|USD:{cost_usd:.4f}" @tool def check_wallet_balance(session_id: str = "default") -> str: """ Check the agent's wallet balances (ETH and USDC). - + Returns: Wallet balance information """ @@ -101,77 +96,76 @@ def check_wallet_balance(session_id: str = "default") -> str: @tool def make_payment(request_id: str = None, session_id: str = "default") -> str: """Authorize payment via natural language consent - x402 handles the actual transfer automatically. - + This is a user consent gate, not the actual payment. Users can approve with natural language (e.g., "yes, proceed") and optionally set AgentKit spending allowances for auto-approval. """ storage = get_session_storage(session_id) - + # If no request_id provided, use current session request_id if request_id is None: request_id = storage.current_request_id - + # If still None, user needs to estimate cost first if request_id is None: return "Error: No active request. Please use estimate_image_cost first to get a request ID." - + if request_id not in storage.authorize_check: return "Error: Invalid request ID. Please estimate image cost first." - + if storage.authorize_check[request_id]['auth']: return "Payment already authorized" - + amount_usdc = storage.authorize_check[request_id]['cost'] - + balance_info = get_balance(AGENT_WALLET) if balance_info['usdc_balance'] < amount_usdc: return f"Error: Insufficient balance. Need {amount_usdc:.6f} USDC, have {balance_info['usdc_balance']:.6f} USDC" - + storage.authorize_check[request_id]['auth'] = True storage.auth_verified.add(request_id) - # Update global for backward compatibility - AUTHORIZE_CHECK[request_id] = storage.authorize_check[request_id] - AUTH_VERIFIED.add(request_id) - + return f"✅ Payment authorized for {amount_usdc:.4f} USDC! Ready to generate image." @tool def generate_image(request_id: str = None, session_id: str = "default") -> str: """ Generate an image using Amazon Nova Canvas with x402 automatic payment. - + Args: request_id: The request ID from estimate_image_cost - + Returns: Success message with image ID """ import asyncio - + storage = get_session_storage(session_id) - + # If no request_id provided, use current session request_id if request_id is None: request_id = storage.current_request_id - + # If still None, user needs to estimate cost first if request_id is None: return "Error: No active request. Please use estimate_image_cost first to get a request ID." - + if request_id not in storage.authorize_check: return "Error: Invalid request ID. Use estimate_image_cost first." - + prompt = storage.authorize_check[request_id]['prompt'] cost_usdc = storage.authorize_check[request_id]['cost'] - + # Get gateway URL from environment - gateway_url = os.getenv('GATEWAY_URL').rstrip('/') + gateway_url = os.getenv('GATEWAY_URL', '').rstrip('/') + if not gateway_url: + return "Error: GATEWAY_URL not configured." print(f"Using gateway URL: {gateway_url}") - + # Check if payment was authorized - if not, return authorization required if not storage.authorize_check[request_id].get('auth'): return f"AUTHORIZE_CHECK - Cost: {cost_usdc:.4f} USDC. Payment authorization needed before image generation." - + # Use x402 httpx client - it handles 402 and payment automatically async def make_request(): async with get_x402_httpx_client(AGENT_WALLET, gateway_url) as client: @@ -179,7 +173,7 @@ async def make_request(): print(f"Gateway: {gateway_url}/generate_image") print(f"Request ID: {request_id}") print(f"Cost: {cost_usdc} USDC") - + # Convert USDC to wei for x402 protocol cost_wei = int(cost_usdc * 1e6) response = await client.post( @@ -187,27 +181,27 @@ async def make_request(): json={'request_id': request_id, 'prompt': prompt, 'price': str(cost_wei)}, timeout=30 ) - + print(f"Response status: {response.status_code}") print(f"Response body: {response.text[:500]}") return response - + try: response = asyncio.run(make_request()) - + if response.status_code != 200: return f"Error: Gateway returned {response.status_code}. Response: {response.text[:200]}" - + # Extract nonce for deferred settlement response_data = response.json() payment_nonce = response_data.get('nonce') - + except Exception as e: import traceback print(f"x402 error: {str(e)}") print(traceback.format_exc()) return f"Error: {str(e)}" - + # Generate image with Bedrock request_body = { "taskType": "TEXT_IMAGE", @@ -221,15 +215,15 @@ async def make_request(): "width": 1024 } } - + bedrock_response = bedrock_runtime.invoke_model( modelId="amazon.nova-canvas-v1:0", body=json.dumps(request_body) ) - + response_body = json.loads(bedrock_response['body'].read()) image_base64 = response_body['images'][0] - + # x402 spec: settle after content delivery (fair billing - only charge on success) transaction_hash = None if payment_nonce: @@ -247,27 +241,24 @@ async def make_request(): print(f"Settlement returned {settle_response.status_code} (testnet expected)") except Exception as e: print(f"Settlement error (testnet expected): {e}") - - # Store image with unique ID (don't return base64 to agent) + + # Store image in session-scoped storage only (prevents cross-session data leakage) image_id = str(uuid.uuid4()) image_data = f"data:image/png;base64,{image_base64}" storage.image_storage[image_id] = image_data - # Update global for backward compatibility - IMAGE_STORAGE[image_id] = image_data - + # Store image_id for potential analysis storage.authorize_check[request_id]['image_id'] = image_id - AUTHORIZE_CHECK[request_id] = storage.authorize_check[request_id] - + # Clear current request_id after successful completion to allow new requests storage.current_request_id = None storage.current_cost = None - + # Build success message with transaction info success_msg = f"SUCCESS|IMAGE_ID:{image_id}\n\nImage generated successfully! Payment verified on base-sepolia.\nImage ID: {image_id}" if transaction_hash: success_msg += f"\nTransaction: {transaction_hash}\nExplorer: https://sepolia.basescan.org/tx/{transaction_hash}" - + return success_msg @@ -276,11 +267,11 @@ def analyze_content_monetization(image_id: str, analysis_type: str = "monetizati """ Analyze image using Claude Sonnet 4 with vision. ONLY use when user EXPLICITLY requests analysis, description, poem, or other image analysis. DO NOT use automatically after image generation unless specifically asked. - + Args: image_id: The ID of the generated image to analyze (format: IMAGE_ID:uuid) analysis_type: Type of analysis (monetization, description, poem, etc.) - + Returns: Analysis based on requested type """ @@ -289,15 +280,13 @@ def analyze_content_monetization(image_id: str, analysis_type: str = "monetizati uuid_part = image_id.replace("IMAGE_ID:", "") else: uuid_part = image_id - - # Get image from session storage first, fallback to global + + # Get image from session-scoped storage storage = get_session_storage(session_id) - if uuid_part in storage.image_storage: - image_data = storage.image_storage[uuid_part] - elif uuid_part in IMAGE_STORAGE: - image_data = IMAGE_STORAGE[uuid_part] - else: + if uuid_part not in storage.image_storage: return "Error: Image not found. Please generate an image first." + + image_data = storage.image_storage[uuid_part] image_base64 = image_data.replace("data:image/png;base64,", "") request_body = { "anthropic_version": "bedrock-2023-05-31", @@ -322,11 +311,11 @@ def analyze_content_monetization(image_id: str, analysis_type: str = "monetizati } ] } - + response = bedrock_runtime.invoke_model( modelId="us.anthropic.claude-sonnet-4-20250514-v1:0", body=json.dumps(request_body) ) - + response_body = json.loads(response['body'].read()) return response_body['content'][0]['text'] diff --git a/serverless/lambda/seller/seller.js b/serverless/lambda/seller/seller.js index ef11d2c..ba6d635 100644 --- a/serverless/lambda/seller/seller.js +++ b/serverless/lambda/seller/seller.js @@ -1,6 +1,7 @@ const { Hono } = require('hono'); const { handle } = require('hono/aws-lambda'); const { LambdaClient, InvokeCommand } = require('@aws-sdk/client-lambda'); +const { DynamoDBClient, PutItemCommand, GetItemCommand, UpdateItemCommand } = require('@aws-sdk/client-dynamodb'); //const { generateJwt } = require('@coinbase/cdp-sdk/auth'); const https = require('https'); @@ -16,8 +17,69 @@ const X402_CONFIG = { scheme: 'exact' }; -// Idempotency cache - for production, persist to DynamoDB for multi-instance scalability -const processedPayments = new Map(); +// DynamoDB-backed nonce store — shared across all Lambda instances so replay +// protection holds even when the function scales horizontally. +// Falls back to an in-process Map when the env var is absent (local dev / tests). +const dynamodb = process.env.NONCE_TABLE_NAME + ? new DynamoDBClient({ region: process.env.AWS_REGION || 'us-east-1' }) + : null; + +const localNonceCache = new Map(); + +const MAX_PAYMENT_AGE_SEC = 300; + +async function markNoncePending(nonce, paymentPayload, paymentRequirements) { + const ttl = Math.floor(Date.now() / 1000) + 3600; + if (dynamodb) { + await dynamodb.send(new PutItemCommand({ + TableName: process.env.NONCE_TABLE_NAME, + Item: { + nonce: { S: nonce }, + status: { S: 'pending' }, + paymentPayload: { S: JSON.stringify(paymentPayload) }, + paymentRequirements: { S: JSON.stringify(paymentRequirements) }, + createdAt: { N: String(Math.floor(Date.now() / 1000)) }, + ttl: { N: String(ttl) }, + }, + ConditionExpression: 'attribute_not_exists(nonce)', + })); + } else { + if (localNonceCache.has(nonce)) throw new Error('Duplicate nonce'); + localNonceCache.set(nonce, { status: 'pending', paymentPayload, paymentRequirements, ts: Date.now() }); + } +} + +async function getNonceEntry(nonce) { + if (dynamodb) { + const res = await dynamodb.send(new GetItemCommand({ + TableName: process.env.NONCE_TABLE_NAME, + Key: { nonce: { S: nonce } }, + })); + if (!res.Item) return null; + return { + status: res.Item.status.S, + paymentPayload: JSON.parse(res.Item.paymentPayload.S), + paymentRequirements: JSON.parse(res.Item.paymentRequirements.S), + }; + } else { + return localNonceCache.get(nonce) || null; + } +} + +async function markNonceSettled(nonce) { + if (dynamodb) { + await dynamodb.send(new UpdateItemCommand({ + TableName: process.env.NONCE_TABLE_NAME, + Key: { nonce: { S: nonce } }, + UpdateExpression: 'SET #s = :settled', + ExpressionAttributeNames: { '#s': 'status' }, + ExpressionAttributeValues: { ':settled': { S: 'settled' } }, + })); + } else { + const entry = localNonceCache.get(nonce); + if (entry) entry.status = 'settled'; + } +} // Add CORS middleware app.use('*', async (c, next) => { @@ -315,13 +377,25 @@ app.use('/generate', async (c, next) => { if (!authorizedValue) { return c.json({ error: 'Missing authorization value' }, 400); } - - // Idempotency check using nonce + + // Reject expired or stale signatures + const validBefore = Number(paymentPayload.authorization?.validBefore || 0); + const nowSec = Math.floor(Date.now() / 1000); + if (validBefore > 0 && nowSec > validBefore) { + return c.json({ error: 'payment_expired', reason: 'Payment signature has expired' }, 402); + } + const validAfter = Number(paymentPayload.authorization?.validAfter || 0); + if ((nowSec - validAfter) > MAX_PAYMENT_AGE_SEC) { + return c.json({ error: 'payment_expired', reason: `Signature older than ${MAX_PAYMENT_AGE_SEC} seconds` }, 402); + } + + // Idempotency check — DynamoDB-backed across Lambda instances const nonce = paymentPayload.authorization?.nonce; - if (nonce && processedPayments.has(nonce)) { - return c.json({ error: 'Payment already processed' }, 409); + if (nonce) { + const existing = await getNonceEntry(nonce); + if (existing) return c.json({ error: 'Payment already processed' }, 409); } - + // Create payment requirements using the EXACT value from authorization const publicWallet = process.env.SELLER_WALLET_ADDRESS; const paymentRequirements = { @@ -352,17 +426,27 @@ app.use('/generate', async (c, next) => { // Verify payment with facilitator (do NOT settle yet - settle after content delivery per x402 spec) const verification = await verifyPayment(paymentPayload, paymentRequirements); if (!verification.isValid) { - return c.json({ - error: 'Payment verification failed', - reason: verification.invalidReason + return c.json({ + error: 'Payment verification failed', + reason: verification.invalidReason }, 402); } - + + // Persist nonce atomically — ConditionalCheckFailedException means concurrent replay + if (nonce) { + try { + await markNoncePending(nonce, paymentPayload, paymentRequirements); + } catch (err) { + if (err.name === 'ConditionalCheckFailedException') return c.json({ error: 'Payment already processed' }, 409); + throw err; + } + } + // Store payment data for post-delivery settlement c.set('paymentPayload', paymentPayload); c.set('paymentRequirements', paymentRequirements); c.set('nonce', nonce); - + await next(); } catch (error) { console.error('Payment middleware error:', error); @@ -391,16 +475,12 @@ app.post('/generate', async (c) => { console.warn('Settlement failed after content delivery:', settlement.errorReason); } - // Mark transaction as processed using nonce + // Mark nonce as settled in DynamoDB so it cannot be replayed if (nonce) { - processedPayments.set(nonce, Date.now()); - const oneHourAgo = Date.now() - 3600000; - for (const [n, timestamp] of processedPayments.entries()) { - if (timestamp < oneHourAgo) processedPayments.delete(n); - } + await markNonceSettled(nonce); } - - const response = { + + const response = { message: "Payment verified - content generated successfully", status: "success", content: bedrockResponse.content || 'No content generated',