mirror of
https://github.com/ncruces/go-sqlite3.git
synced 2026-01-12 05:59:14 +00:00
Lines virtual table.
This commit is contained in:
@@ -59,7 +59,7 @@ func TestRegister(t *testing.T) {
|
||||
|
||||
csv.Register(db)
|
||||
|
||||
data := `
|
||||
const data = `
|
||||
"Rob" "Pike" rob
|
||||
"Ken" Thompson ken
|
||||
Robert "Griesemer" "gri"`
|
||||
@@ -116,35 +116,35 @@ func TestRegister_errors(t *testing.T) {
|
||||
|
||||
err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv()`)
|
||||
if err == nil {
|
||||
t.Fatal(err)
|
||||
t.Fatal("want error")
|
||||
} else {
|
||||
t.Log(err)
|
||||
}
|
||||
|
||||
err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', data='abc')`)
|
||||
if err == nil {
|
||||
t.Fatal(err)
|
||||
t.Fatal("want error")
|
||||
} else {
|
||||
t.Log(err)
|
||||
}
|
||||
|
||||
err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', xpto='abc')`)
|
||||
if err == nil {
|
||||
t.Fatal(err)
|
||||
t.Fatal("want error")
|
||||
} else {
|
||||
t.Log(err)
|
||||
}
|
||||
|
||||
err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', comma='"')`)
|
||||
if err == nil {
|
||||
t.Fatal(err)
|
||||
t.Fatal("want error")
|
||||
} else {
|
||||
t.Log(err)
|
||||
}
|
||||
|
||||
err = db.Exec(`CREATE VIRTUAL TABLE temp.users USING csv(data='abc', header=tru)`)
|
||||
if err == nil {
|
||||
t.Fatal(err)
|
||||
t.Fatal("want error")
|
||||
} else {
|
||||
t.Log(err)
|
||||
}
|
||||
|
||||
121
ext/lines/lines.go
Normal file
121
ext/lines/lines.go
Normal file
@@ -0,0 +1,121 @@
|
||||
// Package lines provides a virtual table to read large files line-by-line.
|
||||
package lines
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
|
||||
"github.com/ncruces/go-sqlite3"
|
||||
)
|
||||
|
||||
// Register registers the lines and lines_read virtual tables.
|
||||
// The lines virtual table reads from a database blob or text.
|
||||
// The lines_read virtual table reads from a file or an [io.ReaderAt].
|
||||
func Register(db *sqlite3.Conn) {
|
||||
sqlite3.CreateModule[lines](db, "lines", nil,
|
||||
func(db *sqlite3.Conn, arg ...string) (lines, error) {
|
||||
err := db.DeclareVtab(`CREATE TABLE x(line TEXT, data HIDDEN)`)
|
||||
db.VtabConfig(sqlite3.VTAB_INNOCUOUS)
|
||||
return false, err
|
||||
})
|
||||
sqlite3.CreateModule[lines](db, "lines_read", nil,
|
||||
func(db *sqlite3.Conn, arg ...string) (lines, error) {
|
||||
err := db.DeclareVtab(`CREATE TABLE x(line TEXT, data HIDDEN)`)
|
||||
db.VtabConfig(sqlite3.VTAB_DIRECTONLY)
|
||||
return true, err
|
||||
})
|
||||
}
|
||||
|
||||
type lines bool
|
||||
|
||||
func (l lines) BestIndex(idx *sqlite3.IndexInfo) error {
|
||||
for i, cst := range idx.Constraint {
|
||||
if cst.Column == 1 && cst.Op == sqlite3.INDEX_CONSTRAINT_EQ && cst.Usable {
|
||||
idx.ConstraintUsage[i] = sqlite3.IndexConstraintUsage{
|
||||
Omit: true,
|
||||
ArgvIndex: 1,
|
||||
}
|
||||
idx.EstimatedCost = 1e6
|
||||
idx.EstimatedRows = 100
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return sqlite3.CONSTRAINT
|
||||
}
|
||||
|
||||
func (l lines) Open() (sqlite3.VTabCursor, error) {
|
||||
return &cursor{reader: bool(l)}, nil
|
||||
}
|
||||
|
||||
type cursor struct {
|
||||
reader bool
|
||||
scanner *bufio.Scanner
|
||||
closer io.Closer
|
||||
rowID int64
|
||||
eof bool
|
||||
}
|
||||
|
||||
func (c *cursor) Close() (err error) {
|
||||
if c.closer != nil {
|
||||
err = c.closer.Close()
|
||||
c.closer = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *cursor) EOF() bool {
|
||||
return c.eof
|
||||
}
|
||||
|
||||
func (c *cursor) Next() error {
|
||||
c.rowID++
|
||||
c.eof = !c.scanner.Scan()
|
||||
return c.scanner.Err()
|
||||
}
|
||||
|
||||
func (c *cursor) RowID() (int64, error) {
|
||||
return c.rowID, nil
|
||||
}
|
||||
|
||||
func (c *cursor) Column(ctx *sqlite3.Context, n int) error {
|
||||
if n == 0 {
|
||||
ctx.ResultRawText(c.scanner.Bytes())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cursor) Filter(idxNum int, idxStr string, arg ...sqlite3.Value) error {
|
||||
if err := c.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var r io.Reader
|
||||
data := arg[0]
|
||||
if c.reader {
|
||||
if data.Type() == sqlite3.NULL {
|
||||
if p, ok := data.Pointer().(io.ReaderAt); ok {
|
||||
r = io.NewSectionReader(p, 0, math.MaxInt64)
|
||||
}
|
||||
} else {
|
||||
f, err := os.Open(data.Text())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.closer = f
|
||||
r = f
|
||||
}
|
||||
} else if data.Type() != sqlite3.NULL {
|
||||
r = bytes.NewReader(data.RawBlob())
|
||||
}
|
||||
|
||||
if r == nil {
|
||||
return fmt.Errorf("lines: unsupported argument:%.0w %v", sqlite3.MISMATCH, data.Type())
|
||||
}
|
||||
c.scanner = bufio.NewScanner(r)
|
||||
c.rowID = 0
|
||||
return c.Next()
|
||||
}
|
||||
178
ext/lines/lines_test.go
Normal file
178
ext/lines/lines_test.go
Normal file
@@ -0,0 +1,178 @@
|
||||
package lines_test
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/ncruces/go-sqlite3"
|
||||
"github.com/ncruces/go-sqlite3/driver"
|
||||
_ "github.com/ncruces/go-sqlite3/embed"
|
||||
"github.com/ncruces/go-sqlite3/ext/lines"
|
||||
)
|
||||
|
||||
func Example() {
|
||||
db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error {
|
||||
lines.Register(c)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// https://storage.googleapis.com/quickdraw_dataset/full/simplified/calendar.ndjson
|
||||
f, err := os.Open("calendar.ndjson")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
rows, err := db.Query(`
|
||||
SELECT
|
||||
line ->> '$.countrycode' as countrycode,
|
||||
COUNT(*)
|
||||
FROM lines_read(?)
|
||||
GROUP BY 1
|
||||
ORDER BY 2 DESC
|
||||
LIMIT 5`,
|
||||
sqlite3.Pointer(f))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var countrycode sql.RawBytes
|
||||
var count int
|
||||
for rows.Next() {
|
||||
err := rows.Scan(&countrycode, &count)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Printf("%s: %d\n", countrycode, count)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// Sample output:
|
||||
// US: 141001
|
||||
// GB: 22560
|
||||
// CA: 11759
|
||||
// RU: 9250
|
||||
// DE: 8748
|
||||
}
|
||||
|
||||
func Test_lines(t *testing.T) {
|
||||
db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error {
|
||||
lines.Register(c)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
const data = "line 1\nline 2\nline 3"
|
||||
|
||||
rows, err := db.Query(`SELECT rowid, line FROM lines(?)`, data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var id int64
|
||||
var line string
|
||||
err := rows.Scan(&id, &line)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Test_lines_error(t *testing.T) {
|
||||
db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error {
|
||||
lines.Register(c)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
_, err = db.Exec(`SELECT rowid, line FROM lines(?)`, nil)
|
||||
if err == nil {
|
||||
t.Fatal("want error")
|
||||
} else {
|
||||
t.Log(err)
|
||||
}
|
||||
|
||||
_, err = db.Exec(`SELECT rowid, line FROM lines_read(?)`, "xpto")
|
||||
if err == nil {
|
||||
t.Fatal("want error")
|
||||
} else {
|
||||
t.Log(err)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_lines_read(t *testing.T) {
|
||||
db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error {
|
||||
lines.Register(c)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
const data = "line 1\nline 2\nline 3"
|
||||
|
||||
rows, err := db.Query(`SELECT rowid, line FROM lines_read(?)`,
|
||||
sqlite3.Pointer(strings.NewReader(data)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var id int64
|
||||
var line string
|
||||
err := rows.Scan(&id, &line)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Test_lines_test(t *testing.T) {
|
||||
db, err := driver.Open(":memory:", func(c *sqlite3.Conn) error {
|
||||
lines.Register(c)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
rows, err := db.Query(`SELECT rowid, line FROM lines_read(?)`, "lines_test.go")
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
t.Skip(err)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var id int64
|
||||
var line string
|
||||
err := rows.Scan(&id, &line)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user