feat: decoupled training telemetry microservice
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import { GoogleGenAI } from "@google/genai";
|
||||
import fs from "fs";
|
||||
import { logTrainingTelemetryDb } from "./telemetry-db";
|
||||
|
||||
const GEMINI_API_KEY = process.env.GOOGLE_API_KEY || "";
|
||||
const GEMINI_MODEL = process.env.VIBN_CHAT_MODEL || "gemini-3.1-pro-preview";
|
||||
@@ -140,11 +141,13 @@ export async function callGeminiChat(opts: {
|
||||
);
|
||||
console.log("========================================================\n");
|
||||
|
||||
const startTime = Date.now();
|
||||
const response = await ai.models.generateContent({
|
||||
model: GEMINI_MODEL,
|
||||
contents: toGeminiContents(opts.messages),
|
||||
config,
|
||||
});
|
||||
const durationMs = Date.now() - startTime;
|
||||
|
||||
console.log("\n========================================================");
|
||||
console.log("⬅️ [GEMINI API RESPONSE]");
|
||||
@@ -204,6 +207,19 @@ export async function callGeminiChat(opts: {
|
||||
text = thoughts.trim();
|
||||
}
|
||||
|
||||
logTrainingTelemetryDb({
|
||||
model: GEMINI_MODEL,
|
||||
systemPrompt: opts.systemPrompt,
|
||||
messages: opts.messages,
|
||||
response: { text, thoughts, toolCalls },
|
||||
metrics: {
|
||||
promptTokens: response.usageMetadata?.promptTokenCount,
|
||||
completionTokens: response.usageMetadata?.candidatesTokenCount,
|
||||
totalTokens: response.usageMetadata?.totalTokenCount,
|
||||
durationMs,
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
text,
|
||||
thoughts,
|
||||
@@ -251,6 +267,7 @@ export async function* streamGeminiChat(opts: {
|
||||
);
|
||||
console.log("========================================================\n");
|
||||
|
||||
const startTime = Date.now();
|
||||
const streamResult = await ai.models.generateContentStream({
|
||||
model: GEMINI_MODEL,
|
||||
contents: toGeminiContents(opts.messages),
|
||||
@@ -261,16 +278,38 @@ export async function* streamGeminiChat(opts: {
|
||||
const isPartThought = (p: Record<string, unknown>) =>
|
||||
Boolean(p.thought || p.thoughtSignature);
|
||||
|
||||
let text = "";
|
||||
let thoughts = "";
|
||||
const toolCalls: any[] = [];
|
||||
|
||||
for await (const chunk of streamResult) {
|
||||
const parts = chunk.candidates?.[0]?.content?.parts ?? [];
|
||||
for (const part of parts) {
|
||||
if (part.text) {
|
||||
yield isPartThought(part as Record<string, unknown>)
|
||||
? { type: "thinking", text: part.text }
|
||||
: { type: "text", text: part.text };
|
||||
if (isPartThought(part as Record<string, unknown>)) {
|
||||
thoughts += part.text;
|
||||
yield { type: "thinking", text: part.text };
|
||||
} else {
|
||||
text += part.text;
|
||||
yield { type: "text", text: part.text };
|
||||
}
|
||||
}
|
||||
if (part.functionCall) {
|
||||
toolCalls.push(part.functionCall);
|
||||
}
|
||||
}
|
||||
}
|
||||
const durationMs = Date.now() - startTime;
|
||||
|
||||
logTrainingTelemetryDb({
|
||||
model: GEMINI_MODEL,
|
||||
systemPrompt: opts.systemPrompt,
|
||||
messages: opts.messages,
|
||||
response: { text, thoughts, toolCalls },
|
||||
metrics: {
|
||||
durationMs,
|
||||
},
|
||||
});
|
||||
|
||||
yield { type: "done" };
|
||||
} catch (error) {
|
||||
|
||||
46
lib/ai/telemetry-db.ts
Normal file
46
lib/ai/telemetry-db.ts
Normal file
@@ -0,0 +1,46 @@
|
||||
export interface TelemetryPayload {
|
||||
projectId?: string;
|
||||
model: string;
|
||||
systemPrompt: string;
|
||||
messages: any[];
|
||||
response: {
|
||||
text: string;
|
||||
thoughts: string;
|
||||
toolCalls: any[];
|
||||
};
|
||||
metrics: {
|
||||
promptTokens?: number;
|
||||
completionTokens?: number;
|
||||
totalTokens?: number;
|
||||
durationMs: number;
|
||||
};
|
||||
}
|
||||
|
||||
// Fire and forget function to send telemetry to our Coolify Microservice
|
||||
export function logTrainingTelemetryDb(data: TelemetryPayload) {
|
||||
setTimeout(async () => {
|
||||
try {
|
||||
const telemetryUrl = process.env.TELEMETRY_SERVICE_URL;
|
||||
|
||||
if (!telemetryUrl) {
|
||||
console.warn(
|
||||
"[Telemetry] TELEMETRY_SERVICE_URL is not set. Skipping log.",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
await fetch(`${telemetryUrl}/ingest`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify(data),
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(
|
||||
"[Telemetry] Failed to send training data to microservice:",
|
||||
error,
|
||||
);
|
||||
}
|
||||
}, 0);
|
||||
}
|
||||
@@ -1,117 +1,21 @@
|
||||
-- =====================================================================
|
||||
-- knowledge_chunks table: Stores chunked content with vector embeddings
|
||||
-- =====================================================================
|
||||
--
|
||||
-- This table stores semantic chunks of knowledge_items for vector search.
|
||||
-- Each chunk is embedded using an LLM embedding model (e.g., Gemini embeddings)
|
||||
-- and stored with pgvector for efficient similarity search.
|
||||
--
|
||||
-- Prerequisites:
|
||||
-- 1. Enable pgvector extension: CREATE EXTENSION IF NOT EXISTS vector;
|
||||
-- 2. Enable uuid generation: CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
|
||||
--
|
||||
|
||||
-- Enable required extensions
|
||||
CREATE EXTENSION IF NOT EXISTS vector;
|
||||
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
|
||||
|
||||
-- Create the knowledge_chunks table
|
||||
CREATE TABLE IF NOT EXISTS knowledge_chunks (
|
||||
-- Primary key (UUID auto-generated)
|
||||
-- Add the telemetry table script alongside your existing db scripts
|
||||
CREATE TABLE IF NOT EXISTS agent_telemetry (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
|
||||
-- References to parent entities (Firestore IDs stored as TEXT)
|
||||
project_id TEXT NOT NULL,
|
||||
knowledge_item_id TEXT NOT NULL,
|
||||
|
||||
-- Chunk metadata
|
||||
chunk_index INT NOT NULL,
|
||||
content TEXT NOT NULL,
|
||||
|
||||
-- Vector embedding (768 dimensions for Gemini text-embedding-004)
|
||||
-- NOTE: OpenAI embeddings use 1536 dims, but Gemini uses 768
|
||||
embedding VECTOR(768) NOT NULL,
|
||||
|
||||
-- Source and importance metadata (optional, from knowledge_items)
|
||||
source_type TEXT,
|
||||
importance TEXT CHECK (importance IN ('primary', 'supporting', 'irrelevant') OR importance IS NULL),
|
||||
|
||||
-- Timestamps
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
|
||||
project_id VARCHAR(255),
|
||||
model_used VARCHAR(255) NOT NULL,
|
||||
system_prompt TEXT NOT NULL,
|
||||
chat_history JSONB NOT NULL,
|
||||
response_text TEXT,
|
||||
response_thoughts TEXT,
|
||||
tool_calls JSONB,
|
||||
prompt_tokens INTEGER,
|
||||
completion_tokens INTEGER,
|
||||
total_tokens INTEGER,
|
||||
duration_ms INTEGER NOT NULL
|
||||
);
|
||||
|
||||
-- =====================================================================
|
||||
-- Indexes for efficient querying
|
||||
-- =====================================================================
|
||||
|
||||
-- Standard indexes for filtering by project and knowledge_item
|
||||
CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_project_id
|
||||
ON knowledge_chunks (project_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_knowledge_item_id
|
||||
ON knowledge_chunks (knowledge_item_id);
|
||||
|
||||
-- Composite index for project + knowledge_item queries
|
||||
CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_project_knowledge
|
||||
ON knowledge_chunks (project_id, knowledge_item_id);
|
||||
|
||||
-- Index for chunk ordering within a knowledge_item
|
||||
CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_item_index
|
||||
ON knowledge_chunks (knowledge_item_id, chunk_index);
|
||||
|
||||
-- Vector similarity index using IVFFlat (pgvector)
|
||||
-- This enables fast approximate nearest neighbor search
|
||||
-- The 'lists' parameter controls the number of clusters (tune based on data size)
|
||||
-- For < 100k rows, lists=100 is reasonable. Scale up for larger datasets.
|
||||
-- Using cosine distance (vector_cosine_ops) for semantic similarity
|
||||
CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_embedding
|
||||
ON knowledge_chunks
|
||||
USING ivfflat (embedding vector_cosine_ops)
|
||||
WITH (lists = 100);
|
||||
|
||||
-- Alternative: Use HNSW index for better recall at higher cost
|
||||
-- Uncomment if you prefer HNSW over IVFFlat:
|
||||
-- CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_embedding_hnsw
|
||||
-- ON knowledge_chunks
|
||||
-- USING hnsw (embedding vector_cosine_ops)
|
||||
-- WITH (m = 16, ef_construction = 64);
|
||||
|
||||
-- =====================================================================
|
||||
-- Optional: Trigger to auto-update updated_at timestamp
|
||||
-- =====================================================================
|
||||
CREATE OR REPLACE FUNCTION update_updated_at_column()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
NEW.updated_at = NOW();
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER update_knowledge_chunks_updated_at
|
||||
BEFORE UPDATE ON knowledge_chunks
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION update_updated_at_column();
|
||||
|
||||
-- =====================================================================
|
||||
-- Helpful queries for monitoring and debugging
|
||||
-- =====================================================================
|
||||
|
||||
-- Count chunks per project
|
||||
-- SELECT project_id, COUNT(*) as chunk_count FROM knowledge_chunks GROUP BY project_id;
|
||||
|
||||
-- Count chunks per knowledge_item
|
||||
-- SELECT knowledge_item_id, COUNT(*) as chunk_count FROM knowledge_chunks GROUP BY knowledge_item_id;
|
||||
|
||||
-- Find chunks similar to a query vector (example)
|
||||
-- SELECT id, content, 1 - (embedding <=> '[0.1, 0.2, ...]') AS similarity
|
||||
-- FROM knowledge_chunks
|
||||
-- WHERE project_id = 'your-project-id'
|
||||
-- ORDER BY embedding <=> '[0.1, 0.2, ...]'
|
||||
-- LIMIT 10;
|
||||
|
||||
-- Check index usage
|
||||
-- SELECT schemaname, tablename, indexname, idx_scan, idx_tup_read, idx_tup_fetch
|
||||
-- FROM pg_stat_user_indexes
|
||||
-- WHERE tablename = 'knowledge_chunks';
|
||||
|
||||
-- Index for fast querying by project
|
||||
CREATE INDEX IF NOT EXISTS idx_agent_telemetry_project ON agent_telemetry(project_id);
|
||||
-- Index for chronological sorting
|
||||
CREATE INDEX IF NOT EXISTS idx_agent_telemetry_created_at ON agent_telemetry(created_at DESC);
|
||||
|
||||
Reference in New Issue
Block a user