123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- package network
- import (
- "errors"
- "fmt"
- "net"
- "time"
- )
// udpServerReceiveBufferSize is the receive buffer size NewUDPServerOptions
// falls back to when the caller does not configure one.
const udpServerReceiveBufferSize = 1024
// UDPServerIgnoreResponse is a sentinel error a request callback can return
// to tell the server that no datagram should be sent back for that request.
// Compare against it with errors.Is. The identifier is kept as-is for API
// compatibility even though it does not follow the ErrXxx naming convention.
var UDPServerIgnoreResponse error = errors.New("忽略响应")
- type UDPServerRequestCallback func(dataReader *DataReader) (*DataReader, error)
- type UDPServerOption func(opt *UDPServerOptions)
- func WithUDPServerReceiveBufferSize(receiveBufferSize int) UDPServerOption {
- return func(opt *UDPServerOptions) {
- opt.receiveBufferSize = receiveBufferSize
- }
- }
- func WithUDPServerWriteTimeout(timeout time.Duration) UDPServerOption {
- return func(opt *UDPServerOptions) {
- opt.writeTimeout = timeout
- }
- }
- func WithUDPServerReadTimeout(timeout time.Duration) UDPServerOption {
- return func(opt *UDPServerOptions) {
- opt.readTimeout = timeout
- }
- }
- func WithUDPServerRequestCallback(requestCallback UDPServerRequestCallback) UDPServerOption {
- return func(opt *UDPServerOptions) {
- opt.requestCallback = requestCallback
- }
- }
- type UDPServerOptions struct {
-
- receiveBufferSize int
-
- writeTimeout time.Duration
-
- readTimeout time.Duration
-
- requestCallback UDPServerRequestCallback
- }
- func NewUDPServerOptions(opts ...UDPServerOption) *UDPServerOptions {
- options := new(UDPServerOptions)
- for _, opt := range opts {
- opt(options)
- }
- if options.receiveBufferSize == 0 {
- options.receiveBufferSize = udpServerReceiveBufferSize
- }
- return options
- }
- type UDPServer struct {
- options *UDPServerOptions
- conn *net.UDPConn
- doneChan chan any
- }
- func (server *UDPServer) Connect(address string, options *UDPServerOptions) error {
- addr, err := net.ResolveUDPAddr("udp", address)
- if err != nil {
- return err
- }
-
- conn, err := net.ListenUDP("udp", addr)
- if err != nil {
- return err
- }
- server.options = options
- server.conn = conn
- server.doneChan = make(chan any)
-
- go server.readRequest()
- return nil
- }
- func (server *UDPServer) Disconnect() {
- server.doneChan <- nil
- close(server.doneChan)
- server.doneChan = nil
- CloseConnection(server.conn)
- server.conn = nil
- }
- func (server *UDPServer) readRequest() {
- dealRequestDoneChannels := make([]chan any, 0)
- for {
- select {
- case <-server.doneChan:
- for _, dealRequestDoneChan := range dealRequestDoneChannels {
- dealRequestDoneChan <- nil
- close(dealRequestDoneChan)
- }
- return
- default:
-
- data, rAddr, err := readUDP(server.conn, server.options.receiveBufferSize, withReadDeadline(server.options.readTimeout))
- if err != nil {
- fmt.Println(err)
- continue
- }
-
- dealRequestDoneChan := make(chan any)
- dealRequestDoneChannels = append(dealRequestDoneChannels, dealRequestDoneChan)
- go server.dealRequest(data, rAddr, dealRequestDoneChan)
- }
- }
- }
- func (server *UDPServer) dealRequest(data []byte, rAddr *net.UDPAddr, doneChan chan any) {
- for {
- select {
- case <-doneChan:
- return
- default:
-
- if server.options.requestCallback == nil {
- return
- }
-
- responseDataReader, err := server.options.requestCallback(NewDataReader(data))
- if err != nil {
-
- if errors.Is(err, UDPServerIgnoreResponse) {
- return
- }
- server.response(server.conn, rAddr, []byte(err.Error()))
- return
- }
- server.response(server.conn, rAddr, responseDataReader.GetBytes())
- return
- }
- }
- }
- func (server *UDPServer) response(conn *net.UDPConn, rAddr *net.UDPAddr, data []byte) {
- err := writeUDPWithRemoteAddr(conn, rAddr, data, withWriteDeadline(server.options.writeTimeout))
- if err != nil {
- fmt.Println("Response Error:", err)
- return
- }
- }
|