-
Notifications
You must be signed in to change notification settings - Fork 0
Bulk-ingest resilience: stop retrying deterministic failures; stabilize local storage root (HAL-321) #48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bulk-ingest resilience: stop retrying deterministic failures; stabilize local storage root (HAL-321) #48
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| package ingest | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "testing" | ||
| ) | ||
|
|
||
| // The classifier decides whether a parser failure is worth retrying. Getting | ||
| // this wrong is expensive in both directions: marking a transient timeout | ||
| // permanent dead-letters a document that would have parsed on a quieter | ||
| // attempt; marking a deterministic rejection transient burns the whole retry | ||
| // budget (and, as seen in HAL-321, interleaves confusing "object not found" | ||
| // errors into the job history). | ||
| func TestIsTransientParseErr(t *testing.T) { | ||
| cases := []struct { | ||
| name string | ||
| err error | ||
| transient bool | ||
| }{ | ||
| {"deadline exceeded", context.DeadlineExceeded, true}, | ||
| {"canceled", context.Canceled, true}, | ||
| {"wrapped deadline", fmt.Errorf("pdf: parse cancelled: %w", context.DeadlineExceeded), true}, | ||
| {"pdf timeout message", errors.New("pdf: parse exceeded 5m0s — document too complex or malformed"), true}, | ||
| {"pdf cancelled message", errors.New("pdf: parse cancelled: context deadline exceeded"), true}, | ||
| {"encrypted", errors.New("pdf: open: encrypted and could not be unlocked with empty password: x"), false}, | ||
| {"backend rejected", errors.New("pdf: open: ledongthuc/pdf backend rejected the document: malformed PDF: 256-bit encryption key"), false}, | ||
| {"no extractable text", errors.New("pdf: parsed but no extractable text — the document may be a scanned image"), false}, | ||
| } | ||
| for _, tc := range cases { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| if got := isTransientParseErr(tc.err); got != tc.transient { | ||
| t.Fatalf("isTransientParseErr(%q) = %v, want %v", tc.err, got, tc.transient) | ||
| } | ||
| }) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| package queue | ||
|
|
||
| import ( | ||
| "errors" | ||
| "fmt" | ||
| "testing" | ||
| ) | ||
|
|
||
| func TestPermanentWrapsAndUnwraps(t *testing.T) { | ||
| base := errors.New("encrypted PDF") | ||
| perm := Permanent(base) | ||
|
|
||
| if !IsPermanent(perm) { | ||
| t.Fatal("IsPermanent should be true for a wrapped permanent error") | ||
| } | ||
| if !errors.Is(perm, base) { | ||
| t.Fatal("errors.Is should still match the wrapped cause") | ||
| } | ||
| if perm.Error() != base.Error() { | ||
| t.Fatalf("Error() = %q, want %q", perm.Error(), base.Error()) | ||
| } | ||
| } | ||
|
|
||
| func TestPermanentDetectedThroughWrapping(t *testing.T) { | ||
| // A permanent error wrapped again with fmt.Errorf("%w") must still be | ||
| // detectable — the ingest pipeline returns queue.Permanent(...) which a | ||
| // caller may decorate with stage context. | ||
| wrapped := fmt.Errorf("parse: %w", Permanent(errors.New("malformed"))) | ||
| if !IsPermanent(wrapped) { | ||
|
Comment on lines
+24
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion (testing): Consider adding a test that verifies river workers translate Permanent errors into JobCancel and stop retries Current tests cover Suggested implementation: import (
"context"
"errors"
"fmt"
"testing"
"github.com/riverqueue/river"
)func TestPermanentDetectedThroughWrapping(t *testing.T) {
// A permanent error wrapped again with fmt.Errorf("%w") must still be
// detectable — the ingest pipeline returns queue.Permanent(...) which a
// caller may decorate with stage context.
wrapped := fmt.Errorf("parse: %w", Permanent(errors.New("malformed")))
if !IsPermanent(wrapped) {
t.Fatal("IsPermanent should see through an outer fmt.Errorf wrap")
}
}
func TestPermanentErrorCancelsRiverJob(t *testing.T) {
t.Parallel()
// Construct a worker whose handler always returns a Permanent error.
//
// The expectation is that the queue worker layer will translate this into a
// river.JobCancel so that River marks the job as canceled and does not
// retry it.
worker := &envelopeWorker{
handler: func(ctx context.Context, env *Envelope) error {
return Permanent(errors.New("malformed"))
},
}
ctx := context.Background()
// Use a minimal job instance; adapt fields as needed for your codebase.
job := &river.Job{}
err := worker.Work(ctx, job)
if err == nil {
t.Fatalf("Work() error = nil, want river.JobCancel")
}
var jobCancel river.JobCancel
if !errors.As(err, &jobCancel) {
t.Fatalf("Work() error = %T (%v), want river.JobCancel", err, err)
}
}The suggested test assumes:
To fully integrate this test, you may need to:
|
||
| t.Fatal("IsPermanent should see through an outer fmt.Errorf wrap") | ||
| } | ||
| } | ||
|
|
||
| func TestPermanentOfNilIsNil(t *testing.T) { | ||
| if Permanent(nil) != nil { | ||
| t.Fatal("Permanent(nil) must be nil so success paths stay clean") | ||
| } | ||
| if IsPermanent(nil) { | ||
| t.Fatal("IsPermanent(nil) must be false") | ||
| } | ||
| } | ||
|
|
||
| func TestOrdinaryErrorIsNotPermanent(t *testing.T) { | ||
| if IsPermanent(errors.New("storage: object not found")) { | ||
| t.Fatal("a plain (transient) error must not be classified permanent") | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,13 +18,31 @@ type Local struct { | |
|
|
||
| // NewLocal returns a Local storage rooted at dir. The directory is created | ||
| // if it does not exist. | ||
| // | ||
| // dir is resolved to an ABSOLUTE path up front. A relative root (the default | ||
| // is "./data/documents") is otherwise resolved against the process's current | ||
| // working directory on every call — so if the engine is ever relaunched from a | ||
| // different directory while the queue still holds jobs that reference earlier | ||
| // uploads (River persists jobs in Postgres across restarts), the worker would | ||
| // look under a different root than the one the bytes were written to and fail | ||
| // with "object not found" on a file that is in fact on disk. Pinning the root | ||
| // to an absolute path at construction makes it stable for the process lifetime. | ||
| func NewLocal(dir string) (*Local, error) { | ||
| if err := os.MkdirAll(dir, 0o755); err != nil { | ||
| abs, err := filepath.Abs(dir) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("resolve storage root %q: %w", dir, err) | ||
| } | ||
| if err := os.MkdirAll(abs, 0o755); err != nil { | ||
| return nil, fmt.Errorf("create storage root: %w", err) | ||
| } | ||
| return &Local{root: dir}, nil | ||
| return &Local{root: abs}, nil | ||
| } | ||
|
|
||
| // Root returns the absolute filesystem path the storage is rooted at. Exposed | ||
| // so the engine can log the resolved root at boot (the single most useful fact | ||
| // when diagnosing an "object not found" on a file that appears to be on disk). | ||
| func (l *Local) Root() string { return l.root } | ||
|
|
||
| func (l *Local) path(key string) string { | ||
| // Keys may include slashes; treat them as path separators. | ||
| return filepath.Join(l.root, filepath.FromSlash(key)) | ||
|
|
@@ -62,7 +80,11 @@ func (l *Local) Get(ctx context.Context, key string) (io.ReadCloser, Metadata, e | |
| info, err := os.Stat(full) | ||
| if err != nil { | ||
| if errors.Is(err, os.ErrNotExist) { | ||
| return nil, Metadata{}, ErrNotFound | ||
| // Wrap ErrNotFound (errors.Is still matches) but carry the resolved | ||
| // absolute path so the failure is self-diagnosing: a caller seeing | ||
| // this in a log can immediately tell whether it looked in the wrong | ||
| // root vs. the bytes genuinely being absent. | ||
| return nil, Metadata{}, fmt.Errorf("%w: %s", ErrNotFound, full) | ||
|
Comment on lines
82
to
+87
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚨 suggestion (security): Including the absolute path in the ErrNotFound wrapper may leak filesystem layout to callers/logs. This improves diagnostics but also exposes the exact filesystem path wherever the error is surfaced. If errors can reach external boundaries (API responses, shared logs, multi-tenant contexts), this may leak host layout. Consider either restricting the full path to internal logs (and using a more generic error message outward), or storing Suggested implementation: type notFoundError struct {
// path holds the absolute filesystem path that was probed. It is not
// included in Error() to avoid leaking filesystem layout across process
// boundaries, but is available to internal callers via errors.As.
path string
}
func (e notFoundError) Error() string {
// Preserve the existing outward-facing message while keeping the path
// available for internal diagnostics.
return ErrNotFound.Error()
}
func (e notFoundError) Unwrap() error {
// Maintain errors.Is(err, ErrNotFound) semantics.
return ErrNotFound
}
// Root returns the absolute filesystem path the storage is rooted at. Exposed
// so the engine can log the resolved root at boot (the single most useful fact
// when diagnosing an "object not found" on a file that appears to be on disk).
func (l *Local) Root() string { return l.root } // Wrap ErrNotFound (errors.Is still matches) but carry the resolved
// absolute path in a structured error so it can be logged internally
// without leaking filesystem layout via Error().
return nil, Metadata{}, notFoundError{path: full}To fully benefit from this change, any logging or diagnostics that previously relied on the error string to see the path should be updated to use
|
||
| } | ||
| return nil, Metadata{}, err | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,76 @@ | ||||||||||||||||||||||||
| package storage | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||
| "bytes" | ||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||
| "errors" | ||||||||||||||||||||||||
| "io" | ||||||||||||||||||||||||
| "os" | ||||||||||||||||||||||||
| "path/filepath" | ||||||||||||||||||||||||
| "strings" | ||||||||||||||||||||||||
| "testing" | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func TestLocalResolvesRootToAbsolute(t *testing.T) { | ||||||||||||||||||||||||
| // A relative root must be pinned to an absolute path so the worker reads | ||||||||||||||||||||||||
| // from the same place regardless of the process's working directory at | ||||||||||||||||||||||||
| // read time (the HAL-321 "object not found on a file that is on disk" bug). | ||||||||||||||||||||||||
| dir := t.TempDir() | ||||||||||||||||||||||||
| rel, err := filepath.Rel(mustGetwd(t), dir) | ||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||
| t.Skipf("cannot relativise temp dir: %v", err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| l, err := NewLocal(rel) | ||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||
| t.Fatalf("NewLocal: %v", err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| if !filepath.IsAbs(l.Root()) { | ||||||||||||||||||||||||
| t.Fatalf("Root() = %q, want absolute", l.Root()) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func TestLocalGetNotFoundCarriesPath(t *testing.T) { | ||||||||||||||||||||||||
| l, err := NewLocal(t.TempDir()) | ||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||
| t.Fatalf("NewLocal: %v", err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| _, _, err = l.Get(context.Background(), "documents/missing/source.pdf") | ||||||||||||||||||||||||
| if !errors.Is(err, ErrNotFound) { | ||||||||||||||||||||||||
| t.Fatalf("err = %v, want ErrNotFound", err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| // The error must name the resolved path so an operator can immediately see | ||||||||||||||||||||||||
|
Comment on lines
+32
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion (testing): Strengthen the not-found error test by asserting the full absolute path (or at least that the path is absolute) Since the change is about surfacing the resolved absolute path, this test should assert on that behavior rather than just the filename. For example, check that the error string includes Suggested implementation: func TestLocalGetNotFoundCarriesPath(t *testing.T) {
l, err := NewLocal(t.TempDir())
if err != nil {
t.Fatalf("NewLocal: %v", err)
}
_, _, err = l.Get(context.Background(), "documents/missing/source.pdf")
if !errors.Is(err, ErrNotFound) {
t.Fatalf("err = %v, want ErrNotFound", err)
}
msg := err.Error()
// The error must name the resolved path so an operator can immediately see
// WHERE the worker looked.
if !strings.Contains(msg, "source.pdf") {
t.Fatalf("not-found error %q should include the filename", msg)
}
// The error should surface the resolved absolute path rooted at l.Root().
wantPath := filepath.Join(l.Root(), "documents", "missing", "source.pdf")
if !strings.Contains(msg, wantPath) {
t.Fatalf("not-found error %q should include resolved absolute path %q", msg, wantPath)
}
// Sanity check that the expected path is absolute; if this ever fails, the
// test itself is misconfigured.
if !filepath.IsAbs(wantPath) {
t.Fatalf("test bug: wantPath %q must be absolute", wantPath)
}
}To compile, ensure
If any of these are missing from the import block, add them accordingly, respecting the existing grouping and style in the file. |
||||||||||||||||||||||||
| // WHERE the worker looked. | ||||||||||||||||||||||||
| if !strings.Contains(err.Error(), "source.pdf") { | ||||||||||||||||||||||||
| t.Fatalf("not-found error %q should include the resolved path", err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
Comment on lines
+41
to
+45
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assert the full resolved path, not just filename suffix. The current check can pass without validating the key contract (resolved absolute path in the error). Assert against the expected full path built from Suggested test hardening // The error must name the resolved path so an operator can immediately see
// WHERE the worker looked.
- if !strings.Contains(err.Error(), "source.pdf") {
+ wantPath := filepath.Join(l.Root(), filepath.FromSlash("documents/missing/source.pdf"))
+ if !strings.Contains(err.Error(), wantPath) {
t.Fatalf("not-found error %q should include the resolved path", err)
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func TestLocalPutThenGetRoundTrip(t *testing.T) { | ||||||||||||||||||||||||
| l, err := NewLocal(t.TempDir()) | ||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||
| t.Fatalf("NewLocal: %v", err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| ctx := context.Background() | ||||||||||||||||||||||||
| want := []byte("hello vectorless") | ||||||||||||||||||||||||
| if err := l.Put(ctx, "documents/doc1/source.pdf", bytes.NewReader(want), Metadata{}); err != nil { | ||||||||||||||||||||||||
| t.Fatalf("Put: %v", err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| rc, _, err := l.Get(ctx, "documents/doc1/source.pdf") | ||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||
| t.Fatalf("Get: %v", err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| defer func() { _ = rc.Close() }() | ||||||||||||||||||||||||
| got, _ := io.ReadAll(rc) | ||||||||||||||||||||||||
| if !bytes.Equal(got, want) { | ||||||||||||||||||||||||
| t.Fatalf("round-trip = %q, want %q", got, want) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
// mustGetwd returns the process's current working directory, failing the
// calling test immediately if it cannot be determined.
func mustGetwd(t *testing.T) string {
	t.Helper()
	cwd, err := os.Getwd()
	if err == nil {
		return cwd
	}
	t.Fatalf("getwd: %v", err)
	return "" // unreachable: Fatalf stops the test goroutine
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (bug_risk): Reliance on parser error message substrings for transient classification is brittle.
These string checks make transient vs permanent classification dependent on exact error wording and capitalization, so a small message change could silently reclassify timeouts as permanent.
Prefer a more robust signal, e.g. typed/sentinel errors from the parser checked with `errors.Is`/`errors.As`. If that’s not available, at least normalize the message (e.g. `strings.ToLower`) and/or rely on a structured marker rather than free text, and ensure the source of these strings is clearly constrained.

Suggested implementation:
If the underlying parser can expose typed/sentinel errors for deadline and cancellation conditions, prefer to:
1. Define sentinel errors (e.g. `var ErrParseDeadlineExceeded = errors.New("...")`) or custom types in the parser package.
2. Replace the message-substring checks with `errors.Is(err, parser.ErrParseDeadlineExceeded)` / `errors.Is(err, parser.ErrParseCancelled)` checks.