gRPC服务通信设计与实践
概述
在微服务架构中,服务间通信是关键环节。相比REST API,gRPC提供了更高的性能和更强的类型安全。本文将介绍如何在微服务平台中设计和实现gRPC服务通信。
为什么选择gRPC
gRPC vs REST对比
| 特性 | gRPC | REST |
|---|---|---|
| 协议 | HTTP/2 | 通常为HTTP/1.1(也可运行在HTTP/2上) |
| 序列化 | Protocol Buffers (二进制) | JSON (文本) |
| 性能 | 高(二进制+压缩) | 中(文本开销) |
| 类型安全 | 强(代码生成) | 弱(运行时检查) |
| 流式通信 | 原生支持(双向流) | 需额外实现(SSE/WebSocket) |
| 代码生成 | 由.proto自动生成(或运行时动态加载) | 通常手动编写(可借助OpenAPI生成) |
| 浏览器支持 | 需gRPC-Web | 原生支持 |
| 调试难度 | 较难 | 容易 |
适用场景
- 内部服务通信:微服务间高性能调用
- 流式数据处理:日志、事件流
- 多语言服务:统一协议跨语言通信
- 低延迟要求:高频调用场景
gRPC架构设计
服务通信拓扑

Protocol Buffers定义
用户信息服务
// grpc/proto/userInfo.proto
// Contract for the user-info service. The Node code in this article loads this file at runtime
// with @grpc/proto-loader using keepCase: true, so the camelCase field names below are used
// verbatim as JavaScript property names.
// NOTE(review): protobuf style recommends lower_snake_case field names; renaming them would break
// the keepCase-based JS code, so any rename must change both sides together.
syntax = "proto3";
package user;
option go_package = "github.com/org/micro/proto/user";
// User information service.
// NOTE(review): the Node server shown in this article implements GetUserInfo, GetUserInfoBatch and
// StreamUsers only; UpdateUserInfo and SearchUsers have no handler there (grpc-js answers
// UNIMPLEMENTED for methods without an implementation).
service UserInfoService {
// Fetch one user by id; roles/permissions are filled only when requested.
rpc GetUserInfo(UserInfoRequest) returns (UserInfoResponse);
// Fetch several users in a single call.
rpc GetUserInfoBatch(UserInfoBatchRequest) returns (UserInfoBatchResponse);
// Update a user's profile fields.
rpc UpdateUserInfo(UpdateUserInfoRequest) returns (UserInfoResponse);
// Search users (request carries keyword / page / pageSize).
rpc SearchUsers(SearchUsersRequest) returns (SearchUsersResponse);
// Stream a list of users (server-side streaming: one UserInfo message per user).
rpc StreamUsers(StreamUsersRequest) returns (stream UserInfo);
}
// Basic user profile.
message UserInfo {
string userId = 1;
string username = 2;
string email = 3;
string phone = 4;
string avatar = 5;
string realName = 6;
// Numeric status. NOTE(review): the stream handler only returns users with status 1, presumably
// "active" — document the full set of values.
int32 status = 7;
// Timestamps as strings; the Node server fills them with Date#toISOString() and uses '' when absent.
string createdAt = 8;
string updatedAt = 9;
// Role codes; the Node server fills this only when UserInfoRequest.includeRoles is true.
repeated string roles = 10;
// Permission codes; the Node server fills this only when UserInfoRequest.includePermissions is true.
repeated string permissions = 11;
}
// Request: fetch one user.
message UserInfoRequest {
string userId = 1;
bool includeRoles = 2;
bool includePermissions = 3;
}
// Response: one user.
message UserInfoResponse {
bool success = 1;
string message = 2;
UserInfo user = 3;
}
// Request: fetch several users by id.
message UserInfoBatchRequest {
repeated string userIds = 1;
}
// Response: the users that were found.
message UserInfoBatchResponse {
bool success = 1;
string message = 2;
repeated UserInfo users = 3;
}
// Request: update a user's profile.
// NOTE(review): proto3 scalar fields carry no presence information, so an empty string cannot be
// told apart from "field not sent"; consider `optional` fields or a google.protobuf.FieldMask for
// partial updates.
message UpdateUserInfoRequest {
string userId = 1;
string username = 2;
string email = 3;
string phone = 4;
string avatar = 5;
string realName = 6;
}
// Request: search users.
// NOTE(review): whether `page` is 0- or 1-based is not defined here — confirm with the implementation.
message SearchUsersRequest {
string keyword = 1;
int32 page = 2;
int32 pageSize = 3;
}
// Response: one page of matching users; `total` presumably counts all matches — confirm.
message SearchUsersResponse {
bool success = 1;
string message = 2;
repeated UserInfo users = 3;
int32 total = 4;
}
// Request: stream users.
// The Node server uses batchSize as the cap on how many users are streamed (cursor limit),
// not as a chunk size.
message StreamUsersRequest {
int32 batchSize = 1;
}
RAG知识库搜索服务
// grpc/proto/ragSearch.proto
// Contract for the RAG (retrieval-augmented generation) knowledge-base search service.
// NOTE(review): unlike userInfo.proto this file declares no go_package option; add one if Go code
// will be generated from it.
syntax = "proto3";
package rag;
// RAG search service.
service RagSearchService {
// Semantic search.
rpc Search(RagSearchRequest) returns (RagSearchResponse);
// Fetch a single chunk's details.
rpc GetChunk(GetChunkRequest) returns (ChunkResponse);
// Fetch several chunks in one call.
rpc GetChunksBatch(GetChunksBatchRequest) returns (ChunksBatchResponse);
// Streaming search (server-side streaming: one SearchResult message per hit).
rpc StreamSearch(StreamSearchRequest) returns (stream SearchResult);
}
// Search request.
message RagSearchRequest {
// Presumably identifies the knowledge base to search — confirm.
string ragCode = 1;
string query = 2;
// Presumably the maximum number of results — confirm.
int32 limit = 3;
// NOTE(review): presumably a similarity-score threshold; its scale and direction are not defined here.
float minScore = 4;
// NOTE(review): the filter string syntax is not specified in this file — document the expected format.
repeated string filters = 5;
}
// One search hit.
message SearchResult {
string ragChunkCode = 1;
string ragCode = 2;
string materialCode = 3;
string content = 4;
float score = 5;
int32 chunkIndex = 6;
}
// Search response.
message RagSearchResponse {
bool success = 1;
string message = 2;
repeated SearchResult results = 3;
int32 total = 4;
// Echo of the query that was executed.
string query = 5;
}
// Chunk detail request.
message GetChunkRequest {
string ragChunkCode = 1;
}
// Chunk detail response.
// NOTE(review): unlike the other responses this has no `message` field, and it doubles as the
// element type of ChunksBatchResponse.chunks.
message ChunkResponse {
bool success = 1;
string ragChunkCode = 2;
string content = 3;
string materialCode = 4;
int32 tokenCount = 5;
}
// Batch chunk request.
message GetChunksBatchRequest {
repeated string ragChunkCodes = 1;
}
// Batch chunk response.
message ChunksBatchResponse {
bool success = 1;
repeated ChunkResponse chunks = 2;
}
// Streaming search request.
message StreamSearchRequest {
string ragCode = 1;
string query = 2;
int32 limit = 3;
}
Node.js gRPC实现
gRPC服务端
// llm.api/grpc/servers/userInfo.js
import grpc from '@grpc/grpc-js'
import protoLoader from '@grpc/proto-loader'
import path from 'path'
// Load userInfo.proto at runtime with @grpc/proto-loader (no codegen step).
// NOTE(review): the path is resolved against process.cwd(), so the process must be started from
// the service root for this to find the file — confirm the deployment layout.
const PROTO_PATH = path.join(process.cwd(), 'grpc/proto/userInfo.proto')
// keepCase: keep field names exactly as in the .proto; longs/enums: render int64 and enum values as
// strings; defaults: populate unset fields with proto3 defaults; oneofs: add virtual oneof fields.
const protoLoaderOptions = {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
}
const packageDefinition = protoLoader.loadSync(PROTO_PATH, protoLoaderOptions)
// The `user` package namespace declared in the .proto.
const { user: userProto } = grpc.loadPackageDefinition(packageDefinition)
/**
 * Map a document from the `users` collection to the proto `UserInfo` message shape
 * (without roles/permissions). Optional strings fall back to '' and Date fields become
 * ISO-8601 strings ('' when absent).
 * @param {object} user - MongoDB `users` document
 * @returns {object} plain object matching `user.UserInfo`
 */
function toUserInfo(user) {
  return {
    userId: user.userId,
    username: user.username,
    email: user.email,
    phone: user.phone || '',
    avatar: user.avatar || '',
    realName: user.realName || '',
    status: user.status,
    createdAt: user.createdAt?.toISOString() || '',
    updatedAt: user.updatedAt?.toISOString() || '',
  }
}

/**
 * Register the `user.UserInfoService` implementation on a gRPC server.
 *
 * Handlers read from `fastify.mongo.db` (collections `users`, `userRoles`, `rolePermissions`).
 * UpdateUserInfo and SearchUsers are not implemented here, so grpc-js answers them with UNIMPLEMENTED.
 *
 * @param {object} fastify - Fastify instance with the @fastify/mongodb plugin registered
 * @param {grpc.Server} [server] - server to add the service to; a new one is created when omitted
 * @returns {Promise<grpc.Server>} the server the service was added to
 */
export default async function userInfoServer(fastify, server = new grpc.Server()) {
  // Resolve the db lazily on each call, as handlers may run long after registration.
  const collection = (name) => fastify.mongo.db.collection(name)

  server.addService(userProto.UserInfoService.service, {
    // Fetch one user; roles/permissions only when requested.
    GetUserInfo: async (call, callback) => {
      try {
        const { userId, includeRoles, includePermissions } = call.request
        const user = await collection('users').findOne({ userId })
        if (!user) {
          return callback({
            code: grpc.status.NOT_FOUND,
            message: 'User not found',
          })
        }

        const userInfo = toUserInfo(user)
        if (includeRoles || includePermissions) {
          const userRoles = await collection('userRoles').find({ userId }).toArray()
          const roleCodes = userRoles.map((ur) => ur.roleCode)
          if (includeRoles) {
            userInfo.roles = roleCodes
          }
          if (includePermissions) {
            // `$in: []` can never match, so skip the round trip for users without roles.
            const permissions = roleCodes.length === 0
              ? []
              : await collection('rolePermissions')
                .find({ roleCode: { $in: roleCodes } })
                .toArray()
            // Several roles may grant the same permission; report each code once.
            userInfo.permissions = [...new Set(permissions.map((p) => p.permissionCode))]
          }
        }

        callback(null, {
          success: true,
          message: 'Success',
          user: userInfo,
        })
      } catch (error) {
        callback({
          code: grpc.status.INTERNAL,
          message: error.message,
        })
      }
    },

    // Fetch several users. Unknown ids are simply absent from the result, and `$in`
    // does not guarantee the order of `userIds`.
    GetUserInfoBatch: async (call, callback) => {
      try {
        const { userIds } = call.request
        const users = userIds.length === 0
          ? []
          : await collection('users').find({ userId: { $in: userIds } }).toArray()
        callback(null, {
          success: true,
          message: 'Success',
          users: users.map(toUserInfo),
        })
      } catch (error) {
        callback({
          code: grpc.status.INTERNAL,
          message: error.message,
        })
      }
    },

    // Stream up to `batchSize` users with status 1, one UserInfo message each.
    StreamUsers: async (call) => {
      try {
        const { batchSize } = call.request
        // MongoDB treats limit(0) as "no limit" (and negative limits specially), and an unset
        // proto3 int32 arrives as 0, so only positive sizes are meaningful here.
        if (!Number.isInteger(batchSize) || batchSize <= 0) {
          call.destroy(Object.assign(
            new Error('batchSize must be a positive integer'),
            { code: grpc.status.INVALID_ARGUMENT },
          ))
          return
        }

        const cursor = collection('users').find({ status: 1 }).limit(batchSize)
        for await (const user of cursor) {
          // Client went away: stop producing; leaving for-await calls the cursor's return().
          if (call.cancelled) break
          call.write(toUserInfo(user))
        }
        call.end()
      } catch (error) {
        call.destroy(error)
      }
    },
  })
  return server
}
gRPC客户端
// rag.api/grpc/clients/userInfo.js
import grpc from '@grpc/grpc-js'
import protoLoader from '@grpc/proto-loader'
import path from 'path'
// Load userInfo.proto at runtime with @grpc/proto-loader, using the same loader options as the
// server so both sides see identical message shapes.
// NOTE(review): the path is resolved against process.cwd(), so the process must be started from
// the service root for this to find the file — confirm the deployment layout.
const PROTO_PATH = path.join(process.cwd(), 'grpc/proto/userInfo.proto')
// keepCase: keep field names exactly as in the .proto; longs/enums: render int64 and enum values as
// strings; defaults: populate unset fields with proto3 defaults; oneofs: add virtual oneof fields.
const protoLoaderOptions = {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
}
const packageDefinition = protoLoader.loadSync(PROTO_PATH, protoLoaderOptions)
// The `user` package namespace declared in the .proto.
const { user: userProto } = grpc.loadPackageDefinition(packageDefinition)
/**
 * Promise-based client for `user.UserInfoService`.
 *
 * Each instance wraps one grpc-js client, so it is meant to be created once and reused.
 */
export class UserInfoClient {
  // Per-call timeout in milliseconds; when undefined, calls are sent without a deadline.
  #timeoutMs

  /**
   * @param {string} serviceUrl - `host:port` of the UserInfoService server
   * @param {object} [options]
   * @param {object} [options.credentials] - channel credentials; plaintext (insecure) when omitted
   * @param {object} [options.channelOptions] - grpc-js channel options (keepalive, service config, ...)
   * @param {number} [options.timeoutMs] - deadline applied to every call, in milliseconds
   */
  constructor(serviceUrl, { credentials, channelOptions, timeoutMs } = {}) {
    this.client = new userProto.UserInfoService(
      serviceUrl,
      credentials ?? grpc.credentials.createInsecure(),
      channelOptions,
    )
    this.#timeoutMs = timeoutMs
  }

  // Call options for a single RPC: a fresh absolute deadline when a timeout is configured.
  #callOptions() {
    return this.#timeoutMs == null ? {} : { deadline: Date.now() + this.#timeoutMs }
  }

  // Invoke a unary stub method and adapt its (error, response) callback to a Promise.
  #unary(method, request) {
    return new Promise((resolve, reject) => {
      this.client[method](request, this.#callOptions(), (error, response) => {
        if (error) reject(error)
        else resolve(response)
      })
    })
  }

  /**
   * Fetch one user.
   * @param {string} userId
   * @param {{includeRoles?: boolean, includePermissions?: boolean}} [options]
   * @returns {Promise<object>} the UserInfoResponse message
   * Rejects with the gRPC error reported by the server (e.g. code NOT_FOUND for an unknown id).
   */
  async getUserInfo(userId, options = {}) {
    return this.#unary('GetUserInfo', {
      userId,
      includeRoles: options.includeRoles ?? false,
      includePermissions: options.includePermissions ?? false,
    })
  }

  /**
   * Fetch several users in one RPC.
   * @param {string[]} userIds
   * @returns {Promise<object>} the UserInfoBatchResponse message
   */
  async getUserInfoBatch(userIds) {
    return this.#unary('GetUserInfoBatch', { userIds })
  }

  /**
   * Consume the StreamUsers server stream and collect every message.
   * @param {number} [batchSize=100] - sent as StreamUsersRequest.batchSize
   * @returns {Promise<object[]>} all streamed users, resolved when the stream ends
   */
  streamUsers(batchSize = 100) {
    const stream = this.client.StreamUsers({ batchSize }, this.#callOptions())
    const users = []
    return new Promise((resolve, reject) => {
      stream.on('data', (user) => users.push(user))
      stream.on('end', () => resolve(users))
      stream.on('error', reject)
    })
  }

  // Close the underlying grpc-js client.
  close() {
    this.client.close()
  }
}
客户端连接池
// grpc/clients/pool.js
import { UserInfoClient } from './userInfo.js'
import { RagSearchClient } from './ragSearch.js'
/**
 * Process-wide cache of gRPC clients: one lazily created client per target service.
 *
 * Note: this does not pool multiple connections. Each getter hands back the same cached
 * instance, and `maxConnections` is kept in the config but nothing in this class reads it.
 */
class GRPCClientPool {
  constructor() {
    // service name -> client instance
    this.clients = new Map()
    this.configs = {
      userInfo: {
        url: process.env.USERINFO_GRPC_URL || 'localhost:13001',
        maxConnections: 10,
      },
      ragSearch: {
        url: process.env.RAG_GRPC_URL || 'localhost:13003',
        maxConnections: 10,
      },
    }
  }

  // Return the cached client for `name`, constructing it from configs[name].url on first use.
  #getOrCreate(name, ClientClass) {
    let client = this.clients.get(name)
    if (client === undefined) {
      client = new ClientClass(this.configs[name].url)
      this.clients.set(name, client)
    }
    return client
  }

  getUserInfoClient() {
    return this.#getOrCreate('userInfo', UserInfoClient)
  }

  getRagSearchClient() {
    return this.#getOrCreate('ragSearch', RagSearchClient)
  }

  // Close every cached client (best effort: a failing close is logged, not rethrown), then forget them.
  closeAll() {
    this.clients.forEach((client, name) => {
      try {
        client.close()
        console.log(`Closed gRPC client: ${name}`)
      } catch (error) {
        console.error(`Error closing gRPC client ${name}:`, error)
      }
    })
    this.clients.clear()
  }
}

// Shared singleton used by the services.
export const grpcPool = new GRPCClientPool()
服务端启动与集成
服务端启动
// llm.api/grpc/server.js
import grpc from '@grpc/grpc-js'
import userInfoServer from './servers/userInfo.js'
import tokenServer from './servers/token.js'
/**
 * Create the process's gRPC server, register all services on it, bind it and install a
 * SIGTERM handler for graceful shutdown.
 *
 * Each registration function is called as `register(fastify, server)` and is expected to add its
 * service(s) to the server passed in. If one instead returns a different grpc.Server, its services
 * would never be bound, so that case is logged as a warning.
 * NOTE(review): confirm servers/token.js follows the (fastify, server) contract.
 *
 * @param {object} fastify - Fastify instance (used for logging and for close() on shutdown)
 * @returns {Promise<grpc.Server>} the bound server
 * Rejects with the bind error when the address cannot be bound.
 */
export async function startGRPCServer(fastify) {
  const GRPC_PORT = process.env.GRPC_PORT || 13001
  const GRPC_ADDRESS = process.env.GRPC_ADDRESS || '0.0.0.0'
  const address = `${GRPC_ADDRESS}:${GRPC_PORT}`

  const server = new grpc.Server()

  // Register every service on the one server that is bound below.
  const registrations = [
    ['userInfo', userInfoServer],
    ['token', tokenServer],
  ]
  for (const [name, register] of registrations) {
    const registeredOn = await register(fastify, server)
    if (registeredOn !== undefined && registeredOn !== server) {
      fastify.log.warn(`gRPC service "${name}" was added to a separate grpc.Server and will not be served on ${address}`)
    }
  }

  // Wait for the bind result so a failure rejects instead of leaving an unbound server behind.
  let port
  try {
    port = await new Promise((resolve, reject) => {
      server.bindAsync(address, grpc.ServerCredentials.createInsecure(), (error, boundPort) => {
        if (error) reject(error)
        else resolve(boundPort)
      })
    })
  } catch (error) {
    fastify.log.error(`gRPC server bind failed: ${error}`)
    throw error
  }
  // NOTE(review): required on older grpc-js; newer releases serve after bind and deprecate start().
  server.start()
  fastify.log.info(`gRPC server started on ${GRPC_ADDRESS}:${port}`)

  // Installing a SIGTERM listener disables Node's default exit-on-SIGTERM, so after draining gRPC
  // also close Fastify (HTTP server and plugins) so the process can exit.
  process.once('SIGTERM', () => {
    fastify.log.info('Shutting down gRPC server...')
    server.tryShutdown((shutdownError) => {
      if (shutdownError) {
        fastify.log.error(`gRPC graceful shutdown failed: ${shutdownError}`)
        server.forceShutdown()
      } else {
        fastify.log.info('gRPC server shut down')
      }
      fastify.close().catch((closeError) => {
        fastify.log.error(`Fastify close failed: ${closeError}`)
      })
    })
  })

  return server
}
Fastify集成
// llm.api/app.js
import Fastify from 'fastify'
import { startGRPCServer } from './grpc/server.js'
// HTTP application with info-level logging.
const app = Fastify({ logger: { level: 'info' } })

// Plugins that gRPC handlers depend on (fastify.mongo) must be registered before the gRPC server starts.
const mongoOptions = { url: process.env.MONGO_URI }
await app.register(import('@fastify/mongodb'), mongoOptions)

// gRPC listener first, then HTTP.
await startGRPCServer(app)

const listenOptions = {
  port: process.env.PORT || 12001,
  host: process.env.FASTIFY_ADDRESS || '0.0.0.0',
}
await app.listen(listenOptions)
调用示例
服务间调用
// rag.api/services/ragService.js
import { grpcPool } from '../grpc/clients/pool.js'
/**
 * Knowledge-base search that first resolves the caller's identity over gRPC.
 *
 * NOTE(review): checkRagAccess, logSearch, embedText and chunkDac are used here but not defined in
 * this snippet — presumably provided elsewhere (subclass, mixin or constructor); confirm.
 */
class RagService {
  /**
   * Search a knowledge base on behalf of a user.
   * @param {string} ragCode
   * @param {string} query
   * @param {string} userId
   * @param {number} [limit] - maximum results; when omitted, `search`'s default (5) applies
   * @returns {Promise<Array>} the chunks returned by `search`
   * @throws {Error} 'Failed to get user info' when the response reports failure, or 'Access denied'.
   *   gRPC errors from getUserInfo (e.g. NOT_FOUND for an unknown user) propagate unchanged.
   */
  async searchWithUserContext(ragCode, query, userId, limit) {
    // 1. Resolve the user (with roles) over gRPC.
    const userInfoClient = grpcPool.getUserInfoClient()
    const userInfoResponse = await userInfoClient.getUserInfo(userId, {
      includeRoles: true,
    })
    if (!userInfoResponse.success) {
      throw new Error('Failed to get user info')
    }

    // 2. Authorize access to this knowledge base.
    const hasAccess = await this.checkRagAccess(ragCode, userInfoResponse.user)
    if (!hasAccess) {
      throw new Error('Access denied')
    }

    // 3. Search; an undefined limit falls through to search()'s default.
    const results = await this.search(ragCode, query, limit)

    // 4. Record the search.
    await this.logSearch(userId, ragCode, query, results.length)
    return results
  }

  /**
   * Vector search: embed the query, then fetch the most similar chunks.
   * @param {string} ragCode
   * @param {string} query
   * @param {number} [limit=5]
   * @returns {Promise<Array>} chunks from chunkDac.searchSimilarChunks
   */
  async search(ragCode, query, limit = 5) {
    const queryVector = await this.embedText(query)
    // Retrieval from the vector store (Milvus, per the original comment).
    return this.chunkDac.searchSimilarChunks(ragCode, queryVector, limit)
  }
}
性能优化
连接复用
// Reuse one client across calls: the pool returns the same cached instance every time.
const client = grpcPool.getUserInfoClient()
// All three requests below go through that single client (and its channel).
const user1 = await client.getUserInfo('user-1')
const user2 = await client.getUserInfo('user-2')
const batchIds = ['user-3', 'user-4']
const batch = await client.getUserInfoBatch(batchIds)
批量操作
// One batch call beats many single lookups.
// ❌ Not recommended: N sequential RPCs, one network round trip each.
// (Each call also rejects with NOT_FOUND for an unknown id.)
const usersOneByOne = []
for (const id of userIds) {
  const { user } = await client.getUserInfo(id) // N calls
  usersOneByOne.push(user)
}
// ✅ Recommended: a single RPC.
// Note: the server's `$in` query does not preserve the order of `userIds`,
// and unknown ids are simply missing from `response.users`.
const response = await client.getUserInfoBatch(userIds) // 1 call
const users = response.users
超时控制
// Configure a default per-call timeout (plus connection keepalive) when creating the client.
const client = new userProto.UserInfoService(
  serviceUrl,
  grpc.credentials.createInsecure(),
  {
    // Send a keepalive ping every 10 s; drop the connection if it is not acknowledged within 5 s.
    // NOTE(review): servers may reject pings more frequent than their keepalive policy allows
    // (GOAWAY "too_many_pings"); make sure the server permits this interval.
    'grpc.keepalive_time_ms': 10000,
    'grpc.keepalive_timeout_ms': 5000,
    // Default deadline for every method: a `name` entry of {} matches all services/methods.
    // Durations in a JSON service config are strings such as '10s'.
    'grpc.service_config': JSON.stringify({
      methodConfig: [{
        name: [{}],
        timeout: '10s',
      }],
    }),
  }
)
错误处理
// Unified gRPC error helpers.

/**
 * Error carrying a gRPC status code.
 * `code` and `grpcCode` hold the same value; both are kept so callers can read either.
 * @param {number} code - gRPC status code (see GRPC_STATUS)
 * @param {string} message
 * @param {{cause?: unknown}} [options] - forwarded to Error, e.g. to attach a `cause`
 */
class GRPCError extends Error {
  constructor(code, message, options) {
    super(message, options)
    this.name = 'GRPCError'
    this.code = code
    this.grpcCode = code
  }
}

// Canonical gRPC status codes (same numeric values as grpc.status in @grpc/grpc-js).
const GRPC_STATUS = Object.freeze({
  OK: 0,
  CANCELLED: 1,
  UNKNOWN: 2,
  INVALID_ARGUMENT: 3,
  DEADLINE_EXCEEDED: 4,
  NOT_FOUND: 5,
  ALREADY_EXISTS: 6,
  PERMISSION_DENIED: 7,
  RESOURCE_EXHAUSTED: 8,
  FAILED_PRECONDITION: 9,
  ABORTED: 10,
  OUT_OF_RANGE: 11,
  UNIMPLEMENTED: 12,
  INTERNAL: 13,
  UNAVAILABLE: 14,
  DATA_LOSS: 15,
  UNAUTHENTICATED: 16,
})

// HTTP status per gRPC code, following the mapping documented for google.rpc.Code.
// Codes not listed (including OK, which should never arrive as an error) fall back to 500.
const HTTP_STATUS_BY_GRPC_CODE = new Map([
  [GRPC_STATUS.CANCELLED, 499],
  [GRPC_STATUS.UNKNOWN, 500],
  [GRPC_STATUS.INVALID_ARGUMENT, 400],
  [GRPC_STATUS.DEADLINE_EXCEEDED, 504],
  [GRPC_STATUS.NOT_FOUND, 404],
  [GRPC_STATUS.ALREADY_EXISTS, 409],
  [GRPC_STATUS.PERMISSION_DENIED, 403],
  [GRPC_STATUS.RESOURCE_EXHAUSTED, 429],
  [GRPC_STATUS.FAILED_PRECONDITION, 400],
  [GRPC_STATUS.ABORTED, 409],
  [GRPC_STATUS.OUT_OF_RANGE, 400],
  [GRPC_STATUS.UNIMPLEMENTED, 501],
  [GRPC_STATUS.INTERNAL, 500],
  [GRPC_STATUS.UNAVAILABLE, 503],
  [GRPC_STATUS.DATA_LOSS, 500],
  [GRPC_STATUS.UNAUTHENTICATED, 401],
])

// Fixed HTTP messages for these codes; every other code reuses error.message.
const HTTP_MESSAGE_BY_GRPC_CODE = new Map([
  [GRPC_STATUS.NOT_FOUND, 'Resource not found'],
  [GRPC_STATUS.PERMISSION_DENIED, 'Permission denied'],
  [GRPC_STATUS.UNAUTHENTICATED, 'Unauthorized'],
])

/**
 * Translate a gRPC error (e.g. a rejected client call) into an HTTP status code and message,
 * for use at an HTTP boundary.
 * @param {{code?: number, message?: string}} error
 * @returns {{code: number, message: (string|undefined)}}
 */
function handleGRPCError(error) {
  const grpcCode = error?.code
  return {
    code: HTTP_STATUS_BY_GRPC_CODE.get(grpcCode) ?? 500,
    message: HTTP_MESSAGE_BY_GRPC_CODE.get(grpcCode) ?? error?.message,
  }
}
总结
gRPC服务通信的关键点:
- 协议定义:使用Protobuf定义清晰的服务契约
- 代码生成:可由.proto生成客户端和服务端代码,或像本文示例一样用@grpc/proto-loader在运行时动态加载
- 连接管理:复用连接;本文的"连接池"为每个目标服务缓存一个客户端实例,多个请求通过HTTP/2在同一连接上复用
- 流式通信:支持服务端流、客户端流、双向流
- 批量操作:减少网络往返次数
- 错误处理:统一的错误码和错误处理机制
- 超时控制:为每次调用设置合理的超时(deadline),避免请求无限期挂起
下一篇将介绍插件化架构设计模式。