Files
sqlite3/ext/lines/lines.go

234 lines
4.9 KiB
Go
Raw Normal View History

2023-12-14 20:36:07 +00:00
// Package lines provides a virtual table to read data line-by-line.
//
// It is particularly useful for line-oriented datasets,
// like [ndjson] or [JSON Lines],
// when paired with SQLite's JSON support.
//
// https://github.com/asg017/sqlite-lines
//
// [ndjson]: https://ndjson.org/
// [JSON Lines]: https://jsonlines.org/
2023-11-23 15:32:28 +00:00
package lines
import (
"bufio"
"bytes"
2024-07-08 12:06:57 +01:00
"errors"
2023-11-23 15:32:28 +00:00
"fmt"
"io"
2023-12-19 15:24:54 +00:00
"io/fs"
2023-11-23 15:32:28 +00:00
"github.com/ncruces/go-sqlite3"
2024-01-03 12:47:49 +00:00
"github.com/ncruces/go-sqlite3/util/osutil"
2023-11-23 15:32:28 +00:00
)
2024-01-03 00:54:30 +00:00
// Register registers the lines and lines_read table-valued functions.
// The lines function reads from a database blob or text.
// The lines_read function reads from a file or an [io.Reader].
2023-12-19 15:24:54 +00:00
// If a filename is specified, [os.Open] is used to open the file.
2024-07-08 12:06:57 +01:00
func Register(db *sqlite3.Conn) error {
return RegisterFS(db, osutil.FS{})
2023-12-19 15:24:54 +00:00
}
2024-01-03 00:54:30 +00:00
// RegisterFS registers the lines and lines_read table-valued functions.
// The lines function reads from a database blob or text.
// The lines_read function reads from a file or an [io.Reader].
2023-12-19 15:24:54 +00:00
// If a filename is specified, fsys is used to open the file.
2024-07-08 12:06:57 +01:00
func RegisterFS(db *sqlite3.Conn, fsys fs.FS) error {
return errors.Join(
sqlite3.CreateModule(db, "lines", nil,
func(db *sqlite3.Conn, _, _, _ string, _ ...string) (lines, error) {
2025-01-05 19:32:42 +00:00
err := db.DeclareVTab(`CREATE TABLE x(line TEXT, data HIDDEN, delim HIDDEN)`)
2024-10-07 13:22:31 +01:00
if err == nil {
err = db.VTabConfig(sqlite3.VTAB_INNOCUOUS)
}
2024-07-08 12:06:57 +01:00
return lines{}, err
}),
sqlite3.CreateModule(db, "lines_read", nil,
func(db *sqlite3.Conn, _, _, _ string, _ ...string) (lines, error) {
2025-01-05 19:32:42 +00:00
err := db.DeclareVTab(`CREATE TABLE x(line TEXT, data HIDDEN, delim HIDDEN)`)
2024-10-07 13:22:31 +01:00
if err == nil {
err = db.VTabConfig(sqlite3.VTAB_DIRECTONLY)
}
2024-07-08 12:06:57 +01:00
return lines{fsys}, err
}))
2023-11-23 15:32:28 +00:00
}
2023-12-19 15:24:54 +00:00
// lines is the virtual table type shared by the lines and lines_read modules.
// RegisterFS leaves fsys nil for lines and sets it for lines_read;
// Open uses that to choose between an in-memory cursor and a reader cursor.
type lines struct {
fsys fs.FS
}
2023-11-23 15:32:28 +00:00
2025-01-05 19:32:42 +00:00
func (l lines) BestIndex(idx *sqlite3.IndexInfo) (err error) {
err = sqlite3.CONSTRAINT
2023-11-23 15:32:28 +00:00
for i, cst := range idx.Constraint {
2025-01-05 19:32:42 +00:00
if !cst.Usable || cst.Op != sqlite3.INDEX_CONSTRAINT_EQ {
continue
}
switch cst.Column {
case 1:
2023-11-23 15:32:28 +00:00
idx.ConstraintUsage[i] = sqlite3.IndexConstraintUsage{
Omit: true,
ArgvIndex: 1,
}
idx.EstimatedCost = 1e6
idx.EstimatedRows = 100
2025-01-05 19:32:42 +00:00
err = nil
case 2:
idx.ConstraintUsage[i] = sqlite3.IndexConstraintUsage{
Omit: true,
ArgvIndex: 2,
}
2023-11-23 15:32:28 +00:00
}
}
2025-01-05 19:32:42 +00:00
return err
2023-11-23 15:32:28 +00:00
}
func (l lines) Open() (sqlite3.VTabCursor, error) {
2023-12-19 15:24:54 +00:00
if l.fsys != nil {
return &reader{fsys: l.fsys}, nil
2023-12-19 00:13:51 +00:00
} else {
return &buffer{}, nil
}
2023-11-23 15:32:28 +00:00
}
// cursor holds the state shared by the reader and buffer cursors,
// which embed it.
type cursor struct {
	// line is the current row's content, returned for column 0.
	line []byte
	// rowID is reset to 0 by Filter and incremented by every Next.
	rowID int64
	// eof is what EOF reports.
	eof bool
	// delim is the byte that separates lines.
	delim byte
}
// EOF reports the end-of-input flag maintained by the Next methods
// of the cursors that embed cursor.
func (c *cursor) EOF() bool {
return c.eof
}
// RowID returns the row counter of the current line.
// Filter resets the counter to 0 and each Next increments it,
// so the first line has row ID 1. The error is always nil.
func (c *cursor) RowID() (int64, error) {
return c.rowID, nil
}
2024-07-26 12:25:15 +01:00
func (c *cursor) Column(ctx sqlite3.Context, n int) error {
2023-11-23 15:32:28 +00:00
if n == 0 {
2023-12-19 00:13:51 +00:00
ctx.ResultRawText(c.line)
2023-11-23 15:32:28 +00:00
}
return nil
}
2023-12-19 00:13:51 +00:00
type reader struct {
2023-12-19 15:24:54 +00:00
fsys fs.FS
2023-12-19 00:13:51 +00:00
reader *bufio.Reader
closer io.Closer
cursor
}
// Close closes the current source, if it has one that can be closed,
// and forgets it so that a later Close is a no-op.
func (c *reader) Close() (err error) {
	if c.closer == nil {
		return nil
	}
	src := c.closer
	c.closer = nil
	return src.Close()
}
func (c *reader) Filter(idxNum int, idxStr string, arg ...sqlite3.Value) error {
2023-11-23 15:32:28 +00:00
if err := c.Close(); err != nil {
return err
}
var r io.Reader
2023-12-19 00:13:51 +00:00
typ := arg[0].Type()
switch typ {
case sqlite3.NULL:
if p, ok := arg[0].Pointer().(io.Reader); ok {
r = p
2023-11-23 15:32:28 +00:00
}
2023-12-19 00:13:51 +00:00
case sqlite3.TEXT:
2023-12-19 15:24:54 +00:00
f, err := c.fsys.Open(arg[0].Text())
2023-12-19 00:13:51 +00:00
if err != nil {
return err
2023-11-27 14:57:04 +00:00
}
2023-12-19 00:13:51 +00:00
r = f
2023-11-23 15:32:28 +00:00
}
if r == nil {
2023-11-27 14:57:04 +00:00
return fmt.Errorf("lines: unsupported argument:%.0w %v", sqlite3.MISMATCH, typ)
2023-11-23 15:32:28 +00:00
}
2023-12-19 00:13:51 +00:00
2025-01-05 19:32:42 +00:00
c.delim = '\n'
if len(arg) > 1 {
b := arg[1].RawText()
if len(b) != 1 {
return fmt.Errorf("lines: delimiter must be a single byte%.0w", sqlite3.MISMATCH)
}
c.delim = b[0]
}
2023-12-19 00:13:51 +00:00
c.reader = bufio.NewReader(r)
c.closer, _ = r.(io.Closer)
2023-11-23 15:32:28 +00:00
c.rowID = 0
return c.Next()
}
2023-12-19 00:13:51 +00:00
func (c *reader) Next() (err error) {
c.line = c.line[:0]
for more := true; more; {
var line []byte
2025-01-05 19:32:42 +00:00
if c.delim == '\n' {
line, more, err = c.reader.ReadLine()
} else {
line, err = c.reader.ReadSlice(c.delim)
more = err == bufio.ErrBufferFull
}
2023-12-19 00:13:51 +00:00
c.line = append(c.line, line...)
}
if err == io.EOF {
c.eof = true
err = nil
}
c.rowID++
return err
}
// buffer is the cursor returned by Open when the table has no file
// system attached: it scans an in-memory blob or text value.
type buffer struct {
// data is the unread remainder of the input; Next slices each line off its front.
data []byte
cursor
}
func (c *buffer) Filter(idxNum int, idxStr string, arg ...sqlite3.Value) error {
typ := arg[0].Type()
switch typ {
case sqlite3.TEXT:
c.data = arg[0].RawText()
case sqlite3.BLOB:
c.data = arg[0].RawBlob()
default:
return fmt.Errorf("lines: unsupported argument:%.0w %v", sqlite3.MISMATCH, typ)
}
2025-01-05 19:32:42 +00:00
c.delim = '\n'
if len(arg) > 1 {
b := arg[1].RawText()
if len(b) != 1 {
return fmt.Errorf("lines: delimiter must be a single byte%.0w", sqlite3.MISMATCH)
}
c.delim = b[0]
}
2023-12-19 00:13:51 +00:00
c.rowID = 0
return c.Next()
}
func (c *buffer) Next() error {
2025-01-05 19:32:42 +00:00
i := bytes.IndexByte(c.data, c.delim)
2023-12-19 00:13:51 +00:00
j := i + 1
switch {
case i < 0:
i = len(c.data)
j = i
2025-01-05 19:32:42 +00:00
case i > 0 && c.delim == '\n' && c.data[i-1] == '\r':
2023-12-19 00:13:51 +00:00
i--
}
c.eof = len(c.data) == 0
c.line = c.data[:i]
c.data = c.data[j:]
c.rowID++
return nil
}