From 0026bc91aa11ea349dfcad924caa417f43ce5db7 Mon Sep 17 00:00:00 2001 From: Nuno Cruces Date: Thu, 21 Aug 2025 18:44:40 +0100 Subject: [PATCH] MVCC memory VFS. (#309) --- go.mod | 1 + go.sum | 2 + tests/parallel/parallel_test.go | 27 ++ vfs/mvcc/README.md | 9 + vfs/mvcc/api.go | 67 +++++ vfs/mvcc/example_test.go | 50 ++++ vfs/mvcc/mvcc.go | 333 ++++++++++++++++++++++++ vfs/mvcc/mvcc_test.go | 30 +++ vfs/mvcc/testdata/test.db | Bin 0 -> 1024 bytes vfs/mvcc/testdata/wal.db | Bin 0 -> 512 bytes vfs/tests/mptest/mptest_test.go | 45 ++++ vfs/tests/speedtest1/speedtest1_test.go | 1 + 12 files changed, 565 insertions(+) create mode 100644 vfs/mvcc/README.md create mode 100644 vfs/mvcc/api.go create mode 100644 vfs/mvcc/example_test.go create mode 100644 vfs/mvcc/mvcc.go create mode 100644 vfs/mvcc/mvcc_test.go create mode 100644 vfs/mvcc/testdata/test.db create mode 100644 vfs/mvcc/testdata/wal.db diff --git a/go.mod b/go.mod index 99f272c..9b5ecdd 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.0 toolchain go1.24.0 require ( + github.com/ncruces/aa v0.3.0 github.com/ncruces/julianday v1.0.0 github.com/ncruces/sort v0.1.5 github.com/tetratelabs/wazero v1.9.0 diff --git a/go.sum b/go.sum index 44598cb..51b7bce 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/ncruces/aa v0.3.0 h1:6NPcK3jsyPWRZBZWCyF7c4IzEQX4eJtkKJBA+IRKTkQ= +github.com/ncruces/aa v0.3.0/go.mod h1:ctOw1LVqfuqzqg2S9LlR045bLAiXtaTiPMCL3zzl7Ik= github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M= github.com/ncruces/julianday v1.0.0/go.mod h1:Dusn2KvZrrovOMJuOt0TNXL6tB7U2E8kvza5fFc9G7g= github.com/ncruces/sort v0.1.5 h1:fiFWXXAqKI8QckPf/6hu/bGFwcEPrirIOFaJqWujs4k= diff --git a/tests/parallel/parallel_test.go b/tests/parallel/parallel_test.go index cb59874..3e1630a 100644 --- a/tests/parallel/parallel_test.go +++ b/tests/parallel/parallel_test.go @@ -18,6 +18,7 @@ import ( "github.com/ncruces/go-sqlite3/vfs" _ "github.com/ncruces/go-sqlite3/vfs/adiantum" "github.com/ncruces/go-sqlite3/vfs/memdb" + "github.com/ncruces/go-sqlite3/vfs/mvcc" _ "github.com/ncruces/go-sqlite3/vfs/xts" ) @@ -98,6 +99,22 @@ func Test_memdb(t *testing.T) { testIntegrity(t, name) } +func Test_mvcc(t *testing.T) { + var iter int + if testing.Short() { + iter = 1000 + } else { + iter = 5000 + } + + mvcc.Create("test.db", "") + name := "file:/test.db?vfs=mvcc" + + "&_pragma=busy_timeout(10000)" + createDB(t, name) + testParallel(t, name, iter) + testIntegrity(t, name) +} + func Test_adiantum(t *testing.T) { if !vfs.SupportsFileLocking { t.Skip("skipping without locks") @@ -312,6 +329,16 @@ func Benchmark_memdb(b *testing.B) { testParallel(b, name, b.N) } +func Benchmark_mvcc(b *testing.B) { + mvcc.Create("test.db", "") + name := "file:/test.db?vfs=mvcc" + + "&_pragma=busy_timeout(10000)" + createDB(b, name) + + b.ResetTimer() + testParallel(b, name, b.N) +} + func createDB(t testing.TB, name string) { db, err := sqlite3.Open(name) if err != nil { diff --git a/vfs/mvcc/README.md b/vfs/mvcc/README.md new file mode 100644 index 0000000..e39efaf --- /dev/null +++ b/vfs/mvcc/README.md @@ -0,0 +1,9 @@ +# Go `mvcc` SQLite VFS + +This package implements the **EXPERIMENTAL** `"mvcc"` in-memory SQLite VFS. + +It has some benefits over the [`"memdb"`](../memdb/README.md) VFS: +- panics do not corrupt a shared database; +- single-writer not blocked by readers, +- readers never block, +- instant snapshots. \ No newline at end of file diff --git a/vfs/mvcc/api.go b/vfs/mvcc/api.go new file mode 100644 index 0000000..c131250 --- /dev/null +++ b/vfs/mvcc/api.go @@ -0,0 +1,67 @@ +// Package mvcc implements the "mvcc" SQLite VFS. +// +// The "mvcc" [vfs.VFS] allows the same in-memory database to be shared +// among multiple database connections in the same process, +// as long as the database name begins with "/". +// +// Importing package mvcc registers the VFS: +// +// import _ "github.com/ncruces/go-sqlite3/vfs/mvcc" +package mvcc + +import ( + "sync" + + "github.com/ncruces/go-sqlite3/vfs" +) + +func init() { + vfs.Register("mvcc", mvccVFS{}) +} + +var ( + memoryMtx sync.Mutex + // +checklocks:memoryMtx + memoryDBs = map[string]*mvccDB{} +) + +// Create creates a shared memory database, +// using data as its initial contents. +func Create(name string, data string) { + memoryMtx.Lock() + defer memoryMtx.Unlock() + + db := &mvccDB{ + refs: 1, + name: name, + } + memoryDBs[name] = db + if len(data) == 0 { + return + } + // Convert data from WAL/2 to rollback journal. + if len(data) >= 20 && (false || + data[18] == 2 && data[19] == 2 || + data[18] == 3 && data[19] == 3) { + db.data = db.data. + Put(0, data[:18]). + Put(18, "\001\001"). + Put(20, data[20:]) + } else { + db.data = db.data.Put(0, data) + } +} + +// Delete deletes a shared memory database. +func Delete(name string) { + memoryMtx.Lock() + defer memoryMtx.Unlock() + delete(memoryDBs, name) +} + +// Snapshot stores a snapshot of database src into dst. +func Snapshot(dst, src string) { + memoryMtx.Lock() + defer memoryMtx.Unlock() + memoryDBs[dst] = memoryDBs[src].fork() +} diff --git a/vfs/mvcc/example_test.go b/vfs/mvcc/example_test.go new file mode 100644 index 0000000..d2c98a6 --- /dev/null +++ b/vfs/mvcc/example_test.go @@ -0,0 +1,50 @@ +package mvcc_test + +import ( + "database/sql" + _ "embed" + "fmt" + "log" + + _ "github.com/ncruces/go-sqlite3/driver" + _ "github.com/ncruces/go-sqlite3/embed" + "github.com/ncruces/go-sqlite3/vfs/mvcc" +) + +//go:embed testdata/test.db +var testDB string + +func Example() { + mvcc.Create("test.db", testDB) + + db, err := sql.Open("sqlite3", "file:/test.db?vfs=mvcc") + if err != nil { + log.Fatal(err) + } + defer db.Close() + + _, err = db.Exec(`INSERT INTO users (id, name) VALUES (3, 'rust')`) + if err != nil { + log.Fatal(err) + } + + rows, err := db.Query(`SELECT id, name FROM users`) + if err != nil { + log.Fatal(err) + } + defer rows.Close() + + for rows.Next() { + var id, name string + err = rows.Scan(&id, &name) + if err != nil { + log.Fatal(err) + } + fmt.Printf("%s %s\n", id, name) + } + // Output: + // 0 go + // 1 zig + // 2 whatever + // 3 rust +} diff --git a/vfs/mvcc/mvcc.go b/vfs/mvcc/mvcc.go new file mode 100644 index 0000000..e74522a --- /dev/null +++ b/vfs/mvcc/mvcc.go @@ -0,0 +1,333 @@ +package mvcc + +import ( + "io" + "strings" + "sync" + "time" + + "github.com/ncruces/aa" + "github.com/ncruces/go-sqlite3" + "github.com/ncruces/go-sqlite3/util/vfsutil" + "github.com/ncruces/go-sqlite3/vfs" +) + +type mvccVFS struct{} + +func (mvccVFS) Open(name string, flags vfs.OpenFlag) (vfs.File, vfs.OpenFlag, error) { + // Temporary files use SliceFile. + if name == "" || flags&vfs.OPEN_DELETEONCLOSE != 0 { + return &vfsutil.SliceFile{}, flags | vfs.OPEN_MEMORY, nil + } + + // Only main databases benefit from multiversion concurrency control. + // Refuse to open all other file types. + // Returning OPEN_MEMORY means SQLite won't ask us to. + if flags&vfs.OPEN_MAIN_DB == 0 { + // notest // OPEN_MEMORY + return nil, flags, sqlite3.CANTOPEN + } + + // A shared database has a name that begins with "/". + shared := strings.HasPrefix(name, "/") + + var db *mvccDB + if shared { + name = name[1:] + memoryMtx.Lock() + defer memoryMtx.Unlock() + db = memoryDBs[name] + } + if db == nil { + if flags&vfs.OPEN_CREATE == 0 { + return nil, flags, sqlite3.CANTOPEN + } + db = &mvccDB{name: name} + } + if shared { + db.refs++ // +checklocksforce: memoryMtx is held + memoryDBs[name] = db + } + + return &mvccFile{ + mvccDB: db, + readOnly: flags&vfs.OPEN_READONLY != 0, + }, flags | vfs.OPEN_MEMORY, nil +} + +func (mvccVFS) Delete(name string, dirSync bool) error { + return sqlite3.IOERR_DELETE_NOENT // used to delete journals +} + +func (mvccVFS) Access(name string, flag vfs.AccessFlag) (bool, error) { + return false, nil // used to check for journals +} + +func (mvccVFS) FullPathname(name string) (string, error) { + return name, nil +} + +type mvccDB struct { + data *aa.Tree[int64, string] // +checklocks:mtx + owner *mvccFile // +checklocks:mtx + waiter *sync.Cond // +checklocks:mtx + + name string + refs int // +checklocks:memoryMtx + mtx sync.Mutex +} + +func (m *mvccDB) release() { + memoryMtx.Lock() + defer memoryMtx.Unlock() + if m.refs--; m.refs == 0 && m == memoryDBs[m.name] { + delete(memoryDBs, m.name) + } +} + +func (m *mvccDB) fork() *mvccDB { + m.mtx.Lock() + defer m.mtx.Unlock() + return &mvccDB{ + refs: 1, + name: m.name, + data: m.data, + } +} + +type mvccFile struct { + *mvccDB + data *aa.Tree[int64, string] + lock vfs.LockLevel + readOnly bool +} + +var ( + // Ensure these interfaces are implemented: + _ vfs.FileLockState = &mvccFile{} + _ vfs.FileCommitPhaseTwo = &mvccFile{} +) + +func (m *mvccFile) Close() error { + // Relase ownership, discard changes. + m.release() + m.data = nil + m.lock = vfs.LOCK_NONE + m.mtx.Lock() + defer m.mtx.Unlock() + if m.owner == m { + m.owner = nil + } + return nil +} + +func (m *mvccFile) ReadAt(b []byte, off int64) (n int, err error) { + // If unlocked, use a snapshot of the database. + data := m.data + if m.lock == vfs.LOCK_NONE { + m.mtx.Lock() + defer m.mtx.Unlock() + data = m.mvccDB.data + } + + for k, v := range data.AscendFloor(off) { + if i := k - off; i >= 0 { + if +i > int64(n) { + // Missing data. + clear(b[n:]) + } + if +i < int64(len(b)) { + // Copy prefix. + n = copy(b[+i:], v) + int(i) + } + } else { + if -i < int64(len(v)) { + // Copy suffix. + n = copy(b, v[-i:]) + } + } + if n >= len(b) { + return n, nil + } + } + return n, io.EOF +} + +func (m *mvccFile) WriteAt(b []byte, off int64) (n int, err error) { + // If unlocked, take a snapshot of the database. + data := m.data + if m.lock == vfs.LOCK_NONE { + m.mtx.Lock() + defer m.mtx.Unlock() + data = m.mvccDB.data + m.lock = vfs.LOCK_EXCLUSIVE + 1 // UNKNOWN_LOCK + } + + next := off + int64(len(b)) + for k, v := range data.AscendFloor(off) { + if k >= next { + break + } + switch { + case k > off: + // Delete overlap. + data = data.Delete(k) + case k < off && off < k+int64(len(v)): + // Reinsert prefix. + data = data.Put(k, v[:off-k]) + } + if k+int64(len(v)) > next { + // Reinsert suffix. + data = data.Put(next, v[next-k:]) + } + } + + m.data = data.Put(off, string(b)) + return len(b), nil +} + +func (m *mvccFile) Size() (int64, error) { + // If unlocked, use a snapshot of the database. + data := m.data + if m.lock == vfs.LOCK_NONE { + m.mtx.Lock() + defer m.mtx.Unlock() + data = m.mvccDB.data + } + + if data == nil { + return 0, nil + } + data = data.Max() + return data.Key() + int64(len(data.Value())), nil +} + +func (m *mvccFile) Truncate(size int64) error { + // If unlocked, take a snapshot of the database. + data := m.data + if m.lock == vfs.LOCK_NONE { + m.mtx.Lock() + defer m.mtx.Unlock() + data = m.mvccDB.data + m.lock = vfs.LOCK_EXCLUSIVE + 1 // UNKNOWN_LOCK + } + + for data != nil && data.Key() >= size { + data = data.Left() + } + for k := range data.AscendCeil(size) { + data = data.Delete(k) + } + m.data = data + return nil +} + +func (m *mvccFile) Lock(lock vfs.LockLevel) error { + if m.lock >= lock { + return nil + } + + if m.readOnly && lock >= vfs.LOCK_RESERVED { + return sqlite3.IOERR_LOCK + } + + m.mtx.Lock() + defer m.mtx.Unlock() + + // Take a snapshot of the database. + if lock == vfs.LOCK_SHARED { + m.data = m.mvccDB.data + m.lock = lock + return nil + } + // We are the owners. + if m.owner == m { + m.lock = lock + return nil + } + // Someone else is the owner. + if m.owner != nil { + before := time.Now() + if m.waiter == nil { + m.waiter = sync.NewCond(&m.mtx) + } + defer time.AfterFunc(time.Millisecond, m.waiter.Broadcast).Stop() + for m.owner != nil { + // Our snapshot is invalid. + if m.data != m.mvccDB.data { + return sqlite3.BUSY_SNAPSHOT + } + if time.Since(before) > time.Millisecond { + return sqlite3.BUSY + } + m.waiter.Wait() + } + } + // Our snapshot is invalid. + if m.data != m.mvccDB.data { + return sqlite3.BUSY_SNAPSHOT + } + // Take ownership. + m.lock = lock + m.owner = m + return nil +} + +func (m *mvccFile) Unlock(lock vfs.LockLevel) error { + if m.lock <= lock { + return nil + } + + m.mtx.Lock() + defer m.mtx.Unlock() + + // Relase ownership, commit changes. + if m.owner == m { + m.owner = nil + m.mvccDB.data = m.data + if m.waiter != nil { + m.waiter.Broadcast() + } + } + m.lock = lock + return nil +} + +func (m *mvccFile) CheckReservedLock() (bool, error) { + // notest // OPEN_MEMORY + if m.lock >= vfs.LOCK_RESERVED { + return true, nil + } + m.mtx.Lock() + defer m.mtx.Unlock() + return m.owner != nil, nil +} + +func (m *mvccFile) CommitPhaseTwo() error { + // Modified without lock, commit changes. + if m.lock > vfs.LOCK_EXCLUSIVE { + m.mtx.Lock() + defer m.mtx.Unlock() + m.mvccDB.data = m.data + } + return nil +} + +func (m *mvccFile) LockState() vfs.LockLevel { + return m.lock +} + +func (*mvccFile) Sync(flag vfs.SyncFlag) error { return nil } + +func (*mvccFile) SectorSize() int { + // notest // safe default + return 0 +} + +func (*mvccFile) DeviceCharacteristics() vfs.DeviceCharacteristic { + return vfs.IOCAP_ATOMIC | + vfs.IOCAP_SEQUENTIAL | + vfs.IOCAP_SAFE_APPEND | + vfs.IOCAP_POWERSAFE_OVERWRITE | + vfs.IOCAP_SUBPAGE_READ +} diff --git a/vfs/mvcc/mvcc_test.go b/vfs/mvcc/mvcc_test.go new file mode 100644 index 0000000..8715adb --- /dev/null +++ b/vfs/mvcc/mvcc_test.go @@ -0,0 +1,30 @@ +package mvcc + +import ( + _ "embed" + "testing" + + "github.com/ncruces/go-sqlite3" + _ "github.com/ncruces/go-sqlite3/embed" + _ "github.com/ncruces/go-sqlite3/internal/testcfg" +) + +//go:embed testdata/wal.db +var walDB string + +func Test_wal(t *testing.T) { + t.Parallel() + + Create("test.db", walDB) + + db, err := sqlite3.Open("file:/test.db?vfs=mvcc") + if err != nil { + t.Fatal(err) + } + defer db.Close() + + err = db.Exec(`CREATE TABLE users (id INT, name VARCHAR(10))`) + if err != nil { + t.Fatal(err) + } +} diff --git a/vfs/mvcc/testdata/test.db b/vfs/mvcc/testdata/test.db new file mode 100644 index 0000000000000000000000000000000000000000..bd97e0053ce89a8c9523fb57a0627a59ff3d3395 GIT binary patch literal 1024 zcmWFz^vNtqRY=P(%1ta$FlJz4U}R))P*7lC05TyMNPz)}&jMwGC`KeUE+!L$URgIU zNHODX1|S**)C&P;MmBMAamMtL#H5_m(&E&jVlctv9OUX4;;Inh=;Y(702Wfv$V^f2 z^b65Z$V<#kRS0toa`tcx(l9j8)C9#1Gvi|h#-EHIfz&7%ej&ia%*-guRGyJol3JEp U#Ky$TDO{DA&dSKlA();I02EgyJOBUy literal 0 HcmV?d00001 diff --git a/vfs/mvcc/testdata/wal.db b/vfs/mvcc/testdata/wal.db new file mode 100644 index 0000000000000000000000000000000000000000..90b6151ca2fe3f23a4e4cb1c1632108db10af818 GIT binary patch literal 512 zcmWFz^vNtqRY=P(%1ta$FlJz4U}9o$P*7lC05TaE7=aiDkl8RXLNpVDURgIUNEOJm JQ84I2002nB2KfL0 literal 0 HcmV?d00001 diff --git a/vfs/tests/mptest/mptest_test.go b/vfs/tests/mptest/mptest_test.go index 7b6aa9a..60a7479 100644 --- a/vfs/tests/mptest/mptest_test.go +++ b/vfs/tests/mptest/mptest_test.go @@ -24,6 +24,7 @@ import ( "github.com/ncruces/go-sqlite3/vfs" _ "github.com/ncruces/go-sqlite3/vfs/adiantum" "github.com/ncruces/go-sqlite3/vfs/memdb" + "github.com/ncruces/go-sqlite3/vfs/mvcc" _ "github.com/ncruces/go-sqlite3/vfs/xts" ) @@ -195,6 +196,50 @@ func Test_multiwrite01_memory(t *testing.T) { mod.Close(ctx) } +func Test_config01_mvcc(t *testing.T) { + mvcc.Create("test.db", "") + ctx := util.NewContext(newContext(t)) + cfg := config(ctx).WithArgs("mptest", "/test.db", "config01.test", + "--vfs", "mvcc") + mod, err := rt.InstantiateModule(ctx, module, cfg) + if err != nil { + t.Fatal(err) + } + mod.Close(ctx) +} + +func Test_crash01_mvcc(t *testing.T) { + if testing.Short() { + t.Skip("skipping in short mode") + } + + mvcc.Create("test.db", "") + ctx := util.NewContext(newContext(t)) + cfg := config(ctx).WithArgs("mptest", "/test.db", "crash01.test", + "--vfs", "mvcc") + mod, err := rt.InstantiateModule(ctx, module, cfg) + if err != nil { + t.Fatal(err) + } + mod.Close(ctx) +} + +func Test_multiwrite01_mvcc(t *testing.T) { + if testing.Short() && os.Getenv("CI") != "" { + t.Skip("skipping in slow CI") + } + + mvcc.Create("test.db", "") + ctx := util.NewContext(newContext(t)) + cfg := config(ctx).WithArgs("mptest", "/test.db", "multiwrite01.test", + "--vfs", "mvcc") + mod, err := rt.InstantiateModule(ctx, module, cfg) + if err != nil { + t.Fatal(err) + } + mod.Close(ctx) +} + func Test_crash01_wal(t *testing.T) { if testing.Short() { t.Skip("skipping in short mode") diff --git a/vfs/tests/speedtest1/speedtest1_test.go b/vfs/tests/speedtest1/speedtest1_test.go index 9fafb2c..2bf320d 100644 --- a/vfs/tests/speedtest1/speedtest1_test.go +++ b/vfs/tests/speedtest1/speedtest1_test.go @@ -21,6 +21,7 @@ import ( "github.com/ncruces/go-sqlite3/vfs" _ "github.com/ncruces/go-sqlite3/vfs/adiantum" _ "github.com/ncruces/go-sqlite3/vfs/memdb" + _ "github.com/ncruces/go-sqlite3/vfs/mvcc" _ "github.com/ncruces/go-sqlite3/vfs/xts" )