mirror of https://github.com/dtm-labs/dtm.git
csharpjavadistributed-transactionsdtmgogolangmicroservicenodejsphpdatabasesagaseatatcctransactiontransactionsxapythondistributed
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
160 lines
4.8 KiB
160 lines
4.8 KiB
/*
|
|
* Copyright (c) 2021 yedf. All rights reserved.
|
|
* Use of this source code is governed by a BSD-style
|
|
* license that can be found in the LICENSE file.
|
|
*/
|
|
|
|
package dtmimp
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-resty/resty/v2"
|
|
)
|
|
|
|
// BranchIDGen hands out sequential sub branch ids derived from a parent
// branch id.
type BranchIDGen struct {
	// BranchID is the parent id that prefixes every generated sub branch id.
	BranchID string
	// subBranchID is the counter of the most recently issued sub branch; it is
	// zero until NewSubBranchID has been called.
	subBranchID int
}

// NewSubBranchID advances the internal counter by one and returns the
// resulting sub branch id.
//
// It panics with an error value when the counter has already reached 99, or
// when the parent BranchID is 20 bytes or longer. The counter is left
// untouched in both cases.
func (g *BranchIDGen) NewSubBranchID() string {
	switch {
	case g.subBranchID >= 99:
		panic(fmt.Errorf("branch id is larger than 99"))
	case len(g.BranchID) >= 20:
		panic(fmt.Errorf("total branch id is longer than 20"))
	}
	g.subBranchID++
	return g.CurrentSubBranchID()
}

// CurrentSubBranchID returns the parent BranchID followed by the current
// counter, rendered as a zero-padded decimal of at least two digits.
func (g *BranchIDGen) CurrentSubBranchID() string {
	return fmt.Sprintf("%s%02d", g.BranchID, g.subBranchID)
}
|
|
|
|
// TransOptions groups the optional settings of a transaction. Every field is
// tagged gorm:"-".
type TransOptions struct {
	// WaitResult is serialized as "wait_result" and omitted when false.
	WaitResult bool `json:"wait_result,omitempty" gorm:"-"`
	// TimeoutToFail applies to trans types xa and tcc; unit: second.
	TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"`
	// RequestTimeout resets the request timeout for the global transaction;
	// unit: second. Note that its JSON key is "requestTimeout" and that it is
	// always serialized (no omitempty).
	RequestTimeout int64 `json:"requestTimeout" gorm:"-"`
	// RetryInterval applies to trans types msg, saga, xa and tcc; unit: second.
	RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"`
	// PassthroughHeaders is serialized as "passthrough_headers" and omitted
	// when empty.
	PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"`
	// BranchHeaders holds custom branch headers, dtm server => service api.
	BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"`
	// Concurrent applies to trans types saga and msg; it is always serialized.
	Concurrent bool `json:"concurrent" gorm:"-"`
}
|
|
|
|
// TransBase base for all trans
|
|
type TransBase struct {
|
|
Gid string `json:"gid"`
|
|
TransType string `json:"trans_type"`
|
|
Dtm string `json:"-"`
|
|
CustomData string `json:"custom_data,omitempty"`
|
|
TransOptions
|
|
|
|
Steps []map[string]string `json:"steps,omitempty"` // use in MSG/SAGA
|
|
Payloads []string `json:"payloads,omitempty"` // used in MSG/SAGA
|
|
BinPayloads [][]byte `json:"-"`
|
|
BranchIDGen `json:"-"` // used in XA/TCC
|
|
Op string `json:"-"` // used in XA/TCC
|
|
|
|
QueryPrepared string `json:"query_prepared,omitempty"` // used in MSG
|
|
Protocol string `json:"protocol"`
|
|
}
|
|
|
|
// NewTransBase new a TransBase
|
|
func NewTransBase(gid string, transType string, dtm string, branchID string) *TransBase {
|
|
return &TransBase{
|
|
Gid: gid,
|
|
TransType: transType,
|
|
BranchIDGen: BranchIDGen{BranchID: branchID},
|
|
Dtm: dtm,
|
|
TransOptions: TransOptions{PassthroughHeaders: PassthroughHeaders},
|
|
}
|
|
}
|
|
|
|
// WithGlobalTransRequestTimeout defines global trans request timeout
|
|
func (t *TransBase) WithGlobalTransRequestTimeout(timeout int64) {
|
|
t.RequestTimeout = timeout
|
|
}
|
|
|
|
// TransBaseFromQuery construct transaction info from request
|
|
func TransBaseFromQuery(qs url.Values) *TransBase {
|
|
return NewTransBase(qs.Get("gid"), qs.Get("trans_type"), qs.Get("dtm"), qs.Get("branch_id"))
|
|
}
|
|
|
|
// TransCallDtm TransBase call dtm
|
|
func TransCallDtm(tb *TransBase, body interface{}, operation string) error {
|
|
if tb.RequestTimeout != 0 {
|
|
RestyClient.SetTimeout(time.Duration(tb.RequestTimeout) * time.Second)
|
|
}
|
|
if tb.Protocol == Jrpc {
|
|
var result map[string]interface{}
|
|
resp, err := RestyClient.R().
|
|
SetBody(map[string]interface{}{
|
|
"jsonrpc": "2.0",
|
|
"id": "no-use",
|
|
"method": operation,
|
|
"params": body,
|
|
}).
|
|
SetResult(&result).
|
|
Post(tb.Dtm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resp.StatusCode() != http.StatusOK || result["error"] != nil {
|
|
return errors.New(resp.String())
|
|
}
|
|
return nil
|
|
}
|
|
resp, err := RestyClient.R().
|
|
SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resp.StatusCode() != http.StatusOK || strings.Contains(resp.String(), ResultFailure) {
|
|
return errors.New(resp.String())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TransRegisterBranch TransBase register a branch to dtm
|
|
func TransRegisterBranch(tb *TransBase, added map[string]string, operation string) error {
|
|
m := map[string]string{
|
|
"gid": tb.Gid,
|
|
"trans_type": tb.TransType,
|
|
}
|
|
for k, v := range added {
|
|
m[k] = v
|
|
}
|
|
return TransCallDtm(tb, m, operation)
|
|
}
|
|
|
|
// TransRequestBranch TransBase request branch result
|
|
func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error) {
|
|
if url == "" {
|
|
return nil, nil
|
|
}
|
|
resp, err := RestyClient.R().
|
|
SetBody(body).
|
|
SetQueryParams(map[string]string{
|
|
"dtm": t.Dtm,
|
|
"gid": t.Gid,
|
|
"branch_id": branchID,
|
|
"trans_type": t.TransType,
|
|
"op": op,
|
|
}).
|
|
SetHeaders(t.BranchHeaders).
|
|
Execute(method, url)
|
|
if err == nil {
|
|
err = RespAsErrorCompatible(resp)
|
|
}
|
|
return resp, err
|
|
}
|
|
|