221 lines
5.9 KiB
TypeScript
221 lines
5.9 KiB
TypeScript
/**
|
|
* AlloyDB PostgreSQL client for vector semantic memory
|
|
*
|
|
* Uses pgvector extension for semantic search over knowledge_chunks.
|
|
* Connection pooling ensures efficient resource usage in Next.js API routes.
|
|
*/
|
|
|
|
import { Pool, PoolClient, QueryResult, QueryResultRow } from 'pg';
|
|
import { GoogleAuth } from 'google-auth-library';
|
|
|
|
let pool: Pool | null = null;
|
|
let cachedToken: { token: string; expiresAt: number } | null = null;
|
|
|
|
interface AlloyDBConfig {
|
|
host: string;
|
|
port: number;
|
|
user: string;
|
|
password: string;
|
|
database: string;
|
|
ssl?: boolean;
|
|
maxConnections?: number;
|
|
idleTimeoutMillis?: number;
|
|
}
|
|
|
|
/**
|
|
* Get an OAuth2 access token for IAM authentication
|
|
*/
|
|
async function getAccessToken(): Promise<string> {
|
|
// Check if we have a cached token that's still valid (with 5 min buffer)
|
|
if (cachedToken && cachedToken.expiresAt > Date.now() + 5 * 60 * 1000) {
|
|
return cachedToken.token;
|
|
}
|
|
|
|
const auth = new GoogleAuth({
|
|
scopes: ['https://www.googleapis.com/auth/cloud-platform'],
|
|
});
|
|
|
|
const client = await auth.getClient();
|
|
const tokenResponse = await client.getAccessToken();
|
|
|
|
if (!tokenResponse.token) {
|
|
throw new Error('Failed to get access token for AlloyDB IAM authentication');
|
|
}
|
|
|
|
// Cache the token (Google tokens typically expire in 1 hour)
|
|
cachedToken = {
|
|
token: tokenResponse.token,
|
|
expiresAt: Date.now() + 55 * 60 * 1000, // 55 minutes (safe buffer)
|
|
};
|
|
|
|
return tokenResponse.token;
|
|
}
|
|
|
|
/**
|
|
* Load AlloyDB configuration from environment variables
|
|
*/
|
|
async function loadConfig(): Promise<AlloyDBConfig> {
|
|
const host = process.env.ALLOYDB_HOST;
|
|
const port = process.env.ALLOYDB_PORT;
|
|
const user = process.env.ALLOYDB_USER;
|
|
const password = process.env.ALLOYDB_PASSWORD;
|
|
const database = process.env.ALLOYDB_DATABASE;
|
|
|
|
if (!host || !port || !user || !database) {
|
|
throw new Error(
|
|
'Missing required AlloyDB environment variables. Required: ALLOYDB_HOST, ALLOYDB_PORT, ALLOYDB_USER, ALLOYDB_DATABASE'
|
|
);
|
|
}
|
|
|
|
// When using AlloyDB Auth Proxy, no password is needed (proxy handles IAM)
|
|
// For direct connections with IAM, generate an OAuth token
|
|
let finalPassword = password;
|
|
if (!finalPassword && !host.startsWith('/')) {
|
|
// Only generate token for direct IP connections, not Unix sockets
|
|
finalPassword = await getAccessToken();
|
|
}
|
|
|
|
return {
|
|
host,
|
|
port: parseInt(port, 10),
|
|
user,
|
|
password: finalPassword || '', // Empty string for proxy connections
|
|
database,
|
|
ssl: process.env.ALLOYDB_SSL ? process.env.ALLOYDB_SSL !== 'false' : false, // Enable SSL if set to anything other than 'false'
|
|
maxConnections: process.env.ALLOYDB_MAX_CONNECTIONS
|
|
? parseInt(process.env.ALLOYDB_MAX_CONNECTIONS, 10)
|
|
: 10,
|
|
idleTimeoutMillis: 30000, // 30 seconds
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Initialize the connection pool (singleton pattern)
|
|
*/
|
|
async function initializePool(): Promise<Pool> {
|
|
if (pool) {
|
|
return pool;
|
|
}
|
|
|
|
const config = await loadConfig();
|
|
|
|
pool = new Pool({
|
|
host: config.host,
|
|
port: config.port,
|
|
user: config.user,
|
|
password: config.password,
|
|
database: config.database,
|
|
ssl: config.ssl ? { rejectUnauthorized: false } : false,
|
|
max: config.maxConnections,
|
|
idleTimeoutMillis: config.idleTimeoutMillis,
|
|
connectionTimeoutMillis: 10000, // 10 seconds to establish connection
|
|
});
|
|
|
|
// Log pool errors
|
|
pool.on('error', (err) => {
|
|
console.error('[AlloyDB Pool] Unexpected error on idle client:', err);
|
|
});
|
|
|
|
console.log(`[AlloyDB] Connection pool initialized (max: ${config.maxConnections})`);
|
|
|
|
return pool;
|
|
}
|
|
|
|
/**
|
|
* Get a pooled client for AlloyDB
|
|
*
|
|
* This is the primary interface for accessing AlloyDB from Next.js API routes.
|
|
* The pool handles connection reuse automatically.
|
|
*
|
|
* @returns Pool instance for executing queries
|
|
*
|
|
* @example
|
|
* ```typescript
|
|
* const pool = await getAlloyDbClient();
|
|
* const result = await pool.query('SELECT * FROM knowledge_chunks WHERE project_id = $1', [projectId]);
|
|
* ```
|
|
*/
|
|
export async function getAlloyDbClient(): Promise<Pool> {
|
|
if (!pool) {
|
|
return await initializePool();
|
|
}
|
|
return pool;
|
|
}
|
|
|
|
/**
|
|
* Execute a query with automatic error logging
|
|
*
|
|
* @param text SQL query text (use $1, $2, etc. for parameters)
|
|
* @param params Query parameters
|
|
* @returns Query result
|
|
*/
|
|
export async function executeQuery<T extends QueryResultRow = any>(
|
|
text: string,
|
|
params?: any[]
|
|
): Promise<QueryResult<T>> {
|
|
const client = await getAlloyDbClient();
|
|
try {
|
|
const result = await client.query<T>(text, params);
|
|
return result;
|
|
} catch (error) {
|
|
console.error('[AlloyDB Query Error]', {
|
|
query: text,
|
|
params,
|
|
error: error instanceof Error ? error.message : String(error),
|
|
});
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get a dedicated client from the pool for transactions
|
|
*
|
|
* Remember to call client.release() when done!
|
|
*
|
|
* @example
|
|
* ```typescript
|
|
* const client = await getPooledClient();
|
|
* try {
|
|
* await client.query('BEGIN');
|
|
* await client.query('INSERT INTO ...');
|
|
* await client.query('COMMIT');
|
|
* } catch (e) {
|
|
* await client.query('ROLLBACK');
|
|
* throw e;
|
|
* } finally {
|
|
* client.release();
|
|
* }
|
|
* ```
|
|
*/
|
|
export async function getPooledClient(): Promise<PoolClient> {
|
|
const pool = await getAlloyDbClient();
|
|
return await pool.connect();
|
|
}
|
|
|
|
/**
|
|
* Close the connection pool gracefully
|
|
*
|
|
* Should be called during application shutdown (e.g., in tests or cleanup)
|
|
*/
|
|
export async function closeAlloyDbPool(): Promise<void> {
|
|
if (pool) {
|
|
await pool.end();
|
|
pool = null;
|
|
console.log('[AlloyDB] Connection pool closed');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Health check - verify AlloyDB connection is working
|
|
*/
|
|
export async function checkAlloyDbHealth(): Promise<boolean> {
|
|
try {
|
|
const result = await executeQuery('SELECT 1 as health_check');
|
|
return result.rows.length > 0 && result.rows[0].health_check === 1;
|
|
} catch (error) {
|
|
console.error('[AlloyDB Health Check] Failed:', error);
|
|
return false;
|
|
}
|
|
}
|
|
|