udp_server.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package network
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "time"
  7. )
  8. const (
  9. udpServerReceiveBufferSize = 1024
  10. )
  11. var UDPServerIgnoreResponse = errors.New("忽略响应")
  12. // UDPServerRequestCallback 请求回调
  13. // 参数:
  14. // dataReader: 请求数据DataReader
  15. // 返回值:
  16. // 响应数据DataReader: 可以使用DataWriter构造,然后使用ToReader转换成DataReader
  17. // 是否存在错误: 如果是UDPServerIgnoreResponse,则忽略,不进行响应
  18. type UDPServerRequestCallback func(dataReader *DataReader) (*DataReader, error)
  19. type UDPServerOption func(opt *UDPServerOptions)
  20. func WithUDPServerReceiveBufferSize(receiveBufferSize int) UDPServerOption {
  21. return func(opt *UDPServerOptions) {
  22. opt.receiveBufferSize = receiveBufferSize
  23. }
  24. }
  25. func WithUDPServerWriteTimeout(timeout time.Duration) UDPServerOption {
  26. return func(opt *UDPServerOptions) {
  27. opt.writeTimeout = timeout
  28. }
  29. }
  30. func WithUDPServerReadTimeout(timeout time.Duration) UDPServerOption {
  31. return func(opt *UDPServerOptions) {
  32. opt.readTimeout = timeout
  33. }
  34. }
  35. func WithUDPServerRequestCallback(requestCallback UDPServerRequestCallback) UDPServerOption {
  36. return func(opt *UDPServerOptions) {
  37. opt.requestCallback = requestCallback
  38. }
  39. }
  40. type UDPServerOptions struct {
  41. // 默认1024字节,一般保证足够收取一个数据包
  42. receiveBufferSize int
  43. // 写超时,不设置就是阻塞写
  44. writeTimeout time.Duration
  45. // 读超时,不设置就是阻塞读
  46. readTimeout time.Duration
  47. // 请求数据回调
  48. requestCallback UDPServerRequestCallback
  49. }
  50. func NewUDPServerOptions(opts ...UDPServerOption) *UDPServerOptions {
  51. options := new(UDPServerOptions)
  52. for _, opt := range opts {
  53. opt(options)
  54. }
  55. if options.receiveBufferSize == 0 {
  56. options.receiveBufferSize = udpServerReceiveBufferSize
  57. }
  58. return options
  59. }
  60. type UDPServer struct {
  61. options *UDPServerOptions
  62. conn *net.UDPConn
  63. doneChan chan any
  64. }
  65. // Connect 建立连接
  66. func (server *UDPServer) Connect(address string, options *UDPServerOptions) error {
  67. addr, err := net.ResolveUDPAddr("udp", address)
  68. if err != nil {
  69. return err
  70. }
  71. // 监听端口
  72. conn, err := net.ListenUDP("udp", addr)
  73. if err != nil {
  74. return err
  75. }
  76. server.options = options
  77. server.conn = conn
  78. server.doneChan = make(chan any)
  79. // 启动读取请求协程
  80. go server.readRequest()
  81. return nil
  82. }
  83. // Disconnect 断开连接
  84. func (server *UDPServer) Disconnect() {
  85. server.doneChan <- nil
  86. close(server.doneChan)
  87. server.doneChan = nil
  88. CloseConnection(server.conn)
  89. server.conn = nil
  90. }
  91. func (server *UDPServer) readRequest() {
  92. dealRequestDoneChannels := make([]chan any, 0)
  93. for {
  94. select {
  95. case <-server.doneChan:
  96. for _, dealRequestDoneChan := range dealRequestDoneChannels {
  97. dealRequestDoneChan <- nil
  98. close(dealRequestDoneChan)
  99. }
  100. return
  101. default:
  102. // 读取任意客户端发来的请求,超时就是没有客户端发出请求
  103. data, rAddr, err := readUDP(server.conn, server.options.receiveBufferSize, withReadDeadline(server.options.readTimeout))
  104. if err != nil {
  105. fmt.Println(err)
  106. continue
  107. }
  108. // 接收到请求
  109. dealRequestDoneChan := make(chan any)
  110. dealRequestDoneChannels = append(dealRequestDoneChannels, dealRequestDoneChan)
  111. go server.dealRequest(data, rAddr, dealRequestDoneChan)
  112. }
  113. }
  114. }
  115. func (server *UDPServer) dealRequest(data []byte, rAddr *net.UDPAddr, doneChan chan any) {
  116. for {
  117. select {
  118. case <-doneChan:
  119. return
  120. default:
  121. // 没有提供请求响应函数
  122. if server.options.requestCallback == nil {
  123. return
  124. }
  125. // 交给上层回调处理,返回处理结果和响应数据
  126. responseDataReader, err := server.options.requestCallback(NewDataReader(data))
  127. if err != nil {
  128. // 忽略响应
  129. if errors.Is(err, UDPServerIgnoreResponse) {
  130. return
  131. }
  132. server.response(server.conn, rAddr, []byte(err.Error()))
  133. return
  134. }
  135. server.response(server.conn, rAddr, responseDataReader.GetBytes())
  136. return
  137. }
  138. }
  139. }
  140. func (server *UDPServer) response(conn *net.UDPConn, rAddr *net.UDPAddr, data []byte) {
  141. err := writeUDPWithRemoteAddr(conn, rAddr, data, withWriteDeadline(server.options.writeTimeout))
  142. if err != nil {
  143. fmt.Println("Response Error:", err)
  144. return
  145. }
  146. }