123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- package mqtt_api
- import (
- "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
- "sync"
- )
- type Handler func(c *Context)
- type subscribeItem struct {
- handlers []Handler
- subscribed bool
- }
- type Router struct {
- Group string
- mqttClient *MqttClient
- contextsMutex *sync.Mutex
- contexts []*Context
- globalHandlers []Handler
- subscribeItemsMutex *sync.Mutex
- subscribeItems map[string]*subscribeItem
- }
- func NewRouter(group string, globalHandlers []Handler) *Router {
- return &Router{
- Group: group,
- mqttClient: nil,
- contextsMutex: &sync.Mutex{},
- contexts: make([]*Context, 0),
- globalHandlers: globalHandlers,
- subscribeItemsMutex: &sync.Mutex{},
- subscribeItems: make(map[string]*subscribeItem),
- }
- }
- func DestroyRouter(router *Router) {
- if router == nil {
- return
- }
- router.contextsMutex.Lock()
- for _, context := range router.contexts {
- destroyContext(context)
- }
- router.contexts = nil
- router.contextsMutex.Unlock()
- router = nil
- }
- func (router *Router) Start(mqttOptions *MqttClientOptions) error {
- mqttClient, err := newMqttClient(mqttOptions, func(client *MqttClient) {
- router.subscribeTopics(client)
- }, func(client *MqttClient) {
- router.unsubscribeTopics(client)
- })
- if err != nil {
- return err
- }
- err = mqttClient.connect()
- if err != nil {
- return err
- }
- router.mqttClient = mqttClient
- return nil
- }
- func (router *Router) Finish() {
- if router.mqttClient == nil {
- return
- }
- router.unsubscribeTopics(router.mqttClient)
- router.mqttClient.disconnect()
- destroyMqttClient(router.mqttClient)
- router.mqttClient = nil
- }
- func (router *Router) AddGlobalHandlers(handlers ...Handler) {
- router.globalHandlers = append(router.globalHandlers, handlers...)
- }
- func (router *Router) AddTopic(topic string, handlers ...Handler) error {
- allHandlers := append(router.globalHandlers, handlers...)
- err := router.addAndSubscribeTopicHandlers(router.Group+topic, allHandlers...)
- if err != nil {
- return err
- }
- return nil
- }
- func (router *Router) subscribeTopics(client *MqttClient) {
- if !client.mqttClient.IsConnected() {
- return
- }
- router.rangeTopicHandlers(func(topic string, handlers []Handler) {
- err := router.subscribeMqttClient(client, topic, handlers)
- if err != nil {
- logger.GetInstance().Error(err)
- return
- }
- })
- }
- func (router *Router) unsubscribeTopics(client *MqttClient) {
- if !client.mqttClient.IsConnected() {
- return
- }
- router.rangeTopicHandlers(func(topic string, handlers []Handler) {
- err := client.unsubscribe(topic)
- if err != nil {
- logger.GetInstance().Error(err)
- return
- }
- })
- }
- func (router *Router) subscribeMqttClient(client *MqttClient, topic string, handlers []Handler) error {
- err := client.subscribe(topic, func(topic string, data []byte) {
- c, err := newContext(router.mqttClient, topic, data, handlers)
- if err != nil {
- logger.GetInstance().Error(err)
- return
- }
- c.callHandlers()
- c.Next()
- })
- if err != nil {
- return err
- }
- return nil
- }
- func (router *Router) addAndSubscribeTopicHandlers(topic string, handlers ...Handler) error {
- router.subscribeItemsMutex.Lock()
- defer router.subscribeItemsMutex.Unlock()
- router.subscribeItems[topic] = &subscribeItem{
- handlers: handlers,
- subscribed: false,
- }
- if router.mqttClient == nil {
- return nil
- }
- if !router.mqttClient.mqttClient.IsConnected() {
- return nil
- }
- err := router.subscribeMqttClient(router.mqttClient, topic, handlers)
- if err != nil {
- return err
- }
- router.subscribeItems[topic].subscribed = true
- return nil
- }
- func (router *Router) removeTopicHandlers(topic string) {
- router.subscribeItemsMutex.Lock()
- defer router.subscribeItemsMutex.Unlock()
- if router.subscribeItems[topic] == nil {
- return
- }
- delete(router.subscribeItems, topic)
- }
- func (router *Router) rangeTopicHandlers(callback func(topic string, handlers []Handler)) {
- router.subscribeItemsMutex.Lock()
- defer router.subscribeItemsMutex.Unlock()
- for topic, item := range router.subscribeItems {
- if !item.subscribed {
- callback(topic, item.handlers)
- }
- }
- }
|