123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- package mqtt_api
- import (
- "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
- "sync"
- )
// Handler is a callback that receives the *Context of an incoming MQTT
// message. The router stores handlers per topic and hands the whole list to
// newContext when a message arrives on that topic.
type Handler func(c *Context)
// Router maps MQTT topics to handler chains and owns the MQTT client used to
// subscribe to those topics.
type Router struct {
	// Group is the value passed to NewRouter; it is not otherwise used in this file.
	Group string

	// mqttClient is nil until Start succeeds and is reset to nil by Finish.
	mqttClient *MqttClient

	// contextsMutex is held by DestroyRouter while it walks contexts.
	contextsMutex *sync.Mutex

	// contexts holds the contexts that DestroyRouter destroys.
	contexts []*Context

	// globalHandlers are placed in front of the topic-specific handlers of
	// every topic registered through addTopicHandlers.
	globalHandlers []Handler

	// topicHandlersMutex guards topicHandlers.
	topicHandlersMutex *sync.Mutex

	// topicHandlers maps a topic to its handler chain (global handlers
	// followed by the handlers given for that topic).
	topicHandlers map[string][]Handler
}
- func NewRouter(group string, globalHandlers []Handler) *Router {
- return &Router{
- Group: group,
- mqttClient: nil,
- contextsMutex: &sync.Mutex{},
- contexts: make([]*Context, 0),
- globalHandlers: globalHandlers,
- topicHandlersMutex: &sync.Mutex{},
- topicHandlers: make(map[string][]Handler),
- }
- }
- func DestroyRouter(router *Router) {
- if router == nil {
- return
- }
- router.contextsMutex.Lock()
- for _, context := range router.contexts {
- destroyContext(context)
- }
- router.contexts = nil
- router.contextsMutex.Unlock()
- router = nil
- }
- func (router *Router) Start(mqttOptions *MqttClientOptions) error {
- mqttClient, err := newMqttClient(mqttOptions, func(client *MqttClient) {
- router.subscribeTopics(client)
- }, func(client *MqttClient) {
- router.unsubscribeTopics(client)
- })
- if err != nil {
- return err
- }
- err = mqttClient.connect()
- if err != nil {
- return err
- }
- router.mqttClient = mqttClient
- return nil
- }
- func (router *Router) Finish() {
- if router.mqttClient == nil {
- return
- }
- router.unsubscribeTopics(router.mqttClient)
- router.mqttClient.disconnect()
- destroyMqttClient(router.mqttClient)
- router.mqttClient = nil
- }
- func (router *Router) AddTopic(topic string, handlers ...Handler) error {
- added := router.addTopicHandlers(topic, handlers...)
- if !added {
- return nil
- }
- if router.mqttClient == nil {
- return nil
- }
- err := router.mqttClient.subscribe(topic, func(topic string, data []byte) {})
- if err != nil {
- return err
- }
- return nil
- }
- func (router *Router) subscribeTopics(client *MqttClient) {
- router.rangeTopicHandlers(func(topic string, handlers []Handler) {
- err := client.subscribe(topic, func(topic string, data []byte) {
- c, err := newContext(router.mqttClient, topic, data, handlers)
- if err != nil {
- logger.GetInstance().Error(err)
- return
- }
- c.Next()
- })
- if err != nil {
- logger.GetInstance().Error(err)
- return
- }
- })
- }
- func (router *Router) unsubscribeTopics(client *MqttClient) {
- router.rangeTopicHandlers(func(topic string, handlers []Handler) {
- err := client.unsubscribe(topic)
- if err != nil {
- logger.GetInstance().Error(err)
- return
- }
- })
- }
- func (router *Router) addTopicHandlers(topic string, handler ...Handler) bool {
- router.topicHandlersMutex.Lock()
- defer router.topicHandlersMutex.Unlock()
- if router.topicHandlers[topic] != nil && len(router.topicHandlers[topic]) > 0 {
- return false
- }
- router.topicHandlers[topic] = append(router.globalHandlers, handler...)
- return true
- }
- func (router *Router) removeTopicHandlers(topic string) {
- router.topicHandlersMutex.Lock()
- defer router.topicHandlersMutex.Unlock()
- if router.topicHandlers[topic] == nil || len(router.topicHandlers[topic]) == 0 {
- return
- }
- delete(router.topicHandlers, topic)
- }
- func (router *Router) rangeTopicHandlers(callback func(topic string, handlers []Handler)) {
- router.topicHandlersMutex.Lock()
- defer router.topicHandlersMutex.Unlock()
- for topic, topicHandlers := range router.topicHandlers {
- callback(topic, topicHandlers)
- }
- }
|