udp_server.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package network
  2. import (
  3. "fmt"
  4. "net"
  5. "time"
  6. )
  7. const (
  8. udpServerReceiveBufferSize = 1024
  9. )
  10. type UDPServerOption func(opt *UDPServerOptions)
  11. func WithReceiveBufferSize(receiveBufferSize int) UDPServerOption {
  12. return func(opt *UDPServerOptions) {
  13. opt.ReceiveBufferSize = receiveBufferSize
  14. }
  15. }
  16. func WithWriteTimeout(timeout time.Duration) UDPServerOption {
  17. return func(opt *UDPServerOptions) {
  18. opt.WriteTimeout = timeout
  19. }
  20. }
  21. func WithReadTimeout(timeout time.Duration) UDPServerOption {
  22. return func(opt *UDPServerOptions) {
  23. opt.ReadTimeout = timeout
  24. }
  25. }
  26. type UDPServerOptions struct {
  27. ReceiveBufferSize int
  28. WriteTimeout time.Duration
  29. ReadTimeout time.Duration
  30. }
  31. func NewUDPServerOptions(opts ...UDPServerOption) *UDPServerOptions {
  32. options := new(UDPServerOptions)
  33. for _, opt := range opts {
  34. opt(options)
  35. }
  36. if options.ReceiveBufferSize == 0 {
  37. options.ReceiveBufferSize = udpServerReceiveBufferSize
  38. }
  39. return options
  40. }
  41. type UDPServer struct {
  42. options *UDPServerOptions
  43. conn *net.UDPConn
  44. doneChan chan any
  45. dealRequestChan chan *remoteData
  46. dealRequestDoneChan chan any
  47. }
  48. type remoteData struct {
  49. data []byte
  50. rAddr *net.UDPAddr
  51. }
  52. // Connect 建立连接
  53. func (server *UDPServer) Connect(address string, options *UDPServerOptions) error {
  54. addr, err := net.ResolveUDPAddr("udp", address)
  55. if err != nil {
  56. return err
  57. }
  58. // 监听端口
  59. conn, err := net.ListenUDP("udp", addr)
  60. if err != nil {
  61. return err
  62. }
  63. server.options = options
  64. server.conn = conn
  65. server.doneChan = make(chan any)
  66. // 启动读取请求协程
  67. go server.readRequest()
  68. return nil
  69. }
  70. // Disconnect 断开连接
  71. func (server *UDPServer) Disconnect() {
  72. server.doneChan <- nil
  73. close(server.doneChan)
  74. server.doneChan = nil
  75. CloseConnection(server.conn)
  76. server.conn = nil
  77. }
  78. func (server *UDPServer) readRequest() {
  79. server.dealRequestChan = make(chan *remoteData)
  80. server.dealRequestDoneChan = make(chan any)
  81. go server.dealRequestAndResponse()
  82. for {
  83. select {
  84. case <-server.doneChan:
  85. server.dealRequestDoneChan <- nil
  86. close(server.dealRequestDoneChan)
  87. server.dealRequestDoneChan = nil
  88. close(server.dealRequestChan)
  89. server.dealRequestChan = nil
  90. return
  91. default:
  92. // 读取任意客户端发来的请求
  93. data, rAddr, err := readUDP(server.conn, server.options.ReceiveBufferSize, WithReadDeadline(server.options.ReadTimeout))
  94. if err != nil {
  95. fmt.Println(err)
  96. continue
  97. }
  98. server.dealRequestChan <- &remoteData{
  99. data: data,
  100. rAddr: rAddr,
  101. }
  102. }
  103. }
  104. }
  105. func (server *UDPServer) dealRequestAndResponse() {
  106. // 回调上层
  107. }