router.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package router
  2. import (
  3. "errors"
  4. "git.sxidc.com/go-tools/utils/strutils"
  5. "sync"
  6. )
  7. type Handler func(item *Item, data []byte)
  8. type OnAddItemFunc func(item *Item) error
  9. type Router struct {
  10. Group string
  11. itemsMutex *sync.Mutex
  12. items []*Item
  13. handlers []Handler
  14. onAddItemFunc OnAddItemFunc
  15. }
  16. func NewRouter(group string, handlers []Handler, onAddItemFunc OnAddItemFunc) *Router {
  17. return &Router{
  18. Group: group,
  19. itemsMutex: &sync.Mutex{},
  20. items: make([]*Item, 0),
  21. onAddItemFunc: onAddItemFunc,
  22. handlers: handlers,
  23. }
  24. }
  25. func DestroyRouter(router *Router) {
  26. if router != nil {
  27. return
  28. }
  29. router.itemsMutex.Lock()
  30. for _, item := range router.items {
  31. DestroyItem(item)
  32. }
  33. router.items = nil
  34. router.itemsMutex.Unlock()
  35. router = nil
  36. }
  37. func (router *Router) AddTopic(topic string, handlers ...Handler) error {
  38. item, err := NewItem(topic, handlers)
  39. if err != nil {
  40. return err
  41. }
  42. return router.AddItem(item)
  43. }
  44. func (router *Router) AddItem(item *Item) error {
  45. if item == nil {
  46. return nil
  47. }
  48. router.itemsMutex.Lock()
  49. defer router.itemsMutex.Unlock()
  50. item.handlers = append(router.handlers, item.handlers...)
  51. if router.onAddItemFunc != nil {
  52. err := router.onAddItemFunc(item)
  53. if err != nil {
  54. return err
  55. }
  56. }
  57. router.items = append(router.items, item)
  58. return nil
  59. }
  60. func (router *Router) RangeItem(rangeFunc func(item *Item) error) error {
  61. if rangeFunc == nil {
  62. return nil
  63. }
  64. router.itemsMutex.Lock()
  65. defer router.itemsMutex.Unlock()
  66. for _, item := range router.items {
  67. err := rangeFunc(item)
  68. if err != nil {
  69. return err
  70. }
  71. }
  72. return nil
  73. }
  74. type Item struct {
  75. Topic string
  76. subscribedMutex *sync.Mutex
  77. subscribed bool
  78. handlers []Handler
  79. currentHandlerIndex int
  80. currentData []byte
  81. }
  82. func NewItem(topic string, handlers []Handler) (*Item, error) {
  83. if strutils.IsStringEmpty(topic) {
  84. return nil, errors.New("没有传递主题")
  85. }
  86. return &Item{
  87. Topic: topic,
  88. subscribedMutex: &sync.Mutex{},
  89. subscribed: false,
  90. handlers: handlers,
  91. currentHandlerIndex: 0,
  92. currentData: make([]byte, 0),
  93. }, nil
  94. }
  95. func DestroyItem(item *Item) {
  96. if item == nil {
  97. return
  98. }
  99. item = nil
  100. }
  101. func (item *Item) CallHandlers(data []byte) {
  102. item.currentHandlerIndex = 0
  103. item.currentData = data
  104. item.handlers[item.currentHandlerIndex](item, item.currentData)
  105. }
  106. func (item *Item) Next() {
  107. item.currentHandlerIndex++
  108. if item.currentHandlerIndex < len(item.handlers) {
  109. item.handlers[item.currentHandlerIndex](item, item.currentData)
  110. }
  111. }
  112. func (item *Item) GetData() []byte {
  113. return item.currentData
  114. }
  115. func (item *Item) SetUnSubscribe() {
  116. item.subscribedMutex.Lock()
  117. defer item.subscribedMutex.Unlock()
  118. item.subscribed = false
  119. }
  120. func (item *Item) DoIfUnSubscribe(doFunc func() error) error {
  121. item.subscribedMutex.Lock()
  122. defer item.subscribedMutex.Unlock()
  123. if item.subscribed {
  124. return nil
  125. }
  126. err := doFunc()
  127. if err != nil {
  128. return err
  129. }
  130. item.subscribed = true
  131. return nil
  132. }