Skip to content

Bulk-ingest resilience: stop retrying deterministic failures; stabilize local storage root (HAL-321)#48

Merged
hallelx2 merged 3 commits into
mainfrom
halleluyaholudele/hal-321-bulk-ingest-resilience
Jun 20, 2026
Merged

Bulk-ingest resilience: stop retrying deterministic failures; stabilize local storage root (HAL-321)#48
hallelx2 merged 3 commits into
mainfrom
halleluyaholudele/hal-321-bulk-ingest-resilience

Conversation

@hallelx2

@hallelx2 hallelx2 commented Jun 20, 2026

Copy link
Copy Markdown
Owner

Root cause (evidence-backed)

Inspecting the live River queue + disk proved HAL-321 is two distinct problems, not one:

(A) Transient object not found — a Windows filesystem/AV interaction, not a write race.
The per-attempt error history shows the same source file flipping found↔not-found across attempts minutes apart (e.g. job 93: read OK on attempts 1/3/4/5, not found on attempt 2). All 5 permanently-discarded jobs' source files exist on disk right now, full-size, at the exact path the worker computes. A file present at the computed path but intermittently invisible = os.Stat returning FILE_NOT_FOUND on an existing file — the signature of Windows Defender real-time scanning (confirmed ON) a freshly-written multi-MB PDF; under heavy concurrent ingest those scan windows overlap the worker's reads. fsync + retry only papered over it.

(B) Deterministic failures retried 5×. An encrypted PDF (256-bit encryption key) and parse-timeouts were retried like transients, burning the retry budget and interleaving confusing not found errors into the history.

Fix

  1. queue.PermanentError + River JobCancel: a handler can mark a failure non-retryable; River dead-letters it immediately instead of retrying 5×.
  2. Ingest classifies parse failures: not-yet-visible source and parse timeout stay transient (retry recovers them under load); encrypted/malformed/no-text become permanent. fail() marks a permanent failure terminal immediately so a cancelled doc can't wedge in parsing forever.
  3. Local storage root pinned to absolute (filepath.Abs in NewLocal) so a relative default can't resolve against a different CWD after a restart while River holds older jobs (the path-drift variant). Not-found errors now carry the resolved path; the engine logs the absolute root at boot (also where to point a Defender exclusion).

Tests

  • pkg/queue/permanent_test.go — wrap/unwrap/IsPermanent through fmt wrapping.
  • pkg/ingest/parse_classify_test.go — transient vs permanent for every real error string.
  • pkg/storage/local_test.go — absolute root, not-found carries path, round-trip.
  • go build ./... + go vet clean; full pkg/ingest, pkg/queue, pkg/storage suites pass.

Operational follow-ups (not code)

  • Add a Defender exclusion for the engine's storage root (eliminates the transient at source).
  • Run the bench engine with an absolute VLE_STORAGE_LOCAL_ROOT.
  • Clear the contaminated River jobs from prior mixed-config runs before the next clean bench.

Closes HAL-321

Summary by Sourcery

Classify ingest parse failures so deterministically bad documents are treated as permanent errors, and make local storage paths stable and diagnosable to prevent misleading object-not-found errors.

New Features:

  • Introduce queue.PermanentError to mark non-retryable job failures and have the River worker cancel such jobs immediately instead of retrying.
  • Classify parser errors into transient versus permanent so encrypted, malformed, or no-text documents are dead-lettered instead of retried.
  • Expose the resolved absolute root path from the local storage driver and log it at engine startup for operational visibility.

Bug Fixes:

  • Resolve local storage roots to absolute paths up front to avoid path drift across process restarts causing false object-not-found errors.
  • Include the resolved filesystem path in local storage not-found errors to aid diagnosis of missing objects.
  • Ensure permanent ingest failures transition documents to a terminal failed state immediately instead of remaining stuck in parsing.

Enhancements:

  • Treat parse timeouts and context cancellations as transient errors using both structured causes and known parser error message patterns.

Tests:

  • Add tests covering permanent error wrapping/unwrapping and detection through fmt.Errorf layers.
  • Add tests verifying parse error classification between transient and permanent cases.
  • Add tests ensuring local storage resolves roots to absolute paths, includes paths in not-found errors, and supports put/get round trips.

Summary by CodeRabbit

  • New Features

    • Implemented error classification system distinguishing permanent from transient parse failures for optimized job retry behavior
    • Added diagnostic logging of resolved local storage path at startup
  • Bug Fixes

    • Enhanced storage operation error messages with resolved absolute file paths for easier troubleshooting
  • Tests

    • Added comprehensive test coverage for error classification, retry logic, and storage backend operations

hallelx2 added 3 commits June 20, 2026 10:35
A handler can now return queue.Permanent(err) to mark a failure as
non-retryable. The River backend translates it to river.JobCancel, which
dead-letters the job immediately instead of burning the full retry budget on
an input that can never succeed. Transient failures stay ordinary errors and
keep retrying. Fixes the HAL-321 symptom where an encrypted/malformed PDF was
retried 5x, interleaving confusing 'object not found' errors into its history.
parse() now splits failures: a not-yet-visible source and a parse timeout stay
transient (retry can recover them under load); an encrypted/malformed/no-text
document is wrapped queue.Permanent so the queue cancels it. fail() marks a
permanent failure as terminal immediately, regardless of attempt number, so a
cancelled doc no longer wedges in 'parsing' forever.
NewLocal resolves its root with filepath.Abs so a relative default
(./data/documents) can't resolve against a different working directory after an
engine restart while River still holds jobs for earlier uploads — the path
drift that made a source 'not found' though it was on disk. Get's not-found
error now carries the resolved path, and the engine logs the absolute root at
boot (also where to point a Defender exclusion).
@sourcery-ai

sourcery-ai Bot commented Jun 20, 2026

Copy link
Copy Markdown

Reviewer's Guide

Introduces a PermanentError mechanism so ingest handlers can mark deterministic parse failures as non-retryable, updates the ingest pipeline and River queue integration to stop retrying such jobs, and hardens local storage by pinning its root to an absolute path and enriching not-found errors, with accompanying tests and engine logging.

Sequence diagram for ingest parse failures with PermanentError and River cancellation

sequenceDiagram
    participant Pipeline
    participant ParserRegistry
    participant QueueWorker
    participant River

    QueueWorker->>Pipeline: parse(ctx, parsers, payload)
    Pipeline->>ParserRegistry: Parse(ctx, contentType, filename, rc)
    ParserRegistry-->>Pipeline: perr
    alt [isTransientParseErr(perr)]
        Pipeline-->>QueueWorker: perr
        QueueWorker->>River: Work returns perr
        River-->>QueueWorker: retry job later
    else [!isTransientParseErr(perr)]
        Pipeline->>QueueWorker: queue.Permanent(perr)
        QueueWorker->>River: Work returns err
        River->>QueueWorker: IsPermanent(err)
        alt [IsPermanent(err)]
            QueueWorker->>River: JobCancel(err)
            River-->>QueueWorker: dead-letter job, stop retries
        end
    end
Loading

File-Level Changes

Change Details Files
Classify parser failures as transient vs permanent and surface permanent ones to the queue so they are not retried.
  • Wrap parser failures in parse() with a transient/permanent classifier, returning queue.Permanent for deterministic failures.
  • Introduce isTransientParseErr to treat timeouts and context cancellations (including specific PDF timeout strings) as retryable.
  • Adjust fail() to treat permanent errors as immediately terminal regardless of attempt number, while keeping transient failures non-terminal until the last attempt.
  • Add parse_classify_test.go to exhaustively test transient vs permanent classification for representative error strings.
pkg/ingest/ingest.go
pkg/ingest/parse_classify_test.go
Add a PermanentError type and integrate it with the River queue worker to cancel deterministically failing jobs instead of retrying them.
  • Define PermanentError, Permanent(), and IsPermanent() helpers in the queue package to mark and detect non-retryable failures.
  • Update the River envelope worker to translate PermanentError results into river.JobCancel, preventing further retries and dead-lettering the job.
  • Add permanent_test.go to verify wrapping/unwrapping behavior, detection through fmt.Errorf wrapping, nil handling, and that ordinary errors are not classified as permanent.
pkg/queue/queue.go
pkg/queue/river.go
pkg/queue/permanent_test.go
Stabilize local storage by making its root absolute and improving observability of not-found errors.
  • Resolve the storage root to an absolute path in NewLocal, ensure the directory exists, and expose it via a new Root() accessor.
  • Wrap ErrNotFound in Local.Get with an error that includes the resolved absolute path while preserving errors.Is behavior.
  • Log the resolved local storage root at engine startup for easier diagnosis and AV exclusion configuration.
  • Add local_test.go to cover absolute root resolution, not-found error message content, and Put/Get round-trip behavior.
pkg/storage/local.go
cmd/engine/main.go
pkg/storage/local_test.go

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@coderabbitai

coderabbitai Bot commented Jun 20, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

Adds a PermanentError type to the queue package to mark non-retryable failures. The River worker uses it to dead-letter jobs via river.JobCancel. The ingest pipeline classifies parse errors as transient or permanent and makes fail immediately terminal for permanent causes. Separately, local storage now resolves its root to an absolute path, exposes it via Root(), and enriches not-found errors with the resolved path; startup logs that root.

Changes

Permanent Error Classification

Layer / File(s) Summary
PermanentError type and helpers
pkg/queue/queue.go, pkg/queue/permanent_test.go
Adds PermanentError struct with Error() and Unwrap(), plus Permanent() and IsPermanent() helpers. Tests cover wrapping, unwrapping, nil input, and detection through additional fmt.Errorf wrapping.
River worker permanent-error cancellation
pkg/queue/river.go
envelopeWorker.Work captures the handler error and, when IsPermanent returns true, returns river.JobCancel to dead-letter the job instead of allowing further retries.
Ingest parse transient/permanent classification
pkg/ingest/ingest.go, pkg/ingest/parse_classify_test.go
Pipeline.parse routes errors through isTransientParseErr (matching context deadline/cancel and specific message substrings) to return raw retryable errors or queue.Permanent-wrapped dead-letter errors. Pipeline.fail skips the transient-retry branch when the cause is permanent.

Local Storage Path Resolution

Layer / File(s) Summary
Local storage absolute path, Root(), enriched errors, and startup log
pkg/storage/local.go, pkg/storage/local_test.go, cmd/engine/main.go
NewLocal resolves its dir argument to an absolute path before creating directories. A new Root() method exposes that value. Get wraps ErrNotFound with the resolved absolute path. Tests verify absolute root, not-found message contents, and a put/get round-trip. run() logs the resolved root when the local driver is active.

Sequence Diagram(s)

sequenceDiagram
  participant River as River Scheduler
  participant envelopeWorker
  participant Pipeline
  participant isTransientParseErr

  River->>envelopeWorker: Work(ctx, job)
  envelopeWorker->>Pipeline: handle parse
  Pipeline->>isTransientParseErr: classify(err)
  isTransientParseErr-->>Pipeline: transient=true/false
  alt transient
    Pipeline-->>envelopeWorker: raw err
    envelopeWorker-->>River: err (retry scheduled)
  else permanent
    Pipeline-->>envelopeWorker: queue.Permanent(err)
    envelopeWorker->>envelopeWorker: IsPermanent → true
    envelopeWorker-->>River: river.JobCancel(err) — dead-letter
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • hallelx2/vectorless-engine#47: Modifies pkg/ingest/ingest.go failure and retry handling to distinguish transient vs terminal outcomes, directly overlapping with this PR's permanent error classification changes to Pipeline.fail and Pipeline.parse.

Poem

🐇 Hoppity-hop through the error maze,
Permanent failures meet their final days!
No retry loops for the broken PDF—
Dead-lettered neat, with a JobCancel ref.
And local roots? Resolved with care,
Absolute paths logged through the air! 🌟

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 35.71% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly and accurately summarizes the two main changes: stopping retries for deterministic failures and stabilizing the local storage root, matching the core objectives of the PR.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch halleluyaholudele/hal-321-bulk-ingest-resilience

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@sourcery-ai sourcery-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 4 issues, and left some high level feedback:

  • The isTransientParseErr classification relies on specific substrings in error messages, which is brittle to upstream message changes; consider using well-known error types/sentinels or wrapping your own typed errors in the parser layer so this logic is less dependent on exact strings.
  • PermanentError.Error() currently returns only the underlying error message, which can make it hard to see in logs that a failure was marked permanent; consider adding a short prefix (while still unwrapping correctly) so permanence is observable without needing IsPermanent.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The `isTransientParseErr` classification relies on specific substrings in error messages, which is brittle to upstream message changes; consider using well-known error types/sentinels or wrapping your own typed errors in the parser layer so this logic is less dependent on exact strings.
- `PermanentError.Error()` currently returns only the underlying error message, which can make it hard to see in logs that a failure was marked permanent; consider adding a short prefix (while still unwrapping correctly) so permanence is observable without needing `IsPermanent`.

## Individual Comments

### Comment 1
<location path="pkg/ingest/ingest.go" line_range="599-605" />
<code_context>
+// timeouts and context cancellations are load-dependent (a complex document may
+// finish within budget on a quieter attempt), so they are transient. A genuine
+// structural rejection is not.
+func isTransientParseErr(err error) bool {
+	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
+		return true
+	}
+	// The PDF parser's own deadline wrapper formats a plain (unwrapped) message
+	// when its internal timer fires, so match it textually as a fallback.
+	msg := err.Error()
+	return strings.Contains(msg, "parse exceeded") || strings.Contains(msg, "parse cancelled")
 }
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Reliance on parser error message substrings for transient classification is brittle.

These string checks make transient vs permanent classification dependent on exact error wording and capitalization, so a small message change could silently reclassify timeouts as permanent.

Prefer a more robust signal, e.g. typed/sentinel errors from the parser with `errors.Is`/`errors.As`. If that’s not available, at least normalize the message (e.g. `strings.ToLower`) and/or rely on a structured marker rather than free text, and ensure the source of these strings is clearly constrained.

Suggested implementation:

```golang
 // isTransientParseErr reports whether a parser error is worth retrying.
 // Parse timeouts and context cancellations are load-dependent (a complex
 // document may finish within budget on a quieter attempt), so they are
 // treated as transient. A genuine structural rejection is not.
 //
 // The fallback string matching below is constrained to the parser's own
 // deadline wrapper messages, which are kept stable and lowercase; we
 // normalize the error text before matching to avoid brittleness from
 // casing or minor wording changes.
func isTransientParseErr(err error) bool {
	if err == nil {
		return false
	}

	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
		return true
	}

	// The PDF parser's own deadline wrapper formats a plain (unwrapped) message
	// when its internal timer fires; normalize before matching to reduce
	// dependence on exact wording/capitalization.
	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "parse exceeded") || strings.Contains(msg, "parse cancelled")
}

```

If the underlying parser can expose typed/sentinel errors for deadline and cancellation conditions, prefer to:
1. Define exported sentinel errors (e.g. `var ErrParseDeadlineExceeded = errors.New("...")`) or custom types in the parser package.
2. Replace the string matching block with `errors.Is(err, parser.ErrParseDeadlineExceeded)` / `errors.Is(err, parser.ErrParseCancelled)` checks.
</issue_to_address>

### Comment 2
<location path="pkg/storage/local.go" line_range="82-87" />
<code_context>
 	info, err := os.Stat(full)
 	if err != nil {
 		if errors.Is(err, os.ErrNotExist) {
-			return nil, Metadata{}, ErrNotFound
+			// Wrap ErrNotFound (errors.Is still matches) but carry the resolved
+			// absolute path so the failure is self-diagnosing: a caller seeing
+			// this in a log can immediately tell whether it looked in the wrong
+			// root vs. the bytes genuinely being absent.
+			return nil, Metadata{}, fmt.Errorf("%w: %s", ErrNotFound, full)
 		}
 		return nil, Metadata{}, err
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Including the absolute path in the ErrNotFound wrapper may leak filesystem layout to callers/logs.

This improves diagnostics but also exposes the exact filesystem path wherever the error is surfaced. If errors can reach external boundaries (API responses, shared logs, multi-tenant contexts), this may leak host layout.

Consider either restricting the full path to internal logs (and using a more generic error message outward), or storing `full` in a non-string field (e.g., a custom error type) so it’s available for logging without being embedded directly in `Error()`.

Suggested implementation:

```golang
type notFoundError struct {
	// path holds the absolute filesystem path that was probed. It is not
	// included in Error() to avoid leaking filesystem layout across process
	// boundaries, but is available to internal callers via errors.As.
	path string
}

func (e notFoundError) Error() string {
	// Preserve the existing outward-facing message while keeping the path
	// available for internal diagnostics.
	return ErrNotFound.Error()
}

func (e notFoundError) Unwrap() error {
	// Maintain errors.Is(err, ErrNotFound) semantics.
	return ErrNotFound
}

// Root returns the absolute filesystem path the storage is rooted at. Exposed
// so the engine can log the resolved root at boot (the single most useful fact
// when diagnosing an "object not found" on a file that appears to be on disk).
func (l *Local) Root() string { return l.root }

```

```golang
			// Wrap ErrNotFound (errors.Is still matches) but carry the resolved
			// absolute path in a structured error so it can be logged internally
			// without leaking filesystem layout via Error().
			return nil, Metadata{}, notFoundError{path: full}

```

To fully benefit from this change, any logging or diagnostics that previously relied on the error string to see the path should be updated to use `errors.As` to extract the `notFoundError`:

1. Where you log these errors, add something like:
   ```go
   var nfErr notFoundError
   if errors.As(err, &nfErr) {
       logger.With("path", nfErr.path).Warn("object not found")
   }
   ```
2. Ensure that `ErrNotFound` continues to carry whatever user-facing message you want for external boundaries, since `notFoundError.Error()` now defers to `ErrNotFound.Error()`.
</issue_to_address>

### Comment 3
<location path="pkg/queue/permanent_test.go" line_range="24-29" />
<code_context>
+	}
+}
+
+func TestPermanentDetectedThroughWrapping(t *testing.T) {
+	// A permanent error wrapped again with fmt.Errorf("%w") must still be
+	// detectable — the ingest pipeline returns queue.Permanent(...) which a
+	// caller may decorate with stage context.
+	wrapped := fmt.Errorf("parse: %w", Permanent(errors.New("malformed")))
+	if !IsPermanent(wrapped) {
+		t.Fatal("IsPermanent should see through an outer fmt.Errorf wrap")
+	}
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test that verifies river workers translate Permanent errors into JobCancel and stop retries

Current tests cover `Permanent` and `IsPermanent` wrapping semantics, but not the new River worker behavior that turns a `Permanent` error into `river.JobCancel` and stops retries. Please add a focused test in the `queue` package that constructs an `envelopeWorker` with a handler returning `queue.Permanent(errors.New("malformed"))` and asserts that `Work` yields `river.JobCancel` (or that River marks the job as canceled/dead-lettered). This ensures the higher-level retry policy actually honors `Permanent` errors, not just their detection in isolation.

Suggested implementation:

```golang
import (
	"context"
	"errors"
	"fmt"
	"testing"

	"github.com/riverqueue/river"
)

```

```golang
func TestPermanentDetectedThroughWrapping(t *testing.T) {
	// A permanent error wrapped again with fmt.Errorf("%w") must still be
	// detectable — the ingest pipeline returns queue.Permanent(...) which a
	// caller may decorate with stage context.
	wrapped := fmt.Errorf("parse: %w", Permanent(errors.New("malformed")))
	if !IsPermanent(wrapped) {
		t.Fatal("IsPermanent should see through an outer fmt.Errorf wrap")
	}
}

func TestPermanentErrorCancelsRiverJob(t *testing.T) {
	t.Parallel()

	// Construct a worker whose handler always returns a Permanent error.
	//
	// The expectation is that the queue worker layer will translate this into a
	// river.JobCancel so that River marks the job as canceled and does not
	// retry it.
	worker := &envelopeWorker{
		handler: func(ctx context.Context, env *Envelope) error {
			return Permanent(errors.New("malformed"))
		},
	}

	ctx := context.Background()

	// Use a minimal job instance; adapt fields as needed for your codebase.
	job := &river.Job{}

	err := worker.Work(ctx, job)
	if err == nil {
		t.Fatalf("Work() error = nil, want river.JobCancel")
	}

	var jobCancel river.JobCancel
	if !errors.As(err, &jobCancel) {
		t.Fatalf("Work() error = %T (%v), want river.JobCancel", err, err)
	}
}

```

The suggested test assumes:
1. There is an `envelopeWorker` type in the `queue` package with a field `handler` compatible with `func(context.Context, *Envelope) error` and a method `Work(ctx context.Context, job *river.Job) error`.
2. There is an `Envelope` type in the same package.
3. `Work` returns a `river.JobCancel` error when the handler returns a `Permanent` error.

To fully integrate this test, you may need to:
1. Adjust the construction of `envelopeWorker` (field names, constructor function, or handler signature) to match the real implementation. For example, if you normally construct workers via a constructor like `newEnvelopeWorker(handler Handler)`, use that instead of struct literal initialization.
2. Provide any required fields on `river.Job` (e.g., `Queue`, `Kind`, or IDs) so that `Work` runs without panicking in your environment.
3. If your worker returns a wrapped `river.JobCancel` (e.g., with `fmt.Errorf("...: %w", river.JobCancel{...})`), keep the `errors.As` assertion as-is; otherwise, if you return a sentinel error, change the assertion to compare directly with that sentinel.
4. If your worker uses a different method name than `Work` (e.g., `Handle`, `Process`), update the test accordingly.
</issue_to_address>

### Comment 4
<location path="pkg/storage/local_test.go" line_range="32-41" />
<code_context>
+	}
+}
+
+func TestLocalGetNotFoundCarriesPath(t *testing.T) {
+	l, err := NewLocal(t.TempDir())
+	if err != nil {
+		t.Fatalf("NewLocal: %v", err)
+	}
+	_, _, err = l.Get(context.Background(), "documents/missing/source.pdf")
+	if !errors.Is(err, ErrNotFound) {
+		t.Fatalf("err = %v, want ErrNotFound", err)
+	}
+	// The error must name the resolved path so an operator can immediately see
+	// WHERE the worker looked.
+	if !strings.Contains(err.Error(), "source.pdf") {
+		t.Fatalf("not-found error %q should include the resolved path", err)
+	}
</code_context>
<issue_to_address>
**suggestion (testing):** Strengthen the not-found error test by asserting the full absolute path (or at least that the path is absolute)

Since the change is about surfacing the resolved absolute path, this test should assert on that behavior rather than just the filename. For example, check that the error string includes `l.Root()` (or `filepath.Join(l.Root(), ...)`), or at least that the path in the message is absolute (e.g., using `filepath.IsAbs`). This will better protect against regressions where the filename remains but the absolute root is dropped.

Suggested implementation:

```golang
func TestLocalGetNotFoundCarriesPath(t *testing.T) {
	l, err := NewLocal(t.TempDir())
	if err != nil {
		t.Fatalf("NewLocal: %v", err)
	}

	_, _, err = l.Get(context.Background(), "documents/missing/source.pdf")
	if !errors.Is(err, ErrNotFound) {
		t.Fatalf("err = %v, want ErrNotFound", err)
	}

	msg := err.Error()

	// The error must name the resolved path so an operator can immediately see
	// WHERE the worker looked.
	if !strings.Contains(msg, "source.pdf") {
		t.Fatalf("not-found error %q should include the filename", msg)
	}

	// The error should surface the resolved absolute path rooted at l.Root().
	wantPath := filepath.Join(l.Root(), "documents", "missing", "source.pdf")
	if !strings.Contains(msg, wantPath) {
		t.Fatalf("not-found error %q should include resolved absolute path %q", msg, wantPath)
	}

	// Sanity check that the expected path is absolute; if this ever fails, the
	// test itself is misconfigured.
	if !filepath.IsAbs(wantPath) {
		t.Fatalf("test bug: wantPath %q must be absolute", wantPath)
	}
}

```

To compile, ensure `local_test.go` imports the packages used in this test:
1. `context`
2. `errors`
3. `strings`
4. `path/filepath` (likely already present, as shown in your snippet)

If any of these are missing from the import block, add them accordingly, respecting the existing grouping and style in the file.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread pkg/ingest/ingest.go
Comment on lines +599 to +605
func isTransientParseErr(err error) bool {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return true
}
// The PDF parser's own deadline wrapper formats a plain (unwrapped) message
// when its internal timer fires, so match it textually as a fallback.
msg := err.Error()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Reliance on parser error message substrings for transient classification is brittle.

These string checks make transient vs permanent classification dependent on exact error wording and capitalization, so a small message change could silently reclassify timeouts as permanent.

Prefer a more robust signal, e.g. typed/sentinel errors from the parser with errors.Is/errors.As. If that’s not available, at least normalize the message (e.g. strings.ToLower) and/or rely on a structured marker rather than free text, and ensure the source of these strings is clearly constrained.

Suggested implementation:

 // isTransientParseErr reports whether a parser error is worth retrying.
 // Parse timeouts and context cancellations are load-dependent (a complex
 // document may finish within budget on a quieter attempt), so they are
 // treated as transient. A genuine structural rejection is not.
 //
 // The fallback string matching below is constrained to the parser's own
 // deadline wrapper messages, which are kept stable and lowercase; we
 // normalize the error text before matching to avoid brittleness from
 // casing or minor wording changes.
func isTransientParseErr(err error) bool {
	if err == nil {
		return false
	}

	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
		return true
	}

	// The PDF parser's own deadline wrapper formats a plain (unwrapped) message
	// when its internal timer fires; normalize before matching to reduce
	// dependence on exact wording/capitalization.
	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "parse exceeded") || strings.Contains(msg, "parse cancelled")
}

If the underlying parser can expose typed/sentinel errors for deadline and cancellation conditions, prefer to:

  1. Define exported sentinel errors (e.g. var ErrParseDeadlineExceeded = errors.New("...")) or custom types in the parser package.
  2. Replace the string matching block with errors.Is(err, parser.ErrParseDeadlineExceeded) / errors.Is(err, parser.ErrParseCancelled) checks.

Comment thread pkg/storage/local.go
Comment on lines 82 to +87
if errors.Is(err, os.ErrNotExist) {
return nil, Metadata{}, ErrNotFound
// Wrap ErrNotFound (errors.Is still matches) but carry the resolved
// absolute path so the failure is self-diagnosing: a caller seeing
// this in a log can immediately tell whether it looked in the wrong
// root vs. the bytes genuinely being absent.
return nil, Metadata{}, fmt.Errorf("%w: %s", ErrNotFound, full)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 suggestion (security): Including the absolute path in the ErrNotFound wrapper may leak filesystem layout to callers/logs.

This improves diagnostics but also exposes the exact filesystem path wherever the error is surfaced. If errors can reach external boundaries (API responses, shared logs, multi-tenant contexts), this may leak host layout.

Consider either restricting the full path to internal logs (and using a more generic error message outward), or storing full in a non-string field (e.g., a custom error type) so it’s available for logging without being embedded directly in Error().

Suggested implementation:

type notFoundError struct {
	// path holds the absolute filesystem path that was probed. It is not
	// included in Error() to avoid leaking filesystem layout across process
	// boundaries, but is available to internal callers via errors.As.
	path string
}

func (e notFoundError) Error() string {
	// Preserve the existing outward-facing message while keeping the path
	// available for internal diagnostics.
	return ErrNotFound.Error()
}

func (e notFoundError) Unwrap() error {
	// Maintain errors.Is(err, ErrNotFound) semantics.
	return ErrNotFound
}

// Root returns the absolute filesystem path the storage is rooted at. Exposed
// so the engine can log the resolved root at boot (the single most useful fact
// when diagnosing an "object not found" on a file that appears to be on disk).
func (l *Local) Root() string { return l.root }
			// Wrap ErrNotFound (errors.Is still matches) but carry the resolved
			// absolute path in a structured error so it can be logged internally
			// without leaking filesystem layout via Error().
			return nil, Metadata{}, notFoundError{path: full}

To fully benefit from this change, any logging or diagnostics that previously relied on the error string to see the path should be updated to use errors.As to extract the notFoundError:

  1. Where you log these errors, add something like:
    var nfErr notFoundError
    if errors.As(err, &nfErr) {
        logger.With("path", nfErr.path).Warn("object not found")
    }
  2. Ensure that ErrNotFound continues to carry whatever user-facing message you want for external boundaries, since notFoundError.Error() now defers to ErrNotFound.Error().

Comment on lines +24 to +29
func TestPermanentDetectedThroughWrapping(t *testing.T) {
// A permanent error wrapped again with fmt.Errorf("%w") must still be
// detectable — the ingest pipeline returns queue.Permanent(...) which a
// caller may decorate with stage context.
wrapped := fmt.Errorf("parse: %w", Permanent(errors.New("malformed")))
if !IsPermanent(wrapped) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider adding a test that verifies river workers translate Permanent errors into JobCancel and stop retries

Current tests cover Permanent and IsPermanent wrapping semantics, but not the new River worker behavior that turns a Permanent error into river.JobCancel and stops retries. Please add a focused test in the queue package that constructs an envelopeWorker with a handler returning queue.Permanent(errors.New("malformed")) and asserts that Work yields river.JobCancel (or that River marks the job as canceled/dead-lettered). This ensures the higher-level retry policy actually honors Permanent errors, not just their detection in isolation.

Suggested implementation:

import (
	"context"
	"errors"
	"fmt"
	"testing"

	"github.com/riverqueue/river"
)
func TestPermanentDetectedThroughWrapping(t *testing.T) {
	// A permanent error wrapped again with fmt.Errorf("%w") must still be
	// detectable — the ingest pipeline returns queue.Permanent(...) which a
	// caller may decorate with stage context.
	wrapped := fmt.Errorf("parse: %w", Permanent(errors.New("malformed")))
	if !IsPermanent(wrapped) {
		t.Fatal("IsPermanent should see through an outer fmt.Errorf wrap")
	}
}

func TestPermanentErrorCancelsRiverJob(t *testing.T) {
	t.Parallel()

	// Construct a worker whose handler always returns a Permanent error.
	//
	// The expectation is that the queue worker layer will translate this into a
	// river.JobCancel so that River marks the job as canceled and does not
	// retry it.
	worker := &envelopeWorker{
		handler: func(ctx context.Context, env *Envelope) error {
			return Permanent(errors.New("malformed"))
		},
	}

	ctx := context.Background()

	// Use a minimal job instance; adapt fields as needed for your codebase.
	job := &river.Job{}

	err := worker.Work(ctx, job)
	if err == nil {
		t.Fatalf("Work() error = nil, want river.JobCancel")
	}

	var jobCancel river.JobCancel
	if !errors.As(err, &jobCancel) {
		t.Fatalf("Work() error = %T (%v), want river.JobCancel", err, err)
	}
}

The suggested test assumes:

  1. There is an envelopeWorker type in the queue package with a field handler compatible with func(context.Context, *Envelope) error and a method Work(ctx context.Context, job *river.Job) error.
  2. There is an Envelope type in the same package.
  3. Work returns a river.JobCancel error when the handler returns a Permanent error.

To fully integrate this test, you may need to:

  1. Adjust the construction of envelopeWorker (field names, constructor function, or handler signature) to match the real implementation. For example, if you normally construct workers via a constructor like newEnvelopeWorker(handler Handler), use that instead of struct literal initialization.
  2. Provide any required fields on river.Job (e.g., Queue, Kind, or IDs) so that Work runs without panicking in your environment.
  3. If your worker returns a wrapped river.JobCancel (e.g., with fmt.Errorf("...: %w", river.JobCancel{...})), keep the errors.As assertion as-is; otherwise, if you return a sentinel error, change the assertion to compare directly with that sentinel.
  4. If your worker uses a different method name than Work (e.g., Handle, Process), update the test accordingly.

Comment thread pkg/storage/local_test.go
Comment on lines +32 to +41
func TestLocalGetNotFoundCarriesPath(t *testing.T) {
l, err := NewLocal(t.TempDir())
if err != nil {
t.Fatalf("NewLocal: %v", err)
}
_, _, err = l.Get(context.Background(), "documents/missing/source.pdf")
if !errors.Is(err, ErrNotFound) {
t.Fatalf("err = %v, want ErrNotFound", err)
}
// The error must name the resolved path so an operator can immediately see

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Strengthen the not-found error test by asserting the full absolute path (or at least that the path is absolute)

Since the change is about surfacing the resolved absolute path, this test should assert on that behavior rather than just the filename. For example, check that the error string includes l.Root() (or filepath.Join(l.Root(), ...)), or at least that the path in the message is absolute (e.g., using filepath.IsAbs). This will better protect against regressions where the filename remains but the absolute root is dropped.

Suggested implementation:

func TestLocalGetNotFoundCarriesPath(t *testing.T) {
	l, err := NewLocal(t.TempDir())
	if err != nil {
		t.Fatalf("NewLocal: %v", err)
	}

	_, _, err = l.Get(context.Background(), "documents/missing/source.pdf")
	if !errors.Is(err, ErrNotFound) {
		t.Fatalf("err = %v, want ErrNotFound", err)
	}

	msg := err.Error()

	// The error must name the resolved path so an operator can immediately see
	// WHERE the worker looked.
	if !strings.Contains(msg, "source.pdf") {
		t.Fatalf("not-found error %q should include the filename", msg)
	}

	// The error should surface the resolved absolute path rooted at l.Root().
	wantPath := filepath.Join(l.Root(), "documents", "missing", "source.pdf")
	if !strings.Contains(msg, wantPath) {
		t.Fatalf("not-found error %q should include resolved absolute path %q", msg, wantPath)
	}

	// Sanity check that the expected path is absolute; if this ever fails, the
	// test itself is misconfigured.
	if !filepath.IsAbs(wantPath) {
		t.Fatalf("test bug: wantPath %q must be absolute", wantPath)
	}
}

To compile, ensure local_test.go imports the packages used in this test:

  1. context
  2. errors
  3. strings
  4. path/filepath (likely already present, as shown in your snippet)

If any of these are missing from the import block, add them accordingly, respecting the existing grouping and style in the file.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@pkg/storage/local_test.go`:
- Around line 41-45: The test assertion in the error message validation only
checks if the error contains the filename suffix "source.pdf", but it should
validate the full resolved absolute path to ensure the contract is properly
tested. Construct the expected full resolved path by combining l.Root() with the
key, then update the assertion to check that err.Error() contains this complete
path instead of just the filename suffix "source.pdf". This ensures the test
properly validates that the error message includes the resolved absolute path
that an operator needs to see.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 01fda931-7091-4149-baf3-091baaa5fa40

📥 Commits

Reviewing files that changed from the base of the PR and between b81db7e and 4dbf6e0.

📒 Files selected for processing (8)
  • cmd/engine/main.go
  • pkg/ingest/ingest.go
  • pkg/ingest/parse_classify_test.go
  • pkg/queue/permanent_test.go
  • pkg/queue/queue.go
  • pkg/queue/river.go
  • pkg/storage/local.go
  • pkg/storage/local_test.go

Comment thread pkg/storage/local_test.go
Comment on lines +41 to +45
// The error must name the resolved path so an operator can immediately see
// WHERE the worker looked.
if !strings.Contains(err.Error(), "source.pdf") {
t.Fatalf("not-found error %q should include the resolved path", err)
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Assert the full resolved path, not just filename suffix.

The current check can pass without validating the key contract (resolved absolute path in the error). Assert against the expected full path built from l.Root() and key.

Suggested test hardening
 	// The error must name the resolved path so an operator can immediately see
 	// WHERE the worker looked.
-	if !strings.Contains(err.Error(), "source.pdf") {
+	wantPath := filepath.Join(l.Root(), filepath.FromSlash("documents/missing/source.pdf"))
+	if !strings.Contains(err.Error(), wantPath) {
 		t.Fatalf("not-found error %q should include the resolved path", err)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// The error must name the resolved path so an operator can immediately see
// WHERE the worker looked.
if !strings.Contains(err.Error(), "source.pdf") {
t.Fatalf("not-found error %q should include the resolved path", err)
}
// The error must name the resolved path so an operator can immediately see
// WHERE the worker looked.
wantPath := filepath.Join(l.Root(), filepath.FromSlash("documents/missing/source.pdf"))
if !strings.Contains(err.Error(), wantPath) {
t.Fatalf("not-found error %q should include the resolved path", err)
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/storage/local_test.go` around lines 41 - 45, The test assertion in the
error message validation only checks if the error contains the filename suffix
"source.pdf", but it should validate the full resolved absolute path to ensure
the contract is properly tested. Construct the expected full resolved path by
combining l.Root() with the key, then update the assertion to check that
err.Error() contains this complete path instead of just the filename suffix
"source.pdf". This ensures the test properly validates that the error message
includes the resolved absolute path that an operator needs to see.

@hallelx2 hallelx2 merged commit ebb9c10 into main Jun 20, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant