MVCC memory VFS. (#309)

This commit is contained in:
Nuno Cruces
2025-08-21 18:44:40 +01:00
committed by GitHub
parent d84ca9d627
commit 0026bc91aa
12 changed files with 565 additions and 0 deletions

1
go.mod
View File

@@ -5,6 +5,7 @@ go 1.23.0
toolchain go1.24.0 toolchain go1.24.0
require ( require (
github.com/ncruces/aa v0.3.0
github.com/ncruces/julianday v1.0.0 github.com/ncruces/julianday v1.0.0
github.com/ncruces/sort v0.1.5 github.com/ncruces/sort v0.1.5
github.com/tetratelabs/wazero v1.9.0 github.com/tetratelabs/wazero v1.9.0

2
go.sum
View File

@@ -2,6 +2,8 @@ github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA=
github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/ncruces/aa v0.3.0 h1:6NPcK3jsyPWRZBZWCyF7c4IzEQX4eJtkKJBA+IRKTkQ=
github.com/ncruces/aa v0.3.0/go.mod h1:ctOw1LVqfuqzqg2S9LlR045bLAiXtaTiPMCL3zzl7Ik=
github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M= github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M=
github.com/ncruces/julianday v1.0.0/go.mod h1:Dusn2KvZrrovOMJuOt0TNXL6tB7U2E8kvza5fFc9G7g= github.com/ncruces/julianday v1.0.0/go.mod h1:Dusn2KvZrrovOMJuOt0TNXL6tB7U2E8kvza5fFc9G7g=
github.com/ncruces/sort v0.1.5 h1:fiFWXXAqKI8QckPf/6hu/bGFwcEPrirIOFaJqWujs4k= github.com/ncruces/sort v0.1.5 h1:fiFWXXAqKI8QckPf/6hu/bGFwcEPrirIOFaJqWujs4k=

View File

@@ -18,6 +18,7 @@ import (
"github.com/ncruces/go-sqlite3/vfs" "github.com/ncruces/go-sqlite3/vfs"
_ "github.com/ncruces/go-sqlite3/vfs/adiantum" _ "github.com/ncruces/go-sqlite3/vfs/adiantum"
"github.com/ncruces/go-sqlite3/vfs/memdb" "github.com/ncruces/go-sqlite3/vfs/memdb"
"github.com/ncruces/go-sqlite3/vfs/mvcc"
_ "github.com/ncruces/go-sqlite3/vfs/xts" _ "github.com/ncruces/go-sqlite3/vfs/xts"
) )
@@ -98,6 +99,22 @@ func Test_memdb(t *testing.T) {
testIntegrity(t, name) testIntegrity(t, name)
} }
func Test_mvcc(t *testing.T) {
var iter int
if testing.Short() {
iter = 1000
} else {
iter = 5000
}
mvcc.Create("test.db", "")
name := "file:/test.db?vfs=mvcc" +
"&_pragma=busy_timeout(10000)"
createDB(t, name)
testParallel(t, name, iter)
testIntegrity(t, name)
}
func Test_adiantum(t *testing.T) { func Test_adiantum(t *testing.T) {
if !vfs.SupportsFileLocking { if !vfs.SupportsFileLocking {
t.Skip("skipping without locks") t.Skip("skipping without locks")
@@ -312,6 +329,16 @@ func Benchmark_memdb(b *testing.B) {
testParallel(b, name, b.N) testParallel(b, name, b.N)
} }
func Benchmark_mvcc(b *testing.B) {
mvcc.Create("test.db", "")
name := "file:/test.db?vfs=mvcc" +
"&_pragma=busy_timeout(10000)"
createDB(b, name)
b.ResetTimer()
testParallel(b, name, b.N)
}
func createDB(t testing.TB, name string) { func createDB(t testing.TB, name string) {
db, err := sqlite3.Open(name) db, err := sqlite3.Open(name)
if err != nil { if err != nil {

9
vfs/mvcc/README.md Normal file
View File

@@ -0,0 +1,9 @@
# Go `mvcc` SQLite VFS
This package implements the **EXPERIMENTAL** `"mvcc"` in-memory SQLite VFS.
It has some benefits over the [`"memdb"`](../memdb/README.md) VFS:
- panics do not corrupt a shared database;
- single-writer not blocked by readers,
- readers never block,
- instant snapshots.

67
vfs/mvcc/api.go Normal file
View File

@@ -0,0 +1,67 @@
// Package mvcc implements the "mvcc" SQLite VFS.
//
// The "mvcc" [vfs.VFS] allows the same in-memory database to be shared
// among multiple database connections in the same process,
// as long as the database name begins with "/".
//
// Importing package mvcc registers the VFS:
//
// import _ "github.com/ncruces/go-sqlite3/vfs/mvcc"
package mvcc
import (
"sync"
"github.com/ncruces/go-sqlite3/vfs"
)
func init() {
vfs.Register("mvcc", mvccVFS{})
}
var (
memoryMtx sync.Mutex
// +checklocks:memoryMtx
memoryDBs = map[string]*mvccDB{}
)
// Create creates a shared memory database,
// using data as its initial contents.
func Create(name string, data string) {
memoryMtx.Lock()
defer memoryMtx.Unlock()
db := &mvccDB{
refs: 1,
name: name,
}
memoryDBs[name] = db
if len(data) == 0 {
return
}
// Convert data from WAL/2 to rollback journal.
if len(data) >= 20 && (false ||
data[18] == 2 && data[19] == 2 ||
data[18] == 3 && data[19] == 3) {
db.data = db.data.
Put(0, data[:18]).
Put(18, "\001\001").
Put(20, data[20:])
} else {
db.data = db.data.Put(0, data)
}
}
// Delete deletes a shared memory database.
func Delete(name string) {
memoryMtx.Lock()
defer memoryMtx.Unlock()
delete(memoryDBs, name)
}
// Snapshot stores a snapshot of database src into dst.
func Snapshot(dst, src string) {
memoryMtx.Lock()
defer memoryMtx.Unlock()
memoryDBs[dst] = memoryDBs[src].fork()
}

50
vfs/mvcc/example_test.go Normal file
View File

@@ -0,0 +1,50 @@
package mvcc_test
import (
"database/sql"
_ "embed"
"fmt"
"log"
_ "github.com/ncruces/go-sqlite3/driver"
_ "github.com/ncruces/go-sqlite3/embed"
"github.com/ncruces/go-sqlite3/vfs/mvcc"
)
//go:embed testdata/test.db
var testDB string
func Example() {
mvcc.Create("test.db", testDB)
db, err := sql.Open("sqlite3", "file:/test.db?vfs=mvcc")
if err != nil {
log.Fatal(err)
}
defer db.Close()
_, err = db.Exec(`INSERT INTO users (id, name) VALUES (3, 'rust')`)
if err != nil {
log.Fatal(err)
}
rows, err := db.Query(`SELECT id, name FROM users`)
if err != nil {
log.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var id, name string
err = rows.Scan(&id, &name)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s %s\n", id, name)
}
// Output:
// 0 go
// 1 zig
// 2 whatever
// 3 rust
}

333
vfs/mvcc/mvcc.go Normal file
View File

@@ -0,0 +1,333 @@
package mvcc
import (
"io"
"strings"
"sync"
"time"
"github.com/ncruces/aa"
"github.com/ncruces/go-sqlite3"
"github.com/ncruces/go-sqlite3/util/vfsutil"
"github.com/ncruces/go-sqlite3/vfs"
)
type mvccVFS struct{}
func (mvccVFS) Open(name string, flags vfs.OpenFlag) (vfs.File, vfs.OpenFlag, error) {
// Temporary files use SliceFile.
if name == "" || flags&vfs.OPEN_DELETEONCLOSE != 0 {
return &vfsutil.SliceFile{}, flags | vfs.OPEN_MEMORY, nil
}
// Only main databases benefit from multiversion concurrency control.
// Refuse to open all other file types.
// Returning OPEN_MEMORY means SQLite won't ask us to.
if flags&vfs.OPEN_MAIN_DB == 0 {
// notest // OPEN_MEMORY
return nil, flags, sqlite3.CANTOPEN
}
// A shared database has a name that begins with "/".
shared := strings.HasPrefix(name, "/")
var db *mvccDB
if shared {
name = name[1:]
memoryMtx.Lock()
defer memoryMtx.Unlock()
db = memoryDBs[name]
}
if db == nil {
if flags&vfs.OPEN_CREATE == 0 {
return nil, flags, sqlite3.CANTOPEN
}
db = &mvccDB{name: name}
}
if shared {
db.refs++ // +checklocksforce: memoryMtx is held
memoryDBs[name] = db
}
return &mvccFile{
mvccDB: db,
readOnly: flags&vfs.OPEN_READONLY != 0,
}, flags | vfs.OPEN_MEMORY, nil
}
func (mvccVFS) Delete(name string, dirSync bool) error {
return sqlite3.IOERR_DELETE_NOENT // used to delete journals
}
func (mvccVFS) Access(name string, flag vfs.AccessFlag) (bool, error) {
return false, nil // used to check for journals
}
func (mvccVFS) FullPathname(name string) (string, error) {
return name, nil
}
type mvccDB struct {
data *aa.Tree[int64, string] // +checklocks:mtx
owner *mvccFile // +checklocks:mtx
waiter *sync.Cond // +checklocks:mtx
name string
refs int // +checklocks:memoryMtx
mtx sync.Mutex
}
func (m *mvccDB) release() {
memoryMtx.Lock()
defer memoryMtx.Unlock()
if m.refs--; m.refs == 0 && m == memoryDBs[m.name] {
delete(memoryDBs, m.name)
}
}
func (m *mvccDB) fork() *mvccDB {
m.mtx.Lock()
defer m.mtx.Unlock()
return &mvccDB{
refs: 1,
name: m.name,
data: m.data,
}
}
type mvccFile struct {
*mvccDB
data *aa.Tree[int64, string]
lock vfs.LockLevel
readOnly bool
}
var (
// Ensure these interfaces are implemented:
_ vfs.FileLockState = &mvccFile{}
_ vfs.FileCommitPhaseTwo = &mvccFile{}
)
func (m *mvccFile) Close() error {
// Relase ownership, discard changes.
m.release()
m.data = nil
m.lock = vfs.LOCK_NONE
m.mtx.Lock()
defer m.mtx.Unlock()
if m.owner == m {
m.owner = nil
}
return nil
}
func (m *mvccFile) ReadAt(b []byte, off int64) (n int, err error) {
// If unlocked, use a snapshot of the database.
data := m.data
if m.lock == vfs.LOCK_NONE {
m.mtx.Lock()
defer m.mtx.Unlock()
data = m.mvccDB.data
}
for k, v := range data.AscendFloor(off) {
if i := k - off; i >= 0 {
if +i > int64(n) {
// Missing data.
clear(b[n:])
}
if +i < int64(len(b)) {
// Copy prefix.
n = copy(b[+i:], v) + int(i)
}
} else {
if -i < int64(len(v)) {
// Copy suffix.
n = copy(b, v[-i:])
}
}
if n >= len(b) {
return n, nil
}
}
return n, io.EOF
}
func (m *mvccFile) WriteAt(b []byte, off int64) (n int, err error) {
// If unlocked, take a snapshot of the database.
data := m.data
if m.lock == vfs.LOCK_NONE {
m.mtx.Lock()
defer m.mtx.Unlock()
data = m.mvccDB.data
m.lock = vfs.LOCK_EXCLUSIVE + 1 // UNKNOWN_LOCK
}
next := off + int64(len(b))
for k, v := range data.AscendFloor(off) {
if k >= next {
break
}
switch {
case k > off:
// Delete overlap.
data = data.Delete(k)
case k < off && off < k+int64(len(v)):
// Reinsert prefix.
data = data.Put(k, v[:off-k])
}
if k+int64(len(v)) > next {
// Reinsert suffix.
data = data.Put(next, v[next-k:])
}
}
m.data = data.Put(off, string(b))
return len(b), nil
}
func (m *mvccFile) Size() (int64, error) {
// If unlocked, use a snapshot of the database.
data := m.data
if m.lock == vfs.LOCK_NONE {
m.mtx.Lock()
defer m.mtx.Unlock()
data = m.mvccDB.data
}
if data == nil {
return 0, nil
}
data = data.Max()
return data.Key() + int64(len(data.Value())), nil
}
func (m *mvccFile) Truncate(size int64) error {
// If unlocked, take a snapshot of the database.
data := m.data
if m.lock == vfs.LOCK_NONE {
m.mtx.Lock()
defer m.mtx.Unlock()
data = m.mvccDB.data
m.lock = vfs.LOCK_EXCLUSIVE + 1 // UNKNOWN_LOCK
}
for data != nil && data.Key() >= size {
data = data.Left()
}
for k := range data.AscendCeil(size) {
data = data.Delete(k)
}
m.data = data
return nil
}
func (m *mvccFile) Lock(lock vfs.LockLevel) error {
if m.lock >= lock {
return nil
}
if m.readOnly && lock >= vfs.LOCK_RESERVED {
return sqlite3.IOERR_LOCK
}
m.mtx.Lock()
defer m.mtx.Unlock()
// Take a snapshot of the database.
if lock == vfs.LOCK_SHARED {
m.data = m.mvccDB.data
m.lock = lock
return nil
}
// We are the owners.
if m.owner == m {
m.lock = lock
return nil
}
// Someone else is the owner.
if m.owner != nil {
before := time.Now()
if m.waiter == nil {
m.waiter = sync.NewCond(&m.mtx)
}
defer time.AfterFunc(time.Millisecond, m.waiter.Broadcast).Stop()
for m.owner != nil {
// Our snapshot is invalid.
if m.data != m.mvccDB.data {
return sqlite3.BUSY_SNAPSHOT
}
if time.Since(before) > time.Millisecond {
return sqlite3.BUSY
}
m.waiter.Wait()
}
}
// Our snapshot is invalid.
if m.data != m.mvccDB.data {
return sqlite3.BUSY_SNAPSHOT
}
// Take ownership.
m.lock = lock
m.owner = m
return nil
}
func (m *mvccFile) Unlock(lock vfs.LockLevel) error {
if m.lock <= lock {
return nil
}
m.mtx.Lock()
defer m.mtx.Unlock()
// Relase ownership, commit changes.
if m.owner == m {
m.owner = nil
m.mvccDB.data = m.data
if m.waiter != nil {
m.waiter.Broadcast()
}
}
m.lock = lock
return nil
}
func (m *mvccFile) CheckReservedLock() (bool, error) {
// notest // OPEN_MEMORY
if m.lock >= vfs.LOCK_RESERVED {
return true, nil
}
m.mtx.Lock()
defer m.mtx.Unlock()
return m.owner != nil, nil
}
func (m *mvccFile) CommitPhaseTwo() error {
// Modified without lock, commit changes.
if m.lock > vfs.LOCK_EXCLUSIVE {
m.mtx.Lock()
defer m.mtx.Unlock()
m.mvccDB.data = m.data
}
return nil
}
func (m *mvccFile) LockState() vfs.LockLevel {
return m.lock
}
func (*mvccFile) Sync(flag vfs.SyncFlag) error { return nil }
func (*mvccFile) SectorSize() int {
// notest // safe default
return 0
}
func (*mvccFile) DeviceCharacteristics() vfs.DeviceCharacteristic {
return vfs.IOCAP_ATOMIC |
vfs.IOCAP_SEQUENTIAL |
vfs.IOCAP_SAFE_APPEND |
vfs.IOCAP_POWERSAFE_OVERWRITE |
vfs.IOCAP_SUBPAGE_READ
}

30
vfs/mvcc/mvcc_test.go Normal file
View File

@@ -0,0 +1,30 @@
package mvcc
import (
_ "embed"
"testing"
"github.com/ncruces/go-sqlite3"
_ "github.com/ncruces/go-sqlite3/embed"
_ "github.com/ncruces/go-sqlite3/internal/testcfg"
)
//go:embed testdata/wal.db
var walDB string
func Test_wal(t *testing.T) {
t.Parallel()
Create("test.db", walDB)
db, err := sqlite3.Open("file:/test.db?vfs=mvcc")
if err != nil {
t.Fatal(err)
}
defer db.Close()
err = db.Exec(`CREATE TABLE users (id INT, name VARCHAR(10))`)
if err != nil {
t.Fatal(err)
}
}

BIN
vfs/mvcc/testdata/test.db vendored Normal file

Binary file not shown.

BIN
vfs/mvcc/testdata/wal.db vendored Normal file

Binary file not shown.

View File

@@ -24,6 +24,7 @@ import (
"github.com/ncruces/go-sqlite3/vfs" "github.com/ncruces/go-sqlite3/vfs"
_ "github.com/ncruces/go-sqlite3/vfs/adiantum" _ "github.com/ncruces/go-sqlite3/vfs/adiantum"
"github.com/ncruces/go-sqlite3/vfs/memdb" "github.com/ncruces/go-sqlite3/vfs/memdb"
"github.com/ncruces/go-sqlite3/vfs/mvcc"
_ "github.com/ncruces/go-sqlite3/vfs/xts" _ "github.com/ncruces/go-sqlite3/vfs/xts"
) )
@@ -195,6 +196,50 @@ func Test_multiwrite01_memory(t *testing.T) {
mod.Close(ctx) mod.Close(ctx)
} }
func Test_config01_mvcc(t *testing.T) {
mvcc.Create("test.db", "")
ctx := util.NewContext(newContext(t))
cfg := config(ctx).WithArgs("mptest", "/test.db", "config01.test",
"--vfs", "mvcc")
mod, err := rt.InstantiateModule(ctx, module, cfg)
if err != nil {
t.Fatal(err)
}
mod.Close(ctx)
}
func Test_crash01_mvcc(t *testing.T) {
if testing.Short() {
t.Skip("skipping in short mode")
}
mvcc.Create("test.db", "")
ctx := util.NewContext(newContext(t))
cfg := config(ctx).WithArgs("mptest", "/test.db", "crash01.test",
"--vfs", "mvcc")
mod, err := rt.InstantiateModule(ctx, module, cfg)
if err != nil {
t.Fatal(err)
}
mod.Close(ctx)
}
func Test_multiwrite01_mvcc(t *testing.T) {
if testing.Short() && os.Getenv("CI") != "" {
t.Skip("skipping in slow CI")
}
mvcc.Create("test.db", "")
ctx := util.NewContext(newContext(t))
cfg := config(ctx).WithArgs("mptest", "/test.db", "multiwrite01.test",
"--vfs", "mvcc")
mod, err := rt.InstantiateModule(ctx, module, cfg)
if err != nil {
t.Fatal(err)
}
mod.Close(ctx)
}
func Test_crash01_wal(t *testing.T) { func Test_crash01_wal(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skipping in short mode") t.Skip("skipping in short mode")

View File

@@ -21,6 +21,7 @@ import (
"github.com/ncruces/go-sqlite3/vfs" "github.com/ncruces/go-sqlite3/vfs"
_ "github.com/ncruces/go-sqlite3/vfs/adiantum" _ "github.com/ncruces/go-sqlite3/vfs/adiantum"
_ "github.com/ncruces/go-sqlite3/vfs/memdb" _ "github.com/ncruces/go-sqlite3/vfs/memdb"
_ "github.com/ncruces/go-sqlite3/vfs/mvcc"
_ "github.com/ncruces/go-sqlite3/vfs/xts" _ "github.com/ncruces/go-sqlite3/vfs/xts"
) )