
Database Optimization Strategies

Overview

Database optimization is a key part of guaranteeing the Ledger module's performance. This document describes the optimization strategies used for MySQL in high-concurrency financial scenarios, covering query optimization, index design, partitioning and sharding, read/write splitting, and caching.

Optimization Goals

  • Query response time: < 100 ms for 90% of queries
  • Concurrency: 10,000+ QPS
  • Data consistency: guaranteed by ACID properties
  • High availability: 99.99% uptime
  • Storage efficiency: sensible space utilization

Technology Stack

  • Database: MySQL 8.0+ (InnoDB engine)
  • Connection pool: HikariCP / go-sql-driver
  • Monitoring: Prometheus + Grafana
  • Slow query analysis: pt-query-digest
  • Performance tuning: MySQLTuner

Query Optimization Strategies

1. SQL Query Optimization

Query Rewriting

sql
-- Original query (performs poorly)
SELECT t.*, l.name as ledger_name 
FROM transactions t 
LEFT JOIN ledgers l ON t.ledger_id = l.id 
WHERE t.user_id = 12345 
  AND t.created_at >= '2024-01-01' 
  AND t.amount > 10000
ORDER BY t.created_at DESC 
LIMIT 20;

-- Optimized query
SELECT t.id, t.ledger_id, t.type, t.amount, t.description, 
       t.created_at, l.name as ledger_name
FROM transactions t FORCE INDEX (idx_user_date_amount)
INNER JOIN ledgers l ON t.ledger_id = l.id 
WHERE t.user_id = 12345 
  AND t.created_at >= '2024-01-01' 
  AND t.amount > 10000
ORDER BY t.created_at DESC 
LIMIT 20;

Pagination Optimization

sql
-- Traditional offset pagination (slow for deep pages)
SELECT * FROM transactions 
WHERE user_id = 12345 
ORDER BY created_at DESC 
LIMIT 1000, 20;

-- Cursor-based (keyset) pagination
SELECT * FROM transactions 
WHERE user_id = 12345 
  AND created_at < '2024-01-15 10:30:00'
  AND id < 98765  -- use id as a tiebreaker for identical timestamps
ORDER BY created_at DESC, id DESC 
LIMIT 20;

go
// Go implementation
type PaginationCursor struct {
    CreatedAt time.Time `json:"created_at"`
    ID        int64     `json:"id"`
}

func (r *TransactionRepository) FindWithCursor(
    ctx context.Context, 
    userID int64, 
    cursor *PaginationCursor, 
    limit int,
) ([]*Transaction, *PaginationCursor, error) {
    query := `
        SELECT id, ledger_id, type, amount, description, created_at
        FROM transactions 
        WHERE user_id = ?`
    
    args := []interface{}{userID}
    
    if cursor != nil {
        query += ` AND (created_at < ? OR (created_at = ? AND id < ?))`
        args = append(args, cursor.CreatedAt, cursor.CreatedAt, cursor.ID)
    }
    
    query += ` ORDER BY created_at DESC, id DESC LIMIT ?`
    args = append(args, limit)
    
    rows, err := r.db.QueryContext(ctx, query, args...)
    if err != nil {
        return nil, nil, err
    }
    defer rows.Close()
    
    transactions := make([]*Transaction, 0, limit)
    var nextCursor *PaginationCursor
    
    for rows.Next() {
        tx := &Transaction{}
        err := rows.Scan(&tx.ID, &tx.LedgerID, &tx.Type, 
                        &tx.Amount, &tx.Description, &tx.CreatedAt)
        if err != nil {
            return nil, nil, err
        }
        transactions = append(transactions, tx)
    }
    if err := rows.Err(); err != nil {
        return nil, nil, err
    }
    
    if len(transactions) == limit {
        lastTx := transactions[len(transactions)-1]
        nextCursor = &PaginationCursor{
            CreatedAt: lastTx.CreatedAt,
            ID:        lastTx.ID,
        }
    }
    
    return transactions, nextCursor, nil
}

2. Aggregate Query Optimization

Precomputed Statistics

sql
-- Summary table for daily statistics
CREATE TABLE daily_transaction_summary (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    ledger_id BIGINT NOT NULL,
    stat_date DATE NOT NULL,
    transaction_count INT DEFAULT 0,
    total_income DECIMAL(15,2) DEFAULT 0.00,
    total_expense DECIMAL(15,2) DEFAULT 0.00,
    net_amount DECIMAL(15,2) DEFAULT 0.00,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    
    UNIQUE KEY uk_user_ledger_date (user_id, ledger_id, stat_date),
    KEY idx_user_date (user_id, stat_date),
    KEY idx_ledger_date (ledger_id, stat_date)
) ENGINE=InnoDB;

-- Trigger that keeps the summary up to date in real time
DELIMITER $$
CREATE TRIGGER tr_transaction_after_insert 
AFTER INSERT ON transactions
FOR EACH ROW
BEGIN
    INSERT INTO daily_transaction_summary 
        (user_id, ledger_id, stat_date, transaction_count, 
         total_income, total_expense, net_amount)
    VALUES 
        (NEW.user_id, NEW.ledger_id, DATE(NEW.created_at), 1,
         IF(NEW.amount > 0, NEW.amount, 0),
         IF(NEW.amount < 0, ABS(NEW.amount), 0),
         NEW.amount)
    ON DUPLICATE KEY UPDATE
        transaction_count = transaction_count + 1,
        total_income = total_income + IF(NEW.amount > 0, NEW.amount, 0),
        total_expense = total_expense + IF(NEW.amount < 0, ABS(NEW.amount), 0),
        net_amount = net_amount + NEW.amount,
        updated_at = CURRENT_TIMESTAMP;
END$$
DELIMITER ;

Precomputation Service in Go

go
type StatisticsService struct {
    db           *gorm.DB
    redis        redis.Cmdable
    scheduler    *cron.Cron
    logger       *zap.Logger
}

// Scheduled precomputation jobs
func (s *StatisticsService) StartScheduler() {
    s.scheduler.AddFunc("0 1 * * *", s.calculateDailyStats)     // 每天凌晨1点
    s.scheduler.AddFunc("0 2 1 * *", s.calculateMonthlyStats)   // 每月1日凌晨2点
    s.scheduler.AddFunc("0 3 1 1 *", s.calculateYearlyStats)    // 每年1月1日凌晨3点
    s.scheduler.Start()
}

func (s *StatisticsService) calculateDailyStats() {
    ctx := context.Background()
    yesterday := time.Now().AddDate(0, 0, -1)
    
    // Recompute yesterday's stats for all users in one batch
    query := `
        INSERT INTO daily_transaction_summary 
            (user_id, ledger_id, stat_date, transaction_count, 
             total_income, total_expense, net_amount)
        SELECT 
            user_id, 
            ledger_id,
            DATE(created_at) as stat_date,
            COUNT(*) as transaction_count,
            SUM(CASE WHEN amount > 0 THEN amount ELSE 0 END) as total_income,
            SUM(CASE WHEN amount < 0 THEN ABS(amount) ELSE 0 END) as total_expense,
            SUM(amount) as net_amount
        FROM transactions 
        WHERE created_at >= ? AND created_at < ?
        GROUP BY user_id, ledger_id, DATE(created_at)
        ON DUPLICATE KEY UPDATE
            transaction_count = VALUES(transaction_count),
            total_income = VALUES(total_income),
            total_expense = VALUES(total_expense),
            net_amount = VALUES(net_amount),
            updated_at = CURRENT_TIMESTAMP`
    
    // Half-open day range keeps the index on created_at usable
    dayStart := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, yesterday.Location())
    result := s.db.WithContext(ctx).Exec(query, dayStart, dayStart.AddDate(0, 0, 1))
    if result.Error != nil {
        s.logger.Error("Failed to calculate daily stats", 
            zap.Error(result.Error),
            zap.String("date", yesterday.Format("2006-01-02")))
        return
    }
    
    s.logger.Info("Daily stats calculated successfully", 
        zap.Int64("affected_rows", result.RowsAffected),
        zap.String("date", yesterday.Format("2006-01-02")))
}

Index Optimization Strategies

1. Index Design Principles

Composite Index Design

sql
-- Design composite indexes around the actual query patterns
-- Query pattern analysis:
-- 1. Look up transactions by user (user_id)
-- 2. Filter by time range (created_at)
-- 3. Filter by amount range (amount)
-- 4. Filter by transaction type (type)

-- Primary composite indexes
CREATE INDEX idx_user_date_amount ON transactions(user_id, created_at, amount);
CREATE INDEX idx_user_type_date ON transactions(user_id, type, created_at);
CREATE INDEX idx_ledger_date_type ON transactions(ledger_id, created_at, type);

-- Covering index for the common list query
-- (description is TEXT and cannot be indexed in full, so it is left out;
--  fetching it is a cheap row lookup once the index narrows the result set)
CREATE INDEX idx_user_date_cover ON transactions(
    user_id, created_at, id, type, amount
);

Index Selectivity Analysis

sql
-- Analyze column selectivity
SELECT 
    'user_id' as column_name,
    COUNT(DISTINCT user_id) as distinct_values,
    COUNT(*) as total_rows,
    COUNT(DISTINCT user_id) / COUNT(*) as selectivity
FROM transactions
UNION ALL
SELECT 
    'type' as column_name,
    COUNT(DISTINCT type) as distinct_values,
    COUNT(*) as total_rows,
    COUNT(DISTINCT type) / COUNT(*) as selectivity
FROM transactions
UNION ALL
SELECT 
    'amount' as column_name,
    COUNT(DISTINCT amount) as distinct_values,
    COUNT(*) as total_rows,
    COUNT(DISTINCT amount) / COUNT(*) as selectivity
FROM transactions;

-- Inspect index metadata and cardinality
SELECT 
    TABLE_NAME,
    INDEX_NAME,
    CARDINALITY,
    SEQ_IN_INDEX,
    COLUMN_NAME,
    COLLATION,
    NULLABLE
FROM information_schema.STATISTICS 
WHERE TABLE_SCHEMA = 'ledger_db' 
  AND TABLE_NAME = 'transactions'
ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX;

2. Index Usage in Go Code

go
// Optimized queries at the repository layer
type TransactionRepository struct {
    db    *gorm.DB
    cache redis.Cmdable
}

// Force a specific index
func (r *TransactionRepository) FindByUserAndDateRange(
    ctx context.Context,
    userID int64,
    startDate, endDate time.Time,
    limit, offset int,
) ([]*Transaction, error) {
    var transactions []*Transaction
    
    // Force the optimal index for this access path
    err := r.db.WithContext(ctx).
        Table("transactions FORCE INDEX (idx_user_date_amount)").
        Where("user_id = ? AND created_at BETWEEN ? AND ?", 
              userID, startDate, endDate).
        Order("created_at DESC").
        Limit(limit).
        Offset(offset).
        Find(&transactions).Error
    
    return transactions, err
}

// Use index hints to optimize complex queries
func (r *TransactionRepository) FindComplexQuery(
    ctx context.Context,
    filter *TransactionFilter,
) ([]*Transaction, error) {
    query := r.db.WithContext(ctx).Model(&Transaction{})
    
    // Pick the best index based on the filter conditions
    if filter.UserID != 0 && !filter.StartDate.IsZero() {
        query = query.Table("transactions USE INDEX (idx_user_date_amount)")
    } else if filter.LedgerID != 0 {
        query = query.Table("transactions USE INDEX (idx_ledger_date_type)")
    }
    
    // Build the WHERE conditions dynamically
    if filter.UserID != 0 {
        query = query.Where("user_id = ?", filter.UserID)
    }
    
    if filter.LedgerID != 0 {
        query = query.Where("ledger_id = ?", filter.LedgerID)
    }
    
    if !filter.StartDate.IsZero() && !filter.EndDate.IsZero() {
        query = query.Where("created_at BETWEEN ? AND ?", 
                           filter.StartDate, filter.EndDate)
    }
    
    if len(filter.Types) > 0 {
        query = query.Where("type IN ?", filter.Types)
    }
    
    if filter.MinAmount != nil {
        query = query.Where("amount >= ?", *filter.MinAmount)
    }
    
    if filter.MaxAmount != nil {
        query = query.Where("amount <= ?", *filter.MaxAmount)
    }
    
    var transactions []*Transaction
    err := query.Order("created_at DESC").
               Limit(filter.Limit).
               Offset(filter.Offset).
               Find(&transactions).Error
    
    return transactions, err
}

Partitioning and Sharding Strategies

1. Horizontal Partitioning

Partitioning by Time

sql
-- Create the partitioned table (range partitioned by year)
CREATE TABLE transactions (
    id BIGINT NOT NULL AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    ledger_id BIGINT NOT NULL,
    type ENUM('income', 'expense', 'transfer') NOT NULL,
    amount DECIMAL(15,2) NOT NULL,
    description TEXT,
    transaction_date DATETIME NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    
    PRIMARY KEY (id, transaction_date),
    KEY idx_user_date (user_id, transaction_date),
    KEY idx_ledger_date (ledger_id, transaction_date)
) ENGINE=InnoDB
PARTITION BY RANGE (YEAR(transaction_date)) (
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION p2025 VALUES LESS THAN (2026),
    PARTITION p2026 VALUES LESS THAN (2027),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);

-- Add a new partition as needed
ALTER TABLE transactions 
ADD PARTITION (PARTITION p2027 VALUES LESS THAN (2028));

-- Drop a historical partition
ALTER TABLE transactions DROP PARTITION p2023;

Hash Partitioning by User

sql
-- Hash partitioning on the user dimension
CREATE TABLE user_transactions (
    id BIGINT NOT NULL AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    ledger_id BIGINT NOT NULL,
    -- ... remaining columns
    PRIMARY KEY (id, user_id),
    KEY idx_user_date (user_id, created_at)
) ENGINE=InnoDB
PARTITION BY HASH(user_id)
PARTITIONS 16;

2. Table Sharding Strategy

Shard Routing in Go

go
type ShardingConfig struct {
    ShardCount   int
    DatabaseName string
    TablePrefix  string
}

type ShardingManager struct {
    config   *ShardingConfig
    databases map[int]*gorm.DB
    logger   *zap.Logger
}

func NewShardingManager(config *ShardingConfig) *ShardingManager {
    manager := &ShardingManager{
        config:    config,
        databases: make(map[int]*gorm.DB),
        logger:    zap.L(),
    }
    
    // Initialize a database connection for every shard
    for i := 0; i < config.ShardCount; i++ {
        db := initShardDB(i, config)
        manager.databases[i] = db
    }
    
    return manager
}

// Shard routing based on user ID
func (sm *ShardingManager) GetShardByUserID(userID int64) int {
    return int(userID % int64(sm.config.ShardCount))
}

// Monthly table routing based on date
func (sm *ShardingManager) GetTableName(baseTable string, date time.Time) string {
    suffix := date.Format("200601") // YYYYMM
    return fmt.Sprintf("%s_%s", baseTable, suffix)
}

// Sharded query implementation
func (sm *ShardingManager) QueryTransactions(
    ctx context.Context,
    userID int64,
    startDate, endDate time.Time,
) ([]*Transaction, error) {
    shardID := sm.GetShardByUserID(userID)
    db := sm.databases[shardID]
    
    // Work out which monthly tables need to be queried
    tables := sm.getTablesInRange("transactions", startDate, endDate)
    
    var allTransactions []*Transaction
    
    for _, tableName := range tables {
        var transactions []*Transaction
        err := db.WithContext(ctx).
            Table(tableName).
            Where("user_id = ? AND created_at BETWEEN ? AND ?", 
                  userID, startDate, endDate).
            Find(&transactions).Error
        
        if err != nil {
            sm.logger.Error("Failed to query shard table",
                zap.String("table", tableName),
                zap.Int("shard", shardID),
                zap.Error(err))
            continue
        }
        
        allTransactions = append(allTransactions, transactions...)
    }
    
    // Merge results and sort newest first
    sort.Slice(allTransactions, func(i, j int) bool {
        return allTransactions[i].CreatedAt.After(allTransactions[j].CreatedAt)
    })
    
    return allTransactions, nil
}

func (sm *ShardingManager) getTablesInRange(
    baseTable string, 
    startDate, endDate time.Time,
) []string {
    var tables []string
    
    // Normalize to the first day of the month so AddDate never skips a month
    current := time.Date(startDate.Year(), startDate.Month(), 1, 0, 0, 0, 0, startDate.Location())
    for !current.After(endDate) {
        tableName := sm.GetTableName(baseTable, current)
        tables = append(tables, tableName)
        current = current.AddDate(0, 1, 0) // next month
    }
    
    return tables
}

Cache Optimization Strategies

1. Multi-Level Cache Architecture

Multi-Level Cache in Go

go
type CacheManager struct {
    localCache  *cache.Cache       // in-process local cache
    redisCache  redis.Cmdable      // Redis cache
    db          *gorm.DB           // database
    stats       *CacheStats        // cache hit/miss statistics
}

type CacheConfig struct {
    LocalTTL      time.Duration
    RedisTTL      time.Duration
    LocalMaxSize  int
    RedisMaxSize  int
    EnableMetrics bool
}

func NewCacheManager(config *CacheConfig, rdb redis.Cmdable, db *gorm.DB) *CacheManager {
    return &CacheManager{
        localCache: cache.New(config.LocalTTL, time.Minute),
        redisCache: rdb,
        db:         db,
        stats:      NewCacheStats(),
    }
}

// Multi-level cache lookup
func (cm *CacheManager) GetTransaction(
    ctx context.Context, 
    id int64,
) (*Transaction, error) {
    cacheKey := fmt.Sprintf("transaction:%d", id)
    
    // L1: local cache
    if data, found := cm.localCache.Get(cacheKey); found {
        cm.stats.LocalHits.Inc()
        return data.(*Transaction), nil
    }
    cm.stats.LocalMisses.Inc()
    
    // L2: Redis cache
    data, err := cm.redisCache.Get(ctx, cacheKey).Result()
    if err == nil {
        var tx Transaction
        if err := json.Unmarshal([]byte(data), &tx); err == nil {
            // Backfill the local cache
            cm.localCache.Set(cacheKey, &tx, cache.DefaultExpiration)
            cm.stats.RedisHits.Inc()
            return &tx, nil
        }
    }
    cm.stats.RedisMisses.Inc()
    
    // L3: database
    var tx Transaction
    err = cm.db.WithContext(ctx).First(&tx, id).Error
    if err != nil {
        cm.stats.DBMisses.Inc()
        return nil, err
    }
    cm.stats.DBHits.Inc()
    
    // Write back to the caches asynchronously
    go cm.asyncSetCache(cacheKey, &tx)
    
    return &tx, nil
}

func (cm *CacheManager) asyncSetCache(key string, tx *Transaction) {
    // Write to the local cache
    cm.localCache.Set(key, tx, cache.DefaultExpiration)
    
    // Write to Redis
    data, err := json.Marshal(tx)
    if err != nil {
        return
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    cm.redisCache.SetEX(ctx, key, data, 10*time.Minute)
}

2. Cache Strategy Implementation

Query Result Caching

go
// Cache a user's transaction list
func (r *TransactionRepository) FindByUserWithCache(
    ctx context.Context,
    userID int64,
    limit, offset int,
) ([]*Transaction, error) {
    cacheKey := fmt.Sprintf("user_transactions:%d:%d:%d", userID, limit, offset)
    
    // Try the cache first
    cached, err := r.cache.Get(ctx, cacheKey).Result()
    if err == nil {
        var transactions []*Transaction
        if json.Unmarshal([]byte(cached), &transactions) == nil {
            return transactions, nil
        }
    }
    
    // Fall back to the database
    var transactions []*Transaction
    err = r.db.WithContext(ctx).
        Where("user_id = ?", userID).
        Order("created_at DESC").
        Limit(limit).
        Offset(offset).
        Find(&transactions).Error
    
    if err != nil {
        return nil, err
    }
    
    // Write to the cache asynchronously
    go func() {
        data, _ := json.Marshal(transactions)
        r.cache.SetEX(context.Background(), cacheKey, data, 5*time.Minute)
    }()
    
    return transactions, nil
}

// Cache computed statistics
func (s *StatisticsService) GetUserStats(
    ctx context.Context,
    userID int64,
    startDate, endDate time.Time,
) (*UserStats, error) {
    cacheKey := fmt.Sprintf("user_stats:%d:%s:%s", 
        userID, startDate.Format("2006-01-02"), endDate.Format("2006-01-02"))
    
    // Check the cache
    if stats := s.getStatsFromCache(ctx, cacheKey); stats != nil {
        return stats, nil
    }
    
    // Compute the statistics
    stats, err := s.calculateUserStats(ctx, userID, startDate, endDate)
    if err != nil {
        return nil, err
    }
    
    // Cache the result (statistics can tolerate a longer TTL)
    s.setStatsToCache(ctx, cacheKey, stats, time.Hour)
    
    return stats, nil
}

Cache Invalidation Strategy

go
type CacheInvalidator struct {
    redis    redis.Cmdable
    patterns map[string][]string // invalidation patterns keyed by event
}

func NewCacheInvalidator(redis redis.Cmdable) *CacheInvalidator {
    ci := &CacheInvalidator{
        redis:    redis,
        patterns: make(map[string][]string),
    }
    
    // Define invalidation patterns
    ci.patterns["transaction_created"] = []string{
        "user_transactions:%d:*",
        "user_stats:%d:*",
        "ledger_summary:%d:*",
    }
    
    ci.patterns["transaction_updated"] = []string{
        "transaction:%d",
        "user_transactions:%d:*",
        "user_stats:%d:*",
    }
    
    return ci
}

// Post-write hook: invalidate related cache entries
func (ci *CacheInvalidator) InvalidateOnTransactionCreated(
    ctx context.Context,
    tx *Transaction,
) error {
    patterns := ci.patterns["transaction_created"]
    
    for _, pattern := range patterns {
        keyPattern := fmt.Sprintf(pattern, tx.UserID)
        
        // Find matching keys (KEYS blocks Redis; prefer SCAN at scale)
        keys, err := ci.redis.Keys(ctx, keyPattern).Result()
        if err != nil {
            continue
        }
        
        // Delete in batch
        if len(keys) > 0 {
            ci.redis.Del(ctx, keys...)
        }
    }
    
    return nil
}

// Wire cache invalidation into the service layer
func (s *TransactionService) CreateTransaction(
    ctx context.Context,
    req *CreateTransactionRequest,
) (*Transaction, error) {
    // Persist the new transaction record
    tx, err := s.repository.Create(ctx, req)
    if err != nil {
        return nil, err
    }
    
    // Invalidate caches asynchronously
    go s.cacheInvalidator.InvalidateOnTransactionCreated(context.Background(), tx)
    
    return tx, nil
}

Connection Pool Optimization

1. Connection Pool Configuration

go
type DatabaseConfig struct {
    DSN                  string
    MaxOpenConns         int           // maximum number of open connections
    MaxIdleConns         int           // maximum number of idle connections
    ConnMaxLifetime      time.Duration // maximum lifetime of a connection
    ConnMaxIdleTime      time.Duration // maximum idle time of a connection
    HealthCheckInterval  time.Duration // interval between health checks
}

func NewOptimizedDB(config *DatabaseConfig) (*gorm.DB, error) {
    db, err := gorm.Open(mysql.Open(config.DSN), &gorm.Config{
        Logger: logger.Default.LogMode(logger.Info),
        NamingStrategy: schema.NamingStrategy{
            SingularTable: false, // keep plural table names (transactions, ledgers) to match the schema above
        },
        NowFunc: func() time.Time {
            return time.Now().UTC()
        },
    })
    
    if err != nil {
        return nil, err
    }
    
    sqlDB, err := db.DB()
    if err != nil {
        return nil, err
    }
    
    // Connection pool tuning
    sqlDB.SetMaxOpenConns(config.MaxOpenConns)         // e.g. 100
    sqlDB.SetMaxIdleConns(config.MaxIdleConns)         // e.g. 10
    sqlDB.SetConnMaxLifetime(config.ConnMaxLifetime)   // e.g. 1 hour
    sqlDB.SetConnMaxIdleTime(config.ConnMaxIdleTime)   // e.g. 30 minutes
    
    // Start background health checks
    go startHealthCheck(sqlDB, config.HealthCheckInterval)
    
    return db, nil
}

func startHealthCheck(db *sql.DB, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for range ticker.C {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        err := db.PingContext(ctx)
        cancel()
        
        if err != nil {
            log.Printf("Database health check failed: %v", err)
            // could fire an alert or attempt a reconnect here
        }
    }
}

2. Read/Write Splitting

go
type DBCluster struct {
    master *gorm.DB
    slaves []*gorm.DB
    policy LoadBalancePolicy
}

type LoadBalancePolicy interface {
    SelectSlave(slaves []*gorm.DB) *gorm.DB
}

// Round-robin policy
type RoundRobinPolicy struct {
    counter uint64
}

func (p *RoundRobinPolicy) SelectSlave(slaves []*gorm.DB) *gorm.DB {
    if len(slaves) == 0 {
        return nil
    }
    index := atomic.AddUint64(&p.counter, 1) % uint64(len(slaves))
    return slaves[index]
}

func NewDBCluster(masterDSN string, slaveDSNs []string) (*DBCluster, error) {
    master, err := NewOptimizedDB(&DatabaseConfig{DSN: masterDSN})
    if err != nil {
        return nil, err
    }
    
    var slaves []*gorm.DB
    for _, dsn := range slaveDSNs {
        slave, err := NewOptimizedDB(&DatabaseConfig{DSN: dsn})
        if err != nil {
            continue // skip this replica and try the rest
        }
        slaves = append(slaves, slave)
    }
    
    return &DBCluster{
        master: master,
        slaves: slaves,
        policy: &RoundRobinPolicy{},
    }, nil
}

func (cluster *DBCluster) Master() *gorm.DB {
    return cluster.master
}

func (cluster *DBCluster) Slave() *gorm.DB {
    if len(cluster.slaves) == 0 {
        return cluster.master // fall back to the primary
    }
    return cluster.policy.SelectSlave(cluster.slaves)
}

// Read/write splitting at the repository layer ("cluster" is assumed to be a package-level *DBCluster initialized at startup)
func (r *TransactionRepository) FindByID(ctx context.Context, id int64) (*Transaction, error) {
    var tx Transaction
    err := cluster.Slave().WithContext(ctx).First(&tx, id).Error
    return &tx, err
}

func (r *TransactionRepository) Create(ctx context.Context, tx *Transaction) error {
    return cluster.Master().WithContext(ctx).Create(tx).Error
}

Monitoring and Performance Analysis

1. Slow Query Monitoring

sql
-- Enable the slow query log
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 0.5;  -- 0.5 seconds
SET GLOBAL log_queries_not_using_indexes = 'ON';

-- Review slow query statistics
SELECT 
    SCHEMA_NAME,
    DIGEST_TEXT,
    COUNT_STAR,
    SUM_TIMER_WAIT/1000000000000 as SUM_TIME_SEC,
    AVG_TIMER_WAIT/1000000000000 as AVG_TIME_SEC,
    SUM_ROWS_EXAMINED,
    SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest 
WHERE SCHEMA_NAME = 'ledger_db'
ORDER BY SUM_TIMER_WAIT DESC 
LIMIT 10;

2. Performance Monitoring in Go

go
// Database performance metric collection
type DBMetrics struct {
    QueryDuration     prometheus.HistogramVec
    ConnectionsInUse  prometheus.Gauge
    ConnectionsIdle   prometheus.Gauge
    QueryErrors       prometheus.CounterVec
}

func NewDBMetrics() *DBMetrics {
    return &DBMetrics{
        QueryDuration: *prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name: "db_query_duration_seconds",
                Help: "Database query duration in seconds",
                Buckets: prometheus.DefBuckets,
            },
            []string{"operation", "table"},
        ),
        ConnectionsInUse: prometheus.NewGauge(
            prometheus.GaugeOpts{
                Name: "db_connections_in_use",
                Help: "Number of database connections currently in use",
            },
        ),
        ConnectionsIdle: prometheus.NewGauge(
            prometheus.GaugeOpts{
                Name: "db_connections_idle",
                Help: "Number of idle database connections",
            },
        ),
        QueryErrors: *prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "db_query_errors_total",
                Help: "Total number of database query errors",
            },
            []string{"operation", "error_type"},
        ),
    }
}

// Monitoring middleware (GORM plugin)
func (m *DBMetrics) MonitoringMiddleware() gorm.Plugin {
    return &monitoringPlugin{metrics: m}
}

type monitoringPlugin struct {
    metrics *DBMetrics
}

func (p *monitoringPlugin) Name() string {
    return "monitoring"
}

func (p *monitoringPlugin) Initialize(db *gorm.DB) error {
    // Register query callbacks
    db.Callback().Query().Before("gorm:query").Register("monitoring:before", p.beforeQuery)
    db.Callback().Query().After("gorm:query").Register("monitoring:after", p.afterQuery)
    
    // Start connection pool monitoring
    go p.monitorConnectionPool(db)
    
    return nil
}

func (p *monitoringPlugin) beforeQuery(db *gorm.DB) {
    db.InstanceSet("start_time", time.Now())
}

func (p *monitoringPlugin) afterQuery(db *gorm.DB) {
    startTime, exists := db.InstanceGet("start_time")
    if !exists {
        return
    }
    
    duration := time.Since(startTime.(time.Time))
    
    // Record query duration
    operation := "select"
    if db.Statement.Schema != nil {
        tableName := db.Statement.Schema.Table
        p.metrics.QueryDuration.WithLabelValues(operation, tableName).Observe(duration.Seconds())
    }
    
    // Record query errors (record-not-found is not counted)
    if db.Error != nil && !errors.Is(db.Error, gorm.ErrRecordNotFound) {
        p.metrics.QueryErrors.WithLabelValues(operation, db.Error.Error()).Inc()
    }
}

func (p *monitoringPlugin) monitorConnectionPool(db *gorm.DB) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    sqlDB, _ := db.DB()
    
    for range ticker.C {
        stats := sqlDB.Stats()
        p.metrics.ConnectionsInUse.Set(float64(stats.InUse))
        p.metrics.ConnectionsIdle.Set(float64(stats.Idle))
    }
}

Best Practices Summary

1. Query Optimization Checklist

  • [ ] Avoid SELECT * queries
  • [ ] Use appropriate indexes
  • [ ] Avoid deep offset pagination
  • [ ] Use prepared statements
  • [ ] Use JOINs judiciously
  • [ ] Avoid wrapping indexed columns in functions in WHERE (see the sketch after this list)
  • [ ] Use LIMIT to bound result sets
  • [ ] Optimize ORDER BY performance
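
As an illustration of the "avoid wrapping indexed columns in functions" item, the sketch below contrasts a non-sargable date filter with an equivalent half-open range. The method name is hypothetical; it only assumes the transactions table and the (user_id, created_at, ...) indexes defined earlier.

go
// Hypothetical helper showing a sargable date filter.
func (r *TransactionRepository) CountOnDay(
    ctx context.Context,
    userID int64,
    day time.Time,
) (int64, error) {
    // Non-sargable: WHERE user_id = ? AND DATE(created_at) = ?
    // The DATE() call hides created_at from the index and forces a scan.

    // Sargable rewrite: a half-open range on the raw column lets MySQL
    // walk the (user_id, created_at, ...) index directly.
    start := time.Date(day.Year(), day.Month(), day.Day(), 0, 0, 0, 0, day.Location())
    end := start.AddDate(0, 0, 1)

    var count int64
    err := r.db.WithContext(ctx).
        Model(&Transaction{}).
        Where("user_id = ? AND created_at >= ? AND created_at < ?", userID, start, end).
        Count(&count).Error
    return count, err
}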

2. Index Design Principles

  • [ ] Design composite indexes around query patterns
  • [ ] Consider index selectivity
  • [ ] Avoid creating too many indexes
  • [ ] Review index usage regularly
  • [ ] Use covering indexes to speed up hot queries
  • [ ] Use prefix indexes where appropriate (see the sketch after this list)
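
For the prefix-index item, a minimal sketch: TEXT columns such as description cannot be indexed in full, but a short prefix often gives enough selectivity for LIKE 'term%'-style filters. The index name and the 64-character prefix below are illustrative values, not part of the existing schema.

go
// Illustrative migration step: add a prefix index on the TEXT description
// column. Check selectivity first, e.g.
//   SELECT COUNT(DISTINCT LEFT(description, 64)) / COUNT(*) FROM transactions;
func addDescriptionPrefixIndex(db *gorm.DB) error {
    return db.Exec(
        "CREATE INDEX idx_transactions_desc_prefix " +
            "ON transactions (user_id, description(64))",
    ).Error
}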

3. Caching Principles

  • [ ] Multi-level cache architecture
  • [ ] Sensible cache expiration times
  • [ ] Cache warm-up and invalidation strategies
  • [ ] Protection against cache penetration and avalanche (see the sketch after this list)
  • [ ] Monitor cache hit rates
  • [ ] Update caches asynchronously
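
For the penetration and avalanche items, a minimal sketch of defenses layered on top of the CacheManager above: collapsing concurrent misses for one key into a single database load with golang.org/x/sync/singleflight, and jittering TTLs so hot entries do not expire in lockstep. The singleflight group, the 20% jitter, and the trailing note on "not found" markers are assumptions made for this example, not part of the existing CacheManager.

go
// Sketch: collapse cache-miss stampedes and stagger expirations.
var loadGroup singleflight.Group // golang.org/x/sync/singleflight

func (cm *CacheManager) GetTransactionProtected(
    ctx context.Context,
    id int64,
) (*Transaction, error) {
    key := fmt.Sprintf("transaction:%d", id)

    // Stampede protection: concurrent misses for the same key share one load.
    v, err, _ := loadGroup.Do(key, func() (interface{}, error) {
        return cm.GetTransaction(ctx, id) // existing multi-level lookup
    })
    if err != nil {
        return nil, err
    }
    return v.(*Transaction), nil
}

// Avalanche protection: spread expirations with random jitter so hot keys do
// not all expire at the same moment. 20% jitter is an assumed policy.
func jitteredTTL(base time.Duration) time.Duration {
    return base + time.Duration(rand.Int63n(int64(base)/5))
}

// Penetration protection would additionally cache a short-lived "not found"
// marker when the database confirms an ID does not exist, and have the read
// path treat that marker as a definitive miss.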

4. Connection Pool Best Practices

  • [ ] Size the connection pool appropriately (see the sizing sketch after this list)
  • [ ] Set sensible connection lifetimes
  • [ ] Implement connection health checks
  • [ ] Configure read/write splitting
  • [ ] Monitor connection pool status
  • [ ] Reserve connections for traffic bursts
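
For pool sizing, one rough rule of thumb (an assumption for illustration, not a project standard) is Little's law: in-flight queries ≈ target QPS × average query latency, plus headroom for bursts. The helper below derives values for the DatabaseConfig defined earlier; the 30% headroom, the idle ratio, and the 5 ms example latency are assumed numbers.

go
// Sketch: derive pool sizes from throughput and latency targets.
func suggestPoolConfig(targetQPS float64, avgQueryLatency time.Duration) *DatabaseConfig {
    // Little's law: concurrent queries ≈ arrival rate × time per query.
    inFlight := targetQPS * avgQueryLatency.Seconds()
    maxOpen := int(math.Ceil(inFlight * 1.3)) // ~30% headroom for bursts

    return &DatabaseConfig{
        MaxOpenConns:        maxOpen,
        MaxIdleConns:        maxOpen / 4, // keep a quarter warm for steady load
        ConnMaxLifetime:     time.Hour,
        ConnMaxIdleTime:     30 * time.Minute,
        HealthCheckInterval: time.Minute,
    }
}

// Example: 10,000 QPS at ~5 ms average latency gives
// ceil(10000 * 0.005 * 1.3) = 65 open connections, split across app instances.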

These strategies keep the Ledger module's database performant under high concurrency; applying them systematically helps ensure the system remains stable and scalable.
