gRPC 和 Protobuf 设计模式
概述
gRPC 是 Google 开发的高性能、开源的通用 RPC 框架,基于 HTTP/2 协议和 Protocol Buffers 序列化机制。在 Ledger 模块中,gRPC 作为内部服务通信的主要协议,同时通过 google.api.http 注解提供 HTTP RESTful API 支持,实现"一份定义,双协议支持"的设计目标。
核心优势
- 高性能: 基于 HTTP/2,支持多路复用、流式传输
- 强类型: Protocol Buffers 提供强类型定义和向后兼容
- 多语言: 支持 10+ 种编程语言的客户端生成
- 流式支持: 支持单向和双向流式传输
- 负载均衡: 内置负载均衡和服务发现机制
应用场景
- 微服务间高性能通信
- 多语言系统集成
- 移动端 SDK 生成
- 实时数据流传输
- API 契约管理
核心原理
Protocol Buffers 序列化机制
Protocol Buffers (protobuf) 是一种语言无关、平台无关的可扩展序列化结构数据的方法。
工作原理:
- Schema 定义: 使用 .proto 文件定义数据结构和服务接口
- 代码生成: 通过 protoc 编译器生成各语言的代码
- 序列化: 将内存对象序列化为二进制格式
- 传输: 通过网络传输序列化后的数据
- 反序列化: 接收方将二进制数据反序列化为对象
性能特性:
- 比 JSON 小 3-10 倍
- 比 JSON 快 20-100 倍
- 支持向前和向后兼容性
- 强类型检查
gRPC 通信模型
Ledger 模块 Protobuf 设计
1. 文件组织结构
proto/
├── common/v1/ # 通用类型定义
│ ├── types.proto # 基础类型
│ ├── money.proto # 货币类型
│ ├── pagination.proto # 分页类型
│ └── errors.proto # 错误定义
├── ledger/v1/ # 账本模块
│ ├── ledger_service.proto # 服务定义
│ ├── ledger.proto # 账本实体
│ ├── transaction.proto # 交易实体
│ ├── tag.proto # 标签实体
│ ├── budget.proto # 预算实体
│ └── analytics.proto # 统计分析
└── notification/v1/ # 通知模块
├── notification_service.proto
└── notification.proto2. 通用类型定义
基础类型 (common/v1/types.proto)
syntax = "proto3";
package common.v1;
option go_package = "github.com/FixIterate/lz-stash/gen/common/v1;commonv1";
import "google/protobuf/timestamp.proto";
// 通用 ID 类型
message ID {
string value = 1;
}
// 分页请求
message PaginationRequest {
int32 page_size = 1; // 页大小,默认 20,最大 100
string page_token = 2; // 分页令牌
}
// 分页响应
message PaginationResponse {
string next_page_token = 1; // 下一页令牌
int64 total_count = 2; // 总记录数
}
// 时间范围
message TimeRange {
google.protobuf.Timestamp start_time = 1;
google.protobuf.Timestamp end_time = 2;
}
// 排序参数
message SortOrder {
string field = 1; // 排序字段
enum Direction {
DIRECTION_UNSPECIFIED = 0;
DIRECTION_ASC = 1; // 升序
DIRECTION_DESC = 2; // 降序
}
Direction direction = 2;
}货币类型 (common/v1/money.proto)
syntax = "proto3";
package common.v1;
option go_package = "github.com/FixIterate/lz-stash/gen/common/v1;commonv1";
// 货币枚举
enum Currency {
CURRENCY_UNSPECIFIED = 0;
CURRENCY_CNY = 1; // 人民币
CURRENCY_USD = 2; // 美元
CURRENCY_EUR = 3; // 欧元
CURRENCY_GBP = 4; // 英镑
CURRENCY_JPY = 5; // 日元
CURRENCY_HKD = 6; // 港币
CURRENCY_TWD = 7; // 台币
}
// 金额类型 (以分为单位,避免浮点数精度问题)
message Money {
int64 amount = 1; // 金额,以分为单位
Currency currency = 2; // 货币类型
// 扩展字段,预留未来使用
map<string, string> metadata = 100;
}
// 汇率信息
message ExchangeRate {
Currency from_currency = 1;
Currency to_currency = 2;
double rate = 3;
google.protobuf.Timestamp updated_at = 4;
}3. 账本服务定义
服务接口 (ledger/v1/ledger_service.proto)
syntax = "proto3";
package ledger.v1;
option go_package = "github.com/FixIterate/lz-stash/gen/ledger/v1;ledgerv1";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "common/v1/types.proto";
import "ledger/v1/ledger.proto";
import "ledger/v1/transaction.proto";
import "ledger/v1/tag.proto";
import "ledger/v1/budget.proto";
import "ledger/v1/analytics.proto";
// 账本管理服务
service LedgerService {
// 账本管理
rpc CreateLedger(CreateLedgerRequest) returns (CreateLedgerResponse) {
option (google.api.http) = {
post: "/api/v1/ledgers"
body: "*"
};
}
rpc GetLedger(GetLedgerRequest) returns (GetLedgerResponse) {
option (google.api.http) = {
get: "/api/v1/ledgers/{id}"
};
}
rpc ListLedgers(ListLedgersRequest) returns (ListLedgersResponse) {
option (google.api.http) = {
get: "/api/v1/ledgers"
};
}
rpc UpdateLedger(UpdateLedgerRequest) returns (UpdateLedgerResponse) {
option (google.api.http) = {
put: "/api/v1/ledgers/{id}"
body: "*"
};
}
rpc DeleteLedger(DeleteLedgerRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
delete: "/api/v1/ledgers/{id}"
};
}
// 交易记录管理
rpc CreateTransaction(CreateTransactionRequest) returns (CreateTransactionResponse) {
option (google.api.http) = {
post: "/api/v1/ledgers/{ledger_id}/transactions"
body: "*"
};
}
rpc BatchCreateTransactions(BatchCreateTransactionsRequest) returns (BatchCreateTransactionsResponse) {
option (google.api.http) = {
post: "/api/v1/ledgers/{ledger_id}/transactions:batch"
body: "*"
};
}
rpc GetTransaction(GetTransactionRequest) returns (GetTransactionResponse) {
option (google.api.http) = {
get: "/api/v1/transactions/{id}"
};
}
rpc ListTransactions(ListTransactionsRequest) returns (ListTransactionsResponse) {
option (google.api.http) = {
get: "/api/v1/ledgers/{ledger_id}/transactions"
};
}
rpc UpdateTransaction(UpdateTransactionRequest) returns (UpdateTransactionResponse) {
option (google.api.http) = {
put: "/api/v1/transactions/{id}"
body: "*"
};
}
rpc DeleteTransaction(DeleteTransactionRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
delete: "/api/v1/transactions/{id}"
};
}
// 标签管理
rpc CreateTag(CreateTagRequest) returns (CreateTagResponse) {
option (google.api.http) = {
post: "/api/v1/ledgers/{ledger_id}/tags"
body: "*"
};
}
rpc ListTags(ListTagsRequest) returns (ListTagsResponse) {
option (google.api.http) = {
get: "/api/v1/ledgers/{ledger_id}/tags"
};
}
// 预算管理
rpc CreateBudget(CreateBudgetRequest) returns (CreateBudgetResponse) {
option (google.api.http) = {
post: "/api/v1/ledgers/{ledger_id}/budgets"
body: "*"
};
}
rpc GetBudget(GetBudgetRequest) returns (GetBudgetResponse) {
option (google.api.http) = {
get: "/api/v1/budgets/{id}"
};
}
// 统计分析
rpc GetAnalytics(GetAnalyticsRequest) returns (GetAnalyticsResponse) {
option (google.api.http) = {
get: "/api/v1/ledgers/{ledger_id}/analytics"
};
}
rpc GetTrendAnalysis(GetTrendAnalysisRequest) returns (GetTrendAnalysisResponse) {
option (google.api.http) = {
get: "/api/v1/ledgers/{ledger_id}/analytics/trends"
};
}
}
// 移动端优化服务
service MobileLedgerService {
// 快速记账
rpc QuickCreateTransaction(QuickCreateTransactionRequest) returns (QuickCreateTransactionResponse) {
option (google.api.http) = {
post: "/api/mobile/v1/transactions/quick"
body: "*"
};
}
// 数据同步
rpc SyncData(SyncDataRequest) returns (SyncDataResponse) {
option (google.api.http) = {
post: "/api/mobile/v1/sync"
body: "*"
};
}
// 首页仪表板
rpc GetDashboard(GetDashboardRequest) returns (GetDashboardResponse) {
option (google.api.http) = {
get: "/api/mobile/v1/dashboard"
};
}
}
// 管理后台服务
service AdminLedgerService {
// 用户账本管理
rpc GetUserLedgers(GetUserLedgersRequest) returns (GetUserLedgersResponse) {
option (google.api.http) = {
get: "/api/admin/v1/users/{user_id}/ledgers"
};
}
// 系统统计
rpc GetSystemStats(GetSystemStatsRequest) returns (GetSystemStatsResponse) {
option (google.api.http) = {
get: "/api/admin/v1/stats"
};
}
}4. 实体定义示例
账本实体 (ledger/v1/ledger.proto)
syntax = "proto3";
package ledger.v1;
option go_package = "github.com/FixIterate/lz-stash/gen/ledger/v1;ledgerv1";
import "google/protobuf/timestamp.proto";
import "common/v1/types.proto";
import "common/v1/money.proto";
// 账本类型
enum LedgerType {
LEDGER_TYPE_UNSPECIFIED = 0;
LEDGER_TYPE_PERSONAL = 1; // 个人账本
LEDGER_TYPE_FAMILY = 2; // 家庭账本
LEDGER_TYPE_PROJECT = 3; // 项目账本
LEDGER_TYPE_TRAVEL = 4; // 旅行账本
LEDGER_TYPE_BUSINESS = 5; // 商务账本
}
// 账本状态
enum LedgerStatus {
LEDGER_STATUS_UNSPECIFIED = 0;
LEDGER_STATUS_ACTIVE = 1; // 活跃
LEDGER_STATUS_ARCHIVED = 2; // 已归档
LEDGER_STATUS_DELETED = 3; // 已删除
}
// 账本实体
message Ledger {
string id = 1; // 账本 ID
string name = 2; // 账本名称
string description = 3; // 账本描述
LedgerType type = 4; // 账本类型
LedgerStatus status = 5; // 账本状态
common.v1.Currency default_currency = 6; // 默认货币
string owner_id = 7; // 所有者 ID
LedgerSettings settings = 9; // 账本设置
google.protobuf.Timestamp created_at = 10; // 创建时间
google.protobuf.Timestamp updated_at = 11; // 更新时间
int64 version = 12; // 版本号(乐观锁)
// 扩展字段
map<string, string> metadata = 100;
}
// 账本设置
message LedgerSettings {
bool enable_budget_alerts = 1; // 启用预算提醒
bool enable_auto_categorization = 2; // 启用自动分类
bool enable_receipt_ocr = 3; // 启用小票 OCR
int32 budget_alert_threshold = 4; // 预算提醒阈值 (%)
string timezone = 5; // 时区
string locale = 6; // 语言地区
}
// 创建账本请求
message CreateLedgerRequest {
string name = 1; // 账本名称
string description = 2; // 账本描述
LedgerType type = 3; // 账本类型
common.v1.Currency default_currency = 4; // 默认货币
LedgerSettings settings = 5; // 账本设置
}
// 创建账本响应
message CreateLedgerResponse {
Ledger ledger = 1;
}
// 其他请求响应消息...实现方案
1. buf 配置管理
buf.yaml
version: v1
name: buf.build/FixIterate/lz-stash
breaking:
use:
- FILE
lint:
use:
- DEFAULT
except:
- UNARY_RPC
enum_zero_value_suffix: _UNSPECIFIED
rpc_allow_same_request_response: false
rpc_allow_google_protobuf_empty_requests: true
rpc_allow_google_protobuf_empty_responses: true
deps:
- buf.build/googleapis/googleapis
- buf.build/protocolbuffers/wellknowntypesbuf.gen.yaml (Go 代码生成)
version: v1
managed:
enabled: true
go_package_prefix:
default: github.com/FixIterate/lz-stash/gen
except:
- buf.build/googleapis/googleapis
plugins:
- plugin: buf.build/protocolbuffers/go
out: gen
opt:
- paths=source_relative
- module=github.com/FixIterate/lz-stash
- plugin: buf.build/grpc/go
out: gen
opt:
- paths=source_relative
- require_unimplemented_servers=false
- plugin: buf.build/grpc-ecosystem/grpc-gateway
out: gen
opt:
- paths=source_relative
- generate_unbound_methods=true
- plugin: buf.build/grpc-ecosystem/openapiv2
out: gen/openapi多语言客户端生成配置
buf.gen.nodejs.yaml
version: v1
plugins:
- plugin: buf.build/connectrpc/es
out: clients/nodejs
opt:
- target=ts
- plugin: buf.build/protocolbuffers/js
out: clients/nodejs
opt:
- import_style=commonjsbuf.gen.python.yaml
version: v1
plugins:
- plugin: buf.build/protocolbuffers/python
out: clients/python
- plugin: buf.build/grpc/python
out: clients/pythonbuf.gen.php.yaml
version: v1
plugins:
- plugin: buf.build/protocolbuffers/php
out: clients/php
- plugin: buf.build/grpc/php
out: clients/php2. 服务实现模板
gRPC 服务实现
package service
import (
"context"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
ledgerv1 "github.com/FixIterate/lz-stash/gen/ledger/v1"
"github.com/FixIterate/lz-stash/internal/domain/ledger"
)
// LedgerService gRPC 服务实现
type LedgerService struct {
ledgerv1.UnimplementedLedgerServiceServer
// 依赖注入
ledgerRepo ledger.Repository
transactionRepo transaction.Repository
budgetService budget.Service
notificationSvc notification.Service
cacheManager cache.Manager
eventPublisher event.Publisher
}
// CreateLedger 创建账本
func (s *LedgerService) CreateLedger(
ctx context.Context,
req *ledgerv1.CreateLedgerRequest,
) (*ledgerv1.CreateLedgerResponse, error) {
// 1. 参数验证
if err := s.validateCreateLedgerRequest(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
// 2. 构建领域对象
ledgerEntity, err := s.buildLedgerFromRequest(req)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// 3. 业务逻辑处理
if err := s.ledgerRepo.Save(ctx, ledgerEntity); err != nil {
return nil, status.Error(codes.Internal,
fmt.Sprintf("failed to save ledger: %v", err))
}
// 4. 发布领域事件
event := ledger.NewLedgerCreatedEvent(ledgerEntity)
if err := s.eventPublisher.Publish(ctx, event); err != nil {
// 记录日志,但不影响主流程
s.logger.Warn("failed to publish event", "error", err)
}
// 5. 构建响应
response := &ledgerv1.CreateLedgerResponse{
Ledger: s.toProtobufLedger(ledgerEntity),
}
return response, nil
}
// 参数验证
func (s *LedgerService) validateCreateLedgerRequest(
req *ledgerv1.CreateLedgerRequest,
) error {
if req.Name == "" {
return fmt.Errorf("ledger name is required")
}
if len(req.Name) > 100 {
return fmt.Errorf("ledger name too long (max 100 characters)")
}
if req.Type == ledgerv1.LedgerType_LEDGER_TYPE_UNSPECIFIED {
return fmt.Errorf("ledger type is required")
}
return nil
}
// 领域对象转换
func (s *LedgerService) toProtobufLedger(entity *ledger.Ledger) *ledgerv1.Ledger {
return &ledgerv1.Ledger{
Id: entity.ID().String(),
Name: entity.Name(),
Description: entity.Description(),
Type: s.toProtobufLedgerType(entity.Type()),
Status: s.toProtobufLedgerStatus(entity.Status()),
DefaultCurrency: s.toProtobufCurrency(entity.DefaultCurrency()),
OwnerId: entity.OwnerID().String(),
Members: s.toProtobufMembers(entity.Members()),
Settings: s.toProtobufSettings(entity.Settings()),
CreatedAt: timestamppb.New(entity.CreatedAt()),
UpdatedAt: timestamppb.New(entity.UpdatedAt()),
Version: entity.Version(),
}
}3. HTTP Gateway 集成
HTTP 服务器设置
package server
import (
"context"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
ledgerv1 "github.com/FixIterate/lz-stash/gen/ledger/v1"
)
// HTTPServer HTTP 网关服务器
type HTTPServer struct {
mux *runtime.ServeMux
grpcServer *grpc.Server
httpServer *http.Server
}
// NewHTTPServer 创建 HTTP 服务器
func NewHTTPServer(grpcAddr string, httpAddr string) *HTTPServer {
// 创建 gRPC Gateway 多路复用器
mux := runtime.NewServeMux(
runtime.WithIncomingHeaderMatcher(customHeaderMatcher),
runtime.WithOutgoingHeaderMatcher(customHeaderMatcher),
runtime.WithErrorHandler(customErrorHandler),
runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{
UseProtoNames: true,
EmitUnpopulated: true,
},
UnmarshalOptions: protojson.UnmarshalOptions{
DiscardUnknown: true,
},
}),
)
return &HTTPServer{
mux: mux,
httpServer: &http.Server{
Addr: httpAddr,
Handler: mux,
},
}
}
// RegisterServices 注册服务
func (s *HTTPServer) RegisterServices(ctx context.Context, grpcAddr string) error {
conn, err := grpc.DialContext(
ctx,
grpcAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return fmt.Errorf("failed to dial gRPC server: %w", err)
}
// 注册 Ledger 服务
if err := ledgerv1.RegisterLedgerServiceHandler(ctx, s.mux, conn); err != nil {
return fmt.Errorf("failed to register ledger service: %w", err)
}
// 注册移动端服务
if err := ledgerv1.RegisterMobileLedgerServiceHandler(ctx, s.mux, conn); err != nil {
return fmt.Errorf("failed to register mobile service: %w", err)
}
return nil
}
// 自定义错误处理
func customErrorHandler(
ctx context.Context,
mux *runtime.ServeMux,
marshaler runtime.Marshaler,
w http.ResponseWriter,
r *http.Request,
err error,
) {
const fallback = `{"error": "internal server error"}`
w.Header().Set("Content-Type", marshaler.ContentType("application/json"))
if s, ok := status.FromError(err); ok {
w.WriteHeader(runtime.HTTPStatusFromCode(s.Code()))
errorResponse := map[string]interface{}{
"error": s.Message(),
"code": int(s.Code()),
}
if details := s.Details(); len(details) > 0 {
errorResponse["details"] = details
}
if buf, marshalErr := marshaler.Marshal(errorResponse); marshalErr == nil {
w.Write(buf)
return
}
}
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fallback))
}最佳实践
1. API 设计原则
RESTful 路径设计
// ✅ 良好的 RESTful 设计
rpc CreateLedger(CreateLedgerRequest) returns (CreateLedgerResponse) {
option (google.api.http) = {
post: "/api/v1/ledgers"
body: "*"
};
}
rpc GetTransaction(GetTransactionRequest) returns (GetTransactionResponse) {
option (google.api.http) = {
get: "/api/v1/transactions/{id}"
};
}
rpc ListTransactions(ListTransactionsRequest) returns (ListTransactionsResponse) {
option (google.api.http) = {
get: "/api/v1/ledgers/{ledger_id}/transactions"
};
}
// ❌ 避免的设计
rpc GetLedgerData(GetLedgerDataRequest) returns (GetLedgerDataResponse) {
option (google.api.http) = {
post: "/api/v1/getLedgerData" // 不符合 RESTful 原则
body: "*"
};
}响应状态码映射
// gRPC 状态码到 HTTP 状态码的映射
var grpcToHTTPStatus = map[codes.Code]int{
codes.OK: 200, // 成功
codes.InvalidArgument: 400, // 参数错误
codes.Unauthenticated: 401, // 未认证
codes.PermissionDenied: 403, // 权限不足
codes.NotFound: 404, // 资源不存在
codes.AlreadyExists: 409, // 资源冲突
codes.ResourceExhausted: 429, // 请求过于频繁
codes.Internal: 500, // 服务器内部错误
codes.Unavailable: 503, // 服务不可用
}2. 版本管理策略
语义化版本控制
// v1 版本
syntax = "proto3";
package ledger.v1;
option go_package = "github.com/FixIterate/lz-stash/gen/ledger/v1;ledgerv1";
message Ledger {
string id = 1;
string name = 2;
string description = 3;
// 新增字段使用高数字标签
string owner_id = 10; // v1.1 新增
repeated string tags = 11; // v1.2 新增
}
// v2 版本 (破坏性变更)
syntax = "proto3";
package ledger.v2;
option go_package = "github.com/FixIterate/lz-stash/gen/ledger/v2;ledgerv2";
message Ledger {
string id = 1;
string name = 2;
// 移除 description 字段
LedgerType type = 3; // 新增枚举类型
LedgerSettings settings = 4;
}向后兼容性保证
// ✅ 兼容性更新
message Transaction {
string id = 1;
string description = 2;
int64 amount = 3;
// 新增可选字段
string note = 4; // 新增
repeated string attachments = 5; // 新增
// 预留字段范围
reserved 100 to 199; // 为未来扩展预留
}
// ❌ 破坏兼容性的更新
message Transaction {
string id = 1;
int64 amount = 2; // 改变字段编号
float amount_float = 3; // 改变字段类型
// 删除 description 字段
}3. 性能优化技巧
流式传输
// 大量数据流式传输
service LedgerService {
// 流式导出交易记录
rpc ExportTransactions(ExportTransactionsRequest)
returns (stream ExportTransactionsResponse);
// 流式导入交易记录
rpc ImportTransactions(stream ImportTransactionsRequest)
returns (ImportTransactionsResponse);
// 双向流式同步
rpc SyncTransactions(stream SyncTransactionsRequest)
returns (stream SyncTransactionsResponse);
}批量操作
// 批量创建
message BatchCreateTransactionsRequest {
string ledger_id = 1;
repeated CreateTransactionRequest requests = 2;
bool skip_on_error = 3; // 遇到错误是否跳过
bool validate_only = 4; // 仅验证不执行
}
message BatchCreateTransactionsResponse {
repeated CreateTransactionResponse results = 1;
repeated BatchError errors = 2; // 错误列表
int32 success_count = 3; // 成功数量
int32 error_count = 4; // 错误数量
}
message BatchError {
int32 index = 1; // 错误请求索引
string message = 2; // 错误信息
int32 code = 3; // 错误码
}4. 安全考虑
字段验证
import "validate/validate.proto";
message CreateLedgerRequest {
string name = 1 [(validate.rules).string = {
min_len: 1,
max_len: 100,
pattern: "^[a-zA-Z0-9\\s\\-_]+$"
}];
string description = 2 [(validate.rules).string = {
max_len: 500
}];
repeated string member_ids = 3 [(validate.rules).repeated = {
min_items: 0,
max_items: 10,
items: {
string: {
pattern: "^[a-zA-Z0-9\\-]+$"
}
}
}];
}敏感信息处理
// 敏感字段标记
message User {
string id = 1;
string username = 2;
string email = 3 [(sensitive) = true]; // 敏感信息标记
string phone = 4 [(sensitive) = true];
string password_hash = 5 [(internal) = true]; // 内部字段,不对外暴露
}
// 脱敏响应
message PublicUser {
string id = 1;
string username = 2;
string masked_email = 3; // 脱敏后的邮箱
// 不包含敏感信息
}代码示例
1. 完整的客户端示例
Go 客户端
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
ledgerv1 "github.com/FixIterate/lz-stash/gen/ledger/v1"
commonv1 "github.com/FixIterate/lz-stash/gen/common/v1"
)
func main() {
// 建立 gRPC 连接
conn, err := grpc.Dial(
"localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithTimeout(5*time.Second),
)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
// 创建客户端
client := ledgerv1.NewLedgerServiceClient(conn)
// 创建账本
ledger, err := createLedger(client)
if err != nil {
log.Fatalf("Failed to create ledger: %v", err)
}
log.Printf("Created ledger: %s", ledger.Id)
// 创建交易记录
transaction, err := createTransaction(client, ledger.Id)
if err != nil {
log.Fatalf("Failed to create transaction: %v", err)
}
log.Printf("Created transaction: %s", transaction.Id)
// 查询统计信息
analytics, err := getAnalytics(client, ledger.Id)
if err != nil {
log.Fatalf("Failed to get analytics: %v", err)
}
log.Printf("Total income: %d, Total expense: %d",
analytics.Summary.TotalIncome.Amount,
analytics.Summary.TotalExpense.Amount)
}
func createLedger(client ledgerv1.LedgerServiceClient) (*ledgerv1.Ledger, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req := &ledgerv1.CreateLedgerRequest{
Name: "我的账本",
Description: "个人日常账本",
Type: ledgerv1.LedgerType_LEDGER_TYPE_PERSONAL,
DefaultCurrency: commonv1.Currency_CURRENCY_CNY,
Settings: &ledgerv1.LedgerSettings{
EnableBudgetAlerts: true,
BudgetAlertThreshold: 80,
EnableAutoCategorization: true,
Timezone: "Asia/Shanghai",
Locale: "zh-CN",
},
}
resp, err := client.CreateLedger(ctx, req)
if err != nil {
return nil, err
}
return resp.Ledger, nil
}
func createTransaction(client ledgerv1.LedgerServiceClient, ledgerID string) (*ledgerv1.Transaction, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req := &ledgerv1.CreateTransactionRequest{
LedgerId: ledgerID,
Type: ledgerv1.TransactionType_TRANSACTION_TYPE_EXPENSE,
Amount: &commonv1.Money{
Amount: 3500, // 35.00 元
Currency: commonv1.Currency_CURRENCY_CNY,
},
Description: "午餐",
Note: "公司楼下餐厅",
TagIds: []string{"food", "lunch"},
}
resp, err := client.CreateTransaction(ctx, req)
if err != nil {
return nil, err
}
return resp.Transaction, nil
}
func getAnalytics(client ledgerv1.LedgerServiceClient, ledgerID string) (*ledgerv1.AnalyticsResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req := &ledgerv1.GetAnalyticsRequest{
LedgerId: ledgerID,
TimeRange: &commonv1.TimeRange{
StartTime: timestamppb.New(time.Now().AddDate(0, -1, 0)), // 一个月前
EndTime: timestamppb.New(time.Now()),
},
}
resp, err := client.GetAnalytics(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}Node.js 客户端
import { PromiseClient, createPromiseClient } from "@connectrpc/connect";
import { createGrpcTransport } from "@connectrpc/connect-node";
import { LedgerService } from "./gen/ledger/v1/ledger_service_connect";
import {
CreateLedgerRequest,
CreateTransactionRequest,
LedgerType,
TransactionType
} from "./gen/ledger/v1/ledger_pb";
import { Currency } from "./gen/common/v1/money_pb";
async function main() {
// 创建传输层
const transport = createGrpcTransport({
baseUrl: "http://localhost:9090",
httpVersion: "2",
});
// 创建客户端
const client: PromiseClient<typeof LedgerService> = createPromiseClient(
LedgerService,
transport
);
try {
// 创建账本
const createLedgerReq = new CreateLedgerRequest({
name: "我的账本",
description: "个人日常账本",
type: LedgerType.LEDGER_TYPE_PERSONAL,
defaultCurrency: Currency.CURRENCY_CNY,
settings: {
enableBudgetAlerts: true,
budgetAlertThreshold: 80,
enableAutoCategorization: true,
timezone: "Asia/Shanghai",
locale: "zh-CN",
},
});
const ledgerResp = await client.createLedger(createLedgerReq);
console.log("Created ledger:", ledgerResp.ledger?.id);
// 创建交易记录
const createTransactionReq = new CreateTransactionRequest({
ledgerId: ledgerResp.ledger?.id,
type: TransactionType.TRANSACTION_TYPE_EXPENSE,
amount: {
amount: BigInt(3500), // 35.00 元
currency: Currency.CURRENCY_CNY,
},
description: "午餐",
note: "公司楼下餐厅",
tagIds: ["food", "lunch"],
});
const transactionResp = await client.createTransaction(createTransactionReq);
console.log("Created transaction:", transactionResp.transaction?.id);
} catch (error) {
console.error("Error:", error);
}
}
main().catch(console.error);2. 中间件集成示例
认证中间件
func AuthInterceptor(authService auth.Service) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// 跳过不需要认证的方法
if isPublicMethod(info.FullMethod) {
return handler(ctx, req)
}
// 从元数据中提取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "missing metadata")
}
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "missing token")
}
token := strings.TrimPrefix(tokens[0], "Bearer ")
// 验证 token
user, err := authService.ValidateToken(ctx, token)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "invalid token")
}
// 将用户信息添加到上下文
ctx = auth.WithUser(ctx, user)
return handler(ctx, req)
}
}
func isPublicMethod(method string) bool {
publicMethods := []string{
"/ledger.v1.LedgerService/HealthCheck",
"/ledger.v1.LedgerService/GetVersion",
}
for _, public := range publicMethods {
if method == public {
return true
}
}
return false
}限流中间件
func RateLimitInterceptor(limiter ratelimit.Limiter) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// 获取用户 ID
userID := auth.GetUserID(ctx)
if userID == "" {
userID = "anonymous"
}
// 构建限流 key
key := fmt.Sprintf("user:%s:method:%s", userID, info.FullMethod)
// 检查限流
if !limiter.Allow(key) {
return nil, status.Error(codes.ResourceExhausted, "rate limit exceeded")
}
return handler(ctx, req)
}
}性能考虑
1. 连接复用
连接池配置
// gRPC 连接池
type ConnectionPool struct {
connections []*grpc.ClientConn
roundRobin int64
mutex sync.RWMutex
}
func NewConnectionPool(target string, size int) (*ConnectionPool, error) {
pool := &ConnectionPool{
connections: make([]*grpc.ClientConn, size),
}
for i := 0; i < size; i++ {
conn, err := grpc.Dial(target,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}),
)
if err != nil {
return nil, err
}
pool.connections[i] = conn
}
return pool, nil
}
func (p *ConnectionPool) GetConnection() *grpc.ClientConn {
p.mutex.RLock()
defer p.mutex.RUnlock()
index := atomic.AddInt64(&p.roundRobin, 1) % int64(len(p.connections))
return p.connections[index]
}2. 消息压缩
启用 gRPC 压缩
// 服务端启用压缩
server := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
// 其他中间件...
)),
// 启用 gzip 压缩
grpc.RPCCompressor(grpc.NewGZIPCompressor()),
grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
)
// 客户端启用压缩
conn, err := grpc.Dial(target,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.UseCompressor(gzip.Name),
),
)3. 流式传输优化
大数据流式处理
func (s *LedgerService) ExportTransactions(
req *ledgerv1.ExportTransactionsRequest,
stream ledgerv1.LedgerService_ExportTransactionsServer,
) error {
const batchSize = 100
offset := 0
for {
// 分批查询数据
transactions, err := s.transactionRepo.FindWithPagination(
stream.Context(),
req.LedgerId,
offset,
batchSize,
)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
if len(transactions) == 0 {
break // 没有更多数据
}
// 分批发送数据
for _, transaction := range transactions {
response := &ledgerv1.ExportTransactionsResponse{
Transaction: s.toProtobufTransaction(transaction),
}
if err := stream.Send(response); err != nil {
return status.Error(codes.Internal, err.Error())
}
}
offset += batchSize
// 检查上下文取消
if stream.Context().Err() != nil {
return status.Error(codes.Canceled, "export canceled")
}
}
return nil
}常见问题
1. 版本兼容性问题
问题: 升级 protobuf 定义后客户端出现兼容性问题
解决方案:
// ✅ 正确的字段添加方式
message Transaction {
string id = 1;
string description = 2;
int64 amount = 3;
// 新增字段使用新的字段编号
string note = 4; // v1.1 新增
repeated string tags = 5; // v1.2 新增
}
// ❌ 错误的做法
message Transaction {
string id = 1;
int64 amount = 2; // 改变了原有字段编号
string description = 3; // 改变了原有字段编号
}2. 性能调优
问题: gRPC 调用延迟过高
诊断和解决:
// 启用详细的性能监控
func withMonitoring() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req)
duration := time.Since(start)
// 记录性能指标
grpcDuration.WithLabelValues(info.FullMethod).Observe(duration.Seconds())
// 慢查询日志
if duration > 100*time.Millisecond {
log.Warn("slow grpc call",
zap.String("method", info.FullMethod),
zap.Duration("duration", duration),
)
}
return resp, err
}
}3. 错误处理
问题: 错误信息在 HTTP 和 gRPC 之间转换丢失
解决方案:
// 自定义错误类型
type LedgerError struct {
Code codes.Code
Message string
Details map[string]interface{}
}
func (e *LedgerError) Error() string {
return e.Message
}
func (e *LedgerError) GRPCStatus() *status.Status {
st := status.New(e.Code, e.Message)
// 添加错误详情
if len(e.Details) > 0 {
details, _ := structpb.NewStruct(e.Details)
st, _ = st.WithDetails(details)
}
return st
}
// 业务层使用
func (s *LedgerService) CreateLedger(ctx context.Context, req *CreateLedgerRequest) (*CreateLedgerResponse, error) {
if req.Name == "" {
return nil, &LedgerError{
Code: codes.InvalidArgument,
Message: "ledger name is required",
Details: map[string]interface{}{
"field": "name",
"constraint": "not_empty",
},
}
}
// ... 其他逻辑
}参考资料
官方文档
- gRPC 官方文档 - gRPC 完整指南
- Protocol Buffers 指南 - Protobuf 语法和最佳实践
- grpc-gateway - HTTP/gRPC 网关文档
- buf 文档 - 现代化 Protobuf 工具链
经典书籍
- 《gRPC: Up and Running》 - Kasun Indrasiri & Danesh Kuruppu
- 《Building Microservices》 - Sam Newman
- 《API Design Patterns》 - JJ Geewax
最佳实践文章
- Google API 设计指南 - Google API 设计最佳实践
- gRPC Best Practices - gRPC 官方最佳实践
- Protobuf Style Guide - Protobuf 编码规范
工具和框架
实战练习
练习 1: 设计用户管理 API
设计一个用户管理的 gRPC 服务,包含以下功能:
- 用户注册和登录
- 用户信息管理
- 用户权限控制
- HTTP RESTful API 支持
练习 2: 实现流式数据处理
实现一个流式数据处理服务:
- 客户端流式上传文件
- 服务端流式返回处理结果
- 双向流式聊天服务
练习 3: 多语言客户端集成
为同一个 gRPC 服务生成多语言客户端:
- Go 客户端
- Node.js 客户端
- Python 客户端
- 测试客户端间的互操作性
练习 4: 性能基准测试
对 gRPC 服务进行性能测试:
- 单一调用性能测试
- 并发调用压力测试
- 流式传输性能测试
- 与 HTTP REST API 性能对比
这些练习将帮助你深入理解 gRPC 和 Protobuf 的实际应用,掌握在生产环境中的最佳实践。