udp_server.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package network
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "time"
  7. )
  8. const (
  9. udpServerReceiveBufferSize = 1024
  10. )
  11. var UDPServerIgnoreResponse = errors.New("忽略响应")
  12. // UDPServerRequestCallback 请求回调
  13. // 参数:
  14. // dataReader: 请求数据DataReader
  15. // 返回值:
  16. // 响应数据DataReader: 可以使用DataWriter构造,然后使用ToReader转换成DataReader,返回nil代表还要继续接收数据,不做响应
  17. type UDPServerRequestCallback func(dataReader *DataReader) *DataReader
  18. type UDPServerOption func(opt *UDPServerOptions)
  19. func WithUDPServerReceiveBufferSize(receiveBufferSize int) UDPServerOption {
  20. return func(opt *UDPServerOptions) {
  21. opt.receiveBufferSize = receiveBufferSize
  22. }
  23. }
  24. func WithUDPServerWriteTimeout(timeout time.Duration) UDPServerOption {
  25. return func(opt *UDPServerOptions) {
  26. opt.writeTimeout = timeout
  27. }
  28. }
  29. func WithUDPServerReadTimeout(timeout time.Duration) UDPServerOption {
  30. return func(opt *UDPServerOptions) {
  31. opt.readTimeout = timeout
  32. }
  33. }
  34. func WithUDPServerRequestCallback(requestCallback UDPServerRequestCallback) UDPServerOption {
  35. return func(opt *UDPServerOptions) {
  36. opt.requestCallback = requestCallback
  37. }
  38. }
  39. type UDPServerOptions struct {
  40. // 默认1024字节,一般保证足够收取一个数据包
  41. receiveBufferSize int
  42. // 写超时,不设置就是阻塞写
  43. writeTimeout time.Duration
  44. // 读超时,不设置就是阻塞读
  45. readTimeout time.Duration
  46. // 请求数据回调
  47. requestCallback UDPServerRequestCallback
  48. }
  49. func NewUDPServerOptions(opts ...UDPServerOption) *UDPServerOptions {
  50. options := new(UDPServerOptions)
  51. for _, opt := range opts {
  52. opt(options)
  53. }
  54. if options.receiveBufferSize == 0 {
  55. options.receiveBufferSize = udpServerReceiveBufferSize
  56. }
  57. return options
  58. }
  59. type UDPServer struct {
  60. options *UDPServerOptions
  61. conn *net.UDPConn
  62. doneChan chan any
  63. }
  64. // Connect 建立连接
  65. func (server *UDPServer) Connect(address string, options *UDPServerOptions) error {
  66. addr, err := net.ResolveUDPAddr("udp", address)
  67. if err != nil {
  68. return err
  69. }
  70. // 监听端口
  71. conn, err := net.ListenUDP("udp", addr)
  72. if err != nil {
  73. return err
  74. }
  75. server.options = options
  76. server.conn = conn
  77. server.doneChan = make(chan any)
  78. // 启动读取请求协程
  79. go server.readRequest()
  80. return nil
  81. }
  82. // Disconnect 断开连接
  83. func (server *UDPServer) Disconnect() {
  84. server.doneChan <- nil
  85. close(server.doneChan)
  86. server.doneChan = nil
  87. closeConnection(server.conn)
  88. server.conn = nil
  89. }
  90. func (server *UDPServer) readRequest() {
  91. dealRequestDoneChannels := make([]chan any, 0)
  92. for {
  93. select {
  94. case <-server.doneChan:
  95. for _, dealRequestDoneChan := range dealRequestDoneChannels {
  96. dealRequestDoneChan <- nil
  97. close(dealRequestDoneChan)
  98. }
  99. return
  100. default:
  101. // 读取任意客户端发来的请求,超时就是没有客户端发出请求
  102. data, rAddr, err := readUDP(server.conn, server.options.receiveBufferSize, withReadDeadline(server.options.readTimeout))
  103. if err != nil {
  104. fmt.Println(err)
  105. continue
  106. }
  107. // 接收到请求
  108. dealRequestDoneChan := make(chan any)
  109. dealRequestDoneChannels = append(dealRequestDoneChannels, dealRequestDoneChan)
  110. go server.dealRequest(data, rAddr, dealRequestDoneChan)
  111. }
  112. }
  113. }
  114. func (server *UDPServer) dealRequest(data []byte, rAddr *net.UDPAddr, doneChan chan any) {
  115. for {
  116. select {
  117. case <-doneChan:
  118. return
  119. default:
  120. // 没有提供请求响应函数
  121. if server.options.requestCallback == nil {
  122. return
  123. }
  124. // 交给上层回调处理,返回处理结果和响应数据
  125. responseDataReader := server.options.requestCallback(NewDataReader(data))
  126. if responseDataReader != nil {
  127. server.response(server.conn, rAddr, responseDataReader.GetBytes())
  128. return
  129. }
  130. return
  131. }
  132. }
  133. }
  134. func (server *UDPServer) response(conn *net.UDPConn, rAddr *net.UDPAddr, data []byte) {
  135. err := writeUDPWithRemoteAddr(conn, rAddr, data, withWriteDeadline(server.options.writeTimeout))
  136. if err != nil {
  137. fmt.Println("Response Error:", err)
  138. return
  139. }
  140. }