123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- package router
- import (
- "errors"
- "git.sxidc.com/go-tools/utils/strutils"
- "sync"
- )
- type Handler func(item *Item, data []byte)
- type OnAddItemFunc func(item *Item) error
- type Router struct {
- Group string
- itemsMutex *sync.Mutex
- items []*Item
- handlers []Handler
- onAddItemFunc OnAddItemFunc
- }
- func NewRouter(group string, handlers []Handler, onAddItemFunc OnAddItemFunc) *Router {
- return &Router{
- Group: group,
- itemsMutex: &sync.Mutex{},
- items: make([]*Item, 0),
- onAddItemFunc: onAddItemFunc,
- handlers: handlers,
- }
- }
- func DestroyRouter(router *Router) {
- if router != nil {
- return
- }
- router.itemsMutex.Lock()
- for _, item := range router.items {
- DestroyItem(item)
- }
- router.items = nil
- router.itemsMutex.Unlock()
- router = nil
- }
- func (router *Router) AddTopic(topic string, handlers ...Handler) error {
- item, err := NewItem(topic, handlers)
- if err != nil {
- return err
- }
- return router.AddItem(item)
- }
- func (router *Router) AddItem(item *Item) error {
- if item == nil {
- return nil
- }
- router.itemsMutex.Lock()
- defer router.itemsMutex.Unlock()
- item.handlers = append(router.handlers, item.handlers...)
- if router.onAddItemFunc != nil {
- err := router.onAddItemFunc(item)
- if err != nil {
- return err
- }
- }
- router.items = append(router.items, item)
- return nil
- }
- func (router *Router) RangeItem(rangeFunc func(item *Item) error) error {
- if rangeFunc == nil {
- return nil
- }
- router.itemsMutex.Lock()
- defer router.itemsMutex.Unlock()
- for _, item := range router.items {
- err := rangeFunc(item)
- if err != nil {
- return err
- }
- }
- return nil
- }
- type Item struct {
- Topic string
- subscribedMutex *sync.Mutex
- subscribed bool
- handlers []Handler
- currentHandlerIndex int
- currentData []byte
- }
- func NewItem(topic string, handlers []Handler) (*Item, error) {
- if strutils.IsStringEmpty(topic) {
- return nil, errors.New("没有传递主题")
- }
- return &Item{
- Topic: topic,
- subscribedMutex: &sync.Mutex{},
- subscribed: false,
- handlers: handlers,
- currentHandlerIndex: 0,
- currentData: make([]byte, 0),
- }, nil
- }
- func DestroyItem(item *Item) {
- if item == nil {
- return
- }
- item = nil
- }
- func (item *Item) CallHandlers(data []byte) {
- item.currentHandlerIndex = 0
- item.currentData = data
- item.handlers[item.currentHandlerIndex](item, item.currentData)
- }
- func (item *Item) Next() {
- item.currentHandlerIndex++
- if item.currentHandlerIndex < len(item.handlers) {
- item.handlers[item.currentHandlerIndex](item, item.currentData)
- }
- }
- func (item *Item) GetData() []byte {
- return item.currentData
- }
- func (item *Item) SetUnSubscribe() {
- item.subscribedMutex.Lock()
- defer item.subscribedMutex.Unlock()
- item.subscribed = false
- }
- func (item *Item) DoIfUnSubscribe(doFunc func() error) error {
- item.subscribedMutex.Lock()
- defer item.subscribedMutex.Unlock()
- if item.subscribed {
- return nil
- }
- err := doFunc()
- if err != nil {
- return err
- }
- item.subscribed = true
- return nil
- }
|