udp_server.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package network
  2. import (
  3. "fmt"
  4. "net"
  5. "time"
  6. )
  7. const (
  8. udpServerReceiveBufferSize = 1024
  9. )
  10. // UDPServerRequestCallback 请求回调
  11. // 参数:
  12. // dataReader: 请求数据DataReader
  13. // 返回值:
  14. // 响应数据DataReader: 可以使用DataWriter构造,然后使用ToReader转换成DataReader,返回nil代表还要继续接收数据,不做响应
  15. type UDPServerRequestCallback func(dataReader *DataReader) *DataReader
  16. type UDPServerOption func(opt *UDPServerOptions)
  17. func WithUDPServerReceiveBufferSize(receiveBufferSize int) UDPServerOption {
  18. return func(opt *UDPServerOptions) {
  19. opt.receiveBufferSize = receiveBufferSize
  20. }
  21. }
  22. func WithUDPServerWriteTimeout(timeout time.Duration) UDPServerOption {
  23. return func(opt *UDPServerOptions) {
  24. opt.writeTimeout = timeout
  25. }
  26. }
  27. func WithUDPServerReadTimeout(timeout time.Duration) UDPServerOption {
  28. return func(opt *UDPServerOptions) {
  29. opt.readTimeout = timeout
  30. }
  31. }
  32. func WithUDPServerRequestCallback(requestCallback UDPServerRequestCallback) UDPServerOption {
  33. return func(opt *UDPServerOptions) {
  34. opt.requestCallback = requestCallback
  35. }
  36. }
  37. type UDPServerOptions struct {
  38. // 默认1024字节,一般保证足够收取一个数据包
  39. receiveBufferSize int
  40. // 写超时,不设置就是阻塞写
  41. writeTimeout time.Duration
  42. // 读超时,不设置就是阻塞读
  43. readTimeout time.Duration
  44. // 请求数据回调
  45. requestCallback UDPServerRequestCallback
  46. }
  47. func NewUDPServerOptions(opts ...UDPServerOption) *UDPServerOptions {
  48. options := new(UDPServerOptions)
  49. for _, opt := range opts {
  50. opt(options)
  51. }
  52. if options.receiveBufferSize == 0 {
  53. options.receiveBufferSize = udpServerReceiveBufferSize
  54. }
  55. return options
  56. }
  57. type UDPServer struct {
  58. options *UDPServerOptions
  59. conn *net.UDPConn
  60. doneChan chan any
  61. }
  62. // Connect 建立连接
  63. func (server *UDPServer) Connect(address string, options *UDPServerOptions) error {
  64. addr, err := net.ResolveUDPAddr("udp", address)
  65. if err != nil {
  66. return err
  67. }
  68. // 监听端口
  69. conn, err := net.ListenUDP("udp", addr)
  70. if err != nil {
  71. return err
  72. }
  73. server.options = options
  74. server.conn = conn
  75. server.doneChan = make(chan any)
  76. // 启动读取请求协程
  77. go server.readRequest()
  78. return nil
  79. }
  80. // Disconnect 断开连接
  81. func (server *UDPServer) Disconnect() {
  82. server.doneChan <- nil
  83. close(server.doneChan)
  84. server.doneChan = nil
  85. closeConnection(server.conn)
  86. server.conn = nil
  87. }
  88. func (server *UDPServer) readRequest() {
  89. dealRequestDoneChannels := make([]chan any, 0)
  90. for {
  91. select {
  92. case <-server.doneChan:
  93. for _, dealRequestDoneChan := range dealRequestDoneChannels {
  94. dealRequestDoneChan <- nil
  95. close(dealRequestDoneChan)
  96. }
  97. return
  98. default:
  99. // 读取任意客户端发来的请求,超时就是没有客户端发出请求
  100. data, rAddr, err := readUDP(server.conn, server.options.receiveBufferSize, withReadDeadline(server.options.readTimeout))
  101. if err != nil {
  102. fmt.Println(err)
  103. continue
  104. }
  105. // 接收到请求
  106. dealRequestDoneChan := make(chan any)
  107. dealRequestDoneChannels = append(dealRequestDoneChannels, dealRequestDoneChan)
  108. go server.dealRequest(data, rAddr, dealRequestDoneChan)
  109. }
  110. }
  111. }
  112. func (server *UDPServer) dealRequest(data []byte, rAddr *net.UDPAddr, doneChan chan any) {
  113. for {
  114. select {
  115. case <-doneChan:
  116. return
  117. default:
  118. // 没有提供请求响应函数
  119. if server.options.requestCallback == nil {
  120. return
  121. }
  122. // 交给上层回调处理,返回处理结果和响应数据
  123. responseDataReader := server.options.requestCallback(NewDataReader(data))
  124. if responseDataReader != nil {
  125. server.response(server.conn, rAddr, responseDataReader.GetBytes())
  126. return
  127. }
  128. return
  129. }
  130. }
  131. }
  132. func (server *UDPServer) response(conn *net.UDPConn, rAddr *net.UDPAddr, data []byte) {
  133. err := writeUDPWithRemoteAddr(conn, rAddr, data, withWriteDeadline(server.options.writeTimeout))
  134. if err != nil {
  135. fmt.Println("Response Error:", err)
  136. return
  137. }
  138. }