Skip to content

Ledger 数据库设计

概述

Ledger 模块的数据库设计基于 领域驱动设计(DDD) 原则,采用 MySQL 作为主数据库,Redis 作为缓存层。设计注重数据一致性、查询性能和扩展性,支持高并发的财务数据管理需求。

设计原则

  • ACID 特性: 保证财务数据的强一致性
  • 范式设计: 遵循第三范式,避免数据冗余
  • 性能优化: 合理的索引设计和分区策略
  • 可扩展性: 支持水平分片和读写分离
  • 审计追踪: 完整的数据变更记录

技术选型

  • 主数据库: MySQL 8.0+ (InnoDB 引擎)
  • 缓存: Redis 7.0+ (集群模式)
  • ORM: GORM v1.25+ (Go)
  • 迁移工具: GORM AutoMigrate + 自定义脚本
  • 备份: mysqldump + binlog

数据库架构

1. 整体架构设计

2. 分片策略

核心表设计

1. 账本表 (ledgers)

sql
CREATE TABLE `ledgers` (
  `id` VARCHAR(36) NOT NULL COMMENT '账本唯一标识',
  `name` VARCHAR(100) NOT NULL COMMENT '账本名称',
  `description` TEXT COMMENT '账本描述',
  `type` TINYINT NOT NULL DEFAULT 1 COMMENT '账本类型: 1=个人,2=家庭,3=项目,4=旅行,5=商务',
  `status` TINYINT NOT NULL DEFAULT 1 COMMENT '账本状态: 1=活跃,2=归档,3=暂停,4=删除',
  `default_currency` VARCHAR(3) NOT NULL DEFAULT 'CNY' COMMENT '默认货币代码',
  `owner_id` VARCHAR(36) NOT NULL COMMENT '所有者用户ID',
  
  -- 配置信息 (JSON 格式)
  `settings` JSON COMMENT '账本设置',
  
  -- 审计字段
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted_at` TIMESTAMP NULL COMMENT '删除时间',
  `version` BIGINT UNSIGNED NOT NULL DEFAULT 1 COMMENT '乐观锁版本号',
  
  PRIMARY KEY (`id`),
  KEY `idx_owner_id` (`owner_id`),
  KEY `idx_type_status` (`type`, `status`),
  KEY `idx_created_at` (`created_at`),
  KEY `idx_deleted_at` (`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='账本表';

2. 交易记录表 (transactions)

sql
CREATE TABLE `transactions` (
  `id` VARCHAR(36) NOT NULL COMMENT '交易唯一标识',
  `ledger_id` VARCHAR(36) NOT NULL COMMENT '所属账本ID',
  `type` TINYINT NOT NULL COMMENT '交易类型: 1=收入,2=支出,3=转账,4=调整',
  `amount` BIGINT NOT NULL COMMENT '交易金额(分为单位)',
  `currency` VARCHAR(3) NOT NULL DEFAULT 'CNY' COMMENT '货币代码',
  `description` VARCHAR(200) NOT NULL COMMENT '交易描述',
  `note` TEXT COMMENT '备注信息',
  
  -- 分类信息
  `category_id` VARCHAR(36) COMMENT '分类ID',
  `subcategory_id` VARCHAR(36) COMMENT '子分类ID',
  
  -- 时间信息
  `transaction_date` DATE NOT NULL COMMENT '交易日期',
  `transaction_time` TIME COMMENT '交易时间',
  
  -- 位置信息
  `location_name` VARCHAR(100) COMMENT '地点名称',
  `latitude` DECIMAL(10, 8) COMMENT '纬度',
  `longitude` DECIMAL(11, 8) COMMENT '经度',
  
  -- 关联信息
  `related_transaction_id` VARCHAR(36) COMMENT '关联交易ID(转账等)',
  `recurring_id` VARCHAR(36) COMMENT '重复交易ID',
  `import_batch_id` VARCHAR(36) COMMENT '导入批次ID',
  
  -- 状态信息
  `status` TINYINT NOT NULL DEFAULT 2 COMMENT '交易状态: 1=待确认,2=已完成,3=已取消,4=失败,5=已退款',
  `source` VARCHAR(50) DEFAULT 'manual' COMMENT '数据来源: manual,import,api',
  
  -- 审计字段
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted_at` TIMESTAMP NULL COMMENT '删除时间',
  `version` BIGINT UNSIGNED NOT NULL DEFAULT 1 COMMENT '版本号',
  
  PRIMARY KEY (`id`),
  KEY `idx_ledger_id` (`ledger_id`),
  KEY `idx_transaction_date` (`transaction_date`),
  KEY `idx_type_amount` (`type`, `amount`),
  KEY `idx_category` (`category_id`, `subcategory_id`),
  KEY `idx_status` (`status`),
  KEY `idx_created_at` (`created_at`),
  KEY `idx_deleted_at` (`deleted_at`),
  
  -- 复合索引优化查询
  KEY `idx_ledger_date_type` (`ledger_id`, `transaction_date`, `type`),
  KEY `idx_ledger_status_date` (`ledger_id`, `status`, `transaction_date`),
  
  FOREIGN KEY (`ledger_id`) REFERENCES `ledgers`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='交易记录表'
PARTITION BY RANGE (YEAR(`transaction_date`)) (
  PARTITION p2023 VALUES LESS THAN (2024),
  PARTITION p2024 VALUES LESS THAN (2025),
  PARTITION p2025 VALUES LESS THAN (2026),
  PARTITION p2026 VALUES LESS THAN (2027),
  PARTITION p_future VALUES LESS THAN MAXVALUE
);

3. 标签表 (tags)

sql
CREATE TABLE `tags` (
  `id` VARCHAR(36) NOT NULL COMMENT '标签唯一标识',
  `ledger_id` VARCHAR(36) NOT NULL COMMENT '所属账本ID',
  `name` VARCHAR(50) NOT NULL COMMENT '标签名称',
  `color` VARCHAR(7) COMMENT '标签颜色(十六进制)',
  `icon` VARCHAR(50) COMMENT '标签图标',
  `parent_id` VARCHAR(36) COMMENT '父标签ID(层级结构)',
  `sort_order` INT DEFAULT 0 COMMENT '排序顺序',
  
  -- 统计信息 (冗余字段,定期更新)
  `usage_count` INT DEFAULT 0 COMMENT '使用次数',
  `total_amount` BIGINT DEFAULT 0 COMMENT '总金额',
  `last_used_at` TIMESTAMP NULL COMMENT '最后使用时间',
  
  -- 审计字段
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted_at` TIMESTAMP NULL COMMENT '删除时间',
  
  PRIMARY KEY (`id`),
  KEY `idx_ledger_id` (`ledger_id`),
  KEY `idx_name` (`name`),
  KEY `idx_parent_id` (`parent_id`),
  KEY `idx_sort_order` (`sort_order`),
  KEY `idx_usage_count` (`usage_count` DESC),
  KEY `idx_deleted_at` (`deleted_at`),
  
  UNIQUE KEY `uk_ledger_name` (`ledger_id`, `name`, `deleted_at`),
  
  FOREIGN KEY (`ledger_id`) REFERENCES `ledgers`(`id`) ON DELETE CASCADE,
  FOREIGN KEY (`parent_id`) REFERENCES `tags`(`id`) ON DELETE SET NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='标签表';

4. 交易标签关联表 (transaction_tags)

sql
CREATE TABLE `transaction_tags` (
  `transaction_id` VARCHAR(36) NOT NULL COMMENT '交易ID',
  `tag_id` VARCHAR(36) NOT NULL COMMENT '标签ID',
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '关联创建时间',
  
  PRIMARY KEY (`transaction_id`, `tag_id`),
  KEY `idx_tag_id` (`tag_id`),
  KEY `idx_created_at` (`created_at`),
  
  FOREIGN KEY (`transaction_id`) REFERENCES `transactions`(`id`) ON DELETE CASCADE,
  FOREIGN KEY (`tag_id`) REFERENCES `tags`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='交易标签关联表';

5. 预算表 (budgets)

sql
CREATE TABLE `budgets` (
  `id` VARCHAR(36) NOT NULL COMMENT '预算唯一标识',
  `ledger_id` VARCHAR(36) NOT NULL COMMENT '所属账本ID',
  `name` VARCHAR(100) NOT NULL COMMENT '预算名称',
  `description` TEXT COMMENT '预算描述',
  
  -- 预算配置
  `amount` BIGINT NOT NULL COMMENT '预算金额(分为单位)',
  `currency` VARCHAR(3) NOT NULL DEFAULT 'CNY' COMMENT '货币代码',
  `period_type` TINYINT NOT NULL COMMENT '周期类型: 1=日,2=周,3=月,4=季,5=年,6=自定义',
  `start_date` DATE NOT NULL COMMENT '开始日期',
  `end_date` DATE NOT NULL COMMENT '结束日期',
  
  -- 适用范围 (JSON 格式存储复杂条件)
  `scope_config` JSON COMMENT '预算适用范围配置',
  
  -- 预警配置
  `alert_thresholds` JSON COMMENT '预警阈值配置',
  `notification_config` JSON COMMENT '通知配置',
  
  -- 状态信息
  `status` TINYINT NOT NULL DEFAULT 1 COMMENT '预算状态: 1=活跃,2=暂停,3=已完成,4=已取消',
  `auto_renew` BOOLEAN DEFAULT FALSE COMMENT '是否自动续期',
  
  -- 使用情况 (实时更新)
  `used_amount` BIGINT DEFAULT 0 COMMENT '已使用金额',
  `transaction_count` INT DEFAULT 0 COMMENT '交易笔数',
  `last_transaction_at` TIMESTAMP NULL COMMENT '最后交易时间',
  
  -- 审计字段
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted_at` TIMESTAMP NULL COMMENT '删除时间',
  `version` BIGINT UNSIGNED NOT NULL DEFAULT 1 COMMENT '版本号',
  
  PRIMARY KEY (`id`),
  KEY `idx_ledger_id` (`ledger_id`),
  KEY `idx_period` (`period_type`, `start_date`, `end_date`),
  KEY `idx_status` (`status`),
  KEY `idx_date_range` (`start_date`, `end_date`),
  KEY `idx_usage` (`used_amount`, `amount`),
  KEY `idx_deleted_at` (`deleted_at`),
  
  FOREIGN KEY (`ledger_id`) REFERENCES `ledgers`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='预算表';

6. 预算标签关联表 (budget_tags)

sql
CREATE TABLE `budget_tags` (
  `budget_id` VARCHAR(36) NOT NULL COMMENT '预算ID',
  `tag_id` VARCHAR(36) NOT NULL COMMENT '标签ID',
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '关联创建时间',
  
  PRIMARY KEY (`budget_id`, `tag_id`),
  KEY `idx_tag_id` (`tag_id`),
  
  FOREIGN KEY (`budget_id`) REFERENCES `budgets`(`id`) ON DELETE CASCADE,
  FOREIGN KEY (`tag_id`) REFERENCES `tags`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='预算标签关联表';

7. 账本成员表 (ledger_members)

sql
CREATE TABLE `ledger_members` (
  `id` VARCHAR(36) NOT NULL COMMENT '成员关系唯一标识',
  `ledger_id` VARCHAR(36) NOT NULL COMMENT '账本ID',
  `user_id` VARCHAR(36) NOT NULL COMMENT '用户ID',
  `role` TINYINT NOT NULL COMMENT '角色: 1=所有者,2=管理员,3=编辑者,4=查看者',
  `status` TINYINT NOT NULL DEFAULT 1 COMMENT '状态: 1=活跃,2=邀请中,3=已拒绝,4=已移除',
  
  -- 权限配置 (JSON 格式)
  `permissions` JSON COMMENT '详细权限配置',
  
  -- 邀请信息
  `invited_by` VARCHAR(36) COMMENT '邀请者ID',
  `invited_at` TIMESTAMP NULL COMMENT '邀请时间',
  `joined_at` TIMESTAMP NULL COMMENT '加入时间',
  
  -- 审计字段
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted_at` TIMESTAMP NULL COMMENT '删除时间',
  
  PRIMARY KEY (`id`),
  KEY `idx_ledger_id` (`ledger_id`),
  KEY `idx_user_id` (`user_id`),
  KEY `idx_role_status` (`role`, `status`),
  KEY `idx_deleted_at` (`deleted_at`),
  
  UNIQUE KEY `uk_ledger_user` (`ledger_id`, `user_id`, `deleted_at`),
  
  FOREIGN KEY (`ledger_id`) REFERENCES `ledgers`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='账本成员表';

8. 附件表 (attachments)

sql
CREATE TABLE `attachments` (
  `id` VARCHAR(36) NOT NULL COMMENT '附件唯一标识',
  `transaction_id` VARCHAR(36) NOT NULL COMMENT '关联交易ID',
  `filename` VARCHAR(255) NOT NULL COMMENT '文件名',
  `original_filename` VARCHAR(255) NOT NULL COMMENT '原始文件名',
  `content_type` VARCHAR(100) NOT NULL COMMENT 'MIME 类型',
  `file_size` BIGINT NOT NULL COMMENT '文件大小(字节)',
  `file_path` VARCHAR(500) NOT NULL COMMENT '文件存储路径',
  `file_url` VARCHAR(500) COMMENT '文件访问URL',
  
  -- 文件信息
  `file_hash` VARCHAR(64) COMMENT '文件哈希值(SHA256)',
  `thumbnail_path` VARCHAR(500) COMMENT '缩略图路径',
  
  -- OCR 结果 (如果是图片)
  `ocr_text` TEXT COMMENT 'OCR 识别文本',
  `ocr_confidence` DECIMAL(3, 2) COMMENT 'OCR 置信度',
  `ocr_processed_at` TIMESTAMP NULL COMMENT 'OCR 处理时间',
  
  -- 状态信息
  `status` TINYINT NOT NULL DEFAULT 1 COMMENT '状态: 1=正常,2=处理中,3=处理失败,4=已删除',
  `processing_status` TINYINT DEFAULT 0 COMMENT '处理状态: 0=未处理,1=处理中,2=已完成,3=失败',
  
  -- 审计字段
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted_at` TIMESTAMP NULL COMMENT '删除时间',
  
  PRIMARY KEY (`id`),
  KEY `idx_transaction_id` (`transaction_id`),
  KEY `idx_file_hash` (`file_hash`),
  KEY `idx_content_type` (`content_type`),
  KEY `idx_status` (`status`),
  KEY `idx_created_at` (`created_at`),
  KEY `idx_deleted_at` (`deleted_at`),
  
  FOREIGN KEY (`transaction_id`) REFERENCES `transactions`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='附件表';

索引优化策略

1. 核心查询索引

sql
-- 账本查询优化
CREATE INDEX `idx_ledgers_owner_type_status` ON `ledgers` (`owner_id`, `type`, `status`, `created_at`);
CREATE INDEX `idx_ledgers_search` ON `ledgers` (`name`, `status`) USING BTREE;

-- 交易记录查询优化
CREATE INDEX `idx_transactions_ledger_date_desc` ON `transactions` (`ledger_id`, `transaction_date` DESC, `amount`);
CREATE INDEX `idx_transactions_date_range` ON `transactions` (`transaction_date`, `type`, `status`);
CREATE INDEX `idx_transactions_amount_range` ON `transactions` (`amount`, `currency`, `type`);

-- 全文搜索索引
CREATE FULLTEXT INDEX `ft_transactions_search` ON `transactions` (`description`, `note`);
CREATE FULLTEXT INDEX `ft_tags_search` ON `tags` (`name`);

-- 预算查询优化
CREATE INDEX `idx_budgets_active_period` ON `budgets` (`ledger_id`, `status`, `start_date`, `end_date`);
CREATE INDEX `idx_budgets_usage_alert` ON `budgets` (`used_amount`, `amount`, `status`);

2. 复合索引设计原则

sql
-- 遵循最左前缀原则
-- 查询: WHERE ledger_id = ? AND transaction_date BETWEEN ? AND ? ORDER BY amount DESC
CREATE INDEX `idx_ledger_date_amount` ON `transactions` (`ledger_id`, `transaction_date`, `amount` DESC);

-- 查询: WHERE ledger_id = ? AND type = ? AND status = ? ORDER BY transaction_date DESC
CREATE INDEX `idx_ledger_type_status_date` ON `transactions` (`ledger_id`, `type`, `status`, `transaction_date` DESC);

-- 覆盖索引 (包含所有需要的字段)
CREATE INDEX `idx_transactions_summary` ON `transactions` (`ledger_id`, `type`, `transaction_date`, `amount`, `currency`);

3. 分区表索引

sql
-- 每个分区都有独立的索引
-- 自动为每个分区创建相同的索引结构
ALTER TABLE `transactions` 
ADD INDEX `idx_partition_ledger_date` (`ledger_id`, `transaction_date`) LOCAL;

数据一致性和约束

1. 外键约束

sql
-- 启用外键检查
SET FOREIGN_KEY_CHECKS = 1;

-- 级联删除策略
ALTER TABLE `transactions` 
ADD CONSTRAINT `fk_transactions_ledger` 
FOREIGN KEY (`ledger_id`) REFERENCES `ledgers`(`id`) 
ON DELETE CASCADE ON UPDATE CASCADE;

-- 限制删除策略 (防止误删)
ALTER TABLE `budgets` 
ADD CONSTRAINT `fk_budgets_ledger` 
FOREIGN KEY (`ledger_id`) REFERENCES `ledgers`(`id`) 
ON DELETE RESTRICT ON UPDATE CASCADE;

2. 检查约束

sql
-- MySQL 8.0+ 支持检查约束
ALTER TABLE `transactions` 
ADD CONSTRAINT `chk_amount_positive` 
CHECK (`amount` > 0);

ALTER TABLE `transactions` 
ADD CONSTRAINT `chk_valid_type` 
CHECK (`type` IN (1, 2, 3, 4));

ALTER TABLE `budgets` 
ADD CONSTRAINT `chk_date_range` 
CHECK (`end_date` >= `start_date`);

ALTER TABLE `budgets` 
ADD CONSTRAINT `chk_budget_amount_positive` 
CHECK (`amount` > 0);

3. 唯一约束

sql
-- 账本名称在同一用户下唯一 (软删除兼容)
ALTER TABLE `ledgers` 
ADD CONSTRAINT `uk_owner_name_deleted` 
UNIQUE (`owner_id`, `name`, `deleted_at`);

-- 标签名称在同一账本下唯一
ALTER TABLE `tags` 
ADD CONSTRAINT `uk_ledger_tag_name` 
UNIQUE (`ledger_id`, `name`, `deleted_at`);

-- 预算在同一时期不重叠 (需要应用层逻辑配合)
-- 由于 MySQL 的约束限制,复杂的重叠检查在应用层实现

性能优化策略

1. 查询优化

sql
-- 统计查询优化 (使用汇总表)
CREATE TABLE `ledger_stats` (
  `ledger_id` VARCHAR(36) NOT NULL,
  `stat_date` DATE NOT NULL,
  `total_income` BIGINT DEFAULT 0,
  `total_expense` BIGINT DEFAULT 0,
  `transaction_count` INT DEFAULT 0,
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  
  PRIMARY KEY (`ledger_id`, `stat_date`),
  KEY `idx_stat_date` (`stat_date`)
) ENGINE=InnoDB COMMENT='账本统计汇总表';

-- 定期汇总脚本
DELIMITER $$
CREATE EVENT `evt_daily_stats_update`
ON SCHEDULE EVERY 1 DAY
STARTS '2024-01-01 01:00:00'
DO
BEGIN
  INSERT INTO `ledger_stats` (`ledger_id`, `stat_date`, `total_income`, `total_expense`, `transaction_count`)
  SELECT 
    `ledger_id`,
    `transaction_date`,
    SUM(CASE WHEN `type` = 1 THEN `amount` ELSE 0 END) AS `total_income`,
    SUM(CASE WHEN `type` = 2 THEN `amount` ELSE 0 END) AS `total_expense`,
    COUNT(*) AS `transaction_count`
  FROM `transactions`
  WHERE `transaction_date` = CURDATE() - INTERVAL 1 DAY
    AND `status` = 2
    AND `deleted_at` IS NULL
  GROUP BY `ledger_id`, `transaction_date`
  ON DUPLICATE KEY UPDATE
    `total_income` = VALUES(`total_income`),
    `total_expense` = VALUES(`total_expense`),
    `transaction_count` = VALUES(`transaction_count`),
    `updated_at` = CURRENT_TIMESTAMP;
END$$
DELIMITER ;

2. 分区表管理

sql
-- 自动创建新分区的存储过程
DELIMITER $$
CREATE PROCEDURE `CreatePartitionsForYear`(IN target_year INT)
BEGIN
  SET @sql = CONCAT('ALTER TABLE `transactions` ADD PARTITION (PARTITION p', target_year, ' VALUES LESS THAN (', target_year + 1, '))');
  PREPARE stmt FROM @sql;
  EXECUTE stmt;
  DEALLOCATE PREPARE stmt;
END$$
DELIMITER ;

-- 删除旧分区 (保留3年数据)
DELIMITER $$
CREATE PROCEDURE `DropOldPartitions`()
BEGIN
  SET @old_year = YEAR(CURDATE()) - 3;
  SET @sql = CONCAT('ALTER TABLE `transactions` DROP PARTITION p', @old_year);
  PREPARE stmt FROM @sql;
  EXECUTE stmt;
  DEALLOCATE PREPARE stmt;
END$$
DELIMITER ;

-- 定期执行分区维护
CREATE EVENT `evt_partition_maintenance`
ON SCHEDULE EVERY 1 MONTH
DO
BEGIN
  -- 创建下一年的分区
  CALL CreatePartitionsForYear(YEAR(CURDATE()) + 1);
  
  -- 删除3年前的分区
  CALL DropOldPartitions();
END;

3. 读写分离配置

go
// GORM 读写分离配置
type DatabaseConfig struct {
    Master DatabaseConnection `yaml:"master"`
    Slaves []DatabaseConnection `yaml:"slaves"`
}

type DatabaseConnection struct {
    Host     string `yaml:"host"`
    Port     int    `yaml:"port"`
    Username string `yaml:"username"`
    Password string `yaml:"password"`
    Database string `yaml:"database"`
}

func SetupDatabase(config *DatabaseConfig) (*gorm.DB, error) {
    // 主库连接
    masterDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
        config.Master.Username, config.Master.Password,
        config.Master.Host, config.Master.Port, config.Master.Database)
    
    db, err := gorm.Open(mysql.Open(masterDSN), &gorm.Config{})
    if err != nil {
        return nil, err
    }
    
    // 从库连接
    var slaveDSNs []gorm.Dialector
    for _, slave := range config.Slaves {
        slaveDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
            slave.Username, slave.Password,
            slave.Host, slave.Port, slave.Database)
        slaveDSNs = append(slaveDSNs, mysql.Open(slaveDSN))
    }
    
    // 配置读写分离
    db.Use(dbresolver.Register(dbresolver.Config{
        Sources:  []gorm.Dialector{mysql.Open(masterDSN)}, // 写
        Replicas: slaveDSNs,                                // 读
        Policy:   dbresolver.RandomPolicy{},               // 随机选择从库
    }))
    
    return db, nil
}

缓存设计

1. Redis 缓存策略

go
// 缓存键设计
const (
    // 账本缓存
    LedgerCacheKey = "ledger:id:%s"
    LedgerListCacheKey = "ledger:user:%s:list"
    
    // 交易缓存
    TransactionCacheKey = "transaction:id:%s"
    TransactionListCacheKey = "transaction:ledger:%s:page:%d"
    
    // 统计缓存
    LedgerStatsCacheKey = "stats:ledger:%s:date:%s"
    BudgetUsageCacheKey = "budget:usage:%s"
    
    // 标签缓存
    TagListCacheKey = "tags:ledger:%s"
)

// 缓存时间设置
const (
    LedgerCacheTTL = 1 * time.Hour        // 账本信息
    TransactionCacheTTL = 30 * time.Minute // 交易记录
    StatsCacheTTL = 6 * time.Hour         // 统计数据
    TagCacheTTL = 2 * time.Hour           // 标签信息
)

// 缓存管理器
type CacheManager struct {
    redis redis.Cmdable
}

func (c *CacheManager) GetLedger(ctx context.Context, id string) (*Ledger, error) {
    key := fmt.Sprintf(LedgerCacheKey, id)
    
    data, err := c.redis.Get(ctx, key).Result()
    if err == redis.Nil {
        return nil, nil // 缓存未命中
    }
    if err != nil {
        return nil, err
    }
    
    var ledger Ledger
    err = json.Unmarshal([]byte(data), &ledger)
    return &ledger, err
}

func (c *CacheManager) SetLedger(ctx context.Context, ledger *Ledger) error {
    key := fmt.Sprintf(LedgerCacheKey, ledger.ID)
    
    data, err := json.Marshal(ledger)
    if err != nil {
        return err
    }
    
    return c.redis.SetEX(ctx, key, data, LedgerCacheTTL).Err()
}

// 缓存失效策略
func (c *CacheManager) InvalidateLedgerCache(ctx context.Context, ledgerID, userID string) error {
    keys := []string{
        fmt.Sprintf(LedgerCacheKey, ledgerID),
        fmt.Sprintf(LedgerListCacheKey, userID),
        fmt.Sprintf(LedgerStatsCacheKey, ledgerID, "*"),
    }
    
    // 使用 pipeline 批量删除
    pipe := c.redis.Pipeline()
    for _, key := range keys {
        if strings.Contains(key, "*") {
            // 通配符删除
            c.deleteByPattern(ctx, pipe, key)
        } else {
            pipe.Del(ctx, key)
        }
    }
    
    _, err := pipe.Exec(ctx)
    return err
}

2. 分布式锁

go
// 分布式锁实现
type DistributedLock struct {
    redis  redis.Cmdable
    key    string
    value  string
    expiry time.Duration
}

func NewDistributedLock(redis redis.Cmdable, key string, expiry time.Duration) *DistributedLock {
    return &DistributedLock{
        redis:  redis,
        key:    key,
        value:  generateUniqueValue(),
        expiry: expiry,
    }
}

func (l *DistributedLock) TryLock(ctx context.Context) (bool, error) {
    result, err := l.redis.SetNX(ctx, l.key, l.value, l.expiry).Result()
    return result, err
}

func (l *DistributedLock) Unlock(ctx context.Context) error {
    // Lua 脚本确保原子性
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
    `
    
    result, err := l.redis.Eval(ctx, script, []string{l.key}, l.value).Result()
    if err != nil {
        return err
    }
    
    if result.(int64) == 0 {
        return errors.New("lock not held by this instance")
    }
    
    return nil
}

// 使用分布式锁的预算更新
func (s *BudgetService) UpdateBudgetUsage(ctx context.Context, budgetID string, amount int64) error {
    lockKey := fmt.Sprintf("lock:budget:%s", budgetID)
    lock := NewDistributedLock(s.redis, lockKey, 10*time.Second)
    
    locked, err := lock.TryLock(ctx)
    if err != nil {
        return err
    }
    if !locked {
        return errors.New("failed to acquire lock")
    }
    defer lock.Unlock(ctx)
    
    // 在锁保护下更新预算使用情况
    return s.updateBudgetUsageInDB(ctx, budgetID, amount)
}

数据迁移和版本管理

1. 数据库迁移

go
// 迁移脚本管理
type Migration struct {
    Version     string
    Description string
    UpSQL       string
    DownSQL     string
    CreatedAt   time.Time
}

type MigrationManager struct {
    db *gorm.DB
}

func (m *MigrationManager) CreateMigrationsTable() error {
    return m.db.Exec(`
        CREATE TABLE IF NOT EXISTS schema_migrations (
            version VARCHAR(255) PRIMARY KEY,
            description TEXT,
            applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            checksum VARCHAR(64),
            execution_time_ms INT
        )
    `).Error
}

func (m *MigrationManager) ApplyMigration(migration *Migration) error {
    start := time.Now()
    
    // 开始事务
    tx := m.db.Begin()
    defer func() {
        if r := recover(); r != nil {
            tx.Rollback()
        }
    }()
    
    // 执行迁移 SQL
    if err := tx.Exec(migration.UpSQL).Error; err != nil {
        tx.Rollback()
        return fmt.Errorf("migration %s failed: %w", migration.Version, err)
    }
    
    // 记录迁移历史
    executionTime := time.Since(start).Milliseconds()
    checksum := calculateChecksum(migration.UpSQL)
    
    if err := tx.Exec(`
        INSERT INTO schema_migrations (version, description, checksum, execution_time_ms)
        VALUES (?, ?, ?, ?)
    `, migration.Version, migration.Description, checksum, executionTime).Error; err != nil {
        tx.Rollback()
        return err
    }
    
    return tx.Commit().Error
}

// 迁移文件示例
var migrations = []Migration{
    {
        Version:     "20240115_001",
        Description: "创建账本表",
        UpSQL: `
            CREATE TABLE ledgers (
                id VARCHAR(36) NOT NULL,
                name VARCHAR(100) NOT NULL,
                -- ... 其他字段
                PRIMARY KEY (id)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        `,
        DownSQL: `DROP TABLE IF EXISTS ledgers;`,
    },
    {
        Version:     "20240115_002",
        Description: "添加账本设置字段",
        UpSQL: `
            ALTER TABLE ledgers 
            ADD COLUMN settings JSON COMMENT '账本设置';
        `,
        DownSQL: `
            ALTER TABLE ledgers 
            DROP COLUMN settings;
        `,
    },
}

2. 数据种子

go
// 数据种子管理
type Seeder struct {
    db *gorm.DB
}

func (s *Seeder) SeedDefaultData() error {
    // 创建默认标签
    defaultTags := []Tag{
        {ID: "tag-food", Name: "餐饮", Color: "#FF6B6B", Icon: "food"},
        {ID: "tag-transport", Name: "交通", Color: "#4ECDC4", Icon: "car"},
        {ID: "tag-shopping", Name: "购物", Color: "#45B7D1", Icon: "shopping"},
        {ID: "tag-entertainment", Name: "娱乐", Color: "#96CEB4", Icon: "entertainment"},
        {ID: "tag-healthcare", Name: "医疗", Color: "#FFEAA7", Icon: "health"},
    }
    
    for _, tag := range defaultTags {
        if err := s.db.FirstOrCreate(&tag, "id = ?", tag.ID).Error; err != nil {
            return err
        }
    }
    
    // 创建默认预算模板
    defaultBudgetTemplates := []BudgetTemplate{
        {Name: "月度生活预算", Amount: 300000, Period: "monthly", Tags: []string{"tag-food", "tag-transport"}},
        {Name: "娱乐预算", Amount: 100000, Period: "monthly", Tags: []string{"tag-entertainment"}},
    }
    
    // 插入预算模板...
    
    return nil
}

备份和恢复策略

1. 定期备份

bash
#!/bin/bash
# 数据库备份脚本

BACKUP_DIR="/backup/mysql"
DATE=$(date +%Y%m%d_%H%M%S)
DB_NAME="ledger_db"

# 全量备份
mysqldump --single-transaction --routines --triggers \
  --host=localhost --user=backup_user --password=$MYSQL_PASSWORD \
  $DB_NAME > $BACKUP_DIR/full_backup_$DATE.sql

# 压缩备份文件
gzip $BACKUP_DIR/full_backup_$DATE.sql

# 删除7天前的备份
find $BACKUP_DIR -name "full_backup_*.sql.gz" -mtime +7 -delete

# 增量备份 (binlog)
BINLOG_DIR="/var/log/mysql"
cp $BINLOG_DIR/mysql-bin.* $BACKUP_DIR/binlog/

# 上传到云存储
aws s3 sync $BACKUP_DIR s3://ledger-backup/mysql/ --delete

2. 恢复策略

bash
#!/bin/bash
# 数据库恢复脚本

BACKUP_FILE=$1
DB_NAME="ledger_db"

if [ -z "$BACKUP_FILE" ]; then
    echo "Usage: $0 <backup_file>"
    exit 1
fi

# 创建新数据库
mysql -h localhost -u root -p$MYSQL_ROOT_PASSWORD -e "DROP DATABASE IF EXISTS $DB_NAME; CREATE DATABASE $DB_NAME CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"

# 恢复数据
if [[ $BACKUP_FILE == *.gz ]]; then
    zcat $BACKUP_FILE | mysql -h localhost -u root -p$MYSQL_ROOT_PASSWORD $DB_NAME
else
    mysql -h localhost -u root -p$MYSQL_ROOT_PASSWORD $DB_NAME < $BACKUP_FILE
fi

echo "Database restored from $BACKUP_FILE"

# 验证恢复结果
mysql -h localhost -u root -p$MYSQL_ROOT_PASSWORD $DB_NAME -e "SELECT COUNT(*) as ledger_count FROM ledgers; SELECT COUNT(*) as transaction_count FROM transactions;"

监控和运维

1. 性能监控

sql
-- 慢查询监控
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 1;
SET GLOBAL log_queries_not_using_indexes = 'ON';

-- 查看慢查询
SELECT 
    query_time,
    lock_time,
    rows_sent,
    rows_examined,
    sql_text
FROM mysql.slow_log
WHERE start_time > DATE_SUB(NOW(), INTERVAL 1 HOUR)
ORDER BY query_time DESC
LIMIT 10;

-- 查看索引使用情况
SELECT 
    t.TABLE_SCHEMA,
    t.TABLE_NAME,
    s.INDEX_NAME,
    s.CARDINALITY,
    s.SUB_PART,
    s.PACKED,
    s.INDEX_TYPE
FROM information_schema.TABLES t
JOIN information_schema.STATISTICS s ON t.TABLE_NAME = s.TABLE_NAME
WHERE t.TABLE_SCHEMA = 'ledger_db'
ORDER BY t.TABLE_NAME, s.SEQ_IN_INDEX;

2. 数据完整性检查

sql
-- 创建数据完整性检查存储过程
DELIMITER $$
CREATE PROCEDURE `CheckDataIntegrity`()
BEGIN
    -- 检查孤立的交易记录
    SELECT 'Orphaned Transactions' as issue, COUNT(*) as count
    FROM transactions t
    LEFT JOIN ledgers l ON t.ledger_id = l.id
    WHERE l.id IS NULL;
    
    -- 检查预算使用情况一致性
    SELECT 'Budget Usage Inconsistency' as issue, COUNT(*) as count
    FROM budgets b
    WHERE b.used_amount != (
        SELECT COALESCE(SUM(t.amount), 0)
        FROM transactions t
        JOIN transaction_tags tt ON t.id = tt.transaction_id
        WHERE t.ledger_id = b.ledger_id
          AND t.type = 2  -- 支出
          AND t.status = 2  -- 已完成
          AND t.transaction_date BETWEEN b.start_date AND b.end_date
          AND tt.tag_id IN (
              SELECT bt.tag_id FROM budget_tags bt WHERE bt.budget_id = b.id
          )
    );
    
    -- 检查标签使用统计
    SELECT 'Tag Usage Stats Inconsistency' as issue, COUNT(*) as count
    FROM tags t
    WHERE t.usage_count != (
        SELECT COUNT(*)
        FROM transaction_tags tt
        WHERE tt.tag_id = t.id
    );
END$$
DELIMITER ;

-- 定期执行完整性检查
CREATE EVENT `evt_integrity_check`
ON SCHEDULE EVERY 1 DAY
STARTS '2024-01-01 02:00:00'
DO
  CALL CheckDataIntegrity();

3. 自动化运维脚本

python
#!/usr/bin/env python3
# 数据库运维脚本

import mysql.connector
import redis
import json
import logging
from datetime import datetime, timedelta

class DatabaseMaintenance:
    def __init__(self, mysql_config, redis_config):
        self.mysql = mysql.connector.connect(**mysql_config)
        self.redis = redis.Redis(**redis_config)
        self.logger = logging.getLogger(__name__)
    
    def cleanup_old_data(self):
        """清理过期数据"""
        cursor = self.mysql.cursor()
        
        # 清理6个月前的软删除记录
        cutoff_date = datetime.now() - timedelta(days=180)
        
        tables = ['ledgers', 'transactions', 'tags', 'budgets']
        for table in tables:
            query = f"""
                DELETE FROM {table} 
                WHERE deleted_at IS NOT NULL 
                AND deleted_at < %s
            """
            cursor.execute(query, (cutoff_date,))
            deleted_count = cursor.rowcount
            self.logger.info(f"Cleaned {deleted_count} records from {table}")
        
        self.mysql.commit()
        cursor.close()
    
    def update_statistics(self):
        """更新统计数据"""
        cursor = self.mysql.cursor()
        
        # 更新标签使用统计
        query = """
            UPDATE tags t
            SET usage_count = (
                SELECT COUNT(*) 
                FROM transaction_tags tt 
                WHERE tt.tag_id = t.id
            ),
            total_amount = (
                SELECT COALESCE(SUM(tr.amount), 0)
                FROM transaction_tags tt
                JOIN transactions tr ON tt.transaction_id = tr.id
                WHERE tt.tag_id = t.id AND tr.status = 2
            ),
            last_used_at = (
                SELECT MAX(tr.created_at)
                FROM transaction_tags tt
                JOIN transactions tr ON tt.transaction_id = tr.id
                WHERE tt.tag_id = t.id
            )
            WHERE t.deleted_at IS NULL
        """
        cursor.execute(query)
        
        # 更新预算使用情况
        query = """
            UPDATE budgets b
            SET used_amount = (
                SELECT COALESCE(SUM(t.amount), 0)
                FROM transactions t
                JOIN transaction_tags tt ON t.id = tt.transaction_id
                JOIN budget_tags bt ON tt.tag_id = bt.tag_id
                WHERE bt.budget_id = b.id
                  AND t.type = 2
                  AND t.status = 2
                  AND t.transaction_date BETWEEN b.start_date AND b.end_date
            ),
            transaction_count = (
                SELECT COUNT(DISTINCT t.id)
                FROM transactions t
                JOIN transaction_tags tt ON t.id = tt.transaction_id
                JOIN budget_tags bt ON tt.tag_id = bt.tag_id
                WHERE bt.budget_id = b.id
                  AND t.type = 2
                  AND t.status = 2
                  AND t.transaction_date BETWEEN b.start_date AND b.end_date
            )
            WHERE b.deleted_at IS NULL
        """
        cursor.execute(query)
        
        self.mysql.commit()
        cursor.close()
        self.logger.info("Statistics updated successfully")
    
    def optimize_tables(self):
        """优化表结构"""
        cursor = self.mysql.cursor()
        
        # 获取需要优化的表
        cursor.execute("SHOW TABLES")
        tables = [table[0] for table in cursor.fetchall()]
        
        for table in tables:
            cursor.execute(f"OPTIMIZE TABLE {table}")
            result = cursor.fetchone()
            self.logger.info(f"Optimized table {table}: {result}")
        
        cursor.close()
    
    def check_replication_lag(self):
        """检查主从复制延迟"""
        cursor = self.mysql.cursor()
        cursor.execute("SHOW SLAVE STATUS")
        status = cursor.fetchone()
        
        if status:
            lag = status[32]  # Seconds_Behind_Master
            if lag > 60:  # 延迟超过1分钟
                self.logger.warning(f"Replication lag: {lag} seconds")
            else:
                self.logger.info(f"Replication lag: {lag} seconds")
        
        cursor.close()

if __name__ == "__main__":
    # 配置日志
    logging.basicConfig(level=logging.INFO)
    
    # 数据库配置
    mysql_config = {
        'host': 'localhost',
        'user': 'admin',
        'password': 'password',
        'database': 'ledger_db'
    }
    
    redis_config = {
        'host': 'localhost',
        'port': 6379,
        'db': 0
    }
    
    # 执行维护任务
    maintenance = DatabaseMaintenance(mysql_config, redis_config)
    maintenance.cleanup_old_data()
    maintenance.update_statistics()
    maintenance.optimize_tables()
    maintenance.check_replication_lag()

这个数据库设计文档提供了 Ledger 模块完整的数据存储方案,确保数据的一致性、性能和可维护性。通过合理的表结构设计、索引优化、分区策略和缓存机制,能够支持高并发的财务数据管理需求。

基于 MIT 许可证发布