From ba9caf0405f3d3312629049fde30d9a6bbccbcd7 Mon Sep 17 00:00:00 2001 From: Nuno Cruces Date: Thu, 20 Nov 2025 16:54:51 +0000 Subject: [PATCH] Shared page cache. --- litestream/api.go | 28 ++++++++++++++---- litestream/cache.go | 72 +++++++++++++++++++++++++++++++++++++++++++++ litestream/vfs.go | 8 +++-- 3 files changed, 100 insertions(+), 8 deletions(-) create mode 100644 litestream/cache.go diff --git a/litestream/api.go b/litestream/api.go index ada21ee..76328e1 100644 --- a/litestream/api.go +++ b/litestream/api.go @@ -10,8 +10,13 @@ import ( "github.com/ncruces/go-sqlite3/vfs" ) -// The default poll interval. -const DefaultPollInterval = 1 * time.Second +const ( + // The default poll interval. + DefaultPollInterval = 1 * time.Second + + // The default cache size: 10 MiB. + DefaultCacheSize = 10 * 1024 * 1024 +) func init() { vfs.Register("litestream", liteVFS{}) @@ -27,11 +32,18 @@ var ( type ReplicaOptions struct { // Where to log error messages. May be nil. Logger *slog.Logger - // Minimum compaction level to track. - MinLevel int - // Replica poll interval. Must be less than the compaction interval + + // Replica poll interval. + // Should be less than the compaction interval // used by the replica at MinLevel+1. PollInterval time.Duration + + // Minimum compaction level to track. + MinLevel int + + // CacheSize is the maximum size of the page cache in bytes. + // Zero means DefaultCacheSize, negative disables caching. + CacheSize int } // NewReplica creates a read-replica from a Litestream client. @@ -44,12 +56,16 @@ func NewReplica(name string, client litestream.ReplicaClient, options ReplicaOpt if options.PollInterval <= 0 { options.PollInterval = DefaultPollInterval } + if options.CacheSize == 0 { + options.CacheSize = DefaultCacheSize + } liteMtx.Lock() defer liteMtx.Unlock() liteDBs[name] = &liteDB{ client: client, - opts: &options, + opts: options, + cache: pageCache{size: options.CacheSize}, } } diff --git a/litestream/cache.go b/litestream/cache.go new file mode 100644 index 0000000..dae5de8 --- /dev/null +++ b/litestream/cache.go @@ -0,0 +1,72 @@ +package litestream + +import ( + "encoding/binary" + "sync" + + "golang.org/x/sync/singleflight" + + "github.com/superfly/ltx" +) + +type pageCache struct { + single singleflight.Group + pages map[uint32]cachedPage // +checklocks:mtx + size int + mtx sync.Mutex +} + +type cachedPage struct { + data []byte + txid ltx.TXID +} + +func (c *pageCache) getOrFetch(pgno uint32, maxTXID ltx.TXID, fetch func() (any, error)) ([]byte, error) { + if c.size >= 0 { + c.mtx.Lock() + if c.pages == nil { + c.pages = map[uint32]cachedPage{} + } + page := c.pages[pgno] + c.mtx.Unlock() + + if page.txid == maxTXID { + return page.data, nil + } + } + + var key [12]byte + binary.LittleEndian.PutUint32(key[0:], pgno) + binary.LittleEndian.PutUint64(key[4:], uint64(maxTXID)) + v, err, _ := c.single.Do(string(key[:]), fetch) + + if err != nil { + return nil, err + } + + page := cachedPage{v.([]byte), maxTXID} + if c.size >= 0 { + c.mtx.Lock() + c.evict(len(page.data)) + c.pages[pgno] = page + c.mtx.Unlock() + } + return page.data, nil +} + +// +checklocks:c.mtx +func (c *pageCache) evict(pageSize int) { + // Evict random keys until we're under the maximum size. + // SQLite has its own page cache, which it will use for each connection. + // Since this is a second layer of shared cache, + // random eviction is probably good enough. + if pageSize*len(c.pages) < c.size { + return + } + for key := range c.pages { + delete(c.pages, key) + if pageSize*len(c.pages) < c.size { + return + } + } +} diff --git a/litestream/vfs.go b/litestream/vfs.go index 76750da..b2f4be6 100644 --- a/litestream/vfs.go +++ b/litestream/vfs.go @@ -86,7 +86,10 @@ func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) { return 0, io.EOF } - _, data, err := litestream.FetchPage(ctx, f.db.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size) + data, err := f.db.cache.getOrFetch(pgno, elem.MaxTXID, func() (any, error) { + _, data, err := litestream.FetchPage(ctx, f.db.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size) + return data, err + }) if err != nil { f.db.opts.Logger.Error("fetch page", "error", err) return 0, err @@ -169,7 +172,8 @@ func (f *liteFile) context() context.Context { type liteDB struct { client litestream.ReplicaClient - opts *ReplicaOptions + opts ReplicaOptions + cache pageCache pages *pageIndex // +checklocks:mtx lastPoll time.Time // +checklocks:mtx txids levelTXIDs // +checklocks:mtx