diff --git a/litestream/api.go b/litestream/api.go index 1608c91..e61bd76 100644 --- a/litestream/api.go +++ b/litestream/api.go @@ -59,19 +59,19 @@ func NewReplica(name string, client ReplicaClient, options ReplicaOptions) { } liteMtx.Lock() - defer liteMtx.Unlock() liteDBs[name] = &liteDB{ client: client, opts: options, cache: pageCache{size: options.CacheSize}, } + liteMtx.Unlock() } // RemoveReplica removes a replica by name. func RemoveReplica(name string) { liteMtx.Lock() - defer liteMtx.Unlock() delete(liteDBs, name) + liteMtx.Unlock() } type ReplicaClient = litestream.ReplicaClient diff --git a/litestream/go.mod b/litestream/go.mod index cada5b8..3611251 100644 --- a/litestream/go.mod +++ b/litestream/go.mod @@ -4,7 +4,7 @@ go 1.24.4 require ( github.com/benbjohnson/litestream v0.5.5 - github.com/ncruces/go-sqlite3 v0.30.4-0.20251216123455-0b46e74ea69b + github.com/ncruces/go-sqlite3 v0.30.4 github.com/ncruces/wbt v0.2.0 github.com/superfly/ltx v0.5.1 ) diff --git a/litestream/go.sum b/litestream/go.sum index c5adc0a..19679ae 100644 --- a/litestream/go.sum +++ b/litestream/go.sum @@ -105,8 +105,8 @@ github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/ncruces/go-sqlite3 v0.30.4-0.20251216123455-0b46e74ea69b h1:0HG7ul3Q1d/E/jrZpBTpzx4xhxJwMKuq5J4nuZIogm8= -github.com/ncruces/go-sqlite3 v0.30.4-0.20251216123455-0b46e74ea69b/go.mod h1:wz6IQnveXfqaXZozfhM8ciIJi2LRnnifBuBQarPDYo0= +github.com/ncruces/go-sqlite3 v0.30.4 h1:j9hEoOL7f9ZoXl8uqXVniaq1VNwlWAXihZbTvhqPPjA= +github.com/ncruces/go-sqlite3 v0.30.4/go.mod h1:7WR20VSC5IZusKhUdiR9y1NsUqnZgqIYCmKKoMEYg68= github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M= github.com/ncruces/julianday v1.0.0/go.mod h1:Dusn2KvZrrovOMJuOt0TNXL6tB7U2E8kvza5fFc9G7g= github.com/ncruces/litestream v0.5.5 h1:LUoyorC+Xx0TtiuEjwd0+GIusCK5IIZwTPsO1+se55g= diff --git a/litestream/vfs.go b/litestream/vfs.go index 1d12742..74c6db0 100644 --- a/litestream/vfs.go +++ b/litestream/vfs.go @@ -3,7 +3,6 @@ package litestream import ( "context" "encoding/binary" - "errors" "fmt" "io" "strconv" @@ -15,7 +14,6 @@ import ( "github.com/superfly/ltx" "github.com/ncruces/go-sqlite3" - "github.com/ncruces/go-sqlite3/util/vfsutil" "github.com/ncruces/go-sqlite3/vfs" "github.com/ncruces/wbt" ) @@ -23,9 +21,9 @@ import ( type liteVFS struct{} func (liteVFS) Open(name string, flags vfs.OpenFlag) (vfs.File, vfs.OpenFlag, error) { - // Temp journals, as used by the sorter, use SliceFile. + // Temp journals, as used by the sorter, use a temporary file. if flags&vfs.OPEN_TEMP_JOURNAL != 0 { - return &vfsutil.SliceFile{}, flags | vfs.OPEN_MEMORY, nil + return vfs.Find("").Open(name, flags) } // Refuse to open all other file types. if flags&vfs.OPEN_MAIN_DB == 0 { @@ -33,16 +31,19 @@ func (liteVFS) Open(name string, flags vfs.OpenFlag) (vfs.File, vfs.OpenFlag, er } liteMtx.RLock() - defer liteMtx.RUnlock() - if db, ok := liteDBs[name]; ok { - // Build the page index so we can lookup individual pages. - if err := db.buildIndex(context.Background()); err != nil { - db.opts.Logger.Error("build index", "error", err) - return nil, 0, err - } - return &liteFile{db: db}, flags | vfs.OPEN_READONLY, nil + db := liteDBs[name] + liteMtx.RUnlock() + + if db == nil { + return nil, flags, sqlite3.CANTOPEN } - return nil, flags, sqlite3.CANTOPEN + + // Build the page index so we can lookup individual pages. + if err := db.buildIndex(context.Background()); err != nil { + db.opts.Logger.Error("build index", "error", err) + return nil, 0, err + } + return &liteFile{db: db}, flags | vfs.OPEN_READONLY, nil } func (liteVFS) Delete(name string, dirSync bool) error { @@ -253,17 +254,21 @@ func (f *liteFile) context() context.Context { func (f *liteFile) buildIndex(ctx context.Context, syncTime time.Time) error { // Build the index from scratch from a Litestream restore plan. infos, err := litestream.CalcRestorePlan(ctx, f.db.client, 0, syncTime, f.db.opts.Logger) - if err != nil && !errors.Is(err, litestream.ErrTxNotAvailable) { + if err != nil { return fmt.Errorf("calc restore plan: %w", err) } var txid ltx.TXID var pages *pageIndex + syncTime = time.Time{} for _, info := range infos { pages, err = fetchPageIndex(ctx, pages, f.db.client, info) if err != nil { return err } + if syncTime.Before(info.CreatedAt) { + syncTime = info.CreatedAt + } txid = max(txid, info.MaxTXID) } f.syncTime = syncTime @@ -294,7 +299,7 @@ func (d *liteDB) buildIndex(ctx context.Context) error { // Build the index from scratch from a Litestream restore plan. infos, err := litestream.CalcRestorePlan(ctx, d.client, 0, time.Time{}, d.opts.Logger) - if err != nil && !errors.Is(err, litestream.ErrTxNotAvailable) { + if err != nil { return fmt.Errorf("calc restore plan: %w", err) } diff --git a/litestream/vfs_test.go b/litestream/vfs_test.go index 1a47a7e..abb46d8 100644 --- a/litestream/vfs_test.go +++ b/litestream/vfs_test.go @@ -90,35 +90,6 @@ func Test_integration(t *testing.T) { if txid != "0000000000000001" { t.Errorf("got %q", txid) } - - _, err = replica.ExecContext(t.Context(), `PRAGMA litestream_time='-1.5h'`) - if err != nil { - t.Fatal(err) - } - - _, err = replica.ExecContext(t.Context(), `PRAGMA litestream_time='-00:01'`) - if err != nil { - t.Fatal(err) - } - - _, err = replica.ExecContext(t.Context(), `PRAGMA litestream_time='-2.5 years'`) - if err != nil { - t.Fatal(err) - } - - _, err = replica.ExecContext(t.Context(), `PRAGMA litestream_time='1970-01-01'`) - if err != nil { - t.Fatal(err) - } - - var sync time.Time - err = replica.QueryRowContext(t.Context(), `PRAGMA litestream_time`).Scan(&sync) - if err != nil { - t.Fatal(err) - } - if !sync.Equal(time.Unix(0, 0)) { - t.Errorf("got %v", sync) - } } func setupReplication(tb testing.TB, path string, client ReplicaClient) { diff --git a/registry.go b/registry.go index 043d69e..4376f67 100644 --- a/registry.go +++ b/registry.go @@ -14,8 +14,8 @@ var ( // https://sqlite.org/c3ref/auto_extension.html func AutoExtension(entryPoint func(*Conn) error) { extRegistryMtx.Lock() - defer extRegistryMtx.Unlock() extRegistry = append(extRegistry, entryPoint) + extRegistryMtx.Unlock() } func initExtensions(c *Conn) error { diff --git a/vfs/memdb/api.go b/vfs/memdb/api.go index a128198..5a51272 100644 --- a/vfs/memdb/api.go +++ b/vfs/memdb/api.go @@ -34,9 +34,6 @@ var ( // The new database takes ownership of data, // and the caller should not use data after this call. func Create(name string, data []byte) { - memoryMtx.Lock() - defer memoryMtx.Unlock() - db := &memDB{ refs: 1, name: name, @@ -63,14 +60,16 @@ func Create(name string, data []byte) { } } + memoryMtx.Lock() memoryDBs[name] = db + memoryMtx.Unlock() } // Delete deletes a shared memory database. func Delete(name string) { memoryMtx.Lock() - defer memoryMtx.Unlock() delete(memoryDBs, name) + memoryMtx.Unlock() } // TestDB creates an empty shared memory database for the test to use. diff --git a/vfs/memdb/memdb.go b/vfs/memdb/memdb.go index eb05059..125f54c 100644 --- a/vfs/memdb/memdb.go +++ b/vfs/memdb/memdb.go @@ -92,10 +92,10 @@ type memDB struct { func (m *memDB) release() { memoryMtx.Lock() - defer memoryMtx.Unlock() if m.refs--; m.refs == 0 && m == memoryDBs[m.name] { delete(memoryDBs, m.name) } + memoryMtx.Unlock() } type memFile struct { diff --git a/vfs/mvcc/api.go b/vfs/mvcc/api.go index 5f20e53..3809e81 100644 --- a/vfs/mvcc/api.go +++ b/vfs/mvcc/api.go @@ -35,13 +35,12 @@ var ( // using a snapshot as its initial contents. func Create(name string, snapshot Snapshot) { memoryMtx.Lock() - defer memoryMtx.Unlock() - memoryDBs[name] = &mvccDB{ refs: 1, name: name, data: snapshot.Tree, } + memoryMtx.Unlock() } // Delete deletes a shared memory database. @@ -49,8 +48,8 @@ func Delete(name string) { name = getName(name) memoryMtx.Lock() - defer memoryMtx.Unlock() delete(memoryDBs, name) + memoryMtx.Unlock() } // Snapshot represents a database snapshot. @@ -83,8 +82,9 @@ func TakeSnapshot(name string) Snapshot { name = getName(name) memoryMtx.Lock() - defer memoryMtx.Unlock() db := memoryDBs[name] + memoryMtx.Unlock() + if db == nil { return Snapshot{} } diff --git a/vfs/mvcc/mvcc.go b/vfs/mvcc/mvcc.go index 7e6880f..87437c3 100644 --- a/vfs/mvcc/mvcc.go +++ b/vfs/mvcc/mvcc.go @@ -79,10 +79,10 @@ type mvccDB struct { func (m *mvccDB) release() { memoryMtx.Lock() - defer memoryMtx.Unlock() if m.refs--; m.refs == 0 && m == memoryDBs[m.name] { delete(memoryDBs, m.name) } + memoryMtx.Unlock() } type mvccFile struct { @@ -105,10 +105,10 @@ func (m *mvccFile) Close() error { m.data = nil m.lock = vfs.LOCK_NONE m.mtx.Lock() - defer m.mtx.Unlock() if m.owner == m { m.owner = nil } + m.mtx.Unlock() return nil } @@ -313,10 +313,10 @@ func (m *mvccFile) CommitPhaseTwo() error { // Modified without lock, commit changes. if m.lock > vfs.LOCK_EXCLUSIVE { m.mtx.Lock() - defer m.mtx.Unlock() m.mvccDB.data = m.data m.lock = vfs.LOCK_NONE m.data = nil + m.mtx.Unlock() } return nil } diff --git a/vfs/readervfs/api.go b/vfs/readervfs/api.go index 60813e7..fd5852e 100644 --- a/vfs/readervfs/api.go +++ b/vfs/readervfs/api.go @@ -30,13 +30,13 @@ var ( // otherwise SQLite might return incorrect query results and/or [sqlite3.CORRUPT] errors. func Create(name string, reader ioutil.SizeReaderAt) { readerMtx.Lock() - defer readerMtx.Unlock() readerDBs[name] = reader + readerMtx.Unlock() } // Delete deletes a shared memory database. func Delete(name string) { readerMtx.Lock() - defer readerMtx.Unlock() delete(readerDBs, name) + readerMtx.Unlock() } diff --git a/vfs/readervfs/reader.go b/vfs/readervfs/reader.go index 4312744..098f432 100644 --- a/vfs/readervfs/reader.go +++ b/vfs/readervfs/reader.go @@ -3,16 +3,15 @@ package readervfs import ( "github.com/ncruces/go-sqlite3" "github.com/ncruces/go-sqlite3/util/ioutil" - "github.com/ncruces/go-sqlite3/util/vfsutil" "github.com/ncruces/go-sqlite3/vfs" ) type readerVFS struct{} func (readerVFS) Open(name string, flags vfs.OpenFlag) (vfs.File, vfs.OpenFlag, error) { - // Temp journals, as used by the sorter, use SliceFile. + // Temp journals, as used by the sorter, use a temporary file. if flags&vfs.OPEN_TEMP_JOURNAL != 0 { - return &vfsutil.SliceFile{}, flags | vfs.OPEN_MEMORY, nil + return vfs.Find("").Open(name, flags) } // Refuse to open all other file types. if flags&vfs.OPEN_MAIN_DB == 0 { diff --git a/vfs/registry.go b/vfs/registry.go index 42a2106..32ddfbd 100644 --- a/vfs/registry.go +++ b/vfs/registry.go @@ -10,13 +10,17 @@ var ( // Find returns a VFS given its name. // If there is no match, nil is returned. -// If name is empty, the default VFS is returned. +// If name is empty or "os", the default VFS is returned. // // https://sqlite.org/c3ref/vfs_find.html func Find(name string) VFS { if name == "" || name == "os" { return vfsOS{} } + return find(name) +} + +func find(name string) VFS { vfsRegistryMtx.RLock() defer vfsRegistryMtx.RUnlock() return vfsRegistry[name] @@ -31,11 +35,11 @@ func Register(name string, vfs VFS) { return } vfsRegistryMtx.Lock() - defer vfsRegistryMtx.Unlock() if vfsRegistry == nil { vfsRegistry = map[string]VFS{} } vfsRegistry[name] = vfs + vfsRegistryMtx.Unlock() } // Unregister unregisters a VFS. @@ -43,6 +47,6 @@ func Register(name string, vfs VFS) { // https://sqlite.org/c3ref/vfs_find.html func Unregister(name string) { vfsRegistryMtx.Lock() - defer vfsRegistryMtx.Unlock() delete(vfsRegistry, name) + vfsRegistryMtx.Unlock() } diff --git a/vfs/vfs.go b/vfs/vfs.go index d1e1964..6720766 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -50,8 +50,7 @@ func ExportHostFunctions(env wazero.HostModuleBuilder) wazero.HostModuleBuilder } func vfsFind(ctx context.Context, mod api.Module, zVfsName ptr_t) uint32 { - name := util.ReadString(mod, zVfsName, _MAX_NAME) - if vfs := Find(name); vfs != nil && vfs != (vfsOS{}) { + if find(util.ReadString(mod, zVfsName, _MAX_NAME)) != nil { return 1 } return 0