From 8f9a6ca4c1d816ed084674d138d41e989c6b6141 Mon Sep 17 00:00:00 2001 From: Nuno Cruces Date: Sun, 9 Nov 2025 13:16:08 +0000 Subject: [PATCH] Litestream lightweight read-replicas. (#328) --- .github/workflows/test.yml | 2 +- litestream/README.md | 11 ++ litestream/api.go | 62 ++++++++ litestream/go.mod | 63 ++++++++ litestream/go.sum | 197 +++++++++++++++++++++++++ litestream/vfs.go | 290 +++++++++++++++++++++++++++++++++++++ litestream/vfs_test.go | 47 ++++++ vfs/README.md | 4 + 8 files changed, 675 insertions(+), 1 deletion(-) create mode 100644 litestream/README.md create mode 100644 litestream/api.go create mode 100644 litestream/go.mod create mode 100644 litestream/go.sum create mode 100644 litestream/vfs.go create mode 100644 litestream/vfs_test.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7ed91da..30a8f4d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -74,7 +74,7 @@ jobs: - name: Test GORM shell: bash run: gormlite/test.sh - if: matrix.os != 'windows-latest' + if: matrix.os == 'ubuntu-latest' - name: Collect coverage run: | diff --git a/litestream/README.md b/litestream/README.md new file mode 100644 index 0000000..d1fcb41 --- /dev/null +++ b/litestream/README.md @@ -0,0 +1,11 @@ +# Litestream lightweight read-replicas + +This package implements the **EXPERIMENTAL** `"litestream"` SQLite VFS +that offers Litestream [lightweight read-replicas](https://fly.io/blog/litestream-revamped/#lightweight-read-replicas). + +See the [example](vfs_test.go) for how to use. + +To improve performance, +increase `PollInterval` (and `MinLevel`) as much as you can, +and set [`PRAGMA cache_size=N`](https://www.sqlite.org/pragma.html#pragma_cache_size) +(or use `_pragma=cache_size(N)`). \ No newline at end of file diff --git a/litestream/api.go b/litestream/api.go new file mode 100644 index 0000000..00bb75d --- /dev/null +++ b/litestream/api.go @@ -0,0 +1,62 @@ +// Package litestream implements a Litestream lightweight read-replica VFS. +package litestream + +import ( + "log/slog" + "sync" + "time" + + "github.com/benbjohnson/litestream" + "github.com/ncruces/go-sqlite3/vfs" +) + +// The default poll interval. +const DefaultPollInterval = 1 * time.Second + +func init() { + vfs.Register("litestream", liteVFS{}) +} + +var ( + liteMtx sync.RWMutex + // +checklocks:liteMtx + liteDBs = map[string]*liteDB{} +) + +// ReplicaOptions represents options for [NewReplica]. +type ReplicaOptions struct { + // Where to log error messages. May be nil. + Logger *slog.Logger + // Minimum compaction level to track. + MinLevel int + // Replica poll interval. Must be less than the compaction interval + // used by the replica at MinLevel+1. + PollInterval time.Duration +} + +// NewReplica creates a read-replica from a Litestream client. +func NewReplica(name string, client litestream.ReplicaClient, options ReplicaOptions) { + if options.Logger != nil { + options.Logger = options.Logger.With("name", name) + } else { + options.Logger = slog.New(slog.DiscardHandler) + } + if options.PollInterval <= 0 { + options.PollInterval = DefaultPollInterval + } + options.MinLevel = max(0, min(options.MinLevel, litestream.SnapshotLevel)) + + liteMtx.Lock() + defer liteMtx.Unlock() + liteDBs[name] = &liteDB{ + client: client, + opts: &options, + } +} + +// RemoveReplica removes a replica by name. +func RemoveReplica(name string) { + liteMtx.Lock() + defer liteMtx.Unlock() + delete(liteDBs, name) +} diff --git a/litestream/go.mod b/litestream/go.mod new file mode 100644 index 0000000..f11c5c0 --- /dev/null +++ b/litestream/go.mod @@ -0,0 +1,63 @@ +module github.com/ncruces/go-sqlite3/litestream + +go 1.24.4 + +require ( + github.com/benbjohnson/litestream v0.5.2 + github.com/ncruces/go-sqlite3 v0.30.1 + github.com/ncruces/wbt v0.2.0 + github.com/superfly/ltx v0.5.0 +) + +// github.com/ncruces/go-sqlite3 +require ( + github.com/ncruces/julianday v1.0.0 // indirect + github.com/tetratelabs/wazero v1.10.0 // indirect + golang.org/x/sys v0.38.0 // indirect +) + +// github.com/superfly/ltx +require github.com/pierrec/lz4/v4 v4.1.22 // indirect + +// github.com/benbjohnson/litestream +require ( + filippo.io/age v1.2.1 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_golang v1.23.2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.67.2 // indirect + github.com/prometheus/procfs v0.19.2 // indirect + github.com/psanford/sqlite3vfs v0.0.0-20240315230605-24e1d98cf361 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect + golang.org/x/crypto v0.43.0 // indirect + google.golang.org/protobuf v1.36.10 // indirect + modernc.org/sqlite v1.40.0 // indirect +) + +// github.com/benbjohnson/litestream/s3 +require ( + github.com/aws/aws-sdk-go-v2 v1.37.1 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 // indirect + github.com/aws/aws-sdk-go-v2/config v1.30.2 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.18.2 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.1 // indirect + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.1 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.85.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.26.1 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.31.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.35.1 // indirect + github.com/aws/smithy-go v1.22.5 // indirect +) + +replace modernc.org/sqlite => github.com/ncruces/go-sqlite3/litestream/modernc v0.0.0-20251109124432-99b097de3b79 diff --git a/litestream/go.sum b/litestream/go.sum new file mode 100644 index 0000000..e2b3db0 --- /dev/null +++ b/litestream/go.sum @@ -0,0 +1,197 @@ +c2sp.org/CCTV/age v0.0.0-20240306222714-3ec4d716e805 h1:u2qwJeEvnypw+OCPUHmoZE3IqwfuN5kgDfo5MLzpNM0= +c2sp.org/CCTV/age v0.0.0-20240306222714-3ec4d716e805/go.mod h1:FomMrUJ2Lxt5jCLmZkG3FHa72zUprnhd3v/Z18Snm4w= +cloud.google.com/go v0.111.0 h1:YHLKNupSD1KqjDbQ3+LVdQ81h/UJbJyZG203cEfnQgM= +cloud.google.com/go v0.111.0/go.mod h1:0mibmpKP1TyOOFYQY5izo0LnT+ecvOQ0Sg3OdmMiNRU= +cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk= +cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= +cloud.google.com/go/iam v1.1.5 h1:1jTsCu4bcsNsE4iiqNT5SHwrDRCfRmIaaaVFhRveTJI= +cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8= +cloud.google.com/go/storage v1.36.0 h1:P0mOkAcaJxhCTvAkMhxMfrTKiNcub4YmmPBtlhAyTr8= +cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8= +filippo.io/age v1.2.1 h1:X0TZjehAZylOIj4DubWYU1vWQxv9bJpo+Uu2/LGhi1o= +filippo.io/age v1.2.1/go.mod h1:JL9ew2lTN+Pyft4RiNGguFfOpewKwSHm5ayKD/A4004= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.2 h1:Hr5FTipp7SL07o2FvoVOX9HRiRH3CR3Mj8pxqCcdD5A= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.2/go.mod h1:QyVsSSN64v5TGltphKLQ2sQxe4OBQg0J1eKRcVBnfgE= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.11.0 h1:MhRfI58HblXzCtWEZCO0feHs8LweePB3s90r7WaR1KU= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.11.0/go.mod h1:okZ+ZURbArNdlJ+ptXoyHNuOETzOl1Oww19rm8I2WLA= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 h1:9iefClla7iYpfYWdzPCRDozdmndjTm8DXdpCzPajMgA= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2/go.mod h1:XtLgD3ZD34DAaVIIAyG3objl5DynM3CQ/vMcbBNJZGI= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 h1:FwladfywkNirM+FZYLBR2kBz5C8Tg0fw5w5Y7meRXWI= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2/go.mod h1:vv5Ad0RrIoT1lJFdWBZwt4mB1+j+V8DUroixmKDTCdk= +github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 h1:oygO0locgZJe7PpYPXT5A29ZkwJaPqcva7BVeemZOZs= +github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/aws/aws-sdk-go-v2 v1.37.1 h1:SMUxeNz3Z6nqGsXv0JuJXc8w5YMtrQMuIBmDx//bBDY= +github.com/aws/aws-sdk-go-v2 v1.37.1/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 h1:6GMWV6CNpA/6fbFHnoAjrv4+LGfyTqZz2LtCHnspgDg= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0/go.mod h1:/mXlTIVG9jbxkqDnr5UQNQxW1HRYxeGklkM9vAFeabg= +github.com/aws/aws-sdk-go-v2/config v1.30.2 h1:YE1BmSc4fFYqFgN1mN8uzrtc7R9x+7oSWeX8ckoltAw= +github.com/aws/aws-sdk-go-v2/config v1.30.2/go.mod h1:UNrLGZ6jfAVjgVJpkIxjLufRJqTXCVYOpkeVf83kwBo= +github.com/aws/aws-sdk-go-v2/credentials v1.18.2 h1:mfm0GKY/PHLhs7KO0sUaOtFnIQ15Qqxt+wXbO/5fIfs= +github.com/aws/aws-sdk-go-v2/credentials v1.18.2/go.mod h1:v0SdJX6ayPeZFQxgXUKw5RhLpAoZUuynxWDfh8+Eknc= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.1 h1:owmNBboeA0kHKDcdF8KiSXmrIuXZustfMGGytv6OMkM= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.1/go.mod h1:Bg1miN59SGxrZqlP8vJZSmXW+1N8Y1MjQDq1OfuNod8= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.2 h1:YFX4DvH1CPQXgQR8935b46Om+L7+6jus4aTdKqyDR84= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.2/go.mod h1:DgMPy7GqxcV0RSyaITnI3rw8HC3lIHB87U3KPQKDxHg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.1 h1:ksZXBYv80EFTcgc8OJO48aQ8XDWXIQL7gGasPeCoTzI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.1/go.mod h1:HSksQyyJETVZS7uM54cir0IgxttTD+8aEoJMPGepHBI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.1 h1:+dn/xF/05utS7tUhjIcndbuaPjfll2LhbH1cCDGLYUQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.1/go.mod h1:hyAGz30LHdm5KBZDI58MXx5lDVZ5CUfvfTZvMu4HCZo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.1 h1:4HbnOGE9491a9zYJ9VpPh1ApgEq6ZlD4Kuv1PJenFpc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.1/go.mod h1:Z6QnHC6TmpJWUxAy8FI4JzA7rTwl6EIANkyK9OR5z5w= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 h1:6+lZi2JeGKtCraAj1rpoZfKqnQ9SptseRZioejfUOLM= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0/go.mod h1:eb3gfbVIxIoGgJsi9pGne19dhCBpK6opTYpQqAmdy44= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.1 h1:ps3nrmBWdWwakZBydGX1CxeYFK80HsQ79JLMwm7Y4/c= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.1/go.mod h1:bAdfrfxENre68Hh2swNaGEVuFYE74o0SaSCAlaG9E74= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.1 h1:ky79ysLMxhwk5rxJtS+ILd3Mc8kC5fhsLBrP27r6h4I= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.1/go.mod h1:+2MmkvFvPYM1vsozBWduoLJUi5maxFk5B7KJFECujhY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.1 h1:MdVYlN5pcQu1t1OYx4Ajo3fKl1IEhzgdPQbYFCRjYS8= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.1/go.mod h1:iikmNLrvHm2p4a3/4BPeix2S9P+nW8yM1IZW73x8bFA= +github.com/aws/aws-sdk-go-v2/service/s3 v1.85.1 h1:Hsqo8+dFxSdDvv9B2PgIx1AJAnDpqgS0znVI+R+MoGY= +github.com/aws/aws-sdk-go-v2/service/s3 v1.85.1/go.mod h1:8Q0TAPXD68Z8YqlcIGHs/UNIDHsxErV9H4dl4vJEpgw= +github.com/aws/aws-sdk-go-v2/service/sso v1.26.1 h1:uWaz3DoNK9MNhm7i6UGxqufwu3BEuJZm72WlpGwyVtY= +github.com/aws/aws-sdk-go-v2/service/sso v1.26.1/go.mod h1:ILpVNjL0BO+Z3Mm0SbEeUoYS9e0eJWV1BxNppp0fcb8= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.31.1 h1:XdG6/o1/ZDmn3wJU5SRAejHaWgKS4zHv0jBamuKuS2k= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.31.1/go.mod h1:oiotGTKadCOCl3vg/tYh4k45JlDF81Ka8rdumNhEnIQ= +github.com/aws/aws-sdk-go-v2/service/sts v1.35.1 h1:iF4Xxkc0H9c/K2dS0zZw3SCkj0Z7n6AMnUiiyoJND+I= +github.com/aws/aws-sdk-go-v2/service/sts v1.35.1/go.mod h1:0bxIatfN0aLq4mjoLDeBpOjOke68OsFlXPDFJ7V0MYw= +github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= +github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= +github.com/benbjohnson/litestream v0.5.2 h1:uD9I17n6RgUgyCwPM/Sw2YXNmMGixecUB5kmJ4FL08o= +github.com/benbjohnson/litestream v0.5.2/go.mod h1:jSW6AGqbxmJnEXGjMHchlZclGphzbJ6jGrGo5fYIDhU= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= +github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= +github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= +github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= +github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= +github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= +github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/mattn/go-sqlite3 v1.14.8/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbWfGI= +github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nats-io/nats.go v1.44.0 h1:ECKVrDLdh/kDPV1g0gAQ+2+m2KprqZK5O/eJAyAnH2M= +github.com/nats-io/nats.go v1.44.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= +github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/ncruces/go-sqlite3 v0.30.1 h1:pHC3YsyRdJv4pCMB4MO1Q2BXw/CAa+Hoj7GSaKtVk+g= +github.com/ncruces/go-sqlite3 v0.30.1/go.mod h1:UVsWrQaq1qkcal5/vT5lOJnZCVlR5rsThKdwidjFsKc= +github.com/ncruces/go-sqlite3/litestream/modernc v0.0.0-20251109124432-99b097de3b79 h1:evpQceUV2vRbOe84U/QhBBchfqFERRHTx1JOadFFMLE= +github.com/ncruces/go-sqlite3/litestream/modernc v0.0.0-20251109124432-99b097de3b79/go.mod h1:GSM2gXEOb9HIFFtsl0IUtnpvpDmVi7Kbp8z5GzwA0Tw= +github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M= +github.com/ncruces/julianday v1.0.0/go.mod h1:Dusn2KvZrrovOMJuOt0TNXL6tB7U2E8kvza5fFc9G7g= +github.com/ncruces/wbt v0.2.0 h1:Q9zlKOBSZc7Yy/R2cGa35g6RKUUE3BjNIW3tfGC4F04= +github.com/ncruces/wbt v0.2.0/go.mod h1:DtF92amvMxH69EmBFUSFWRDAlo6hOEfoNQnClxj9C/c= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= +github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo= +github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.67.2 h1:PcBAckGFTIHt2+L3I33uNRTlKTplNzFctXcWhPyAEN8= +github.com/prometheus/common v0.67.2/go.mod h1:63W3KZb1JOKgcjlIr64WW/LvFGAqKPj0atm+knVGEko= +github.com/prometheus/procfs v0.19.2 h1:zUMhqEW66Ex7OXIiDkll3tl9a1ZdilUOd/F6ZXw4Vws= +github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05ZpYlu+b4J7mw= +github.com/psanford/sqlite3vfs v0.0.0-20240315230605-24e1d98cf361 h1:vAKifIJuYY306ZJSrwDgKonWcJGELijdaenABqbV03E= +github.com/psanford/sqlite3vfs v0.0.0-20240315230605-24e1d98cf361/go.mod h1:iW4cSew5PAb1sMZiTEkVJAIBNrepaB6jTYjeP47WtI0= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/superfly/ltx v0.5.0 h1:dXNrcT3ZtMb6iKZopIV7z5UBscnapg0b0F02loQsk5o= +github.com/superfly/ltx v0.5.0/go.mod h1:Nf50QAIXU/ET4ua3AuQ2fh31MbgNQZA7r/DYx6Os77s= +github.com/tetratelabs/wazero v1.10.0 h1:CXP3zneLDl6J4Zy8N/J+d5JsWKfrjE6GtvVK1fpnDlk= +github.com/tetratelabs/wazero v1.10.0/go.mod h1:DRm5twOQ5Gr1AoEdSi0CLjDQF1J9ZAuyqFIjl1KKfQU= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo= +go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= +go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= +go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= +go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= +go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= +go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= +golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= +golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= +golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= +golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/oauth2 v0.32.0 h1:jsCblLleRMDrxMN29H3z/k1KliIvpLgCkE6R8FXXNgY= +golang.org/x/oauth2 v0.32.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= +golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/api v0.154.0 h1:X7QkVKZBskztmpPKWQXgjJRPA2dJYrL6r+sYPRLj050= +google.golang.org/api v0.154.0/go.mod h1:qhSMkM85hgqiokIYsrRyKxrjfBeIhgl4Z2JmeRkYylc= +google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 h1:YJ5pD9rF8o9Qtta0Cmy9rdBwkSjrTCT6XTiUQVOtIos= +google.golang.org/genproto v0.0.0-20231212172506-995d672761c0/go.mod h1:l/k7rMz0vFTBPy+tFSGvXEd3z+BcoG1k7EHbqm+YBsY= +google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0 h1:s1w3X6gQxwrLEpxnLd/qXTVLgQE2yXwaOaoa6IlY/+o= +google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0/go.mod h1:CAny0tYF+0/9rmDB9fahA9YLzX3+AEVl1qXbv5hhj6c= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 h1:/jFB8jK5R3Sq3i/lmeZO0cATSzFfZaJq1J2Euan3XKU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0/go.mod h1:FUoWkonphQm3RhTS+kOEhF8h0iDpm4tdXolVCeZ9KKA= +google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= +google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/litestream/vfs.go b/litestream/vfs.go new file mode 100644 index 0000000..f4d63ce --- /dev/null +++ b/litestream/vfs.go @@ -0,0 +1,290 @@ +package litestream + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/benbjohnson/litestream" + "github.com/ncruces/go-sqlite3" + "github.com/ncruces/go-sqlite3/util/vfsutil" + "github.com/ncruces/go-sqlite3/vfs" + "github.com/ncruces/wbt" + "github.com/superfly/ltx" +) + +type liteVFS struct{} + +func (liteVFS) Open(name string, flags vfs.OpenFlag) (vfs.File, vfs.OpenFlag, error) { + // Temp journals, as used by the sorter, use SliceFile. + if flags&vfs.OPEN_TEMP_JOURNAL != 0 { + return &vfsutil.SliceFile{}, flags | vfs.OPEN_MEMORY, nil + } + // Refuse to open all other file types. + if flags&vfs.OPEN_MAIN_DB == 0 { + return nil, flags, sqlite3.CANTOPEN + } + + liteMtx.RLock() + defer liteMtx.RUnlock() + if db, ok := liteDBs[name]; ok { + // Build the page index so we can lookup individual pages. + if err := db.buildIndex(context.Background()); err != nil { + db.opts.Logger.Error("build index", "error", err) + return nil, 0, err + } + return &liteFile{db: db}, flags | vfs.OPEN_READONLY, nil + } + return nil, flags, sqlite3.CANTOPEN +} + +func (liteVFS) Delete(name string, dirSync bool) error { + // notest // used to delete journals + return sqlite3.IOERR_DELETE_NOENT +} + +func (liteVFS) Access(name string, flag vfs.AccessFlag) (bool, error) { + // notest // used to check for journals + return false, nil +} + +func (liteVFS) FullPathname(name string) (string, error) { + return name, nil +} + +type liteFile struct { + db *liteDB + conn *sqlite3.Conn + pages *pageIndex + txid ltx.TXID + pageSize uint32 +} + +func (f *liteFile) Close() error { return nil } + +func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) { + ctx := f.context() + pages, txid := f.pages, f.txid + if pages == nil { + pages, txid, err = f.db.pollReplica(ctx) + } + if err != nil { + return 0, err + } + + pgno := uint32(1) + if off >= 512 { + pgno += uint32(off / int64(f.pageSize)) + } + + elem, ok := pages.Get(pgno) + if !ok { + return 0, io.EOF + } + + _, data, err := litestream.FetchPage(ctx, f.db.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size) + if err != nil { + f.db.opts.Logger.Error("fetch page", "error", err) + return 0, err + } + + // Update the first page to pretend we are in journal mode, + // load the page size and track changes to the database. + if pgno == 1 && len(data) >= 100 && + data[18] >= 1 && data[19] >= 1 && + data[18] <= 3 && data[19] <= 3 { + data[18], data[19] = 0x01, 0x01 + binary.BigEndian.PutUint32(data[24:28], uint32(txid)) + f.pageSize = uint32(256 * binary.LittleEndian.Uint16(data[16:18])) + } + + n = copy(p, data[off%int64(len(data)):]) + return n, nil +} + +func (f *liteFile) WriteAt(b []byte, off int64) (n int, err error) { + // notest // OPEN_READONLY + return 0, sqlite3.IOERR_WRITE +} + +func (f *liteFile) Truncate(size int64) error { + // notest // OPEN_READONLY + return sqlite3.IOERR_TRUNCATE +} + +func (f *liteFile) Sync(flag vfs.SyncFlag) error { + // notest // OPEN_READONLY + return sqlite3.IOERR_FSYNC +} + +func (f *liteFile) Size() (size int64, err error) { + if max := f.pages.Max(); max != nil { + size = int64(max.Key()) * int64(f.pageSize) + } + return +} + +func (f *liteFile) Lock(lock vfs.LockLevel) (err error) { + if lock >= vfs.LOCK_RESERVED { + return sqlite3.IOERR_LOCK + } + f.pages, f.txid, err = f.db.pollReplica(f.context()) + return err +} + +func (f *liteFile) Unlock(lock vfs.LockLevel) error { + f.pages, f.txid = nil, 0 + return nil +} + +func (f *liteFile) CheckReservedLock() (bool, error) { + // notest // used to check for hot journals + return false, nil +} + +func (f *liteFile) SectorSize() int { + // notest // safe default + return 0 +} + +func (f *liteFile) DeviceCharacteristics() vfs.DeviceCharacteristic { + // notest // safe default + return 0 +} + +func (f *liteFile) SetDB(conn any) { + f.conn = conn.(*sqlite3.Conn) +} + +func (f *liteFile) context() context.Context { + if f.conn != nil { + return f.conn.GetInterrupt() + } + return context.Background() +} + +type liteDB struct { + client litestream.ReplicaClient + opts *ReplicaOptions + pages *pageIndex // +checklocks:mtx + lastPoll time.Time // +checklocks:mtx + txids levelTXIDs // +checklocks:mtx + mtx sync.Mutex +} + +func (f *liteDB) buildIndex(ctx context.Context) error { + f.mtx.Lock() + defer f.mtx.Unlock() + + // Skip if we already have an index. + if f.pages != nil { + return nil + } + + // Build the index from scratch from a Litestream restore plan. + infos, err := litestream.CalcRestorePlan(ctx, f.client, 0, time.Time{}, f.opts.Logger) + if err != nil { + if !errors.Is(err, litestream.ErrTxNotAvailable) { + return fmt.Errorf("calc restore plan: %w", err) + } + return nil + } + + for _, info := range infos { + err := f.updateInfo(ctx, info) + if err != nil { + return err + } + } + + f.lastPoll = time.Now() + return nil +} + +func (f *liteDB) pollReplica(ctx context.Context) (*pageIndex, ltx.TXID, error) { + f.mtx.Lock() + defer f.mtx.Unlock() + + // Limit polling interval. + if time.Since(f.lastPoll) < f.opts.PollInterval { + return f.pages, f.txids[0], nil + } + + // Updating from MinLevel to SnapshotLevel is non-racy, + // since LTX files are compacted into higher levels + // before the lower level LTX files are deleted. + for level := f.opts.MinLevel; level <= litestream.SnapshotLevel; level++ { + if err := f.updateLevel(ctx, level); err != nil { + f.opts.Logger.Error("cannot poll replica", "error", err) + return nil, 0, err + } + } + + f.lastPoll = time.Now() + return f.pages, f.txids[0], nil +} + +// +checklocks:f.mtx +func (f *liteDB) updateLevel(ctx context.Context, level int) error { + var nextTXID ltx.TXID + // Snapshots must start from scratch, + // other levels can start from where they were left. + if level != litestream.SnapshotLevel { + nextTXID = f.txids[level] + 1 + } + + // Start reading from the next LTX file after the current position. + itr, err := f.client.LTXFiles(ctx, level, nextTXID, false) + if err != nil { + return fmt.Errorf("ltx files: %w", err) + } + defer itr.Close() + + // Build an update across all new LTX files. + for itr.Next() { + info := itr.Item() + + // Skip LTX files already fully loaded into the index. + if info.MaxTXID <= f.txids[level] { + continue + } + + err := f.updateInfo(ctx, info) + if err != nil { + return err + } + } + if err := itr.Err(); err != nil { + return err + } + return itr.Close() +} + +// +checklocks:f.mtx +func (f *liteDB) updateInfo(ctx context.Context, info *ltx.FileInfo) error { + idx, err := litestream.FetchPageIndex(ctx, f.client, info) + if err != nil { + return fmt.Errorf("fetch page index: %w", err) + } + + // Replace pages in the index with new pages. + for k, v := range idx { + // Patch avoids mutating the index for an unmodified page. + f.pages = f.pages.Patch(k, func(node *pageIndex) (ltx.PageIndexElem, bool) { + return v, node == nil || v != node.Value() + }) + } + + // Track the MaxTXID for each level. + maxTXID := &f.txids[info.Level] + *maxTXID = max(*maxTXID, info.MaxTXID) + return nil +} + +// Type aliases; these are a mouthful. +type pageIndex = wbt.Tree[uint32, ltx.PageIndexElem] +type levelTXIDs = [litestream.SnapshotLevel + 1]ltx.TXID diff --git a/litestream/vfs_test.go b/litestream/vfs_test.go new file mode 100644 index 0000000..712a09f --- /dev/null +++ b/litestream/vfs_test.go @@ -0,0 +1,47 @@ +package litestream_test + +import ( + "log" + "time" + + "github.com/benbjohnson/litestream/s3" + "github.com/ncruces/go-sqlite3/driver" + _ "github.com/ncruces/go-sqlite3/embed" + "github.com/ncruces/go-sqlite3/litestream" +) + +func ExampleNewReplica() { + client := s3.NewReplicaClient() + client.Bucket = "test-bucket" + client.Path = "fruits.db" + + litestream.NewReplica("fruits.db", client, litestream.ReplicaOptions{ + PollInterval: 5 * time.Second, + }) + + db, err := driver.Open("file:fruits.db?vfs=litestream") + if err != nil { + log.Fatalln(err) + } + defer db.Close() + + for { + time.Sleep(time.Second) + rows, err := db.Query("SELECT * FROM fruits") + if err != nil { + log.Fatalln(err) + } + + for rows.Next() { + var name, color string + err := rows.Scan(&name, &color) + if err != nil { + log.Fatalln(err) + } + log.Println(name, color) + } + + log.Println("===") + rows.Close() + } +} diff --git a/vfs/README.md b/vfs/README.md index f34e08b..0b99cb2 100644 --- a/vfs/README.md +++ b/vfs/README.md @@ -117,9 +117,13 @@ The VFS can be customized with a few build tags: - [`github.com/ncruces/go-sqlite3/vfs/memdb`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/vfs/memdb) implements an in-memory VFS. +- [`github.com/ncruces/go-sqlite3/vfs/mvcc`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/vfs/mvcc) + implements an in-memory MVCC VFS. - [`github.com/ncruces/go-sqlite3/vfs/readervfs`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/vfs/readervfs) implements a VFS for immutable databases. - [`github.com/ncruces/go-sqlite3/vfs/adiantum`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/vfs/adiantum) wraps a VFS to offer encryption at rest. - [`github.com/ncruces/go-sqlite3/vfs/xts`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/vfs/xts) wraps a VFS to offer encryption at rest. +- [`github.com/ncruces/go-sqlite3/litestream`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/litestream) + implements Litestream [lightweight read-replicas](https://fly.io/blog/litestream-revamped/#lightweight-read-replicas).