Browse Source
Merge pull request #168 from Leizhengzi/main
fix: dtmcli ci lint error
pull/173/head
yedf2
4 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with
30 additions and
20 deletions
-
dtmcli/barrier.go
-
dtmcli/dtmimp/trans_xa_base.go
-
dtmcli/dtmimp/types.go
-
dtmcli/dtmimp/types_test.go
-
dtmcli/dtmimp/vars.go
-
dtmcli/logger/log.go
-
dtmcli/logger/logger_test.go
-
dtmcli/msg.go
-
dtmcli/types.go
-
dtmcli/types_test.go
-
dtmcli/xa.go
-
go.mod
|
|
|
@ -45,7 +45,7 @@ func BarrierFrom(transType, gid, branchID, op string) (*BranchBarrier, error) { |
|
|
|
Op: op, |
|
|
|
} |
|
|
|
if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.Op == "" { |
|
|
|
return nil, fmt.Errorf("invlid trans info: %v", ti) |
|
|
|
return nil, fmt.Errorf("invalid trans info: %v", ti) |
|
|
|
} |
|
|
|
return ti, nil |
|
|
|
} |
|
|
|
@ -67,10 +67,10 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) |
|
|
|
defer func() { |
|
|
|
// Logf("barrier call error is %v", rerr)
|
|
|
|
if x := recover(); x != nil { |
|
|
|
tx.Rollback() |
|
|
|
_ = tx.Rollback() |
|
|
|
panic(x) |
|
|
|
} else if rerr != nil { |
|
|
|
tx.Rollback() |
|
|
|
_ = tx.Rollback() |
|
|
|
} else { |
|
|
|
rerr = tx.Commit() |
|
|
|
} |
|
|
|
@ -101,6 +101,7 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error |
|
|
|
return bb.Call(tx, busiCall) |
|
|
|
} |
|
|
|
|
|
|
|
// QueryPrepared queries prepared data
|
|
|
|
func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { |
|
|
|
_, err := insertBarrier(db, bb.TransType, bb.Gid, "00", "msg", "01", "rollback") |
|
|
|
var reason string |
|
|
|
|
|
|
|
@ -24,7 +24,9 @@ func (xc *XaClientBase) HandleCallback(gid string, branchID string, action strin |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
defer db.Close() |
|
|
|
defer func() { |
|
|
|
_ = db.Close() |
|
|
|
}() |
|
|
|
xaID := gid + "-" + branchID |
|
|
|
_, err = DBExec(db, GetDBSpecial().GetXaSQL(action, xaID)) |
|
|
|
if err != nil && |
|
|
|
@ -41,7 +43,7 @@ func (xc *XaClientBase) HandleLocalTrans(xa *TransBase, cb func(*sql.DB) error) |
|
|
|
if rerr != nil { |
|
|
|
return |
|
|
|
} |
|
|
|
defer func() { db.Close() }() |
|
|
|
defer func() { _ = db.Close() }() |
|
|
|
defer func() { |
|
|
|
x := recover() |
|
|
|
_, err := DBExec(db, GetDBSpecial().GetXaSQL("end", xaBranch)) |
|
|
|
|
|
|
|
@ -14,6 +14,7 @@ type DB interface { |
|
|
|
QueryRow(query string, args ...interface{}) *sql.Row |
|
|
|
} |
|
|
|
|
|
|
|
// DBConf defines db config
|
|
|
|
type DBConf struct { |
|
|
|
Driver string `yaml:"Driver"` |
|
|
|
Host string `yaml:"Host"` |
|
|
|
|
|
|
|
@ -22,4 +22,5 @@ func TestTypes(t *testing.T) { |
|
|
|
idGen := BranchIDGen{subBranchID: 99} |
|
|
|
idGen.NewSubBranchID() |
|
|
|
}) |
|
|
|
assert.Error(t, err) |
|
|
|
} |
|
|
|
|
|
|
|
@ -19,8 +19,8 @@ var ErrFailure = errors.New("FAILURE") |
|
|
|
// ErrOngoing error of ONGOING
|
|
|
|
var ErrOngoing = errors.New("ONGOING") |
|
|
|
|
|
|
|
// XaSqlTimeoutMs milliseconds for Xa sql to timeout
|
|
|
|
var XaSqlTimeoutMs = 15000 |
|
|
|
// XaSQLTimeoutMs milliseconds for Xa sql to timeout
|
|
|
|
var XaSQLTimeoutMs = 15000 |
|
|
|
|
|
|
|
// MapSuccess HTTP result of SUCCESS
|
|
|
|
var MapSuccess = map[string]interface{}{"dtm_result": ResultSuccess} |
|
|
|
|
|
|
|
@ -10,7 +10,7 @@ import ( |
|
|
|
|
|
|
|
//var logger *zap.SugaredLogger = nil
|
|
|
|
|
|
|
|
var logger Logger = nil |
|
|
|
var logger Logger |
|
|
|
|
|
|
|
func init() { |
|
|
|
InitLog(os.Getenv("LOG_LEVEL")) |
|
|
|
@ -24,6 +24,7 @@ type Logger interface { |
|
|
|
Errorf(format string, args ...interface{}) |
|
|
|
} |
|
|
|
|
|
|
|
// WithLogger replaces default logger
|
|
|
|
func WithLogger(log Logger) { |
|
|
|
logger = log |
|
|
|
} |
|
|
|
|
|
|
|
@ -1,9 +1,10 @@ |
|
|
|
package logger |
|
|
|
|
|
|
|
import ( |
|
|
|
"go.uber.org/zap" |
|
|
|
"os" |
|
|
|
"testing" |
|
|
|
|
|
|
|
"go.uber.org/zap" |
|
|
|
) |
|
|
|
|
|
|
|
func TestInitLog(t *testing.T) { |
|
|
|
@ -26,4 +27,4 @@ func TestWithLogger(t *testing.T) { |
|
|
|
Errorf("a error msg") |
|
|
|
FatalfIf(false, "nothing") |
|
|
|
FatalIfError(nil) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -40,6 +40,7 @@ func (s *Msg) Submit() error { |
|
|
|
return dtmimp.TransCallDtm(&s.TransBase, s, "submit") |
|
|
|
} |
|
|
|
|
|
|
|
// PrepareAndSubmit execs prepare and submit operation
|
|
|
|
func (s *Msg) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error { |
|
|
|
bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
|
|
|
|
if err == nil { |
|
|
|
|
|
|
|
@ -29,6 +29,7 @@ type DB = dtmimp.DB |
|
|
|
// TransOptions transaction option
|
|
|
|
type TransOptions = dtmimp.TransOptions |
|
|
|
|
|
|
|
// DBConf declares db configuration
|
|
|
|
type DBConf = dtmimp.DBConf |
|
|
|
|
|
|
|
// SetCurrentDBType set currentDBType
|
|
|
|
@ -41,16 +42,17 @@ func GetCurrentDBType() string { |
|
|
|
return dtmimp.GetCurrentDBType() |
|
|
|
} |
|
|
|
|
|
|
|
// SetXaSqlTimeoutMs set XaSqlTimeoutMs
|
|
|
|
func SetXaSqlTimeoutMs(ms int) { |
|
|
|
dtmimp.XaSqlTimeoutMs = ms |
|
|
|
// SetXaSQLTimeoutMs set XaSQLTimeoutMs
|
|
|
|
func SetXaSQLTimeoutMs(ms int) { |
|
|
|
dtmimp.XaSQLTimeoutMs = ms |
|
|
|
} |
|
|
|
|
|
|
|
// GetXaSqlTimeoutMs get XaSqlTimeoutMs
|
|
|
|
func GetXaSqlTimeoutMs() int { |
|
|
|
return dtmimp.XaSqlTimeoutMs |
|
|
|
// GetXaSQLTimeoutMs get XaSQLTimeoutMs
|
|
|
|
func GetXaSQLTimeoutMs() int { |
|
|
|
return dtmimp.XaSQLTimeoutMs |
|
|
|
} |
|
|
|
|
|
|
|
// SetBarrierTableName sets barrier table name
|
|
|
|
func SetBarrierTableName(tablename string) { |
|
|
|
dtmimp.BarrierTableName = tablename |
|
|
|
} |
|
|
|
|
|
|
|
@ -26,7 +26,7 @@ func TestTypes(t *testing.T) { |
|
|
|
} |
|
|
|
|
|
|
|
func TestXaSqlTimeout(t *testing.T) { |
|
|
|
old := GetXaSqlTimeoutMs() |
|
|
|
SetXaSqlTimeoutMs(old) |
|
|
|
old := GetXaSQLTimeoutMs() |
|
|
|
SetXaSQLTimeoutMs(old) |
|
|
|
SetBarrierTableName(dtmimp.BarrierTableName) // just cover this func
|
|
|
|
} |
|
|
|
|
|
|
|
@ -86,7 +86,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr e |
|
|
|
return xc.XaGlobalTransaction2(gid, func(x *Xa) {}, xaFunc) |
|
|
|
} |
|
|
|
|
|
|
|
// XaGlobalTransaction start a xa global transaction
|
|
|
|
// XaGlobalTransaction2 start a xa global transaction
|
|
|
|
func (xc *XaClient) XaGlobalTransaction2(gid string, custom func(*Xa), xaFunc XaGlobalFunc) (rerr error) { |
|
|
|
xa := &Xa{TransBase: *dtmimp.NewTransBase(gid, "xa", xc.XaClientBase.Server, "")} |
|
|
|
custom(xa) |
|
|
|
|
|
|
|
@ -3,7 +3,7 @@ module github.com/dtm-labs/dtm |
|
|
|
go 1.16 |
|
|
|
|
|
|
|
require ( |
|
|
|
bou.ke/monkey v1.0.2 // indirect |
|
|
|
bou.ke/monkey v1.0.2 |
|
|
|
github.com/dtm-labs/dtmdriver v0.0.1 |
|
|
|
github.com/dtm-labs/dtmdriver-gozero v0.0.1 |
|
|
|
github.com/dtm-labs/dtmdriver-polaris v0.0.2 |
|
|
|
|