From 1a935dd83d998db87a4d26d28508b2b7a7e61198 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 12 Jun 2026 18:46:56 +0100 Subject: [PATCH 1/2] fix(sdk): custom agent loop parity with chat.agent for continuations, steering, and subtasks Three fixes for chat.customAgent raw loops and chat.createSession: Continuation boots replayed already-answered user messages into the first wait: the .in SSE tail attached (via createStopSignal or any listener) before a resume cursor existed, so S2 replayed from seq 0. The custom-agent wrapper and createChatSession's first next() now seed both manager cursors from the latest turn-complete header before anything attaches, the same boot logic chat.agent uses. Seeding only setLastSeqNum after attach (the reverted earlier attempt) does not work because dispatch is gated on the other cursor. Steering a hand-rolled loop mid-stream wiped the in-flight assistant text: pipeChatAndCapture called toUIMessageStream without generateMessageId, so a prepareStep injection starting a new step regenerated the assistant id and the frontend replaced the partial message. It now stamps the server-generated id like chat.agent's pipe. Task-backed tools (ai.toolExecute) failed from custom agent loops with "session handle is not initialized" on the child run: the chatId only threaded from the per-turn context that raw loops never set. It now falls back to the session handle the customAgent wrapper binds at boot, so child tasks can stream into the parent's chat with chat.stream.writer({ target: "root" }). --- .changeset/custom-agent-loop-fixes.md | 9 ++++ packages/trigger-sdk/src/v3/ai.ts | 75 +++++++++++++++++++++++++-- 2 files changed, 81 insertions(+), 3 deletions(-) create mode 100644 .changeset/custom-agent-loop-fixes.md diff --git a/.changeset/custom-agent-loop-fixes.md b/.changeset/custom-agent-loop-fixes.md new file mode 100644 index 0000000000..4d37fff535 --- /dev/null +++ b/.changeset/custom-agent-loop-fixes.md @@ -0,0 +1,9 @@ +--- +"@trigger.dev/sdk": patch +--- + +Three fixes for custom agent loops (`chat.customAgent`, `chat.createSession`, and hand-rolled `MessageAccumulator` loops): + +- Continuation runs no longer replay already-answered user messages into the first turn. The `.in` resume cursor is now seeded before any listener attaches (the same boot logic `chat.agent` uses), so a chat that continues after a cancel, crash, or upgrade only sees genuinely new messages. +- Steering a hand-rolled loop mid-stream no longer wipes the in-flight assistant response. `chat.pipeAndCapture` now stamps a server-generated message id on the stream, so a `prepareStep` injection keeps the partial text instead of replacing the message. +- Task-backed tools (`ai.toolExecute`) now work from custom agent loops: the parent's session is threaded to the child run, so child tasks can stream progress into the chat with `chat.stream.writer({ target: "root" })` instead of failing with "session handle is not initialized". diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index e3b3e60549..9fd0d8e95f 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -221,6 +221,44 @@ export async function __findLatestSessionInCursorForTests( return findLatestSessionInCursor(chatId); } +/** + * Seed the `.in` resume cursor for custom-agent loops (`chat.customAgent` + * raw loops and `chat.createSession`) the way `chat.agent`'s boot does. + * + * MUST run before anything attaches a `.in` listener (`createStopSignal`, + * `chat.messages.on`, the first wait): attaching opens the SSE tail with + * `Last-Event-ID` from the seeded cursor, so attach-then-seed replays + * every record from seq 0 — already-answered user messages get delivered + * into the new run's first wait and the loop re-answers them. + * + * Seeds both cursors: `setLastSeqNum` controls the SSE `Last-Event-ID`, + * `setLastDispatchedSeqNum` gates waiter dispatch — seeding only the + * former still re-delivers records the manager buffered before the seed. + * + * No-ops on fresh boots and when a cursor is already seeded (e.g. the + * `chatCustomAgent` wrapper ran before a nested `createChatSession`). + * @internal + */ +async function seedSessionInResumeCursorForCustomLoop( + payload: Pick +): Promise { + const attemptNumber = taskContext.ctx?.attempt.number ?? 1; + if (payload.continuation !== true && attemptNumber <= 1) return; + if (sessionStreams.lastSeqNum(payload.chatId, "in") !== undefined) return; + try { + const cursor = await findLatestSessionInCursor(payload.chatId); + if (cursor !== undefined) { + sessionStreams.setLastSeqNum(payload.chatId, "in", cursor); + sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor); + } + } catch (error) { + logger.warn( + "chat session: session.in resume cursor lookup failed; old messages may replay", + { error: error instanceof Error ? error.message : String(error) } + ); + } +} + /** * Versioned blob written to S3 after every turn completes (when no * `hydrateMessages` hook is registered). Read at run boot to seed the @@ -921,6 +959,15 @@ function createTaskToolExecuteHandler< toolMeta.turn = chatCtx.turn; toolMeta.continuation = chatCtx.continuation; toolMeta.clientData = chatCtx.clientData; + } else { + // Hand-rolled chat.customAgent loops never set per-turn context, but + // the wrapper binds the session handle at run boot — thread the + // chatId from it so subtask chat helpers (`chat.stream.writer` + // with target "root") can open the parent's session. + const sessionHandle = locals.get(chatSessionHandleKey); + if (sessionHandle) { + toolMeta.chatId = sessionHandle.id; + } } const chatLocals: Record = {}; @@ -5113,6 +5160,10 @@ function chatCustomAgent< markChatAgentRunForStreamsWarning(); taskContext.setConversationId(payload.chatId); stampConversationIdOnActiveSpan(payload.chatId); + // Seed the `.in` resume cursor before user code attaches any `.in` + // listener — otherwise a continuation boot replays already-answered + // messages into the loop's first wait. + await seedSessionInResumeCursorForCustomLoop(payload); return userRun(payload, runOptions); }, }); @@ -8613,8 +8664,15 @@ async function pipeChatAndCapture( resolveOnFinish = r; }); + const resolvedOptions = resolveUIMessageStreamOptions(); const uiStream = source.toUIMessageStream({ - ...resolveUIMessageStreamOptions(), + ...resolvedOptions, + // Stamp a server-generated id on the start chunk, same as chat.agent's + // pipe. Without it the AI SDK regenerates the assistant id when a + // prepareStep injection (steering) starts a new step mid-stream, and + // the frontend replaces the partial message — wiping the + // pre-injection text from the UI and the captured response. + generateMessageId: resolvedOptions.generateMessageId ?? generateMessageId, onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => { captured = responseMessage; resolveOnFinish!(); @@ -8979,13 +9037,23 @@ function createChatSession( [Symbol.asyncIterator]() { let currentPayload = payload; let turn = -1; - const stop = createStopSignal(); + // Created on the first next() call, AFTER the resume-cursor seed — + // createStopSignal attaches the `.in` SSE tail, and attaching + // before the seed replays every record from seq 0 (the seed is a + // no-op when the chatCustomAgent wrapper already ran it). + let stop!: ReturnType; + let booted = false; const accumulator = new ChatMessageAccumulator(); let previousTurnUsage: LanguageModelUsage | undefined; let cumulativeUsage: LanguageModelUsage = emptyUsage(); return { async next(): Promise> { + if (!booted) { + booted = true; + await seedSessionInResumeCursorForCustomLoop(currentPayload); + stop = createStopSignal(); + } turn++; // First turn: wait when the boot payload carries no message. @@ -9328,7 +9396,8 @@ function createChatSession( }, async return() { - stop.cleanup(); + // `stop` only exists once next() has booted the iterator. + stop?.cleanup(); return { done: true, value: undefined }; }, }; From 174ba12870881ca33069815edd07eb2e7f98c058 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 12 Jun 2026 21:50:31 +0100 Subject: [PATCH 2/2] fix(sdk): seed the custom-loop resume cursor without a continuation gate The wire can omit the continuation flag on a run that still has prior turns. The cursor scan doubles as the prior-state probe (a fresh session has no turn-complete on .out and seeds nothing), so run it on every custom-loop boot instead of gating on continuation or attempt number, mirroring the snapshot-exists arm of chat.agent's boot check. --- packages/trigger-sdk/src/v3/ai.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 9fd0d8e95f..1e97630ae4 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -242,9 +242,12 @@ export async function __findLatestSessionInCursorForTests( async function seedSessionInResumeCursorForCustomLoop( payload: Pick ): Promise { - const attemptNumber = taskContext.ctx?.attempt.number ?? 1; - if (payload.continuation !== true && attemptNumber <= 1) return; if (sessionStreams.lastSeqNum(payload.chatId, "in") !== undefined) return; + // No continuation/attempt gate: the wire may omit `continuation` on a + // run that still has prior turns (chat.agent covers that case via its + // snapshot). The scan doubles as the prior-state probe — a fresh + // session has no turn-complete on `.out`, returns no cursor, and + // seeds nothing. Cost on fresh boots is one non-blocking records read. try { const cursor = await findLatestSessionInCursor(payload.chatId); if (cursor !== undefined) {