Remove singleflight.

This commit is contained in:
Nuno Cruces
2025-12-16 15:36:13 +00:00
parent 0b46e74ea6
commit ea9a58ab19
4 changed files with 27 additions and 34 deletions

View File

@@ -1,19 +1,18 @@
package litestream
import (
"encoding/binary"
"context"
"fmt"
"sync"
"golang.org/x/sync/singleflight"
"github.com/benbjohnson/litestream"
"github.com/superfly/ltx"
)
type pageCache struct {
single singleflight.Group
pages map[uint32]cachedPage // +checklocks:mtx
size int
mtx sync.Mutex
pages map[uint32]cachedPage // +checklocks:mtx
size int
mtx sync.Mutex
}
type cachedPage struct {
@@ -21,37 +20,36 @@ type cachedPage struct {
txid ltx.TXID
}
func (c *pageCache) getOrFetch(pgno uint32, maxTXID ltx.TXID, fetch func() (any, error)) ([]byte, error) {
if c.size >= 0 {
func (c *pageCache) getOrFetch(ctx context.Context, client ReplicaClient, pgno uint32, elem ltx.PageIndexElem) ([]byte, error) {
if c.size > 0 {
c.mtx.Lock()
if c.pages == nil {
c.pages = map[uint32]cachedPage{}
}
page := c.pages[pgno]
c.mtx.Unlock()
if page.txid == maxTXID {
if page.txid == elem.MaxTXID {
return page.data, nil
}
}
var key [12]byte
binary.LittleEndian.PutUint32(key[0:], pgno)
binary.LittleEndian.PutUint64(key[4:], uint64(maxTXID))
v, err, _ := c.single.Do(string(key[:]), fetch)
h, data, err := litestream.FetchPage(ctx, client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size)
if err != nil {
return nil, err
return nil, fmt.Errorf("fetch page: %w", err)
}
if pgno != h.Pgno {
return nil, fmt.Errorf("fetch page: want %d, got %d", pgno, h.Pgno)
}
page := cachedPage{v.([]byte), maxTXID}
if c.size >= 0 {
if c.size > 0 {
c.mtx.Lock()
c.evict(len(page.data))
c.pages[pgno] = page
if c.pages != nil {
c.evict(len(data))
} else {
c.pages = map[uint32]cachedPage{}
}
c.pages[pgno] = cachedPage{data, elem.MaxTXID}
c.mtx.Unlock()
}
return page.data, nil
return data, nil
}
// +checklocks:c.mtx

View File

@@ -4,10 +4,9 @@ go 1.24.4
require (
github.com/benbjohnson/litestream v0.5.3
github.com/ncruces/go-sqlite3 v0.30.3
github.com/ncruces/go-sqlite3 v0.30.4-0.20251216123455-0b46e74ea69b
github.com/ncruces/wbt v0.2.0
github.com/superfly/ltx v0.5.0
golang.org/x/sync v0.19.0
)
// github.com/ncruces/go-sqlite3
@@ -34,7 +33,6 @@ require (
github.com/prometheus/procfs v0.19.2 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/crypto v0.46.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)

View File

@@ -103,8 +103,8 @@ github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncruces/go-sqlite3 v0.30.3 h1:X/CgWW9GzmIAkEPrifhKqf0cC15DuOVxAJaHFTTAURQ=
github.com/ncruces/go-sqlite3 v0.30.3/go.mod h1:AxKu9sRxkludimFocbktlY6LiYSkxiI5gTA8r+os/Nw=
github.com/ncruces/go-sqlite3 v0.30.4-0.20251216123455-0b46e74ea69b h1:0HG7ul3Q1d/E/jrZpBTpzx4xhxJwMKuq5J4nuZIogm8=
github.com/ncruces/go-sqlite3 v0.30.4-0.20251216123455-0b46e74ea69b/go.mod h1:wz6IQnveXfqaXZozfhM8ciIJi2LRnnifBuBQarPDYo0=
github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M=
github.com/ncruces/julianday v1.0.0/go.mod h1:Dusn2KvZrrovOMJuOt0TNXL6tB7U2E8kvza5fFc9G7g=
github.com/ncruces/litestream v0.5.3 h1:5qMwt99t2k0+AI1AGv7gY9QNkdVADWhzLp9zVcf2EGU=

View File

@@ -92,10 +92,7 @@ func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) {
return 0, io.EOF
}
data, err := f.db.cache.getOrFetch(pgno, elem.MaxTXID, func() (any, error) {
_, data, err := litestream.FetchPage(ctx, f.db.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size)
return data, err
})
data, err := f.db.cache.getOrFetch(ctx, f.db.client, pgno, elem)
if err != nil {
f.db.opts.Logger.Error("fetch page", "error", err)
return 0, err
@@ -291,7 +288,7 @@ func (d *liteDB) buildIndex(ctx context.Context) error {
defer d.mtx.Unlock()
// Skip if we already have an index.
if d.pages != nil {
if !d.lastPoll.IsZero() {
return nil
}