From ea9a58ab1942105b806473f5f6a0013bd79f0322 Mon Sep 17 00:00:00 2001 From: Nuno Cruces Date: Tue, 16 Dec 2025 15:36:13 +0000 Subject: [PATCH] Remove singleflight. --- litestream/cache.go | 46 ++++++++++++++++++++++----------------------- litestream/go.mod | 4 +--- litestream/go.sum | 4 ++-- litestream/vfs.go | 7 ++----- 4 files changed, 27 insertions(+), 34 deletions(-) diff --git a/litestream/cache.go b/litestream/cache.go index dae5de8..0f4c111 100644 --- a/litestream/cache.go +++ b/litestream/cache.go @@ -1,19 +1,18 @@ package litestream import ( - "encoding/binary" + "context" + "fmt" "sync" - "golang.org/x/sync/singleflight" - + "github.com/benbjohnson/litestream" "github.com/superfly/ltx" ) type pageCache struct { - single singleflight.Group - pages map[uint32]cachedPage // +checklocks:mtx - size int - mtx sync.Mutex + pages map[uint32]cachedPage // +checklocks:mtx + size int + mtx sync.Mutex } type cachedPage struct { @@ -21,37 +20,36 @@ type cachedPage struct { txid ltx.TXID } -func (c *pageCache) getOrFetch(pgno uint32, maxTXID ltx.TXID, fetch func() (any, error)) ([]byte, error) { - if c.size >= 0 { +func (c *pageCache) getOrFetch(ctx context.Context, client ReplicaClient, pgno uint32, elem ltx.PageIndexElem) ([]byte, error) { + if c.size > 0 { c.mtx.Lock() - if c.pages == nil { - c.pages = map[uint32]cachedPage{} - } page := c.pages[pgno] c.mtx.Unlock() - if page.txid == maxTXID { + if page.txid == elem.MaxTXID { return page.data, nil } } - var key [12]byte - binary.LittleEndian.PutUint32(key[0:], pgno) - binary.LittleEndian.PutUint64(key[4:], uint64(maxTXID)) - v, err, _ := c.single.Do(string(key[:]), fetch) - + h, data, err := litestream.FetchPage(ctx, client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size) if err != nil { - return nil, err + return nil, fmt.Errorf("fetch page: %w", err) + } + if pgno != h.Pgno { + return nil, fmt.Errorf("fetch page: want %d, got %d", pgno, h.Pgno) } - page := cachedPage{v.([]byte), maxTXID} - if c.size >= 0 { + if c.size > 0 { c.mtx.Lock() - c.evict(len(page.data)) - c.pages[pgno] = page + if c.pages != nil { + c.evict(len(data)) + } else { + c.pages = map[uint32]cachedPage{} + } + c.pages[pgno] = cachedPage{data, elem.MaxTXID} c.mtx.Unlock() } - return page.data, nil + return data, nil } // +checklocks:c.mtx diff --git a/litestream/go.mod b/litestream/go.mod index bb815f0..d9c1571 100644 --- a/litestream/go.mod +++ b/litestream/go.mod @@ -4,10 +4,9 @@ go 1.24.4 require ( github.com/benbjohnson/litestream v0.5.3 - github.com/ncruces/go-sqlite3 v0.30.3 + github.com/ncruces/go-sqlite3 v0.30.4-0.20251216123455-0b46e74ea69b github.com/ncruces/wbt v0.2.0 github.com/superfly/ltx v0.5.0 - golang.org/x/sync v0.19.0 ) // github.com/ncruces/go-sqlite3 @@ -34,7 +33,6 @@ require ( github.com/prometheus/procfs v0.19.2 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect - golang.org/x/crypto v0.46.0 // indirect google.golang.org/protobuf v1.36.11 // indirect ) diff --git a/litestream/go.sum b/litestream/go.sum index 1d3cfc6..9f3cda9 100644 --- a/litestream/go.sum +++ b/litestream/go.sum @@ -103,8 +103,8 @@ github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/ncruces/go-sqlite3 v0.30.3 h1:X/CgWW9GzmIAkEPrifhKqf0cC15DuOVxAJaHFTTAURQ= -github.com/ncruces/go-sqlite3 v0.30.3/go.mod h1:AxKu9sRxkludimFocbktlY6LiYSkxiI5gTA8r+os/Nw= +github.com/ncruces/go-sqlite3 v0.30.4-0.20251216123455-0b46e74ea69b h1:0HG7ul3Q1d/E/jrZpBTpzx4xhxJwMKuq5J4nuZIogm8= +github.com/ncruces/go-sqlite3 v0.30.4-0.20251216123455-0b46e74ea69b/go.mod h1:wz6IQnveXfqaXZozfhM8ciIJi2LRnnifBuBQarPDYo0= github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M= github.com/ncruces/julianday v1.0.0/go.mod h1:Dusn2KvZrrovOMJuOt0TNXL6tB7U2E8kvza5fFc9G7g= github.com/ncruces/litestream v0.5.3 h1:5qMwt99t2k0+AI1AGv7gY9QNkdVADWhzLp9zVcf2EGU= diff --git a/litestream/vfs.go b/litestream/vfs.go index a116dbf..1d12742 100644 --- a/litestream/vfs.go +++ b/litestream/vfs.go @@ -92,10 +92,7 @@ func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) { return 0, io.EOF } - data, err := f.db.cache.getOrFetch(pgno, elem.MaxTXID, func() (any, error) { - _, data, err := litestream.FetchPage(ctx, f.db.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size) - return data, err - }) + data, err := f.db.cache.getOrFetch(ctx, f.db.client, pgno, elem) if err != nil { f.db.opts.Logger.Error("fetch page", "error", err) return 0, err @@ -291,7 +288,7 @@ func (d *liteDB) buildIndex(ctx context.Context) error { defer d.mtx.Unlock() // Skip if we already have an index. - if d.pages != nil { + if !d.lastPoll.IsZero() { return nil }