Implement accumulate-then-act streaming for thinking models
This commit is contained in:
@@ -17,7 +17,7 @@
|
|||||||
import { NextResponse } from "next/server";
|
import { NextResponse } from "next/server";
|
||||||
import { requireWorkspacePrincipal } from "@/lib/auth/workspace-auth";
|
import { requireWorkspacePrincipal } from "@/lib/auth/workspace-auth";
|
||||||
import { query, queryOne } from "@/lib/db-postgres";
|
import { query, queryOne } from "@/lib/db-postgres";
|
||||||
import { callVibnChat } from "@/lib/ai/vibn-chat-model";
|
import { callVibnChat, streamVibnChat } from "@/lib/ai/vibn-chat-model";
|
||||||
import {
|
import {
|
||||||
VIBN_TOOL_DEFINITIONS,
|
VIBN_TOOL_DEFINITIONS,
|
||||||
executeMcpTool,
|
executeMcpTool,
|
||||||
@@ -1165,8 +1165,8 @@ export async function POST(request: Request) {
|
|||||||
extraSystem += `\n\n[WARNING] You only have ${maxToolRounds - round} tool calls left before you are forcefully terminated. Stop exploring, make your final edits, and write your final response to the user NOW.`;
|
extraSystem += `\n\n[WARNING] You only have ${maxToolRounds - round} tool calls left before you are forcefully terminated. Stop exploring, make your final edits, and write your final response to the user NOW.`;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute tool calls and add results. OpenAI-compatible APIs
|
// Execute tool calls and add results using accumulating stream.
|
||||||
const resp = await callVibnChat({
|
const stream = streamVibnChat({
|
||||||
systemPrompt: systemPrompt + extraSystem,
|
systemPrompt: systemPrompt + extraSystem,
|
||||||
messages,
|
messages,
|
||||||
tools: toolDefs,
|
tools: toolDefs,
|
||||||
@@ -1175,14 +1175,48 @@ export async function POST(request: Request) {
|
|||||||
signal: clientSignal,
|
signal: clientSignal,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const resp = {
|
||||||
|
text: "",
|
||||||
|
thoughts: "",
|
||||||
|
toolCalls: [] as any[],
|
||||||
|
error: undefined as string | undefined,
|
||||||
|
};
|
||||||
|
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
if (aborted) break;
|
||||||
|
|
||||||
|
if (chunk.type === "thinking_delta" && chunk.text) {
|
||||||
|
resp.thoughts += chunk.text;
|
||||||
|
emit({ type: "thinking_delta", text: chunk.text });
|
||||||
|
} else if (chunk.type === "text_delta" && chunk.text) {
|
||||||
|
resp.text += chunk.text;
|
||||||
|
emit({ type: "text_delta", text: chunk.text });
|
||||||
|
} else if (chunk.type === "tool_calls" && chunk.toolCalls) {
|
||||||
|
resp.toolCalls = chunk.toolCalls;
|
||||||
|
} else if (chunk.type === "error" && chunk.error) {
|
||||||
|
resp.error = chunk.error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the model produced any thoughts or text, record them in the timeline once stream is complete.
|
||||||
|
// (The UI handles the delta-rendering live, but we save the complete chunk to Postgres).
|
||||||
|
if (resp.thoughts) {
|
||||||
|
assistantTimeline.push({ kind: "thought", text: resp.thoughts });
|
||||||
|
}
|
||||||
|
if (resp.text) {
|
||||||
|
assistantText += (assistantText ? "\n\n" : "") + resp.text;
|
||||||
|
assistantTextSegments.push(resp.text);
|
||||||
|
assistantTimeline.push({ kind: "text", text: resp.text });
|
||||||
|
roundsSinceText = 0;
|
||||||
|
toolCallsSinceText = 0;
|
||||||
|
} else if (resp.toolCalls.length) {
|
||||||
|
roundsSinceText++;
|
||||||
|
toolCallsSinceText += resp.toolCalls.length;
|
||||||
|
}
|
||||||
|
|
||||||
// When the model first reaches for a mutation, advance the phase so
|
// When the model first reaches for a mutation, advance the phase so
|
||||||
// the UI reflects "Executing Code Edits". We deliberately do NOT force
|
// the UI reflects "Executing Code Edits".
|
||||||
// a separate planning round or discard the edit (the old "C-08
|
const requestedMutations = resp.toolCalls.filter((tc: any) =>
|
||||||
// checkpoint" dance) — that made the model plan, stall on an empty
|
|
||||||
// turn, and never execute, and it seeded scope-creep via the forced
|
|
||||||
// "verification plan". The agent edits directly; the post-loop
|
|
||||||
// verification layer checks the result and drives any fixes.
|
|
||||||
const requestedMutations = resp.toolCalls.filter((tc) =>
|
|
||||||
[
|
[
|
||||||
"fs_write",
|
"fs_write",
|
||||||
"fs_edit",
|
"fs_edit",
|
||||||
@@ -1197,10 +1231,7 @@ export async function POST(request: Request) {
|
|||||||
emit({ type: "phase", phase, label: "Executing Code Edits" });
|
emit({ type: "phase", phase, label: "Executing Code Edits" });
|
||||||
}
|
}
|
||||||
|
|
||||||
// A Stop click aborts the in-flight generation, which surfaces here
|
// A Stop click aborts the in-flight generation
|
||||||
// as resp.error === "aborted". Treat it as a clean user stop (break to
|
|
||||||
// the post-loop abort handling that persists the partial reply),
|
|
||||||
// NOT as a fatal error shown to the user.
|
|
||||||
if (resp.error === "aborted" || aborted) {
|
if (resp.error === "aborted" || aborted) {
|
||||||
aborted = true;
|
aborted = true;
|
||||||
break;
|
break;
|
||||||
@@ -1212,28 +1243,6 @@ export async function POST(request: Request) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream the model's reasoning narration as a separate SSE
|
|
||||||
// event type. We pay for thinking tokens whether or not we
|
|
||||||
// ask for them, so making them visible is free transparency
|
|
||||||
// — and it cures the "tool tray with no narrative" feel.
|
|
||||||
if (resp.thoughts) {
|
|
||||||
assistantTimeline.push({ kind: "thought", text: resp.thoughts });
|
|
||||||
emit({ type: "thinking", text: resp.thoughts });
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stream user-facing text to client.
|
|
||||||
if (resp.text) {
|
|
||||||
assistantText += (assistantText ? "\n\n" : "") + resp.text;
|
|
||||||
assistantTextSegments.push(resp.text);
|
|
||||||
assistantTimeline.push({ kind: "text", text: resp.text });
|
|
||||||
emit({ type: "text", text: resp.text });
|
|
||||||
roundsSinceText = 0;
|
|
||||||
toolCallsSinceText = 0;
|
|
||||||
} else if (resp.toolCalls.length) {
|
|
||||||
roundsSinceText++;
|
|
||||||
toolCallsSinceText += resp.toolCalls.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Announce tool calls
|
// Announce tool calls
|
||||||
for (const tc of resp.toolCalls) {
|
for (const tc of resp.toolCalls) {
|
||||||
assistantToolCalls.push(tc);
|
assistantToolCalls.push(tc);
|
||||||
|
|||||||
@@ -1281,6 +1281,7 @@ export function ChatPanel({
|
|||||||
setMessages((prev) => [...prev, userMsg]);
|
setMessages((prev) => [...prev, userMsg]);
|
||||||
|
|
||||||
let assistantContent = "";
|
let assistantContent = "";
|
||||||
|
let lastKind = "";
|
||||||
const assistantMsg: Message = { role: "assistant", content: "" };
|
const assistantMsg: Message = { role: "assistant", content: "" };
|
||||||
let msgIndex = -1;
|
let msgIndex = -1;
|
||||||
|
|
||||||
@@ -1402,6 +1403,43 @@ export function ChatPanel({
|
|||||||
}
|
}
|
||||||
return next;
|
return next;
|
||||||
});
|
});
|
||||||
|
} else if (ev.type === "text_delta" && ev.text) {
|
||||||
|
if (lastKind === "text") {
|
||||||
|
assistantContent += ev.text;
|
||||||
|
} else {
|
||||||
|
assistantContent += (assistantContent ? "\n\n" : "") + ev.text;
|
||||||
|
lastKind = "text";
|
||||||
|
}
|
||||||
|
setMessages((prev) => {
|
||||||
|
const next = [...prev];
|
||||||
|
if (msgIndex >= 0 && next[msgIndex]) {
|
||||||
|
const tl = [...(next[msgIndex].timeline ?? [])];
|
||||||
|
const last = tl[tl.length - 1];
|
||||||
|
if (last && last.kind === "text") {
|
||||||
|
tl[tl.length - 1] = { ...last, text: last.text + ev.text };
|
||||||
|
} else {
|
||||||
|
tl.push({ kind: "text", text: ev.text });
|
||||||
|
}
|
||||||
|
next[msgIndex] = { ...next[msgIndex], timeline: tl };
|
||||||
|
}
|
||||||
|
return next;
|
||||||
|
});
|
||||||
|
} else if (ev.type === "thinking_delta" && ev.text) {
|
||||||
|
lastKind = "thought";
|
||||||
|
setMessages((prev) => {
|
||||||
|
const next = [...prev];
|
||||||
|
if (msgIndex >= 0 && next[msgIndex]) {
|
||||||
|
const tl = [...(next[msgIndex].timeline ?? [])];
|
||||||
|
const last = tl[tl.length - 1];
|
||||||
|
if (last && last.kind === "thought") {
|
||||||
|
tl[tl.length - 1] = { ...last, text: last.text + ev.text };
|
||||||
|
} else {
|
||||||
|
tl.push({ kind: "thought", text: ev.text });
|
||||||
|
}
|
||||||
|
next[msgIndex] = { ...next[msgIndex], timeline: tl };
|
||||||
|
}
|
||||||
|
return next;
|
||||||
|
});
|
||||||
} else if (ev.type === "thinking" && ev.text) {
|
} else if (ev.type === "thinking" && ev.text) {
|
||||||
// Each thinking event from the server is one round of the
|
// Each thinking event from the server is one round of the
|
||||||
// model's reasoning. Push as a separate timeline entry so
|
// model's reasoning. Push as a separate timeline entry so
|
||||||
|
|||||||
@@ -39,10 +39,11 @@ export interface ToolDefinition {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export interface ChatChunk {
|
export interface ChatChunk {
|
||||||
type: "text" | "thinking" | "tool_call" | "done" | "error";
|
type: "text" | "thinking" | "text_delta" | "thinking_delta" | "tool_calls" | "done" | "error";
|
||||||
text?: string;
|
text?: string;
|
||||||
toolCall?: ToolCall;
|
toolCalls?: ToolCall[];
|
||||||
error?: string;
|
error?: string;
|
||||||
|
finishReason?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
type GeminiPart = Record<string, unknown>;
|
type GeminiPart = Record<string, unknown>;
|
||||||
@@ -313,17 +314,26 @@ export async function* streamGeminiChat(opts: {
|
|||||||
if (part.text) {
|
if (part.text) {
|
||||||
if (isPartThought(part as Record<string, unknown>)) {
|
if (isPartThought(part as Record<string, unknown>)) {
|
||||||
thoughts += part.text;
|
thoughts += part.text;
|
||||||
yield { type: "thinking", text: part.text };
|
yield { type: "thinking_delta", text: part.text };
|
||||||
} else {
|
} else {
|
||||||
text += part.text;
|
text += part.text;
|
||||||
yield { type: "text", text: part.text };
|
yield { type: "text_delta", text: part.text };
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (part.functionCall) {
|
if (part.functionCall) {
|
||||||
toolCalls.push(part.functionCall);
|
toolCalls.push({
|
||||||
|
id: `tc-${Date.now()}-${Math.random().toString(36).slice(2)}`,
|
||||||
|
name: part.functionCall.name,
|
||||||
|
args: (part.functionCall.args as Record<string, unknown>) ?? {},
|
||||||
|
thoughtSignature: (part as { thoughtSignature?: string }).thoughtSignature,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (toolCalls.length > 0) {
|
||||||
|
yield { type: "tool_calls", toolCalls };
|
||||||
|
}
|
||||||
const durationMs = Date.now() - startTime;
|
const durationMs = Date.now() - startTime;
|
||||||
|
|
||||||
logTrainingTelemetryDb({
|
logTrainingTelemetryDb({
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
* We normalize them to JSON Schema before sending.
|
* We normalize them to JSON Schema before sending.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import type { ChatMessage, ToolCall, ToolDefinition } from "./gemini-chat";
|
import type { ChatMessage, ToolCall, ToolDefinition, ChatChunk } from "./gemini-chat";
|
||||||
|
|
||||||
const DEFAULT_CHAT_URL = "https://api.deepseek.com/chat/completions";
|
const DEFAULT_CHAT_URL = "https://api.deepseek.com/chat/completions";
|
||||||
|
|
||||||
@@ -389,3 +389,149 @@ export async function callOpenAiCompatibleChat(opts: {
|
|||||||
|
|
||||||
return { text, thoughts, toolCalls, finishReason };
|
return { text, thoughts, toolCalls, finishReason };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
export async function* streamOpenAiCompatibleChat(opts: {
|
||||||
|
systemPrompt: string;
|
||||||
|
messages: ChatMessage[];
|
||||||
|
tools?: ToolDefinition[];
|
||||||
|
temperature?: number;
|
||||||
|
includeThoughts?: boolean;
|
||||||
|
signal?: AbortSignal;
|
||||||
|
}): AsyncGenerator<ChatChunk> {
|
||||||
|
const apiKey = resolveApiKey();
|
||||||
|
if (!apiKey) {
|
||||||
|
yield {
|
||||||
|
type: "error",
|
||||||
|
error: "No API key: set DEEPSEEK_API_KEY or VIBN_OPENAI_COMPATIBLE_API_KEY for OpenAI-compatible chat.",
|
||||||
|
};
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const url = resolveChatUrl();
|
||||||
|
const model = resolveModel();
|
||||||
|
const tools = toOpenAiTools(opts.tools);
|
||||||
|
const oaiMessages = toOpenAiMessages(opts.systemPrompt, opts.messages);
|
||||||
|
const body: Record<string, unknown> = {
|
||||||
|
model,
|
||||||
|
messages: oaiMessages,
|
||||||
|
temperature: opts.temperature ?? 0.7,
|
||||||
|
max_tokens: 8192,
|
||||||
|
stream: true,
|
||||||
|
};
|
||||||
|
if (tools?.length) body.tools = tools;
|
||||||
|
|
||||||
|
let res: Response;
|
||||||
|
try {
|
||||||
|
res = await fetch(url, {
|
||||||
|
method: "POST",
|
||||||
|
headers: {
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
Authorization: `Bearer ${apiKey}`,
|
||||||
|
},
|
||||||
|
body: JSON.stringify(body),
|
||||||
|
signal: opts.signal,
|
||||||
|
});
|
||||||
|
} catch (e) {
|
||||||
|
const aborted = opts.signal?.aborted || (e instanceof Error && e.name === "AbortError");
|
||||||
|
yield {
|
||||||
|
type: "error",
|
||||||
|
error: aborted ? "aborted" : `Network error: ${e instanceof Error ? e.message : String(e)}`,
|
||||||
|
};
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!res.ok) {
|
||||||
|
const text = await res.text().catch(() => "");
|
||||||
|
yield { type: "error", error: `Chat API error ${res.status}: ${text}` };
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const reader = res.body?.getReader();
|
||||||
|
if (!reader) {
|
||||||
|
yield { type: "error", error: "No response body stream." };
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const decoder = new TextDecoder("utf-8");
|
||||||
|
let buffer = "";
|
||||||
|
|
||||||
|
// Accumulated tool calls
|
||||||
|
const toolCallsAcc: Record<number, { id: string; name: string; argsStr: string }> = {};
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
|
||||||
|
buffer += decoder.decode(value, { stream: true });
|
||||||
|
const lines = buffer.split("\n");
|
||||||
|
buffer = lines.pop() ?? "";
|
||||||
|
|
||||||
|
for (const line of lines) {
|
||||||
|
const tLine = line.trim();
|
||||||
|
if (!tLine || !tLine.startsWith("data: ")) continue;
|
||||||
|
const dataStr = tLine.slice(6);
|
||||||
|
if (dataStr === "[DONE]") continue;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(dataStr);
|
||||||
|
const delta = parsed.choices?.[0]?.delta;
|
||||||
|
if (!delta) continue;
|
||||||
|
|
||||||
|
if (typeof delta.reasoning_content === "string" && delta.reasoning_content.length > 0) {
|
||||||
|
yield { type: "thinking_delta", text: delta.reasoning_content };
|
||||||
|
}
|
||||||
|
if (typeof delta.content === "string" && delta.content.length > 0) {
|
||||||
|
yield { type: "text_delta", text: delta.content };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (delta.tool_calls && Array.isArray(delta.tool_calls)) {
|
||||||
|
for (const tc of delta.tool_calls) {
|
||||||
|
const idx = tc.index;
|
||||||
|
if (idx === undefined) continue;
|
||||||
|
if (!toolCallsAcc[idx]) {
|
||||||
|
toolCallsAcc[idx] = { id: "", name: "", argsStr: "" };
|
||||||
|
}
|
||||||
|
if (tc.id) toolCallsAcc[idx].id = tc.id;
|
||||||
|
if (tc.function?.name) toolCallsAcc[idx].name += tc.function.name;
|
||||||
|
if (tc.function?.arguments) toolCallsAcc[idx].argsStr += tc.function.arguments;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
// ignore unparseable chunks
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
const aborted = opts.signal?.aborted || (e instanceof Error && e.name === "AbortError");
|
||||||
|
yield {
|
||||||
|
type: "error",
|
||||||
|
error: aborted ? "aborted" : `Stream read error: ${e instanceof Error ? e.message : String(e)}`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const toolCalls: ToolCall[] = [];
|
||||||
|
for (const idx of Object.keys(toolCallsAcc).sort((a,b) => Number(a) - Number(b))) {
|
||||||
|
const acc = toolCallsAcc[Number(idx)];
|
||||||
|
let args = {};
|
||||||
|
try {
|
||||||
|
if (acc.argsStr) args = JSON.parse(acc.argsStr);
|
||||||
|
} catch {
|
||||||
|
// ignore bad json
|
||||||
|
}
|
||||||
|
if (acc.name) {
|
||||||
|
toolCalls.push({
|
||||||
|
id: acc.id || `tc-${Date.now()}-${Math.random().toString(36).slice(2)}`,
|
||||||
|
name: acc.name,
|
||||||
|
args
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (toolCalls.length > 0) {
|
||||||
|
yield { type: "tool_calls", toolCalls };
|
||||||
|
}
|
||||||
|
|
||||||
|
yield { type: "done" };
|
||||||
|
}
|
||||||
|
|||||||
@@ -13,8 +13,8 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import type { ChatMessage, ToolDefinition } from "./gemini-chat";
|
import type { ChatMessage, ToolDefinition } from "./gemini-chat";
|
||||||
import { callGeminiChat } from "./gemini-chat";
|
import { callGeminiChat, streamGeminiChat } from "./gemini-chat";
|
||||||
import { callOpenAiCompatibleChat } from "./openai-compatible-chat";
|
import { callOpenAiCompatibleChat, streamOpenAiCompatibleChat } from "./openai-compatible-chat";
|
||||||
|
|
||||||
export type VibnChatCallOpts = {
|
export type VibnChatCallOpts = {
|
||||||
systemPrompt: string;
|
systemPrompt: string;
|
||||||
@@ -33,3 +33,13 @@ export async function callVibnChat(opts: VibnChatCallOpts) {
|
|||||||
}
|
}
|
||||||
return callGeminiChat(opts);
|
return callGeminiChat(opts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
export async function* streamVibnChat(opts: VibnChatCallOpts) {
|
||||||
|
const p = (process.env.VIBN_CHAT_PROVIDER || "gemini").toLowerCase().trim();
|
||||||
|
if (p === "deepseek" || p === "openai_compatible") {
|
||||||
|
yield* streamOpenAiCompatibleChat(opts);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
yield* streamGeminiChat(opts);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user