diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..c646f38 --- /dev/null +++ b/.env.example @@ -0,0 +1,23 @@ +# Copy to Coolify environment variables (or .env.local for dev). Do not commit secrets. + +# --- Postgres (Coolify internal service DNS, same stack as this app) --- +# Example: postgresql://USER:PASS@:5432/vibn +DATABASE_URL= +POSTGRES_URL= + +# --- Public URL of this Next app (OAuth callbacks, runner callbacks) --- +NEXTAUTH_URL=https://vibnai.com +NEXTAUTH_SECRET= + +# --- vibn-agent-runner (same Docker network: http://:3333 — or public https://agents.vibnai.com) --- +AGENT_RUNNER_URL=http://localhost:3333 + +# --- Shared secret: must match runner. Required for PATCH session + POST /events ingest --- +AGENT_RUNNER_SECRET= + +# --- Optional: one-shot DDL via POST /api/admin/migrate --- +# ADMIN_MIGRATE_SECRET= + +# --- Google OAuth / Gemini (see .google.env locally) --- +GOOGLE_CLIENT_ID= +GOOGLE_CLIENT_SECRET= diff --git a/.gitignore b/.gitignore index 5ef6a52..7b8da95 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,7 @@ yarn-error.log* # env files (can opt-in for committing if needed) .env* +!.env.example # vercel .vercel diff --git a/app/[workspace]/project/[projectId]/build/page.tsx b/app/[workspace]/project/[projectId]/build/page.tsx index 26c7368..7055fdd 100644 --- a/app/[workspace]/project/[projectId]/build/page.tsx +++ b/app/[workspace]/project/[projectId]/build/page.tsx @@ -236,6 +236,33 @@ const STATUS_LABELS: Record = { }; const FILE_STATUS_COLORS: Record = { added: "#2e7d32", modified: "#d4a04a", deleted: "#c62828" }; +/** Maps persisted / SSE agent event row to terminal line (output.* mirrors legacy outputLine types). 
*/ +interface AgentLogLine { + seq?: number; + ts: string; + type: string; + text: string; +} + +interface StreamEventRow { + seq: number; + ts: string; + type: string; + payload?: Record; +} + +function streamRowToLine(e: StreamEventRow): AgentLogLine { + const text = + typeof e.payload?.text === "string" + ? e.payload.text + : ""; + if (e.type.startsWith("output.")) { + const sub = e.type.slice("output.".length); + return { seq: e.seq, ts: e.ts, type: sub, text: text || e.type }; + } + return { seq: e.seq, ts: e.ts, type: "info", text: text || e.type }; +} + function elapsed(start: string | null): string { if (!start) return ""; const s = Math.floor((Date.now() - new Date(start).getTime()) / 1000); @@ -249,6 +276,9 @@ function AgentMode({ projectId, appName, appPath }: { projectId: string; appName const [sessions, setSessions] = useState([]); const [activeSessionId, setActiveSessionId] = useState(null); const [activeSession, setActiveSession] = useState(null); + const [eventLog, setEventLog] = useState([]); + const maxStreamSeqRef = useRef(0); + const pollOutputRef = useRef([]); const [submitting, setSubmitting] = useState(false); const [loadingSessions, setLoadingSessions] = useState(true); const [approving, setApproving] = useState(false); @@ -262,6 +292,95 @@ function AgentMode({ projectId, appName, appPath }: { projectId: string; appName if (el) el.scrollTop = el.scrollHeight; }, []); + useEffect(() => { + pollOutputRef.current = activeSession?.output ?? 
[]; + }, [activeSession?.output]); + + // Load historical events + live SSE tail (replaces tight polling for log lines when events exist) + useEffect(() => { + if (!activeSessionId || !appName) return; + + let cancelled = false; + let es: EventSource | null = null; + + (async () => { + setEventLog([]); + maxStreamSeqRef.current = 0; + + try { + const evRes = await fetch( + `/api/projects/${projectId}/agent/sessions/${activeSessionId}/events?afterSeq=0` + ); + if (evRes.ok) { + const d = (await evRes.json()) as { events?: StreamEventRow[]; maxSeq?: number }; + if (!cancelled) { + const mapped = (d.events ?? []).map(streamRowToLine); + setEventLog(mapped); + maxStreamSeqRef.current = typeof d.maxSeq === "number" ? d.maxSeq : 0; + } + } + } catch { + /* migration not applied or network */ + } + + if (cancelled) return; + + let status = ""; + try { + const sRes = await fetch(`/api/projects/${projectId}/agent/sessions/${activeSessionId}`); + const sJson = (await sRes.json()) as { session?: { status: string } }; + status = sJson.session?.status ?? 
""; + } catch { + return; + } + + if (status !== "running" && status !== "pending") return; + + const url = `/api/projects/${projectId}/agent/sessions/${activeSessionId}/events/stream?afterSeq=${maxStreamSeqRef.current}`; + es = new EventSource(url); + + es.onmessage = (msg: MessageEvent) => { + try { + const data = JSON.parse(msg.data) as Record; + if (data.type === "_heartbeat") return; + if (data.type === "_stream.end" || data.type === "_stream.error") { + es?.close(); + return; + } + if (typeof data.seq !== "number") return; + + maxStreamSeqRef.current = data.seq as number; + const line = streamRowToLine(data as StreamEventRow); + + setEventLog((prev) => { + if (prev.some((p) => p.seq === line.seq)) return prev; + if (prev.length === 0 && pollOutputRef.current.length > 0) { + const seed = pollOutputRef.current.map((l, i) => ({ + ts: l.ts, + type: l.type, + text: l.text, + seq: -(i + 1), + })); + return [...seed, line]; + } + return [...prev, line]; + }); + } catch { + /* ignore malformed */ + } + }; + + es.onerror = () => { + es?.close(); + }; + })(); + + return () => { + cancelled = true; + es?.close(); + }; + }, [activeSessionId, projectId, appName]); + // Load session list — auto-select the most recent active or last session useEffect(() => { if (!appName) return; @@ -447,22 +566,26 @@ function AgentMode({ projectId, appName, appPath }: { projectId: string; appName )} - {/* Output stream */} + {/* Output stream — prefer persisted event timeline + SSE when available */}
- {activeSession.output.length === 0 && ( - Starting agent… - )} - {activeSession.output.map((line, i) => { - const color = line.type === "error" ? "#f87171" : line.type === "stderr" ? "#fb923c" - : line.type === "info" ? "#60a5fa" : line.type === "step" ? "#a78bfa" : "#d4d4d4"; - const prefix = line.type === "step" ? "▶ " : line.type === "error" ? "✗ " - : line.type === "info" ? "→ " : " "; - return ( -
<div key={i} style={{ color }}> - {prefix}{line.text} - </div>
- ); - })} + {(() => { + const displayLines: AgentLogLine[] = + eventLog.length > 0 ? eventLog : activeSession.output.map((l) => ({ ts: l.ts, type: l.type, text: l.text })); + if (displayLines.length === 0) { + return Starting agent…; + } + return displayLines.map((line, i) => { + const color = line.type === "error" ? "#f87171" : line.type === "stderr" ? "#fb923c" + : line.type === "info" ? "#60a5fa" : line.type === "step" ? "#a78bfa" : line.type === "done" ? "#86efac" : "#d4d4d4"; + const prefix = line.type === "step" ? "▶ " : line.type === "error" ? "✗ " + : line.type === "info" ? "→ " : line.type === "done" ? "✓ " : " "; + return ( +
<div key={i} style={{ color }}> + {prefix}{line.text} + </div>
+ ); + }); + })()} {["running", "pending"].includes(activeSession.status) && ( )} diff --git a/app/api/admin/migrate/route.ts b/app/api/admin/migrate/route.ts index f9578bd..9d3e8a1 100644 --- a/app/api/admin/migrate/route.ts +++ b/app/api/admin/migrate/route.ts @@ -90,6 +90,20 @@ export async function POST(req: NextRequest) { `CREATE INDEX IF NOT EXISTS agent_sessions_project_idx ON agent_sessions (project_id, created_at DESC)`, `CREATE INDEX IF NOT EXISTS agent_sessions_status_idx ON agent_sessions (status)`, + `CREATE TABLE IF NOT EXISTS agent_session_events ( + id BIGSERIAL PRIMARY KEY, + session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE, + project_id TEXT NOT NULL, + seq INT NOT NULL, + ts TIMESTAMPTZ NOT NULL, + type TEXT NOT NULL, + payload JSONB NOT NULL DEFAULT '{}'::jsonb, + client_event_id UUID UNIQUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE(session_id, seq) + )`, + `CREATE INDEX IF NOT EXISTS agent_session_events_session_seq_idx ON agent_session_events (session_id, seq)`, + // NextAuth / Prisma tables `CREATE TABLE IF NOT EXISTS users ( id TEXT PRIMARY KEY, diff --git a/app/api/projects/[projectId]/agent/sessions/[sessionId]/events/route.ts b/app/api/projects/[projectId]/agent/sessions/[sessionId]/events/route.ts new file mode 100644 index 0000000..1125d03 --- /dev/null +++ b/app/api/projects/[projectId]/agent/sessions/[sessionId]/events/route.ts @@ -0,0 +1,142 @@ +/** + * GET /api/projects/[projectId]/agent/sessions/[sessionId]/events?afterSeq=0 + * List persisted agent events for replay (user session auth). + * + * POST /api/projects/[projectId]/agent/sessions/[sessionId]/events + * Batch append from vibn-agent-runner (x-agent-runner-secret). 
*/ +import { NextResponse } from "next/server"; +import { getServerSession } from "next-auth"; +import { authOptions } from "@/lib/auth/authOptions"; +import { query, getPool } from "@/lib/db-postgres"; + +export interface AgentSessionEventRow { + seq: number; + ts: string; + type: string; + payload: Record<string, unknown>; +} + +export async function GET( + req: Request, + { params }: { params: Promise<{ projectId: string; sessionId: string }> } +) { + try { + const { projectId, sessionId } = await params; + const session = await getServerSession(authOptions); + if (!session?.user?.email) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + const afterSeq = Math.max(0, parseInt(new URL(req.url).searchParams.get("afterSeq") ?? "0", 10) || 0); + + const rows = await query<AgentSessionEventRow>( + `SELECT e.seq, e.ts::text, e.type, e.payload + FROM agent_session_events e + JOIN agent_sessions s ON s.id = e.session_id + JOIN fs_projects p ON p.id::text = s.project_id::text + JOIN fs_users u ON u.id = p.user_id + WHERE e.session_id = $1::uuid AND s.project_id::text = $2 AND u.data->>'email' = $3 + AND e.seq > $4 + ORDER BY e.seq ASC + LIMIT 2000`, + [sessionId, projectId, session.user.email, afterSeq] + ); + + const maxSeq = rows.length ? rows[rows.length - 1].seq : afterSeq; + + return NextResponse.json({ events: rows, maxSeq }); + } catch (err) { + console.error("[agent/sessions/.../events GET]", err); + return NextResponse.json({ error: "Failed to list events" }, { status: 500 }); + } +} + +type IngestBody = { + events: Array<{ + clientEventId: string; + ts: string; + type: string; + payload?: Record<string, unknown>; + }>; +}; + +export async function POST( + req: Request, + { params }: { params: Promise<{ projectId: string; sessionId: string }> } +) { + const secret = process.env.AGENT_RUNNER_SECRET ?? ""; + const incomingSecret = req.headers.get("x-agent-runner-secret") ??
""; + if (secret && incomingSecret !== secret) { + return NextResponse.json({ error: "Forbidden" }, { status: 403 }); + } + + const { projectId, sessionId } = await params; + + let body: IngestBody; + try { + body = (await req.json()) as IngestBody; + } catch { + return NextResponse.json({ error: "Invalid JSON" }, { status: 400 }); + } + + if (!body.events?.length) { + return NextResponse.json({ ok: true, inserted: 0 }); + } + + const pool = getPool(); + const client = await pool.connect(); + + try { + const exists = await client.query<{ n: string }>( + `SELECT 1 AS n FROM agent_sessions WHERE id = $1::uuid AND project_id::text = $2 LIMIT 1`, + [sessionId, projectId] + ); + if (exists.rowCount === 0) { + return NextResponse.json({ error: "Session not found" }, { status: 404 }); + } + + await client.query("BEGIN"); + await client.query("SELECT pg_advisory_xact_lock(hashtext($1::text))", [sessionId]); + + // Read MAX(seq) once per batch — the advisory lock already serialises writers, + // so a per-event MAX query (N+1) is unnecessary. Duplicate clientEventIds may + // leave seq gaps, which "seq > afterSeq" readers tolerate. + const maxRes = await client.query<{ m: string }>( + `SELECT COALESCE(MAX(seq), 0)::text AS m FROM agent_session_events WHERE session_id = $1::uuid`, + [sessionId] + ); + let seqCursor = Number(maxRes.rows[0].m); + + let inserted = 0; + for (const ev of body.events) { + if (!ev.clientEventId || !ev.type || !ev.ts) continue; + + const nextSeq = ++seqCursor; + + const ins = await client.query( + `INSERT INTO agent_session_events (session_id, project_id, seq, ts, type, payload, client_event_id) + VALUES ($1::uuid, $2, $3, $4::timestamptz, $5, $6::jsonb, $7::uuid) + ON CONFLICT (client_event_id) DO NOTHING`, + [ + sessionId, + projectId, + nextSeq, + ev.ts, + ev.type, + JSON.stringify(ev.payload ??
{}), + ev.clientEventId, + ] + ); + if (ins.rowCount) inserted += ins.rowCount; + } + + await client.query("COMMIT"); + return NextResponse.json({ ok: true, inserted }); + } catch (err) { + try { + await client.query("ROLLBACK"); + } catch { + /* ignore */ + } + console.error("[agent/sessions/.../events POST]", err); + return NextResponse.json({ error: "Failed to ingest events" }, { status: 500 }); + } finally { + client.release(); + } +} diff --git a/app/api/projects/[projectId]/agent/sessions/[sessionId]/events/stream/route.ts b/app/api/projects/[projectId]/agent/sessions/[sessionId]/events/stream/route.ts new file mode 100644 index 0000000..1c2171b --- /dev/null +++ b/app/api/projects/[projectId]/agent/sessions/[sessionId]/events/stream/route.ts @@ -0,0 +1,122 @@ +/** + * GET /api/projects/.../agent/sessions/.../events/stream?afterSeq=0 + * Server-Sent Events: tail agent_session_events while the session is active. + */ +import { getServerSession } from "next-auth"; +import { authOptions } from "@/lib/auth/authOptions"; +import { query, queryOne } from "@/lib/db-postgres"; + +export const dynamic = "force-dynamic"; +export const runtime = "nodejs"; +/** Long-lived SSE — raise if your host defaults to a shorter limit (e.g. Vercel). */ +export const maxDuration = 300; + +const TERMINAL = new Set(["done", "approved", "failed", "stopped"]); + +export async function GET( + req: Request, + { params }: { params: Promise<{ projectId: string; sessionId: string }> } +) { + const session = await getServerSession(authOptions); + if (!session?.user?.email) { + return new Response("Unauthorized", { status: 401 }); + } + + const { projectId, sessionId } = await params; + let afterSeq = Math.max(0, parseInt(new URL(req.url).searchParams.get("afterSeq") ?? 
"0", 10) || 0); + + const allowed = await queryOne<{ n: string }>( + `SELECT 1 AS n FROM agent_sessions s + JOIN fs_projects p ON p.id::text = s.project_id::text + JOIN fs_users u ON u.id = p.user_id + WHERE s.id = $1::uuid AND s.project_id::text = $2 AND u.data->>'email' = $3 + LIMIT 1`, + [sessionId, projectId, session.user.email] + ); + if (!allowed) { + return new Response("Not found", { status: 404 }); + } + + const encoder = new TextEncoder(); + const signal = req.signal; + + const stream = new ReadableStream({ + async start(controller) { + const send = (obj: object) => { + controller.enqueue(encoder.encode(`data: ${JSON.stringify(obj)}\n\n`)); + }; + + let idleAfterTerminal = 0; + let lastHeartbeat = Date.now(); + + try { + while (!signal.aborted) { + const rows = await query<{ seq: number; ts: string; type: string; payload: Record<string, unknown> }>( + `SELECT e.seq, e.ts::text, e.type, e.payload + FROM agent_session_events e + WHERE e.session_id = $1::uuid AND e.seq > $2 + ORDER BY e.seq ASC + LIMIT 200`, + [sessionId, afterSeq] + ); + + for (const row of rows) { + afterSeq = row.seq; + send({ seq: row.seq, ts: row.ts, type: row.type, payload: row.payload }); + } + + const st = await queryOne<{ status: string }>( + `SELECT status FROM agent_sessions WHERE id = $1::uuid LIMIT 1`, + [sessionId] + ); + const status = st?.status ??
""; + const terminal = TERMINAL.has(status); + + if (rows.length === 0) { + if (terminal) { + idleAfterTerminal++; + if (idleAfterTerminal >= 3) { + send({ type: "_stream.end", seq: afterSeq }); + break; + } + } else { + idleAfterTerminal = 0; + } + } else { + idleAfterTerminal = 0; + } + + const now = Date.now(); + if (now - lastHeartbeat > 20000) { + send({ type: "_heartbeat", t: now }); + lastHeartbeat = now; + } + + await new Promise((r) => setTimeout(r, 750)); + } + } catch (e) { + console.error("[events/stream]", e); + try { + send({ type: "_stream.error", message: "stream failed" }); + } catch { + /* ignore */ + } + } finally { + try { + controller.close(); + } catch { + /* ignore */ + } + } + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream; charset=utf-8", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + "X-Accel-Buffering": "no", + }, + }); +} diff --git a/app/api/projects/[projectId]/agent/sessions/[sessionId]/retry/route.ts b/app/api/projects/[projectId]/agent/sessions/[sessionId]/retry/route.ts index 5cf4941..539ba19 100644 --- a/app/api/projects/[projectId]/agent/sessions/[sessionId]/retry/route.ts +++ b/app/api/projects/[projectId]/agent/sessions/[sessionId]/retry/route.ts @@ -66,6 +66,13 @@ export async function POST( ); const giteaRepo = proj[0]?.data?.giteaRepo as string | undefined; + // Clear persisted event timeline so SSE / replay matches the new run (no-op if table missing) + try { + await query(`DELETE FROM agent_session_events WHERE session_id = $1::uuid`, [sessionId]); + } catch { + /* table may not exist until admin migrate */ + } + // Reset the session row so the frontend shows it as running again await query( `UPDATE agent_sessions diff --git a/entrypoint.sh b/entrypoint.sh index 54eecb5..a61c178 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -53,6 +53,37 @@ pool.query(\` messages JSONB NOT NULL DEFAULT '[]', updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); + 
CREATE TABLE IF NOT EXISTS agent_sessions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + project_id TEXT NOT NULL, + app_name TEXT NOT NULL, + app_path TEXT NOT NULL, + task TEXT NOT NULL, + plan JSONB, + status TEXT NOT NULL DEFAULT 'pending', + output JSONB NOT NULL DEFAULT '[]'::jsonb, + changed_files JSONB NOT NULL DEFAULT '[]'::jsonb, + error TEXT, + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() + ); + CREATE INDEX IF NOT EXISTS agent_sessions_project_idx ON agent_sessions (project_id, created_at DESC); + CREATE INDEX IF NOT EXISTS agent_sessions_status_idx ON agent_sessions (status); + CREATE TABLE IF NOT EXISTS agent_session_events ( + id BIGSERIAL PRIMARY KEY, + session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE, + project_id TEXT NOT NULL, + seq INT NOT NULL, + ts TIMESTAMPTZ NOT NULL, + type TEXT NOT NULL, + payload JSONB NOT NULL DEFAULT '{}'::jsonb, + client_event_id UUID UNIQUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE(session_id, seq) + ); + CREATE INDEX IF NOT EXISTS agent_session_events_session_seq_idx ON agent_session_events (session_id, seq); \`).then(() => { console.log('App tables ready'); pool.end(); }).catch(e => { console.error('Table init error:', e.message); pool.end(); }); " diff --git a/scripts/migrate-fs-tables.sql b/scripts/migrate-fs-tables.sql index e69a2a1..7e01bb0 100644 --- a/scripts/migrate-fs-tables.sql +++ b/scripts/migrate-fs-tables.sql @@ -93,6 +93,25 @@ CREATE INDEX IF NOT EXISTS agent_sessions_project_idx CREATE INDEX IF NOT EXISTS agent_sessions_status_idx ON agent_sessions (status); +-- --------------------------------------------------------------------------- +-- agent_session_events (append-only timeline for SSE + replay) +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS agent_session_events ( + id BIGSERIAL PRIMARY 
KEY, + session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE, + project_id TEXT NOT NULL, + seq INT NOT NULL, + ts TIMESTAMPTZ NOT NULL, + type TEXT NOT NULL, + payload JSONB NOT NULL DEFAULT '{}'::jsonb, + client_event_id UUID UNIQUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE(session_id, seq) +); + +CREATE INDEX IF NOT EXISTS agent_session_events_session_seq_idx + ON agent_session_events (session_id, seq); + -- --------------------------------------------------------------------------- -- NextAuth / Prisma tables (required by PrismaAdapter + strategy:"database") -- Only created if not already present from a prisma migrate run.