Pārlūkot izejas kodu

完成部分udp server

yjp 1 gadu atpakaļ
vecāks
revīzija
f340b6a2c2
2 mainītis faili ar 143 papildinājumiem un 2 dzēšanām
  1. 10 2
      network/connection.go
  2. 133 0
      network/udp_server.go

+ 10 - 2
network/connection.go

@@ -12,6 +12,10 @@ type ConnectionReadOptions func(conn net.Conn) error
 // WithReadDeadline 读超时选项
 func WithReadDeadline(duration time.Duration) ConnectionReadOptions {
 	return func(conn net.Conn) error {
+		if duration == 0 {
+			return nil
+		}
+
 		err := conn.SetReadDeadline(time.Now().Add(duration))
 		if err != nil {
 			return err
@@ -27,6 +31,10 @@ type ConnectionWriteOptions func(conn net.Conn) error
 // WithWriteDeadline 写超时选项
 func WithWriteDeadline(duration time.Duration) ConnectionWriteOptions {
 	return func(conn net.Conn) error {
+		if duration == 0 {
+			return nil
+		}
+
 		err := conn.SetWriteDeadline(time.Now().Add(duration))
 		if err != nil {
 			return err
@@ -36,8 +44,8 @@ func WithWriteDeadline(duration time.Duration) ConnectionWriteOptions {
 	}
 }
 
-// ReadUDP 读取UDP包
-func ReadUDP(conn *net.UDPConn, bufferSize int, opts ...ConnectionReadOptions) ([]byte, *net.UDPAddr, error) {
+// readUDP 读取UDP包
+func readUDP(conn *net.UDPConn, bufferSize int, opts ...ConnectionReadOptions) ([]byte, *net.UDPAddr, error) {
 	buffer := make([]byte, bufferSize)
 
 	for _, opt := range opts {

+ 133 - 0
network/udp_server.go

@@ -1 +1,134 @@
 package network
+
+import (
+	"fmt"
+	"net"
+	"time"
+)
+
+const (
+	udpServerReceiveBufferSize = 1024
+)
+
+type UDPServerOption func(opt *UDPServerOptions)
+
+func WithReceiveBufferSize(receiveBufferSize int) UDPServerOption {
+	return func(opt *UDPServerOptions) {
+		opt.ReceiveBufferSize = receiveBufferSize
+	}
+}
+
+func WithWriteTimeout(timeout time.Duration) UDPServerOption {
+	return func(opt *UDPServerOptions) {
+		opt.WriteTimeout = timeout
+	}
+}
+
+func WithReadTimeout(timeout time.Duration) UDPServerOption {
+	return func(opt *UDPServerOptions) {
+		opt.ReadTimeout = timeout
+	}
+}
+
+type UDPServerOptions struct {
+	ReceiveBufferSize int
+	WriteTimeout      time.Duration
+	ReadTimeout       time.Duration
+}
+
+func NewUDPServerOptions(opts ...UDPServerOption) *UDPServerOptions {
+	options := new(UDPServerOptions)
+
+	for _, opt := range opts {
+		opt(options)
+	}
+
+	if options.ReceiveBufferSize == 0 {
+		options.ReceiveBufferSize = udpServerReceiveBufferSize
+	}
+
+	return options
+}
+
+type UDPServer struct {
+	options             *UDPServerOptions
+	conn                *net.UDPConn
+	doneChan            chan any
+	dealRequestChan     chan *remoteData
+	dealRequestDoneChan chan any
+}
+
+type remoteData struct {
+	data  []byte
+	rAddr *net.UDPAddr
+}
+
+// Connect 建立连接
+func (server *UDPServer) Connect(address string, options *UDPServerOptions) error {
+	addr, err := net.ResolveUDPAddr("udp", address)
+	if err != nil {
+		return err
+	}
+
+	// 监听端口
+	conn, err := net.ListenUDP("udp", addr)
+	if err != nil {
+		return err
+	}
+
+	server.options = options
+	server.conn = conn
+	server.doneChan = make(chan any)
+
+	// 启动读取请求协程
+	go server.readRequest()
+
+	return nil
+}
+
+// Disconnect 断开连接
+func (server *UDPServer) Disconnect() {
+	server.doneChan <- nil
+	close(server.doneChan)
+	server.doneChan = nil
+
+	CloseConnection(server.conn)
+	server.conn = nil
+}
+
+func (server *UDPServer) readRequest() {
+	server.dealRequestChan = make(chan *remoteData)
+	server.dealRequestDoneChan = make(chan any)
+
+	go server.dealRequestAndResponse()
+
+	for {
+		select {
+		case <-server.doneChan:
+			server.dealRequestDoneChan <- nil
+			close(server.dealRequestDoneChan)
+			server.dealRequestDoneChan = nil
+
+			close(server.dealRequestChan)
+			server.dealRequestChan = nil
+
+			return
+		default:
+			// 读取任意客户端发来的请求
+			data, rAddr, err := readUDP(server.conn, server.options.ReceiveBufferSize, WithReadDeadline(server.options.ReadTimeout))
+			if err != nil {
+				fmt.Println(err)
+				continue
+			}
+
+			server.dealRequestChan <- &remoteData{
+				data:  data,
+				rAddr: rAddr,
+			}
+		}
+	}
+}
+
+func (server *UDPServer) dealRequestAndResponse() {
+	// 回调上层
+}