package main import ( "context" "os" "os/signal" "strconv" "syscall" "time" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "github.com/break/junhong_cmp_fiber/internal/bootstrap" "github.com/break/junhong_cmp_fiber/internal/gateway" "github.com/break/junhong_cmp_fiber/internal/polling" pkgBootstrap "github.com/break/junhong_cmp_fiber/pkg/bootstrap" "github.com/break/junhong_cmp_fiber/pkg/config" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/database" "github.com/break/junhong_cmp_fiber/pkg/logger" "github.com/break/junhong_cmp_fiber/pkg/queue" "github.com/break/junhong_cmp_fiber/pkg/storage" ) func main() { cfg, err := config.Load() if err != nil { panic("加载配置失败: " + err.Error()) } if _, err := pkgBootstrap.EnsureDirectories(cfg, nil); err != nil { panic("初始化目录失败: " + err.Error()) } if err := logger.InitLoggers( cfg.Logging.Level, cfg.Logging.Development, logger.LogRotationConfig{ Filename: cfg.Logging.AppLog.Filename, MaxSize: cfg.Logging.AppLog.MaxSize, MaxBackups: cfg.Logging.AppLog.MaxBackups, MaxAge: cfg.Logging.AppLog.MaxAge, Compress: cfg.Logging.AppLog.Compress, }, logger.LogRotationConfig{ Filename: cfg.Logging.AccessLog.Filename, MaxSize: cfg.Logging.AccessLog.MaxSize, MaxBackups: cfg.Logging.AccessLog.MaxBackups, MaxAge: cfg.Logging.AccessLog.MaxAge, Compress: cfg.Logging.AccessLog.Compress, }, ); err != nil { panic("初始化日志失败: " + err.Error()) } defer func() { _ = logger.Sync() // 忽略 sync 错误 }() appLogger := logger.GetAppLogger() appLogger.Info("Worker 服务启动中...") // 连接 Redis redisAddr := cfg.Redis.Address + ":" + strconv.Itoa(cfg.Redis.Port) redisClient := redis.NewClient(&redis.Options{ Addr: redisAddr, Password: cfg.Redis.Password, DB: cfg.Redis.DB, PoolSize: cfg.Redis.PoolSize, MinIdleConns: cfg.Redis.MinIdleConns, DialTimeout: cfg.Redis.DialTimeout, ReadTimeout: cfg.Redis.ReadTimeout, WriteTimeout: cfg.Redis.WriteTimeout, }) defer func() { if err := redisClient.Close(); err != nil { appLogger.Error("关闭 Redis 客户端失败", zap.Error(err)) } }() // 测试 Redis 连接 ctx := context.Background() if err := redisClient.Ping(ctx).Err(); err != nil { appLogger.Fatal("连接 Redis 失败", zap.Error(err)) } appLogger.Info("Redis 已连接", zap.String("address", redisAddr)) // 初始化 PostgreSQL 连接 db, err := database.InitPostgreSQL(&cfg.Database, appLogger) if err != nil { appLogger.Fatal("初始化 PostgreSQL 失败", zap.Error(err)) } defer func() { sqlDB, _ := db.DB() if sqlDB != nil { if err := sqlDB.Close(); err != nil { appLogger.Error("关闭 PostgreSQL 连接失败", zap.Error(err)) } } }() // 初始化对象存储服务(可选) storageSvc := initStorage(cfg, appLogger) // 初始化 Gateway 客户端(可选,用于轮询任务) gatewayClient := initGateway(cfg, appLogger) // 创建 Asynq 客户端(用于调度器提交任务) asynqClient := asynq.NewClient(asynq.RedisClientOpt{ Addr: redisAddr, Password: cfg.Redis.Password, DB: cfg.Redis.DB, }) defer func() { if err := asynqClient.Close(); err != nil { appLogger.Error("关闭 Asynq 客户端失败", zap.Error(err)) } }() // 创建 Worker 依赖 workerDeps := &bootstrap.WorkerDependencies{ DB: db, Redis: redisClient, Logger: appLogger, AsynqClient: asynqClient, StorageService: storageSvc, GatewayClient: gatewayClient, } // Bootstrap Worker 组件 workerResult, err := bootstrap.BootstrapWorker(workerDeps) if err != nil { appLogger.Fatal("Worker Bootstrap 失败", zap.Error(err)) } // 创建 Asynq Worker 服务器 workerServer := queue.NewServer(redisClient, &cfg.Queue, appLogger) // 初始化轮询调度器(在创建 Handler 之前,因为 Handler 需要使用调度器作为回调) scheduler := polling.NewScheduler(db, redisClient, asynqClient, appLogger) // 注入流量重置服务到调度器 dataResetHandler := polling.NewDataResetHandler(workerResult.Services.ResetService, appLogger) scheduler.SetResetService(dataResetHandler) if err := scheduler.Start(ctx); err != nil { appLogger.Error("启动轮询调度器失败", zap.Error(err)) } else { appLogger.Info("轮询调度器已启动") } // 创建任务处理器管理器并注册所有处理器 taskHandler := queue.NewHandler(db, redisClient, storageSvc, gatewayClient, scheduler, workerResult, asynqClient, appLogger) taskHandler.RegisterHandlers() appLogger.Info("Worker 服务器配置完成", zap.Int("concurrency", cfg.Queue.Concurrency), zap.Any("queues", cfg.Queue.Queues)) // 创建 Asynq Scheduler(定时任务调度器:订单超时、告警检查、数据清理) asynqScheduler := asynq.NewScheduler( asynq.RedisClientOpt{ Addr: redisAddr, Password: cfg.Redis.Password, DB: cfg.Redis.DB, }, &asynq.SchedulerOpts{Location: time.Local}, ) // 注册定时任务:订单超时检查(每分钟) if _, err := asynqScheduler.Register("@every 1m", asynq.NewTask(constants.TaskTypeOrderExpire, nil)); err != nil { appLogger.Fatal("注册订单超时定时任务失败", zap.Error(err)) } // 注册定时任务:告警检查(每分钟) if _, err := asynqScheduler.Register("@every 1m", asynq.NewTask(constants.TaskTypeAlertCheck, nil)); err != nil { appLogger.Fatal("注册告警检查定时任务失败", zap.Error(err)) } // 注册定时任务:数据清理(每天凌晨 2 点) if _, err := asynqScheduler.Register("0 2 * * *", asynq.NewTask(constants.TaskTypeDataCleanup, nil)); err != nil { appLogger.Fatal("注册数据清理定时任务失败", zap.Error(err)) } // 启动 Asynq Scheduler go func() { if err := asynqScheduler.Run(); err != nil { appLogger.Fatal("Asynq Scheduler 启动失败", zap.Error(err)) } }() appLogger.Info("Asynq Scheduler 已启动(订单超时: @every 1m, 告警检查: @every 1m, 数据清理: 0 2 * * *)") // 优雅关闭 quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, syscall.SIGTERM) // 启动 Worker 服务器(阻塞运行) go func() { if err := workerServer.Run(taskHandler.GetMux()); err != nil { appLogger.Fatal("Worker 服务器运行失败", zap.Error(err)) } }() appLogger.Info("Worker 服务器已启动") // 等待关闭信号 <-quit appLogger.Info("正在关闭 Worker 服务器...") // 停止 Asynq Scheduler asynqScheduler.Shutdown() // 停止轮询调度器 scheduler.Stop() // 优雅关闭 Worker 服务器(等待正在执行的任务完成) workerServer.Shutdown() appLogger.Info("Worker 服务器已停止") } func initStorage(cfg *config.Config, appLogger *zap.Logger) *storage.Service { if cfg.Storage.Provider == "" || cfg.Storage.S3.Endpoint == "" { appLogger.Info("对象存储未配置,跳过初始化") return nil } provider, err := storage.NewS3Provider(&cfg.Storage) if err != nil { appLogger.Warn("初始化对象存储失败,功能将不可用", zap.Error(err)) return nil } appLogger.Info("对象存储已初始化", zap.String("provider", cfg.Storage.Provider), zap.String("bucket", cfg.Storage.S3.Bucket), ) return storage.NewService(provider, &cfg.Storage) } // initGateway 初始化 Gateway 客户端 func initGateway(cfg *config.Config, appLogger *zap.Logger) *gateway.Client { if cfg.Gateway.BaseURL == "" { appLogger.Info("Gateway 未配置,跳过初始化(轮询任务将无法查询真实数据)") return nil } client := gateway.NewClient( cfg.Gateway.BaseURL, cfg.Gateway.AppID, cfg.Gateway.AppSecret, ).WithTimeout(time.Duration(cfg.Gateway.Timeout) * time.Second) appLogger.Info("Gateway 客户端初始化成功", zap.String("base_url", cfg.Gateway.BaseURL), zap.String("app_id", cfg.Gateway.AppID)) return client }