diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d08ef2e..3078294 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -68,8 +68,8 @@ jobs: shell: bash run: | go work init . - go work use -r embed/bcw2 gormlite litestream - go test ./embed/bcw2 ./gormlite ./litestream + go work use -r embed/bcw2 gormlite + go test ./embed/bcw2 ./gormlite - name: Test GORM shell: bash diff --git a/litestream/README.md b/litestream/README.md deleted file mode 100644 index ec46c85..0000000 --- a/litestream/README.md +++ /dev/null @@ -1,11 +0,0 @@ -# Litestream lightweight read-replicas - -This package implements the **EXPERIMENTAL** `"litestream"` SQLite VFS -that offers Litestream [lightweight read-replicas](https://fly.io/blog/litestream-revamped/#lightweight-read-replicas). - -See the [example](example_test.go) for how to use. - -Our `PRAGMA litestream_time` accepts: -- Go [duration strings](https://pkg.go.dev/time#ParseDuration) -- SQLite [time values](https://sqlite.org/lang_datefunc.html#time_values) -- SQLite [time modifiers 1 through 13](https://sqlite.org/lang_datefunc.html#modifiers) diff --git a/litestream/api.go b/litestream/api.go deleted file mode 100644 index e61bd76..0000000 --- a/litestream/api.go +++ /dev/null @@ -1,77 +0,0 @@ -// Package litestream implements a Litestream lightweight read-replica VFS. -package litestream - -import ( - "log/slog" - "sync" - "time" - - "github.com/benbjohnson/litestream" - - "github.com/ncruces/go-sqlite3/vfs" -) - -const ( - // The default poll interval. - DefaultPollInterval = 1 * time.Second - - // The default cache size: 10 MiB. - DefaultCacheSize = 10 * 1024 * 1024 -) - -func init() { - vfs.Register("litestream", liteVFS{}) -} - -var ( - liteMtx sync.RWMutex - // +checklocks:liteMtx - liteDBs = map[string]*liteDB{} -) - -// ReplicaOptions represents options for [NewReplica]. -type ReplicaOptions struct { - // Where to log error messages. May be nil. - Logger *slog.Logger - - // Replica poll interval. - // Should be less than the compaction interval - // used by the replica at MinLevel+1. - PollInterval time.Duration - - // CacheSize is the maximum size of the page cache in bytes. - // Zero means DefaultCacheSize, negative disables caching. - CacheSize int -} - -// NewReplica creates a read-replica from a Litestream client. -func NewReplica(name string, client ReplicaClient, options ReplicaOptions) { - if options.Logger != nil { - options.Logger = options.Logger.With("name", name) - } else { - options.Logger = slog.New(slog.DiscardHandler) - } - if options.PollInterval <= 0 { - options.PollInterval = DefaultPollInterval - } - if options.CacheSize == 0 { - options.CacheSize = DefaultCacheSize - } - - liteMtx.Lock() - liteDBs[name] = &liteDB{ - client: client, - opts: options, - cache: pageCache{size: options.CacheSize}, - } - liteMtx.Unlock() -} - -// RemoveReplica removes a replica by name. -func RemoveReplica(name string) { - liteMtx.Lock() - delete(liteDBs, name) - liteMtx.Unlock() -} - -type ReplicaClient = litestream.ReplicaClient diff --git a/litestream/cache.go b/litestream/cache.go deleted file mode 100644 index 0f4c111..0000000 --- a/litestream/cache.go +++ /dev/null @@ -1,70 +0,0 @@ -package litestream - -import ( - "context" - "fmt" - "sync" - - "github.com/benbjohnson/litestream" - "github.com/superfly/ltx" -) - -type pageCache struct { - pages map[uint32]cachedPage // +checklocks:mtx - size int - mtx sync.Mutex -} - -type cachedPage struct { - data []byte - txid ltx.TXID -} - -func (c *pageCache) getOrFetch(ctx context.Context, client ReplicaClient, pgno uint32, elem ltx.PageIndexElem) ([]byte, error) { - if c.size > 0 { - c.mtx.Lock() - page := c.pages[pgno] - c.mtx.Unlock() - - if page.txid == elem.MaxTXID { - return page.data, nil - } - } - - h, data, err := litestream.FetchPage(ctx, client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size) - if err != nil { - return nil, fmt.Errorf("fetch page: %w", err) - } - if pgno != h.Pgno { - return nil, fmt.Errorf("fetch page: want %d, got %d", pgno, h.Pgno) - } - - if c.size > 0 { - c.mtx.Lock() - if c.pages != nil { - c.evict(len(data)) - } else { - c.pages = map[uint32]cachedPage{} - } - c.pages[pgno] = cachedPage{data, elem.MaxTXID} - c.mtx.Unlock() - } - return data, nil -} - -// +checklocks:c.mtx -func (c *pageCache) evict(pageSize int) { - // Evict random keys until we're under the maximum size. - // SQLite has its own page cache, which it will use for each connection. - // Since this is a second layer of shared cache, - // random eviction is probably good enough. - if pageSize*len(c.pages) < c.size { - return - } - for key := range c.pages { - delete(c.pages, key) - if pageSize*len(c.pages) < c.size { - return - } - } -} diff --git a/litestream/example_test.go b/litestream/example_test.go deleted file mode 100644 index 8c89387..0000000 --- a/litestream/example_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package litestream_test - -import ( - "log" - "time" - - "github.com/benbjohnson/litestream/s3" - - "github.com/ncruces/go-sqlite3/driver" - _ "github.com/ncruces/go-sqlite3/embed" - "github.com/ncruces/go-sqlite3/litestream" -) - -func ExampleNewReplica() { - client := s3.NewReplicaClient() - client.Bucket = "test-bucket" - client.Path = "fruits.db" - - litestream.NewReplica("fruits.db", client, litestream.ReplicaOptions{ - PollInterval: 5 * time.Second, - }) - - db, err := driver.Open("file:fruits.db?vfs=litestream") - if err != nil { - log.Fatalln(err) - } - defer db.Close() - - for { - time.Sleep(time.Second) - rows, err := db.Query("SELECT * FROM fruits") - if err != nil { - log.Fatalln(err) - } - - for rows.Next() { - var name, color string - err := rows.Scan(&name, &color) - if err != nil { - log.Fatalln(err) - } - log.Println(name, color) - } - - log.Println("===") - rows.Close() - } -} diff --git a/litestream/go.mod b/litestream/go.mod deleted file mode 100644 index 3611251..0000000 --- a/litestream/go.mod +++ /dev/null @@ -1,62 +0,0 @@ -module github.com/ncruces/go-sqlite3/litestream - -go 1.24.4 - -require ( - github.com/benbjohnson/litestream v0.5.5 - github.com/ncruces/go-sqlite3 v0.30.4 - github.com/ncruces/wbt v0.2.0 - github.com/superfly/ltx v0.5.1 -) - -// github.com/ncruces/go-sqlite3 -require ( - github.com/ncruces/julianday v1.0.0 // indirect - github.com/tetratelabs/wazero v1.11.0 // indirect - golang.org/x/sys v0.39.0 // indirect -) - -// github.com/superfly/ltx -require github.com/pierrec/lz4/v4 v4.1.22 // indirect - -// github.com/benbjohnson/litestream -require ( - github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/kr/text v0.2.0 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.23.2 // indirect - github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.67.4 // indirect - github.com/prometheus/procfs v0.19.2 // indirect - github.com/rogpeppe/go-internal v1.12.0 // indirect - go.yaml.in/yaml/v2 v2.4.3 // indirect - google.golang.org/protobuf v1.36.11 // indirect -) - -// github.com/benbjohnson/litestream/s3 -require ( - github.com/aws/aws-sdk-go-v2 v1.37.1 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 // indirect - github.com/aws/aws-sdk-go-v2/config v1.30.2 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.18.2 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.1 // indirect - github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.1 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.1 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.1 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.1 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.1 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.1 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.85.1 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.26.1 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.31.1 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.35.1 // indirect - github.com/aws/smithy-go v1.22.5 // indirect -) - -replace github.com/benbjohnson/litestream => github.com/ncruces/litestream v0.5.5 diff --git a/litestream/go.sum b/litestream/go.sum deleted file mode 100644 index 19679ae..0000000 --- a/litestream/go.sum +++ /dev/null @@ -1,190 +0,0 @@ -cloud.google.com/go v0.110.10 h1:LXy9GEO+timppncPIAZoOj3l58LIU9k+kn48AN7IO3Y= -cloud.google.com/go v0.110.10/go.mod h1:v1OoFqYxiBkUrruItNM3eT4lLByNjxmJSV/xDKJNnic= -cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk= -cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= -cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= -cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= -cloud.google.com/go/iam v1.1.5 h1:1jTsCu4bcsNsE4iiqNT5SHwrDRCfRmIaaaVFhRveTJI= -cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8= -cloud.google.com/go/storage v1.36.0 h1:P0mOkAcaJxhCTvAkMhxMfrTKiNcub4YmmPBtlhAyTr8= -cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.2 h1:Hr5FTipp7SL07o2FvoVOX9HRiRH3CR3Mj8pxqCcdD5A= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.2/go.mod h1:QyVsSSN64v5TGltphKLQ2sQxe4OBQg0J1eKRcVBnfgE= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.11.0 h1:MhRfI58HblXzCtWEZCO0feHs8LweePB3s90r7WaR1KU= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.11.0/go.mod h1:okZ+ZURbArNdlJ+ptXoyHNuOETzOl1Oww19rm8I2WLA= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 h1:9iefClla7iYpfYWdzPCRDozdmndjTm8DXdpCzPajMgA= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2/go.mod h1:XtLgD3ZD34DAaVIIAyG3objl5DynM3CQ/vMcbBNJZGI= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 h1:FwladfywkNirM+FZYLBR2kBz5C8Tg0fw5w5Y7meRXWI= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2/go.mod h1:vv5Ad0RrIoT1lJFdWBZwt4mB1+j+V8DUroixmKDTCdk= -github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 h1:oygO0locgZJe7PpYPXT5A29ZkwJaPqcva7BVeemZOZs= -github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= -github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.3.0 h1:wQlqotpyjYPjJz+Noh5bRu7Snmydk8SKC5Z6u1CR20Y= -github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.3.0/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M= -github.com/aws/aws-sdk-go-v2 v1.37.1 h1:SMUxeNz3Z6nqGsXv0JuJXc8w5YMtrQMuIBmDx//bBDY= -github.com/aws/aws-sdk-go-v2 v1.37.1/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 h1:6GMWV6CNpA/6fbFHnoAjrv4+LGfyTqZz2LtCHnspgDg= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0/go.mod h1:/mXlTIVG9jbxkqDnr5UQNQxW1HRYxeGklkM9vAFeabg= -github.com/aws/aws-sdk-go-v2/config v1.30.2 h1:YE1BmSc4fFYqFgN1mN8uzrtc7R9x+7oSWeX8ckoltAw= -github.com/aws/aws-sdk-go-v2/config v1.30.2/go.mod h1:UNrLGZ6jfAVjgVJpkIxjLufRJqTXCVYOpkeVf83kwBo= -github.com/aws/aws-sdk-go-v2/credentials v1.18.2 h1:mfm0GKY/PHLhs7KO0sUaOtFnIQ15Qqxt+wXbO/5fIfs= -github.com/aws/aws-sdk-go-v2/credentials v1.18.2/go.mod h1:v0SdJX6ayPeZFQxgXUKw5RhLpAoZUuynxWDfh8+Eknc= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.1 h1:owmNBboeA0kHKDcdF8KiSXmrIuXZustfMGGytv6OMkM= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.1/go.mod h1:Bg1miN59SGxrZqlP8vJZSmXW+1N8Y1MjQDq1OfuNod8= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.2 h1:YFX4DvH1CPQXgQR8935b46Om+L7+6jus4aTdKqyDR84= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.2/go.mod h1:DgMPy7GqxcV0RSyaITnI3rw8HC3lIHB87U3KPQKDxHg= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.1 h1:ksZXBYv80EFTcgc8OJO48aQ8XDWXIQL7gGasPeCoTzI= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.1/go.mod h1:HSksQyyJETVZS7uM54cir0IgxttTD+8aEoJMPGepHBI= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.1 h1:+dn/xF/05utS7tUhjIcndbuaPjfll2LhbH1cCDGLYUQ= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.1/go.mod h1:hyAGz30LHdm5KBZDI58MXx5lDVZ5CUfvfTZvMu4HCZo= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.1 h1:4HbnOGE9491a9zYJ9VpPh1ApgEq6ZlD4Kuv1PJenFpc= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.1/go.mod h1:Z6QnHC6TmpJWUxAy8FI4JzA7rTwl6EIANkyK9OR5z5w= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 h1:6+lZi2JeGKtCraAj1rpoZfKqnQ9SptseRZioejfUOLM= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0/go.mod h1:eb3gfbVIxIoGgJsi9pGne19dhCBpK6opTYpQqAmdy44= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.1 h1:ps3nrmBWdWwakZBydGX1CxeYFK80HsQ79JLMwm7Y4/c= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.1/go.mod h1:bAdfrfxENre68Hh2swNaGEVuFYE74o0SaSCAlaG9E74= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.1 h1:ky79ysLMxhwk5rxJtS+ILd3Mc8kC5fhsLBrP27r6h4I= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.1/go.mod h1:+2MmkvFvPYM1vsozBWduoLJUi5maxFk5B7KJFECujhY= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.1 h1:MdVYlN5pcQu1t1OYx4Ajo3fKl1IEhzgdPQbYFCRjYS8= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.1/go.mod h1:iikmNLrvHm2p4a3/4BPeix2S9P+nW8yM1IZW73x8bFA= -github.com/aws/aws-sdk-go-v2/service/s3 v1.85.1 h1:Hsqo8+dFxSdDvv9B2PgIx1AJAnDpqgS0znVI+R+MoGY= -github.com/aws/aws-sdk-go-v2/service/s3 v1.85.1/go.mod h1:8Q0TAPXD68Z8YqlcIGHs/UNIDHsxErV9H4dl4vJEpgw= -github.com/aws/aws-sdk-go-v2/service/sso v1.26.1 h1:uWaz3DoNK9MNhm7i6UGxqufwu3BEuJZm72WlpGwyVtY= -github.com/aws/aws-sdk-go-v2/service/sso v1.26.1/go.mod h1:ILpVNjL0BO+Z3Mm0SbEeUoYS9e0eJWV1BxNppp0fcb8= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.31.1 h1:XdG6/o1/ZDmn3wJU5SRAejHaWgKS4zHv0jBamuKuS2k= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.31.1/go.mod h1:oiotGTKadCOCl3vg/tYh4k45JlDF81Ka8rdumNhEnIQ= -github.com/aws/aws-sdk-go-v2/service/sts v1.35.1 h1:iF4Xxkc0H9c/K2dS0zZw3SCkj0Z7n6AMnUiiyoJND+I= -github.com/aws/aws-sdk-go-v2/service/sts v1.35.1/go.mod h1:0bxIatfN0aLq4mjoLDeBpOjOke68OsFlXPDFJ7V0MYw= -github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= -github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= -github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= -github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= -github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= -github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= -github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= -github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= -github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= -github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= -github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= -github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= -github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= -github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= -github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/nats-io/nats.go v1.44.0 h1:ECKVrDLdh/kDPV1g0gAQ+2+m2KprqZK5O/eJAyAnH2M= -github.com/nats-io/nats.go v1.44.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= -github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= -github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= -github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/ncruces/go-sqlite3 v0.30.4 h1:j9hEoOL7f9ZoXl8uqXVniaq1VNwlWAXihZbTvhqPPjA= -github.com/ncruces/go-sqlite3 v0.30.4/go.mod h1:7WR20VSC5IZusKhUdiR9y1NsUqnZgqIYCmKKoMEYg68= -github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M= -github.com/ncruces/julianday v1.0.0/go.mod h1:Dusn2KvZrrovOMJuOt0TNXL6tB7U2E8kvza5fFc9G7g= -github.com/ncruces/litestream v0.5.5 h1:LUoyorC+Xx0TtiuEjwd0+GIusCK5IIZwTPsO1+se55g= -github.com/ncruces/litestream v0.5.5/go.mod h1:yLedOf7Gj4ZQQB6tCB06zm9wpNc2d8TdemfWjjc6fNk= -github.com/ncruces/wbt v0.2.0 h1:Q9zlKOBSZc7Yy/R2cGa35g6RKUUE3BjNIW3tfGC4F04= -github.com/ncruces/wbt v0.2.0/go.mod h1:DtF92amvMxH69EmBFUSFWRDAlo6hOEfoNQnClxj9C/c= -github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= -github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= -github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= -github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo= -github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk= -github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= -github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= -github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= -github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= -github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= -github.com/prometheus/common v0.67.4 h1:yR3NqWO1/UyO1w2PhUvXlGQs/PtFmoveVO0KZ4+Lvsc= -github.com/prometheus/common v0.67.4/go.mod h1:gP0fq6YjjNCLssJCQp0yk4M8W6ikLURwkdd/YKtTbyI= -github.com/prometheus/procfs v0.19.2 h1:zUMhqEW66Ex7OXIiDkll3tl9a1ZdilUOd/F6ZXw4Vws= -github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05ZpYlu+b4J7mw= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= -github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= -github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -github.com/studio-b12/gowebdav v0.11.0 h1:qbQzq4USxY28ZYsGJUfO5jR+xkFtcnwWgitp4Zp1irU= -github.com/studio-b12/gowebdav v0.11.0/go.mod h1:bHA7t77X/QFExdeAnDzK6vKM34kEZAcE1OX4MfiwjkE= -github.com/superfly/ltx v0.5.1 h1:vGUeBhKvBKZ2s2TsTrOSahz+m0PswfqOWDAD+ICiiYY= -github.com/superfly/ltx v0.5.1/go.mod h1:Nf50QAIXU/ET4ua3AuQ2fh31MbgNQZA7r/DYx6Os77s= -github.com/tetratelabs/wazero v1.11.0 h1:+gKemEuKCTevU4d7ZTzlsvgd1uaToIDtlQlmNbwqYhA= -github.com/tetratelabs/wazero v1.11.0/go.mod h1:eV28rsN8Q+xwjogd7f4/Pp4xFxO7uOGbLcD/LzB1wiU= -go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= -go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo= -go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= -go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= -go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= -go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= -go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= -go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= -go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= -golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU= -golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0= -golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= -golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= -golang.org/x/oauth2 v0.32.0 h1:jsCblLleRMDrxMN29H3z/k1KliIvpLgCkE6R8FXXNgY= -golang.org/x/oauth2 v0.32.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= -golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= -golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= -golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= -golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -google.golang.org/api v0.154.0 h1:X7QkVKZBskztmpPKWQXgjJRPA2dJYrL6r+sYPRLj050= -google.golang.org/api v0.154.0/go.mod h1:qhSMkM85hgqiokIYsrRyKxrjfBeIhgl4Z2JmeRkYylc= -google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f h1:Vn+VyHU5guc9KjB5KrjI2q0wCOWEOIh0OEsleqakHJg= -google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f/go.mod h1:nWSwAFPb+qfNJXsoeO3Io7zf4tMSfN8EA8RlDA04GhY= -google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f h1:2yNACc1O40tTnrsbk9Cv6oxiW8pxI/pXj0wRtdlYmgY= -google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f/go.mod h1:Uy9bTZJqmfrw2rIBxgGLnamc78euZULUBrLZ9XTITKI= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= -google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= -google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= -google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= -google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/litestream/time.go b/litestream/time.go deleted file mode 100644 index 8328cb9..0000000 --- a/litestream/time.go +++ /dev/null @@ -1,63 +0,0 @@ -package litestream - -import ( - "math" - "strings" - "time" - - "github.com/ncruces/go-sqlite3/util/sql3util" -) - -func parseTimeDelta(s string) (years, months, days int, duration time.Duration, ok bool) { - duration, err := time.ParseDuration(s) - if err == nil { - return 0, 0, 0, duration, true - } - - if strings.EqualFold(s, "now") { - return 0, 0, 0, 0, true - } - - ss := strings.TrimSuffix(strings.ToLower(s), "s") - switch { - case strings.HasSuffix(ss, " year"): - years, duration, ok = parseDateUnit(ss, " year", 365*86400) - - case strings.HasSuffix(ss, " month"): - months, duration, ok = parseDateUnit(ss, " month", 30*86400) - - case strings.HasSuffix(ss, " day"): - months, duration, ok = parseDateUnit(ss, " day", 86400) - - case strings.HasSuffix(ss, " hour"): - duration, ok = parseTimeUnit(ss, " hour", time.Hour) - - case strings.HasSuffix(ss, " minute"): - duration, ok = parseTimeUnit(ss, " minute", time.Minute) - - case strings.HasSuffix(ss, " second"): - duration, ok = parseTimeUnit(ss, " second", time.Second) - - default: - return sql3util.ParseTimeShift(s) - } - return -} - -func parseDateUnit(s, unit string, seconds float64) (int, time.Duration, bool) { - f, ok := sql3util.ParseFloat(s[:len(s)-len(unit)]) - if !ok { - return 0, 0, false - } - - i, f := math.Modf(f) - if math.MinInt <= i && i <= math.MaxInt { - return int(i), time.Duration(f * seconds * float64(time.Second)), true - } - return 0, 0, false -} - -func parseTimeUnit(s, unit string, scale time.Duration) (time.Duration, bool) { - f, ok := sql3util.ParseFloat(s[:len(s)-len(unit)]) - return time.Duration(f * float64(scale)), ok -} diff --git a/litestream/vfs.go b/litestream/vfs.go deleted file mode 100644 index 74c6db0..0000000 --- a/litestream/vfs.go +++ /dev/null @@ -1,411 +0,0 @@ -package litestream - -import ( - "context" - "encoding/binary" - "fmt" - "io" - "strconv" - "strings" - "sync" - "time" - - "github.com/benbjohnson/litestream" - "github.com/superfly/ltx" - - "github.com/ncruces/go-sqlite3" - "github.com/ncruces/go-sqlite3/vfs" - "github.com/ncruces/wbt" -) - -type liteVFS struct{} - -func (liteVFS) Open(name string, flags vfs.OpenFlag) (vfs.File, vfs.OpenFlag, error) { - // Temp journals, as used by the sorter, use a temporary file. - if flags&vfs.OPEN_TEMP_JOURNAL != 0 { - return vfs.Find("").Open(name, flags) - } - // Refuse to open all other file types. - if flags&vfs.OPEN_MAIN_DB == 0 { - return nil, flags, sqlite3.CANTOPEN - } - - liteMtx.RLock() - db := liteDBs[name] - liteMtx.RUnlock() - - if db == nil { - return nil, flags, sqlite3.CANTOPEN - } - - // Build the page index so we can lookup individual pages. - if err := db.buildIndex(context.Background()); err != nil { - db.opts.Logger.Error("build index", "error", err) - return nil, 0, err - } - return &liteFile{db: db}, flags | vfs.OPEN_READONLY, nil -} - -func (liteVFS) Delete(name string, dirSync bool) error { - // notest // used to delete journals - return sqlite3.IOERR_DELETE_NOENT -} - -func (liteVFS) Access(name string, flag vfs.AccessFlag) (bool, error) { - // notest // used to check for journals - return false, nil -} - -func (liteVFS) FullPathname(name string) (string, error) { - return name, nil -} - -type liteFile struct { - db *liteDB - conn *sqlite3.Conn - pages *pageIndex - syncTime time.Time - txid ltx.TXID - pageSize uint32 - locked bool -} - -func (f *liteFile) Close() error { return nil } - -func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) { - ctx := f.context() - pages, txid := f.pages, f.txid - if pages == nil && f.syncTime.IsZero() { - pages, txid, err = f.db.pollReplica(ctx) - } - if err != nil { - f.db.opts.Logger.Error("poll replica", "error", err) - return 0, err - } - - pgno := uint32(1) - if off >= 512 { - pgno += uint32(off / int64(f.pageSize)) - } - - elem, ok := pages.Get(pgno) - if !ok { - return 0, io.EOF - } - - data, err := f.db.cache.getOrFetch(ctx, f.db.client, pgno, elem) - if err != nil { - f.db.opts.Logger.Error("fetch page", "error", err) - return 0, err - } - - // Update the first page to pretend we are in journal mode, - // load the page size and track changes to the database. - if pgno == 1 && len(data) >= 100 && - data[18] >= 1 && data[19] >= 1 && - data[18] <= 3 && data[19] <= 3 { - data[18], data[19] = 0x01, 0x01 - binary.BigEndian.PutUint32(data[24:28], uint32(txid)) - f.pageSize = uint32(256 * binary.LittleEndian.Uint16(data[16:18])) - } - - n = copy(p, data[off%int64(len(data)):]) - return n, nil -} - -func (f *liteFile) WriteAt(b []byte, off int64) (n int, err error) { - // notest // OPEN_READONLY - return 0, sqlite3.IOERR_WRITE -} - -func (f *liteFile) Truncate(size int64) error { - // notest // OPEN_READONLY - return sqlite3.IOERR_TRUNCATE -} - -func (f *liteFile) Sync(flag vfs.SyncFlag) error { - // notest // OPEN_READONLY - return sqlite3.IOERR_FSYNC -} - -func (f *liteFile) Size() (size int64, err error) { - if max := f.pages.Max(); max != nil { - size = int64(max.Key()) * int64(f.pageSize) - } - return -} - -func (f *liteFile) Lock(lock vfs.LockLevel) (err error) { - if lock >= vfs.LOCK_RESERVED { - // notest // OPEN_READONLY - return sqlite3.IOERR_LOCK - } - if f.syncTime.IsZero() { - f.pages, f.txid, err = f.db.pollReplica(f.context()) - } - if err != nil { - f.db.opts.Logger.Error("poll replica", "error", err) - } else { - f.locked = true - } - return err -} - -func (f *liteFile) Unlock(lock vfs.LockLevel) error { - if f.syncTime.IsZero() { - f.pages, f.txid = nil, 0 - } - f.locked = false - return nil -} - -func (f *liteFile) CheckReservedLock() (bool, error) { - // notest // used to check for hot journals - return false, nil -} - -func (f *liteFile) SectorSize() int { - // notest // safe default - return 0 -} - -func (f *liteFile) DeviceCharacteristics() vfs.DeviceCharacteristic { - // notest // safe default - return 0 -} - -func (f *liteFile) Pragma(name, value string) (string, error) { - switch name { - case "litestream_txid": - txid := f.txid - if txid == 0 { - f.db.mtx.Lock() - txid = f.db.txids[0] - f.db.mtx.Unlock() - } - return txid.String(), nil - - case "litestream_lag": - f.db.mtx.Lock() - lastPoll := f.db.lastPoll - f.db.mtx.Unlock() - - if lastPoll.IsZero() { - return "-1", nil - } - lag := time.Since(lastPoll) / time.Second - return strconv.FormatInt(int64(lag), 10), nil - - case "litestream_time": - if value == "" { - syncTime := f.syncTime - if syncTime.IsZero() { - f.db.mtx.Lock() - syncTime = f.db.lastInfo - f.db.mtx.Unlock() - } - if syncTime.IsZero() { - return "latest", nil - } - return syncTime.Format(time.RFC3339Nano), nil - } - - if f.locked { - return "", sqlite3.MISUSE - } - - if strings.EqualFold(value, "latest") { - f.syncTime = time.Time{} - f.pages, f.txid = nil, 0 - return "", nil - } - - var syncTime time.Time - if years, months, days, duration, ok := parseTimeDelta(value); ok { - syncTime = time.Now().AddDate(years, months, days).Add(duration) - } else { - syncTime, _ = sqlite3.TimeFormatAuto.Decode(value) - } - if syncTime.IsZero() { - return "", sqlite3.MISUSE - } - - err := f.buildIndex(f.context(), syncTime) - if err != nil { - f.db.opts.Logger.Error("build index", "error", err) - } - return "", err - } - - return "", sqlite3.NOTFOUND -} - -func (f *liteFile) SetDB(conn any) { - f.conn = conn.(*sqlite3.Conn) -} - -func (f *liteFile) context() context.Context { - if f.conn != nil { - return f.conn.GetInterrupt() - } - return context.Background() -} - -func (f *liteFile) buildIndex(ctx context.Context, syncTime time.Time) error { - // Build the index from scratch from a Litestream restore plan. - infos, err := litestream.CalcRestorePlan(ctx, f.db.client, 0, syncTime, f.db.opts.Logger) - if err != nil { - return fmt.Errorf("calc restore plan: %w", err) - } - - var txid ltx.TXID - var pages *pageIndex - syncTime = time.Time{} - for _, info := range infos { - pages, err = fetchPageIndex(ctx, pages, f.db.client, info) - if err != nil { - return err - } - if syncTime.Before(info.CreatedAt) { - syncTime = info.CreatedAt - } - txid = max(txid, info.MaxTXID) - } - f.syncTime = syncTime - f.pages = pages - f.txid = txid - return nil -} - -type liteDB struct { - client litestream.ReplicaClient - opts ReplicaOptions - cache pageCache - pages *pageIndex // +checklocks:mtx - lastPoll time.Time // +checklocks:mtx - lastInfo time.Time // +checklocks:mtx - txids levelTXIDs // +checklocks:mtx - mtx sync.Mutex -} - -func (d *liteDB) buildIndex(ctx context.Context) error { - d.mtx.Lock() - defer d.mtx.Unlock() - - // Skip if we already have an index. - if !d.lastPoll.IsZero() { - return nil - } - - // Build the index from scratch from a Litestream restore plan. - infos, err := litestream.CalcRestorePlan(ctx, d.client, 0, time.Time{}, d.opts.Logger) - if err != nil { - return fmt.Errorf("calc restore plan: %w", err) - } - - for _, info := range infos { - err := d.updateInfo(ctx, info) - if err != nil { - return err - } - } - - d.lastPoll = time.Now() - return nil -} - -func (d *liteDB) pollReplica(ctx context.Context) (*pageIndex, ltx.TXID, error) { - d.mtx.Lock() - defer d.mtx.Unlock() - - // Limit polling interval. - if time.Since(d.lastPoll) < d.opts.PollInterval { - return d.pages, d.txids[0], nil - } - - for level := range []int{0, 1, litestream.SnapshotLevel} { - if err := d.updateLevel(ctx, level); err != nil { - return nil, 0, err - } - } - - d.lastPoll = time.Now() - return d.pages, d.txids[0], nil -} - -// +checklocks:d.mtx -func (d *liteDB) updateLevel(ctx context.Context, level int) error { - var nextTXID ltx.TXID - // Snapshots must start from scratch, - // other levels can start from where they were left. - if level != litestream.SnapshotLevel { - nextTXID = d.txids[level] + 1 - } - - // Start reading from the next LTX file after the current position. - itr, err := d.client.LTXFiles(ctx, level, nextTXID, false) - if err != nil { - return fmt.Errorf("ltx files: %w", err) - } - defer itr.Close() - - // Build an update across all new LTX files. - for itr.Next() { - info := itr.Item() - - // Skip LTX files already fully loaded into the index. - if info.MaxTXID <= d.txids[level] { - continue - } - - err := d.updateInfo(ctx, info) - if err != nil { - return err - } - } - if err := itr.Err(); err != nil { - return err - } - return itr.Close() -} - -// +checklocks:d.mtx -func (d *liteDB) updateInfo(ctx context.Context, info *ltx.FileInfo) error { - pages, err := fetchPageIndex(ctx, d.pages, d.client, info) - if err != nil { - return err - } - - // Track the MaxTXID for each level. - maxTXID := &d.txids[info.Level] - *maxTXID = max(*maxTXID, info.MaxTXID) - d.txids[0] = max(d.txids[0], *maxTXID) - if d.lastInfo.Before(info.CreatedAt) { - d.lastInfo = info.CreatedAt - } - d.pages = pages - return nil -} - -func fetchPageIndex( - ctx context.Context, pages *pageIndex, - client litestream.ReplicaClient, info *ltx.FileInfo) (*pageIndex, error) { - - idx, err := litestream.FetchPageIndex(ctx, client, info) - if err != nil { - return nil, fmt.Errorf("fetch page index: %w", err) - } - - // Replace pages in the index with new pages. - for k, v := range idx { - // Patch avoids mutating the index for an unmodified page. - pages = pages.Patch(k, func(node *pageIndex) (ltx.PageIndexElem, bool) { - return v, node == nil || v != node.Value() - }) - } - return pages, nil -} - -// Type aliases; these are a mouthful. -type pageIndex = wbt.Tree[uint32, ltx.PageIndexElem] -type levelTXIDs = [litestream.SnapshotLevel + 1]ltx.TXID diff --git a/litestream/vfs_test.go b/litestream/vfs_test.go deleted file mode 100644 index abb46d8..0000000 --- a/litestream/vfs_test.go +++ /dev/null @@ -1,104 +0,0 @@ -package litestream - -import ( - "path/filepath" - "testing" - "time" - - "github.com/benbjohnson/litestream" - "github.com/benbjohnson/litestream/file" - "github.com/ncruces/go-sqlite3/driver" - _ "github.com/ncruces/go-sqlite3/embed" -) - -func Test_integration(t *testing.T) { - dir := t.TempDir() - dbpath := filepath.Join(dir, "test.db") - backup := filepath.Join(dir, "backup", "test.db") - - db, err := driver.Open(dbpath) - if err != nil { - t.Fatal(err) - } - defer db.Close() - - client := file.NewReplicaClient(backup) - setupReplication(t, dbpath, client) - - NewReplica("test.db", client, ReplicaOptions{}) - replica, err := driver.Open("file:test.db?vfs=litestream") - if err != nil { - t.Fatal(err) - } - defer replica.Close() - - _, err = db.ExecContext(t.Context(), `CREATE TABLE users (id INT, name VARCHAR(10))`) - if err != nil { - t.Fatal(err) - } - - _, err = db.ExecContext(t.Context(), - `INSERT INTO users (id, name) VALUES (0, 'go'), (1, 'zig'), (2, 'whatever')`) - if err != nil { - t.Fatal(err) - } - - time.Sleep(DefaultPollInterval + litestream.DefaultMonitorInterval) - - rows, err := replica.QueryContext(t.Context(), `SELECT id, name FROM users`) - if err != nil { - t.Fatal(err) - } - defer rows.Close() - - row := 0 - ids := []int{0, 1, 2} - names := []string{"go", "zig", "whatever"} - for ; rows.Next(); row++ { - var id int - var name string - err := rows.Scan(&id, &name) - if err != nil { - t.Fatal(err) - } - - if id != ids[row] { - t.Errorf("got %d, want %d", id, ids[row]) - } - if name != names[row] { - t.Errorf("got %q, want %q", name, names[row]) - } - } - if row != 3 { - t.Errorf("got %d, want %d", row, len(ids)) - } - - var lag int - err = replica.QueryRowContext(t.Context(), `PRAGMA litestream_lag`).Scan(&lag) - if err != nil { - t.Fatal(err) - } - if lag < 0 || lag > 2 { - t.Errorf("got %d", lag) - } - - var txid string - err = replica.QueryRowContext(t.Context(), `PRAGMA litestream_txid`).Scan(&txid) - if err != nil { - t.Fatal(err) - } - if txid != "0000000000000001" { - t.Errorf("got %q", txid) - } -} - -func setupReplication(tb testing.TB, path string, client ReplicaClient) { - lsdb := litestream.NewDB(path) - lsdb.Replica = litestream.NewReplicaWithClient(lsdb, client) - - err := lsdb.Open() - if err != nil { - tb.Fatal(err) - } - tb.Cleanup(func() { lsdb.Close(tb.Context()) }) -} diff --git a/vfs/README.md b/vfs/README.md index 0b99cb2..27b76ab 100644 --- a/vfs/README.md +++ b/vfs/README.md @@ -125,5 +125,5 @@ The VFS can be customized with a few build tags: wraps a VFS to offer encryption at rest. - [`github.com/ncruces/go-sqlite3/vfs/xts`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/vfs/xts) wraps a VFS to offer encryption at rest. -- [`github.com/ncruces/go-sqlite3/litestream`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/litestream) +- [`github.com/ncruces/litestream`](https://pkg.go.dev/github.com/ncruces/litestream) implements Litestream [lightweight read-replicas](https://fly.io/blog/litestream-revamped/#lightweight-read-replicas).