Idempotent ingest + self-diagnosing not-found (HAL-323)#49

Merged
hallelx2 merged 3 commits into main from halleluyaholudele/hal-323-bulk-ingest-still-flaky-on-windows-bench-re-uploads-each-doc
Jun 20, 2026

Conversation

@hallelx2

@hallelx2 hallelx2 commented Jun 20, 2026

Owner

Why

Continuation of HAL-321. A full 20-doc FinanceBench run on Windows still ended with duplicate documents (the same corpus doc ingested up to 6×) and unexplained, pathless "object not found" failure rows. This PR fixes both root causes.

Root causes

Duplicate uploads. The SDK transport wraps the ingest POST in sync_retry, which re-POSTs on any transient exception (or 5xx/408). With no server-side idempotency, a connection reset that lands after the server already committed the document made the SDK create a brand-new doc_id + source + parse job on every retry. The SDK already sends an optional Idempotency-Key header ("Prevents duplicate ingestion") — the engine just ignored it.

Pathless object not found. storage.Local.Get always appends the resolved path, yet the DB had pathless rows. The other backends (and Delete) returned the bare ErrNotFound sentinel, and local.Get's open-after-stat branch returned a raw error that getSourceWithRetry would not retry. The pathless rows were pre-path-wrap leftovers; this makes every not-found self-attributing so it can't recur ambiguously.
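
To illustrate what "self-attributing" means here, the following is a minimal sketch, not the engine's actual code. `ErrNotFound` is a stand-in for the storage sentinel, and `notFound`, `statObject`, and `isNotFound` are hypothetical helpers. The point is that wrapping with `%w` puts the resolved location in the message while `errors.Is` still matches the sentinel, so a retry check keyed on the sentinel keeps working.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// ErrNotFound is a stand-in for the storage package's sentinel.
var ErrNotFound = errors.New("object not found")

// notFound wraps the sentinel with the resolved location, so the message is
// self-attributing while errors.Is(err, ErrNotFound) still holds.
func notFound(location string) error {
	return fmt.Errorf("%w: %s", ErrNotFound, location)
}

// statObject is a hypothetical helper: it resolves key under root and
// returns a path-bearing ErrNotFound when the file is missing.
func statObject(root, key string) error {
	full := filepath.Join(root, filepath.FromSlash(key))
	if _, err := os.Stat(full); err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return notFound(full)
		}
		return err // other failures are passed through unchanged
	}
	return nil
}

// isNotFound reports whether err matches the sentinel anywhere in its chain.
func isNotFound(err error) bool { return errors.Is(err, ErrNotFound) }

func main() {
	err := statObject(os.TempDir(), "hal323-missing/doc.pdf")
	fmt.Println(isNotFound(err)) // the sentinel still matches
	fmt.Println(err)             // the message names the resolved path
}
```

The same wrapping applies to cloud backends by passing a URI such as `s3://bucket/key` as the location.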

What changed

  • Idempotency: idempotency_key column + partial unique index on (org_id, idempotency_key) (migration 0007). All three ingest handlers (standalone internal/api, multi-tenant REST internal/handler, Connect RPC internal/connecthandler) look up an existing doc by key and return it instead of creating a duplicate; in the concurrent same-key race, the loser drops its orphan source and returns the winner. db.ErrConflict is mapped from SQLSTATE 23505; GetDocumentByIdempotencyKey is new.
  • Storage: every ErrNotFound now carries the resolved key/path (local Get+Delete, s3 Get+Delete, gcs Get+Delete). local.Get wraps the open-after-stat miss as a retryable ErrNotFound and adds a Windows-only bounded retry to ride through the Defender scan window that transiently hides freshly written files.
  • Docs: OpenAPI documents the Idempotency-Key header; README documents the mandatory Windows Defender exclusion for the storage root.
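
The lookup, insert, and conflict-reconciliation steps listed above can be sketched roughly as follows. This is an in-memory illustration under stated assumptions, not the engine's handler: `memDB` stands in for the documents table (its `byKey` map mimics the partial unique index by indexing only non-empty keys), and `engine`, `Ingest`, and `raceIngest` are hypothetical names. Storage is reduced to a counter of dropped orphans.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var (
	ErrConflict = errors.New("conflict")
	ErrNotFound = errors.New("not found")
)

type Document struct{ ID, OrgID, IdempotencyKey string }

// memDB is an in-memory stand-in for the documents table.
type memDB struct {
	mu    sync.Mutex
	rows  []Document
	byKey map[string]int // "org\x00key" -> row index; empty keys are never indexed
}

func newMemDB() *memDB { return &memDB{byKey: map[string]int{}} }

func (m *memDB) NewDocument(d Document) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if d.IdempotencyKey != "" {
		k := d.OrgID + "\x00" + d.IdempotencyKey
		if _, taken := m.byKey[k]; taken {
			return ErrConflict // what a unique violation would surface as
		}
		m.byKey[k] = len(m.rows)
	}
	m.rows = append(m.rows, d)
	return nil
}

func (m *memDB) GetByKey(orgID, key string) (Document, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	i, ok := m.byKey[orgID+"\x00"+key]
	if !ok {
		return Document{}, ErrNotFound
	}
	return m.rows[i], nil
}

func (m *memDB) Rows() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.rows)
}

type engine struct {
	seq, orphans int64 // accessed atomically; kept first for 64-bit alignment
	db           *memDB
}

// Ingest returns the existing document for a repeated key, otherwise inserts.
func (e *engine) Ingest(orgID, key string) (string, error) {
	if key != "" {
		d, err := e.db.GetByKey(orgID, key)
		if err == nil {
			return d.ID, nil // replay: hand back the original
		}
		if !errors.Is(err, ErrNotFound) {
			return "", err
		}
	}
	id := fmt.Sprintf("doc-%d", atomic.AddInt64(&e.seq, 1))
	// The real flow writes the source object to storage at this point.
	err := e.db.NewDocument(Document{ID: id, OrgID: orgID, IdempotencyKey: key})
	if key != "" && errors.Is(err, ErrConflict) {
		atomic.AddInt64(&e.orphans, 1) // stands in for deleting our orphan source
		winner, lookupErr := e.db.GetByKey(orgID, key)
		if lookupErr != nil {
			return "", lookupErr
		}
		return winner.ID, nil
	}
	if err != nil {
		return "", err
	}
	return id, nil
}

// raceIngest fires n concurrent same-key ingests and reports how many
// distinct ids came back and how many rows were stored.
func raceIngest(n int) (distinct, rows int) {
	e := &engine{db: newMemDB()}
	ids := make([]string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ids[i], _ = e.Ingest("org", "k1")
		}(i)
	}
	wg.Wait()
	seen := map[string]bool{}
	for _, id := range ids {
		seen[id] = true
	}
	return len(seen), e.db.Rows()
}

func main() {
	fmt.Println(raceIngest(8)) // prints "1 1": one id, one row
}
```

The pre-insert lookup alone is not enough under concurrency, since two requests can both miss and then both insert. The unique constraint is what makes the loser detectable.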

Verification

  • go build / go vet clean; full test suite green, including new pkg/db integration tests against live Postgres (VLE_TEST_DATABASE_URL) and pkg/storage Windows-retry/path tests.
  • End-to-end: three POST /v1/documents with one Idempotency-Key all return the same document_id (reflecting live pending→ready status); a keyless control gets its own id; exactly one row per key in the DB.
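
For readers who want to reproduce the shape of the end-to-end check without a running engine, here is a hedged sketch using `net/http/httptest`. The handler is a toy stand-in keyed by the Idempotency-Key header, not the engine's handler. The `newToyHandler` and `post` names, the payload, and the `doc-N` id format are assumptions for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// newToyHandler returns a stand-in ingest endpoint: requests that repeat an
// Idempotency-Key get the first id back, keyless requests get a fresh id.
func newToyHandler() http.Handler {
	var (
		mu    sync.Mutex
		seq   int
		byKey = map[string]string{}
	)
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		key := r.Header.Get("Idempotency-Key")
		mu.Lock()
		id, seen := byKey[key]
		if key == "" || !seen {
			seq++
			id = fmt.Sprintf("doc-%d", seq)
			if key != "" {
				byKey[key] = id
			}
		}
		mu.Unlock()
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusAccepted)
		json.NewEncoder(w).Encode(map[string]string{"document_id": id})
	})
}

// post sends one in-process request and returns the document_id and status.
func post(h http.Handler, idemKey string) (string, int) {
	req := httptest.NewRequest(http.MethodPost, "/v1/documents", strings.NewReader("payload"))
	if idemKey != "" {
		req.Header.Set("Idempotency-Key", idemKey)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	var body struct {
		DocumentID string `json:"document_id"`
	}
	_ = json.NewDecoder(rec.Body).Decode(&body)
	return body.DocumentID, rec.Code
}

func main() {
	h := newToyHandler()
	for i := 0; i < 3; i++ {
		fmt.Println(post(h, "retry-key")) // same id each time
	}
	fmt.Println(post(h, "")) // keyless control gets its own id
}
```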

Companion

vectorless-bench now sends a stable vlbench:{org}:{doc_id} idempotency key so all its setup retries dedup (separate PR).
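
As a rough illustration of the client side, the key format above could be built and attached like this. The sketch is in Go for consistency with the engine and is not the bench's actual code; `benchKey`, `newIngestRequest`, the base URL, and the sample org and document names are all hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// benchKey builds a key that is stable across retries of the same upload.
func benchKey(org, docID string) string {
	return fmt.Sprintf("vlbench:%s:%s", org, docID)
}

// newIngestRequest prepares a POST /v1/documents carrying the stable key.
func newIngestRequest(baseURL, org, docID, body string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, baseURL+"/v1/documents", strings.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Idempotency-Key", benchKey(org, docID))
	return req, nil
}

func main() {
	req, err := newIngestRequest("http://localhost:8080", "acme", "doc-1", "payload")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Header.Get("Idempotency-Key")) // vlbench:acme:doc-1
}
```

Because the key depends only on the org and the corpus document, every retry of the same setup step maps to the same server-side row.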

Remaining (separate, infra/time-bound): Linux/CI full-run repro (AC3) and the canonical judged 3-way re-run + headline numbers (AC4).

Closes HAL-323

Summary by Sourcery

Implement idempotent document ingestion and enrich storage not-found errors with full paths to aid diagnosis.

New Features:

  • Add optional per-org idempotency keys for document ingestion across REST, standalone, and Connect APIs, reusing existing documents on repeated requests.
  • Document Idempotency-Key support in the OpenAPI spec and README, including Windows-specific guidance for local storage.

Enhancements:

  • Propagate idempotency keys into the database schema with a partial unique index and conflict mapping to prevent duplicate document rows.
  • Make local, S3, and GCS storage backends include resolved object paths in ErrNotFound errors for clearer diagnostics.
  • Introduce Windows-specific bounded retries in the local storage backend to mask transient Defender-induced file-not-found races.

Tests:

  • Add Postgres-backed integration tests validating idempotent ingest deduplication and keyless duplicate allowance.
  • Extend local storage tests to cover path-bearing not-found errors and Windows-style transient not-exist retries.

Summary by CodeRabbit

  • New Features

    • Added idempotent document uploads via Idempotency-Key header to prevent duplicate processing of repeated requests.
  • Bug Fixes

    • Improved Windows compatibility with retry logic for transient file-not-found errors caused by Windows Defender real-time protection.
    • Enhanced error messages to include storage object paths for better debugging.
  • Documentation

    • Added Windows Defender configuration guidance to README with recommended exclusion settings.
    • Updated API documentation for new idempotent upload support.

Two-part fix for flaky bulk ingestion and the bench's duplicate uploads.

Idempotent ingest (root cause of "same doc ingested up to 6x"):
- New `idempotency_key` column on documents + partial unique index on
  (org_id, idempotency_key) — migration 0007. NULL key = no dedup
  (preserves current behavior for callers that don't opt in).
- All three ingest handlers (standalone/local in internal/api, multi-tenant
  REST in internal/handler, Connect RPC in internal/connecthandler) honor the
  `Idempotency-Key` header the SDK already sends: a repeat ingest returns the
  original document instead of creating a duplicate, and the concurrent
  same-key race is resolved by dropping the orphan source and returning the
  winner. The SDK retry-on-transient (which re-POSTs after a reset that landed
  AFTER the server committed) is now a no-op.
- db: ErrConflict sentinel mapped from SQLSTATE 23505; GetDocumentByIdempotencyKey.

Self-diagnosing not-found (resolves the pathless "object not found" mystery):
- Every storage backend's ErrNotFound now carries the resolved key/path
  (local Get+Delete, s3 Get+Delete, gcs Get+Delete) so a failure can always be
  attributed to a code path — pathless rows were pre-path-wrap leftovers.
- local.Get: the open-after-stat result is wrapped as retryable ErrNotFound
  (previously a raw error getSourceWithRetry would NOT retry), and a Windows-only
  bounded retry rides through the Defender scan window that transiently hides a
  freshly-written file.

Docs: OpenAPI documents the Idempotency-Key header; README documents the
mandatory Windows Defender exclusion for the storage root.

Verified: go build/vet clean; storage + db (live-Postgres) tests pass; and an
end-to-end double-POST with one Idempotency-Key returns one document_id while a
keyless control gets its own.
@sourcery-ai

sourcery-ai Bot commented Jun 20, 2026


Reviewer's Guide

Implements idempotent document ingestion across all ingest paths using an Idempotency-Key stored in the database, adds conflict detection and handling for duplicate ingests, and makes all storage backends return self-diagnosing not-found errors (including a Windows-specific retry for transient local file disappears), with tests, migration, and documentation updates.

Sequence diagram for idempotent document ingest with Idempotency-Key

sequenceDiagram
    actor Client
    participant DocumentsHandler
    participant DB
    participant Storage

    Client->>DocumentsHandler: HandleIngestDocument
    DocumentsHandler->>DB: GetDocumentByIdempotencyKey
    alt [existing document]
        DB-->>DocumentsHandler: Document
        DocumentsHandler-->>Client: HTTP 202 document_id,status
    else [no existing document]
        DocumentsHandler->>Storage: Put
        DocumentsHandler->>DB: NewDocument
        alt [insert succeeds]
            DB-->>DocumentsHandler: ok
            DocumentsHandler-->>Client: HTTP 202 new document_id,status
        else [ErrConflict]
            DB-->>DocumentsHandler: ErrConflict
            DocumentsHandler->>Storage: Delete
            DocumentsHandler->>DB: GetDocumentByIdempotencyKey
            DB-->>DocumentsHandler: winner Document
            DocumentsHandler-->>Client: HTTP 202 winner document_id,status
        end
    end

Entity relationship diagram for documents idempotency_key support

erDiagram
    documents {
        text id
        text org_id
        text store_id
        text idempotency_key
    }

Flow diagram for Local.Get Windows retry and self-diagnosing not-found

flowchart TD
    A[Local.Get key] --> B[full = l.path key]
    B --> C[attempts = 1 or winRetries based on runtime.GOOS]
    C --> D{for i in 0..attempts-1}
    D --> E[os.Stat full]
    E -->|err is nil| F[os.Open full]
    E -->|err not os.ErrNotExist| I[return err]
    E -->|err os.ErrNotExist| H[maybe retry]
    F -->|open ok| G[return reader and Metadata]
    F -->|openErr not os.ErrNotExist| I[return openErr]
    F -->|openErr os.ErrNotExist| H
    H -->|i < attempts-1| J[backoff with time.After]
    J --> D
    H -->|i == attempts-1| K[wrap ErrNotFound with full path]
    K --> L[return wrapped ErrNotFound]
    I --> L
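
The flow above can be approximated in code as follows. This is a simplified sketch rather than the real `Local.Get`: it reads the whole file instead of returning a reader and metadata, and the `winRetries` and `backoff` values, along with the `getWithRetry`, `attemptsForOS`, `demoLateFile`, and `demoMissing` names, are assumptions made for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"time"
)

var ErrNotFound = errors.New("object not found")

const (
	winRetries = 5                     // assumed value, not taken from the PR
	backoff    = 20 * time.Millisecond // assumed value, not taken from the PR
)

// attemptsForOS retries only on Windows; other platforms get one attempt.
func attemptsForOS() int {
	if runtime.GOOS == "windows" {
		return winRetries
	}
	return 1
}

// getWithRetry stats and reads full, retrying only on os.ErrNotExist, and
// wraps the final miss as ErrNotFound carrying the resolved path.
func getWithRetry(ctx context.Context, full string, attempts int, wait time.Duration) ([]byte, error) {
	for i := 0; i < attempts; i++ {
		_, err := os.Stat(full)
		if err == nil {
			var data []byte
			if data, err = os.ReadFile(full); err == nil {
				return data, nil
			}
		}
		if !errors.Is(err, os.ErrNotExist) {
			return nil, err // real failures are not retried
		}
		if i == attempts-1 {
			break
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(wait):
		}
	}
	return nil, fmt.Errorf("%w: %s", ErrNotFound, full)
}

// demoLateFile simulates a file that becomes visible shortly after the
// first lookup; the rename makes it appear atomically with full content.
func demoLateFile() (string, error) {
	dir, err := os.MkdirTemp("", "retry-sketch")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	full := filepath.Join(dir, "doc.txt")
	go func() {
		time.Sleep(30 * time.Millisecond)
		tmp := full + ".tmp"
		if os.WriteFile(tmp, []byte("hello"), 0o644) == nil {
			os.Rename(tmp, full)
		}
	}()
	data, err := getWithRetry(context.Background(), full, 50, 10*time.Millisecond)
	return string(data), err
}

// demoMissing looks up a file that never appears.
func demoMissing() error {
	full := filepath.Join(os.TempDir(), "retry-sketch-missing", "doc.txt")
	_, err := getWithRetry(context.Background(), full, 2, time.Millisecond)
	return err
}

func isNotFound(err error) bool { return errors.Is(err, ErrNotFound) }

func main() {
	fmt.Println("attempts on this OS:", attemptsForOS())
	fmt.Println(demoLateFile())
	fmt.Println(demoMissing())
}
```

The demo passes an explicit attempt count so the retry path runs on any OS. In the flow described above, the count would come from the `runtime.GOOS` check.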

File-Level Changes

Change: Add database-level idempotency support for documents and expose lookup by Idempotency-Key.
Details:
  • Extend Document model with IdempotencyKey field and persist it as nullable idempotency_key column.
  • Introduce GetDocumentByIdempotencyKey for (org_id, idempotency_key) lookups returning existing documents.
  • Map Postgres unique-constraint violations (SQLSTATE 23505) to a new ErrConflict error via mapErr.
  • Add migration 0007 to add idempotency_key column and a partial unique index on (org_id, idempotency_key) where key is not null.
  • Add integration tests verifying deduplication with keys and that keyless ingests still allow duplicates.
Files:
  • pkg/db/documents.go
  • pkg/db/db.go
  • pkg/db/idempotency_integration_test.go
  • pkg/db/migrations/0007_documents_idempotency_key.up.sql
  • pkg/db/migrations/0007_documents_idempotency_key.down.sql

Change: Make all ingest handlers (standalone, multi-tenant REST, and Connect RPC) idempotent using Idempotency-Key and handle concurrent conflicts gracefully.
Details:
  • Read Idempotency-Key from HTTP/Connect headers in all ingest endpoints and short-circuit to existing documents via GetDocumentByIdempotencyKey.
  • On insert conflict with same Idempotency-Key, delete the newly written source object and return the already existing document instead of 500.
  • Populate IdempotencyKey when creating new Document records from ingest handlers.
Files:
  • internal/api/server.go
  • internal/handler/documents.go
  • internal/connecthandler/documents.go

Change: Improve local storage Get/Delete to provide pathful ErrNotFound and add Windows-specific retry for transient not-exist races.
Details:
  • Refactor Local.Get to perform a bounded retry loop (Windows only) over os.Stat/os.Open to ride through Defender-induced transient not-found conditions.
  • Always wrap final not-found from Local.Get in ErrNotFound including the resolved absolute path.
  • Change Local.Delete to include the full resolved path in ErrNotFound.
  • Add tests to verify Delete not-found includes path and that Get can handle late-appearing files with/without Windows retry.
Files:
  • pkg/storage/local.go
  • pkg/storage/local_test.go

Change: Make cloud storage backends (S3 and GCS) return ErrNotFound annotated with bucket/key for easier diagnosis.
Details:
  • Wrap not-found errors from GCS Get/Delete with ErrNotFound plus gs://bucket/key.
  • Wrap not-found errors from S3 Get/Delete with ErrNotFound plus s3://bucket/key.
Files:
  • pkg/storage/gcs.go
  • pkg/storage/s3.go

Change: Document idempotent ingest and Windows Defender considerations in public docs.
Details:
  • Extend OpenAPI spec for POST /v1/documents with Idempotency-Key header parameter description and behavior.
  • Update README with guidance on Windows Defender exclusions for the local storage root and explanation of transient object-not-found behavior.
Files:
  • openapi.yaml
  • README.md

@coderabbitai

coderabbitai Bot commented Jun 20, 2026


📥 Commits

Reviewing files that changed from the base of the PR and between a36f6ba and 0ebf0f5.

📒 Files selected for processing (8)
  • README.md
  • internal/api/server.go
  • internal/connecthandler/documents.go
  • internal/handler/documents.go
  • internal/middleware/idempotency.go
  • internal/middleware/idempotency_test.go
  • pkg/db/documents.go
  • pkg/storage/local_test.go
📝 Walkthrough

Walkthrough

The PR adds idempotent document ingestion via an Idempotency-Key HTTP header. A new DB migration adds a nullable idempotency_key column with a partial unique index; a new ErrConflict sentinel maps SQLSTATE 23505; all three handler variants implement pre-insert lookup and conflict-aware orphan cleanup. Storage backends now wrap ErrNotFound with backend URI context, and the local backend gains a Windows-only bounded retry loop for transient file-not-found conditions.
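
One way the SQLSTATE-to-sentinel mapping described above might look is sketched below. It assumes the driver's error type exposes a `SQLState() string` method and matches on that small interface instead of importing a driver package. `fakePgError` is a stand-in used only so the example runs on its own, and the body of `mapErr` is a guess at the idea, not the PR's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrConflict is a stand-in for the db package's sentinel.
var ErrConflict = errors.New("conflict")

// sqlStater is satisfied by any error that reports a SQLSTATE code.
type sqlStater interface{ SQLState() string }

// mapErr converts a unique violation (SQLSTATE 23505) into ErrConflict and
// leaves every other error, including nil, untouched.
func mapErr(err error) error {
	var se sqlStater
	if errors.As(err, &se) && se.SQLState() == "23505" {
		return fmt.Errorf("%w: %v", ErrConflict, err)
	}
	return err
}

// fakePgError imitates a driver error for demonstration only.
type fakePgError struct{ code string }

func (e *fakePgError) Error() string    { return "pg error " + e.code }
func (e *fakePgError) SQLState() string { return e.code }

func isConflict(err error) bool { return errors.Is(err, ErrConflict) }

func main() {
	wrapped := fmt.Errorf("insert document: %w", &fakePgError{code: "23505"})
	fmt.Println(isConflict(mapErr(wrapped)))                      // true
	fmt.Println(isConflict(mapErr(&fakePgError{code: "23503"}))) // false
}
```

Because `errors.As` walks the wrap chain, the mapping still applies when a lower layer has already added context to the driver error.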

Changes

Idempotent Document Ingestion

Layer: DB migration, ErrConflict sentinel, and Document model
File(s): pkg/db/migrations/0007_documents_idempotency_key.up.sql, pkg/db/migrations/0007_documents_idempotency_key.down.sql, pkg/db/db.go, pkg/db/documents.go
Summary: Migration 0007 adds a nullable idempotency_key column and partial unique index (org_id, idempotency_key) WHERE idempotency_key IS NOT NULL. ErrConflict is introduced as a sentinel for SQLSTATE 23505. Document gains IdempotencyKey; NewDocument conditionally inserts the key or NULL; GetDocumentByIdempotencyKey is added.

Layer: Handler idempotency: pre-check, insert, and conflict reconciliation
File(s): openapi.yaml, internal/api/server.go, internal/handler/documents.go, internal/connecthandler/documents.go
Summary: All three handler variants read Idempotency-Key, return 202 with the existing document on a DB hit, persist the key on insert, and on ErrConflict delete the orphaned storage object then fetch and return the winning document. OpenAPI spec adds the optional Idempotency-Key header parameter.

Layer: DB idempotency integration tests
File(s): pkg/db/idempotency_integration_test.go
Summary: openTestPool helper skips when VLE_TEST_DATABASE_URL is unset. TestIdempotentIngestDedup asserts a second insert with the same (org, key) returns ErrConflict and exactly one row exists. TestIngestWithoutKeyAllowsDuplicates asserts NULL keys allow concurrent duplicates.

Storage ErrNotFound Enrichment and Windows Retry

Layer: ErrNotFound context wrapping across GCS, S3, and local Delete
File(s): pkg/storage/gcs.go, pkg/storage/s3.go, pkg/storage/local.go
Summary: Get and Delete on GCS, S3, and local backends switch from returning bare ErrNotFound to returning a fmt.Errorf-wrapped error containing the backend URI (gs://, s3://, or absolute filesystem path).

Layer: Windows-only bounded retry in local.Get
File(s): pkg/storage/local.go
Summary: local.Get is rewritten with a runtime.GOOS == "windows" guard around a bounded retry loop that retries os.Stat and os.Open on os.ErrNotExist with short backoff, context cancellation, and a final wrapped ErrNotFound.

Layer: Storage tests and Windows Defender README note
File(s): pkg/storage/local_test.go, README.md
Summary: Adds TestLocalDeleteNotFoundCarriesPath (asserts error string includes resolved path) and TestLocalGetRetriesTransientNotExist (goroutine-based transient-file simulation). README documents Windows Defender scan interference and the Add-MpPreference -ExclusionPath workaround.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant Handler
  participant DB
  participant Storage

  Client->>Handler: POST /v1/documents (Idempotency-Key: K)
  Handler->>DB: GetDocumentByIdempotencyKey(orgID, K)
  alt document already exists
    DB-->>Handler: existing Document
    Handler-->>Client: 202 Accepted (document_id, status)
  else not found — proceed with ingest
    Handler->>Storage: write source object
    Handler->>DB: NewDocument(doc, IdempotencyKey=K)
    alt insert succeeds
      DB-->>Handler: ok
      Handler-->>Client: 202 Accepted (new document_id)
    else ErrConflict — concurrent winner
      DB-->>Handler: ErrConflict
      Handler->>Storage: Delete orphaned object
      Handler->>DB: GetDocumentByIdempotencyKey(orgID, K)
      DB-->>Handler: winning Document
      Handler-->>Client: 202 Accepted (winner document_id, status)
    end
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • hallelx2/vectorless-engine#48: Modifies the same pkg/storage/local.go and local_test.go files, touching missing-object error construction and local storage path resolution logic that this PR also extends.

Poem

🐇 A key has arrived, a magical thing,
No more duplicate docs—oh, what joy it will bring!
The Windows Defender once hid files away,
But retries and exclusions have saved the day.
Conflicts resolved, the winner stands tall,
One rabbit, one key, one document for all! 🗝️

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 22.22%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title check (✅ Passed): The title accurately summarizes the two main changes: idempotent ingest (preventing duplicate document creation) and self-diagnosing not-found errors (adding path context to storage errors).
  • Linked Issues check (✅ Passed): Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check (✅ Passed): Check skipped because no linked issues were found for this pull request.


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 6 issues, and left some high level feedback:

  • The idempotency handling logic (lookup before insert, conflict handling, orphan source cleanup, response shaping) is now duplicated across the standalone HTTP, multi-tenant HTTP, and Connect handlers; consider extracting a shared helper to centralize this behavior and reduce the chance of subtle divergence over time.
  • In the Windows-specific retry loop in Local.Get, the backoff and retry count are currently hardcoded; if you expect different environments or workloads, you might want to make these parameters configurable so they can be tuned without code changes.
Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="internal/api/server.go" line_range="324-332" />
<code_context>
 	}

 	if err := d.DB.NewDocument(ctx, db.Document{
-		ID:          docID,
-		OrgID:       standaloneOrgID,
-		Title:       title,
-		ContentType: contentType,
-		SourceRef:   key,
-		Status:      db.StatusPending,
-		ByteSize:    size,
+		ID:             docID,
+		OrgID:          standaloneOrgID,
+		Title:          title,
+		ContentType:    contentType,
+		SourceRef:      key,
+		Status:         db.StatusPending,
+		ByteSize:       size,
+		IdempotencyKey: idemKey,
 	}); err != nil {
+		// A concurrent same-key ingest won the race: drop the orphan source we
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Surface and log failures when cleanup deletes of orphaned sources fail

In the `ErrConflict` branch you call `d.Storage.Delete(ctx, key)` and ignore its error, which can leave dangling objects (e.g., on transient storage failures) with no visibility. Please at least log delete failures (e.g., at warn/error) so operators can detect accumulating orphans, or propagate a wrapped error when the subsequent lookup also fails. The same applies to the multi-tenant and connect handlers; consider handling and logging delete errors consistently across all three for operational clarity.

Suggested implementation:

```golang
		if idemKey != "" && errors.Is(err, db.ErrConflict) {
			if delErr := d.Storage.Delete(ctx, key); delErr != nil {
				// Log cleanup failures so operators can detect accumulating orphaned sources.
				d.Logger.Warn("failed to delete orphaned source after idempotency conflict",
					"org_id", standaloneOrgID,
					"source_ref", key,
					"idempotency_key", idemKey,
					"error", delErr,
				)
			}
			if existing, lookupErr := d.DB.GetDocumentByIdempotencyKey(ctx, standaloneOrgID, idemKey); lookupErr == nil {

```

1. Adjust the logging call to match the logger used in this package (e.g., `d.logger.Warnw`, `d.log.Error`, `log.Printf`, etc.), including any context or fields conventions already present in `internal/api/server.go`.
2. Apply the same pattern (capture `Delete` error and log at warn/error) in the multi-tenant and connect handlers where the same `_ = d.Storage.Delete(ctx, key)` pattern is used for orphan cleanup.
3. If this project prefers error propagation, you may also want to propagate a wrapped error when both the delete and the subsequent `GetDocumentByIdempotencyKey` lookup fail, so callers can distinguish between plain DB conflicts and “conflict + cleanup failure” cases.
</issue_to_address>

### Comment 2
<location path="pkg/db/documents.go" line_range="102-111" />
<code_context>
+func (p *Pool) GetDocumentByIdempotencyKey(ctx context.Context, orgID, key string) (*Document, error) {
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Populate IdempotencyKey from the DB row instead of trusting the input key

The query doesn’t select `idempotency_key`, and the struct is populated with the input `key`. Even though the unique index should keep them aligned, it’s safer and more future-proof to hydrate `IdempotencyKey` from the DB column so the `Document` always reflects the persisted value and stays consistent with how other fields are mapped.

Suggested implementation:

```golang
	row := p.QueryRow(ctx, `
        SELECT id, org_id, store_id, title, content_type, source_ref, status, error_message,
               byte_size, metadata, created_at, updated_at, toc_tree, idempotency_key
        FROM documents WHERE org_id = $1 AND idempotency_key = $2`, orgID, key)

```

Based on the partial snippet, you’ll also need to:

1. Locate the `row.Scan(...)` in `GetDocumentByIdempotencyKey`. Add a destination for `idempotency_key` in the same positional order as added in the SELECT, for example:
   - If you have something like:
     `err := row.Scan(&d.ID, &d.OrgID, &d.StoreID, &d.Title, &d.ContentType, &d.SourceRef, &d.Status, &d.ErrorMessage, &d.ByteSize, &meta, &d.CreatedAt, &d.UpdatedAt, &d.TOCTree)`
   - Change it to:
     `err := row.Scan(&d.ID, &d.OrgID, &d.StoreID, &d.Title, &d.ContentType, &d.SourceRef, &d.Status, &d.ErrorMessage, &d.ByteSize, &meta, &d.CreatedAt, &d.UpdatedAt, &d.TOCTree, &d.IdempotencyKey)`

2. Remove any code that sets `d.IdempotencyKey = key` (or equivalent) so that the value always comes from the database row.

3. Ensure the `Document` struct has an `IdempotencyKey` field with the correct type and JSON/db tags; if it doesn’t, add it and also include it in any other queries that need to hydrate this field.
</issue_to_address>

### Comment 3
<location path="pkg/storage/local_test.go" line_range="68-77" />
<code_context>
+func TestLocalGetRetriesTransientNotExist(t *testing.T) {
</code_context>
<issue_to_address>
**issue (testing):** TestLocalGetRetriesTransientNotExist doesn't actually assert the Windows retry behavior and would still pass if retries were removed

As written, the test only checks for eventual success and would still pass even if there were no retries: on Windows, a hard ErrNotFound on the first Get would still succeed after the file is written via the `if err != nil { <-done; rc, _, err = l.Get(ctx, key) }` path. To actually verify the Defender-related retry behavior, you could either:

- Make the behavior OS-specific: on Windows, require the first Get to succeed (and fail if it returns ErrNotFound or any error), while keeping the current fallback for non-Windows; or
- Add a test hook in Local (e.g., a retry counter) and assert that at least 2 attempts occur on Windows.

This ensures the test fails if the retry loop is removed or shortened.
</issue_to_address>
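
A minimal sketch of the second option (a retry counter exposed to the test) might look like the following. It is deliberately detached from the real `Local` type: `retryNotExist` and `flakyOp` are hypothetical names, and the loop omits backoff so the attempt count is deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
)

// retryNotExist calls op up to attempts times, retrying only when op fails
// with fs.ErrNotExist, and reports how many calls were made.
func retryNotExist(attempts int, op func() error) (calls int, err error) {
	if attempts < 1 {
		attempts = 1
	}
	for calls < attempts {
		calls++
		if err = op(); err == nil || !errors.Is(err, fs.ErrNotExist) {
			return calls, err
		}
	}
	return calls, err
}

// flakyOp fails with fs.ErrNotExist for the first `failures` calls and then
// succeeds, imitating a file that is briefly hidden after being written.
func flakyOp(failures int) func() error {
	n := 0
	return func() error {
		if n < failures {
			n++
			return fs.ErrNotExist
		}
		return nil
	}
}

func main() {
	fmt.Println(retryNotExist(3, flakyOp(1))) // succeeds on the second call
	fmt.Println(retryNotExist(1, flakyOp(1))) // no retries: the miss surfaces
}
```

A test built on a counter like this fails as soon as the retry budget drops to one attempt, which is the property the comment above asks for.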

### Comment 4
<location path="README.md" line_range="167-168" />
<code_context>
 > **Anthropic-compatible gateways (GLM/Z.ai):** `base_url` **must include `/v1`** — the client posts to `${base_url}/messages`.

+> **Windows + local storage:** Windows Defender real-time protection scans
+> each freshly-written file and briefly hides it from `os.Stat`/`os.Open`,
+> which under heavy concurrent ingestion can surface as transient
+> `object not found` errors. The local backend rides through this window with
</code_context>
<issue_to_address>
**nitpick (typo):** Consider removing the hyphen in "freshly-written" for more natural phrasing.

Adverbs ending in -ly usually don’t take a hyphen before the participle, so "freshly written file" is the more standard form.

```suggestion
> **Windows + local storage:** Windows Defender real-time protection scans
> each freshly written file and briefly hides it from `os.Stat`/`os.Open`,
```
</issue_to_address>

### Comment 5
<location path="internal/handler/documents.go" line_range="161" />
<code_context>
 	ctx := r.Context()
 	docID := ingest.NewDocumentID()

+	// Idempotent ingest: if the client supplied an Idempotency-Key and we
+	// already have a document for this org under that key, return it instead
+	// of creating a duplicate. This makes a client/transport retry of the
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the repeated idempotency lookup and response into a shared helper to simplify HandleIngestDocument and avoid duplicated logic.

You can reduce the new branching/duplication by extracting the idempotency lookup + response into a small helper used in both places (pre-insert and conflict path). That keeps behavior identical but flattens `HandleIngestDocument` and avoids repeating the same logic/JSON response.

For example:

```go
func (h *DocumentsHandler) writeIdempotentDocIfExists(
    ctx context.Context,
    w http.ResponseWriter,
    orgID string,
    idemKey string,
) (bool, error) {
    if idemKey == "" {
        return false, nil
    }

    existing, err := h.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey)
    if err != nil {
        if errors.Is(err, db.ErrNotFound) {
            return false, nil
        }
        return false, err
    }

    writeJSON(w, http.StatusAccepted, map[string]any{
        "document_id": existing.ID,
        "status":      string(existing.Status),
    })
    return true, nil
}
```

Then `HandleIngestDocument` becomes:

```go
idemKey := r.Header.Get("Idempotency-Key")
if ok, err := h.writeIdempotentDocIfExists(ctx, w, orgID, idemKey); err != nil {
    h.logger.Error("ingest: idempotency lookup failed", "err", err)
    writeErr(w, http.StatusInternalServerError, "db read failed")
    return
} else if ok {
    return
}
```

And the conflict branch reuses the same helper instead of duplicating the lookup/response:

```go
if err := h.db.NewDocument(ctx, db.Document{
    ID:             docID,
    OrgID:          orgID,
    StoreID:        storeID(r),
    Title:          title,
    ContentType:    contentType,
    SourceRef:      key,
    Status:         db.StatusPending,
    ByteSize:       size,
    IdempotencyKey: idemKey,
}); err != nil {
    if idemKey != "" && errors.Is(err, db.ErrConflict) {
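        if delErr := h.storage.Delete(ctx, key); delErr != nil {
            // Surface orphan-source cleanup failures instead of discarding them.
            h.logger.Warn("ingest: orphan source cleanup failed", "source_ref", key, "idempotency_key", idemKey, "err", delErr)
        }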
        if ok, lookupErr := h.writeIdempotentDocIfExists(ctx, w, orgID, idemKey); lookupErr == nil && ok {
            return
        }
        // fall through to generic error handling on lookup failure
    }
    h.logger.Error("ingest: db insert failed", "err", err)
    writeErr(w, http.StatusInternalServerError, "db write failed")
    return
}
```

This keeps all semantics (including logging, status codes, response shape) while centralizing the idempotency behavior and reducing the handler’s complexity.
</issue_to_address>

### Comment 6
<location path="internal/connecthandler/documents.go" line_range="85" />
<code_context>
 		return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("content is required"))
 	}

+	// Idempotent ingest: a repeat CreateDocument carrying the same
+	// Idempotency-Key returns the original document rather than a duplicate
+	// (mirrors the REST path; closes the HAL-323 duplicate-upload bug for SDK
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the idempotent ingest flow into a shared helper so the CreateDocument handler only wires inputs/outputs and error mapping.

You can reduce the added complexity by extracting the idempotency logic into a shared helper and keeping the Connect handler focused on protocol-specific wiring.

### 1. Extract transport-agnostic idempotent ingest helper

Introduce a helper that encapsulates:

- Pre-insert lookup by idempotency key.
- `NewDocument` insert (with `IdempotencyKey`).
- Conflict resolution with source cleanup and winner lookup.

Example (adapt types/names as needed):

```go
type IdempotentIngestResult struct {
	Document db.Document
	// Reused is true if we returned an existing doc instead of creating a new one.
	Reused   bool
}

func (s *Server) idempotentIngest(
	ctx context.Context,
	orgID string,
	idemKey string,
	doc db.Document,
	putSource func() (sourceKey string, err error),
	deleteSource func(ctx context.Context, key string) error,
) (IdempotentIngestResult, error) {
	if idemKey != "" {
		if existing, err := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); err == nil {
			return IdempotentIngestResult{Document: existing, Reused: true}, nil
		} else if !errors.Is(err, db.ErrNotFound) {
			return IdempotentIngestResult{}, fmt.Errorf("idempotency lookup: %w", err)
		}
	}

	key, err := putSource()
	if err != nil {
		return IdempotentIngestResult{}, fmt.Errorf("storage put: %w", err)
	}
	doc.SourceRef = key
	doc.IdempotencyKey = idemKey

	if err := s.db.NewDocument(ctx, doc); err != nil {
		if idemKey != "" && errors.Is(err, db.ErrConflict) {
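			if delErr := deleteSource(ctx, key); delErr != nil {
				// Log orphan-source cleanup failures so they stay traceable.
				s.logger.Warn("ingest: orphan source cleanup failed", "source_ref", key, "idempotency_key", idemKey, "err", delErr)
			}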
			if existing, lookupErr := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); lookupErr == nil {
				return IdempotentIngestResult{Document: existing, Reused: true}, nil
			}
		}
		return IdempotentIngestResult{}, fmt.Errorf("db insert: %w", err)
	}

	return IdempotentIngestResult{Document: doc, Reused: false}, nil
}
```

This keeps all current behavior (pre-lookup, conflict handling, source delete) but centralizes it.

### 2. Simplify `CreateDocument` to wiring + error mapping

Then your Connect handler becomes mostly data preparation and error mapping, instead of open-coding the idempotency flow:

```go
idemKey := req.Header().Get("Idempotency-Key")

doc := db.Document{
	ID:          docID,
	OrgID:       orgID,
	StoreID:     storeIDFromConnect(req),
	Title:       title,
	ContentType: contentType,
	Status:      db.StatusPending,
	ByteSize:    size,
	// SourceRef and IdempotencyKey set in helper.
}

res, err := s.idempotentIngest(
	ctx,
	orgID,
	idemKey,
	doc,
	func() (string, error) {
		if err := s.storage.Put(ctx, key, reader, size); err != nil {
			return "", err
		}
		return key, nil
	},
	s.storage.Delete,
)
if err != nil {
	s.logger.Error("ingest: idempotent ingest failed", "err", err)
	return nil, connect.NewError(connect.CodeInternal, errors.New("ingest failed"))
}

return connect.NewResponse(&v1.CreateDocumentResponse{
	DocumentId: string(res.Document.ID),
	Status:     string(res.Document.Status),
}), nil
```

Benefits:

- `CreateDocument` becomes “validate → call helper → build response”.
- Idempotency logic is shared across Connect and REST handlers, so future changes (e.g., new status handling, logging) are done in one place.
- The conflict + delete + re-lookup pattern is no longer duplicated, so the copies can no longer drift subtly out of sync.
</issue_to_address>
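
The lookup, insert, conflict, and winner-lookup sequence described above can be exercised without a database. What follows is a minimal in-memory sketch, not the engine's implementation: `memStore`, `doc`, `errConflict`, and `ingest` are hypothetical stand-ins for the real `db` package, the mutex-guarded map only imitates a partial unique index on `(org_id, idempotency_key)`, and the orphan-source cleanup step is left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errConflict stands in for db.ErrConflict (a unique-index violation).
var errConflict = errors.New("conflict")

// doc is a pared-down, hypothetical stand-in for db.Document.
type doc struct {
	ID, OrgID, IdemKey string
}

// memStore imitates a documents table with a partial unique index on
// (org_id, idempotency_key): rows without a key are never constrained.
type memStore struct {
	mu    sync.Mutex
	byKey map[string]doc
	rows  int
}

func newMemStore() *memStore { return &memStore{byKey: make(map[string]doc)} }

func indexKey(orgID, idemKey string) string { return orgID + "\x00" + idemKey }

// insert adds a row, returning errConflict if the keyed slot is taken.
func (s *memStore) insert(d doc) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if d.IdemKey != "" {
		k := indexKey(d.OrgID, d.IdemKey)
		if _, taken := s.byKey[k]; taken {
			return errConflict
		}
		s.byKey[k] = d
	}
	s.rows++
	return nil
}

// lookup returns the row stored under (orgID, idemKey), if any.
func (s *memStore) lookup(orgID, idemKey string) (doc, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	d, ok := s.byKey[indexKey(orgID, idemKey)]
	return d, ok
}

// rowCount reports how many rows were actually inserted.
func (s *memStore) rowCount() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.rows
}

// ingest does a pre-insert lookup, then an insert, and on a conflict
// returns the winning row. The bool reports whether a row was reused.
func ingest(s *memStore, d doc) (doc, bool, error) {
	if d.IdemKey != "" {
		if existing, ok := s.lookup(d.OrgID, d.IdemKey); ok {
			return existing, true, nil
		}
	}
	if err := s.insert(d); err != nil {
		if errors.Is(err, errConflict) {
			if winner, ok := s.lookup(d.OrgID, d.IdemKey); ok {
				return winner, true, nil
			}
		}
		return doc{}, false, err
	}
	return d, false, nil
}

func main() {
	s := newMemStore()
	ids := make([]string, 8)
	var wg sync.WaitGroup
	for i := range ids {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Eight concurrent retries of one upload share a single key.
			got, _, err := ingest(s, doc{ID: fmt.Sprintf("doc-%d", i), OrgID: "org-a", IdemKey: "key-1"})
			if err != nil {
				panic(err)
			}
			ids[i] = got.ID
		}(i)
	}
	wg.Wait()
	fmt.Println("rows:", s.rowCount(), "id seen by caller 0:", ids[0])
}
```

Whichever goroutine wins the insert, every caller should get back the same document and the store should hold one row. A real handler would also delete the loser's already-written source object on the conflict path.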


Comment thread internal/api/server.go
Comment thread pkg/db/documents.go Outdated
Comment thread pkg/storage/local_test.go
Comment on lines +68 to +77
func TestLocalGetRetriesTransientNotExist(t *testing.T) {
	// Simulate the Windows Defender scan window: the source does not exist when
	// Get is first called, then appears shortly after. On Windows the bounded
	// internal retry must ride through the gap and return the bytes rather than a
	// hard not-found. (On non-Windows there is a single pass, so this only
	// asserts the eventual success once the file is present.)
	l, err := NewLocal(t.TempDir())
	if err != nil {
		t.Fatalf("NewLocal: %v", err)
	}

issue (testing): TestLocalGetRetriesTransientNotExist doesn't actually assert the Windows retry behavior and would still pass if retries were removed

As written, the test only checks for eventual success, so it would pass even with no retries at all: on Windows, if the first Get returned a hard ErrNotFound, the fallback path if err != nil { <-done; rc, _, err = l.Get(ctx, key) } would still read the file once it had been written. To actually verify the Defender-related retry behavior, you could either:

  • Make the behavior OS-specific: on Windows, require the first Get to succeed (and fail if it returns ErrNotFound or any error), while keeping the current fallback for non-Windows; or
  • Add a test hook in Local (e.g., a retry counter) and assert that at least 2 attempts occur on Windows.

This ensures the test fails if the retry loop is removed or shortened.

Comment thread README.md Outdated
@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/connecthandler/documents.go`:
- Around line 137-139: The storage.Delete call in the conflict handling block is
silently ignoring cleanup errors by using the blank identifier, which allows
orphan objects to accumulate undetected during repeated conflict races. Capture
the error returned by s.storage.Delete(ctx, key) instead of discarding it with
underscore, and handle it appropriately by logging it or taking other corrective
action to ensure cleanup failures are visible and traceable rather than silently
lost.

In `@internal/handler/documents.go`:
- Around line 161-179: The idempotency middleware is caching responses based
solely on the Idempotency-Key header, which allows cross-org response leakage
where two organizations using the same key can receive a cached 202 response
from another org. You need to locate the mounted idempotency middleware
implementation and modify its cache key composition to include the org_id or
tenant identifier along with the Idempotency-Key header value, ensuring that the
middleware respects the same (org_id, idempotency_key) scoping that the handler
code in GetDocumentByIdempotencyKey enforces.

In `@pkg/storage/local_test.go`:
- Around line 95-101: The unconditional fallback to the second Get call in the
error handling block at lines 95-101 allows the test to pass even when the
Windows retry path is broken, masking potential regressions. Remove or properly
condition the fallback Get call that reads the key after detecting an error from
the done channel, ensuring the test actually validates that the Local.Get retry
behavior functions correctly on Windows rather than bypassing the failure with a
direct read fallback.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 93c8aa6a-d051-4ee7-a6c3-214b90e3b1f6

📥 Commits

Reviewing files that changed from the base of the PR and between ebb9c10 and a36f6ba.

📒 Files selected for processing (14)
  • README.md
  • internal/api/server.go
  • internal/connecthandler/documents.go
  • internal/handler/documents.go
  • openapi.yaml
  • pkg/db/db.go
  • pkg/db/documents.go
  • pkg/db/idempotency_integration_test.go
  • pkg/db/migrations/0007_documents_idempotency_key.down.sql
  • pkg/db/migrations/0007_documents_idempotency_key.up.sql
  • pkg/storage/gcs.go
  • pkg/storage/local.go
  • pkg/storage/local_test.go
  • pkg/storage/s3.go

Comment thread internal/connecthandler/documents.go
Comment thread internal/handler/documents.go
Comment thread pkg/storage/local_test.go Outdated
…an cleanup

- middleware.Idempotency: compose the cache key as method|path|org|key instead
  of the bare Idempotency-Key, so two tenants reusing the same opaque key can't
  receive each other's cached 202 (cross-tenant leak). New tests cover same-org
  replay, cross-org isolation, and keyless pass-through. (CodeRabbit, Major)
- All three ingest handlers now log orphan-source cleanup failures on the
  idempotency-conflict path instead of discarding the Delete error. (Sourcery +
  CodeRabbit)
- GetDocumentByIdempotencyKey hydrates IdempotencyKey from the DB column.
- Strengthen TestLocalGetRetriesTransientNotExist: on Windows it now asserts the
  FIRST Get succeeds, so the test fails if the retry loop is removed. (Sourcery)
- README: "freshly written" typo.
@hallelx2

Owner Author

Thanks @coderabbitai and @sourcery-ai — triaged and addressed in 0ebf0f5:

  • Cross-tenant idempotency replay (Major): fixed. middleware.Idempotency now keys the cache by method|path|org|key instead of the bare Idempotency-Key, so two orgs reusing the same opaque key no longer collide. Added internal/middleware/idempotency_test.go covering same-org replay, cross-org isolation, and keyless pass-through.
  • Orphan cleanup errors ignored: fixed in all three ingest handlers — the conflict-path Storage.Delete failure is now logged at warn with source_ref + idempotency_key.
  • Hydrate IdempotencyKey from the DB column: done (GetDocumentByIdempotencyKey now selects + scans the column).
  • Windows-retry test was a no-op assertion: strengthened — on Windows it now asserts the first Get succeeds, so it fails if the retry loop is removed/shortened.
  • README "freshly-written": fixed.
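
As an illustration of the cache-key change in the first bullet, here is a minimal sketch of composing the key as method|path|org|key. It is not the actual `middleware.Idempotency` code: `cacheKey`, `newIngestRequest`, and the `/v1/documents` path are hypothetical, and how the org ID reaches the middleware is assumed.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// cacheKey builds the replay-cache key from the method, path, org, and the
// client-supplied Idempotency-Key. It returns "" when no key was sent,
// signalling that the request should bypass the cache.
func cacheKey(r *http.Request, orgID string) string {
	idem := r.Header.Get("Idempotency-Key")
	if idem == "" {
		return ""
	}
	return strings.Join([]string{r.Method, r.URL.Path, orgID, idem}, "|")
}

// newIngestRequest builds a POST to a hypothetical ingest route, setting
// the Idempotency-Key header only when idemKey is non-empty.
func newIngestRequest(idemKey string) *http.Request {
	r, err := http.NewRequest(http.MethodPost, "http://example.invalid/v1/documents", nil)
	if err != nil {
		panic(err)
	}
	if idemKey != "" {
		r.Header.Set("Idempotency-Key", idemKey)
	}
	return r
}

func main() {
	r := newIngestRequest("key-1")
	fmt.Println(cacheKey(r, "org-a"))
	fmt.Println(cacheKey(r, "org-b"))
}
```

Two orgs that send the same opaque key get different cache entries, and a keyless request yields no entry at all. One caveat of a plain `|` join is that a component containing `|` could make two distinct tuples collide, so a length-prefixed or hashed encoding may be safer if org IDs or keys are not restricted.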

Deliberately not doing the writeIdempotentDocIfExists helper extraction (sourcery, documents.go:161): the three handlers live in different packages with different receiver types (Deps, DocumentsHandler, DocumentsService) and different response encodings (chi JSON vs Connect proto), so a shared helper would need an awkward seam for marginal gain. The duplication is ~6 lines each and reads clearly in place.

@coderabbitai

coderabbitai Bot commented Jun 20, 2026

@hallelx2 — excellent work addressing all the flagged items. Everything looks thorough:

  • Cross-tenant isolation — keying on method|path|org|key is the right fix; the three-case test matrix (same-org replay, cross-org isolation, keyless pass-through) covers the interesting edge cases well.
  • Orphan cleanup logging — warn-level with source_ref + idempotency_key gives operators enough context to reconcile manually if needed.
  • IdempotencyKey hydration — good catch; a field that's silently zero-valued in the returned struct is a subtle bug waiting to surface.
  • Windows-retry assertion — the strengthened assertion (first Get must succeed) means the test will actually catch a broken retry loop rather than silently passing.
  • Helper extraction — your reasoning is solid. Three different packages, three different receiver types, three different response-encoding paths: a shared helper would either need a wide interface or a callback, both of which add noise for ~6 lines of duplicated but clear code. Agreed.

🐇 🎉

Owner Author

@jules run your own review

@hallelx2 hallelx2 merged commit 42c34fc into main Jun 20, 2026
8 checks passed
@hallelx2 hallelx2 deleted the halleluyaholudele/hal-323-bulk-ingest-still-flaky-on-windows-bench-re-uploads-each-doc branch June 20, 2026 14:59