diff --git a/.changeset/custom-agent-loop-fixes.md b/.changeset/custom-agent-loop-fixes.md new file mode 100644 index 0000000000..4d37fff535 --- /dev/null +++ b/.changeset/custom-agent-loop-fixes.md @@ -0,0 +1,9 @@ +--- +"@trigger.dev/sdk": patch +--- + +Three fixes for custom agent loops (`chat.customAgent`, `chat.createSession`, and hand-rolled `MessageAccumulator` loops): + +- Continuation runs no longer replay already-answered user messages into the first turn. The `.in` resume cursor is now seeded before any listener attaches (the same boot logic `chat.agent` uses), so a chat that continues after a cancel, crash, or upgrade only sees genuinely new messages. +- Steering a hand-rolled loop mid-stream no longer wipes the in-flight assistant response. `chat.pipeAndCapture` now stamps a server-generated message id on the stream, so a `prepareStep` injection keeps the partial text instead of replacing the message. +- Task-backed tools (`ai.toolExecute`) now work from custom agent loops: the parent's session is threaded to the child run, so child tasks can stream progress into the chat with `chat.stream.writer({ target: "root" })` instead of failing with "session handle is not initialized". diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index e3b3e60549..1e97630ae4 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -221,6 +221,47 @@ export async function __findLatestSessionInCursorForTests( return findLatestSessionInCursor(chatId); } +/** + * Seed the `.in` resume cursor for custom-agent loops (`chat.customAgent` + * raw loops and `chat.createSession`) the way `chat.agent`'s boot does. + * + * MUST run before anything attaches a `.in` listener (`createStopSignal`, + * `chat.messages.on`, the first wait): attaching opens the SSE tail with + * `Last-Event-ID` from the seeded cursor, so attach-then-seed replays + * every record from seq 0 — already-answered user messages get delivered + * into the new run's first wait and the loop re-answers them. + * + * Seeds both cursors: `setLastSeqNum` controls the SSE `Last-Event-ID`, + * `setLastDispatchedSeqNum` gates waiter dispatch — seeding only the + * former still re-delivers records the manager buffered before the seed. + * + * No-ops on fresh boots and when a cursor is already seeded (e.g. the + * `chatCustomAgent` wrapper ran before a nested `createChatSession`). + * @internal + */ +async function seedSessionInResumeCursorForCustomLoop( + payload: Pick +): Promise { + if (sessionStreams.lastSeqNum(payload.chatId, "in") !== undefined) return; + // No continuation/attempt gate: the wire may omit `continuation` on a + // run that still has prior turns (chat.agent covers that case via its + // snapshot). The scan doubles as the prior-state probe — a fresh + // session has no turn-complete on `.out`, returns no cursor, and + // seeds nothing. Cost on fresh boots is one non-blocking records read. + try { + const cursor = await findLatestSessionInCursor(payload.chatId); + if (cursor !== undefined) { + sessionStreams.setLastSeqNum(payload.chatId, "in", cursor); + sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor); + } + } catch (error) { + logger.warn( + "chat session: session.in resume cursor lookup failed; old messages may replay", + { error: error instanceof Error ? error.message : String(error) } + ); + } +} + /** * Versioned blob written to S3 after every turn completes (when no * `hydrateMessages` hook is registered). Read at run boot to seed the @@ -921,6 +962,15 @@ function createTaskToolExecuteHandler< toolMeta.turn = chatCtx.turn; toolMeta.continuation = chatCtx.continuation; toolMeta.clientData = chatCtx.clientData; + } else { + // Hand-rolled chat.customAgent loops never set per-turn context, but + // the wrapper binds the session handle at run boot — thread the + // chatId from it so subtask chat helpers (`chat.stream.writer` + // with target "root") can open the parent's session. + const sessionHandle = locals.get(chatSessionHandleKey); + if (sessionHandle) { + toolMeta.chatId = sessionHandle.id; + } } const chatLocals: Record = {}; @@ -5113,6 +5163,10 @@ function chatCustomAgent< markChatAgentRunForStreamsWarning(); taskContext.setConversationId(payload.chatId); stampConversationIdOnActiveSpan(payload.chatId); + // Seed the `.in` resume cursor before user code attaches any `.in` + // listener — otherwise a continuation boot replays already-answered + // messages into the loop's first wait. + await seedSessionInResumeCursorForCustomLoop(payload); return userRun(payload, runOptions); }, }); @@ -8613,8 +8667,15 @@ async function pipeChatAndCapture( resolveOnFinish = r; }); + const resolvedOptions = resolveUIMessageStreamOptions(); const uiStream = source.toUIMessageStream({ - ...resolveUIMessageStreamOptions(), + ...resolvedOptions, + // Stamp a server-generated id on the start chunk, same as chat.agent's + // pipe. Without it the AI SDK regenerates the assistant id when a + // prepareStep injection (steering) starts a new step mid-stream, and + // the frontend replaces the partial message — wiping the + // pre-injection text from the UI and the captured response. + generateMessageId: resolvedOptions.generateMessageId ?? generateMessageId, onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => { captured = responseMessage; resolveOnFinish!(); @@ -8979,13 +9040,23 @@ function createChatSession( [Symbol.asyncIterator]() { let currentPayload = payload; let turn = -1; - const stop = createStopSignal(); + // Created on the first next() call, AFTER the resume-cursor seed — + // createStopSignal attaches the `.in` SSE tail, and attaching + // before the seed replays every record from seq 0 (the seed is a + // no-op when the chatCustomAgent wrapper already ran it). + let stop!: ReturnType; + let booted = false; const accumulator = new ChatMessageAccumulator(); let previousTurnUsage: LanguageModelUsage | undefined; let cumulativeUsage: LanguageModelUsage = emptyUsage(); return { async next(): Promise> { + if (!booted) { + booted = true; + await seedSessionInResumeCursorForCustomLoop(currentPayload); + stop = createStopSignal(); + } turn++; // First turn: wait when the boot payload carries no message. @@ -9328,7 +9399,8 @@ function createChatSession( }, async return() { - stop.cleanup(); + // `stop` only exists once next() has booted the iterator. + stop?.cleanup(); return { done: true, value: undefined }; }, };