feat(agent): POST timeline events to vibn-frontend ingest API

- vibn-events-ingest.ts + emit() dual-write with session PATCH
- .env.example: VIBN_API_URL, AGENT_RUNNER_SECRET

Made-with: Cursor
This commit is contained in:
2026-04-01 11:48:57 -07:00
parent 1ff020cf53
commit 419af40ca2
7 changed files with 226 additions and 8 deletions

View File

@@ -45,5 +45,12 @@ WORKSPACE_BASE=/workspaces
# Internal URL of this service (used by spawn_agent to self-call) # Internal URL of this service (used by spawn_agent to self-call)
AGENT_RUNNER_URL=http://localhost:3333 AGENT_RUNNER_URL=http://localhost:3333
# Base URL of the vibn-frontend Next app (runner PATCHes sessions + POSTs timeline events)
# Production: https://vibnai.com (must be reachable from this container)
VIBN_API_URL=http://localhost:3000
# Same value as AGENT_RUNNER_SECRET on the Next app (ingest + session PATCH)
AGENT_RUNNER_SECRET=
# Optional: shared secret for validating Gitea webhook POSTs # Optional: shared secret for validating Gitea webhook POSTs
WEBHOOK_SECRET= WEBHOOK_SECRET=

View File

@@ -23,6 +23,13 @@ export interface SessionRunOptions {
projectId: string; projectId: string;
vibnApiUrl: string; vibnApiUrl: string;
appPath: string; appPath: string;
repoRoot?: string;
isStopped: () => boolean; isStopped: () => boolean;
autoApprove?: boolean;
giteaRepo?: string;
coolifyAppUuid?: string;
coolifyApiUrl?: string;
coolifyApiToken?: string;
theiaWorkspaceSubdir?: string;
} }
export declare function runSessionAgent(config: AgentConfig, task: string, ctx: ToolContext, opts: SessionRunOptions): Promise<void>; export declare function runSessionAgent(config: AgentConfig, task: string, ctx: ToolContext, opts: SessionRunOptions): Promise<void>;

View File

@@ -14,9 +14,12 @@
*/ */
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
exports.runSessionAgent = runSessionAgent; exports.runSessionAgent = runSessionAgent;
const child_process_1 = require("child_process");
const llm_1 = require("./llm"); const llm_1 = require("./llm");
const tools_1 = require("./tools"); const tools_1 = require("./tools");
const loader_1 = require("./prompts/loader"); const loader_1 = require("./prompts/loader");
const theia_exec_1 = require("./theia-exec");
const vibn_events_ingest_1 = require("./vibn-events-ingest");
const MAX_TURNS = 60; const MAX_TURNS = 60;
// ── VIBN DB bridge ──────────────────────────────────────────────────────────── // ── VIBN DB bridge ────────────────────────────────────────────────────────────
async function patchSession(opts, payload) { async function patchSession(opts, payload) {
@@ -53,13 +56,90 @@ function extractChangedFile(toolName, args, workspaceRoot, appPath) {
const fileStatus = toolName === 'write_file' ? 'added' : 'modified'; const fileStatus = toolName === 'write_file' ? 'added' : 'modified';
return { path: displayPath, status: fileStatus }; return { path: displayPath, status: fileStatus };
} }
// ── Auto-commit helper ────────────────────────────────────────────────────────
async function autoCommitAndDeploy(opts, task, emit) {
const repoRoot = opts.repoRoot;
if (!repoRoot || !opts.giteaRepo) {
await emit({ ts: now(), type: 'info', text: 'Auto-approve skipped — no repo root available.' });
return;
}
const gitOpts = { cwd: repoRoot, stdio: 'pipe' };
const giteaApiUrl = process.env.GITEA_API_URL || '';
const giteaUsername = process.env.GITEA_USERNAME || 'agent';
const giteaToken = process.env.GITEA_API_TOKEN || '';
try {
// Sync files into Theia via the sync-server so "Open in Theia" shows latest code
if (opts.giteaRepo && await (0, theia_exec_1.isTheiaSyncAvailable)()) {
await emit({ ts: now(), type: 'info', text: `Syncing to Theia…` });
const syncResult = await (0, theia_exec_1.syncRepoToTheia)(opts.giteaRepo);
if (syncResult.ok) {
await emit({ ts: now(), type: 'info', text: `✓ Theia synced (${syncResult.action}) — open theia.vibnai.com to inspect.` });
}
else {
console.warn('[session-runner] Theia sync failed:', syncResult.error);
}
}
try {
(0, child_process_1.execSync)('git config user.email "agent@vibnai.com"', gitOpts);
(0, child_process_1.execSync)('git config user.name "VIBN Agent"', gitOpts);
}
catch { /* already set */ }
(0, child_process_1.execSync)('git add -A', gitOpts);
const status = (0, child_process_1.execSync)('git status --porcelain', gitOpts).toString().trim();
if (!status) {
await emit({ ts: now(), type: 'info', text: '✓ No file changes to commit.' });
await patchSession(opts, { status: 'approved' });
return;
}
const commitMsg = `agent: ${task.slice(0, 72)}`;
(0, child_process_1.execSync)(`git commit -m ${JSON.stringify(commitMsg)}`, gitOpts);
await emit({ ts: now(), type: 'info', text: `✓ Committed: "${commitMsg}"` });
const authedUrl = `${giteaApiUrl}/${opts.giteaRepo}.git`
.replace('https://', `https://${giteaUsername}:${giteaToken}@`);
(0, child_process_1.execSync)(`git push "${authedUrl}" HEAD:main`, gitOpts);
await emit({ ts: now(), type: 'info', text: '✓ Pushed to Gitea.' });
// Optional Coolify deploy
let deployed = false;
if (opts.coolifyApiUrl && opts.coolifyApiToken && opts.coolifyAppUuid) {
try {
const deployRes = await fetch(`${opts.coolifyApiUrl}/api/v1/applications/${opts.coolifyAppUuid}/start`, { method: 'POST', headers: { Authorization: `Bearer ${opts.coolifyApiToken}` } });
deployed = deployRes.ok;
if (deployed)
await emit({ ts: now(), type: 'info', text: '✓ Deployment triggered.' });
}
catch { /* best-effort */ }
}
await patchSession(opts, {
status: 'approved',
outputLine: {
ts: now(), type: 'done',
text: `✓ Auto-committed & ${deployed ? 'deployed' : 'pushed'}. No approval needed.`,
},
});
}
catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await emit({ ts: now(), type: 'error', text: `Auto-commit failed: ${msg}` });
// Fall back to done so user can manually approve
await patchSession(opts, { status: 'done' });
}
}
// ── Main streaming execution loop ───────────────────────────────────────────── // ── Main streaming execution loop ─────────────────────────────────────────────
async function runSessionAgent(config, task, ctx, opts) { async function runSessionAgent(config, task, ctx, opts) {
const llm = (0, llm_1.createLLM)(config.model, { temperature: 0.2 }); const llm = (0, llm_1.createLLM)(config.model, { temperature: 0.2 });
const oaiTools = (0, llm_1.toOAITools)(config.tools); const oaiTools = (0, llm_1.toOAITools)(config.tools);
const emit = async (line) => { const emit = async (line) => {
console.log(`[session ${opts.sessionId}] ${line.type}: ${line.text}`); console.log(`[session ${opts.sessionId}] ${line.type}: ${line.text}`);
await patchSession(opts, { outputLine: line }); await Promise.all([
patchSession(opts, { outputLine: line }),
(0, vibn_events_ingest_1.ingestSessionEvents)(opts.vibnApiUrl, opts.projectId, opts.sessionId, [
{
type: `output.${line.type}`,
payload: { text: line.text },
ts: line.ts,
},
]),
]);
}; };
await emit({ ts: now(), type: 'info', text: `Agent starting (${llm.modelId}) — working in ${opts.appPath}` }); await emit({ ts: now(), type: 'info', text: `Agent starting (${llm.modelId}) — working in ${opts.appPath}` });
// Scope the system prompt to the specific app within the monorepo // Scope the system prompt to the specific app within the monorepo
@@ -70,7 +150,7 @@ async function runSessionAgent(config, task, ctx, opts) {
You are working inside the monorepo directory: ${opts.appPath} You are working inside the monorepo directory: ${opts.appPath}
All file paths you use should be relative to this directory unless otherwise specified. All file paths you use should be relative to this directory unless otherwise specified.
When running commands, always cd into ${opts.appPath} first unless already there. When running commands, always cd into ${opts.appPath} first unless already there.
When you are done, do NOT commit directly — leave the changes uncommitted so the user can review and approve them. Do NOT run git commit or git push — the platform handles committing after you finish.
`; `;
const history = [ const history = [
{ role: 'user', content: task } { role: 'user', content: task }
@@ -127,8 +207,24 @@ When you are done, do NOT commit directly — leave the changes uncommitted so t
await emit({ ts: now(), type: 'step', text: stepLabel }); await emit({ ts: now(), type: 'step', text: stepLabel });
let result; let result;
try { try {
// Route execute_command through Theia when available so npm/node
// commands run inside Theia's persistent dev environment
if (fnName === 'execute_command' && (0, theia_exec_1.isTheiaAvailable)()) {
const command = String(fnArgs.command ?? '');
const subCwd = fnArgs.working_directory
? `${opts.theiaWorkspaceSubdir ?? ''}/${fnArgs.working_directory}`.replace(/\/+/g, '/')
: opts.theiaWorkspaceSubdir ?? undefined;
result = await (0, theia_exec_1.theiaExec)(command, subCwd ? `${process.env.THEIA_WORKSPACE ?? '/home/node/workspace'}/${subCwd}` : undefined);
if (result?.error && result?.exitCode !== 0) {
// Fallback to local execution if Theia exec fails
console.warn('[session-runner] Theia exec failed, falling back to local:', result.error);
result = await (0, tools_1.executeTool)(fnName, fnArgs, ctx); result = await (0, tools_1.executeTool)(fnName, fnArgs, ctx);
} }
}
else {
result = await (0, tools_1.executeTool)(fnName, fnArgs, ctx);
}
}
catch (err) { catch (err) {
result = { error: err instanceof Error ? err.message : String(err) }; result = { error: err instanceof Error ? err.message : String(err) };
} }
@@ -168,10 +264,15 @@ When you are done, do NOT commit directly — leave the changes uncommitted so t
finalText = `Hit the ${MAX_TURNS}-turn limit. Stopping.`; finalText = `Hit the ${MAX_TURNS}-turn limit. Stopping.`;
} }
await emit({ ts: now(), type: 'done', text: finalText }); await emit({ ts: now(), type: 'done', text: finalText });
if (opts.autoApprove) {
await autoCommitAndDeploy(opts, task, emit);
}
else {
await patchSession(opts, { await patchSession(opts, {
status: 'done', status: 'done',
outputLine: { ts: now(), type: 'done', text: '✓ Complete — review changes and approve to commit.' } outputLine: { ts: now(), type: 'done', text: '✓ Complete — review changes and approve to commit.' },
}); });
}
} }
// ── Step label helpers ──────────────────────────────────────────────────────── // ── Step label helpers ────────────────────────────────────────────────────────
function buildStepLabel(tool, args) { function buildStepLabel(tool, args) {

6
dist/vibn-events-ingest.d.ts vendored Normal file
View File

@@ -0,0 +1,6 @@
export interface IngestEventInput {
type: string;
payload?: Record<string, unknown>;
ts?: string;
}
export declare function ingestSessionEvents(vibnApiUrl: string, projectId: string, sessionId: string, events: IngestEventInput[]): Promise<void>;

39
dist/vibn-events-ingest.js vendored Normal file
View File

@@ -0,0 +1,39 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.ingestSessionEvents = ingestSessionEvents;
/**
* Push structured timeline events to vibn-frontend (Postgres via ingest API).
* Complements PATCH output lines — enables SSE replay without polling every line.
*/
const crypto_1 = require("crypto");
async function ingestSessionEvents(vibnApiUrl, projectId, sessionId, events) {
if (events.length === 0)
return;
const secret = process.env.AGENT_RUNNER_SECRET ?? '';
const base = vibnApiUrl.replace(/\/$/, '');
const url = `${base}/api/projects/${projectId}/agent/sessions/${sessionId}/events`;
try {
const res = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-agent-runner-secret': secret,
},
body: JSON.stringify({
events: events.map((e) => ({
clientEventId: (0, crypto_1.randomUUID)(),
ts: e.ts ?? new Date().toISOString(),
type: e.type,
payload: e.payload ?? {},
})),
}),
});
if (!res.ok) {
const t = await res.text();
console.warn('[ingest-events]', res.status, t.slice(0, 240));
}
}
catch (err) {
console.warn('[ingest-events]', err instanceof Error ? err.message : err);
}
}

View File

@@ -18,6 +18,7 @@ import { AgentConfig } from './agents';
import { executeTool, ToolContext } from './tools'; import { executeTool, ToolContext } from './tools';
import { resolvePrompt } from './prompts/loader'; import { resolvePrompt } from './prompts/loader';
import { isTheiaAvailable, theiaExec, syncRepoToTheia, isTheiaSyncAvailable } from './theia-exec'; import { isTheiaAvailable, theiaExec, syncRepoToTheia, isTheiaSyncAvailable } from './theia-exec';
import { ingestSessionEvents } from './vibn-events-ingest';
const MAX_TURNS = 60; const MAX_TURNS = 60;
@@ -191,7 +192,16 @@ export async function runSessionAgent(
const emit = async (line: OutputLine) => { const emit = async (line: OutputLine) => {
console.log(`[session ${opts.sessionId}] ${line.type}: ${line.text}`); console.log(`[session ${opts.sessionId}] ${line.type}: ${line.text}`);
await patchSession(opts, { outputLine: line }); await Promise.all([
patchSession(opts, { outputLine: line }),
ingestSessionEvents(opts.vibnApiUrl, opts.projectId, opts.sessionId, [
{
type: `output.${line.type}`,
payload: { text: line.text },
ts: line.ts,
},
]),
]);
}; };
await emit({ ts: now(), type: 'info', text: `Agent starting (${llm.modelId}) — working in ${opts.appPath}` }); await emit({ ts: now(), type: 'info', text: `Agent starting (${llm.modelId}) — working in ${opts.appPath}` });

48
src/vibn-events-ingest.ts Normal file
View File

@@ -0,0 +1,48 @@
/**
* Push structured timeline events to vibn-frontend (Postgres via ingest API).
* Complements PATCH output lines — enables SSE replay without polling every line.
*/
import { randomUUID } from 'crypto';
export interface IngestEventInput {
type: string;
payload?: Record<string, unknown>;
ts?: string;
}
export async function ingestSessionEvents(
vibnApiUrl: string,
projectId: string,
sessionId: string,
events: IngestEventInput[]
): Promise<void> {
if (events.length === 0) return;
const secret = process.env.AGENT_RUNNER_SECRET ?? '';
const base = vibnApiUrl.replace(/\/$/, '');
const url = `${base}/api/projects/${projectId}/agent/sessions/${sessionId}/events`;
try {
const res = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-agent-runner-secret': secret,
},
body: JSON.stringify({
events: events.map((e) => ({
clientEventId: randomUUID(),
ts: e.ts ?? new Date().toISOString(),
type: e.type,
payload: e.payload ?? {},
})),
}),
});
if (!res.ok) {
const t = await res.text();
console.warn('[ingest-events]', res.status, t.slice(0, 240));
}
} catch (err) {
console.warn('[ingest-events]', err instanceof Error ? err.message : err);
}
}