数据库优化策略
概述
数据库优化是 Ledger 模块性能保障的关键环节。本文档详细介绍了 MySQL 数据库在高并发财务场景下的优化策略,包括查询优化、索引设计、分区分表、读写分离、缓存策略等各个方面。
优化目标
- 查询响应时间: < 100ms (90% 查询)
- 并发处理能力: 10,000+ QPS
- 数据一致性: ACID 特性保障
- 高可用性: 99.99% 可用性
- 存储效率: 合理的空间利用率
技术选型
- 数据库: MySQL 8.0+ (InnoDB 引擎)
- 连接池: HikariCP / go-sql-driver
- 监控: Prometheus + Grafana
- 慢查询分析: pt-query-digest
- 性能调优: MySQLTuner
查询优化策略
1. SQL 查询优化
查询重写技术
sql
-- 原始查询(性能差)
SELECT t.*, l.name as ledger_name
FROM transactions t
LEFT JOIN ledgers l ON t.ledger_id = l.id
WHERE t.user_id = 12345
AND t.created_at >= '2024-01-01'
AND t.amount > 10000
ORDER BY t.created_at DESC
LIMIT 20;
-- 优化后查询
SELECT t.id, t.ledger_id, t.type, t.amount, t.description,
t.created_at, l.name as ledger_name
FROM transactions t FORCE INDEX (idx_user_date_amount)
INNER JOIN ledgers l ON t.ledger_id = l.id
WHERE t.user_id = 12345
AND t.created_at >= '2024-01-01'
AND t.amount > 10000
ORDER BY t.created_at DESC
LIMIT 20;分页优化
sql
-- 传统分页(深度分页性能差)
SELECT * FROM transactions
WHERE user_id = 12345
ORDER BY created_at DESC
LIMIT 1000, 20;
-- 游标分页优化
SELECT * FROM transactions
WHERE user_id = 12345
AND created_at < '2024-01-15 10:30:00'
AND id < 98765 -- 使用 ID 作为辅助排序
ORDER BY created_at DESC, id DESC
LIMIT 20;
-- Go 代码实现
type PaginationCursor struct {
CreatedAt time.Time `json:"created_at"`
ID int64 `json:"id"`
}
func (r *TransactionRepository) FindWithCursor(
ctx context.Context,
userID int64,
cursor *PaginationCursor,
limit int,
) ([]*Transaction, *PaginationCursor, error) {
query := `
SELECT id, ledger_id, type, amount, description, created_at
FROM transactions
WHERE user_id = ?`
args := []interface{}{userID}
if cursor != nil {
query += ` AND (created_at < ? OR (created_at = ? AND id < ?))`
args = append(args, cursor.CreatedAt, cursor.CreatedAt, cursor.ID)
}
query += ` ORDER BY created_at DESC, id DESC LIMIT ?`
args = append(args, limit)
rows, err := r.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, nil, err
}
defer rows.Close()
transactions := make([]*Transaction, 0, limit)
var nextCursor *PaginationCursor
for rows.Next() {
tx := &Transaction{}
err := rows.Scan(&tx.ID, &tx.LedgerID, &tx.Type,
&tx.Amount, &tx.Description, &tx.CreatedAt)
if err != nil {
return nil, nil, err
}
transactions = append(transactions, tx)
}
if len(transactions) == limit {
lastTx := transactions[len(transactions)-1]
nextCursor = &PaginationCursor{
CreatedAt: lastTx.CreatedAt,
ID: lastTx.ID,
}
}
return transactions, nextCursor, nil
}2. 聚合查询优化
预计算统计数据
sql
-- 创建统计汇总表
CREATE TABLE daily_transaction_summary (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL,
ledger_id BIGINT NOT NULL,
stat_date DATE NOT NULL,
transaction_count INT DEFAULT 0,
total_income DECIMAL(15,2) DEFAULT 0.00,
total_expense DECIMAL(15,2) DEFAULT 0.00,
net_amount DECIMAL(15,2) DEFAULT 0.00,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_user_ledger_date (user_id, ledger_id, stat_date),
KEY idx_user_date (user_id, stat_date),
KEY idx_ledger_date (ledger_id, stat_date)
) ENGINE=InnoDB;
-- 实时更新统计数据的触发器
DELIMITER $$
CREATE TRIGGER tr_transaction_after_insert
AFTER INSERT ON transactions
FOR EACH ROW
BEGIN
INSERT INTO daily_transaction_summary
(user_id, ledger_id, stat_date, transaction_count,
total_income, total_expense, net_amount)
VALUES
(NEW.user_id, NEW.ledger_id, DATE(NEW.created_at), 1,
IF(NEW.amount > 0, NEW.amount, 0),
IF(NEW.amount < 0, ABS(NEW.amount), 0),
NEW.amount)
ON DUPLICATE KEY UPDATE
transaction_count = transaction_count + 1,
total_income = total_income + IF(NEW.amount > 0, NEW.amount, 0),
total_expense = total_expense + IF(NEW.amount < 0, ABS(NEW.amount), 0),
net_amount = net_amount + NEW.amount,
updated_at = CURRENT_TIMESTAMP;
END$$
DELIMITER ;Go 实现预计算服务
go
type StatisticsService struct {
db *gorm.DB
redis redis.Cmdable
scheduler *cron.Cron
logger *zap.Logger
}
// 定时预计算任务
func (s *StatisticsService) StartScheduler() {
s.scheduler.AddFunc("0 1 * * *", s.calculateDailyStats) // 每天凌晨1点
s.scheduler.AddFunc("0 2 1 * *", s.calculateMonthlyStats) // 每月1日凌晨2点
s.scheduler.AddFunc("0 3 1 1 *", s.calculateYearlyStats) // 每年1月1日凌晨3点
s.scheduler.Start()
}
func (s *StatisticsService) calculateDailyStats() {
ctx := context.Background()
yesterday := time.Now().AddDate(0, 0, -1)
// 批量计算所有用户的昨日统计
query := `
INSERT INTO daily_transaction_summary
(user_id, ledger_id, stat_date, transaction_count,
total_income, total_expense, net_amount)
SELECT
user_id,
ledger_id,
DATE(created_at) as stat_date,
COUNT(*) as transaction_count,
SUM(CASE WHEN amount > 0 THEN amount ELSE 0 END) as total_income,
SUM(CASE WHEN amount < 0 THEN ABS(amount) ELSE 0 END) as total_expense,
SUM(amount) as net_amount
FROM transactions
WHERE DATE(created_at) = ?
GROUP BY user_id, ledger_id, DATE(created_at)
ON DUPLICATE KEY UPDATE
transaction_count = VALUES(transaction_count),
total_income = VALUES(total_income),
total_expense = VALUES(total_expense),
net_amount = VALUES(net_amount),
updated_at = CURRENT_TIMESTAMP`
result := s.db.WithContext(ctx).Exec(query, yesterday.Format("2006-01-02"))
if result.Error != nil {
s.logger.Error("Failed to calculate daily stats",
zap.Error(result.Error),
zap.String("date", yesterday.Format("2006-01-02")))
return
}
s.logger.Info("Daily stats calculated successfully",
zap.Int64("affected_rows", result.RowsAffected),
zap.String("date", yesterday.Format("2006-01-02")))
}索引优化策略
1. 索引设计原则
复合索引设计
sql
-- 基于查询模式设计复合索引
-- 查询模式分析:
-- 1. 按用户查询交易 (user_id)
-- 2. 按时间范围过滤 (created_at)
-- 3. 按金额范围过滤 (amount)
-- 4. 按交易类型过滤 (type)
-- 主要复合索引
CREATE INDEX idx_user_date_amount ON transactions(user_id, created_at, amount);
CREATE INDEX idx_user_type_date ON transactions(user_id, type, created_at);
CREATE INDEX idx_ledger_date_type ON transactions(ledger_id, created_at, type);
-- 覆盖索引(包含常用查询的所有列)
CREATE INDEX idx_user_date_cover ON transactions(
user_id, created_at, id, type, amount, description
);索引选择性分析
sql
-- 分析列的选择性
SELECT
'user_id' as column_name,
COUNT(DISTINCT user_id) as distinct_values,
COUNT(*) as total_rows,
COUNT(DISTINCT user_id) / COUNT(*) as selectivity
FROM transactions
UNION ALL
SELECT
'type' as column_name,
COUNT(DISTINCT type) as distinct_values,
COUNT(*) as total_rows,
COUNT(DISTINCT type) / COUNT(*) as selectivity
FROM transactions
UNION ALL
SELECT
'amount' as column_name,
COUNT(DISTINCT amount) as distinct_values,
COUNT(*) as total_rows,
COUNT(DISTINCT amount) / COUNT(*) as selectivity
FROM transactions;
-- 索引使用情况监控
SELECT
TABLE_NAME,
INDEX_NAME,
CARDINALITY,
SEQ_IN_INDEX,
COLUMN_NAME,
COLLATION,
NULLABLE
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = 'ledger_db'
AND TABLE_NAME = 'transactions'
ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX;2. Go 代码索引优化
go
// Repository 层优化查询
type TransactionRepository struct {
db *gorm.DB
cache redis.Cmdable
}
// 使用强制索引
func (r *TransactionRepository) FindByUserAndDateRange(
ctx context.Context,
userID int64,
startDate, endDate time.Time,
limit, offset int,
) ([]*Transaction, error) {
var transactions []*Transaction
// 强制使用最优索引
err := r.db.WithContext(ctx).
Table("transactions FORCE INDEX (idx_user_date_amount)").
Where("user_id = ? AND created_at BETWEEN ? AND ?",
userID, startDate, endDate).
Order("created_at DESC").
Limit(limit).
Offset(offset).
Find(&transactions).Error
return transactions, err
}
// 索引提示优化复杂查询
func (r *TransactionRepository) FindComplexQuery(
ctx context.Context,
filter *TransactionFilter,
) ([]*Transaction, error) {
query := r.db.WithContext(ctx).Model(&Transaction{})
// 根据过滤条件选择最优索引
if filter.UserID != 0 && !filter.StartDate.IsZero() {
query = query.Table("transactions USE INDEX (idx_user_date_amount)")
} else if filter.LedgerID != 0 {
query = query.Table("transactions USE INDEX (idx_ledger_date_type)")
}
// 动态构建查询条件
if filter.UserID != 0 {
query = query.Where("user_id = ?", filter.UserID)
}
if filter.LedgerID != 0 {
query = query.Where("ledger_id = ?", filter.LedgerID)
}
if !filter.StartDate.IsZero() && !filter.EndDate.IsZero() {
query = query.Where("created_at BETWEEN ? AND ?",
filter.StartDate, filter.EndDate)
}
if len(filter.Types) > 0 {
query = query.Where("type IN ?", filter.Types)
}
if filter.MinAmount != nil {
query = query.Where("amount >= ?", *filter.MinAmount)
}
if filter.MaxAmount != nil {
query = query.Where("amount <= ?", *filter.MaxAmount)
}
var transactions []*Transaction
err := query.Order("created_at DESC").
Limit(filter.Limit).
Offset(filter.Offset).
Find(&transactions).Error
return transactions, err
}分区分表策略
1. 水平分区
按时间分区
sql
-- 创建分区表(按年分区)
CREATE TABLE transactions (
id BIGINT NOT NULL AUTO_INCREMENT,
user_id BIGINT NOT NULL,
ledger_id BIGINT NOT NULL,
type ENUM('income', 'expense', 'transfer') NOT NULL,
amount DECIMAL(15,2) NOT NULL,
description TEXT,
transaction_date DATETIME NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id, transaction_date),
KEY idx_user_date (user_id, transaction_date),
KEY idx_ledger_date (ledger_id, transaction_date)
) ENGINE=InnoDB
PARTITION BY RANGE (YEAR(transaction_date)) (
PARTITION p2023 VALUES LESS THAN (2024),
PARTITION p2024 VALUES LESS THAN (2025),
PARTITION p2025 VALUES LESS THAN (2026),
PARTITION p2026 VALUES LESS THAN (2027),
PARTITION p_future VALUES LESS THAN MAXVALUE
);
-- 动态添加新分区
ALTER TABLE transactions
ADD PARTITION (PARTITION p2027 VALUES LESS THAN (2028));
-- 删除历史分区
ALTER TABLE transactions DROP PARTITION p2023;按用户 Hash 分区
sql
-- 用户维度 Hash 分区
CREATE TABLE user_transactions (
id BIGINT NOT NULL AUTO_INCREMENT,
user_id BIGINT NOT NULL,
ledger_id BIGINT NOT NULL,
-- ... 其他字段
PRIMARY KEY (id, user_id),
KEY idx_user_date (user_id, created_at)
) ENGINE=InnoDB
PARTITION BY HASH(user_id)
PARTITIONS 16;2. 分表策略
Go 实现分表路由
go
type ShardingConfig struct {
ShardCount int
DatabaseName string
TablePrefix string
}
type ShardingManager struct {
config *ShardingConfig
databases map[int]*gorm.DB
logger *zap.Logger
}
func NewShardingManager(config *ShardingConfig) *ShardingManager {
manager := &ShardingManager{
config: config,
databases: make(map[int]*gorm.DB),
logger: zap.L(),
}
// 初始化所有分片数据库连接
for i := 0; i < config.ShardCount; i++ {
db := initShardDB(i, config)
manager.databases[i] = db
}
return manager
}
// 基于用户ID的分片路由
func (sm *ShardingManager) GetShardByUserID(userID int64) int {
return int(userID % int64(sm.config.ShardCount))
}
// 基于时间的分表路由
func (sm *ShardingManager) GetTableName(baseTable string, date time.Time) string {
suffix := date.Format("200601") // YYYYMM
return fmt.Sprintf("%s_%s", baseTable, suffix)
}
// 分片查询实现
func (sm *ShardingManager) QueryTransactions(
ctx context.Context,
userID int64,
startDate, endDate time.Time,
) ([]*Transaction, error) {
shardID := sm.GetShardByUserID(userID)
db := sm.databases[shardID]
// 计算需要查询的表
tables := sm.getTablesInRange("transactions", startDate, endDate)
var allTransactions []*Transaction
for _, tableName := range tables {
var transactions []*Transaction
err := db.WithContext(ctx).
Table(tableName).
Where("user_id = ? AND created_at BETWEEN ? AND ?",
userID, startDate, endDate).
Find(&transactions).Error
if err != nil {
sm.logger.Error("Failed to query shard table",
zap.String("table", tableName),
zap.Int("shard", shardID),
zap.Error(err))
continue
}
allTransactions = append(allTransactions, transactions...)
}
// 合并结果并排序
sort.Slice(allTransactions, func(i, j int) bool {
return allTransactions[i].CreatedAt.After(allTransactions[j].CreatedAt)
})
return allTransactions, nil
}
func (sm *ShardingManager) getTablesInRange(
baseTable string,
startDate, endDate time.Time,
) []string {
var tables []string
current := startDate
for current.Before(endDate) || current.Equal(endDate) {
tableName := sm.GetTableName(baseTable, current)
tables = append(tables, tableName)
current = current.AddDate(0, 1, 0) // 下个月
}
return tables
}缓存优化策略
1. 多级缓存架构
Go 实现多级缓存
go
type CacheManager struct {
localCache cache.Cache // 本地缓存
redisCache redis.Cmdable // Redis 缓存
db *gorm.DB // 数据库
stats *CacheStats // 缓存统计
}
type CacheConfig struct {
LocalTTL time.Duration
RedisTTL time.Duration
LocalMaxSize int
RedisMaxSize int
EnableMetrics bool
}
func NewCacheManager(config *CacheConfig) *CacheManager {
return &CacheManager{
localCache: cache.New(config.LocalTTL, time.Minute),
stats: NewCacheStats(),
}
}
// 多级缓存查询
func (cm *CacheManager) GetTransaction(
ctx context.Context,
id int64,
) (*Transaction, error) {
cacheKey := fmt.Sprintf("transaction:%d", id)
// L1: 本地缓存
if data, found := cm.localCache.Get(cacheKey); found {
cm.stats.LocalHits.Inc()
return data.(*Transaction), nil
}
cm.stats.LocalMisses.Inc()
// L2: Redis 缓存
data, err := cm.redisCache.Get(ctx, cacheKey).Result()
if err == nil {
var tx Transaction
if err := json.Unmarshal([]byte(data), &tx); err == nil {
// 回填本地缓存
cm.localCache.Set(cacheKey, &tx, cache.DefaultExpiration)
cm.stats.RedisHits.Inc()
return &tx, nil
}
}
cm.stats.RedisMisses.Inc()
// L3: 数据库查询
var tx Transaction
err = cm.db.WithContext(ctx).First(&tx, id).Error
if err != nil {
cm.stats.DBMisses.Inc()
return nil, err
}
cm.stats.DBHits.Inc()
// 异步写入缓存
go cm.asyncSetCache(cacheKey, &tx)
return &tx, nil
}
func (cm *CacheManager) asyncSetCache(key string, tx *Transaction) {
// 写入本地缓存
cm.localCache.Set(key, tx, cache.DefaultExpiration)
// 写入 Redis 缓存
data, err := json.Marshal(tx)
if err != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cm.redisCache.SetEX(ctx, key, data, 10*time.Minute)
}2. 缓存策略实现
查询结果缓存
go
// 用户交易列表缓存
func (r *TransactionRepository) FindByUserWithCache(
ctx context.Context,
userID int64,
limit, offset int,
) ([]*Transaction, error) {
cacheKey := fmt.Sprintf("user_transactions:%d:%d:%d", userID, limit, offset)
// 尝试从缓存获取
cached, err := r.cache.Get(ctx, cacheKey).Result()
if err == nil {
var transactions []*Transaction
if json.Unmarshal([]byte(cached), &transactions) == nil {
return transactions, nil
}
}
// 从数据库查询
var transactions []*Transaction
err = r.db.WithContext(ctx).
Where("user_id = ?", userID).
Order("created_at DESC").
Limit(limit).
Offset(offset).
Find(&transactions).Error
if err != nil {
return nil, err
}
// 异步写入缓存
go func() {
data, _ := json.Marshal(transactions)
r.cache.SetEX(context.Background(), cacheKey, data, 5*time.Minute)
}()
return transactions, nil
}
// 统计数据缓存
func (s *StatisticsService) GetUserStats(
ctx context.Context,
userID int64,
startDate, endDate time.Time,
) (*UserStats, error) {
cacheKey := fmt.Sprintf("user_stats:%d:%s:%s",
userID, startDate.Format("2006-01-02"), endDate.Format("2006-01-02"))
// 检查缓存
if stats := s.getStatsFromCache(ctx, cacheKey); stats != nil {
return stats, nil
}
// 计算统计数据
stats, err := s.calculateUserStats(ctx, userID, startDate, endDate)
if err != nil {
return nil, err
}
// 缓存结果(统计数据缓存较长时间)
s.setStatsToCache(ctx, cacheKey, stats, time.Hour)
return stats, nil
}缓存失效策略
go
type CacheInvalidator struct {
redis redis.Cmdable
patterns map[string][]string // 失效模式映射
}
func NewCacheInvalidator(redis redis.Cmdable) *CacheInvalidator {
ci := &CacheInvalidator{
redis: redis,
patterns: make(map[string][]string),
}
// 定义失效模式
ci.patterns["transaction_created"] = []string{
"user_transactions:%d:*",
"user_stats:%d:*",
"ledger_summary:%d:*",
}
ci.patterns["transaction_updated"] = []string{
"transaction:%d",
"user_transactions:%d:*",
"user_stats:%d:*",
}
return ci
}
// 事务后置处理:缓存失效
func (ci *CacheInvalidator) InvalidateOnTransactionCreated(
ctx context.Context,
tx *Transaction,
) error {
patterns := ci.patterns["transaction_created"]
for _, pattern := range patterns {
keyPattern := fmt.Sprintf(pattern, tx.UserID)
// 查找匹配的键
keys, err := ci.redis.Keys(ctx, keyPattern).Result()
if err != nil {
continue
}
// 批量删除
if len(keys) > 0 {
ci.redis.Del(ctx, keys...)
}
}
return nil
}
// 在业务层集成缓存失效
func (s *TransactionService) CreateTransaction(
ctx context.Context,
req *CreateTransactionRequest,
) (*Transaction, error) {
// 创建事务
tx, err := s.repository.Create(ctx, req)
if err != nil {
return nil, err
}
// 异步失效缓存
go s.cacheInvalidator.InvalidateOnTransactionCreated(context.Background(), tx)
return tx, nil
}连接池优化
1. 连接池配置优化
go
type DatabaseConfig struct {
DSN string
MaxOpenConns int // 最大打开连接数
MaxIdleConns int // 最大空闲连接数
ConnMaxLifetime time.Duration // 连接最大生命周期
ConnMaxIdleTime time.Duration // 连接最大空闲时间
HealthCheckInterval time.Duration // 健康检查间隔
}
func NewOptimizedDB(config *DatabaseConfig) (*gorm.DB, error) {
db, err := gorm.Open(mysql.Open(config.DSN), &gorm.Config{
Logger: logger.Default.LogMode(logger.Info),
NamingStrategy: schema.NamingStrategy{
SingularTable: true, // 使用单数表名
},
NowFunc: func() time.Time {
return time.Now().UTC()
},
})
if err != nil {
return nil, err
}
sqlDB, err := db.DB()
if err != nil {
return nil, err
}
// 连接池优化配置
sqlDB.SetMaxOpenConns(config.MaxOpenConns) // 100
sqlDB.SetMaxIdleConns(config.MaxIdleConns) // 10
sqlDB.SetConnMaxLifetime(config.ConnMaxLifetime) // 1小时
sqlDB.SetConnMaxIdleTime(config.ConnMaxIdleTime) // 30分钟
// 启动健康检查
go startHealthCheck(sqlDB, config.HealthCheckInterval)
return db, nil
}
func startHealthCheck(db *sql.DB, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := db.PingContext(ctx)
cancel()
if err != nil {
log.Printf("Database health check failed: %v", err)
// 可以发送告警或重连
}
}
}2. 读写分离配置
go
type DBCluster struct {
master *gorm.DB
slaves []*gorm.DB
policy LoadBalancePolicy
}
type LoadBalancePolicy interface {
SelectSlave(slaves []*gorm.DB) *gorm.DB
}
// 轮询策略
type RoundRobinPolicy struct {
counter uint64
}
func (p *RoundRobinPolicy) SelectSlave(slaves []*gorm.DB) *gorm.DB {
if len(slaves) == 0 {
return nil
}
index := atomic.AddUint64(&p.counter, 1) % uint64(len(slaves))
return slaves[index]
}
func NewDBCluster(masterDSN string, slaveDSNs []string) (*DBCluster, error) {
master, err := NewOptimizedDB(&DatabaseConfig{DSN: masterDSN})
if err != nil {
return nil, err
}
var slaves []*gorm.DB
for _, dsn := range slaveDSNs {
slave, err := NewOptimizedDB(&DatabaseConfig{DSN: dsn})
if err != nil {
continue // 继续尝试其他从库
}
slaves = append(slaves, slave)
}
return &DBCluster{
master: master,
slaves: slaves,
policy: &RoundRobinPolicy{},
}, nil
}
func (cluster *DBCluster) Master() *gorm.DB {
return cluster.master
}
func (cluster *DBCluster) Slave() *gorm.DB {
if len(cluster.slaves) == 0 {
return cluster.master // 降级到主库
}
return cluster.policy.SelectSlave(cluster.slaves)
}
// Repository 层使用读写分离
func (r *TransactionRepository) FindByID(ctx context.Context, id int64) (*Transaction, error) {
var tx Transaction
err := cluster.Slave().WithContext(ctx).First(&tx, id).Error
return &tx, err
}
func (r *TransactionRepository) Create(ctx context.Context, tx *Transaction) error {
return cluster.Master().WithContext(ctx).Create(tx).Error
}监控和性能分析
1. 慢查询监控
sql
-- 开启慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 0.5; -- 0.5秒
SET GLOBAL log_queries_not_using_indexes = 'ON';
-- 查看慢查询统计
SELECT
SCHEMA_NAME,
DIGEST_TEXT,
COUNT_STAR,
SUM_TIMER_WAIT/1000000000000 as SUM_TIME_SEC,
AVG_TIMER_WAIT/1000000000000 as AVG_TIME_SEC,
SUM_ROWS_EXAMINED,
SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
WHERE SCHEMA_NAME = 'ledger_db'
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;2. Go 性能监控
go
// 数据库性能指标收集
type DBMetrics struct {
QueryDuration prometheus.HistogramVec
ConnectionsInUse prometheus.Gauge
ConnectionsIdle prometheus.Gauge
QueryErrors prometheus.CounterVec
}
func NewDBMetrics() *DBMetrics {
return &DBMetrics{
QueryDuration: *prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "db_query_duration_seconds",
Help: "Database query duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"operation", "table"},
),
ConnectionsInUse: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "db_connections_in_use",
Help: "Number of database connections currently in use",
},
),
ConnectionsIdle: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "db_connections_idle",
Help: "Number of idle database connections",
},
),
QueryErrors: *prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "db_query_errors_total",
Help: "Total number of database query errors",
},
[]string{"operation", "error_type"},
),
}
}
// 监控中间件
func (m *DBMetrics) MonitoringMiddleware() gorm.Plugin {
return &monitoringPlugin{metrics: m}
}
type monitoringPlugin struct {
metrics *DBMetrics
}
func (p *monitoringPlugin) Name() string {
return "monitoring"
}
func (p *monitoringPlugin) Initialize(db *gorm.DB) error {
// 注册回调
db.Callback().Query().Before("gorm:query").Register("monitoring:before", p.beforeQuery)
db.Callback().Query().After("gorm:query").Register("monitoring:after", p.afterQuery)
// 启动连接池监控
go p.monitorConnectionPool(db)
return nil
}
func (p *monitoringPlugin) beforeQuery(db *gorm.DB) {
db.InstanceSet("start_time", time.Now())
}
func (p *monitoringPlugin) afterQuery(db *gorm.DB) {
startTime, exists := db.InstanceGet("start_time")
if !exists {
return
}
duration := time.Since(startTime.(time.Time))
// 记录查询耗时
operation := "select"
if db.Statement.Schema != nil {
tableName := db.Statement.Schema.Table
p.metrics.QueryDuration.WithLabelValues(operation, tableName).Observe(duration.Seconds())
}
// 记录错误
if db.Error != nil && !errors.Is(db.Error, gorm.ErrRecordNotFound) {
p.metrics.QueryErrors.WithLabelValues(operation, db.Error.Error()).Inc()
}
}
func (p *monitoringPlugin) monitorConnectionPool(db *gorm.DB) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
sqlDB, _ := db.DB()
for range ticker.C {
stats := sqlDB.Stats()
p.metrics.ConnectionsInUse.Set(float64(stats.InUse))
p.metrics.ConnectionsIdle.Set(float64(stats.Idle))
}
}最佳实践总结
1. 查询优化检查清单
- [ ] 避免 SELECT * 查询
- [ ] 使用合适的索引
- [ ] 避免深度分页
- [ ] 使用预编译语句
- [ ] 合理使用 JOIN
- [ ] 避免在 WHERE 中使用函数
- [ ] 使用 LIMIT 限制结果集
- [ ] 优化 ORDER BY 性能
2. 索引设计原则
- [ ] 基于查询模式设计复合索引
- [ ] 考虑索引的选择性
- [ ] 避免过多的索引
- [ ] 定期分析索引使用情况
- [ ] 使用覆盖索引优化查询
- [ ] 合理使用前缀索引
3. 缓存策略原则
- [ ] 多级缓存架构
- [ ] 合理的缓存过期时间
- [ ] 缓存预热和失效策略
- [ ] 缓存穿透和雪崩防护
- [ ] 监控缓存命中率
- [ ] 异步更新缓存
4. 连接池最佳实践
- [ ] 合理配置连接池大小
- [ ] 设置合适的连接生命周期
- [ ] 实现连接健康检查
- [ ] 读写分离配置
- [ ] 监控连接池状态
- [ ] 预留连接数用于突发流量
这些优化策略确保了 Ledger 模块在高并发场景下的数据库性能,通过系统化的优化方法保证系统的稳定性和可扩展性。