From 1505af60d2781d2105e0a213a2450fee6e600baa Mon Sep 17 00:00:00 2001 From: Ilya Yakelzon Date: Tue, 16 Jun 2026 10:19:35 +0200 Subject: [PATCH 1/4] perf: pre-size reads and skip redundant copy to cut sync memory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Syncing a large source (e.g. the ~75MB MaxMind GeoLite2 mmdb that getlantern/geo pulls through FromTarGz(FromWeb(...)) into a ToChannel + ToFile pair) was buffering the whole payload with io.ReadAll up to three times per sync — once extracting the tarball, once in syncOnce, and once more in the byteChannel sink. io.ReadAll grows its buffer by repeated reallocation, so each of those reads churns through a sequence of ever-larger backing arrays before settling, and the byte-channel read was an entirely redundant second full copy of data the Runner already held. On 1GB proxy/VPS hosts this transient churn dominated RSS and tripped the "host memory >85%" alert. Two changes: - readAll(): reads into a buffer pre-sized from the reader's known length (HTTP Content-Length, archive entry size, or an in-memory reader's Len()), turning a multi-realloc read into a single allocation. webSource and tarGzSource now return size-aware readers so syncOnce benefits too. - bytesConsumer fast-path: byteChannel takes the payload the Runner already buffered directly instead of reading it into a second copy. The slice is read-only and shared across sinks (documented on bytesConsumer). No public API change. Adds read.go + tests. --- keepcurrent.go | 17 ++++++++++---- read.go | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++ read_test.go | 56 +++++++++++++++++++++++++++++++++++++++++++ sink.go | 21 +++++++++++++---- source.go | 15 +++++++++--- 5 files changed, 161 insertions(+), 12 deletions(-) create mode 100644 read.go create mode 100644 read_test.go diff --git a/keepcurrent.go b/keepcurrent.go index 9d816ae..b10b10a 100644 --- a/keepcurrent.go +++ b/keepcurrent.go @@ -7,7 +7,6 @@ import ( "bytes" "errors" "io" - "io/ioutil" "time" ) @@ -103,8 +102,10 @@ func (runner *Runner) syncOnce(from Source, chStop chan struct{}) { return } if err == nil { - // Read ahead to surface any error reading from the source - data, err = ioutil.ReadAll(rc) + // Read ahead to surface any error reading from the source. readAll + // pre-sizes its buffer when the source reports a length, avoiding the + // reallocation churn io.ReadAll incurs on large payloads. + data, err = readAll(rc) rc.Close() } if err == nil { @@ -125,7 +126,15 @@ func (runner *Runner) syncOnce(from Source, chStop chan struct{}) { } } for _, s := range runner.sinks { - if err := s.UpdateFrom(bytes.NewReader(data)); err != nil { + var err error + if bc, ok := s.(bytesConsumer); ok { + // Hand the already-buffered payload over directly rather than making + // the sink read it into a second full copy. + err = bc.updateFromBytes(data) + } else { + err = s.UpdateFrom(bytes.NewReader(data)) + } + if err != nil { runner.OnSinkError(s, err) } } diff --git a/read.go b/read.go new file mode 100644 index 0000000..c169fb6 --- /dev/null +++ b/read.go @@ -0,0 +1,64 @@ +package keepcurrent + +import ( + "bytes" + "io" +) + +// readAll reads r to EOF into a single buffer. It differs from io.ReadAll only +// in that, when r can report how many bytes remain, it allocates that buffer up +// front. io.ReadAll grows its buffer by repeatedly appending and reallocating, +// so reading an N-byte payload churns through a sequence of ever-larger backing +// arrays (N/2, 3N/4, N, ...) that all become garbage. For the multi-megabyte +// payloads keepcurrent is built to sync (e.g. a ~75MB MaxMind database) that +// transient churn dominates memory on small hosts. Pre-sizing turns the read +// into a single allocation. +func readAll(r io.Reader) ([]byte, error) { + if n, ok := knownSize(r); ok && n >= 0 { + // +bytes.MinRead leaves room for the final zero-byte read that signals + // EOF, so an exactly-sized payload never forces ReadFrom to reallocate. + buf := bytes.NewBuffer(make([]byte, 0, n+bytes.MinRead)) + _, err := buf.ReadFrom(r) + return buf.Bytes(), err + } + return io.ReadAll(r) +} + +// knownSize reports the number of bytes remaining in r when r can tell us. It +// recognises the sized readers keepcurrent constructs internally (sizedReadCloser) +// as well as the standard in-memory readers (*bytes.Reader, *bytes.Buffer, +// *strings.Reader) whose Len() reports the unread remainder. +func knownSize(r io.Reader) (int64, bool) { + switch v := r.(type) { + case interface{ size() int64 }: + return v.size(), true + case interface{ Len() int }: + return int64(v.Len()), true + } + return 0, false +} + +// sizedReadCloser couples a ReadCloser with the total number of bytes it will +// yield, so readAll can pre-size its buffer. keepcurrent wraps HTTP bodies +// (Content-Length) and extracted archive entries in this. +type sizedReadCloser struct { + io.ReadCloser + n int64 +} + +func (s sizedReadCloser) size() int64 { return s.n } + +// bytesReadCloser wraps an in-memory payload in a size-aware ReadCloser. +func bytesReadCloser(b []byte) io.ReadCloser { + return sizedReadCloser{ReadCloser: io.NopCloser(bytes.NewReader(b)), n: int64(len(b))} +} + +// bytesConsumer is an optional optimization a Sink may implement to receive the +// payload the Runner has already buffered, instead of being handed an io.Reader +// that it would have to read into a second full copy. The Runner does not retain +// or reuse the slice after the call, so a single sink may take ownership; when a +// caller wires up multiple sinks they share one backing array, so consumers must +// treat the bytes as read-only. +type bytesConsumer interface { + updateFromBytes(b []byte) error +} diff --git a/read_test.go b/read_test.go new file mode 100644 index 0000000..a5acfe8 --- /dev/null +++ b/read_test.go @@ -0,0 +1,56 @@ +package keepcurrent + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// countingReader wraps a reader and records the largest single Read it was +// asked to fill, which lets us distinguish a pre-sized read (one big Read) from +// io.ReadAll's incremental growth. +type unsizedReader struct{ r io.Reader } + +func (u *unsizedReader) Read(p []byte) (int, error) { return u.r.Read(p) } + +func TestReadAllPreSizesKnownReaders(t *testing.T) { + payload := bytes.Repeat([]byte("x"), 1<<20) // 1 MiB + + // *bytes.Reader reports Len(), so readAll should allocate exactly once and + // not overshoot the way io.ReadAll's doubling does. + got, err := readAll(bytes.NewReader(payload)) + require.NoError(t, err) + assert.Equal(t, payload, got) + assert.Equalf(t, len(payload)+bytes.MinRead, cap(got), + "buffer for a sized reader should be pre-allocated to the payload size, not grown") + + // sizedReadCloser (what the web/tar.gz sources return) is also recognised. + got, err = readAll(bytesReadCloser(payload)) + require.NoError(t, err) + assert.Equal(t, payload, got) + assert.Equal(t, len(payload)+bytes.MinRead, cap(got)) +} + +func TestReadAllFallsBackForUnsizedReaders(t *testing.T) { + payload := bytes.Repeat([]byte("y"), 4096) + // An opaque reader exposes no size; readAll must still return the full data. + got, err := readAll(&unsizedReader{bytes.NewReader(payload)}) + require.NoError(t, err) + assert.Equal(t, payload, got) +} + +func TestKnownSize(t *testing.T) { + n, ok := knownSize(bytes.NewReader(make([]byte, 42))) + assert.True(t, ok) + assert.EqualValues(t, 42, n) + + n, ok = knownSize(bytesReadCloser(make([]byte, 7))) + assert.True(t, ok) + assert.EqualValues(t, 7, n) + + _, ok = knownSize(&unsizedReader{}) + assert.False(t, ok) +} diff --git a/sink.go b/sink.go index 7dbdb59..cae2198 100644 --- a/sink.go +++ b/sink.go @@ -73,7 +73,22 @@ func ToChannel(ch chan []byte) Sink { return &byteChannel{ch} } -func (s *byteChannel) UpdateFrom(r io.Reader) (err error) { +func (s *byteChannel) UpdateFrom(r io.Reader) error { + b, err := readAll(r) + if err != nil { + return err + } + return s.send(b) +} + +// updateFromBytes implements bytesConsumer: the Runner hands us the payload it +// already buffered, so we forward it to the channel without a redundant copy. +// See bytesConsumer for the read-only ownership contract. +func (s *byteChannel) updateFromBytes(b []byte) error { + return s.send(b) +} + +func (s *byteChannel) send(b []byte) (err error) { // The channel is owned by the caller (see ToChannel) and may be closed // concurrently — e.g. on shutdown or config reload — while a Runner is // mid-sync. A send on a closed channel panics, which would crash the whole @@ -89,10 +104,6 @@ func (s *byteChannel) UpdateFrom(r io.Reader) (err error) { panic(rec) } }() - b, err := ioutil.ReadAll(r) - if err != nil { - return err - } s.ch <- b return nil } diff --git a/source.go b/source.go index ea0bd5a..60ca7c0 100644 --- a/source.go +++ b/source.go @@ -58,6 +58,10 @@ func (s *webSource) Fetch(ifNewerThan time.Time) (io.ReadCloser, error) { if etag != "" { s.setETag(etag) } + if resp.ContentLength >= 0 { + // Surface the Content-Length so the Runner can pre-size its read buffer. + return sizedReadCloser{ReadCloser: resp.Body, n: resp.ContentLength}, nil + } return resp.Body, nil } @@ -112,17 +116,22 @@ func (s *tarGzSource) Fetch(ifNewerThan time.Time) (io.ReadCloser, error) { return err } defer f.Close() - buf, err = io.ReadAll(f) - if err != nil { + // Pre-size the buffer from the archive entry's uncompressed size so + // extracting a large file (e.g. a ~75MB mmdb) is a single allocation + // rather than the reallocation churn of io.ReadAll. + bb := bytes.NewBuffer(make([]byte, 0, info.Size()+bytes.MinRead)) + if _, err := bb.ReadFrom(f); err != nil { return err } + buf = bb.Bytes() return errFound } return nil }) if errors.Is(err, errFound) { - return io.NopCloser(bytes.NewReader(buf)), nil + // Return a size-aware reader so the Runner's read can also be pre-sized. + return bytesReadCloser(buf), nil } if err != nil { return nil, err From 9cd04f52815eb93f55e60ba0bdea0411490bd118 Mon Sep 17 00:00:00 2001 From: Ilya Yakelzon Date: Tue, 16 Jun 2026 10:43:06 +0200 Subject: [PATCH 2/4] =?UTF-8?q?review:=20address=20Copilot=20=E2=80=94=20o?= =?UTF-8?q?verflow=20guard,=20reuse=20readAll=20in=20tarGz,=20doc/comment?= =?UTF-8?q?=20fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - readAll: cap pre-allocation at 256 MiB and convert to int, so a bogus size can't panic make() (falls back to io.ReadAll). The original n+bytes.MinRead did compile — bytes.MinRead is an untyped constant — but the guard hardens against hostile/garbage sizes. - tarGzSource: reuse readAll(sizedReadCloser{...}) instead of a bespoke bytes.Buffer, inheriting the same pre-size + overflow handling. - bytesConsumer doc: state plainly that the slice is read-only (Runner may share it across sinks); consumers must copy to retain/mutate. - read_test: fix stale countingReader comment on unsizedReader. --- read.go | 19 +++++++++++++------ read_test.go | 5 ++--- source.go | 13 ++++++------- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/read.go b/read.go index c169fb6..264ba3b 100644 --- a/read.go +++ b/read.go @@ -5,6 +5,14 @@ import ( "io" ) +// maxPreAlloc caps how much readAll will allocate up front from a reader's +// self-reported size. It guards against a bogus or hostile size (e.g. a wildly +// inflated HTTP Content-Length) turning into a giant make() that panics or +// OOMs; anything larger falls back to io.ReadAll, which grows against the bytes +// actually delivered. The bound sits comfortably above the payloads keepcurrent +// syncs in practice (a MaxMind database is well under 100MB). +const maxPreAlloc = 256 << 20 // 256 MiB + // readAll reads r to EOF into a single buffer. It differs from io.ReadAll only // in that, when r can report how many bytes remain, it allocates that buffer up // front. io.ReadAll grows its buffer by repeatedly appending and reallocating, @@ -14,10 +22,10 @@ import ( // transient churn dominates memory on small hosts. Pre-sizing turns the read // into a single allocation. func readAll(r io.Reader) ([]byte, error) { - if n, ok := knownSize(r); ok && n >= 0 { + if n, ok := knownSize(r); ok && n >= 0 && n <= maxPreAlloc { // +bytes.MinRead leaves room for the final zero-byte read that signals // EOF, so an exactly-sized payload never forces ReadFrom to reallocate. - buf := bytes.NewBuffer(make([]byte, 0, n+bytes.MinRead)) + buf := bytes.NewBuffer(make([]byte, 0, int(n)+bytes.MinRead)) _, err := buf.ReadFrom(r) return buf.Bytes(), err } @@ -55,10 +63,9 @@ func bytesReadCloser(b []byte) io.ReadCloser { // bytesConsumer is an optional optimization a Sink may implement to receive the // payload the Runner has already buffered, instead of being handed an io.Reader -// that it would have to read into a second full copy. The Runner does not retain -// or reuse the slice after the call, so a single sink may take ownership; when a -// caller wires up multiple sinks they share one backing array, so consumers must -// treat the bytes as read-only. +// it would have to read into a second full copy. The Runner may pass the same +// slice to several sinks, so the bytes MUST be treated as read-only; a consumer +// that needs to retain or mutate them must make its own copy. type bytesConsumer interface { updateFromBytes(b []byte) error } diff --git a/read_test.go b/read_test.go index a5acfe8..5b5bdd5 100644 --- a/read_test.go +++ b/read_test.go @@ -9,9 +9,8 @@ import ( "github.com/stretchr/testify/require" ) -// countingReader wraps a reader and records the largest single Read it was -// asked to fill, which lets us distinguish a pre-sized read (one big Read) from -// io.ReadAll's incremental growth. +// unsizedReader hides any size its wrapped reader might otherwise expose (it +// implements only Read), forcing readAll down the io.ReadAll fallback path. type unsizedReader struct{ r io.Reader } func (u *unsizedReader) Read(p []byte) (int, error) { return u.r.Read(p) } diff --git a/source.go b/source.go index 60ca7c0..84eb6c1 100644 --- a/source.go +++ b/source.go @@ -1,7 +1,6 @@ package keepcurrent import ( - "bytes" "context" "errors" "fmt" @@ -116,14 +115,14 @@ func (s *tarGzSource) Fetch(ifNewerThan time.Time) (io.ReadCloser, error) { return err } defer f.Close() - // Pre-size the buffer from the archive entry's uncompressed size so - // extracting a large file (e.g. a ~75MB mmdb) is a single allocation - // rather than the reallocation churn of io.ReadAll. - bb := bytes.NewBuffer(make([]byte, 0, info.Size()+bytes.MinRead)) - if _, err := bb.ReadFrom(f); err != nil { + // Wrap the entry in a size-aware reader so readAll pre-sizes the + // buffer from the archive entry's uncompressed size — extracting a + // large file (e.g. a ~75MB mmdb) becomes a single allocation rather + // than the reallocation churn of io.ReadAll. + buf, err = readAll(sizedReadCloser{ReadCloser: f, n: info.Size()}) + if err != nil { return err } - buf = bb.Bytes() return errFound } return nil From 8cf4c4a07dab32369bdade438146ff60bd38e093 Mon Sep 17 00:00:00 2001 From: Ilya Yakelzon Date: Tue, 16 Jun 2026 11:01:34 +0200 Subject: [PATCH 3/4] review: close resp.Body on non-success, clarify bytesConsumer is internal, test cap fallback - webSource.Fetch now closes resp.Body on the 304 and non-200 early returns so connections are released for keep-alive reuse. - bytesConsumer doc: state it's an internal optimization for keepcurrent's own sinks; the unexported method means external sinks can't implement it and always use Sink.UpdateFrom. - Add TestReadAllFallsBackWhenSizeExceedsCap covering a reader that reports a size > maxPreAlloc while delivering a small payload. --- read.go | 13 ++++++++----- read_test.go | 18 ++++++++++++++++++ source.go | 4 ++++ 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/read.go b/read.go index 264ba3b..7cb606d 100644 --- a/read.go +++ b/read.go @@ -61,11 +61,14 @@ func bytesReadCloser(b []byte) io.ReadCloser { return sizedReadCloser{ReadCloser: io.NopCloser(bytes.NewReader(b)), n: int64(len(b))} } -// bytesConsumer is an optional optimization a Sink may implement to receive the -// payload the Runner has already buffered, instead of being handed an io.Reader -// it would have to read into a second full copy. The Runner may pass the same -// slice to several sinks, so the bytes MUST be treated as read-only; a consumer -// that needs to retain or mutate them must make its own copy. +// bytesConsumer is an internal optimization implemented by keepcurrent's own +// sinks (currently byteChannel). When a sink implements it, the Runner hands +// over the payload it has already buffered instead of an io.Reader the sink +// would read into a second full copy. The method is unexported by design, so +// sinks defined outside this package cannot implement it and always go through +// the Sink.UpdateFrom path. The Runner may pass the same slice to several sinks, +// so the bytes MUST be treated as read-only; a consumer that needs to retain or +// mutate them must make its own copy. type bytesConsumer interface { updateFromBytes(b []byte) error } diff --git a/read_test.go b/read_test.go index 5b5bdd5..7bdc2f1 100644 --- a/read_test.go +++ b/read_test.go @@ -41,6 +41,24 @@ func TestReadAllFallsBackForUnsizedReaders(t *testing.T) { assert.Equal(t, payload, got) } +func TestReadAllFallsBackWhenSizeExceedsCap(t *testing.T) { + // A reader that reports a huge size (e.g. a bogus/hostile Content-Length) + // but only delivers a small payload. readAll must not attempt the giant + // pre-allocation; it should fall back to io.ReadAll and still return the + // real bytes. + payload := bytes.Repeat([]byte("z"), 1024) + r := sizedReadCloser{ReadCloser: io.NopCloser(bytes.NewReader(payload)), n: maxPreAlloc + 1} + + n, ok := knownSize(r) + require.True(t, ok) + require.Greater(t, n, int64(maxPreAlloc)) + + got, err := readAll(r) + require.NoError(t, err) + assert.Equal(t, payload, got) + assert.LessOrEqual(t, cap(got), maxPreAlloc, "must not pre-allocate the reported (bogus) size") +} + func TestKnownSize(t *testing.T) { n, ok := knownSize(bytes.NewReader(make([]byte, 42))) assert.True(t, ok) diff --git a/source.go b/source.go index 84eb6c1..a87dc08 100644 --- a/source.go +++ b/source.go @@ -48,9 +48,13 @@ func (s *webSource) Fetch(ifNewerThan time.Time) (io.ReadCloser, error) { return nil, err } if resp.StatusCode == http.StatusNotModified { + // Drain+close so the connection can be reused for keep-alive; we return + // the body to the caller only on the success path below. + resp.Body.Close() return nil, ErrUnmodified } if resp.StatusCode != http.StatusOK { + resp.Body.Close() return nil, fmt.Errorf("unexpected HTTP status %v", resp.StatusCode) } etag := resp.Header.Get("ETag") From 117d2aaed5caa44246f3329423a0a408455dbf7b Mon Sep 17 00:00:00 2001 From: Ilya Yakelzon Date: Tue, 16 Jun 2026 11:12:14 +0200 Subject: [PATCH 4/4] review: drop no-copy fast-path, drain bodies before close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove the bytesConsumer fast-path entirely. It forwarded the Runner's shared buffer to an external channel, dropping ToChannel's independent- ownership guarantee (a receiver mutating or concurrently reading the slice could corrupt other sinks). byteChannel.UpdateFrom now copies again — but via readAll, so it's a single pre-sized allocation. The memory spike came from io.ReadAll's doubling-realloc churn, which pre-sizing fixes; the copy-elimination was marginal and not worth the semantic change. - webSource.Fetch: drain bodies to EOF before closing on the 304 / non-200 paths (via drainClose) so net/http can actually reuse the connection. Comment now matches behavior. --- keepcurrent.go | 10 +--------- read.go | 12 ------------ sink.go | 26 ++++++++++---------------- source.go | 18 ++++++++++++++---- 4 files changed, 25 insertions(+), 41 deletions(-) diff --git a/keepcurrent.go b/keepcurrent.go index b10b10a..f5b1381 100644 --- a/keepcurrent.go +++ b/keepcurrent.go @@ -126,15 +126,7 @@ func (runner *Runner) syncOnce(from Source, chStop chan struct{}) { } } for _, s := range runner.sinks { - var err error - if bc, ok := s.(bytesConsumer); ok { - // Hand the already-buffered payload over directly rather than making - // the sink read it into a second full copy. - err = bc.updateFromBytes(data) - } else { - err = s.UpdateFrom(bytes.NewReader(data)) - } - if err != nil { + if err := s.UpdateFrom(bytes.NewReader(data)); err != nil { runner.OnSinkError(s, err) } } diff --git a/read.go b/read.go index 7cb606d..6e4fc78 100644 --- a/read.go +++ b/read.go @@ -60,15 +60,3 @@ func (s sizedReadCloser) size() int64 { return s.n } func bytesReadCloser(b []byte) io.ReadCloser { return sizedReadCloser{ReadCloser: io.NopCloser(bytes.NewReader(b)), n: int64(len(b))} } - -// bytesConsumer is an internal optimization implemented by keepcurrent's own -// sinks (currently byteChannel). When a sink implements it, the Runner hands -// over the payload it has already buffered instead of an io.Reader the sink -// would read into a second full copy. The method is unexported by design, so -// sinks defined outside this package cannot implement it and always go through -// the Sink.UpdateFrom path. The Runner may pass the same slice to several sinks, -// so the bytes MUST be treated as read-only; a consumer that needs to retain or -// mutate them must make its own copy. -type bytesConsumer interface { - updateFromBytes(b []byte) error -} diff --git a/sink.go b/sink.go index cae2198..38c899d 100644 --- a/sink.go +++ b/sink.go @@ -73,22 +73,7 @@ func ToChannel(ch chan []byte) Sink { return &byteChannel{ch} } -func (s *byteChannel) UpdateFrom(r io.Reader) error { - b, err := readAll(r) - if err != nil { - return err - } - return s.send(b) -} - -// updateFromBytes implements bytesConsumer: the Runner hands us the payload it -// already buffered, so we forward it to the channel without a redundant copy. -// See bytesConsumer for the read-only ownership contract. -func (s *byteChannel) updateFromBytes(b []byte) error { - return s.send(b) -} - -func (s *byteChannel) send(b []byte) (err error) { +func (s *byteChannel) UpdateFrom(r io.Reader) (err error) { // The channel is owned by the caller (see ToChannel) and may be closed // concurrently — e.g. on shutdown or config reload — while a Runner is // mid-sync. A send on a closed channel panics, which would crash the whole @@ -104,6 +89,15 @@ func (s *byteChannel) send(b []byte) (err error) { panic(rec) } }() + // readAll pre-sizes from the reader's length (the Runner hands us a + // *bytes.Reader), so this copy is a single allocation. We deliberately copy + // rather than forward the Runner's buffer: ToChannel's contract is that each + // delivered slice is independently owned, so consumers may retain or mutate + // it freely. + b, err := readAll(r) + if err != nil { + return err + } s.ch <- b return nil } diff --git a/source.go b/source.go index a87dc08..a0251cf 100644 --- a/source.go +++ b/source.go @@ -21,6 +21,14 @@ type webSource struct { client *http.Client } +// drainClose discards any remaining body and closes it. net/http only returns a +// connection to the keep-alive pool once its response body has been read to EOF, +// so error/not-modified responses we don't hand to the caller must be drained. +func drainClose(rc io.ReadCloser) { + _, _ = io.Copy(io.Discard, rc) + _ = rc.Close() +} + // FromWeb constructs a source from the given URL. func FromWeb(url string) Source { return FromWebWithClient(url, http.DefaultClient) @@ -48,13 +56,15 @@ func (s *webSource) Fetch(ifNewerThan time.Time) (io.ReadCloser, error) { return nil, err } if resp.StatusCode == http.StatusNotModified { - // Drain+close so the connection can be reused for keep-alive; we return - // the body to the caller only on the success path below. - resp.Body.Close() + // Drain to EOF then close so net/http can return the connection to the + // pool for keep-alive reuse (it won't reuse one whose body wasn't fully + // read). 304 carries no body, so this is effectively just a close here. + // We hand the body to the caller only on the success path below. + drainClose(resp.Body) return nil, ErrUnmodified } if resp.StatusCode != http.StatusOK { - resp.Body.Close() + drainClose(resp.Body) return nil, fmt.Errorf("unexpected HTTP status %v", resp.StatusCode) } etag := resp.Header.Get("ETag")