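package network

import (
	"fmt"
	"net"
	"time"
)

const (
	// tcpClientReceiveBufferSize is the default receive buffer size in bytes.
	tcpClientReceiveBufferSize = 1024
)

// TCPClientResponseCallback is the response callback.
// Parameters:
//
//	dataReader: DataReader wrapping the response data
//
// Returns:
//
//	whether reading is complete; false means keep reading
type TCPClientResponseCallback func(dataReader *DataReader) bool

// TCPClientOption mutates a TCPClientOptions value (functional option).
type TCPClientOption func(opt *TCPClientOptions)

// WithTCPClientRequestNonBlockCount sets how many requests can be queued before Send blocks.
func WithTCPClientRequestNonBlockCount(count int) TCPClientOption {
	return func(opt *TCPClientOptions) {
		opt.requestNonBlockCount = count
	}
}

// WithTCPClientReceiveBufferSize sets the receive buffer size in bytes.
func WithTCPClientReceiveBufferSize(receiveBufferSize int) TCPClientOption {
	return func(opt *TCPClientOptions) {
		opt.receiveBufferSize = receiveBufferSize
	}
}

// WithTCPClientWriteTimeout sets the write timeout.
func WithTCPClientWriteTimeout(timeout time.Duration) TCPClientOption {
	return func(opt *TCPClientOptions) {
		opt.writeTimeout = timeout
	}
}

// WithTCPClientReadTimeout sets the read timeout.
func WithTCPClientReadTimeout(timeout time.Duration) TCPClientOption {
	return func(opt *TCPClientOptions) {
		opt.readTimeout = timeout
	}
}

// WithTCPClientResponseCallback sets the callback invoked with response data.
func WithTCPClientResponseCallback(responseCallback TCPClientResponseCallback) TCPClientOption {
	return func(opt *TCPClientOptions) {
		opt.responseCallback = responseCallback
	}
}

// TCPClientOptions holds the client configuration.
type TCPClientOptions struct {
	// Number of requests that can be queued without blocking; the default (0) makes Send block.
	requestNonBlockCount int
	// Defaults to 1024 bytes; should generally be large enough to receive one packet.
	receiveBufferSize int
	// Write timeout; if unset, writes block.
	writeTimeout time.Duration
	// Read timeout; if unset, reads block.
	readTimeout time.Duration
	// Response data callback.
	responseCallback TCPClientResponseCallback
}

// NewTCPClientOptions builds the options, applying defaults for unset values.
func NewTCPClientOptions(opts ...TCPClientOption) *TCPClientOptions {
	options := new(TCPClientOptions)
	for _, opt := range opts {
		opt(options)
	}
	if options.receiveBufferSize == 0 {
		options.receiveBufferSize = tcpClientReceiveBufferSize
	}
	return options
}

// TCPClient is a TCP client that sends requests from a dedicated goroutine.
type TCPClient struct {
	options     *TCPClientOptions
	conn        *net.TCPConn
	requestChan chan []byte
	doneChan    chan any
}

// Connect establishes the connection.
// Resolve and dial failures are returned to the caller rather than panicking.
func (client *TCPClient) Connect(serverAddress string, options *TCPClientOptions) error {
	// Fall back to the defaults so a nil options pointer is not dereferenced.
	if options == nil {
		options = NewTCPClientOptions()
	}
	serverAddr, err := net.ResolveTCPAddr("tcp", serverAddress)
	if err != nil {
		return err
	}
	conn, err := net.DialTCP("tcp", nil, serverAddr)
	if err != nil {
		return err
	}
	client.options = options
	client.conn = conn
	client.requestChan = make(chan []byte, options.requestNonBlockCount)
	client.doneChan = make(chan any)
	// Start the request-sending goroutine.
	go client.sendRequest()
	return nil
}

// Disconnect closes the connection.
// The send on doneChan blocks until the sending goroutine has received it,
// so any in-flight write or read finishes first. Send must not be called
// after Disconnect.
func (client *TCPClient) Disconnect() {
	client.doneChan <- nil
	close(client.doneChan)
	client.doneChan = nil
	close(client.requestChan)
	client.requestChan = nil
	closeConnection(client.conn)
	client.conn = nil
}

// Send sends a packet; data should be in big-endian byte order.
func (client *TCPClient) Send(data []byte) {
	client.requestChan <- data
}

// sendRequest writes queued requests to the connection and, when a response
// callback is configured, reads the response after each write.
func (client *TCPClient) sendRequest() {
	// Note: nothing currently appends to this slice, so the loop below is a no-op.
	dealRequestDoneChannels := make([]chan any, 0)
	for {
		select {
		case <-client.doneChan:
			for _, dealRequestDoneChan := range dealRequestDoneChannels {
				dealRequestDoneChan <- nil
				close(dealRequestDoneChan)
			}
			return
		case data := <-client.requestChan:
			err := writeTCP(client.conn, data, withWriteDeadline(client.options.writeTimeout))
			if err != nil {
				fmt.Println(err)
				continue
			}
			if client.options.responseCallback != nil {
				err = readTCP(client.conn, client.options.receiveBufferSize, func(received []byte) (bool, error) {
					readOver := client.options.responseCallback(NewDataReader(received))
					return readOver, nil
				}, withReadDeadline(client.options.readTimeout))
				if err != nil {
					fmt.Println(err)
				}
			}
		}
	}
}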