udp_client.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package network
  2. import (
  3. "fmt"
  4. "net"
  5. "time"
  6. )
  7. const (
  8. udpClientReceiveBufferSize = 1024
  9. )
  10. // UDPClientResponseCallback 响应回调
  11. // 参数:
  12. // dataReader: 响应数据DataReader
  13. type UDPClientResponseCallback func(dataReader *DataReader)
  14. type UDPClientOption func(opt *UDPClientOptions)
  15. func WithUDPClientRequestNonBlockCount(count int) UDPClientOption {
  16. return func(opt *UDPClientOptions) {
  17. opt.requestNonBlockCount = count
  18. }
  19. }
  20. func WithUDPClientReceiveBufferSize(receiveBufferSize int) UDPClientOption {
  21. return func(opt *UDPClientOptions) {
  22. opt.receiveBufferSize = receiveBufferSize
  23. }
  24. }
  25. func WithUDPClientWriteTimeout(timeout time.Duration) UDPClientOption {
  26. return func(opt *UDPClientOptions) {
  27. opt.writeTimeout = timeout
  28. }
  29. }
  30. func WithUDPClientReadTimeout(timeout time.Duration) UDPClientOption {
  31. return func(opt *UDPClientOptions) {
  32. opt.readTimeout = timeout
  33. }
  34. }
  35. func WithUDPClientResponseCallback(responseCallback UDPClientResponseCallback) UDPClientOption {
  36. return func(opt *UDPClientOptions) {
  37. opt.responseCallback = responseCallback
  38. }
  39. }
  40. type UDPClientOptions struct {
  41. // 不阻塞的请求数量,默认为阻塞
  42. requestNonBlockCount int
  43. // 默认1024字节,一般保证足够收取一个数据包
  44. receiveBufferSize int
  45. // 写超时,不设置就是阻塞写
  46. writeTimeout time.Duration
  47. // 读超时,不设置就是阻塞读
  48. readTimeout time.Duration
  49. // 响应数据回调
  50. responseCallback UDPClientResponseCallback
  51. }
  52. func NewUDPClientOptions(opts ...UDPClientOption) *UDPClientOptions {
  53. options := new(UDPClientOptions)
  54. for _, opt := range opts {
  55. opt(options)
  56. }
  57. if options.receiveBufferSize == 0 {
  58. options.receiveBufferSize = udpClientReceiveBufferSize
  59. }
  60. return options
  61. }
  62. type UDPClient struct {
  63. options *UDPClientOptions
  64. conn *net.UDPConn
  65. requestChan chan []byte
  66. doneChan chan any
  67. }
  68. // Connect 建立连接
  69. func (client *UDPClient) Connect(serverAddress string, options *UDPClientOptions) error {
  70. serverAddr, err := net.ResolveUDPAddr("udp", serverAddress)
  71. if err != nil {
  72. panic(err)
  73. }
  74. conn, err := net.DialUDP("udp", nil, serverAddr)
  75. if err != nil {
  76. panic(err)
  77. }
  78. client.options = options
  79. client.conn = conn
  80. client.requestChan = make(chan []byte, options.requestNonBlockCount)
  81. client.doneChan = make(chan any)
  82. // 启动发送请求协程
  83. go client.sendRequest()
  84. return nil
  85. }
  86. // Disconnect 断开连接
  87. func (client *UDPClient) Disconnect() {
  88. client.doneChan <- nil
  89. close(client.doneChan)
  90. client.doneChan = nil
  91. close(client.requestChan)
  92. client.requestChan = nil
  93. closeConnection(client.conn)
  94. client.conn = nil
  95. }
  96. // Send 发送数据包,data应该为大端字节序
  97. func (client *UDPClient) Send(data []byte) {
  98. client.requestChan <- data
  99. }
  100. func (client *UDPClient) sendRequest() {
  101. dealRequestDoneChannels := make([]chan any, 0)
  102. for {
  103. select {
  104. case <-client.doneChan:
  105. for _, dealRequestDoneChan := range dealRequestDoneChannels {
  106. dealRequestDoneChan <- nil
  107. close(dealRequestDoneChan)
  108. }
  109. return
  110. case data := <-client.requestChan:
  111. err := writeUDP(client.conn, data, withWriteDeadline(client.options.writeTimeout))
  112. if err != nil {
  113. fmt.Println(err)
  114. continue
  115. }
  116. if client.options.responseCallback != nil {
  117. responseBytes, _, err := readUDP(client.conn, client.options.receiveBufferSize, withReadDeadline(client.options.readTimeout))
  118. if err != nil {
  119. fmt.Println(err)
  120. continue
  121. }
  122. go client.options.responseCallback(NewDataReader(responseBytes))
  123. }
  124. }
  125. }
  126. }