From f32d32cd36bbb62a1e25955fccc4a8e809852857 Mon Sep 17 00:00:00 2001 From: huang Date: Tue, 24 Feb 2026 16:23:02 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20IoT=20=E5=8D=A1=2030M=20=E8=A1=8C?= =?UTF-8?q?=E5=88=86=E9=A1=B5=E6=9F=A5=E8=AF=A2=E4=BC=98=E5=8C=96=EF=BC=88?= =?UTF-8?q?P95=2017.9s=20=E2=86=92=20<500ms=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 is_standalone 物化列 + 触发器自动维护(迁移 056) - 并行查询拆分:多店铺 IN 查询拆为 per-shop goroutine 并行 Index Scan - 两阶段延迟 Join:深度分页(page≥50)走覆盖索引 Index Only Scan 取 ID 再回表 - COUNT 缓存:per-shop 并行 COUNT + Redis 30 分钟 TTL - 索引优化:删除有害全局索引、新增 partial composite indexes(迁移 057/058) - ICCID 模糊搜索路径隔离:trigram GIN 索引走独立查询路径 - 慢查询阈值从 100ms 调整为 500ms - 新增 30M 测试数据种子脚本和 benchmark 工具 --- internal/model/iot_card.go | 1 + internal/service/iot_card/service.go | 14 + internal/store/postgres/iot_card_store.go | 499 ++++++++++- .../000000_create_legacy_tables.down.sql | 5 + migrations/000000_create_legacy_tables.up.sql | 98 ++ ...enterprise_card_authorization_table.up.sql | 6 +- ...056_add_is_standalone_to_iot_card.down.sql | 9 + ...00056_add_is_standalone_to_iot_card.up.sql | 79 ++ .../000057_optimize_iot_card_indexes.down.sql | 8 + .../000057_optimize_iot_card_indexes.up.sql | 14 + ...overing_index_for_deep_pagination.down.sql | 4 + ..._covering_index_for_deep_pagination.up.sql | 15 + opencode.json | 2 +- pkg/constants/constants.go | 2 +- pkg/constants/redis.go | 11 + scripts/perf_query/bench.go | 342 +++++++ scripts/perf_query/deploy-perf-pg.sh | 85 ++ scripts/perf_query/docker-compose-perf.yml | 21 + scripts/perf_query/postgresql-perf.conf | 840 ++++++++++++++++++ scripts/perf_query/seed.go | 700 +++++++++++++++ 20 files changed, 2705 insertions(+), 50 deletions(-) create mode 100644 migrations/000000_create_legacy_tables.down.sql create mode 100644 migrations/000000_create_legacy_tables.up.sql create mode 100644 migrations/000056_add_is_standalone_to_iot_card.down.sql create mode 100644 
migrations/000056_add_is_standalone_to_iot_card.up.sql create mode 100644 migrations/000057_optimize_iot_card_indexes.down.sql create mode 100644 migrations/000057_optimize_iot_card_indexes.up.sql create mode 100644 migrations/000058_add_covering_index_for_deep_pagination.down.sql create mode 100644 migrations/000058_add_covering_index_for_deep_pagination.up.sql create mode 100644 scripts/perf_query/bench.go create mode 100755 scripts/perf_query/deploy-perf-pg.sh create mode 100644 scripts/perf_query/docker-compose-perf.yml create mode 100644 scripts/perf_query/postgresql-perf.conf create mode 100644 scripts/perf_query/seed.go diff --git a/internal/model/iot_card.go b/internal/model/iot_card.go index b773080..3dd116c 100644 --- a/internal/model/iot_card.go +++ b/internal/model/iot_card.go @@ -50,6 +50,7 @@ type IotCard struct { StoppedAt *time.Time `gorm:"column:stopped_at;comment:停机时间" json:"stopped_at,omitempty"` ResumedAt *time.Time `gorm:"column:resumed_at;comment:最近复机时间" json:"resumed_at,omitempty"` StopReason string `gorm:"column:stop_reason;type:varchar(50);comment:停机原因(traffic_exhausted=流量耗尽,manual=手动停机,arrears=欠费)" json:"stop_reason,omitempty"` + IsStandalone bool `gorm:"column:is_standalone;type:boolean;default:true;not null;comment:是否为独立卡(未绑定设备) 由触发器自动维护" json:"is_standalone"` } // TableName 指定表名 diff --git a/internal/service/iot_card/service.go b/internal/service/iot_card/service.go index 8a34c39..ee20fd5 100644 --- a/internal/service/iot_card/service.go +++ b/internal/service/iot_card/service.go @@ -10,6 +10,7 @@ import ( "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/errors" + "github.com/break/junhong_cmp_fiber/pkg/middleware" "go.uber.org/zap" "gorm.io/gorm" ) @@ -125,6 +126,19 @@ func (s *Service) ListStandalone(ctx context.Context, req *dto.ListStandaloneIot filters["series_id"] = *req.SeriesID } + // 代理用户注入 subordinate_shop_ids,让 Store 
层走并行查询路径 + // 避免 PG 对 shop_id IN (...) + ORDER BY 选择全表扫描 + userType := middleware.GetUserTypeFromContext(ctx) + if userType == constants.UserTypeAgent { + shopID := middleware.GetShopIDFromContext(ctx) + if shopID > 0 { + subordinateIDs, err := s.shopStore.GetSubordinateShopIDs(ctx, shopID) + if err == nil && len(subordinateIDs) > 1 { + filters["subordinate_shop_ids"] = subordinateIDs + } + } + } + cards, total, err := s.iotCardStore.ListStandalone(ctx, opts, filters) if err != nil { return nil, err diff --git a/internal/store/postgres/iot_card_store.go b/internal/store/postgres/iot_card_store.go index 8081ece..8087445 100644 --- a/internal/store/postgres/iot_card_store.go +++ b/internal/store/postgres/iot_card_store.go @@ -2,14 +2,25 @@ package postgres import ( "context" + "fmt" + "hash/fnv" + "sort" + "sync" + "time" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/store" "github.com/break/junhong_cmp_fiber/pkg/constants" + pkggorm "github.com/break/junhong_cmp_fiber/pkg/gorm" + "github.com/break/junhong_cmp_fiber/pkg/logger" + "github.com/break/junhong_cmp_fiber/pkg/middleware" "github.com/redis/go-redis/v9" + "go.uber.org/zap" "gorm.io/gorm" ) +const listCountCacheTTL = 30 * time.Minute + type IotCardStore struct { db *gorm.DB redis *redis.Client @@ -102,9 +113,11 @@ func (s *IotCardStore) List(ctx context.Context, opts *store.QueryOptions, filte query := s.db.WithContext(ctx).Model(&model.IotCard{}) // 企业用户特殊处理:只能看到授权给自己的卡 + // 子查询跳过数据权限过滤,权限已由外层查询的 GORM callback 保证 + skipCtx := pkggorm.SkipDataPermission(ctx) if enterpriseID, ok := filters["authorized_enterprise_id"].(uint); ok && enterpriseID > 0 { query = query.Where("id IN (?)", - s.db.Table("tb_enterprise_card_authorization"). + s.db.WithContext(skipCtx).Table("tb_enterprise_card_authorization"). Select("card_id"). Where("enterprise_id = ? 
AND revoked_at IS NULL AND deleted_at IS NULL", enterpriseID)) } @@ -130,7 +143,7 @@ func (s *IotCardStore) List(ctx context.Context, opts *store.QueryOptions, filte } if packageID, ok := filters["package_id"].(uint); ok && packageID > 0 { query = query.Where("id IN (?)", - s.db.Table("tb_package_usage"). + s.db.WithContext(skipCtx).Table("tb_package_usage"). Select("iot_card_id"). Where("package_id = ? AND deleted_at IS NULL", packageID)) } @@ -151,9 +164,13 @@ func (s *IotCardStore) List(ctx context.Context, opts *store.QueryOptions, filte query = query.Where("series_id = ?", seriesID) } - // 统计总数 - if err := query.Count(&total).Error; err != nil { - return nil, 0, err + if cachedTotal, ok := s.getCachedCount(ctx, "iot_card", filters); ok { + total = cachedTotal + } else { + if err := query.Count(&total).Error; err != nil { + return nil, 0, err + } + s.cacheCount(ctx, "iot_card", filters, total) } // 分页处理 @@ -181,16 +198,393 @@ func (s *IotCardStore) List(ctx context.Context, opts *store.QueryOptions, filte return cards, total, nil } +// standaloneListColumns 列表查询只选取响应需要的列,避免 SELECT * 的宽行 I/O +var standaloneListColumns = []string{ + "id", "iccid", "card_category", "carrier_id", "carrier_type", "carrier_name", + "imsi", "msisdn", "batch_no", "supplier", "cost_price", "distribute_price", + "status", "shop_id", "activated_at", "activation_status", "real_name_status", + "network_status", "data_usage_mb", "current_month_usage_mb", "current_month_start_date", + "last_month_total_mb", "last_data_check_at", "last_real_name_check_at", + "enable_polling", "series_id", "first_commission_paid", "accumulated_recharge", + "created_at", "updated_at", +} + +// ListStandalone 独立卡列表查询入口 +// 路由逻辑: +// - ICCID/MSISDN 模糊搜索 → listStandaloneDefault(trigram GIN 索引) +// - 多店铺(代理用户) → listStandaloneParallel(per-shop 并行,内含深度分页两阶段优化) +// - 深度分页(page >= 50)→ listStandaloneTwoPhase(覆盖索引延迟 Join) +// - 其他 → listStandaloneDefault func (s *IotCardStore) ListStandalone(ctx context.Context, opts 
*store.QueryOptions, filters map[string]any) ([]*model.IotCard, int64, error) { + if opts == nil { + opts = &store.QueryOptions{ + Page: 1, + PageSize: constants.DefaultPageSize, + } + } + + _, hasICCID := filters["iccid"].(string) + _, hasMSISDN := filters["msisdn"].(string) + useFuzzySearch := (hasICCID && filters["iccid"] != "") || (hasMSISDN && filters["msisdn"] != "") + + if !useFuzzySearch { + if shopIDs, ok := filters["subordinate_shop_ids"].([]uint); ok && len(shopIDs) > 1 { + return s.listStandaloneParallel(ctx, opts, filters, shopIDs) + } + } + + // 非多店铺场景的深度分页走两阶段路径(单店铺或平台用户) + if !useFuzzySearch && opts.Page >= 50 { + return s.listStandaloneTwoPhase(ctx, opts, filters) + } + + return s.listStandaloneDefault(ctx, opts, filters) +} + +// listStandaloneTwoPhase 单路径两阶段延迟 Join +// 适用于非多店铺场景(平台用户、单店铺)的深度分页(page >= 50) +// Phase 1: SELECT id 走覆盖索引;Phase 2: WHERE id IN 回表 +func (s *IotCardStore) listStandaloneTwoPhase(ctx context.Context, opts *store.QueryOptions, filters map[string]any) ([]*model.IotCard, int64, error) { + var total int64 + + query := s.db.WithContext(ctx).Model(&model.IotCard{}). + Where("is_standalone = true") + query = s.applyStandaloneFilters(ctx, query, filters) + + if cachedTotal, ok := s.getCachedCount(ctx, "standalone", filters); ok { + total = cachedTotal + } else { + if err := query.Count(&total).Error; err != nil { + return nil, 0, err + } + s.cacheCount(ctx, "standalone", filters, total) + } + + offset := (opts.Page - 1) * opts.PageSize + + // Phase 1: 仅取 ID(覆盖索引 Index Only Scan) + var ids []uint + if err := query.Select("id"). + Order("created_at DESC, id"). + Offset(offset).Limit(opts.PageSize). + Pluck("id", &ids).Error; err != nil { + return nil, 0, err + } + + if len(ids) == 0 { + return []*model.IotCard{}, total, nil + } + + // Phase 2: 用 ID 精确回表 + var cards []*model.IotCard + if err := s.db.WithContext(ctx).Model(&model.IotCard{}). + Select(standaloneListColumns). + Where("id IN ?", ids). 
+ Find(&cards).Error; err != nil { + return nil, 0, err + } + + idOrder := make(map[uint]int, len(ids)) + for i, id := range ids { + idOrder[id] = i + } + sort.Slice(cards, func(i, j int) bool { + return idOrder[cards[i].ID] < idOrder[cards[j].ID] + }) + + logger.GetAppLogger().Debug("两阶段查询完成", + zap.Int("page", opts.Page), + zap.Int("phase2_fetched", len(cards)), + zap.Int64("total_count", total), + ) + + return cards, total, nil +} + +// listStandaloneDefault 原始查询路径 +// 适用于平台/超管用户(无 shop_id 过滤)或单店铺场景 +func (s *IotCardStore) listStandaloneDefault(ctx context.Context, opts *store.QueryOptions, filters map[string]any) ([]*model.IotCard, int64, error) { var cards []*model.IotCard var total int64 - query := s.db.WithContext(ctx).Model(&model.IotCard{}) + query := s.db.WithContext(ctx).Model(&model.IotCard{}). + Where("is_standalone = true") + query = s.applyStandaloneFilters(ctx, query, filters) - query = query.Where("id NOT IN (?)", - s.db.Model(&model.DeviceSimBinding{}). - Select("iot_card_id"). - Where("bind_status = ?", 1)) + if cachedTotal, ok := s.getCachedCount(ctx, "standalone", filters); ok { + total = cachedTotal + } else { + if err := query.Count(&total).Error; err != nil { + return nil, 0, err + } + s.cacheCount(ctx, "standalone", filters, total) + } + + offset := (opts.Page - 1) * opts.PageSize + + if opts.OrderBy != "" { + query = query.Order(opts.OrderBy) + } else { + query = query.Order("created_at DESC") + } + + if err := query.Select(standaloneListColumns).Offset(offset).Limit(opts.PageSize).Find(&cards).Error; err != nil { + return nil, 0, err + } + + return cards, total, nil +} + +// listStandaloneParallel 并行查询路径 +// 将 shop_id IN (...) 
拆分为 per-shop 独立查询,每个查询走 Index Scan +// 然后在应用层归并排序,避免 PG 对多值 IN + ORDER BY 选择全表扫描 +func (s *IotCardStore) listStandaloneParallel(ctx context.Context, opts *store.QueryOptions, filters map[string]any, shopIDs []uint) ([]*model.IotCard, int64, error) { + skipCtx := pkggorm.SkipDataPermission(ctx) + + fetchLimit := (opts.Page-1)*opts.PageSize + opts.PageSize + + // 深度分页(page >= 50)走两阶段延迟 Join: + // Phase 1 只取 ID(覆盖索引 Index Only Scan),Phase 2 用 ID 回表 + // 避免 OFFSET 跳过大量宽行数据 + if opts.Page >= 50 { + return s.listStandaloneParallelTwoPhase(ctx, opts, filters, shopIDs) + } + + type shopResult struct { + cards []*model.IotCard + count int64 + err error + } + + results := make([]shopResult, len(shopIDs)) + var wg sync.WaitGroup + + cachedTotal, hasCachedTotal := s.getCachedCount(ctx, "standalone", filters) + + for i, shopID := range shopIDs { + wg.Add(1) + go func(idx int, sid uint) { + defer wg.Done() + + q := s.db.WithContext(skipCtx).Model(&model.IotCard{}). + Where("is_standalone = true AND deleted_at IS NULL AND shop_id = ?", sid) + q = s.applyStandaloneFilters(skipCtx, q, filters) + + var cards []*model.IotCard + if err := q.Select(standaloneListColumns). + Order("created_at DESC, id"). + Limit(fetchLimit). + Find(&cards).Error; err != nil { + results[idx] = shopResult{err: err} + return + } + + var count int64 + if !hasCachedTotal { + countQ := s.db.WithContext(skipCtx).Model(&model.IotCard{}). 
+ Where("is_standalone = true AND deleted_at IS NULL AND shop_id = ?", sid) + countQ = s.applyStandaloneFilters(skipCtx, countQ, filters) + if err := countQ.Count(&count).Error; err != nil { + results[idx] = shopResult{err: err} + return + } + } + + results[idx] = shopResult{cards: cards, count: count} + }(i, shopID) + } + + wg.Wait() + + allCards := make([]*model.IotCard, 0, fetchLimit) + var totalCount int64 + if hasCachedTotal { + totalCount = cachedTotal + } + + for _, r := range results { + if r.err != nil { + return nil, 0, r.err + } + allCards = append(allCards, r.cards...) + if !hasCachedTotal { + totalCount += r.count + } + } + + if !hasCachedTotal && totalCount > 0 { + s.cacheCount(ctx, "standalone", filters, totalCount) + } + + sort.Slice(allCards, func(i, j int) bool { + if allCards[i].CreatedAt.Equal(allCards[j].CreatedAt) { + return allCards[i].ID < allCards[j].ID + } + return allCards[i].CreatedAt.After(allCards[j].CreatedAt) + }) + + offset := (opts.Page - 1) * opts.PageSize + end := offset + opts.PageSize + if offset >= len(allCards) { + return []*model.IotCard{}, totalCount, nil + } + if end > len(allCards) { + end = len(allCards) + } + + logger.GetAppLogger().Debug("并行查询完成", + zap.Int("shop_count", len(shopIDs)), + zap.Int("total_fetched", len(allCards)), + zap.Int("offset", offset), + zap.Int("page_size", opts.PageSize), + zap.Int64("total_count", totalCount), + ) + + return allCards[offset:end], totalCount, nil +} + +// cardIDWithTime 仅存储 ID 和排序键,用于两阶段查询的轻量归并排序 +type cardIDWithTime struct { + ID uint + CreatedAt time.Time +} + +// listStandaloneParallelTwoPhase 并行+两阶段延迟 Join 查询路径 +// 解决深度分页(page >= 50)时 OFFSET 跳过大量宽行数据的性能问题: +// Phase 1: 每个 shop 并行查询 SELECT id, created_at(走覆盖索引 Index Only Scan,~20字节/元组) +// 归并排序后取目标页的 20 个 ID +// Phase 2: SELECT 完整列 WHERE id IN (20 IDs)(PK 精确回表) +func (s *IotCardStore) listStandaloneParallelTwoPhase(ctx context.Context, opts *store.QueryOptions, filters map[string]any, shopIDs []uint) ([]*model.IotCard, int64, 
error) { + skipCtx := pkggorm.SkipDataPermission(ctx) + + fetchLimit := (opts.Page-1)*opts.PageSize + opts.PageSize + + type shopResult struct { + ids []cardIDWithTime + count int64 + err error + } + + results := make([]shopResult, len(shopIDs)) + var wg sync.WaitGroup + + cachedTotal, hasCachedTotal := s.getCachedCount(ctx, "standalone", filters) + + // Phase 1: 并行获取每个 shop 的 (id, created_at),走覆盖索引 + for i, shopID := range shopIDs { + wg.Add(1) + go func(idx int, sid uint) { + defer wg.Done() + + q := s.db.WithContext(skipCtx).Model(&model.IotCard{}). + Where("is_standalone = true AND deleted_at IS NULL AND shop_id = ?", sid) + q = s.applyStandaloneFilters(skipCtx, q, filters) + + var ids []cardIDWithTime + if err := q.Select("id, created_at"). + Order("created_at DESC, id"). + Limit(fetchLimit). + Find(&ids).Error; err != nil { + results[idx] = shopResult{err: err} + return + } + + var count int64 + if !hasCachedTotal { + countQ := s.db.WithContext(skipCtx).Model(&model.IotCard{}). + Where("is_standalone = true AND deleted_at IS NULL AND shop_id = ?", sid) + countQ = s.applyStandaloneFilters(skipCtx, countQ, filters) + if err := countQ.Count(&count).Error; err != nil { + results[idx] = shopResult{err: err} + return + } + } + + results[idx] = shopResult{ids: ids, count: count} + }(i, shopID) + } + + wg.Wait() + + allIDs := make([]cardIDWithTime, 0, fetchLimit) + var totalCount int64 + if hasCachedTotal { + totalCount = cachedTotal + } + + for _, r := range results { + if r.err != nil { + return nil, 0, r.err + } + allIDs = append(allIDs, r.ids...) 
+ if !hasCachedTotal { + totalCount += r.count + } + } + + if !hasCachedTotal && totalCount > 0 { + s.cacheCount(ctx, "standalone", filters, totalCount) + } + + sort.Slice(allIDs, func(i, j int) bool { + if allIDs[i].CreatedAt.Equal(allIDs[j].CreatedAt) { + return allIDs[i].ID < allIDs[j].ID + } + return allIDs[i].CreatedAt.After(allIDs[j].CreatedAt) + }) + + offset := (opts.Page - 1) * opts.PageSize + end := offset + opts.PageSize + if offset >= len(allIDs) { + return []*model.IotCard{}, totalCount, nil + } + if end > len(allIDs) { + end = len(allIDs) + } + + pageIDs := make([]uint, 0, opts.PageSize) + for _, item := range allIDs[offset:end] { + pageIDs = append(pageIDs, item.ID) + } + + if len(pageIDs) == 0 { + return []*model.IotCard{}, totalCount, nil + } + + // Phase 2: 用 ID 精确回表获取完整数据(PK Index Scan,仅 20 行) + var cards []*model.IotCard + if err := s.db.WithContext(skipCtx).Model(&model.IotCard{}). + Select(standaloneListColumns). + Where("id IN ?", pageIDs). + Find(&cards).Error; err != nil { + return nil, 0, err + } + + // Phase 2 结果按原始排序顺序重排 + idOrder := make(map[uint]int, len(pageIDs)) + for i, id := range pageIDs { + idOrder[id] = i + } + sort.Slice(cards, func(i, j int) bool { + return idOrder[cards[i].ID] < idOrder[cards[j].ID] + }) + + logger.GetAppLogger().Debug("两阶段并行查询完成", + zap.Int("shop_count", len(shopIDs)), + zap.Int("phase1_total_ids", len(allIDs)), + zap.Int("phase2_fetched", len(cards)), + zap.Int("page", opts.Page), + zap.Int64("total_count", totalCount), + ) + + return cards, totalCount, nil +} + +// applyStandaloneFilters 应用独立卡列表的通用过滤条件 +// 注意:不包含 is_standalone、shop_id、deleted_at 条件(由调用方控制) +// 也不包含 subordinate_shop_ids(仅用于路由选择,不作为查询条件) +func (s *IotCardStore) applyStandaloneFilters(ctx context.Context, query *gorm.DB, filters map[string]any) *gorm.DB { + skipCtx := pkggorm.SkipDataPermission(ctx) if status, ok := filters["status"].(int); ok && status > 0 { query = query.Where("status = ?", status) @@ -198,6 +592,7 @@ func (s *IotCardStore) 
ListStandalone(ctx context.Context, opts *store.QueryOpti if carrierID, ok := filters["carrier_id"].(uint); ok && carrierID > 0 { query = query.Where("carrier_id = ?", carrierID) } + // 并行路径下 shop_id 已由调用方设置,此处仅处理显式的 shop_id 过滤 if shopID, ok := filters["shop_id"].(uint); ok && shopID > 0 { query = query.Where("shop_id = ?", shopID) } @@ -212,7 +607,7 @@ func (s *IotCardStore) ListStandalone(ctx context.Context, opts *store.QueryOpti } if packageID, ok := filters["package_id"].(uint); ok && packageID > 0 { query = query.Where("id IN (?)", - s.db.Table("tb_package_usage"). + s.db.WithContext(skipCtx).Table("tb_package_usage"). Select("iot_card_id"). Where("package_id = ? AND deleted_at IS NULL", packageID)) } @@ -232,12 +627,12 @@ func (s *IotCardStore) ListStandalone(ctx context.Context, opts *store.QueryOpti if isReplaced, ok := filters["is_replaced"].(bool); ok { if isReplaced { query = query.Where("id IN (?)", - s.db.Table("tb_card_replacement_record"). + s.db.WithContext(skipCtx).Table("tb_card_replacement_record"). Select("old_iot_card_id"). Where("deleted_at IS NULL")) } else { query = query.Where("id NOT IN (?)", - s.db.Table("tb_card_replacement_record"). + s.db.WithContext(skipCtx).Table("tb_card_replacement_record"). Select("old_iot_card_id"). 
Where("deleted_at IS NULL")) } @@ -246,30 +641,7 @@ func (s *IotCardStore) ListStandalone(ctx context.Context, opts *store.QueryOpti query = query.Where("series_id = ?", seriesID) } - if err := query.Count(&total).Error; err != nil { - return nil, 0, err - } - - if opts == nil { - opts = &store.QueryOptions{ - Page: 1, - PageSize: constants.DefaultPageSize, - } - } - offset := (opts.Page - 1) * opts.PageSize - query = query.Offset(offset).Limit(opts.PageSize) - - if opts.OrderBy != "" { - query = query.Order(opts.OrderBy) - } else { - query = query.Order("created_at DESC") - } - - if err := query.Find(&cards).Error; err != nil { - return nil, 0, err - } - - return cards, total, nil + return query } func (s *IotCardStore) GetByICCIDs(ctx context.Context, iccids []string) ([]*model.IotCard, error) { @@ -285,10 +657,7 @@ func (s *IotCardStore) GetByICCIDs(ctx context.Context, iccids []string) ([]*mod func (s *IotCardStore) GetStandaloneByICCIDRange(ctx context.Context, iccidStart, iccidEnd string, shopID *uint) ([]*model.IotCard, error) { query := s.db.WithContext(ctx).Model(&model.IotCard{}). - Where("id NOT IN (?)", - s.db.Model(&model.DeviceSimBinding{}). - Select("iot_card_id"). - Where("bind_status = ?", 1)). + Where("is_standalone = true"). Where("iccid >= ? AND iccid <= ?", iccidStart, iccidEnd) if shopID == nil { @@ -306,10 +675,7 @@ func (s *IotCardStore) GetStandaloneByICCIDRange(ctx context.Context, iccidStart func (s *IotCardStore) GetStandaloneByFilters(ctx context.Context, filters map[string]any, shopID *uint) ([]*model.IotCard, error) { query := s.db.WithContext(ctx).Model(&model.IotCard{}). - Where("id NOT IN (?)", - s.db.Model(&model.DeviceSimBinding{}). - Select("iot_card_id"). 
- Where("bind_status = ?", 1)) + Where("is_standalone = true") if shopID == nil { query = query.Where("shop_id IS NULL") @@ -365,9 +731,10 @@ func (s *IotCardStore) GetByIDsWithEnterpriseFilter(ctx context.Context, cardIDs query := s.db.WithContext(ctx).Model(&model.IotCard{}) if enterpriseID != nil && *enterpriseID > 0 { + skipCtx := pkggorm.SkipDataPermission(ctx) query = query.Where("id IN (?) AND id IN (?)", cardIDs, - s.db.Table("tb_enterprise_card_authorization"). + s.db.WithContext(skipCtx).Table("tb_enterprise_card_authorization"). Select("card_id"). Where("enterprise_id = ? AND revoked_at IS NULL AND deleted_at IS NULL", *enterpriseID)) } else { @@ -475,3 +842,43 @@ func (s *IotCardStore) BatchDelete(ctx context.Context, cardIDs []uint) error { Where("id IN ?", cardIDs). Delete(&model.IotCard{}).Error } + +// ==================== 列表计数缓存 ==================== + +func (s *IotCardStore) getCachedCount(ctx context.Context, table string, filters map[string]any) (int64, bool) { + if s.redis == nil { + return 0, false + } + key := constants.RedisListCountKey(table, middleware.GetUserIDFromContext(ctx), hashFilters(filters)) + val, err := s.redis.Get(ctx, key).Int64() + if err != nil { + return 0, false + } + return val, true +} + +func (s *IotCardStore) cacheCount(ctx context.Context, table string, filters map[string]any, total int64) { + if s.redis == nil { + return + } + key := constants.RedisListCountKey(table, middleware.GetUserIDFromContext(ctx), hashFilters(filters)) + s.redis.Set(ctx, key, total, listCountCacheTTL) +} + +func hashFilters(filters map[string]any) string { + if len(filters) == 0 { + return "0" + } + keys := make([]string, 0, len(filters)) + for k := range filters { + keys = append(keys, k) + } + sort.Strings(keys) + + h := fnv.New32a() + for _, k := range keys { + h.Write([]byte(k)) + h.Write([]byte(fmt.Sprint(filters[k]))) + } + return fmt.Sprintf("%08x", h.Sum32()) +} diff --git a/migrations/000000_create_legacy_tables.down.sql 
b/migrations/000000_create_legacy_tables.down.sql new file mode 100644 index 0000000..b5c42b2 --- /dev/null +++ b/migrations/000000_create_legacy_tables.down.sql @@ -0,0 +1,5 @@ +-- 回滚:删除遗留表(逆序删除,先删关联表) +DROP TABLE IF EXISTS tb_role_permission; +DROP TABLE IF EXISTS tb_role; +DROP TABLE IF EXISTS tb_permission; +DROP TABLE IF EXISTS tb_account; diff --git a/migrations/000000_create_legacy_tables.up.sql b/migrations/000000_create_legacy_tables.up.sql new file mode 100644 index 0000000..5bf3b19 --- /dev/null +++ b/migrations/000000_create_legacy_tables.up.sql @@ -0,0 +1,98 @@ +-- 创建遗留表(原由 GORM AutoMigrate 自动创建,迁移文件中缺失) +-- 这 4 张表是项目最早期通过 AutoMigrate 创建的,后续迁移 000001-000010 依赖它们的存在 +-- 本迁移还原 AutoMigrate 创建时的原始结构(不含后续 ALTER 添加的字段) + +-- ============================================================ +-- 1. 账号表(tb_account) +-- 注意:此处包含 parent_id 字段,该字段在 000002 迁移中被 DROP +-- 注意:enterprise_id 由 000002 添加,is_primary 由 000010 添加,此处不包含 +-- ============================================================ +CREATE TABLE IF NOT EXISTS tb_account ( + id BIGSERIAL PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE, + updated_at TIMESTAMP WITH TIME ZONE, + deleted_at TIMESTAMP WITH TIME ZONE, + creator BIGINT NOT NULL, + updater BIGINT NOT NULL, + username VARCHAR(50) NOT NULL, + phone VARCHAR(20) NOT NULL, + password VARCHAR(255) NOT NULL, + user_type BIGINT NOT NULL, + shop_id BIGINT, + parent_id BIGINT, + status BIGINT NOT NULL DEFAULT 1 +); + +-- 索引(与 GORM AutoMigrate 生成的一致) +CREATE UNIQUE INDEX IF NOT EXISTS idx_account_username ON tb_account(username) WHERE deleted_at IS NULL; +CREATE UNIQUE INDEX IF NOT EXISTS idx_account_phone ON tb_account(phone) WHERE deleted_at IS NULL; +CREATE INDEX IF NOT EXISTS idx_tb_account_user_type ON tb_account(user_type); +CREATE INDEX IF NOT EXISTS idx_tb_account_shop_id ON tb_account(shop_id); +CREATE INDEX IF NOT EXISTS idx_tb_account_deleted_at ON tb_account(deleted_at); + +-- ============================================================ +-- 2. 
权限表(tb_permission) +-- 注意:platform 由 000003 添加,available_for_role_types 由 000009 添加,此处不包含 +-- ============================================================ +CREATE TABLE IF NOT EXISTS tb_permission ( + id BIGSERIAL PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE, + updated_at TIMESTAMP WITH TIME ZONE, + deleted_at TIMESTAMP WITH TIME ZONE, + creator BIGINT NOT NULL, + updater BIGINT NOT NULL, + perm_name VARCHAR(50) NOT NULL, + perm_code VARCHAR(100) NOT NULL, + perm_type BIGINT NOT NULL, + url VARCHAR(255), + parent_id BIGINT, + sort BIGINT NOT NULL DEFAULT 0, + status BIGINT NOT NULL DEFAULT 1 +); + +-- 索引 +CREATE UNIQUE INDEX IF NOT EXISTS idx_permission_code ON tb_permission(perm_code) WHERE deleted_at IS NULL; +CREATE INDEX IF NOT EXISTS idx_tb_permission_perm_type ON tb_permission(perm_type); +CREATE INDEX IF NOT EXISTS idx_tb_permission_parent_id ON tb_permission(parent_id); +CREATE INDEX IF NOT EXISTS idx_tb_permission_deleted_at ON tb_permission(deleted_at); + +-- ============================================================ +-- 3. 角色表(tb_role) +-- ============================================================ +CREATE TABLE IF NOT EXISTS tb_role ( + id BIGSERIAL PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE, + updated_at TIMESTAMP WITH TIME ZONE, + deleted_at TIMESTAMP WITH TIME ZONE, + creator BIGINT NOT NULL, + updater BIGINT NOT NULL, + role_name VARCHAR(50) NOT NULL, + role_desc VARCHAR(255), + role_type BIGINT NOT NULL, + status BIGINT NOT NULL DEFAULT 1 +); + +-- 索引 +CREATE INDEX IF NOT EXISTS idx_tb_role_role_type ON tb_role(role_type); +CREATE INDEX IF NOT EXISTS idx_tb_role_deleted_at ON tb_role(deleted_at); + +-- ============================================================ +-- 4. 
角色权限关联表(tb_role_permission) +-- ============================================================ +CREATE TABLE IF NOT EXISTS tb_role_permission ( + id BIGSERIAL PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE, + updated_at TIMESTAMP WITH TIME ZONE, + deleted_at TIMESTAMP WITH TIME ZONE, + creator BIGINT NOT NULL, + updater BIGINT NOT NULL, + role_id BIGINT NOT NULL, + perm_id BIGINT NOT NULL, + status BIGINT NOT NULL DEFAULT 1 +); + +-- 索引 +CREATE INDEX IF NOT EXISTS idx_tb_role_permission_role_id ON tb_role_permission(role_id); +CREATE INDEX IF NOT EXISTS idx_tb_role_permission_perm_id ON tb_role_permission(perm_id); +CREATE UNIQUE INDEX IF NOT EXISTS idx_role_permission_unique ON tb_role_permission(role_id, perm_id) WHERE deleted_at IS NULL; +CREATE INDEX IF NOT EXISTS idx_tb_role_permission_deleted_at ON tb_role_permission(deleted_at); diff --git a/migrations/000017_create_enterprise_card_authorization_table.up.sql b/migrations/000017_create_enterprise_card_authorization_table.up.sql index 2f86d4a..fc52be2 100644 --- a/migrations/000017_create_enterprise_card_authorization_table.up.sql +++ b/migrations/000017_create_enterprise_card_authorization_table.up.sql @@ -1,5 +1,7 @@ --- 创建企业卡授权表 -CREATE TABLE IF NOT EXISTS tb_enterprise_card_authorization ( +-- 重建企业卡授权表(替换 000010 中的旧版结构,字段从 iot_card_id 改为 card_id,移除 shop_id/status,新增 authorizer_type/revoked_by/revoked_at/remark) +DROP TABLE IF EXISTS tb_enterprise_card_authorization; + +CREATE TABLE tb_enterprise_card_authorization ( -- 基础字段 id BIGSERIAL PRIMARY KEY, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, diff --git a/migrations/000056_add_is_standalone_to_iot_card.down.sql b/migrations/000056_add_is_standalone_to_iot_card.down.sql new file mode 100644 index 0000000..c91981e --- /dev/null +++ b/migrations/000056_add_is_standalone_to_iot_card.down.sql @@ -0,0 +1,9 @@ +-- 回滚:删除 is_standalone 列和相关触发器 + +DROP TRIGGER IF EXISTS trg_device_sim_binding_soft_delete ON tb_device_sim_binding; +DROP TRIGGER IF 
EXISTS trg_device_sim_binding_standalone ON tb_device_sim_binding; +DROP FUNCTION IF EXISTS fn_update_card_standalone_on_soft_delete(); +DROP FUNCTION IF EXISTS fn_update_card_standalone_on_bind(); +DROP INDEX IF EXISTS idx_iot_card_standalone_shop_status_created; +DROP INDEX IF EXISTS idx_iot_card_standalone_shop_created; +ALTER TABLE tb_iot_card DROP COLUMN IF EXISTS is_standalone; diff --git a/migrations/000056_add_is_standalone_to_iot_card.up.sql b/migrations/000056_add_is_standalone_to_iot_card.up.sql new file mode 100644 index 0000000..586f145 --- /dev/null +++ b/migrations/000056_add_is_standalone_to_iot_card.up.sql @@ -0,0 +1,79 @@ +-- 添加 is_standalone 物化列,消除 ListStandalone 查询中的 NOT EXISTS 子查询 +-- 30M 行表上 NOT EXISTS 子查询导致 9s+ 延迟,物化为布尔列后降至 <500ms + +-- 1. 添加列(默认 true,因为大多数卡未绑定设备) +ALTER TABLE tb_iot_card ADD COLUMN IF NOT EXISTS is_standalone BOOLEAN NOT NULL DEFAULT true; + +COMMENT ON COLUMN tb_iot_card.is_standalone IS '是否为独立卡(未绑定设备) true=独立卡 false=已绑定设备,由触发器自动维护'; + +-- 2. 回填数据:将已绑定设备的卡标记为 false +UPDATE tb_iot_card SET is_standalone = false +WHERE id IN ( + SELECT DISTINCT iot_card_id FROM tb_device_sim_binding + WHERE bind_status = 1 AND deleted_at IS NULL +); + +-- 3. 创建部分索引(仅包含独立卡 + 未删除的行) +CREATE INDEX IF NOT EXISTS idx_iot_card_standalone_shop_created +ON tb_iot_card (shop_id, created_at DESC) +WHERE deleted_at IS NULL AND is_standalone = true; + +CREATE INDEX IF NOT EXISTS idx_iot_card_standalone_shop_status_created +ON tb_iot_card (shop_id, status, created_at DESC) +WHERE deleted_at IS NULL AND is_standalone = true; + +-- 4. 
创建触发器函数:绑定时设置 is_standalone = false +CREATE OR REPLACE FUNCTION fn_update_card_standalone_on_bind() +RETURNS TRIGGER AS $$ +BEGIN + -- INSERT 或 bind_status 变为 1(绑定) + IF (TG_OP = 'INSERT' AND NEW.bind_status = 1) OR + (TG_OP = 'UPDATE' AND NEW.bind_status = 1 AND (OLD.bind_status IS DISTINCT FROM 1)) THEN + UPDATE tb_iot_card SET is_standalone = false WHERE id = NEW.iot_card_id; + END IF; + + -- bind_status 从 1 变为其他值(解绑),检查是否还有其他活跃绑定 + IF TG_OP = 'UPDATE' AND OLD.bind_status = 1 AND NEW.bind_status != 1 THEN + IF NOT EXISTS ( + SELECT 1 FROM tb_device_sim_binding + WHERE iot_card_id = NEW.iot_card_id AND bind_status = 1 + AND deleted_at IS NULL AND id != NEW.id + ) THEN + UPDATE tb_iot_card SET is_standalone = true WHERE id = NEW.iot_card_id; + END IF; + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- 5. 绑定表的触发器 +CREATE TRIGGER trg_device_sim_binding_standalone +AFTER INSERT OR UPDATE OF bind_status ON tb_device_sim_binding +FOR EACH ROW +EXECUTE FUNCTION fn_update_card_standalone_on_bind(); + +-- 6. 软删除触发器:绑定记录被软删除时恢复独立状态 +CREATE OR REPLACE FUNCTION fn_update_card_standalone_on_soft_delete() +RETURNS TRIGGER AS $$ +BEGIN + IF OLD.deleted_at IS NULL AND NEW.deleted_at IS NOT NULL AND OLD.bind_status = 1 THEN + IF NOT EXISTS ( + SELECT 1 FROM tb_device_sim_binding + WHERE iot_card_id = NEW.iot_card_id AND bind_status = 1 + AND deleted_at IS NULL AND id != NEW.id + ) THEN + UPDATE tb_iot_card SET is_standalone = true WHERE id = NEW.iot_card_id; + END IF; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER trg_device_sim_binding_soft_delete +AFTER UPDATE OF deleted_at ON tb_device_sim_binding +FOR EACH ROW +EXECUTE FUNCTION fn_update_card_standalone_on_soft_delete(); + +-- 7. 
更新统计信息 +ANALYZE tb_iot_card; diff --git a/migrations/000057_optimize_iot_card_indexes.down.sql b/migrations/000057_optimize_iot_card_indexes.down.sql new file mode 100644 index 0000000..fa90ff8 --- /dev/null +++ b/migrations/000057_optimize_iot_card_indexes.down.sql @@ -0,0 +1,8 @@ +-- 回滚:恢复全局索引,删除 carrier 复合索引 +DROP INDEX IF EXISTS idx_iot_card_standalone_shop_carrier_created; + +CREATE INDEX IF NOT EXISTS idx_iot_card_global_created_at +ON tb_iot_card (created_at DESC) +WHERE deleted_at IS NULL; + +ANALYZE tb_iot_card; diff --git a/migrations/000057_optimize_iot_card_indexes.up.sql b/migrations/000057_optimize_iot_card_indexes.up.sql new file mode 100644 index 0000000..33d557f --- /dev/null +++ b/migrations/000057_optimize_iot_card_indexes.up.sql @@ -0,0 +1,14 @@ +-- 优化 tb_iot_card 索引:修复 PG 优化器在 shop_id IN 场景下选错执行计划的问题 +-- 问题:idx_iot_card_global_created_at 导致优化器选择全表扫描+Filter,而不是更高效的 partial 索引 +-- 30M 行场景下 status+carrier_id 过滤查询从 4.5s 降至 <200ms + +-- 1. 删除全局 created_at 索引(诱导优化器走错计划的元凶) +DROP INDEX IF EXISTS idx_iot_card_global_created_at; + +-- 2. 新建 carrier_id 复合 partial 索引(覆盖运营商+排序场景) +CREATE INDEX IF NOT EXISTS idx_iot_card_standalone_shop_carrier_created +ON tb_iot_card (shop_id, carrier_id, created_at DESC) +WHERE deleted_at IS NULL AND is_standalone = true; + +-- 3. 
更新统计信息 +ANALYZE tb_iot_card; diff --git a/migrations/000058_add_covering_index_for_deep_pagination.down.sql b/migrations/000058_add_covering_index_for_deep_pagination.down.sql new file mode 100644 index 0000000..e4062a7 --- /dev/null +++ b/migrations/000058_add_covering_index_for_deep_pagination.down.sql @@ -0,0 +1,4 @@ +-- 回滚:删除深度分页覆盖索引 +DROP INDEX IF EXISTS idx_iot_card_standalone_shop_created_id; + +ANALYZE tb_iot_card; diff --git a/migrations/000058_add_covering_index_for_deep_pagination.up.sql b/migrations/000058_add_covering_index_for_deep_pagination.up.sql new file mode 100644 index 0000000..eecd6a6 --- /dev/null +++ b/migrations/000058_add_covering_index_for_deep_pagination.up.sql @@ -0,0 +1,15 @@ +-- 新增覆盖索引用于深度分页优化(两阶段延迟 Join) +-- 问题:深度分页(page >= 50)需要 OFFSET 数千行,SELECT * 读取大量宽行数据(~2KB/行) +-- 方案:Phase 1 仅扫描覆盖索引获取 ID,Phase 2 用 ID 批量回表取完整数据 +-- 预期:page 500 从 5.6s → <500ms +-- +-- 生产环境建议手动执行带 CONCURRENTLY 的版本避免锁表: +-- CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_iot_card_standalone_shop_created_id +-- ON tb_iot_card (shop_id, created_at DESC, id) +-- WHERE deleted_at IS NULL AND is_standalone = true; + +CREATE INDEX IF NOT EXISTS idx_iot_card_standalone_shop_created_id +ON tb_iot_card (shop_id, created_at DESC, id) +WHERE deleted_at IS NULL AND is_standalone = true; + +ANALYZE tb_iot_card; diff --git a/opencode.json b/opencode.json index 7ac1320..d72658e 100644 --- a/opencode.json +++ b/opencode.json @@ -34,7 +34,7 @@ "--access-mode=restricted" ], "environment": { - "DATABASE_URI": "postgresql://erp_pgsql:erp_2025@cxd.whcxd.cn:16159/junhong_cmp_test?sslmode=disable" + "DATABASE_URI": "postgresql://erp_pgsql:erp_2025@cxd.whcxd.cn:16289/junhong_cmp_test?sslmode=disable" } } } diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 2220d0e..7e6d4d4 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -36,7 +36,7 @@ const ( DefaultConnMaxLifetime = 5 * time.Minute DefaultPageSize = 20 MaxPageSize = 100 - SlowQueryThreshold 
= 100 * time.Millisecond + SlowQueryThreshold = 500 * time.Millisecond ) // 任务类型常量 diff --git a/pkg/constants/redis.go b/pkg/constants/redis.go index 37d0454..f6d6f5d 100644 --- a/pkg/constants/redis.go +++ b/pkg/constants/redis.go @@ -257,6 +257,17 @@ func RedisPackageActivationLockKey(carrierType string, carrierID uint) string { return fmt.Sprintf("package:activation:lock:%s:%d", carrierType, carrierID) } +// ======================================== +// 列表计数缓存 Redis Key +// ======================================== + +// RedisListCountKey 列表查询计数缓存键 +// 用途:缓存分页列表的 COUNT(*) 结果,避免每次翻页重复全表计数 +// 过期时间:30 分钟 +func RedisListCountKey(table string, userID uint, filterHash string) string { + return fmt.Sprintf("list_count:%s:%d:%s", table, userID, filterHash) +} + // ======================================== // 订单幂等性相关 Redis Key // ======================================== diff --git a/scripts/perf_query/bench.go b/scripts/perf_query/bench.go new file mode 100644 index 0000000..fef731d --- /dev/null +++ b/scripts/perf_query/bench.go @@ -0,0 +1,342 @@ +//go:build ignore + +// IoT 卡分页查询性能基准测试 +// 用法: +// go run ./scripts/perf_query/bench.go +// go run ./scripts/perf_query/bench.go -base-url http://localhost:3000 -n 20 + +package main + +import ( + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "sort" + "strings" + "time" +) + +var ( + baseURL = flag.String("base-url", "", "API 服务地址 (默认读取 JUNHONG_SERVER_ADDRESS 环境变量)") + username = flag.String("username", "perf_test_agent", "登录用户名") + password = flag.String("password", "PerfTest@123456", "登录密码") + n = flag.Int("n", 20, "每个场景执行次数") + warmup = flag.Int("warmup", 3, "预热请求次数") +) + +type apiResponse struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data json.RawMessage `json:"data"` +} + +type loginData struct { + AccessToken string `json:"access_token"` +} + +type listData struct { + Total int64 `json:"total"` + Page int `json:"page"` + PageSize int `json:"page_size"` +} + +type scenario struct { 
Name string + Path string + Params string +} + +func main() { + flag.Parse() + + if *baseURL == "" { + addr := os.Getenv("JUNHONG_SERVER_ADDRESS") + if addr == "" { + addr = ":3000" + } + if strings.HasPrefix(addr, ":") { + addr = "http://localhost" + addr + } + *baseURL = addr + } + + fmt.Println("╔══════════════════════════════════════════════════════════════╗") + fmt.Println("║ IoT 卡分页查询性能基准测试 ║") + fmt.Println("╚══════════════════════════════════════════════════════════════╝") + fmt.Printf(" 服务地址: %s\n", *baseURL) + fmt.Printf(" 用户: %s\n", *username) + fmt.Printf(" 每场景次数: %d (预热 %d 次)\n", *n, *warmup) + fmt.Println() + + token := login() + fmt.Printf("✅ 登录成功 (token: %s...)\n\n", token[:20]) + + scenarios := []scenario{ + { + Name: "A: 无过滤分页 (第1页)", + Path: "/api/admin/iot-cards/standalone", + Params: "page=1&page_size=20", + }, + { + Name: "B: status+carrier 过滤", + Path: "/api/admin/iot-cards/standalone", + Params: "page=1&page_size=20&status=3&carrier_id=2", + }, + { + Name: "C: 深分页 (第500页)", + Path: "/api/admin/iot-cards/standalone", + Params: "page=500&page_size=20", + }, + { + Name: "D: ICCID 模糊搜索", + Path: "/api/admin/iot-cards/standalone", + Params: "page=1&page_size=20&iccid=12345", + }, + { + Name: "E: 批次号精确过滤", + Path: "/api/admin/iot-cards/standalone", + Params: "page=1&page_size=20&batch_no=PERF-TEST-30M", + }, + { + Name: "F: 运营商+状态+分页", + Path: "/api/admin/iot-cards/standalone", + Params: "page=10&page_size=50&status=1&carrier_id=1", + }, + } + + results := make([]benchResult, 0, len(scenarios)) + + for _, sc := range scenarios { + r := runScenario(token, sc) + results = append(results, r) + } + + printSummary(results) +} + +type benchResult struct { + Name string + Total int64 + Timings []time.Duration + Min time.Duration + Max time.Duration + Avg time.Duration + P50 time.Duration + P95 time.Duration + P99 time.Duration + Pass bool + ErrorMsg string +} + +func runScenario(token string, sc scenario) benchResult { + fmt.Printf("🔄 %s\n", sc.Name) + 
fmt.Printf(" URL: %s?%s\n", sc.Path, sc.Params) + + url := fmt.Sprintf("%s%s?%s", *baseURL, sc.Path, sc.Params) + + result := benchResult{Name: sc.Name} + + for i := 0; i < *warmup; i++ { + dur, _, err := doRequest(url, token) + if err != nil { + result.ErrorMsg = fmt.Sprintf("预热失败: %v", err) + fmt.Printf(" ❌ %s\n\n", result.ErrorMsg) + return result + } + fmt.Printf(" 预热 %d/%d: %v\n", i+1, *warmup, dur.Round(time.Millisecond)) + } + + timings := make([]time.Duration, 0, *n) + var totalCount int64 + + for i := 0; i < *n; i++ { + dur, data, err := doRequest(url, token) + if err != nil { + result.ErrorMsg = fmt.Sprintf("第 %d 次请求失败: %v", i+1, err) + fmt.Printf(" ❌ %s\n\n", result.ErrorMsg) + return result + } + timings = append(timings, dur) + if i == 0 { + totalCount = data.Total + } + if (i+1)%5 == 0 || i == 0 { + fmt.Printf(" [%d/%d] %v\n", i+1, *n, dur.Round(time.Millisecond)) + } + } + + result.Total = totalCount + result.Timings = timings + result.Pass = true + calcStats(&result) + + passStr := "✅" + if result.P95 > 1500*time.Millisecond { + passStr = "❌" + result.Pass = false + } else if result.P95 > 1000*time.Millisecond { + passStr = "⚠️" + } + + fmt.Printf(" 匹配行数: %d\n", result.Total) + fmt.Printf(" %s P50=%v P95=%v P99=%v (min=%v max=%v)\n\n", + passStr, result.P50, result.P95, result.P99, result.Min, result.Max) + + return result +} + +func doRequest(url, token string) (time.Duration, *listData, error) { + req, _ := http.NewRequest("GET", url, nil) + req.Header.Set("Authorization", "Bearer "+token) + + start := time.Now() + resp, err := http.DefaultClient.Do(req) + dur := time.Since(start) + + if err != nil { + return dur, nil, fmt.Errorf("HTTP 请求失败: %w", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + + if resp.StatusCode != 200 { + return dur, nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body[:min(len(body), 200)])) + } + + var apiResp apiResponse + if err := json.Unmarshal(body, &apiResp); err != nil { + return dur, 
nil, fmt.Errorf("JSON 解析失败: %w", err) + } + + if apiResp.Code != 0 { + return dur, nil, fmt.Errorf("API 错误 code=%d: %s", apiResp.Code, apiResp.Msg) + } + + var data listData + json.Unmarshal(apiResp.Data, &data) + + return dur, &data, nil +} + +func login() string { + url := *baseURL + "/api/auth/login" + payload := fmt.Sprintf(`{"username":"%s","password":"%s","device":"web"}`, *username, *password) + + resp, err := http.Post(url, "application/json", strings.NewReader(payload)) + if err != nil { + log.Fatalf("❌ 登录请求失败: %v", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + + if resp.StatusCode != 200 { + log.Fatalf("❌ 登录 HTTP %d: %s", resp.StatusCode, string(body[:min(len(body), 500)])) + } + + var apiResp apiResponse + if err := json.Unmarshal(body, &apiResp); err != nil { + log.Fatalf("❌ 登录响应解析失败: %v", err) + } + + if apiResp.Code != 0 { + log.Fatalf("❌ 登录失败 code=%d: %s", apiResp.Code, apiResp.Msg) + } + + var data loginData + if err := json.Unmarshal(apiResp.Data, &data); err != nil { + log.Fatalf("❌ 解析 token 失败: %v", err) + } + + if data.AccessToken == "" { + log.Fatalf("❌ 登录成功但 token 为空") + } + + return data.AccessToken +} + +func calcStats(r *benchResult) { + sorted := make([]time.Duration, len(r.Timings)) + copy(sorted, r.Timings) + sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] }) + + var sum time.Duration + for _, d := range sorted { + sum += d + } + + n := len(sorted) + r.Min = sorted[0] + r.Max = sorted[n-1] + r.Avg = sum / time.Duration(n) + r.P50 = percentile(sorted, 50) + r.P95 = percentile(sorted, 95) + r.P99 = percentile(sorted, 99) +} + +func percentile(sorted []time.Duration, pct int) time.Duration { + if len(sorted) == 0 { + return 0 + } + idx := (pct * len(sorted)) / 100 + if idx >= len(sorted) { + idx = len(sorted) - 1 + } + return sorted[idx] +} + +func printSummary(results []benchResult) { + fmt.Println("╔══════════════════════════════════════════════════════════════════════════════╗") + 
fmt.Println("║ 测试结果汇总 ║") + fmt.Println("╠══════════════════════════════════════════════════════════════════════════════╣") + fmt.Printf("║ %-32s │ %8s │ %8s │ %8s │ %8s ║\n", "场景", "匹配行数", "P50", "P95", "结果") + fmt.Println("╠══════════════════════════════════════════════════════════════════════════════╣") + + allPass := true + for _, r := range results { + status := "✅ PASS" + if r.ErrorMsg != "" { + status = "❌ ERROR" + allPass = false + } else if !r.Pass { + status = "❌ FAIL" + allPass = false + } else if r.P95 > 1000*time.Millisecond { + status = "⚠️ SLOW" + } + + p50Str := "-" + p95Str := "-" + totalStr := "-" + if r.ErrorMsg == "" { + p50Str = r.P50.Round(time.Millisecond).String() + p95Str = r.P95.Round(time.Millisecond).String() + totalStr = fmt.Sprintf("%d", r.Total) + } + + fmt.Printf("║ %-32s │ %8s │ %8s │ %8s │ %8s ║\n", + r.Name, totalStr, p50Str, p95Str, status) + } + + fmt.Println("╚══════════════════════════════════════════════════════════════════════════════╝") + + if allPass { + fmt.Println("\n🎉 所有场景 P95 < 1.5s,满足性能要求!") + } else { + fmt.Println("\n⚠️ 部分场景未达标,建议:") + fmt.Println(" 1. 运行 go run ./scripts/perf_query/seed.go -action add-index 创建优化索引") + fmt.Println(" 2. 
重新运行本测试对比效果") + } +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/scripts/perf_query/deploy-perf-pg.sh b/scripts/perf_query/deploy-perf-pg.sh new file mode 100755 index 0000000..056dfef --- /dev/null +++ b/scripts/perf_query/deploy-perf-pg.sh @@ -0,0 +1,85 @@ +#!/bin/bash +set -e + +DATA_DIR="/mnt/data1/database/pgsql/pgdata_perf" +CONTAINER="postgres-perf-test" + +echo "==========================================" +echo " 性能测试 PostgreSQL 容器部署" +echo "==========================================" + +if [ -d "$DATA_DIR" ] && [ "$(ls -A $DATA_DIR 2>/dev/null)" ]; then + echo "⚠️ 数据目录已存在" + read -p "清空重建?(y/N): " confirm + if [ "$confirm" = "y" ] || [ "$confirm" = "Y" ]; then + docker stop $CONTAINER 2>/dev/null || true + docker rm $CONTAINER 2>/dev/null || true + sudo rm -rf "$DATA_DIR" + fi +fi + +sudo mkdir -p "$DATA_DIR" +sudo chown -R 999:999 "$DATA_DIR" + +# 第一步:用默认配置启动,让 PG 完成初始化 +echo "启动容器(默认配置初始化)..." +docker compose -f docker-compose-perf.yml up -d + +echo "等待初始化完成..." +for i in $(seq 1 30); do + if docker exec $CONTAINER pg_isready -U erp_pgsql -d junhong_cmp_test > /dev/null 2>&1; then + echo "✅ 初始化完成" + break + fi + [ $i -eq 30 ] && echo "❌ 超时" && docker logs $CONTAINER && exit 1 + sleep 2 +done + +# 第二步:把优化后的配置覆盖到数据目录 +echo "注入优化配置..." +docker cp postgresql-perf.conf $CONTAINER:/var/lib/postgresql/data/postgresql.conf + +# 第三步:重启让新配置生效 +echo "重启容器..." +docker restart $CONTAINER + +echo "等待重启完成..." +for i in $(seq 1 30); do + if docker exec $CONTAINER pg_isready -U erp_pgsql -d junhong_cmp_test > /dev/null 2>&1; then + echo "✅ 重启完成" + break + fi + [ $i -eq 30 ] && echo "❌ 超时" && docker logs --tail 20 $CONTAINER && exit 1 + sleep 2 +done + +# 第四步:启用扩展 +echo "启用扩展..." 
+docker exec $CONTAINER psql -U erp_pgsql -d junhong_cmp_test -c "CREATE EXTENSION IF NOT EXISTS pg_stat_statements;" 2>/dev/null || echo "pg_stat_statements 需要 shared_preload_libraries,跳过" +docker exec $CONTAINER psql -U erp_pgsql -d junhong_cmp_test -c "CREATE EXTENSION IF NOT EXISTS pg_trgm;" + +# 验证 +echo "" +echo "==========================================" +echo " 配置验证" +echo "==========================================" +docker exec $CONTAINER psql -U erp_pgsql -d junhong_cmp_test -c " +SELECT name, setting, unit +FROM pg_settings +WHERE name IN ( + 'shared_buffers', 'effective_cache_size', 'work_mem', + 'random_page_cost', 'effective_io_concurrency', + 'max_parallel_workers_per_gather', 'maintenance_work_mem', + 'max_connections', 'autovacuum_vacuum_scale_factor' +) +ORDER BY name; +" + +echo "" +echo "==========================================" +echo " 部署完成" +echo "==========================================" +echo " Port: 16289" +echo " Database: junhong_cmp_test" +echo " User: erp_pgsql / erp_2025" +echo "" diff --git a/scripts/perf_query/docker-compose-perf.yml b/scripts/perf_query/docker-compose-perf.yml new file mode 100644 index 0000000..5f2bcdb --- /dev/null +++ b/scripts/perf_query/docker-compose-perf.yml @@ -0,0 +1,21 @@ +version: '3.8' + +services: + postgres-perf: + image: postgres:latest + container_name: postgres-perf-test + restart: unless-stopped + ports: + - "16289:5432" + environment: + POSTGRES_USER: erp_pgsql + POSTGRES_PASSWORD: erp_2025 + POSTGRES_DB: junhong_cmp_test + TZ: Asia/Shanghai + volumes: + - /mnt/data1/database/pgsql/pgdata_perf:/var/lib/postgresql/data + shm_size: '18g' + deploy: + resources: + limits: + memory: 32G diff --git a/scripts/perf_query/postgresql-perf.conf b/scripts/perf_query/postgresql-perf.conf new file mode 100644 index 0000000..fa82115 --- /dev/null +++ b/scripts/perf_query/postgresql-perf.conf @@ -0,0 +1,840 @@ +# ----------------------------- +# PostgreSQL configuration file +# ----------------------------- 
+# +# This file consists of lines of the form: +# +# name = value +# +# (The "=" is optional.) Whitespace may be used. Comments are introduced with +# "#" anywhere on a line. The complete list of parameter names and allowed +# values can be found in the PostgreSQL documentation. +# +# The commented-out settings shown in this file represent the default values. +# Re-commenting a setting is NOT sufficient to revert it to the default value; +# you need to reload the server. +# +# This file is read on server startup and when the server receives a SIGHUP +# signal. If you edit the file on a running system, you have to SIGHUP the +# server for the changes to take effect, run "pg_ctl reload", or execute +# "SELECT pg_reload_conf()". Some parameters, which are marked below, +# require a server shutdown and restart to take effect. +# +# Any parameter can also be given as a command-line option to the server, e.g., +# "postgres -c log_connections=on". Some parameters can be changed at run time +# with the "SET" SQL command. +# +# Memory units: B = bytes Time units: us = microseconds +# kB = kilobytes ms = milliseconds +# MB = megabytes s = seconds +# GB = gigabytes min = minutes +# TB = terabytes h = hours +# d = days + + +#------------------------------------------------------------------------------ +# FILE LOCATIONS +#------------------------------------------------------------------------------ + +# The default values of these variables are driven from the -D command-line +# option or PGDATA environment variable, represented here as ConfigDir. + +#data_directory = 'ConfigDir' # use data in another directory + # (change requires restart) +#hba_file = 'ConfigDir/pg_hba.conf' # host-based authentication file + # (change requires restart) +#ident_file = 'ConfigDir/pg_ident.conf' # ident configuration file + # (change requires restart) + +# If external_pid_file is not explicitly set, no extra PID file is written. 
+#external_pid_file = '' # write an extra PID file + # (change requires restart) + + +#------------------------------------------------------------------------------ +# CONNECTIONS AND AUTHENTICATION +#------------------------------------------------------------------------------ + +# - Connection Settings - + +listen_addresses = '*' + # comma-separated list of addresses; + # defaults to 'localhost'; use '*' for all + # (change requires restart) +#port = 5432 # (change requires restart) +max_connections = 200 # (change requires restart) +#reserved_connections = 0 # (change requires restart) +#superuser_reserved_connections = 3 # (change requires restart) +#unix_socket_directories = '/var/run/postgresql' # comma-separated list of directories + # (change requires restart) +#unix_socket_group = '' # (change requires restart) +#unix_socket_permissions = 0777 # begin with 0 to use octal notation + # (change requires restart) +#bonjour = off # advertise server via Bonjour + # (change requires restart) +#bonjour_name = '' # defaults to the computer name + # (change requires restart) + +# - TCP settings - +# see "man tcp" for details + +#tcp_keepalives_idle = 0 # TCP_KEEPIDLE, in seconds; + # 0 selects the system default +#tcp_keepalives_interval = 0 # TCP_KEEPINTVL, in seconds; + # 0 selects the system default +#tcp_keepalives_count = 0 # TCP_KEEPCNT; + # 0 selects the system default +#tcp_user_timeout = 0 # TCP_USER_TIMEOUT, in milliseconds; + # 0 selects the system default + +#client_connection_check_interval = 0 # time between checks for client + # disconnection while running queries; + # 0 for never + +# - Authentication - + +#authentication_timeout = 1min # 1s-600s +#password_encryption = scram-sha-256 # scram-sha-256 or md5 +#scram_iterations = 4096 + +# GSSAPI using Kerberos +#krb_server_keyfile = 'FILE:${sysconfdir}/krb5.keytab' +#krb_caseins_users = off +#gss_accept_delegation = off + +# - SSL - + +#ssl = off +#ssl_ca_file = '' +#ssl_cert_file = 'server.crt' 
+#ssl_crl_file = '' +#ssl_crl_dir = '' +#ssl_key_file = 'server.key' +#ssl_ciphers = 'HIGH:MEDIUM:+3DES:!aNULL' # allowed SSL ciphers +#ssl_prefer_server_ciphers = on +#ssl_ecdh_curve = 'prime256v1' +#ssl_min_protocol_version = 'TLSv1.2' +#ssl_max_protocol_version = '' +#ssl_dh_params_file = '' +#ssl_passphrase_command = '' +#ssl_passphrase_command_supports_reload = off + + +#------------------------------------------------------------------------------ +# RESOURCE USAGE (except WAL) +#------------------------------------------------------------------------------ + +# - Memory - + +shared_buffers = 16GB # 服务器 125GB RAM,取 ~13%(保守值,与其他服务共存) + # (change requires restart) +huge_pages = try # 16GB shared_buffers 建议开启大页,减少 TLB miss + # (change requires restart) +#huge_page_size = 0 # zero for system default + # (change requires restart) +#temp_buffers = 8MB # min 800kB +#max_prepared_transactions = 0 # zero disables the feature + # (change requires restart) +# Caution: it is not advisable to set max_prepared_transactions nonzero unless +# you actively intend to use prepared transactions. 
+work_mem = 128MB # 排序/哈希在内存完成,避免溢出磁盘(默认 4MB 太小) +#hash_mem_multiplier = 2.0 # 1-1000.0 multiplier on hash table work_mem +maintenance_work_mem = 2GB # VACUUM/CREATE INDEX 加速(默认 64MB 太小) +#autovacuum_work_mem = -1 # min 64kB, or -1 to use maintenance_work_mem +#logical_decoding_work_mem = 64MB # min 64kB +#max_stack_depth = 2MB # min 100kB +#shared_memory_type = mmap # the default is the first option + # supported by the operating system: + # mmap + # sysv + # windows + # (change requires restart) +dynamic_shared_memory_type = posix # the default is usually the first option + # supported by the operating system: + # posix + # sysv + # windows + # mmap + # (change requires restart) +#min_dynamic_shared_memory = 0MB # (change requires restart) +#vacuum_buffer_usage_limit = 2MB # size of vacuum and analyze buffer access strategy ring; + # 0 to disable vacuum buffer access strategy; + # range 128kB to 16GB + +# SLRU buffers (change requires restart) +#commit_timestamp_buffers = 0 # memory for pg_commit_ts (0 = auto) +#multixact_offset_buffers = 16 # memory for pg_multixact/offsets +#multixact_member_buffers = 32 # memory for pg_multixact/members +#notify_buffers = 16 # memory for pg_notify +#serializable_buffers = 32 # memory for pg_serial +#subtransaction_buffers = 0 # memory for pg_subtrans (0 = auto) +#transaction_buffers = 0 # memory for pg_xact (0 = auto) + +# - Disk - + +#temp_file_limit = -1 # limits per-process temp file space + # in kilobytes, or -1 for no limit + +#max_notify_queue_pages = 1048576 # limits the number of SLRU pages allocated + # for NOTIFY / LISTEN queue + +# - Kernel Resources - + +#max_files_per_process = 1000 # min 64 + # (change requires restart) + +# - Cost-Based Vacuum Delay - + +#vacuum_cost_delay = 0 # 0-100 milliseconds (0 disables) +#vacuum_cost_page_hit = 1 # 0-10000 credits +#vacuum_cost_page_miss = 2 # 0-10000 credits +#vacuum_cost_page_dirty = 20 # 0-10000 credits +#vacuum_cost_limit = 200 # 1-10000 credits + +# - Background 
Writer - + +#bgwriter_delay = 200ms # 10-10000ms between rounds +#bgwriter_lru_maxpages = 100 # max buffers written/round, 0 disables +#bgwriter_lru_multiplier = 2.0 # 0-10.0 multiplier on buffers scanned/round +#bgwriter_flush_after = 512kB # measured in pages, 0 disables + +# - Asynchronous Behavior - + +#backend_flush_after = 0 # measured in pages, 0 disables +effective_io_concurrency = 4 # HDD + RAID 控制器(MR9362-8i),支持少量并发预读 +maintenance_io_concurrency = 4 # VACUUM 预读并发,与 effective_io_concurrency 一致 +#io_combine_limit = 128kB # usually 1-32 blocks (depends on OS) +#max_worker_processes = 8 # (change requires restart) +max_parallel_workers_per_gather = 4 # 大表查询多用几个核并行扫描 +max_parallel_maintenance_workers = 4 # VACUUM/CREATE INDEX 并行加速 +max_parallel_workers = 8 # number of max_worker_processes that + # can be used in parallel operations +#parallel_leader_participation = on + + +#------------------------------------------------------------------------------ +# WRITE-AHEAD LOG +#------------------------------------------------------------------------------ + +# - Settings - + +#wal_level = replica # minimal, replica, or logical + # (change requires restart) +#fsync = on # flush data to disk for crash safety + # (turning this off can cause + # unrecoverable data corruption) +#synchronous_commit = on # synchronization level; + # off, local, remote_write, remote_apply, or on +#wal_sync_method = fsync # the default is the first option + # supported by the operating system: + # open_datasync + # fdatasync (default on Linux and FreeBSD) + # fsync + # fsync_writethrough + # open_sync +#full_page_writes = on # recover from partial page writes +#wal_log_hints = off # also do full page writes of non-critical updates + # (change requires restart) +#wal_compression = off # enables compression of full-page writes; + # off, pglz, lz4, zstd, or on +#wal_init_zero = on # zero-fill new WAL files +#wal_recycle = on # recycle WAL files +#wal_buffers = -1 # min 32kB, -1 sets based on 
shared_buffers + # (change requires restart) +#wal_writer_delay = 200ms # 1-10000 milliseconds +#wal_writer_flush_after = 1MB # measured in pages, 0 disables +#wal_skip_threshold = 2MB + +#commit_delay = 0 # range 0-100000, in microseconds +#commit_siblings = 5 # range 1-1000 + +# - Checkpoints - + +checkpoint_timeout = 15min # 拉长 checkpoint 间隔,减少写 I/O +checkpoint_completion_target = 0.9 # checkpoint target duration, 0.0 - 1.0 +#checkpoint_flush_after = 256kB # measured in pages, 0 disables +#checkpoint_warning = 30s # 0 disables +max_wal_size = 4GB # 减少 checkpoint 频率,降低 I/O 尖峰 +min_wal_size = 1GB + +# - Prefetching during recovery - + +#recovery_prefetch = try # prefetch pages referenced in the WAL? +#wal_decode_buffer_size = 512kB # lookahead window used for prefetching + # (change requires restart) + +# - Archiving - + +#archive_mode = off # enables archiving; off, on, or always + # (change requires restart) +#archive_library = '' # library to use to archive a WAL file + # (empty string indicates archive_command should + # be used) +#archive_command = '' # command to use to archive a WAL file + # placeholders: %p = path of file to archive + # %f = file name only + # e.g. 'test ! -f /mnt/server/archivedir/%f && cp %p /mnt/server/archivedir/%f' +#archive_timeout = 0 # force a WAL file switch after this + # number of seconds; 0 disables + +# - Archive Recovery - + +# These are only used in recovery mode. + +#restore_command = '' # command to use to restore an archived WAL file + # placeholders: %p = path of file to restore + # %f = file name only + # e.g. 'cp /mnt/server/archivedir/%f %p' +#archive_cleanup_command = '' # command to execute at every restartpoint +#recovery_end_command = '' # command to execute at completion of recovery + +# - Recovery Target - + +# Set these only when performing a targeted recovery. 
+ +#recovery_target = '' # 'immediate' to end recovery as soon as a + # consistent state is reached + # (change requires restart) +#recovery_target_name = '' # the named restore point to which recovery will proceed + # (change requires restart) +#recovery_target_time = '' # the time stamp up to which recovery will proceed + # (change requires restart) +#recovery_target_xid = '' # the transaction ID up to which recovery will proceed + # (change requires restart) +#recovery_target_lsn = '' # the WAL LSN up to which recovery will proceed + # (change requires restart) +#recovery_target_inclusive = on # Specifies whether to stop: + # just after the specified recovery target (on) + # just before the recovery target (off) + # (change requires restart) +#recovery_target_timeline = 'latest' # 'current', 'latest', or timeline ID + # (change requires restart) +#recovery_target_action = 'pause' # 'pause', 'promote', 'shutdown' + # (change requires restart) + +# - WAL Summarization - + +#summarize_wal = off # run WAL summarizer process? +#wal_summary_keep_time = '10d' # when to remove old summary files, 0 = never + + +#------------------------------------------------------------------------------ +# REPLICATION +#------------------------------------------------------------------------------ + +# - Sending Servers - + +# Set these on the primary and on any standby that will send replication data. + +#max_wal_senders = 10 # max number of walsender processes + # (change requires restart) +#max_replication_slots = 10 # max number of replication slots + # (change requires restart) +#wal_keep_size = 0 # in megabytes; 0 disables +#max_slot_wal_keep_size = -1 # in megabytes; -1 disables +#wal_sender_timeout = 60s # in milliseconds; 0 disables +#track_commit_timestamp = off # collect timestamp of transaction commit + # (change requires restart) + +# - Primary Server - + +# These settings are ignored on a standby server. 
+ +#synchronous_standby_names = '' # standby servers that provide sync rep + # method to choose sync standbys, number of sync standbys, + # and comma-separated list of application_name + # from standby(s); '*' = all +#synchronized_standby_slots = '' # streaming replication standby server slot + # names that logical walsender processes will wait for + +# - Standby Servers - + +# These settings are ignored on a primary server. + +#primary_conninfo = '' # connection string to sending server +#primary_slot_name = '' # replication slot on sending server +#hot_standby = on # "off" disallows queries during recovery + # (change requires restart) +#max_standby_archive_delay = 30s # max delay before canceling queries + # when reading WAL from archive; + # -1 allows indefinite delay +#max_standby_streaming_delay = 30s # max delay before canceling queries + # when reading streaming WAL; + # -1 allows indefinite delay +#wal_receiver_create_temp_slot = off # create temp slot if primary_slot_name + # is not set +#wal_receiver_status_interval = 10s # send replies at least this often + # 0 disables +#hot_standby_feedback = off # send info from standby to prevent + # query conflicts +#wal_receiver_timeout = 60s # time that receiver waits for + # communication from primary + # in milliseconds; 0 disables +#wal_retrieve_retry_interval = 5s # time to wait before retrying to + # retrieve WAL after a failed attempt +#recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#sync_replication_slots = off # enables slot synchronization on the physical standby from the primary + +# - Subscribers - + +# These settings are ignored on a publisher. 
+ +#max_logical_replication_workers = 4 # taken from max_worker_processes + # (change requires restart) +#max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers +#max_parallel_apply_workers_per_subscription = 2 # taken from max_logical_replication_workers + + +#------------------------------------------------------------------------------ +# QUERY TUNING +#------------------------------------------------------------------------------ + +# - Planner Method Configuration - + +#enable_async_append = on +#enable_bitmapscan = on +#enable_gathermerge = on +#enable_hashagg = on +#enable_hashjoin = on +#enable_incremental_sort = on +#enable_indexscan = on +#enable_indexonlyscan = on +#enable_material = on +#enable_memoize = on +#enable_mergejoin = on +#enable_nestloop = on +#enable_parallel_append = on +#enable_parallel_hash = on +#enable_partition_pruning = on +#enable_partitionwise_join = off +#enable_partitionwise_aggregate = off +#enable_presorted_aggregate = on +#enable_seqscan = on +#enable_sort = on +#enable_tidscan = on +#enable_group_by_reordering = on + +# - Planner Cost Constants - + +seq_page_cost = 1.0 # 顺序读成本(基准值,保持默认) +random_page_cost = 2.0 # HDD + RAID 控制器有缓存,比裸 HDD(4.0) 好 + # 降低此值让优化器更倾向使用索引扫描 +#cpu_tuple_cost = 0.01 # same scale as above +#cpu_index_tuple_cost = 0.005 # same scale as above +#cpu_operator_cost = 0.0025 # same scale as above +#parallel_setup_cost = 1000.0 # same scale as above +#parallel_tuple_cost = 0.1 # same scale as above +#min_parallel_table_scan_size = 8MB +#min_parallel_index_scan_size = 512kB +effective_cache_size = 94GB # 服务器 125GB RAM,OS 缓存 + shared_buffers ≈ 75% + +#jit_above_cost = 100000 # perform JIT compilation if available + # and query more expensive than this; + # -1 disables +#jit_inline_above_cost = 500000 # inline small functions if query is + # more expensive than this; -1 disables +#jit_optimize_above_cost = 500000 # use expensive JIT optimizations if + # query is more expensive than this; 
+ # -1 disables + +# - Genetic Query Optimizer - + +#geqo = on +#geqo_threshold = 12 +#geqo_effort = 5 # range 1-10 +#geqo_pool_size = 0 # selects default based on effort +#geqo_generations = 0 # selects default based on effort +#geqo_selection_bias = 2.0 # range 1.5-2.0 +#geqo_seed = 0.0 # range 0.0-1.0 + +# - Other Planner Options - + +#default_statistics_target = 100 # range 1-10000 +#constraint_exclusion = partition # on, off, or partition +#cursor_tuple_fraction = 0.1 # range 0.0-1.0 +#from_collapse_limit = 8 +#jit = on # allow JIT compilation +#join_collapse_limit = 8 # 1 disables collapsing of explicit + # JOIN clauses +#plan_cache_mode = auto # auto, force_generic_plan or + # force_custom_plan +#recursive_worktable_factor = 10.0 # range 0.001-1000000 + + +#------------------------------------------------------------------------------ +# REPORTING AND LOGGING +#------------------------------------------------------------------------------ + +# - Where to Log - + +#log_destination = 'stderr' # Valid values are combinations of + # stderr, csvlog, jsonlog, syslog, and + # eventlog, depending on platform. + # csvlog and jsonlog require + # logging_collector to be on. + +# This is used when logging to stderr: +logging_collector = on # 开启日志收集,记录到文件 + # (change requires restart) + +# These are only used if logging_collector is on: +#log_directory = 'log' # directory where log files are written, + # can be absolute or relative to PGDATA +#log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log' # log file name pattern, + # can include strftime() escapes +#log_file_mode = 0600 # creation mode for log files, + # begin with 0 to use octal notation +#log_rotation_age = 1d # Automatic rotation of logfiles will + # happen after that time. 0 disables. +#log_rotation_size = 10MB # Automatic rotation of logfiles will + # happen after that much log output. + # 0 disables. 
+#log_truncate_on_rotation = off # If on, an existing log file with the + # same name as the new log file will be + # truncated rather than appended to. + # But such truncation only occurs on + # time-driven rotation, not on restarts + # or size-driven rotation. Default is + # off, meaning append to existing files + # in all cases. + +# These are relevant when logging to syslog: +#syslog_facility = 'LOCAL0' +#syslog_ident = 'postgres' +#syslog_sequence_numbers = on +#syslog_split_messages = on + +# This is only relevant when logging to eventlog (Windows): +# (change requires restart) +#event_source = 'PostgreSQL' + +# - When to Log - + +#log_min_messages = warning # values in order of decreasing detail: + # debug5 + # debug4 + # debug3 + # debug2 + # debug1 + # info + # notice + # warning + # error + # log + # fatal + # panic + +#log_min_error_statement = error # values in order of decreasing detail: + # debug5 + # debug4 + # debug3 + # debug2 + # debug1 + # info + # notice + # warning + # error + # log + # fatal + # panic (effectively off) + +log_min_duration_statement = 500 # 记录超过 500ms 的慢查询,方便排查 + # -1 禁用, 0 记录全部 + +#log_min_duration_sample = -1 # -1 is disabled, 0 logs a sample of statements + # and their durations, > 0 logs only a sample of + # statements running at least this number + # of milliseconds; + # sample fraction is determined by log_statement_sample_rate + +#log_statement_sample_rate = 1.0 # fraction of logged statements exceeding + # log_min_duration_sample to be logged; + # 1.0 logs all such statements, 0.0 never logs + + +#log_transaction_sample_rate = 0.0 # fraction of transactions whose statements + # are logged regardless of their duration; 1.0 logs all + # statements from all transactions, 0.0 never logs + +#log_startup_progress_interval = 10s # Time between progress updates for + # long-running startup operations. + # 0 disables the feature, > 0 indicates + # the interval in milliseconds. 
+ +# - What to Log - + +#debug_print_parse = off +#debug_print_rewritten = off +#debug_print_plan = off +#debug_pretty_print = on +log_autovacuum_min_duration = 0 # 记录所有 autovacuum 活动,方便监控 + # -1 disables, 0 logs all actions and + # their durations, > 0 logs only + # actions running at least this number + # of milliseconds. +#log_checkpoints = on +#log_connections = off +#log_disconnections = off +#log_duration = off +#log_error_verbosity = default # terse, default, or verbose messages +#log_hostname = off +#log_line_prefix = '%m [%p] ' # special values: + # %a = application name + # %u = user name + # %d = database name + # %r = remote host and port + # %h = remote host + # %b = backend type + # %p = process ID + # %P = process ID of parallel group leader + # %t = timestamp without milliseconds + # %m = timestamp with milliseconds + # %n = timestamp with milliseconds (as a Unix epoch) + # %Q = query ID (0 if none or not computed) + # %i = command tag + # %e = SQL state + # %c = session ID + # %l = session line number + # %s = session start timestamp + # %v = virtual transaction ID + # %x = transaction ID (0 if none) + # %q = stop here in non-session + # processes + # %% = '%' + # e.g. 
'<%u%%%d> ' +#log_lock_waits = off # log lock waits >= deadlock_timeout +#log_recovery_conflict_waits = off # log standby recovery conflict waits + # >= deadlock_timeout +#log_parameter_max_length = -1 # when logging statements, limit logged + # bind-parameter values to N bytes; + # -1 means print in full, 0 disables +#log_parameter_max_length_on_error = 0 # when logging an error, limit logged + # bind-parameter values to N bytes; + # -1 means print in full, 0 disables +#log_statement = 'none' # none, ddl, mod, all +#log_replication_commands = off +#log_temp_files = -1 # log temporary files equal or larger + # than the specified size in kilobytes; + # -1 disables, 0 logs all temp files +log_timezone = 'Asia/Shanghai' + +# - Process Title - + +#cluster_name = '' # added to process titles if nonempty + # (change requires restart) +#update_process_title = on + + +#------------------------------------------------------------------------------ +# STATISTICS +#------------------------------------------------------------------------------ + +# - Cumulative Query and Index Statistics - + +#track_activities = on +#track_activity_query_size = 1024 # (change requires restart) +#track_counts = on +track_io_timing = on # 记录 I/O 耗时,EXPLAIN (BUFFERS) 可看到 +track_wal_io_timing = on # 记录 WAL 写入耗时 +#track_functions = none # none, pl, all +#stats_fetch_consistency = cache # cache, none, snapshot + + +# - Monitoring - + +#compute_query_id = auto +#log_statement_stats = off +#log_parser_stats = off +#log_planner_stats = off +#log_executor_stats = off + + +#------------------------------------------------------------------------------ +# AUTOVACUUM +#------------------------------------------------------------------------------ + +#autovacuum = on # Enable autovacuum subprocess? 'on' + # requires track_counts to also be on. 
+autovacuum_max_workers = 4 # 多个大表可并行 VACUUM + # (change requires restart) +autovacuum_naptime = 30s # 缩短检查间隔,及时清理死行 +#autovacuum_vacuum_threshold = 50 # min number of row updates before + # vacuum +#autovacuum_vacuum_insert_threshold = 1000 # min number of row inserts + # before vacuum; -1 disables insert + # vacuums +#autovacuum_analyze_threshold = 50 # min number of row updates before + # analyze +autovacuum_vacuum_scale_factor = 0.05 # 3000 万行表:5% = 150 万行变更就触发 VACUUM(默认 20% = 600 万) +autovacuum_vacuum_insert_scale_factor = 0.05 # 同上,插入触发 +autovacuum_analyze_scale_factor = 0.02 # 2% = 60 万行变更就更新统计信息(默认 10% = 300 万) +#autovacuum_freeze_max_age = 200000000 # maximum XID age before forced vacuum + # (change requires restart) +#autovacuum_multixact_freeze_max_age = 400000000 # maximum multixact age + # before forced vacuum + # (change requires restart) +#autovacuum_vacuum_cost_delay = 2ms # default vacuum cost delay for + # autovacuum, in milliseconds; + # -1 means use vacuum_cost_delay +#autovacuum_vacuum_cost_limit = -1 # default vacuum cost limit for + # autovacuum, -1 means use + # vacuum_cost_limit + + +#------------------------------------------------------------------------------ +# CLIENT CONNECTION DEFAULTS +#------------------------------------------------------------------------------ + +# - Statement Behavior - + +#client_min_messages = notice # values in order of decreasing detail: + # debug5 + # debug4 + # debug3 + # debug2 + # debug1 + # log + # notice + # warning + # error +#search_path = '"$user", public' # schema names +#row_security = on +#default_table_access_method = 'heap' +#default_tablespace = '' # a tablespace name, '' uses the default +#default_toast_compression = 'pglz' # 'pglz' or 'lz4' +#temp_tablespaces = '' # a list of tablespace names, '' uses + # only default tablespace +#check_function_bodies = on +#default_transaction_isolation = 'read committed' +#default_transaction_read_only = off +#default_transaction_deferrable = off 
+#session_replication_role = 'origin' +#statement_timeout = 0 # in milliseconds, 0 is disabled +#transaction_timeout = 0 # in milliseconds, 0 is disabled +#lock_timeout = 0 # in milliseconds, 0 is disabled +#idle_in_transaction_session_timeout = 0 # in milliseconds, 0 is disabled +#idle_session_timeout = 0 # in milliseconds, 0 is disabled +#vacuum_freeze_table_age = 150000000 +#vacuum_freeze_min_age = 50000000 +#vacuum_failsafe_age = 1600000000 +#vacuum_multixact_freeze_table_age = 150000000 +#vacuum_multixact_freeze_min_age = 5000000 +#vacuum_multixact_failsafe_age = 1600000000 +#bytea_output = 'hex' # hex, escape +#xmlbinary = 'base64' +#xmloption = 'content' +#gin_pending_list_limit = 4MB +#createrole_self_grant = '' # set and/or inherit +#event_triggers = on + +# - Locale and Formatting - + +datestyle = 'iso, mdy' +#intervalstyle = 'postgres' +timezone = 'Asia/Shanghai' +#timezone_abbreviations = 'Default' # Select the set of available time zone + # abbreviations. Currently, there are + # Default + # Australia (historical usage) + # India + # You can create your own file in + # share/timezonesets/. +#extra_float_digits = 1 # min -15, max 3; any value >0 actually + # selects precise output mode +#client_encoding = sql_ascii # actually, defaults to database + # encoding + +# These settings are initialized by initdb, but they can be changed. 
+lc_messages = 'en_US.utf8' # locale for system error message + # strings +lc_monetary = 'en_US.utf8' # locale for monetary formatting +lc_numeric = 'en_US.utf8' # locale for number formatting +lc_time = 'en_US.utf8' # locale for time formatting + +#icu_validation_level = warning # report ICU locale validation + # errors at the given level + +# default configuration for text search +default_text_search_config = 'pg_catalog.english' + +# - Shared Library Preloading - + +#local_preload_libraries = '' +#session_preload_libraries = '' +shared_preload_libraries = 'pg_stat_statements' # 启用慢查询统计(change requires restart) +#jit_provider = 'llvmjit' # JIT library to use + +# - Other Defaults - + +#dynamic_library_path = '$libdir' +#extension_destdir = '' # prepend path when loading extensions + # and shared objects (added by Debian) +#gin_fuzzy_search_limit = 0 + + +#------------------------------------------------------------------------------ +# LOCK MANAGEMENT +#------------------------------------------------------------------------------ + +#deadlock_timeout = 1s +#max_locks_per_transaction = 64 # min 10 + # (change requires restart) +#max_pred_locks_per_transaction = 64 # min 10 + # (change requires restart) +#max_pred_locks_per_relation = -2 # negative values mean + # (max_pred_locks_per_transaction + # / -max_pred_locks_per_relation) - 1 +#max_pred_locks_per_page = 2 # min 0 + + +#------------------------------------------------------------------------------ +# VERSION AND PLATFORM COMPATIBILITY +#------------------------------------------------------------------------------ + +# - Previous PostgreSQL Versions - + +#array_nulls = on +#backslash_quote = safe_encoding # on, off, or safe_encoding +#escape_string_warning = on +#lo_compat_privileges = off +#quote_all_identifiers = off +#standard_conforming_strings = on +#synchronize_seqscans = on + +# - Other Platforms and Clients - + +#transform_null_equals = off +#allow_alter_system = on + + 
+#------------------------------------------------------------------------------ +# ERROR HANDLING +#------------------------------------------------------------------------------ + +#exit_on_error = off # terminate session on any error? +#restart_after_crash = on # reinitialize after backend crash? +#data_sync_retry = off # retry or panic on failure to fsync + # data? + # (change requires restart) +#recovery_init_sync_method = fsync # fsync, syncfs (Linux 5.8+) + + +#------------------------------------------------------------------------------ +# CONFIG FILE INCLUDES +#------------------------------------------------------------------------------ + +# These options allow settings to be loaded from files other than the +# default postgresql.conf. Note that these are directives, not variable +# assignments, so they can usefully be given more than once. + +#include_dir = '...' # include files ending in '.conf' from + # a directory, e.g., 'conf.d' +#include_if_exists = '...' # include file only if it exists +#include = '...' 
# include file + + +#------------------------------------------------------------------------------ +# CUSTOMIZED OPTIONS +#------------------------------------------------------------------------------ + +# Add settings for extensions here diff --git a/scripts/perf_query/seed.go b/scripts/perf_query/seed.go new file mode 100644 index 0000000..17aa52c --- /dev/null +++ b/scripts/perf_query/seed.go @@ -0,0 +1,700 @@ +//go:build ignore + +// 性能测试数据生成/清理脚本 +// 用法: +// source .env.local +// go run ./scripts/perf_query/seed.go # 默认生成 3000 万卡 +// go run ./scripts/perf_query/seed.go -total 1000000 # 生成 100 万卡(试跑) +// go run ./scripts/perf_query/seed.go -action cleanup # 清理测试数据 +// go run ./scripts/perf_query/seed.go -action verify # 验证数据分布 +// go run ./scripts/perf_query/seed.go -action add-index # 创建优化索引 +// go run ./scripts/perf_query/seed.go -action drop-index # 删除优化索引 + +package main + +import ( + "database/sql" + "flag" + "fmt" + "log" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + _ "github.com/jackc/pgx/v5/stdlib" + "golang.org/x/crypto/bcrypt" + "gorm.io/driver/postgres" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +// 测试数据标记常量 +const ( + testBatchNo = "PERF-TEST-30M" + testShopCodePrefix = "PERF-TEST-SHOP-" + testAgentUsername = "perf_test_agent" + testAgentPassword = "PerfTest@123456" + numShops = 200 + agentChildShops = 6 // 代理商有 6 个下级店铺(自己 + 6 = 7 个总共) +) + +// 简化模型(仅用于创建测试数据,不引入项目内部包) +type testShop struct { + ID uint `gorm:"primaryKey"` + ShopName string `gorm:"column:shop_name"` + ShopCode string `gorm:"column:shop_code"` + ParentID *uint `gorm:"column:parent_id"` + Level int `gorm:"column:level"` + ContactName string `gorm:"column:contact_name"` + ContactPhone string `gorm:"column:contact_phone"` + Status int `gorm:"column:status"` + Creator uint `gorm:"column:creator"` + Updater uint `gorm:"column:updater"` + CreatedAt time.Time + UpdatedAt time.Time +} + +func (testShop) TableName() string { return "tb_shop" } + +type testAccount 
struct { + ID uint `gorm:"primaryKey"` + Username string `gorm:"column:username"` + Phone string `gorm:"column:phone"` + Password string `gorm:"column:password"` + UserType int `gorm:"column:user_type"` + ShopID *uint `gorm:"column:shop_id"` + EnterpriseID *uint `gorm:"column:enterprise_id"` + Status int `gorm:"column:status"` + Creator uint `gorm:"column:creator"` + Updater uint `gorm:"column:updater"` + CreatedAt time.Time + UpdatedAt time.Time +} + +func (testAccount) TableName() string { return "tb_account" } + +var ( + action = flag.String("action", "seed", "操作: seed / cleanup / verify / add-index / drop-index") + total = flag.Int("total", 30000000, "生成卡总数") + batchSize = flag.Int("batch", 5000000, "每批插入数量") +) + +func buildDSN() string { + return fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable TimeZone=Asia/Shanghai", + getEnv("JUNHONG_DATABASE_HOST", "cxd.whcxd.cn"), + getEnv("JUNHONG_DATABASE_PORT", "16159"), + getEnv("JUNHONG_DATABASE_USER", "erp_pgsql"), + getEnv("JUNHONG_DATABASE_PASSWORD", "erp_2025"), + getEnv("JUNHONG_DATABASE_DBNAME", "junhong_cmp_test"), + ) +} + +func main() { + flag.Parse() + + db, err := gorm.Open(postgres.Open(buildDSN()), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Warn), + }) + if err != nil { + log.Fatalf("❌ 连接数据库失败: %v", err) + } + + sqlDB, _ := db.DB() + sqlDB.SetMaxOpenConns(5) + sqlDB.SetMaxIdleConns(3) + fmt.Println("✅ 数据库连接成功") + + switch *action { + case "seed": + doSeed(db) + case "cleanup": + doCleanup(db) + case "verify": + doVerify(db) + case "add-index": + doAddIndex(db) + case "drop-index": + doDropIndex(db) + default: + log.Fatalf("❌ 未知操作: %s(支持: seed / cleanup / verify / add-index / drop-index)", *action) + } +} + +// ==================== Seed ==================== + +func doSeed(db *gorm.DB) { + fmt.Println("\n🚀 开始生成性能测试数据(UNLOGGED 极速模式)") + fmt.Printf(" 目标卡数: %d\n", *total) + fmt.Printf(" 店铺数量: %d(代理商可见 %d 个)\n", numShops, agentChildShops+1) + + overallStart := time.Now() + 
+ fmt.Println("\n📦 步骤 1/8: 创建测试店铺...") + shopIDs := createTestShops(db) + fmt.Printf(" ✅ 创建了 %d 个店铺\n", len(shopIDs)) + + fmt.Println("\n👤 步骤 2/8: 创建测试代理商账号...") + createTestAgent(db, shopIDs[0]) + fmt.Printf(" ✅ 账号: %s / %s (shop_id=%d)\n", testAgentUsername, testAgentPassword, shopIDs[0]) + + // 调高 PG 配置减少 checkpoint 频率 + fmt.Println("\n⚙️ 步骤 3/8: 临时调高 PostgreSQL 写入配置...") + tuneForBulkLoad(db) + + // 先删索引再插入,速度提升 10~50 倍 + fmt.Println("\n📇 步骤 4/8: 临时删除非主键索引(加速批量插入)...") + droppedIndexes := dropNonPKIndexes(db) + fmt.Printf(" ✅ 删除了 %d 个索引\n", len(droppedIndexes)) + + // 设置 UNLOGGED 跳过 WAL 写入(表数据少时几乎瞬间完成) + fmt.Println("\n⚡ 步骤 5/8: 设置表为 UNLOGGED(跳过 WAL,极速写入)...") + start := time.Now() + if err := db.Exec("ALTER TABLE tb_iot_card SET UNLOGGED").Error; err != nil { + log.Fatalf("设置 UNLOGGED 失败: %v", err) + } + fmt.Printf(" ✅ 已设置 UNLOGGED (耗时 %v)\n", time.Since(start)) + + fmt.Println("\n💳 步骤 6/8: 批量插入卡数据(UNLOGGED + generate_series)...") + insertCards(db, shopIDs) + + // 恢复为 LOGGED(会重写整张表到 WAL,30M 行约需 3-5 分钟) + fmt.Println("\n🔒 步骤 7/8: 恢复表为 LOGGED(重写 WAL,请耐心等待)...") + start = time.Now() + if err := db.Exec("ALTER TABLE tb_iot_card SET LOGGED").Error; err != nil { + // SET LOGGED 失败不致命,表仍然可用,只是崩溃会丢数据 + fmt.Printf(" ⚠️ 恢复 LOGGED 失败(测试数据可接受): %v\n", err) + } else { + fmt.Printf(" ✅ 已恢复 LOGGED (耗时 %v)\n", time.Since(start)) + } + + fmt.Println("\n📇 步骤 8/8: 重建索引 + ANALYZE...") + rebuildIndexes(db, droppedIndexes) + fmt.Printf(" ✅ 重建了 %d 个索引\n", len(droppedIndexes)) + + start = time.Now() + db.Exec("ANALYZE tb_iot_card") + fmt.Printf(" ✅ ANALYZE 完成 (耗时 %v)\n", time.Since(start)) + + // 恢复 PG 配置 + restoreAfterBulkLoad(db) + + fmt.Printf("\n🎉 全部完成!总耗时: %v\n", time.Since(overallStart)) + fmt.Println("\n📝 后续步骤:") + fmt.Println(" 1. 启动 API 服务: source .env.local && go run ./cmd/api/...") + fmt.Printf(" 2. 运行基准测试: go run ./scripts/perf_query/bench.go\n") + fmt.Println(" 3. 
测试完成后清理: go run ./scripts/perf_query/seed.go -action cleanup") +} + +// tuneForBulkLoad 临时调高 PG 配置以加速批量写入 +func tuneForBulkLoad(db *gorm.DB) { + settings := []struct { + sql string + desc string + }{ + {"ALTER SYSTEM SET max_wal_size = '4GB'", "max_wal_size → 4GB(减少 checkpoint 频率)"}, + {"ALTER SYSTEM SET checkpoint_timeout = '30min'", "checkpoint_timeout → 30min"}, + {"ALTER SYSTEM SET synchronous_commit = 'off'", "synchronous_commit → off(不等待 WAL 刷盘)"}, + } + for _, s := range settings { + if err := db.Exec(s.sql).Error; err != nil { + fmt.Printf(" ⚠️ %s 失败: %v\n", s.desc, err) + } else { + fmt.Printf(" ✅ %s\n", s.desc) + } + } + db.Exec("SELECT pg_reload_conf()") + fmt.Println(" ✅ 配置已重载") +} + +// restoreAfterBulkLoad 恢复 PG 配置 +func restoreAfterBulkLoad(db *gorm.DB) { + fmt.Println("\n⚙️ 恢复 PostgreSQL 配置...") + db.Exec("ALTER SYSTEM SET max_wal_size = '1GB'") + db.Exec("ALTER SYSTEM SET checkpoint_timeout = '5min'") + db.Exec("ALTER SYSTEM SET synchronous_commit = 'on'") + db.Exec("SELECT pg_reload_conf()") + fmt.Println(" ✅ PG 配置已恢复默认") +} + +func createTestShops(db *gorm.DB) []uint { + now := time.Now() + shopIDs := make([]uint, 0, numShops) + + // 先清理可能存在的旧测试店铺 + db.Exec("DELETE FROM tb_shop WHERE shop_code LIKE ?", testShopCodePrefix+"%") + + // 创建根店铺(代理商总店) + rootShop := testShop{ + ShopName: "性能测试-总代理", + ShopCode: testShopCodePrefix + "ROOT", + Level: 1, + ContactName: "测试联系人", + ContactPhone: "13800000000", + Status: 1, + Creator: 1, + Updater: 1, + CreatedAt: now, + UpdatedAt: now, + } + if err := db.Create(&rootShop).Error; err != nil { + log.Fatalf("创建根店铺失败: %v", err) + } + shopIDs = append(shopIDs, rootShop.ID) + + // 创建 6 个子店铺(代理商下级) + for i := 1; i <= agentChildShops; i++ { + child := testShop{ + ShopName: fmt.Sprintf("性能测试-分店%d", i), + ShopCode: fmt.Sprintf("%sSUB-%d", testShopCodePrefix, i), + ParentID: &rootShop.ID, + Level: 2, + ContactName: "测试联系人", + ContactPhone: fmt.Sprintf("1380000%04d", i), + Status: 1, + Creator: 1, + Updater: 1, + 
CreatedAt: now, + UpdatedAt: now, + } + if err := db.Create(&child).Error; err != nil { + log.Fatalf("创建子店铺失败: %v", err) + } + shopIDs = append(shopIDs, child.ID) + } + + // 创建其余独立店铺(其他代理商) + for i := agentChildShops + 1; i < numShops; i++ { + indep := testShop{ + ShopName: fmt.Sprintf("性能测试-独立店铺%d", i), + ShopCode: fmt.Sprintf("%sINDEP-%d", testShopCodePrefix, i), + Level: 1, + ContactName: "测试联系人", + ContactPhone: fmt.Sprintf("1390000%04d", i), + Status: 1, + Creator: 1, + Updater: 1, + CreatedAt: now, + UpdatedAt: now, + } + if err := db.Create(&indep).Error; err != nil { + log.Fatalf("创建独立店铺失败: %v", err) + } + shopIDs = append(shopIDs, indep.ID) + } + + return shopIDs +} + +func createTestAgent(db *gorm.DB, shopID uint) { + // 先清理可能存在的旧测试账号 + db.Exec("DELETE FROM tb_account WHERE username = ?", testAgentUsername) + + // bcrypt 加密密码 + hashedPassword, err := bcrypt.GenerateFromPassword([]byte(testAgentPassword), bcrypt.DefaultCost) + if err != nil { + log.Fatalf("加密密码失败: %v", err) + } + + account := testAccount{ + Username: testAgentUsername, + Phone: "19999999999", + Password: string(hashedPassword), + UserType: 3, // 代理账号 + ShopID: &shopID, + Status: 1, + Creator: 1, + Updater: 1, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + if err := db.Create(&account).Error; err != nil { + log.Fatalf("创建代理账号失败: %v", err) + } +} + +func insertCards(db *gorm.DB, shopIDs []uint) { + idStrs := make([]string, len(shopIDs)) + for i, id := range shopIDs { + idStrs[i] = strconv.FormatUint(uint64(id), 10) + } + pgArray := "ARRAY[" + strings.Join(idStrs, ",") + "]::bigint[]" + + sqlTemplate := fmt.Sprintf(` +INSERT INTO tb_iot_card ( + iccid, card_category, carrier_id, carrier_type, carrier_name, + imsi, msisdn, batch_no, supplier, cost_price, distribute_price, + status, shop_id, activation_status, real_name_status, network_status, + data_usage_mb, enable_polling, created_at, updated_at, creator, updater, series_id, + is_standalone +) +SELECT + 'T' || lpad(i::text, 19, 
'0'), + CASE WHEN random() < 0.85 THEN 'normal' ELSE 'industry' END, + (i %% 4 + 1), + CASE (i %% 4) WHEN 0 THEN 'CMCC' WHEN 1 THEN 'CUCC' WHEN 2 THEN 'CTCC' ELSE 'CBN' END, + CASE (i %% 4) WHEN 0 THEN '中国移动' WHEN 1 THEN '中国联通' WHEN 2 THEN '中国电信' ELSE '中国广电' END, + lpad(i::text, 15, '0'), + '1' || lpad((i %% 10000000)::text, 10, '0'), + '%s', + CASE (i %% 3) WHEN 0 THEN '供应商A' WHEN 1 THEN '供应商B' ELSE '供应商C' END, + (random() * 5000 + 500)::bigint, + (random() * 3000 + 1000)::bigint, + CASE WHEN random() < 0.15 THEN 1 WHEN random() < 0.30 THEN 2 WHEN random() < 0.90 THEN 3 ELSE 4 END, + (%s)[GREATEST(1, LEAST(%d, floor(%d * power(random(), 2.5))::int + 1))], + CASE WHEN random() < 0.7 THEN 1 ELSE 0 END, + CASE WHEN random() < 0.6 THEN 1 ELSE 0 END, + CASE WHEN random() < 0.8 THEN 1 ELSE 0 END, + (random() * 100000)::bigint, + random() < 0.9, + now() - interval '1 day' * (random() * 730), + now() - interval '1 day' * (random() * 30), + 1, 1, + CASE WHEN random() < 0.8 THEN (random() * 20 + 1)::int ELSE NULL END, + true +FROM generate_series($1::bigint, $2::bigint) AS s(i) +`, testBatchNo, pgArray, numShops, numShops) + + // 500 万/批 × 4 并发 worker(HDD 上减少 I/O 竞争) + const workerCount = 4 + const chunkSize = 5000000 + + type task struct { + batchNum int + start int + end int + } + + totalBatches := (*total + chunkSize - 1) / chunkSize + taskCh := make(chan task, totalBatches) + var wg sync.WaitGroup + var insertedCount int64 + var errCount int64 + totalStart := time.Now() + + dsn := buildDSN() + + for w := 0; w < workerCount; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + conn, err := sql.Open("pgx", dsn) + if err != nil { + log.Printf("Worker %d 连接失败: %v", workerID, err) + return + } + defer conn.Close() + conn.SetMaxOpenConns(1) + conn.Exec("SET synchronous_commit = off") + conn.Exec("SET work_mem = '256MB'") + + for t := range taskCh { + _, err := conn.Exec(sqlTemplate, t.start, t.end) + if err != nil { + log.Printf("Worker %d 批次 %d 失败: %v", workerID, 
t.batchNum, err) + atomic.AddInt64(&errCount, 1) + continue + } + + count := t.end - t.start + 1 + done := atomic.AddInt64(&insertedCount, int64(count)) + elapsed := time.Since(totalStart).Seconds() + speed := float64(done) / elapsed + remaining := float64(int64(*total)-done) / speed + fmt.Printf(" [W%d] 批次 %d/%d 完成 | 累计 %d/%d (%.1f%%) | %.0f条/秒 | 剩余 %.0f秒\n", + workerID, t.batchNum, totalBatches, done, *total, + float64(done)*100/float64(*total), speed, remaining) + } + }(w) + } + + for b := 0; b < totalBatches; b++ { + start := b*chunkSize + 1 + end := (b + 1) * chunkSize + if end > *total { + end = *total + } + taskCh <- task{batchNum: b + 1, start: start, end: end} + } + close(taskCh) + wg.Wait() + + elapsed := time.Since(totalStart) + fmt.Printf(" ✅ 共插入 %d 条 (失败 %d 批) | 总耗时 %v | 平均 %.0f 条/秒\n", + atomic.LoadInt64(&insertedCount), atomic.LoadInt64(&errCount), elapsed.Round(time.Second), + float64(atomic.LoadInt64(&insertedCount))/elapsed.Seconds()) +} + +// ==================== Cleanup ==================== + +func doCleanup(db *gorm.DB) { + fmt.Println("\n🧹 开始清理性能测试数据") + + // 删除测试卡数据(分批删除) + fmt.Println("\n💳 删除测试卡数据...") + totalDeleted := 0 + start := time.Now() + for { + result := db.Exec("DELETE FROM tb_iot_card WHERE ctid IN (SELECT ctid FROM tb_iot_card WHERE batch_no = ? 
LIMIT 500000)", testBatchNo) + if result.Error != nil { + log.Fatalf("删除失败: %v", result.Error) + } + if result.RowsAffected == 0 { + break + } + totalDeleted += int(result.RowsAffected) + elapsed := time.Since(start) + speed := float64(totalDeleted) / elapsed.Seconds() + fmt.Printf(" 已删除 %d 条 (%.0f 条/秒)\n", totalDeleted, speed) + } + fmt.Printf(" ✅ 共删除 %d 条卡数据 (耗时 %v)\n", totalDeleted, time.Since(start).Round(time.Second)) + + // 删除测试店铺 + fmt.Println("\n📦 删除测试店铺...") + result := db.Exec("DELETE FROM tb_shop WHERE shop_code LIKE ?", testShopCodePrefix+"%") + fmt.Printf(" ✅ 删除了 %d 个店铺\n", result.RowsAffected) + + // 删除测试账号 + fmt.Println("\n👤 删除测试账号...") + result = db.Exec("DELETE FROM tb_account WHERE username = ?", testAgentUsername) + fmt.Printf(" ✅ 删除了 %d 个账号\n", result.RowsAffected) + + // 删除优化索引(如果存在) + fmt.Println("\n📇 清理优化索引...") + db.Exec("DROP INDEX IF EXISTS idx_iot_card_perf_shop_created") + db.Exec("DROP INDEX IF EXISTS idx_iot_card_perf_shop_status_created") + fmt.Println(" ✅ 索引已清理") + + fmt.Println("\n⚠️ 建议手动执行 VACUUM FULL tb_iot_card; 回收磁盘空间") + fmt.Println("🎉 清理完成!") +} + +// ==================== Verify ==================== + +func doVerify(db *gorm.DB) { + fmt.Println("\n📊 验证测试数据分布") + + // 总卡数 + var total int64 + db.Raw("SELECT count(*) FROM tb_iot_card WHERE batch_no = ? AND deleted_at IS NULL", testBatchNo).Scan(&total) + fmt.Printf("\n总测试卡数: %d\n", total) + + // 代理商可见卡数 + var agentVisible int64 + db.Raw(` + SELECT count(*) FROM tb_iot_card + WHERE batch_no = ? AND deleted_at IS NULL + AND shop_id IN (SELECT id FROM tb_shop WHERE shop_code LIKE ? AND deleted_at IS NULL) + `, testBatchNo, testShopCodePrefix+"ROOT").Scan(&agentVisible) + + var agentSubVisible int64 + db.Raw(` + SELECT count(*) FROM tb_iot_card + WHERE batch_no = ? AND deleted_at IS NULL + AND shop_id IN ( + SELECT id FROM tb_shop + WHERE (shop_code LIKE ? OR shop_code LIKE ?) 
AND deleted_at IS NULL + ) + `, testBatchNo, testShopCodePrefix+"ROOT", testShopCodePrefix+"SUB-%").Scan(&agentSubVisible) + fmt.Printf("代理商总店卡数: %d\n", agentVisible) + fmt.Printf("代理商(含下级)卡数: %d (占比 %.1f%%)\n", agentSubVisible, float64(agentSubVisible)*100/float64(total)) + + // 前 20 名店铺分布 + type shopDist struct { + ShopID uint + Cnt int64 + } + var dists []shopDist + db.Raw(` + SELECT shop_id, count(*) as cnt FROM tb_iot_card + WHERE batch_no = ? AND deleted_at IS NULL + GROUP BY shop_id ORDER BY cnt DESC LIMIT 20 + `, testBatchNo).Scan(&dists) + + fmt.Println("\n前 20 名店铺卡量分布:") + fmt.Printf(" %-12s %-12s %-8s\n", "shop_id", "卡数", "占比") + for _, d := range dists { + fmt.Printf(" %-12d %-12d %.2f%%\n", d.ShopID, d.Cnt, float64(d.Cnt)*100/float64(total)) + } + + // 状态分布 + type statusDist struct { + Status int + Cnt int64 + } + var statuses []statusDist + db.Raw("SELECT status, count(*) as cnt FROM tb_iot_card WHERE batch_no = ? AND deleted_at IS NULL GROUP BY status ORDER BY status", testBatchNo).Scan(&statuses) + + fmt.Println("\n状态分布:") + statusNames := map[int]string{1: "在库", 2: "已分销", 3: "已激活", 4: "已停用"} + for _, s := range statuses { + fmt.Printf(" 状态 %d (%s): %d (%.1f%%)\n", s.Status, statusNames[s.Status], s.Cnt, float64(s.Cnt)*100/float64(total)) + } + + // 运营商分布 + type carrierDist struct { + CarrierID uint + Cnt int64 + } + var carriers []carrierDist + db.Raw("SELECT carrier_id, count(*) as cnt FROM tb_iot_card WHERE batch_no = ? 
AND deleted_at IS NULL GROUP BY carrier_id ORDER BY carrier_id", testBatchNo).Scan(&carriers) + + fmt.Println("\n运营商分布:") + for _, c := range carriers { + fmt.Printf(" carrier_id=%d: %d (%.1f%%)\n", c.CarrierID, c.Cnt, float64(c.Cnt)*100/float64(total)) + } + + // 表大小 + var tableSize, indexSize string + db.Raw("SELECT pg_size_pretty(pg_relation_size('tb_iot_card'))").Scan(&tableSize) + db.Raw("SELECT pg_size_pretty(pg_indexes_size('tb_iot_card'))").Scan(&indexSize) + fmt.Printf("\n表大小: %s | 索引大小: %s\n", tableSize, indexSize) + + // 测试代理商账号信息 + var agentInfo struct { + ID uint + Username string + ShopID uint + } + db.Raw("SELECT id, username, shop_id FROM tb_account WHERE username = ? AND deleted_at IS NULL", testAgentUsername).Scan(&agentInfo) + fmt.Printf("\n测试代理商: id=%d, username=%s, shop_id=%d\n", agentInfo.ID, agentInfo.Username, agentInfo.ShopID) +} + +// ==================== Bulk Load Index Management ==================== + +type indexDef struct { + Name string + Definition string +} + +func dropNonPKIndexes(db *gorm.DB) []indexDef { + sqlDB, _ := db.DB() + + var indexes []indexDef + rows, err := sqlDB.Query(` + SELECT indexname, indexdef FROM pg_indexes + WHERE tablename = 'tb_iot_card' AND indexname != 'iot_cards_pkey' + ORDER BY indexname + `) + if err != nil { + log.Fatalf("查询索引失败: %v", err) + } + defer rows.Close() + + for rows.Next() { + var idx indexDef + rows.Scan(&idx.Name, &idx.Definition) + indexes = append(indexes, idx) + } + + for _, idx := range indexes { + fmt.Printf(" 删除 %s ...\n", idx.Name) + sqlDB.Exec("DROP INDEX IF EXISTS " + idx.Name) + } + + return indexes +} + +func rebuildIndexes(db *gorm.DB, indexes []indexDef) { + sqlDB, _ := db.DB() + + for _, idx := range indexes { + fmt.Printf(" 重建 %s ...", idx.Name) + start := time.Now() + if _, err := sqlDB.Exec(idx.Definition); err != nil { + fmt.Printf(" ❌ 失败: %v\n", err) + continue + } + fmt.Printf(" ✅ (%v)\n", time.Since(start).Round(time.Second)) + } +} + +// ==================== 
Optimization Index Management ==================== + +func doAddIndex(db *gorm.DB) { + fmt.Println("\n📇 创建优化复合索引(CONCURRENTLY,不阻塞读写)") + + indexes := []struct { + name string + sql string + }{ + { + name: "idx_iot_card_perf_shop_created", + sql: "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_iot_card_perf_shop_created ON tb_iot_card (shop_id, created_at DESC) WHERE deleted_at IS NULL", + }, + { + name: "idx_iot_card_perf_shop_status_created", + sql: "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_iot_card_perf_shop_status_created ON tb_iot_card (shop_id, status, created_at DESC) WHERE deleted_at IS NULL", + }, + } + + // CONCURRENTLY 不能在事务内执行,直接用底层连接 + sqlDB, _ := db.DB() + + // pg_trgm GIN 索引:加速 ICCID/MSISDN 中间模糊搜索(LIKE '%xxx%') + fmt.Println("\n 启用 pg_trgm 扩展...") + if _, err := sqlDB.Exec("CREATE EXTENSION IF NOT EXISTS pg_trgm"); err != nil { + fmt.Printf(" ⚠️ pg_trgm 扩展创建失败(可能需要超级管理员权限): %v\n", err) + } else { + fmt.Println(" ✅ pg_trgm 扩展已启用") + trgmIndexes := []struct { + name string + sql string + }{ + { + name: "idx_iot_card_iccid_trgm", + sql: "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_iot_card_iccid_trgm ON tb_iot_card USING gin (iccid gin_trgm_ops)", + }, + { + name: "idx_iot_card_msisdn_trgm", + sql: "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_iot_card_msisdn_trgm ON tb_iot_card USING gin (msisdn gin_trgm_ops)", + }, + } + for _, idx := range trgmIndexes { + indexes = append(indexes, struct { + name string + sql string + }{idx.name, idx.sql}) + } + } + + for _, idx := range indexes { + fmt.Printf(" 创建 %s ...", idx.name) + start := time.Now() + if _, err := sqlDB.Exec(idx.sql); err != nil { + fmt.Printf(" ❌ 失败: %v\n", err) + continue + } + fmt.Printf(" ✅ (%v)\n", time.Since(start).Round(time.Second)) + } + + // 删除重复索引 + fmt.Println(" 清理重复索引 idx_tb_iot_card_shop_id(与 idx_iot_card_shop_id 重复)...") + sqlDB.Exec("DROP INDEX IF EXISTS idx_tb_iot_card_shop_id") + + fmt.Println("\n 更新统计信息...") + sqlDB.Exec("ANALYZE tb_iot_card") + fmt.Println(" ✅ 索引优化完成") +} + 
+// doDropIndex removes the optimization indexes created by doAddIndex,
+// restoring tb_iot_card to its pre-benchmark index layout, then refreshes
+// planner statistics with ANALYZE so subsequent plans reflect the change.
+// Each DROP uses IF EXISTS, so the operation is idempotent and safe to run
+// even if doAddIndex was never executed (or only partially succeeded).
+func doDropIndex(db *gorm.DB) {
+	fmt.Println("\n📇 删除优化索引(恢复到原始状态)")
+	// NOTE(review): the error from db.DB() and from each Exec is discarded;
+	// acceptable for a throwaway benchmark script, but worth confirming.
+	sqlDB, _ := db.DB()
+	sqlDB.Exec("DROP INDEX IF EXISTS idx_iot_card_perf_shop_created")
+	sqlDB.Exec("DROP INDEX IF EXISTS idx_iot_card_perf_shop_status_created")
+	sqlDB.Exec("DROP INDEX IF EXISTS idx_iot_card_iccid_trgm")
+	sqlDB.Exec("DROP INDEX IF EXISTS idx_iot_card_msisdn_trgm")
+	sqlDB.Exec("ANALYZE tb_iot_card")
+	fmt.Println(" ✅ 已删除优化索引")
+}
+
+// ==================== Utils ====================
+
+// getEnv returns the value of the environment variable named key, or
+// fallback when the variable is unset or set to the empty string.
+func getEnv(key, fallback string) string {
+	if v := os.Getenv(key); v != "" {
+		return v
+	}
+	return fallback
+}