package mqtt_api

import (
	"sync"

	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
)

// Handler processes one incoming message through its Context.
type Handler func(c *Context)

// subscribeItem holds the handlers registered for one topic together with a
// flag that addAndSubscribeTopicHandlers sets once it has subscribed the
// topic on a connected client.
type subscribeItem struct {
	handlers   []Handler
	subscribed bool
}

// Router maps MQTT topics, prefixed with Group, to handler chains and
// subscribes them on the MQTT client created by Start.
type Router struct {
	// Group is prepended verbatim to every topic passed to AddTopic.
	Group string

	mqttClient *MqttClient

	contextsMutex *sync.Mutex
	contexts      []*Context

	// globalHandlers run before the per-topic handlers of every topic that
	// is added after they were registered.
	globalHandlers []Handler

	// subscribeItemsMutex guards subscribeItems.
	subscribeItemsMutex *sync.Mutex
	subscribeItems      map[string]*subscribeItem
}

// NewRouter returns a Router for the given topic group with the given
// initial global handlers. The router has no MQTT client until Start is
// called.
func NewRouter(group string, globalHandlers []Handler) *Router {
	return &Router{
		Group:               group,
		mqttClient:          nil,
		contextsMutex:       &sync.Mutex{},
		contexts:            make([]*Context, 0),
		globalHandlers:      globalHandlers,
		subscribeItemsMutex: &sync.Mutex{},
		subscribeItems:      make(map[string]*subscribeItem),
	}
}

// DestroyRouter destroys every context tracked by router and clears the
// list. A nil router is ignored.
//
// It cannot reset the caller's pointer (the parameter is a copy), so callers
// should drop their own reference afterwards.
func DestroyRouter(router *Router) {
	if router == nil {
		return
	}

	router.contextsMutex.Lock()
	defer router.contextsMutex.Unlock()

	for _, context := range router.contexts {
		destroyContext(context)
	}
	router.contexts = nil
}

// Start creates an MQTT client from mqttOptions, passing subscribeTopics and
// unsubscribeTopics as its two callbacks, connects it, and stores it on the
// router. It returns the error from client creation or from connecting.
//
// NOTE(review): when the two callbacks fire is decided by newMqttClient,
// which is not visible here — presumably on connect and on connection loss;
// confirm.
func (router *Router) Start(mqttOptions *MqttClientOptions) error {
	mqttClient, err := newMqttClient(mqttOptions,
		func(client *MqttClient) {
			router.subscribeTopics(client)
		},
		func(client *MqttClient) {
			router.unsubscribeTopics(client)
		})
	if err != nil {
		return err
	}

	if err := mqttClient.connect(); err != nil {
		return err
	}

	router.mqttClient = mqttClient
	return nil
}

// Finish unsubscribes the router's topics, disconnects and destroys the MQTT
// client, and clears the client reference. It does nothing if the router has
// no client.
func (router *Router) Finish() {
	if router.mqttClient == nil {
		return
	}

	router.unsubscribeTopics(router.mqttClient)
	router.mqttClient.disconnect()
	destroyMqttClient(router.mqttClient)
	router.mqttClient = nil
}

// AddGlobalHandlers appends handlers to the router's global handler list.
// Topics that were already added keep the handler chain they were given at
// AddTopic time.
func (router *Router) AddGlobalHandlers(handlers ...Handler) {
	router.globalHandlers = append(router.globalHandlers, handlers...)
}

// AddTopic registers handlers for Group+topic, preceded by the current
// global handlers, and subscribes the topic immediately when the router has
// a connected client. It returns the subscription error, if any.
func (router *Router) AddTopic(topic string, handlers ...Handler) error {
	// Build the chain in a fresh slice. Appending directly to
	// router.globalHandlers could write into its spare capacity, making
	// chains of different topics share (and overwrite) the same elements.
	allHandlers := make([]Handler, 0, len(router.globalHandlers)+len(handlers))
	allHandlers = append(allHandlers, router.globalHandlers...)
	allHandlers = append(allHandlers, handlers...)

	return router.addAndSubscribeTopicHandlers(router.Group+topic, allHandlers...)
}

// subscribeTopics subscribes, on client, every topic visited by
// rangeTopicHandlers. It does nothing when client is not connected.
// Subscription errors are logged and do not stop the iteration.
func (router *Router) subscribeTopics(client *MqttClient) {
	if !client.mqttClient.IsConnected() {
		return
	}

	router.rangeTopicHandlers(func(topic string, handlers []Handler) {
		if err := router.subscribeMqttClient(client, topic, handlers); err != nil {
			logger.GetInstance().Error(err)
		}
	})
}

// unsubscribeTopics unsubscribes, on client, every topic visited by
// rangeTopicHandlers. It does nothing when client is not connected.
// Unsubscription errors are logged and do not stop the iteration.
func (router *Router) unsubscribeTopics(client *MqttClient) {
	if !client.mqttClient.IsConnected() {
		return
	}

	router.rangeTopicHandlers(func(topic string, handlers []Handler) {
		if err := client.unsubscribe(topic); err != nil {
			logger.GetInstance().Error(err)
		}
	})
}

// subscribeMqttClient subscribes topic on client. For each message received
// it builds a Context carrying handlers and runs it; a context construction
// error is logged and the message is dropped.
func (router *Router) subscribeMqttClient(client *MqttClient, topic string, handlers []Handler) error {
	return client.subscribe(topic, func(msgTopic string, data []byte) {
		// Use the client that owns this subscription rather than
		// router.mqttClient, which is still nil while Start is connecting
		// and is reset to nil by Finish.
		c, err := newContext(client, msgTopic, data, handlers)
		if err != nil {
			logger.GetInstance().Error(err)
			return
		}

		c.callHandlers()
		c.Next()
	})
}

// addAndSubscribeTopicHandlers stores handlers for topic, replacing any
// previous entry, and — when the router has a connected client — subscribes
// the topic and marks the entry as subscribed. Without a connected client
// the entry is only stored and nil is returned.
//
// subscribeItemsMutex is held for the whole call, including the subscribe
// request.
func (router *Router) addAndSubscribeTopicHandlers(topic string, handlers ...Handler) error {
	router.subscribeItemsMutex.Lock()
	defer router.subscribeItemsMutex.Unlock()

	item := &subscribeItem{
		handlers:   handlers,
		subscribed: false,
	}
	router.subscribeItems[topic] = item

	if router.mqttClient == nil || !router.mqttClient.mqttClient.IsConnected() {
		return nil
	}

	if err := router.subscribeMqttClient(router.mqttClient, topic, handlers); err != nil {
		return err
	}

	item.subscribed = true
	return nil
}

// removeTopicHandlers drops the entry stored for topic, if there is one. It
// does not unsubscribe the topic from the MQTT client.
func (router *Router) removeTopicHandlers(topic string) {
	router.subscribeItemsMutex.Lock()
	defer router.subscribeItemsMutex.Unlock()

	delete(router.subscribeItems, topic)
}

// rangeTopicHandlers calls callback for every stored topic whose subscribed
// flag is false. callback runs while subscribeItemsMutex is held, so it must
// not call anything that takes that mutex.
//
// NOTE(review): the same "not yet subscribed" filter serves both
// subscribeTopics and unsubscribeTopics, and the flag is only ever set by
// addAndSubscribeTopicHandlers. Topics added while connected are therefore
// skipped by both passes — confirm this is intended.
func (router *Router) rangeTopicHandlers(callback func(topic string, handlers []Handler)) {
	router.subscribeItemsMutex.Lock()
	defer router.subscribeItemsMutex.Unlock()

	for topic, item := range router.subscribeItems {
		if !item.subscribed {
			callback(topic, item.handlers)
		}
	}
}