diff --git a/keepcurrent_test.go b/keepcurrent_test.go index 5cfd116..a68c056 100644 --- a/keepcurrent_test.go +++ b/keepcurrent_test.go @@ -6,8 +6,11 @@ import ( "compress/gzip" "crypto/rand" "encoding/json" + "fmt" "io" "io/ioutil" + "net/http" + "net/http/httptest" "os" "strings" "sync/atomic" @@ -103,7 +106,13 @@ func writeTempFile(t *testing.T, b []byte) (string, []byte) { func TestUpdateFromWeb(t *testing.T) { ch := make(chan []byte) - url := "https://httpbin.org/get" + // Serve an httpbin-/get-shaped body locally so the test stays hermetic — + // the live httpbin.org dependency intermittently hung and timed out CI. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, `{"url": %q}`, "http://"+r.Host+r.URL.String()) + })) + defer srv.Close() + url := srv.URL + "/get" runner := New(FromWeb(url), ToChannel(ch)) runner.OnSourceError = func(err error, tries int) time.Duration { assert.Fail(t, "unexpected source error "+err.Error()) diff --git a/read_test.go b/read_test.go index 7bdc2f1..8d6f252 100644 --- a/read_test.go +++ b/read_test.go @@ -3,12 +3,59 @@ package keepcurrent import ( "bytes" "io" + "os" + "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +// FromFile (no preprocessor) must hand back a size-aware reader so the Runner's +// readAll pre-sizes its buffer rather than falling back to io.ReadAll's +// doubling growth — the cached-mmdb-at-startup case that regressed otherwise. +func TestFileSourceIsSizeAware(t *testing.T) { + payload := bytes.Repeat([]byte("m"), 3*1024*1024) // 3 MiB + path := filepath.Join(t.TempDir(), "cached.bin") + require.NoError(t, os.WriteFile(path, payload, 0o644)) + + rc, err := FromFile(path).Fetch(time.Time{}) + require.NoError(t, err) + defer rc.Close() + + n, ok := knownSize(rc) + assert.True(t, ok, "FromFile reader should report its size") + assert.EqualValues(t, len(payload), n) + + got, err := readAll(rc) + require.NoError(t, err) + assert.Equal(t, payload, got) + assert.LessOrEqual(t, cap(got), len(payload)+bytes.MinRead, "must be a single pre-sized allocation, not grown") +} + +// Fetch must honour its "only if modified since" contract: when ifNewerThan is +// at or after the file's modtime, it returns ErrUnmodified and leaves no reader +// (hence no leaked descriptor) for the caller to close. +func TestFileSourceUnmodified(t *testing.T) { + path := filepath.Join(t.TempDir(), "cached.bin") + require.NoError(t, os.WriteFile(path, []byte("payload"), 0o644)) + + fi, err := os.Stat(path) + require.NoError(t, err) + + // A cutoff strictly after the modtime => unmodified. + rc, err := FromFile(path).Fetch(fi.ModTime().Add(time.Hour)) + assert.ErrorIs(t, err, ErrUnmodified) + assert.Nil(t, rc) + + // A cutoff strictly before the modtime => the file is newer, so we fetch. + rc, err = FromFile(path).Fetch(fi.ModTime().Add(-time.Hour)) + require.NoError(t, err) + require.NotNil(t, rc) + require.NoError(t, rc.Close()) +} + // unsizedReader hides any size its wrapped reader might otherwise expose (it // implements only Read), forcing readAll down the io.ReadAll fallback path. type unsizedReader struct{ r io.Reader } diff --git a/source.go b/source.go index a0251cf..907a262 100644 --- a/source.go +++ b/source.go @@ -157,6 +157,22 @@ type fileSource struct { preprocessor func(io.ReadCloser) (io.ReadCloser, error) } +// fileBackedReadCloser ties a preprocessed stream's lifetime to the file it was +// derived from, closing both when Close is called. A double close of the file +// (when the preprocessor's stream already closes it) is tolerated. +type fileBackedReadCloser struct { + io.ReadCloser + f *os.File +} + +func (r fileBackedReadCloser) Close() error { + err := r.ReadCloser.Close() + if cerr := r.f.Close(); cerr != nil && !errors.Is(cerr, os.ErrClosed) && err == nil { + err = cerr + } + return err +} + // FromFile constructs a source from the given file path. func FromFile(path string) Source { return &fileSource{path, nil} @@ -174,18 +190,32 @@ func (s *fileSource) Fetch(ifNewerThan time.Time) (io.ReadCloser, error) { } fi, err := f.Stat() if err != nil { + f.Close() return nil, err } - if !ifNewerThan.IsZero() && ifNewerThan.Before(fi.ModTime()) { + // Fetch's contract is "fetch only if modified since ifNewerThan", so we + // signal ErrUnmodified when the file is no newer than the cutoff — mirroring + // byteSource.Fetch in the tests. (Earlier this comparison was inverted.) + if !ifNewerThan.IsZero() && !fi.ModTime().After(ifNewerThan) { + f.Close() return nil, ErrUnmodified } - var result io.ReadCloser - result = f if s.preprocessor != nil { - result, err = s.preprocessor(f) + // A preprocessor may change the byte count, so we can't declare a size + // up front; readAll falls back to io.ReadAll for the preprocessed stream. + result, err := s.preprocessor(f) if err != nil { + f.Close() return nil, err } - } - return result, nil + // The preprocessor may hand back a stream that doesn't close the file it + // wraps (e.g. io.NopCloser), so couple Close to the underlying file to + // avoid leaking the descriptor. + return fileBackedReadCloser{ReadCloser: result, f: f}, nil + } + // Surface the file size so the Runner's readAll pre-sizes its buffer instead + // of growing by reallocation. The cached payloads read here are exactly the + // large ones that churn — e.g. geo loads a ~75MB MaxMind mmdb from disk via + // InitFrom(FromFile(...)) at startup. + return sizedReadCloser{ReadCloser: f, n: fi.Size()}, nil }