|
@@ -2,45 +2,66 @@ package websocket
|
|
|
|
|
|
import (
|
|
|
"github.com/olahol/melody"
|
|
|
- "github.com/pkg/errors"
|
|
|
"net/http"
|
|
|
- "sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
-type HandleConnectFunc func(context map[string]any)
|
|
|
-type HandleDisconnectFunc func(context map[string]any)
|
|
|
-type HandleErrorFunc func(err error, context map[string]any)
|
|
|
-type HandleCloseFunc func(i int, s string, context map[string]any) error
|
|
|
-type HandlePongFunc func(context map[string]any)
|
|
|
-type HandleMessageFunc func(message []byte, context map[string]any)
|
|
|
-type BroadCastFilterCallback func(context map[string]any) bool
|
|
|
+const (
|
|
|
+ groupIDKey = "group-id"
|
|
|
+ connectionContextKey = "connection-context"
|
|
|
+)
|
|
|
+
|
|
|
+type HandleConnectFunc func(groupID string, context any)
|
|
|
+type HandleDisconnectFunc func(groupID string, context any)
|
|
|
+type HandleErrorFunc func(groupID string, err error, context any)
|
|
|
+type HandleCloseFunc func(groupID string, i int, s string, context any) error
|
|
|
+type HandlePongFunc func(groupID string, context any)
|
|
|
+type HandleMessageFunc func(groupID string, message []byte, context any)
|
|
|
|
|
|
var managerInstance *Manager
|
|
|
|
|
|
-func Init() {
|
|
|
+func Init(opts ...InitOption) {
|
|
|
if managerInstance == nil {
|
|
|
- managerInstance = &Manager{
|
|
|
- melodyMapMutex: &sync.RWMutex{},
|
|
|
- melodyMap: make(map[string]*melody.Melody),
|
|
|
+ melodyInstance := melody.New()
|
|
|
+
|
|
|
+ options := new(InitOptions)
|
|
|
+ for _, opt := range opts {
|
|
|
+ opt(options)
|
|
|
+ }
|
|
|
+
|
|
|
+ if options.writeWaitSec != 0 {
|
|
|
+ melodyInstance.Config.WriteWait = time.Duration(options.writeWaitSec) * time.Second
|
|
|
+ }
|
|
|
+
|
|
|
+ if options.pongWaitSec != 0 {
|
|
|
+ melodyInstance.Config.PongWait = time.Duration(options.pongWaitSec) * time.Second
|
|
|
+ }
|
|
|
+
|
|
|
+ if options.pingPeriodSec != 0 {
|
|
|
+ melodyInstance.Config.PingPeriod = time.Duration(options.pingPeriodSec) * time.Second
|
|
|
+ }
|
|
|
+
|
|
|
+ if options.maxMessageSize != 0 {
|
|
|
+ melodyInstance.Config.MaxMessageSize = options.maxMessageSize
|
|
|
+ }
|
|
|
+
|
|
|
+ if options.messageBufferSize != 0 {
|
|
|
+ melodyInstance.Config.MessageBufferSize = options.messageBufferSize
|
|
|
}
|
|
|
+
|
|
|
+ melodyInstance.Config.ConcurrentMessageHandling = options.concurrentMessageHandling
|
|
|
+
|
|
|
+ melodyInstance.Upgrader.CheckOrigin = func(r *http.Request) bool { return true }
|
|
|
+ managerInstance = &Manager{melodyInstance: melodyInstance}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func Destroy() {
|
|
|
if managerInstance != nil {
|
|
|
- managerInstance.melodyMapMutex.Lock()
|
|
|
- defer managerInstance.melodyMapMutex.Unlock()
|
|
|
-
|
|
|
- for _, melodyInstance := range managerInstance.melodyMap {
|
|
|
- err := melodyInstance.Close()
|
|
|
- if err != nil {
|
|
|
- panic(err)
|
|
|
- }
|
|
|
+ err := managerInstance.melodyInstance.Close()
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
}
|
|
|
-
|
|
|
- managerInstance.melodyMap = nil
|
|
|
- managerInstance = nil
|
|
|
}
|
|
|
|
|
|
managerInstance = nil
|
|
@@ -51,132 +72,37 @@ func GetInstance() *Manager {
|
|
|
}
|
|
|
|
|
|
type Manager struct {
|
|
|
- melodyMapMutex *sync.RWMutex
|
|
|
- melodyMap map[string]*melody.Melody
|
|
|
-}
|
|
|
-
|
|
|
-func (m *Manager) RegisterHub(groupID string, opts ...InitOption) {
|
|
|
- m.melodyMapMutex.Lock()
|
|
|
- defer m.melodyMapMutex.Unlock()
|
|
|
-
|
|
|
- _, ok := m.melodyMap[groupID]
|
|
|
- if ok {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- melodyInstance := melody.New()
|
|
|
-
|
|
|
- options := new(InitOptions)
|
|
|
- for _, opt := range opts {
|
|
|
- opt(options)
|
|
|
- }
|
|
|
-
|
|
|
- if options.writeWaitSec != 0 {
|
|
|
- melodyInstance.Config.WriteWait = time.Duration(options.writeWaitSec) * time.Second
|
|
|
- }
|
|
|
-
|
|
|
- if options.pongWaitSec != 0 {
|
|
|
- melodyInstance.Config.PongWait = time.Duration(options.pongWaitSec) * time.Second
|
|
|
- }
|
|
|
-
|
|
|
- if options.pingPeriodSec != 0 {
|
|
|
- melodyInstance.Config.PingPeriod = time.Duration(options.pingPeriodSec) * time.Second
|
|
|
- }
|
|
|
-
|
|
|
- if options.maxMessageSize != 0 {
|
|
|
- melodyInstance.Config.MaxMessageSize = options.maxMessageSize
|
|
|
- }
|
|
|
-
|
|
|
- if options.messageBufferSize != 0 {
|
|
|
- melodyInstance.Config.MessageBufferSize = options.messageBufferSize
|
|
|
- }
|
|
|
-
|
|
|
- melodyInstance.Config.ConcurrentMessageHandling = options.concurrentMessageHandling
|
|
|
-
|
|
|
- melodyInstance.Upgrader.CheckOrigin = func(r *http.Request) bool { return true }
|
|
|
-
|
|
|
- m.melodyMap[groupID] = melodyInstance
|
|
|
+ melodyInstance *melody.Melody
|
|
|
}
|
|
|
|
|
|
-func (m *Manager) UnregisterHub(groupID string) {
|
|
|
- m.melodyMapMutex.Lock()
|
|
|
- defer m.melodyMapMutex.Unlock()
|
|
|
-
|
|
|
- melodyInstance, ok := m.melodyMap[groupID]
|
|
|
- if !ok {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- err := melodyInstance.Close()
|
|
|
- if err != nil {
|
|
|
- panic(err)
|
|
|
- }
|
|
|
-
|
|
|
- melodyInstance = nil
|
|
|
-
|
|
|
- delete(m.melodyMap, groupID)
|
|
|
-}
|
|
|
-
|
|
|
-func (m *Manager) HandleConnect(groupID string, handleConnectFunc HandleConnectFunc) {
|
|
|
- m.melodyMapMutex.RLock()
|
|
|
- defer m.melodyMapMutex.RUnlock()
|
|
|
-
|
|
|
- melodyInstance, ok := m.melodyMap[groupID]
|
|
|
- if !ok {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- melodyInstance.HandleConnect(func(session *melody.Session) {
|
|
|
+func (m *Manager) HandleConnect(handleConnectFunc HandleConnectFunc) {
|
|
|
+ m.melodyInstance.HandleConnect(func(session *melody.Session) {
|
|
|
if handleConnectFunc != nil {
|
|
|
- handleConnectFunc(session.Keys)
|
|
|
+ handleConnectFunc(session.Keys[groupIDKey].(string), session.Keys[connectionContextKey])
|
|
|
}
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-func (m *Manager) HandleDisconnect(groupID string, handleDisconnectFunc HandleDisconnectFunc) {
|
|
|
- m.melodyMapMutex.Lock()
|
|
|
- defer m.melodyMapMutex.Unlock()
|
|
|
-
|
|
|
- melodyInstance, ok := m.melodyMap[groupID]
|
|
|
- if !ok {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- melodyInstance.HandleDisconnect(func(session *melody.Session) {
|
|
|
+func (m *Manager) HandleDisconnect(handleDisconnectFunc HandleDisconnectFunc) {
|
|
|
+ m.melodyInstance.HandleDisconnect(func(session *melody.Session) {
|
|
|
if handleDisconnectFunc != nil {
|
|
|
- handleDisconnectFunc(session.Keys)
|
|
|
+ handleDisconnectFunc(session.Keys[groupIDKey].(string), session.Keys[connectionContextKey])
|
|
|
}
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-func (m *Manager) HandleError(groupID string, handleErrorFunc HandleErrorFunc) {
|
|
|
- m.melodyMapMutex.RLock()
|
|
|
- defer m.melodyMapMutex.RUnlock()
|
|
|
-
|
|
|
- melodyInstance, ok := m.melodyMap[groupID]
|
|
|
- if !ok {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- melodyInstance.HandleError(func(session *melody.Session, err error) {
|
|
|
+func (m *Manager) HandleError(handleErrorFunc HandleErrorFunc) {
|
|
|
+ m.melodyInstance.HandleError(func(session *melody.Session, err error) {
|
|
|
if handleErrorFunc != nil {
|
|
|
- handleErrorFunc(err, session.Keys)
|
|
|
+ handleErrorFunc(session.Keys[groupIDKey].(string), err, session.Keys[connectionContextKey])
|
|
|
}
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-func (m *Manager) HandleClose(groupID string, handleCloseFunc HandleCloseFunc) {
|
|
|
- m.melodyMapMutex.RLock()
|
|
|
- defer m.melodyMapMutex.RUnlock()
|
|
|
-
|
|
|
- melodyInstance, ok := m.melodyMap[groupID]
|
|
|
- if !ok {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- melodyInstance.HandleClose(func(session *melody.Session, i int, s string) error {
|
|
|
+func (m *Manager) HandleClose(handleCloseFunc HandleCloseFunc) {
|
|
|
+ m.melodyInstance.HandleClose(func(session *melody.Session, i int, s string) error {
|
|
|
if handleCloseFunc != nil {
|
|
|
- err := handleCloseFunc(i, s, session.Keys)
|
|
|
+ err := handleCloseFunc(session.Keys[groupIDKey].(string), i, s, session.Keys[connectionContextKey])
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
@@ -186,38 +112,24 @@ func (m *Manager) HandleClose(groupID string, handleCloseFunc HandleCloseFunc) {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-func (m *Manager) HandlePong(groupID string, handlePongFunc HandlePongFunc) {
|
|
|
- m.melodyMapMutex.RLock()
|
|
|
- defer m.melodyMapMutex.RUnlock()
|
|
|
-
|
|
|
- melodyInstance, ok := m.melodyMap[groupID]
|
|
|
- if !ok {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- melodyInstance.HandlePong(func(session *melody.Session) {
|
|
|
+func (m *Manager) HandlePong(handlePongFunc HandlePongFunc) {
|
|
|
+ m.melodyInstance.HandlePong(func(session *melody.Session) {
|
|
|
if handlePongFunc != nil {
|
|
|
- handlePongFunc(session.Keys)
|
|
|
+ handlePongFunc(session.Keys[groupIDKey].(string), session.Keys[connectionContextKey])
|
|
|
}
|
|
|
})
|
|
|
}
|
|
|
|
|
|
func (m *Manager) HandleRequest(groupID string, w http.ResponseWriter, r *http.Request, opts ...ConnectionOption) error {
|
|
|
- m.melodyMapMutex.RLock()
|
|
|
- defer m.melodyMapMutex.RUnlock()
|
|
|
-
|
|
|
- melodyInstance, ok := m.melodyMap[groupID]
|
|
|
- if !ok {
|
|
|
- return errors.New("groupID尚未注册")
|
|
|
+ sessionMap := map[string]interface{}{
|
|
|
+ groupIDKey: groupID,
|
|
|
}
|
|
|
|
|
|
- sessionMap := make(map[string]any)
|
|
|
-
|
|
|
for _, opt := range opts {
|
|
|
- opt(&sessionMap)
|
|
|
+ opt(sessionMap)
|
|
|
}
|
|
|
|
|
|
- err := melodyInstance.HandleRequestWithKeys(w, r, sessionMap)
|
|
|
+ err := m.melodyInstance.HandleRequestWithKeys(w, r, sessionMap)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
@@ -225,47 +137,21 @@ func (m *Manager) HandleRequest(groupID string, w http.ResponseWriter, r *http.R
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (m *Manager) HandleMessage(groupID string, handleMessageFunc HandleMessageFunc) error {
|
|
|
- m.melodyMapMutex.RLock()
|
|
|
- defer m.melodyMapMutex.RUnlock()
|
|
|
-
|
|
|
- melodyInstance, ok := m.melodyMap[groupID]
|
|
|
- if !ok {
|
|
|
- return errors.New("groupID尚未注册")
|
|
|
- }
|
|
|
-
|
|
|
- melodyInstance.HandleMessage(func(session *melody.Session, bytes []byte) {
|
|
|
+func (m *Manager) HandleMessage(handleMessageFunc HandleMessageFunc) {
|
|
|
+ m.melodyInstance.HandleMessage(func(session *melody.Session, bytes []byte) {
|
|
|
if handleMessageFunc != nil {
|
|
|
- handleMessageFunc(bytes, session.Keys)
|
|
|
+ handleMessageFunc(session.Keys[groupIDKey].(string), bytes, session.Keys[connectionContextKey])
|
|
|
}
|
|
|
})
|
|
|
-
|
|
|
- return nil
|
|
|
}
|
|
|
|
|
|
func (m *Manager) BroadCast(groupID string, msg []byte) error {
|
|
|
- m.melodyMapMutex.RLock()
|
|
|
- defer m.melodyMapMutex.RUnlock()
|
|
|
-
|
|
|
- melodyInstance, ok := m.melodyMap[groupID]
|
|
|
- if !ok {
|
|
|
- return errors.New("groupID尚未注册")
|
|
|
- }
|
|
|
-
|
|
|
- return melodyInstance.Broadcast(msg)
|
|
|
-}
|
|
|
-
|
|
|
-func (m *Manager) BroadCastFilter(groupID string, msg []byte, filterCallback BroadCastFilterCallback) error {
|
|
|
- m.melodyMapMutex.RLock()
|
|
|
- defer m.melodyMapMutex.RUnlock()
|
|
|
-
|
|
|
- melodyInstance, ok := m.melodyMap[groupID]
|
|
|
- if !ok {
|
|
|
- return errors.New("groupID尚未注册")
|
|
|
- }
|
|
|
+ return m.melodyInstance.BroadcastFilter(msg, func(session *melody.Session) bool {
|
|
|
+ if session.Keys[groupIDKey] != groupID {
|
|
|
+ return false
|
|
|
+ }
|
|
|
|
|
|
- return melodyInstance.BroadcastFilter(msg, func(session *melody.Session) bool {
|
|
|
- return filterCallback(session.Keys)
|
|
|
+ return true
|
|
|
})
|
|
|
}
|
|
|
|
|
@@ -316,10 +202,10 @@ func InitWithConcurrentMessageHandling(concurrentMessageHandling bool) InitOptio
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-type ConnectionOption func(sessionMap *map[string]any)
|
|
|
+type ConnectionOption func(sessionMap map[string]any)
|
|
|
|
|
|
-func WithConnectionContext(context map[string]any) ConnectionOption {
|
|
|
- return func(sessionMap *map[string]any) {
|
|
|
- *sessionMap = context
|
|
|
+func WithConnectionContext(context any) ConnectionOption {
|
|
|
+ return func(sessionMap map[string]any) {
|
|
|
+ sessionMap[connectionContextKey] = context
|
|
|
}
|
|
|
}
|