-
Notifications
You must be signed in to change notification settings - Fork 5
fix(snapshot): make snapshot serving range-aware #356
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -320,7 +320,24 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, | |
| } | ||
| s.maybeBackgroundFetch(repo) | ||
|
|
||
| reader, headers, err := s.cache.Open(ctx, cacheKey) | ||
| // Forward only Range/If-Range here; If-Match/If-None-Match are evaluated | ||
| // against the served body below via CheckConditionals. | ||
| reader, headers, err := s.cache.Open(ctx, cacheKey, httputil.RangeOptions(r)...) | ||
| if errors.Is(err, cache.ErrRangeNotSatisfiable) { | ||
| // A failed If-Match (412) or satisfied If-None-Match (304) takes | ||
| // precedence over an unsatisfiable range (RFC 7232 §3, RFC 7233 §3.1). | ||
| if status := httputil.CheckConditionals(r, headers.Get(cache.ETagKey)); status != 0 { | ||
| w.WriteHeader(status) | ||
| return | ||
| } | ||
| w.Header().Set("Content-Type", "application/zstd") | ||
| w.Header().Set("Accept-Ranges", "bytes") | ||
| if cr := headers.Get("Content-Range"); cr != "" { | ||
| w.Header().Set("Content-Range", cr) | ||
| } | ||
| w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) | ||
| return | ||
| } | ||
| if err != nil && !errors.Is(err, os.ErrNotExist) { | ||
| logger.ErrorContext(ctx, "Failed to open snapshot from cache", "upstream", upstreamURL, "error", err) | ||
| http.Error(w, "Internal server error", http.StatusInternalServerError) | ||
|
|
@@ -517,23 +534,42 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h | |
| } | ||
|
|
||
| func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, r *http.Request, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL, repoName string, start time.Time) error { | ||
| snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit") | ||
| mirrorHead := s.getMirrorHead(ctx, repo) | ||
|
|
||
| // Forward the snapshot commit to the client so it knows whether the | ||
| // snapshot is fresh (no bundle URL = already at HEAD, skip freshen). | ||
| if snapshotCommit != "" { | ||
| w.Header().Set("X-Cachew-Snapshot-Commit", snapshotCommit) | ||
| // If-Match/If-None-Match are evaluated before serving any body: a satisfied | ||
| // If-None-Match (304) or a failed If-Match (412) takes precedence over a | ||
| // range response, so revalidating clients are not handed a 206 they would | ||
| // have to discard (RFC 7232 §3, RFC 7233 §3.1). | ||
| if status := httputil.CheckConditionals(r, headers.Get(cache.ETagKey)); status != 0 { | ||
| w.WriteHeader(status) | ||
| s.metrics.recordSnapshotServe(ctx, "cache", repoName, 0, time.Since(start)) | ||
| if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { | ||
| span.SetAttributes(attribute.String("cachew.source", "cache"), attribute.Int64("cachew.bytes", 0)) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| if snapshotCommit != "" && mirrorHead != "" && snapshotCommit != mirrorHead { | ||
| repoPath, err := gitclone.RepoPathFromURL(upstreamURL) | ||
| if err == nil { | ||
| bundleURL := fmt.Sprintf("/git/%s/snapshot.bundle?base=%s", repoPath, snapshotCommit) | ||
| w.Header().Set("X-Cachew-Bundle-Url", bundleURL) | ||
| // A satisfied byte range is served as-is. Bundle negotiation applies only to | ||
| // whole-snapshot downloads, so a partial read (e.g. a client.ParallelGet | ||
| // chunk) skips it and returns 206 directly. | ||
| if cr := headers.Get("Content-Range"); cr != "" { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When a client sends a satisfiable Useful? React with 👍 / 👎. |
||
| applySnapshotCacheHeaders(w, headers) | ||
| w.Header().Set("Accept-Ranges", "bytes") | ||
| w.Header().Set("Content-Range", cr) | ||
| // Ranged clients (client.ParallelGet) read the freshen metadata from the | ||
| // discovery chunk, so it must be present on partial responses too. | ||
| s.setSnapshotMetadataHeaders(ctx, w, headers, repo, upstreamURL) | ||
| w.WriteHeader(http.StatusPartialContent) | ||
| n, err := io.Copy(w, reader) | ||
| s.metrics.recordSnapshotServe(ctx, "cache_range", repoName, n, time.Since(start)) | ||
| if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { | ||
| span.SetAttributes(attribute.String("cachew.source", "cache_range"), attribute.Int64("cachew.bytes", n)) | ||
| } | ||
| return errors.Wrap(err, "stream snapshot range") | ||
| } | ||
|
|
||
| snapshotCommit, bundleURL := s.setSnapshotMetadataHeaders(ctx, w, headers, repo, upstreamURL) | ||
|
|
||
| // Proactively generate and cache the bundle so any pod can serve it. | ||
| // Proactively generate and cache the advertised bundle so any pod can serve it. | ||
| if bundleURL != "" { | ||
| go func() { | ||
| bgCtx := context.WithoutCancel(ctx) | ||
| logger := logging.FromContext(bgCtx) | ||
|
|
@@ -550,25 +586,43 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW | |
| } | ||
|
|
||
| applySnapshotCacheHeaders(w, headers) | ||
| w.Header().Set("Accept-Ranges", "bytes") | ||
|
|
||
| // Honour conditional GETs against the advertised ETag. ServeContent does this | ||
| // natively for *os.File readers, but cache backends returning non-file readers | ||
| // (S3, memory, remote) fall through to io.Copy, so revalidate explicitly to | ||
| // avoid streaming the full snapshot when the client already has it. | ||
| var n int64 | ||
| var err error | ||
| if status := httputil.CheckConditionals(r, headers.Get(cache.ETagKey)); status != 0 { | ||
| w.WriteHeader(status) | ||
| } else { | ||
| n, err = serveReaderFast(w, r, reader) | ||
| } | ||
| n, err := serveReaderFast(w, r, reader) | ||
| s.metrics.recordSnapshotServe(ctx, "cache", repoName, n, time.Since(start)) | ||
| if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { | ||
| span.SetAttributes(attribute.String("cachew.source", "cache"), attribute.Int64("cachew.bytes", n)) | ||
| } | ||
| return errors.Wrap(err, "stream snapshot") | ||
| } | ||
|
|
||
| // setSnapshotMetadataHeaders advertises the snapshot's commit and, when the | ||
| // snapshot trails the mirror's HEAD, the delta-bundle URL clients use to | ||
| // fast-forward. Shared by the full and ranged serve paths so ranged clients | ||
| // (client.ParallelGet) receive the same freshen metadata on the discovery | ||
| // chunk. It returns the snapshot commit and the bundle URL it set (empty when | ||
| // the snapshot is already at HEAD), so callers can decide whether to | ||
| // pre-generate the bundle. | ||
| func (s *Strategy) setSnapshotMetadataHeaders(ctx context.Context, w http.ResponseWriter, headers http.Header, repo *gitclone.Repository, upstreamURL string) (snapshotCommit, bundleURL string) { | ||
| snapshotCommit = headers.Get("X-Cachew-Snapshot-Commit") | ||
| if snapshotCommit == "" { | ||
| return "", "" | ||
| } | ||
| w.Header().Set("X-Cachew-Snapshot-Commit", snapshotCommit) | ||
|
|
||
| mirrorHead := s.getMirrorHead(ctx, repo) | ||
| if mirrorHead == "" || snapshotCommit == mirrorHead { | ||
| return snapshotCommit, "" | ||
| } | ||
| repoPath, err := gitclone.RepoPathFromURL(upstreamURL) | ||
| if err != nil { | ||
| return snapshotCommit, "" | ||
| } | ||
| bundleURL = fmt.Sprintf("/git/%s/snapshot.bundle?base=%s", repoPath, snapshotCommit) | ||
| w.Header().Set("X-Cachew-Bundle-Url", bundleURL) | ||
| return snapshotCommit, bundleURL | ||
| } | ||
|
|
||
| // applySnapshotCacheHeaders forwards the cached snapshot's validators so clients | ||
| // can revalidate (ETag) and size the transfer (Content-Length). Content-Type is | ||
| // fixed for snapshots regardless of what the cache backend recorded. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a GET or ranged GET includes a matching
If-None-Match, this early return writes the 304 beforeapplySnapshotCacheHeadersorsetSnapshotMetadataHeadersrun. The previous GET path populated the ETag andX-Cachew-*headers before the conditional status, and stale snapshots still needX-Cachew-Bundle-Urlso a client revalidating a cached tarball can apply the delta instead of treating that cached snapshot as current.Useful? React with 👍 / 👎.