feat: decoupled training telemetry microservice

This commit is contained in:
2026-06-10 15:11:26 -07:00
parent 3c0a6860fc
commit 4f76b0f3b7
3 changed files with 106 additions and 117 deletions

View File

@@ -1,5 +1,6 @@
import { GoogleGenAI } from "@google/genai";
import fs from "fs";
import { logTrainingTelemetryDb } from "./telemetry-db";
const GEMINI_API_KEY = process.env.GOOGLE_API_KEY || "";
const GEMINI_MODEL = process.env.VIBN_CHAT_MODEL || "gemini-3.1-pro-preview";
@@ -140,11 +141,13 @@ export async function callGeminiChat(opts: {
);
console.log("========================================================\n");
const startTime = Date.now();
const response = await ai.models.generateContent({
model: GEMINI_MODEL,
contents: toGeminiContents(opts.messages),
config,
});
const durationMs = Date.now() - startTime;
console.log("\n========================================================");
console.log("⬅️ [GEMINI API RESPONSE]");
@@ -204,6 +207,19 @@ export async function callGeminiChat(opts: {
text = thoughts.trim();
}
logTrainingTelemetryDb({
model: GEMINI_MODEL,
systemPrompt: opts.systemPrompt,
messages: opts.messages,
response: { text, thoughts, toolCalls },
metrics: {
promptTokens: response.usageMetadata?.promptTokenCount,
completionTokens: response.usageMetadata?.candidatesTokenCount,
totalTokens: response.usageMetadata?.totalTokenCount,
durationMs,
},
});
return {
text,
thoughts,
@@ -251,6 +267,7 @@ export async function* streamGeminiChat(opts: {
);
console.log("========================================================\n");
const startTime = Date.now();
const streamResult = await ai.models.generateContentStream({
model: GEMINI_MODEL,
contents: toGeminiContents(opts.messages),
@@ -261,16 +278,38 @@ export async function* streamGeminiChat(opts: {
const isPartThought = (p: Record<string, unknown>) =>
Boolean(p.thought || p.thoughtSignature);
let text = "";
let thoughts = "";
const toolCalls: any[] = [];
for await (const chunk of streamResult) {
const parts = chunk.candidates?.[0]?.content?.parts ?? [];
for (const part of parts) {
if (part.text) {
yield isPartThought(part as Record<string, unknown>)
? { type: "thinking", text: part.text }
: { type: "text", text: part.text };
if (isPartThought(part as Record<string, unknown>)) {
thoughts += part.text;
yield { type: "thinking", text: part.text };
} else {
text += part.text;
yield { type: "text", text: part.text };
}
}
if (part.functionCall) {
toolCalls.push(part.functionCall);
}
}
}
const durationMs = Date.now() - startTime;
logTrainingTelemetryDb({
model: GEMINI_MODEL,
systemPrompt: opts.systemPrompt,
messages: opts.messages,
response: { text, thoughts, toolCalls },
metrics: {
durationMs,
},
});
yield { type: "done" };
} catch (error) {

View File

@@ -0,0 +1,46 @@
export interface TelemetryPayload {
projectId?: string;
model: string;
systemPrompt: string;
messages: any[];
response: {
text: string;
thoughts: string;
toolCalls: any[];
};
metrics: {
promptTokens?: number;
completionTokens?: number;
totalTokens?: number;
durationMs: number;
};
}
// Fire and forget function to send telemetry to our Coolify Microservice
export function logTrainingTelemetryDb(data: TelemetryPayload) {
setTimeout(async () => {
try {
const telemetryUrl = process.env.TELEMETRY_SERVICE_URL;
if (!telemetryUrl) {
console.warn(
"[Telemetry] TELEMETRY_SERVICE_URL is not set. Skipping log.",
);
return;
}
await fetch(`${telemetryUrl}/ingest`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(data),
});
} catch (error) {
console.error(
"[Telemetry] Failed to send training data to microservice:",
error,
);
}
}, 0);
}

View File

@@ -1,117 +1,21 @@
-- =====================================================================
-- knowledge_chunks table: Stores chunked content with vector embeddings
-- =====================================================================
--
-- This table stores semantic chunks of knowledge_items for vector search.
-- Each chunk is embedded using an LLM embedding model (e.g., Gemini embeddings)
-- and stored with pgvector for efficient similarity search.
--
-- Prerequisites:
-- 1. Enable pgvector extension: CREATE EXTENSION IF NOT EXISTS vector;
-- 2. Enable uuid generation: CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
--
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- Create the knowledge_chunks table
CREATE TABLE IF NOT EXISTS knowledge_chunks (
-- Primary key (UUID auto-generated)
-- Add the telemetry table script alongside your existing db scripts
CREATE TABLE IF NOT EXISTS agent_telemetry (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- References to parent entities (Firestore IDs stored as TEXT)
project_id TEXT NOT NULL,
knowledge_item_id TEXT NOT NULL,
-- Chunk metadata
chunk_index INT NOT NULL,
content TEXT NOT NULL,
-- Vector embedding (768 dimensions for Gemini text-embedding-004)
-- NOTE: OpenAI embeddings use 1536 dims, but Gemini uses 768
embedding VECTOR(768) NOT NULL,
-- Source and importance metadata (optional, from knowledge_items)
source_type TEXT,
importance TEXT CHECK (importance IN ('primary', 'supporting', 'irrelevant') OR importance IS NULL),
-- Timestamps
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
project_id VARCHAR(255),
model_used VARCHAR(255) NOT NULL,
system_prompt TEXT NOT NULL,
chat_history JSONB NOT NULL,
response_text TEXT,
response_thoughts TEXT,
tool_calls JSONB,
prompt_tokens INTEGER,
completion_tokens INTEGER,
total_tokens INTEGER,
duration_ms INTEGER NOT NULL
);
-- =====================================================================
-- Indexes for efficient querying
-- =====================================================================
-- Standard indexes for filtering by project and knowledge_item
CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_project_id
ON knowledge_chunks (project_id);
CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_knowledge_item_id
ON knowledge_chunks (knowledge_item_id);
-- Composite index for project + knowledge_item queries
CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_project_knowledge
ON knowledge_chunks (project_id, knowledge_item_id);
-- Index for chunk ordering within a knowledge_item
CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_item_index
ON knowledge_chunks (knowledge_item_id, chunk_index);
-- Vector similarity index using IVFFlat (pgvector)
-- This enables fast approximate nearest neighbor search
-- The 'lists' parameter controls the number of clusters (tune based on data size)
-- For < 100k rows, lists=100 is reasonable. Scale up for larger datasets.
-- Using cosine distance (vector_cosine_ops) for semantic similarity
CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_embedding
ON knowledge_chunks
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
-- Alternative: Use HNSW index for better recall at higher cost
-- Uncomment if you prefer HNSW over IVFFlat:
-- CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_embedding_hnsw
-- ON knowledge_chunks
-- USING hnsw (embedding vector_cosine_ops)
-- WITH (m = 16, ef_construction = 64);
-- =====================================================================
-- Optional: Trigger to auto-update updated_at timestamp
-- =====================================================================
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_knowledge_chunks_updated_at
BEFORE UPDATE ON knowledge_chunks
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
-- =====================================================================
-- Helpful queries for monitoring and debugging
-- =====================================================================
-- Count chunks per project
-- SELECT project_id, COUNT(*) as chunk_count FROM knowledge_chunks GROUP BY project_id;
-- Count chunks per knowledge_item
-- SELECT knowledge_item_id, COUNT(*) as chunk_count FROM knowledge_chunks GROUP BY knowledge_item_id;
-- Find chunks similar to a query vector (example)
-- SELECT id, content, 1 - (embedding <=> '[0.1, 0.2, ...]') AS similarity
-- FROM knowledge_chunks
-- WHERE project_id = 'your-project-id'
-- ORDER BY embedding <=> '[0.1, 0.2, ...]'
-- LIMIT 10;
-- Check index usage
-- SELECT schemaname, tablename, indexname, idx_scan, idx_tup_read, idx_tup_fetch
-- FROM pg_stat_user_indexes
-- WHERE tablename = 'knowledge_chunks';
-- Index for fast querying by project
CREATE INDEX IF NOT EXISTS idx_agent_telemetry_project ON agent_telemetry(project_id);
-- Index for chronological sorting
CREATE INDEX IF NOT EXISTS idx_agent_telemetry_created_at ON agent_telemetry(created_at DESC);