Browse Source

完成基本API

yjp 1 year ago
parent
commit
3e7c118f18
8 changed files with 667 additions and 109 deletions
  1. 428 0
      api.go
  2. 23 0
      config_params.go
  3. 6 2
      go.mod
  4. 10 3
      go.sum
  5. 173 0
      message_receiver.go
  6. 23 0
      response_message.go
  7. 4 0
      response_user.go
  8. 0 104
      sdk.go

+ 428 - 0
api.go

@@ -0,0 +1,428 @@
+package ups_sdk
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/go-resty/resty/v2"
+	"net/http"
+	"net/url"
+	"path"
+	"strconv"
+)
+
+const (
+	registerUserUrl       = "/ups/api/v1/user/register"
+	deleteUserUrl         = "/ups/api/v1/user/delete"
+	identifyUrl           = "/ups/api/v1/user/identify"
+	updateUserUserNameUrl = "/ups/api/v1/user/userName/update"
+	updateUserPasswordUrl = "/ups/api/v1/user/password/update"
+	mergeUserUrl          = "/ups/api/v1/user/merge"
+	getUsersUrl           = "/ups/api/v1/user/query"
+	getMessagesUrl        = "/ups/api/v1/message/query"
+)
+
+type API struct {
+	baseUrl    string
+	httpClient *resty.Client
+}
+
+// NewAPI 创建API
+func NewAPI(baseUrl string) (*API, error) {
+	_, err := url.ParseRequestURI(baseUrl)
+	if err != nil {
+		return nil, err
+	}
+
+	return &API{
+		baseUrl:    baseUrl,
+		httpClient: resty.New().SetTimeout(HttpClientTimeout),
+	}, nil
+}
+
+type SaveUserTransactionFunc func() error
+type SaveUserRollbackFunc func() error
+type SaveUserCommitFunc func() error
+
+type RegisterUserFlowParams struct {
+	UserName                string
+	Password                string
+	SaveUserTransactionFunc SaveUserTransactionFunc
+	SaveUserRollbackFunc    SaveUserRollbackFunc
+	SaveUserCommitFunc      SaveUserCommitFunc
+}
+
+func (params *RegisterUserFlowParams) Check() error {
+	if IsStringEmpty(params.UserName) {
+		return errors.New("没有传递用户名")
+	}
+
+	if IsStringEmpty(params.Password) {
+		return errors.New("没有传递密码")
+	}
+
+	if params.SaveUserTransactionFunc == nil {
+		return errors.New("没有传递保存用户名事务函数")
+	}
+
+	if params.SaveUserRollbackFunc == nil {
+		return errors.New("没有传递保存用户名事务回滚函数")
+	}
+
+	if params.SaveUserCommitFunc == nil {
+		return errors.New("没有传递保存用户名事务回滚函数")
+	}
+
+	return nil
+}
+
+// RegisterUserFlow 注册用户流程
+func (api *API) RegisterUserFlow(params *RegisterUserFlowParams) error {
+
+	return nil
+}
+
+// RegisterUser 注册用户
+func (api *API) RegisterUser(userName string, password string) error {
+	if IsStringEmpty(userName) {
+		return errors.New("没有传递用户名")
+	}
+
+	if IsStringEmpty(password) {
+		return errors.New("没有传递密码")
+	}
+
+	return api.registerUser(userName, password)
+}
+
+// DeleteUser 删除用户
+func (api *API) DeleteUser(userName string) error {
+	if IsStringEmpty(userName) {
+		return errors.New("没有传递用户名")
+	}
+
+	return api.deleteUser(userName)
+}
+
+// Identify 认证用户
+func (api *API) Identify(userName string, password string) error {
+	if IsStringEmpty(userName) {
+		return errors.New("没有传递用户名")
+	}
+
+	if IsStringEmpty(password) {
+		return errors.New("没有传递密码")
+	}
+
+	return api.identify(userName, password)
+}
+
+// UpdateUserUserName 修改用户用户名
+func (api *API) UpdateUserUserName(oldUserName string, newUserName string) error {
+	if IsStringEmpty(oldUserName) {
+		return errors.New("没有传递旧用户名")
+	}
+
+	if IsStringEmpty(newUserName) {
+		return errors.New("没有传递新用户名")
+	}
+
+	return api.updateUserUserName(oldUserName, newUserName)
+}
+
+// UpdateUserPassword 修改用户密码
+func (api *API) UpdateUserPassword(userName string, newPassword string) error {
+	if IsStringEmpty(userName) {
+		return errors.New("没有传递用户名")
+	}
+
+	if IsStringEmpty(newPassword) {
+		return errors.New("没有传递新密码")
+	}
+
+	return api.updateUserPassword(userName, newPassword)
+}
+
+// MergeUser 合并用户,实际作用校验from和to用户存在性,保留to用户,删除from用户
+func (api *API) MergeUser(fromUserName string, toUserName string) error {
+	if IsStringEmpty(fromUserName) {
+		return errors.New("没有传递被合并的用户名")
+	}
+
+	if IsStringEmpty(toUserName) {
+		return errors.New("没有传递合并到的用户名")
+	}
+
+	return api.mergeUser(fromUserName, toUserName)
+}
+
+// GetUsers 查询用户
+func (api *API) GetUsers(userName string, pageNo int, pageSize int) (*QueryUserResult, error) {
+	return api.getUsers(userName, pageNo, pageSize)
+}
+
+// GetMessages 查询消息
+func (api *API) GetMessages(topic string, startSendTime string, endSendTime string, pageNo int, pageSize int) (*QueryMessageResult, error) {
+	return api.getMessages(topic, startSendTime, endSendTime, pageNo, pageSize)
+}
+
+func (api *API) registerUser(userName string, password string) error {
+	requestUrl := path.Join(api.baseUrl, registerUserUrl)
+
+	resp, err := api.httpClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetBody(&RegisterUserRequest{
+			UserName: userName,
+			Password: password,
+		}).
+		Post(requestUrl)
+	if err != nil {
+		return err
+	}
+
+	if resp.IsError() {
+		return fmt.Errorf("请求失败: URL %s, Method %s, Status Code %d, Status %s\n",
+			requestUrl, http.MethodPost, resp.StatusCode(), resp.Status())
+	}
+
+	response := new(RegisterUserResponse)
+	err = json.Unmarshal(resp.Body(), response)
+	if err != nil {
+		return err
+	}
+
+	if !response.Success {
+		return fmt.Errorf("请求失败: URL %s, Method: %s, Error %s\n",
+			requestUrl, http.MethodPost, response.Msg)
+	}
+
+	return nil
+}
+
+func (api *API) deleteUser(userName string) error {
+	requestUrl := path.Join(api.baseUrl, deleteUserUrl)
+
+	resp, err := api.httpClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetQueryParam("userName", userName).
+		Delete(requestUrl)
+	if err != nil {
+		return err
+	}
+
+	if resp.IsError() {
+		return fmt.Errorf("请求失败: URL %s, Method: %s, Status Code %d, Status %s\n",
+			requestUrl, http.MethodDelete, resp.StatusCode(), resp.Status())
+	}
+
+	response := new(DeleteUserResponse)
+	err = json.Unmarshal(resp.Body(), response)
+	if err != nil {
+		return err
+	}
+
+	if !response.Success {
+		return fmt.Errorf("请求失败: URL %s, Method: %s, Error %s\n",
+			requestUrl, http.MethodDelete, response.Msg)
+	}
+
+	return nil
+}
+
+func (api *API) identify(userName string, password string) error {
+	requestUrl := path.Join(api.baseUrl, identifyUrl)
+
+	resp, err := api.httpClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetBody(&IdentifyRequest{
+			UserName: userName,
+			Password: password,
+		}).
+		Post(requestUrl)
+	if err != nil {
+		return err
+	}
+
+	if resp.IsError() {
+		return fmt.Errorf("请求失败: URL %s, Method %s, Status Code %d, Status %s\n",
+			requestUrl, http.MethodPost, resp.StatusCode(), resp.Status())
+	}
+
+	response := new(IdentifyResponse)
+	err = json.Unmarshal(resp.Body(), response)
+	if err != nil {
+		return err
+	}
+
+	if !response.Success {
+		return fmt.Errorf("请求失败: URL %s, Method: %s, Error %s\n",
+			requestUrl, http.MethodPost, response.Msg)
+	}
+
+	return nil
+}
+
+func (api *API) updateUserUserName(oldUserName string, newUserName string) error {
+	requestUrl := path.Join(api.baseUrl, updateUserUserNameUrl)
+
+	resp, err := api.httpClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetBody(&UpdateUserUserNameRequest{
+			OldUserName: oldUserName,
+			NewUserName: newUserName,
+		}).
+		Put(requestUrl)
+	if err != nil {
+		return err
+	}
+
+	if resp.IsError() {
+		return fmt.Errorf("请求失败: URL %s, Method %s, Status Code %d, Status %s\n",
+			requestUrl, http.MethodPut, resp.StatusCode(), resp.Status())
+	}
+
+	response := new(UpdateUserUserNameResponse)
+	err = json.Unmarshal(resp.Body(), response)
+	if err != nil {
+		return err
+	}
+
+	if !response.Success {
+		return fmt.Errorf("请求失败: URL %s, Method: %s, Error %s\n",
+			requestUrl, http.MethodPut, response.Msg)
+	}
+
+	return nil
+}
+
+func (api *API) updateUserPassword(userName string, newPassword string) error {
+	requestUrl := path.Join(api.baseUrl, updateUserPasswordUrl)
+
+	resp, err := api.httpClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetBody(&UpdateUserPasswordRequest{
+			UserName:    userName,
+			NewPassword: newPassword,
+		}).
+		Put(requestUrl)
+	if err != nil {
+		return err
+	}
+
+	if resp.IsError() {
+		return fmt.Errorf("请求失败: URL %s, Method %s, Status Code %d, Status %s\n",
+			requestUrl, http.MethodPut, resp.StatusCode(), resp.Status())
+	}
+
+	response := new(UpdateUserPasswordResponse)
+	err = json.Unmarshal(resp.Body(), response)
+	if err != nil {
+		return err
+	}
+
+	if !response.Success {
+		return fmt.Errorf("请求失败: URL %s, Method: %s, Error %s\n",
+			requestUrl, http.MethodPut, response.Msg)
+	}
+
+	return nil
+}
+
+func (api *API) mergeUser(fromUserName string, toUserName string) error {
+	requestUrl := path.Join(api.baseUrl, mergeUserUrl)
+
+	resp, err := api.httpClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetBody(&MergeUserRequest{
+			FromUserName: fromUserName,
+			ToUserName:   toUserName,
+		}).
+		Post(requestUrl)
+	if err != nil {
+		return err
+	}
+
+	if resp.IsError() {
+		return fmt.Errorf("请求失败: URL %s, Method %s, Status Code %d, Status %s\n",
+			requestUrl, http.MethodPost, resp.StatusCode(), resp.Status())
+	}
+
+	response := new(MergeUserResponse)
+	err = json.Unmarshal(resp.Body(), response)
+	if err != nil {
+		return err
+	}
+
+	if !response.Success {
+		return fmt.Errorf("请求失败: URL %s, Method: %s, Error %s\n",
+			requestUrl, http.MethodPost, response.Msg)
+	}
+
+	return nil
+}
+
+func (api *API) getUsers(userName string, pageNo int, pageSize int) (*QueryUserResult, error) {
+	requestUrl := path.Join(api.baseUrl, getUsersUrl)
+
+	resp, err := api.httpClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetQueryParam("userName", userName).
+		SetQueryParam("pageNo", strconv.Itoa(pageNo)).
+		SetQueryParam("pageSize", strconv.Itoa(pageSize)).
+		Get(requestUrl)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.IsError() {
+		return nil, fmt.Errorf("请求失败: URL %s, Method %s, Status Code %d, Status %s\n",
+			requestUrl, http.MethodGet, resp.StatusCode(), resp.Status())
+	}
+
+	response := new(GetUsersResponse)
+	err = json.Unmarshal(resp.Body(), response)
+	if err != nil {
+		return nil, err
+	}
+
+	if !response.Success {
+		return nil, fmt.Errorf("请求失败: URL %s, Method: %s, Error %s\n",
+			requestUrl, http.MethodGet, response.Msg)
+	}
+
+	return &response.QueryUserResult, nil
+}
+
+func (api *API) getMessages(topic string, startSendTime string, endSendTime string, pageNo int, pageSize int) (*QueryMessageResult, error) {
+	requestUrl := path.Join(api.baseUrl, getMessagesUrl)
+
+	resp, err := api.httpClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetQueryParam("topic", topic).
+		SetQueryParam("startSendTime", startSendTime).
+		SetQueryParam("endSendTime", endSendTime).
+		SetQueryParam("pageNo", strconv.Itoa(pageNo)).
+		SetQueryParam("pageSize", strconv.Itoa(pageSize)).
+		Get(requestUrl)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.IsError() {
+		return nil, fmt.Errorf("请求失败: URL %s, Method %s, Status Code %d, Status %s\n",
+			requestUrl, http.MethodGet, resp.StatusCode(), resp.Status())
+	}
+
+	response := new(GetMessagesResponse)
+	err = json.Unmarshal(resp.Body(), response)
+	if err != nil {
+		return nil, err
+	}
+
+	if !response.Success {
+		return nil, fmt.Errorf("请求失败: URL %s, Method: %s, Error %s\n",
+			requestUrl, http.MethodGet, response.Msg)
+	}
+
+	return &response.QueryMessageResult, nil
+}

+ 23 - 0
config_params.go

@@ -0,0 +1,23 @@
+package ups_sdk
+
+import "time"
+
+var (
+	// HttpClientTimeout HTTP超时
+	HttpClientTimeout = 30 * time.Second
+
+	// MessageReceiverVisibilityTimeout 消息保持pending的时长,超过这个时长的空闲消息会被消费者再次消费
+	MessageReceiverVisibilityTimeout = 30 * time.Second
+
+	// MessageReceiverBlockingTimeout 获取消息的阻塞时长,会影响停止接收消息时的等待时长
+	MessageReceiverBlockingTimeout = 5 * time.Second
+
+	// MessageReceiverReclaimInterval 重新拉取消息的间隔
+	MessageReceiverReclaimInterval = 1 * time.Second
+
+	// MessageReceiverBufferSize 消息队列的缓冲大小,缓存多少消息
+	MessageReceiverBufferSize = 1000
+
+	// MessageReceiverConcurrency 处理消息的协程数
+	MessageReceiverConcurrency = 10
+)

+ 6 - 2
go.mod

@@ -2,7 +2,10 @@ module git.sxidc.com/service-supports/ups-sdk
 
 go 1.20
 
-require github.com/robinjoseph08/redisqueue/v2 v2.1.0
+require (
+	github.com/go-resty/resty/v2 v2.7.0
+	github.com/robinjoseph08/redisqueue/v2 v2.1.0
+)
 
 require (
 	github.com/fatih/color v1.7.0 // indirect
@@ -17,7 +20,8 @@ require (
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/tsuyoshiwada/go-gitcmd v0.0.0-20180205145712-5f1f5f9475df // indirect
 	github.com/urfave/cli v1.20.0 // indirect
-	golang.org/x/sys v0.0.0-20191010194322-b09406accb47 // indirect
+	golang.org/x/net v0.0.0-20211029224645-99673261e6eb // indirect
+	golang.org/x/sys v0.0.0-20210423082822-04245dca01da // indirect
 	golang.org/x/tools v0.0.0-20190706070813-72ffa07ba3db // indirect
 	gopkg.in/AlecAivazis/survey.v1 v1.8.5 // indirect
 	gopkg.in/kyokomi/emoji.v1 v1.5.1 // indirect

+ 10 - 3
go.sum

@@ -10,6 +10,8 @@ github.com/git-chglog/git-chglog v0.0.0-20190611050339-63a4e637021f h1:8l4Aw3Jmx
 github.com/git-chglog/git-chglog v0.0.0-20190611050339-63a4e637021f/go.mod h1:Dcsy1kii/xFyNad5JqY/d0GO5mu91sungp5xotbm3Yk=
 github.com/go-redis/redis/v7 v7.3.0 h1:3oHqd0W7f/VLKBxeYTEpqdMUsmMectngjM9OtoRoIgg=
 github.com/go-redis/redis/v7 v7.3.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
+github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
+github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
@@ -64,19 +66,24 @@ golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b/go.mod h1:6SG95UA2DQfeDnf
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
 golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20211029224645-99673261e6eb h1:pirldcYWx7rx7kE5r+9WsOXPXK0+WH5+uZ7uPmJ44uM=
+golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180606202747-9527bec2660b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
 golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190706070813-72ffa07ba3db h1:9hRk1xeL9LTT3yX/941DqeBz87XgHAQuj+TbimYJuiw=
 golang.org/x/tools v0.0.0-20190706070813-72ffa07ba3db/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=

+ 173 - 0
message_receiver.go

@@ -0,0 +1,173 @@
+package ups_sdk
+
+import (
+	"encoding/json"
+	"errors"
+	"github.com/robinjoseph08/redisqueue/v2"
+	"time"
+)
+
+const (
+	topicUserRegistered      = "ups:user-registered"
+	topicUserUserNameUpdated = "ups:user-username-updated"
+	topicUserDeleted         = "ups:user-deleted"
+)
+
+type UserRegisteredCallback func(data *UserRegisteredMessageData, err error) error
+type UserUserNameUpdatedCallback func(data *UserUserNameUpdatedMessageData, err error) error
+type UserDeletedCallback func(data *UserDeletedMessageData, err error) error
+
+type MessageReceiver struct {
+	consumer                    *redisqueue.Consumer
+	userRegisteredCallback      UserRegisteredCallback
+	userUserNameUpdatedCallback UserUserNameUpdatedCallback
+	userDeletedCallback         UserDeletedCallback
+}
+
+type MessageReceiverOptions struct {
+	// 接收者名称,必传
+	ReceiverName string
+
+	// 消息组名称,必传
+	MessageGroupName string
+
+	// 用户注册消息回调,必传
+	UserRegisteredCallback UserRegisteredCallback
+
+	// 用户名更新消息回调,必传
+	UserUserNameUpdatedCallback UserUserNameUpdatedCallback
+
+	// 用户删除消息回调,必传
+	UserDeletedCallback UserDeletedCallback
+
+	// Redis配置
+	RedisOptions
+}
+
+type RedisOptions struct {
+	Address      string
+	Password     string
+	DB           int
+	MaxRetries   int
+	DialTimeout  time.Duration
+	ReadTimeout  time.Duration
+	WriteTimeout time.Duration
+}
+
+// NewMessageReceiver 创建MessageReceiver
+func NewMessageReceiver(options *MessageReceiverOptions) (*MessageReceiver, error) {
+	if IsStringEmpty(options.ReceiverName) {
+		return nil, errors.New("没有传递接受者名称")
+	}
+
+	if IsStringEmpty(options.MessageGroupName) {
+		return nil, errors.New("没有传递消息组名称")
+	}
+
+	if IsStringEmpty(options.Address) {
+		return nil, errors.New("没有传递Redis地址")
+	}
+
+	if IsStringEmpty(options.Password) {
+		return nil, errors.New("没有传递Redis密码")
+	}
+
+	if options.UserRegisteredCallback == nil ||
+		options.UserUserNameUpdatedCallback == nil ||
+		options.UserDeletedCallback == nil {
+		return nil, errors.New("没有传递需要的回调函数")
+	}
+
+	consumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
+		Name:              options.ReceiverName,
+		GroupName:         options.MessageGroupName,
+		VisibilityTimeout: MessageReceiverVisibilityTimeout,
+		BlockingTimeout:   MessageReceiverBlockingTimeout,
+		ReclaimInterval:   MessageReceiverReclaimInterval,
+		BufferSize:        MessageReceiverBufferSize,
+		Concurrency:       MessageReceiverConcurrency,
+		RedisOptions: &redisqueue.RedisOptions{
+			Addr:         options.Address,
+			Password:     options.Password,
+			DB:           options.DB,
+			MaxRetries:   options.MaxRetries,
+			DialTimeout:  options.DialTimeout,
+			ReadTimeout:  options.ReadTimeout,
+			WriteTimeout: options.WriteTimeout,
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return &MessageReceiver{
+		consumer:                    consumer,
+		userRegisteredCallback:      options.UserRegisteredCallback,
+		userUserNameUpdatedCallback: options.UserUserNameUpdatedCallback,
+		userDeletedCallback:         options.UserDeletedCallback,
+	}, nil
+}
+
+// DestroyMessageReceiver 销毁MessageReceiver
+func DestroyMessageReceiver(messageReceiver *MessageReceiver) {
+	messageReceiver.userDeletedCallback = nil
+	messageReceiver.userUserNameUpdatedCallback = nil
+	messageReceiver.userRegisteredCallback = nil
+	messageReceiver.consumer = nil
+	messageReceiver = nil
+}
+
+// StartMessageReceiver 启动消息接收
+func (receiver *MessageReceiver) StartMessageReceiver() error {
+	receiver.consumer.Register(topicUserRegistered, receiver.processMessage)
+	receiver.consumer.Register(topicUserUserNameUpdated, receiver.processMessage)
+	receiver.consumer.Register(topicUserDeleted, receiver.processMessage)
+
+	go receiver.consumer.Run()
+
+	return nil
+}
+
+// StopMessageReceiver 停止消息接收
+func (receiver *MessageReceiver) StopMessageReceiver() {
+	receiver.consumer.Shutdown()
+}
+
+func (receiver *MessageReceiver) processMessage(receivedMessage *redisqueue.Message) error {
+	msgStr := receivedMessage.Values["msg"].(string)
+
+	msg := new(Message)
+	err := json.Unmarshal([]byte(msgStr), msg)
+	if err != nil {
+		return err
+	}
+
+	switch receivedMessage.Stream {
+	case topicUserRegistered:
+		data := new(UserRegisteredMessageData)
+		err := json.Unmarshal([]byte(msg.Data), data)
+		if err != nil {
+			return receiver.userRegisteredCallback(nil, err)
+		}
+
+		return receiver.userRegisteredCallback(data, nil)
+	case topicUserUserNameUpdated:
+		data := new(UserUserNameUpdatedMessageData)
+		err := json.Unmarshal([]byte(msg.Data), data)
+		if err != nil {
+			return receiver.userUserNameUpdatedCallback(nil, err)
+		}
+
+		return receiver.userUserNameUpdatedCallback(data, nil)
+	case topicUserDeleted:
+		data := new(UserDeletedMessageData)
+		err := json.Unmarshal([]byte(msg.Data), data)
+		if err != nil {
+			return receiver.userDeletedCallback(nil, err)
+		}
+
+		return receiver.userDeletedCallback(data, nil)
+	default:
+		return errors.New("不存在的主题: " + receivedMessage.Stream)
+	}
+}

+ 23 - 0
response_message.go

@@ -2,6 +2,10 @@ package ups_sdk
 
 type GetMessagesResponse struct {
 	msgResponse
+	QueryMessageResult
+}
+
+type QueryMessageResult struct {
 	Infos      []MessageInfo `json:"infos"`
 	TotalCount int64         `json:"totalCount"`
 	PageNo     int           `json:"pageNo"`
@@ -13,3 +17,22 @@ type MessageInfo struct {
 	Data     string
 	SendTime string
 }
+
+type Message struct {
+	ID       string `json:"id"`
+	Data     string `json:"data"`
+	SendTime string `json:"sendTime"`
+}
+
+type UserRegisteredMessageData struct {
+	UserName string `json:"userName"`
+}
+
+type UserUserNameUpdatedMessageData struct {
+	OldUserName string `json:"oldUserName"`
+	NewUserName string `json:"newUserName"`
+}
+
+type UserDeletedMessageData struct {
+	UserName string `json:"userName"`
+}

+ 4 - 0
response_user.go

@@ -28,6 +28,10 @@ type MergeUserResponse struct {
 
 type GetUsersResponse struct {
 	msgResponse
+	QueryUserResult
+}
+
+type QueryUserResult struct {
 	Infos      []UserInfo `json:"infos"`
 	TotalCount int64      `json:"totalCount"`
 	PageNo     int        `json:"pageNo"`

+ 0 - 104
sdk.go

@@ -1,104 +0,0 @@
-package ups_sdk
-
-import (
-	"errors"
-	"fmt"
-	"github.com/robinjoseph08/redisqueue/v2"
-	"time"
-)
-
-var (
-	VisibilityTimeout = 30 * time.Second
-	BlockingTimeout   = 5 * time.Second
-	ReclaimInterval   = 1 * time.Second
-	BufferSize        = 1024
-	Concurrency       = 10
-)
-
-type SDK struct {
-	consumer *redisqueue.Consumer
-}
-
-func New() *SDK {
-	return &SDK{}
-}
-
-type MessageReceiverOptions struct {
-	ReceiverName     string
-	MessageGroupName string
-	RedisOptions
-}
-
-type RedisOptions struct {
-	Address      string
-	Password     string
-	DB           int
-	MaxRetries   int
-	DialTimeout  time.Duration
-	ReadTimeout  time.Duration
-	WriteTimeout time.Duration
-}
-
-func (sdk *SDK) StartMessageReceiver(options *MessageReceiverOptions) error {
-	if IsStringEmpty(options.ReceiverName) {
-		return errors.New("没有传递接受者名称")
-	}
-
-	if IsStringEmpty(options.MessageGroupName) {
-		return errors.New("没有传递消息组名称")
-	}
-
-	if IsStringEmpty(options.Address) {
-		return errors.New("没有传递Redis地址")
-	}
-
-	if IsStringEmpty(options.Password) {
-		return errors.New("没有传递Redis密码")
-	}
-
-	consumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
-		Name:              options.ReceiverName,
-		GroupName:         options.MessageGroupName,
-		VisibilityTimeout: VisibilityTimeout,
-		BlockingTimeout:   BlockingTimeout,
-		ReclaimInterval:   ReclaimInterval,
-		BufferSize:        BufferSize,
-		Concurrency:       Concurrency,
-		RedisOptions: &redisqueue.RedisOptions{
-			Addr:         options.Address,
-			Password:     options.Password,
-			DB:           options.DB,
-			MaxRetries:   options.MaxRetries,
-			DialTimeout:  options.DialTimeout,
-			ReadTimeout:  options.ReadTimeout,
-			WriteTimeout: options.WriteTimeout,
-		},
-	})
-	if err != nil {
-		return err
-	}
-
-	sdk.consumer = consumer
-
-	sdk.consumer.Register("ups:user-registered", sdk.processMessage)
-	sdk.consumer.Register("ups:user-username-updated", sdk.processMessage)
-	sdk.consumer.Register("ups:user-deleted", sdk.processMessage)
-
-	go func() {
-		for err := range sdk.consumer.Errors {
-			fmt.Printf("err: %+v\n", err)
-		}
-	}()
-
-	go sdk.consumer.Run()
-
-	return nil
-}
-
-func (sdk *SDK) StopMessageReceiver() {
-	sdk.consumer.Shutdown()
-}
-
-func (sdk *SDK) processMessage(receivedMessage *redisqueue.Message) error {
-	return nil
-}