package main

import (
	"context"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/hibiken/asynq"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"

	"github.com/break/junhong_cmp_fiber/internal/bootstrap"
	"github.com/break/junhong_cmp_fiber/internal/gateway"
	"github.com/break/junhong_cmp_fiber/internal/polling"
	pollingSvc "github.com/break/junhong_cmp_fiber/internal/service/polling"
	pkgBootstrap "github.com/break/junhong_cmp_fiber/pkg/bootstrap"
	"github.com/break/junhong_cmp_fiber/pkg/config"
	"github.com/break/junhong_cmp_fiber/pkg/database"
	"github.com/break/junhong_cmp_fiber/pkg/logger"
	"github.com/break/junhong_cmp_fiber/pkg/queue"
	"github.com/break/junhong_cmp_fiber/pkg/storage"
)

// main wires up and runs the worker process: it loads configuration,
// initializes logging, connects to Redis and PostgreSQL, builds the optional
// storage and gateway clients, starts the polling scheduler, the alert checker
// and the cleanup scheduler, runs the queue server in a goroutine, and then
// blocks until SIGINT/SIGTERM triggers an ordered shutdown.
//
// NOTE(review): zap's Logger.Fatal exits the process after logging, so the
// deferred cleanups registered here do not run on the Fatal paths.
func main() {
	cfg, err := config.Load()
	if err != nil {
		panic("加载配置失败: " + err.Error())
	}

	if _, err := pkgBootstrap.EnsureDirectories(cfg, nil); err != nil {
		panic("初始化目录失败: " + err.Error())
	}

	// Two rotation configs are passed: one for the application log and one
	// for the access log.
	if err := logger.InitLoggers(
		cfg.Logging.Level,
		cfg.Logging.Development,
		logger.LogRotationConfig{
			Filename:   cfg.Logging.AppLog.Filename,
			MaxSize:    cfg.Logging.AppLog.MaxSize,
			MaxBackups: cfg.Logging.AppLog.MaxBackups,
			MaxAge:     cfg.Logging.AppLog.MaxAge,
			Compress:   cfg.Logging.AppLog.Compress,
		},
		logger.LogRotationConfig{
			Filename:   cfg.Logging.AccessLog.Filename,
			MaxSize:    cfg.Logging.AccessLog.MaxSize,
			MaxBackups: cfg.Logging.AccessLog.MaxBackups,
			MaxAge:     cfg.Logging.AccessLog.MaxAge,
			Compress:   cfg.Logging.AccessLog.Compress,
		},
	); err != nil {
		panic("初始化日志失败: " + err.Error())
	}
	defer func() {
		_ = logger.Sync() // Sync errors are deliberately ignored at exit.
	}()

	appLogger := logger.GetAppLogger()
	appLogger.Info("Worker 服务启动中...")

	// Connect to Redis.
	redisAddr := cfg.Redis.Address + ":" + strconv.Itoa(cfg.Redis.Port)
	redisClient := redis.NewClient(&redis.Options{
		Addr:         redisAddr,
		Password:     cfg.Redis.Password,
		DB:           cfg.Redis.DB,
		PoolSize:     cfg.Redis.PoolSize,
		MinIdleConns: cfg.Redis.MinIdleConns,
		DialTimeout:  cfg.Redis.DialTimeout,
		ReadTimeout:  cfg.Redis.ReadTimeout,
		WriteTimeout: cfg.Redis.WriteTimeout,
	})
	defer func() {
		if err := redisClient.Close(); err != nil {
			appLogger.Error("关闭 Redis 客户端失败", zap.Error(err))
		}
	}()

	// Verify the Redis connection before going any further.
	ctx := context.Background()
	if err := redisClient.Ping(ctx).Err(); err != nil {
		appLogger.Fatal("连接 Redis 失败", zap.Error(err))
	}
	appLogger.Info("Redis 已连接", zap.String("address", redisAddr))

	// Initialize the PostgreSQL connection.
	db, err := database.InitPostgreSQL(&cfg.Database, appLogger)
	if err != nil {
		appLogger.Fatal("初始化 PostgreSQL 失败", zap.Error(err))
	}
	defer func() {
		// Report a failure to obtain the underlying handle instead of
		// dropping it silently; still close whatever handle was returned.
		sqlDB, err := db.DB()
		if err != nil {
			appLogger.Error("获取 PostgreSQL 底层连接失败", zap.Error(err))
		}
		if sqlDB == nil {
			return
		}
		if err := sqlDB.Close(); err != nil {
			appLogger.Error("关闭 PostgreSQL 连接失败", zap.Error(err))
		}
	}()

	// Initialize the object storage service (optional; may be nil).
	storageSvc := initStorage(cfg, appLogger)

	// Initialize the Gateway client (optional; used by polling tasks).
	gatewayClient := initGateway(cfg, appLogger)

	// Create the Asynq client (used by the scheduler to enqueue tasks).
	asynqClient := asynq.NewClient(asynq.RedisClientOpt{
		Addr:     redisAddr,
		Password: cfg.Redis.Password,
		DB:       cfg.Redis.DB,
	})
	defer func() {
		if err := asynqClient.Close(); err != nil {
			appLogger.Error("关闭 Asynq 客户端失败", zap.Error(err))
		}
	}()

	// Assemble the worker dependencies.
	workerDeps := &bootstrap.WorkerDependencies{
		DB:             db,
		Redis:          redisClient,
		Logger:         appLogger,
		AsynqClient:    asynqClient,
		StorageService: storageSvc,
		GatewayClient:  gatewayClient,
	}

	// Bootstrap the worker components.
	workerResult, err := bootstrap.BootstrapWorker(workerDeps)
	if err != nil {
		appLogger.Fatal("Worker Bootstrap 失败", zap.Error(err))
	}

	// Create the Asynq worker server.
	workerServer := queue.NewServer(redisClient, &cfg.Queue, appLogger)

	// Initialize the polling scheduler before the handler is created, because
	// the handler takes the scheduler as a callback.
	scheduler := polling.NewScheduler(db, redisClient, asynqClient, appLogger)

	// Inject the data-reset service into the scheduler.
	dataResetHandler := polling.NewDataResetHandler(workerResult.Services.ResetService, appLogger)
	scheduler.SetResetService(dataResetHandler)

	// A scheduler start failure is logged but does not abort the worker.
	if err := scheduler.Start(ctx); err != nil {
		appLogger.Error("启动轮询调度器失败", zap.Error(err))
	} else {
		appLogger.Info("轮询调度器已启动")
	}

	// Create the task handler manager and register all handlers.
	taskHandler := queue.NewHandler(db, redisClient, storageSvc, gatewayClient, scheduler, workerResult, asynqClient, appLogger)
	taskHandler.RegisterHandlers()

	appLogger.Info("Worker 服务器配置完成",
		zap.Int("concurrency", cfg.Queue.Concurrency),
		zap.Any("queues", cfg.Queue.Queues))

	// Start the alert checker.
	alertChecker := startAlertChecker(ctx, workerResult.Services.AlertService, appLogger)

	// Start the scheduled data cleanup task.
	cleanupChecker := startCleanupScheduler(ctx, workerResult.Services.CleanupService, appLogger)

	// Register for shutdown signals.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

	// Run the worker server in its own goroutine so main can wait on quit.
	go func() {
		if err := workerServer.Run(taskHandler.GetMux()); err != nil {
			appLogger.Fatal("Worker 服务器运行失败", zap.Error(err))
		}
	}()

	appLogger.Info("Worker 服务器已启动")

	// Block until a shutdown signal arrives.
	<-quit
	appLogger.Info("正在关闭 Worker 服务器...")

	// Stop the alert checker.
	close(alertChecker)

	// Stop the scheduled data cleanup task.
	close(cleanupChecker)

	// Stop the polling scheduler.
	scheduler.Stop()

	// Shut the worker server down gracefully (per the original note, this
	// waits for in-flight tasks to finish).
	workerServer.Shutdown()
	appLogger.Info("Worker 服务器已停止")
}

// initStorage builds the object storage service from cfg.Storage.
//
// It returns nil when no provider or S3 endpoint is configured, or when the S3
// provider cannot be created; both cases are logged and are not fatal.
func initStorage(cfg *config.Config, appLogger *zap.Logger) *storage.Service {
	if cfg.Storage.Provider == "" || cfg.Storage.S3.Endpoint == "" {
		appLogger.Info("对象存储未配置,跳过初始化")
		return nil
	}

	provider, err := storage.NewS3Provider(&cfg.Storage)
	if err != nil {
		appLogger.Warn("初始化对象存储失败,功能将不可用", zap.Error(err))
		return nil
	}

	appLogger.Info("对象存储已初始化",
		zap.String("provider", cfg.Storage.Provider),
		zap.String("bucket", cfg.Storage.S3.Bucket),
	)

	return storage.NewService(provider, &cfg.Storage)
}

// initGateway builds the Gateway client from cfg.Gateway.
//
// It returns nil when no base URL is configured. cfg.Gateway.Timeout is
// interpreted as a number of seconds.
func initGateway(cfg *config.Config, appLogger *zap.Logger) *gateway.Client {
	if cfg.Gateway.BaseURL == "" {
		appLogger.Info("Gateway 未配置,跳过初始化(轮询任务将无法查询真实数据)")
		return nil
	}

	client := gateway.NewClient(
		cfg.Gateway.BaseURL,
		cfg.Gateway.AppID,
		cfg.Gateway.AppSecret,
	).WithTimeout(time.Duration(cfg.Gateway.Timeout) * time.Second)

	appLogger.Info("Gateway 客户端初始化成功",
		zap.String("base_url", cfg.Gateway.BaseURL),
		zap.String("app_id", cfg.Gateway.AppID))

	return client
}

// startAlertChecker starts a goroutine that calls alertService.CheckAlerts
// once per minute and logs any error it returns.
//
// The goroutine exits when the returned channel is closed or when ctx is
// cancelled.
func startAlertChecker(ctx context.Context, alertService *pollingSvc.AlertService, appLogger *zap.Logger) chan struct{} {
	stopChan := make(chan struct{})

	go func() {
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()

		appLogger.Info("告警检查器已启动,检查间隔: 1分钟")

		for {
			select {
			case <-ticker.C:
				if err := alertService.CheckAlerts(ctx); err != nil {
					appLogger.Error("告警检查失败", zap.Error(err))
				}
			case <-stopChan:
				appLogger.Info("告警检查器已停止")
				return
			case <-ctx.Done():
				appLogger.Info("告警检查器因 context 取消而停止")
				return
			}
		}
	}()

	return stopChan
}

// startCleanupScheduler starts a goroutine that calls
// cleanupService.RunScheduledCleanup every day at 02:00 local time and logs
// any error it returns.
//
// The goroutine exits when the returned channel is closed or when ctx is
// cancelled.
func startCleanupScheduler(ctx context.Context, cleanupService *pollingSvc.CleanupService, appLogger *zap.Logger) chan struct{} {
	stopChan := make(chan struct{})

	go func() {
		// calcNextRun returns the time remaining until the next 02:00 in the
		// local time zone.
		calcNextRun := func() time.Duration {
			now := time.Now()
			next := time.Date(now.Year(), now.Month(), now.Day(), 2, 0, 0, 0, now.Location())
			if now.After(next) {
				// Build tomorrow's 02:00 by calendar day rather than adding
				// 24h, so the wall-clock hour stays at 02:00 across DST
				// transitions. time.Date normalizes the day overflow.
				next = time.Date(now.Year(), now.Month(), now.Day()+1, 2, 0, 0, 0, now.Location())
			}
			return time.Until(next)
		}

		timer := time.NewTimer(calcNextRun())
		defer timer.Stop()

		appLogger.Info("数据清理定时任务已启动,每天凌晨2点执行")

		for {
			select {
			case <-timer.C:
				appLogger.Info("开始执行定时数据清理")
				if err := cleanupService.RunScheduledCleanup(ctx); err != nil {
					appLogger.Error("定时数据清理失败", zap.Error(err))
				}
				// The timer has fired and its channel is drained, so Reset
				// is safe here.
				timer.Reset(calcNextRun())
			case <-stopChan:
				appLogger.Info("数据清理定时任务已停止")
				return
			case <-ctx.Done():
				appLogger.Info("数据清理定时任务因 context 取消而停止")
				return
			}
		}
	}()

	return stopChan
}