initialize.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package cabinet_pkg
  2. import (
  3. "dy-admin/internal/pcmserver/bus/cabinet_pkg/tcp_server"
  4. "dy-admin/pkg/log"
  5. "encoding/hex"
  6. "fmt"
  7. "go.uber.org/zap"
  8. "net"
  9. "strings"
  10. "time"
  11. )
  12. var tcpServer *tcp_server.TCPServer
  13. type tcpServe struct {
  14. }
  15. func (t *tcpServe) OnConnect(_ *tcp_server.TCPServer, conn net.Conn) {
  16. // 连接回调,与内存柜子信息关联
  17. if cabinets == nil {
  18. log.Error("cabinets is nil")
  19. return
  20. }
  21. remoteIP := strings.Split(conn.RemoteAddr().String(), ":")[0]
  22. cab, ok := cabinets.getCabinetByIP(remoteIP)
  23. if !ok {
  24. log.Error("remote ip not found in set", zap.String("remoteIP", remoteIP))
  25. tcpServer.WriteData(conn, []byte(fmt.Sprintf("remote ip %s not found in set", remoteIP)))
  26. time.Sleep(time.Second)
  27. //主动关闭客户端连接
  28. _ = conn.Close()
  29. createConnectLog(remoteIP, fmt.Sprintf("此ip未在平台登记,拒绝连接"))
  30. return
  31. }
  32. if cab.checkConnected() {
  33. log.Warn("remote ip has connected", zap.String("remoteIP", remoteIP))
  34. // 主动关闭连接
  35. conn.Close()
  36. createConnectLog(remoteIP, fmt.Sprintf("此ip已经被使用,拒绝连接"))
  37. return
  38. }
  39. // 张总要求
  40. time.Sleep(time.Second * 5)
  41. err := cab.syncCabinets(cab.memInfo.CabinetID, conn)
  42. if err != nil {
  43. log.Error("syncCabinets err", zap.Error(err))
  44. //柜子状态与平台同步失败,切断连接。重新连接
  45. cab.setUnconnected()
  46. conn.Close()
  47. createConnectLog(remoteIP, fmt.Sprintf("拒绝连接,同步柜子状态失败:%s", err))
  48. return
  49. }
  50. // 发送心跳
  51. go func() {
  52. if err := cab.heartBeat(cab.memInfo.CabinetID); err != nil {
  53. log.Error("get heartbeat err", zap.Error(err))
  54. cab.setUnconnected()
  55. conn.Close()
  56. createConnectLog(remoteIP, fmt.Sprintf("拒绝连接,发送心跳失败,:%s", err))
  57. }
  58. }()
  59. // 连接并且同步状态成功
  60. log.Info("cabinet connected success", zap.String("remoteIP", remoteIP))
  61. createConnectLog(remoteIP, fmt.Sprintf("连接成功"))
  62. }
  63. func (t *tcpServe) OnReceive(_ *tcp_server.TCPServer, conn net.Conn, data []byte, readSize int) {
  64. if cabinets == nil {
  65. log.Error("cabinets is nil")
  66. return
  67. }
  68. remoteIP := strings.Split(conn.RemoteAddr().String(), ":")[0]
  69. log.Info("receive message info", zap.String("remoteIP", remoteIP), zap.Int("dataLen", len(data)), zap.Int("readSize", readSize), zap.String("dataHex", hex.EncodeToString(data[:readSize])))
  70. cab, ok := cabinets.getCabinetByIP(remoteIP)
  71. if !ok {
  72. log.Warn("remote ip not found in set", zap.String("remoteIP", remoteIP))
  73. return
  74. }
  75. err := cab.packet.NewPacket(data[:readSize])
  76. if err != nil {
  77. log.Error(err.Error())
  78. return
  79. }
  80. }
  81. func (t *tcpServe) OnWrite(_ *tcp_server.TCPServer, conn net.Conn, writeSize int) {
  82. //remoteIP := strings.Split(conn.RemoteAddr().String(), ":")[0]
  83. //logrus.WithFields(logrus.Fields{"remoteIP": remoteIP, "writeSize": writeSize}).Info("send message info")
  84. return
  85. }
  86. func (t *tcpServe) OnReceiveError(_ *tcp_server.TCPServer, conn net.Conn, err error) {
  87. remoteIP := strings.Split(conn.RemoteAddr().String(), ":")[0]
  88. log.Error("receive error", zap.String("remoteIP", remoteIP), zap.Error(err))
  89. // 修改在线状态
  90. cab, ok := cabinets.getCabinetByIP(remoteIP)
  91. if !ok {
  92. log.Warn("remote ip not found in set", zap.String("remoteIP", remoteIP))
  93. return
  94. }
  95. cab.setUnconnected()
  96. log.Warn("remote ip offline", zap.String("remoteIP", remoteIP))
  97. }
  98. func (t *tcpServe) OnWriteError(_ *tcp_server.TCPServer, conn net.Conn, err error) {
  99. remoteIP := strings.Split(conn.RemoteAddr().String(), ":")[0]
  100. log.Error("send error", zap.String("remoteIP", remoteIP))
  101. // 修改在线状态
  102. cab, ok := cabinets.getCabinetByIP(remoteIP)
  103. if !ok {
  104. log.Warn("remote ip not found in set", zap.String("remoteIP", remoteIP))
  105. return
  106. }
  107. cab.setUnconnected()
  108. log.Warn("remote ip offline", zap.String("remoteIP", remoteIP))
  109. }
  110. func InitTcpServer(port string) {
  111. // 初始化tcpServer
  112. conf := tcp_server.NewTCPServerConfig()
  113. server, err := tcp_server.NewTCPServer(conf, "0.0.0.0:"+port, &tcpServe{})
  114. if err != nil {
  115. log.Error(err.Error())
  116. return
  117. }
  118. tcpServer = server
  119. log.Info("tcp server listening", zap.String("port", port))
  120. }
  121. func DestroyTcpServer() {
  122. tcp_server.DestroyTCPServer(tcpServer)
  123. tcpServer = nil
  124. }