feat: implement Cloud Git Worktree Pool in agent-runner to isolate parallel sessions
This commit is contained in:
@@ -24,41 +24,100 @@ const PORT = process.env.PORT || 3333;
|
||||
// Build ToolContext from environment variables
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function ensureWorkspace(repo?: string): string {
|
||||
function ensureWorkspace(repo?: string, sessionId?: string): string {
|
||||
const base = process.env.WORKSPACE_BASE || "/workspaces";
|
||||
if (!repo) {
|
||||
const dir = path.join(base, "default");
|
||||
fs.mkdirSync(dir, { recursive: true });
|
||||
return dir;
|
||||
}
|
||||
const dir = path.join(base, repo.replace("/", "_"));
|
||||
const mainRepoDir = path.join(base, repo.replace("/", "_"));
|
||||
const gitea = {
|
||||
apiUrl: process.env.GITEA_API_URL || "",
|
||||
apiToken: process.env.GITEA_API_TOKEN || "",
|
||||
username: process.env.GITEA_USERNAME || "",
|
||||
};
|
||||
if (!fs.existsSync(path.join(dir, ".git"))) {
|
||||
fs.mkdirSync(dir, { recursive: true });
|
||||
|
||||
// 1. Ensure main repo clone exists
|
||||
if (!fs.existsSync(path.join(mainRepoDir, ".git"))) {
|
||||
fs.mkdirSync(mainRepoDir, { recursive: true });
|
||||
const authedUrl = `${gitea.apiUrl}/${repo}.git`.replace(
|
||||
"https://",
|
||||
`https://${gitea.username}:${gitea.apiToken}@`,
|
||||
);
|
||||
try {
|
||||
execSync(`git clone "${authedUrl}" "${dir}"`, { stdio: "pipe" });
|
||||
execSync(`git clone "${authedUrl}" "${mainRepoDir}"`, { stdio: "pipe" });
|
||||
} catch {
|
||||
// Repo may not exist yet — just init
|
||||
execSync(`git init`, { cwd: dir, stdio: "pipe" });
|
||||
execSync(`git init`, { cwd: mainRepoDir, stdio: "pipe" });
|
||||
execSync(`git remote add origin "${authedUrl}"`, {
|
||||
cwd: dir,
|
||||
cwd: mainRepoDir,
|
||||
stdio: "pipe",
|
||||
});
|
||||
}
|
||||
}
|
||||
return dir;
|
||||
|
||||
// 2. If no sessionId, fall back to main repo clone directly
|
||||
if (!sessionId) {
|
||||
return mainRepoDir;
|
||||
}
|
||||
|
||||
// 3. Isolated Worktree Directory per task session
|
||||
const taskWorktreePath = path.join(base, "tasks", sessionId);
|
||||
fs.mkdirSync(path.join(base, "tasks"), { recursive: true });
|
||||
|
||||
// 4. Create isolated worktree if not yet active
|
||||
if (!fs.existsSync(path.join(taskWorktreePath, ".git"))) {
|
||||
// Clean up any stale directory from previous failed runs before adding worktree
|
||||
if (fs.existsSync(taskWorktreePath)) {
|
||||
try {
|
||||
fs.rmSync(taskWorktreePath, { recursive: true, force: true });
|
||||
} catch {}
|
||||
}
|
||||
|
||||
try {
|
||||
console.log(
|
||||
`[worktree] Adding isolated git worktree for session ${sessionId} at ${taskWorktreePath}...`,
|
||||
);
|
||||
|
||||
// Check if the branch task-sessionId already exists in the main repository
|
||||
let branchExists = false;
|
||||
try {
|
||||
const branches = execSync(`git branch --list "task-${sessionId}"`, {
|
||||
cwd: mainRepoDir,
|
||||
}).toString();
|
||||
branchExists = branches.trim().length > 0;
|
||||
} catch {
|
||||
branchExists = false;
|
||||
}
|
||||
|
||||
if (branchExists) {
|
||||
// Checkout the existing branch into the new worktree path
|
||||
execSync(
|
||||
`git worktree add -f "${taskWorktreePath}" "task-${sessionId}"`,
|
||||
{ cwd: mainRepoDir, stdio: "pipe" },
|
||||
);
|
||||
} else {
|
||||
// Create and checkout a new isolated branch
|
||||
execSync(
|
||||
`git worktree add -f -b "task-${sessionId}" "${taskWorktreePath}"`,
|
||||
{ cwd: mainRepoDir, stdio: "pipe" },
|
||||
);
|
||||
}
|
||||
} catch (e: any) {
|
||||
console.error(
|
||||
"[worktree] Failed to add git worktree, falling back to main clone:",
|
||||
e.message || String(e),
|
||||
);
|
||||
return mainRepoDir;
|
||||
}
|
||||
}
|
||||
|
||||
return taskWorktreePath;
|
||||
}
|
||||
|
||||
function buildContext(repo?: string): ToolContext {
|
||||
const workspaceRoot = ensureWorkspace(repo);
|
||||
function buildContext(repo?: string, sessionId?: string): ToolContext {
|
||||
const workspaceRoot = ensureWorkspace(repo, sessionId);
|
||||
|
||||
return {
|
||||
workspaceRoot,
|
||||
@@ -77,6 +136,39 @@ function buildContext(repo?: string): ToolContext {
|
||||
};
|
||||
}
|
||||
|
||||
function cleanupWorkspace(repo: string, sessionId: string) {
|
||||
const base = process.env.WORKSPACE_BASE || "/workspaces";
|
||||
const mainRepoDir = path.join(base, repo.replace("/", "_"));
|
||||
const taskWorktreePath = path.join(base, "tasks", sessionId);
|
||||
|
||||
if (fs.existsSync(taskWorktreePath)) {
|
||||
try {
|
||||
console.log(
|
||||
`[worktree] Pruning and removing git worktree for session ${sessionId}...`,
|
||||
);
|
||||
// 1. Tell git to remove the worktree references
|
||||
execSync(`git worktree remove --force "${taskWorktreePath}"`, {
|
||||
cwd: mainRepoDir,
|
||||
stdio: "pipe",
|
||||
});
|
||||
// 2. Delete the temporary branch from the main repository index
|
||||
execSync(`git branch -D "task-${sessionId}"`, {
|
||||
cwd: mainRepoDir,
|
||||
stdio: "pipe",
|
||||
});
|
||||
// 3. Force clean directory
|
||||
if (fs.existsSync(taskWorktreePath)) {
|
||||
fs.rmSync(taskWorktreePath, { recursive: true, force: true });
|
||||
}
|
||||
} catch (e: any) {
|
||||
console.warn(
|
||||
`[worktree] Non-fatal cleanup error for session ${sessionId}:`,
|
||||
e.message || String(e),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Routes
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -244,7 +336,7 @@ app.post("/agent/execute", async (req: Request, res: Response) => {
|
||||
// Build workspace context — clone/update the Gitea repo if provided
|
||||
let ctx: ReturnType<typeof buildContext>;
|
||||
try {
|
||||
ctx = buildContext(giteaRepo);
|
||||
ctx = buildContext(giteaRepo, sessionId);
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
console.error("[agent/execute] buildContext failed:", msg);
|
||||
@@ -332,6 +424,9 @@ app.post("/agent/execute", async (req: Request, res: Response) => {
|
||||
})
|
||||
.finally(() => {
|
||||
activeSessions.delete(sessionId);
|
||||
if (giteaRepo && sessionId) {
|
||||
cleanupWorkspace(giteaRepo, sessionId);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user