yjp 2 gadi atpakaļ
vecāks
revīzija
4588058d2e
1 mainītis faili ar 15 papildinājumiem un 18 dzēšanām
  1. 15 18
      mqtt_binding/mqtt_client/mqtt_client.go

+ 15 - 18
mqtt_binding/mqtt_client/mqtt_client.go

@@ -129,14 +129,7 @@ func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.
 		}
 
 		err := item.DoIfUnSubscribe(func() error {
-			token := c.client.Subscribe(item.Topic, 2, func(client mqtt.Client, message mqtt.Message) {
-				item.CallHandlers(message.Payload())
-			})
-			if token.Wait(); token.Error() != nil {
-				return token.Error()
-			}
-
-			return nil
+			return c.subscribe(item)
 		})
 		if err != nil {
 			return err
@@ -163,16 +156,7 @@ func (c *MqttClient) onConnect() error {
 	err := c.rangeRouters(func(r *router.Router) error {
 		err := r.RangeItem(func(item *router.Item) error {
 			err := item.DoIfUnSubscribe(func() error {
-				token := c.client.Subscribe(item.Topic, 2, func(client mqtt.Client, message mqtt.Message) {
-					item.CallHandlers(message.Payload())
-				})
-				if token.Wait(); token.Error() != nil {
-					return token.Error()
-				}
-
-				fmt.Println("[MQTT] Subscribe Topic: " + item.Topic)
-
-				return nil
+				return c.subscribe(item)
 			})
 			if err != nil {
 				return err
@@ -232,3 +216,16 @@ func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) e
 
 	return nil
 }
+
+func (c *MqttClient) subscribe(item *router.Item) error {
+	token := c.client.Subscribe(item.Topic, 2, func(client mqtt.Client, message mqtt.Message) {
+		item.CallHandlers(message.Payload())
+	})
+	if token.Wait(); token.Error() != nil {
+		return token.Error()
+	}
+
+	fmt.Println("[MQTT] Subscribe Topic: " + item.Topic)
+
+	return nil
+}