Browse Source

修改websocket包

yjp 5 tháng trước cách đây
mục cha
commit
234d9dbd1d

+ 1 - 0
go.mod

@@ -13,6 +13,7 @@ require (
 	github.com/mitchellh/mapstructure v1.5.0
 	github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
 	github.com/olahol/melody v1.2.1
+	github.com/pkg/errors v0.9.1
 	github.com/redis/go-redis/v9 v9.4.0
 	github.com/satori/go.uuid v1.2.0
 	github.com/shopspring/decimal v1.2.0

+ 2 - 0
go.sum

@@ -49,6 +49,8 @@ github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6
 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
 github.com/olahol/melody v1.2.1 h1:xdwRkzHxf+B0w4TKbGpUSSkV516ZucQZJIWLztOWICQ=
 github.com/olahol/melody v1.2.1/go.mod h1:GgkTl6Y7yWj/HtfD48Q5vLKPVoZOH+Qqgfa7CvJgJM4=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk=

+ 1 - 1
http_client/request.go

@@ -2,8 +2,8 @@ package http_client
 
 import (
 	"encoding/json"
-	"errors"
 	"github.com/go-resty/resty/v2"
+	"github.com/pkg/errors"
 )
 
 type RequestOption func(request *Request)

+ 1 - 1
maputils/maputils.go

@@ -1,7 +1,7 @@
 package maputils
 
 import (
-	"errors"
+	"github.com/pkg/errors"
 	"strconv"
 	"strings"
 )

+ 1 - 1
mqtt_client/mqtt_client.go

@@ -2,9 +2,9 @@ package mqtt_client
 
 import (
 	"encoding/json"
-	"errors"
 	"git.sxidc.com/go-tools/utils/strutils"
 	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/pkg/errors"
 	"log"
 	"sync"
 	"time"

+ 1 - 1
oss/oss.go

@@ -2,9 +2,9 @@ package oss
 
 import (
 	"bytes"
-	"errors"
 	"git.sxidc.com/go-tools/utils/strutils"
 	"github.com/aliyun/aliyun-oss-go-sdk/oss"
+	"github.com/pkg/errors"
 	"io"
 	"log"
 )

+ 1 - 1
pipeline/component/component.go

@@ -1,8 +1,8 @@
 package component
 
 import (
-	"errors"
 	"git.sxidc.com/go-tools/utils/strutils"
+	"github.com/pkg/errors"
 	"sync"
 )
 

+ 1 - 1
pipeline/component/flow/bi.go

@@ -1,9 +1,9 @@
 package flow
 
 import (
-	"errors"
 	"git.sxidc.com/go-tools/utils/pipeline/component"
 	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
 )
 
 const (

+ 1 - 1
pipeline/component/flow/flow.go

@@ -1,9 +1,9 @@
 package flow
 
 import (
-	"errors"
 	"git.sxidc.com/go-tools/utils/pipeline/component"
 	"git.sxidc.com/go-tools/utils/strutils"
+	"github.com/pkg/errors"
 )
 
 type SubComponentBuildParams struct {

+ 1 - 1
pipeline/component/flow/if.go

@@ -1,10 +1,10 @@
 package flow
 
 import (
-	"errors"
 	"fmt"
 	"git.sxidc.com/go-tools/utils/pipeline/component"
 	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
 )
 
 const (

+ 1 - 1
pipeline/component/flow/loop.go

@@ -1,10 +1,10 @@
 package flow
 
 import (
-	"errors"
 	"fmt"
 	"git.sxidc.com/go-tools/utils/pipeline/component"
 	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
 )
 
 const (

+ 1 - 1
pipeline/component/flow/range.go

@@ -1,9 +1,9 @@
 package flow
 
 import (
-	"errors"
 	"git.sxidc.com/go-tools/utils/pipeline/component"
 	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
 )
 
 const (

+ 1 - 1
pipeline/component/flow/seq.go

@@ -1,9 +1,9 @@
 package flow
 
 import (
-	"errors"
 	"git.sxidc.com/go-tools/utils/pipeline/component"
 	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
 )
 
 const (

+ 1 - 1
pipeline/definition.go

@@ -1,11 +1,11 @@
 package pipeline
 
 import (
-	"errors"
 	"git.sxidc.com/go-tools/utils/pipeline/component"
 	"git.sxidc.com/go-tools/utils/pipeline/component/flow"
 	"git.sxidc.com/go-tools/utils/strutils"
 	"github.com/fatih/structs"
+	"github.com/pkg/errors"
 )
 
 type Definition struct {

+ 1 - 1
pipeline/pipeline.go

@@ -1,11 +1,11 @@
 package pipeline
 
 import (
-	"errors"
 	"git.sxidc.com/go-tools/utils/fileutils"
 	"git.sxidc.com/go-tools/utils/pipeline/component"
 	"git.sxidc.com/go-tools/utils/pipeline/component/flow"
 	"git.sxidc.com/go-tools/utils/strutils"
+	"github.com/pkg/errors"
 	"gopkg.in/yaml.v3"
 	"os"
 )

+ 1 - 1
pipeline/test/test_node/bool.go

@@ -1,11 +1,11 @@
 package test_node
 
 import (
-	"errors"
 	"fmt"
 	"git.sxidc.com/go-tools/utils/pipeline/component"
 	"git.sxidc.com/go-tools/utils/strutils"
 	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
 	"math/rand"
 )
 

+ 1 - 1
pipeline/test/test_node/println.go

@@ -1,11 +1,11 @@
 package test_node
 
 import (
-	"errors"
 	"fmt"
 	"git.sxidc.com/go-tools/utils/pipeline/component"
 	"git.sxidc.com/go-tools/utils/pipeline/component/flow"
 	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
 )
 
 const (

+ 1 - 1
redis_client/redis_client.go

@@ -2,7 +2,7 @@ package redis_client
 
 import (
 	"context"
-	"errors"
+	"github.com/pkg/errors"
 	"github.com/redis/go-redis/v9"
 	"time"
 )

+ 1 - 1
redislock/lock.go

@@ -2,7 +2,7 @@ package redislock
 
 import (
 	"context"
-	"errors"
+	"github.com/pkg/errors"
 	"github.com/redis/go-redis/v9"
 	"time"
 )

+ 1 - 1
reflectutils/value.go

@@ -1,7 +1,7 @@
 package reflectutils
 
 import (
-	"errors"
+	"github.com/pkg/errors"
 	"reflect"
 	"strconv"
 )

+ 177 - 78
websocket/websocket.go

@@ -2,66 +2,44 @@ package websocket
 
 import (
 	"github.com/olahol/melody"
+	"github.com/pkg/errors"
 	"net/http"
+	"sync"
 	"time"
 )
 
-const (
-	groupIDKey           = "group-id"
-	connectionContextKey = "connection-context"
-)
-
-type HandleConnectFunc func(groupID string, context any)
-type HandleDisconnectFunc func(groupID string, context any)
-type HandleErrorFunc func(groupID string, err error, context any)
-type HandleCloseFunc func(groupID string, i int, s string, context any) error
-type HandlePongFunc func(groupID string, context any)
-type HandleMessageFunc func(groupID string, message []byte, context any)
+type HandleConnectFunc func(context map[string]any)
+type HandleDisconnectFunc func(context map[string]any)
+type HandleErrorFunc func(err error, context map[string]any)
+type HandleCloseFunc func(i int, s string, context map[string]any) error
+type HandlePongFunc func(context map[string]any)
+type HandleMessageFunc func(message []byte, context map[string]any)
 
 var managerInstance *Manager
 
-func Init(opts ...InitOption) {
+func Init() {
 	if managerInstance == nil {
-		melodyInstance := melody.New()
-
-		options := new(InitOptions)
-		for _, opt := range opts {
-			opt(options)
-		}
-
-		if options.writeWaitSec != 0 {
-			melodyInstance.Config.WriteWait = time.Duration(options.writeWaitSec) * time.Second
+		managerInstance = &Manager{
+			melodyMapMutex: &sync.RWMutex{},
+			melodyMap:      make(map[string]*melody.Melody),
 		}
-
-		if options.pongWaitSec != 0 {
-			melodyInstance.Config.PongWait = time.Duration(options.pongWaitSec) * time.Second
-		}
-
-		if options.pingPeriodSec != 0 {
-			melodyInstance.Config.PingPeriod = time.Duration(options.pingPeriodSec) * time.Second
-		}
-
-		if options.maxMessageSize != 0 {
-			melodyInstance.Config.MaxMessageSize = options.maxMessageSize
-		}
-
-		if options.messageBufferSize != 0 {
-			melodyInstance.Config.MessageBufferSize = options.messageBufferSize
-		}
-
-		melodyInstance.Config.ConcurrentMessageHandling = options.concurrentMessageHandling
-
-		melodyInstance.Upgrader.CheckOrigin = func(r *http.Request) bool { return true }
-		managerInstance = &Manager{melodyInstance: melodyInstance}
 	}
 }
 
 func Destroy() {
 	if managerInstance != nil {
-		err := managerInstance.melodyInstance.Close()
-		if err != nil {
-			panic(err)
+		managerInstance.melodyMapMutex.Lock()
+		defer managerInstance.melodyMapMutex.Unlock()
+
+		for _, melodyInstance := range managerInstance.melodyMap {
+			err := melodyInstance.Close()
+			if err != nil {
+				panic(err)
+			}
 		}
+
+		managerInstance.melodyMap = nil
+		managerInstance = nil
 	}
 
 	managerInstance = nil
@@ -72,37 +50,132 @@ func GetInstance() *Manager {
 }
 
 type Manager struct {
-	melodyInstance *melody.Melody
+	melodyMapMutex *sync.RWMutex
+	melodyMap      map[string]*melody.Melody
 }
 
-func (m *Manager) HandleConnect(handleConnectFunc HandleConnectFunc) {
-	m.melodyInstance.HandleConnect(func(session *melody.Session) {
+func (m *Manager) RegisterHub(groupID string, opts ...InitOption) {
+	m.melodyMapMutex.Lock()
+	defer m.melodyMapMutex.Unlock()
+
+	_, ok := m.melodyMap[groupID]
+	if ok {
+		return
+	}
+
+	melodyInstance := melody.New()
+
+	options := new(InitOptions)
+	for _, opt := range opts {
+		opt(options)
+	}
+
+	if options.writeWaitSec != 0 {
+		melodyInstance.Config.WriteWait = time.Duration(options.writeWaitSec) * time.Second
+	}
+
+	if options.pongWaitSec != 0 {
+		melodyInstance.Config.PongWait = time.Duration(options.pongWaitSec) * time.Second
+	}
+
+	if options.pingPeriodSec != 0 {
+		melodyInstance.Config.PingPeriod = time.Duration(options.pingPeriodSec) * time.Second
+	}
+
+	if options.maxMessageSize != 0 {
+		melodyInstance.Config.MaxMessageSize = options.maxMessageSize
+	}
+
+	if options.messageBufferSize != 0 {
+		melodyInstance.Config.MessageBufferSize = options.messageBufferSize
+	}
+
+	melodyInstance.Config.ConcurrentMessageHandling = options.concurrentMessageHandling
+
+	melodyInstance.Upgrader.CheckOrigin = func(r *http.Request) bool { return true }
+
+	m.melodyMap[groupID] = melodyInstance
+}
+
+func (m *Manager) UnregisterHub(groupID string) {
+	m.melodyMapMutex.Lock()
+	defer m.melodyMapMutex.Unlock()
+
+	melodyInstance, ok := m.melodyMap[groupID]
+	if !ok {
+		return
+	}
+
+	err := melodyInstance.Close()
+	if err != nil {
+		panic(err)
+	}
+
+	melodyInstance = nil
+
+	delete(m.melodyMap, groupID)
+}
+
+func (m *Manager) HandleConnect(groupID string, handleConnectFunc HandleConnectFunc) {
+	m.melodyMapMutex.RLock()
+	defer m.melodyMapMutex.RUnlock()
+
+	melodyInstance, ok := m.melodyMap[groupID]
+	if !ok {
+		return
+	}
+
+	melodyInstance.HandleConnect(func(session *melody.Session) {
 		if handleConnectFunc != nil {
-			handleConnectFunc(session.Keys[groupIDKey].(string), session.Keys[connectionContextKey])
+			handleConnectFunc(session.Keys)
 		}
 	})
 }
 
-func (m *Manager) HandleDisconnect(handleDisconnectFunc HandleDisconnectFunc) {
-	m.melodyInstance.HandleDisconnect(func(session *melody.Session) {
+func (m *Manager) HandleDisconnect(groupID string, handleDisconnectFunc HandleDisconnectFunc) {
+	m.melodyMapMutex.Lock()
+	defer m.melodyMapMutex.Unlock()
+
+	melodyInstance, ok := m.melodyMap[groupID]
+	if !ok {
+		return
+	}
+
+	melodyInstance.HandleDisconnect(func(session *melody.Session) {
 		if handleDisconnectFunc != nil {
-			handleDisconnectFunc(session.Keys[groupIDKey].(string), session.Keys[connectionContextKey])
+			handleDisconnectFunc(session.Keys)
 		}
 	})
 }
 
-func (m *Manager) HandleError(handleErrorFunc HandleErrorFunc) {
-	m.melodyInstance.HandleError(func(session *melody.Session, err error) {
+func (m *Manager) HandleError(groupID string, handleErrorFunc HandleErrorFunc) {
+	m.melodyMapMutex.RLock()
+	defer m.melodyMapMutex.RUnlock()
+
+	melodyInstance, ok := m.melodyMap[groupID]
+	if !ok {
+		return
+	}
+
+	melodyInstance.HandleError(func(session *melody.Session, err error) {
 		if handleErrorFunc != nil {
-			handleErrorFunc(session.Keys[groupIDKey].(string), err, session.Keys[connectionContextKey])
+			handleErrorFunc(err, session.Keys)
 		}
 	})
 }
 
-func (m *Manager) HandleClose(handleCloseFunc HandleCloseFunc) {
-	m.melodyInstance.HandleClose(func(session *melody.Session, i int, s string) error {
+func (m *Manager) HandleClose(groupID string, handleCloseFunc HandleCloseFunc) {
+	m.melodyMapMutex.RLock()
+	defer m.melodyMapMutex.RUnlock()
+
+	melodyInstance, ok := m.melodyMap[groupID]
+	if !ok {
+		return
+	}
+
+	melodyInstance.HandleClose(func(session *melody.Session, i int, s string) error {
 		if handleCloseFunc != nil {
-			err := handleCloseFunc(session.Keys[groupIDKey].(string), i, s, session.Keys[connectionContextKey])
+			err := handleCloseFunc(i, s, session.Keys)
 			if err != nil {
 				return err
 			}
@@ -112,24 +185,38 @@ func (m *Manager) HandleClose(handleCloseFunc HandleCloseFunc) {
 	})
 }
 
-func (m *Manager) HandlePong(handlePongFunc HandlePongFunc) {
-	m.melodyInstance.HandlePong(func(session *melody.Session) {
+func (m *Manager) HandlePong(groupID string, handlePongFunc HandlePongFunc) {
+	m.melodyMapMutex.RLock()
+	defer m.melodyMapMutex.RUnlock()
+
+	melodyInstance, ok := m.melodyMap[groupID]
+	if !ok {
+		return
+	}
+
+	melodyInstance.HandlePong(func(session *melody.Session) {
 		if handlePongFunc != nil {
-			handlePongFunc(session.Keys[groupIDKey].(string), session.Keys[connectionContextKey])
+			handlePongFunc(session.Keys)
 		}
 	})
 }
 
 func (m *Manager) HandleRequest(groupID string, w http.ResponseWriter, r *http.Request, opts ...ConnectionOption) error {
-	sessionMap := map[string]interface{}{
-		groupIDKey: groupID,
+	m.melodyMapMutex.RLock()
+	defer m.melodyMapMutex.RUnlock()
+
+	melodyInstance, ok := m.melodyMap[groupID]
+	if !ok {
+		return errors.New("groupID尚未注册")
 	}
 
+	sessionMap := map[string]any{}
+
 	for _, opt := range opts {
-		opt(sessionMap)
+		opt(&sessionMap)
 	}
 
-	err := m.melodyInstance.HandleRequestWithKeys(w, r, sessionMap)
+	err := melodyInstance.HandleRequestWithKeys(w, r, sessionMap)
 	if err != nil {
 		return err
 	}
@@ -137,22 +224,34 @@ func (m *Manager) HandleRequest(groupID string, w http.ResponseWriter, r *http.R
 	return nil
 }
 
-func (m *Manager) HandleMessage(handleMessageFunc HandleMessageFunc) {
-	m.melodyInstance.HandleMessage(func(session *melody.Session, bytes []byte) {
+func (m *Manager) HandleMessage(groupID string, handleMessageFunc HandleMessageFunc) error {
+	m.melodyMapMutex.RLock()
+	defer m.melodyMapMutex.RUnlock()
+
+	melodyInstance, ok := m.melodyMap[groupID]
+	if !ok {
+		return errors.New("groupID尚未注册")
+	}
+
+	melodyInstance.HandleMessage(func(session *melody.Session, bytes []byte) {
 		if handleMessageFunc != nil {
-			handleMessageFunc(session.Keys[groupIDKey].(string), bytes, session.Keys[connectionContextKey])
+			handleMessageFunc(bytes, session.Keys)
 		}
 	})
+
+	return nil
 }
 
 func (m *Manager) BroadCast(groupID string, msg []byte) error {
-	return m.melodyInstance.BroadcastFilter(msg, func(session *melody.Session) bool {
-		if session.Keys[groupIDKey] != groupID {
-			return false
-		}
+	m.melodyMapMutex.RLock()
+	defer m.melodyMapMutex.RUnlock()
 
-		return true
-	})
+	melodyInstance, ok := m.melodyMap[groupID]
+	if !ok {
+		return errors.New("groupID尚未注册")
+	}
+
+	return melodyInstance.Broadcast(msg)
 }
 
 type InitOption func(*InitOptions)
@@ -202,10 +301,10 @@ func InitWithConcurrentMessageHandling(concurrentMessageHandling bool) InitOptio
 	}
 }
 
-type ConnectionOption func(sessionMap map[string]any)
+type ConnectionOption func(sessionMap *map[string]any)
 
-func WithConnectionContext(context any) ConnectionOption {
-	return func(sessionMap map[string]any) {
-		sessionMap[connectionContextKey] = context
+func WithConnectionContext(context map[string]any) ConnectionOption {
+	return func(sessionMap *map[string]any) {
+		sessionMap = &context
 	}
 }

+ 1 - 1
yaml/yaml_checker/schema_doc.go

@@ -1,7 +1,7 @@
 package yaml_checker
 
 import (
-	"errors"
+	"github.com/pkg/errors"
 	"gopkg.in/yaml.v3"
 	"os"
 )

+ 1 - 1
yaml/yaml_checker/schema_node.go

@@ -1,7 +1,7 @@
 package yaml_checker
 
 import (
-	"errors"
+	"github.com/pkg/errors"
 	"reflect"
 )
 

+ 1 - 1
yaml/yaml_loader/yaml_loader.go

@@ -2,8 +2,8 @@ package yaml_loader
 
 import (
 	"bytes"
-	"errors"
 	"fmt"
+	"github.com/pkg/errors"
 	"gopkg.in/yaml.v3"
 	"io"
 	"os"