package mqtt_api import ( "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger" "sync" ) type Handler func(c *Context) type Router struct { Group string mqttClient *MqttClient contextsMutex *sync.Mutex contexts []*Context globalHandlers []Handler topicHandlersMutex *sync.Mutex topicHandlers map[string][]Handler } func NewRouter(group string, globalHandlers []Handler) *Router { return &Router{ Group: group, mqttClient: nil, contextsMutex: &sync.Mutex{}, contexts: make([]*Context, 0), globalHandlers: globalHandlers, topicHandlersMutex: &sync.Mutex{}, topicHandlers: make(map[string][]Handler), } } func DestroyRouter(router *Router) { if router == nil { return } router.contextsMutex.Lock() for _, context := range router.contexts { destroyContext(context) } router.contexts = nil router.contextsMutex.Unlock() router = nil } func (router *Router) Start(mqttOptions *MqttClientOptions) error { mqttClient, err := newMqttClient(mqttOptions, func(client *MqttClient) { router.subscribeTopics(client) }, func(client *MqttClient) { router.unsubscribeTopics(client) }) if err != nil { return err } err = mqttClient.connect() if err != nil { return err } router.mqttClient = mqttClient return nil } func (router *Router) Finish() { if router.mqttClient == nil { return } router.unsubscribeTopics(router.mqttClient) router.mqttClient.disconnect() destroyMqttClient(router.mqttClient) router.mqttClient = nil } func (router *Router) AddTopic(topic string, handlers ...Handler) error { added := router.addTopicHandlers(topic, handlers...) if !added { return nil } if router.mqttClient == nil { return nil } err := router.mqttClient.subscribe(topic, func(topic string, data []byte) {}) if err != nil { return err } return nil } func (router *Router) subscribeTopics(client *MqttClient) { router.rangeTopicHandlers(func(topic string, handlers []Handler) { err := client.subscribe(topic, func(topic string, data []byte) { c, err := newContext(router.mqttClient, topic, data, handlers) if err != nil { logger.GetInstance().Error(err) return } c.Next() }) if err != nil { logger.GetInstance().Error(err) return } }) } func (router *Router) unsubscribeTopics(client *MqttClient) { router.rangeTopicHandlers(func(topic string, handlers []Handler) { err := client.unsubscribe(topic) if err != nil { logger.GetInstance().Error(err) return } }) } func (router *Router) addTopicHandlers(topic string, handler ...Handler) bool { router.topicHandlersMutex.Lock() defer router.topicHandlersMutex.Unlock() if router.topicHandlers[topic] != nil && len(router.topicHandlers[topic]) > 0 { return false } router.topicHandlers[topic] = append(router.globalHandlers, handler...) return true } func (router *Router) removeTopicHandlers(topic string) { router.topicHandlersMutex.Lock() defer router.topicHandlersMutex.Unlock() if router.topicHandlers[topic] == nil || len(router.topicHandlers[topic]) == 0 { return } delete(router.topicHandlers, topic) } func (router *Router) rangeTopicHandlers(callback func(topic string, handlers []Handler)) { router.topicHandlersMutex.Lock() defer router.topicHandlersMutex.Unlock() for topic, topicHandlers := range router.topicHandlers { callback(topic, topicHandlers) } }