diff --git a/README.md b/README.md index 6f04b1a..45a4062 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,15 @@ llm: > **Anthropic-compatible gateways (GLM/Z.ai):** `base_url` **must include `/v1`** — the client posts to `${base_url}/messages`. +> **Windows + local storage:** Windows Defender real-time protection scans +> each freshly written file and briefly hides it from `os.Stat`/`os.Open`, +> which under heavy concurrent ingestion can surface as transient +> `object not found` errors. The local backend rides through this window with +> a short internal retry, but for large bulk loads **add a Defender exclusion +> for your storage root** (`local.root` / `VLE_STORAGE_LOCAL_ROOT`): +> `Add-MpPreference -ExclusionPath "C:\path\to\data\documents"`. Linux has no +> such scan-hold and needs no exclusion. + ### Supported formats PDF (positioned text + tables via [`pdftable`](https://github.com/hallelx2/pdftable)) · Markdown · HTML · DOCX · Text. diff --git a/internal/api/server.go b/internal/api/server.go index 021819c..93310b2 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -231,6 +231,28 @@ func (d Deps) handleIngestDocument(w http.ResponseWriter, r *http.Request) { docID := ingest.NewDocumentID() + // Idempotent ingest: if the caller supplied an Idempotency-Key and a + // document already exists under it (for the standalone org), return that + // document instead of creating a duplicate. A client — or the SDK's own + // transport retry-on-transient — that re-POSTs the same upload after a + // reset that landed AFTER the server committed the row is then a no-op. + // This is the standalone/local-mode half of the HAL-323 duplicate-upload + // fix (the multi-tenant handler in internal/handler mirrors it). + idemKey := r.Header.Get("Idempotency-Key") + if idemKey != "" { + if existing, err := d.DB.GetDocumentByIdempotencyKey(ctx, standaloneOrgID, idemKey); err == nil { + writeJSON(w, http.StatusAccepted, map[string]any{ + "document_id": existing.ID, + "status": string(existing.Status), + }) + return + } else if !errors.Is(err, db.ErrNotFound) { + d.Logger.Error("ingest: idempotency lookup failed", "err", err) + writeErr(w, http.StatusInternalServerError, "db read failed") + return + } + } + var ( filename string contentType string @@ -300,14 +322,29 @@ func (d Deps) handleIngestDocument(w http.ResponseWriter, r *http.Request) { } if err := d.DB.NewDocument(ctx, db.Document{ - ID: docID, - OrgID: standaloneOrgID, - Title: title, - ContentType: contentType, - SourceRef: key, - Status: db.StatusPending, - ByteSize: size, + ID: docID, + OrgID: standaloneOrgID, + Title: title, + ContentType: contentType, + SourceRef: key, + Status: db.StatusPending, + ByteSize: size, + IdempotencyKey: idemKey, }); err != nil { + // A concurrent same-key ingest won the race: drop the orphan source we + // just wrote and return the winner instead of a 500. + if idemKey != "" && errors.Is(err, db.ErrConflict) { + if delErr := d.Storage.Delete(ctx, key); delErr != nil { + d.Logger.Warn("ingest: orphan source cleanup failed", "err", delErr, "source_ref", key, "idempotency_key", idemKey) + } + if existing, lookupErr := d.DB.GetDocumentByIdempotencyKey(ctx, standaloneOrgID, idemKey); lookupErr == nil { + writeJSON(w, http.StatusAccepted, map[string]any{ + "document_id": existing.ID, + "status": string(existing.Status), + }) + return + } + } d.Logger.Error("ingest: db insert failed", "err", err) writeErr(w, http.StatusInternalServerError, "db write failed") return diff --git a/internal/connecthandler/documents.go b/internal/connecthandler/documents.go index c85b9f3..c9ebc1b 100644 --- a/internal/connecthandler/documents.go +++ b/internal/connecthandler/documents.go @@ -82,6 +82,23 @@ func (s *DocumentsService) CreateDocument( return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("content is required")) } + // Idempotent ingest: a repeat CreateDocument carrying the same + // Idempotency-Key returns the original document rather than a duplicate + // (mirrors the REST path; closes the HAL-323 duplicate-upload bug for SDK + // callers that retry the RPC). + idemKey := req.Header().Get("Idempotency-Key") + if idemKey != "" { + if existing, err := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); err == nil { + return connect.NewResponse(&v1.CreateDocumentResponse{ + DocumentId: string(existing.ID), + Status: string(existing.Status), + }), nil + } else if !errors.Is(err, db.ErrNotFound) { + s.logger.Error("ingest: idempotency lookup failed", "err", err) + return nil, connect.NewError(connect.CodeInternal, errors.New("db read failed")) + } + } + docID := ingest.NewDocumentID() contentType := msg.ContentType if contentType == "" { @@ -105,15 +122,29 @@ func (s *DocumentsService) CreateDocument( } if err := s.db.NewDocument(ctx, db.Document{ - ID: docID, - OrgID: orgID, - StoreID: storeIDFromConnect(req), - Title: title, - ContentType: contentType, - SourceRef: key, - Status: db.StatusPending, - ByteSize: size, + ID: docID, + OrgID: orgID, + StoreID: storeIDFromConnect(req), + Title: title, + ContentType: contentType, + SourceRef: key, + Status: db.StatusPending, + ByteSize: size, + IdempotencyKey: idemKey, }); err != nil { + // Concurrent same-key insert won the race: drop the orphan source and + // return the winner instead of erroring. + if idemKey != "" && errors.Is(err, db.ErrConflict) { + if delErr := s.storage.Delete(ctx, key); delErr != nil { + s.logger.Warn("ingest: orphan source cleanup failed", "err", delErr, "source_ref", key, "idempotency_key", idemKey) + } + if existing, lookupErr := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); lookupErr == nil { + return connect.NewResponse(&v1.CreateDocumentResponse{ + DocumentId: string(existing.ID), + Status: string(existing.Status), + }), nil + } + } s.logger.Error("ingest: db insert failed", "err", err) return nil, connect.NewError(connect.CodeInternal, errors.New("db write failed")) } diff --git a/internal/handler/documents.go b/internal/handler/documents.go index f5d7cef..80dca58 100644 --- a/internal/handler/documents.go +++ b/internal/handler/documents.go @@ -158,6 +158,25 @@ func (h *DocumentsHandler) HandleIngestDocument(w http.ResponseWriter, r *http.R ctx := r.Context() docID := ingest.NewDocumentID() + // Idempotent ingest: if the client supplied an Idempotency-Key and we + // already have a document for this org under that key, return it instead + // of creating a duplicate. This makes a client/transport retry of the + // ingest POST (the HAL-323 duplicate-upload bug) a no-op. + idemKey := r.Header.Get("Idempotency-Key") + if idemKey != "" { + if existing, err := h.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); err == nil { + writeJSON(w, http.StatusAccepted, map[string]any{ + "document_id": existing.ID, + "status": string(existing.Status), + }) + return + } else if !errors.Is(err, db.ErrNotFound) { + h.logger.Error("ingest: idempotency lookup failed", "err", err) + writeErr(w, http.StatusInternalServerError, "db read failed") + return + } + } + var ( filename string contentType string @@ -227,15 +246,31 @@ func (h *DocumentsHandler) HandleIngestDocument(w http.ResponseWriter, r *http.R } if err := h.db.NewDocument(ctx, db.Document{ - ID: docID, - OrgID: orgID, - StoreID: storeID(r), - Title: title, - ContentType: contentType, - SourceRef: key, - Status: db.StatusPending, - ByteSize: size, + ID: docID, + OrgID: orgID, + StoreID: storeID(r), + Title: title, + ContentType: contentType, + SourceRef: key, + Status: db.StatusPending, + ByteSize: size, + IdempotencyKey: idemKey, }); err != nil { + // A concurrent request with the same Idempotency-Key won the race and + // inserted first. Drop the orphan source we just wrote and return the + // winner's document rather than a 500. + if idemKey != "" && errors.Is(err, db.ErrConflict) { + if delErr := h.storage.Delete(ctx, key); delErr != nil { + h.logger.Warn("ingest: orphan source cleanup failed", "err", delErr, "source_ref", key, "idempotency_key", idemKey) + } + if existing, lookupErr := h.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); lookupErr == nil { + writeJSON(w, http.StatusAccepted, map[string]any{ + "document_id": existing.ID, + "status": string(existing.Status), + }) + return + } + } h.logger.Error("ingest: db insert failed", "err", err) writeErr(w, http.StatusInternalServerError, "db write failed") return diff --git a/internal/middleware/idempotency.go b/internal/middleware/idempotency.go index fdba808..671378a 100644 --- a/internal/middleware/idempotency.go +++ b/internal/middleware/idempotency.go @@ -122,13 +122,22 @@ func Idempotency(cfg IdempotencyConfig) func(http.Handler) http.Handler { return } - key := r.Header.Get("Idempotency-Key") - if key == "" { + rawKey := r.Header.Get("Idempotency-Key") + if rawKey == "" { // No key provided — proceed normally. next.ServeHTTP(w, r) return } + // Scope the cache entry by tenant + method + path, NOT the raw + // Idempotency-Key alone. Two orgs legitimately use the same opaque + // key (e.g. "1", a per-corpus id); replaying by the bare key would + // hand org B a 202 cached for org A's document — a cross-tenant + // leak. The org header is absent in standalone/local mode, which is + // single-tenant, so an empty org collapses to one namespace there. + org := r.Header.Get("X-Vectorless-Org") + key := r.Method + "|" + r.URL.Path + "|" + org + "|" + rawKey + // Check cache. if cr, ok := cache.get(key); ok { w.Header().Set("X-Idempotency-Replayed", "true") diff --git a/internal/middleware/idempotency_test.go b/internal/middleware/idempotency_test.go new file mode 100644 index 0000000..e624579 --- /dev/null +++ b/internal/middleware/idempotency_test.go @@ -0,0 +1,89 @@ +package middleware + +import ( + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" +) + +// handler that returns a per-org body and counts invocations, so we can tell a +// cached replay from a real pass-through. +func countingOrgHandler(calls *atomic.Int64) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + calls.Add(1) + w.WriteHeader(http.StatusAccepted) + _, _ = w.Write([]byte("doc-for-" + r.Header.Get("X-Vectorless-Org"))) + }) +} + +func post(mw http.Handler, org, key string) *httptest.ResponseRecorder { + req := httptest.NewRequest(http.MethodPost, "/v1/documents", nil) + if org != "" { + req.Header.Set("X-Vectorless-Org", org) + } + if key != "" { + req.Header.Set("Idempotency-Key", key) + } + rec := httptest.NewRecorder() + mw.ServeHTTP(rec, req) + return rec +} + +func body(rec *httptest.ResponseRecorder) string { + return rec.Body.String() +} + +func TestIdempotencySameOrgReplays(t *testing.T) { + var calls atomic.Int64 + mw := Idempotency(IdempotencyConfig{})(countingOrgHandler(&calls)) + + first := post(mw, "orgA", "k1") + second := post(mw, "orgA", "k1") + + if calls.Load() != 1 { + t.Fatalf("handler invoked %d times, want 1 (second should replay)", calls.Load()) + } + if got := second.Header().Get("X-Idempotency-Replayed"); got != "true" { + t.Fatalf("second response missing replay header, got %q", got) + } + if body(first) != body(second) { + t.Fatalf("replay body %q != original %q", body(second), body(first)) + } +} + +func TestIdempotencyDifferentOrgsDoNotCollide(t *testing.T) { + // The core cross-tenant guarantee: two orgs sending the SAME opaque + // Idempotency-Key must each hit the handler and get THEIR OWN response — + // org B must never be handed org A's cached document. + var calls atomic.Int64 + mw := Idempotency(IdempotencyConfig{})(countingOrgHandler(&calls)) + + a := post(mw, "orgA", "shared-key") + b := post(mw, "orgB", "shared-key") + + if calls.Load() != 2 { + t.Fatalf("handler invoked %d times, want 2 (no cross-org collision)", calls.Load()) + } + if b.Header().Get("X-Idempotency-Replayed") == "true" { + t.Fatal("org B got a replayed response — cross-tenant leak") + } + if body(a) == body(b) { + t.Fatalf("org B received org A's body %q — cross-tenant leak", body(b)) + } + if body(b) != "doc-for-orgB" { + t.Fatalf("org B body = %q, want doc-for-orgB", body(b)) + } +} + +func TestIdempotencyNoKeyAlwaysPassesThrough(t *testing.T) { + var calls atomic.Int64 + mw := Idempotency(IdempotencyConfig{})(countingOrgHandler(&calls)) + + post(mw, "orgA", "") + post(mw, "orgA", "") + + if calls.Load() != 2 { + t.Fatalf("handler invoked %d times, want 2 (no key = no caching)", calls.Load()) + } +} diff --git a/openapi.yaml b/openapi.yaml index b05ac9f..48bb1c3 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -113,6 +113,21 @@ paths: (field: "file") or JSON body with content. Returns immediately with a document_id in "pending" state. The ingest pipeline runs asynchronously: parse → build tree → summarize sections. + + Pass an `Idempotency-Key` header to make ingestion idempotent: a + repeated request with the same key returns the original document + (and its current status) instead of creating a duplicate. Use this + to make a client/network retry of the upload safe. + parameters: + - name: Idempotency-Key + in: header + required: false + description: | + Opaque client-chosen key, unique per document. Repeating a + request with the same key returns the existing document rather + than ingesting a duplicate. + schema: + type: string requestBody: required: true content: diff --git a/pkg/db/db.go b/pkg/db/db.go index 1eec601..a8f007d 100644 --- a/pkg/db/db.go +++ b/pkg/db/db.go @@ -16,6 +16,7 @@ import ( "time" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" ) @@ -25,6 +26,10 @@ var migrationsFS embed.FS // ErrNotFound signals a missing row. var ErrNotFound = errors.New("db: not found") +// ErrConflict signals a unique-constraint violation (SQLSTATE 23505), e.g. +// a second insert with the same (org_id, idempotency_key). +var ErrConflict = errors.New("db: conflict") + // Pool wraps *pgxpool.Pool with engine-specific helpers. type Pool struct { *pgxpool.Pool @@ -136,5 +141,9 @@ func mapErr(err error) error { if errors.Is(err, pgx.ErrNoRows) { return ErrNotFound } + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) && pgErr.Code == "23505" { + return fmt.Errorf("%w: %s", ErrConflict, pgErr.ConstraintName) + } return err } diff --git a/pkg/db/documents.go b/pkg/db/documents.go index 67cbea2..8e518f1 100644 --- a/pkg/db/documents.go +++ b/pkg/db/documents.go @@ -41,8 +41,14 @@ type Document struct { ErrorMessage string ByteSize int64 Metadata map[string]string - CreatedAt time.Time - UpdatedAt time.Time + + // IdempotencyKey is the optional client-supplied Idempotency-Key. + // When non-empty it is unique per (org_id, idempotency_key): a repeat + // ingest with the same key returns the existing document instead of + // creating a duplicate. Empty means "no dedup" (column stored as NULL). + IdempotencyKey string + CreatedAt time.Time + UpdatedAt time.Time // TOCTree is the JSONB blob persisted by the ingest pipeline's // LLM-driven TOC builder ([]tree.TOCNode marshalled). nil @@ -76,14 +82,48 @@ func (p *Pool) NewDocument(ctx context.Context, d Document) error { if storeID == "" { storeID = NilScope } + // NULL (not "") when no key, so the partial unique index ignores the row. + var idemKey any + if d.IdempotencyKey != "" { + idemKey = d.IdempotencyKey + } _, err = p.Exec(ctx, ` - INSERT INTO documents (id, org_id, store_id, title, content_type, source_ref, status, byte_size, metadata) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`, - string(d.ID), d.OrgID, storeID, d.Title, d.ContentType, d.SourceRef, string(d.Status), d.ByteSize, meta, + INSERT INTO documents (id, org_id, store_id, title, content_type, source_ref, status, byte_size, metadata, idempotency_key) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + string(d.ID), d.OrgID, storeID, d.Title, d.ContentType, d.SourceRef, string(d.Status), d.ByteSize, meta, idemKey, ) return mapErr(err) } +// GetDocumentByIdempotencyKey returns the document an org previously ingested +// under key, or ErrNotFound if none exists. Used to short-circuit a repeat +// ingest (client retry / SDK transport retry) back to the original document +// instead of creating a duplicate. +func (p *Pool) GetDocumentByIdempotencyKey(ctx context.Context, orgID, key string) (*Document, error) { + if orgID == "" { + return nil, fmt.Errorf("GetDocumentByIdempotencyKey: orgID is required") + } + if key == "" { + return nil, ErrNotFound + } + row := p.QueryRow(ctx, ` + SELECT id, org_id, store_id, title, content_type, source_ref, status, error_message, + byte_size, metadata, created_at, updated_at, toc_tree, idempotency_key + FROM documents WHERE org_id = $1 AND idempotency_key = $2`, orgID, key) + + var d Document + var status string + var rawMeta, rawTOC []byte + if err := row.Scan(&d.ID, &d.OrgID, &d.StoreID, &d.Title, &d.ContentType, &d.SourceRef, &status, + &d.ErrorMessage, &d.ByteSize, &rawMeta, &d.CreatedAt, &d.UpdatedAt, &rawTOC, &d.IdempotencyKey); err != nil { + return nil, mapErr(err) + } + d.Status = DocumentStatus(status) + d.Metadata = unmarshalMeta(rawMeta) + d.TOCTree = rawTOC + return &d, nil +} + // GetDocument fetches a document scoped to an org and (optionally) a // store. storeID == "" means "don't filter by store" — used by // header-less / pre-stores callers. A non-empty storeID restricts the diff --git a/pkg/db/idempotency_integration_test.go b/pkg/db/idempotency_integration_test.go new file mode 100644 index 0000000..5c7d07d --- /dev/null +++ b/pkg/db/idempotency_integration_test.go @@ -0,0 +1,103 @@ +package db + +import ( + "context" + "errors" + "os" + "testing" + + "github.com/google/uuid" + + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// openTestPool connects to the database named by VLE_TEST_DATABASE_URL and +// applies migrations, or skips the test when no test DB is configured. This +// keeps the suite green in environments without Postgres while still letting +// the idempotency guarantees be verified against a real database locally/CI. +func openTestPool(t *testing.T) *Pool { + t.Helper() + url := os.Getenv("VLE_TEST_DATABASE_URL") + if url == "" { + t.Skip("VLE_TEST_DATABASE_URL not set; skipping live-DB idempotency test") + } + ctx := context.Background() + pool, err := Open(ctx, url, 4) + if err != nil { + t.Fatalf("Open: %v", err) + } + if err := pool.Migrate(ctx); err != nil { + t.Fatalf("Migrate: %v", err) + } + t.Cleanup(pool.Close) + return pool +} + +func TestIdempotentIngestDedup(t *testing.T) { + p := openTestPool(t) + ctx := context.Background() + + org := "test-org-" + uuid.NewString() + key := "vlbench:" + org + ":fb-001" + docID := tree.DocumentID("doc_" + uuid.NewString()) + + first := Document{ + ID: docID, OrgID: org, Title: "AMD 10-K", ContentType: "application/pdf", + SourceRef: "documents/" + string(docID) + "/source.pdf", Status: StatusPending, + ByteSize: 123, IdempotencyKey: key, + } + if err := p.NewDocument(ctx, first); err != nil { + t.Fatalf("first NewDocument: %v", err) + } + t.Cleanup(func() { _, _ = p.Exec(ctx, `DELETE FROM documents WHERE org_id = $1`, org) }) + + // Lookup returns the original. + got, err := p.GetDocumentByIdempotencyKey(ctx, org, key) + if err != nil { + t.Fatalf("GetDocumentByIdempotencyKey: %v", err) + } + if got.ID != docID { + t.Fatalf("looked up id = %q, want %q", got.ID, docID) + } + + // A second insert under the same (org, key) — what a client/SDK retry would + // attempt — must collide with ErrConflict, never create a duplicate row. + dup := first + dup.ID = tree.DocumentID("doc_" + uuid.NewString()) + dup.SourceRef = "documents/" + string(dup.ID) + "/source.pdf" + if err := p.NewDocument(ctx, dup); !errors.Is(err, ErrConflict) { + t.Fatalf("duplicate insert err = %v, want ErrConflict", err) + } + + // Exactly one row exists for this key. + var n int + if err := p.QueryRow(ctx, + `SELECT count(*) FROM documents WHERE org_id = $1 AND idempotency_key = $2`, + org, key).Scan(&n); err != nil { + t.Fatalf("count: %v", err) + } + if n != 1 { + t.Fatalf("rows for key = %d, want 1", n) + } +} + +func TestIngestWithoutKeyAllowsDuplicates(t *testing.T) { + p := openTestPool(t) + ctx := context.Background() + org := "test-org-" + uuid.NewString() + t.Cleanup(func() { _, _ = p.Exec(ctx, `DELETE FROM documents WHERE org_id = $1`, org) }) + + // No idempotency key → column is NULL → the partial unique index ignores + // the rows, so two keyless ingests of the same bytes are allowed (today's + // behavior is preserved for callers that don't opt in). + for i := range 2 { + id := tree.DocumentID("doc_" + uuid.NewString()) + err := p.NewDocument(ctx, Document{ + ID: id, OrgID: org, Title: "x", ContentType: "text/plain", + SourceRef: "documents/" + string(id) + "/source.txt", Status: StatusPending, + }) + if err != nil { + t.Fatalf("keyless NewDocument %d: %v", i, err) + } + } +} diff --git a/pkg/db/migrations/0007_documents_idempotency_key.down.sql b/pkg/db/migrations/0007_documents_idempotency_key.down.sql new file mode 100644 index 0000000..efb7879 --- /dev/null +++ b/pkg/db/migrations/0007_documents_idempotency_key.down.sql @@ -0,0 +1,6 @@ +-- 0007_documents_idempotency_key.down.sql + +DROP INDEX IF EXISTS documents_org_idempotency_key_uidx; + +ALTER TABLE documents + DROP COLUMN IF EXISTS idempotency_key; diff --git a/pkg/db/migrations/0007_documents_idempotency_key.up.sql b/pkg/db/migrations/0007_documents_idempotency_key.up.sql new file mode 100644 index 0000000..0d7cbe9 --- /dev/null +++ b/pkg/db/migrations/0007_documents_idempotency_key.up.sql @@ -0,0 +1,22 @@ +-- 0007_documents_idempotency_key.up.sql — idempotent ingestion. +-- +-- The SDK already sends an optional `Idempotency-Key` header on ingest +-- ("Prevents duplicate ingestion"), but the engine ignored it: a client +-- (or its built-in transport retry) that re-POSTs the same upload after a +-- transient network error created a SECOND document with a fresh id, a +-- fresh source object, and a fresh parse job. Under heavy concurrent +-- bulk ingestion on a flaky link this produced the same corpus document +-- ingested up to 6× (HAL-323). +-- +-- Honoring the key requires a place to store it and a uniqueness +-- guarantee scoped per tenant. A PARTIAL unique index lets the column be +-- NULL for callers that don't supply a key (no dedup, today's behavior) +-- while making (org_id, idempotency_key) collide-and-return for callers +-- that do. + +ALTER TABLE documents + ADD COLUMN IF NOT EXISTS idempotency_key TEXT; + +CREATE UNIQUE INDEX IF NOT EXISTS documents_org_idempotency_key_uidx + ON documents (org_id, idempotency_key) + WHERE idempotency_key IS NOT NULL; diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index 776226a..c800cd6 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -73,14 +73,14 @@ func (g *GCS) Get(ctx context.Context, key string) (io.ReadCloser, Metadata, err attrs, err := obj.Attrs(ctx) if err != nil { if errors.Is(err, gcs.ErrObjectNotExist) { - return nil, Metadata{}, ErrNotFound + return nil, Metadata{}, fmt.Errorf("%w: gs://%s/%s", ErrNotFound, g.cfg.Bucket, key) } return nil, Metadata{}, fmt.Errorf("gcs storage: attrs %q: %w", key, err) } rc, err := obj.NewReader(ctx) if err != nil { if errors.Is(err, gcs.ErrObjectNotExist) { - return nil, Metadata{}, ErrNotFound + return nil, Metadata{}, fmt.Errorf("%w: gs://%s/%s", ErrNotFound, g.cfg.Bucket, key) } return nil, Metadata{}, fmt.Errorf("gcs storage: get %q: %w", key, err) } @@ -98,7 +98,7 @@ func (g *GCS) Get(ctx context.Context, key string) (io.ReadCloser, Metadata, err func (g *GCS) Delete(ctx context.Context, key string) error { err := g.bucket.Object(key).Delete(ctx) if errors.Is(err, gcs.ErrObjectNotExist) { - return ErrNotFound + return fmt.Errorf("%w: gs://%s/%s", ErrNotFound, g.cfg.Bucket, key) } if err != nil { return fmt.Errorf("gcs storage: delete %q: %w", key, err) diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 82faba2..6482507 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -7,6 +7,7 @@ import ( "io" "os" "path/filepath" + "runtime" "time" ) @@ -77,32 +78,62 @@ func (l *Local) Put(ctx context.Context, key string, r io.Reader, _ Metadata) er func (l *Local) Get(ctx context.Context, key string) (io.ReadCloser, Metadata, error) { full := l.path(key) - info, err := os.Stat(full) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - // Wrap ErrNotFound (errors.Is still matches) but carry the resolved - // absolute path so the failure is self-diagnosing: a caller seeing - // this in a log can immediately tell whether it looked in the wrong - // root vs. the bytes genuinely being absent. - return nil, Metadata{}, fmt.Errorf("%w: %s", ErrNotFound, full) - } - return nil, Metadata{}, err + + // On Windows, Defender real-time protection holds a brief lock on a + // freshly-written file while it scans it; during that window GetFileAttributes + // (os.Stat) and CreateFile (os.Open) return ERROR_FILE_NOT_FOUND on a file + // that IS on disk (HAL-321/HAL-323). The same flip can hit between our Stat + // and Open (a TOCTOU race). A short bounded retry here closes that window + // transparently so the caller never sees the transient as a hard not-found. + // On non-Windows there is no such hold, so we make a single pass. + const winRetries = 8 + attempts := 1 + if runtime.GOOS == "windows" { + attempts = winRetries } - f, err := os.Open(full) - if err != nil { - return nil, Metadata{}, err + + for i := 0; i < attempts; i++ { + info, err := os.Stat(full) + if err == nil { + f, openErr := os.Open(full) + if openErr == nil { + return f, Metadata{ + Key: key, + Size: info.Size(), + ModifiedAt: info.ModTime(), + }, nil + } + // Open can fail with not-exist even though Stat just succeeded — the + // Windows scan window flipped between the two calls. Treat it like a + // transient stat miss and retry; surface anything else immediately. + if !errors.Is(openErr, os.ErrNotExist) { + return nil, Metadata{}, openErr + } + } else if !errors.Is(err, os.ErrNotExist) { + return nil, Metadata{}, err + } + + if i < attempts-1 { + select { + case <-ctx.Done(): + return nil, Metadata{}, ctx.Err() + case <-time.After(time.Duration(i+1) * 50 * time.Millisecond): + } + } } - return f, Metadata{ - Key: key, - Size: info.Size(), - ModifiedAt: info.ModTime(), - }, nil + + // Wrap ErrNotFound (errors.Is still matches) but carry the resolved absolute + // path so the failure is self-diagnosing: a caller seeing this in a log can + // immediately tell whether it looked in the wrong root vs. the bytes + // genuinely being absent. + return nil, Metadata{}, fmt.Errorf("%w: %s", ErrNotFound, full) } func (l *Local) Delete(ctx context.Context, key string) error { - err := os.Remove(l.path(key)) + full := l.path(key) + err := os.Remove(full) if errors.Is(err, os.ErrNotExist) { - return ErrNotFound + return fmt.Errorf("%w: %s", ErrNotFound, full) } return err } diff --git a/pkg/storage/local_test.go b/pkg/storage/local_test.go index 046ed8e..cd2f25c 100644 --- a/pkg/storage/local_test.go +++ b/pkg/storage/local_test.go @@ -7,8 +7,10 @@ import ( "io" "os" "path/filepath" + "runtime" "strings" "testing" + "time" ) func TestLocalResolvesRootToAbsolute(t *testing.T) { @@ -45,6 +47,67 @@ func TestLocalGetNotFoundCarriesPath(t *testing.T) { } } +func TestLocalDeleteNotFoundCarriesPath(t *testing.T) { + // Delete of a missing key must also report ErrNotFound WITH the resolved + // path. Previously it returned the bare sentinel, which is indistinguishable + // in a DB error column from a Get miss — exactly the HAL-323 ambiguity where + // pathless "object not found" rows could not be attributed to a code path. + l, err := NewLocal(t.TempDir()) + if err != nil { + t.Fatalf("NewLocal: %v", err) + } + err = l.Delete(context.Background(), "documents/missing/source.pdf") + if !errors.Is(err, ErrNotFound) { + t.Fatalf("err = %v, want ErrNotFound", err) + } + if !strings.Contains(err.Error(), "source.pdf") { + t.Fatalf("not-found error %q should include the resolved path", err) + } +} + +func TestLocalGetRetriesTransientNotExist(t *testing.T) { + // Simulate the Windows Defender scan window: the source does not exist when + // Get is first called, then appears shortly after. On Windows the bounded + // internal retry (cumulative ~1.4s) must ride through the gap so the FIRST + // Get returns the bytes — no caller-side fallback. This test fails if the + // Windows retry loop is removed or shortened below the write delay, because + // a single-pass Get would return ErrNotFound at t≈0. Off Windows there is no + // scan-hold and Get makes a single pass, so we wait for the writer first. + l, err := NewLocal(t.TempDir()) + if err != nil { + t.Fatalf("NewLocal: %v", err) + } + ctx := context.Background() + key := "documents/doc-late/source.pdf" + want := []byte("appears late") + + done := make(chan struct{}) + go func() { + // Write the file a beat after Get starts polling — well within the + // Windows retry budget but after the first attempt would have missed. + time.Sleep(80 * time.Millisecond) + _ = l.Put(ctx, key, bytes.NewReader(want), Metadata{}) + close(done) + }() + + if runtime.GOOS != "windows" { + <-done // single-pass platforms: ensure the file is present first + } + + // On Windows this is the assertion that matters: the internal retry must + // make this succeed despite the file being absent when the call began. + rc, _, err := l.Get(ctx, key) + if err != nil { + t.Fatalf("Get rode through the write window? err = %v (want nil — retry should cover the %v file-appear delay)", err, 80*time.Millisecond) + } + defer func() { _ = rc.Close() }() + got, _ := io.ReadAll(rc) + if !bytes.Equal(got, want) { + t.Fatalf("round-trip = %q, want %q", got, want) + } + <-done // avoid leaking the writer goroutine +} + func TestLocalPutThenGetRoundTrip(t *testing.T) { l, err := NewLocal(t.TempDir()) if err != nil { diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 1796164..3506901 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -114,7 +114,7 @@ func (s *S3) Get(ctx context.Context, key string) (io.ReadCloser, Metadata, erro }) if err != nil { if isNotFound(err) { - return nil, Metadata{}, ErrNotFound + return nil, Metadata{}, fmt.Errorf("%w: s3://%s/%s", ErrNotFound, s.cfg.Bucket, key) } return nil, Metadata{}, fmt.Errorf("s3 storage: get %q: %w", key, err) } @@ -144,7 +144,7 @@ func (s *S3) Delete(ctx context.Context, key string) error { return err } if !exists { - return ErrNotFound + return fmt.Errorf("%w: s3://%s/%s", ErrNotFound, s.cfg.Bucket, key) } if _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: aws.String(s.cfg.Bucket),