websocket.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. package websocket
  2. import (
  3. "github.com/olahol/melody"
  4. "net/http"
  5. "time"
  6. )
  7. const (
  8. groupIDKey = "group-id"
  9. connectionContextKey = "connection-context"
  10. )
  11. type HandleConnectFunc func(groupID string, context any)
  12. type HandleDisconnectFunc func(groupID string, context any)
  13. type HandleErrorFunc func(groupID string, err error, context any)
  14. type HandleCloseFunc func(groupID string, i int, s string, context any) error
  15. type HandlePongFunc func(groupID string, context any)
  16. type HandleMessageFunc func(groupID string, message []byte, context any)
  17. var managerInstance *Manager
  18. func Init(opts ...InitOption) {
  19. if managerInstance == nil {
  20. melodyInstance := melody.New()
  21. options := new(InitOptions)
  22. for _, opt := range opts {
  23. opt(options)
  24. }
  25. if options.writeWaitSec != 0 {
  26. melodyInstance.Config.WriteWait = time.Duration(options.writeWaitSec) * time.Second
  27. }
  28. if options.pongWaitSec != 0 {
  29. melodyInstance.Config.PongWait = time.Duration(options.pongWaitSec) * time.Second
  30. }
  31. if options.pingPeriodSec != 0 {
  32. melodyInstance.Config.PingPeriod = time.Duration(options.pingPeriodSec) * time.Second
  33. }
  34. if options.maxMessageSize != 0 {
  35. melodyInstance.Config.MaxMessageSize = options.maxMessageSize
  36. }
  37. if options.messageBufferSize != 0 {
  38. melodyInstance.Config.MessageBufferSize = options.messageBufferSize
  39. }
  40. melodyInstance.Config.ConcurrentMessageHandling = options.concurrentMessageHandling
  41. melodyInstance.Upgrader.CheckOrigin = func(r *http.Request) bool { return true }
  42. managerInstance = &Manager{melodyInstance: melodyInstance}
  43. }
  44. }
  45. func Destroy() {
  46. if managerInstance != nil {
  47. err := managerInstance.melodyInstance.Close()
  48. if err != nil {
  49. panic(err)
  50. }
  51. }
  52. managerInstance = nil
  53. }
  54. func GetInstance() *Manager {
  55. return managerInstance
  56. }
  57. type Manager struct {
  58. melodyInstance *melody.Melody
  59. }
  60. func (m *Manager) HandleConnect(handleConnectFunc HandleConnectFunc) {
  61. m.melodyInstance.HandleConnect(func(session *melody.Session) {
  62. if handleConnectFunc != nil {
  63. handleConnectFunc(session.Keys[groupIDKey].(string), session.Keys[connectionContextKey])
  64. }
  65. })
  66. }
  67. func (m *Manager) HandleDisconnect(handleDisconnectFunc HandleDisconnectFunc) {
  68. m.melodyInstance.HandleDisconnect(func(session *melody.Session) {
  69. if handleDisconnectFunc != nil {
  70. handleDisconnectFunc(session.Keys[groupIDKey].(string), session.Keys[connectionContextKey])
  71. }
  72. })
  73. }
  74. func (m *Manager) HandleError(handleErrorFunc HandleErrorFunc) {
  75. m.melodyInstance.HandleError(func(session *melody.Session, err error) {
  76. if handleErrorFunc != nil {
  77. handleErrorFunc(session.Keys[groupIDKey].(string), err, session.Keys[connectionContextKey])
  78. }
  79. })
  80. }
  81. func (m *Manager) HandleClose(handleCloseFunc HandleCloseFunc) {
  82. m.melodyInstance.HandleClose(func(session *melody.Session, i int, s string) error {
  83. if handleCloseFunc != nil {
  84. err := handleCloseFunc(session.Keys[groupIDKey].(string), i, s, session.Keys[connectionContextKey])
  85. if err != nil {
  86. return err
  87. }
  88. }
  89. return nil
  90. })
  91. }
  92. func (m *Manager) HandlePong(handlePongFunc HandlePongFunc) {
  93. m.melodyInstance.HandlePong(func(session *melody.Session) {
  94. if handlePongFunc != nil {
  95. handlePongFunc(session.Keys[groupIDKey].(string), session.Keys[connectionContextKey])
  96. }
  97. })
  98. }
  99. func (m *Manager) HandleRequest(groupID string, w http.ResponseWriter, r *http.Request, opts ...ConnectionOption) error {
  100. sessionMap := map[string]interface{}{
  101. groupIDKey: groupID,
  102. }
  103. for _, opt := range opts {
  104. opt(sessionMap)
  105. }
  106. err := m.melodyInstance.HandleRequestWithKeys(w, r, sessionMap)
  107. if err != nil {
  108. return err
  109. }
  110. return nil
  111. }
  112. func (m *Manager) HandleMessage(handleMessageFunc HandleMessageFunc) {
  113. m.melodyInstance.HandleMessage(func(session *melody.Session, bytes []byte) {
  114. if handleMessageFunc != nil {
  115. handleMessageFunc(session.Keys[groupIDKey].(string), bytes, session.Keys[connectionContextKey])
  116. }
  117. })
  118. }
  119. func (m *Manager) BroadCast(groupID string, msg []byte) error {
  120. return m.melodyInstance.BroadcastFilter(msg, func(session *melody.Session) bool {
  121. if session.Keys[groupIDKey] != groupID {
  122. return false
  123. }
  124. return true
  125. })
  126. }
  127. type InitOption func(*InitOptions)
  128. type InitOptions struct {
  129. writeWaitSec int64
  130. pongWaitSec int64
  131. pingPeriodSec int64
  132. maxMessageSize int64
  133. messageBufferSize int
  134. concurrentMessageHandling bool
  135. }
  136. func InitWithWriteWaitSec(writeWaitSec int64) InitOption {
  137. return func(options *InitOptions) {
  138. options.writeWaitSec = writeWaitSec
  139. }
  140. }
  141. func InitWithPongWaitSec(pongWaitSec int64) InitOption {
  142. return func(options *InitOptions) {
  143. options.pongWaitSec = pongWaitSec
  144. }
  145. }
  146. func InitWithPingPeriodSec(pingPeriodSec int64) InitOption {
  147. return func(options *InitOptions) {
  148. options.pingPeriodSec = pingPeriodSec
  149. }
  150. }
  151. func InitWithMaxMessageSize(maxMessageSize int64) InitOption {
  152. return func(options *InitOptions) {
  153. options.maxMessageSize = maxMessageSize
  154. }
  155. }
  156. func InitWithMaxMessageBufferSize(messageBufferSize int) InitOption {
  157. return func(options *InitOptions) {
  158. options.messageBufferSize = messageBufferSize
  159. }
  160. }
  161. func InitWithConcurrentMessageHandling(concurrentMessageHandling bool) InitOption {
  162. return func(options *InitOptions) {
  163. options.concurrentMessageHandling = concurrentMessageHandling
  164. }
  165. }
  166. type ConnectionOption func(sessionMap map[string]any)
  167. func WithConnectionContext(context any) ConnectionOption {
  168. return func(sessionMap map[string]any) {
  169. sessionMap[connectionContextKey] = context
  170. }
  171. }