package mqtt_client

import (
	"errors"
	"fmt"
	"git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
	"git.sxidc.com/go-tools/api_binding/utils"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"sync"
	"time"
)

// MqttClientOptions carries the settings NewMqttClient uses to configure the
// underlying paho MQTT client and the message-handling workers.
type MqttClientOptions struct {
	// UserName is the broker user name. Required: check rejects an empty value.
	UserName              string
	// Password is the broker password. Required: check rejects an empty value.
	Password              string
	// Address is the broker address handed to AddBroker. Required.
	Address               string
	// ClientID is the MQTT client identifier. Required. It is also used as the
	// prefix of the will topic ("<ClientID>/will").
	ClientID              string
	// KeepAliveSec is the keep-alive interval in seconds, passed to SetKeepAlive.
	// No default is substituted in this file when it is 0.
	KeepAliveSec          int64
	// PingTimeoutSec is the ping timeout in seconds, passed to SetPingTimeout.
	// No default is substituted in this file when it is 0.
	PingTimeoutSec        int64
	// WriteTimeoutSec is the write timeout in seconds, passed to SetWriteTimeout.
	// It is also the upper bound used when waiting for connect, publish and
	// subscribe tokens. NewMqttClient replaces 0 with 60.
	WriteTimeoutSec       int64
	// SubscribeRoutineCount is the number of worker goroutines NewMqttClient
	// starts to run message handlers; a default is substituted when it is 0.
	SubscribeRoutineCount int
	// SubscribeBufferSize is intended as the capacity of the buffered channel
	// between message callbacks and the workers.
	// NOTE(review): confirm NewMqttClient actually applies this value.
	SubscribeBufferSize   int
}

// check verifies that every mandatory option is present. The fields are
// examined in the order user name, password, address, client ID, and the
// error for the first missing one is returned; nil means all are set.
func (opts *MqttClientOptions) check() error {
	required := []struct {
		value   string
		message string
	}{
		{value: opts.UserName, message: "必须传递用户名"},
		{value: opts.Password, message: "必须传递密码"},
		{value: opts.Address, message: "必须传递地址"},
		{value: opts.ClientID, message: "必须传递客户端ID"},
	}

	for _, field := range required {
		if utils.IsStringEmpty(field.value) {
			return errors.New(field.message)
		}
	}

	return nil
}

// subscribePayload is the unit of work queued on MqttClient.subscribeChan:
// one received message body together with the router item whose handlers
// should process it.
type subscribePayload struct {
	// item is the router item the subscription was created for.
	item    *router.Item
	// payload is the raw message body as returned by mqtt.Message.Payload.
	payload []byte
}

// MqttClient wraps a paho MQTT client together with the routers registered
// through GetRouter and the worker goroutines that run message handlers.
type MqttClient struct {
	// client is the underlying paho client created in NewMqttClient.
	client mqtt.Client
	// opts holds the options supplied to NewMqttClient.
	opts   *MqttClientOptions

	// routersMutex guards routers; addRouter and rangeRouters lock it.
	routersMutex *sync.Mutex
	// routers lists every router created through GetRouter.
	routers      []*router.Router

	// subscribeChan carries received messages from the subscription callbacks
	// to the subscribeRoutine workers.
	subscribeChan     chan *subscribePayload
	// subscribeDoneChan is closed by DestroyMqttClient to stop the workers.
	subscribeDoneChan chan any
}

// NewMqttClient validates opts, fills in defaults for unset (or non-positive)
// tuning values, builds the underlying paho client and starts the worker
// goroutines that run message handlers. It does not connect; call Connect.
//
// Defaults written back into opts:
//   - SubscribeRoutineCount: 10 workers
//   - SubscribeBufferSize:   1024 queued messages
//   - WriteTimeoutSec:       60 seconds
//
// It returns an error when opts is nil or a mandatory field is missing.
func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
	if opts == nil {
		return nil, errors.New("没有传递mqtt客户端选项")
	}

	if err := opts.check(); err != nil {
		return nil, err
	}

	if opts.SubscribeRoutineCount <= 0 {
		opts.SubscribeRoutineCount = 10
	}

	// The buffer default belongs to SubscribeBufferSize; a negative value would
	// make the channel allocation below panic, so it is defaulted as well.
	if opts.SubscribeBufferSize <= 0 {
		opts.SubscribeBufferSize = 1024
	}

	if opts.WriteTimeoutSec <= 0 {
		opts.WriteTimeoutSec = 60
	}

	mqttClient := &MqttClient{
		opts:         opts,
		routersMutex: &sync.Mutex{},
		routers:      make([]*router.Router, 0),
		// The queue capacity comes from the buffer size, not the worker count.
		subscribeChan:     make(chan *subscribePayload, opts.SubscribeBufferSize),
		subscribeDoneChan: make(chan any),
	}

	mqttOptions := mqtt.NewClientOptions().
		SetAutoReconnect(true).
		SetUsername(opts.UserName).
		SetPassword(opts.Password).
		AddBroker(opts.Address).
		SetClientID(opts.ClientID).
		SetKeepAlive(time.Duration(opts.KeepAliveSec)*time.Second).
		SetPingTimeout(time.Duration(opts.PingTimeoutSec)*time.Second).
		SetWriteTimeout(time.Duration(opts.WriteTimeoutSec)*time.Second).
		// Last-will message: "dead" on "<ClientID>/will", QoS 2, not retained.
		SetWill(opts.ClientID+"/will", "dead", 2, false).
		SetOnConnectHandler(func(client mqtt.Client) {
			// Subscribe every registered item that is not yet subscribed.
			if err := mqttClient.onConnect(); err != nil {
				fmt.Println(err)
			}
		}).
		SetConnectionLostHandler(func(client mqtt.Client, _ error) {
			// Mark every item unsubscribed so the next connect subscribes again.
			if err := mqttClient.onConnectLost(); err != nil {
				fmt.Println(err)
			}
		})

	mqttClient.client = mqtt.NewClient(mqttOptions)

	for i := 0; i < opts.SubscribeRoutineCount; i++ {
		go mqttClient.subscribeRoutine()
	}

	return mqttClient, nil
}

// DestroyMqttClient stops the worker goroutines of c and destroys every
// router registered on it. A nil c is a no-op. It does not disconnect from
// the broker; call Disconnect beforehand. It must be called at most once per
// client, because closing the done channel a second time panics.
func DestroyMqttClient(c *MqttClient) {
	if c == nil {
		return
	}

	// Closing the done channel is the stop signal for the workers. The channel
	// fields are deliberately left in place: the workers read them on every
	// loop iteration, and a select on nil channels would block them forever.
	close(c.subscribeDoneChan)

	// subscribeChan is intentionally NOT closed here. This function is not the
	// sender: closing it would let a worker receive a nil payload and
	// dereference it, and would make an in-flight message callback panic with
	// "send on closed channel".

	c.client = nil

	c.routersMutex.Lock()
	defer c.routersMutex.Unlock()

	for _, r := range c.routers {
		router.DestroyRouter(r)
	}
	c.routers = nil
}

// Connect starts connecting to the broker and waits for the attempt to
// finish, for at most WriteTimeoutSec seconds. It returns a timeout error
// when the wait expires, otherwise whatever error the connect token reports
// (nil on success).
func (c *MqttClient) Connect() error {
	connectToken := c.client.Connect()

	waitLimit := time.Duration(c.opts.WriteTimeoutSec) * time.Second
	if finished := connectToken.WaitTimeout(waitLimit); !finished {
		return errors.New("连接超时")
	}

	if err := connectToken.Error(); err != nil {
		return err
	}

	return nil
}

// Disconnect closes the connection to the broker through the underlying
// client.
func (c *MqttClient) Disconnect() {
	// Value handed to the client's Disconnect; per the paho documentation this
	// is the quiesce time in milliseconds.
	const quiesceMillis = 250

	c.client.Disconnect(quiesceMillis)
}

// GetRouter creates a router for group with the given handlers, registers it
// on the client and returns it.
//
// The callback handed to router.NewRouter blocks, polling once per second,
// until the client reports a live connection, and then subscribes the item
// unless it is already subscribed. When the router invokes this callback is
// decided by the router package (presumably when an item is added; verify
// against router.NewRouter).
func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.Router {
	subscribeWhenOnline := func(item *router.Item) error {
		// Poll until the connection is up.
		for !c.client.IsConnected() {
			time.Sleep(1 * time.Second)
		}

		if err := item.DoIfUnSubscribe(func() error { return c.subscribe(item) }); err != nil {
			return err
		}

		return nil
	}

	created := router.NewRouter(group, handlers, subscribeWhenOnline)
	c.addRouter(created)

	return created
}

// Response publishes data as the reply to item at QoS 2, not retained.
//
// The reply topic is "<item.Topic>/reply", or
// "<item.Topic>/<responseIdentifier>/reply" when responseIdentifier is not
// empty. The call waits up to WriteTimeoutSec seconds for the publish to
// complete and returns a timeout error if it does not, otherwise the error
// reported by the publish token (nil on success).
func (c *MqttClient) Response(item *router.Item, responseIdentifier string, data []byte) error {
	var replyTopic string
	if utils.IsStringNotEmpty(responseIdentifier) {
		replyTopic = fmt.Sprintf("%s/%s/reply", item.Topic, responseIdentifier)
	} else {
		replyTopic = item.Topic + "/reply"
	}

	publishToken := c.client.Publish(replyTopic, 2, false, data)

	waitLimit := time.Duration(c.opts.WriteTimeoutSec) * time.Second
	if finished := publishToken.WaitTimeout(waitLimit); !finished {
		return errors.New("发布超时")
	}

	if err := publishToken.Error(); err != nil {
		return err
	}

	return nil
}

// onConnect is run from the client's on-connect handler. It walks every item
// of every registered router and subscribes those not currently subscribed.
// Iteration stops at the first failure, which is returned with a prefix
// naming the handler.
func (c *MqttClient) onConnect() error {
	subscribeItem := func(item *router.Item) error {
		if err := item.DoIfUnSubscribe(func() error { return c.subscribe(item) }); err != nil {
			return err
		}

		return nil
	}

	return c.rangeRouters(func(r *router.Router) error {
		if err := r.RangeItem(subscribeItem); err != nil {
			return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
		}

		return nil
	})
}

// onConnectLost is run from the client's connection-lost handler. It marks
// every item of every registered router as unsubscribed so that the next
// onConnect subscribes them again. Any error from the iteration is returned
// with a prefix naming this handler.
func (c *MqttClient) onConnectLost() error {
	err := c.rangeRouters(func(r *router.Router) error {
		err := r.RangeItem(func(item *router.Item) error {
			item.SetUnSubscribe()
			return nil
		})
		if err != nil {
			return err
		}

		return nil
	})
	if err != nil {
		// The prefix names the connection-lost handler; it previously repeated
		// the on-connect handler's "subscribe failed" text, which misreported
		// where the error came from.
		return errors.New("SetConnectionLostHandler重置订阅状态失败: " + err.Error())
	}

	return nil
}

// addRouter appends r to the client's router list while holding routersMutex.
func (c *MqttClient) addRouter(r *router.Router) {
	c.routersMutex.Lock()
	c.routers = append(c.routers, r)
	c.routersMutex.Unlock()
}

// rangeRouters calls rangeFunc for each registered router in registration
// order and stops at the first non-nil error, which it returns. routersMutex
// is held for the whole iteration, so rangeFunc must not call anything that
// locks it again (such as addRouter).
func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) error {
	c.routersMutex.Lock()
	defer c.routersMutex.Unlock()

	registered := c.routers
	for idx := range registered {
		if err := rangeFunc(registered[idx]); err != nil {
			return err
		}
	}

	return nil
}

// subscribe subscribes to item.Topic at QoS 2. Each message received on the
// topic is queued on subscribeChan together with item so that a worker
// goroutine runs the handlers. The call waits up to WriteTimeoutSec seconds
// for the subscription to complete and returns a timeout error if it does
// not, otherwise the error reported by the subscribe token. On success the
// topic is logged to standard output.
func (c *MqttClient) subscribe(item *router.Item) error {
	enqueue := func(_ mqtt.Client, message mqtt.Message) {
		c.subscribeChan <- &subscribePayload{item: item, payload: message.Payload()}
	}

	subscribeToken := c.client.Subscribe(item.Topic, 2, enqueue)

	waitLimit := time.Duration(c.opts.WriteTimeoutSec) * time.Second
	if finished := subscribeToken.WaitTimeout(waitLimit); !finished {
		return errors.New("订阅超时")
	}

	if err := subscribeToken.Error(); err != nil {
		return err
	}

	fmt.Println("[MQTT] Subscribe Topic: " + item.Topic)

	return nil
}

// subscribeRoutine is the body of one worker goroutine. It takes queued
// messages from subscribeChan and runs the handlers of the associated router
// item, until the done channel is closed or the message queue is closed.
func (c *MqttClient) subscribeRoutine() {
	// Snapshot the channels once. If the struct fields are reset to nil during
	// teardown, re-reading them here would leave the select waiting on nil
	// channels and leak this goroutine.
	done, queue := c.subscribeDoneChan, c.subscribeChan

	for {
		select {
		case <-done:
			return
		case msg, ok := <-queue:
			if !ok {
				// A closed queue yields a nil payload; stop instead of
				// dereferencing it.
				return
			}

			msg.item.CallHandlers(msg.payload)
		}
	}
}