605 lines
18 KiB
TypeScript
605 lines
18 KiB
TypeScript
/**
|
||
* agent-session-runner.ts
|
||
*
|
||
* Upgraded Cloud Agent Executor for VibnCode.
|
||
* Implements 4-level Smart Concurrency (parallel reads/lookups) and the
|
||
* Ralph Loop (autonomous self-correction) entirely inside your secure Cloud VM.
|
||
*/
|
||
|
||
import { execSync } from "child_process";
|
||
import { callVibnChat } from "./llm/vibn-chat-model";
|
||
import { ChatMessage } from "./llm/gemini-chat";
|
||
import { AgentConfig } from "./agents";
|
||
import { executeTool, ToolContext } from "./tools";
|
||
import { resolvePrompt } from "./prompts/loader";
|
||
|
||
const MAX_TURNS = 45;
|
||
|
||
function runBuildVerification(
|
||
repoRoot: string,
|
||
appPath: string,
|
||
): { success: boolean; error?: string } {
|
||
const fs = require("fs") as typeof import("fs");
|
||
const path = require("path") as typeof import("path");
|
||
const { execSync } = require("child_process");
|
||
|
||
const absoluteAppPath = path.join(repoRoot, appPath);
|
||
const pkgJsonPath = path.join(absoluteAppPath, "package.json");
|
||
|
||
if (!fs.existsSync(pkgJsonPath)) {
|
||
return { success: true }; // No package.json, skip build check
|
||
}
|
||
|
||
try {
|
||
const pkg = JSON.parse(fs.readFileSync(pkgJsonPath, "utf8"));
|
||
// Only verify if there is an explicit build script
|
||
if (!pkg.scripts || !pkg.scripts.build) {
|
||
return { success: true };
|
||
}
|
||
|
||
console.log(
|
||
`[Ralph Loop] Running automatic build verification: npm run build inside ${absoluteAppPath}...`,
|
||
);
|
||
// Run npm run build with a 45s timeout to prevent hanging
|
||
execSync("npm run build", {
|
||
cwd: absoluteAppPath,
|
||
stdio: "pipe",
|
||
timeout: 45000,
|
||
});
|
||
return { success: true };
|
||
} catch (err: any) {
|
||
const stderr = err.stderr
|
||
? err.stderr.toString()
|
||
: err.message || String(err);
|
||
console.warn(`[Ralph Loop] Build verification failed:`, stderr);
|
||
return {
|
||
success: false,
|
||
error: stderr.slice(-3000), // Cap the log length to avoid flooding the prompt context
|
||
};
|
||
}
|
||
}
|
||
|
||
export interface OutputLine {
|
||
ts: string;
|
||
type: "step" | "stdout" | "stderr" | "info" | "error" | "done";
|
||
text: string;
|
||
}
|
||
|
||
export interface SessionRunOptions {
|
||
sessionId: string;
|
||
projectId: string;
|
||
vibnApiUrl: string; // e.g. https://vibnai.com
|
||
appPath: string; // relative path within repo, e.g. "apps/admin"
|
||
repoRoot?: string; // absolute path to the git repo root (for auto-commit)
|
||
isStopped: () => boolean;
|
||
autoApprove?: boolean;
|
||
giteaRepo?: string; // e.g. "mark/sportsy"
|
||
coolifyAppUuid?: string;
|
||
coolifyApiUrl?: string;
|
||
coolifyApiToken?: string;
|
||
}
|
||
|
||
// ── VIBN DB bridge ────────────────────────────────────────────────────────────
|
||
|
||
async function patchSession(
|
||
opts: SessionRunOptions,
|
||
payload: {
|
||
status?: string;
|
||
outputLine?: OutputLine;
|
||
changedFile?: { path: string; status: string };
|
||
error?: string;
|
||
},
|
||
): Promise<void> {
|
||
const url = `${opts.vibnApiUrl}/api/projects/${opts.projectId}/agent/sessions/${opts.sessionId}`;
|
||
try {
|
||
await fetch(url, {
|
||
method: "PATCH",
|
||
headers: {
|
||
"Content-Type": "application/json",
|
||
"x-agent-runner-secret": process.env.AGENT_RUNNER_SECRET ?? "",
|
||
},
|
||
body: JSON.stringify(payload),
|
||
});
|
||
} catch (err) {
|
||
console.warn(
|
||
"[session-runner] PATCH failed:",
|
||
err instanceof Error ? err.message : err,
|
||
);
|
||
}
|
||
}
|
||
|
||
function now(): string {
|
||
return new Date().toISOString();
|
||
}
|
||
|
||
// ── File change tracking ──────────────────────────────────────────────────────
|
||
|
||
const FILE_WRITE_TOOLS = new Set([
|
||
"write_file",
|
||
"replace_in_file",
|
||
"create_file",
|
||
"fs_write",
|
||
"fs_edit",
|
||
]);
|
||
|
||
function extractChangedFile(
|
||
toolName: string,
|
||
args: Record<string, unknown>,
|
||
workspaceRoot: string,
|
||
appPath: string,
|
||
): { path: string; status: string } | null {
|
||
if (!FILE_WRITE_TOOLS.has(toolName)) return null;
|
||
const rawPath = String(args.path ?? args.file_path ?? "");
|
||
if (!rawPath) return null;
|
||
|
||
// Make path relative to appPath for display
|
||
const fullPrefix = `${workspaceRoot}/${appPath}/`;
|
||
const appPrefix = `${appPath}/`;
|
||
let displayPath = rawPath.replace(fullPrefix, "").replace(appPrefix, "");
|
||
|
||
const fileStatus =
|
||
toolName === "write_file" || toolName === "fs_write" ? "added" : "modified";
|
||
return { path: displayPath, status: fileStatus };
|
||
}
|
||
|
||
// ── Auto-commit helper ────────────────────────────────────────────────────────
|
||
|
||
async function autoCommitAndDeploy(
|
||
opts: SessionRunOptions,
|
||
task: string,
|
||
emit: (line: OutputLine) => Promise<void>,
|
||
): Promise<void> {
|
||
const repoRoot = opts.repoRoot;
|
||
if (!repoRoot || !opts.giteaRepo) {
|
||
await emit({
|
||
ts: now(),
|
||
type: "info",
|
||
text: "Auto-approve skipped — no repo root available.",
|
||
});
|
||
return;
|
||
}
|
||
|
||
const gitOpts = { cwd: repoRoot, stdio: "pipe" as const };
|
||
const giteaApiUrl = process.env.GITEA_API_URL || "";
|
||
const giteaUsername = process.env.GITEA_USERNAME || "agent";
|
||
const giteaToken = process.env.GITEA_API_TOKEN || "";
|
||
|
||
try {
|
||
try {
|
||
execSync('git config user.email "agent@vibnai.com"', gitOpts);
|
||
execSync('git config user.name "VIBN Agent"', gitOpts);
|
||
} catch {
|
||
/* already set */
|
||
}
|
||
|
||
execSync("git add -A", gitOpts);
|
||
|
||
const status = execSync("git status --porcelain", gitOpts)
|
||
.toString()
|
||
.trim();
|
||
if (!status) {
|
||
await emit({
|
||
ts: now(),
|
||
type: "info",
|
||
text: "✓ No file changes to commit.",
|
||
});
|
||
await patchSession(opts, { status: "approved" });
|
||
return;
|
||
}
|
||
|
||
const commitMsg = `agent: ${task.slice(0, 72)}`;
|
||
const msgFile = require("path").join(
|
||
opts.repoRoot || process.cwd(),
|
||
".git",
|
||
"COMMIT_EDITMSG",
|
||
);
|
||
require("fs").writeFileSync(msgFile, commitMsg, "utf8");
|
||
execSync("git commit -F .git/COMMIT_EDITMSG", gitOpts);
|
||
try {
|
||
require("fs").unlinkSync(msgFile);
|
||
} catch {}
|
||
await emit({
|
||
ts: now(),
|
||
type: "info",
|
||
text: `✓ Committed: "${commitMsg}"`,
|
||
});
|
||
|
||
const authedUrl = `${giteaApiUrl}/${opts.giteaRepo}.git`.replace(
|
||
"https://",
|
||
`https://${giteaUsername}:${giteaToken}@`,
|
||
);
|
||
execSync(`git push "${authedUrl}" HEAD:main`, gitOpts);
|
||
await emit({ ts: now(), type: "info", text: "✓ Pushed to Gitea." });
|
||
|
||
// Optional Coolify deploy
|
||
let deployed = false;
|
||
if (opts.coolifyApiUrl && opts.coolifyApiToken && opts.coolifyAppUuid) {
|
||
try {
|
||
const deployRes = await fetch(
|
||
`${opts.coolifyApiUrl}/api/v1/applications/${opts.coolifyAppUuid}/start`,
|
||
{
|
||
method: "POST",
|
||
headers: { Authorization: `Bearer ${opts.coolifyApiToken}` },
|
||
},
|
||
);
|
||
deployed = deployRes.ok;
|
||
if (deployed)
|
||
await emit({
|
||
ts: now(),
|
||
type: "info",
|
||
text: "✓ Deployment triggered.",
|
||
});
|
||
} catch {
|
||
/* best-effort */
|
||
}
|
||
}
|
||
|
||
await patchSession(opts, {
|
||
status: "approved",
|
||
outputLine: {
|
||
ts: now(),
|
||
type: "done",
|
||
text: `✓ Auto-committed & ${deployed ? "deployed" : "pushed"}. No approval needed.`,
|
||
},
|
||
});
|
||
} catch (err) {
|
||
const msg = err instanceof Error ? err.message : String(err);
|
||
await emit({
|
||
ts: now(),
|
||
type: "error",
|
||
text: `Auto-commit failed: ${msg}`,
|
||
});
|
||
// Fall back to done so user can manually approve
|
||
await patchSession(opts, { status: "done" });
|
||
}
|
||
}
|
||
|
||
// ── Main streaming execution loop ─────────────────────────────────────────────
|
||
|
||
export async function runSessionAgent(
|
||
config: AgentConfig,
|
||
task: string,
|
||
ctx: ToolContext,
|
||
opts: SessionRunOptions,
|
||
): Promise<void> {
|
||
const systemPrompt = resolvePrompt(config.promptId);
|
||
|
||
const emit = async (line: OutputLine) => {
|
||
console.log(`[session ${opts.sessionId}] ${line.type}: ${line.text}`);
|
||
await patchSession(opts, { outputLine: line });
|
||
};
|
||
|
||
await emit({
|
||
ts: now(),
|
||
type: "info",
|
||
text: `Agent starting working in ${opts.appPath}`,
|
||
});
|
||
|
||
// Scope the system prompt to the specific app within the monorepo
|
||
const basePrompt = resolvePrompt(config.promptId);
|
||
const scopedPrompt = `${basePrompt}
|
||
\n\n## Active context
|
||
You are working inside the monorepo directory: ${opts.appPath}
|
||
All file paths you use should be relative to this directory unless otherwise specified.
|
||
When running commands, always cd into ${opts.appPath} first unless already there.
|
||
Do NOT run git commit or git push — the platform handles committing after you finish.
|
||
`;
|
||
|
||
const history: ChatMessage[] = [{ role: "user", content: task }];
|
||
|
||
let turn = 0;
|
||
let toolCallsSinceText = 0;
|
||
let roundsSinceText = 0;
|
||
const toolFingerprints: string[] = [];
|
||
let loopBreakReason: string | null = null;
|
||
let ralphIteration = 0;
|
||
|
||
function fingerprintToolCall(tc: any) {
|
||
if (tc.name === "shell_exec") {
|
||
const cmd = String(tc.args?.command ?? "").trim();
|
||
const verb =
|
||
cmd
|
||
.split("&&")
|
||
.map((s) => s.trim())
|
||
.find((s) => !s.startsWith("cd "))
|
||
?.split(/\s+/)[0] ?? "shell";
|
||
return `shell_exec:${verb}`;
|
||
}
|
||
if (
|
||
tc.name === "fs_write" ||
|
||
tc.name === "fs_edit" ||
|
||
tc.name === "fs_read"
|
||
) {
|
||
return `${tc.name}:${tc.args?.path}`;
|
||
}
|
||
return `${tc.name}:${Object.values(tc.args ?? {})[0]}`;
|
||
}
|
||
|
||
while (turn < MAX_TURNS) {
|
||
if (opts.isStopped()) {
|
||
await emit({ ts: now(), type: "info", text: "Stopped by user." });
|
||
await patchSession(opts, { status: "stopped" });
|
||
return;
|
||
}
|
||
|
||
turn++;
|
||
|
||
const isSilent = roundsSinceText >= 15 || toolCallsSinceText >= 20;
|
||
const extraSystem = isSilent
|
||
? "\n\n[STATUS NUDGE] You have run " +
|
||
`${toolCallsSinceText} tool call(s) over ${roundsSinceText} round(s) ` +
|
||
"without sending the user any text. Before any more tool calls, " +
|
||
"send ONE short sentence describing what you are currently working " +
|
||
"on and why."
|
||
: "";
|
||
|
||
let resp: any;
|
||
try {
|
||
resp = await callVibnChat({
|
||
systemPrompt: scopedPrompt + extraSystem,
|
||
messages: history as any[],
|
||
tools: config.tools,
|
||
temperature: 0.2,
|
||
});
|
||
} catch (err) {
|
||
const msg = err instanceof Error ? err.message : String(err);
|
||
await emit({ ts: now(), type: "error", text: `LLM error: ${msg}` });
|
||
await patchSession(opts, { status: "failed", error: msg });
|
||
return;
|
||
}
|
||
|
||
if (resp.error) {
|
||
await emit({
|
||
ts: now(),
|
||
type: "error",
|
||
text: `LLM error: ${resp.error}`,
|
||
});
|
||
await patchSession(opts, { status: "failed", error: resp.error });
|
||
return;
|
||
}
|
||
|
||
if (resp.text) {
|
||
await emit({ ts: now(), type: "info", text: resp.text });
|
||
roundsSinceText = 0;
|
||
toolCallsSinceText = 0;
|
||
} else if (resp.toolCalls.length) {
|
||
roundsSinceText++;
|
||
toolCallsSinceText += resp.toolCalls.length;
|
||
}
|
||
|
||
// ── Self-Correcting Ralph Loop Autonomy ──
|
||
if (!resp.toolCalls.length) {
|
||
const text = resp.text || "";
|
||
const incompleteSignals = [
|
||
"I need to",
|
||
"Let me",
|
||
"Next, I should",
|
||
"I should also",
|
||
"Additionally",
|
||
"I will now",
|
||
"I need first to",
|
||
];
|
||
const needsMoreWork = incompleteSignals.some((signal) =>
|
||
text.includes(signal),
|
||
);
|
||
|
||
if (needsMoreWork && ralphIteration < 3) {
|
||
ralphIteration++;
|
||
await emit({
|
||
ts: now(),
|
||
type: "info",
|
||
text: `🔄 [Ralph Loop] Self-reflection triggered (iteration ${ralphIteration}/3). Resuming execution...`,
|
||
});
|
||
history.push({
|
||
role: "user",
|
||
content:
|
||
"Please continue implementing the outstanding next steps to complete the task.",
|
||
});
|
||
continue;
|
||
}
|
||
|
||
// ── Cloud Build Verification (Ralph Loop integration) ──
|
||
if (opts.repoRoot && ralphIteration < 3) {
|
||
await emit({
|
||
ts: now(),
|
||
type: "info",
|
||
text: "🔍 [Ralph Loop] Initiating automatic build verification...",
|
||
});
|
||
|
||
const verification = runBuildVerification(opts.repoRoot, opts.appPath);
|
||
if (!verification.success) {
|
||
ralphIteration++;
|
||
await emit({
|
||
ts: now(),
|
||
type: "error",
|
||
text: `❌ [Ralph Loop] Build verification failed (iteration ${ralphIteration}/3). Feeding compilation errors back to the model...`,
|
||
});
|
||
|
||
history.push({
|
||
role: "user",
|
||
content: `Your previous edits completed, but the project's build check failed with compilation errors. Please fix these errors immediately so the build compiles clean:\n\n\`\`\`text\n${verification.error}\n\`\`\``,
|
||
});
|
||
continue;
|
||
} else {
|
||
await emit({
|
||
ts: now(),
|
||
type: "info",
|
||
text: "🟢 [Ralph Loop] Build verification passed successfully! 0 errors.",
|
||
});
|
||
}
|
||
}
|
||
|
||
// If fully complete, trigger auto-commit and finish
|
||
if (opts.autoApprove) {
|
||
await autoCommitAndDeploy(opts, task, emit);
|
||
} else {
|
||
await patchSession(opts, { status: "completed" });
|
||
}
|
||
return;
|
||
}
|
||
|
||
for (const tc of resp.toolCalls) {
|
||
toolFingerprints.push(fingerprintToolCall(tc));
|
||
}
|
||
const window = toolFingerprints.slice(-10);
|
||
const counts = new Map<string, number>();
|
||
for (const fp of window) counts.set(fp, (counts.get(fp) ?? 0) + 1);
|
||
|
||
let maxRepeats = 0;
|
||
let repeatedCmd = "";
|
||
for (const [fp, n] of counts.entries()) {
|
||
if (n > maxRepeats) {
|
||
maxRepeats = n;
|
||
repeatedCmd = fp.split("|")[0];
|
||
}
|
||
}
|
||
|
||
if (maxRepeats >= 6) {
|
||
loopBreakReason = `Repeated ${repeatedCmd} ${maxRepeats}× in last 10 calls`;
|
||
break;
|
||
}
|
||
|
||
history.push({
|
||
role: "assistant",
|
||
content: resp.text,
|
||
toolCalls: resp.toolCalls,
|
||
});
|
||
|
||
// ── 4-Level Smart Concurrency Tool Grouping ──
|
||
const parallelReads = resp.toolCalls.filter((tc: any) =>
|
||
[
|
||
"fs_read",
|
||
"fs_tree",
|
||
"fs_list",
|
||
"fs_glob",
|
||
"fs_grep",
|
||
"projects_list",
|
||
"project_recent_errors",
|
||
].includes(tc.name),
|
||
);
|
||
const sequentialWrites = resp.toolCalls.filter((tc: any) =>
|
||
[
|
||
"fs_write",
|
||
"fs_edit",
|
||
"create_file",
|
||
"write_file",
|
||
"replace_in_file",
|
||
"apps_create",
|
||
"databases_create",
|
||
].includes(tc.name),
|
||
);
|
||
const otherTools = resp.toolCalls.filter(
|
||
(tc: any) =>
|
||
!parallelReads.includes(tc) && !sequentialWrites.includes(tc),
|
||
);
|
||
|
||
// Stage 1: Parallel Reads
|
||
if (parallelReads.length > 0) {
|
||
await emit({
|
||
ts: now(),
|
||
type: "step",
|
||
text: `Executing ${parallelReads.length} read operations concurrently...`,
|
||
});
|
||
|
||
await Promise.all(
|
||
parallelReads.map(async (tc: any) => {
|
||
let result;
|
||
try {
|
||
result = await executeTool(tc.name, tc.args, ctx);
|
||
} catch (err) {
|
||
result = {
|
||
error: err instanceof Error ? err.message : String(err),
|
||
};
|
||
}
|
||
const resultStr =
|
||
typeof result === "string"
|
||
? result
|
||
: JSON.stringify(result, null, 2);
|
||
history.push({
|
||
role: "tool",
|
||
content: resultStr,
|
||
toolCallId: tc.id,
|
||
toolName: tc.name,
|
||
});
|
||
}),
|
||
);
|
||
}
|
||
|
||
// Stage 2: Parallelizable Other Tools
|
||
if (otherTools.length > 0) {
|
||
await Promise.all(
|
||
otherTools.map(async (tc: any) => {
|
||
await emit({
|
||
ts: now(),
|
||
type: "step",
|
||
text: `Running ${tc.name}...`,
|
||
});
|
||
let result;
|
||
try {
|
||
result = await executeTool(tc.name, tc.args, ctx);
|
||
} catch (err) {
|
||
result = {
|
||
error: err instanceof Error ? err.message : String(err),
|
||
};
|
||
}
|
||
const resultStr =
|
||
typeof result === "string"
|
||
? result
|
||
: JSON.stringify(result, null, 2);
|
||
history.push({
|
||
role: "tool",
|
||
content: resultStr,
|
||
toolCallId: tc.id,
|
||
toolName: tc.name,
|
||
});
|
||
}),
|
||
);
|
||
}
|
||
|
||
// Stage 3: Sequential User-Safe Writes/Edits
|
||
if (sequentialWrites.length > 0) {
|
||
for (const tc of sequentialWrites) {
|
||
await emit({
|
||
ts: now(),
|
||
type: "step",
|
||
text: `Writing modifications: ${tc.name}...`,
|
||
});
|
||
let result;
|
||
try {
|
||
result = await executeTool(tc.name, tc.args, ctx);
|
||
const changedFile = extractChangedFile(
|
||
tc.name,
|
||
tc.args,
|
||
ctx.workspaceRoot,
|
||
opts.appPath,
|
||
);
|
||
if (changedFile) {
|
||
await patchSession(opts, { changedFile });
|
||
}
|
||
} catch (err) {
|
||
result = { error: err instanceof Error ? err.message : String(err) };
|
||
}
|
||
const resultStr =
|
||
typeof result === "string" ? result : JSON.stringify(result, null, 2);
|
||
history.push({
|
||
role: "tool",
|
||
content: resultStr,
|
||
toolCallId: tc.id,
|
||
toolName: tc.name,
|
||
});
|
||
}
|
||
}
|
||
}
|
||
|
||
if (loopBreakReason) {
|
||
await emit({
|
||
ts: now(),
|
||
type: "error",
|
||
text: `Loop broken: ${loopBreakReason}`,
|
||
});
|
||
await patchSession(opts, { status: "failed", error: loopBreakReason });
|
||
} else {
|
||
await patchSession(opts, { status: "failed", error: "Max turns reached" });
|
||
}
|
||
}
|