Concurrent transactions.

This commit is contained in:
Nuno Cruces
2024-07-24 20:03:23 +01:00
parent 19639be9f9
commit 64b77f1a79
3 changed files with 56 additions and 33 deletions

View File

@@ -119,10 +119,8 @@ func (d *SQLite) newConnector(name string) (*connector, error) {
}
switch txlock {
case "":
c.txBegin = "BEGIN"
case "deferred", "immediate", "exclusive":
c.txBegin = "BEGIN " + txlock
case "", "deferred", "concurrent", "immediate", "exclusive":
c.txLock = txlock
default:
return nil, fmt.Errorf("sqlite3: invalid _txlock: %s", txlock)
}
@@ -147,7 +145,7 @@ func (d *SQLite) newConnector(name string) (*connector, error) {
type connector struct {
driver *SQLite
name string
txBegin string
txLock string
tmRead sqlite3.TimeFormat
tmWrite sqlite3.TimeFormat
pragmas bool
@@ -159,7 +157,7 @@ func (n *connector) Driver() driver.Driver {
func (n *connector) Connect(ctx context.Context) (_ driver.Conn, err error) {
c := &conn{
txBegin: n.txBegin,
txLock: n.txLock,
tmRead: n.tmRead,
tmWrite: n.tmWrite,
}
@@ -209,12 +207,11 @@ func (n *connector) Connect(ctx context.Context) (_ driver.Conn, err error) {
type conn struct {
*sqlite3.Conn
txBegin string
txCommit string
txRollback string
tmRead sqlite3.TimeFormat
tmWrite sqlite3.TimeFormat
readOnly byte
txLock string
txReset string
tmRead sqlite3.TimeFormat
tmWrite sqlite3.TimeFormat
readOnly byte
}
var (
@@ -236,27 +233,25 @@ func (c *conn) Begin() (driver.Tx, error) {
}
func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
txBegin := c.txBegin
c.txCommit = `COMMIT`
c.txRollback = `ROLLBACK`
if opts.ReadOnly {
txBegin = `
BEGIN deferred;
PRAGMA query_only=on`
c.txRollback = `
ROLLBACK;
PRAGMA query_only=` + string(c.readOnly)
c.txCommit = c.txRollback
}
var txLock string
switch opts.Isolation {
default:
return nil, util.IsolationErr
case
driver.IsolationLevel(sql.LevelDefault),
driver.IsolationLevel(sql.LevelSerializable):
break
case driver.IsolationLevel(sql.LevelLinearizable):
txLock = "exclusive"
case driver.IsolationLevel(sql.LevelSerializable):
txLock = "immediate"
case driver.IsolationLevel(sql.LevelDefault):
if !opts.ReadOnly {
txLock = c.txLock
}
}
c.txReset = ``
txBegin := `BEGIN ` + txLock
if opts.ReadOnly {
txBegin += ` ; PRAGMA query_only=on`
c.txReset = `; PRAGMA query_only=` + string(c.readOnly)
}
old := c.Conn.SetInterrupt(ctx)
@@ -270,7 +265,7 @@ func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, e
}
func (c *conn) Commit() error {
err := c.Conn.Exec(c.txCommit)
err := c.Conn.Exec(`COMMIT` + c.txReset)
if err != nil && !c.Conn.GetAutocommit() {
c.Rollback()
}
@@ -278,11 +273,11 @@ func (c *conn) Commit() error {
}
func (c *conn) Rollback() error {
err := c.Conn.Exec(c.txRollback)
err := c.Conn.Exec(`ROLLBACK` + c.txReset)
if errors.Is(err, sqlite3.INTERRUPT) {
old := c.Conn.SetInterrupt(context.Background())
defer c.Conn.SetInterrupt(old)
err = c.Conn.Exec(c.txRollback)
err = c.Conn.Exec(`ROLLBACK` + c.txReset)
}
return err
}

View File

@@ -339,6 +339,21 @@ func TestConn_Transaction_rollback(t *testing.T) {
}
}
func TestConn_Transaction_concurrent(t *testing.T) {
t.Parallel()
db, err := sqlite3.Open(":memory:")
if err != nil {
t.Fatal(err)
}
defer db.Close()
_, err = db.BeginConcurrent()
if !errors.Is(err, sqlite3.ERROR) {
t.Errorf("got %v, want sqlite3.ERROR", err)
}
}
func TestConn_Savepoint_exec(t *testing.T) {
t.Parallel()

13
txn.go
View File

@@ -32,6 +32,19 @@ func (c *Conn) Begin() Txn {
return Txn{c}
}
// BeginConcurrent starts a concurrent transaction.
//
// Experimental: requires a custom build of SQLite.
//
// https://sqlite.org/cgi/src/doc/begin-concurrent/doc/begin_concurrent.md
func (c *Conn) BeginConcurrent() (Txn, error) {
err := c.Exec(`BEGIN CONCURRENT`)
if err != nil {
return Txn{}, err
}
return Txn{c}, nil
}
// BeginImmediate starts an immediate transaction.
//
// https://sqlite.org/lang_transaction.html