package queue import ( "github.com/break/junhong_cmp_fiber/pkg/config" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" ) // Server Asynq Worker 服务器 type Server struct { server *asynq.Server logger *zap.Logger } // NewServer 创建新的 Asynq 服务器 func NewServer(redisClient *redis.Client, queueCfg *config.QueueConfig, logger *zap.Logger) *Server { // 从 Redis 客户端获取配置 opts := redisClient.Options() // 解析队列优先级配置 queues := ParseQueueConfig(queueCfg) // 设置并发数 concurrency := queueCfg.Concurrency if concurrency <= 0 { concurrency = constants.DefaultConcurrency } // 创建 Asynq 服务器配置 asynqServer := asynq.NewServer( asynq.RedisClientOpt{ Addr: opts.Addr, Password: opts.Password, DB: opts.DB, }, asynq.Config{ // 并发数 Concurrency: concurrency, // 队列优先级配置 Queues: queues, // 重试延迟函数(指数退避) RetryDelayFunc: asynq.DefaultRetryDelayFunc, // 是否记录详细日志 LogLevel: asynq.WarnLevel, }, ) return &Server{ server: asynqServer, logger: logger, } } // Start 启动 Worker 服务器 func (s *Server) Start(mux *asynq.ServeMux) error { s.logger.Info("Worker 服务器启动中...") if err := s.server.Start(mux); err != nil { s.logger.Error("Worker 服务器启动失败", zap.Error(err)) return err } s.logger.Info("Worker 服务器启动成功") return nil } // Shutdown 优雅关闭服务器 func (s *Server) Shutdown() { s.logger.Info("Worker 服务器关闭中...") s.server.Shutdown() s.logger.Info("Worker 服务器已关闭") } // Run 启动并阻塞运行(用于主函数) func (s *Server) Run(mux *asynq.ServeMux) error { s.logger.Info("Worker 服务器启动中...") if err := s.server.Run(mux); err != nil { s.logger.Error("Worker 服务器运行失败", zap.Error(err)) return err } return nil }