package network import ( "fmt" "net" "time" ) const ( udpClientReceiveBufferSize = 1024 ) // UDPClientResponseCallback 响应回调 // 参数: // dataReader: 响应数据DataReader type UDPClientResponseCallback func(dataReader *DataReader) type UDPClientOption func(opt *UDPClientOptions) func WithUDPClientRequestNonBlockCount(count int) UDPClientOption { return func(opt *UDPClientOptions) { opt.requestNonBlockCount = count } } func WithUDPClientReceiveBufferSize(receiveBufferSize int) UDPClientOption { return func(opt *UDPClientOptions) { opt.receiveBufferSize = receiveBufferSize } } func WithUDPClientWriteTimeout(timeout time.Duration) UDPClientOption { return func(opt *UDPClientOptions) { opt.writeTimeout = timeout } } func WithUDPClientReadTimeout(timeout time.Duration) UDPClientOption { return func(opt *UDPClientOptions) { opt.readTimeout = timeout } } func WithUDPClientResponseCallback(responseCallback UDPClientResponseCallback) UDPClientOption { return func(opt *UDPClientOptions) { opt.responseCallback = responseCallback } } type UDPClientOptions struct { // 不阻塞的请求数量,默认为阻塞 requestNonBlockCount int // 默认1024字节,一般保证足够收取一个数据包 receiveBufferSize int // 写超时,不设置就是阻塞写 writeTimeout time.Duration // 读超时,不设置就是阻塞读 readTimeout time.Duration // 响应数据回调 responseCallback UDPClientResponseCallback } func NewUDPClientOptions(opts ...UDPClientOption) *UDPClientOptions { options := new(UDPClientOptions) for _, opt := range opts { opt(options) } if options.receiveBufferSize == 0 { options.receiveBufferSize = udpClientReceiveBufferSize } return options } type UDPClient struct { options *UDPClientOptions conn *net.UDPConn requestChan chan []byte doneChan chan any } // Connect 建立连接 func (client *UDPClient) Connect(serverAddress string, options *UDPClientOptions) error { serverAddr, err := net.ResolveUDPAddr("udp", serverAddress) if err != nil { panic(err) } conn, err := net.DialUDP("udp", nil, serverAddr) if err != nil { panic(err) } client.options = options client.conn = conn client.requestChan = make(chan []byte, options.requestNonBlockCount) client.doneChan = make(chan any) // 启动发送请求协程 go client.sendRequest() return nil } // Disconnect 断开连接 func (client *UDPClient) Disconnect() { client.doneChan <- nil close(client.doneChan) client.doneChan = nil close(client.requestChan) client.requestChan = nil closeConnection(client.conn) client.conn = nil } // Send 发送数据包,data应该为大端字节序 func (client *UDPClient) Send(data []byte) { client.requestChan <- data } func (client *UDPClient) sendRequest() { dealRequestDoneChannels := make([]chan any, 0) for { select { case <-client.doneChan: for _, dealRequestDoneChan := range dealRequestDoneChannels { dealRequestDoneChan <- nil close(dealRequestDoneChan) } return case data := <-client.requestChan: err := writeUDP(client.conn, data, withWriteDeadline(client.options.writeTimeout)) if err != nil { fmt.Println(err) continue } if client.options.responseCallback != nil { responseBytes, _, err := readUDP(client.conn, client.options.receiveBufferSize, withReadDeadline(client.options.readTimeout)) if err != nil { fmt.Println(err) continue } go client.options.responseCallback(NewDataReader(responseBytes)) } } } }