diff --git a/embed/bcw2/build.sh b/embed/bcw2/build.sh
index cc561b6..1a5a6e1 100755
--- a/embed/bcw2/build.sh
+++ b/embed/bcw2/build.sh
@@ -23,6 +23,7 @@ if [[ "$OSTYPE" == "msys" || "$OSTYPE" == "cygwin" ]]; then
 	MSYS_NO_PATHCONV=1 nmake /f makefile.msc sqlite3.c "OPTS=-DSQLITE_ENABLE_UPDATE_DELETE_LIMIT -DSQLITE_ENABLE_ORDERED_SET_AGGREGATES"
 else
 	sh configure --enable-update-limit
+	make verify-source
 	OPTS=-DSQLITE_ENABLE_ORDERED_SET_AGGREGATES make sqlite3.c
 fi
 cd ~-
diff --git a/litestream/vfs.go b/litestream/vfs.go
index b4e218c..55feeed 100644
--- a/litestream/vfs.go
+++ b/litestream/vfs.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -62,8 +63,10 @@ type liteFile struct {
 	db       *liteDB
 	conn     *sqlite3.Conn
 	pages    *pageIndex
+	syncTime time.Time
 	txid     ltx.TXID
 	pageSize uint32
+	locked   bool
 }
 
 func (f *liteFile) Close() error { return nil }
 
@@ -71,10 +74,11 @@ func (f *liteFile) Close() error { return nil }
 func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) {
 	ctx := f.context()
 	pages, txid := f.pages, f.txid
-	if pages == nil {
+	if pages == nil && f.syncTime.IsZero() {
 		pages, txid, err = f.db.pollReplica(ctx)
 	}
 	if err != nil {
+		f.db.opts.Logger.Error("poll replica", "error", err)
 		return 0, err
 	}
 
@@ -135,14 +139,25 @@ func (f *liteFile) Size() (size int64, err error) {
 
 func (f *liteFile) Lock(lock vfs.LockLevel) (err error) {
 	if lock >= vfs.LOCK_RESERVED {
+		// notest // OPEN_READONLY
 		return sqlite3.IOERR_LOCK
 	}
-	f.pages, f.txid, err = f.db.pollReplica(f.context())
+	if f.syncTime.IsZero() {
+		f.pages, f.txid, err = f.db.pollReplica(f.context())
+	}
+	if err != nil {
+		f.db.opts.Logger.Error("poll replica", "error", err)
+	} else {
+		f.locked = true
+	}
 	return err
 }
 
 func (f *liteFile) Unlock(lock vfs.LockLevel) error {
-	f.pages, f.txid = nil, 0
+	if f.syncTime.IsZero() {
+		f.pages, f.txid = nil, 0
+	}
+	f.locked = false
 	return nil
 }
 
@@ -166,7 +181,6 @@ func (f *liteFile) Pragma(name, value string) (string, error) {
 	case "litestream_txid":
 		txid := f.txid
 		if txid == 0 {
-			// Outside transaction.
 			f.db.mtx.Lock()
 			txid = f.db.txids[0]
 			f.db.mtx.Unlock()
@@ -179,11 +193,45 @@ func (f *liteFile) Pragma(name, value string) (string, error) {
 		f.db.mtx.Unlock()
 
 		if lastPoll.IsZero() {
-			// Never polled successfully.
 			return "-1", nil
 		}
 		lag := time.Since(lastPoll) / time.Second
 		return strconv.FormatInt(int64(lag), 10), nil
+
+	case "litestream_time":
+		if value == "" {
+			syncTime := f.syncTime
+			if syncTime.IsZero() {
+				f.db.mtx.Lock()
+				syncTime = f.db.lastInfo
+				f.db.mtx.Unlock()
+			}
+			if syncTime.IsZero() {
+				return "latest", nil
+			}
+			return syncTime.Format(time.RFC3339Nano), nil
+		}
+
+		if !f.locked {
+			return "", sqlite3.MISUSE
+		}
+
+		if strings.EqualFold(value, "latest") {
+			f.syncTime = time.Time{}
+			f.pages, f.txid = nil, 0
+			return "", nil
+		}
+
+		syncTime, err := sqlite3.TimeFormatAuto.Decode(value)
+		if err != nil {
+			return "", err
+		}
+
+		err = f.buildIndex(f.context(), syncTime)
+		if err != nil {
+			f.db.opts.Logger.Error("build index", "error", err)
+		}
+		return "", err
 	}
 
 	return "", sqlite3.NOTFOUND
@@ -200,27 +248,53 @@ func (f *liteFile) context() context.Context {
 	return context.Background()
 }
 
+func (f *liteFile) buildIndex(ctx context.Context, syncTime time.Time) error {
+	// Build the index from scratch from a Litestream restore plan.
+	infos, err := litestream.CalcRestorePlan(ctx, f.db.client, 0, syncTime, f.db.opts.Logger)
+	if err != nil {
+		if !errors.Is(err, litestream.ErrTxNotAvailable) {
+			return fmt.Errorf("calc restore plan: %w", err)
+		}
+		return nil
+	}
+
+	var txid ltx.TXID
+	var pages *pageIndex
+	for _, info := range infos {
+		pages, err = fetchPageIndex(ctx, pages, f.db.client, info)
+		if err != nil {
+			return err
+		}
+		txid = max(txid, info.MaxTXID)
+	}
+	f.syncTime = syncTime
+	f.pages = pages
+	f.txid = txid
+	return nil
+}
+
 type liteDB struct {
 	client   litestream.ReplicaClient
 	opts     ReplicaOptions
 	cache    pageCache
 	pages    *pageIndex // +checklocks:mtx
 	lastPoll time.Time  // +checklocks:mtx
+	lastInfo time.Time  // +checklocks:mtx
 	txids    levelTXIDs // +checklocks:mtx
 	mtx      sync.Mutex
 }
 
-func (f *liteDB) buildIndex(ctx context.Context) error {
-	f.mtx.Lock()
-	defer f.mtx.Unlock()
+func (d *liteDB) buildIndex(ctx context.Context) error {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
 
 	// Skip if we already have an index.
-	if f.pages != nil {
+	if d.pages != nil {
 		return nil
 	}
 
 	// Build the index from scratch from a Litestream restore plan.
-	infos, err := litestream.CalcRestorePlan(ctx, f.client, 0, time.Time{}, f.opts.Logger)
+	infos, err := litestream.CalcRestorePlan(ctx, d.client, 0, time.Time{}, d.opts.Logger)
 	if err != nil {
 		if !errors.Is(err, litestream.ErrTxNotAvailable) {
 			return fmt.Errorf("calc restore plan: %w", err)
@@ -229,47 +303,46 @@ func (f *liteDB) buildIndex(ctx context.Context) error {
 	}
 
 	for _, info := range infos {
-		err := f.updateInfo(ctx, info)
+		err := d.updateInfo(ctx, info)
 		if err != nil {
 			return err
 		}
 	}
-	f.lastPoll = time.Now()
+	d.lastPoll = time.Now()
 	return nil
 }
 
-func (f *liteDB) pollReplica(ctx context.Context) (*pageIndex, ltx.TXID, error) {
-	f.mtx.Lock()
-	defer f.mtx.Unlock()
+func (d *liteDB) pollReplica(ctx context.Context) (*pageIndex, ltx.TXID, error) {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
 
 	// Limit polling interval.
-	if time.Since(f.lastPoll) < f.opts.PollInterval {
-		return f.pages, f.txids[0], nil
+	if time.Since(d.lastPoll) < d.opts.PollInterval {
+		return d.pages, d.txids[0], nil
 	}
 
 	for _, level := range []int{0, 1, litestream.SnapshotLevel} {
-		if err := f.updateLevel(ctx, level); err != nil {
-			f.opts.Logger.Error("cannot poll replica", "error", err)
+		if err := d.updateLevel(ctx, level); err != nil {
 			return nil, 0, err
 		}
 	}
 
-	f.lastPoll = time.Now()
-	return f.pages, f.txids[0], nil
+	d.lastPoll = time.Now()
+	return d.pages, d.txids[0], nil
}
 
-// +checklocks:f.mtx
-func (f *liteDB) updateLevel(ctx context.Context, level int) error {
+// +checklocks:d.mtx
+func (d *liteDB) updateLevel(ctx context.Context, level int) error {
 	var nextTXID ltx.TXID
 
 	// Snapshots must start from scratch,
 	// other levels can start from where they were left.
 	if level != litestream.SnapshotLevel {
-		nextTXID = f.txids[level] + 1
+		nextTXID = d.txids[level] + 1
 	}
 
 	// Start reading from the next LTX file after the current position.
-	itr, err := f.client.LTXFiles(ctx, level, nextTXID, false)
+	itr, err := d.client.LTXFiles(ctx, level, nextTXID, false)
 	if err != nil {
 		return fmt.Errorf("ltx files: %w", err)
 	}
@@ -280,11 +353,11 @@ func (f *liteDB) updateLevel(ctx context.Context, level int) error {
 		info := itr.Item()
 
 		// Skip LTX files already fully loaded into the index.
-		if info.MaxTXID <= f.txids[level] {
+		if info.MaxTXID <= d.txids[level] {
 			continue
 		}
 
-		err := f.updateInfo(ctx, info)
+		err := d.updateInfo(ctx, info)
 		if err != nil {
 			return err
 		}
@@ -295,26 +368,41 @@ func (f *liteDB) updateLevel(ctx context.Context, level int) error {
 	return itr.Close()
 }
 
-// +checklocks:f.mtx
-func (f *liteDB) updateInfo(ctx context.Context, info *ltx.FileInfo) error {
-	idx, err := litestream.FetchPageIndex(ctx, f.client, info)
+// +checklocks:d.mtx
+func (d *liteDB) updateInfo(ctx context.Context, info *ltx.FileInfo) error {
+	pages, err := fetchPageIndex(ctx, d.pages, d.client, info)
 	if err != nil {
-		return fmt.Errorf("fetch page index: %w", err)
+		return err
+	}
+
+	// Track the MaxTXID for each level.
+	maxTXID := &d.txids[info.Level]
+	*maxTXID = max(*maxTXID, info.MaxTXID)
+	d.txids[0] = max(d.txids[0], *maxTXID)
+	if d.lastInfo.Before(info.CreatedAt) {
+		d.lastInfo = info.CreatedAt
+	}
+	d.pages = pages
+	return nil
+}
+
+func fetchPageIndex(
+	ctx context.Context, pages *pageIndex,
+	client litestream.ReplicaClient, info *ltx.FileInfo) (*pageIndex, error) {
+
+	idx, err := litestream.FetchPageIndex(ctx, client, info)
+	if err != nil {
+		return nil, fmt.Errorf("fetch page index: %w", err)
 	}
 
 	// Replace pages in the index with new pages.
 	for k, v := range idx {
 		// Patch avoids mutating the index for an unmodified page.
-		f.pages = f.pages.Patch(k, func(node *pageIndex) (ltx.PageIndexElem, bool) {
+		pages = pages.Patch(k, func(node *pageIndex) (ltx.PageIndexElem, bool) {
 			return v, node == nil || v != node.Value()
 		})
 	}
-
-	// Track the MaxTXID for each level.
-	maxTXID := &f.txids[info.Level]
-	*maxTXID = max(*maxTXID, info.MaxTXID)
-	f.txids[0] = max(f.txids[0], *maxTXID)
-	return nil
+	return pages, nil
 }
 
 // Type aliases; these are a mouthful.
diff --git a/litestream/api_test.go b/litestream/vfs_test.go
similarity index 100%
rename from litestream/api_test.go
rename to litestream/vfs_test.go
diff --git a/vfs/vfs.go b/vfs/vfs.go
index db53b6f..d1e1964 100644
--- a/vfs/vfs.go
+++ b/vfs/vfs.go
@@ -329,9 +329,9 @@ func vfsFileControlImpl(ctx context.Context, mod api.Module, file File, op _Fcnt
 
 	case _FCNTL_PRAGMA:
 		if file, ok := file.(FilePragma); ok {
+			var value string
 			ptr := util.Read32[ptr_t](mod, pArg+1*ptrlen)
 			name := util.ReadString(mod, ptr, _MAX_SQL_LENGTH)
-			var value string
 			if ptr := util.Read32[ptr_t](mod, pArg+2*ptrlen); ptr != 0 {
 				value = util.ReadString(mod, ptr, _MAX_SQL_LENGTH)
 			}
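
Usage sketch (illustrative, not part of the patch): the "litestream_time" pragma added above reports the replica sync time when queried with no value, pins the connection's page index to a point-in-time restore plan when set to a timestamp, and resumes live polling when set to "latest". The snippet below assumes the ncruces/go-sqlite3 database/sql driver, and that the host application registered the replica VFS under the hypothetical name "litestream"; the DSN is likewise an assumption. It pins a single pooled connection with db.Conn, since the pragma state lives on the liteFile, i.e. per connection.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/ncruces/go-sqlite3/driver" // database/sql driver
	_ "github.com/ncruces/go-sqlite3/embed"  // embedded SQLite build
)

func main() {
	ctx := context.Background()

	// The "litestream" VFS name is hypothetical: it stands in for however
	// the host application registered the replica VFS from this patch.
	db, err := sql.Open("sqlite3", "file:replica.db?vfs=litestream&mode=ro")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Pin one connection: pragma state is per liteFile, hence per connection.
	conn, err := db.Conn(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Query form: reports the pinned sync time, or "latest" when live.
	var at string
	if err := conn.QueryRowContext(ctx, `PRAGMA litestream_time`).Scan(&at); err != nil {
		log.Fatal(err)
	}
	fmt.Println("reading as of:", at)

	// Set form: pin this connection to a point-in-time page index. Per the
	// patch this returns MISUSE unless the file is locked, which is the case
	// while the pragma statement itself holds its read lock.
	if _, err := conn.ExecContext(ctx,
		`PRAGMA litestream_time='2025-01-02T15:04:05Z'`); err != nil {
		log.Fatal(err)
	}

	// ... point-in-time reads on conn ...

	// Resume following the live replica.
	if _, err := conn.ExecContext(ctx, `PRAGMA litestream_time='latest'`); err != nil {
		log.Fatal(err)
	}
}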