123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- package network
- import (
- "fmt"
- "net"
- "time"
- )
- const (
- udpClientReceiveBufferSize = 1024
- )
- // UDPClientResponseCallback 响应回调
- // 参数:
- // dataReader: 响应数据DataReader
- type UDPClientResponseCallback func(dataReader *DataReader)
- type UDPClientOption func(opt *UDPClientOptions)
- func WithUDPClientRequestNonBlockCount(count int) UDPClientOption {
- return func(opt *UDPClientOptions) {
- opt.requestNonBlockCount = count
- }
- }
- func WithUDPClientReceiveBufferSize(receiveBufferSize int) UDPClientOption {
- return func(opt *UDPClientOptions) {
- opt.receiveBufferSize = receiveBufferSize
- }
- }
- func WithUDPClientWriteTimeout(timeout time.Duration) UDPClientOption {
- return func(opt *UDPClientOptions) {
- opt.writeTimeout = timeout
- }
- }
- func WithUDPClientReadTimeout(timeout time.Duration) UDPClientOption {
- return func(opt *UDPClientOptions) {
- opt.readTimeout = timeout
- }
- }
- func WithUDPClientResponseCallback(responseCallback UDPClientResponseCallback) UDPClientOption {
- return func(opt *UDPClientOptions) {
- opt.responseCallback = responseCallback
- }
- }
- type UDPClientOptions struct {
- // 不阻塞的请求数量,默认为阻塞
- requestNonBlockCount int
- // 默认1024字节,一般保证足够收取一个数据包
- receiveBufferSize int
- // 写超时,不设置就是阻塞写
- writeTimeout time.Duration
- // 读超时,不设置就是阻塞读
- readTimeout time.Duration
- // 响应数据回调
- responseCallback UDPClientResponseCallback
- }
- func NewUDPClientOptions(opts ...UDPClientOption) *UDPClientOptions {
- options := new(UDPClientOptions)
- for _, opt := range opts {
- opt(options)
- }
- if options.receiveBufferSize == 0 {
- options.receiveBufferSize = udpClientReceiveBufferSize
- }
- return options
- }
- type UDPClient struct {
- options *UDPClientOptions
- conn *net.UDPConn
- requestChan chan []byte
- doneChan chan any
- }
- // Connect 建立连接
- func (client *UDPClient) Connect(serverAddress string, options *UDPClientOptions) error {
- serverAddr, err := net.ResolveUDPAddr("udp", serverAddress)
- if err != nil {
- panic(err)
- }
- conn, err := net.DialUDP("udp", nil, serverAddr)
- if err != nil {
- panic(err)
- }
- client.options = options
- client.conn = conn
- client.requestChan = make(chan []byte, options.requestNonBlockCount)
- client.doneChan = make(chan any)
- // 启动发送请求协程
- go client.sendRequest()
- return nil
- }
- // Disconnect 断开连接
- func (client *UDPClient) Disconnect() {
- client.doneChan <- nil
- close(client.doneChan)
- client.doneChan = nil
- close(client.requestChan)
- client.requestChan = nil
- closeConnection(client.conn)
- client.conn = nil
- }
- // Send 发送数据包,data应该为大端字节序
- func (client *UDPClient) Send(data []byte) {
- client.requestChan <- data
- }
- func (client *UDPClient) sendRequest() {
- dealRequestDoneChannels := make([]chan any, 0)
- for {
- select {
- case <-client.doneChan:
- for _, dealRequestDoneChan := range dealRequestDoneChannels {
- dealRequestDoneChan <- nil
- close(dealRequestDoneChan)
- }
- return
- case data := <-client.requestChan:
- err := writeUDP(client.conn, data, withWriteDeadline(client.options.writeTimeout))
- if err != nil {
- fmt.Println(err)
- continue
- }
- if client.options.responseCallback != nil {
- responseBytes, _, err := readUDP(client.conn, client.options.receiveBufferSize, withReadDeadline(client.options.readTimeout))
- if err != nil {
- fmt.Println(err)
- continue
- }
- go client.options.responseCallback(NewDataReader(responseBytes))
- }
- }
- }
- }
|