Bulk-ingest resilience: stop retrying deterministic failures; stabilize local storage root (HAL-321)#48
Conversation
A handler can now return queue.Permanent(err) to mark a failure as non-retryable. The River backend translates it to river.JobCancel, which dead-letters the job immediately instead of burning the full retry budget on an input that can never succeed. Transient failures stay ordinary errors and keep retrying. Fixes the HAL-321 symptom where an encrypted/malformed PDF was retried 5x, interleaving confusing 'object not found' errors into its history.
parse() now splits failures: a not-yet-visible source and a parse timeout stay transient (retry can recover them under load); an encrypted/malformed/no-text document is wrapped queue.Permanent so the queue cancels it. fail() marks a permanent failure as terminal immediately, regardless of attempt number, so a cancelled doc no longer wedges in 'parsing' forever.
NewLocal resolves its root with filepath.Abs so a relative default (./data/documents) can't resolve against a different working directory after an engine restart while River still holds jobs for earlier uploads — the path drift that made a source 'not found' though it was on disk. Get's not-found error now carries the resolved path, and the engine logs the absolute root at boot (also where to point a Defender exclusion).
Reviewer's GuideIntroduces a PermanentError mechanism so ingest handlers can mark deterministic parse failures as non-retryable, updates the ingest pipeline and River queue integration to stop retrying such jobs, and hardens local storage by pinning its root to an absolute path and enriching not-found errors, with accompanying tests and engine logging. Sequence diagram for ingest parse failures with PermanentError and River cancellationsequenceDiagram
participant Pipeline
participant ParserRegistry
participant QueueWorker
participant River
QueueWorker->>Pipeline: parse(ctx, parsers, payload)
Pipeline->>ParserRegistry: Parse(ctx, contentType, filename, rc)
ParserRegistry-->>Pipeline: perr
alt [isTransientParseErr(perr)]
Pipeline-->>QueueWorker: perr
QueueWorker->>River: Work returns perr
River-->>QueueWorker: retry job later
else [!isTransientParseErr(perr)]
Pipeline->>QueueWorker: queue.Permanent(perr)
QueueWorker->>River: Work returns err
River->>QueueWorker: IsPermanent(err)
alt [IsPermanent(err)]
QueueWorker->>River: JobCancel(err)
River-->>QueueWorker: dead-letter job, stop retries
end
end
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
📝 WalkthroughWalkthroughAdds a ChangesPermanent Error Classification
Local Storage Path Resolution
Sequence Diagram(s)sequenceDiagram
participant River as River Scheduler
participant envelopeWorker
participant Pipeline
participant isTransientParseErr
River->>envelopeWorker: Work(ctx, job)
envelopeWorker->>Pipeline: handle parse
Pipeline->>isTransientParseErr: classify(err)
isTransientParseErr-->>Pipeline: transient=true/false
alt transient
Pipeline-->>envelopeWorker: raw err
envelopeWorker-->>River: err (retry scheduled)
else permanent
Pipeline-->>envelopeWorker: queue.Permanent(err)
envelopeWorker->>envelopeWorker: IsPermanent → true
envelopeWorker-->>River: river.JobCancel(err) — dead-letter
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Hey - I've found 4 issues, and left some high level feedback:
- The
isTransientParseErrclassification relies on specific substrings in error messages, which is brittle to upstream message changes; consider using well-known error types/sentinels or wrapping your own typed errors in the parser layer so this logic is less dependent on exact strings. PermanentError.Error()currently returns only the underlying error message, which can make it hard to see in logs that a failure was marked permanent; consider adding a short prefix (while still unwrapping correctly) so permanence is observable without needingIsPermanent.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `isTransientParseErr` classification relies on specific substrings in error messages, which is brittle to upstream message changes; consider using well-known error types/sentinels or wrapping your own typed errors in the parser layer so this logic is less dependent on exact strings.
- `PermanentError.Error()` currently returns only the underlying error message, which can make it hard to see in logs that a failure was marked permanent; consider adding a short prefix (while still unwrapping correctly) so permanence is observable without needing `IsPermanent`.
## Individual Comments
### Comment 1
<location path="pkg/ingest/ingest.go" line_range="599-605" />
<code_context>
+// timeouts and context cancellations are load-dependent (a complex document may
+// finish within budget on a quieter attempt), so they are transient. A genuine
+// structural rejection is not.
+func isTransientParseErr(err error) bool {
+ if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
+ return true
+ }
+ // The PDF parser's own deadline wrapper formats a plain (unwrapped) message
+ // when its internal timer fires, so match it textually as a fallback.
+ msg := err.Error()
+ return strings.Contains(msg, "parse exceeded") || strings.Contains(msg, "parse cancelled")
}
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Reliance on parser error message substrings for transient classification is brittle.
These string checks make transient vs permanent classification dependent on exact error wording and capitalization, so a small message change could silently reclassify timeouts as permanent.
Prefer a more robust signal, e.g. typed/sentinel errors from the parser with `errors.Is`/`errors.As`. If that’s not available, at least normalize the message (e.g. `strings.ToLower`) and/or rely on a structured marker rather than free text, and ensure the source of these strings is clearly constrained.
Suggested implementation:
```golang
// isTransientParseErr reports whether a parser error is worth retrying.
// Parse timeouts and context cancellations are load-dependent (a complex
// document may finish within budget on a quieter attempt), so they are
// treated as transient. A genuine structural rejection is not.
//
// The fallback string matching below is constrained to the parser's own
// deadline wrapper messages, which are kept stable and lowercase; we
// normalize the error text before matching to avoid brittleness from
// casing or minor wording changes.
func isTransientParseErr(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return true
}
// The PDF parser's own deadline wrapper formats a plain (unwrapped) message
// when its internal timer fires; normalize before matching to reduce
// dependence on exact wording/capitalization.
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "parse exceeded") || strings.Contains(msg, "parse cancelled")
}
```
If the underlying parser can expose typed/sentinel errors for deadline and cancellation conditions, prefer to:
1. Define exported sentinel errors (e.g. `var ErrParseDeadlineExceeded = errors.New("...")`) or custom types in the parser package.
2. Replace the string matching block with `errors.Is(err, parser.ErrParseDeadlineExceeded)` / `errors.Is(err, parser.ErrParseCancelled)` checks.
</issue_to_address>
### Comment 2
<location path="pkg/storage/local.go" line_range="82-87" />
<code_context>
info, err := os.Stat(full)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
- return nil, Metadata{}, ErrNotFound
+ // Wrap ErrNotFound (errors.Is still matches) but carry the resolved
+ // absolute path so the failure is self-diagnosing: a caller seeing
+ // this in a log can immediately tell whether it looked in the wrong
+ // root vs. the bytes genuinely being absent.
+ return nil, Metadata{}, fmt.Errorf("%w: %s", ErrNotFound, full)
}
return nil, Metadata{}, err
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Including the absolute path in the ErrNotFound wrapper may leak filesystem layout to callers/logs.
This improves diagnostics but also exposes the exact filesystem path wherever the error is surfaced. If errors can reach external boundaries (API responses, shared logs, multi-tenant contexts), this may leak host layout.
Consider either restricting the full path to internal logs (and using a more generic error message outward), or storing `full` in a non-string field (e.g., a custom error type) so it’s available for logging without being embedded directly in `Error()`.
Suggested implementation:
```golang
type notFoundError struct {
// path holds the absolute filesystem path that was probed. It is not
// included in Error() to avoid leaking filesystem layout across process
// boundaries, but is available to internal callers via errors.As.
path string
}
func (e notFoundError) Error() string {
// Preserve the existing outward-facing message while keeping the path
// available for internal diagnostics.
return ErrNotFound.Error()
}
func (e notFoundError) Unwrap() error {
// Maintain errors.Is(err, ErrNotFound) semantics.
return ErrNotFound
}
// Root returns the absolute filesystem path the storage is rooted at. Exposed
// so the engine can log the resolved root at boot (the single most useful fact
// when diagnosing an "object not found" on a file that appears to be on disk).
func (l *Local) Root() string { return l.root }
```
```golang
// Wrap ErrNotFound (errors.Is still matches) but carry the resolved
// absolute path in a structured error so it can be logged internally
// without leaking filesystem layout via Error().
return nil, Metadata{}, notFoundError{path: full}
```
To fully benefit from this change, any logging or diagnostics that previously relied on the error string to see the path should be updated to use `errors.As` to extract the `notFoundError`:
1. Where you log these errors, add something like:
```go
var nfErr notFoundError
if errors.As(err, &nfErr) {
logger.With("path", nfErr.path).Warn("object not found")
}
```
2. Ensure that `ErrNotFound` continues to carry whatever user-facing message you want for external boundaries, since `notFoundError.Error()` now defers to `ErrNotFound.Error()`.
</issue_to_address>
### Comment 3
<location path="pkg/queue/permanent_test.go" line_range="24-29" />
<code_context>
+ }
+}
+
+func TestPermanentDetectedThroughWrapping(t *testing.T) {
+ // A permanent error wrapped again with fmt.Errorf("%w") must still be
+ // detectable — the ingest pipeline returns queue.Permanent(...) which a
+ // caller may decorate with stage context.
+ wrapped := fmt.Errorf("parse: %w", Permanent(errors.New("malformed")))
+ if !IsPermanent(wrapped) {
+ t.Fatal("IsPermanent should see through an outer fmt.Errorf wrap")
+ }
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test that verifies river workers translate Permanent errors into JobCancel and stop retries
Current tests cover `Permanent` and `IsPermanent` wrapping semantics, but not the new River worker behavior that turns a `Permanent` error into `river.JobCancel` and stops retries. Please add a focused test in the `queue` package that constructs an `envelopeWorker` with a handler returning `queue.Permanent(errors.New("malformed"))` and asserts that `Work` yields `river.JobCancel` (or that River marks the job as canceled/dead-lettered). This ensures the higher-level retry policy actually honors `Permanent` errors, not just their detection in isolation.
Suggested implementation:
```golang
import (
"context"
"errors"
"fmt"
"testing"
"github.com/riverqueue/river"
)
```
```golang
func TestPermanentDetectedThroughWrapping(t *testing.T) {
// A permanent error wrapped again with fmt.Errorf("%w") must still be
// detectable — the ingest pipeline returns queue.Permanent(...) which a
// caller may decorate with stage context.
wrapped := fmt.Errorf("parse: %w", Permanent(errors.New("malformed")))
if !IsPermanent(wrapped) {
t.Fatal("IsPermanent should see through an outer fmt.Errorf wrap")
}
}
func TestPermanentErrorCancelsRiverJob(t *testing.T) {
t.Parallel()
// Construct a worker whose handler always returns a Permanent error.
//
// The expectation is that the queue worker layer will translate this into a
// river.JobCancel so that River marks the job as canceled and does not
// retry it.
worker := &envelopeWorker{
handler: func(ctx context.Context, env *Envelope) error {
return Permanent(errors.New("malformed"))
},
}
ctx := context.Background()
// Use a minimal job instance; adapt fields as needed for your codebase.
job := &river.Job{}
err := worker.Work(ctx, job)
if err == nil {
t.Fatalf("Work() error = nil, want river.JobCancel")
}
var jobCancel river.JobCancel
if !errors.As(err, &jobCancel) {
t.Fatalf("Work() error = %T (%v), want river.JobCancel", err, err)
}
}
```
The suggested test assumes:
1. There is an `envelopeWorker` type in the `queue` package with a field `handler` compatible with `func(context.Context, *Envelope) error` and a method `Work(ctx context.Context, job *river.Job) error`.
2. There is an `Envelope` type in the same package.
3. `Work` returns a `river.JobCancel` error when the handler returns a `Permanent` error.
To fully integrate this test, you may need to:
1. Adjust the construction of `envelopeWorker` (field names, constructor function, or handler signature) to match the real implementation. For example, if you normally construct workers via a constructor like `newEnvelopeWorker(handler Handler)`, use that instead of struct literal initialization.
2. Provide any required fields on `river.Job` (e.g., `Queue`, `Kind`, or IDs) so that `Work` runs without panicking in your environment.
3. If your worker returns a wrapped `river.JobCancel` (e.g., with `fmt.Errorf("...: %w", river.JobCancel{...})`), keep the `errors.As` assertion as-is; otherwise, if you return a sentinel error, change the assertion to compare directly with that sentinel.
4. If your worker uses a different method name than `Work` (e.g., `Handle`, `Process`), update the test accordingly.
</issue_to_address>
### Comment 4
<location path="pkg/storage/local_test.go" line_range="32-41" />
<code_context>
+ }
+}
+
+func TestLocalGetNotFoundCarriesPath(t *testing.T) {
+ l, err := NewLocal(t.TempDir())
+ if err != nil {
+ t.Fatalf("NewLocal: %v", err)
+ }
+ _, _, err = l.Get(context.Background(), "documents/missing/source.pdf")
+ if !errors.Is(err, ErrNotFound) {
+ t.Fatalf("err = %v, want ErrNotFound", err)
+ }
+ // The error must name the resolved path so an operator can immediately see
+ // WHERE the worker looked.
+ if !strings.Contains(err.Error(), "source.pdf") {
+ t.Fatalf("not-found error %q should include the resolved path", err)
+ }
</code_context>
<issue_to_address>
**suggestion (testing):** Strengthen the not-found error test by asserting the full absolute path (or at least that the path is absolute)
Since the change is about surfacing the resolved absolute path, this test should assert on that behavior rather than just the filename. For example, check that the error string includes `l.Root()` (or `filepath.Join(l.Root(), ...)`), or at least that the path in the message is absolute (e.g., using `filepath.IsAbs`). This will better protect against regressions where the filename remains but the absolute root is dropped.
Suggested implementation:
```golang
func TestLocalGetNotFoundCarriesPath(t *testing.T) {
l, err := NewLocal(t.TempDir())
if err != nil {
t.Fatalf("NewLocal: %v", err)
}
_, _, err = l.Get(context.Background(), "documents/missing/source.pdf")
if !errors.Is(err, ErrNotFound) {
t.Fatalf("err = %v, want ErrNotFound", err)
}
msg := err.Error()
// The error must name the resolved path so an operator can immediately see
// WHERE the worker looked.
if !strings.Contains(msg, "source.pdf") {
t.Fatalf("not-found error %q should include the filename", msg)
}
// The error should surface the resolved absolute path rooted at l.Root().
wantPath := filepath.Join(l.Root(), "documents", "missing", "source.pdf")
if !strings.Contains(msg, wantPath) {
t.Fatalf("not-found error %q should include resolved absolute path %q", msg, wantPath)
}
// Sanity check that the expected path is absolute; if this ever fails, the
// test itself is misconfigured.
if !filepath.IsAbs(wantPath) {
t.Fatalf("test bug: wantPath %q must be absolute", wantPath)
}
}
```
To compile, ensure `local_test.go` imports the packages used in this test:
1. `context`
2. `errors`
3. `strings`
4. `path/filepath` (likely already present, as shown in your snippet)
If any of these are missing from the import block, add them accordingly, respecting the existing grouping and style in the file.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| func isTransientParseErr(err error) bool { | ||
| if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { | ||
| return true | ||
| } | ||
| // The PDF parser's own deadline wrapper formats a plain (unwrapped) message | ||
| // when its internal timer fires, so match it textually as a fallback. | ||
| msg := err.Error() |
There was a problem hiding this comment.
suggestion (bug_risk): Reliance on parser error message substrings for transient classification is brittle.
These string checks make transient vs permanent classification dependent on exact error wording and capitalization, so a small message change could silently reclassify timeouts as permanent.
Prefer a more robust signal, e.g. typed/sentinel errors from the parser with errors.Is/errors.As. If that’s not available, at least normalize the message (e.g. strings.ToLower) and/or rely on a structured marker rather than free text, and ensure the source of these strings is clearly constrained.
Suggested implementation:
// isTransientParseErr reports whether a parser error is worth retrying.
// Parse timeouts and context cancellations are load-dependent (a complex
// document may finish within budget on a quieter attempt), so they are
// treated as transient. A genuine structural rejection is not.
//
// The fallback string matching below is constrained to the parser's own
// deadline wrapper messages, which are kept stable and lowercase; we
// normalize the error text before matching to avoid brittleness from
// casing or minor wording changes.
func isTransientParseErr(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return true
}
// The PDF parser's own deadline wrapper formats a plain (unwrapped) message
// when its internal timer fires; normalize before matching to reduce
// dependence on exact wording/capitalization.
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "parse exceeded") || strings.Contains(msg, "parse cancelled")
}If the underlying parser can expose typed/sentinel errors for deadline and cancellation conditions, prefer to:
- Define exported sentinel errors (e.g.
var ErrParseDeadlineExceeded = errors.New("...")) or custom types in the parser package. - Replace the string matching block with
errors.Is(err, parser.ErrParseDeadlineExceeded)/errors.Is(err, parser.ErrParseCancelled)checks.
| if errors.Is(err, os.ErrNotExist) { | ||
| return nil, Metadata{}, ErrNotFound | ||
| // Wrap ErrNotFound (errors.Is still matches) but carry the resolved | ||
| // absolute path so the failure is self-diagnosing: a caller seeing | ||
| // this in a log can immediately tell whether it looked in the wrong | ||
| // root vs. the bytes genuinely being absent. | ||
| return nil, Metadata{}, fmt.Errorf("%w: %s", ErrNotFound, full) |
There was a problem hiding this comment.
🚨 suggestion (security): Including the absolute path in the ErrNotFound wrapper may leak filesystem layout to callers/logs.
This improves diagnostics but also exposes the exact filesystem path wherever the error is surfaced. If errors can reach external boundaries (API responses, shared logs, multi-tenant contexts), this may leak host layout.
Consider either restricting the full path to internal logs (and using a more generic error message outward), or storing full in a non-string field (e.g., a custom error type) so it’s available for logging without being embedded directly in Error().
Suggested implementation:
type notFoundError struct {
// path holds the absolute filesystem path that was probed. It is not
// included in Error() to avoid leaking filesystem layout across process
// boundaries, but is available to internal callers via errors.As.
path string
}
func (e notFoundError) Error() string {
// Preserve the existing outward-facing message while keeping the path
// available for internal diagnostics.
return ErrNotFound.Error()
}
func (e notFoundError) Unwrap() error {
// Maintain errors.Is(err, ErrNotFound) semantics.
return ErrNotFound
}
// Root returns the absolute filesystem path the storage is rooted at. Exposed
// so the engine can log the resolved root at boot (the single most useful fact
// when diagnosing an "object not found" on a file that appears to be on disk).
func (l *Local) Root() string { return l.root } // Wrap ErrNotFound (errors.Is still matches) but carry the resolved
// absolute path in a structured error so it can be logged internally
// without leaking filesystem layout via Error().
return nil, Metadata{}, notFoundError{path: full}To fully benefit from this change, any logging or diagnostics that previously relied on the error string to see the path should be updated to use errors.As to extract the notFoundError:
- Where you log these errors, add something like:
var nfErr notFoundError if errors.As(err, &nfErr) { logger.With("path", nfErr.path).Warn("object not found") }
- Ensure that
ErrNotFoundcontinues to carry whatever user-facing message you want for external boundaries, sincenotFoundError.Error()now defers toErrNotFound.Error().
| func TestPermanentDetectedThroughWrapping(t *testing.T) { | ||
| // A permanent error wrapped again with fmt.Errorf("%w") must still be | ||
| // detectable — the ingest pipeline returns queue.Permanent(...) which a | ||
| // caller may decorate with stage context. | ||
| wrapped := fmt.Errorf("parse: %w", Permanent(errors.New("malformed"))) | ||
| if !IsPermanent(wrapped) { |
There was a problem hiding this comment.
suggestion (testing): Consider adding a test that verifies river workers translate Permanent errors into JobCancel and stop retries
Current tests cover Permanent and IsPermanent wrapping semantics, but not the new River worker behavior that turns a Permanent error into river.JobCancel and stops retries. Please add a focused test in the queue package that constructs an envelopeWorker with a handler returning queue.Permanent(errors.New("malformed")) and asserts that Work yields river.JobCancel (or that River marks the job as canceled/dead-lettered). This ensures the higher-level retry policy actually honors Permanent errors, not just their detection in isolation.
Suggested implementation:
import (
"context"
"errors"
"fmt"
"testing"
"github.com/riverqueue/river"
)func TestPermanentDetectedThroughWrapping(t *testing.T) {
// A permanent error wrapped again with fmt.Errorf("%w") must still be
// detectable — the ingest pipeline returns queue.Permanent(...) which a
// caller may decorate with stage context.
wrapped := fmt.Errorf("parse: %w", Permanent(errors.New("malformed")))
if !IsPermanent(wrapped) {
t.Fatal("IsPermanent should see through an outer fmt.Errorf wrap")
}
}
func TestPermanentErrorCancelsRiverJob(t *testing.T) {
t.Parallel()
// Construct a worker whose handler always returns a Permanent error.
//
// The expectation is that the queue worker layer will translate this into a
// river.JobCancel so that River marks the job as canceled and does not
// retry it.
worker := &envelopeWorker{
handler: func(ctx context.Context, env *Envelope) error {
return Permanent(errors.New("malformed"))
},
}
ctx := context.Background()
// Use a minimal job instance; adapt fields as needed for your codebase.
job := &river.Job{}
err := worker.Work(ctx, job)
if err == nil {
t.Fatalf("Work() error = nil, want river.JobCancel")
}
var jobCancel river.JobCancel
if !errors.As(err, &jobCancel) {
t.Fatalf("Work() error = %T (%v), want river.JobCancel", err, err)
}
}The suggested test assumes:
- There is an
envelopeWorkertype in thequeuepackage with a fieldhandlercompatible withfunc(context.Context, *Envelope) errorand a methodWork(ctx context.Context, job *river.Job) error. - There is an
Envelopetype in the same package. Workreturns ariver.JobCancelerror when the handler returns aPermanenterror.
To fully integrate this test, you may need to:
- Adjust the construction of
envelopeWorker(field names, constructor function, or handler signature) to match the real implementation. For example, if you normally construct workers via a constructor likenewEnvelopeWorker(handler Handler), use that instead of struct literal initialization. - Provide any required fields on
river.Job(e.g.,Queue,Kind, or IDs) so thatWorkruns without panicking in your environment. - If your worker returns a wrapped
river.JobCancel(e.g., withfmt.Errorf("...: %w", river.JobCancel{...})), keep theerrors.Asassertion as-is; otherwise, if you return a sentinel error, change the assertion to compare directly with that sentinel. - If your worker uses a different method name than
Work(e.g.,Handle,Process), update the test accordingly.
| func TestLocalGetNotFoundCarriesPath(t *testing.T) { | ||
| l, err := NewLocal(t.TempDir()) | ||
| if err != nil { | ||
| t.Fatalf("NewLocal: %v", err) | ||
| } | ||
| _, _, err = l.Get(context.Background(), "documents/missing/source.pdf") | ||
| if !errors.Is(err, ErrNotFound) { | ||
| t.Fatalf("err = %v, want ErrNotFound", err) | ||
| } | ||
| // The error must name the resolved path so an operator can immediately see |
There was a problem hiding this comment.
suggestion (testing): Strengthen the not-found error test by asserting the full absolute path (or at least that the path is absolute)
Since the change is about surfacing the resolved absolute path, this test should assert on that behavior rather than just the filename. For example, check that the error string includes l.Root() (or filepath.Join(l.Root(), ...)), or at least that the path in the message is absolute (e.g., using filepath.IsAbs). This will better protect against regressions where the filename remains but the absolute root is dropped.
Suggested implementation:
func TestLocalGetNotFoundCarriesPath(t *testing.T) {
l, err := NewLocal(t.TempDir())
if err != nil {
t.Fatalf("NewLocal: %v", err)
}
_, _, err = l.Get(context.Background(), "documents/missing/source.pdf")
if !errors.Is(err, ErrNotFound) {
t.Fatalf("err = %v, want ErrNotFound", err)
}
msg := err.Error()
// The error must name the resolved path so an operator can immediately see
// WHERE the worker looked.
if !strings.Contains(msg, "source.pdf") {
t.Fatalf("not-found error %q should include the filename", msg)
}
// The error should surface the resolved absolute path rooted at l.Root().
wantPath := filepath.Join(l.Root(), "documents", "missing", "source.pdf")
if !strings.Contains(msg, wantPath) {
t.Fatalf("not-found error %q should include resolved absolute path %q", msg, wantPath)
}
// Sanity check that the expected path is absolute; if this ever fails, the
// test itself is misconfigured.
if !filepath.IsAbs(wantPath) {
t.Fatalf("test bug: wantPath %q must be absolute", wantPath)
}
}To compile, ensure local_test.go imports the packages used in this test:
contexterrorsstringspath/filepath(likely already present, as shown in your snippet)
If any of these are missing from the import block, add them accordingly, respecting the existing grouping and style in the file.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/storage/local_test.go`:
- Around line 41-45: The test assertion in the error message validation only
checks if the error contains the filename suffix "source.pdf", but it should
validate the full resolved absolute path to ensure the contract is properly
tested. Construct the expected full resolved path by combining l.Root() with the
key, then update the assertion to check that err.Error() contains this complete
path instead of just the filename suffix "source.pdf". This ensures the test
properly validates that the error message includes the resolved absolute path
that an operator needs to see.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 01fda931-7091-4149-baf3-091baaa5fa40
📒 Files selected for processing (8)
cmd/engine/main.gopkg/ingest/ingest.gopkg/ingest/parse_classify_test.gopkg/queue/permanent_test.gopkg/queue/queue.gopkg/queue/river.gopkg/storage/local.gopkg/storage/local_test.go
| // The error must name the resolved path so an operator can immediately see | ||
| // WHERE the worker looked. | ||
| if !strings.Contains(err.Error(), "source.pdf") { | ||
| t.Fatalf("not-found error %q should include the resolved path", err) | ||
| } |
There was a problem hiding this comment.
Assert the full resolved path, not just filename suffix.
The current check can pass without validating the key contract (resolved absolute path in the error). Assert against the expected full path built from l.Root() and key.
Suggested test hardening
// The error must name the resolved path so an operator can immediately see
// WHERE the worker looked.
- if !strings.Contains(err.Error(), "source.pdf") {
+ wantPath := filepath.Join(l.Root(), filepath.FromSlash("documents/missing/source.pdf"))
+ if !strings.Contains(err.Error(), wantPath) {
t.Fatalf("not-found error %q should include the resolved path", err)
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // The error must name the resolved path so an operator can immediately see | |
| // WHERE the worker looked. | |
| if !strings.Contains(err.Error(), "source.pdf") { | |
| t.Fatalf("not-found error %q should include the resolved path", err) | |
| } | |
| // The error must name the resolved path so an operator can immediately see | |
| // WHERE the worker looked. | |
| wantPath := filepath.Join(l.Root(), filepath.FromSlash("documents/missing/source.pdf")) | |
| if !strings.Contains(err.Error(), wantPath) { | |
| t.Fatalf("not-found error %q should include the resolved path", err) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/storage/local_test.go` around lines 41 - 45, The test assertion in the
error message validation only checks if the error contains the filename suffix
"source.pdf", but it should validate the full resolved absolute path to ensure
the contract is properly tested. Construct the expected full resolved path by
combining l.Root() with the key, then update the assertion to check that
err.Error() contains this complete path instead of just the filename suffix
"source.pdf". This ensures the test properly validates that the error message
includes the resolved absolute path that an operator needs to see.
Root cause (evidence-backed)
Inspecting the live River queue + disk proved HAL-321 is two distinct problems, not one:
(A) Transient
object not found— a Windows filesystem/AV interaction, not a write race.The per-attempt error history shows the same source file flipping found↔not-found across attempts minutes apart (e.g. job 93: read OK on attempts 1/3/4/5,
not foundon attempt 2). All 5 permanently-discarded jobs' source files exist on disk right now, full-size, at the exact path the worker computes. A file present at the computed path but intermittently invisible =os.StatreturningFILE_NOT_FOUNDon an existing file — the signature of Windows Defender real-time scanning (confirmed ON) a freshly-written multi-MB PDF; under heavy concurrent ingest those scan windows overlap the worker's reads. fsync + retry only papered over it.(B) Deterministic failures retried 5×. An encrypted PDF (
256-bit encryption key) and parse-timeouts were retried like transients, burning the retry budget and interleaving confusingnot founderrors into the history.Fix
queue.PermanentError+ RiverJobCancel: a handler can mark a failure non-retryable; River dead-letters it immediately instead of retrying 5×.fail()marks a permanent failure terminal immediately so a cancelled doc can't wedge inparsingforever.filepath.AbsinNewLocal) so a relative default can't resolve against a different CWD after a restart while River holds older jobs (the path-drift variant). Not-found errors now carry the resolved path; the engine logs the absolute root at boot (also where to point a Defender exclusion).Tests
pkg/queue/permanent_test.go— wrap/unwrap/IsPermanent through fmt wrapping.pkg/ingest/parse_classify_test.go— transient vs permanent for every real error string.pkg/storage/local_test.go— absolute root, not-found carries path, round-trip.go build ./...+go vetclean; fullpkg/ingest,pkg/queue,pkg/storagesuites pass.Operational follow-ups (not code)
VLE_STORAGE_LOCAL_ROOT.Closes HAL-321
Summary by Sourcery
Classify ingest parse failures so deterministically bad documents are treated as permanent errors, and make local storage paths stable and diagnosable to prevent misleading object-not-found errors.
New Features:
Bug Fixes:
Enhancements:
Tests:
Summary by CodeRabbit
New Features
Bug Fixes
Tests