mqtt_client.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package mqtt_client
  2. import (
  3. "errors"
  4. mqtt "github.com/eclipse/paho.mqtt.golang"
  5. "time"
  6. )
  7. type MessageHandler func(client *Client, topic string, data []byte)
  8. type OnConnectHandler func(client *Client)
  9. type ConnectionLostHandler func(client *Client, err error)
  10. type Client struct {
  11. client mqtt.Client
  12. }
  13. type ClientOptions struct {
  14. UserName string
  15. Password string
  16. Address string
  17. ClientID string
  18. KeepAliveSec time.Duration
  19. PingTimeoutSec time.Duration
  20. OnConnectHandler OnConnectHandler
  21. ConnectionLostHandler ConnectionLostHandler
  22. }
  23. func (opt *ClientOptions) check() error {
  24. if isStringEmpty(opt.UserName) {
  25. return errors.New("必须传递用户名")
  26. }
  27. if isStringEmpty(opt.Password) {
  28. return errors.New("必须传递密码")
  29. }
  30. if isStringEmpty(opt.Address) {
  31. return errors.New("必须传递地址")
  32. }
  33. if isStringEmpty(opt.ClientID) {
  34. return errors.New("必须传递客户端ID")
  35. }
  36. if opt.OnConnectHandler == nil {
  37. return errors.New("必须传递连接回调")
  38. }
  39. return nil
  40. }
  41. func New(opts *ClientOptions) (*Client, error) {
  42. if opts == nil {
  43. return nil, errors.New("必须传递参数")
  44. }
  45. client := new(Client)
  46. mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
  47. SetAutoReconnect(true).
  48. SetUsername(opts.UserName).
  49. SetPassword(opts.Password).
  50. AddBroker(opts.Address).
  51. SetClientID(opts.ClientID).
  52. SetKeepAlive(opts.KeepAliveSec).
  53. SetPingTimeout(opts.PingTimeoutSec).
  54. SetWill(opts.ClientID+"/will", "dead", 2, true).
  55. SetOnConnectHandler(func(mqttClient mqtt.Client) {
  56. opts.OnConnectHandler(client)
  57. }).
  58. SetConnectionLostHandler(func(mqttClient mqtt.Client, err error) {
  59. if opts.ConnectionLostHandler != nil {
  60. opts.ConnectionLostHandler(client, err)
  61. }
  62. }))
  63. client.client = mqttClient
  64. return client, nil
  65. }
  66. func Destroy(client *Client) {
  67. client.client.Disconnect(250)
  68. client = nil
  69. }
  70. func (client *Client) Connect() error {
  71. token := client.client.Connect()
  72. token.Wait()
  73. err := token.Error()
  74. if err != nil {
  75. return err
  76. }
  77. return nil
  78. }
  79. func (client *Client) Disconnect(quiesce uint) {
  80. client.client.Disconnect(quiesce)
  81. }
  82. func (client *Client) Publish(topic string, qos byte, retained bool, payload string) error {
  83. token := client.client.Publish(topic, qos, retained, payload)
  84. token.Wait()
  85. return token.Error()
  86. }
  87. func (client *Client) Subscribe(topic string, qos byte, handlerFunc MessageHandler) error {
  88. if handlerFunc == nil {
  89. return errors.New("必须传递处理函数")
  90. }
  91. token := client.client.Subscribe(topic, qos, func(mqttClient mqtt.Client, message mqtt.Message) {
  92. handlerFunc(client, message.Topic(), message.Payload())
  93. })
  94. token.Wait()
  95. return token.Error()
  96. }
  97. func (client *Client) Unsubscribe(topics ...string) error {
  98. token := client.client.Unsubscribe(topics...)
  99. token.Wait()
  100. return token.Error()
  101. }