Files
vibn-frontend/lib/db/alloydb.ts

221 lines
5.9 KiB
TypeScript

/**
* AlloyDB PostgreSQL client for vector semantic memory
*
* Uses pgvector extension for semantic search over knowledge_chunks.
* Connection pooling ensures efficient resource usage in Next.js API routes.
*/
import { Pool, PoolClient, QueryResult, QueryResultRow } from 'pg';
import { GoogleAuth } from 'google-auth-library';
let pool: Pool | null = null;
let cachedToken: { token: string; expiresAt: number } | null = null;
interface AlloyDBConfig {
host: string;
port: number;
user: string;
password: string;
database: string;
ssl?: boolean;
maxConnections?: number;
idleTimeoutMillis?: number;
}
/**
* Get an OAuth2 access token for IAM authentication
*/
async function getAccessToken(): Promise<string> {
// Check if we have a cached token that's still valid (with 5 min buffer)
if (cachedToken && cachedToken.expiresAt > Date.now() + 5 * 60 * 1000) {
return cachedToken.token;
}
const auth = new GoogleAuth({
scopes: ['https://www.googleapis.com/auth/cloud-platform'],
});
const client = await auth.getClient();
const tokenResponse = await client.getAccessToken();
if (!tokenResponse.token) {
throw new Error('Failed to get access token for AlloyDB IAM authentication');
}
// Cache the token (Google tokens typically expire in 1 hour)
cachedToken = {
token: tokenResponse.token,
expiresAt: Date.now() + 55 * 60 * 1000, // 55 minutes (safe buffer)
};
return tokenResponse.token;
}
/**
* Load AlloyDB configuration from environment variables
*/
async function loadConfig(): Promise<AlloyDBConfig> {
const host = process.env.ALLOYDB_HOST;
const port = process.env.ALLOYDB_PORT;
const user = process.env.ALLOYDB_USER;
const password = process.env.ALLOYDB_PASSWORD;
const database = process.env.ALLOYDB_DATABASE;
if (!host || !port || !user || !database) {
throw new Error(
'Missing required AlloyDB environment variables. Required: ALLOYDB_HOST, ALLOYDB_PORT, ALLOYDB_USER, ALLOYDB_DATABASE'
);
}
// When using AlloyDB Auth Proxy, no password is needed (proxy handles IAM)
// For direct connections with IAM, generate an OAuth token
let finalPassword = password;
if (!finalPassword && !host.startsWith('/')) {
// Only generate token for direct IP connections, not Unix sockets
finalPassword = await getAccessToken();
}
return {
host,
port: parseInt(port, 10),
user,
password: finalPassword || '', // Empty string for proxy connections
database,
ssl: process.env.ALLOYDB_SSL ? process.env.ALLOYDB_SSL !== 'false' : false, // Enable SSL if set to anything other than 'false'
maxConnections: process.env.ALLOYDB_MAX_CONNECTIONS
? parseInt(process.env.ALLOYDB_MAX_CONNECTIONS, 10)
: 10,
idleTimeoutMillis: 30000, // 30 seconds
};
}
/**
* Initialize the connection pool (singleton pattern)
*/
async function initializePool(): Promise<Pool> {
if (pool) {
return pool;
}
const config = await loadConfig();
pool = new Pool({
host: config.host,
port: config.port,
user: config.user,
password: config.password,
database: config.database,
ssl: config.ssl ? { rejectUnauthorized: false } : false,
max: config.maxConnections,
idleTimeoutMillis: config.idleTimeoutMillis,
connectionTimeoutMillis: 10000, // 10 seconds to establish connection
});
// Log pool errors
pool.on('error', (err) => {
console.error('[AlloyDB Pool] Unexpected error on idle client:', err);
});
console.log(`[AlloyDB] Connection pool initialized (max: ${config.maxConnections})`);
return pool;
}
/**
* Get a pooled client for AlloyDB
*
* This is the primary interface for accessing AlloyDB from Next.js API routes.
* The pool handles connection reuse automatically.
*
* @returns Pool instance for executing queries
*
* @example
* ```typescript
* const pool = await getAlloyDbClient();
* const result = await pool.query('SELECT * FROM knowledge_chunks WHERE project_id = $1', [projectId]);
* ```
*/
export async function getAlloyDbClient(): Promise<Pool> {
if (!pool) {
return await initializePool();
}
return pool;
}
/**
* Execute a query with automatic error logging
*
* @param text SQL query text (use $1, $2, etc. for parameters)
* @param params Query parameters
* @returns Query result
*/
export async function executeQuery<T extends QueryResultRow = any>(
text: string,
params?: any[]
): Promise<QueryResult<T>> {
const client = await getAlloyDbClient();
try {
const result = await client.query<T>(text, params);
return result;
} catch (error) {
console.error('[AlloyDB Query Error]', {
query: text,
params,
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
/**
* Get a dedicated client from the pool for transactions
*
* Remember to call client.release() when done!
*
* @example
* ```typescript
* const client = await getPooledClient();
* try {
* await client.query('BEGIN');
* await client.query('INSERT INTO ...');
* await client.query('COMMIT');
* } catch (e) {
* await client.query('ROLLBACK');
* throw e;
* } finally {
* client.release();
* }
* ```
*/
export async function getPooledClient(): Promise<PoolClient> {
const pool = await getAlloyDbClient();
return await pool.connect();
}
/**
* Close the connection pool gracefully
*
* Should be called during application shutdown (e.g., in tests or cleanup)
*/
export async function closeAlloyDbPool(): Promise<void> {
if (pool) {
await pool.end();
pool = null;
console.log('[AlloyDB] Connection pool closed');
}
}
/**
* Health check - verify AlloyDB connection is working
*/
export async function checkAlloyDbHealth(): Promise<boolean> {
try {
const result = await executeQuery('SELECT 1 as health_check');
return result.rows.length > 0 && result.rows[0].health_check === 1;
} catch (error) {
console.error('[AlloyDB Health Check] Failed:', error);
return false;
}
}