diff --git a/internal/api/server.go b/internal/api/server.go index d9eadf2..021819c 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -320,9 +320,16 @@ func (d Deps) handleIngestDocument(w http.ResponseWriter, r *http.Request) { SourceRef: key, }) if err := d.Queue.Enqueue(ctx, queue.Job{ - Kind: queue.KindIngestDocument, - Payload: payload, - DedupeKey: string(docID), + Kind: queue.KindIngestDocument, + Payload: payload, + // Cap retries so a transient failure (e.g. a parse-timeout or a + // not-yet-visible source under heavy concurrent ingestion) gets a + // few chances to recover, without the queue's default 25-attempt + // exponential backoff dragging a genuinely-bad document out for + // hours. The document stays "parsing" across these attempts and + // only flips to "failed" on the last one (see Pipeline.fail). + MaxRetries: 5, + DedupeKey: string(docID), }); err != nil { d.Logger.Error("ingest: enqueue failed", "err", err) writeErr(w, http.StatusInternalServerError, "enqueue failed") diff --git a/pkg/ingest/ingest.go b/pkg/ingest/ingest.go index 78647a4..9d97507 100644 --- a/pkg/ingest/ingest.go +++ b/pkg/ingest/ingest.go @@ -317,6 +317,26 @@ func (p *Pipeline) acquireGlobalLLM(ctx context.Context) (release func(), ok boo } } +// lastAttemptCtxKey carries whether the current ingest job is on its final +// queue attempt. When it isn't, a transient failure must NOT be surfaced as a +// terminal "failed" status — the queue will retry it. +type lastAttemptCtxKeyT struct{} + +func withLastAttempt(ctx context.Context, last bool) context.Context { + return context.WithValue(ctx, lastAttemptCtxKeyT{}, last) +} + +// isLastAttempt reports whether this is the final attempt. Defaults to true +// when unset (callers that don't track attempts, e.g. tests, keep the +// fail-immediately behaviour). +func isLastAttempt(ctx context.Context) bool { + v, ok := ctx.Value(lastAttemptCtxKeyT{}).(bool) + if !ok { + return true + } + return v +} + // Handler returns a queue.Handler suitable for queue.KindIngestDocument. func (p *Pipeline) Handler() queue.Handler { return func(ctx context.Context, j queue.Job) error { @@ -324,7 +344,11 @@ func (p *Pipeline) Handler() queue.Handler { if err := json.Unmarshal(j.Payload, &payload); err != nil { return fmt.Errorf("decode payload: %w", err) } - return p.Run(ctx, payload) + // A job with no attempt tracking (MaxAttempts<=0) is treated as its + // own last attempt; otherwise it's the last attempt only when the + // queue has used up its retries. + last := j.MaxAttempts <= 0 || j.Attempt >= j.MaxAttempts + return p.Run(withLastAttempt(ctx, last), payload) } } @@ -1005,11 +1029,30 @@ func fallbackSummary(title, body string) string { func (p *Pipeline) fail(ctx context.Context, store docPersister, id tree.DocumentID, stage string, cause error) { msg := fmt.Sprintf("%s: %s", stage, cause.Error()) - // Use a FRESH context for the failure write — the inbound one is - // almost certainly the reason we're failing (timeout/cancel) and - // reusing it would leave the doc stuck on "parsing" forever. + // Use a FRESH context for the status write — the inbound one is almost + // certainly the reason we're failing (timeout/cancel) and reusing it + // would leave the doc stuck mid-flight. failCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + + // If the queue will retry this job, the failure is (so far) transient — + // under heavy concurrent ingestion, parse can hit a parse-timeout or a + // not-yet-visible source and recover on a later attempt. Surfacing + // "failed" now would tell a polling client the document is dead when it + // isn't, so keep it in "parsing" and let the retry run. Only the final + // attempt produces a terminal "failed". + if !isLastAttempt(ctx) { + p.Logger.Warn("ingest: transient failure, will retry", + "document_id", string(id), "stage", stage, "cause", cause.Error()) + if err := store.SetDocumentStatus(failCtx, id, db.StatusParsing, ""); err != nil { + p.Logger.Error("ingest: failed to reset document to parsing", "err", err) + } + return + } + + // Terminal failure: log it (previously silent — failures only showed up + // in the DB error_message) and mark the document failed. + p.Logger.Error("ingest: failed", "document_id", string(id), "stage", stage, "cause", cause.Error()) if err := store.SetDocumentStatus(failCtx, id, db.StatusFailed, msg); err != nil { p.Logger.Error("ingest: failed to mark document failed", "err", err, "cause", cause) } diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index d5285e8..45e0380 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -44,6 +44,15 @@ type Job struct { // Optional: max retries before dead-lettering. MaxRetries int `json:"max_retries,omitempty"` + + // Attempt is the 1-based current attempt number, set by the queue when + // it dispatches a job to its handler (0 if the queue doesn't track + // attempts). MaxAttempts is the total before dead-lettering. Handlers + // use these to tell a transient, will-be-retried failure apart from a + // terminal one — e.g. so a document isn't marked "failed" while the + // queue will still retry it. + Attempt int `json:"-"` + MaxAttempts int `json:"-"` } // Handler processes a single job. diff --git a/pkg/queue/river.go b/pkg/queue/river.go index 282ceda..5d45859 100644 --- a/pkg/queue/river.go +++ b/pkg/queue/river.go @@ -73,7 +73,19 @@ func (w *envelopeWorker) Work(ctx context.Context, job *river.Job[envelopeArgs]) if !ok { return fmt.Errorf("%w: %q", ErrUnknownKind, job.Args.DomainKind) } - return h(ctx, Job{Kind: job.Args.DomainKind, Payload: job.Args.Payload}) + // Attempt/MaxAttempts live on River's embedded *JobRow, which is always + // populated in production but may be nil in unit tests that construct a + // bare river.Job. Guard the deref. + attempt, maxAttempts := 0, 0 + if job.JobRow != nil { + attempt, maxAttempts = job.Attempt, job.MaxAttempts + } + return h(ctx, Job{ + Kind: job.Args.DomainKind, + Payload: job.Args.Payload, + Attempt: attempt, + MaxAttempts: maxAttempts, + }) } // NewRiver constructs a new River-backed Queue. It opens its own pgxpool