// Package mqtt_client is a thin wrapper around the Eclipse Paho MQTT client.
// Its callbacks receive the wrapper *Client instead of the underlying paho
// client.
package mqtt_client

import (
	"errors"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// MessageHandler is called for every message delivered to a subscription that
// was registered through Client.Subscribe. It receives the wrapper client, the
// topic the message arrived on, and the raw payload.
type MessageHandler func(client *Client, topic string, data []byte)

// OnConnectHandler is called from the paho on-connect callback installed by New.
type OnConnectHandler func(client *Client)

// ConnectionLostHandler is called from the paho connection-lost callback
// installed by New, with the error reported by paho.
type ConnectionLostHandler func(client *Client, err error)

// Client wraps a paho mqtt.Client. Create instances with New; the zero value
// holds no underlying client.
type Client struct {
	client mqtt.Client
}

// ClientOptions carries the settings New uses to build a Client.
type ClientOptions struct {
	// UserName and Password are the broker credentials; both are required.
	UserName string
	Password string
	// Address is the broker URI handed to paho's AddBroker; required.
	Address string
	// ClientID is the MQTT client identifier; required. It is also used as the
	// prefix of the last-will topic ("<ClientID>/will").
	ClientID string
	// KeepAliveSec is passed unchanged to paho's SetKeepAlive.
	// NOTE(review): despite the "Sec" suffix this is a time.Duration and is not
	// scaled here, so callers presumably need to pass e.g. 30*time.Second — confirm.
	KeepAliveSec time.Duration
	// PingTimeoutSec is passed unchanged to paho's SetPingTimeout.
	// NOTE(review): same unit caveat as KeepAliveSec — confirm with callers.
	PingTimeoutSec time.Duration
	// OnConnectHandler is required.
	OnConnectHandler OnConnectHandler
	// ConnectionLostHandler is optional; when nil, lost connections are not
	// reported to the caller.
	ConnectionLostHandler ConnectionLostHandler
}

// check validates the required fields and returns an error describing the
// first missing one, or nil when all required fields are present.
func (opt *ClientOptions) check() error {
	switch {
	case isStringEmpty(opt.UserName):
		return errors.New("必须传递用户名")
	case isStringEmpty(opt.Password):
		return errors.New("必须传递密码")
	case isStringEmpty(opt.Address):
		return errors.New("必须传递地址")
	case isStringEmpty(opt.ClientID):
		return errors.New("必须传递客户端ID")
	case opt.OnConnectHandler == nil:
		return errors.New("必须传递连接回调")
	}
	return nil
}

// New builds a Client from opts without connecting; call Connect afterwards.
//
// It returns an error when opts is nil or when a required option is missing.
// The underlying paho client is configured with auto-reconnect enabled and a
// retained last-will message "dead" at QoS 2 on the topic "<ClientID>/will".
func New(opts *ClientOptions) (*Client, error) {
	if opts == nil {
		return nil, errors.New("必须传递参数")
	}
	// Reject incomplete options up front: a nil OnConnectHandler would
	// otherwise panic inside the on-connect callback below.
	if err := opts.check(); err != nil {
		return nil, err
	}

	c := new(Client)
	pahoOpts := mqtt.NewClientOptions().
		SetAutoReconnect(true).
		SetUsername(opts.UserName).
		SetPassword(opts.Password).
		AddBroker(opts.Address).
		SetClientID(opts.ClientID).
		SetKeepAlive(opts.KeepAliveSec).
		SetPingTimeout(opts.PingTimeoutSec).
		SetWill(opts.ClientID+"/will", "dead", 2, true).
		SetOnConnectHandler(func(mqtt.Client) {
			opts.OnConnectHandler(c)
		}).
		SetConnectionLostHandler(func(_ mqtt.Client, err error) {
			if opts.ConnectionLostHandler != nil {
				opts.ConnectionLostHandler(c, err)
			}
		})

	c.client = mqtt.NewClient(pahoOpts)
	return c, nil
}

// Destroy disconnects client using a quiesce value of 250 (milliseconds, per
// paho's Disconnect documentation). A nil or uninitialised client is ignored.
//
// Destroy cannot clear the caller's pointer: assigning to the parameter only
// changes the local copy, so callers must drop their own reference.
func Destroy(client *Client) {
	if client == nil || client.client == nil {
		return
	}
	client.client.Disconnect(250)
}

// Connect starts the connection to the broker, blocks until the attempt
// completes, and returns its error, if any.
func (c *Client) Connect() error {
	token := c.client.Connect()
	token.Wait()
	return token.Error()
}

// Disconnect forwards to the underlying paho client's Disconnect with the
// given quiesce value.
func (c *Client) Disconnect(quiesce uint) {
	c.client.Disconnect(quiesce)
}

// Publish sends payload to topic with the given QoS and retained flag, blocks
// until the publish token completes, and returns its error, if any.
func (c *Client) Publish(topic string, qos byte, retained bool, payload string) error {
	token := c.client.Publish(topic, qos, retained, payload)
	token.Wait()
	return token.Error()
}

// Subscribe registers handlerFunc for topic at the given QoS and blocks until
// the subscribe token completes. It returns an error when handlerFunc is nil
// or when the subscription fails.
func (c *Client) Subscribe(topic string, qos byte, handlerFunc MessageHandler) error {
	if handlerFunc == nil {
		return errors.New("必须传递处理函数")
	}
	// Adapt the paho callback so the handler sees the wrapper client.
	token := c.client.Subscribe(topic, qos, func(_ mqtt.Client, message mqtt.Message) {
		handlerFunc(c, message.Topic(), message.Payload())
	})
	token.Wait()
	return token.Error()
}

// Unsubscribe removes the subscriptions for the given topics, blocks until the
// unsubscribe token completes, and returns its error, if any.
func (c *Client) Unsubscribe(topics ...string) error {
	token := c.client.Unsubscribe(topics...)
	token.Wait()
	return token.Error()
}