/**
 * AlloyDB PostgreSQL client for vector semantic memory
 *
 * Uses pgvector extension for semantic search over knowledge_chunks.
 * Connection pooling ensures efficient resource usage in Next.js API routes.
 */

import { Pool, PoolClient, QueryResult, QueryResultRow } from 'pg';
import { GoogleAuth } from 'google-auth-library';

/** A cached token is treated as stale this long before its recorded expiry. */
const TOKEN_REFRESH_BUFFER_MS = 5 * 60 * 1000;

/** Lifetime recorded for a freshly fetched token (55 minutes). */
const TOKEN_CACHE_LIFETIME_MS = 55 * 60 * 1000;

/** Pool size used when ALLOYDB_MAX_CONNECTIONS is unset or unusable. */
const DEFAULT_MAX_CONNECTIONS = 10;

/**
 * In-flight or completed pool initialization.
 *
 * The promise (not the pool) is cached so that concurrent first callers share
 * a single initialization instead of each constructing their own Pool.
 */
let poolPromise: Promise<Pool> | null = null;

let cachedToken: { token: string; expiresAt: number } | null = null;

interface AlloyDBConfig {
  host: string;
  port: number;
  user: string;
  /**
   * Either a static password, or a provider that is invoked for every new
   * connection (used for IAM tokens, which expire).
   */
  password: string | (() => Promise<string>);
  database: string;
  ssl?: boolean;
  maxConnections?: number;
  idleTimeoutMillis?: number;
}

/**
 * Get an OAuth2 access token for IAM authentication.
 *
 * Returns the cached token while it is still valid for more than
 * TOKEN_REFRESH_BUFFER_MS; otherwise fetches a new one through GoogleAuth
 * and caches it.
 *
 * @returns The access token string.
 * @throws Error if the auth client returns no token.
 */
async function getAccessToken(): Promise<string> {
  // Check if we have a cached token that's still valid (with 5 min buffer)
  if (cachedToken && cachedToken.expiresAt > Date.now() + TOKEN_REFRESH_BUFFER_MS) {
    return cachedToken.token;
  }

  const auth = new GoogleAuth({
    scopes: ['https://www.googleapis.com/auth/cloud-platform'],
  });
  const client = await auth.getClient();
  const tokenResponse = await client.getAccessToken();

  if (!tokenResponse.token) {
    throw new Error('Failed to get access token for AlloyDB IAM authentication');
  }

  // Cache the token (Google tokens typically expire in 1 hour)
  cachedToken = {
    token: tokenResponse.token,
    expiresAt: Date.now() + TOKEN_CACHE_LIFETIME_MS, // 55 minutes (safe buffer)
  };

  return tokenResponse.token;
}

/**
 * Parse ALLOYDB_PORT, rejecting anything that is not a valid TCP port number.
 *
 * @throws Error if the value does not parse to an integer in 1..65535.
 */
function parsePort(raw: string): number {
  const port = Number.parseInt(raw, 10);
  if (Number.isNaN(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid ALLOYDB_PORT "${raw}": expected an integer between 1 and 65535`);
  }
  return port;
}

/**
 * Parse ALLOYDB_MAX_CONNECTIONS, falling back to DEFAULT_MAX_CONNECTIONS
 * (with a warning) when the value is missing or not a positive integer.
 */
function parseMaxConnections(raw: string | undefined): number {
  if (!raw) {
    return DEFAULT_MAX_CONNECTIONS;
  }
  const parsed = Number.parseInt(raw, 10);
  if (Number.isNaN(parsed) || parsed < 1) {
    console.warn(
      `[AlloyDB] Ignoring invalid ALLOYDB_MAX_CONNECTIONS "${raw}"; using ${DEFAULT_MAX_CONNECTIONS}`
    );
    return DEFAULT_MAX_CONNECTIONS;
  }
  return parsed;
}

/**
 * Load AlloyDB configuration from environment variables.
 *
 * Reads ALLOYDB_HOST, ALLOYDB_PORT, ALLOYDB_USER, ALLOYDB_DATABASE (required)
 * and ALLOYDB_PASSWORD, ALLOYDB_SSL, ALLOYDB_MAX_CONNECTIONS (optional).
 *
 * @throws Error if a required variable is missing, the port is invalid, or an
 *   IAM token is needed and cannot be obtained.
 */
async function loadConfig(): Promise<AlloyDBConfig> {
  const host = process.env.ALLOYDB_HOST;
  const port = process.env.ALLOYDB_PORT;
  const user = process.env.ALLOYDB_USER;
  const envPassword = process.env.ALLOYDB_PASSWORD;
  const database = process.env.ALLOYDB_DATABASE;

  if (!host || !port || !user || !database) {
    throw new Error(
      'Missing required AlloyDB environment variables. ' +
        'Required: ALLOYDB_HOST, ALLOYDB_PORT, ALLOYDB_USER, ALLOYDB_DATABASE'
    );
  }

  // When using AlloyDB Auth Proxy, no password is needed (proxy handles IAM)
  // For direct connections with IAM, generate an OAuth token
  let password: AlloyDBConfig['password'] = envPassword || ''; // Empty string for proxy connections
  if (!envPassword && !host.startsWith('/')) {
    // Only generate token for direct IP connections, not Unix sockets.
    // Fetch once now so misconfigured credentials fail at startup...
    await getAccessToken();
    // ...but hand the pool a provider rather than the token itself: the token
    // expires, and connections opened later need a fresh one.
    password = getAccessToken;
  }

  const sslSetting = process.env.ALLOYDB_SSL;

  return {
    host,
    port: parsePort(port),
    user,
    password,
    database,
    // Enable SSL if set to anything other than 'false'
    ssl: Boolean(sslSetting) && sslSetting !== 'false',
    maxConnections: parseMaxConnections(process.env.ALLOYDB_MAX_CONNECTIONS),
    idleTimeoutMillis: 30000, // 30 seconds
  };
}

/**
 * Build a new connection pool from the environment configuration and attach
 * an error listener for idle clients.
 */
async function createPool(): Promise<Pool> {
  const config = await loadConfig();

  const created = new Pool({
    host: config.host,
    port: config.port,
    user: config.user,
    password: config.password,
    database: config.database,
    // NOTE(review): rejectUnauthorized: false turns off server certificate
    // verification. Kept as-is to avoid breaking existing deployments; confirm
    // whether a CA bundle can be supplied instead.
    ssl: config.ssl ? { rejectUnauthorized: false } : false,
    max: config.maxConnections,
    idleTimeoutMillis: config.idleTimeoutMillis,
    connectionTimeoutMillis: 10000, // 10 seconds to establish connection
  });

  // Log pool errors
  created.on('error', (err) => {
    console.error('[AlloyDB Pool] Unexpected error on idle client:', err);
  });

  console.log(`[AlloyDB] Connection pool initialized (max: ${config.maxConnections})`);

  return created;
}

/**
 * Initialize the connection pool (singleton pattern).
 *
 * Concurrent callers receive the same in-flight promise, so only one Pool is
 * ever constructed. A failed initialization is forgotten so that a later call
 * can retry.
 */
function initializePool(): Promise<Pool> {
  if (poolPromise) {
    return poolPromise;
  }

  const attempt = createPool();
  poolPromise = attempt;
  attempt.catch(() => {
    // Only clear the slot if it still holds this attempt (it may have been
    // replaced after a close + re-init).
    if (poolPromise === attempt) {
      poolPromise = null;
    }
  });

  return attempt;
}

/**
 * Get a pooled client for AlloyDB
 *
 * This is the primary interface for accessing AlloyDB from Next.js API routes.
 * The pool handles connection reuse automatically.
 *
 * @returns Pool instance for executing queries
 *
 * @example
 * ```typescript
 * const pool = await getAlloyDbClient();
 * const result = await pool.query('SELECT * FROM knowledge_chunks WHERE project_id = $1', [projectId]);
 * ```
 */
export async function getAlloyDbClient(): Promise<Pool> {
  return initializePool();
}

/**
 * Execute a query with automatic error logging
 *
 * On failure the query text, parameters and error message are logged and the
 * original error is rethrown.
 *
 * @param text SQL query text (use $1, $2, etc. for parameters)
 * @param params Query parameters
 * @returns Query result
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- mirrors pg's default row type
export async function executeQuery<T extends QueryResultRow = any>(
  text: string,
  params?: unknown[]
): Promise<QueryResult<T>> {
  const db = await getAlloyDbClient();

  try {
    return await db.query<T>(text, params);
  } catch (error) {
    console.error('[AlloyDB Query Error]', {
      query: text,
      params,
      error: error instanceof Error ? error.message : String(error),
    });
    throw error;
  }
}

/**
 * Get a dedicated client from the pool for transactions
 *
 * Remember to call client.release() when done!
 *
 * @example
 * ```typescript
 * const client = await getPooledClient();
 * try {
 *   await client.query('BEGIN');
 *   await client.query('INSERT INTO ...');
 *   await client.query('COMMIT');
 * } catch (e) {
 *   await client.query('ROLLBACK');
 *   throw e;
 * } finally {
 *   client.release();
 * }
 * ```
 */
export async function getPooledClient(): Promise<PoolClient> {
  const db = await getAlloyDbClient();
  return db.connect();
}

/**
 * Close the connection pool gracefully
 *
 * Should be called during application shutdown (e.g., in tests or cleanup).
 * Does nothing if no pool has been initialized.
 */
export async function closeAlloyDbPool(): Promise<void> {
  const pending = poolPromise;
  if (!pending) {
    return;
  }

  // Forget the pool before ending it, so a rejected end() cannot leave an
  // unusable pool cached for subsequent callers.
  poolPromise = null;

  let active: Pool;
  try {
    active = await pending;
  } catch {
    // Initialization never produced a pool; nothing to close.
    return;
  }

  await active.end();
  console.log('[AlloyDB] Connection pool closed');
}

/**
 * Health check - verify AlloyDB connection is working
 *
 * @returns true if `SELECT 1` round-trips successfully, false on any error.
 */
export async function checkAlloyDbHealth(): Promise<boolean> {
  try {
    const result = await executeQuery<{ health_check: number }>('SELECT 1 as health_check');
    return result.rows[0]?.health_check === 1;
  } catch (error) {
    console.error('[AlloyDB Health Check] Failed:', error);
    return false;
  }
}