udp_client.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package network
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "time"
  7. )
  8. const (
  9. udpClientReceiveBufferSize = 1024
  10. )
// UDPClientResponseCallback is the callback invoked with the data of a
// response.
//
// Parameters:
//
//	data: the response payload in big-endian byte order; it can be processed
//	with DataReader.
//
// The callback has no return value.
type UDPClientResponseCallback func(data []byte)
// UDPClientOption is a function that modifies a UDPClientOptions value in
// place. NewUDPClientOptions calls each option it is given, in order.
type UDPClientOption func(opt *UDPClientOptions)
  18. func WithUDPClientRequestNonBlockCount(count int) UDPClientOption {
  19. return func(opt *UDPClientOptions) {
  20. opt.requestNonBlockCount = count
  21. }
  22. }
  23. func WithUDPClientReceiveBufferSize(receiveBufferSize int) UDPClientOption {
  24. return func(opt *UDPClientOptions) {
  25. opt.receiveBufferSize = receiveBufferSize
  26. }
  27. }
  28. func WithUDPClientWriteTimeout(timeout time.Duration) UDPClientOption {
  29. return func(opt *UDPClientOptions) {
  30. opt.writeTimeout = timeout
  31. }
  32. }
  33. func WithUDPClientReadTimeout(timeout time.Duration) UDPClientOption {
  34. return func(opt *UDPClientOptions) {
  35. opt.readTimeout = timeout
  36. }
  37. }
  38. func WithUDPClientResponseCallback(responseCallback UDPClientResponseCallback) UDPClientOption {
  39. return func(opt *UDPClientOptions) {
  40. opt.responseCallback = responseCallback
  41. }
  42. }
// UDPClientOptions holds the configuration of a UDPClient. Its fields are
// unexported and are set through UDPClientOption functions.
type UDPClientOptions struct {
	// Number of requests that can be queued without blocking; the default
	// is to block.
	requestNonBlockCount int
	// Receive buffer size, 1024 bytes by default, which is generally enough
	// to receive one packet.
	receiveBufferSize int
	// Write timeout; when it is not set, writes block.
	writeTimeout time.Duration
	// Read timeout; when it is not set, reads block.
	readTimeout time.Duration
	// Callback that receives the response data.
	responseCallback UDPClientResponseCallback
}
  55. func NewUDPClientOptions(opts ...UDPClientOption) *UDPClientOptions {
  56. options := new(UDPClientOptions)
  57. for _, opt := range opts {
  58. opt(options)
  59. }
  60. if options.receiveBufferSize == 0 {
  61. options.receiveBufferSize = udpClientReceiveBufferSize
  62. }
  63. return options
  64. }
// UDPClient is a UDP client that writes queued requests to a server and reads
// one response after each request that was written successfully.
type UDPClient struct {
	// options is the configuration passed to Connect.
	options *UDPClientOptions
	// conn is the UDP connection opened by Connect.
	conn *net.UDPConn
	// requestChan carries the payloads queued by Send to the send loop.
	requestChan chan []byte
	// doneChan is used by Disconnect to tell the send loop to stop.
	doneChan chan any
}
  71. // Connect 建立连接
  72. func (client *UDPClient) Connect(serverAddress string, options *UDPClientOptions) error {
  73. serverAddr, err := net.ResolveUDPAddr("udp", serverAddress)
  74. if err != nil {
  75. panic(err)
  76. }
  77. conn, err := net.DialUDP("udp", nil, serverAddr)
  78. if err != nil {
  79. panic(err)
  80. }
  81. client.options = options
  82. client.conn = conn
  83. client.requestChan = make(chan []byte, options.requestNonBlockCount)
  84. client.doneChan = make(chan any)
  85. // 启动发送请求协程
  86. client.sendRequest()
  87. return nil
  88. }
  89. // Disconnect 断开连接
  90. func (client *UDPClient) Disconnect() {
  91. client.doneChan <- nil
  92. close(client.doneChan)
  93. client.doneChan = nil
  94. close(client.requestChan)
  95. client.requestChan = nil
  96. CloseConnection(client.conn)
  97. client.conn = nil
  98. }
  99. // Send 发送数据包,data应该为大端字节序
  100. func (client *UDPClient) Send(data []byte) {
  101. client.requestChan <- data
  102. }
  103. func (client *UDPClient) sendRequest() {
  104. dealRequestDoneChannels := make([]chan any, 0)
  105. for {
  106. select {
  107. case <-client.doneChan:
  108. for _, dealRequestDoneChan := range dealRequestDoneChannels {
  109. dealRequestDoneChan <- nil
  110. close(dealRequestDoneChan)
  111. }
  112. return
  113. case data := <-client.requestChan:
  114. err := writeUDP(client.conn, data, withWriteDeadline(client.options.writeTimeout))
  115. if err != nil {
  116. fmt.Println(err)
  117. continue
  118. }
  119. responseBytes, _, err := readUDP(client.conn, client.options.receiveBufferSize, withReadDeadline(client.options.readTimeout))
  120. if err != nil {
  121. fmt.Println(err)
  122. continue
  123. }
  124. if client.options.responseCallback != nil {
  125. go client.dealResponse(responseBytes)
  126. }
  127. }
  128. }
  129. }
  130. func (client *UDPClient) dealResponse(responseBytes []byte) {
  131. dataBuffer := bytes.NewReader(responseBytes)
  132. reader := NewDataReader(dataBuffer)
  133. data, err := reader.Bytes(len(responseBytes))
  134. if err != nil {
  135. fmt.Println(err)
  136. return
  137. }
  138. client.options.responseCallback(data)
  139. }