VIBN Frontend for Coolify deployment
This commit is contained in:
220
lib/db/alloydb.ts
Normal file
220
lib/db/alloydb.ts
Normal file
@@ -0,0 +1,220 @@
|
||||
/**
|
||||
* AlloyDB PostgreSQL client for vector semantic memory
|
||||
*
|
||||
* Uses pgvector extension for semantic search over knowledge_chunks.
|
||||
* Connection pooling ensures efficient resource usage in Next.js API routes.
|
||||
*/
|
||||
|
||||
import { Pool, PoolClient, QueryResult, QueryResultRow } from 'pg';
|
||||
import { GoogleAuth } from 'google-auth-library';
|
||||
|
||||
let pool: Pool | null = null;
|
||||
let cachedToken: { token: string; expiresAt: number } | null = null;
|
||||
|
||||
interface AlloyDBConfig {
|
||||
host: string;
|
||||
port: number;
|
||||
user: string;
|
||||
password: string;
|
||||
database: string;
|
||||
ssl?: boolean;
|
||||
maxConnections?: number;
|
||||
idleTimeoutMillis?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an OAuth2 access token for IAM authentication
|
||||
*/
|
||||
async function getAccessToken(): Promise<string> {
|
||||
// Check if we have a cached token that's still valid (with 5 min buffer)
|
||||
if (cachedToken && cachedToken.expiresAt > Date.now() + 5 * 60 * 1000) {
|
||||
return cachedToken.token;
|
||||
}
|
||||
|
||||
const auth = new GoogleAuth({
|
||||
scopes: ['https://www.googleapis.com/auth/cloud-platform'],
|
||||
});
|
||||
|
||||
const client = await auth.getClient();
|
||||
const tokenResponse = await client.getAccessToken();
|
||||
|
||||
if (!tokenResponse.token) {
|
||||
throw new Error('Failed to get access token for AlloyDB IAM authentication');
|
||||
}
|
||||
|
||||
// Cache the token (Google tokens typically expire in 1 hour)
|
||||
cachedToken = {
|
||||
token: tokenResponse.token,
|
||||
expiresAt: Date.now() + 55 * 60 * 1000, // 55 minutes (safe buffer)
|
||||
};
|
||||
|
||||
return tokenResponse.token;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load AlloyDB configuration from environment variables
|
||||
*/
|
||||
async function loadConfig(): Promise<AlloyDBConfig> {
|
||||
const host = process.env.ALLOYDB_HOST;
|
||||
const port = process.env.ALLOYDB_PORT;
|
||||
const user = process.env.ALLOYDB_USER;
|
||||
const password = process.env.ALLOYDB_PASSWORD;
|
||||
const database = process.env.ALLOYDB_DATABASE;
|
||||
|
||||
if (!host || !port || !user || !database) {
|
||||
throw new Error(
|
||||
'Missing required AlloyDB environment variables. Required: ALLOYDB_HOST, ALLOYDB_PORT, ALLOYDB_USER, ALLOYDB_DATABASE'
|
||||
);
|
||||
}
|
||||
|
||||
// When using AlloyDB Auth Proxy, no password is needed (proxy handles IAM)
|
||||
// For direct connections with IAM, generate an OAuth token
|
||||
let finalPassword = password;
|
||||
if (!finalPassword && !host.startsWith('/')) {
|
||||
// Only generate token for direct IP connections, not Unix sockets
|
||||
finalPassword = await getAccessToken();
|
||||
}
|
||||
|
||||
return {
|
||||
host,
|
||||
port: parseInt(port, 10),
|
||||
user,
|
||||
password: finalPassword || '', // Empty string for proxy connections
|
||||
database,
|
||||
ssl: process.env.ALLOYDB_SSL ? process.env.ALLOYDB_SSL !== 'false' : false, // Enable SSL if set to anything other than 'false'
|
||||
maxConnections: process.env.ALLOYDB_MAX_CONNECTIONS
|
||||
? parseInt(process.env.ALLOYDB_MAX_CONNECTIONS, 10)
|
||||
: 10,
|
||||
idleTimeoutMillis: 30000, // 30 seconds
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the connection pool (singleton pattern)
|
||||
*/
|
||||
async function initializePool(): Promise<Pool> {
|
||||
if (pool) {
|
||||
return pool;
|
||||
}
|
||||
|
||||
const config = await loadConfig();
|
||||
|
||||
pool = new Pool({
|
||||
host: config.host,
|
||||
port: config.port,
|
||||
user: config.user,
|
||||
password: config.password,
|
||||
database: config.database,
|
||||
ssl: config.ssl ? { rejectUnauthorized: false } : false,
|
||||
max: config.maxConnections,
|
||||
idleTimeoutMillis: config.idleTimeoutMillis,
|
||||
connectionTimeoutMillis: 10000, // 10 seconds to establish connection
|
||||
});
|
||||
|
||||
// Log pool errors
|
||||
pool.on('error', (err) => {
|
||||
console.error('[AlloyDB Pool] Unexpected error on idle client:', err);
|
||||
});
|
||||
|
||||
console.log(`[AlloyDB] Connection pool initialized (max: ${config.maxConnections})`);
|
||||
|
||||
return pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a pooled client for AlloyDB
|
||||
*
|
||||
* This is the primary interface for accessing AlloyDB from Next.js API routes.
|
||||
* The pool handles connection reuse automatically.
|
||||
*
|
||||
* @returns Pool instance for executing queries
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const pool = await getAlloyDbClient();
|
||||
* const result = await pool.query('SELECT * FROM knowledge_chunks WHERE project_id = $1', [projectId]);
|
||||
* ```
|
||||
*/
|
||||
export async function getAlloyDbClient(): Promise<Pool> {
|
||||
if (!pool) {
|
||||
return await initializePool();
|
||||
}
|
||||
return pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a query with automatic error logging
|
||||
*
|
||||
* @param text SQL query text (use $1, $2, etc. for parameters)
|
||||
* @param params Query parameters
|
||||
* @returns Query result
|
||||
*/
|
||||
export async function executeQuery<T extends QueryResultRow = any>(
|
||||
text: string,
|
||||
params?: any[]
|
||||
): Promise<QueryResult<T>> {
|
||||
const client = await getAlloyDbClient();
|
||||
try {
|
||||
const result = await client.query<T>(text, params);
|
||||
return result;
|
||||
} catch (error) {
|
||||
console.error('[AlloyDB Query Error]', {
|
||||
query: text,
|
||||
params,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a dedicated client from the pool for transactions
|
||||
*
|
||||
* Remember to call client.release() when done!
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const client = await getPooledClient();
|
||||
* try {
|
||||
* await client.query('BEGIN');
|
||||
* await client.query('INSERT INTO ...');
|
||||
* await client.query('COMMIT');
|
||||
* } catch (e) {
|
||||
* await client.query('ROLLBACK');
|
||||
* throw e;
|
||||
* } finally {
|
||||
* client.release();
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export async function getPooledClient(): Promise<PoolClient> {
|
||||
const pool = await getAlloyDbClient();
|
||||
return await pool.connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the connection pool gracefully
|
||||
*
|
||||
* Should be called during application shutdown (e.g., in tests or cleanup)
|
||||
*/
|
||||
export async function closeAlloyDbPool(): Promise<void> {
|
||||
if (pool) {
|
||||
await pool.end();
|
||||
pool = null;
|
||||
console.log('[AlloyDB] Connection pool closed');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Health check - verify AlloyDB connection is working
|
||||
*/
|
||||
export async function checkAlloyDbHealth(): Promise<boolean> {
|
||||
try {
|
||||
const result = await executeQuery('SELECT 1 as health_check');
|
||||
return result.rows.length > 0 && result.rows[0].health_check === 1;
|
||||
} catch (error) {
|
||||
console.error('[AlloyDB Health Check] Failed:', error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user