Idempotent ingest + self-diagnosing not-found (HAL-323) #49
Two-part fix for flaky bulk ingestion and the bench's duplicate uploads.
Idempotent ingest (root cause of "same doc ingested up to 6x"):
- New `idempotency_key` column on documents + partial unique index on (org_id, idempotency_key), added in migration 0007. NULL key = no dedup (preserves current behavior for callers that don't opt in).
- All three ingest handlers (standalone/local in internal/api, multi-tenant REST in internal/handler, Connect RPC in internal/connecthandler) honor the `Idempotency-Key` header the SDK already sends: a repeat ingest returns the original document instead of creating a duplicate, and the concurrent same-key race is resolved by dropping the orphan source and returning the winner. The SDK retry-on-transient (which re-POSTs after a reset that landed AFTER the server committed) is now a no-op.
- db: ErrConflict sentinel mapped from SQLSTATE 23505; GetDocumentByIdempotencyKey.
Self-diagnosing not-found (resolves the pathless "object not found" mystery):
- Every storage backend's ErrNotFound now carries the resolved key/path (local Get+Delete, s3 Get+Delete, gcs Get+Delete) so a failure can always be attributed to a code path; pathless rows were pre-path-wrap leftovers.
- local.Get: the open-after-stat result is wrapped as retryable ErrNotFound (previously a raw error getSourceWithRetry would NOT retry), and a Windows-only bounded retry rides through the Defender scan window that transiently hides a freshly written file.
Docs: OpenAPI documents the Idempotency-Key header; README documents the mandatory Windows Defender exclusion for the storage root.
Verified: go build/vet clean; storage + db (live-Postgres) tests pass; and an end-to-end double-POST with one Idempotency-Key returns one document_id while a keyless control gets its own.
Reviewer's Guide
Implements idempotent document ingestion across all ingest paths using an Idempotency-Key stored in the database, adds conflict detection and handling for duplicate ingests, and makes all storage backends return self-diagnosing not-found errors (including a Windows-specific retry for local files that transiently disappear), with tests, migration, and documentation updates.
Sequence diagram for idempotent document ingest with Idempotency-Key
sequenceDiagram
actor Client
participant DocumentsHandler
participant DB
participant Storage
Client->>DocumentsHandler: HandleIngestDocument
DocumentsHandler->>DB: GetDocumentByIdempotencyKey
alt [existing document]
DB-->>DocumentsHandler: Document
DocumentsHandler-->>Client: HTTP 202 document_id,status
else [no existing document]
DocumentsHandler->>Storage: Put
DocumentsHandler->>DB: NewDocument
alt [insert succeeds]
DB-->>DocumentsHandler: ok
DocumentsHandler-->>Client: HTTP 202 new document_id,status
else [ErrConflict]
DB-->>DocumentsHandler: ErrConflict
DocumentsHandler->>Storage: Delete
DocumentsHandler->>DB: GetDocumentByIdempotencyKey
DB-->>DocumentsHandler: winner Document
DocumentsHandler-->>Client: HTTP 202 winner document_id,status
end
end
Entity relationship diagram for documents idempotency_key support
erDiagram
documents {
text id
text org_id
text store_id
text idempotency_key
}
Flow diagram for Local.Get Windows retry and self-diagnosing not-found
flowchart TD
A[Local.Get key] --> B[full = l.path key]
B --> C[attempts = 1 or winRetries based on runtime.GOOS]
C --> D{for i in 0..attempts-1}
D --> E[os.Stat full]
E -->|err is nil| F[os.Open full]
E -->|err not os.ErrNotExist| I[return err]
E -->|err os.ErrNotExist| H[maybe retry]
F -->|open ok| G[return reader and Metadata]
F -->|openErr not os.ErrNotExist| I[return openErr]
F -->|openErr os.ErrNotExist| H
H -->|i < attempts-1| J[backoff with time.After]
J --> D
H -->|i == attempts-1| K[wrap ErrNotFound with full path]
K --> L[return wrapped ErrNotFound]
I --> L
📝 Walkthrough
The PR adds idempotent document ingestion via an
Changes
Idempotent Document Ingestion
Storage ErrNotFound Enrichment and Windows Retry
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Handler
participant DB
participant Storage
Client->>Handler: POST /v1/documents (Idempotency-Key: K)
Handler->>DB: GetDocumentByIdempotencyKey(orgID, K)
alt document already exists
DB-->>Handler: existing Document
Handler-->>Client: 202 Accepted (document_id, status)
else not found — proceed with ingest
Handler->>Storage: write source object
Handler->>DB: NewDocument(doc, IdempotencyKey=K)
alt insert succeeds
DB-->>Handler: ok
Handler-->>Client: 202 Accepted (new document_id)
else ErrConflict — concurrent winner
DB-->>Handler: ErrConflict
Handler->>Storage: Delete orphaned object
Handler->>DB: GetDocumentByIdempotencyKey(orgID, K)
DB-->>Handler: winning Document
Handler-->>Client: 202 Accepted (winner document_id, status)
end
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Hey - I've found 6 issues, and left some high level feedback:
- The idempotency handling logic (lookup before insert, conflict handling, orphan source cleanup, response shaping) is now duplicated across the standalone HTTP, multi-tenant HTTP, and Connect handlers; consider extracting a shared helper to centralize this behavior and reduce the chance of subtle divergence over time.
- In the Windows-specific retry loop in Local.Get, the backoff and retry count are currently hardcoded; if you expect different environments or workloads, you might want to make these parameters configurable so they can be tuned without code changes.
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="internal/api/server.go" line_range="324-332" />
<code_context>
}
if err := d.DB.NewDocument(ctx, db.Document{
- ID: docID,
- OrgID: standaloneOrgID,
- Title: title,
- ContentType: contentType,
- SourceRef: key,
- Status: db.StatusPending,
- ByteSize: size,
+ ID: docID,
+ OrgID: standaloneOrgID,
+ Title: title,
+ ContentType: contentType,
+ SourceRef: key,
+ Status: db.StatusPending,
+ ByteSize: size,
+ IdempotencyKey: idemKey,
}); err != nil {
+ // A concurrent same-key ingest won the race: drop the orphan source we
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Surface and log failures when cleanup deletes of orphaned sources fail
In the `ErrConflict` branch you call `d.Storage.Delete(ctx, key)` and ignore its error, which can leave dangling objects (e.g., on transient storage failures) with no visibility. Please at least log delete failures (e.g., at warn/error) so operators can detect accumulating orphans, or propagate a wrapped error when the subsequent lookup also fails. The same applies to the multi-tenant and connect handlers; consider handling and logging delete errors consistently across all three for operational clarity.
Suggested implementation:
```golang
if idemKey != "" && errors.Is(err, db.ErrConflict) {
	if delErr := d.Storage.Delete(ctx, key); delErr != nil {
		// Log cleanup failures so operators can detect accumulating orphaned sources.
		d.Logger.Warn("failed to delete orphaned source after idempotency conflict",
			"org_id", standaloneOrgID,
			"source_ref", key,
			"idempotency_key", idemKey,
			"error", delErr,
		)
	}
	if existing, lookupErr := d.DB.GetDocumentByIdempotencyKey(ctx, standaloneOrgID, idemKey); lookupErr == nil {
```
1. Adjust the logging call to match the logger used in this package (e.g., `d.logger.Warnw`, `d.log.Error`, `log.Printf`, etc.), including any context or fields conventions already present in `internal/api/server.go`.
2. Apply the same pattern (capture `Delete` error and log at warn/error) in the multi-tenant and connect handlers where the same `_ = d.Storage.Delete(ctx, key)` pattern is used for orphan cleanup.
3. If this project prefers error propagation, you may also want to propagate a wrapped error when both the delete and the subsequent `GetDocumentByIdempotencyKey` lookup fail, so callers can distinguish between plain DB conflicts and “conflict + cleanup failure” cases.
</issue_to_address>
### Comment 2
<location path="pkg/db/documents.go" line_range="102-111" />
<code_context>
+func (p *Pool) GetDocumentByIdempotencyKey(ctx context.Context, orgID, key string) (*Document, error) {
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Populate IdempotencyKey from the DB row instead of trusting the input key
The query doesn’t select `idempotency_key`, and the struct is populated with the input `key`. Even though the unique index should keep them aligned, it’s safer and more future-proof to hydrate `IdempotencyKey` from the DB column so the `Document` always reflects the persisted value and stays consistent with how other fields are mapped.
Suggested implementation:
```golang
row := p.QueryRow(ctx, `
SELECT id, org_id, store_id, title, content_type, source_ref, status, error_message,
byte_size, metadata, created_at, updated_at, toc_tree, idempotency_key
FROM documents WHERE org_id = $1 AND idempotency_key = $2`, orgID, key)
```
Based on the partial snippet, you’ll also need to:
1. Locate the `row.Scan(...)` in `GetDocumentByIdempotencyKey`. Add a destination for `idempotency_key` in the same positional order as added in the SELECT, for example:
- If you have something like:
`err := row.Scan(&d.ID, &d.OrgID, &d.StoreID, &d.Title, &d.ContentType, &d.SourceRef, &d.Status, &d.ErrorMessage, &d.ByteSize, &meta, &d.CreatedAt, &d.UpdatedAt, &d.TOCTree)`
- Change it to:
`err := row.Scan(&d.ID, &d.OrgID, &d.StoreID, &d.Title, &d.ContentType, &d.SourceRef, &d.Status, &d.ErrorMessage, &d.ByteSize, &meta, &d.CreatedAt, &d.UpdatedAt, &d.TOCTree, &d.IdempotencyKey)`
2. Remove any code that sets `d.IdempotencyKey = key` (or equivalent) so that the value always comes from the database row.
3. Ensure the `Document` struct has an `IdempotencyKey` field with the correct type and JSON/db tags; if it doesn’t, add it and also include it in any other queries that need to hydrate this field.
</issue_to_address>
### Comment 3
<location path="pkg/storage/local_test.go" line_range="68-77" />
<code_context>
+func TestLocalGetRetriesTransientNotExist(t *testing.T) {
</code_context>
<issue_to_address>
**issue (testing):** TestLocalGetRetriesTransientNotExist doesn't actually assert the Windows retry behavior and would still pass if retries were removed
As written, the test only checks for eventual success and would still pass even if there were no retries: on Windows, a hard ErrNotFound on the first Get would still succeed after the file is written via the `if err != nil { <-done; rc, _, err = l.Get(ctx, key) }` path. To actually verify the Defender-related retry behavior, you could either:
- Make the behavior OS-specific: on Windows, require the first Get to succeed (and fail if it returns ErrNotFound or any error), while keeping the current fallback for non-Windows; or
- Add a test hook in Local (e.g., a retry counter) and assert that at least 2 attempts occur on Windows.
This ensures the test fails if the retry loop is removed or shortened.
</issue_to_address>
### Comment 4
<location path="README.md" line_range="167-168" />
<code_context>
> **Anthropic-compatible gateways (GLM/Z.ai):** `base_url` **must include `/v1`** — the client posts to `${base_url}/messages`.
+> **Windows + local storage:** Windows Defender real-time protection scans
+> each freshly-written file and briefly hides it from `os.Stat`/`os.Open`,
+> which under heavy concurrent ingestion can surface as transient
+> `object not found` errors. The local backend rides through this window with
</code_context>
<issue_to_address>
**nitpick (typo):** Consider removing the hyphen in "freshly-written" for more natural phrasing.
Adverbs ending in -ly usually don’t take a hyphen before the participle, so "freshly written file" is the more standard form.
```suggestion
> **Windows + local storage:** Windows Defender real-time protection scans
> each freshly written file and briefly hides it from `os.Stat`/`os.Open`,
```
</issue_to_address>
### Comment 5
<location path="internal/handler/documents.go" line_range="161" />
<code_context>
ctx := r.Context()
docID := ingest.NewDocumentID()
+ // Idempotent ingest: if the client supplied an Idempotency-Key and we
+ // already have a document for this org under that key, return it instead
+ // of creating a duplicate. This makes a client/transport retry of the
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the repeated idempotency lookup and response into a shared helper to simplify HandleIngestDocument and avoid duplicated logic.
You can reduce the new branching/duplication by extracting the idempotency lookup + response into a small helper used in both places (pre-insert and conflict path). That keeps behavior identical but flattens `HandleIngestDocument` and avoids repeating the same logic/JSON response.
For example:
```go
func (h *DocumentsHandler) writeIdempotentDocIfExists(
	ctx context.Context,
	w http.ResponseWriter,
	orgID string,
	idemKey string,
) (bool, error) {
	if idemKey == "" {
		return false, nil
	}
	existing, err := h.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey)
	if err != nil {
		if errors.Is(err, db.ErrNotFound) {
			return false, nil
		}
		return false, err
	}
	writeJSON(w, http.StatusAccepted, map[string]any{
		"document_id": existing.ID,
		"status":      string(existing.Status),
	})
	return true, nil
}
```
Then `HandleIngestDocument` becomes:
```go
idemKey := r.Header.Get("Idempotency-Key")
if ok, err := h.writeIdempotentDocIfExists(ctx, w, orgID, idemKey); err != nil {
	h.logger.Error("ingest: idempotency lookup failed", "err", err)
	writeErr(w, http.StatusInternalServerError, "db read failed")
	return
} else if ok {
	return
}
```
And the conflict branch reuses the same helper instead of duplicating the lookup/response:
```go
if err := h.db.NewDocument(ctx, db.Document{
	ID:             docID,
	OrgID:          orgID,
	StoreID:        storeID(r),
	Title:          title,
	ContentType:    contentType,
	SourceRef:      key,
	Status:         db.StatusPending,
	ByteSize:       size,
	IdempotencyKey: idemKey,
}); err != nil {
	if idemKey != "" && errors.Is(err, db.ErrConflict) {
		_ = h.storage.Delete(ctx, key)
		if ok, lookupErr := h.writeIdempotentDocIfExists(ctx, w, orgID, idemKey); lookupErr == nil && ok {
			return
		}
		// fall through to generic error handling on lookup failure
	}
	h.logger.Error("ingest: db insert failed", "err", err)
	writeErr(w, http.StatusInternalServerError, "db write failed")
	return
}
```
This keeps all semantics (including logging, status codes, response shape) while centralizing the idempotency behavior and reducing the handler’s complexity.
</issue_to_address>
### Comment 6
<location path="internal/connecthandler/documents.go" line_range="85" />
<code_context>
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("content is required"))
}
+ // Idempotent ingest: a repeat CreateDocument carrying the same
+ // Idempotency-Key returns the original document rather than a duplicate
+ // (mirrors the REST path; closes the HAL-323 duplicate-upload bug for SDK
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the idempotent ingest flow into a shared helper so the CreateDocument handler only wires inputs/outputs and error mapping.
You can reduce the added complexity by extracting the idempotency logic into a shared helper and keeping the Connect handler focused on protocol-specific wiring.
### 1. Extract transport-agnostic idempotent ingest helper
Introduce a helper that encapsulates:
- Pre-insert lookup by idempotency key.
- `NewDocument` insert (with `IdempotencyKey`).
- Conflict resolution with source cleanup and winner lookup.
Example (adapt types/names as needed):
```go
type IdempotentIngestResult struct {
	Document db.Document
	// Reused is true if we returned an existing doc instead of creating a new one.
	Reused bool
}

func (s *Server) idempotentIngest(
	ctx context.Context,
	orgID string,
	idemKey string,
	doc db.Document,
	putSource func() (sourceKey string, err error),
	deleteSource func(ctx context.Context, key string) error,
) (IdempotentIngestResult, error) {
	if idemKey != "" {
		// GetDocumentByIdempotencyKey returns *Document, so dereference it here.
		if existing, err := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); err == nil {
			return IdempotentIngestResult{Document: *existing, Reused: true}, nil
		} else if !errors.Is(err, db.ErrNotFound) {
			return IdempotentIngestResult{}, fmt.Errorf("idempotency lookup: %w", err)
		}
	}
	key, err := putSource()
	if err != nil {
		return IdempotentIngestResult{}, fmt.Errorf("storage put: %w", err)
	}
	doc.SourceRef = key
	doc.IdempotencyKey = idemKey
	if err := s.db.NewDocument(ctx, doc); err != nil {
		if idemKey != "" && errors.Is(err, db.ErrConflict) {
			_ = deleteSource(ctx, key)
			if existing, lookupErr := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); lookupErr == nil {
				return IdempotentIngestResult{Document: *existing, Reused: true}, nil
			}
		}
		return IdempotentIngestResult{}, fmt.Errorf("db insert: %w", err)
	}
	return IdempotentIngestResult{Document: doc, Reused: false}, nil
}
```
This keeps all current behavior (pre-lookup, conflict handling, source delete) but centralizes it.
### 2. Simplify `CreateDocument` to wiring + error mapping
Then your Connect handler becomes mostly data preparation and error mapping, instead of open-coding the idempotency flow:
```go
idemKey := req.Header().Get("Idempotency-Key")
doc := db.Document{
	ID:          docID,
	OrgID:       orgID,
	StoreID:     storeIDFromConnect(req),
	Title:       title,
	ContentType: contentType,
	Status:      db.StatusPending,
	ByteSize:    size,
	// SourceRef and IdempotencyKey set in helper.
}
res, err := s.idempotentIngest(
	ctx,
	orgID,
	idemKey,
	doc,
	func() (string, error) {
		if err := s.storage.Put(ctx, key, reader, size); err != nil {
			return "", err
		}
		return key, nil
	},
	s.storage.Delete,
)
if err != nil {
	s.logger.Error("ingest: idempotent ingest failed", "err", err)
	return nil, connect.NewError(connect.CodeInternal, errors.New("ingest failed"))
}
return connect.NewResponse(&v1.CreateDocumentResponse{
	DocumentId: string(res.Document.ID),
	Status:     string(res.Document.Status),
}), nil
```
Benefits:
- `CreateDocument` becomes “validate → call helper → build response”.
- Idempotency logic is shared across Connect and REST handlers, so future changes (e.g., new status handling, logging) are done in one place.
- The conflict + delete + re-lookup pattern is no longer duplicated and harder to get subtly out of sync.
</issue_to_address>
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/connecthandler/documents.go`:
- Around line 137-139: The storage.Delete call in the conflict handling block is
silently ignoring cleanup errors by using the blank identifier, which allows
orphan objects to accumulate undetected during repeated conflict races. Capture
the error returned by s.storage.Delete(ctx, key) instead of discarding it with
underscore, and handle it appropriately by logging it or taking other corrective
action to ensure cleanup failures are visible and traceable rather than silently
lost.
In `@internal/handler/documents.go`:
- Around line 161-179: The idempotency middleware is caching responses based
solely on the Idempotency-Key header, which allows cross-org response leakage
where two organizations using the same key can receive a cached 202 response
from another org. You need to locate the mounted idempotency middleware
implementation and modify its cache key composition to include the org_id or
tenant identifier along with the Idempotency-Key header value, ensuring that the
middleware respects the same (org_id, idempotency_key) scoping that the handler
code in GetDocumentByIdempotencyKey enforces.
In `@pkg/storage/local_test.go`:
- Around line 95-101: The unconditional fallback to the second Get call in the
error handling block at lines 95-101 allows the test to pass even when the
Windows retry path is broken, masking potential regressions. Remove or properly
condition the fallback Get call that reads the key after detecting an error from
the done channel, ensuring the test actually validates that the Local.Get retry
behavior functions correctly on Windows rather than bypassing the failure with a
direct read fallback.
📒 Files selected for processing (14)
- README.md
- internal/api/server.go
- internal/connecthandler/documents.go
- internal/handler/documents.go
- openapi.yaml
- pkg/db/db.go
- pkg/db/documents.go
- pkg/db/idempotency_integration_test.go
- pkg/db/migrations/0007_documents_idempotency_key.down.sql
- pkg/db/migrations/0007_documents_idempotency_key.up.sql
- pkg/storage/gcs.go
- pkg/storage/local.go
- pkg/storage/local_test.go
- pkg/storage/s3.go
…an cleanup
- middleware.Idempotency: compose the cache key as method|path|org|key instead of the bare Idempotency-Key, so two tenants reusing the same opaque key can't receive each other's cached 202 (cross-tenant leak). New tests cover same-org replay, cross-org isolation, and keyless pass-through. (CodeRabbit, Major)
- All three ingest handlers now log orphan-source cleanup failures on the idempotency-conflict path instead of discarding the Delete error. (Sourcery + CodeRabbit)
- GetDocumentByIdempotencyKey hydrates IdempotencyKey from the DB column.
- Strengthen TestLocalGetRetriesTransientNotExist: on Windows it now asserts the FIRST Get succeeds, so the test fails if the retry loop is removed. (Sourcery)
- README: "freshly written" typo.
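The cache-key change in the first bullet can be sketched as a small pure function. This is a hedged illustration only: `cacheKey` is a hypothetical name, the `|` separator follows the commit message's "method|path|org|key" wording, and returning `false` for a missing header is one assumed way to express the keyless pass-through.

```go
package main

import (
	"fmt"
	"strings"
)

// cacheKey scopes a client-supplied Idempotency-Key by method, path, and org,
// so the same opaque key from two tenants maps to two different cache entries.
// The bool is false when no key was sent, meaning "do not cache or replay".
func cacheKey(method, path, orgID, idemKey string) (string, bool) {
	if idemKey == "" {
		return "", false
	}
	return strings.Join([]string{method, path, orgID, idemKey}, "|"), true
}

func main() {
	a, _ := cacheKey("POST", "/v1/documents", "org-a", "k1")
	b, _ := cacheKey("POST", "/v1/documents", "org-b", "k1")
	fmt.Println(a)      // POST|/v1/documents|org-a|k1
	fmt.Println(a == b) // false
}
```

One caveat of plain joining: if any component could itself contain `|`, two different tuples might collide, so a real implementation may prefer length-prefixing or hashing the parts.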
Thanks @coderabbitai and @sourcery-ai — triaged and addressed in 0ebf0f5:
Deliberately not doing the |
@jules run your own review
Why
Continuation of HAL-321. A full 20-doc FinanceBench run on Windows still ended with duplicate documents (the same corpus doc ingested up to 6×) and unexplained pathless `object not found` failure rows. This fixes both root causes.
Root causes
Duplicate uploads. The SDK transport wraps the ingest POST in `sync_retry`, which re-POSTs on any transient exception (or 5xx/408). With no server-side idempotency, a connection reset that lands after the server already committed the document made the SDK create a brand-new `doc_id` + source + parse job on every retry. The SDK already sends an optional `Idempotency-Key` header ("Prevents duplicate ingestion"); the engine just ignored it.
Pathless `object not found`. `storage.Local.Get` always appends the resolved path, yet the DB had pathless rows. The other backends (and `Delete`) returned the bare `ErrNotFound` sentinel, and `local.Get`'s open-after-stat branch returned a raw error that `getSourceWithRetry` would not retry. The pathless rows were pre-path-wrap leftovers; this makes every not-found self-attributing so it can't recur ambiguously.
What changed
- New `idempotency_key` column + partial unique index on `(org_id, idempotency_key)` (migration 0007). All three ingest handlers (standalone `internal/api`, multi-tenant REST `internal/handler`, Connect RPC `internal/connecthandler`) look up an existing doc by key and return it instead of creating a duplicate; the concurrent same-key race drops the orphan source and returns the winner.
- `db.ErrConflict` mapped from SQLSTATE 23505; `GetDocumentByIdempotencyKey`.
- Every storage backend's `ErrNotFound` now carries the resolved key/path (local Get+Delete, s3 Get+Delete, gcs Get+Delete). `local.Get` wraps the open-after-stat miss as retryable ErrNotFound and adds a Windows-only bounded retry to ride through the Defender scan window that transiently hides freshly written files.
- OpenAPI documents the `Idempotency-Key` header; README documents the mandatory Windows Defender exclusion for the storage root.
Verification
- `go build`/`go vet` clean; full test suite green, including new `pkg/db` integration tests against live Postgres (`VLE_TEST_DATABASE_URL`) and `pkg/storage` Windows-retry/path tests.
- Repeated `POST /v1/documents` requests with one `Idempotency-Key` all return the same `document_id` (reflecting live pending→ready status); a keyless control gets its own id; exactly one row per key in the DB.
Companion
`vectorless-bench` now sends a stable `vlbench:{org}:{doc_id}` idempotency key so all its setup retries dedup (separate PR).
Remaining (separate, infra/time-bound): Linux/CI full-run repro (AC3) and the canonical judged 3-way re-run + headline numbers (AC4).
Closes HAL-323
Summary by Sourcery
Implement idempotent document ingestion and enrich storage not-found errors with full paths to aid diagnosis.
New Features:
Enhancements:
Tests:
Summary by CodeRabbit
New Features
`Idempotency-Key` header to prevent duplicate processing of repeated requests.
Bug Fixes
Documentation