udp_server.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package network
  2. import (
  3. "fmt"
  4. "net"
  5. "time"
  6. )
  7. const (
  8. udpServerReceiveBufferSize = 1024
  9. )
  10. // UDPServerRequestCallback 请求回调
  11. // 参数:
  12. // data: 请求数据
  13. // 返回值:
  14. // send: 是否发送响应
  15. // responseBytes: 响应数据包
  16. // err: 是否存在错误
  17. type UDPServerRequestCallback func(data []byte) (send bool, responseBytes []byte, err error)
  18. type UDPServerOption func(opt *UDPServerOptions)
  19. func WithUDPServerReceiveBufferSize(receiveBufferSize int) UDPServerOption {
  20. return func(opt *UDPServerOptions) {
  21. opt.receiveBufferSize = receiveBufferSize
  22. }
  23. }
  24. func WithUDPServerWriteTimeout(timeout time.Duration) UDPServerOption {
  25. return func(opt *UDPServerOptions) {
  26. opt.writeTimeout = timeout
  27. }
  28. }
  29. func WithUDPServerReadTimeout(timeout time.Duration) UDPServerOption {
  30. return func(opt *UDPServerOptions) {
  31. opt.readTimeout = timeout
  32. }
  33. }
  34. func WithUDPServerRequestCallback(requestCallback UDPServerRequestCallback) UDPServerOption {
  35. return func(opt *UDPServerOptions) {
  36. opt.requestCallback = requestCallback
  37. }
  38. }
  39. type UDPServerOptions struct {
  40. // 默认1024字节,一般保证足够收取一个数据包
  41. receiveBufferSize int
  42. // 写超时,不设置就是阻塞写
  43. writeTimeout time.Duration
  44. // 读超时,不设置就是阻塞读
  45. readTimeout time.Duration
  46. // 请求数据回调
  47. requestCallback UDPServerRequestCallback
  48. }
  49. func NewUDPServerOptions(opts ...UDPServerOption) *UDPServerOptions {
  50. options := new(UDPServerOptions)
  51. for _, opt := range opts {
  52. opt(options)
  53. }
  54. if options.receiveBufferSize == 0 {
  55. options.receiveBufferSize = udpServerReceiveBufferSize
  56. }
  57. return options
  58. }
  59. type UDPServer struct {
  60. options *UDPServerOptions
  61. conn *net.UDPConn
  62. doneChan chan any
  63. }
  64. // Connect 建立连接
  65. func (server *UDPServer) Connect(address string, options *UDPServerOptions) error {
  66. addr, err := net.ResolveUDPAddr("udp", address)
  67. if err != nil {
  68. return err
  69. }
  70. // 监听端口
  71. conn, err := net.ListenUDP("udp", addr)
  72. if err != nil {
  73. return err
  74. }
  75. server.options = options
  76. server.conn = conn
  77. server.doneChan = make(chan any)
  78. // 启动读取请求协程
  79. server.readRequest()
  80. return nil
  81. }
  82. // Disconnect 断开连接
  83. func (server *UDPServer) Disconnect() {
  84. server.doneChan <- nil
  85. close(server.doneChan)
  86. server.doneChan = nil
  87. CloseConnection(server.conn)
  88. server.conn = nil
  89. }
  90. func (server *UDPServer) readRequest() {
  91. dealRequestDoneChannels := make([]chan any, 0)
  92. for {
  93. select {
  94. case <-server.doneChan:
  95. for _, dealRequestDoneChan := range dealRequestDoneChannels {
  96. dealRequestDoneChan <- nil
  97. close(dealRequestDoneChan)
  98. }
  99. return
  100. default:
  101. // 读取任意客户端发来的请求,超时就是没有客户端发出请求
  102. data, rAddr, err := readUDP(server.conn, server.options.receiveBufferSize, WithReadDeadline(server.options.readTimeout))
  103. if err != nil {
  104. fmt.Println(err)
  105. continue
  106. }
  107. // 接收到请求
  108. dealRequestDoneChan := make(chan any)
  109. dealRequestDoneChannels = append(dealRequestDoneChannels, dealRequestDoneChan)
  110. go server.dealRequest(data, rAddr, dealRequestDoneChan)
  111. }
  112. }
  113. }
  114. func (server *UDPServer) dealRequest(data []byte, rAddr *net.UDPAddr, doneChan chan any) {
  115. for {
  116. select {
  117. case <-doneChan:
  118. return
  119. default:
  120. if server.options.requestCallback == nil {
  121. server.response(server.conn, rAddr, data)
  122. return
  123. }
  124. send, responseBytes, err := server.options.requestCallback(data)
  125. if !send {
  126. return
  127. }
  128. if err != nil {
  129. server.response(server.conn, rAddr, []byte(err.Error()))
  130. }
  131. server.response(server.conn, rAddr, responseBytes)
  132. }
  133. }
  134. }
  135. func (server *UDPServer) response(conn *net.UDPConn, rAddr *net.UDPAddr, data []byte) {
  136. err := writeUDPWithRemoteAddr(conn, rAddr, data, WithWriteDeadline(server.options.writeTimeout))
  137. if err != nil {
  138. fmt.Println("Response Error:", err)
  139. return
  140. }
  141. }