// Package queue provides an Asynq task-submission client and a helper that
// resolves queue priorities from configuration.
package queue

import (
	"context"
	"fmt"

	"github.com/break/junhong_cmp_fiber/pkg/config"
	"github.com/bytedance/sonic"
	"github.com/hibiken/asynq"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
)

// Client submits tasks to Asynq and logs the outcome of every submission.
type Client struct {
	client *asynq.Client
	logger *zap.Logger
}

// NewClient builds a Client whose Asynq connection is configured from the
// options of an existing go-redis client.
//
// The Asynq client opens its own connection; it does not share redisClient's
// pool. Network, Username and TLSConfig are forwarded together with Addr,
// Password and DB so that ACL-authenticated, TLS, or unix-socket deployments
// connect to the same server as redisClient.
//
// NOTE(review): settings that asynq.RedisClientOpt cannot express (for
// example a custom dialer on the go-redis options) are not carried over.
//
// redisClient must be non-nil.
func NewClient(redisClient *redis.Client, logger *zap.Logger) *Client {
	// Derive the connection settings from the go-redis client.
	opts := redisClient.Options()

	asynqClient := asynq.NewClient(asynq.RedisClientOpt{
		Network:   opts.Network,
		Addr:      opts.Addr,
		Username:  opts.Username,
		Password:  opts.Password,
		DB:        opts.DB,
		TLSConfig: opts.TLSConfig,
	})

	return &Client{
		client: asynqClient,
		logger: logger,
	}
}

// EnqueueTask serializes payload with sonic and enqueues it as a task of the
// given taskType. Any opts are attached to the task when it is created.
//
// It returns a wrapped error if the payload cannot be marshaled or if Asynq
// rejects the task; both failures are also logged. On success the assigned
// task ID, queue and max retry count are logged and nil is returned.
func (c *Client) EnqueueTask(ctx context.Context, taskType string, payload interface{}, opts ...asynq.Option) error {
	// Serialize the payload.
	payloadBytes, err := sonic.Marshal(payload)
	if err != nil {
		c.logger.Error("任务载荷序列化失败",
			zap.String("task_type", taskType),
			zap.Error(err))
		return fmt.Errorf("failed to marshal task payload: %w", err)
	}

	// Build the task; options given here become the task's defaults.
	task := asynq.NewTask(taskType, payloadBytes, opts...)

	// Submit the task, honoring ctx.
	info, err := c.client.EnqueueContext(ctx, task)
	if err != nil {
		c.logger.Error("任务提交失败",
			zap.String("task_type", taskType),
			zap.Error(err))
		return fmt.Errorf("failed to enqueue task: %w", err)
	}

	c.logger.Info("任务已提交",
		zap.String("task_id", info.ID),
		zap.String("task_type", taskType),
		zap.String("queue", info.Queue),
		zap.Int("max_retry", info.MaxRetry))

	return nil
}

// Close closes the underlying Asynq client and returns its error. It is a
// no-op returning nil when the receiver or its Asynq client is nil.
func (c *Client) Close() error {
	if c == nil || c.client == nil {
		return nil
	}
	return c.client.Close()
}

// ParseQueueConfig returns the queue-name-to-priority map to hand to Asynq.
//
// When cfg is non-nil and defines at least one queue, cfg.Queues is returned
// as-is (the same map, not a copy). Otherwise, including when cfg is nil, a
// fresh default map is returned: critical=6, default=3, low=1.
func ParseQueueConfig(cfg *config.QueueConfig) map[string]int {
	// len of a nil map is 0, so a single length check covers nil and empty.
	if cfg != nil && len(cfg.Queues) > 0 {
		return cfg.Queues
	}

	// Default queue priorities.
	return map[string]int{
		"critical": 6,
		"default":  3,
		"low":      1,
	}
}