Files
sqlite3/ext/bloom/bloom.go

346 lines
7.3 KiB
Go
Raw Normal View History

2024-06-18 23:42:20 +01:00
// Package bloom provides a Bloom filter virtual table.
//
// A Bloom filter is a space-efficient probabilistic data structure
// used to test whether an element is a member of a set.
//
// https://github.com/nalgeon/sqlean/issues/27#issuecomment-1002267134
package bloom
import (
"fmt"
"io"
"math"
"strconv"
"github.com/dchest/siphash"
2024-10-18 12:20:32 +01:00
2024-06-18 23:42:20 +01:00
"github.com/ncruces/go-sqlite3"
2024-07-04 15:28:49 +01:00
"github.com/ncruces/go-sqlite3/internal/util"
2025-12-12 17:21:33 +00:00
"github.com/ncruces/go-sqlite3/util/sql3util"
2024-06-18 23:42:20 +01:00
)
// Register registers the bloom_filter virtual table:
//
// CREATE VIRTUAL TABLE foo USING bloom_filter(nElements, falseProb, kHashes)
2024-07-08 12:06:57 +01:00
func Register(db *sqlite3.Conn) error {
return sqlite3.CreateModule(db, "bloom_filter", create, connect)
2024-06-18 23:42:20 +01:00
}
type bloom struct {
db *sqlite3.Conn
schema string
storage string
prob float64
2024-06-19 14:43:44 +01:00
bytes int64
2024-06-18 23:42:20 +01:00
hashes int
}
func create(db *sqlite3.Conn, _, schema, table string, arg ...string) (_ *bloom, err error) {
2024-10-07 13:22:31 +01:00
b := bloom{
2024-06-18 23:42:20 +01:00
db: db,
schema: schema,
storage: table + "_storage",
}
2024-06-19 14:43:44 +01:00
var nelem int64
2024-06-18 23:42:20 +01:00
if len(arg) > 0 {
2024-06-19 14:43:44 +01:00
nelem, err = strconv.ParseInt(arg[0], 10, 64)
2024-06-18 23:42:20 +01:00
if err != nil {
return nil, err
}
if nelem <= 0 {
2024-07-04 15:28:49 +01:00
return nil, util.ErrorString("bloom: number of elements in filter must be positive")
2024-06-18 23:42:20 +01:00
}
2024-06-19 14:43:44 +01:00
} else {
nelem = 100
2024-06-18 23:42:20 +01:00
}
if len(arg) > 1 {
2025-12-12 17:21:33 +00:00
var ok bool
b.prob, ok = sql3util.ParseFloat(arg[1])
if !ok || b.prob <= 0 || b.prob >= 1 {
2024-07-04 15:28:49 +01:00
return nil, util.ErrorString("bloom: probability must be in the range (0,1)")
2024-06-18 23:42:20 +01:00
}
} else {
2024-10-07 13:22:31 +01:00
b.prob = 0.01
2024-06-18 23:42:20 +01:00
}
if len(arg) > 2 {
2024-10-07 13:22:31 +01:00
b.hashes, err = strconv.Atoi(arg[2])
2024-06-18 23:42:20 +01:00
if err != nil {
return nil, err
}
2024-10-07 13:22:31 +01:00
if b.hashes <= 0 {
2024-07-04 15:28:49 +01:00
return nil, util.ErrorString("bloom: number of hash functions must be positive")
2024-06-18 23:42:20 +01:00
}
} else {
2024-10-07 13:22:31 +01:00
b.hashes = max(1, numHashes(b.prob))
2024-06-18 23:42:20 +01:00
}
2024-10-07 13:22:31 +01:00
b.bytes = numBytes(nelem, b.prob)
2024-06-18 23:42:20 +01:00
2024-09-21 00:51:03 +01:00
err = db.DeclareVTab(
`CREATE TABLE x(present, word HIDDEN NOT NULL PRIMARY KEY) WITHOUT ROWID`)
if err != nil {
return nil, err
}
2024-06-18 23:42:20 +01:00
err = db.Exec(fmt.Sprintf(
`CREATE TABLE %s.%s (data BLOB, p REAL, n INTEGER, m INTEGER, k INTEGER)`,
2024-10-07 13:22:31 +01:00
sqlite3.QuoteIdentifier(b.schema), sqlite3.QuoteIdentifier(b.storage)))
2024-06-18 23:42:20 +01:00
if err != nil {
return nil, err
}
2024-06-19 23:25:05 +01:00
id := db.LastInsertRowID()
defer db.SetLastInsertRowID(id)
2024-06-18 23:42:20 +01:00
err = db.Exec(fmt.Sprintf(
`INSERT INTO %s.%s (rowid, data, p, n, m, k)
VALUES (1, zeroblob(%d), %f, %d, %d, %d)`,
2024-10-07 13:22:31 +01:00
sqlite3.QuoteIdentifier(b.schema), sqlite3.QuoteIdentifier(b.storage),
b.bytes, b.prob, nelem, 8*b.bytes, b.hashes))
2024-06-18 23:42:20 +01:00
if err != nil {
2024-10-07 13:22:31 +01:00
b.Destroy()
2024-06-18 23:42:20 +01:00
return nil, err
}
2024-10-07 13:22:31 +01:00
return &b, nil
2024-06-18 23:42:20 +01:00
}
func connect(db *sqlite3.Conn, _, schema, table string, arg ...string) (_ *bloom, err error) {
2024-10-07 13:22:31 +01:00
b := bloom{
2024-06-18 23:42:20 +01:00
db: db,
schema: schema,
storage: table + "_storage",
}
err = db.DeclareVTab(
`CREATE TABLE x(present, word HIDDEN NOT NULL PRIMARY KEY) WITHOUT ROWID`)
if err != nil {
return nil, err
}
load, _, err := db.Prepare(fmt.Sprintf(
`SELECT m/8, p, k FROM %s.%s WHERE rowid = 1`,
2024-10-07 13:22:31 +01:00
sqlite3.QuoteIdentifier(b.schema), sqlite3.QuoteIdentifier(b.storage)))
2024-06-18 23:42:20 +01:00
if err != nil {
return nil, err
}
defer load.Close()
if !load.Step() {
2024-07-23 13:28:09 +01:00
if err := load.Err(); err != nil {
return nil, err
2024-06-18 23:42:20 +01:00
}
2024-07-23 13:28:09 +01:00
return nil, sqlite3.CORRUPT_VTAB
2024-06-18 23:42:20 +01:00
}
2024-10-07 13:22:31 +01:00
b.bytes = load.ColumnInt64(0)
b.prob = load.ColumnFloat(1)
b.hashes = load.ColumnInt(2)
return &b, nil
2024-06-18 23:42:20 +01:00
}
// Destroy drops the storage table that backs the filter.
func (b *bloom) Destroy() error {
	schema := sqlite3.QuoteIdentifier(b.schema)
	storage := sqlite3.QuoteIdentifier(b.storage)
	return b.db.Exec(fmt.Sprintf(`DROP TABLE %s.%s`, schema, storage))
}
// Rename renames the storage table so that it keeps following the
// "<table>_storage" naming scheme after the virtual table is renamed to new.
// The name remembered by b is only updated if the rename succeeds.
func (b *bloom) Rename(new string) error {
	storage := new + "_storage"
	query := fmt.Sprintf(`ALTER TABLE %s.%s RENAME TO %s`,
		sqlite3.QuoteIdentifier(b.schema),
		sqlite3.QuoteIdentifier(b.storage),
		sqlite3.QuoteIdentifier(storage),
	)
	if err := b.db.Exec(query); err != nil {
		return err
	}
	b.storage = storage
	return nil
}
2024-07-20 01:42:50 +01:00
// ShadowTables is intentionally empty.
//
// NOTE(review): presumably the mere presence of this method tells the
// sqlite3 package that the module uses shadow tables — confirm against
// the package documentation.
func (b *bloom) ShadowTables() {
	// notest // not meant to be called
}
2024-06-21 13:01:55 +01:00
func (t *bloom) Integrity(schema, table string, flags int) error {
load, _, err := t.db.Prepare(fmt.Sprintf(
`SELECT typeof(data), length(data), p, n, m, k FROM %s.%s WHERE rowid = 1`,
sqlite3.QuoteIdentifier(t.schema), sqlite3.QuoteIdentifier(t.storage)))
if err != nil {
return fmt.Errorf("bloom: %v", err) // can't wrap!
}
defer load.Close()
2024-07-04 15:28:49 +01:00
err = util.ErrorString("bloom: invalid parameters")
2024-06-21 13:01:55 +01:00
if !load.Step() {
return err
}
if t := load.ColumnText(0); t != "blob" {
return err
}
if m := load.ColumnInt64(4); m <= 0 || m%8 != 0 {
return err
} else if load.ColumnInt64(1) != m/8 {
return err
}
if p := load.ColumnFloat(2); p <= 0 || p >= 1 {
return err
}
if n := load.ColumnInt64(3); n <= 0 {
return err
}
if k := load.ColumnInt(5); k <= 0 {
return err
}
return nil
}
2024-06-18 23:42:20 +01:00
func (b *bloom) BestIndex(idx *sqlite3.IndexInfo) error {
2024-09-21 00:51:03 +01:00
for i, cst := range idx.Constraint {
2024-06-18 23:42:20 +01:00
if cst.Usable && cst.Column == 1 &&
cst.Op == sqlite3.INDEX_CONSTRAINT_EQ {
2024-09-21 00:51:03 +01:00
idx.ConstraintUsage[i].ArgvIndex = 1
2024-06-19 23:25:05 +01:00
idx.OrderByConsumed = true
idx.EstimatedRows = 1
idx.EstimatedCost = float64(b.hashes)
idx.IdxFlags = sqlite3.INDEX_SCAN_UNIQUE
return nil
2024-06-18 23:42:20 +01:00
}
}
2024-06-19 23:25:05 +01:00
return sqlite3.CONSTRAINT
2024-06-18 23:42:20 +01:00
}
func (b *bloom) Update(arg ...sqlite3.Value) (rowid int64, err error) {
if arg[0].Type() != sqlite3.NULL {
if len(arg) == 1 {
2024-07-04 15:28:49 +01:00
return 0, util.ErrorString("bloom: elements cannot be deleted")
2024-06-18 23:42:20 +01:00
}
2024-07-04 15:28:49 +01:00
return 0, util.ErrorString("bloom: elements cannot be updated")
2024-06-18 23:42:20 +01:00
}
2024-07-10 00:08:59 +01:00
if arg[2].NoChange() {
return 0, nil
}
2024-06-18 23:42:20 +01:00
blob := arg[2].RawBlob()
f, err := b.db.OpenBlob(b.schema, b.storage, "data", 1, true)
if err != nil {
return 0, err
}
defer f.Close()
2025-01-24 10:46:05 +00:00
for n := range b.hashes {
2024-06-18 23:42:20 +01:00
hash := calcHash(n, blob)
2024-06-19 14:43:44 +01:00
hash %= uint64(b.bytes * 8)
2024-06-18 23:42:20 +01:00
bitpos := byte(hash % 8)
bytepos := int64(hash / 8)
var buf [1]byte
_, err = f.Seek(bytepos, io.SeekStart)
if err != nil {
return 0, err
}
_, err = f.Read(buf[:])
if err != nil {
return 0, err
}
2024-06-19 14:43:44 +01:00
buf[0] |= 1 << bitpos
2024-06-18 23:42:20 +01:00
_, err = f.Seek(bytepos, io.SeekStart)
if err != nil {
return 0, err
}
_, err = f.Write(buf[:])
if err != nil {
return 0, err
}
}
return 0, nil
}
// Open returns a new cursor over the filter.
func (b *bloom) Open() (sqlite3.VTabCursor, error) {
	cur := new(cursor)
	cur.bloom = b
	return cur, nil
}
type cursor struct {
*bloom
2025-03-10 12:01:15 +00:00
arg sqlite3.Value
2024-07-02 15:42:20 +01:00
eof bool
2024-06-18 23:42:20 +01:00
}
func (c *cursor) Filter(idxNum int, idxStr string, arg ...sqlite3.Value) error {
c.eof = false
2025-03-10 12:01:15 +00:00
c.arg = arg[0]
2024-06-18 23:42:20 +01:00
blob := arg[0].RawBlob()
f, err := c.db.OpenBlob(c.schema, c.storage, "data", 1, false)
if err != nil {
return err
}
defer f.Close()
2024-06-19 14:43:44 +01:00
for n := 0; n < c.hashes && !c.eof; n++ {
2024-06-18 23:42:20 +01:00
hash := calcHash(n, blob)
2024-06-19 14:43:44 +01:00
hash %= uint64(c.bytes * 8)
2024-06-18 23:42:20 +01:00
bitpos := byte(hash % 8)
bytepos := int64(hash / 8)
var buf [1]byte
_, err = f.Seek(bytepos, io.SeekStart)
if err != nil {
return err
}
_, err = f.Read(buf[:])
if err != nil {
return err
}
2024-06-19 14:43:44 +01:00
c.eof = buf[0]&(1<<bitpos) == 0
2024-06-18 23:42:20 +01:00
}
return nil
}
2024-07-26 12:25:15 +01:00
func (c *cursor) Column(ctx sqlite3.Context, n int) error {
2024-07-10 00:08:59 +01:00
if ctx.VTabNoChange() {
return nil
}
2024-06-18 23:42:20 +01:00
switch n {
case 0:
ctx.ResultBool(true)
case 1:
2025-03-10 12:01:15 +00:00
ctx.ResultValue(c.arg)
2024-06-18 23:42:20 +01:00
}
return nil
}
// Next moves past the current row.
// A lookup yields at most one row, so the cursor is always at EOF afterwards.
func (c *cursor) Next() error {
	c.eof = true
	return nil
}
// EOF reports whether the cursor has no (more) rows to return.
func (c *cursor) EOF() bool {
	return c.eof
}
func (c *cursor) RowID() (int64, error) {
2024-07-10 15:41:28 +01:00
// notest // WITHOUT ROWID
2024-06-18 23:42:20 +01:00
return 0, nil
}
// calcHash computes the k-th hash of b.
// Each k selects a different SipHash key:
// the bitwise complement of k and k itself are the two key halves.
func calcHash(k int, b []byte) uint64 {
	key := uint64(k)
	return siphash.Hash(^key, key, b)
}
2024-06-19 14:43:44 +01:00
// numHashes returns the number of hash functions to use for a
// false positive probability p: -log2(p), rounded to the nearest
// integer, but never less than 1.
func numHashes(p float64) int {
	if k := int(math.Round(-math.Log2(p))); k > 1 {
		return k
	}
	return 1
}
// numBytes returns the size, in bytes, of a filter meant to hold
// n elements with false positive probability p.
//
// The number of bits is m = ceil(-n·ln(p) / ln(2)²),
// which is then rounded up to a whole number of bytes.
//
// The result saturates at 2^62 bits (2^59 bytes): converting a larger
// (or NaN) float to int64 is not well defined, and the rounding
// addition could overflow into a negative size.
// Saturating keeps the result positive, and small enough that
// multiplying it by 8 does not overflow either.
func numBytes(n int64, p float64) int64 {
	const maxBits = 1 << 62

	m := math.Ceil(float64(n) * math.Log(p) / -(math.Ln2 * math.Ln2))
	// Negated so that NaN saturates as well.
	if !(m < maxBits) {
		return maxBits / 8
	}
	return (int64(m) + 7) / 8
}