package main import ( "context" "os" "os/signal" "strconv" "syscall" "github.com/redis/go-redis/v9" "go.uber.org/zap" "github.com/break/junhong_cmp_fiber/pkg/config" "github.com/break/junhong_cmp_fiber/pkg/database" "github.com/break/junhong_cmp_fiber/pkg/logger" "github.com/break/junhong_cmp_fiber/pkg/queue" ) func main() { // 加载配置 cfg, err := config.Load() if err != nil { panic("加载配置失败: " + err.Error()) } // 初始化日志 if err := logger.InitLoggers( cfg.Logging.Level, cfg.Logging.Development, logger.LogRotationConfig{ Filename: cfg.Logging.AppLog.Filename, MaxSize: cfg.Logging.AppLog.MaxSize, MaxBackups: cfg.Logging.AppLog.MaxBackups, MaxAge: cfg.Logging.AppLog.MaxAge, Compress: cfg.Logging.AppLog.Compress, }, logger.LogRotationConfig{ Filename: cfg.Logging.AccessLog.Filename, MaxSize: cfg.Logging.AccessLog.MaxSize, MaxBackups: cfg.Logging.AccessLog.MaxBackups, MaxAge: cfg.Logging.AccessLog.MaxAge, Compress: cfg.Logging.AccessLog.Compress, }, ); err != nil { panic("初始化日志失败: " + err.Error()) } defer func() { _ = logger.Sync() // 忽略 sync 错误 }() appLogger := logger.GetAppLogger() appLogger.Info("Worker 服务启动中...") // 连接 Redis redisAddr := cfg.Redis.Address + ":" + strconv.Itoa(cfg.Redis.Port) redisClient := redis.NewClient(&redis.Options{ Addr: redisAddr, Password: cfg.Redis.Password, DB: cfg.Redis.DB, PoolSize: cfg.Redis.PoolSize, MinIdleConns: cfg.Redis.MinIdleConns, DialTimeout: cfg.Redis.DialTimeout, ReadTimeout: cfg.Redis.ReadTimeout, WriteTimeout: cfg.Redis.WriteTimeout, }) defer func() { if err := redisClient.Close(); err != nil { appLogger.Error("关闭 Redis 客户端失败", zap.Error(err)) } }() // 测试 Redis 连接 ctx := context.Background() if err := redisClient.Ping(ctx).Err(); err != nil { appLogger.Fatal("连接 Redis 失败", zap.Error(err)) } appLogger.Info("Redis 已连接", zap.String("address", redisAddr)) // 初始化 PostgreSQL 连接 db, err := database.InitPostgreSQL(&cfg.Database, appLogger) if err != nil { appLogger.Fatal("初始化 PostgreSQL 失败", zap.Error(err)) } defer func() { sqlDB, _ := db.DB() if sqlDB != nil { if err := sqlDB.Close(); err != nil { appLogger.Error("关闭 PostgreSQL 连接失败", zap.Error(err)) } } }() // 创建 Asynq Worker 服务器 workerServer := queue.NewServer(redisClient, &cfg.Queue, appLogger) // 创建任务处理器管理器并注册所有处理器 taskHandler := queue.NewHandler(db, redisClient, appLogger) taskHandler.RegisterHandlers() appLogger.Info("Worker 服务器配置完成", zap.Int("concurrency", cfg.Queue.Concurrency), zap.Any("queues", cfg.Queue.Queues)) // 优雅关闭 quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, syscall.SIGTERM) // 启动 Worker 服务器(阻塞运行) go func() { if err := workerServer.Run(taskHandler.GetMux()); err != nil { appLogger.Fatal("Worker 服务器运行失败", zap.Error(err)) } }() appLogger.Info("Worker 服务器已启动") // 等待关闭信号 <-quit appLogger.Info("正在关闭 Worker 服务器...") // 优雅关闭 Worker 服务器(等待正在执行的任务完成) workerServer.Shutdown() appLogger.Info("Worker 服务器已停止") }