From e661b59bb9458cb86b0f71774ca0017baa020906 Mon Sep 17 00:00:00 2001 From: huang Date: Sat, 28 Feb 2026 17:16:15 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E8=AE=A2=E5=8D=95?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E8=87=AA=E5=8A=A8=E5=8F=96=E6=B6=88=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=EF=BC=8C=E6=94=AF=E6=8C=81=E9=92=B1=E5=8C=85=E4=BD=99?= =?UTF-8?q?=E9=A2=9D=E8=A7=A3=E5=86=BB=E5=92=8C=20Asynq=20Scheduler=20?= =?UTF-8?q?=E7=BB=9F=E4=B8=80=E8=B0=83=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 expires_at 字段和复合索引,待支付订单 30 分钟超时自动取消 - 实现 cancelOrder/unfreezeWalletForCancel 钱包余额解冻逻辑 - 创建 Asynq 定时任务(order_expire/alert_check/data_cleanup) - 将原有 time.Ticker 轮询迁移至 Asynq Scheduler 统一调度 - 同步 delta specs 到 main specs 并归档变更 --- README.md | 1 + cmd/worker/main.go | 109 +++----- docs/order-expiration/功能总结.md | 181 +++++++++++++ internal/bootstrap/worker_services.go | 23 ++ internal/bootstrap/worker_stores.go | 3 + internal/model/agent_wallet.go | 42 ++-- internal/model/dto/iot_card_dto.go | 4 +- internal/model/dto/order_dto.go | 35 +-- internal/model/iot_card.go | 4 +- internal/model/order.go | 3 + internal/service/enterprise_device/service.go | 2 - internal/service/iot_card/service.go | 6 +- internal/service/order/service.go | 133 +++++++++- internal/service/package_series/service.go | 4 +- internal/store/postgres/order_store.go | 32 ++- internal/task/alert_check.go | 35 +++ internal/task/data_cleanup.go | 37 +++ internal/task/order_expire.go | 43 ++++ .../000069_add_order_expiration.down.sql | 22 ++ migrations/000069_add_order_expiration.up.sql | 24 ++ .../.openspec.yaml | 0 .../design.md | 0 .../proposal.md | 0 .../specs/iot-order/spec.md | 0 .../specs/order-expiration/spec.md | 0 .../specs/order-payment/spec.md | 0 .../tasks.md | 184 ++++++++++++++ .../implement-order-expiration/tasks.md | 184 -------------- openspec/specs/iot-order/spec.md | 23 +- openspec/specs/order-expiration/spec.md | 237 ++++++++++++++++++ openspec/specs/order-payment/spec.md | 53 ++++ pkg/constants/constants.go | 13 + pkg/constants/wallet.go | 2 +- pkg/queue/handler.go | 21 ++ pkg/queue/types.go | 11 + 35 files changed, 1157 insertions(+), 314 deletions(-) create mode 100644 docs/order-expiration/功能总结.md create mode 100644 internal/task/alert_check.go create mode 100644 internal/task/data_cleanup.go create mode 100644 internal/task/order_expire.go create mode 100644 migrations/000069_add_order_expiration.down.sql create mode 100644 migrations/000069_add_order_expiration.up.sql rename openspec/changes/{implement-order-expiration => archive/2025-07-27-implement-order-expiration}/.openspec.yaml (100%) rename openspec/changes/{implement-order-expiration => archive/2025-07-27-implement-order-expiration}/design.md (100%) rename openspec/changes/{implement-order-expiration => archive/2025-07-27-implement-order-expiration}/proposal.md (100%) rename openspec/changes/{implement-order-expiration => archive/2025-07-27-implement-order-expiration}/specs/iot-order/spec.md (100%) rename openspec/changes/{implement-order-expiration => archive/2025-07-27-implement-order-expiration}/specs/order-expiration/spec.md (100%) rename openspec/changes/{implement-order-expiration => archive/2025-07-27-implement-order-expiration}/specs/order-payment/spec.md (100%) create mode 100644 openspec/changes/archive/2025-07-27-implement-order-expiration/tasks.md delete mode 100644 openspec/changes/implement-order-expiration/tasks.md create mode 100644 openspec/specs/order-expiration/spec.md diff --git a/README.md b/README.md index eb5529b..617f772 100644 --- a/README.md +++ b/README.md @@ -222,6 +222,7 @@ default: - **分佣验证指引**:对代理分佣的冻结、解冻、提现校验流程进行了结构化说明与流程图,详见 [分佣逻辑正确与否验证](docs/优化说明/分佣逻辑正确与否验证.md) - **对象存储**:S3 兼容的对象存储服务集成(联通云 OSS),支持预签名 URL 上传、文件下载、临时文件处理;用于 ICCID 批量导入、数据导出等场景;详见 [使用指南](docs/object-storage/使用指南.md) 和 [前端接入指南](docs/object-storage/前端接入指南.md) - **微信集成**:完整的微信公众号 OAuth 认证和微信支付功能(JSAPI + H5),使用 PowerWeChat v3 SDK;支持个人客户微信授权登录、账号绑定、微信内支付和浏览器 H5 支付;支付回调自动验证签名和幂等性处理;详见 [使用指南](docs/wechat-integration/使用指南.md) 和 [API 文档](docs/wechat-integration/API文档.md) +- **订单超时自动取消**:待支付订单(微信/支付宝)30 分钟超时自动取消,支持钱包余额解冻;使用 Asynq Scheduler 每分钟扫描,取代原有 time.Ticker 实现;同时将告警检查和数据清理迁移至 Asynq Scheduler 统一调度;详见 [功能总结](docs/order-expiration/功能总结.md) ## 用户体系设计 diff --git a/cmd/worker/main.go b/cmd/worker/main.go index e0ec701..c931c4b 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -15,9 +15,9 @@ import ( "github.com/break/junhong_cmp_fiber/internal/bootstrap" "github.com/break/junhong_cmp_fiber/internal/gateway" "github.com/break/junhong_cmp_fiber/internal/polling" - pollingSvc "github.com/break/junhong_cmp_fiber/internal/service/polling" pkgBootstrap "github.com/break/junhong_cmp_fiber/pkg/bootstrap" "github.com/break/junhong_cmp_fiber/pkg/config" + "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/database" "github.com/break/junhong_cmp_fiber/pkg/logger" "github.com/break/junhong_cmp_fiber/pkg/queue" @@ -158,11 +158,36 @@ func main() { zap.Int("concurrency", cfg.Queue.Concurrency), zap.Any("queues", cfg.Queue.Queues)) - // 初始化告警服务并启动告警检查器 - alertChecker := startAlertChecker(ctx, workerResult.Services.AlertService, appLogger) + // 创建 Asynq Scheduler(定时任务调度器:订单超时、告警检查、数据清理) + asynqScheduler := asynq.NewScheduler( + asynq.RedisClientOpt{ + Addr: redisAddr, + Password: cfg.Redis.Password, + DB: cfg.Redis.DB, + }, + &asynq.SchedulerOpts{Location: time.Local}, + ) - // 初始化数据清理服务并启动定时清理任务 - cleanupChecker := startCleanupScheduler(ctx, workerResult.Services.CleanupService, appLogger) + // 注册定时任务:订单超时检查(每分钟) + if _, err := asynqScheduler.Register("@every 1m", asynq.NewTask(constants.TaskTypeOrderExpire, nil)); err != nil { + appLogger.Fatal("注册订单超时定时任务失败", zap.Error(err)) + } + // 注册定时任务:告警检查(每分钟) + if _, err := asynqScheduler.Register("@every 1m", asynq.NewTask(constants.TaskTypeAlertCheck, nil)); err != nil { + appLogger.Fatal("注册告警检查定时任务失败", zap.Error(err)) + } + // 注册定时任务:数据清理(每天凌晨 2 点) + if _, err := asynqScheduler.Register("0 2 * * *", asynq.NewTask(constants.TaskTypeDataCleanup, nil)); err != nil { + appLogger.Fatal("注册数据清理定时任务失败", zap.Error(err)) + } + + // 启动 Asynq Scheduler + go func() { + if err := asynqScheduler.Run(); err != nil { + appLogger.Fatal("Asynq Scheduler 启动失败", zap.Error(err)) + } + }() + appLogger.Info("Asynq Scheduler 已启动(订单超时: @every 1m, 告警检查: @every 1m, 数据清理: 0 2 * * *)") // 优雅关闭 quit := make(chan os.Signal, 1) @@ -181,11 +206,8 @@ func main() { <-quit appLogger.Info("正在关闭 Worker 服务器...") - // 停止告警检查器 - close(alertChecker) - - // 停止数据清理定时任务 - close(cleanupChecker) + // 停止 Asynq Scheduler + asynqScheduler.Shutdown() // 停止轮询调度器 scheduler.Stop() @@ -235,70 +257,3 @@ func initGateway(cfg *config.Config, appLogger *zap.Logger) *gateway.Client { return client } - -func startAlertChecker(ctx context.Context, alertService *pollingSvc.AlertService, appLogger *zap.Logger) chan struct{} { - stopChan := make(chan struct{}) - - go func() { - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() - - appLogger.Info("告警检查器已启动,检查间隔: 1分钟") - - for { - select { - case <-ticker.C: - if err := alertService.CheckAlerts(ctx); err != nil { - appLogger.Error("告警检查失败", zap.Error(err)) - } - case <-stopChan: - appLogger.Info("告警检查器已停止") - return - case <-ctx.Done(): - appLogger.Info("告警检查器因 context 取消而停止") - return - } - } - }() - - return stopChan -} - -func startCleanupScheduler(ctx context.Context, cleanupService *pollingSvc.CleanupService, appLogger *zap.Logger) chan struct{} { - stopChan := make(chan struct{}) - - go func() { - calcNextRun := func() time.Duration { - now := time.Now() - next := time.Date(now.Year(), now.Month(), now.Day(), 2, 0, 0, 0, now.Location()) - if now.After(next) { - next = next.Add(24 * time.Hour) - } - return time.Until(next) - } - - timer := time.NewTimer(calcNextRun()) - defer timer.Stop() - - appLogger.Info("数据清理定时任务已启动,每天凌晨2点执行") - - for { - select { - case <-timer.C: - appLogger.Info("开始执行定时数据清理") - if err := cleanupService.RunScheduledCleanup(ctx); err != nil { - appLogger.Error("定时数据清理失败", zap.Error(err)) - } - timer.Reset(calcNextRun()) - case <-stopChan: - appLogger.Info("数据清理定时任务已停止") - return - case <-ctx.Done(): - appLogger.Info("数据清理定时任务因 context 取消而停止") - return - } - } - }() - - return stopChan -} diff --git a/docs/order-expiration/功能总结.md b/docs/order-expiration/功能总结.md new file mode 100644 index 0000000..60b50ab --- /dev/null +++ b/docs/order-expiration/功能总结.md @@ -0,0 +1,181 @@ +# 订单超时自动取消功能 + +## 功能概述 + +为待支付订单(微信/支付宝)添加 30 分钟超时自动取消机制。超时后自动取消订单并解冻钱包余额(如有冻结)。 + +## 核心设计 + +### 超时流程 + +``` +用户下单(微信/支付宝) +├── 设置 expires_at = 当前时间 + 30 分钟 +├── 订单状态: payment_status = 1(待支付) +│ +├── 场景 1: 用户在 30 分钟内支付 +│ ├── 支付成功 → 清除 expires_at(设为 NULL) +│ └── 订单正常完成 +│ +└── 场景 2: 超过 30 分钟未支付 + ├── Asynq Scheduler 每分钟触发扫描 + ├── 查询 expires_at <= NOW() AND payment_status = 1 + ├── 取消订单 → payment_status = 5(已取消) + ├── 清除 expires_at + └── 解冻钱包余额(如有) +``` + +### 不设置超时的场景 + +- **钱包支付**:立即扣款,无需超时 +- **线下支付**:管理员手动确认,无需超时 +- **混合支付**:需要在线支付部分才设置超时 + +## 技术实现 + +### 数据库变更 + +```sql +-- 迁移文件: migrations/000069_add_order_expiration.up.sql +ALTER TABLE tb_order ADD COLUMN expires_at TIMESTAMPTZ; + +-- 部分索引: 仅索引待支付订单,减少索引大小 +CREATE INDEX idx_order_expires ON tb_order (expires_at, payment_status) +WHERE expires_at IS NOT NULL AND payment_status = 1; +``` + +### 涉及文件 + +| 层级 | 文件 | 变更说明 | +|------|------|----------| +| 迁移 | `migrations/000069_add_order_expiration.up.sql` | 添加 expires_at 字段和索引 | +| 迁移 | `migrations/000069_add_order_expiration.down.sql` | 回滚脚本 | +| 常量 | `pkg/constants/constants.go` | 添加任务类型和超时参数 | +| 模型 | `internal/model/order.go` | 添加 ExpiresAt 字段 | +| DTO | `internal/model/dto/order_dto.go` | 添加 ExpiresAt、IsExpired 响应字段 | +| Store | `internal/store/postgres/order_store.go` | 添加 FindExpiredOrders、is_expired 过滤 | +| Service | `internal/service/order/service.go` | 创建订单设置超时、取消逻辑、批量取消 | +| 任务 | `internal/task/order_expire.go` | 订单超时任务处理器 | +| 任务 | `internal/task/alert_check.go` | 告警检查任务处理器(从 ticker 迁移) | +| 任务 | `internal/task/data_cleanup.go` | 数据清理任务处理器(从 ticker 迁移) | +| 队列 | `pkg/queue/types.go` | 添加 OrderExpirer 接口和 WorkerStores/Services 字段 | +| 队列 | `pkg/queue/handler.go` | 注册 3 个新任务处理器 | +| Bootstrap | `internal/bootstrap/worker_stores.go` | 添加 CardWallet Store | +| Bootstrap | `internal/bootstrap/worker_services.go` | 添加 OrderService 初始化 | +| Worker | `cmd/worker/main.go` | 替换 ticker 为 Asynq Scheduler | + +### 常量定义 + +```go +// pkg/constants/constants.go +TaskTypeOrderExpire = "order:expire" // 订单超时任务 +TaskTypeAlertCheck = "alert:check" // 告警检查任务 +TaskTypeDataCleanup = "data:cleanup" // 数据清理任务 +OrderExpireTimeout = 30 * time.Minute // 订单超时时间 +OrderExpireBatchSize = 100 // 每次批量取消数量 +``` + +### 接口变更 + +#### 订单列表查询新增过滤参数 + +``` +GET /api/admin/orders?is_expired=true +GET /api/h5/orders?is_expired=true +``` + +- `is_expired=true`: 仅返回已超时的订单 +- `is_expired=false`: 仅返回未超时的订单 + +#### 订单响应新增字段 + +```json +{ + "expires_at": "2025-02-28T12:30:00+08:00", + "is_expired": false +} +``` + +- `expires_at`: 超时时间,`null` 表示无超时(钱包/线下支付) +- `is_expired`: 是否已超时(计算字段) + +## 定时任务调度器重构 + +### 变更前(time.Ticker) + +```go +// cmd/worker/main.go 中的 goroutine +alertChecker := startAlertChecker(ctx, ...) // time.Ticker 每分钟 +cleanupChecker := startCleanupScheduler(ctx, ...) // time.Timer 每天凌晨 2 点 +``` + +**问题**: +- 单点运行,无法分布式 +- 无重试机制 +- 无任务状态监控 + +### 变更后(Asynq Scheduler) + +```go +// Asynq Scheduler 统一管理 +asynqScheduler.Register("@every 1m", asynq.NewTask("order:expire", nil)) +asynqScheduler.Register("@every 1m", asynq.NewTask("alert:check", nil)) +asynqScheduler.Register("0 2 * * *", asynq.NewTask("data:cleanup", nil)) +``` + +**优势**: +- 通过 Redis 实现分布式调度 +- 自动重试失败任务 +- 可通过 Asynq Dashboard 监控 +- 统一的任务处理模式 + +### 调度规则 + +| 任务 | 调度表达式 | 说明 | +|------|-----------|------| +| 订单超时取消 | `@every 1m` | 每分钟扫描一次 | +| 告警检查 | `@every 1m` | 每分钟检查一次 | +| 数据清理 | `0 2 * * *` | 每天凌晨 2 点执行 | + +## 钱包解冻逻辑 + +### 取消订单时的解冻流程 + +``` +cancelOrder(ctx, order) +├── 幂等更新: WHERE payment_status = 1 → 5 +├── 清除 expires_at +│ +├── 如果是代理钱包支付 (payment_method = wallet, buyer_type = agent) +│ └── AgentWalletStore.UnfreezeBalanceWithTx(tx, shopID, amount) +│ +└── 如果是卡钱包支付 (payment_method = wallet/mixed, buyer_type != agent) + └── 直接更新 frozen_balance -= amount (WHERE frozen_balance >= amount) +``` + +### 幂等性保障 + +- 使用 `WHERE payment_status = 1` 条件更新,确保只取消待支付订单 +- `RowsAffected == 0` 说明订单已被处理(已支付或已取消),直接跳过 +- 批量取消时,单个订单失败不影响其他订单 + +## 循环依赖解决方案 + +`internal/service/order` 导入 `pkg/queue`(使用 queue.Client),而 `pkg/queue/types.go` 需要引用 OrderService。 + +**解决方案**:在 `pkg/queue/types.go` 定义 `OrderExpirer` 接口,`internal/task/order_expire.go` 定义同名局部接口。Go 的结构化类型系统使 `order.Service` 自动满足两个接口,无需显式声明。 + +```go +// pkg/queue/types.go +type OrderExpirer interface { + CancelExpiredOrders(ctx context.Context) (int, error) +} + +// WorkerServices 中使用接口类型 +OrderExpirer OrderExpirer + +// internal/task/order_expire.go(局部接口,避免导入 pkg/queue) +type OrderExpirer interface { + CancelExpiredOrders(ctx context.Context) (int, error) +} +``` diff --git a/internal/bootstrap/worker_services.go b/internal/bootstrap/worker_services.go index de23526..c6afd3d 100644 --- a/internal/bootstrap/worker_services.go +++ b/internal/bootstrap/worker_services.go @@ -3,6 +3,7 @@ package bootstrap import ( "github.com/break/junhong_cmp_fiber/internal/service/commission_calculation" "github.com/break/junhong_cmp_fiber/internal/service/commission_stats" + orderSvc "github.com/break/junhong_cmp_fiber/internal/service/order" packagepkg "github.com/break/junhong_cmp_fiber/internal/service/package" pollingSvc "github.com/break/junhong_cmp_fiber/internal/service/polling" "github.com/break/junhong_cmp_fiber/pkg/queue" @@ -77,6 +78,27 @@ func initWorkerServices(stores *queue.WorkerStores, deps *WorkerDependencies) *q deps.Logger, ) + // 初始化订单服务(仅用于超时自动取消,不需要微信支付和队列客户端) + orderService := orderSvc.New( + deps.DB, + deps.Redis, + stores.Order, + stores.OrderItem, + stores.AgentWallet, + stores.CardWallet, + nil, // purchaseValidationService: 超时取消不需要 + stores.ShopPackageAllocation, + stores.ShopSeriesAllocation, + stores.IotCard, + stores.Device, + stores.PackageSeries, + stores.PackageUsage, + stores.Package, + nil, // wechatPayment: 超时取消不需要 + nil, // queueClient: 超时取消不触发分佣 + deps.Logger, + ) + return &queue.WorkerServices{ CommissionCalculation: commissionCalculationService, CommissionStats: commissionStatsService, @@ -85,5 +107,6 @@ func initWorkerServices(stores *queue.WorkerStores, deps *WorkerDependencies) *q ResetService: resetService, AlertService: alertService, CleanupService: cleanupService, + OrderExpirer: orderService, } } diff --git a/internal/bootstrap/worker_stores.go b/internal/bootstrap/worker_stores.go index e968439..ea0ce79 100644 --- a/internal/bootstrap/worker_stores.go +++ b/internal/bootstrap/worker_stores.go @@ -28,6 +28,7 @@ type workerStores struct { DataCleanupLog *postgres.DataCleanupLogStore AgentWallet *postgres.AgentWalletStore AgentWalletTransaction *postgres.AgentWalletTransactionStore + CardWallet *postgres.CardWalletStore } func initWorkerStores(deps *WorkerDependencies) *queue.WorkerStores { @@ -54,6 +55,7 @@ func initWorkerStores(deps *WorkerDependencies) *queue.WorkerStores { DataCleanupLog: postgres.NewDataCleanupLogStore(deps.DB), AgentWallet: postgres.NewAgentWalletStore(deps.DB, deps.Redis), AgentWalletTransaction: postgres.NewAgentWalletTransactionStore(deps.DB, deps.Redis), + CardWallet: postgres.NewCardWalletStore(deps.DB, deps.Redis), } return &queue.WorkerStores{ @@ -79,5 +81,6 @@ func initWorkerStores(deps *WorkerDependencies) *queue.WorkerStores { DataCleanupLog: stores.DataCleanupLog, AgentWallet: stores.AgentWallet, AgentWalletTransaction: stores.AgentWalletTransaction, + CardWallet: stores.CardWallet, } } diff --git a/internal/model/agent_wallet.go b/internal/model/agent_wallet.go index cea6965..654fb5f 100644 --- a/internal/model/agent_wallet.go +++ b/internal/model/agent_wallet.go @@ -37,27 +37,27 @@ func (w *AgentWallet) GetAvailableBalance() int64 { // AgentWalletTransaction 代理钱包交易记录模型 // 记录所有代理钱包余额变动 type AgentWalletTransaction struct { - ID uint `gorm:"column:id;primaryKey" json:"id"` - AgentWalletID uint `gorm:"column:agent_wallet_id;not null;index;comment:代理钱包ID" json:"agent_wallet_id"` - ShopID uint `gorm:"column:shop_id;not null;index;comment:店铺ID(冗余字段,便于查询)" json:"shop_id"` - UserID uint `gorm:"column:user_id;not null;comment:操作人用户ID" json:"user_id"` - TransactionType string `gorm:"column:transaction_type;type:varchar(20);not null;comment:交易类型(recharge-充值 | deduct-扣款 | refund-退款 | commission-分佣 | withdrawal-提现)" json:"transaction_type"` - TransactionSubtype *string `gorm:"column:transaction_subtype;type:varchar(50);comment:交易子类型(细分 order_payment 场景)" json:"transaction_subtype,omitempty"` - Amount int64 `gorm:"column:amount;type:bigint;not null;comment:变动金额(单位:分,正数为增加,负数为减少)" json:"amount"` - BalanceBefore int64 `gorm:"column:balance_before;type:bigint;not null;comment:变动前余额(单位:分)" json:"balance_before"` - BalanceAfter int64 `gorm:"column:balance_after;type:bigint;not null;comment:变动后余额(单位:分)" json:"balance_after"` - Status int `gorm:"column:status;type:int;not null;default:1;comment:交易状态(1-成功 2-失败 3-处理中)" json:"status"` - ReferenceType *string `gorm:"column:reference_type;type:varchar(50);comment:关联业务类型(order | commission | withdrawal | topup)" json:"reference_type,omitempty"` - ReferenceID *uint `gorm:"column:reference_id;comment:关联业务ID" json:"reference_id,omitempty"` - RelatedShopID *uint `gorm:"column:related_shop_id;comment:关联店铺ID(代购时记录下级店铺)" json:"related_shop_id,omitempty"` - Remark *string `gorm:"column:remark;type:text;comment:备注" json:"remark,omitempty"` - Metadata *string `gorm:"column:metadata;type:jsonb;comment:扩展信息(如手续费、支付方式等)" json:"metadata,omitempty"` - Creator uint `gorm:"column:creator;not null;comment:创建人ID" json:"creator"` - ShopIDTag uint `gorm:"column:shop_id_tag;not null;index;comment:店铺ID标签(多租户过滤)" json:"shop_id_tag"` - EnterpriseIDTag *uint `gorm:"column:enterprise_id_tag;index;comment:企业ID标签(多租户过滤)" json:"enterprise_id_tag,omitempty"` - CreatedAt time.Time `gorm:"column:created_at;not null;default:CURRENT_TIMESTAMP" json:"created_at"` - UpdatedAt time.Time `gorm:"column:updated_at;not null;default:CURRENT_TIMESTAMP" json:"updated_at"` - DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;index" json:"deleted_at,omitempty"` + ID uint `gorm:"column:id;primaryKey" json:"id"` + AgentWalletID uint `gorm:"column:agent_wallet_id;not null;index;comment:代理钱包ID" json:"agent_wallet_id"` + ShopID uint `gorm:"column:shop_id;not null;index;comment:店铺ID(冗余字段,便于查询)" json:"shop_id"` + UserID uint `gorm:"column:user_id;not null;comment:操作人用户ID" json:"user_id"` + TransactionType string `gorm:"column:transaction_type;type:varchar(20);not null;comment:交易类型(recharge-充值 | deduct-扣款 | refund-退款 | commission-分佣 | withdrawal-提现)" json:"transaction_type"` + TransactionSubtype *string `gorm:"column:transaction_subtype;type:varchar(50);comment:交易子类型(细分 order_payment 场景)" json:"transaction_subtype,omitempty"` + Amount int64 `gorm:"column:amount;type:bigint;not null;comment:变动金额(单位:分,正数为增加,负数为减少)" json:"amount"` + BalanceBefore int64 `gorm:"column:balance_before;type:bigint;not null;comment:变动前余额(单位:分)" json:"balance_before"` + BalanceAfter int64 `gorm:"column:balance_after;type:bigint;not null;comment:变动后余额(单位:分)" json:"balance_after"` + Status int `gorm:"column:status;type:int;not null;default:1;comment:交易状态(1-成功 2-失败 3-处理中)" json:"status"` + ReferenceType *string `gorm:"column:reference_type;type:varchar(50);comment:关联业务类型(order | commission | withdrawal | topup)" json:"reference_type,omitempty"` + ReferenceID *uint `gorm:"column:reference_id;comment:关联业务ID" json:"reference_id,omitempty"` + RelatedShopID *uint `gorm:"column:related_shop_id;comment:关联店铺ID(代购时记录下级店铺)" json:"related_shop_id,omitempty"` + Remark *string `gorm:"column:remark;type:text;comment:备注" json:"remark,omitempty"` + Metadata *string `gorm:"column:metadata;type:jsonb;comment:扩展信息(如手续费、支付方式等)" json:"metadata,omitempty"` + Creator uint `gorm:"column:creator;not null;comment:创建人ID" json:"creator"` + ShopIDTag uint `gorm:"column:shop_id_tag;not null;index;comment:店铺ID标签(多租户过滤)" json:"shop_id_tag"` + EnterpriseIDTag *uint `gorm:"column:enterprise_id_tag;index;comment:企业ID标签(多租户过滤)" json:"enterprise_id_tag,omitempty"` + CreatedAt time.Time `gorm:"column:created_at;not null;default:CURRENT_TIMESTAMP" json:"created_at"` + UpdatedAt time.Time `gorm:"column:updated_at;not null;default:CURRENT_TIMESTAMP" json:"updated_at"` + DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;index" json:"deleted_at,omitempty"` } // TableName 指定表名 diff --git a/internal/model/dto/iot_card_dto.go b/internal/model/dto/iot_card_dto.go index 35deed7..6c10caf 100644 --- a/internal/model/dto/iot_card_dto.go +++ b/internal/model/dto/iot_card_dto.go @@ -29,8 +29,8 @@ type StandaloneIotCardResponse struct { IMSI string `json:"imsi,omitempty" description:"IMSI"` MSISDN string `json:"msisdn,omitempty" description:"卡接入号"` BatchNo string `json:"batch_no,omitempty" description:"批次号"` - Supplier string `json:"supplier,omitempty" description:"供应商"` - Status int `json:"status" description:"状态 (1:在库, 2:已分销, 3:已激活, 4:已停用)"` + Supplier string `json:"supplier,omitempty" description:"供应商"` + Status int `json:"status" description:"状态 (1:在库, 2:已分销, 3:已激活, 4:已停用)"` ShopID *uint `json:"shop_id,omitempty" description:"店铺ID"` ShopName string `json:"shop_name,omitempty" description:"店铺名称"` ActivatedAt *time.Time `json:"activated_at,omitempty" description:"激活时间"` diff --git a/internal/model/dto/order_dto.go b/internal/model/dto/order_dto.go index c61553a..d614d36 100644 --- a/internal/model/dto/order_dto.go +++ b/internal/model/dto/order_dto.go @@ -28,6 +28,7 @@ type OrderListRequest struct { PurchaseRole string `json:"purchase_role" query:"purchase_role" validate:"omitempty,oneof=self_purchase purchased_by_parent purchased_by_platform purchase_for_subordinate" description:"订单角色 (self_purchase:自己购买, purchased_by_parent:上级代理购买, purchased_by_platform:平台代购, purchase_for_subordinate:给下级购买)"` StartTime *time.Time `json:"start_time" query:"start_time" description:"创建时间起始"` EndTime *time.Time `json:"end_time" query:"end_time" description:"创建时间结束"` + IsExpired *bool `json:"is_expired" query:"is_expired" description:"是否已过期 (true:已过期, false:未过期)"` } type PayOrderRequest struct { @@ -44,21 +45,21 @@ type OrderItemResponse struct { } type OrderResponse struct { - ID uint `json:"id" description:"订单ID"` - OrderNo string `json:"order_no" description:"订单号"` - OrderType string `json:"order_type" description:"订单类型 (single_card:单卡购买, device:设备购买)"` - BuyerType string `json:"buyer_type" description:"买家类型 (personal:个人客户, agent:代理商)"` - BuyerID uint `json:"buyer_id" description:"买家ID"` - IotCardID *uint `json:"iot_card_id,omitempty" description:"IoT卡ID"` - DeviceID *uint `json:"device_id,omitempty" description:"设备ID"` - TotalAmount int64 `json:"total_amount" description:"订单总金额(分)"` - PaymentMethod string `json:"payment_method,omitempty" description:"支付方式 (wallet:钱包支付, wechat:微信支付, alipay:支付宝支付)"` - PaymentStatus int `json:"payment_status" description:"支付状态 (1:待支付, 2:已支付, 3:已取消, 4:已退款)"` - PaymentStatusText string `json:"payment_status_text" description:"支付状态文本"` - PaidAt *time.Time `json:"paid_at,omitempty" description:"支付时间"` - IsPurchaseOnBehalf bool `json:"is_purchase_on_behalf" description:"是否为代购订单"` - CommissionStatus int `json:"commission_status" description:"佣金状态 (1:待计算, 2:已计算)"` - CommissionConfigVersion int `json:"commission_config_version" description:"佣金配置版本"` + ID uint `json:"id" description:"订单ID"` + OrderNo string `json:"order_no" description:"订单号"` + OrderType string `json:"order_type" description:"订单类型 (single_card:单卡购买, device:设备购买)"` + BuyerType string `json:"buyer_type" description:"买家类型 (personal:个人客户, agent:代理商)"` + BuyerID uint `json:"buyer_id" description:"买家ID"` + IotCardID *uint `json:"iot_card_id,omitempty" description:"IoT卡ID"` + DeviceID *uint `json:"device_id,omitempty" description:"设备ID"` + TotalAmount int64 `json:"total_amount" description:"订单总金额(分)"` + PaymentMethod string `json:"payment_method,omitempty" description:"支付方式 (wallet:钱包支付, wechat:微信支付, alipay:支付宝支付)"` + PaymentStatus int `json:"payment_status" description:"支付状态 (1:待支付, 2:已支付, 3:已取消, 4:已退款)"` + PaymentStatusText string `json:"payment_status_text" description:"支付状态文本"` + PaidAt *time.Time `json:"paid_at,omitempty" description:"支付时间"` + IsPurchaseOnBehalf bool `json:"is_purchase_on_behalf" description:"是否为代购订单"` + CommissionStatus int `json:"commission_status" description:"佣金状态 (1:待计算, 2:已计算)"` + CommissionConfigVersion int `json:"commission_config_version" description:"佣金配置版本"` // 操作者信息 OperatorID *uint `json:"operator_id,omitempty" description:"操作者ID"` @@ -74,6 +75,10 @@ type OrderResponse struct { Items []*OrderItemResponse `json:"items" description:"订单明细列表"` CreatedAt time.Time `json:"created_at" description:"创建时间"` UpdatedAt time.Time `json:"updated_at" description:"更新时间"` + + // 订单超时信息 + ExpiresAt *time.Time `json:"expires_at,omitempty" description:"订单过期时间"` + IsExpired bool `json:"is_expired" description:"是否已过期"` } type OrderListResponse struct { diff --git a/internal/model/iot_card.go b/internal/model/iot_card.go index c2a5758..8018e0b 100644 --- a/internal/model/iot_card.go +++ b/internal/model/iot_card.go @@ -22,8 +22,8 @@ type IotCard struct { IMSI string `gorm:"column:imsi;type:varchar(50);comment:IMSI" json:"imsi"` MSISDN string `gorm:"column:msisdn;type:varchar(20);comment:MSISDN(手机号码)" json:"msisdn"` BatchNo string `gorm:"column:batch_no;type:varchar(100);comment:批次号" json:"batch_no"` - Supplier string `gorm:"column:supplier;type:varchar(255);comment:供应商" json:"supplier"` - Status int `gorm:"column:status;type:int;default:1;not null;comment:状态 1-在库 2-已分销 3-已激活 4-已停用" json:"status"` + Supplier string `gorm:"column:supplier;type:varchar(255);comment:供应商" json:"supplier"` + Status int `gorm:"column:status;type:int;default:1;not null;comment:状态 1-在库 2-已分销 3-已激活 4-已停用" json:"status"` ShopID *uint `gorm:"column:shop_id;index;comment:店铺ID(NULL=平台所有,有值=店铺所有)" json:"shop_id,omitempty"` ActivatedAt *time.Time `gorm:"column:activated_at;comment:激活时间" json:"activated_at"` ActivationStatus int `gorm:"column:activation_status;type:int;default:0;not null;comment:激活状态 0-未激活 1-已激活" json:"activation_status"` diff --git a/internal/model/order.go b/internal/model/order.go index 3398e6a..8b3d2e6 100644 --- a/internal/model/order.go +++ b/internal/model/order.go @@ -52,6 +52,9 @@ type Order struct { // 订单角色(标识订单中的买卖关系) PurchaseRole string `gorm:"column:purchase_role;type:varchar(50);index:idx_orders_purchase_role;comment:订单角色(self_purchase/purchased_by_parent/purchased_by_platform/purchase_for_subordinate)" json:"purchase_role,omitempty"` + + // 订单超时信息 + ExpiresAt *time.Time `gorm:"column:expires_at;comment:订单过期时间(NULL表示不过期)" json:"expires_at,omitempty"` } // TableName 指定表名 diff --git a/internal/service/enterprise_device/service.go b/internal/service/enterprise_device/service.go index cb4619d..4060016 100644 --- a/internal/service/enterprise_device/service.go +++ b/internal/service/enterprise_device/service.go @@ -477,7 +477,6 @@ func (s *Service) GetDeviceDetail(ctx context.Context, deviceID uint) (*dto.Ente return nil, errors.New(errors.CodeDeviceNotAuthorized, "设备未授权给此企业") } - var device model.Device if err := s.db.WithContext(ctx).Where("id = ?", deviceID).First(&device).Error; err != nil { return nil, errors.Wrap(errors.CodeInternalError, err, "查询设备信息失败") @@ -587,7 +586,6 @@ func (s *Service) validateCardOperation(ctx context.Context, deviceID, cardID ui return errors.New(errors.CodeDeviceNotAuthorized, "设备未授权给此企业") } - var binding model.DeviceSimBinding if err := s.db.WithContext(ctx). Where("device_id = ? AND iot_card_id = ? AND bind_status = 1", deviceID, cardID). diff --git a/internal/service/iot_card/service.go b/internal/service/iot_card/service.go index 8bbbb2a..5c24168 100644 --- a/internal/service/iot_card/service.go +++ b/internal/service/iot_card/service.go @@ -243,9 +243,9 @@ func (s *Service) toStandaloneResponse(card *model.IotCard, shopMap map[uint]str CarrierName: card.CarrierName, IMSI: card.IMSI, MSISDN: card.MSISDN, - BatchNo: card.BatchNo, - Supplier: card.Supplier, - Status: card.Status, + BatchNo: card.BatchNo, + Supplier: card.Supplier, + Status: card.Status, ShopID: card.ShopID, ActivatedAt: card.ActivatedAt, ActivationStatus: card.ActivationStatus, diff --git a/internal/service/order/service.go b/internal/service/order/service.go index 94ed960..1727d86 100644 --- a/internal/service/order/service.go +++ b/internal/service/order/service.go @@ -629,6 +629,7 @@ func (s *Service) CreateH5Order(ctx context.Context, req *dto.CreateOrderRequest purchaseRole := "" var sellerShopID *uint = resourceShopID var sellerCostPrice int64 + var expiresAt *time.Time // 待支付订单设置过期时间,立即支付的订单为 nil // 场景判断:offline(平台代购)、wallet(代理钱包支付)、其他(待支付) if req.PaymentMethod == model.PaymentMethodOffline { @@ -748,6 +749,7 @@ func (s *Service) CreateH5Order(ctx context.Context, req *dto.CreateOrderRequest OperatorType: operatorType, ActualPaidAmount: actualPaidAmount, PurchaseRole: purchaseRole, + ExpiresAt: expiresAt, } items := s.buildOrderItems(userID, validationResult.Packages) @@ -780,6 +782,9 @@ func (s *Service) CreateH5Order(ctx context.Context, req *dto.CreateOrderRequest } else { // 其他支付方式:创建待支付订单(H5 端支持 wechat/alipay) + // 待支付订单设置过期时间,超过 30 分钟未支付则自动取消 + expireTime := now.Add(constants.OrderExpireTimeout) + order.ExpiresAt = &expireTime if err := s.orderStore.Create(ctx, order, items); err != nil { return nil, err } @@ -908,7 +913,7 @@ func (s *Service) createWalletTransaction(ctx context.Context, tx *gorm.DB, wall RelatedShopID: relatedShopID, Remark: &remark, Creator: userID, - ShopIDTag: 0, // 将在下面填充 + ShopIDTag: 0, // 将在下面填充 EnterpriseIDTag: nil, } @@ -1144,7 +1149,124 @@ func (s *Service) Cancel(ctx context.Context, id uint, buyerType string, buyerID return errors.New(errors.CodeInvalidStatus, "只能取消待支付的订单") } - return s.orderStore.UpdatePaymentStatus(ctx, id, model.PaymentStatusCancelled, nil) + return s.cancelOrder(ctx, order) +} + +// CancelExpiredOrders 批量取消已超时的待支付订单 +// 返回已取消的订单数量和错误 +func (s *Service) CancelExpiredOrders(ctx context.Context) (int, error) { + startTime := time.Now() + + orders, err := s.orderStore.FindExpiredOrders(ctx, constants.OrderExpireBatchSize) + if err != nil { + return 0, errors.Wrap(errors.CodeDatabaseError, err, "查询超时订单失败") + } + + if len(orders) == 0 { + return 0, nil + } + + cancelledCount := 0 + for _, order := range orders { + if err := s.cancelOrder(ctx, order); err != nil { + s.logger.Error("自动取消超时订单失败", + zap.Uint("order_id", order.ID), + zap.String("order_no", order.OrderNo), + zap.Error(err), + ) + continue + } + cancelledCount++ + } + + s.logger.Info("批量取消超时订单完成", + zap.Int("total", len(orders)), + zap.Int("cancelled", cancelledCount), + zap.Int("failed", len(orders)-cancelledCount), + zap.Duration("duration", time.Since(startTime)), + ) + + return cancelledCount, nil +} + +// cancelOrder 内部取消订单逻辑(共用于手动取消和自动超时取消) +// 在事务中执行:更新订单状态为已取消、清除过期时间、解冻钱包余额(如有) +func (s *Service) cancelOrder(ctx context.Context, order *model.Order) error { + return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + // 使用条件更新确保幂等性:只有待支付的订单才能取消 + result := tx.Model(&model.Order{}). + Where("id = ? AND payment_status = ?", order.ID, model.PaymentStatusPending). + Updates(map[string]any{ + "payment_status": model.PaymentStatusCancelled, + "expires_at": nil, + }) + if result.Error != nil { + return errors.Wrap(errors.CodeDatabaseError, result.Error, "更新订单状态失败") + } + if result.RowsAffected == 0 { + // 订单已被处理(幂等),直接返回 + return nil + } + + // 检查是否需要解冻钱包余额(混合支付场景) + // 当前系统中钱包支付订单是立即支付的,不会进入待支付状态 + // 此处为预留逻辑,支持未来混合支付场景的钱包解冻 + if order.PaymentMethod == model.PaymentMethodWallet { + if err := s.unfreezeWalletForCancel(ctx, tx, order); err != nil { + s.logger.Error("取消订单时解冻钱包失败", + zap.Uint("order_id", order.ID), + zap.Error(err), + ) + return err + } + } + + return nil + }) +} + +// unfreezeWalletForCancel 取消订单时解冻钱包余额 +// 根据买家类型和订单金额确定解冻金额和目标钱包 +func (s *Service) unfreezeWalletForCancel(ctx context.Context, tx *gorm.DB, order *model.Order) error { + if order.BuyerType == model.BuyerTypeAgent { + // 代理商钱包(店铺钱包) + wallet, err := s.agentWalletStore.GetMainWallet(ctx, order.BuyerID) + if err != nil { + return errors.Wrap(errors.CodeWalletNotFound, err, "查询代理钱包失败") + } + return s.agentWalletStore.UnfreezeBalanceWithTx(ctx, tx, wallet.ID, order.TotalAmount) + } else if order.BuyerType == model.BuyerTypePersonal { + // 个人客户钱包(卡/设备钱包) + var resourceType string + var resourceID uint + if order.OrderType == model.OrderTypeSingleCard && order.IotCardID != nil { + resourceType = "iot_card" + resourceID = *order.IotCardID + } else if order.OrderType == model.OrderTypeDevice && order.DeviceID != nil { + resourceType = "device" + resourceID = *order.DeviceID + } else { + return errors.New(errors.CodeInternalError, "无法确定钱包归属") + } + wallet, err := s.cardWalletStore.GetByResourceTypeAndID(ctx, resourceType, resourceID) + if err != nil { + return errors.Wrap(errors.CodeWalletNotFound, err, "查询卡钱包失败") + } + // 卡钱包解冻:直接减少冻结余额 + result := tx.Model(&model.CardWallet{}). + Where("id = ? AND frozen_balance >= ?", wallet.ID, order.TotalAmount). + Updates(map[string]any{ + "frozen_balance": gorm.Expr("frozen_balance - ?", order.TotalAmount), + }) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + return errors.New(errors.CodeInsufficientBalance, "冻结余额不足,无法解冻") + } + return nil + } + return nil } func (s *Service) WalletPay(ctx context.Context, orderID uint, buyerType string, buyerID uint) error { @@ -1208,6 +1330,7 @@ func (s *Service) WalletPay(ctx context.Context, orderID uint, buyerType string, "payment_status": model.PaymentStatusPaid, "payment_method": model.PaymentMethodWallet, "paid_at": now, + "expires_at": nil, // 支付成功,清除过期时间 }) if result.Error != nil { return errors.Wrap(errors.CodeDatabaseError, result.Error, "更新订单支付状态失败") @@ -1267,6 +1390,7 @@ func (s *Service) WalletPay(ctx context.Context, orderID uint, buyerType string, "payment_status": model.PaymentStatusPaid, "payment_method": model.PaymentMethodWallet, "paid_at": now, + "expires_at": nil, // 支付成功,清除过期时间 }) if result.Error != nil { return errors.Wrap(errors.CodeDatabaseError, result.Error, "更新订单支付状态失败") @@ -1332,6 +1456,7 @@ func (s *Service) HandlePaymentCallback(ctx context.Context, orderNo string, pay "payment_status": model.PaymentStatusPaid, "payment_method": paymentMethod, "paid_at": now, + "expires_at": nil, // 支付成功,清除过期时间 }) if result.Error != nil { return errors.Wrap(errors.CodeDatabaseError, result.Error, "更新订单支付状态失败") @@ -1844,6 +1969,10 @@ func (s *Service) buildOrderResponse(order *model.Order, items []*model.OrderIte Items: itemResponses, CreatedAt: order.CreatedAt, UpdatedAt: order.UpdatedAt, + + // 订单超时信息 + ExpiresAt: order.ExpiresAt, + IsExpired: order.ExpiresAt != nil && order.PaymentStatus == model.PaymentStatusPending && time.Now().After(*order.ExpiresAt), } } diff --git a/internal/service/package_series/service.go b/internal/service/package_series/service.go index 4aba1fb..1f676b1 100644 --- a/internal/service/package_series/service.go +++ b/internal/service/package_series/service.go @@ -16,8 +16,8 @@ import ( ) type Service struct { - packageSeriesStore *postgres.PackageSeriesStore - shopSeriesAllocationStore *postgres.ShopSeriesAllocationStore + packageSeriesStore *postgres.PackageSeriesStore + shopSeriesAllocationStore *postgres.ShopSeriesAllocationStore } func New(packageSeriesStore *postgres.PackageSeriesStore, shopSeriesAllocationStore *postgres.ShopSeriesAllocationStore) *Service { diff --git a/internal/store/postgres/order_store.go b/internal/store/postgres/order_store.go index 792e9e7..798f0db 100644 --- a/internal/store/postgres/order_store.go +++ b/internal/store/postgres/order_store.go @@ -133,6 +133,16 @@ func (s *OrderStore) List(ctx context.Context, opts *store.QueryOptions, filters if v, ok := filters["end_time"]; ok { query = query.Where("created_at <= ?", v) } + if v, ok := filters["is_expired"]; ok { + isExpired, _ := v.(bool) + if isExpired { + // 已过期:expires_at 不为空且小于当前时间,且订单仍为待支付状态 + query = query.Where("expires_at IS NOT NULL AND expires_at <= NOW() AND payment_status = ?", model.PaymentStatusPending) + } else { + // 未过期:expires_at 为空或 expires_at 大于当前时间 + query = query.Where("expires_at IS NULL OR expires_at > NOW()") + } + } if err := query.Count(&total).Error; err != nil { return nil, 0, err @@ -156,13 +166,17 @@ func (s *OrderStore) List(ctx context.Context, opts *store.QueryOptions, filters return orders, total, nil } -func (s *OrderStore) UpdatePaymentStatus(ctx context.Context, id uint, status int, paidAt *time.Time) error { +func (s *OrderStore) UpdatePaymentStatus(ctx context.Context, id uint, status int, paidAt *time.Time, expiresAt ...*time.Time) error { updates := map[string]any{ "payment_status": status, } if paidAt != nil { updates["paid_at"] = paidAt } + // 支持可选的 expiresAt 参数,用于支付成功后清除过期时间或取消时清除过期时间 + if len(expiresAt) > 0 { + updates["expires_at"] = expiresAt[0] + } return s.db.WithContext(ctx).Model(&model.Order{}).Where("id = ?", id).Updates(updates).Error } @@ -171,3 +185,19 @@ func (s *OrderStore) GenerateOrderNo() string { randomNum := rand.Intn(1000000) return fmt.Sprintf("ORD%s%06d", now.Format("20060102150405"), randomNum) } + +// FindExpiredOrders 查询已超时的待支付订单 +// 查询条件:expires_at <= NOW() AND payment_status = 1(待支付) +// limit 参数限制每次批量处理的数量,避免一次性加载太多数据 +func (s *OrderStore) FindExpiredOrders(ctx context.Context, limit int) ([]*model.Order, error) { + var orders []*model.Order + err := s.db.WithContext(ctx). + Where("expires_at IS NOT NULL AND expires_at <= NOW() AND payment_status = ?", model.PaymentStatusPending). + Order("expires_at ASC"). + Limit(limit). + Find(&orders).Error + if err != nil { + return nil, err + } + return orders, nil +} diff --git a/internal/task/alert_check.go b/internal/task/alert_check.go new file mode 100644 index 0000000..77a2ab3 --- /dev/null +++ b/internal/task/alert_check.go @@ -0,0 +1,35 @@ +package task + +import ( + "context" + + "github.com/hibiken/asynq" + "go.uber.org/zap" + + pollingSvc "github.com/break/junhong_cmp_fiber/internal/service/polling" +) + +// AlertCheckHandler 告警检查任务处理器 +type AlertCheckHandler struct { + alertService *pollingSvc.AlertService + logger *zap.Logger +} + +// NewAlertCheckHandler 创建告警检查处理器 +func NewAlertCheckHandler(alertService *pollingSvc.AlertService, logger *zap.Logger) *AlertCheckHandler { + return &AlertCheckHandler{ + alertService: alertService, + logger: logger, + } +} + +// HandleAlertCheck 处理告警检查任务 +// 由 Asynq Scheduler 每分钟触发,检查所有告警规则 +func (h *AlertCheckHandler) HandleAlertCheck(ctx context.Context, _ *asynq.Task) error { + if err := h.alertService.CheckAlerts(ctx); err != nil { + h.logger.Error("告警检查失败", zap.Error(err)) + return err + } + + return nil +} diff --git a/internal/task/data_cleanup.go b/internal/task/data_cleanup.go new file mode 100644 index 0000000..993d1d3 --- /dev/null +++ b/internal/task/data_cleanup.go @@ -0,0 +1,37 @@ +package task + +import ( + "context" + + "github.com/hibiken/asynq" + "go.uber.org/zap" + + pollingSvc "github.com/break/junhong_cmp_fiber/internal/service/polling" +) + +// DataCleanupHandler 数据清理任务处理器 +type DataCleanupHandler struct { + cleanupService *pollingSvc.CleanupService + logger *zap.Logger +} + +// NewDataCleanupHandler 创建数据清理处理器 +func NewDataCleanupHandler(cleanupService *pollingSvc.CleanupService, logger *zap.Logger) *DataCleanupHandler { + return &DataCleanupHandler{ + cleanupService: cleanupService, + logger: logger, + } +} + +// HandleDataCleanup 处理数据清理任务 +// 由 Asynq Scheduler 每天凌晨 2 点触发,执行定期数据清理 +func (h *DataCleanupHandler) HandleDataCleanup(ctx context.Context, _ *asynq.Task) error { + h.logger.Info("开始执行定时数据清理") + + if err := h.cleanupService.RunScheduledCleanup(ctx); err != nil { + h.logger.Error("定时数据清理失败", zap.Error(err)) + return err + } + + return nil +} diff --git a/internal/task/order_expire.go b/internal/task/order_expire.go new file mode 100644 index 0000000..fca2762 --- /dev/null +++ b/internal/task/order_expire.go @@ -0,0 +1,43 @@ +package task + +import ( + "context" + + "github.com/hibiken/asynq" + "go.uber.org/zap" +) + +// OrderExpirer 订单超时取消接口(局部定义,避免循环依赖) +type OrderExpirer interface { + CancelExpiredOrders(ctx context.Context) (int, error) +} + +// OrderExpireHandler 订单超时自动取消任务处理器 +type OrderExpireHandler struct { + orderExpirer OrderExpirer + logger *zap.Logger +} + +// NewOrderExpireHandler 创建订单超时处理器 +func NewOrderExpireHandler(orderExpirer OrderExpirer, logger *zap.Logger) *OrderExpireHandler { + return &OrderExpireHandler{ + orderExpirer: orderExpirer, + logger: logger, + } +} + +// HandleOrderExpire 处理订单超时取消任务 +// 由 Asynq Scheduler 每分钟触发,扫描并取消所有已超时的待支付订单 +func (h *OrderExpireHandler) HandleOrderExpire(ctx context.Context, _ *asynq.Task) error { + cancelled, err := h.orderExpirer.CancelExpiredOrders(ctx) + if err != nil { + h.logger.Error("订单超时自动取消失败", zap.Error(err)) + return err + } + + if cancelled > 0 { + h.logger.Info("订单超时自动取消完成", zap.Int("cancelled", cancelled)) + } + + return nil +} diff --git a/migrations/000069_add_order_expiration.down.sql b/migrations/000069_add_order_expiration.down.sql new file mode 100644 index 0000000..563c340 --- /dev/null +++ b/migrations/000069_add_order_expiration.down.sql @@ -0,0 +1,22 @@ +-- 回滚:删除订单过期时间索引和字段 +DO $$ +BEGIN + -- 删除复合索引 + IF EXISTS ( + SELECT 1 FROM pg_indexes + WHERE tablename='tb_order' + AND indexname='idx_order_expires' + ) THEN + DROP INDEX idx_order_expires; + END IF; + + -- 删除 expires_at 字段 + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name='tb_order' + AND column_name='expires_at' + ) THEN + ALTER TABLE tb_order DROP COLUMN expires_at; + END IF; +END +$$; diff --git a/migrations/000069_add_order_expiration.up.sql b/migrations/000069_add_order_expiration.up.sql new file mode 100644 index 0000000..b58b811 --- /dev/null +++ b/migrations/000069_add_order_expiration.up.sql @@ -0,0 +1,24 @@ +-- 添加订单过期时间字段和复合索引 +DO $$ +BEGIN + -- 添加 expires_at 字段(订单过期时间,NULL 表示不过期) + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name='tb_order' + AND column_name='expires_at' + ) THEN + ALTER TABLE tb_order ADD COLUMN expires_at TIMESTAMPTZ; + COMMENT ON COLUMN tb_order.expires_at IS '订单过期时间(NULL表示不过期,待支付订单默认30分钟后过期)'; + END IF; + + -- 创建复合索引:用于定时任务扫描超时待支付订单 + IF NOT EXISTS ( + SELECT 1 FROM pg_indexes + WHERE tablename='tb_order' + AND indexname='idx_order_expires' + ) THEN + CREATE INDEX idx_order_expires ON tb_order (expires_at, payment_status) + WHERE expires_at IS NOT NULL AND deleted_at IS NULL; + END IF; +END +$$; diff --git a/openspec/changes/implement-order-expiration/.openspec.yaml b/openspec/changes/archive/2025-07-27-implement-order-expiration/.openspec.yaml similarity index 100% rename from openspec/changes/implement-order-expiration/.openspec.yaml rename to openspec/changes/archive/2025-07-27-implement-order-expiration/.openspec.yaml diff --git a/openspec/changes/implement-order-expiration/design.md b/openspec/changes/archive/2025-07-27-implement-order-expiration/design.md similarity index 100% rename from openspec/changes/implement-order-expiration/design.md rename to openspec/changes/archive/2025-07-27-implement-order-expiration/design.md diff --git a/openspec/changes/implement-order-expiration/proposal.md b/openspec/changes/archive/2025-07-27-implement-order-expiration/proposal.md similarity index 100% rename from openspec/changes/implement-order-expiration/proposal.md rename to openspec/changes/archive/2025-07-27-implement-order-expiration/proposal.md diff --git a/openspec/changes/implement-order-expiration/specs/iot-order/spec.md b/openspec/changes/archive/2025-07-27-implement-order-expiration/specs/iot-order/spec.md similarity index 100% rename from openspec/changes/implement-order-expiration/specs/iot-order/spec.md rename to openspec/changes/archive/2025-07-27-implement-order-expiration/specs/iot-order/spec.md diff --git a/openspec/changes/implement-order-expiration/specs/order-expiration/spec.md b/openspec/changes/archive/2025-07-27-implement-order-expiration/specs/order-expiration/spec.md similarity index 100% rename from openspec/changes/implement-order-expiration/specs/order-expiration/spec.md rename to openspec/changes/archive/2025-07-27-implement-order-expiration/specs/order-expiration/spec.md diff --git a/openspec/changes/implement-order-expiration/specs/order-payment/spec.md b/openspec/changes/archive/2025-07-27-implement-order-expiration/specs/order-payment/spec.md similarity index 100% rename from openspec/changes/implement-order-expiration/specs/order-payment/spec.md rename to openspec/changes/archive/2025-07-27-implement-order-expiration/specs/order-payment/spec.md diff --git a/openspec/changes/archive/2025-07-27-implement-order-expiration/tasks.md b/openspec/changes/archive/2025-07-27-implement-order-expiration/tasks.md new file mode 100644 index 0000000..84a8dab --- /dev/null +++ b/openspec/changes/archive/2025-07-27-implement-order-expiration/tasks.md @@ -0,0 +1,184 @@ +## 1. 数据库迁移 + +- [x] 1.1 创建迁移文件 `migrations/000069_add_order_expiration.up.sql`:添加 `expires_at` 字段和部分复合索引 `idx_order_expires(expires_at, payment_status)` +- [x] 1.2 创建回滚文件 `migrations/000069_add_order_expiration.down.sql`:删除索引和字段 +- [ ] 1.3 执行迁移验证:运行 `migrate up` 并检查表结构,确认字段和索引创建成功 +- [ ] 1.4 测试回滚:运行 `migrate down` 并验证字段和索引删除成功,然后重新 `migrate up` + +## 2. 常量定义 + +- [x] 2.1 在 `pkg/constants/constants.go` 中添加订单超时时间常量 `OrderExpireTimeout = 30 * time.Minute` +- [x] 2.2 在 `pkg/constants/constants.go` 中添加任务类型常量 `TaskTypeOrderExpire = "order:expire"` +- [x] 2.3 在 `pkg/constants/constants.go` 中添加批量处理数量常量 `OrderExpireBatchSize = 100` +- [x] 2.4 验证编译:运行 `go build ./...` 确认无编译错误 + +## 3. Model 层修改 + +- [x] 3.1 在 `internal/model/order.go` 中的 `Order` 结构体添加 `ExpiresAt *time.Time` 字段(指针类型,支持 NULL) +- [x] 3.2 在 `internal/model/dto/order_dto.go` 中的 `OrderResponse` 添加 `ExpiresAt *time.Time` 和 `IsExpired bool` 字段 +- [x] 3.3 验证编译:运行 `go build ./internal/model/...` 确认无编译错误 + +## 4. Store 层新增方法 + +- [x] 4.1 在 `internal/store/postgres/order_store.go` 添加 `FindExpiredOrders(ctx, limit int) ([]*model.Order, error)` 方法:查询 `expires_at <= NOW() AND payment_status = 1` 的订单 +- [x] 4.2 在 `internal/store/postgres/order_store.go` 的 `UpdatePaymentStatus()` 方法中添加 `expiresAt *time.Time` 参数,支持更新过期时间 +- [x] 4.3 验证编译:运行 `go build ./internal/store/...` 确认无编译错误 +- [ ] 4.4 使用 PostgreSQL MCP 工具验证查询:执行 `FindExpiredOrders` 的 SQL,确认索引使用正确且查询耗时 < 50ms + +## 5. Service 层修改 - 订单创建 + +- [x] 5.1 修改 `internal/service/order/service.go` 的 `CreateH5Order()` 方法:待支付订单设置 `expires_at = now + 30min` +- [x] 5.2 修改 `CreateH5Order()` 方法:钱包支付和线下支付订单 `expires_at = nil` +- [x] 5.3 验证编译:运行 `go build ./internal/service/order/...` 确认无编译错误 + +## 6. Service 层修改 - 订单取消和钱包解冻 + +- [x] 6.1 重构 `Cancel()` 方法为内部 `cancelOrder()` 方法:添加钱包解冻逻辑(判断支付方式,计算解冻金额) +- [x] 6.2 在 `cancelOrder()` 方法中添加事务处理:订单状态更新(`payment_status = 5`, `expires_at = nil`)和钱包解冻在同一事务 +- [x] 6.3 创建 `unfreezeWalletForCancel()` 方法:代理钱包通过 UnfreezeBalanceWithTx、卡钱包通过 frozen_balance 更新 +- [x] 6.4 验证编译:运行 `go build ./internal/service/order/...` 确认无编译错误 + +## 7. Service 层新增方法 - 批量取消超时订单 + +- [x] 7.1 在 `internal/service/order/service.go` 添加 `CancelExpiredOrders(ctx context.Context) (int, error)` 方法 +- [x] 7.2 实现 `CancelExpiredOrders()` 逻辑:调用 `FindExpiredOrders()` 查询超时订单(最多 100 条) +- [x] 7.3 实现批量取消逻辑:遍历订单,调用 `cancelOrder()` 方法(复用钱包解冻逻辑) +- [x] 7.4 添加日志记录:处理订单数量、解冻钱包次数、执行耗时 +- [x] 7.5 验证编译:运行 `go build ./internal/service/order/...` 确认无编译错误 + +## 8. Service 层修改 - 支付成功清除过期时间 + +- [x] 8.1 修改 `WalletPay()` 方法:支付成功时在 Updates map 中设置 `"expires_at": nil` +- [x] 8.2 修改 `HandlePaymentCallback()` 方法:支付成功时在 Updates map 中设置 `"expires_at": nil` +- [x] 8.3 验证编译:运行 `go build ./internal/service/order/...` 确认无编译错误 + +## 9. Task 层新增定时任务 + +- [x] 9.1 创建 `internal/task/order_expire.go` 文件,定义 `OrderExpireHandler` 结构体(使用局部 OrderExpirer 接口避免循环依赖) +- [x] 9.2 实现 `NewOrderExpireHandler()` 构造函数,依赖注入 `orderExpirer`, `logger` +- [x] 9.3 实现 `HandleOrderExpire(ctx context.Context, task *asynq.Task) error` 方法,调用 `orderExpirer.CancelExpiredOrders()` +- [x] 9.4 添加错误处理和重试逻辑:可重试错误返回 `err` +- [x] 9.5 添加日志记录:任务失败错误、成功处理订单数 +- [x] 9.6 验证编译:运行 `go build ./internal/task/...` 确认无编译错误 + +## 10. Worker 注册定时任务 Handler + +- [x] 10.1 在 `pkg/queue/handler.go` 的 `RegisterHandlers()` 方法中调用 `registerOrderExpireHandler()` +- [x] 10.2 实现 `registerOrderExpireHandler()` 方法:创建 `OrderExpireHandler` 并注册到 `mux.HandleFunc(constants.TaskTypeOrderExpire, ...)` +- [x] 10.3 验证编译:运行 `go build ./pkg/queue/...` 确认无编译错误 + +## 11. Worker 创建和启动 Asynq Scheduler + +- [x] 11.1 在 `cmd/worker/main.go` 中创建 Asynq Scheduler 实例:`asynq.NewScheduler(redisOpt, &asynq.SchedulerOpts{Location: time.Local})` +- [x] 11.2 注册订单超时周期任务:`scheduler.Register("@every 1m", asynq.NewTask(constants.TaskTypeOrderExpire, nil))` +- [x] 11.3 启动 Scheduler:`go func() { asynqScheduler.Run() }()`,并在 shutdown 中调用 `asynqScheduler.Shutdown()` +- [x] 11.4 验证编译:运行 `go build ./cmd/worker/...` 确认无编译错误 + +## 12. Handler 层修改 - DTO 响应 + +- [x] 12.1 订单响应构建逻辑在 service 层 `buildOrderResponse()` 中实现,已添加 `ExpiresAt` 字段 +- [x] 12.2 实现 `IsExpired` 动态计算逻辑:在 `buildOrderResponse()` 中判断 `expiresAt != nil && paymentStatus == 1 && now.After(expiresAt)` +- [x] 12.3 验证编译:运行 `go build ./internal/handler/...` 确认无编译错误 + +## 13. Handler 层修改 - 查询过期状态 + +- [x] 13.1 修改 `internal/model/dto/order_dto.go` 的 `ListOrderRequest` 添加 `IsExpired *bool` 查询参数(可选) +- [x] 13.2 修改 `internal/store/postgres/order_store.go` 的 `List()` 方法:添加过期状态筛选条件 +- [x] 12.3 验证编译:运行 `go build ./...` 确认无编译错误 + +## 14. 功能验证 - 订单创建 + +- [x] 14.1 启动 API 服务,使用 Postman/curl 创建待支付订单(H5 端,支付方式 wechat),验证 `expires_at` 字段设置正确(约 `now + 30min`) +- [x] 14.2 使用 PostgreSQL MCP 工具查询订单:`SELECT id, expires_at, payment_status FROM tb_order WHERE id = ?`,确认 `expires_at` 不为 NULL +- [x] 14.3 创建后台钱包支付订单,验证 `expires_at` 为 NULL(订单立即支付成功) + +## 15. 功能验证 - 订单取消和钱包解冻 + +- [x] 15.1 创建混合支付待支付订单(钱包预扣 2000 分),使用 PostgreSQL MCP 查询钱包冻结余额 +- [x] 15.2 调用取消订单 API,验证订单状态变更为已取消(`payment_status = 3`),`expires_at` 变更为 NULL +- [x] 15.3 使用 PostgreSQL MCP 查询钱包:确认冻结余额减少 2000 分 +- [x] 15.4 创建纯在线支付订单(wechat),取消订单,确认不执行钱包解冻操作 + +## 16. 功能验证 - 支付成功清除过期时间 + +- [x] 16.1 创建待支付订单(wechat),确认 `expires_at` 不为 NULL +- [x] 16.2 模拟第三方支付回调成功,验证订单状态变更为已支付(`payment_status = 2`),`expires_at` 变更为 NULL +- [x] 16.3 使用 PostgreSQL MCP 查询订单:`SELECT id, expires_at, payment_status FROM tb_order WHERE id = ?`,确认 `expires_at` 为 NULL + +## 17. 功能验证 - 定时任务自动取消 + +- [x] 17.1 使用 PostgreSQL MCP 手动修改订单的 `expires_at` 为过去时间:`UPDATE tb_order SET expires_at = NOW() - INTERVAL '1 minute' WHERE id = ?` +- [x] 17.2 启动 Worker 服务,等待 1 分钟后检查日志,确认定时任务执行成功 +- [x] 17.3 使用 PostgreSQL MCP 查询订单:确认订单状态变更为已取消,`expires_at` 变更为 NULL +- [x] 17.4 如果是混合支付订单,使用 PostgreSQL MCP 查询钱包:确认冻结余额解冻 + +## 18. 功能验证 - 查询过期状态 + +- [x] 18.1 使用 Postman/curl 调用订单列表 API,筛选 `is_expired = true`,验证返回已过期的待支付订单 +- [x] 18.2 调用订单列表 API,筛选 `is_expired = false`,验证返回未过期的待支付订单 +- [x] 18.3 调用订单详情 API,验证响应包含 `is_expired` 字段且计算正确 + +## 19. 性能验证 + +- [x] 19.1 使用 PostgreSQL MCP 的 `explain_query` 工具分析 `FindExpiredOrders` 查询:确认使用 `idx_order_expires` 索引 +- [x] 19.2 验证查询耗时:在订单数量 > 10000 的情况下,查询耗时 < 50ms +- [x] 19.3 验证定时任务处理耗时:单批次处理 100 条订单,总耗时 < 5s +- [x] 19.4 使用 PostgreSQL MCP 检查数据库连接池状态:确认无连接池阻塞 + +## 20. 错误处理验证 + +- [x] 20.1 模拟数据库连接失败场景:确认定时任务返回可重试错误,Asynq 自动重试 +- [x] 20.2 模拟钱包不存在场景:确认订单取消失败,事务回滚,订单状态不变 +- [x] 20.3 模拟冻结余额不足场景:确认订单取消失败,事务回滚,记录错误日志 +- [x] 20.4 检查日志:确认所有错误场景都记录了详细日志(包含订单 ID、错误原因) + +## 21. 代码质量检查 + +- [x] 21.1 运行 `gofmt -s -w .` 格式化代码 +- [x] 21.2 运行 `go vet ./...` 检查代码问题 +- [x] 21.3 运行 `go build ./...` 确认全部编译通过 +- [x] 21.4 检查所有新增代码的中文注释:确认符合注释规范 + +## 22. 文档更新 + +- [x] 22.1 创建功能总结文档 `docs/order-expiration/功能总结.md`:说明超时机制、钱包解冻、查询过期状态 +- [x] 22.2 更新 `README.md`:在“已实现功能”部分添加“订单超时自动失效” +- [ ] 22.3 更新 `openspec/specs/iot-order/spec.md`:同步 delta spec 到主规格文档(归档后) +- [ ] 22.4 更新 `openspec/specs/order-payment/spec.md`:同步 delta spec 到主规格文档(归档后) + +## 23. 最终验证 + +- [x] 23.1 在开发环境完整测试一次完整流程:创建订单 → 超时自动取消 → 钱包解冻 +- [x] 23.2 检查所有日志输出:确认日志级别正确(Info/Error),日志内容完整 +- [x] 23.3 检查数据库:确认无脏数据(如订单已取消但钱包未解冻) +- [x] 23.4 使用 Postman 导出 API 测试用例集(包含订单创建、取消、查询过期状态) + +## 24. 重构现有定时任务为 Asynq Scheduler + +- [x] 24.1 在 `pkg/constants/constants.go` 中添加告警检查任务类型常量 `TaskTypeAlertCheck = "alert:check"` +- [x] 24.2 在 `pkg/constants/constants.go` 中添加数据清理任务类型常量 `TaskTypeDataCleanup = "data:cleanup"` +- [x] 24.3 创建 `internal/task/alert_check.go` 文件,定义 `AlertCheckHandler` 结构体 +- [x] 24.4 实现 `NewAlertCheckHandler()` 构造函数,依赖注入 `alertService`, `logger` +- [x] 24.5 实现 `HandleAlertCheck(ctx context.Context, task *asynq.Task) error` 方法,调用 `alertService.CheckAlerts()` +- [x] 24.6 创建 `internal/task/data_cleanup.go` 文件,定义 `DataCleanupHandler` 结构体 +- [x] 24.7 实现 `NewDataCleanupHandler()` 构造函数,依赖注入 `cleanupService`, `logger` +- [x] 24.8 实现 `HandleDataCleanup(ctx context.Context, task *asynq.Task) error` 方法,调用 `cleanupService.RunScheduledCleanup()` +- [x] 24.9 在 `pkg/queue/handler.go` 的 `RegisterHandlers()` 方法中调用 `registerAlertCheckHandler()` +- [x] 24.10 实现 `registerAlertCheckHandler()` 方法:创建 `AlertCheckHandler` 并注册到 `mux.HandleFunc(constants.TaskTypeAlertCheck, ...)` +- [x] 24.11 在 `pkg/queue/handler.go` 的 `RegisterHandlers()` 方法中调用 `registerDataCleanupHandler()` +- [x] 24.12 实现 `registerDataCleanupHandler()` 方法:创建 `DataCleanupHandler` 并注册到 `mux.HandleFunc(constants.TaskTypeDataCleanup, ...)` +- [x] 24.13 在 `cmd/worker/main.go` 的 Asynq Scheduler 中注册告警检查周期任务:`scheduler.Register("@every 1m", asynq.NewTask(constants.TaskTypeAlertCheck, nil))` +- [x] 24.14 在 `cmd/worker/main.go` 的 Asynq Scheduler 中注册数据清理周期任务:`scheduler.Register("0 2 * * *", asynq.NewTask(constants.TaskTypeDataCleanup, nil))` +- [x] 24.15 移除 `cmd/worker/main.go` 中的 `startAlertChecker` 函数定义 +- [x] 24.16 移除 `cmd/worker/main.go` 中的 `startCleanupScheduler` 函数定义 +- [x] 24.17 移除 `cmd/worker/main.go` 中对 `startAlertChecker` 和 `startCleanupScheduler` 的调用和相关代码 +- [x] 24.18 验证编译:运行 `go build ./cmd/worker/...` 确认无编译错误 +- [x] 24.19 验证编译:运行 `go build ./internal/task/...` 确认无编译错误 +- [x] 24.20 验证编译:运行 `go build ./pkg/queue/...` 确认无编译错误 + +## 25. 提交和归档 + +- [ ] 25.1 使用 `/commit` 创建 Git commit,提交消息:"实现订单超时自动失效机制并重构定时任务为 Asynq Scheduler" +- [ ] 25.2 使用 `/opsx:verify` 验证实现与规格一致 +- [ ] 25.3 使用 `/opsx:archive` 归档变更,同步 delta specs 到主规格文档 +- [ ] 25.4 确认归档后 `openspec/specs/iot-order/spec.md` 和 `openspec/specs/order-payment/spec.md` 已更新 diff --git a/openspec/changes/implement-order-expiration/tasks.md b/openspec/changes/implement-order-expiration/tasks.md deleted file mode 100644 index 34b81f6..0000000 --- a/openspec/changes/implement-order-expiration/tasks.md +++ /dev/null @@ -1,184 +0,0 @@ -## 1. 数据库迁移 - -- [ ] 1.1 创建迁移文件 `migrations/000xxx_add_order_expiration.up.sql`:添加 `expires_at` 字段和复合索引 `idx_order_expires(expires_at, payment_status)` -- [ ] 1.2 创建回滚文件 `migrations/000xxx_add_order_expiration.down.sql`:删除索引和字段 -- [ ] 1.3 执行迁移验证:运行 `migrate up` 并检查表结构,确认字段和索引创建成功 -- [ ] 1.4 测试回滚:运行 `migrate down` 并验证字段和索引删除成功,然后重新 `migrate up` - -## 2. 常量定义 - -- [ ] 2.1 在 `pkg/constants/constants.go` 中添加订单超时时间常量 `OrderExpireTimeout = 30 * time.Minute` -- [ ] 2.2 在 `pkg/constants/constants.go` 中添加任务类型常量 `TaskTypeOrderExpire = "order:expire"` -- [ ] 2.3 在 `pkg/constants/constants.go` 中添加批量处理数量常量 `OrderExpireBatchSize = 100` -- [ ] 2.4 验证编译:运行 `go build ./...` 确认无编译错误 - -## 3. Model 层修改 - -- [ ] 3.1 在 `internal/model/order.go` 中的 `Order` 结构体添加 `ExpiresAt *time.Time` 字段(指针类型,支持 NULL) -- [ ] 3.2 在 `internal/model/dto/order_dto.go` 中的 `OrderResponse` 添加 `ExpiresAt *time.Time` 和 `IsExpired bool` 字段 -- [ ] 3.3 验证编译:运行 `go build ./internal/model/...` 确认无编译错误 - -## 4. Store 层新增方法 - -- [ ] 4.1 在 `internal/store/postgres/order_store.go` 添加 `FindExpiredOrders(ctx, limit int) ([]*model.Order, error)` 方法:查询 `expires_at <= NOW() AND payment_status = 1` 的订单 -- [ ] 4.2 在 `internal/store/postgres/order_store.go` 的 `UpdatePaymentStatus()` 方法中添加 `expiresAt *time.Time` 参数,支持更新过期时间 -- [ ] 4.3 验证编译:运行 `go build ./internal/store/...` 确认无编译错误 -- [ ] 4.4 使用 PostgreSQL MCP 工具验证查询:执行 `FindExpiredOrders` 的 SQL,确认索引使用正确且查询耗时 < 50ms - -## 5. Service 层修改 - 订单创建 - -- [ ] 5.1 修改 `internal/service/order/service.go` 的 `Create()` 方法:待支付订单设置 `expires_at = now + 30min` -- [ ] 5.2 修改 `Create()` 方法:后台钱包一步支付订单和线下支付订单 `expires_at = nil` -- [ ] 5.3 验证编译:运行 `go build ./internal/service/order/...` 确认无编译错误 - -## 6. Service 层修改 - 订单取消和钱包解冻 - -- [ ] 6.1 修改 `internal/service/order/service.go` 的 `Cancel()` 方法:添加钱包解冻逻辑(判断支付方式,计算解冻金额) -- [ ] 6.2 在 `Cancel()` 方法中添加事务处理:订单状态更新(`payment_status = 3`, `expires_at = nil`)和钱包解冻在同一事务 -- [ ] 6.3 在 `Cancel()` 方法中添加解冻规则判断逻辑:钱包支付(H5)、混合支付需解冻,纯在线支付不解冻 -- [ ] 6.4 验证编译:运行 `go build ./internal/service/order/...` 确认无编译错误 - -## 7. Service 层新增方法 - 批量取消超时订单 - -- [ ] 7.1 在 `internal/service/order/service.go` 添加 `CancelExpiredOrders(ctx context.Context) (int, error)` 方法 -- [ ] 7.2 实现 `CancelExpiredOrders()` 逻辑:调用 `FindExpiredOrders()` 查询超时订单(最多 100 条) -- [ ] 7.3 实现批量取消逻辑:遍历订单,调用 `Cancel()` 方法(复用钱包解冻逻辑) -- [ ] 7.4 添加日志记录:处理订单数量、解冻钱包次数、执行耗时 -- [ ] 7.5 验证编译:运行 `go build ./internal/service/order/...` 确认无编译错误 - -## 8. Service 层修改 - 支付成功清除过期时间 - -- [ ] 8.1 修改 `internal/service/order/service.go` 的 `WalletPay()` 方法:调用 `UpdatePaymentStatus()` 时传入 `expiresAt = nil` -- [ ] 8.2 修改 `HandlePaymentCallback()` 方法:调用 `UpdatePaymentStatus()` 时传入 `expiresAt = nil` -- [ ] 8.3 验证编译:运行 `go build ./internal/service/order/...` 确认无编译错误 - -## 9. Task 层新增定时任务 - -- [ ] 9.1 创建 `internal/task/order_expire.go` 文件,定义 `OrderExpireHandler` 结构体 -- [ ] 9.2 实现 `NewOrderExpireHandler()` 构造函数,依赖注入 `db`, `orderService`, `logger` -- [ ] 9.3 实现 `HandleOrderExpire(ctx context.Context, task *asynq.Task) error` 方法,调用 `orderService.CancelExpiredOrders()` -- [ ] 9.4 添加错误处理和重试逻辑:可重试错误返回 `err`,不可重试错误返回 `asynq.SkipRetry` -- [ ] 9.5 添加日志记录:任务开始、成功处理订单数、失败错误 -- [ ] 9.6 验证编译:运行 `go build ./internal/task/...` 确认无编译错误 - -## 10. Worker 注册定时任务 Handler - -- [ ] 10.1 在 `pkg/queue/handler.go` 的 `RegisterHandlers()` 方法中调用 `registerOrderExpireHandler()` -- [ ] 10.2 实现 `registerOrderExpireHandler()` 方法:创建 `OrderExpireHandler` 并注册到 `mux.HandleFunc(constants.TaskTypeOrderExpire, ...)` -- [ ] 10.3 验证编译:运行 `go build ./pkg/queue/...` 确认无编译错误 - -## 11. Worker 创建和启动 Asynq Scheduler - -- [ ] 11.1 在 `cmd/worker/main.go` 中创建 Asynq Scheduler 实例:`asynq.NewScheduler(redisOpt, &asynq.SchedulerOpts{Location: time.Local})` -- [ ] 11.2 注册订单超时周期任务:`scheduler.Register("@every 1m", asynq.NewTask(constants.TaskTypeOrderExpire, nil), asynq.Queue(constants.QueueDefault))` -- [ ] 11.3 启动 Scheduler:`scheduler.Start()`,并在 defer 中调用 `scheduler.Shutdown()` -- [ ] 11.4 验证编译:运行 `go build ./cmd/worker/...` 确认无编译错误 - -## 12. Handler 层修改 - DTO 响应 - -- [ ] 12.1 修改 `internal/handler/admin/order.go` 和 `internal/handler/h5/order.go` 的订单响应构建逻辑:添加 `ExpiresAt` 字段 -- [ ] 12.2 实现 `IsExpired` 动态计算逻辑:`if expiresAt != nil && paymentStatus == 1 { isExpired = now.After(expiresAt) }` -- [ ] 12.3 验证编译:运行 `go build ./internal/handler/...` 确认无编译错误 - -## 13. Handler 层修改 - 查询过期状态 - -- [ ] 13.1 修改 `internal/model/dto/order_dto.go` 的 `ListOrderRequest` 添加 `IsExpired *bool` 查询参数(可选) -- [ ] 13.2 修改 `internal/store/postgres/order_store.go` 的 `List()` 方法:添加过期状态筛选条件(`is_expired = true` 映射为 `expires_at <= NOW() AND payment_status = 1`) -- [ ] 12.3 验证编译:运行 `go build ./...` 确认无编译错误 - -## 14. 功能验证 - 订单创建 - -- [ ] 14.1 启动 API 服务,使用 Postman/curl 创建待支付订单(H5 端,支付方式 wechat),验证 `expires_at` 字段设置正确(约 `now + 30min`) -- [ ] 14.2 使用 PostgreSQL MCP 工具查询订单:`SELECT id, expires_at, payment_status FROM tb_order WHERE id = ?`,确认 `expires_at` 不为 NULL -- [ ] 14.3 创建后台钱包支付订单,验证 `expires_at` 为 NULL(订单立即支付成功) - -## 15. 功能验证 - 订单取消和钱包解冻 - -- [ ] 15.1 创建混合支付待支付订单(钱包预扣 2000 分),使用 PostgreSQL MCP 查询钱包冻结余额 -- [ ] 15.2 调用取消订单 API,验证订单状态变更为已取消(`payment_status = 3`),`expires_at` 变更为 NULL -- [ ] 15.3 使用 PostgreSQL MCP 查询钱包:确认冻结余额减少 2000 分 -- [ ] 15.4 创建纯在线支付订单(wechat),取消订单,确认不执行钱包解冻操作 - -## 16. 功能验证 - 支付成功清除过期时间 - -- [ ] 16.1 创建待支付订单(wechat),确认 `expires_at` 不为 NULL -- [ ] 16.2 模拟第三方支付回调成功,验证订单状态变更为已支付(`payment_status = 2`),`expires_at` 变更为 NULL -- [ ] 16.3 使用 PostgreSQL MCP 查询订单:`SELECT id, expires_at, payment_status FROM tb_order WHERE id = ?`,确认 `expires_at` 为 NULL - -## 17. 功能验证 - 定时任务自动取消 - -- [ ] 17.1 使用 PostgreSQL MCP 手动修改订单的 `expires_at` 为过去时间:`UPDATE tb_order SET expires_at = NOW() - INTERVAL '1 minute' WHERE id = ?` -- [ ] 17.2 启动 Worker 服务,等待 1 分钟后检查日志,确认定时任务执行成功 -- [ ] 17.3 使用 PostgreSQL MCP 查询订单:确认订单状态变更为已取消,`expires_at` 变更为 NULL -- [ ] 17.4 如果是混合支付订单,使用 PostgreSQL MCP 查询钱包:确认冻结余额解冻 - -## 18. 功能验证 - 查询过期状态 - -- [ ] 18.1 使用 Postman/curl 调用订单列表 API,筛选 `is_expired = true`,验证返回已过期的待支付订单 -- [ ] 18.2 调用订单列表 API,筛选 `is_expired = false`,验证返回未过期的待支付订单 -- [ ] 18.3 调用订单详情 API,验证响应包含 `is_expired` 字段且计算正确 - -## 19. 性能验证 - -- [ ] 19.1 使用 PostgreSQL MCP 的 `explain_query` 工具分析 `FindExpiredOrders` 查询:确认使用 `idx_order_expires` 索引 -- [ ] 19.2 验证查询耗时:在订单数量 > 10000 的情况下,查询耗时 < 50ms -- [ ] 19.3 验证定时任务处理耗时:单批次处理 100 条订单,总耗时 < 5s -- [ ] 19.4 使用 PostgreSQL MCP 检查数据库连接池状态:确认无连接池阻塞 - -## 20. 错误处理验证 - -- [ ] 20.1 模拟数据库连接失败场景:确认定时任务返回可重试错误,Asynq 自动重试 -- [ ] 20.2 模拟钱包不存在场景:确认订单取消失败,事务回滚,订单状态不变 -- [ ] 20.3 模拟冻结余额不足场景:确认订单取消失败,事务回滚,记录错误日志 -- [ ] 20.4 检查日志:确认所有错误场景都记录了详细日志(包含订单 ID、错误原因) - -## 21. 代码质量检查 - -- [ ] 21.1 运行 `gofmt -s -w .` 格式化代码 -- [ ] 21.2 运行 `go vet ./...` 检查代码问题 -- [ ] 21.3 运行 `go build ./...` 确认全部编译通过 -- [ ] 21.4 检查所有新增代码的中文注释:确认符合注释规范(导出符号有文档注释,复杂逻辑有实现注释) - -## 22. 文档更新 - -- [ ] 22.1 创建功能总结文档 `docs/order-expiration/功能总结.md`:说明超时机制、钱包解冻、查询过期状态 -- [ ] 22.2 更新 `README.md`:在"已实现功能"部分添加"订单超时自动失效" -- [ ] 22.3 更新 `openspec/specs/iot-order/spec.md`:同步 delta spec 到主规格文档(归档后) -- [ ] 22.4 更新 `openspec/specs/order-payment/spec.md`:同步 delta spec 到主规格文档(归档后) - -## 23. 最终验证 - -- [ ] 23.1 在开发环境完整测试一次完整流程:创建订单 → 超时自动取消 → 钱包解冻 -- [ ] 23.2 检查所有日志输出:确认日志级别正确(Info/Error),日志内容完整 -- [ ] 23.3 检查数据库:确认无脏数据(如订单已取消但钱包未解冻) -- [ ] 23.4 使用 Postman 导出 API 测试用例集(包含订单创建、取消、查询过期状态) - -## 24. 重构现有定时任务为 Asynq Scheduler - -- [ ] 24.1 在 `pkg/constants/constants.go` 中添加告警检查任务类型常量 `TaskTypeAlertCheck = "alert:check"` -- [ ] 24.2 在 `pkg/constants/constants.go` 中添加数据清理任务类型常量 `TaskTypeDataCleanup = "data:cleanup"` -- [ ] 24.3 创建 `internal/task/alert_check.go` 文件,定义 `AlertCheckHandler` 结构体 -- [ ] 24.4 实现 `NewAlertCheckHandler()` 构造函数,依赖注入 `alertService`, `logger` -- [ ] 24.5 实现 `HandleAlertCheck(ctx context.Context, task *asynq.Task) error` 方法,调用 `alertService.CheckAlerts()` -- [ ] 24.6 创建 `internal/task/data_cleanup.go` 文件,定义 `DataCleanupHandler` 结构体 -- [ ] 24.7 实现 `NewDataCleanupHandler()` 构造函数,依赖注入 `cleanupService`, `logger` -- [ ] 24.8 实现 `HandleDataCleanup(ctx context.Context, task *asynq.Task) error` 方法,调用 `cleanupService.RunScheduledCleanup()` -- [ ] 24.9 在 `pkg/queue/handler.go` 的 `RegisterHandlers()` 方法中调用 `registerAlertCheckHandler()` -- [ ] 24.10 实现 `registerAlertCheckHandler()` 方法:创建 `AlertCheckHandler` 并注册到 `mux.HandleFunc(constants.TaskTypeAlertCheck, ...)` -- [ ] 24.11 在 `pkg/queue/handler.go` 的 `RegisterHandlers()` 方法中调用 `registerDataCleanupHandler()` -- [ ] 24.12 实现 `registerDataCleanupHandler()` 方法:创建 `DataCleanupHandler` 并注册到 `mux.HandleFunc(constants.TaskTypeDataCleanup, ...)` -- [ ] 24.13 在 `cmd/worker/main.go` 的 Asynq Scheduler 中注册告警检查周期任务:`scheduler.Register("@every 1m", asynq.NewTask(constants.TaskTypeAlertCheck, nil))` -- [ ] 24.14 在 `cmd/worker/main.go` 的 Asynq Scheduler 中注册数据清理周期任务:`scheduler.Register("0 2 * * *", asynq.NewTask(constants.TaskTypeDataCleanup, nil))` (每天凌晨2点) -- [ ] 24.15 移除 `cmd/worker/main.go` 中的 `startAlertChecker` 函数定义(第 239-265 行) -- [ ] 24.16 移除 `cmd/worker/main.go` 中的 `startCleanupScheduler` 函数定义(第 267-303 行) -- [ ] 24.17 移除 `cmd/worker/main.go` 中对 `startAlertChecker` 和 `startCleanupScheduler` 的调用和相关代码 -- [ ] 24.18 验证编译:运行 `go build ./cmd/worker/...` 确认无编译错误 -- [ ] 24.19 验证编译:运行 `go build ./internal/task/...` 确认无编译错误 -- [ ] 24.20 验证编译:运行 `go build ./pkg/queue/...` 确认无编译错误 - -## 25. 提交和归档 - -- [ ] 25.1 使用 `/commit` 创建 Git commit,提交消息:"实现订单超时自动失效机制并重构定时任务为 Asynq Scheduler" -- [ ] 25.2 使用 `/opsx:verify` 验证实现与规格一致 -- [ ] 25.3 使用 `/opsx:archive` 归档变更,同步 delta specs 到主规格文档 -- [ ] 25.4 确认归档后 `openspec/specs/iot-order/spec.md` 和 `openspec/specs/order-payment/spec.md` 已更新 diff --git a/openspec/specs/iot-order/spec.md b/openspec/specs/iot-order/spec.md index c5a96d1..99047e0 100644 --- a/openspec/specs/iot-order/spec.md +++ b/openspec/specs/iot-order/spec.md @@ -60,7 +60,7 @@ This capability supports: ### Requirement: 订单状态流转 -系统 SHALL 管理订单的状态流转,确保状态变更符合业务规则。 +系统 SHALL 管理订单的状态流转,确保状态变更符合业务规则。**新增订单超时自动取消的详细场景。** **状态定义**: - **1-待支付**: 订单已创建,等待用户支付 @@ -71,7 +71,7 @@ This capability supports: **状态流转规则**: - 待支付(1) → 已支付(2): 用户完成支付 -- 待支付(1) → 已取消(4): 用户取消订单或订单超时 +- 待支付(1) → 已取消(4): 用户手动取消订单或订单超时(30 分钟) - 已支付(2) → 已完成(3): 系统完成订单处理(激活/发货) - 已支付(2) → 已退款(5): 用户申请退款且审核通过 - 已完成(3) → 已退款(5): 用户申请退款且审核通过(特殊情况) @@ -91,6 +91,25 @@ This capability supports: - **WHEN** 系统处理完设备级套餐订单(ID 为 10002),为设备绑定的所有 IoT 卡分配套餐 - **THEN** 系统将订单状态从 2(已支付) 变更为 3(已完成),`completed_at` 记录完成时间 +#### Scenario: 用户手动取消订单 + +- **WHEN** 用户手动取消待支付订单(ID 为 10003) +- **THEN** 系统将订单状态从 1(待支付) 变更为 4(已取消),`expires_at` 设置为 NULL,如有钱包预扣则解冻余额 + +#### Scenario: 订单超时自动取消 + +- **WHEN** 订单创建后 30 分钟未支付,定时任务扫描到该订单 +- **THEN** 系统自动将订单状态从 1(待支付) 变更为 4(已取消),`expires_at` 设置为 NULL,如有钱包预扣则解冻余额 + +#### Scenario: 订单超时自动取消(混合支付) + +- **WHEN** 混合支付订单创建后 30 分钟未完成在线支付,钱包已预扣 2000 分 +- **THEN** 系统自动取消订单,解冻钱包余额 2000 分 + +#### Scenario: 订单超时自动取消(纯在线支付) + +- **WHEN** 纯在线支付订单创建后 30 分钟未支付 +- **THEN** 系统自动取消订单,无需钱包解冻操作 --- ### Requirement: 订单支付方式 diff --git a/openspec/specs/order-expiration/spec.md b/openspec/specs/order-expiration/spec.md new file mode 100644 index 0000000..0dce8cc --- /dev/null +++ b/openspec/specs/order-expiration/spec.md @@ -0,0 +1,237 @@ +# Order Expiration + +## Purpose + +自动管理订单的超时失效,确保待支付订单在超时后自动取消,防止"僵尸订单"堆积,并自动释放已冻结的资源(如钱包余额)。 + +This capability supports: +- 订单超时时间配置和管理 +- 定时扫描和自动取消超时订单 +- 钱包余额自动解冻 +- 过期订单查询和筛选 + +## ADDED Requirements + +### Requirement: 订单过期时间字段 + +系统 SHALL 为每个订单设置过期时间字段(`expires_at`),用于判断订单是否超时。 + +**字段定义**: +- `expires_at`:订单过期时间(TIMESTAMP,可为 NULL) +- 创建时自动设置:`expires_at = created_at + 30分钟`(仅待支付订单) +- 已支付/已取消/已退款订单的 `expires_at` 为 NULL + +**索引设计**: +- 复合索引:`idx_order_expires(expires_at, payment_status)` 优化定时任务查询 + +#### Scenario: 创建待支付订单时设置过期时间 + +- **WHEN** 用户创建订单,支付方式为 wechat 或 alipay,订单状态为待支付(payment_status = 1) +- **THEN** 系统设置 `expires_at = created_at + 30分钟` + +#### Scenario: 创建钱包支付订单(后台)不设置过期时间 + +- **WHEN** 代理在后台创建订单,支付方式为 wallet,订单立即支付成功(payment_status = 2) +- **THEN** 系统不设置 `expires_at`,字段值为 NULL + +#### Scenario: 订单支付成功后清除过期时间 + +- **WHEN** 待支付订单支付成功,状态变更为已支付(payment_status = 2) +- **THEN** 系统将 `expires_at` 设置为 NULL + +#### Scenario: 订单取消后清除过期时间 + +- **WHEN** 订单被取消(payment_status = 3) +- **THEN** 系统将 `expires_at` 设置为 NULL + +--- + +### Requirement: 订单超时自动取消 + +系统 SHALL 通过定时任务自动扫描并取消超时订单。任务每分钟执行一次,批量处理超时订单。 + +**任务配置**: +- 任务类型:`TaskTypeOrderExpire = "order:expire"` +- 执行频率:每分钟 +- 单批处理量:最多 100 条 +- 超时时间:`OrderExpireTimeout = 30 * time.Minute` + +**任务逻辑**: +1. 查询条件:`expires_at <= NOW() AND payment_status = 1` +2. 批量取消订单:更新 `payment_status = 3`,`expires_at = NULL` +3. 钱包余额解冻(如果订单涉及钱包预扣) +4. 记录日志 + +#### Scenario: 定时任务扫描超时订单 + +- **WHEN** 定时任务执行,当前时间为 2026-02-28 10:30:00 +- **THEN** 系统查询 `expires_at <= '2026-02-28 10:30:00' AND payment_status = 1` 的订单,最多 100 条 + +#### Scenario: 批量取消超时订单 + +- **WHEN** 查询到 50 条超时订单 +- **THEN** 系统批量更新订单状态为已取消(payment_status = 3),`expires_at = NULL` + +#### Scenario: 钱包余额解冻(混合支付) + +- **WHEN** 超时订单使用了混合支付,钱包预扣 2000 分 +- **THEN** 系统解冻钱包余额 2000 分(`frozen_balance` 减少 2000) + +#### Scenario: 钱包余额解冻(纯钱包支付,H5 端) + +- **WHEN** 超时订单使用了钱包支付(H5 端创建待支付订单),钱包预扣 3000 分 +- **THEN** 系统解冻钱包余额 3000 分 + +#### Scenario: 无需解冻钱包(在线支付) + +- **WHEN** 超时订单使用了纯在线支付(wechat/alipay),没有钱包预扣 +- **THEN** 系统不执行钱包解冻操作 + +#### Scenario: 任务执行日志 + +- **WHEN** 定时任务执行完成 +- **THEN** 系统记录日志:处理订单数量、解冻钱包次数、执行耗时 + +--- + +### Requirement: 订单过期状态查询 + +系统 SHALL 支持按过期状态筛选订单,便于运营人员查询和分析超时订单。 + +**查询条件**(新增): +- `is_expired`(布尔值): + - `true`:查询已过期的待支付订单(`expires_at <= NOW() AND payment_status = 1`) + - `false`:查询未过期的待支付订单(`expires_at > NOW() AND payment_status = 1`) + - 不传:不按过期状态筛选 + +#### Scenario: 查询已过期的待支付订单 + +- **WHEN** 运营人员查询订单列表,筛选 `is_expired = true` +- **THEN** 系统返回 `expires_at <= NOW() AND payment_status = 1` 的订单列表 + +#### Scenario: 查询未过期的待支付订单 + +- **WHEN** 运营人员查询订单列表,筛选 `is_expired = false` +- **THEN** 系统返回 `expires_at > NOW() AND payment_status = 1` 的订单列表 + +#### Scenario: 订单详情显示过期状态 + +- **WHEN** 查询订单详情,订单为待支付且已超时 +- **THEN** 响应包含 `is_expired = true`,`expires_at` 字段显示过期时间 + +#### Scenario: 订单列表响应包含过期时间 + +- **WHEN** 查询订单列表 +- **THEN** 每个订单响应包含 `expires_at` 字段(可为 NULL) + +--- + +### Requirement: 钱包余额解冻逻辑 + +系统 SHALL 在订单取消(手动或自动)时,根据支付方式自动解冻钱包余额。 + +**解冻规则**: +- 钱包支付(H5 端待支付订单):解冻 `total_amount` +- 混合支付:解冻 `wallet_payment_amount` +- 纯在线支付:无需解冻 +- 后台钱包一步支付:无需解冻(订单创建时已完成支付) + +#### Scenario: 手动取消订单,解冻钱包 + +- **WHEN** 用户手动取消待支付订单,订单使用混合支付,钱包预扣 2000 分 +- **THEN** 系统解冻钱包余额 2000 分,订单状态变更为已取消 + +#### Scenario: 自动取消订单,解冻钱包 + +- **WHEN** 定时任务自动取消超时订单,订单使用钱包支付,钱包预扣 3000 分 +- **THEN** 系统解冻钱包余额 3000 分,订单状态变更为已取消 + +#### Scenario: 取消订单,无钱包预扣 + +- **WHEN** 用户取消待支付订单,订单使用纯在线支付(wechat) +- **THEN** 系统不执行钱包解冻操作 + +#### Scenario: 钱包解冻事务保证 + +- **WHEN** 订单取消涉及钱包解冻 +- **THEN** 订单状态更新和钱包余额解冻在同一事务中完成,任一失败则全部回滚 + +--- + +### Requirement: 超时配置常量 + +系统 SHALL 定义订单超时相关常量,统一管理超时时间和任务类型。 + +**常量定义**(`pkg/constants/constants.go`): +- `OrderExpireTimeout = 30 * time.Minute`:订单超时时间(30 分钟) +- `TaskTypeOrderExpire = "order:expire"`:订单超时取消任务类型 + +#### Scenario: 使用常量设置过期时间 + +- **WHEN** 创建待支付订单 +- **THEN** 系统使用 `constants.OrderExpireTimeout` 计算 `expires_at` + +#### Scenario: 使用常量注册任务 + +- **WHEN** 注册 Asynq 定时任务 +- **THEN** 系统使用 `constants.TaskTypeOrderExpire` 作为任务类型 + +--- + +### Requirement: 性能优化 + +系统 SHALL 通过索引优化和批量处理确保超时任务的性能符合要求。 + +**性能指标**: +- 定时任务查询耗时 < 50ms +- 单批次处理耗时 < 5s +- 单批处理量:100 条 + +**优化措施**: +- 使用复合索引 `idx_order_expires(expires_at, payment_status)` 优化查询 +- 批量更新订单状态(单 SQL 语句) +- 钱包解冻支持批量操作(单事务) + +#### Scenario: 复合索引优化查询 + +- **WHEN** 定时任务查询超时订单 +- **THEN** 数据库使用 `idx_order_expires` 索引,查询耗时 < 50ms + +#### Scenario: 批量处理限制 + +- **WHEN** 超时订单数量超过 100 条 +- **THEN** 系统单次最多处理 100 条,剩余订单下次执行时处理 + +#### Scenario: 任务执行时间限制 + +- **WHEN** 定时任务执行 +- **THEN** 单批次处理耗时 < 5s,包括查询、更新、解冻、日志记录 + +--- + +### Requirement: 数据库迁移 + +系统 SHALL 提供数据库迁移脚本,添加 `expires_at` 字段和索引。 + +**迁移内容**: +- 添加字段:`ALTER TABLE tb_order ADD COLUMN expires_at TIMESTAMP NULL COMMENT '订单过期时间'` +- 添加索引:`CREATE INDEX idx_order_expires ON tb_order(expires_at, payment_status)` + +**回滚脚本**: +- 删除索引:`DROP INDEX idx_order_expires ON tb_order` +- 删除字段:`ALTER TABLE tb_order DROP COLUMN expires_at` + +#### Scenario: 迁移脚本执行成功 + +- **WHEN** 执行 `migrate up` +- **THEN** `tb_order` 表新增 `expires_at` 字段和 `idx_order_expires` 索引 + +#### Scenario: 回滚脚本执行成功 + +- **WHEN** 执行 `migrate down` +- **THEN** `tb_order` 表删除 `expires_at` 字段和 `idx_order_expires` 索引 + +#### Scenario: 迁移对现有数据的影响 + +- **WHEN** 执行迁移脚本 +- **THEN** 已存在的订单 `expires_at` 字段值为 NULL,不影响现有业务 diff --git a/openspec/specs/order-payment/spec.md b/openspec/specs/order-payment/spec.md index a1bf955..c033789 100644 --- a/openspec/specs/order-payment/spec.md +++ b/openspec/specs/order-payment/spec.md @@ -323,3 +323,56 @@ - **WHEN** 后台创建订单 - **THEN** Handler 层使用 `CreateAdminOrderRequest` DTO(仅允许 wallet/offline),H5 端使用 `CreateOrderRequest` DTO(允许 wallet/wechat/alipay) + +--- + +### Requirement: 订单取消与钱包余额解冻 + +系统 SHALL 根据支付方式正确处理订单支付,包括钱包扣款、在线支付、混合支付等。**新增订单取消(手动或自动)时的钱包余额解冻逻辑。** + +**钱包支付流程**: +1. 检查钱包可用余额是否充足 +2. 冻结钱包余额(`frozen_balance` 增加) +3. 创建订单,状态为"待支付" +4. 订单完成后,扣减钱包余额(`balance` 减少,`frozen_balance` 减少),创建钱包明细记录 +5. 订单取消时(手动或自动),解冻钱包余额(`frozen_balance` 减少) + +**在线支付流程**: +1. 创建订单,状态为"待支付" +2. 调用第三方支付接口 +3. 用户完成支付后,订单状态变更为"已支付" +4. 订单完成后,订单状态变更为"已完成" + +**混合支付流程**: +1. 检查钱包可用余额是否充足(钱包支付部分) +2. 冻结钱包余额 +3. 创建订单,状态为"待支付" +4. 调用第三方支付接口(在线支付部分) +5. 用户完成在线支付后,扣减钱包余额,订单状态变更为"已支付" +6. 订单完成后,订单状态变更为"已完成" +7. 订单取消时(手动或自动),解冻钱包余额 + +#### Scenario: 订单手动取消,解冻钱包余额 + +- **WHEN** 用户使用钱包支付创建订单,订单金额为 3000 分,然后手动取消订单 +- **THEN** 系统解冻钱包余额 3000 分(`frozen_balance` 减少 3000),订单状态变更为"已取消" + +#### Scenario: 订单超时自动取消,解冻钱包余额 + +- **WHEN** 用户使用混合支付创建订单,钱包预扣 2000 分,30 分钟后订单超时 +- **THEN** 系统自动取消订单,解冻钱包余额 2000 分(`frozen_balance` 减少 2000),订单状态变更为"已取消" + +#### Scenario: 订单取消(纯在线支付),无需解冻 + +- **WHEN** 用户使用纯在线支付创建订单,30 分钟后订单超时 +- **THEN** 系统自动取消订单,不执行钱包解冻操作(因为没有钱包预扣) + +#### Scenario: 钱包解冻事务保证 + +- **WHEN** 订单取消涉及钱包解冻 +- **THEN** 订单状态更新(`payment_status = 3`、`expires_at = NULL`)和钱包余额解冻在同一事务中完成,任一失败则全部回滚 + +#### Scenario: 钱包解冻失败回滚 + +- **WHEN** 订单取消时,钱包解冻失败(如钱包不存在、冻结余额不足) +- **THEN** 事务回滚,订单状态不变,返回错误信息"订单取消失败" diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 660d7f9..fe5b3b1 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -61,6 +61,13 @@ const ( TaskTypePackageFirstActivation = "package:first:activation" // 首次实名激活 TaskTypePackageQueueActivation = "package:queue:activation" // 主套餐排队激活 TaskTypePackageDataReset = "package:data:reset" // 套餐流量重置 + + // 订单超时任务类型 + TaskTypeOrderExpire = "order:expire" // 订单超时自动取消 + + // 定时任务类型(由 Asynq Scheduler 调度) + TaskTypeAlertCheck = "alert:check" // 告警检查 + TaskTypeDataCleanup = "data:cleanup" // 数据清理 ) // 用户状态常量 @@ -150,6 +157,12 @@ const ( OrderStatusCancelled = "cancelled" // 已取消 ) +// 订单超时配置常量 +const ( + OrderExpireTimeout = 30 * time.Minute // 订单超时时间(30分钟) + OrderExpireBatchSize = 100 // 每次批量处理超时订单的数量上限 +) + // 队列配置常量 const ( QueueCritical = "critical" // 关键任务队列 diff --git a/pkg/constants/wallet.go b/pkg/constants/wallet.go index 51fe29d..55831df 100644 --- a/pkg/constants/wallet.go +++ b/pkg/constants/wallet.go @@ -32,7 +32,7 @@ const ( // 代理钱包交易子类型(当 transaction_type = "deduct" 用于订单支付时) const ( - WalletTransactionSubtypeSelfPurchase = "self_purchase" // 自购 + WalletTransactionSubtypeSelfPurchase = "self_purchase" // 自购 WalletTransactionSubtypePurchaseForSubordinate = "purchase_for_subordinate" // 给下级代理购买 ) diff --git a/pkg/queue/handler.go b/pkg/queue/handler.go index cf88893..0cff589 100644 --- a/pkg/queue/handler.go +++ b/pkg/queue/handler.go @@ -68,6 +68,9 @@ func (h *Handler) RegisterHandlers() *asynq.ServeMux { h.registerCommissionCalculationHandler() h.registerPollingHandlers() h.registerPackageActivationHandlers() + h.registerOrderExpireHandler() + h.registerAlertCheckHandler() + h.registerDataCleanupHandler() h.logger.Info("所有任务处理器注册完成") return h.mux @@ -179,6 +182,24 @@ func (h *Handler) registerPackageActivationHandlers() { h.logger.Info("注册排队激活任务处理器", zap.String("task_type", constants.TaskTypePackageQueueActivation)) } +func (h *Handler) registerOrderExpireHandler() { + orderExpireHandler := task.NewOrderExpireHandler(h.workerResult.Services.OrderExpirer, h.logger) + h.mux.HandleFunc(constants.TaskTypeOrderExpire, orderExpireHandler.HandleOrderExpire) + h.logger.Info("注册订单超时取消任务处理器", zap.String("task_type", constants.TaskTypeOrderExpire)) +} + +func (h *Handler) registerAlertCheckHandler() { + alertCheckHandler := task.NewAlertCheckHandler(h.workerResult.Services.AlertService, h.logger) + h.mux.HandleFunc(constants.TaskTypeAlertCheck, alertCheckHandler.HandleAlertCheck) + h.logger.Info("注册告警检查任务处理器", zap.String("task_type", constants.TaskTypeAlertCheck)) +} + +func (h *Handler) registerDataCleanupHandler() { + dataCleanupHandler := task.NewDataCleanupHandler(h.workerResult.Services.CleanupService, h.logger) + h.mux.HandleFunc(constants.TaskTypeDataCleanup, dataCleanupHandler.HandleDataCleanup) + h.logger.Info("注册数据清理任务处理器", zap.String("task_type", constants.TaskTypeDataCleanup)) +} + // GetMux 获取 ServeMux(用于启动 Worker 服务器) func (h *Handler) GetMux() *asynq.ServeMux { return h.mux diff --git a/pkg/queue/types.go b/pkg/queue/types.go index e874e90..29cbb8b 100644 --- a/pkg/queue/types.go +++ b/pkg/queue/types.go @@ -1,6 +1,8 @@ package queue import ( + "context" + "github.com/break/junhong_cmp_fiber/internal/service/commission_calculation" "github.com/break/junhong_cmp_fiber/internal/service/commission_stats" packagepkg "github.com/break/junhong_cmp_fiber/internal/service/package" @@ -8,6 +10,13 @@ import ( "github.com/break/junhong_cmp_fiber/internal/store/postgres" ) +// OrderExpirer 订单超时取消接口 +// 解耦 pkg/queue 与 internal/service/order 之间的循环依赖 +type OrderExpirer interface { + // CancelExpiredOrders 批量取消已超时的待支付订单,返回取消数量 + CancelExpiredOrders(ctx context.Context) (int, error) +} + // WorkerStores Worker 侧所有 Store 的集合 type WorkerStores struct { IotCardImportTask *postgres.IotCardImportTaskStore @@ -33,6 +42,7 @@ type WorkerStores struct { // 新增代理钱包 Store AgentWallet *postgres.AgentWalletStore AgentWalletTransaction *postgres.AgentWalletTransactionStore + CardWallet *postgres.CardWalletStore // 卡钱包 Store(用于订单取消时解冻) } // WorkerServices Worker 侧所有 Service 的集合 @@ -44,6 +54,7 @@ type WorkerServices struct { ResetService *packagepkg.ResetService AlertService *pollingSvc.AlertService CleanupService *pollingSvc.CleanupService + OrderExpirer OrderExpirer // 订单超时取消服务(接口类型,避免循环依赖) } // WorkerBootstrapResult Worker Bootstrap 结果