package exchange import ( "context" "time" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/errors" "github.com/break/junhong_cmp_fiber/pkg/middleware" "gorm.io/gorm" "gorm.io/gorm/clause" ) func (s *Service) executeMigration(ctx context.Context, order *model.ExchangeOrder) (int64, error) { var migrationBalance int64 err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { if order.NewAssetID == nil || *order.NewAssetID == 0 { return errors.New(errors.CodeInvalidParam, "新资产信息缺失") } oldAsset, err := s.resolveAssetByIdentifier(ctx, order.OldAssetType, order.OldAssetIdentifier) if err != nil { return err } newAsset, err := s.resolveAssetByIdentifier(ctx, order.OldAssetType, order.NewAssetIdentifier) if err != nil { return err } migrationBalance, err = s.transferWalletBalanceWithTx(ctx, tx, order, oldAsset, newAsset) if err != nil { return err } if err = s.migratePackageUsageWithTx(ctx, tx, oldAsset, newAsset); err != nil { return err } if err = s.copyAccumulatedFieldsWithTx(tx, oldAsset, newAsset); err != nil { return err } if err = s.copyResourceTagsWithTx(ctx, tx, oldAsset, newAsset); err != nil { return err } if oldAsset.VirtualNo != "" && newAsset.VirtualNo != "" { if err = tx.Model(&model.PersonalCustomerDevice{}). Where("virtual_no = ?", oldAsset.VirtualNo). Updates(map[string]any{"virtual_no": newAsset.VirtualNo, "updated_at": time.Now()}).Error; err != nil { return errors.Wrap(errors.CodeDatabaseError, err, "更新客户绑定关系失败") } } if err = s.updateOldAssetStatusWithTx(tx, oldAsset); err != nil { return err } if err = tx.Model(&model.ExchangeOrder{}).Where("id = ?", order.ID).Updates(map[string]any{ "migration_completed": true, "migration_balance": migrationBalance, "updater": middleware.GetUserIDFromContext(ctx), "updated_at": time.Now(), }).Error; err != nil { return errors.Wrap(errors.CodeDatabaseError, err, "更新换货单迁移状态失败") } return nil }) if err != nil { return 0, errors.Wrap(errors.CodeExchangeMigrationFailed, err, "执行全量迁移失败") } return migrationBalance, nil } func (s *Service) transferWalletBalanceWithTx(ctx context.Context, tx *gorm.DB, order *model.ExchangeOrder, oldAsset, newAsset *resolvedExchangeAsset) (int64, error) { var oldWallet model.AssetWallet if err := tx.WithContext(ctx).Where("resource_type = ? AND resource_id = ?", oldAsset.AssetType, oldAsset.AssetID).First(&oldWallet).Error; err != nil { if err != gorm.ErrRecordNotFound { return 0, errors.Wrap(errors.CodeDatabaseError, err, "查询旧资产钱包失败") } } var newWallet model.AssetWallet if err := tx.WithContext(ctx).Where("resource_type = ? AND resource_id = ?", newAsset.AssetType, newAsset.AssetID).First(&newWallet).Error; err != nil { if err != gorm.ErrRecordNotFound { return 0, errors.Wrap(errors.CodeDatabaseError, err, "查询新资产钱包失败") } shopTag := uint(0) if newAsset.ShopID != nil { shopTag = *newAsset.ShopID } newWallet = model.AssetWallet{ResourceType: newAsset.AssetType, ResourceID: newAsset.AssetID, Balance: 0, FrozenBalance: 0, Currency: "CNY", Status: 1, Version: 0, ShopIDTag: shopTag} if err = tx.WithContext(ctx).Create(&newWallet).Error; err != nil { return 0, errors.Wrap(errors.CodeDatabaseError, err, "创建新资产钱包失败") } } migrationBalance := oldWallet.Balance if migrationBalance <= 0 { return 0, nil } beforeBalance := newWallet.Balance if err := tx.WithContext(ctx).Model(&model.AssetWallet{}).Where("id = ?", oldWallet.ID).Updates(map[string]any{"balance": 0, "updated_at": time.Now()}).Error; err != nil { return 0, errors.Wrap(errors.CodeDatabaseError, err, "清空旧资产钱包余额失败") } if err := tx.WithContext(ctx).Model(&model.AssetWallet{}).Where("id = ?", newWallet.ID).Updates(map[string]any{"balance": gorm.Expr("balance + ?", migrationBalance), "updated_at": time.Now()}).Error; err != nil { return 0, errors.Wrap(errors.CodeDatabaseError, err, "增加新资产钱包余额失败") } refType := "exchange" if err := tx.WithContext(ctx).Create(&model.AssetWalletTransaction{ AssetWalletID: newWallet.ID, ResourceType: newAsset.AssetType, ResourceID: newAsset.AssetID, UserID: middleware.GetUserIDFromContext(ctx), TransactionType: "refund", Amount: migrationBalance, BalanceBefore: beforeBalance, BalanceAfter: beforeBalance + migrationBalance, Status: 1, ReferenceType: &refType, ReferenceNo: &order.ExchangeNo, Creator: middleware.GetUserIDFromContext(ctx), ShopIDTag: newWallet.ShopIDTag, EnterpriseIDTag: newWallet.EnterpriseIDTag, }).Error; err != nil { return 0, errors.Wrap(errors.CodeDatabaseError, err, "写入迁移钱包流水失败") } return migrationBalance, nil } func (s *Service) migratePackageUsageWithTx(ctx context.Context, tx *gorm.DB, oldAsset, newAsset *resolvedExchangeAsset) error { query := tx.WithContext(ctx).Model(&model.PackageUsage{}).Where("status IN ?", []int{constants.PackageUsageStatusPending, constants.PackageUsageStatusActive, constants.PackageUsageStatusDepleted}) if oldAsset.AssetType == constants.ExchangeAssetTypeIotCard { query = query.Where("iot_card_id = ?", oldAsset.AssetID) } else { query = query.Where("device_id = ?", oldAsset.AssetID) } var usageIDs []uint if err := query.Pluck("id", &usageIDs).Error; err != nil { return errors.Wrap(errors.CodeDatabaseError, err, "查询套餐使用记录失败") } if len(usageIDs) == 0 { return nil } updates := map[string]any{"updated_at": time.Now()} if oldAsset.AssetType == constants.ExchangeAssetTypeIotCard { updates["iot_card_id"] = newAsset.AssetID } else { updates["device_id"] = newAsset.AssetID } if err := tx.WithContext(ctx).Model(&model.PackageUsage{}).Where("id IN ?", usageIDs).Updates(updates).Error; err != nil { return errors.Wrap(errors.CodeDatabaseError, err, "迁移套餐使用记录失败") } if err := tx.WithContext(ctx).Model(&model.PackageUsageDailyRecord{}).Where("package_usage_id IN ?", usageIDs).Update("updated_at", gorm.Expr("updated_at")).Error; err != nil { return errors.Wrap(errors.CodeDatabaseError, err, "迁移套餐日记录失败") } return nil } func (s *Service) copyAccumulatedFieldsWithTx(tx *gorm.DB, oldAsset, newAsset *resolvedExchangeAsset) error { if oldAsset.AssetType == constants.ExchangeAssetTypeIotCard { if oldAsset.Card == nil { return errors.New(errors.CodeAssetNotFound) } if err := tx.Model(&model.IotCard{}).Where("id = ?", newAsset.AssetID).Updates(map[string]any{ "accumulated_recharge": oldAsset.Card.AccumulatedRecharge, "first_commission_paid": oldAsset.Card.FirstCommissionPaid, "accumulated_recharge_by_series": oldAsset.Card.AccumulatedRechargeBySeriesJSON, "first_recharge_triggered_by_series": oldAsset.Card.FirstRechargeTriggeredBySeriesJSON, "updated_at": time.Now(), }).Error; err != nil { return errors.Wrap(errors.CodeDatabaseError, err, "复制旧卡累计字段失败") } return nil } if oldAsset.Device == nil { return errors.New(errors.CodeAssetNotFound) } if err := tx.Model(&model.Device{}).Where("id = ?", newAsset.AssetID).Updates(map[string]any{ "accumulated_recharge": oldAsset.Device.AccumulatedRecharge, "first_commission_paid": oldAsset.Device.FirstCommissionPaid, "accumulated_recharge_by_series": oldAsset.Device.AccumulatedRechargeBySeriesJSON, "first_recharge_triggered_by_series": oldAsset.Device.FirstRechargeTriggeredBySeriesJSON, "updated_at": time.Now(), }).Error; err != nil { return errors.Wrap(errors.CodeDatabaseError, err, "复制旧设备累计字段失败") } return nil } func (s *Service) copyResourceTagsWithTx(ctx context.Context, tx *gorm.DB, oldAsset, newAsset *resolvedExchangeAsset) error { var tags []*model.ResourceTag if err := tx.WithContext(ctx).Where("resource_type = ? AND resource_id = ?", oldAsset.AssetType, oldAsset.AssetID).Find(&tags).Error; err != nil { return errors.Wrap(errors.CodeDatabaseError, err, "查询资源标签失败") } var creator = middleware.GetUserIDFromContext(ctx) for _, item := range tags { if item == nil { continue } record := &model.ResourceTag{ ResourceType: newAsset.AssetType, ResourceID: newAsset.AssetID, TagID: item.TagID, EnterpriseID: item.EnterpriseID, ShopID: item.ShopID, BaseModel: model.BaseModel{Creator: creator, Updater: creator}, } if err := tx.WithContext(ctx).Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "resource_type"}, {Name: "resource_id"}, {Name: "tag_id"}}, DoNothing: true}).Create(record).Error; err != nil { return errors.Wrap(errors.CodeDatabaseError, err, "复制资源标签失败") } } return nil } func (s *Service) updateOldAssetStatusWithTx(tx *gorm.DB, oldAsset *resolvedExchangeAsset) error { if oldAsset.AssetType == constants.ExchangeAssetTypeIotCard { if err := tx.Model(&model.IotCard{}).Where("id = ?", oldAsset.AssetID).Updates(map[string]any{"asset_status": 3, "updated_at": time.Now()}).Error; err != nil { return errors.Wrap(errors.CodeDatabaseError, err, "更新旧卡状态失败") } return nil } if err := tx.Model(&model.Device{}).Where("id = ?", oldAsset.AssetID).Updates(map[string]any{"asset_status": 3, "updated_at": time.Now()}).Error; err != nil { return errors.Wrap(errors.CodeDatabaseError, err, "更新旧设备状态失败") } return nil }