Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ llm:

> **Anthropic-compatible gateways (GLM/Z.ai):** `base_url` **must include `/v1`** — the client posts to `${base_url}/messages`.

> **Windows + local storage:** Windows Defender real-time protection scans
> each freshly written file and briefly hides it from `os.Stat`/`os.Open`,
> which under heavy concurrent ingestion can surface as transient
> `object not found` errors. The local backend rides through this window with
> a short internal retry, but for large bulk loads **add a Defender exclusion
> for your storage root** (`local.root` / `VLE_STORAGE_LOCAL_ROOT`):
> `Add-MpPreference -ExclusionPath "C:\path\to\data\documents"`. Linux has no
> such scan-hold and needs no exclusion.

### Supported formats

PDF (positioned text + tables via [`pdftable`](https://github.com/hallelx2/pdftable)) · Markdown · HTML · DOCX · Text.
Expand Down
51 changes: 44 additions & 7 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,28 @@ func (d Deps) handleIngestDocument(w http.ResponseWriter, r *http.Request) {

docID := ingest.NewDocumentID()

// Idempotent ingest: if the caller supplied an Idempotency-Key and a
// document already exists under it (for the standalone org), return that
// document instead of creating a duplicate. A client — or the SDK's own
// transport retry-on-transient — that re-POSTs the same upload after a
// reset that landed AFTER the server committed the row is then a no-op.
// This is the standalone/local-mode half of the HAL-323 duplicate-upload
// fix (the multi-tenant handler in internal/handler mirrors it).
idemKey := r.Header.Get("Idempotency-Key")
if idemKey != "" {
if existing, err := d.DB.GetDocumentByIdempotencyKey(ctx, standaloneOrgID, idemKey); err == nil {
writeJSON(w, http.StatusAccepted, map[string]any{
"document_id": existing.ID,
"status": string(existing.Status),
})
return
} else if !errors.Is(err, db.ErrNotFound) {
d.Logger.Error("ingest: idempotency lookup failed", "err", err)
writeErr(w, http.StatusInternalServerError, "db read failed")
return
}
}

var (
filename string
contentType string
Expand Down Expand Up @@ -300,14 +322,29 @@ func (d Deps) handleIngestDocument(w http.ResponseWriter, r *http.Request) {
}

if err := d.DB.NewDocument(ctx, db.Document{
ID: docID,
OrgID: standaloneOrgID,
Title: title,
ContentType: contentType,
SourceRef: key,
Status: db.StatusPending,
ByteSize: size,
ID: docID,
OrgID: standaloneOrgID,
Title: title,
ContentType: contentType,
SourceRef: key,
Status: db.StatusPending,
ByteSize: size,
IdempotencyKey: idemKey,
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
}); err != nil {
// A concurrent same-key ingest won the race: drop the orphan source we
// just wrote and return the winner instead of a 500.
if idemKey != "" && errors.Is(err, db.ErrConflict) {
if delErr := d.Storage.Delete(ctx, key); delErr != nil {
d.Logger.Warn("ingest: orphan source cleanup failed", "err", delErr, "source_ref", key, "idempotency_key", idemKey)
}
if existing, lookupErr := d.DB.GetDocumentByIdempotencyKey(ctx, standaloneOrgID, idemKey); lookupErr == nil {
writeJSON(w, http.StatusAccepted, map[string]any{
"document_id": existing.ID,
"status": string(existing.Status),
})
return
}
}
d.Logger.Error("ingest: db insert failed", "err", err)
writeErr(w, http.StatusInternalServerError, "db write failed")
return
Expand Down
47 changes: 39 additions & 8 deletions internal/connecthandler/documents.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,23 @@ func (s *DocumentsService) CreateDocument(
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("content is required"))
}

// Idempotent ingest: a repeat CreateDocument carrying the same

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the idempotent ingest flow into a shared helper so the CreateDocument handler only wires inputs/outputs and error mapping.

You can reduce the added complexity by extracting the idempotency logic into a shared helper and keeping the Connect handler focused on protocol-specific wiring.

1. Extract transport-agnostic idempotent ingest helper

Introduce a helper that encapsulates:

  • Pre-insert lookup by idempotency key.
  • NewDocument insert (with IdempotencyKey).
  • Conflict resolution with source cleanup and winner lookup.

Example (adapt types/names as needed):

// IdempotentIngestResult is the value idempotentIngest returns on success.
type IdempotentIngestResult struct {
	// Document is the pre-existing row when Reused is true; otherwise it is
	// the document that was just inserted.
	Document db.Document
	// Reused is true if we returned an existing doc instead of creating a new one.
	Reused   bool
}

// idempotentIngest stores a document's source and inserts its row, returning
// an already-existing document instead when idemKey maps to one for orgID.
//
// Steps, in order:
//  1. When idemKey is non-empty, look the document up by (orgID, idemKey).
//     A hit is returned with Reused set; any lookup error other than
//     db.ErrNotFound aborts the ingest.
//  2. Call putSource and record the key it returns in doc.SourceRef, along
//     with idemKey in doc.IdempotencyKey.
//  3. Insert the row. If the insert fails with db.ErrConflict while idemKey is
//     non-empty, a concurrent request with the same key won the race: the
//     source written in step 2 is deleted and the winner is looked up and
//     returned with Reused set.
//
// Every returned error is wrapped with the step that failed ("idempotency
// lookup", "storage put", "db insert"). If the post-conflict lookup fails, the
// original insert error is the one returned.
func (s *Server) idempotentIngest(
	ctx context.Context,
	orgID string,
	idemKey string,
	doc db.Document,
	putSource func() (sourceKey string, err error),
	deleteSource func(ctx context.Context, key string) error,
) (IdempotentIngestResult, error) {
	if idemKey != "" {
		existing, err := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey)
		if err == nil {
			return IdempotentIngestResult{Document: existing, Reused: true}, nil
		}
		if !errors.Is(err, db.ErrNotFound) {
			return IdempotentIngestResult{}, fmt.Errorf("idempotency lookup: %w", err)
		}
	}

	key, err := putSource()
	if err != nil {
		return IdempotentIngestResult{}, fmt.Errorf("storage put: %w", err)
	}
	doc.SourceRef = key
	doc.IdempotencyKey = idemKey

	if err := s.db.NewDocument(ctx, doc); err != nil {
		if idemKey != "" && errors.Is(err, db.ErrConflict) {
			// Cleanup is best-effort, but a failure leaves an orphaned source
			// behind, so surface it in the log instead of discarding it. It
			// cannot be returned: the request may still succeed via the winner.
			// NOTE(review): assumes Server carries the same structured logger
			// the handlers use (s.logger) — adapt the name as needed.
			if delErr := deleteSource(ctx, key); delErr != nil {
				s.logger.Warn("ingest: orphan source cleanup failed", "err", delErr, "source_ref", key, "idempotency_key", idemKey)
			}
			if existing, lookupErr := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); lookupErr == nil {
				return IdempotentIngestResult{Document: existing, Reused: true}, nil
			}
		}
		return IdempotentIngestResult{}, fmt.Errorf("db insert: %w", err)
	}

	return IdempotentIngestResult{Document: doc, Reused: false}, nil
}

This keeps all current behavior (pre-lookup, conflict handling, source delete) but centralizes it.

2. Simplify CreateDocument to wiring + error mapping

Then your Connect handler becomes mostly data preparation and error mapping, instead of open-coding the idempotency flow:

idemKey := req.Header().Get("Idempotency-Key")

doc := db.Document{
	ID:          docID,
	OrgID:       orgID,
	StoreID:     storeIDFromConnect(req),
	Title:       title,
	ContentType: contentType,
	Status:      db.StatusPending,
	ByteSize:    size,
	// SourceRef and IdempotencyKey set in helper.
}

res, err := s.idempotentIngest(
	ctx,
	orgID,
	idemKey,
	doc,
	func() (string, error) {
		if err := s.storage.Put(ctx, key, reader, size); err != nil {
			return "", err
		}
		return key, nil
	},
	s.storage.Delete,
)
if err != nil {
	s.logger.Error("ingest: idempotent ingest failed", "err", err)
	return nil, connect.NewError(connect.CodeInternal, errors.New("ingest failed"))
}

return connect.NewResponse(&v1.CreateDocumentResponse{
	DocumentId: string(res.Document.ID),
	Status:     string(res.Document.Status),
}), nil

Benefits:

  • CreateDocument becomes “validate → call helper → build response”.
  • Idempotency logic is shared across Connect and REST handlers, so future changes (e.g., new status handling, logging) are done in one place.
  • The conflict + delete + re-lookup pattern is no longer duplicated, so the REST and Connect copies can no longer drift subtly out of sync.

// Idempotency-Key returns the original document rather than a duplicate
// (mirrors the REST path; closes the HAL-323 duplicate-upload bug for SDK
// callers that retry the RPC).
idemKey := req.Header().Get("Idempotency-Key")
if idemKey != "" {
if existing, err := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); err == nil {
return connect.NewResponse(&v1.CreateDocumentResponse{
DocumentId: string(existing.ID),
Status: string(existing.Status),
}), nil
} else if !errors.Is(err, db.ErrNotFound) {
s.logger.Error("ingest: idempotency lookup failed", "err", err)
return nil, connect.NewError(connect.CodeInternal, errors.New("db read failed"))
}
}

docID := ingest.NewDocumentID()
contentType := msg.ContentType
if contentType == "" {
Expand All @@ -105,15 +122,29 @@ func (s *DocumentsService) CreateDocument(
}

if err := s.db.NewDocument(ctx, db.Document{
ID: docID,
OrgID: orgID,
StoreID: storeIDFromConnect(req),
Title: title,
ContentType: contentType,
SourceRef: key,
Status: db.StatusPending,
ByteSize: size,
ID: docID,
OrgID: orgID,
StoreID: storeIDFromConnect(req),
Title: title,
ContentType: contentType,
SourceRef: key,
Status: db.StatusPending,
ByteSize: size,
IdempotencyKey: idemKey,
}); err != nil {
// Concurrent same-key insert won the race: drop the orphan source and
// return the winner instead of erroring.
if idemKey != "" && errors.Is(err, db.ErrConflict) {
if delErr := s.storage.Delete(ctx, key); delErr != nil {
s.logger.Warn("ingest: orphan source cleanup failed", "err", delErr, "source_ref", key, "idempotency_key", idemKey)
}
if existing, lookupErr := s.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); lookupErr == nil {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return connect.NewResponse(&v1.CreateDocumentResponse{
DocumentId: string(existing.ID),
Status: string(existing.Status),
}), nil
}
}
s.logger.Error("ingest: db insert failed", "err", err)
return nil, connect.NewError(connect.CodeInternal, errors.New("db write failed"))
}
Expand Down
51 changes: 43 additions & 8 deletions internal/handler/documents.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,25 @@ func (h *DocumentsHandler) HandleIngestDocument(w http.ResponseWriter, r *http.R
ctx := r.Context()
docID := ingest.NewDocumentID()

// Idempotent ingest: if the client supplied an Idempotency-Key and we

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the repeated idempotency lookup and response into a shared helper to simplify HandleIngestDocument and avoid duplicated logic.

You can reduce the new branching/duplication by extracting the idempotency lookup + response into a small helper used in both places (pre-insert and conflict path). That keeps behavior identical but flattens HandleIngestDocument and avoids repeating the same logic/JSON response.

For example:

// writeIdempotentDocIfExists looks up the document stored for (orgID, idemKey)
// and, when one exists, writes it to w as a 202 Accepted JSON response.
//
// It returns true only when a response was written. An empty idemKey or a
// db.ErrNotFound lookup result yields (false, nil) with nothing written; any
// other lookup error is returned unchanged, also with nothing written.
func (h *DocumentsHandler) writeIdempotentDocIfExists(
	ctx context.Context,
	w http.ResponseWriter,
	orgID string,
	idemKey string,
) (bool, error) {
	if idemKey == "" {
		return false, nil
	}

	found, lookupErr := h.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey)
	switch {
	case lookupErr == nil:
		writeJSON(w, http.StatusAccepted, map[string]any{
			"document_id": found.ID,
			"status":      string(found.Status),
		})
		return true, nil
	case errors.Is(lookupErr, db.ErrNotFound):
		// No document under this key yet: the caller should ingest normally.
		return false, nil
	default:
		return false, lookupErr
	}
}

Then HandleIngestDocument becomes:

idemKey := r.Header.Get("Idempotency-Key")
if ok, err := h.writeIdempotentDocIfExists(ctx, w, orgID, idemKey); err != nil {
    h.logger.Error("ingest: idempotency lookup failed", "err", err)
    writeErr(w, http.StatusInternalServerError, "db read failed")
    return
} else if ok {
    return
}

And the conflict branch reuses the same helper instead of duplicating the lookup/response:

if err := h.db.NewDocument(ctx, db.Document{
    ID:             docID,
    OrgID:          orgID,
    StoreID:        storeID(r),
    Title:          title,
    ContentType:    contentType,
    SourceRef:      key,
    Status:         db.StatusPending,
    ByteSize:       size,
    IdempotencyKey: idemKey,
}); err != nil {
    if idemKey != "" && errors.Is(err, db.ErrConflict) {
        _ = h.storage.Delete(ctx, key)
        if ok, lookupErr := h.writeIdempotentDocIfExists(ctx, w, orgID, idemKey); lookupErr == nil && ok {
            return
        }
        // fall through to generic error handling on lookup failure
    }
    h.logger.Error("ingest: db insert failed", "err", err)
    writeErr(w, http.StatusInternalServerError, "db write failed")
    return
}

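This keeps the status codes and response shape while centralizing the idempotency behavior and reducing the handler’s complexity. Note that the example above discards the `h.storage.Delete` error (`_ =`); when adopting it, keep the existing "ingest: orphan source cleanup failed" warning log so that logging behavior is preserved as well.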

// already have a document for this org under that key, return it instead
// of creating a duplicate. This makes a client/transport retry of the
// ingest POST (the HAL-323 duplicate-upload bug) a no-op.
idemKey := r.Header.Get("Idempotency-Key")
if idemKey != "" {
if existing, err := h.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); err == nil {
writeJSON(w, http.StatusAccepted, map[string]any{
"document_id": existing.ID,
"status": string(existing.Status),
})
return
} else if !errors.Is(err, db.ErrNotFound) {
h.logger.Error("ingest: idempotency lookup failed", "err", err)
writeErr(w, http.StatusInternalServerError, "db read failed")
return
}
}

Comment thread
coderabbitai[bot] marked this conversation as resolved.
var (
filename string
contentType string
Expand Down Expand Up @@ -227,15 +246,31 @@ func (h *DocumentsHandler) HandleIngestDocument(w http.ResponseWriter, r *http.R
}

if err := h.db.NewDocument(ctx, db.Document{
ID: docID,
OrgID: orgID,
StoreID: storeID(r),
Title: title,
ContentType: contentType,
SourceRef: key,
Status: db.StatusPending,
ByteSize: size,
ID: docID,
OrgID: orgID,
StoreID: storeID(r),
Title: title,
ContentType: contentType,
SourceRef: key,
Status: db.StatusPending,
ByteSize: size,
IdempotencyKey: idemKey,
}); err != nil {
// A concurrent request with the same Idempotency-Key won the race and
// inserted first. Drop the orphan source we just wrote and return the
// winner's document rather than a 500.
if idemKey != "" && errors.Is(err, db.ErrConflict) {
if delErr := h.storage.Delete(ctx, key); delErr != nil {
h.logger.Warn("ingest: orphan source cleanup failed", "err", delErr, "source_ref", key, "idempotency_key", idemKey)
}
if existing, lookupErr := h.db.GetDocumentByIdempotencyKey(ctx, orgID, idemKey); lookupErr == nil {
writeJSON(w, http.StatusAccepted, map[string]any{
"document_id": existing.ID,
"status": string(existing.Status),
})
return
}
}
h.logger.Error("ingest: db insert failed", "err", err)
writeErr(w, http.StatusInternalServerError, "db write failed")
return
Expand Down
13 changes: 11 additions & 2 deletions internal/middleware/idempotency.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,22 @@ func Idempotency(cfg IdempotencyConfig) func(http.Handler) http.Handler {
return
}

key := r.Header.Get("Idempotency-Key")
if key == "" {
rawKey := r.Header.Get("Idempotency-Key")
if rawKey == "" {
// No key provided — proceed normally.
next.ServeHTTP(w, r)
return
}

// Scope the cache entry by tenant + method + path, NOT the raw
// Idempotency-Key alone. Two orgs legitimately use the same opaque
// key (e.g. "1", a per-corpus id); replaying by the bare key would
// hand org B a 202 cached for org A's document — a cross-tenant
// leak. The org header is absent in standalone/local mode, which is
// single-tenant, so an empty org collapses to one namespace there.
org := r.Header.Get("X-Vectorless-Org")
key := r.Method + "|" + r.URL.Path + "|" + org + "|" + rawKey

// Check cache.
if cr, ok := cache.get(key); ok {
w.Header().Set("X-Idempotency-Replayed", "true")
Expand Down
89 changes: 89 additions & 0 deletions internal/middleware/idempotency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package middleware

import (
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
)

// countingOrgHandler builds a test handler that increments calls on every
// request, answers 202 Accepted, and writes "doc-for-" followed by the value
// of the X-Vectorless-Org request header. The counter lets a test tell a
// cached replay (handler not invoked) from a real pass-through, and the body
// shows which org's request produced the response.
func countingOrgHandler(calls *atomic.Int64) http.Handler {
	serve := func(w http.ResponseWriter, r *http.Request) {
		calls.Add(1)
		org := r.Header.Get("X-Vectorless-Org")
		w.WriteHeader(http.StatusAccepted)
		// The write result is intentionally ignored in this test helper.
		_, _ = w.Write([]byte("doc-for-" + org))
	}
	return http.HandlerFunc(serve)
}

// post sends a bodiless POST for /v1/documents through mw and returns the
// recorder that captured the response. An empty org or key leaves the
// matching header (X-Vectorless-Org / Idempotency-Key) off the request.
func post(mw http.Handler, org, key string) *httptest.ResponseRecorder {
	request := httptest.NewRequest(http.MethodPost, "/v1/documents", nil)

	setIfPresent := func(name, value string) {
		if value != "" {
			request.Header.Set(name, value)
		}
	}
	setIfPresent("X-Vectorless-Org", org)
	setIfPresent("Idempotency-Key", key)

	recorder := httptest.NewRecorder()
	mw.ServeHTTP(recorder, request)
	return recorder
}

// body returns the response payload captured by rec as a string.
func body(rec *httptest.ResponseRecorder) string {
	payload := rec.Body
	return payload.String()
}

// TestIdempotencySameOrgReplays verifies that a repeated request from the same
// org with the same Idempotency-Key is answered from the cache: the wrapped
// handler runs once, the second response carries X-Idempotency-Replayed, and
// both responses have the same body.
func TestIdempotencySameOrgReplays(t *testing.T) {
	var calls atomic.Int64
	mw := Idempotency(IdempotencyConfig{})(countingOrgHandler(&calls))

	original := post(mw, "orgA", "k1")
	replay := post(mw, "orgA", "k1")

	if n := calls.Load(); n != 1 {
		t.Fatalf("handler invoked %d times, want 1 (second should replay)", n)
	}

	replayed := replay.Header().Get("X-Idempotency-Replayed")
	if replayed != "true" {
		t.Fatalf("second response missing replay header, got %q", replayed)
	}

	originalBody, replayBody := body(original), body(replay)
	if originalBody != replayBody {
		t.Fatalf("replay body %q != original %q", replayBody, originalBody)
	}
}

// TestIdempotencyDifferentOrgsDoNotCollide pins the cross-tenant guarantee:
// when two orgs send the same opaque Idempotency-Key, each request must reach
// the handler and receive its own response. Org B must never be served the
// response cached for org A.
func TestIdempotencyDifferentOrgsDoNotCollide(t *testing.T) {
	var calls atomic.Int64
	mw := Idempotency(IdempotencyConfig{})(countingOrgHandler(&calls))

	a := post(mw, "orgA", "shared-key")
	b := post(mw, "orgB", "shared-key")
	aBody, bBody := body(a), body(b)

	// Checks run top to bottom; the first failing one reports and stops.
	switch {
	case calls.Load() != 2:
		t.Fatalf("handler invoked %d times, want 2 (no cross-org collision)", calls.Load())
	case b.Header().Get("X-Idempotency-Replayed") == "true":
		t.Fatal("org B got a replayed response — cross-tenant leak")
	case aBody == bBody:
		t.Fatalf("org B received org A's body %q — cross-tenant leak", bBody)
	case bBody != "doc-for-orgB":
		t.Fatalf("org B body = %q, want doc-for-orgB", bBody)
	}
}

// TestIdempotencyNoKeyAlwaysPassesThrough verifies that requests without an
// Idempotency-Key header are never cached: two identical keyless requests
// both reach the wrapped handler.
func TestIdempotencyNoKeyAlwaysPassesThrough(t *testing.T) {
	var calls atomic.Int64
	mw := Idempotency(IdempotencyConfig{})(countingOrgHandler(&calls))

	const requests = 2
	for i := 0; i < requests; i++ {
		post(mw, "orgA", "")
	}

	if n := calls.Load(); n != 2 {
		t.Fatalf("handler invoked %d times, want 2 (no key = no caching)", n)
	}
}
15 changes: 15 additions & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ paths:
(field: "file") or JSON body with content. Returns immediately
with a document_id in "pending" state. The ingest pipeline runs
asynchronously: parse → build tree → summarize sections.

Pass an `Idempotency-Key` header to make ingestion idempotent: a
repeated request with the same key returns the original document
(and its current status) instead of creating a duplicate. Use this
to make a client/network retry of the upload safe.
parameters:
- name: Idempotency-Key
in: header
required: false
description: |
Opaque client-chosen key, unique per document. Repeating a
request with the same key returns the existing document rather
than ingesting a duplicate.
schema:
type: string
requestBody:
required: true
content:
Expand Down
9 changes: 9 additions & 0 deletions pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)

Expand All @@ -25,6 +26,10 @@ var migrationsFS embed.FS
// ErrNotFound signals a missing row. mapErr returns it in place of
// pgx.ErrNoRows; compare against it with errors.Is.
var ErrNotFound = errors.New("db: not found")

// ErrConflict signals a unique-constraint violation (SQLSTATE 23505), e.g.
// a second insert with the same (org_id, idempotency_key). mapErr returns it
// wrapped together with the name of the violated constraint, so match it with
// errors.Is rather than ==.
var ErrConflict = errors.New("db: conflict")

// Pool wraps *pgxpool.Pool with engine-specific helpers.
type Pool struct {
*pgxpool.Pool
Expand Down Expand Up @@ -136,5 +141,9 @@ func mapErr(err error) error {
if errors.Is(err, pgx.ErrNoRows) {
return ErrNotFound
}
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == "23505" {
return fmt.Errorf("%w: %s", ErrConflict, pgErr.ConstraintName)
}
return err
}
Loading
Loading