feat(agent): event timeline API, SSE stream, Coolify DDL, env template

- Add agent_session_events table + GET/POST events + SSE stream routes
- Build Agent tab: hydrate from events + EventSource while running
- entrypoint: create agent_sessions + agent_session_events on container start
- .env.example for AGENT_RUNNER_URL, AGENT_RUNNER_SECRET, DATABASE_URL

Made-with: Cursor
This commit is contained in:
2026-04-01 11:48:55 -07:00
parent a11caafd22
commit 26429f3517
9 changed files with 497 additions and 15 deletions

23
.env.example Normal file
View File

@@ -0,0 +1,23 @@
# Copy to Coolify environment variables (or .env.local for dev). Do not commit secrets.
# --- Postgres (Coolify internal service DNS, same stack as this app) ---
# Example: postgresql://USER:PASS@<coolify-service-uuid>:5432/vibn
DATABASE_URL=
POSTGRES_URL=
# --- Public URL of this Next app (OAuth callbacks, runner callbacks) ---
NEXTAUTH_URL=https://vibnai.com
NEXTAUTH_SECRET=
# --- vibn-agent-runner (same Docker network: http://<service-name>:3333 — or public https://agents.vibnai.com) ---
AGENT_RUNNER_URL=http://localhost:3333
# --- Shared secret: must match runner. Required for PATCH session + POST /events ingest ---
AGENT_RUNNER_SECRET=
# --- Optional: one-shot DDL via POST /api/admin/migrate ---
# ADMIN_MIGRATE_SECRET=
# --- Google OAuth / Gemini (see .google.env locally) ---
GOOGLE_CLIENT_ID=
GOOGLE_CLIENT_SECRET=

1
.gitignore vendored
View File

@@ -32,6 +32,7 @@ yarn-error.log*
# env files (can opt-in for committing if needed) # env files (can opt-in for committing if needed)
.env* .env*
!.env.example
# vercel # vercel
.vercel .vercel

View File

@@ -236,6 +236,33 @@ const STATUS_LABELS: Record<string, string> = {
}; };
const FILE_STATUS_COLORS: Record<string, string> = { added: "#2e7d32", modified: "#d4a04a", deleted: "#c62828" }; const FILE_STATUS_COLORS: Record<string, string> = { added: "#2e7d32", modified: "#d4a04a", deleted: "#c62828" };
/** One rendered line in the Agent tab terminal view. */
interface AgentLogLine {
  seq?: number; // persisted sequence number; absent (or negative) for locally seeded lines
  ts: string;
  type: string; // drives color/prefix in the renderer ("stderr", "info", "step", ...)
  text: string;
}
/** Raw event row as persisted in agent_session_events / streamed over SSE. */
interface StreamEventRow {
  seq: number;
  ts: string;
  type: string;
  payload?: Record<string, unknown>;
}
/**
 * Maps a persisted / SSE agent event row to a terminal line.
 * Event types of the form "output.<sub>" mirror the legacy outputLine types
 * and keep "<sub>" as the line type; everything else renders as "info".
 * When the payload carries no text, the raw event type is shown instead.
 */
function streamRowToLine(e: StreamEventRow): AgentLogLine {
  const payloadText = typeof e.payload?.text === "string" ? e.payload.text : "";
  const OUTPUT_PREFIX = "output.";
  const lineType = e.type.startsWith(OUTPUT_PREFIX)
    ? e.type.slice(OUTPUT_PREFIX.length)
    : "info";
  return { seq: e.seq, ts: e.ts, type: lineType, text: payloadText || e.type };
}
function elapsed(start: string | null): string { function elapsed(start: string | null): string {
if (!start) return ""; if (!start) return "";
const s = Math.floor((Date.now() - new Date(start).getTime()) / 1000); const s = Math.floor((Date.now() - new Date(start).getTime()) / 1000);
@@ -249,6 +276,9 @@ function AgentMode({ projectId, appName, appPath }: { projectId: string; appName
const [sessions, setSessions] = useState<AgentSession[]>([]); const [sessions, setSessions] = useState<AgentSession[]>([]);
const [activeSessionId, setActiveSessionId] = useState<string | null>(null); const [activeSessionId, setActiveSessionId] = useState<string | null>(null);
const [activeSession, setActiveSession] = useState<AgentSession | null>(null); const [activeSession, setActiveSession] = useState<AgentSession | null>(null);
const [eventLog, setEventLog] = useState<AgentLogLine[]>([]);
const maxStreamSeqRef = useRef(0);
const pollOutputRef = useRef<AgentSession["output"]>([]);
const [submitting, setSubmitting] = useState(false); const [submitting, setSubmitting] = useState(false);
const [loadingSessions, setLoadingSessions] = useState(true); const [loadingSessions, setLoadingSessions] = useState(true);
const [approving, setApproving] = useState(false); const [approving, setApproving] = useState(false);
@@ -262,6 +292,95 @@ function AgentMode({ projectId, appName, appPath }: { projectId: string; appName
if (el) el.scrollTop = el.scrollHeight; if (el) el.scrollTop = el.scrollHeight;
}, []); }, []);
// Mirror the latest polled legacy session output into a ref so the SSE
// message handler below can seed the event log without this effect's
// consumers re-subscribing every time output changes.
useEffect(() => {
  pollOutputRef.current = activeSession?.output ?? [];
}, [activeSession?.output]);
// Load historical events + live SSE tail (replaces tight polling for log lines when events exist).
// Runs whenever the selected session changes: 1) hydrate from the persisted
// timeline, 2) check session status, 3) if still active, tail via EventSource.
useEffect(() => {
  if (!activeSessionId || !appName) return;
  let cancelled = false; // set by cleanup to drop late async results
  let es: EventSource | null = null;
  (async () => {
    // Reset the timeline for the newly selected session.
    setEventLog([]);
    maxStreamSeqRef.current = 0;
    try {
      // 1) Hydrate: fetch the full persisted history (afterSeq=0).
      const evRes = await fetch(
        `/api/projects/${projectId}/agent/sessions/${activeSessionId}/events?afterSeq=0`
      );
      if (evRes.ok) {
        const d = (await evRes.json()) as { events?: StreamEventRow[]; maxSeq?: number };
        if (!cancelled) {
          const mapped = (d.events ?? []).map(streamRowToLine);
          setEventLog(mapped);
          // Remember where history ends so the SSE tail can resume from there.
          maxStreamSeqRef.current = typeof d.maxSeq === "number" ? d.maxSeq : 0;
        }
      }
    } catch {
      /* migration not applied or network */
    }
    if (cancelled) return;
    // 2) Only open an SSE tail for sessions that may still produce events.
    let status = "";
    try {
      const sRes = await fetch(`/api/projects/${projectId}/agent/sessions/${activeSessionId}`);
      const sJson = (await sRes.json()) as { session?: { status: string } };
      status = sJson.session?.status ?? "";
    } catch {
      return;
    }
    if (status !== "running" && status !== "pending") return;
    // 3) Tail: resume the stream after the last hydrated sequence number.
    const url = `/api/projects/${projectId}/agent/sessions/${activeSessionId}/events/stream?afterSeq=${maxStreamSeqRef.current}`;
    es = new EventSource(url);
    es.onmessage = (msg: MessageEvent<string>) => {
      try {
        const data = JSON.parse(msg.data) as Record<string, unknown>;
        // "_heartbeat" / "_stream.*" are server control frames, not timeline events.
        if (data.type === "_heartbeat") return;
        if (data.type === "_stream.end" || data.type === "_stream.error") {
          es?.close();
          return;
        }
        if (typeof data.seq !== "number") return;
        maxStreamSeqRef.current = data.seq as number;
        const line = streamRowToLine(data as StreamEventRow);
        setEventLog((prev) => {
          // De-dupe: hydration and the stream can overlap on the same seq.
          if (prev.some((p) => p.seq === line.seq)) return prev;
          // First streamed line onto an empty timeline: seed from the legacy
          // polled output, using negative seqs so seeded lines never collide
          // with real sequence numbers in the de-dupe check above.
          if (prev.length === 0 && pollOutputRef.current.length > 0) {
            const seed = pollOutputRef.current.map((l, i) => ({
              ts: l.ts,
              type: l.type,
              text: l.text,
              seq: -(i + 1),
            }));
            return [...seed, line];
          }
          return [...prev, line];
        });
      } catch {
        /* ignore malformed */
      }
    };
    es.onerror = () => {
      // NOTE(review): EventSource would auto-reconnect by default; closing here
      // falls back to the polled output path instead — confirm this is intended.
      es?.close();
    };
  })();
  return () => {
    cancelled = true;
    es?.close();
  };
}, [activeSessionId, projectId, appName]);
// Load session list — auto-select the most recent active or last session // Load session list — auto-select the most recent active or last session
useEffect(() => { useEffect(() => {
if (!appName) return; if (!appName) return;
@@ -447,22 +566,26 @@ function AgentMode({ projectId, appName, appPath }: { projectId: string; appName
)} )}
</div> </div>
{/* Output stream */} {/* Output stream — prefer persisted event timeline + SSE when available */}
<div ref={outputRef} style={{ flex: 1, overflow: "auto", background: "#1a1a1a", padding: "16px 20px", fontFamily: "IBM Plex Mono, monospace", fontSize: "0.72rem", lineHeight: 1.6 }}> <div ref={outputRef} style={{ flex: 1, overflow: "auto", background: "#1a1a1a", padding: "16px 20px", fontFamily: "IBM Plex Mono, monospace", fontSize: "0.72rem", lineHeight: 1.6 }}>
{activeSession.output.length === 0 && ( {(() => {
<span style={{ color: "#555" }}>Starting agent</span> const displayLines: AgentLogLine[] =
)} eventLog.length > 0 ? eventLog : activeSession.output.map((l) => ({ ts: l.ts, type: l.type, text: l.text }));
{activeSession.output.map((line, i) => { if (displayLines.length === 0) {
return <span style={{ color: "#555" }}>Starting agent</span>;
}
return displayLines.map((line, i) => {
const color = line.type === "error" ? "#f87171" : line.type === "stderr" ? "#fb923c" const color = line.type === "error" ? "#f87171" : line.type === "stderr" ? "#fb923c"
: line.type === "info" ? "#60a5fa" : line.type === "step" ? "#a78bfa" : "#d4d4d4"; : line.type === "info" ? "#60a5fa" : line.type === "step" ? "#a78bfa" : line.type === "done" ? "#86efac" : "#d4d4d4";
const prefix = line.type === "step" ? "▶ " : line.type === "error" ? "✗ " const prefix = line.type === "step" ? "▶ " : line.type === "error" ? "✗ "
: line.type === "info" ? "→ " : " "; : line.type === "info" ? "→ " : line.type === "done" ? "✓ " : " ";
return ( return (
<div key={i} style={{ color, marginBottom: 1 }}> <div key={line.seq ?? `o-${i}`} style={{ color, marginBottom: 1 }}>
{prefix}{line.text} {prefix}{line.text}
</div> </div>
); );
})} });
})()}
{["running", "pending"].includes(activeSession.status) && ( {["running", "pending"].includes(activeSession.status) && (
<span style={{ color: "#555", animation: "pulse 1.5s infinite" }}></span> <span style={{ color: "#555", animation: "pulse 1.5s infinite" }}></span>
)} )}

View File

@@ -90,6 +90,20 @@ export async function POST(req: NextRequest) {
`CREATE INDEX IF NOT EXISTS agent_sessions_project_idx ON agent_sessions (project_id, created_at DESC)`, `CREATE INDEX IF NOT EXISTS agent_sessions_project_idx ON agent_sessions (project_id, created_at DESC)`,
`CREATE INDEX IF NOT EXISTS agent_sessions_status_idx ON agent_sessions (status)`, `CREATE INDEX IF NOT EXISTS agent_sessions_status_idx ON agent_sessions (status)`,
`CREATE TABLE IF NOT EXISTS agent_session_events (
id BIGSERIAL PRIMARY KEY,
session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE,
project_id TEXT NOT NULL,
seq INT NOT NULL,
ts TIMESTAMPTZ NOT NULL,
type TEXT NOT NULL,
payload JSONB NOT NULL DEFAULT '{}'::jsonb,
client_event_id UUID UNIQUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(session_id, seq)
)`,
`CREATE INDEX IF NOT EXISTS agent_session_events_session_seq_idx ON agent_session_events (session_id, seq)`,
// NextAuth / Prisma tables // NextAuth / Prisma tables
`CREATE TABLE IF NOT EXISTS users ( `CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,

View File

@@ -0,0 +1,142 @@
/**
* GET /api/projects/[projectId]/agent/sessions/[sessionId]/events?afterSeq=0
* List persisted agent events for replay (user session auth).
*
* POST /api/projects/[projectId]/agent/sessions/[sessionId]/events
* Batch append from vibn-agent-runner (x-agent-runner-secret).
*/
import { NextResponse } from "next/server";
import { getServerSession } from "next-auth";
import { authOptions } from "@/lib/auth/authOptions";
import { query, getPool } from "@/lib/db-postgres";
/** Row shape returned by GET: one persisted agent event, ordered by seq. */
export interface AgentSessionEventRow {
  seq: number; // per-session monotonically increasing position
  ts: string; // event timestamp (ts::text in the query below)
  type: string; // event type string, e.g. "output.*" — see frontend mapping
  payload: Record<string, unknown>; // free-form JSONB payload
}
/**
 * GET — list persisted agent events for replay.
 * Requires a signed-in user; ownership is enforced by joining the session's
 * project back to the user's email inside the query itself.
 */
export async function GET(
  req: Request,
  { params }: { params: Promise<{ projectId: string; sessionId: string }> }
) {
  try {
    const { projectId, sessionId } = await params;
    const auth = await getServerSession(authOptions);
    const email = auth?.user?.email;
    if (!email) {
      return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
    }
    // Clamp afterSeq to a non-negative integer; NaN falls back to 0.
    const rawAfterSeq = new URL(req.url).searchParams.get("afterSeq") ?? "0";
    const afterSeq = Math.max(0, parseInt(rawAfterSeq, 10) || 0);
    const rows = await query<AgentSessionEventRow>(
      `SELECT e.seq, e.ts::text, e.type, e.payload
       FROM agent_session_events e
       JOIN agent_sessions s ON s.id = e.session_id
       JOIN fs_projects p ON p.id::text = s.project_id::text
       JOIN fs_users u ON u.id = p.user_id
       WHERE e.session_id = $1::uuid AND s.project_id::text = $2 AND u.data->>'email' = $3
         AND e.seq > $4
       ORDER BY e.seq ASC
       LIMIT 2000`,
      [sessionId, projectId, email, afterSeq]
    );
    // Rows arrive in ascending seq order, so the final row carries the max.
    const lastRow = rows[rows.length - 1];
    const maxSeq = lastRow ? lastRow.seq : afterSeq;
    return NextResponse.json({ events: rows, maxSeq });
  } catch (err) {
    console.error("[agent/sessions/.../events GET]", err);
    return NextResponse.json({ error: "Failed to list events" }, { status: 500 });
  }
}
/** Request body for POST: batch of events appended by vibn-agent-runner. */
type IngestBody = {
  events: Array<{
    clientEventId: string; // idempotency key (UUID); duplicates are dropped via ON CONFLICT
    ts: string; // event timestamp, cast to timestamptz on insert
    type: string;
    payload?: Record<string, unknown>; // optional JSONB payload; defaults to {}
  }>;
};
/**
 * POST — batch-append events from vibn-agent-runner.
 *
 * Auth: shared secret via the x-agent-runner-secret header; must match
 * AGENT_RUNNER_SECRET (if the env var is unset, the check is skipped).
 * Events are idempotent on clientEventId (duplicate deliveries are dropped)
 * and receive dense per-session seq numbers under an advisory lock.
 */
export async function POST(
  req: Request,
  { params }: { params: Promise<{ projectId: string; sessionId: string }> }
) {
  const secret = process.env.AGENT_RUNNER_SECRET ?? "";
  const incomingSecret = req.headers.get("x-agent-runner-secret") ?? "";
  if (secret && incomingSecret !== secret) {
    return NextResponse.json({ error: "Forbidden" }, { status: 403 });
  }
  const { projectId, sessionId } = await params;
  let body: IngestBody;
  try {
    body = (await req.json()) as IngestBody;
  } catch {
    return NextResponse.json({ error: "Invalid JSON" }, { status: 400 });
  }
  if (!body.events?.length) {
    return NextResponse.json({ ok: true, inserted: 0 });
  }
  const pool = getPool();
  const client = await pool.connect();
  try {
    // Reject events for sessions that don't exist or belong to another project.
    const exists = await client.query<{ n: string }>(
      `SELECT 1 AS n FROM agent_sessions WHERE id = $1::uuid AND project_id::text = $2 LIMIT 1`,
      [sessionId, projectId]
    );
    if (exists.rowCount === 0) {
      return NextResponse.json({ error: "Session not found" }, { status: 404 });
    }
    await client.query("BEGIN");
    // Serialize concurrent ingests for this session so seq assignment is race-free.
    await client.query("SELECT pg_advisory_xact_lock(hashtext($1::text))", [sessionId]);
    // Read the high-water mark ONCE. The advisory lock guarantees no other
    // writer advances it while we hold the transaction, so seq can be assigned
    // locally per event — the previous version re-ran SELECT MAX(seq) inside
    // the loop, costing one extra round-trip per event (N+1).
    const maxRes = await client.query<{ m: string }>(
      `SELECT COALESCE(MAX(seq), 0)::text AS m FROM agent_session_events WHERE session_id = $1::uuid`,
      [sessionId]
    );
    let lastSeq = Number(maxRes.rows[0].m);
    let inserted = 0;
    for (const ev of body.events) {
      // Skip malformed entries rather than failing the whole batch.
      if (!ev.clientEventId || !ev.type || !ev.ts) continue;
      const ins = await client.query(
        `INSERT INTO agent_session_events (session_id, project_id, seq, ts, type, payload, client_event_id)
         VALUES ($1::uuid, $2, $3, $4::timestamptz, $5, $6::jsonb, $7::uuid)
         ON CONFLICT (client_event_id) DO NOTHING`,
        [
          sessionId,
          projectId,
          lastSeq + 1,
          ev.ts,
          ev.type,
          JSON.stringify(ev.payload ?? {}),
          ev.clientEventId,
        ]
      );
      if (ins.rowCount) {
        // Consume the seq only when a row was actually inserted, keeping the
        // sequence dense when duplicates are dropped (matches prior behavior,
        // where the re-queried MAX also did not advance on a skipped insert).
        inserted += ins.rowCount;
        lastSeq += 1;
      }
    }
    await client.query("COMMIT");
    return NextResponse.json({ ok: true, inserted });
  } catch (err) {
    try {
      await client.query("ROLLBACK");
    } catch {
      /* ignore */
    }
    console.error("[agent/sessions/.../events POST]", err);
    return NextResponse.json({ error: "Failed to ingest events" }, { status: 500 });
  } finally {
    client.release();
  }
}

View File

@@ -0,0 +1,122 @@
/**
* GET /api/projects/.../agent/sessions/.../events/stream?afterSeq=0
* Server-Sent Events: tail agent_session_events while the session is active.
*/
import { getServerSession } from "next-auth";
import { authOptions } from "@/lib/auth/authOptions";
import { query, queryOne } from "@/lib/db-postgres";
// Never cache / prerender this route — each request opens a live stream.
export const dynamic = "force-dynamic";
// SSE needs the Node.js runtime (long-lived response, setTimeout loop).
export const runtime = "nodejs";
/** Long-lived SSE — raise if your host defaults to a shorter limit (e.g. Vercel). */
export const maxDuration = 300;
// Session statuses after which no further events are expected to be appended.
const TERMINAL = new Set(["done", "approved", "failed", "stopped"]);
/**
 * Tail agent_session_events as Server-Sent Events.
 * Polls the DB (~750ms) for rows with seq > afterSeq, emits each as a
 * "data:" frame, heartbeats every ~20s, and ends a few polls after the
 * session reaches a terminal status.
 */
export async function GET(
  req: Request,
  { params }: { params: Promise<{ projectId: string; sessionId: string }> }
) {
  const session = await getServerSession(authOptions);
  if (!session?.user?.email) {
    return new Response("Unauthorized", { status: 401 });
  }
  const { projectId, sessionId } = await params;
  // Resume point: only events with seq > afterSeq are sent (clamped, NaN → 0).
  let afterSeq = Math.max(0, parseInt(new URL(req.url).searchParams.get("afterSeq") ?? "0", 10) || 0);
  // Ownership check: the session must belong to a project owned by this user.
  const allowed = await queryOne<{ n: string }>(
    `SELECT 1 AS n FROM agent_sessions s
     JOIN fs_projects p ON p.id::text = s.project_id::text
     JOIN fs_users u ON u.id = p.user_id
     WHERE s.id = $1::uuid AND s.project_id::text = $2 AND u.data->>'email' = $3
     LIMIT 1`,
    [sessionId, projectId, session.user.email]
  );
  if (!allowed) {
    return new Response("Not found", { status: 404 });
  }
  const encoder = new TextEncoder();
  const signal = req.signal; // aborts when the client disconnects
  const stream = new ReadableStream({
    async start(controller) {
      // Serialize one JSON object per SSE "data:" frame.
      const send = (obj: object) => {
        controller.enqueue(encoder.encode(`data: ${JSON.stringify(obj)}\n\n`));
      };
      // Consecutive empty polls seen after terminal status — lets late events
      // drain before the stream is closed.
      let idleAfterTerminal = 0;
      let lastHeartbeat = Date.now();
      try {
        while (!signal.aborted) {
          // Batch up to 200 new events per poll, oldest first.
          const rows = await query<{ seq: number; ts: string; type: string; payload: Record<string, unknown> }>(
            `SELECT e.seq, e.ts::text, e.type, e.payload
             FROM agent_session_events e
             WHERE e.session_id = $1::uuid AND e.seq > $2
             ORDER BY e.seq ASC
             LIMIT 200`,
            [sessionId, afterSeq]
          );
          for (const row of rows) {
            afterSeq = row.seq; // advance the cursor as each event is emitted
            send({ seq: row.seq, ts: row.ts, type: row.type, payload: row.payload });
          }
          const st = await queryOne<{ status: string }>(
            `SELECT status FROM agent_sessions WHERE id = $1::uuid LIMIT 1`,
            [sessionId]
          );
          const status = st?.status ?? "";
          const terminal = TERMINAL.has(status);
          if (rows.length === 0) {
            if (terminal) {
              // Three consecutive empty polls after terminal → signal end.
              idleAfterTerminal++;
              if (idleAfterTerminal >= 3) {
                send({ type: "_stream.end", seq: afterSeq });
                break;
              }
            } else {
              idleAfterTerminal = 0;
            }
          } else {
            idleAfterTerminal = 0;
          }
          // Heartbeat keeps intermediaries from timing out an idle connection.
          const now = Date.now();
          if (now - lastHeartbeat > 20000) {
            send({ type: "_heartbeat", t: now });
            lastHeartbeat = now;
          }
          await new Promise((r) => setTimeout(r, 750));
        }
      } catch (e) {
        console.error("[events/stream]", e);
        try {
          // Best-effort error frame; enqueue may throw if already closed.
          send({ type: "_stream.error", message: "stream failed" });
        } catch {
          /* ignore */
        }
      } finally {
        try {
          controller.close();
        } catch {
          /* ignore */
        }
      }
    },
  });
  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream; charset=utf-8",
      "Cache-Control": "no-cache, no-transform",
      Connection: "keep-alive",
      // Disable nginx proxy buffering so frames flush immediately.
      "X-Accel-Buffering": "no",
    },
  });
}

View File

@@ -66,6 +66,13 @@ export async function POST(
); );
const giteaRepo = proj[0]?.data?.giteaRepo as string | undefined; const giteaRepo = proj[0]?.data?.giteaRepo as string | undefined;
// Clear persisted event timeline so SSE / replay matches the new run (no-op if table missing)
try {
await query(`DELETE FROM agent_session_events WHERE session_id = $1::uuid`, [sessionId]);
} catch {
/* table may not exist until admin migrate */
}
// Reset the session row so the frontend shows it as running again // Reset the session row so the frontend shows it as running again
await query( await query(
`UPDATE agent_sessions `UPDATE agent_sessions

View File

@@ -53,6 +53,37 @@ pool.query(\`
messages JSONB NOT NULL DEFAULT '[]', messages JSONB NOT NULL DEFAULT '[]',
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
); );
CREATE TABLE IF NOT EXISTS agent_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
project_id TEXT NOT NULL,
app_name TEXT NOT NULL,
app_path TEXT NOT NULL,
task TEXT NOT NULL,
plan JSONB,
status TEXT NOT NULL DEFAULT 'pending',
output JSONB NOT NULL DEFAULT '[]'::jsonb,
changed_files JSONB NOT NULL DEFAULT '[]'::jsonb,
error TEXT,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS agent_sessions_project_idx ON agent_sessions (project_id, created_at DESC);
CREATE INDEX IF NOT EXISTS agent_sessions_status_idx ON agent_sessions (status);
CREATE TABLE IF NOT EXISTS agent_session_events (
id BIGSERIAL PRIMARY KEY,
session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE,
project_id TEXT NOT NULL,
seq INT NOT NULL,
ts TIMESTAMPTZ NOT NULL,
type TEXT NOT NULL,
payload JSONB NOT NULL DEFAULT '{}'::jsonb,
client_event_id UUID UNIQUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(session_id, seq)
);
CREATE INDEX IF NOT EXISTS agent_session_events_session_seq_idx ON agent_session_events (session_id, seq);
\`).then(() => { console.log('App tables ready'); pool.end(); }).catch(e => { console.error('Table init error:', e.message); pool.end(); }); \`).then(() => { console.log('App tables ready'); pool.end(); }).catch(e => { console.error('Table init error:', e.message); pool.end(); });
" "

View File

@@ -93,6 +93,25 @@ CREATE INDEX IF NOT EXISTS agent_sessions_project_idx
CREATE INDEX IF NOT EXISTS agent_sessions_status_idx CREATE INDEX IF NOT EXISTS agent_sessions_status_idx
ON agent_sessions (status); ON agent_sessions (status);
-- ---------------------------------------------------------------------------
-- agent_session_events (append-only timeline for SSE + replay)
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS agent_session_events (
  id BIGSERIAL PRIMARY KEY,
  -- Events are deleted together with their session.
  session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE,
  -- Denormalized from agent_sessions for direct filtering by project.
  project_id TEXT NOT NULL,
  -- Per-session monotonically increasing position, assigned by the ingest route.
  seq INT NOT NULL,
  ts TIMESTAMPTZ NOT NULL,
  type TEXT NOT NULL,
  payload JSONB NOT NULL DEFAULT '{}'::jsonb,
  -- Idempotency key from the runner; duplicate deliveries are dropped on conflict.
  client_event_id UUID UNIQUE,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  UNIQUE(session_id, seq)
);
-- NOTE(review): UNIQUE(session_id, seq) already creates a backing index, so
-- this explicit index is redundant — consider dropping one of the two.
CREATE INDEX IF NOT EXISTS agent_session_events_session_seq_idx
ON agent_session_events (session_id, seq);
-- --------------------------------------------------------------------------- -- ---------------------------------------------------------------------------
-- NextAuth / Prisma tables (required by PrismaAdapter + strategy:"database") -- NextAuth / Prisma tables (required by PrismaAdapter + strategy:"database")
-- Only created if not already present from a prisma migrate run. -- Only created if not already present from a prisma migrate run.