Skip to content

gRPC 和 Protobuf 设计模式

概述

gRPC 是 Google 开发的高性能、开源的通用 RPC 框架,基于 HTTP/2 协议和 Protocol Buffers 序列化机制。在 Ledger 模块中,gRPC 作为内部服务通信的主要协议,同时通过 google.api.http 注解提供 HTTP RESTful API 支持,实现"一份定义,双协议支持"的设计目标。

核心优势

  • 高性能: 基于 HTTP/2,支持多路复用、流式传输
  • 强类型: Protocol Buffers 提供强类型定义和向后兼容
  • 多语言: 支持 10+ 种编程语言的客户端生成
  • 流式支持: 支持单向和双向流式传输
  • 负载均衡: 内置负载均衡和服务发现机制

应用场景

  • 微服务间高性能通信
  • 多语言系统集成
  • 移动端 SDK 生成
  • 实时数据流传输
  • API 契约管理

核心原理

Protocol Buffers 序列化机制

Protocol Buffers (protobuf) 是一种语言无关、平台无关的可扩展序列化结构数据的方法。

工作原理:

  1. Schema 定义: 使用 .proto 文件定义数据结构和服务接口
  2. 代码生成: 通过 protoc 编译器生成各语言的代码
  3. 序列化: 将内存对象序列化为二进制格式
  4. 传输: 通过网络传输序列化后的数据
  5. 反序列化: 接收方将二进制数据反序列化为对象

性能特性:

  • 比 JSON 小 3-10 倍
  • 比 JSON 快 20-100 倍
  • 支持向前和向后兼容性
  • 强类型检查

gRPC 通信模型

Ledger 模块 Protobuf 设计

1. 文件组织结构

proto/
├── common/v1/                    # 通用类型定义
│   ├── types.proto              # 基础类型
│   ├── money.proto              # 货币类型
│   ├── pagination.proto         # 分页类型
│   └── errors.proto             # 错误定义
├── ledger/v1/                   # 账本模块
│   ├── ledger_service.proto     # 服务定义
│   ├── ledger.proto            # 账本实体
│   ├── transaction.proto       # 交易实体
│   ├── tag.proto               # 标签实体
│   ├── budget.proto            # 预算实体
│   └── analytics.proto         # 统计分析
└── notification/v1/             # 通知模块
    ├── notification_service.proto
    └── notification.proto

2. 通用类型定义

基础类型 (common/v1/types.proto)

protobuf
syntax = "proto3";

package common.v1;

option go_package = "github.com/FixIterate/lz-stash/gen/common/v1;commonv1";

import "google/protobuf/timestamp.proto";

// 通用 ID 类型
message ID {
  string value = 1;
}

// 分页请求
message PaginationRequest {
  int32 page_size = 1;    // 页大小,默认 20,最大 100
  string page_token = 2;  // 分页令牌
}

// 分页响应
message PaginationResponse {
  string next_page_token = 1;  // 下一页令牌
  int64 total_count = 2;       // 总记录数
}

// 时间范围
message TimeRange {
  google.protobuf.Timestamp start_time = 1;
  google.protobuf.Timestamp end_time = 2;
}

// 排序参数
message SortOrder {
  string field = 1;                    // 排序字段
  enum Direction {
    DIRECTION_UNSPECIFIED = 0;
    DIRECTION_ASC = 1;                 // 升序
    DIRECTION_DESC = 2;                // 降序
  }
  Direction direction = 2;
}

货币类型 (common/v1/money.proto)

protobuf
syntax = "proto3";

package common.v1;

option go_package = "github.com/FixIterate/lz-stash/gen/common/v1;commonv1";

// 货币枚举
enum Currency {
  CURRENCY_UNSPECIFIED = 0;
  CURRENCY_CNY = 1;      // 人民币
  CURRENCY_USD = 2;      // 美元
  CURRENCY_EUR = 3;      // 欧元
  CURRENCY_GBP = 4;      // 英镑
  CURRENCY_JPY = 5;      // 日元
  CURRENCY_HKD = 6;      // 港币
  CURRENCY_TWD = 7;      // 台币
}

// 金额类型 (以分为单位,避免浮点数精度问题)
message Money {
  int64 amount = 1;              // 金额,以分为单位
  Currency currency = 2;         // 货币类型
  
  // 扩展字段,预留未来使用
  map<string, string> metadata = 100;
}

// 汇率信息
message ExchangeRate {
  Currency from_currency = 1;
  Currency to_currency = 2;
  double rate = 3;
  google.protobuf.Timestamp updated_at = 4;
}

3. 账本服务定义

服务接口 (ledger/v1/ledger_service.proto)

protobuf
syntax = "proto3";

package ledger.v1;

option go_package = "github.com/FixIterate/lz-stash/gen/ledger/v1;ledgerv1";

import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "common/v1/types.proto";
import "ledger/v1/ledger.proto";
import "ledger/v1/transaction.proto";
import "ledger/v1/tag.proto";
import "ledger/v1/budget.proto";
import "ledger/v1/analytics.proto";

// 账本管理服务
service LedgerService {
  // 账本管理
  rpc CreateLedger(CreateLedgerRequest) returns (CreateLedgerResponse) {
    option (google.api.http) = {
      post: "/api/v1/ledgers"
      body: "*"
    };
  }
  
  rpc GetLedger(GetLedgerRequest) returns (GetLedgerResponse) {
    option (google.api.http) = {
      get: "/api/v1/ledgers/{id}"
    };
  }
  
  rpc ListLedgers(ListLedgersRequest) returns (ListLedgersResponse) {
    option (google.api.http) = {
      get: "/api/v1/ledgers"
    };
  }
  
  rpc UpdateLedger(UpdateLedgerRequest) returns (UpdateLedgerResponse) {
    option (google.api.http) = {
      put: "/api/v1/ledgers/{id}"
      body: "*"
    };
  }
  
  rpc DeleteLedger(DeleteLedgerRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      delete: "/api/v1/ledgers/{id}"
    };
  }
  
  // 交易记录管理
  rpc CreateTransaction(CreateTransactionRequest) returns (CreateTransactionResponse) {
    option (google.api.http) = {
      post: "/api/v1/ledgers/{ledger_id}/transactions"
      body: "*"
    };
  }
  
  rpc BatchCreateTransactions(BatchCreateTransactionsRequest) returns (BatchCreateTransactionsResponse) {
    option (google.api.http) = {
      post: "/api/v1/ledgers/{ledger_id}/transactions:batch"
      body: "*"
    };
  }
  
  rpc GetTransaction(GetTransactionRequest) returns (GetTransactionResponse) {
    option (google.api.http) = {
      get: "/api/v1/transactions/{id}"
    };
  }
  
  rpc ListTransactions(ListTransactionsRequest) returns (ListTransactionsResponse) {
    option (google.api.http) = {
      get: "/api/v1/ledgers/{ledger_id}/transactions"
    };
  }
  
  rpc UpdateTransaction(UpdateTransactionRequest) returns (UpdateTransactionResponse) {
    option (google.api.http) = {
      put: "/api/v1/transactions/{id}"
      body: "*"
    };
  }
  
  rpc DeleteTransaction(DeleteTransactionRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      delete: "/api/v1/transactions/{id}"
    };
  }
  
  // 标签管理
  rpc CreateTag(CreateTagRequest) returns (CreateTagResponse) {
    option (google.api.http) = {
      post: "/api/v1/ledgers/{ledger_id}/tags"
      body: "*"
    };
  }
  
  rpc ListTags(ListTagsRequest) returns (ListTagsResponse) {
    option (google.api.http) = {
      get: "/api/v1/ledgers/{ledger_id}/tags"
    };
  }
  
  // 预算管理
  rpc CreateBudget(CreateBudgetRequest) returns (CreateBudgetResponse) {
    option (google.api.http) = {
      post: "/api/v1/ledgers/{ledger_id}/budgets"
      body: "*"
    };
  }
  
  rpc GetBudget(GetBudgetRequest) returns (GetBudgetResponse) {
    option (google.api.http) = {
      get: "/api/v1/budgets/{id}"
    };
  }
  
  // 统计分析
  rpc GetAnalytics(GetAnalyticsRequest) returns (GetAnalyticsResponse) {
    option (google.api.http) = {
      get: "/api/v1/ledgers/{ledger_id}/analytics"
    };
  }
  
  rpc GetTrendAnalysis(GetTrendAnalysisRequest) returns (GetTrendAnalysisResponse) {
    option (google.api.http) = {
      get: "/api/v1/ledgers/{ledger_id}/analytics/trends"
    };
  }
}

// 移动端优化服务
service MobileLedgerService {
  // 快速记账
  rpc QuickCreateTransaction(QuickCreateTransactionRequest) returns (QuickCreateTransactionResponse) {
    option (google.api.http) = {
      post: "/api/mobile/v1/transactions/quick"
      body: "*"
    };
  }
  
  // 数据同步
  rpc SyncData(SyncDataRequest) returns (SyncDataResponse) {
    option (google.api.http) = {
      post: "/api/mobile/v1/sync"
      body: "*"
    };
  }
  
  // 首页仪表板
  rpc GetDashboard(GetDashboardRequest) returns (GetDashboardResponse) {
    option (google.api.http) = {
      get: "/api/mobile/v1/dashboard"
    };
  }
}

// 管理后台服务
service AdminLedgerService {
  // 用户账本管理
  rpc GetUserLedgers(GetUserLedgersRequest) returns (GetUserLedgersResponse) {
    option (google.api.http) = {
      get: "/api/admin/v1/users/{user_id}/ledgers"
    };
  }
  
  // 系统统计
  rpc GetSystemStats(GetSystemStatsRequest) returns (GetSystemStatsResponse) {
    option (google.api.http) = {
      get: "/api/admin/v1/stats"
    };
  }
}

4. 实体定义示例

账本实体 (ledger/v1/ledger.proto)

protobuf
syntax = "proto3";

package ledger.v1;

option go_package = "github.com/FixIterate/lz-stash/gen/ledger/v1;ledgerv1";

import "google/protobuf/timestamp.proto";
import "common/v1/types.proto";
import "common/v1/money.proto";

// 账本类型
enum LedgerType {
  LEDGER_TYPE_UNSPECIFIED = 0;
  LEDGER_TYPE_PERSONAL = 1;      // 个人账本
  LEDGER_TYPE_FAMILY = 2;        // 家庭账本
  LEDGER_TYPE_PROJECT = 3;       // 项目账本
  LEDGER_TYPE_TRAVEL = 4;        // 旅行账本
  LEDGER_TYPE_BUSINESS = 5;      // 商务账本
}

// 账本状态
enum LedgerStatus {
  LEDGER_STATUS_UNSPECIFIED = 0;
  LEDGER_STATUS_ACTIVE = 1;      // 活跃
  LEDGER_STATUS_ARCHIVED = 2;    // 已归档
  LEDGER_STATUS_DELETED = 3;     // 已删除
}


// 账本实体
message Ledger {
  string id = 1;                              // 账本 ID
  string name = 2;                            // 账本名称
  string description = 3;                     // 账本描述
  LedgerType type = 4;                        // 账本类型
  LedgerStatus status = 5;                    // 账本状态
  common.v1.Currency default_currency = 6;   // 默认货币
  string owner_id = 7;                        // 所有者 ID
  LedgerSettings settings = 9;                // 账本设置
  google.protobuf.Timestamp created_at = 10; // 创建时间
  google.protobuf.Timestamp updated_at = 11; // 更新时间
  int64 version = 12;                         // 版本号(乐观锁)
  
  // 扩展字段
  map<string, string> metadata = 100;
}


// 账本设置
message LedgerSettings {
  bool enable_budget_alerts = 1;              // 启用预算提醒
  bool enable_auto_categorization = 2;        // 启用自动分类
  bool enable_receipt_ocr = 3;                // 启用小票 OCR
  int32 budget_alert_threshold = 4;           // 预算提醒阈值 (%)
  string timezone = 5;                        // 时区
  string locale = 6;                          // 语言地区
}

// 创建账本请求
message CreateLedgerRequest {
  string name = 1;                            // 账本名称
  string description = 2;                     // 账本描述
  LedgerType type = 3;                        // 账本类型
  common.v1.Currency default_currency = 4;   // 默认货币
  LedgerSettings settings = 5;                // 账本设置
}

// 创建账本响应
message CreateLedgerResponse {
  Ledger ledger = 1;
}

// 其他请求响应消息...

实现方案

1. buf 配置管理

buf.yaml

yaml
version: v1
name: buf.build/FixIterate/lz-stash
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT
  except:
    - UNARY_RPC
  enum_zero_value_suffix: _UNSPECIFIED
  rpc_allow_same_request_response: false
  rpc_allow_google_protobuf_empty_requests: true
  rpc_allow_google_protobuf_empty_responses: true
deps:
  - buf.build/googleapis/googleapis
  - buf.build/protocolbuffers/wellknowntypes

buf.gen.yaml (Go 代码生成)

yaml
version: v1
managed:
  enabled: true
  go_package_prefix:
    default: github.com/FixIterate/lz-stash/gen
    except:
      - buf.build/googleapis/googleapis
plugins:
  - plugin: buf.build/protocolbuffers/go
    out: gen
    opt:
      - paths=source_relative
      - module=github.com/FixIterate/lz-stash
  - plugin: buf.build/grpc/go
    out: gen
    opt:
      - paths=source_relative
      - require_unimplemented_servers=false
  - plugin: buf.build/grpc-ecosystem/grpc-gateway
    out: gen
    opt:
      - paths=source_relative
      - generate_unbound_methods=true
  - plugin: buf.build/grpc-ecosystem/openapiv2
    out: gen/openapi

多语言客户端生成配置

buf.gen.nodejs.yaml

yaml
version: v1
plugins:
  - plugin: buf.build/connectrpc/es
    out: clients/nodejs
    opt:
      - target=ts
  - plugin: buf.build/protocolbuffers/js
    out: clients/nodejs
    opt:
      - import_style=commonjs

buf.gen.python.yaml

yaml
version: v1
plugins:
  - plugin: buf.build/protocolbuffers/python
    out: clients/python
  - plugin: buf.build/grpc/python
    out: clients/python

buf.gen.php.yaml

yaml
version: v1
plugins:
  - plugin: buf.build/protocolbuffers/php
    out: clients/php
  - plugin: buf.build/grpc/php
    out: clients/php

2. 服务实现模板

gRPC 服务实现

go
package service

import (
    "context"
    "fmt"
    
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/emptypb"
    
    ledgerv1 "github.com/FixIterate/lz-stash/gen/ledger/v1"
    "github.com/FixIterate/lz-stash/internal/domain/ledger"
)

// LedgerService gRPC 服务实现
type LedgerService struct {
    ledgerv1.UnimplementedLedgerServiceServer
    
    // 依赖注入
    ledgerRepo       ledger.Repository
    transactionRepo  transaction.Repository
    budgetService    budget.Service
    notificationSvc  notification.Service
    cacheManager     cache.Manager
    eventPublisher   event.Publisher
}

// CreateLedger 创建账本
func (s *LedgerService) CreateLedger(
    ctx context.Context,
    req *ledgerv1.CreateLedgerRequest,
) (*ledgerv1.CreateLedgerResponse, error) {
    // 1. 参数验证
    if err := s.validateCreateLedgerRequest(req); err != nil {
        return nil, status.Error(codes.InvalidArgument, err.Error())
    }
    
    // 2. 构建领域对象
    ledgerEntity, err := s.buildLedgerFromRequest(req)
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }
    
    // 3. 业务逻辑处理
    if err := s.ledgerRepo.Save(ctx, ledgerEntity); err != nil {
        return nil, status.Error(codes.Internal, 
            fmt.Sprintf("failed to save ledger: %v", err))
    }
    
    // 4. 发布领域事件
    event := ledger.NewLedgerCreatedEvent(ledgerEntity)
    if err := s.eventPublisher.Publish(ctx, event); err != nil {
        // 记录日志,但不影响主流程
        s.logger.Warn("failed to publish event", "error", err)
    }
    
    // 5. 构建响应
    response := &ledgerv1.CreateLedgerResponse{
        Ledger: s.toProtobufLedger(ledgerEntity),
    }
    
    return response, nil
}

// 参数验证
func (s *LedgerService) validateCreateLedgerRequest(
    req *ledgerv1.CreateLedgerRequest,
) error {
    if req.Name == "" {
        return fmt.Errorf("ledger name is required")
    }
    if len(req.Name) > 100 {
        return fmt.Errorf("ledger name too long (max 100 characters)")
    }
    if req.Type == ledgerv1.LedgerType_LEDGER_TYPE_UNSPECIFIED {
        return fmt.Errorf("ledger type is required")
    }
    return nil
}

// 领域对象转换
func (s *LedgerService) toProtobufLedger(entity *ledger.Ledger) *ledgerv1.Ledger {
    return &ledgerv1.Ledger{
        Id:               entity.ID().String(),
        Name:            entity.Name(),
        Description:     entity.Description(),
        Type:            s.toProtobufLedgerType(entity.Type()),
        Status:          s.toProtobufLedgerStatus(entity.Status()),
        DefaultCurrency: s.toProtobufCurrency(entity.DefaultCurrency()),
        OwnerId:         entity.OwnerID().String(),
        Members:         s.toProtobufMembers(entity.Members()),
        Settings:        s.toProtobufSettings(entity.Settings()),
        CreatedAt:       timestamppb.New(entity.CreatedAt()),
        UpdatedAt:       timestamppb.New(entity.UpdatedAt()),
        Version:         entity.Version(),
    }
}

3. HTTP Gateway 集成

HTTP 服务器设置

go
package server

import (
    "context"
    "net/http"
    
    "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    
    ledgerv1 "github.com/FixIterate/lz-stash/gen/ledger/v1"
)

// HTTPServer HTTP 网关服务器
type HTTPServer struct {
    mux        *runtime.ServeMux
    grpcServer *grpc.Server
    httpServer *http.Server
}

// NewHTTPServer 创建 HTTP 服务器
func NewHTTPServer(grpcAddr string, httpAddr string) *HTTPServer {
    // 创建 gRPC Gateway 多路复用器
    mux := runtime.NewServeMux(
        runtime.WithIncomingHeaderMatcher(customHeaderMatcher),
        runtime.WithOutgoingHeaderMatcher(customHeaderMatcher),
        runtime.WithErrorHandler(customErrorHandler),
        runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{
            MarshalOptions: protojson.MarshalOptions{
                UseProtoNames:   true,
                EmitUnpopulated: true,
            },
            UnmarshalOptions: protojson.UnmarshalOptions{
                DiscardUnknown: true,
            },
        }),
    )
    
    return &HTTPServer{
        mux: mux,
        httpServer: &http.Server{
            Addr:    httpAddr,
            Handler: mux,
        },
    }
}

// RegisterServices 注册服务
func (s *HTTPServer) RegisterServices(ctx context.Context, grpcAddr string) error {
    conn, err := grpc.DialContext(
        ctx,
        grpcAddr,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        return fmt.Errorf("failed to dial gRPC server: %w", err)
    }
    
    // 注册 Ledger 服务
    if err := ledgerv1.RegisterLedgerServiceHandler(ctx, s.mux, conn); err != nil {
        return fmt.Errorf("failed to register ledger service: %w", err)
    }
    
    // 注册移动端服务
    if err := ledgerv1.RegisterMobileLedgerServiceHandler(ctx, s.mux, conn); err != nil {
        return fmt.Errorf("failed to register mobile service: %w", err)
    }
    
    return nil
}

// 自定义错误处理
func customErrorHandler(
    ctx context.Context,
    mux *runtime.ServeMux,
    marshaler runtime.Marshaler,
    w http.ResponseWriter,
    r *http.Request,
    err error,
) {
    const fallback = `{"error": "internal server error"}`
    
    w.Header().Set("Content-Type", marshaler.ContentType("application/json"))
    
    if s, ok := status.FromError(err); ok {
        w.WriteHeader(runtime.HTTPStatusFromCode(s.Code()))
        
        errorResponse := map[string]interface{}{
            "error": s.Message(),
            "code":  int(s.Code()),
        }
        
        if details := s.Details(); len(details) > 0 {
            errorResponse["details"] = details
        }
        
        if buf, marshalErr := marshaler.Marshal(errorResponse); marshalErr == nil {
            w.Write(buf)
            return
        }
    }
    
    w.WriteHeader(http.StatusInternalServerError)
    w.Write([]byte(fallback))
}

最佳实践

1. API 设计原则

RESTful 路径设计

protobuf
// ✅ 良好的 RESTful 设计
rpc CreateLedger(CreateLedgerRequest) returns (CreateLedgerResponse) {
  option (google.api.http) = {
    post: "/api/v1/ledgers"
    body: "*"
  };
}

rpc GetTransaction(GetTransactionRequest) returns (GetTransactionResponse) {
  option (google.api.http) = {
    get: "/api/v1/transactions/{id}"
  };
}

rpc ListTransactions(ListTransactionsRequest) returns (ListTransactionsResponse) {
  option (google.api.http) = {
    get: "/api/v1/ledgers/{ledger_id}/transactions"
  };
}

// ❌ 避免的设计
rpc GetLedgerData(GetLedgerDataRequest) returns (GetLedgerDataResponse) {
  option (google.api.http) = {
    post: "/api/v1/getLedgerData"  // 不符合 RESTful 原则
    body: "*"
  };
}

响应状态码映射

go
// gRPC 状态码到 HTTP 状态码的映射
var grpcToHTTPStatus = map[codes.Code]int{
    codes.OK:                 200, // 成功
    codes.InvalidArgument:    400, // 参数错误
    codes.Unauthenticated:    401, // 未认证
    codes.PermissionDenied:   403, // 权限不足
    codes.NotFound:           404, // 资源不存在
    codes.AlreadyExists:      409, // 资源冲突
    codes.ResourceExhausted:  429, // 请求过于频繁
    codes.Internal:           500, // 服务器内部错误
    codes.Unavailable:        503, // 服务不可用
}

2. 版本管理策略

语义化版本控制

protobuf
// v1 版本
syntax = "proto3";
package ledger.v1;
option go_package = "github.com/FixIterate/lz-stash/gen/ledger/v1;ledgerv1";

message Ledger {
  string id = 1;
  string name = 2;
  string description = 3;
  // 新增字段使用高数字标签
  string owner_id = 10;   // v1.1 新增
  repeated string tags = 11; // v1.2 新增
}

// v2 版本 (破坏性变更)
syntax = "proto3";
package ledger.v2;
option go_package = "github.com/FixIterate/lz-stash/gen/ledger/v2;ledgerv2";

message Ledger {
  string id = 1;
  string name = 2;
  // 移除 description 字段
  LedgerType type = 3;    // 新增枚举类型
  LedgerSettings settings = 4;
}

向后兼容性保证

protobuf
// ✅ 兼容性更新
message Transaction {
  string id = 1;
  string description = 2;
  int64 amount = 3;
  
  // 新增可选字段
  string note = 4;                    // 新增
  repeated string attachments = 5;    // 新增
  
  // 预留字段范围
  reserved 100 to 199;               // 为未来扩展预留
}

// ❌ 破坏兼容性的更新
message Transaction {
  string id = 1;
  int64 amount = 2;        // 改变字段编号
  float amount_float = 3;  // 改变字段类型
  // 删除 description 字段
}

3. 性能优化技巧

流式传输

protobuf
// 大量数据流式传输
service LedgerService {
  // 流式导出交易记录
  rpc ExportTransactions(ExportTransactionsRequest) 
    returns (stream ExportTransactionsResponse);
  
  // 流式导入交易记录
  rpc ImportTransactions(stream ImportTransactionsRequest) 
    returns (ImportTransactionsResponse);
  
  // 双向流式同步
  rpc SyncTransactions(stream SyncTransactionsRequest) 
    returns (stream SyncTransactionsResponse);
}

批量操作

protobuf
// 批量创建
message BatchCreateTransactionsRequest {
  string ledger_id = 1;
  repeated CreateTransactionRequest requests = 2;
  bool skip_on_error = 3;              // 遇到错误是否跳过
  bool validate_only = 4;              // 仅验证不执行
}

message BatchCreateTransactionsResponse {
  repeated CreateTransactionResponse results = 1;
  repeated BatchError errors = 2;      // 错误列表
  int32 success_count = 3;             // 成功数量
  int32 error_count = 4;               // 错误数量
}

message BatchError {
  int32 index = 1;                     // 错误请求索引
  string message = 2;                  // 错误信息
  int32 code = 3;                      // 错误码
}

4. 安全考虑

字段验证

protobuf
import "validate/validate.proto";

message CreateLedgerRequest {
  string name = 1 [(validate.rules).string = {
    min_len: 1,
    max_len: 100,
    pattern: "^[a-zA-Z0-9\\s\\-_]+$"
  }];
  
  string description = 2 [(validate.rules).string = {
    max_len: 500
  }];
  
  repeated string member_ids = 3 [(validate.rules).repeated = {
    min_items: 0,
    max_items: 10,
    items: {
      string: {
        pattern: "^[a-zA-Z0-9\\-]+$"
      }
    }
  }];
}

敏感信息处理

protobuf
// 敏感字段标记
message User {
  string id = 1;
  string username = 2;
  string email = 3 [(sensitive) = true];       // 敏感信息标记
  string phone = 4 [(sensitive) = true];
  string password_hash = 5 [(internal) = true]; // 内部字段,不对外暴露
}

// 脱敏响应
message PublicUser {
  string id = 1;
  string username = 2;
  string masked_email = 3;    // 脱敏后的邮箱
  // 不包含敏感信息
}

代码示例

1. 完整的客户端示例

Go 客户端

go
package main

import (
    "context"
    "log"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    
    ledgerv1 "github.com/FixIterate/lz-stash/gen/ledger/v1"
    commonv1 "github.com/FixIterate/lz-stash/gen/common/v1"
)

func main() {
    // 建立 gRPC 连接
    conn, err := grpc.Dial(
        "localhost:9090",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithTimeout(5*time.Second),
    )
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()
    
    // 创建客户端
    client := ledgerv1.NewLedgerServiceClient(conn)
    
    // 创建账本
    ledger, err := createLedger(client)
    if err != nil {
        log.Fatalf("Failed to create ledger: %v", err)
    }
    log.Printf("Created ledger: %s", ledger.Id)
    
    // 创建交易记录
    transaction, err := createTransaction(client, ledger.Id)
    if err != nil {
        log.Fatalf("Failed to create transaction: %v", err)
    }
    log.Printf("Created transaction: %s", transaction.Id)
    
    // 查询统计信息
    analytics, err := getAnalytics(client, ledger.Id)
    if err != nil {
        log.Fatalf("Failed to get analytics: %v", err)
    }
    log.Printf("Total income: %d, Total expense: %d", 
        analytics.Summary.TotalIncome.Amount,
        analytics.Summary.TotalExpense.Amount)
}

func createLedger(client ledgerv1.LedgerServiceClient) (*ledgerv1.Ledger, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    req := &ledgerv1.CreateLedgerRequest{
        Name:        "我的账本",
        Description: "个人日常账本",
        Type:        ledgerv1.LedgerType_LEDGER_TYPE_PERSONAL,
        DefaultCurrency: commonv1.Currency_CURRENCY_CNY,
        Settings: &ledgerv1.LedgerSettings{
            EnableBudgetAlerts:      true,
            BudgetAlertThreshold:   80,
            EnableAutoCategorization: true,
            Timezone:               "Asia/Shanghai",
            Locale:                 "zh-CN",
        },
    }
    
    resp, err := client.CreateLedger(ctx, req)
    if err != nil {
        return nil, err
    }
    
    return resp.Ledger, nil
}

func createTransaction(client ledgerv1.LedgerServiceClient, ledgerID string) (*ledgerv1.Transaction, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    req := &ledgerv1.CreateTransactionRequest{
        LedgerId: ledgerID,
        Type:     ledgerv1.TransactionType_TRANSACTION_TYPE_EXPENSE,
        Amount: &commonv1.Money{
            Amount:   3500, // 35.00 元
            Currency: commonv1.Currency_CURRENCY_CNY,
        },
        Description: "午餐",
        Note:        "公司楼下餐厅",
        TagIds:      []string{"food", "lunch"},
    }
    
    resp, err := client.CreateTransaction(ctx, req)
    if err != nil {
        return nil, err
    }
    
    return resp.Transaction, nil
}

func getAnalytics(client ledgerv1.LedgerServiceClient, ledgerID string) (*ledgerv1.AnalyticsResponse, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    req := &ledgerv1.GetAnalyticsRequest{
        LedgerId: ledgerID,
        TimeRange: &commonv1.TimeRange{
            StartTime: timestamppb.New(time.Now().AddDate(0, -1, 0)), // 一个月前
            EndTime:   timestamppb.New(time.Now()),
        },
    }
    
    resp, err := client.GetAnalytics(ctx, req)
    if err != nil {
        return nil, err
    }
    
    return resp, nil
}

Node.js 客户端

typescript
import { PromiseClient, createPromiseClient } from "@connectrpc/connect";
import { createGrpcTransport } from "@connectrpc/connect-node";
import { LedgerService } from "./gen/ledger/v1/ledger_service_connect";
import { 
  CreateLedgerRequest, 
  CreateTransactionRequest,
  LedgerType,
  TransactionType
} from "./gen/ledger/v1/ledger_pb";
import { Currency } from "./gen/common/v1/money_pb";

async function main() {
  // 创建传输层
  const transport = createGrpcTransport({
    baseUrl: "http://localhost:9090",
    httpVersion: "2",
  });
  
  // 创建客户端
  const client: PromiseClient<typeof LedgerService> = createPromiseClient(
    LedgerService,
    transport
  );
  
  try {
    // 创建账本
    const createLedgerReq = new CreateLedgerRequest({
      name: "我的账本",
      description: "个人日常账本", 
      type: LedgerType.LEDGER_TYPE_PERSONAL,
      defaultCurrency: Currency.CURRENCY_CNY,
      settings: {
        enableBudgetAlerts: true,
        budgetAlertThreshold: 80,
        enableAutoCategorization: true,
        timezone: "Asia/Shanghai",
        locale: "zh-CN",
      },
    });
    
    const ledgerResp = await client.createLedger(createLedgerReq);
    console.log("Created ledger:", ledgerResp.ledger?.id);
    
    // 创建交易记录
    const createTransactionReq = new CreateTransactionRequest({
      ledgerId: ledgerResp.ledger?.id,
      type: TransactionType.TRANSACTION_TYPE_EXPENSE,
      amount: {
        amount: BigInt(3500), // 35.00 元
        currency: Currency.CURRENCY_CNY,
      },
      description: "午餐",
      note: "公司楼下餐厅",
      tagIds: ["food", "lunch"],
    });
    
    const transactionResp = await client.createTransaction(createTransactionReq);
    console.log("Created transaction:", transactionResp.transaction?.id);
    
  } catch (error) {
    console.error("Error:", error);
  }
}

main().catch(console.error);

2. 中间件集成示例

认证中间件

go
func AuthInterceptor(authService auth.Service) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        // 跳过不需要认证的方法
        if isPublicMethod(info.FullMethod) {
            return handler(ctx, req)
        }
        
        // 从元数据中提取 token
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
            return nil, status.Error(codes.Unauthenticated, "missing metadata")
        }
        
        tokens := md.Get("authorization")
        if len(tokens) == 0 {
            return nil, status.Error(codes.Unauthenticated, "missing token")
        }
        
        token := strings.TrimPrefix(tokens[0], "Bearer ")
        
        // 验证 token
        user, err := authService.ValidateToken(ctx, token)
        if err != nil {
            return nil, status.Error(codes.Unauthenticated, "invalid token")
        }
        
        // 将用户信息添加到上下文
        ctx = auth.WithUser(ctx, user)
        
        return handler(ctx, req)
    }
}

func isPublicMethod(method string) bool {
    publicMethods := []string{
        "/ledger.v1.LedgerService/HealthCheck",
        "/ledger.v1.LedgerService/GetVersion",
    }
    
    for _, public := range publicMethods {
        if method == public {
            return true
        }
    }
    return false
}

限流中间件

go
func RateLimitInterceptor(limiter ratelimit.Limiter) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        // 获取用户 ID
        userID := auth.GetUserID(ctx)
        if userID == "" {
            userID = "anonymous"
        }
        
        // 构建限流 key
        key := fmt.Sprintf("user:%s:method:%s", userID, info.FullMethod)
        
        // 检查限流
        if !limiter.Allow(key) {
            return nil, status.Error(codes.ResourceExhausted, "rate limit exceeded")
        }
        
        return handler(ctx, req)
    }
}

性能考虑

1. 连接复用

连接池配置

go
// gRPC 连接池
type ConnectionPool struct {
    connections []*grpc.ClientConn
    roundRobin  int64
    mutex       sync.RWMutex
}

func NewConnectionPool(target string, size int) (*ConnectionPool, error) {
    pool := &ConnectionPool{
        connections: make([]*grpc.ClientConn, size),
    }
    
    for i := 0; i < size; i++ {
        conn, err := grpc.Dial(target,
            grpc.WithTransportCredentials(insecure.NewCredentials()),
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                Time:                10 * time.Second,
                Timeout:             3 * time.Second,
                PermitWithoutStream: true,
            }),
        )
        if err != nil {
            return nil, err
        }
        pool.connections[i] = conn
    }
    
    return pool, nil
}

func (p *ConnectionPool) GetConnection() *grpc.ClientConn {
    p.mutex.RLock()
    defer p.mutex.RUnlock()
    
    index := atomic.AddInt64(&p.roundRobin, 1) % int64(len(p.connections))
    return p.connections[index]
}

2. 消息压缩

启用 gRPC 压缩

go
// 服务端启用压缩
server := grpc.NewServer(
    grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
        // 其他中间件...
    )),
    // 启用 gzip 压缩
    grpc.RPCCompressor(grpc.NewGZIPCompressor()),
    grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
)

// 客户端启用压缩
conn, err := grpc.Dial(target,
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithDefaultCallOptions(
        grpc.UseCompressor(gzip.Name),
    ),
)

3. 流式传输优化

大数据流式处理

go
func (s *LedgerService) ExportTransactions(
    req *ledgerv1.ExportTransactionsRequest,
    stream ledgerv1.LedgerService_ExportTransactionsServer,
) error {
    const batchSize = 100
    offset := 0
    
    for {
        // 分批查询数据
        transactions, err := s.transactionRepo.FindWithPagination(
            stream.Context(),
            req.LedgerId,
            offset,
            batchSize,
        )
        if err != nil {
            return status.Error(codes.Internal, err.Error())
        }
        
        if len(transactions) == 0 {
            break // 没有更多数据
        }
        
        // 分批发送数据
        for _, transaction := range transactions {
            response := &ledgerv1.ExportTransactionsResponse{
                Transaction: s.toProtobufTransaction(transaction),
            }
            
            if err := stream.Send(response); err != nil {
                return status.Error(codes.Internal, err.Error())
            }
        }
        
        offset += batchSize
        
        // 检查上下文取消
        if stream.Context().Err() != nil {
            return status.Error(codes.Canceled, "export canceled")
        }
    }
    
    return nil
}

常见问题

1. 版本兼容性问题

问题: 升级 protobuf 定义后客户端出现兼容性问题

解决方案:

protobuf
// ✅ 正确的字段添加方式
message Transaction {
  string id = 1;
  string description = 2;
  int64 amount = 3;
  
  // 新增字段使用新的字段编号
  string note = 4;        // v1.1 新增
  repeated string tags = 5; // v1.2 新增
}

// ❌ 错误的做法
message Transaction {
  string id = 1;
  int64 amount = 2;       // 改变了原有字段编号
  string description = 3; // 改变了原有字段编号
}

2. 性能调优

问题: gRPC 调用延迟过高

诊断和解决:

go
// 启用详细的性能监控
func withMonitoring() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        start := time.Now()
        
        resp, err := handler(ctx, req)
        
        duration := time.Since(start)
        
        // 记录性能指标
        grpcDuration.WithLabelValues(info.FullMethod).Observe(duration.Seconds())
        
        // 慢查询日志
        if duration > 100*time.Millisecond {
            log.Warn("slow grpc call",
                zap.String("method", info.FullMethod),
                zap.Duration("duration", duration),
            )
        }
        
        return resp, err
    }
}

3. 错误处理

问题: 错误信息在 HTTP 和 gRPC 之间转换丢失

解决方案:

go
// 自定义错误类型
type LedgerError struct {
    Code    codes.Code
    Message string
    Details map[string]interface{}
}

func (e *LedgerError) Error() string {
    return e.Message
}

func (e *LedgerError) GRPCStatus() *status.Status {
    st := status.New(e.Code, e.Message)
    
    // 添加错误详情
    if len(e.Details) > 0 {
        details, _ := structpb.NewStruct(e.Details)
        st, _ = st.WithDetails(details)
    }
    
    return st
}

// 业务层使用
func (s *LedgerService) CreateLedger(ctx context.Context, req *CreateLedgerRequest) (*CreateLedgerResponse, error) {
    if req.Name == "" {
        return nil, &LedgerError{
            Code:    codes.InvalidArgument,
            Message: "ledger name is required",
            Details: map[string]interface{}{
                "field": "name",
                "constraint": "not_empty",
            },
        }
    }
    
    // ... 其他逻辑
}

参考资料

官方文档

  1. gRPC 官方文档 - gRPC 完整指南
  2. Protocol Buffers 指南 - Protobuf 语法和最佳实践
  3. grpc-gateway - HTTP/gRPC 网关文档
  4. buf 文档 - 现代化 Protobuf 工具链

经典书籍

  1. 《gRPC: Up and Running》 - Kasun Indrasiri & Danesh Kuruppu
  2. 《Building Microservices》 - Sam Newman
  3. 《API Design Patterns》 - JJ Geewax

最佳实践文章

  1. Google API 设计指南 - Google API 设计最佳实践
  2. gRPC Best Practices - gRPC 官方最佳实践
  3. Protobuf Style Guide - Protobuf 编码规范

工具和框架

  1. buf - 现代化 Protobuf 工具链
  2. grpcui - gRPC Web UI 调试工具
  3. Evans - gRPC 客户端调试工具
  4. grpcurl - gRPC 命令行客户端

实战练习

练习 1: 设计用户管理 API

设计一个用户管理的 gRPC 服务,包含以下功能:

  • 用户注册和登录
  • 用户信息管理
  • 用户权限控制
  • HTTP RESTful API 支持

练习 2: 实现流式数据处理

实现一个流式数据处理服务:

  • 客户端流式上传文件
  • 服务端流式返回处理结果
  • 双向流式聊天服务

练习 3: 多语言客户端集成

为同一个 gRPC 服务生成多语言客户端:

  • Go 客户端
  • Node.js 客户端
  • Python 客户端
  • 测试客户端间的互操作性

练习 4: 性能基准测试

对 gRPC 服务进行性能测试:

  • 单一调用性能测试
  • 并发调用压力测试
  • 流式传输性能测试
  • 与 HTTP REST API 性能对比

这些练习将帮助你深入理解 gRPC 和 Protobuf 的实际应用,掌握在生产环境中的最佳实践。

基于 MIT 许可证发布