-
Notifications
You must be signed in to change notification settings - Fork 0
Idempotent ingest + self-diagnosing not-found (HAL-323) #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -82,6 +82,23 @@ func (s *DocumentsService) CreateDocument( | |
| return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("content is required")) | ||
| } | ||
|
|
||
| // Idempotent ingest: a repeat CreateDocument carrying the same | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): Consider extracting the idempotent ingest flow into a shared helper so the CreateDocument handler only wires inputs/outputs and error mapping.
You can reduce the added complexity by extracting the idempotency logic into a shared helper and keeping the Connect handler focused on protocol-specific wiring.
1. Extract transport-agnostic idempotent ingest helper
Introduce a helper that encapsulates the idempotency pre-lookup, the source write and document insert, and the conflict path (orphan-source delete followed by a re-lookup of the winning document).
Example (adapt types/names as needed):
type IdempotentIngestResult struct {
Document db.Document
// Reused is true if we returned an existing doc instead of creating a new one.
Reused bool
}
func (s *Server) idempotentIngest(
ctx context.Context,
orgID string,
idemKey string,
doc db.Document,
putSource func() (sourceKey string, err error),
deleteSource func(ctx context.Context, key string) error,
) (IdempotentIngestResult, error) {
if idemKey != "" {
if existing, err := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); err == nil {
return IdempotentIngestResult{Document: existing, Reused: true}, nil
} else if !errors.Is(err, db.ErrNotFound) {
return IdempotentIngestResult{}, fmt.Errorf("idempotency lookup: %w", err)
}
}
key, err := putSource()
if err != nil {
return IdempotentIngestResult{}, fmt.Errorf("storage put: %w", err)
}
doc.SourceRef = key
doc.IdempotencyKey = idemKey
if err := s.db.NewDocument(ctx, doc); err != nil {
if idemKey != "" && errors.Is(err, db.ErrConflict) {
_ = deleteSource(ctx, key)
if existing, lookupErr := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); lookupErr == nil {
return IdempotentIngestResult{Document: existing, Reused: true}, nil
}
}
return IdempotentIngestResult{}, fmt.Errorf("db insert: %w", err)
}
return IdempotentIngestResult{Document: doc, Reused: false}, nil
}
This keeps all current behavior (pre-lookup, conflict handling, source delete) but centralizes it.
2. Simplify CreateDocument
|
||
| // Idempotency-Key returns the original document rather than a duplicate | ||
| // (mirrors the REST path; closes the HAL-323 duplicate-upload bug for SDK | ||
| // callers that retry the RPC). | ||
| idemKey := req.Header().Get("Idempotency-Key") | ||
| if idemKey != "" { | ||
| if existing, err := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); err == nil { | ||
| return connect.NewResponse(&v1.CreateDocumentResponse{ | ||
| DocumentId: string(existing.ID), | ||
| Status: string(existing.Status), | ||
| }), nil | ||
| } else if !errors.Is(err, db.ErrNotFound) { | ||
| s.logger.Error("ingest: idempotency lookup failed", "err", err) | ||
| return nil, connect.NewError(connect.CodeInternal, errors.New("db read failed")) | ||
| } | ||
| } | ||
|
|
||
| docID := ingest.NewDocumentID() | ||
| contentType := msg.ContentType | ||
| if contentType == "" { | ||
|
|
@@ -105,15 +122,29 @@ func (s *DocumentsService) CreateDocument( | |
| } | ||
|
|
||
| if err := s.db.NewDocument(ctx, db.Document{ | ||
| ID: docID, | ||
| OrgID: orgID, | ||
| StoreID: storeIDFromConnect(req), | ||
| Title: title, | ||
| ContentType: contentType, | ||
| SourceRef: key, | ||
| Status: db.StatusPending, | ||
| ByteSize: size, | ||
| ID: docID, | ||
| OrgID: orgID, | ||
| StoreID: storeIDFromConnect(req), | ||
| Title: title, | ||
| ContentType: contentType, | ||
| SourceRef: key, | ||
| Status: db.StatusPending, | ||
| ByteSize: size, | ||
| IdempotencyKey: idemKey, | ||
| }); err != nil { | ||
| // Concurrent same-key insert won the race: drop the orphan source and | ||
| // return the winner instead of erroring. | ||
| if idemKey != "" && errors.Is(err, db.ErrConflict) { | ||
| if delErr := s.storage.Delete(ctx, key); delErr != nil { | ||
| s.logger.Warn("ingest: orphan source cleanup failed", "err", delErr, "source_ref", key, "idempotency_key", idemKey) | ||
| } | ||
| if existing, lookupErr := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); lookupErr == nil { | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| return connect.NewResponse(&v1.CreateDocumentResponse{ | ||
| DocumentId: string(existing.ID), | ||
| Status: string(existing.Status), | ||
| }), nil | ||
| } | ||
| } | ||
| s.logger.Error("ingest: db insert failed", "err", err) | ||
| return nil, connect.NewError(connect.CodeInternal, errors.New("db write failed")) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -158,6 +158,25 @@ func (h *DocumentsHandler) HandleIngestDocument(w http.ResponseWriter, r *http.R | |
| ctx := r.Context() | ||
| docID := ingest.NewDocumentID() | ||
|
|
||
| // Idempotent ingest: if the client supplied an Idempotency-Key and we | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): Consider extracting the repeated idempotency lookup and response into a shared helper to simplify HandleIngestDocument and avoid duplicated logic.
You can reduce the new branching/duplication by extracting the idempotency lookup + response into a small helper used in both places (pre-insert and conflict path). That keeps behavior identical but flattens HandleIngestDocument.
For example:
func (h *DocumentsHandler) writeIdempotentDocIfExists(
ctx context.Context,
w http.ResponseWriter,
orgID string,
idemKey string,
) (bool, error) {
if idemKey == "" {
return false, nil
}
existing, err := h.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey)
if err != nil {
if errors.Is(err, db.ErrNotFound) {
return false, nil
}
return false, err
}
writeJSON(w, http.StatusAccepted, map[string]any{
"document_id": existing.ID,
"status": string(existing.Status),
})
return true, nil
}
Then the pre-insert check in HandleIngestDocument becomes:
idemKey := r.Header.Get("Idempotency-Key")
if ok, err := h.writeIdempotentDocIfExists(ctx, w, orgID, idemKey); err != nil {
h.logger.Error("ingest: idempotency lookup failed", "err", err)
writeErr(w, http.StatusInternalServerError, "db read failed")
return
} else if ok {
return
}
And the conflict branch reuses the same helper instead of duplicating the lookup/response:
if err := h.db.NewDocument(ctx, db.Document{
ID: docID,
OrgID: orgID,
StoreID: storeID(r),
Title: title,
ContentType: contentType,
SourceRef: key,
Status: db.StatusPending,
ByteSize: size,
IdempotencyKey: idemKey,
}); err != nil {
if idemKey != "" && errors.Is(err, db.ErrConflict) {
_ = h.storage.Delete(ctx, key)
if ok, lookupErr := h.writeIdempotentDocIfExists(ctx, w, orgID, idemKey); lookupErr == nil && ok {
return
}
// fall through to generic error handling on lookup failure
}
h.logger.Error("ingest: db insert failed", "err", err)
writeErr(w, http.StatusInternalServerError, "db write failed")
return
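}
This keeps all semantics (including logging, status codes, response shape) while centralizing the idempotency behavior and reducing the handler’s complexity. Note: the snippet above discards the error returned by h.storage.Delete; to preserve logging exactly, keep the existing "ingest: orphan source cleanup failed" warning when applying it. |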
||
| // already have a document for this org under that key, return it instead | ||
| // of creating a duplicate. This makes a client/transport retry of the | ||
| // ingest POST (the HAL-323 duplicate-upload bug) a no-op. | ||
| idemKey := r.Header.Get("Idempotency-Key") | ||
| if idemKey != "" { | ||
| if existing, err := h.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); err == nil { | ||
| writeJSON(w, http.StatusAccepted, map[string]any{ | ||
| "document_id": existing.ID, | ||
| "status": string(existing.Status), | ||
| }) | ||
| return | ||
| } else if !errors.Is(err, db.ErrNotFound) { | ||
| h.logger.Error("ingest: idempotency lookup failed", "err", err) | ||
| writeErr(w, http.StatusInternalServerError, "db read failed") | ||
| return | ||
| } | ||
| } | ||
|
|
||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| var ( | ||
| filename string | ||
| contentType string | ||
|
|
@@ -227,15 +246,31 @@ func (h *DocumentsHandler) HandleIngestDocument(w http.ResponseWriter, r *http.R | |
| } | ||
|
|
||
| if err := h.db.NewDocument(ctx, db.Document{ | ||
| ID: docID, | ||
| OrgID: orgID, | ||
| StoreID: storeID(r), | ||
| Title: title, | ||
| ContentType: contentType, | ||
| SourceRef: key, | ||
| Status: db.StatusPending, | ||
| ByteSize: size, | ||
| ID: docID, | ||
| OrgID: orgID, | ||
| StoreID: storeID(r), | ||
| Title: title, | ||
| ContentType: contentType, | ||
| SourceRef: key, | ||
| Status: db.StatusPending, | ||
| ByteSize: size, | ||
| IdempotencyKey: idemKey, | ||
| }); err != nil { | ||
| // A concurrent request with the same Idempotency-Key won the race and | ||
| // inserted first. Drop the orphan source we just wrote and return the | ||
| // winner's document rather than a 500. | ||
| if idemKey != "" && errors.Is(err, db.ErrConflict) { | ||
| if delErr := h.storage.Delete(ctx, key); delErr != nil { | ||
| h.logger.Warn("ingest: orphan source cleanup failed", "err", delErr, "source_ref", key, "idempotency_key", idemKey) | ||
| } | ||
| if existing, lookupErr := h.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); lookupErr == nil { | ||
| writeJSON(w, http.StatusAccepted, map[string]any{ | ||
| "document_id": existing.ID, | ||
| "status": string(existing.Status), | ||
| }) | ||
| return | ||
| } | ||
| } | ||
| h.logger.Error("ingest: db insert failed", "err", err) | ||
| writeErr(w, http.StatusInternalServerError, "db write failed") | ||
| return | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| package middleware | ||
|
|
||
| import ( | ||
| "net/http" | ||
| "net/http/httptest" | ||
| "sync/atomic" | ||
| "testing" | ||
| ) | ||
|
|
||
| // handler that returns a per-org body and counts invocations, so we can tell a | ||
| // cached replay from a real pass-through. | ||
| func countingOrgHandler(calls *atomic.Int64) http.Handler { | ||
| return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| calls.Add(1) | ||
| w.WriteHeader(http.StatusAccepted) | ||
| _, _ = w.Write([]byte("doc-for-" + r.Header.Get("X-Vectorless-Org"))) | ||
| }) | ||
| } | ||
|
|
||
| func post(mw http.Handler, org, key string) *httptest.ResponseRecorder { | ||
| req := httptest.NewRequest(http.MethodPost, "/v1/documents", nil) | ||
| if org != "" { | ||
| req.Header.Set("X-Vectorless-Org", org) | ||
| } | ||
| if key != "" { | ||
| req.Header.Set("Idempotency-Key", key) | ||
| } | ||
| rec := httptest.NewRecorder() | ||
| mw.ServeHTTP(rec, req) | ||
| return rec | ||
| } | ||
|
|
||
| func body(rec *httptest.ResponseRecorder) string { | ||
| return rec.Body.String() | ||
| } | ||
|
|
||
| func TestIdempotencySameOrgReplays(t *testing.T) { | ||
| var calls atomic.Int64 | ||
| mw := Idempotency(IdempotencyConfig{})(countingOrgHandler(&calls)) | ||
|
|
||
| first := post(mw, "orgA", "k1") | ||
| second := post(mw, "orgA", "k1") | ||
|
|
||
| if calls.Load() != 1 { | ||
| t.Fatalf("handler invoked %d times, want 1 (second should replay)", calls.Load()) | ||
| } | ||
| if got := second.Header().Get("X-Idempotency-Replayed"); got != "true" { | ||
| t.Fatalf("second response missing replay header, got %q", got) | ||
| } | ||
| if body(first) != body(second) { | ||
| t.Fatalf("replay body %q != original %q", body(second), body(first)) | ||
| } | ||
| } | ||
|
|
||
| func TestIdempotencyDifferentOrgsDoNotCollide(t *testing.T) { | ||
| // The core cross-tenant guarantee: two orgs sending the SAME opaque | ||
| // Idempotency-Key must each hit the handler and get THEIR OWN response — | ||
| // org B must never be handed org A's cached document. | ||
| var calls atomic.Int64 | ||
| mw := Idempotency(IdempotencyConfig{})(countingOrgHandler(&calls)) | ||
|
|
||
| a := post(mw, "orgA", "shared-key") | ||
| b := post(mw, "orgB", "shared-key") | ||
|
|
||
| if calls.Load() != 2 { | ||
| t.Fatalf("handler invoked %d times, want 2 (no cross-org collision)", calls.Load()) | ||
| } | ||
| if b.Header().Get("X-Idempotency-Replayed") == "true" { | ||
| t.Fatal("org B got a replayed response — cross-tenant leak") | ||
| } | ||
| if body(a) == body(b) { | ||
| t.Fatalf("org B received org A's body %q — cross-tenant leak", body(b)) | ||
| } | ||
| if body(b) != "doc-for-orgB" { | ||
| t.Fatalf("org B body = %q, want doc-for-orgB", body(b)) | ||
| } | ||
| } | ||
|
|
||
| func TestIdempotencyNoKeyAlwaysPassesThrough(t *testing.T) { | ||
| var calls atomic.Int64 | ||
| mw := Idempotency(IdempotencyConfig{})(countingOrgHandler(&calls)) | ||
|
|
||
| post(mw, "orgA", "") | ||
| post(mw, "orgA", "") | ||
|
|
||
| if calls.Load() != 2 { | ||
| t.Fatalf("handler invoked %d times, want 2 (no key = no caching)", calls.Load()) | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.