From 09ce68047363c488d4e6fe697924c3dcff4f36a2 Mon Sep 17 00:00:00 2001 From: Halleluyah Oludele Date: Sat, 20 Jun 2026 10:35:26 +0100 Subject: [PATCH 1/3] queue: add PermanentError so deterministic failures stop retrying A handler can now return queue.Permanent(err) to mark a failure as non-retryable. The River backend translates it to river.JobCancel, which dead-letters the job immediately instead of burning the full retry budget on an input that can never succeed. Transient failures stay ordinary errors and keep retrying. Fixes the HAL-321 symptom where an encrypted/malformed PDF was retried 5x, interleaving confusing 'object not found' errors into its history. --- pkg/queue/permanent_test.go | 47 +++++++++++++++++++++++++++++++++++++ pkg/queue/queue.go | 38 ++++++++++++++++++++++++++++++ pkg/queue/river.go | 10 +++++++- 3 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 pkg/queue/permanent_test.go diff --git a/pkg/queue/permanent_test.go b/pkg/queue/permanent_test.go new file mode 100644 index 0000000..9a8b429 --- /dev/null +++ b/pkg/queue/permanent_test.go @@ -0,0 +1,47 @@ +package queue + +import ( + "errors" + "fmt" + "testing" +) + +func TestPermanentWrapsAndUnwraps(t *testing.T) { + base := errors.New("encrypted PDF") + perm := Permanent(base) + + if !IsPermanent(perm) { + t.Fatal("IsPermanent should be true for a wrapped permanent error") + } + if !errors.Is(perm, base) { + t.Fatal("errors.Is should still match the wrapped cause") + } + if perm.Error() != base.Error() { + t.Fatalf("Error() = %q, want %q", perm.Error(), base.Error()) + } +} + +func TestPermanentDetectedThroughWrapping(t *testing.T) { + // A permanent error wrapped again with fmt.Errorf("%w") must still be + // detectable — the ingest pipeline returns queue.Permanent(...) which a + // caller may decorate with stage context. + wrapped := fmt.Errorf("parse: %w", Permanent(errors.New("malformed"))) + if !IsPermanent(wrapped) { + t.Fatal("IsPermanent should see through an outer fmt.Errorf wrap") + } +} + +func TestPermanentOfNilIsNil(t *testing.T) { + if Permanent(nil) != nil { + t.Fatal("Permanent(nil) must be nil so success paths stay clean") + } + if IsPermanent(nil) { + t.Fatal("IsPermanent(nil) must be false") + } +} + +func TestOrdinaryErrorIsNotPermanent(t *testing.T) { + if IsPermanent(errors.New("storage: object not found")) { + t.Fatal("a plain (transient) error must not be classified permanent") + } +} diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 45e0380..49da5a6 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -79,3 +79,41 @@ type Queue interface { // ErrUnknownKind is returned by Queue implementations when a job with no // registered handler is received. var ErrUnknownKind = errors.New("queue: no handler registered for job kind") + +// PermanentError marks a handler failure as NON-retryable: the input is +// deterministically bad (e.g. an encrypted or malformed document the parser +// can't open, or content with no extractable text), so re-running the job can +// only fail the same way. A queue backend that understands retries MUST stop +// retrying a job whose handler returns a PermanentError and dead-letter it +// immediately, instead of burning the full retry budget on an outcome that +// will never change. +// +// Transient failures (a not-yet-visible source under heavy concurrent +// ingestion, a parse timeout under load, a flaky network call) are the +// opposite: they are returned as ordinary errors so the queue retries them. +type PermanentError struct{ Err error } + +func (e *PermanentError) Error() string { + if e.Err == nil { + return "permanent failure" + } + return e.Err.Error() +} + +func (e *PermanentError) Unwrap() error { return e.Err } + +// Permanent wraps err so the queue treats the failure as non-retryable. It +// returns nil for a nil err so `return queue.Permanent(doThing())` stays +// correct on the success path. +func Permanent(err error) error { + if err == nil { + return nil + } + return &PermanentError{Err: err} +} + +// IsPermanent reports whether err (or anything it wraps) is a PermanentError. +func IsPermanent(err error) bool { + var pe *PermanentError + return errors.As(err, &pe) +} diff --git a/pkg/queue/river.go b/pkg/queue/river.go index 5d45859..91bbea6 100644 --- a/pkg/queue/river.go +++ b/pkg/queue/river.go @@ -80,12 +80,20 @@ func (w *envelopeWorker) Work(ctx context.Context, job *river.Job[envelopeArgs]) if job.JobRow != nil { attempt, maxAttempts = job.Attempt, job.MaxAttempts } - return h(ctx, Job{ + err := h(ctx, Job{ Kind: job.Args.DomainKind, Payload: job.Args.Payload, Attempt: attempt, MaxAttempts: maxAttempts, }) + // A PermanentError means the input is deterministically bad — retrying it + // can only waste the remaining attempts (and, worse, interleave confusing + // transient errors into the job's history). river.JobCancel stops all + // further retries and dead-letters the job now, preserving the real cause. + if IsPermanent(err) { + return river.JobCancel(err) + } + return err } // NewRiver constructs a new River-backed Queue. It opens its own pgxpool From 5e39c24026a2e2fb0b052dcff26db657e7aebbf9 Mon Sep 17 00:00:00 2001 From: Halleluyah Oludele Date: Sat, 20 Jun 2026 10:35:36 +0100 Subject: [PATCH 2/3] ingest: classify deterministic parse failures as permanent parse() now splits failures: a not-yet-visible source and a parse timeout stay transient (retry can recover them under load); an encrypted/malformed/no-text document is wrapped queue.Permanent so the queue cancels it. fail() marks a permanent failure as terminal immediately, regardless of attempt number, so a cancelled doc no longer wedges in 'parsing' forever. --- pkg/ingest/ingest.go | 55 ++++++++++++++++++++++++++----- pkg/ingest/parse_classify_test.go | 38 +++++++++++++++++++++ 2 files changed, 85 insertions(+), 8 deletions(-) create mode 100644 pkg/ingest/parse_classify_test.go diff --git a/pkg/ingest/ingest.go b/pkg/ingest/ingest.go index 9d97507..4df956c 100644 --- a/pkg/ingest/ingest.go +++ b/pkg/ingest/ingest.go @@ -566,10 +566,44 @@ func runParallelStages(ctx context.Context, summarizeFn, hydeFn func(context.Con func (p *Pipeline) parse(ctx context.Context, parsers *parser.Registry, pl Payload) (*parser.ParsedDoc, error) { rc, _, err := getSourceWithRetry(ctx, p.Storage, pl.SourceRef) if err != nil { + // A not-yet-visible source is transient (the upload's fsync'd bytes can + // briefly read back as ErrNotFound under heavy concurrent ingestion, or + // while antivirus scans a freshly-written file), so it is returned as an + // ordinary, retryable error — never permanent. return nil, fmt.Errorf("fetch source: %w", err) } defer func() { _ = rc.Close() }() // best-effort close - return parsers.Parse(ctx, pl.ContentType, pl.Filename, rc) + + parsed, perr := parsers.Parse(ctx, pl.ContentType, pl.Filename, rc) + if perr != nil { + // Classify the failure so the queue stops wasting retries on inputs + // that can never succeed. A parse TIMEOUT or a context cancellation is + // transient — the same document may parse on a less-loaded attempt — so + // it stays an ordinary (retryable) error. Everything else from the + // parser is deterministic on these bytes (encrypted PDF that won't open + // with the empty password, a backend-rejected/malformed structure, a + // scanned image with no extractable text): retrying only re-derives the + // same failure, so mark it permanent and dead-letter it immediately. + if isTransientParseErr(perr) { + return nil, perr + } + return nil, queue.Permanent(perr) + } + return parsed, nil +} + +// isTransientParseErr reports whether a parser error is worth retrying. Parse +// timeouts and context cancellations are load-dependent (a complex document may +// finish within budget on a quieter attempt), so they are transient. A genuine +// structural rejection is not. +func isTransientParseErr(err error) bool { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return true + } + // The PDF parser's own deadline wrapper formats a plain (unwrapped) message + // when its internal timer fires, so match it textually as a fallback. + msg := err.Error() + return strings.Contains(msg, "parse exceeded") || strings.Contains(msg, "parse cancelled") } // getSourceWithRetry fetches a freshly-uploaded object, tolerating the @@ -1035,13 +1069,18 @@ func (p *Pipeline) fail(ctx context.Context, store docPersister, id tree.Documen failCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - // If the queue will retry this job, the failure is (so far) transient — - // under heavy concurrent ingestion, parse can hit a parse-timeout or a - // not-yet-visible source and recover on a later attempt. Surfacing - // "failed" now would tell a polling client the document is dead when it - // isn't, so keep it in "parsing" and let the retry run. Only the final - // attempt produces a terminal "failed". - if !isLastAttempt(ctx) { + // A permanent failure (encrypted/malformed/no-text document) will NOT be + // retried by the queue — the worker cancels the job — so it is terminal + // right now regardless of which attempt we're on. Marking it failed + // immediately gives the caller the real reason instead of leaving the + // document wedged in "parsing" forever waiting for a retry that never runs. + if !queue.IsPermanent(cause) && !isLastAttempt(ctx) { + // Otherwise, if the queue will retry this job, the failure is (so far) + // transient — under heavy concurrent ingestion, parse can hit a + // parse-timeout or a not-yet-visible source and recover on a later + // attempt. Surfacing "failed" now would tell a polling client the + // document is dead when it isn't, so keep it in "parsing" and let the + // retry run. Only the final attempt produces a terminal "failed". p.Logger.Warn("ingest: transient failure, will retry", "document_id", string(id), "stage", stage, "cause", cause.Error()) if err := store.SetDocumentStatus(failCtx, id, db.StatusParsing, ""); err != nil { diff --git a/pkg/ingest/parse_classify_test.go b/pkg/ingest/parse_classify_test.go new file mode 100644 index 0000000..9bd0256 --- /dev/null +++ b/pkg/ingest/parse_classify_test.go @@ -0,0 +1,38 @@ +package ingest + +import ( + "context" + "errors" + "fmt" + "testing" +) + +// The classifier decides whether a parser failure is worth retrying. Getting +// this wrong is expensive in both directions: marking a transient timeout +// permanent dead-letters a document that would have parsed on a quieter +// attempt; marking a deterministic rejection transient burns the whole retry +// budget (and, as seen in HAL-321, interleaves confusing "object not found" +// errors into the job history). +func TestIsTransientParseErr(t *testing.T) { + cases := []struct { + name string + err error + transient bool + }{ + {"deadline exceeded", context.DeadlineExceeded, true}, + {"canceled", context.Canceled, true}, + {"wrapped deadline", fmt.Errorf("pdf: parse cancelled: %w", context.DeadlineExceeded), true}, + {"pdf timeout message", errors.New("pdf: parse exceeded 5m0s — document too complex or malformed"), true}, + {"pdf cancelled message", errors.New("pdf: parse cancelled: context deadline exceeded"), true}, + {"encrypted", errors.New("pdf: open: encrypted and could not be unlocked with empty password: x"), false}, + {"backend rejected", errors.New("pdf: open: ledongthuc/pdf backend rejected the document: malformed PDF: 256-bit encryption key"), false}, + {"no extractable text", errors.New("pdf: parsed but no extractable text — the document may be a scanned image"), false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := isTransientParseErr(tc.err); got != tc.transient { + t.Fatalf("isTransientParseErr(%q) = %v, want %v", tc.err, got, tc.transient) + } + }) + } +} From 4dbf6e02550f3c5415d2eb89346b8276c6d83a4f Mon Sep 17 00:00:00 2001 From: Halleluyah Oludele Date: Sat, 20 Jun 2026 10:35:47 +0100 Subject: [PATCH 3/3] storage: pin local root to absolute + self-diagnosing not-found MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NewLocal resolves its root with filepath.Abs so a relative default (./data/documents) can't resolve against a different working directory after an engine restart while River still holds jobs for earlier uploads — the path drift that made a source 'not found' though it was on disk. Get's not-found error now carries the resolved path, and the engine logs the absolute root at boot (also where to point a Defender exclusion). --- cmd/engine/main.go | 8 +++++ pkg/storage/local.go | 28 +++++++++++++-- pkg/storage/local_test.go | 76 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 3 deletions(-) create mode 100644 pkg/storage/local_test.go diff --git a/cmd/engine/main.go b/cmd/engine/main.go index 3d61d21..352267c 100644 --- a/cmd/engine/main.go +++ b/cmd/engine/main.go @@ -96,6 +96,14 @@ func run() error { if err != nil { return fmt.Errorf("init storage: %w", err) } + // Log the resolved storage location at boot. For the local driver this is + // the absolute root every source/section object is read from and written + // to — the single most useful fact when diagnosing an "object not found", + // and the place to point a Windows Defender exclusion so antivirus scans of + // freshly-written PDFs don't transiently hide them from the ingest worker. + if local, ok := store.(*storage.Local); ok { + logger.Info("storage: local root resolved", "root", local.Root()) + } q, err := buildQueue(cfg.Queue, cfg.Database.URL) if err != nil { diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 147fd0d..82faba2 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -18,13 +18,31 @@ type Local struct { // NewLocal returns a Local storage rooted at dir. The directory is created // if it does not exist. +// +// dir is resolved to an ABSOLUTE path up front. A relative root (the default +// is "./data/documents") is otherwise resolved against the process's current +// working directory on every call — so if the engine is ever relaunched from a +// different directory while the queue still holds jobs that reference earlier +// uploads (River persists jobs in Postgres across restarts), the worker would +// look under a different root than the one the bytes were written to and fail +// with "object not found" on a file that is in fact on disk. Pinning the root +// to an absolute path at construction makes it stable for the process lifetime. func NewLocal(dir string) (*Local, error) { - if err := os.MkdirAll(dir, 0o755); err != nil { + abs, err := filepath.Abs(dir) + if err != nil { + return nil, fmt.Errorf("resolve storage root %q: %w", dir, err) + } + if err := os.MkdirAll(abs, 0o755); err != nil { return nil, fmt.Errorf("create storage root: %w", err) } - return &Local{root: dir}, nil + return &Local{root: abs}, nil } +// Root returns the absolute filesystem path the storage is rooted at. Exposed +// so the engine can log the resolved root at boot (the single most useful fact +// when diagnosing an "object not found" on a file that appears to be on disk). +func (l *Local) Root() string { return l.root } + func (l *Local) path(key string) string { // Keys may include slashes; treat them as path separators. return filepath.Join(l.root, filepath.FromSlash(key)) @@ -62,7 +80,11 @@ func (l *Local) Get(ctx context.Context, key string) (io.ReadCloser, Metadata, e info, err := os.Stat(full) if err != nil { if errors.Is(err, os.ErrNotExist) { - return nil, Metadata{}, ErrNotFound + // Wrap ErrNotFound (errors.Is still matches) but carry the resolved + // absolute path so the failure is self-diagnosing: a caller seeing + // this in a log can immediately tell whether it looked in the wrong + // root vs. the bytes genuinely being absent. + return nil, Metadata{}, fmt.Errorf("%w: %s", ErrNotFound, full) } return nil, Metadata{}, err } diff --git a/pkg/storage/local_test.go b/pkg/storage/local_test.go new file mode 100644 index 0000000..046ed8e --- /dev/null +++ b/pkg/storage/local_test.go @@ -0,0 +1,76 @@ +package storage + +import ( + "bytes" + "context" + "errors" + "io" + "os" + "path/filepath" + "strings" + "testing" +) + +func TestLocalResolvesRootToAbsolute(t *testing.T) { + // A relative root must be pinned to an absolute path so the worker reads + // from the same place regardless of the process's working directory at + // read time (the HAL-321 "object not found on a file that is on disk" bug). + dir := t.TempDir() + rel, err := filepath.Rel(mustGetwd(t), dir) + if err != nil { + t.Skipf("cannot relativise temp dir: %v", err) + } + l, err := NewLocal(rel) + if err != nil { + t.Fatalf("NewLocal: %v", err) + } + if !filepath.IsAbs(l.Root()) { + t.Fatalf("Root() = %q, want absolute", l.Root()) + } +} + +func TestLocalGetNotFoundCarriesPath(t *testing.T) { + l, err := NewLocal(t.TempDir()) + if err != nil { + t.Fatalf("NewLocal: %v", err) + } + _, _, err = l.Get(context.Background(), "documents/missing/source.pdf") + if !errors.Is(err, ErrNotFound) { + t.Fatalf("err = %v, want ErrNotFound", err) + } + // The error must name the resolved path so an operator can immediately see + // WHERE the worker looked. + if !strings.Contains(err.Error(), "source.pdf") { + t.Fatalf("not-found error %q should include the resolved path", err) + } +} + +func TestLocalPutThenGetRoundTrip(t *testing.T) { + l, err := NewLocal(t.TempDir()) + if err != nil { + t.Fatalf("NewLocal: %v", err) + } + ctx := context.Background() + want := []byte("hello vectorless") + if err := l.Put(ctx, "documents/doc1/source.pdf", bytes.NewReader(want), Metadata{}); err != nil { + t.Fatalf("Put: %v", err) + } + rc, _, err := l.Get(ctx, "documents/doc1/source.pdf") + if err != nil { + t.Fatalf("Get: %v", err) + } + defer func() { _ = rc.Close() }() + got, _ := io.ReadAll(rc) + if !bytes.Equal(got, want) { + t.Fatalf("round-trip = %q, want %q", got, want) + } +} + +func mustGetwd(t *testing.T) string { + t.Helper() + wd, err := os.Getwd() + if err != nil { + t.Fatalf("getwd: %v", err) + } + return wd +}