Browse Source

Merge pull request #347 from Chiwency/main

feature:topic
pull/360/head
yedf2 3 years ago
committed by GitHub
parent
commit
9b868a815b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 7891
      admin/package-lock.json
  2. 4
      admin/package.json
  3. 45
      admin/src/api/api_dtm.ts
  4. 1
      admin/src/components.d.ts
  5. 5
      admin/src/main.ts
  6. 16
      admin/src/router/index.ts
  7. 155
      admin/src/views/Dashboard/KVPairs/Topics.vue
  8. 99
      admin/src/views/Dashboard/KVPairs/_Components/DialogTopicDetail.vue
  9. 71
      admin/src/views/Dashboard/KVPairs/_Components/DialogTopicSubscribe.vue
  10. 1892
      admin/yarn.lock
  11. 2
      client/dtmcli/dtmimp/consts.go
  12. 6
      client/dtmcli/trans_msg.go
  13. 187
      client/dtmgrpc/dtmgpb/dtmgimp.pb.go
  14. 9
      client/dtmgrpc/dtmgpb/dtmgimp.proto
  15. 110
      client/dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go
  16. 6
      client/dtmgrpc/msg.go
  17. 4
      conf.sample.yml
  18. 12
      dtmsvr/api_grpc.go
  19. 45
      dtmsvr/api_http.go
  20. 1
      dtmsvr/config/config.go
  21. 13
      dtmsvr/cron.go
  22. 142
      dtmsvr/storage/boltdb/boltdb.go
  23. 129
      dtmsvr/storage/redis/redis.go
  24. 75
      dtmsvr/storage/sql/sql.go
  25. 5
      dtmsvr/storage/store.go
  26. 14
      dtmsvr/storage/trans.go
  27. 2
      dtmsvr/svr.go
  28. 115
      dtmsvr/topics.go
  29. 24
      dtmsvr/trans_type_msg.go
  30. 1
      go.mod
  31. 14
      sqls/dtmsvr.storage.postgres.sql
  32. 60
      test/api_test.go
  33. 5
      test/main_test.go
  34. 12
      test/msg_grpc_test.go
  35. 22
      test/msg_test.go
  36. 151
      test/topic_test.go

7891
admin/package-lock.json

File diff suppressed because it is too large

4
admin/package.json

@ -10,6 +10,7 @@
"dependencies": {
"ant-design-vue": "^3.1.1",
"vue": "^3.2.25",
"vue-demi": "^0.13.11",
"vue-request": "^1.2.4"
},
"devDependencies": {
@ -22,6 +23,7 @@
"axios": "^0.26.1",
"eslint": "^8.13.0",
"eslint-plugin-vue": "^8.6.0",
"fast-glob": "^3.2.11",
"nprogress": "^0.2.0",
"pinia": "^2.0.0-rc.10",
"postcss": "^8.4.12",
@ -32,7 +34,7 @@
"tailwindcss": "^3.0.24",
"typescript": "^4.5.4",
"unplugin-vue-components": "^0.19.3",
"vite": "^2.9.13",
"vite": "^3.0.9",
"vite-plugin-svg-icons": "^1.1.0",
"vue-router": "^4.0.13",
"vue-tsc": "^0.29.8"

45
admin/src/api/api_dtm.ts

@ -7,6 +7,12 @@ export interface IListAllTransactionsReq {
position?: string
}
export interface IListAllKVReq {
cat: string
limit: number
position?: string
}
export function listAllTransactions<T>(payload: IListAllTransactionsReq): Promise<AxiosResponse<T>> {
return request({
url: '/api/dtmsvr/all',
@ -23,6 +29,45 @@ export function forceStopTransaction(gid: string): Promise<AxiosResponse> {
})
}
export function queryKVPair<T>(payload: { "cat": string, "key": string }): Promise<AxiosResponse<T>> {
return request({
url: '/api/dtmsvr/queryKV',
method: 'get',
params: payload
})
}
export function listKVPairs<T>(payload: IListAllKVReq): Promise<AxiosResponse<T>> {
return request({
url: '/api/dtmsvr/scanKV',
method: 'get',
params: payload
})
}
export function deleteTopic<T>(topicName: string): Promise<AxiosResponse<T>> {
return request({
url: '/api/dtmsvr/topic/' + topicName,
method: 'delete'
})
}
export function subscribe<T>(payload: { topic: string, url: string, remark: string }): Promise<AxiosResponse<T>> {
return request({
url: '/api/dtmsvr/subscribe',
method: 'get',
params: payload
})
}
export function unsubscribe(payload: { topic: string, url: string }): Promise<AxiosResponse> {
return request({
url: '/api/dtmsvr/unsubscribe',
method: 'get',
params: payload
})
}
export function getTransaction<T>(payload: { gid: string }): Promise<AxiosResponse<T>> {
return request({
url: '/api/dtmsvr/query',

1
admin/src/components.d.ts

@ -23,7 +23,6 @@ declare module '@vue/runtime-core' {
ASpace: typeof import('ant-design-vue/es')['Space']
ASubMenu: typeof import('ant-design-vue/es')['SubMenu']
ATable: typeof import('ant-design-vue/es')['Table']
ATag: typeof import('ant-design-vue/es')['Tag']
ATextarea: typeof import('ant-design-vue/es')['Textarea']
RouterLink: typeof import('vue-router')['RouterLink']
RouterView: typeof import('vue-router')['RouterView']

5
admin/src/main.ts

@ -5,6 +5,7 @@ import { pinia } from '/@/store'
import { useLayoutStore } from '/@/store/modules/layout'
import '/@/permission'
import 'ant-design-vue/dist/antd.css'
import '/@/assets/css/index.css'
import 'virtual:svg-icons-register'
@ -13,10 +14,8 @@ app.use(router)
app.use(pinia)
app.mount('#app')
window.onunhandledrejection = (ev: PromiseRejectionEvent) => {
showAlert(ev.reason.stack || ev.reason.message)
}
window.onerror = err => {
if (typeof err === "string") {
@ -30,4 +29,4 @@ function showAlert(msg: string) {
if (!layout.globalError) {
layout.setGlobalError(msg)
}
}
}

16
admin/src/router/index.ts

@ -46,7 +46,21 @@ export const allowRouter: Array<IMenubarList> = [
// meta: { title: 'Unfinished Transactions' },
}
]
}
},{
name: 'KVPairs',
path: '/admin/kv',
component: Components['LayoutMain'],
meta: { title: 'Key-Value Pairs' },
children: [
{
name: 'Topics',
path: '/admin/kv/topics',
component: Components['Topics'],
meta: { title: 'Topics' },
}
]
},
]
}
]

155
admin/src/views/Dashboard/KVPairs/Topics.vue

@ -0,0 +1,155 @@
<template>
<div>
<a-button type="primary" class="mb-2" @click="handleTopicSubscribe('')">Subscribe</a-button>
<a-table :columns="columns" :data-source="dataSource" :loading="loading" :pagination="false">
<template #bodyCell="{column, record}">
<template v-if="column.key === 'subscribers'">
<span>{{ JSON.parse(record.v).length }}</span>
</template>
<template v-if="column.key === 'action'">
<span>
<a class="mr-2 font-medium" @click="handleTopicSubscribe(record.k)">Subscribe</a>
<a class="mr-2 font-medium" @click="handleTopicDetail(record.k,record.v)">Detail</a>
<a class="text-red-400 font-medium" @click="handleDeleteTopic(record.k)">Delete</a>
</span>
</template>
</template>
</a-table>
<div class="flex justify-center mt-2 text-lg pager" v-if="canPrev || canNext">
<a-button type="text" :disabled="!canPrev" @click="handlePrevPage">Previous</a-button>
<a-button type="text" :disabled="!canNext" @click="handleNextPage">Next</a-button>
</div>
<DialogTopicDetail ref="topicDetail" @unsubscribed="handleRefreshData"/>
<DialogTopicSubscribe ref="topicSubscribe" @subscribed="handleRefreshData"/>
</div>
</template>
<script setup lang="ts">
import {deleteTopic, IListAllKVReq, listKVPairs} from '/@/api/api_dtm'
import {computed, ref} from 'vue-demi'
import {usePagination} from 'vue-request'
import DialogTopicDetail from './_Components/DialogTopicDetail.vue';
import DialogTopicSubscribe from './_Components/DialogTopicSubscribe.vue';
import {message, Modal} from 'ant-design-vue';
const columns = [
{
title: 'Name',
dataIndex: 'k',
key: 'name'
}, {
title: 'Subscribers',
dataIndex: 'v',
key: 'subscribers'
}, {
title: 'Version',
dataIndex: 'version',
key: 'version'
}, {
title: 'Action',
key: 'action'
}
]
const pages = ref([''])
const curPage = ref(1)
const canPrev = computed(() => {
return curPage.value > 1
})
const canNext = computed(() => {
return data.value?.data.next_position !== ""
})
type Data = {
kv: {
k: string
v: string
version: number
}[]
next_position: string
}
const queryData = (params: IListAllKVReq) => {
return listKVPairs<Data>(params)
}
const {data, run, current, loading, pageSize} = usePagination(queryData, {
defaultParams: [
{
cat: "topics",
limit: 10,
}
],
pagination: {
pageSizeKey: 'limit'
}
})
const dataSource = computed(() => data.value?.data.kv || [])
const handlePrevPage = () => {
curPage.value -= 1;
const params = {
cat: "topics",
limit: pageSize.value,
position: pages.value[curPage.value] as string
}
run(params)
}
const handleNextPage = () => {
curPage.value += 1;
pages.value[curPage.value] = data.value?.data.next_position as string
run({
cat: "topics",
position: data.value?.data.next_position,
limit: pageSize.value,
})
}
const handleRefreshData = () => {
run({cat: 'topics', limit: pageSize.value})
}
const handleDeleteTopic = (topic: string) => {
Modal.confirm({
title: 'Delete',
content: 'Do you want delete this topic? ',
okText: 'Yes',
okType: 'danger',
cancelText: 'Cancel',
onOk: async () => {
await deleteTopic(topic)
message.success('Delete topic succeed')
run({cat: 'topics', limit: pageSize.value})
}
})
}
const topicDetail = ref<null | { open: (topic: string, subscribers: string) => null }>(null)
const handleTopicDetail = (topic: string, subscribers: string) => {
topicDetail.value?.open(topic, subscribers)
}
const topicSubscribe = ref<null | { open: (topic: string) => null }>(null)
const handleTopicSubscribe = (topic: string) => {
topicSubscribe.value?.open(topic)
}
</script>
<style lang="postcss" scoped>
::deep .ant-pagination-item {
display: none;
}
.pager .ant-btn-text {
font-weight: 500;
}
.pager .ant-btn {
padding: 6px;
}
</style>

99
admin/src/views/Dashboard/KVPairs/_Components/DialogTopicDetail.vue

@ -0,0 +1,99 @@
<template>
<div>
<a-modal v-model:visible="visible" :title=topicName width="100%" wrap-class-name="full-modal" :footer="null">
<a-table :columns="columns" :data-source="dataSource" :pagination="false">
<template #bodyCell="{column, record}">
<template v-if="column.key === 'action'">
<span>
<a class="text-red-400 font-medium" @click="handleUnsubscribe(record.url)">Unsubscribe</a>
</span>
</template>
</template>
</a-table>
<!-- <div class="mt-10 relative">
<a-textarea id="qs" v-model:value="textVal" :auto-size="{ minRows: 10, maxRows: 10 }" />
<screenfull class="absolute right-2 top-3 z-50" identity="qs" />
</div> -->
</a-modal>
</div>
</template>
<script setup lang="ts">
import {ref} from 'vue';
import {unsubscribe} from "/@/api/api_dtm";
import {message, Modal} from "ant-design-vue";
// import VueJsonPretty from 'vue-json-pretty';
// import 'vue-json-pretty/lib/styles.css'
const dataSource = ref<Subscriber[]>([])
const visible = ref(false)
const topicName = ref<string>("");
const open = async (topic: string, subscribers: string) => {
dataSource.value = JSON.parse(subscribers)
topicName.value = topic
visible.value = true
}
const columns = [
{
title: 'URL',
dataIndex: 'url',
key: 'url'
}, {
title: 'Remark',
dataIndex: 'remark',
key: 'remark'
}, {
title: 'Action',
key: 'action'
}
]
interface Subscriber {
url: string
remark: string
}
const handleUnsubscribe = async (url: string) => {
Modal.confirm({
title: 'Unsubscribe',
content: 'Do you want unsubscribe this topic?',
okText: 'Yes',
okType: 'danger',
cancelText: 'Cancel',
onOk: async () => {
await unsubscribe({
topic: topicName.value,
url: url
})
message.success('Unsubscribe topic succeed')
location.reload()
}
})
}
defineExpose({
open
})
</script>
<style lang="postcss">
.full-modal .ant-modal {
max-width: 100%;
top: 0;
padding-bottom: 0;
margin: 0;
}
.full-modal .ant-modal-content {
display: flex;
flex-direction: column;
height: calc(100vh);
}
.full-modal .ant-modal-body {
flex: 1;
}
</style>

71
admin/src/views/Dashboard/KVPairs/_Components/DialogTopicSubscribe.vue

@ -0,0 +1,71 @@
<template>
<div>
<a-modal v-model:visible="visible" width="60%" title="Topic Subscribe" :confirm-loading="confirmLoading"
@ok="handleSubscribe">
<a-form v-bind="layout" :mode="form">
<a-form-item label="Topic: ">
<a-input v-model:value="form.topic" placeholder="Please input your topic..."/>
</a-form-item>
<a-form-item label="URL: ">
<a-input v-model:value="form.url" placeholder="Please input your url..."/>
</a-form-item>
<a-form-item label="Remark">
<a-textarea v-model:value="form.remark" :rows="6" placeholder="Please input your remark..."/>
</a-form-item>
</a-form>
</a-modal>
</div>
</template>
<script setup lang="ts">
import {message} from 'ant-design-vue';
import {reactive, ref} from 'vue';
import {subscribe} from '/@/api/api_dtm'
interface formState {
topic: string
url: string
remark: string
}
const layout = {
labelCol: {span: 4},
wrapperCol: {span: 16},
}
const form = reactive<formState>({
topic: '',
url: '',
remark: ''
})
const visible = ref(false)
const open = async (topic: string) => {
form.topic = topic
visible.value = true
}
const emit = defineEmits(['subscribed'])
const confirmLoading = ref<boolean>(false)
const handleSubscribe = async () => {
confirmLoading.value = true
await subscribe<string>(form).then(
() => {
visible.value = false
message.success('Subscribe succeed')
confirmLoading.value = false
emit('subscribed')
}
).catch(() => {
message.error('Failed')
confirmLoading.value = false
return
})
}
defineExpose({
open
})
</script>

1892
admin/yarn.lock

File diff suppressed because it is too large

2
client/dtmcli/dtmimp/consts.go

@ -52,6 +52,8 @@ const (
MsgDoBarrier1 = "01"
// MsgDoOp const for DoAndSubmit barrier op
MsgDoOp = "msg"
//MsgTopicPrefix const for Add topic msg
MsgTopicPrefix = "topic://"
// XaBarrier1 const for xa barrier id
XaBarrier1 = "01"

6
client/dtmcli/trans_msg.go

@ -9,6 +9,7 @@ package dtmcli
import (
"database/sql"
"errors"
"fmt"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
)
@ -31,6 +32,11 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
return s
}
// AddTopic add a new topic step
func (s *Msg) AddTopic(topic string, postData interface{}) *Msg {
return s.Add(fmt.Sprintf("%s%s", dtmimp.MsgTopicPrefix, topic), postData)
}
// SetDelay delay call branch, unit second
func (s *Msg) SetDelay(delay uint64) *Msg {
s.delay = delay

187
client/dtmgrpc/dtmgpb/dtmgimp.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.21.5
// protoc v3.15.6
// source: client/dtmgrpc/dtmgpb/dtmgimp.proto
package dtmgpb
@ -552,6 +552,69 @@ func (x *DtmProgress) GetOp() string {
return ""
}
type DtmTopicRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Topic string `protobuf:"bytes,1,opt,name=Topic,proto3" json:"Topic,omitempty"`
URL string `protobuf:"bytes,2,opt,name=URL,proto3" json:"URL,omitempty"`
Remark string `protobuf:"bytes,3,opt,name=Remark,proto3" json:"Remark,omitempty"`
}
func (x *DtmTopicRequest) Reset() {
*x = DtmTopicRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_client_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *DtmTopicRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*DtmTopicRequest) ProtoMessage() {}
func (x *DtmTopicRequest) ProtoReflect() protoreflect.Message {
mi := &file_client_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use DtmTopicRequest.ProtoReflect.Descriptor instead.
func (*DtmTopicRequest) Descriptor() ([]byte, []int) {
return file_client_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescGZIP(), []int{7}
}
func (x *DtmTopicRequest) GetTopic() string {
if x != nil {
return x.Topic
}
return ""
}
func (x *DtmTopicRequest) GetURL() string {
if x != nil {
return x.URL
}
return ""
}
func (x *DtmTopicRequest) GetRemark() string {
if x != nil {
return x.Remark
}
return ""
}
var File_client_dtmgrpc_dtmgpb_dtmgimp_proto protoreflect.FileDescriptor
var file_client_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{
@ -649,32 +712,49 @@ var file_client_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc = []byte{
0x69, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68,
0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68,
0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x4f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02,
0x4f, 0x70, 0x32, 0xf8, 0x02, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x38, 0x0a, 0x06, 0x4e, 0x65,
0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x14, 0x2e, 0x64,
0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64, 0x52, 0x65, 0x70,
0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13,
0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a,
0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69,
0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e,
0x4f, 0x70, 0x22, 0x51, 0x0a, 0x0f, 0x44, 0x74, 0x6d, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x10, 0x0a, 0x03, 0x55,
0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x55, 0x52, 0x4c, 0x12, 0x16, 0x0a,
0x06, 0x52, 0x65, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x52,
0x65, 0x6d, 0x61, 0x72, 0x6b, 0x32, 0xbf, 0x04, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x38, 0x0a,
0x06, 0x4e, 0x65, 0x77, 0x47, 0x69, 0x64, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a,
0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x47, 0x69, 0x64,
0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69,
0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00,
0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x64, 0x74,
0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x62,
0x6f, 0x72, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74,
0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x22, 0x00, 0x12, 0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42, 0x72,
0x61, 0x6e, 0x63, 0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44,
0x74, 0x6d, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0f, 0x50, 0x72, 0x65,
0x70, 0x61, 0x72, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x13, 0x2e, 0x64,
0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x1b, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x50,
0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00,
0x12, 0x3f, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x18, 0x2e,
0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x6f, 0x70, 0x69, 0x63,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22,
0x00, 0x12, 0x41, 0x0a, 0x0b, 0x55, 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x12, 0x18, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x6f,
0x70, 0x69, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70,
0x74, 0x79, 0x22, 0x00, 0x12, 0x41, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x54, 0x6f,
0x70, 0x69, 0x63, 0x12, 0x18, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74,
0x6d, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x62, 0x6f, 0x72, 0x74,
0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12,
0x45, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42, 0x72, 0x61, 0x6e, 0x63,
0x68, 0x12, 0x19, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x42,
0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67,
0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45,
0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72,
0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67,
0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b,
0x2e, 0x64, 0x74, 0x6d, 0x67, 0x69, 0x6d, 0x70, 0x2e, 0x44, 0x74, 0x6d, 0x50, 0x72, 0x6f, 0x67,
0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a,
0x08, 0x2e, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x33,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x64, 0x74, 0x6d,
0x67, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -689,7 +769,7 @@ func file_client_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescGZIP() []byte {
return file_client_dtmgrpc_dtmgpb_dtmgimp_proto_rawDescData
}
var file_client_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
var file_client_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes = make([]protoimpl.MessageInfo, 11)
var file_client_dtmgrpc_dtmgpb_dtmgimp_proto_goTypes = []interface{}{
(*DtmTransOptions)(nil), // 0: dtmgimp.DtmTransOptions
(*DtmRequest)(nil), // 1: dtmgimp.DtmRequest
@ -698,32 +778,39 @@ var file_client_dtmgrpc_dtmgpb_dtmgimp_proto_goTypes = []interface{}{
(*DtmProgressesReply)(nil), // 4: dtmgimp.DtmProgressesReply
(*DtmTransaction)(nil), // 5: dtmgimp.DtmTransaction
(*DtmProgress)(nil), // 6: dtmgimp.DtmProgress
nil, // 7: dtmgimp.DtmTransOptions.BranchHeadersEntry
nil, // 8: dtmgimp.DtmRequest.ReqExtraEntry
nil, // 9: dtmgimp.DtmBranchRequest.DataEntry
(*emptypb.Empty)(nil), // 10: google.protobuf.Empty
(*DtmTopicRequest)(nil), // 7: dtmgimp.DtmTopicRequest
nil, // 8: dtmgimp.DtmTransOptions.BranchHeadersEntry
nil, // 9: dtmgimp.DtmRequest.ReqExtraEntry
nil, // 10: dtmgimp.DtmBranchRequest.DataEntry
(*emptypb.Empty)(nil), // 11: google.protobuf.Empty
}
var file_client_dtmgrpc_dtmgpb_dtmgimp_proto_depIdxs = []int32{
7, // 0: dtmgimp.DtmTransOptions.BranchHeaders:type_name -> dtmgimp.DtmTransOptions.BranchHeadersEntry
8, // 0: dtmgimp.DtmTransOptions.BranchHeaders:type_name -> dtmgimp.DtmTransOptions.BranchHeadersEntry
0, // 1: dtmgimp.DtmRequest.TransOptions:type_name -> dtmgimp.DtmTransOptions
8, // 2: dtmgimp.DtmRequest.ReqExtra:type_name -> dtmgimp.DtmRequest.ReqExtraEntry
9, // 3: dtmgimp.DtmBranchRequest.Data:type_name -> dtmgimp.DtmBranchRequest.DataEntry
9, // 2: dtmgimp.DtmRequest.ReqExtra:type_name -> dtmgimp.DtmRequest.ReqExtraEntry
10, // 3: dtmgimp.DtmBranchRequest.Data:type_name -> dtmgimp.DtmBranchRequest.DataEntry
5, // 4: dtmgimp.DtmProgressesReply.Transaction:type_name -> dtmgimp.DtmTransaction
6, // 5: dtmgimp.DtmProgressesReply.Progresses:type_name -> dtmgimp.DtmProgress
10, // 6: dtmgimp.Dtm.NewGid:input_type -> google.protobuf.Empty
11, // 6: dtmgimp.Dtm.NewGid:input_type -> google.protobuf.Empty
1, // 7: dtmgimp.Dtm.Submit:input_type -> dtmgimp.DtmRequest
1, // 8: dtmgimp.Dtm.Prepare:input_type -> dtmgimp.DtmRequest
1, // 9: dtmgimp.Dtm.Abort:input_type -> dtmgimp.DtmRequest
3, // 10: dtmgimp.Dtm.RegisterBranch:input_type -> dtmgimp.DtmBranchRequest
1, // 11: dtmgimp.Dtm.PrepareWorkflow:input_type -> dtmgimp.DtmRequest
2, // 12: dtmgimp.Dtm.NewGid:output_type -> dtmgimp.DtmGidReply
10, // 13: dtmgimp.Dtm.Submit:output_type -> google.protobuf.Empty
10, // 14: dtmgimp.Dtm.Prepare:output_type -> google.protobuf.Empty
10, // 15: dtmgimp.Dtm.Abort:output_type -> google.protobuf.Empty
10, // 16: dtmgimp.Dtm.RegisterBranch:output_type -> google.protobuf.Empty
4, // 17: dtmgimp.Dtm.PrepareWorkflow:output_type -> dtmgimp.DtmProgressesReply
12, // [12:18] is the sub-list for method output_type
6, // [6:12] is the sub-list for method input_type
7, // 12: dtmgimp.Dtm.Subscribe:input_type -> dtmgimp.DtmTopicRequest
7, // 13: dtmgimp.Dtm.Unsubscribe:input_type -> dtmgimp.DtmTopicRequest
7, // 14: dtmgimp.Dtm.DeleteTopic:input_type -> dtmgimp.DtmTopicRequest
2, // 15: dtmgimp.Dtm.NewGid:output_type -> dtmgimp.DtmGidReply
11, // 16: dtmgimp.Dtm.Submit:output_type -> google.protobuf.Empty
11, // 17: dtmgimp.Dtm.Prepare:output_type -> google.protobuf.Empty
11, // 18: dtmgimp.Dtm.Abort:output_type -> google.protobuf.Empty
11, // 19: dtmgimp.Dtm.RegisterBranch:output_type -> google.protobuf.Empty
4, // 20: dtmgimp.Dtm.PrepareWorkflow:output_type -> dtmgimp.DtmProgressesReply
11, // 21: dtmgimp.Dtm.Subscribe:output_type -> google.protobuf.Empty
11, // 22: dtmgimp.Dtm.Unsubscribe:output_type -> google.protobuf.Empty
11, // 23: dtmgimp.Dtm.DeleteTopic:output_type -> google.protobuf.Empty
15, // [15:24] is the sub-list for method output_type
6, // [6:15] is the sub-list for method input_type
6, // [6:6] is the sub-list for extension type_name
6, // [6:6] is the sub-list for extension extendee
0, // [0:6] is the sub-list for field type_name
@ -819,6 +906,18 @@ func file_client_dtmgrpc_dtmgpb_dtmgimp_proto_init() {
return nil
}
}
file_client_dtmgrpc_dtmgpb_dtmgimp_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DtmTopicRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
@ -826,7 +925,7 @@ func file_client_dtmgrpc_dtmgpb_dtmgimp_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_client_dtmgrpc_dtmgpb_dtmgimp_proto_rawDesc,
NumEnums: 0,
NumMessages: 10,
NumMessages: 11,
NumExtensions: 0,
NumServices: 1,
},

9
client/dtmgrpc/dtmgpb/dtmgimp.proto

@ -13,6 +13,9 @@ service Dtm {
rpc Abort(DtmRequest) returns (google.protobuf.Empty) {}
rpc RegisterBranch(DtmBranchRequest) returns (google.protobuf.Empty) {}
rpc PrepareWorkflow(DtmRequest) returns (DtmProgressesReply) {}
rpc Subscribe(DtmTopicRequest) returns (google.protobuf.Empty){}
rpc Unsubscribe(DtmTopicRequest) returns (google.protobuf.Empty){}
rpc DeleteTopic(DtmTopicRequest) returns (google.protobuf.Empty){}
}
message DtmTransOptions {
@ -68,4 +71,10 @@ message DtmProgress {
bytes BinData = 2;
string BranchID = 3;
string Op = 4;
}
message DtmTopicRequest {
string Topic = 1;
string URL = 2;
string Remark = 3;
}

110
client/dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.17.3
// - protoc v3.15.6
// source: client/dtmgrpc/dtmgpb/dtmgimp.proto
package dtmgpb
@ -29,6 +29,9 @@ type DtmClient interface {
Abort(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
RegisterBranch(ctx context.Context, in *DtmBranchRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
PrepareWorkflow(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*DtmProgressesReply, error)
Subscribe(ctx context.Context, in *DtmTopicRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
Unsubscribe(ctx context.Context, in *DtmTopicRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
DeleteTopic(ctx context.Context, in *DtmTopicRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
}
type dtmClient struct {
@ -93,6 +96,33 @@ func (c *dtmClient) PrepareWorkflow(ctx context.Context, in *DtmRequest, opts ..
return out, nil
}
func (c *dtmClient) Subscribe(ctx context.Context, in *DtmTopicRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/dtmgimp.Dtm/Subscribe", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dtmClient) Unsubscribe(ctx context.Context, in *DtmTopicRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/dtmgimp.Dtm/Unsubscribe", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dtmClient) DeleteTopic(ctx context.Context, in *DtmTopicRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/dtmgimp.Dtm/DeleteTopic", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// DtmServer is the server API for Dtm service.
// All implementations must embed UnimplementedDtmServer
// for forward compatibility
@ -103,6 +133,9 @@ type DtmServer interface {
Abort(context.Context, *DtmRequest) (*emptypb.Empty, error)
RegisterBranch(context.Context, *DtmBranchRequest) (*emptypb.Empty, error)
PrepareWorkflow(context.Context, *DtmRequest) (*DtmProgressesReply, error)
Subscribe(context.Context, *DtmTopicRequest) (*emptypb.Empty, error)
Unsubscribe(context.Context, *DtmTopicRequest) (*emptypb.Empty, error)
DeleteTopic(context.Context, *DtmTopicRequest) (*emptypb.Empty, error)
mustEmbedUnimplementedDtmServer()
}
@ -128,6 +161,15 @@ func (UnimplementedDtmServer) RegisterBranch(context.Context, *DtmBranchRequest)
func (UnimplementedDtmServer) PrepareWorkflow(context.Context, *DtmRequest) (*DtmProgressesReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method PrepareWorkflow not implemented")
}
func (UnimplementedDtmServer) Subscribe(context.Context, *DtmTopicRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Subscribe not implemented")
}
func (UnimplementedDtmServer) Unsubscribe(context.Context, *DtmTopicRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Unsubscribe not implemented")
}
func (UnimplementedDtmServer) DeleteTopic(context.Context, *DtmTopicRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method DeleteTopic not implemented")
}
func (UnimplementedDtmServer) mustEmbedUnimplementedDtmServer() {}
// UnsafeDtmServer may be embedded to opt out of forward compatibility for this service.
@ -249,6 +291,60 @@ func _Dtm_PrepareWorkflow_Handler(srv interface{}, ctx context.Context, dec func
return interceptor(ctx, in, info, handler)
}
func _Dtm_Subscribe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DtmTopicRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DtmServer).Subscribe(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dtmgimp.Dtm/Subscribe",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DtmServer).Subscribe(ctx, req.(*DtmTopicRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Dtm_Unsubscribe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DtmTopicRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DtmServer).Unsubscribe(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dtmgimp.Dtm/Unsubscribe",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DtmServer).Unsubscribe(ctx, req.(*DtmTopicRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Dtm_DeleteTopic_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DtmTopicRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DtmServer).DeleteTopic(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dtmgimp.Dtm/DeleteTopic",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DtmServer).DeleteTopic(ctx, req.(*DtmTopicRequest))
}
return interceptor(ctx, in, info, handler)
}
// Dtm_ServiceDesc is the grpc.ServiceDesc for Dtm service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -280,6 +376,18 @@ var Dtm_ServiceDesc = grpc.ServiceDesc{
MethodName: "PrepareWorkflow",
Handler: _Dtm_PrepareWorkflow_Handler,
},
{
MethodName: "Subscribe",
Handler: _Dtm_Subscribe_Handler,
},
{
MethodName: "Unsubscribe",
Handler: _Dtm_Unsubscribe_Handler,
},
{
MethodName: "DeleteTopic",
Handler: _Dtm_DeleteTopic_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "client/dtmgrpc/dtmgpb/dtmgimp.proto",

6
client/dtmgrpc/msg.go

@ -9,6 +9,7 @@ package dtmgrpc
import (
"database/sql"
"errors"
"fmt"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -34,6 +35,11 @@ func (s *MsgGrpc) Add(action string, msg proto.Message) *MsgGrpc {
return s
}
// AddTopic add a new topic step
func (s *MsgGrpc) AddTopic(topic string, msg proto.Message) *MsgGrpc {
return s.Add(fmt.Sprintf("%s%s", dtmimp.MsgTopicPrefix, topic), msg)
}
// SetDelay delay call branch, unit second
func (s *MsgGrpc) SetDelay(delay uint64) *MsgGrpc {
s.Msg.SetDelay(delay)

4
conf.sample.yml

@ -62,4 +62,6 @@
### advanced options
# UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status
# TimeZoneOffset: '' #default '' using system default. '+8': Asia/Shanghai; '0': GMT
# TimeZoneOffset: '' #default '' using system default. '+8': Asia/Shanghai; '0': GMT
# ConfigUpdateInterval: 10 # the interval to update configuration in memory such as topics map... (seconds)

12
dtmsvr/api_grpc.go

@ -69,3 +69,15 @@ func (s *dtmServer) PrepareWorkflow(ctx context.Context, in *pb.DtmRequest) (*pb
}
return reply, dtmgrpc.DtmError2GrpcError(err)
}
func (s *dtmServer) Subscribe(ctx context.Context, in *pb.DtmTopicRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(Subscribe(in.Topic, in.URL, in.Remark))
}
func (s *dtmServer) Unsubscribe(ctx context.Context, in *pb.DtmTopicRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(Unsubscribe(in.Topic, in.URL))
}
func (s *dtmServer) DeleteTopic(ctx context.Context, in *pb.DtmTopicRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(GetStore().DeleteKV(topicsCat, in.Topic))
}

45
dtmsvr/api_http.go

@ -34,6 +34,11 @@ func addRoute(engine *gin.Engine) {
engine.GET("/api/dtmsvr/query", dtmutil.WrapHandler2(query))
engine.GET("/api/dtmsvr/all", dtmutil.WrapHandler2(all))
engine.GET("/api/dtmsvr/resetCronTime", dtmutil.WrapHandler2(resetCronTime))
engine.GET("/api/dtmsvr/subscribe", dtmutil.WrapHandler2(subscribe))
engine.GET("/api/dtmsvr/unsubscribe", dtmutil.WrapHandler2(unsubscribe))
engine.DELETE("/api/dtmsvr/topic/:topicName", dtmutil.WrapHandler2(deleteTopic))
engine.GET("/api/dtmsvr/scanKV", dtmutil.WrapHandler2(scanKV))
engine.GET("/api/dtmsvr/queryKV", dtmutil.WrapHandler2(queryKV))
// add prometheus exporter
h := promhttp.Handler()
@ -123,3 +128,43 @@ func resetCronTime(c *gin.Context) interface{} {
}
return map[string]interface{}{"has_remaining": hasRemaining, "succeed_count": succeedCount}
}
func scanKV(c *gin.Context) interface{} {
cat := c.DefaultQuery("cat", "")
position := c.Query("position")
sLimit := dtmimp.OrString(c.Query("limit"), "100")
kv := GetStore().ScanKV(cat, &position, int64(dtmimp.MustAtoi(sLimit)))
return map[string]interface{}{"kv": kv, "next_position": position}
}
func queryKV(c *gin.Context) interface{} {
cat := c.DefaultQuery("cat", "")
key := c.DefaultQuery("key", "")
kv := GetStore().FindKV(cat, key)
return map[string]interface{}{"kv": kv}
}
func subscribe(c *gin.Context) interface{} {
topic := c.Query("topic")
url := c.Query("url")
remark := c.Query("remark")
return Subscribe(topic, url, remark)
}
func unsubscribe(c *gin.Context) interface{} {
topic := c.Query("topic")
url := c.Query("url")
return Unsubscribe(topic, url)
}
func deleteTopic(c *gin.Context) interface{} {
topic := c.Param("topicName")
if topic == "" {
return errors.New("empty topic")
}
return GetStore().DeleteKV(topicsCat, topic)
}

1
dtmsvr/config/config.go

@ -98,6 +98,7 @@ type Type struct {
LogLevel string `yaml:"LogLevel" default:"info"`
Log Log `yaml:"Log"`
TimeZoneOffset string `yaml:"TimeZoneOffset"`
ConfigUpdateInterval int64 `yaml:"ConfigUpdateInterval" default:"3"`
}
// Config config

13
dtmsvr/cron.go

@ -49,6 +49,19 @@ func CronExpiredTrans(num int) {
}
}
// CronUpdateTopicsMap cron updates topics map
func CronUpdateTopicsMap() {
for {
time.Sleep(time.Duration(conf.ConfigUpdateInterval) * time.Second)
cronUpdateTopicsMapOnce()
}
}
func cronUpdateTopicsMapOnce() {
defer handlePanic(nil)
updateTopicsMap()
}
func lockOneTrans(expireIn time.Duration) *TransGlobal {
global := GetStore().LockOneGlobalTrans(expireIn)
if global == nil {

142
dtmsvr/storage/boltdb/boltdb.go

@ -174,10 +174,12 @@ func cleanupIndexWithGids(t *bolt.Tx, gids map[string]struct{}) {
var bucketGlobal = []byte("global")
var bucketBranches = []byte("branches")
var bucketIndex = []byte("index")
var bucketKV = []byte("kv")
var allBuckets = [][]byte{
bucketBranches,
bucketGlobal,
bucketIndex,
bucketKV,
}
func tGetGlobal(t *bolt.Tx, gid string) *storage.TransGlobalStore {
@ -246,6 +248,30 @@ func tPutIndex(t *bolt.Tx, unix int64, gid string) {
dtmimp.E2P(err)
}
func tGetKV(t *bolt.Tx, cat, key string) *storage.KVStore {
k := fmt.Sprintf("%s-%s", cat, key)
kv := storage.KVStore{}
res := t.Bucket(bucketKV).Get([]byte(k))
if res == nil {
return nil
}
dtmimp.MustUnmarshal(res, &kv)
return &kv
}
func tPutKV(t *bolt.Tx, kv *storage.KVStore) {
k := fmt.Sprintf("%s-%s", kv.Cat, kv.K)
kvJSON := dtmimp.MustMarshal(kv)
err := t.Bucket(bucketKV).Put([]byte(k), kvJSON)
dtmimp.E2P(err)
}
func tDelKV(t *bolt.Tx, cat, key string) {
k := fmt.Sprintf("%s-%s", cat, key)
err := t.Bucket(bucketKV).Delete([]byte(k))
dtmimp.E2P(err)
}
// Ping execs ping cmd to boltdb
func (s *Store) Ping() error {
return nil
@ -258,13 +284,15 @@ func (s *Store) PopulateData(skipDrop bool) {
dtmimp.E2P(t.DeleteBucket(bucketIndex))
dtmimp.E2P(t.DeleteBucket(bucketBranches))
dtmimp.E2P(t.DeleteBucket(bucketGlobal))
dtmimp.E2P(t.DeleteBucket(bucketKV))
_, err := t.CreateBucket(bucketIndex)
dtmimp.E2P(err)
_, err = t.CreateBucket(bucketBranches)
dtmimp.E2P(err)
_, err = t.CreateBucket(bucketGlobal)
dtmimp.E2P(err)
_, err = t.CreateBucket(bucketKV)
dtmimp.E2P(err)
return nil
})
dtmimp.E2P(err)
@ -447,3 +475,115 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in
})
return
}
// ScanKV lists KV pairs
func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore {
kvs := []storage.KVStore{}
err := s.boltDb.View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketKV).Cursor()
for k, v := cursor.Seek([]byte(*position)); k != nil; k, v = cursor.Next() {
if string(k) == *position {
continue
}
if !strings.HasPrefix(string(k), cat) {
continue
}
kv := storage.KVStore{}
dtmimp.MustUnmarshal(v, &kv)
kvs = append(kvs, kv)
if len(kvs) == int(limit) {
break
}
}
return nil
})
dtmimp.E2P(err)
if len(kvs) < int(limit) {
*position = ""
} else {
*position = fmt.Sprintf("%s-%s", cat, kvs[len(kvs)-1].K)
}
return kvs
}
// FindKV finds key-value pairs
func (s *Store) FindKV(cat, key string) []storage.KVStore {
kvs := []storage.KVStore{}
if cat != "" && key != "" {
err := s.boltDb.View(func(t *bolt.Tx) error {
kv := tGetKV(t, cat, key)
if kv != nil {
kvs = append(kvs, *kv)
}
return nil
})
dtmimp.E2P(err)
return kvs
}
err := s.boltDb.View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketKV).Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
if !strings.HasPrefix(string(k), cat) {
continue
}
kv := storage.KVStore{}
dtmimp.MustUnmarshal(v, &kv)
kvs = append(kvs, kv)
}
return nil
})
dtmimp.E2P(err)
return kvs
}
// UpdateKV updates key-value pair
func (s *Store) UpdateKV(kv *storage.KVStore) error {
now := time.Now()
kv.UpdateTime = &now
oldVersion := kv.Version
kv.Version = oldVersion + 1
return s.boltDb.Update(func(t *bolt.Tx) error {
res := tGetKV(t, kv.Cat, kv.K)
if res == nil || res.Version != oldVersion {
return storage.ErrNotFound
}
tPutKV(t, kv)
return nil
})
}
// DeleteKV deletes key-value pair
func (s *Store) DeleteKV(cat, key string) error {
return s.boltDb.Update(func(t *bolt.Tx) error {
res := tGetKV(t, cat, key)
if res == nil {
return storage.ErrNotFound
}
tDelKV(t, cat, key)
return nil
})
}
// CreateKV creates key-value pair
func (s *Store) CreateKV(cat, key, value string) error {
now := time.Now()
kv := &storage.KVStore{
ModelBase: dtmutil.ModelBase{
CreateTime: &now,
UpdateTime: &now,
},
Cat: cat,
K: key,
V: value,
Version: 1,
}
return s.boltDb.Update(func(t *bolt.Tx) error {
res := tGetKV(t, cat, key)
if res != nil {
return storage.ErrUniqueConflict
}
tPutKV(t, kv)
return nil
})
}

129
dtmsvr/storage/redis/redis.go

@ -13,13 +13,12 @@ import (
"sync"
"time"
"github.com/go-redis/redis/v8"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/logger"
"github.com/go-redis/redis/v8"
)
// TODO: optimize this, it's very strange to use pointer to dtmutil.Config
@ -333,6 +332,132 @@ redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
dtmimp.E2P(err)
}
// ScanKV lists KV pairs
func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore {
logger.Debugf("calling ScanKV: %s %s %d", cat, *position, limit)
lid := uint64(0)
if *position != "" {
lid = uint64(dtmimp.MustAtoi(*position))
}
keys, cursor, err := redisGet().Scan(ctx, lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit).Result()
dtmimp.E2P(err)
kvs := []storage.KVStore{}
if len(keys) > 0 {
values, err := redisGet().MGet(ctx, keys...).Result()
dtmimp.E2P(err)
for _, v := range values {
if v == nil {
continue
}
kv := storage.KVStore{}
dtmimp.MustUnmarshalString(v.(string), &kv)
kvs = append(kvs, kv)
}
}
if cursor > 0 {
*position = fmt.Sprintf("%d", cursor)
} else {
*position = ""
}
return kvs
}
// FindKV finds key-value pairs
func (s *Store) FindKV(cat, key string) []storage.KVStore {
var keys []string
pattern := conf.Store.RedisPrefix + "_kv_"
if cat != "" {
pattern += cat + "_"
}
if key != "" {
keys = []string{pattern + key}
} else {
lid := uint64(0)
r := redisGet().Scan(ctx, lid, pattern+"*", int64(-1))
dtmimp.E2P(r.Err())
keys, _ = r.Val()
}
kvs := []storage.KVStore{}
if len(keys) <= 0 {
return nil
}
values, err := redisGet().MGet(ctx, keys...).Result()
dtmimp.E2P(err)
for _, v := range values {
if v == nil {
continue
}
kv := storage.KVStore{}
dtmimp.MustUnmarshalString(v.(string), &kv)
kvs = append(kvs, kv)
}
return kvs
}
// UpdateKV updates key-value pair
func (s *Store) UpdateKV(kv *storage.KVStore) error {
now := time.Now()
kv.UpdateTime = &now
oldVersion := kv.Version
kv.Version = oldVersion + 1
redisKey := fmt.Sprintf("%s_kv_%s_%s", conf.Store.RedisPrefix, kv.Cat, kv.K)
args := &argList{}
args.Keys = append(args.Keys, redisKey)
args.AppendRaw(oldVersion)
args.AppendObject(kv)
_, err := callLua(args, `-- UpdateKV
local oldJson = redis.call('GET', KEYS[1])
if oldJson == false then
return 'NOT_FOUND'
end
local old = cjson.decode(oldJson)
if tostring(old.version) == ARGV[1] then
redis.call('SET', KEYS[1], ARGV[2])
else
return 'NOT_FOUND'
end
`)
return err
}
// DeleteKV deletes key-value pair
func (s *Store) DeleteKV(cat, key string) error {
affected, err := redisGet().Del(ctx, fmt.Sprintf("%s_kv_%s_%s", conf.Store.RedisPrefix, cat, key)).Result()
if err == nil && affected == 0 {
return storage.ErrNotFound
}
return err
}
// CreateKV creates key-value pair
func (s *Store) CreateKV(cat, key, value string) error {
now := time.Now()
kv := &storage.KVStore{
ModelBase: dtmutil.ModelBase{
CreateTime: &now,
UpdateTime: &now,
},
Cat: cat,
K: key,
V: value,
Version: 1,
}
redisKey := fmt.Sprintf("%s_kv_%s_%s", conf.Store.RedisPrefix, kv.Cat, kv.K)
args := &argList{}
args.Keys = append(args.Keys, redisKey)
args.AppendObject(kv)
_, err := callLua(args, `-- CreateKV
local key = redis.call('GET', KEYS[1])
if key ~= false then
return 'UNIQUE_CONFLICT'
end
redis.call('SET', KEYS[1], ARGV[1])
`)
return err
}
var (
rdb *redis.Client
once sync.Once

75
dtmsvr/storage/sql/sql.go

@ -181,6 +181,81 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in
return affected, affected == limit, err
}
// ScanKV lists KV pairs
func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore {
kvs := []storage.KVStore{}
lid := math.MaxInt64
if *position != "" {
lid = dtmimp.MustAtoi(*position)
}
dbr := dbGet().Must().Where("cat = ? and id < ?", cat, lid).Order("id desc").Limit(int(limit)).Find(&kvs)
if dbr.RowsAffected < limit {
*position = ""
} else {
*position = fmt.Sprintf("%d", kvs[len(kvs)-1].ID)
}
return kvs
}
// FindKV finds key-value pairs
func (s *Store) FindKV(cat, key string) []storage.KVStore {
kvs := []storage.KVStore{}
db := dbGet().Model(&storage.KVStore{})
if cat != "" {
db = db.Where("cat=?", cat)
}
if key != "" {
db = db.Where("k=?", key)
}
db.Find(&kvs)
return kvs
}
// UpdateKV updates key-value pair
func (s *Store) UpdateKV(kv *storage.KVStore) error {
now := time.Now()
kv.UpdateTime = &now
oldVersion := kv.Version
kv.Version = oldVersion + 1
dbr := dbGet().Model(&storage.KVStore{}).Where("id=? and version=?", kv.ID, oldVersion).
Updates(kv)
if dbr.Error == nil && dbr.RowsAffected == 0 {
return storage.ErrNotFound
}
return dbr.Error
}
// DeleteKV deletes key-value pair
func (s *Store) DeleteKV(cat, key string) error {
dbr := dbGet().Where("cat=? and k=?", cat, key).Delete(&storage.KVStore{})
if dbr.Error == nil && dbr.RowsAffected == 0 {
return storage.ErrNotFound
}
return dbr.Error
}
// CreateKV creates key-value pair
func (s *Store) CreateKV(cat, key, value string) error {
now := time.Now()
kv := &storage.KVStore{
ModelBase: dtmutil.ModelBase{
CreateTime: &now,
UpdateTime: &now,
},
Cat: cat,
K: key,
V: value,
Version: 1,
}
dbr := dbGet().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(kv)
if dbr.Error == nil && dbr.RowsAffected == 0 {
return storage.ErrUniqueConflict
}
return dbr.Error
}
// SetDBConn sets db conn pool
func SetDBConn(db *gorm.DB) {
sqldb, _ := db.DB()

5
dtmsvr/storage/store.go

@ -31,4 +31,9 @@ type Store interface {
TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time)
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error)
ScanKV(cat string, position *string, limit int64) []KVStore
FindKV(cat, key string) []KVStore
UpdateKV(kv *KVStore) error
DeleteKV(cat, key string) error
CreateKV(cat, key, value string) error
}

14
dtmsvr/storage/trans.go

@ -80,3 +80,17 @@ func (b *TransBranchStore) TableName() string {
func (b *TransBranchStore) String() string {
return dtmimp.MustMarshalString(*b)
}
//KVStore defines Key-Value storage info
type KVStore struct {
dtmutil.ModelBase
Cat string `json:"cat"`
K string `json:"k"`
V string `json:"v"`
Version uint64 `json:"version"`
}
// TableName TableName
func (k *KVStore) TableName() string {
return "kv"
}

2
dtmsvr/svr.go

@ -69,6 +69,8 @@ func StartSvr() *gin.Engine {
for i := 0; i < int(conf.UpdateBranchAsyncGoroutineNum); i++ {
go updateBranchAsync()
}
updateTopicsMap()
go CronUpdateTopicsMap()
time.Sleep(100 * time.Millisecond)
err = dtmdriver.Use(conf.MicroService.Driver)

115
dtmsvr/topics.go

@ -0,0 +1,115 @@
package dtmsvr
import (
"errors"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/client/dtmcli/logger"
)
const (
topicsCat = "topics"
)
var topicsMap = map[string]Topic{}
//Topic define topic info
type Topic struct {
Name string `json:"k"`
Subscribers []Subscriber `json:"v"`
Version uint64 `json:"version"`
}
//Subscriber define subscriber info
type Subscriber struct {
URL string `json:"url"`
Remark string `json:"remark"`
}
func topic2urls(topic string) []string {
urls := make([]string, len(topicsMap[topic].Subscribers))
for k, subscriber := range topicsMap[topic].Subscribers {
urls[k] = subscriber.URL
}
return urls
}
// Subscribe subscribes topic, create topic if not exist
func Subscribe(topic, url, remark string) error {
if topic == "" {
return errors.New("empty topic")
}
if url == "" {
return errors.New("empty url")
}
newSubscriber := Subscriber{
URL: url,
Remark: remark,
}
kvs := GetStore().FindKV(topicsCat, topic)
if len(kvs) == 0 {
return GetStore().CreateKV(topicsCat, topic, dtmimp.MustMarshalString([]Subscriber{newSubscriber}))
}
subscribers := []Subscriber{}
dtmimp.MustUnmarshalString(kvs[0].V, &subscribers)
for _, subscriber := range subscribers {
if subscriber.URL == url {
return errors.New("this url exists")
}
}
subscribers = append(subscribers, newSubscriber)
kvs[0].V = dtmimp.MustMarshalString(subscribers)
return GetStore().UpdateKV(&kvs[0])
}
// Unsubscribe unsubscribes the topic
func Unsubscribe(topic, url string) error {
if topic == "" {
return errors.New("empty topic")
}
if url == "" {
return errors.New("empty url")
}
kvs := GetStore().FindKV(topicsCat, topic)
if len(kvs) == 0 {
return errors.New("no such a topic")
}
subscribers := []Subscriber{}
dtmimp.MustUnmarshalString(kvs[0].V, &subscribers)
if len(subscribers) == 0 {
return errors.New("this topic is empty")
}
n := len(subscribers)
for k, subscriber := range subscribers {
if subscriber.URL == url {
subscribers = append(subscribers[:k], subscribers[k+1:]...)
break
}
}
if len(subscribers) == n {
return errors.New("no such an url ")
}
kvs[0].V = dtmimp.MustMarshalString(subscribers)
return GetStore().UpdateKV(&kvs[0])
}
// updateTopicsMap updates the topicsMap variable, unsafe for concurrent
func updateTopicsMap() {
kvs := GetStore().FindKV(topicsCat, "")
for _, kv := range kvs {
topic := topicsMap[kv.K]
if topic.Version >= kv.Version {
continue
}
newTopic := Topic{}
newTopic.Name = kv.K
newTopic.Version = kv.Version
dtmimp.MustUnmarshalString(kv.V, &newTopic.Subscribers)
topicsMap[kv.K] = newTopic
logger.Infof("topic updated. old topic:%v new topic:%v", topicsMap[kv.K], newTopic)
}
logger.Infof("all topic updated. topic:%v", topicsMap)
}

24
dtmsvr/trans_type_msg.go

@ -3,6 +3,7 @@ package dtmsvr
import (
"errors"
"fmt"
"strings"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -20,15 +21,22 @@ func init() {
func (t *transMsgProcessor) GenBranches() []TransBranch {
branches := []TransBranch{}
for i, step := range t.Steps {
b := &TransBranch{
Gid: t.Gid,
BranchID: fmt.Sprintf("%02d", i+1),
BinData: t.BinPayloads[i],
URL: step[dtmimp.OpAction],
Op: dtmimp.OpAction,
Status: dtmcli.StatusPrepared,
mayTopic := strings.TrimPrefix(step[dtmimp.OpAction], dtmimp.MsgTopicPrefix)
urls := dtmimp.If(mayTopic == step[dtmimp.OpAction], []string{mayTopic}, topic2urls(mayTopic)).([]string)
if len(urls) == 0 {
e2p(errors.New("topic not found"))
}
for j, url := range urls {
b := TransBranch{
Gid: t.Gid,
BranchID: fmt.Sprintf("%02d%s", i+1, dtmimp.If(len(urls) == 1, "", fmt.Sprintf("-%02d", j+1)).(string)),
BinData: t.BinPayloads[i],
URL: url,
Op: dtmimp.OpAction,
Status: dtmcli.StatusPrepared,
}
branches = append(branches, b)
}
branches = append(branches, *b)
}
return branches
}

1
go.mod

@ -13,6 +13,7 @@ require (
github.com/dtm-labs/dtmdriver-springcloud v1.2.3
github.com/dtm-labs/logger v0.0.1
github.com/gin-gonic/gin v1.7.7
github.com/go-errors/errors v1.4.2
github.com/go-redis/redis/v8 v8.11.5
github.com/go-resty/resty/v2 v2.7.0
github.com/go-sql-driver/mysql v1.6.0

14
sqls/dtmsvr.storage.postgres.sql

@ -44,3 +44,17 @@ CREATE TABLE IF NOT EXISTS trans_branch_op (
PRIMARY KEY (id),
CONSTRAINT gid_branch_uniq UNIQUE (gid, branch_id, op)
);
drop table IF EXISTS kv;
CREATE SEQUENCE if not EXISTS kv_seq;
CREATE TABLE IF NOT EXISTS kv (
id bigint NOT NULL DEFAULT NEXTVAL ('kv_seq'),
cat varchar(45) NOT NULL,
k varchar(128) NOT NULL,
v TEXT,
version bigint default 1,
create_time timestamp(0) with time zone DEFAULT NULL,
update_time timestamp(0) with time zone DEFAULT NULL,
PRIMARY KEY (id),
CONSTRAINT uniq_k UNIQUE(cat, k)
);

60
test/api_test.go

@ -88,6 +88,66 @@ func TestAPIAll(t *testing.T) {
dtmimp.MustUnmarshalString(resp.String(), &m)
nextPos3 := m["next_position"].(string)
assert.Equal(t, "", nextPos3)
//fmt.Printf("pos1:%s,pos2:%s,pos3:%s", nextPos, nextPos2, nextPos3)
}
func TestAPIScanKV(t *testing.T) {
for i := 0; i < 3; i++ { // add three
assert.Nil(t, httpSubscribe("test_topic"+fmt.Sprintf("%d", i), "http://dtm/test1"))
}
resp, err := dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"cat": "topics",
"limit": "1",
}).Get(dtmutil.DefaultHTTPServer + "/scanKV")
assert.Nil(t, err)
m := map[string]interface{}{}
dtmimp.MustUnmarshalString(resp.String(), &m)
nextPos := m["next_position"].(string)
assert.NotEqual(t, "", nextPos)
resp, err = dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"cat": "topics",
"limit": "1",
"position": nextPos,
}).Get(dtmutil.DefaultHTTPServer + "/scanKV")
assert.Nil(t, err)
dtmimp.MustUnmarshalString(resp.String(), &m)
nextPos2 := m["next_position"].(string)
assert.NotEqual(t, "", nextPos2)
assert.NotEqual(t, nextPos, nextPos2)
resp, err = dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"cat": "topics",
"limit": "1000",
"position": nextPos,
}).Get(dtmutil.DefaultHTTPServer + "/scanKV")
assert.Nil(t, err)
dtmimp.MustUnmarshalString(resp.String(), &m)
nextPos3 := m["next_position"].(string)
assert.Equal(t, "", nextPos3)
}
func TestAPIQueryKV(t *testing.T) {
m := map[string]interface{}{}
// normal
assert.Nil(t, httpSubscribe("test_topic_TestAPIQueryKV", "http://dtm/test1"))
resp, err := dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"cat": "topics",
"key": "test_topic_TestAPIQueryKV",
}).Get(dtmutil.DefaultHTTPServer + "/queryKV")
assert.Nil(t, err)
dtmimp.MustUnmarshalString(resp.String(), &m)
assert.Equal(t, 1, len(m["kv"].([]interface{})))
// query non_existent topic
resp, err = dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"cat": "topics",
"key": "non_existent_topic_TestAPIQueryKV",
}).Get(dtmutil.DefaultHTTPServer + "/queryKV")
assert.Nil(t, err)
dtmimp.MustUnmarshalString(resp.String(), &m)
assert.Equal(t, 0, len(m["kv"].([]interface{})))
}
func TestDtmMetrics(t *testing.T) {

5
test/main_test.go

@ -30,6 +30,7 @@ func TestMain(m *testing.M) {
dtmsvr.NowForwardDuration = 0 * time.Second
dtmsvr.CronForwardDuration = 180 * time.Second
conf.UpdateBranchSync = 1
conf.ConfigUpdateInterval = 1
dtmdriver.Middlewares.HTTP = append(dtmdriver.Middlewares.HTTP, busi.SetHTTPHeaderForHeadersYes)
dtmdriver.Middlewares.Grpc = append(dtmdriver.Middlewares.Grpc, busi.SetGrpcHeaderForHeadersYes)
@ -69,6 +70,10 @@ func TestMain(m *testing.M) {
workflow.InitGrpc(dtmutil.DefaultGrpcServer, busi.BusiGrpc, gsvr)
go busi.RunGrpc(gsvr)
go busi.RunHTTP(hsvr)
subscribeTopic()
subscribeGrpcTopic()
r := m.Run()
if r != 0 {
os.Exit(r)

12
test/msg_grpc_test.go

@ -9,6 +9,7 @@ package test
import (
"fmt"
"testing"
"time"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -61,8 +62,15 @@ func TestMsgGrpcTimeoutFailed(t *testing.T) {
func genGrpcMsg(gid string) *dtmgrpc.MsgGrpc {
req := &busi.ReqGrpc{Amount: 30}
msg := dtmgrpc.NewMsgGrpc(dtmutil.DefaultGrpcServer, gid).
Add(busi.BusiGrpc+"/busi.Busi/TransOut", req).
Add(busi.BusiGrpc+"/busi.Busi/TransIn", req)
AddTopic("grpc_trans", req)
msg.QueryPrepared = fmt.Sprintf("%s/busi.Busi/QueryPrepared", busi.BusiGrpc)
return msg
}
func subscribeGrpcTopic() {
e2p(grpcSubscribe("grpc_trans", busi.BusiGrpc+"/busi.Busi/TransOut"))
e2p(grpcSubscribe("grpc_trans", busi.BusiGrpc+"/busi.Busi/TransIn"))
// wait for the topic configuration to take effect
time.Sleep(time.Second * time.Duration(conf.ConfigUpdateInterval+1))
}

22
test/msg_test.go

@ -7,7 +7,9 @@
package test
import (
"strings"
"testing"
"time"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -67,11 +69,27 @@ func TestMsgAbnormal(t *testing.T) {
assert.Error(t, err)
}
func TestMsgTopicNotFoundFailed(t *testing.T) {
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, dtmimp.GetFuncName()).
AddTopic("non_existent_topic_TestMsgTopicNotFoundFailed", &req)
msg.QueryPrepared = busi.Busi + "/QueryPrepared"
assert.True(t, strings.Contains(msg.Submit().Error(), "topic not found"))
}
func genMsg(gid string) *dtmcli.Msg {
req := busi.GenReqHTTP(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
Add(busi.Busi+"/TransOut", &req).
Add(busi.Busi+"/TransIn", &req)
AddTopic("http_trans", &req)
msg.QueryPrepared = busi.Busi + "/QueryPrepared"
return msg
}
func subscribeTopic() {
e2p(httpSubscribe("http_trans", busi.Busi+"/TransOut"))
e2p(httpSubscribe("http_trans", busi.Busi+"/TransIn"))
// wait for the topic configuration to take effect
time.Sleep(time.Second * time.Duration(conf.ConfigUpdateInterval+1))
}

151
test/topic_test.go

@ -0,0 +1,151 @@
package test
import (
"context"
"strconv"
"sync"
"testing"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgpb"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
const (
testTopicTestTopicNormal = "test_topic_TestTopicNormal"
testTopicTestConcurrentUpdateTopic = "concurrent_topic_TestConcurrentUpdateTopic"
)
func TestTopicNormal(t *testing.T) {
testSubscribe(t, httpSubscribe)
testUnsubscribe(t, httpUnsubscribe)
testDeleteTopic(t, httpDeleteTopic)
testSubscribe(t, grpcSubscribe)
testUnsubscribe(t, grpcUnsubscribe)
testDeleteTopic(t, grpcDeleteTopic)
}
func TestConcurrentUpdateTopic(t *testing.T) {
var wg sync.WaitGroup
var urls []string
var errNum int
concurrentTimes := 20
// concurrently updates the topic, part of them succeed
for i := 0; i < concurrentTimes; i++ {
wg.Add(1)
go func(i int) {
url := "http://dtm/test" + strconv.Itoa(i)
err := httpSubscribe(testTopicTestConcurrentUpdateTopic, url)
if err == nil {
urls = append(urls, url)
} else {
errNum++
}
wg.Done()
}(i)
}
wg.Wait()
assert.True(t, len(urls) > 0)
// delete successfully subscribed urls above, all of them should succeed
for _, url := range urls {
assert.Nil(t, httpUnsubscribe(testTopicTestConcurrentUpdateTopic, url))
}
// finally, the topic version should be correct
m := map[string]interface{}{}
resp, err := dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"cat": "topics",
"key": testTopicTestConcurrentUpdateTopic,
}).Get(dtmutil.DefaultHTTPServer + "/queryKV")
assert.Nil(t, err)
dtmimp.MustUnmarshalString(resp.String(), &m)
dtmimp.MustRemarshal(m["kv"].([]interface{})[0], &m)
assert.Equal(t, float64((concurrentTimes-errNum)*2), m["version"])
}
func testSubscribe(t *testing.T, subscribe func(topic, url string) error) {
assert.Nil(t, subscribe(testTopicTestTopicNormal, "http://dtm/test1"))
assert.Error(t, subscribe(testTopicTestTopicNormal, "http://dtm/test1")) // error:repeat subscription
assert.Error(t, subscribe("", "http://dtm/test1")) // error:empty topic
assert.Error(t, subscribe(testTopicTestTopicNormal, "")) // error:empty url
assert.Nil(t, subscribe(testTopicTestTopicNormal, "http://dtm/test2"))
}
func testUnsubscribe(t *testing.T, unsubscribe func(topic, url string) error) {
assert.Nil(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test1"))
assert.Error(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test1")) // error:repeat unsubscription
assert.Error(t, unsubscribe("", "http://dtm/test1")) // error:empty topic
assert.Error(t, unsubscribe(testTopicTestTopicNormal, "")) // error:empty url
assert.Error(t, unsubscribe("non_existent_topic", "http://dtm/test1")) // error:unsubscribe a non-existent topic
assert.Nil(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test2"))
assert.Error(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test2"))
}
func testDeleteTopic(t *testing.T, deleteTopic func(topic string) error) {
assert.Error(t, deleteTopic("non_existent_testDeleteTopic"))
assert.Nil(t, deleteTopic(testTopicTestTopicNormal))
}
func httpSubscribe(topic, url string) error {
resp, err := dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"topic": topic,
"url": url,
"remark": "for test",
}).Get(dtmutil.DefaultHTTPServer + "/subscribe")
e2p(err)
if resp.StatusCode() != 200 {
err = errors.Errorf("Http Request Error. Resp:%v", resp.String())
}
return err
}
func httpUnsubscribe(topic, url string) error {
resp, err := dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"topic": topic,
"url": url,
}).Get(dtmutil.DefaultHTTPServer + "/unsubscribe")
e2p(err)
if resp.StatusCode() != 200 {
err = errors.Errorf("Http Request Error. Resp:%+v", resp.String())
}
return err
}
func httpDeleteTopic(topic string) error {
resp, err := dtmcli.GetRestyClient().R().Delete(dtmutil.DefaultHTTPServer + "/topic/" + topic)
e2p(err)
if resp.StatusCode() != 200 {
err = errors.Errorf("Http Request Error. Resp:%+v", resp.String())
}
return err
}
func grpcSubscribe(topic, url string) error {
_, err := dtmgimp.MustGetDtmClient(dtmutil.DefaultGrpcServer).Subscribe(context.Background(),
&dtmgpb.DtmTopicRequest{
Topic: topic,
URL: url,
Remark: "for test"})
return err
}
func grpcUnsubscribe(topic, url string) error {
_, err := dtmgimp.MustGetDtmClient(dtmutil.DefaultGrpcServer).Unsubscribe(context.Background(),
&dtmgpb.DtmTopicRequest{
Topic: topic,
URL: url})
return err
}
func grpcDeleteTopic(topic string) error {
_, err := dtmgimp.MustGetDtmClient(dtmutil.DefaultGrpcServer).DeleteTopic(context.Background(),
&dtmgpb.DtmTopicRequest{
Topic: topic})
return err
}
Loading…
Cancel
Save