udp_server.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package network
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "time"
  7. )
  8. const (
  9. udpServerReceiveBufferSize = 1024
  10. )
  11. var UDPServerIgnoreResponse = errors.New("忽略响应")
  12. // UDPServerRequestCallback 请求回调
  13. // 参数:
  14. // data: 请求数据
  15. // 返回值:
  16. // responseBytes: 响应数据包
  17. // err: 是否存在错误,如果是UDPServerIgnoreResponse,则忽略,不进行响应
  18. type UDPServerRequestCallback func(data []byte) (responseBytes []byte, err error)
  19. type UDPServerOption func(opt *UDPServerOptions)
  20. func WithUDPServerReceiveBufferSize(receiveBufferSize int) UDPServerOption {
  21. return func(opt *UDPServerOptions) {
  22. opt.receiveBufferSize = receiveBufferSize
  23. }
  24. }
  25. func WithUDPServerWriteTimeout(timeout time.Duration) UDPServerOption {
  26. return func(opt *UDPServerOptions) {
  27. opt.writeTimeout = timeout
  28. }
  29. }
  30. func WithUDPServerReadTimeout(timeout time.Duration) UDPServerOption {
  31. return func(opt *UDPServerOptions) {
  32. opt.readTimeout = timeout
  33. }
  34. }
  35. func WithUDPServerRequestCallback(requestCallback UDPServerRequestCallback) UDPServerOption {
  36. return func(opt *UDPServerOptions) {
  37. opt.requestCallback = requestCallback
  38. }
  39. }
  40. type UDPServerOptions struct {
  41. // 默认1024字节,一般保证足够收取一个数据包
  42. receiveBufferSize int
  43. // 写超时,不设置就是阻塞写
  44. writeTimeout time.Duration
  45. // 读超时,不设置就是阻塞读
  46. readTimeout time.Duration
  47. // 请求数据回调
  48. requestCallback UDPServerRequestCallback
  49. }
  50. func NewUDPServerOptions(opts ...UDPServerOption) *UDPServerOptions {
  51. options := new(UDPServerOptions)
  52. for _, opt := range opts {
  53. opt(options)
  54. }
  55. if options.receiveBufferSize == 0 {
  56. options.receiveBufferSize = udpServerReceiveBufferSize
  57. }
  58. return options
  59. }
  60. type UDPServer struct {
  61. options *UDPServerOptions
  62. conn *net.UDPConn
  63. doneChan chan any
  64. }
  65. // Connect 建立连接
  66. func (server *UDPServer) Connect(address string, options *UDPServerOptions) error {
  67. addr, err := net.ResolveUDPAddr("udp", address)
  68. if err != nil {
  69. return err
  70. }
  71. // 监听端口
  72. conn, err := net.ListenUDP("udp", addr)
  73. if err != nil {
  74. return err
  75. }
  76. server.options = options
  77. server.conn = conn
  78. server.doneChan = make(chan any)
  79. // 启动读取请求协程
  80. server.readRequest()
  81. return nil
  82. }
  83. // Disconnect 断开连接
  84. func (server *UDPServer) Disconnect() {
  85. server.doneChan <- nil
  86. close(server.doneChan)
  87. server.doneChan = nil
  88. CloseConnection(server.conn)
  89. server.conn = nil
  90. }
  91. func (server *UDPServer) readRequest() {
  92. dealRequestDoneChannels := make([]chan any, 0)
  93. for {
  94. select {
  95. case <-server.doneChan:
  96. for _, dealRequestDoneChan := range dealRequestDoneChannels {
  97. dealRequestDoneChan <- nil
  98. close(dealRequestDoneChan)
  99. }
  100. return
  101. default:
  102. // 读取任意客户端发来的请求,超时就是没有客户端发出请求
  103. data, rAddr, err := readUDP(server.conn, server.options.receiveBufferSize, WithReadDeadline(server.options.readTimeout))
  104. if err != nil {
  105. fmt.Println(err)
  106. continue
  107. }
  108. // 接收到请求
  109. dealRequestDoneChan := make(chan any)
  110. dealRequestDoneChannels = append(dealRequestDoneChannels, dealRequestDoneChan)
  111. go server.dealRequest(data, rAddr, dealRequestDoneChan)
  112. }
  113. }
  114. }
  115. func (server *UDPServer) dealRequest(data []byte, rAddr *net.UDPAddr, doneChan chan any) {
  116. for {
  117. select {
  118. case <-doneChan:
  119. return
  120. default:
  121. // 没有提供请求响应函数
  122. if server.options.requestCallback == nil {
  123. server.response(server.conn, rAddr, data)
  124. return
  125. }
  126. // 交给上层回调处理,返回处理结果和响应数据
  127. responseBytes, err := server.options.requestCallback(data)
  128. if err != nil {
  129. // 忽略响应
  130. if errors.Is(err, UDPServerIgnoreResponse) {
  131. return
  132. }
  133. server.response(server.conn, rAddr, []byte(err.Error()))
  134. return
  135. }
  136. server.response(server.conn, rAddr, responseBytes)
  137. return
  138. }
  139. }
  140. }
  141. func (server *UDPServer) response(conn *net.UDPConn, rAddr *net.UDPAddr, data []byte) {
  142. err := writeUDPWithRemoteAddr(conn, rAddr, data, WithWriteDeadline(server.options.writeTimeout))
  143. if err != nil {
  144. fmt.Println("Response Error:", err)
  145. return
  146. }
  147. }