common.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package common
  2. import (
  3. "encoding/json"
  4. "git.sxidc.com/go-tools/utils/strutils"
  5. "github.com/pkg/errors"
  6. "time"
  7. )
  8. type CloudEvent struct {
  9. SpecVersion string `json:"specversion"`
  10. ID string `json:"id"`
  11. Type string `json:"type"`
  12. Source string `json:"source"`
  13. DataContentType string `json:"datacontenttype"`
  14. Time string `json:"time"`
  15. Data string `json:"data"`
  16. }
  17. func NewCloudEvent(eventID string, eventType string, data string) *CloudEvent {
  18. return &CloudEvent{
  19. SpecVersion: "v1.0",
  20. ID: eventID,
  21. Type: eventType,
  22. Source: "baize.sxidc.com",
  23. DataContentType: "application/json",
  24. Time: time.Now().Format(time.RFC3339),
  25. Data: data,
  26. }
  27. }
  28. func UnmarshalCloudEvent(data []byte) (*CloudEvent, error) {
  29. event := new(CloudEvent)
  30. err := json.Unmarshal(data, event)
  31. if err != nil {
  32. return nil, errors.New(err.Error())
  33. }
  34. return event, nil
  35. }
  36. func (event *CloudEvent) Marshal() ([]byte, error) {
  37. if strutils.IsStringEmpty(event.SpecVersion) {
  38. return nil, errors.New("没有传递事件规范版本")
  39. }
  40. if strutils.IsStringEmpty(event.ID) {
  41. return nil, errors.New("没有传递事件ID")
  42. }
  43. if strutils.IsStringEmpty(event.Type) {
  44. return nil, errors.New("没有传递事件类型")
  45. }
  46. if strutils.IsStringEmpty(event.Source) {
  47. return nil, errors.New("没有传递事件源")
  48. }
  49. if strutils.IsStringEmpty(event.DataContentType) {
  50. return nil, errors.New("没有传递事件数据类型")
  51. }
  52. if strutils.IsStringEmpty(event.Time) {
  53. return nil, errors.New("没有传递事件时间")
  54. }
  55. eventJson, err := json.Marshal(event)
  56. if err != nil {
  57. return nil, errors.New(err.Error())
  58. }
  59. return eventJson, nil
  60. }
  61. // MessageHandler 消息处理函数
  62. // 参数:
  63. // - queue: 消息队列
  64. // - topic: 主题
  65. // - data: 消息数据
  66. // 返回值: 无
  67. type MessageHandler func(queue MessageQueue, topic string, event *CloudEvent)
  68. // MessageQueue 消息队列接口
  69. type MessageQueue interface {
  70. Subscribe(group string, topic string, handler MessageHandler) error
  71. UnSubscribe(group string, topic string) error
  72. Publish(topic string, event *CloudEvent) error
  73. }