This repository has been archived on 2026-06-07. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
master-ai/vibn-agent-runner/src/agent-session-runner.ts

530 lines
15 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* agent-session-runner.ts
*
* Upgraded Cloud Agent Executor for VibnCode.
* Implements 4-level Smart Concurrency (parallel reads/lookups) and the
* Ralph Loop (autonomous self-correction) entirely inside your secure Cloud VM.
*/
import { execSync } from "child_process";
import { callVibnChat } from "./llm/vibn-chat-model";
import { ChatMessage } from "./llm/gemini-chat";
import { AgentConfig } from "./agents";
import { executeTool, ToolContext } from "./tools";
import { resolvePrompt } from "./prompts/loader";
const MAX_TURNS = 45;
export interface OutputLine {
ts: string;
type: "step" | "stdout" | "stderr" | "info" | "error" | "done";
text: string;
}
export interface SessionRunOptions {
sessionId: string;
projectId: string;
vibnApiUrl: string; // e.g. https://vibnai.com
appPath: string; // relative path within repo, e.g. "apps/admin"
repoRoot?: string; // absolute path to the git repo root (for auto-commit)
isStopped: () => boolean;
autoApprove?: boolean;
giteaRepo?: string; // e.g. "mark/sportsy"
coolifyAppUuid?: string;
coolifyApiUrl?: string;
coolifyApiToken?: string;
}
// ── VIBN DB bridge ────────────────────────────────────────────────────────────
async function patchSession(
opts: SessionRunOptions,
payload: {
status?: string;
outputLine?: OutputLine;
changedFile?: { path: string; status: string };
error?: string;
},
): Promise<void> {
const url = `${opts.vibnApiUrl}/api/projects/${opts.projectId}/agent/sessions/${opts.sessionId}`;
try {
await fetch(url, {
method: "PATCH",
headers: {
"Content-Type": "application/json",
"x-agent-runner-secret": process.env.AGENT_RUNNER_SECRET ?? "",
},
body: JSON.stringify(payload),
});
} catch (err) {
console.warn(
"[session-runner] PATCH failed:",
err instanceof Error ? err.message : err,
);
}
}
function now(): string {
return new Date().toISOString();
}
// ── File change tracking ──────────────────────────────────────────────────────
const FILE_WRITE_TOOLS = new Set([
"write_file",
"replace_in_file",
"create_file",
"fs_write",
"fs_edit",
]);
function extractChangedFile(
toolName: string,
args: Record<string, unknown>,
workspaceRoot: string,
appPath: string,
): { path: string; status: string } | null {
if (!FILE_WRITE_TOOLS.has(toolName)) return null;
const rawPath = String(args.path ?? args.file_path ?? "");
if (!rawPath) return null;
// Make path relative to appPath for display
const fullPrefix = `${workspaceRoot}/${appPath}/`;
const appPrefix = `${appPath}/`;
let displayPath = rawPath.replace(fullPrefix, "").replace(appPrefix, "");
const fileStatus =
toolName === "write_file" || toolName === "fs_write" ? "added" : "modified";
return { path: displayPath, status: fileStatus };
}
// ── Auto-commit helper ────────────────────────────────────────────────────────
async function autoCommitAndDeploy(
opts: SessionRunOptions,
task: string,
emit: (line: OutputLine) => Promise<void>,
): Promise<void> {
const repoRoot = opts.repoRoot;
if (!repoRoot || !opts.giteaRepo) {
await emit({
ts: now(),
type: "info",
text: "Auto-approve skipped — no repo root available.",
});
return;
}
const gitOpts = { cwd: repoRoot, stdio: "pipe" as const };
const giteaApiUrl = process.env.GITEA_API_URL || "";
const giteaUsername = process.env.GITEA_USERNAME || "agent";
const giteaToken = process.env.GITEA_API_TOKEN || "";
try {
try {
execSync('git config user.email "agent@vibnai.com"', gitOpts);
execSync('git config user.name "VIBN Agent"', gitOpts);
} catch {
/* already set */
}
execSync("git add -A", gitOpts);
const status = execSync("git status --porcelain", gitOpts)
.toString()
.trim();
if (!status) {
await emit({
ts: now(),
type: "info",
text: "✓ No file changes to commit.",
});
await patchSession(opts, { status: "approved" });
return;
}
const commitMsg = `agent: ${task.slice(0, 72)}`;
const msgFile = require("path").join(
opts.repoRoot || process.cwd(),
".git",
"COMMIT_EDITMSG",
);
require("fs").writeFileSync(msgFile, commitMsg, "utf8");
execSync("git commit -F .git/COMMIT_EDITMSG", gitOpts);
try {
require("fs").unlinkSync(msgFile);
} catch {}
await emit({
ts: now(),
type: "info",
text: `✓ Committed: "${commitMsg}"`,
});
const authedUrl = `${giteaApiUrl}/${opts.giteaRepo}.git`.replace(
"https://",
`https://${giteaUsername}:${giteaToken}@`,
);
execSync(`git push "${authedUrl}" HEAD:main`, gitOpts);
await emit({ ts: now(), type: "info", text: "✓ Pushed to Gitea." });
// Optional Coolify deploy
let deployed = false;
if (opts.coolifyApiUrl && opts.coolifyApiToken && opts.coolifyAppUuid) {
try {
const deployRes = await fetch(
`${opts.coolifyApiUrl}/api/v1/applications/${opts.coolifyAppUuid}/start`,
{
method: "POST",
headers: { Authorization: `Bearer ${opts.coolifyApiToken}` },
},
);
deployed = deployRes.ok;
if (deployed)
await emit({
ts: now(),
type: "info",
text: "✓ Deployment triggered.",
});
} catch {
/* best-effort */
}
}
await patchSession(opts, {
status: "approved",
outputLine: {
ts: now(),
type: "done",
text: `✓ Auto-committed & ${deployed ? "deployed" : "pushed"}. No approval needed.`,
},
});
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await emit({
ts: now(),
type: "error",
text: `Auto-commit failed: ${msg}`,
});
// Fall back to done so user can manually approve
await patchSession(opts, { status: "done" });
}
}
// ── Main streaming execution loop ─────────────────────────────────────────────
export async function runSessionAgent(
config: AgentConfig,
task: string,
ctx: ToolContext,
opts: SessionRunOptions,
): Promise<void> {
const systemPrompt = resolvePrompt(config.promptId);
const emit = async (line: OutputLine) => {
console.log(`[session ${opts.sessionId}] ${line.type}: ${line.text}`);
await patchSession(opts, { outputLine: line });
};
await emit({
ts: now(),
type: "info",
text: `Agent starting working in ${opts.appPath}`,
});
// Scope the system prompt to the specific app within the monorepo
const basePrompt = resolvePrompt(config.promptId);
const scopedPrompt = `${basePrompt}
\n\n## Active context
You are working inside the monorepo directory: ${opts.appPath}
All file paths you use should be relative to this directory unless otherwise specified.
When running commands, always cd into ${opts.appPath} first unless already there.
Do NOT run git commit or git push — the platform handles committing after you finish.
`;
const history: ChatMessage[] = [{ role: "user", content: task }];
let turn = 0;
let toolCallsSinceText = 0;
let roundsSinceText = 0;
const toolFingerprints: string[] = [];
let loopBreakReason: string | null = null;
let ralphIteration = 0;
function fingerprintToolCall(tc: any) {
if (tc.name === "shell_exec") {
const cmd = String(tc.args?.command ?? "").trim();
const verb =
cmd
.split("&&")
.map((s) => s.trim())
.find((s) => !s.startsWith("cd "))
?.split(/\s+/)[0] ?? "shell";
return `shell_exec:${verb}`;
}
if (
tc.name === "fs_write" ||
tc.name === "fs_edit" ||
tc.name === "fs_read"
) {
return `${tc.name}:${tc.args?.path}`;
}
return `${tc.name}:${Object.values(tc.args ?? {})[0]}`;
}
while (turn < MAX_TURNS) {
if (opts.isStopped()) {
await emit({ ts: now(), type: "info", text: "Stopped by user." });
await patchSession(opts, { status: "stopped" });
return;
}
turn++;
const isSilent = roundsSinceText >= 15 || toolCallsSinceText >= 20;
const extraSystem = isSilent
? "\n\n[STATUS NUDGE] You have run " +
`${toolCallsSinceText} tool call(s) over ${roundsSinceText} round(s) ` +
"without sending the user any text. Before any more tool calls, " +
"send ONE short sentence describing what you are currently working " +
"on and why."
: "";
let resp: any;
try {
resp = await callVibnChat({
systemPrompt: scopedPrompt + extraSystem,
messages: history as any[],
tools: config.tools,
temperature: 0.2,
});
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await emit({ ts: now(), type: "error", text: `LLM error: ${msg}` });
await patchSession(opts, { status: "failed", error: msg });
return;
}
if (resp.error) {
await emit({
ts: now(),
type: "error",
text: `LLM error: ${resp.error}`,
});
await patchSession(opts, { status: "failed", error: resp.error });
return;
}
if (resp.text) {
await emit({ ts: now(), type: "info", text: resp.text });
roundsSinceText = 0;
toolCallsSinceText = 0;
} else if (resp.toolCalls.length) {
roundsSinceText++;
toolCallsSinceText += resp.toolCalls.length;
}
// ── Self-Correcting Ralph Loop Autonomy ──
if (!resp.toolCalls.length) {
const text = resp.text || "";
const incompleteSignals = [
"I need to",
"Let me",
"Next, I should",
"I should also",
"Additionally",
"I will now",
"I need first to",
];
const needsMoreWork = incompleteSignals.some((signal) =>
text.includes(signal),
);
if (needsMoreWork && ralphIteration < 3) {
ralphIteration++;
await emit({
ts: now(),
type: "info",
text: `🔄 [Ralph Loop] Self-reflection triggered (iteration ${ralphIteration}/3). Resuming execution...`,
});
history.push({
role: "user",
content:
"Please continue implementing the outstanding next steps to complete the task.",
});
continue;
}
// If fully complete, trigger auto-commit and finish
if (opts.autoApprove) {
await autoCommitAndDeploy(opts, task, emit);
} else {
await patchSession(opts, { status: "completed" });
}
return;
}
for (const tc of resp.toolCalls) {
toolFingerprints.push(fingerprintToolCall(tc));
}
const window = toolFingerprints.slice(-10);
const counts = new Map<string, number>();
for (const fp of window) counts.set(fp, (counts.get(fp) ?? 0) + 1);
let maxRepeats = 0;
let repeatedCmd = "";
for (const [fp, n] of counts.entries()) {
if (n > maxRepeats) {
maxRepeats = n;
repeatedCmd = fp.split("|")[0];
}
}
if (maxRepeats >= 6) {
loopBreakReason = `Repeated ${repeatedCmd} ${maxRepeats}× in last 10 calls`;
break;
}
history.push({
role: "assistant",
content: resp.text,
toolCalls: resp.toolCalls,
});
// ── 4-Level Smart Concurrency Tool Grouping ──
const parallelReads = resp.toolCalls.filter((tc: any) =>
[
"fs_read",
"fs_tree",
"fs_list",
"fs_glob",
"fs_grep",
"projects_list",
"project_recent_errors",
].includes(tc.name),
);
const sequentialWrites = resp.toolCalls.filter((tc: any) =>
[
"fs_write",
"fs_edit",
"create_file",
"write_file",
"replace_in_file",
"apps_create",
"databases_create",
].includes(tc.name),
);
const otherTools = resp.toolCalls.filter(
(tc: any) =>
!parallelReads.includes(tc) && !sequentialWrites.includes(tc),
);
// Stage 1: Parallel Reads
if (parallelReads.length > 0) {
await emit({
ts: now(),
type: "step",
text: `Executing ${parallelReads.length} read operations concurrently...`,
});
await Promise.all(
parallelReads.map(async (tc: any) => {
let result;
try {
result = await executeTool(tc.name, tc.args, ctx);
} catch (err) {
result = {
error: err instanceof Error ? err.message : String(err),
};
}
const resultStr =
typeof result === "string"
? result
: JSON.stringify(result, null, 2);
history.push({
role: "tool",
content: resultStr,
toolCallId: tc.id,
toolName: tc.name,
});
}),
);
}
// Stage 2: Parallelizable Other Tools
if (otherTools.length > 0) {
await Promise.all(
otherTools.map(async (tc: any) => {
await emit({
ts: now(),
type: "step",
text: `Running ${tc.name}...`,
});
let result;
try {
result = await executeTool(tc.name, tc.args, ctx);
} catch (err) {
result = {
error: err instanceof Error ? err.message : String(err),
};
}
const resultStr =
typeof result === "string"
? result
: JSON.stringify(result, null, 2);
history.push({
role: "tool",
content: resultStr,
toolCallId: tc.id,
toolName: tc.name,
});
}),
);
}
// Stage 3: Sequential User-Safe Writes/Edits
if (sequentialWrites.length > 0) {
for (const tc of sequentialWrites) {
await emit({
ts: now(),
type: "step",
text: `Writing modifications: ${tc.name}...`,
});
let result;
try {
result = await executeTool(tc.name, tc.args, ctx);
const changedFile = extractChangedFile(
tc.name,
tc.args,
ctx.workspaceRoot,
opts.appPath,
);
if (changedFile) {
await patchSession(opts, { changedFile });
}
} catch (err) {
result = { error: err instanceof Error ? err.message : String(err) };
}
const resultStr =
typeof result === "string" ? result : JSON.stringify(result, null, 2);
history.push({
role: "tool",
content: resultStr,
toolCallId: tc.id,
toolName: tc.name,
});
}
}
}
if (loopBreakReason) {
await emit({
ts: now(),
type: "error",
text: `Loop broken: ${loopBreakReason}`,
});
await patchSession(opts, { status: "failed", error: loopBreakReason });
} else {
await patchSession(opts, { status: "failed", error: "Max turns reached" });
}
}