diff --git a/litestream/api.go b/litestream/api.go index 00bb75d..ada21ee 100644 --- a/litestream/api.go +++ b/litestream/api.go @@ -44,7 +44,6 @@ func NewReplica(name string, client litestream.ReplicaClient, options ReplicaOpt if options.PollInterval <= 0 { options.PollInterval = DefaultPollInterval } - options.MinLevel = max(0, min(options.MinLevel, litestream.SnapshotLevel)) liteMtx.Lock() defer liteMtx.Unlock() diff --git a/litestream/example_test.go b/litestream/example_test.go new file mode 100644 index 0000000..712a09f --- /dev/null +++ b/litestream/example_test.go @@ -0,0 +1,47 @@ +package litestream_test + +import ( + "log" + "time" + + "github.com/benbjohnson/litestream/s3" + "github.com/ncruces/go-sqlite3/driver" + _ "github.com/ncruces/go-sqlite3/embed" + "github.com/ncruces/go-sqlite3/litestream" +) + +func ExampleNewReplica() { + client := s3.NewReplicaClient() + client.Bucket = "test-bucket" + client.Path = "fruits.db" + + litestream.NewReplica("fruits.db", client, litestream.ReplicaOptions{ + PollInterval: 5 * time.Second, + }) + + db, err := driver.Open("file:fruits.db?vfs=litestream") + if err != nil { + log.Fatalln(err) + } + defer db.Close() + + for { + time.Sleep(time.Second) + rows, err := db.Query("SELECT * FROM fruits") + if err != nil { + log.Fatalln(err) + } + + for rows.Next() { + var name, color string + err := rows.Scan(&name, &color) + if err != nil { + log.Fatalln(err) + } + log.Println(name, color) + } + + log.Println("===") + rows.Close() + } +} diff --git a/litestream/vfs.go b/litestream/vfs.go index f4d63ce..76750da 100644 --- a/litestream/vfs.go +++ b/litestream/vfs.go @@ -214,10 +214,7 @@ func (f *liteDB) pollReplica(ctx context.Context) (*pageIndex, ltx.TXID, error) return f.pages, f.txids[0], nil } - // Updating from MinLevel to SnapshotLevel is non-racy, - // since LTX files are compacted into higher levels - // before the lower level LTX files are deleted. - for level := f.opts.MinLevel; level <= litestream.SnapshotLevel; level++ { + for level := range pollLevels(f.opts.MinLevel) { if err := f.updateLevel(ctx, level); err != nil { f.opts.Logger.Error("cannot poll replica", "error", err) return nil, 0, err @@ -285,6 +282,23 @@ func (f *liteDB) updateInfo(ctx context.Context, info *ltx.FileInfo) error { return nil } +func pollLevels(minLevel int) (r []int) { + // Updating from lower to upper levels is non-racy, + // since LTX files are compacted into higher levels + // before the lower level LTX files are deleted. + + // Also, only level 0 compactions and snapshots delete files, + // so the intermediate levels never need to be updated. + + if minLevel <= 0 { + return append(r, 0, 1, litestream.SnapshotLevel) + } + if minLevel >= litestream.SnapshotLevel { + return append(r, litestream.SnapshotLevel) + } + return append(r, minLevel, litestream.SnapshotLevel) +} + // Type aliases; these are a mouthful. type pageIndex = wbt.Tree[uint32, ltx.PageIndexElem] type levelTXIDs = [litestream.SnapshotLevel + 1]ltx.TXID diff --git a/litestream/vfs_test.go b/litestream/vfs_test.go index 712a09f..0fb2a38 100644 --- a/litestream/vfs_test.go +++ b/litestream/vfs_test.go @@ -1,47 +1,33 @@ -package litestream_test +package litestream import ( - "log" - "time" + "slices" + "strconv" + "testing" - "github.com/benbjohnson/litestream/s3" - "github.com/ncruces/go-sqlite3/driver" + "github.com/benbjohnson/litestream" _ "github.com/ncruces/go-sqlite3/embed" - "github.com/ncruces/go-sqlite3/litestream" ) -func ExampleNewReplica() { - client := s3.NewReplicaClient() - client.Bucket = "test-bucket" - client.Path = "fruits.db" - - litestream.NewReplica("fruits.db", client, litestream.ReplicaOptions{ - PollInterval: 5 * time.Second, - }) - - db, err := driver.Open("file:fruits.db?vfs=litestream") - if err != nil { - log.Fatalln(err) +func Test_pollLevels(t *testing.T) { + tests := []struct { + minLevel int + want []int + }{ + {minLevel: -1, want: []int{0, 1, litestream.SnapshotLevel}}, + {minLevel: 0, want: []int{0, 1, litestream.SnapshotLevel}}, + {minLevel: 1, want: []int{1, litestream.SnapshotLevel}}, + {minLevel: 2, want: []int{2, litestream.SnapshotLevel}}, + {minLevel: 3, want: []int{3, litestream.SnapshotLevel}}, + {minLevel: litestream.SnapshotLevel, want: []int{litestream.SnapshotLevel}}, + {minLevel: litestream.SnapshotLevel + 1, want: []int{litestream.SnapshotLevel}}, } - defer db.Close() - - for { - time.Sleep(time.Second) - rows, err := db.Query("SELECT * FROM fruits") - if err != nil { - log.Fatalln(err) - } - - for rows.Next() { - var name, color string - err := rows.Scan(&name, &color) - if err != nil { - log.Fatalln(err) + for _, tt := range tests { + t.Run(strconv.Itoa(tt.minLevel), func(t *testing.T) { + got := pollLevels(tt.minLevel) + if !slices.Equal(got, tt.want) { + t.Errorf("pollLevels() = %v, want %v", got, tt.want) } - log.Println(name, color) - } - - log.Println("===") - rows.Close() + }) } }