package router import ( "errors" "git.sxidc.com/go-tools/utils/strutils" "sync" ) type Handler func(item *Item, data []byte) type OnAddItemFunc func(item *Item) error type Router struct { Group string itemsMutex *sync.Mutex items []*Item handlers []Handler onAddItemFunc OnAddItemFunc } func NewRouter(group string, handlers []Handler, onAddItemFunc OnAddItemFunc) *Router { return &Router{ Group: group, itemsMutex: &sync.Mutex{}, items: make([]*Item, 0), onAddItemFunc: onAddItemFunc, handlers: handlers, } } func DestroyRouter(router *Router) { if router != nil { return } router.itemsMutex.Lock() for _, item := range router.items { DestroyItem(item) } router.items = nil router.itemsMutex.Unlock() router = nil } func (router *Router) AddTopic(topic string, handlers ...Handler) error { item, err := NewItem(topic, handlers) if err != nil { return err } return router.AddItem(item) } func (router *Router) AddItem(item *Item) error { if item == nil { return nil } router.itemsMutex.Lock() defer router.itemsMutex.Unlock() item.handlers = append(router.handlers, item.handlers...) if router.onAddItemFunc != nil { err := router.onAddItemFunc(item) if err != nil { return err } } router.items = append(router.items, item) return nil } func (router *Router) RangeItem(rangeFunc func(item *Item) error) error { if rangeFunc == nil { return nil } router.itemsMutex.Lock() defer router.itemsMutex.Unlock() for _, item := range router.items { err := rangeFunc(item) if err != nil { return err } } return nil } type Item struct { Topic string subscribedMutex *sync.Mutex subscribed bool handlers []Handler currentHandlerIndex int currentData []byte } func NewItem(topic string, handlers []Handler) (*Item, error) { if strutils.IsStringEmpty(topic) { return nil, errors.New("没有传递主题") } return &Item{ Topic: topic, subscribedMutex: &sync.Mutex{}, subscribed: false, handlers: handlers, currentHandlerIndex: 0, currentData: make([]byte, 0), }, nil } func DestroyItem(item *Item) { if item == nil { return } item = nil } func (item *Item) CallHandlers(data []byte) { item.currentHandlerIndex = 0 item.currentData = data item.handlers[item.currentHandlerIndex](item, item.currentData) } func (item *Item) Next() { item.currentHandlerIndex++ if item.currentHandlerIndex < len(item.handlers) { item.handlers[item.currentHandlerIndex](item, item.currentData) } } func (item *Item) GetData() []byte { return item.currentData } func (item *Item) SetUnSubscribe() { item.subscribedMutex.Lock() defer item.subscribedMutex.Unlock() item.subscribed = false } func (item *Item) DoIfUnSubscribe(doFunc func() error) error { item.subscribedMutex.Lock() defer item.subscribedMutex.Unlock() if item.subscribed { return nil } err := doFunc() if err != nil { return err } item.subscribed = true return nil }