|
@@ -0,0 +1,125 @@
|
|
|
+package mqtt_client
|
|
|
+
|
|
|
+import (
|
|
|
+ "errors"
|
|
|
+ mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+type MessageHandler func(client *Client, topic string, data []byte)
|
|
|
+type OnConnectHandler func(client *Client)
|
|
|
+type ConnectionLostHandler func(client *Client, err error)
|
|
|
+
|
|
|
+type Client struct {
|
|
|
+ client mqtt.Client
|
|
|
+}
|
|
|
+
|
|
|
+type ClientOptions struct {
|
|
|
+ UserName string
|
|
|
+ Password string
|
|
|
+ Address string
|
|
|
+ ClientID string
|
|
|
+ KeepAliveSec time.Duration
|
|
|
+ PingTimeoutSec time.Duration
|
|
|
+ OnConnectHandler OnConnectHandler
|
|
|
+ ConnectionLostHandler ConnectionLostHandler
|
|
|
+}
|
|
|
+
|
|
|
+func (opt *ClientOptions) check() error {
|
|
|
+ if isStringEmpty(opt.UserName) {
|
|
|
+ return errors.New("必须传递用户名")
|
|
|
+ }
|
|
|
+
|
|
|
+ if isStringEmpty(opt.Password) {
|
|
|
+ return errors.New("必须传递密码")
|
|
|
+ }
|
|
|
+
|
|
|
+ if isStringEmpty(opt.Address) {
|
|
|
+ return errors.New("必须传递地址")
|
|
|
+ }
|
|
|
+
|
|
|
+ if isStringEmpty(opt.ClientID) {
|
|
|
+ return errors.New("必须传递客户端ID")
|
|
|
+ }
|
|
|
+
|
|
|
+ if opt.OnConnectHandler == nil {
|
|
|
+ return errors.New("必须传递连接回调")
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func New(opts *ClientOptions) (*Client, error) {
|
|
|
+ if opts == nil {
|
|
|
+ return nil, errors.New("必须传递参数")
|
|
|
+ }
|
|
|
+
|
|
|
+ client := new(Client)
|
|
|
+
|
|
|
+ mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
|
|
|
+ SetAutoReconnect(true).
|
|
|
+ SetUsername(opts.UserName).
|
|
|
+ SetPassword(opts.Password).
|
|
|
+ AddBroker(opts.Address).
|
|
|
+ SetClientID(opts.ClientID).
|
|
|
+ SetKeepAlive(opts.KeepAliveSec).
|
|
|
+ SetPingTimeout(opts.PingTimeoutSec).
|
|
|
+ SetWill(opts.ClientID+"/will", "dead", 2, true).
|
|
|
+ SetOnConnectHandler(func(mqttClient mqtt.Client) {
|
|
|
+ opts.OnConnectHandler(client)
|
|
|
+ }).
|
|
|
+ SetConnectionLostHandler(func(mqttClient mqtt.Client, err error) {
|
|
|
+ if opts.ConnectionLostHandler != nil {
|
|
|
+ opts.ConnectionLostHandler(client, err)
|
|
|
+ }
|
|
|
+ }))
|
|
|
+
|
|
|
+ client.client = mqttClient
|
|
|
+
|
|
|
+ return client, nil
|
|
|
+}
|
|
|
+
|
|
|
+func Destroy(client *Client) {
|
|
|
+ client.client.Disconnect(250)
|
|
|
+ client = nil
|
|
|
+}
|
|
|
+
|
|
|
+func (client *Client) Connect() error {
|
|
|
+ token := client.client.Connect()
|
|
|
+ token.Wait()
|
|
|
+ err := token.Error()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (client *Client) Disconnect(quiesce uint) {
|
|
|
+ client.client.Disconnect(quiesce)
|
|
|
+}
|
|
|
+
|
|
|
+func (client *Client) Publish(topic string, qos byte, retained bool, payload string) error {
|
|
|
+ token := client.client.Publish(topic, qos, retained, payload)
|
|
|
+ token.Wait()
|
|
|
+ return token.Error()
|
|
|
+}
|
|
|
+
|
|
|
+func (client *Client) Subscribe(topic string, qos byte, handlerFunc MessageHandler) error {
|
|
|
+ if handlerFunc == nil {
|
|
|
+ return errors.New("必须传递处理函数")
|
|
|
+ }
|
|
|
+
|
|
|
+ token := client.client.Subscribe(topic, qos, func(mqttClient mqtt.Client, message mqtt.Message) {
|
|
|
+ handlerFunc(client, message.Topic(), message.Payload())
|
|
|
+ })
|
|
|
+
|
|
|
+ token.Wait()
|
|
|
+ return token.Error()
|
|
|
+}
|
|
|
+
|
|
|
+func (client *Client) Unsubscribe(topics ...string) error {
|
|
|
+ token := client.client.Unsubscribe(topics...)
|
|
|
+ token.Wait()
|
|
|
+ return token.Error()
|
|
|
+}
|