mirror of https://github.com/dtm-labs/dtm.git
csharpjavadistributed-transactionsdtmgogolangmicroservicenodejsphpdatabasesagaseatatcctransactiontransactionsxapythondistributed
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
151 lines
4.9 KiB
151 lines
4.9 KiB
package test
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/dtm-labs/dtm/client/dtmcli"
|
|
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
|
|
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgimp"
|
|
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgpb"
|
|
"github.com/dtm-labs/dtm/dtmutil"
|
|
"github.com/pkg/errors"
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
const (
|
|
testTopicTestTopicNormal = "test_topic_TestTopicNormal"
|
|
testTopicTestConcurrentUpdateTopic = "concurrent_topic_TestConcurrentUpdateTopic"
|
|
)
|
|
|
|
func TestTopicNormal(t *testing.T) {
|
|
testSubscribe(t, httpSubscribe)
|
|
testUnsubscribe(t, httpUnsubscribe)
|
|
testDeleteTopic(t, httpDeleteTopic)
|
|
|
|
testSubscribe(t, grpcSubscribe)
|
|
testUnsubscribe(t, grpcUnsubscribe)
|
|
testDeleteTopic(t, grpcDeleteTopic)
|
|
}
|
|
|
|
func TestConcurrentUpdateTopic(t *testing.T) {
|
|
var wg sync.WaitGroup
|
|
var urls []string
|
|
var errNum int
|
|
concurrentTimes := 20
|
|
// concurrently updates the topic, part of them succeed
|
|
for i := 0; i < concurrentTimes; i++ {
|
|
wg.Add(1)
|
|
go func(i int) {
|
|
url := "http://dtm/test" + strconv.Itoa(i)
|
|
err := httpSubscribe(testTopicTestConcurrentUpdateTopic, url)
|
|
if err == nil {
|
|
urls = append(urls, url)
|
|
} else {
|
|
errNum++
|
|
}
|
|
wg.Done()
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
assert.True(t, len(urls) > 0)
|
|
|
|
// delete successfully subscribed urls above, all of them should succeed
|
|
for _, url := range urls {
|
|
assert.Nil(t, httpUnsubscribe(testTopicTestConcurrentUpdateTopic, url))
|
|
}
|
|
|
|
// finally, the topic version should be correct
|
|
m := map[string]interface{}{}
|
|
resp, err := dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
|
|
"cat": "topics",
|
|
"key": testTopicTestConcurrentUpdateTopic,
|
|
}).Get(dtmutil.DefaultHTTPServer + "/queryKV")
|
|
assert.Nil(t, err)
|
|
dtmimp.MustUnmarshalString(resp.String(), &m)
|
|
dtmimp.MustRemarshal(m["kv"].([]interface{})[0], &m)
|
|
assert.Equal(t, float64((concurrentTimes-errNum)*2), m["version"])
|
|
}
|
|
|
|
func testSubscribe(t *testing.T, subscribe func(topic, url string) error) {
|
|
assert.Nil(t, subscribe(testTopicTestTopicNormal, "http://dtm/test1"))
|
|
assert.Error(t, subscribe(testTopicTestTopicNormal, "http://dtm/test1")) // error:repeat subscription
|
|
assert.Error(t, subscribe("", "http://dtm/test1")) // error:empty topic
|
|
assert.Error(t, subscribe(testTopicTestTopicNormal, "")) // error:empty url
|
|
assert.Nil(t, subscribe(testTopicTestTopicNormal, "http://dtm/test2"))
|
|
}
|
|
|
|
func testUnsubscribe(t *testing.T, unsubscribe func(topic, url string) error) {
|
|
assert.Nil(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test1"))
|
|
assert.Error(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test1")) // error:repeat unsubscription
|
|
assert.Error(t, unsubscribe("", "http://dtm/test1")) // error:empty topic
|
|
assert.Error(t, unsubscribe(testTopicTestTopicNormal, "")) // error:empty url
|
|
assert.Error(t, unsubscribe("non_existent_topic", "http://dtm/test1")) // error:unsubscribe a non-existent topic
|
|
assert.Nil(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test2"))
|
|
assert.Error(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test2"))
|
|
}
|
|
|
|
func testDeleteTopic(t *testing.T, deleteTopic func(topic string) error) {
|
|
assert.Error(t, deleteTopic("non_existent_testDeleteTopic"))
|
|
assert.Nil(t, deleteTopic(testTopicTestTopicNormal))
|
|
}
|
|
|
|
func httpSubscribe(topic, url string) error {
|
|
resp, err := dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
|
|
"topic": topic,
|
|
"url": url,
|
|
"remark": "for test",
|
|
}).Get(dtmutil.DefaultHTTPServer + "/subscribe")
|
|
e2p(err)
|
|
if resp.StatusCode() != 200 {
|
|
err = errors.Errorf("Http Request Error. Resp:%v", resp.String())
|
|
}
|
|
return err
|
|
}
|
|
|
|
func httpUnsubscribe(topic, url string) error {
|
|
resp, err := dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
|
|
"topic": topic,
|
|
"url": url,
|
|
}).Get(dtmutil.DefaultHTTPServer + "/unsubscribe")
|
|
e2p(err)
|
|
if resp.StatusCode() != 200 {
|
|
err = errors.Errorf("Http Request Error. Resp:%+v", resp.String())
|
|
}
|
|
return err
|
|
}
|
|
|
|
func httpDeleteTopic(topic string) error {
|
|
resp, err := dtmcli.GetRestyClient().R().Delete(dtmutil.DefaultHTTPServer + "/topic/" + topic)
|
|
e2p(err)
|
|
if resp.StatusCode() != 200 {
|
|
err = errors.Errorf("Http Request Error. Resp:%+v", resp.String())
|
|
}
|
|
return err
|
|
}
|
|
|
|
func grpcSubscribe(topic, url string) error {
|
|
_, err := dtmgimp.MustGetDtmClient(dtmutil.DefaultGrpcServer).Subscribe(context.Background(),
|
|
&dtmgpb.DtmTopicRequest{
|
|
Topic: topic,
|
|
URL: url,
|
|
Remark: "for test"})
|
|
return err
|
|
}
|
|
|
|
func grpcUnsubscribe(topic, url string) error {
|
|
_, err := dtmgimp.MustGetDtmClient(dtmutil.DefaultGrpcServer).Unsubscribe(context.Background(),
|
|
&dtmgpb.DtmTopicRequest{
|
|
Topic: topic,
|
|
URL: url})
|
|
return err
|
|
}
|
|
|
|
func grpcDeleteTopic(topic string) error {
|
|
_, err := dtmgimp.MustGetDtmClient(dtmutil.DefaultGrpcServer).DeleteTopic(context.Background(),
|
|
&dtmgpb.DtmTopicRequest{
|
|
Topic: topic})
|
|
return err
|
|
}
|
|
|