123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- package network
- import (
- "fmt"
- "net"
- "time"
- )
- const (
- tcpClientReceiveBufferSize = 1024
- )
- // TCPClientResponseCallback 响应回调
- // 参数:
- // dataReader: 响应数据DataReader
- // 返回值:
- // 是否读取完成,false继续读取
- type TCPClientResponseCallback func(dataReader *DataReader) bool
- type TCPClientOption func(opt *TCPClientOptions)
- func WithTCPClientRequestNonBlockCount(count int) TCPClientOption {
- return func(opt *TCPClientOptions) {
- opt.requestNonBlockCount = count
- }
- }
- func WithTCPClientReceiveBufferSize(receiveBufferSize int) TCPClientOption {
- return func(opt *TCPClientOptions) {
- opt.receiveBufferSize = receiveBufferSize
- }
- }
- func WithTCPClientWriteTimeout(timeout time.Duration) TCPClientOption {
- return func(opt *TCPClientOptions) {
- opt.writeTimeout = timeout
- }
- }
- func WithTCPClientReadTimeout(timeout time.Duration) TCPClientOption {
- return func(opt *TCPClientOptions) {
- opt.readTimeout = timeout
- }
- }
- func WithTCPClientResponseCallback(responseCallback TCPClientResponseCallback) TCPClientOption {
- return func(opt *TCPClientOptions) {
- opt.responseCallback = responseCallback
- }
- }
- type TCPClientOptions struct {
- // 不阻塞的请求数量,默认为阻塞
- requestNonBlockCount int
- // 默认1024字节,一般保证足够收取一个数据包
- receiveBufferSize int
- // 写超时,不设置就是阻塞写
- writeTimeout time.Duration
- // 读超时,不设置就是阻塞读
- readTimeout time.Duration
- // 响应数据回调
- responseCallback TCPClientResponseCallback
- }
- func NewTCPClientOptions(opts ...TCPClientOption) *TCPClientOptions {
- options := new(TCPClientOptions)
- for _, opt := range opts {
- opt(options)
- }
- if options.receiveBufferSize == 0 {
- options.receiveBufferSize = tcpClientReceiveBufferSize
- }
- return options
- }
- type TCPClient struct {
- options *TCPClientOptions
- conn *net.TCPConn
- requestChan chan []byte
- doneChan chan any
- }
- // Connect 建立连接
- func (client *TCPClient) Connect(serverAddress string, options *TCPClientOptions) error {
- serverAddr, err := net.ResolveTCPAddr("tcp", serverAddress)
- if err != nil {
- panic(err)
- }
- conn, err := net.DialTCP("tcp", nil, serverAddr)
- if err != nil {
- panic(err)
- }
- client.options = options
- client.conn = conn
- client.requestChan = make(chan []byte, options.requestNonBlockCount)
- client.doneChan = make(chan any)
- // 启动发送请求协程
- go client.sendRequest()
- return nil
- }
- // Disconnect 断开连接
- func (client *TCPClient) Disconnect() {
- client.doneChan <- nil
- close(client.doneChan)
- client.doneChan = nil
- close(client.requestChan)
- client.requestChan = nil
- closeConnection(client.conn)
- client.conn = nil
- }
- // Send 发送数据包,data应该为大端字节序
- func (client *TCPClient) Send(data []byte) {
- client.requestChan <- data
- }
- func (client *TCPClient) sendRequest() {
- dealRequestDoneChannels := make([]chan any, 0)
- for {
- select {
- case <-client.doneChan:
- for _, dealRequestDoneChan := range dealRequestDoneChannels {
- dealRequestDoneChan <- nil
- close(dealRequestDoneChan)
- }
- return
- case data := <-client.requestChan:
- err := writeTCP(client.conn, data, withWriteDeadline(client.options.writeTimeout))
- if err != nil {
- fmt.Println(err)
- continue
- }
- if client.options.responseCallback != nil {
- err = readTCP(client.conn, client.options.receiveBufferSize, func(data []byte) (bool, error) {
- readOver := client.options.responseCallback(NewDataReader(data))
- return readOver, nil
- }, withReadDeadline(client.options.readTimeout))
- if err != nil {
- fmt.Println(err)
- continue
- }
- }
- }
- }
- }
|