package ups_sdk

import (
	"encoding/json"
	"errors"
	"log"
	"time"

	"github.com/robinjoseph08/redisqueue/v2"
)

// Stream names this receiver subscribes to. Each one is registered with the
// consumer in StartMessageReceiver and matched against the incoming message's
// Stream field in processMessage to pick the payload type and callback.
const (
	topicUserRegistered      = "ups:user-registered"
	topicUserUserNameUpdated = "ups:user-username-updated"
	topicUserDeleted         = "ups:user-deleted"
)

// UserRegisteredCallback handles a message from the user-registered stream.
// It is invoked with the decoded payload and a nil err, or with nil data and
// the decode error when the payload could not be unmarshalled. Its return
// value is returned to the queue consumer as the result of message handling.
type UserRegisteredCallback func(data *UserRegisteredMessageData, err error) error

// UserUserNameUpdatedCallback handles a message from the username-updated
// stream. It is invoked with the decoded payload and a nil err, or with nil
// data and the decode error when the payload could not be unmarshalled. Its
// return value is returned to the queue consumer as the result of message
// handling.
type UserUserNameUpdatedCallback func(data *UserUserNameUpdatedMessageData, err error) error

// UserDeletedCallback handles a message from the user-deleted stream. It is
// invoked with the decoded payload and a nil err, or with nil data and the
// decode error when the payload could not be unmarshalled. Its return value is
// returned to the queue consumer as the result of message handling.
type UserDeletedCallback func(data *UserDeletedMessageData, err error) error

// MessageReceiver couples a redisqueue consumer with the callbacks that
// handle each supported stream. Create it with NewMessageReceiver.
type MessageReceiver struct {
	// consumer is the underlying redisqueue consumer that delivers messages.
	consumer                    *redisqueue.Consumer
	// Per-stream handlers, copied from MessageReceiverOptions at construction.
	userRegisteredCallback      UserRegisteredCallback
	userUserNameUpdatedCallback UserUserNameUpdatedCallback
	userDeletedCallback         UserDeletedCallback
}

// MessageReceiverOptions carries everything NewMessageReceiver needs to build
// a MessageReceiver.
type MessageReceiverOptions struct {
	// ReceiverName is the receiver (consumer) name. Required.
	ReceiverName string

	// MessageGroupName is the message (consumer) group name. Required.
	MessageGroupName string

	// UserRegisteredCallback handles user-registered messages. Required.
	UserRegisteredCallback UserRegisteredCallback

	// UserUserNameUpdatedCallback handles username-updated messages. Required.
	UserUserNameUpdatedCallback UserUserNameUpdatedCallback

	// UserDeletedCallback handles user-deleted messages. Required.
	UserDeletedCallback UserDeletedCallback

	// RedisOptions is the embedded Redis connection configuration.
	RedisOptions
}

// RedisOptions holds the Redis connection settings. NewMessageReceiver copies
// every field into the redisqueue.RedisOptions field of the matching name
// (Address is passed as Addr). Address and Password are rejected by
// NewMessageReceiver when empty; the remaining fields are forwarded as-is.
type RedisOptions struct {
	Address      string
	Password     string
	DB           int
	MaxRetries   int
	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
}

// NewMessageReceiver validates options and creates a MessageReceiver backed by
// a redisqueue consumer.
//
// It returns an error when options is nil, when the receiver name, message
// group name, Redis address or Redis password is empty, when any of the three
// callbacks is nil, or when the underlying consumer cannot be created. The
// consumer is not started here; call StartMessageReceiver for that.
func NewMessageReceiver(options *MessageReceiverOptions) (*MessageReceiver, error) {
	// A nil options pointer would otherwise panic on the first field access.
	if options == nil {
		return nil, errors.New("没有传递配置参数")
	}

	if IsStringEmpty(options.ReceiverName) {
		return nil, errors.New("没有传递接受者名称")
	}

	if IsStringEmpty(options.MessageGroupName) {
		return nil, errors.New("没有传递消息组名称")
	}

	if IsStringEmpty(options.Address) {
		return nil, errors.New("没有传递Redis地址")
	}

	if IsStringEmpty(options.Password) {
		return nil, errors.New("没有传递Redis密码")
	}

	// Every stream must have a handler, because processMessage calls the
	// callbacks unconditionally.
	if options.UserRegisteredCallback == nil ||
		options.UserUserNameUpdatedCallback == nil ||
		options.UserDeletedCallback == nil {
		return nil, errors.New("没有传递需要的回调函数")
	}

	consumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
		Name:              options.ReceiverName,
		GroupName:         options.MessageGroupName,
		VisibilityTimeout: MessageReceiverVisibilityTimeout,
		BlockingTimeout:   MessageReceiverBlockingTimeout,
		ReclaimInterval:   MessageReceiverReclaimInterval,
		BufferSize:        MessageReceiverBufferSize,
		Concurrency:       MessageReceiverConcurrency,
		RedisOptions: &redisqueue.RedisOptions{
			Addr:         options.Address,
			Password:     options.Password,
			DB:           options.DB,
			MaxRetries:   options.MaxRetries,
			DialTimeout:  options.DialTimeout,
			ReadTimeout:  options.ReadTimeout,
			WriteTimeout: options.WriteTimeout,
		},
	})
	if err != nil {
		return nil, err
	}

	return &MessageReceiver{
		consumer:                    consumer,
		userRegisteredCallback:      options.UserRegisteredCallback,
		userUserNameUpdatedCallback: options.UserUserNameUpdatedCallback,
		userDeletedCallback:         options.UserDeletedCallback,
	}, nil
}

// DestroyMessageReceiver releases the references held by messageReceiver by
// clearing its callbacks and its consumer. A nil messageReceiver is a no-op.
//
// It does not shut the consumer down. Call StopMessageReceiver first:
// a consumer that is still running would keep delivering messages to
// processMessage, which would then call a nil callback.
//
// The caller's own pointer is unaffected, because Go passes the pointer by
// value; callers that want it cleared must set it to nil themselves.
func DestroyMessageReceiver(messageReceiver *MessageReceiver) {
	if messageReceiver == nil {
		return
	}

	messageReceiver.userDeletedCallback = nil
	messageReceiver.userUserNameUpdatedCallback = nil
	messageReceiver.userRegisteredCallback = nil
	messageReceiver.consumer = nil
}

// StartMessageReceiver registers processMessage as the handler for every
// supported stream and runs the consumer in a background goroutine.
//
// It returns an error if the receiver has not been initialised or has already
// been destroyed; otherwise it returns nil straight away without waiting for
// the consumer. Use StopMessageReceiver to stop it.
func (receiver *MessageReceiver) StartMessageReceiver() error {
	if receiver == nil || receiver.consumer == nil {
		return errors.New("消息接收者未初始化或已销毁")
	}

	// Keep a local handle so the goroutines below are unaffected if the
	// receiver's consumer field is cleared later.
	consumer := receiver.consumer

	consumer.Register(topicUserRegistered, receiver.processMessage)
	consumer.Register(topicUserUserNameUpdated, receiver.processMessage)
	consumer.Register(topicUserDeleted, receiver.processMessage)

	// finished is closed once Run returns so the error listener can exit
	// instead of leaking.
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		consumer.Run()
	}()

	// redisqueue documents Consumer.Errors as an unbuffered channel that must
	// have a listener; without one, any error returned by a handler blocks
	// the sending worker forever. Drain and log it here.
	go func() {
		for {
			select {
			case err := <-consumer.Errors:
				log.Printf("ups_sdk: message receiver error: %v", err)
			case <-finished:
				return
			}
		}
	}()

	return nil
}

// StopMessageReceiver asks the underlying consumer to shut down. It is a
// no-op when the receiver is nil or its consumer has already been cleared by
// DestroyMessageReceiver, instead of panicking on a nil pointer.
func (receiver *MessageReceiver) StopMessageReceiver() {
	if receiver == nil || receiver.consumer == nil {
		return
	}

	receiver.consumer.Shutdown()
}

// processMessage is the handler registered for every supported stream.
//
// It reads the "msg" value of the received message, unmarshals it into a
// Message envelope, then decodes the envelope's Data into the payload type
// for the message's stream and invokes the matching callback. If decoding the
// payload fails, the callback is invoked with nil data and the decode error.
//
// It returns an error when the "msg" value is missing or not a string, when
// the envelope cannot be unmarshalled, or when the stream is not one of the
// supported topics; otherwise it returns whatever the callback returns.
func (receiver *MessageReceiver) processMessage(receivedMessage *redisqueue.Message) error {
	// Use the comma-ok form: a missing or non-string "msg" field must surface
	// as an error instead of panicking the worker goroutine.
	rawMsg, ok := receivedMessage.Values["msg"].(string)
	if !ok {
		return errors.New("消息缺少字符串类型的msg字段: " + receivedMessage.Stream)
	}

	envelope := new(Message)
	if err := json.Unmarshal([]byte(rawMsg), envelope); err != nil {
		return err
	}

	switch receivedMessage.Stream {
	case topicUserRegistered:
		data := new(UserRegisteredMessageData)
		if err := json.Unmarshal([]byte(envelope.Data), data); err != nil {
			return receiver.userRegisteredCallback(nil, err)
		}

		return receiver.userRegisteredCallback(data, nil)
	case topicUserUserNameUpdated:
		data := new(UserUserNameUpdatedMessageData)
		if err := json.Unmarshal([]byte(envelope.Data), data); err != nil {
			return receiver.userUserNameUpdatedCallback(nil, err)
		}

		return receiver.userUserNameUpdatedCallback(data, nil)
	case topicUserDeleted:
		data := new(UserDeletedMessageData)
		if err := json.Unmarshal([]byte(envelope.Data), data); err != nil {
			return receiver.userDeletedCallback(nil, err)
		}

		return receiver.userDeletedCallback(data, nil)
	default:
		return errors.New("不存在的主题: " + receivedMessage.Stream)
	}
}