From e31a42fb22e01025eb26e70efc91fda655d99817 Mon Sep 17 00:00:00 2001 From: Nuno Cruces Date: Thu, 23 Nov 2023 15:32:28 +0000 Subject: [PATCH] Lines virtual table. --- README.md | 2 + driver/driver_test.go | 1 + ext/csv/csv_test.go | 12 +-- ext/lines/lines.go | 121 +++++++++++++++++++++++++++ ext/lines/lines_test.go | 178 ++++++++++++++++++++++++++++++++++++++++ tests/json_test.go | 1 + 6 files changed, 309 insertions(+), 6 deletions(-) create mode 100644 ext/lines/lines.go create mode 100644 ext/lines/lines_test.go diff --git a/README.md b/README.md index 9bbb15e..217c049 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,8 @@ and uses [wazero](https://wazero.io/) to provide `cgo`-free SQLite bindings. simplifies [incremental BLOB I/O](https://sqlite.org/c3ref/blob_open.html). - [`github.com/ncruces/go-sqlite3/ext/csv`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/csv) reads [comma-separated values](https://sqlite.org/csv.html). +- [`github.com/ncruces/go-sqlite3/ext/lines`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/lines) + reads files [line-by-line](https://github.com/asg017/sqlite-lines). - [`github.com/ncruces/go-sqlite3/ext/stats`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/stats) provides [statistics functions](https://www.oreilly.com/library/view/sql-in-a/9780596155322/ch04s02.html). - [`github.com/ncruces/go-sqlite3/ext/unicode`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/unicode) diff --git a/driver/driver_test.go b/driver/driver_test.go index 8889885..04541e4 100644 --- a/driver/driver_test.go +++ b/driver/driver_test.go @@ -297,6 +297,7 @@ func Test_QueryRow_blob_null(t *testing.T) { if err != nil { t.Fatal(err) } + defer rows.Close() want := [][]byte{nil, {0xca, 0xfe}, {0xba, 0xbe}, nil} for i := 0; rows.Next(); i++ { diff --git a/ext/csv/csv_test.go b/ext/csv/csv_test.go index aa8763a..3beea74 100644 --- a/ext/csv/csv_test.go +++ b/ext/csv/csv_test.go @@ -59,7 +59,7 @@ func TestRegister(t *testing.T) { csv.Register(db) - data := ` + const data = ` "Rob" "Pike" rob "Ken" Thompson ken Robert "Griesemer" "gri"` @@ -116,35 +116,35 @@ func TestRegister_errors(t *testing.T) { err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv()`) if err == nil { - t.Fatal(err) + t.Fatal("want error") } else { t.Log(err) } err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', data='abc')`) if err == nil { - t.Fatal(err) + t.Fatal("want error") } else { t.Log(err) } err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', xpto='abc')`) if err == nil { - t.Fatal(err) + t.Fatal("want error") } else { t.Log(err) } err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', comma='"')`) if err == nil { - t.Fatal(err) + t.Fatal("want error") } else { t.Log(err) } err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', header=tru)`) if err == nil { - t.Fatal(err) + t.Fatal("want error") } else { t.Log(err) } diff --git a/ext/lines/lines.go b/ext/lines/lines.go new file mode 100644 index 0000000..f2c9bb3 --- /dev/null +++ b/ext/lines/lines.go @@ -0,0 +1,121 @@ +// Package lines provides a virtual table to read large files line-by-line. +package lines + +import ( + "bufio" + "bytes" + "fmt" + "io" + "math" + "os" + + "github.com/ncruces/go-sqlite3" +) + +// Register registers the lines and lines_read virtual tables. +// The lines virtual table reads from a database blob or text. +// The lines_read virtual table reads from a file or an [io.ReaderAt]. +func Register(db *sqlite3.Conn) { + sqlite3.CreateModule[lines](db, "lines", nil, + func(db *sqlite3.Conn, arg ...string) (lines, error) { + err := db.DeclareVtab(`CREATE TABLE x(line TEXT, data HIDDEN)`) + db.VtabConfig(sqlite3.VTAB_INNOCUOUS) + return false, err + }) + sqlite3.CreateModule[lines](db, "lines_read", nil, + func(db *sqlite3.Conn, arg ...string) (lines, error) { + err := db.DeclareVtab(`CREATE TABLE x(line TEXT, data HIDDEN)`) + db.VtabConfig(sqlite3.VTAB_DIRECTONLY) + return true, err + }) +} + +type lines bool + +func (l lines) BestIndex(idx *sqlite3.IndexInfo) error { + for i, cst := range idx.Constraint { + if cst.Column == 1 && cst.Op == sqlite3.INDEX_CONSTRAINT_EQ && cst.Usable { + idx.ConstraintUsage[i] = sqlite3.IndexConstraintUsage{ + Omit: true, + ArgvIndex: 1, + } + idx.EstimatedCost = 1e6 + idx.EstimatedRows = 100 + return nil + } + } + return sqlite3.CONSTRAINT +} + +func (l lines) Open() (sqlite3.VTabCursor, error) { + return &cursor{reader: bool(l)}, nil +} + +type cursor struct { + reader bool + scanner *bufio.Scanner + closer io.Closer + rowID int64 + eof bool +} + +func (c *cursor) Close() (err error) { + if c.closer != nil { + err = c.closer.Close() + c.closer = nil + } + return err +} + +func (c *cursor) EOF() bool { + return c.eof +} + +func (c *cursor) Next() error { + c.rowID++ + c.eof = !c.scanner.Scan() + return c.scanner.Err() +} + +func (c *cursor) RowID() (int64, error) { + return c.rowID, nil +} + +func (c *cursor) Column(ctx *sqlite3.Context, n int) error { + if n == 0 { + ctx.ResultRawText(c.scanner.Bytes()) + } + return nil +} + +func (c *cursor) Filter(idxNum int, idxStr string, arg ...sqlite3.Value) error { + if err := c.Close(); err != nil { + return err + } + + var r io.Reader + data := arg[0] + if c.reader { + if data.Type() == sqlite3.NULL { + if p, ok := data.Pointer().(io.ReaderAt); ok { + r = io.NewSectionReader(p, 0, math.MaxInt64) + } + } else { + f, err := os.Open(data.Text()) + if err != nil { + return err + } + c.closer = f + r = f + } + } else if data.Type() != sqlite3.NULL { + r = bytes.NewReader(data.RawBlob()) + } + + if r == nil { + return fmt.Errorf("lines: unsupported argument:%.0w %v", sqlite3.MISMATCH, data.Type()) + } + c.scanner = bufio.NewScanner(r) + c.rowID = 0 + return c.Next() +} diff --git a/ext/lines/lines_test.go b/ext/lines/lines_test.go new file mode 100644 index 0000000..d0912b1 --- /dev/null +++ b/ext/lines/lines_test.go @@ -0,0 +1,178 @@ +package lines_test + +import ( + "database/sql" + "errors" + "fmt" + "log" + "os" + "strings" + "testing" + + "github.com/ncruces/go-sqlite3" + "github.com/ncruces/go-sqlite3/driver" + _ "github.com/ncruces/go-sqlite3/embed" + "github.com/ncruces/go-sqlite3/ext/lines" +) + +func Example() { + db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error { + lines.Register(c) + return nil + }) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + // https://storage.googleapis.com/quickdraw_dataset/full/simplified/calendar.ndjson + f, err := os.Open("calendar.ndjson") + if err != nil { + log.Fatal(err) + } + defer f.Close() + + rows, err := db.Query(` + SELECT + line ->> '$.countrycode' as countrycode, + COUNT(*) + FROM lines_read(?) + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 5`, + sqlite3.Pointer(f)) + if err != nil { + log.Fatal(err) + } + defer rows.Close() + + var countrycode sql.RawBytes + var count int + for rows.Next() { + err := rows.Scan(&countrycode, &count) + if err != nil { + log.Fatal(err) + } + fmt.Printf("%s: %d\n", countrycode, count) + } + if err := rows.Err(); err != nil { + log.Fatal(err) + } + // Sample output: + // US: 141001 + // GB: 22560 + // CA: 11759 + // RU: 9250 + // DE: 8748 +} + +func Test_lines(t *testing.T) { + db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error { + lines.Register(c) + return nil + }) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + const data = "line 1\nline 2\nline 3" + + rows, err := db.Query(`SELECT rowid, line FROM lines(?)`, data) + if err != nil { + t.Fatal(err) + } + defer rows.Close() + + for rows.Next() { + var id int64 + var line string + err := rows.Scan(&id, &line) + if err != nil { + t.Fatal(err) + } + } +} + +func Test_lines_error(t *testing.T) { + db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error { + lines.Register(c) + return nil + }) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + _, err = db.Exec(`SELECT rowid, line FROM lines(?)`, nil) + if err == nil { + t.Fatal("want error") + } else { + t.Log(err) + } + + _, err = db.Exec(`SELECT rowid, line FROM lines_read(?)`, "xpto") + if err == nil { + t.Fatal("want error") + } else { + t.Log(err) + } +} + +func Test_lines_read(t *testing.T) { + db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error { + lines.Register(c) + return nil + }) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + const data = "line 1\nline 2\nline 3" + + rows, err := db.Query(`SELECT rowid, line FROM lines_read(?)`, + sqlite3.Pointer(strings.NewReader(data))) + if err != nil { + t.Fatal(err) + } + defer rows.Close() + + for rows.Next() { + var id int64 + var line string + err := rows.Scan(&id, &line) + if err != nil { + t.Fatal(err) + } + } +} + +func Test_lines_test(t *testing.T) { + db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error { + lines.Register(c) + return nil + }) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + rows, err := db.Query(`SELECT rowid, line FROM lines_read(?)`, "lines_test.go") + if errors.Is(err, os.ErrNotExist) { + t.Skip(err) + } + if err != nil { + t.Fatal(err) + } + defer rows.Close() + + for rows.Next() { + var id int64 + var line string + err := rows.Scan(&id, &line) + if err != nil { + t.Fatal(err) + } + } +} diff --git a/tests/json_test.go b/tests/json_test.go index a06092e..7eb2218 100644 --- a/tests/json_test.go +++ b/tests/json_test.go @@ -58,6 +58,7 @@ func TestJSON(t *testing.T) { if err != nil { t.Fatal(err) } + defer rows.Close() want := []string{ "null", "1", "3.141592653589793",