6. 事务
...大约 4 分钟
1. 事务(Transaction) 和 ACID
数据库事务(transaction)是访问并可能操作各种数据项的一个数据库操作序列,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。事务由事务开始与事务结束之间执行的全部数据库操作组成。
例如:A 转账给 B 一万元,那么数据库至少需要执行 2 个操作:
- 1)A 的账户减掉一万元。
- 2)B 的账户增加一万元。
这两个操作要么全部执行,代表转账成功。任意一个操作失败了,之前的操作都必须回退,代表转账失败。一个操作完成,另一个操作失败,这种结果是不能够接受的。这种场景就非常适合利用数据库事务的特性来解决。
数据支持事务,则需要具备 ACID 属性:
- 原子性(Atomicity):事务中的全部操作在数据库中是不可分割的,要么全部完成,要么全部不执行。
- 一致性(Consistency): 几个并行执行的事务,其执行结果必须与按某一顺序 串行执行的结果相一致。
- 隔离性(Isolation):事务的执行不受其他事务的干扰,事务执行的中间结果对其他事务必须是透明的。
- 持久性(Durability):对于任意已提交事务,系统必须保证该事务对数据库的改变不被丢失,即使数据库出现故障。
2. SQLite 和 Golang 标准库 的事务
2.1 SQLite
sqlite> BEGIN;
sqlite> DELETE FROM User WHERE Age > 25;
sqlite> INSERT INTO User VALUES ("Tom", 25), ("Jack", 18);
sqlite> COMMIT;
SQLite 中:
BEGIN
:开启事务COMMIT
:提交事务ROLLBACK
:回滚事务
2.2 Golang 标准库
package main
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
"log"
)
func main() {
db, _ := sql.Open("sqlite3", "gee.db")
defer func() {
_ = db.Close()
}()
tx, _ := db.Begin()
_, err1 := tx.Exec("INSERT INTO User(`Name`) VALUES (?)", "A")
_, err2 := tx.Exec("INSERT INTO User(`Name`) VALUES (?)", "B")
if err1 != nil || err2 != nil {
_ = tx.Rollback()
log.Println("Rollback", err1, err2)
} else {
_ = tx.Commit()
log.Println("Commit")
}
}
db.Begin
:返回*sql.Tx
tx.Exec
:执行操作tx.Rollback
:回滚tx.Commit
:提交
3. 增加事务支持
当前的数据库操作均使用sql.DB
执行,会自动提交。若要支持事务,则需要改为sql.Tx
来执行。
type Session struct {
db *sql.DB
dialect dialect.Dialect
tx *sql.Tx
refTable *schema.Schema
clause clause.Clause
sql strings.Builder
sqlVars []any
}
var (
_ CommonDB = (*sql.DB)(nil)
_ CommonDB = (*sql.Tx)(nil)
)
// CommonDB is a minimal function set of db
type CommonDB interface {
Query(query string, args ...any) (*sql.Rows, error)
QueryRow(query string, args ...interface{}) *sql.Row
Exec(query string, args ...interface{}) (sql.Result, error)
}
func (s *Session) DB() CommonDB {
if s.tx != nil {
return s.tx
}
return s.db
}
Session
:新增字段tx *sql.Tx
- 将
sql.DB
和sql.Tx
共有方法抽象为接口CommonDB
s.DB
:若开启事务则返回s.tx
,否则返回s.db
session/transaction.go
实现事务相关代码:
package session
import "geeorm/log"
func (s *Session) Begin() error {
log.Info("transaction begin")
var err error
if s.tx, err = s.db.Begin(); err != nil {
log.Error(err)
return err
}
return nil
}
func (s *Session) Commit() error {
log.Info("transaction commit")
if err := s.tx.Commit(); err != nil {
log.Error(err)
return err
}
return nil
}
func (s *Session) Rollback() error {
log.Info("transaction rollback")
if err := s.tx.Rollback(); err != nil {
log.Error(err)
return err
}
return nil
}
3.1 自动提交/回滚接口
用户只需将操作放入函数TxFunc
即可。
type TxFunc func(s *session.Session) (any, error)
func (engine *Engine) Transaction(f TxFunc) (res any, err error) {
s := engine.NewSession()
if err = s.Begin(); err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
_ = s.Rollback()
panic(p) // re-throw panic after rollback
} else if err != nil {
_ = s.Rollback() // err is non-nil
} else {
err = s.Commit() // err is nil; if Commit returns error, update err
}
}()
return f(s)
}
- 获取 Session
- 开启事务,出现错误则返回
- 执行事务
- 若出现
panic
,则回滚并将 panic 重新抛出,交由调用方处理 - 若执行事务出现错误,则回滚
- 提交事务,提交出现错误则更新错误
4. 单元测试
package geeorm
import (
"errors"
"geeorm/session"
_ "github.com/mattn/go-sqlite3"
"testing"
)
func openDB(t *testing.T) *Engine {
t.Helper()
engine, err := NewEngine("sqlite3", "gee.db")
if err != nil {
t.Fatal("failed to connect database")
}
return engine
}
func TestNewEngine(t *testing.T) {
engine := openDB(t)
defer engine.Close()
}
type User struct {
Name string `geeorm:"RPIMARY KEY"`
Age int
}
func TestEngine_Transaction(t *testing.T) {
tests := []struct {
name string
f func(t *testing.T)
}{
{name: "Rollback", f: testRollback},
{name: "Commit", f: testCommit},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.f(t)
})
}
}
func testRollback(t *testing.T) {
engine := openDB(t)
defer engine.Close()
s := engine.NewSession()
_ = s.Model(&User{}).DropTable()
_, err := engine.Transaction(func(s *session.Session) (any, error) {
_ = s.Model(&User{}).CreateTable()
_, _ = s.Insert(&User{"Tom", 18})
return nil, errors.New("inert error")
})
if err == nil || s.HasTable() {
t.Fatal("failed to rollback")
}
}
func testCommit(t *testing.T) {
engine := openDB(t)
defer engine.Close()
s := engine.NewSession()
_ = s.Model(&User{}).DropTable()
_, err := engine.Transaction(func(s *session.Session) (any, error) {
_ = s.Model(&User{}).CreateTable()
_, err := s.Insert(&User{"Tom", 18})
return nil, err
})
u := &User{}
_ = s.First(u)
if err != nil || u.Name != "Tom" {
t.Fatal("failed to commit")
}
}
testRollback
中故意返回error
。
Reference
Powered by Waline v2.15.2