diff --git a/litestream/api.go b/litestream/api.go index 2ceabf8..6bb9554 100644 --- a/litestream/api.go +++ b/litestream/api.go @@ -49,7 +49,7 @@ type ReplicaOptions struct { } // NewReplica creates a read-replica from a Litestream client. -func NewReplica(name string, client litestream.ReplicaClient, options ReplicaOptions) { +func NewReplica(name string, client ReplicaClient, options ReplicaOptions) { if options.Logger != nil { options.Logger = options.Logger.With("name", name) } else { @@ -61,6 +61,7 @@ func NewReplica(name string, client litestream.ReplicaClient, options ReplicaOpt if options.CacheSize == 0 { options.CacheSize = DefaultCacheSize } + options.MinLevel = max(0, min(options.MinLevel, litestream.SnapshotLevel)) liteMtx.Lock() defer liteMtx.Unlock() @@ -78,9 +79,9 @@ func RemoveReplica(name string) { delete(liteDBs, name) } -// NewPrimary creates a new primary that replicates through a client. +// NewPrimary creates a new primary that replicates through client. // If restore is not nil, the database is first restored. -func NewPrimary(ctx context.Context, path string, client litestream.ReplicaClient, restore *litestream.RestoreOptions) (*litestream.DB, error) { +func NewPrimary(ctx context.Context, path string, client ReplicaClient, restore *RestoreOptions) (*litestream.DB, error) { lsdb := litestream.NewDB(path) lsdb.Replica = litestream.NewReplicaWithClient(lsdb, client) @@ -97,3 +98,8 @@ func NewPrimary(ctx context.Context, path string, client litestream.ReplicaClien } return lsdb, nil } + +type ( + ReplicaClient = litestream.ReplicaClient + RestoreOptions = litestream.RestoreOptions +) diff --git a/litestream/api_test.go b/litestream/api_test.go new file mode 100644 index 0000000..dd150e3 --- /dev/null +++ b/litestream/api_test.go @@ -0,0 +1,104 @@ +package litestream + +import ( + "path/filepath" + "testing" + "time" + + "github.com/benbjohnson/litestream" + "github.com/benbjohnson/litestream/file" + "github.com/ncruces/go-sqlite3/driver" + _ "github.com/ncruces/go-sqlite3/embed" +) + +func Test_integration(t *testing.T) { + dir := t.TempDir() + dbpath := filepath.Join(dir, "test.db") + backup := filepath.Join(dir, "backup", "test.db") + + db, err := driver.Open(dbpath) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + client := file.NewReplicaClient(backup) + NewReplica("test.db", client, ReplicaOptions{}) + + if err := setupPrimary(t, dbpath, client); err != nil { + t.Fatal(err) + } + + replica, err := driver.Open("file:test.db?vfs=litestream") + if err != nil { + t.Fatal(err) + } + defer replica.Close() + + _, err = db.ExecContext(t.Context(), `CREATE TABLE users (id INT, name VARCHAR(10))`) + if err != nil { + t.Fatal(err) + } + + _, err = db.ExecContext(t.Context(), + `INSERT INTO users (id, name) VALUES (0, 'go'), (1, 'zig'), (2, 'whatever')`) + if err != nil { + t.Fatal(err) + } + + time.Sleep(DefaultPollInterval + litestream.DefaultMonitorInterval) + + rows, err := replica.QueryContext(t.Context(), `SELECT id, name FROM users`) + if err != nil { + t.Fatal(err) + } + defer rows.Close() + + row := 0 + ids := []int{0, 1, 2} + names := []string{"go", "zig", "whatever"} + for ; rows.Next(); row++ { + var id int + var name string + err := rows.Scan(&id, &name) + if err != nil { + t.Fatal(err) + } + + if id != ids[row] { + t.Errorf("got %d, want %d", id, ids[row]) + } + if name != names[row] { + t.Errorf("got %q, want %q", name, names[row]) + } + } + if row != 3 { + t.Errorf("got %d, want %d", row, len(ids)) + } + + var lag int + err = replica.QueryRowContext(t.Context(), `PRAGMA litestream_lag`).Scan(&lag) + if err != nil { + t.Fatal(err) + } + if lag < 0 || lag > 2 { + t.Errorf("got %d", lag) + } + + var txid string + err = replica.QueryRowContext(t.Context(), `PRAGMA litestream_txid`).Scan(&txid) + if err != nil { + t.Fatal(err) + } + if txid != "0000000000000001" { + t.Errorf("got %q", txid) + } +} + +func setupPrimary(tb testing.TB, path string, client ReplicaClient) error { + db, err := NewPrimary(tb.Context(), path, client, nil) + if err == nil { + tb.Cleanup(func() { db.Close(tb.Context()) }) + } + return err +} diff --git a/litestream/go.mod b/litestream/go.mod index 1a1175c..67dc7e3 100644 --- a/litestream/go.mod +++ b/litestream/go.mod @@ -3,8 +3,8 @@ module github.com/ncruces/go-sqlite3/litestream go 1.24.4 require ( - github.com/benbjohnson/litestream v0.5.2 - github.com/ncruces/go-sqlite3 v0.30.1 + github.com/benbjohnson/litestream v0.5.3-0.20251109214555-48dc960260f0 + github.com/ncruces/go-sqlite3 v0.30.2 github.com/ncruces/wbt v0.2.0 github.com/superfly/ltx v0.5.0 golang.org/x/sync v0.18.0 diff --git a/litestream/go.sum b/litestream/go.sum index ad5acb4..a7a06a1 100644 --- a/litestream/go.sum +++ b/litestream/go.sum @@ -60,8 +60,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.35.1 h1:iF4Xxkc0H9c/K2dS0zZw3SCkj0Z7 github.com/aws/aws-sdk-go-v2/service/sts v1.35.1/go.mod h1:0bxIatfN0aLq4mjoLDeBpOjOke68OsFlXPDFJ7V0MYw= github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= -github.com/benbjohnson/litestream v0.5.2 h1:uD9I17n6RgUgyCwPM/Sw2YXNmMGixecUB5kmJ4FL08o= -github.com/benbjohnson/litestream v0.5.2/go.mod h1:jSW6AGqbxmJnEXGjMHchlZclGphzbJ6jGrGo5fYIDhU= +github.com/benbjohnson/litestream v0.5.3-0.20251109214555-48dc960260f0 h1:190qM2axbs1p7NyRm5/ygqrDdcIYhc5QXPTK04Cgyno= +github.com/benbjohnson/litestream v0.5.3-0.20251109214555-48dc960260f0/go.mod h1:0gjSLi7Qm5INdtdo6YJMFMsS5e/KM2s3CFYF3OtLmY8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -113,8 +113,8 @@ github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/ncruces/go-sqlite3 v0.30.1 h1:pHC3YsyRdJv4pCMB4MO1Q2BXw/CAa+Hoj7GSaKtVk+g= -github.com/ncruces/go-sqlite3 v0.30.1/go.mod h1:UVsWrQaq1qkcal5/vT5lOJnZCVlR5rsThKdwidjFsKc= +github.com/ncruces/go-sqlite3 v0.30.2 h1:1GVbHAkKAOwjJd3JYl8ldrYROudfZUOah7oXPD7VZbQ= +github.com/ncruces/go-sqlite3 v0.30.2/go.mod h1:AxKu9sRxkludimFocbktlY6LiYSkxiI5gTA8r+os/Nw= github.com/ncruces/go-sqlite3/litestream/modernc v0.30.1 h1:3SNAOrm+qmLprkZybcvBrVNHyt0QYHljUGxmOXnL+K0= github.com/ncruces/go-sqlite3/litestream/modernc v0.30.1/go.mod h1:GSM2gXEOb9HIFFtsl0IUtnpvpDmVi7Kbp8z5GzwA0Tw= github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M= @@ -143,6 +143,8 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/studio-b12/gowebdav v0.11.0 h1:qbQzq4USxY28ZYsGJUfO5jR+xkFtcnwWgitp4Zp1irU= +github.com/studio-b12/gowebdav v0.11.0/go.mod h1:bHA7t77X/QFExdeAnDzK6vKM34kEZAcE1OX4MfiwjkE= github.com/superfly/ltx v0.5.0 h1:dXNrcT3ZtMb6iKZopIV7z5UBscnapg0b0F02loQsk5o= github.com/superfly/ltx v0.5.0/go.mod h1:Nf50QAIXU/ET4ua3AuQ2fh31MbgNQZA7r/DYx6Os77s= github.com/tetratelabs/wazero v1.10.1 h1:2DugeJf6VVk58KTPszlNfeeN8AhhpwcZqkJj2wwFuH8= diff --git a/litestream/vfs.go b/litestream/vfs.go index f92f93e..f385a06 100644 --- a/litestream/vfs.go +++ b/litestream/vfs.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "strconv" "sync" "time" @@ -160,6 +161,34 @@ func (f *liteFile) DeviceCharacteristics() vfs.DeviceCharacteristic { return 0 } +func (f *liteFile) Pragma(name, value string) (string, error) { + switch name { + case "litestream_txid": + txid := f.txid + if txid == 0 { + // Outside transaction. + f.db.mtx.Lock() + txid = f.db.txids[f.db.opts.MinLevel] + f.db.mtx.Unlock() + } + return txid.String(), nil + + case "litestream_lag": + f.db.mtx.Lock() + lastPoll := f.db.lastPoll + f.db.mtx.Unlock() + + if lastPoll.IsZero() { + // Never polled successfully. + return "-1", nil + } + lag := time.Since(lastPoll) / time.Second + return strconv.FormatInt(int64(lag), 10), nil + } + + return "", sqlite3.NOTFOUND +} + func (f *liteFile) SetDB(conn any) { f.conn = conn.(*sqlite3.Conn) } @@ -216,7 +245,7 @@ func (f *liteDB) pollReplica(ctx context.Context) (*pageIndex, ltx.TXID, error) // Limit polling interval. if time.Since(f.lastPoll) < f.opts.PollInterval { - return f.pages, f.txids[0], nil + return f.pages, f.txids[f.opts.MinLevel], nil } for level := range pollLevels(f.opts.MinLevel) { @@ -227,7 +256,7 @@ func (f *liteDB) pollReplica(ctx context.Context) (*pageIndex, ltx.TXID, error) } f.lastPoll = time.Now() - return f.pages, f.txids[0], nil + return f.pages, f.txids[f.opts.MinLevel], nil } // +checklocks:f.mtx