6. 事务

Kesa...大约 4 分钟golang

day6-transactionopen in new window

1. 事务(Transaction) 和 ACID

数据库事务(transaction)是访问并可能操作各种数据项的一个数据库操作序列,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。事务由事务开始与事务结束之间执行的全部数据库操作组成。

例如:A 转账给 B 一万元,那么数据库至少需要执行 2 个操作:

  • 1)A 的账户减掉一万元。
  • 2)B 的账户增加一万元。

这两个操作要么全部执行,代表转账成功。任意一个操作失败了,之前的操作都必须回退,代表转账失败。一个操作完成,另一个操作失败,这种结果是不能够接受的。这种场景就非常适合利用数据库事务的特性来解决。

数据支持事务,则需要具备 ACID 属性:

  1. 原子性(Atomicity):事务中的全部操作在数据库中是不可分割的,要么全部完成,要么全部不执行。
  2. 一致性(Consistency): 几个并行执行的事务,其执行结果必须与按某一顺序 串行执行的结果相一致。
  3. 隔离性(Isolation):事务的执行不受其他事务的干扰,事务执行的中间结果对其他事务必须是透明的。
  4. 持久性(Durability):对于任意已提交事务,系统必须保证该事务对数据库的改变不被丢失,即使数据库出现故障。

2. SQLite 和 Golang 标准库 的事务

2.1 SQLite

sqlite> BEGIN;
sqlite> DELETE FROM User WHERE Age > 25;
sqlite> INSERT INTO User VALUES ("Tom", 25), ("Jack", 18);
sqlite> COMMIT;

SQLite 中:

  • BEGIN:开启事务
  • COMMIT:提交事务
  • ROLLBACK:回滚事务

2.2 Golang 标准库

package main

import (
	"database/sql"
	_ "github.com/mattn/go-sqlite3"
	"log"
)

func main() {
	db, _ := sql.Open("sqlite3", "gee.db")
	defer func() {
		_ = db.Close()
	}()

	tx, _ := db.Begin()
	_, err1 := tx.Exec("INSERT INTO User(`Name`) VALUES (?)", "A")
	_, err2 := tx.Exec("INSERT INTO User(`Name`) VALUES (?)", "B")

	if err1 != nil || err2 != nil {
		_ = tx.Rollback()
		log.Println("Rollback", err1, err2)
	} else {
		_ = tx.Commit()
		log.Println("Commit")
	}
}
  • db.Begin:返回*sql.Tx
  • tx.Exec:执行操作
  • tx.Rollback:回滚
  • tx.Commit:提交

3. 增加事务支持

当前的数据库操作均使用sql.DB执行,会自动提交。若要支持事务,则需要改为sql.Tx来执行。

type Session struct {
	db       *sql.DB
	dialect  dialect.Dialect
	tx       *sql.Tx
	refTable *schema.Schema
	clause   clause.Clause
	sql      strings.Builder
	sqlVars  []any
}

var (
	_ CommonDB = (*sql.DB)(nil)
	_ CommonDB = (*sql.Tx)(nil)
)

// CommonDB is a minimal function set of db
type CommonDB interface {
	Query(query string, args ...any) (*sql.Rows, error)
	QueryRow(query string, args ...interface{}) *sql.Row
	Exec(query string, args ...interface{}) (sql.Result, error)
}

func (s *Session) DB() CommonDB {
	if s.tx != nil {
		return s.tx
	}
	return s.db
}
  • Session:新增字段tx *sql.Tx
  • sql.DBsql.Tx共有方法抽象为接口 CommonDB
  • s.DB:若开启事务则返回s.tx,否则返回s.db

session/transaction.go实现事务相关代码:

package session

import "geeorm/log"

func (s *Session) Begin() error {
	log.Info("transaction begin")
	var err error
	if s.tx, err = s.db.Begin(); err != nil {
		log.Error(err)
		return err
	}
	return nil
}

func (s *Session) Commit() error {
	log.Info("transaction commit")
	if err := s.tx.Commit(); err != nil {
		log.Error(err)
		return err
	}
	return nil
}

func (s *Session) Rollback() error {
	log.Info("transaction rollback")
	if err := s.tx.Rollback(); err != nil {
		log.Error(err)
		return err
	}
	return nil
}

3.1 自动提交/回滚接口

用户只需将操作放入函数TxFunc即可。

type TxFunc func(s *session.Session) (any, error)

func (engine *Engine) Transaction(f TxFunc) (res any, err error) {
	s := engine.NewSession()
	if err = s.Begin(); err != nil {
		return nil, err
	}
	defer func() {
		if p := recover(); p != nil {
			_ = s.Rollback()
			panic(p) // re-throw panic after rollback
		} else if err != nil {
			_ = s.Rollback() // err is non-nil
		} else {
			err = s.Commit() // err is nil; if Commit returns error, update err
		}
	}()

	return f(s)
}
  1. 获取 Session
  2. 开启事务,出现错误则返回
  3. 执行事务
  4. 若出现 panic,则回滚并将 panic 重新抛出,交由调用方处理
  5. 若执行事务出现错误,则回滚
  6. 提交事务,提交出现错误则更新错误

4. 单元测试

package geeorm

import (
	"errors"
	"geeorm/session"
	_ "github.com/mattn/go-sqlite3"
	"testing"
)

func openDB(t *testing.T) *Engine {
	t.Helper()
	engine, err := NewEngine("sqlite3", "gee.db")
	if err != nil {
		t.Fatal("failed to connect database")
	}
	return engine
}

func TestNewEngine(t *testing.T) {
	engine := openDB(t)
	defer engine.Close()
}

type User struct {
	Name string `geeorm:"RPIMARY KEY"`
	Age  int
}

func TestEngine_Transaction(t *testing.T) {
	tests := []struct {
		name string
		f    func(t *testing.T)
	}{
		{name: "Rollback", f: testRollback},
		{name: "Commit", f: testCommit},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tt.f(t)
		})
	}
}

func testRollback(t *testing.T) {
	engine := openDB(t)
	defer engine.Close()
	s := engine.NewSession()
	_ = s.Model(&User{}).DropTable()
	_, err := engine.Transaction(func(s *session.Session) (any, error) {
		_ = s.Model(&User{}).CreateTable()
		_, _ = s.Insert(&User{"Tom", 18})
		return nil, errors.New("inert error")
	})

	if err == nil || s.HasTable() {
		t.Fatal("failed to rollback")
	}
}

func testCommit(t *testing.T) {
	engine := openDB(t)
	defer engine.Close()
	s := engine.NewSession()
	_ = s.Model(&User{}).DropTable()
	_, err := engine.Transaction(func(s *session.Session) (any, error) {
		_ = s.Model(&User{}).CreateTable()
		_, err := s.Insert(&User{"Tom", 18})
		return nil, err
	})

	u := &User{}
	_ = s.First(u)

	if err != nil || u.Name != "Tom" {
		t.Fatal("failed to commit")
	}
}

testRollback中故意返回error

Reference

  1. https://geektutu.com/post/geeorm-day6.htmlopen in new window
上次编辑于:
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2