Compare commits

...

1 Commits

Author SHA1 Message Date
Nuno Cruces
e31a42fb22 Lines virtual table. 2023-11-23 15:59:24 +00:00
6 changed files with 309 additions and 6 deletions

View File

@@ -28,6 +28,8 @@ and uses [wazero](https://wazero.io/) to provide `cgo`-free SQLite bindings.
simplifies [incremental BLOB I/O](https://sqlite.org/c3ref/blob_open.html).
- [`github.com/ncruces/go-sqlite3/ext/csv`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/csv)
reads [comma-separated values](https://sqlite.org/csv.html).
- [`github.com/ncruces/go-sqlite3/ext/lines`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/lines)
reads files [line-by-line](https://github.com/asg017/sqlite-lines).
- [`github.com/ncruces/go-sqlite3/ext/stats`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/stats)
provides [statistics functions](https://www.oreilly.com/library/view/sql-in-a/9780596155322/ch04s02.html).
- [`github.com/ncruces/go-sqlite3/ext/unicode`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/unicode)

View File

@@ -297,6 +297,7 @@ func Test_QueryRow_blob_null(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer rows.Close()
want := [][]byte{nil, {0xca, 0xfe}, {0xba, 0xbe}, nil}
for i := 0; rows.Next(); i++ {

View File

@@ -59,7 +59,7 @@ func TestRegister(t *testing.T) {
csv.Register(db)
data := `
const data = `
"Rob" "Pike" rob
"Ken" Thompson ken
Robert "Griesemer" "gri"`
@@ -116,35 +116,35 @@ func TestRegister_errors(t *testing.T) {
err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv()`)
if err == nil {
t.Fatal(err)
t.Fatal("want error")
} else {
t.Log(err)
}
err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', data='abc')`)
if err == nil {
t.Fatal(err)
t.Fatal("want error")
} else {
t.Log(err)
}
err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', xpto='abc')`)
if err == nil {
t.Fatal(err)
t.Fatal("want error")
} else {
t.Log(err)
}
err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', comma='"')`)
if err == nil {
t.Fatal(err)
t.Fatal("want error")
} else {
t.Log(err)
}
err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', header=tru)`)
if err == nil {
t.Fatal(err)
t.Fatal("want error")
} else {
t.Log(err)
}

121
ext/lines/lines.go Normal file
View File

@@ -0,0 +1,121 @@
// Package lines provides a virtual table to read large files line-by-line.
package lines
import (
"bufio"
"bytes"
"fmt"
"io"
"math"
"os"
"github.com/ncruces/go-sqlite3"
)
// Register registers the lines and lines_read virtual tables.
// The lines virtual table reads from a database blob or text.
// The lines_read virtual table reads from a file or an [io.ReaderAt].
func Register(db *sqlite3.Conn) {
sqlite3.CreateModule[lines](db, "lines", nil,
func(db *sqlite3.Conn, arg ...string) (lines, error) {
err := db.DeclareVtab(`CREATE TABLE x(line TEXT, data HIDDEN)`)
db.VtabConfig(sqlite3.VTAB_INNOCUOUS)
return false, err
})
sqlite3.CreateModule[lines](db, "lines_read", nil,
func(db *sqlite3.Conn, arg ...string) (lines, error) {
err := db.DeclareVtab(`CREATE TABLE x(line TEXT, data HIDDEN)`)
db.VtabConfig(sqlite3.VTAB_DIRECTONLY)
return true, err
})
}
type lines bool
func (l lines) BestIndex(idx *sqlite3.IndexInfo) error {
for i, cst := range idx.Constraint {
if cst.Column == 1 && cst.Op == sqlite3.INDEX_CONSTRAINT_EQ && cst.Usable {
idx.ConstraintUsage[i] = sqlite3.IndexConstraintUsage{
Omit: true,
ArgvIndex: 1,
}
idx.EstimatedCost = 1e6
idx.EstimatedRows = 100
return nil
}
}
return sqlite3.CONSTRAINT
}
func (l lines) Open() (sqlite3.VTabCursor, error) {
return &cursor{reader: bool(l)}, nil
}
type cursor struct {
reader bool
scanner *bufio.Scanner
closer io.Closer
rowID int64
eof bool
}
func (c *cursor) Close() (err error) {
if c.closer != nil {
err = c.closer.Close()
c.closer = nil
}
return err
}
func (c *cursor) EOF() bool {
return c.eof
}
func (c *cursor) Next() error {
c.rowID++
c.eof = !c.scanner.Scan()
return c.scanner.Err()
}
func (c *cursor) RowID() (int64, error) {
return c.rowID, nil
}
func (c *cursor) Column(ctx *sqlite3.Context, n int) error {
if n == 0 {
ctx.ResultRawText(c.scanner.Bytes())
}
return nil
}
func (c *cursor) Filter(idxNum int, idxStr string, arg ...sqlite3.Value) error {
if err := c.Close(); err != nil {
return err
}
var r io.Reader
data := arg[0]
if c.reader {
if data.Type() == sqlite3.NULL {
if p, ok := data.Pointer().(io.ReaderAt); ok {
r = io.NewSectionReader(p, 0, math.MaxInt64)
}
} else {
f, err := os.Open(data.Text())
if err != nil {
return err
}
c.closer = f
r = f
}
} else if data.Type() != sqlite3.NULL {
r = bytes.NewReader(data.RawBlob())
}
if r == nil {
return fmt.Errorf("lines: unsupported argument:%.0w %v", sqlite3.MISMATCH, data.Type())
}
c.scanner = bufio.NewScanner(r)
c.rowID = 0
return c.Next()
}

178
ext/lines/lines_test.go Normal file
View File

@@ -0,0 +1,178 @@
package lines_test
import (
"database/sql"
"errors"
"fmt"
"log"
"os"
"strings"
"testing"
"github.com/ncruces/go-sqlite3"
"github.com/ncruces/go-sqlite3/driver"
_ "github.com/ncruces/go-sqlite3/embed"
"github.com/ncruces/go-sqlite3/ext/lines"
)
func Example() {
db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error {
lines.Register(c)
return nil
})
if err != nil {
log.Fatal(err)
}
defer db.Close()
// https://storage.googleapis.com/quickdraw_dataset/full/simplified/calendar.ndjson
f, err := os.Open("calendar.ndjson")
if err != nil {
log.Fatal(err)
}
defer f.Close()
rows, err := db.Query(`
SELECT
line ->> '$.countrycode' as countrycode,
COUNT(*)
FROM lines_read(?)
GROUP BY 1
ORDER BY 2 DESC
LIMIT 5`,
sqlite3.Pointer(f))
if err != nil {
log.Fatal(err)
}
defer rows.Close()
var countrycode sql.RawBytes
var count int
for rows.Next() {
err := rows.Scan(&countrycode, &count)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s: %d\n", countrycode, count)
}
if err := rows.Err(); err != nil {
log.Fatal(err)
}
// Sample output:
// US: 141001
// GB: 22560
// CA: 11759
// RU: 9250
// DE: 8748
}
func Test_lines(t *testing.T) {
db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error {
lines.Register(c)
return nil
})
if err != nil {
log.Fatal(err)
}
defer db.Close()
const data = "line 1\nline 2\nline 3"
rows, err := db.Query(`SELECT rowid, line FROM lines(?)`, data)
if err != nil {
t.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var id int64
var line string
err := rows.Scan(&id, &line)
if err != nil {
t.Fatal(err)
}
}
}
func Test_lines_error(t *testing.T) {
db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error {
lines.Register(c)
return nil
})
if err != nil {
log.Fatal(err)
}
defer db.Close()
_, err = db.Exec(`SELECT rowid, line FROM lines(?)`, nil)
if err == nil {
t.Fatal("want error")
} else {
t.Log(err)
}
_, err = db.Exec(`SELECT rowid, line FROM lines_read(?)`, "xpto")
if err == nil {
t.Fatal("want error")
} else {
t.Log(err)
}
}
func Test_lines_read(t *testing.T) {
db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error {
lines.Register(c)
return nil
})
if err != nil {
log.Fatal(err)
}
defer db.Close()
const data = "line 1\nline 2\nline 3"
rows, err := db.Query(`SELECT rowid, line FROM lines_read(?)`,
sqlite3.Pointer(strings.NewReader(data)))
if err != nil {
t.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var id int64
var line string
err := rows.Scan(&id, &line)
if err != nil {
t.Fatal(err)
}
}
}
func Test_lines_test(t *testing.T) {
db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error {
lines.Register(c)
return nil
})
if err != nil {
log.Fatal(err)
}
defer db.Close()
rows, err := db.Query(`SELECT rowid, line FROM lines_read(?)`, "lines_test.go")
if errors.Is(err, os.ErrNotExist) {
t.Skip(err)
}
if err != nil {
t.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var id int64
var line string
err := rows.Scan(&id, &line)
if err != nil {
t.Fatal(err)
}
}
}

View File

@@ -58,6 +58,7 @@ func TestJSON(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer rows.Close()
want := []string{
"null", "1", "3.141592653589793",