gRPC服务通信设计与实践

gRPC服务通信设计与实践

概述

在微服务架构中,服务间通信是关键环节。相比REST API,gRPC提供了更高的性能和更强的类型安全。本文将介绍如何在微服务平台中设计和实现gRPC服务通信。

为什么选择gRPC

gRPC vs REST对比

特性 gRPC REST
协议 HTTP/2 HTTP/1.1
序列化 Protocol Buffers (二进制) JSON (文本)
性能 高(二进制+压缩) 中(文本开销)
类型安全 强(代码生成) 弱(运行时检查)
流式通信 原生支持(双向流) 需额外实现(SSE/WebSocket)
代码生成 自动生成 手动编写
浏览器支持 需gRPC-Web 原生支持
调试难度 较难 容易

适用场景

  • 内部服务通信:微服务间高性能调用
  • 流式数据处理:日志、事件流
  • 多语言服务:统一协议跨语言通信
  • 低延迟要求:高频调用场景

gRPC架构设计

服务通信拓扑

gRPC服务通信架构

Protocol Buffers定义

用户信息服务

// grpc/proto/userInfo.proto
syntax = "proto3";

package user;

option go_package = "github.com/org/micro/proto/user";

// 用户信息服务
service UserInfoService {
  // 获取用户信息
  rpc GetUserInfo(UserInfoRequest) returns (UserInfoResponse);
  
  // 批量获取用户信息
  rpc GetUserInfoBatch(UserInfoBatchRequest) returns (UserInfoBatchResponse);
  
  // 更新用户信息
  rpc UpdateUserInfo(UpdateUserInfoRequest) returns (UserInfoResponse);
  
  // 搜索用户
  rpc SearchUsers(SearchUsersRequest) returns (SearchUsersResponse);
  
  // 流式获取用户列表(服务端流)
  rpc StreamUsers(StreamUsersRequest) returns (stream UserInfo);
}

// 用户基础信息
message UserInfo {
  string userId = 1;
  string username = 2;
  string email = 3;
  string phone = 4;
  string avatar = 5;
  string realName = 6;
  int32 status = 7;
  string createdAt = 8;
  string updatedAt = 9;
  repeated string roles = 10;
  repeated string permissions = 11;
}

// 请求:获取用户信息
message UserInfoRequest {
  string userId = 1;
  bool includeRoles = 2;
  bool includePermissions = 3;
}

// 响应:用户信息
message UserInfoResponse {
  bool success = 1;
  string message = 2;
  UserInfo user = 3;
}

// 请求:批量获取
message UserInfoBatchRequest {
  repeated string userIds = 1;
}

// 响应:批量用户信息
message UserInfoBatchResponse {
  bool success = 1;
  string message = 2;
  repeated UserInfo users = 3;
}

// 请求:更新用户信息
message UpdateUserInfoRequest {
  string userId = 1;
  string username = 2;
  string email = 3;
  string phone = 4;
  string avatar = 5;
  string realName = 6;
}

// 请求:搜索用户
message SearchUsersRequest {
  string keyword = 1;
  int32 page = 2;
  int32 pageSize = 3;
}

// 响应:搜索用户
message SearchUsersResponse {
  bool success = 1;
  string message = 2;
  repeated UserInfo users = 3;
  int32 total = 4;
}

// 请求:流式获取
message StreamUsersRequest {
  int32 batchSize = 1;
}

RAG知识库搜索服务

// grpc/proto/ragSearch.proto
syntax = "proto3";

package rag;

// RAG搜索服务
service RagSearchService {
  // 语义搜索
  rpc Search(RagSearchRequest) returns (RagSearchResponse);
  
  // 获取分片详情
  rpc GetChunk(GetChunkRequest) returns (ChunkResponse);
  
  // 批量获取分片
  rpc GetChunksBatch(GetChunksBatchRequest) returns (ChunksBatchResponse);
  
  // 流式搜索(服务端流)
  rpc StreamSearch(StreamSearchRequest) returns (stream SearchResult);
}

// 搜索请求
message RagSearchRequest {
  string ragCode = 1;
  string query = 2;
  int32 limit = 3;
  float minScore = 4;
  repeated string filters = 5;
}

// 搜索结果
message SearchResult {
  string ragChunkCode = 1;
  string ragCode = 2;
  string materialCode = 3;
  string content = 4;
  float score = 5;
  int32 chunkIndex = 6;
}

// 搜索响应
message RagSearchResponse {
  bool success = 1;
  string message = 2;
  repeated SearchResult results = 3;
  int32 total = 4;
  string query = 5;
}

// 分片详情请求
message GetChunkRequest {
  string ragChunkCode = 1;
}

// 分片详情响应
message ChunkResponse {
  bool success = 1;
  string ragChunkCode = 2;
  string content = 3;
  string materialCode = 4;
  int32 tokenCount = 5;
}

// 批量获取请求
message GetChunksBatchRequest {
  repeated string ragChunkCodes = 1;
}

// 批量获取响应
message ChunksBatchResponse {
  bool success = 1;
  repeated ChunkResponse chunks = 2;
}

// 流式搜索请求
message StreamSearchRequest {
  string ragCode = 1;
  string query = 2;
  int32 limit = 3;
}

Node.js gRPC实现

gRPC服务端

// llm.api/grpc/servers/userInfo.js
import grpc from '@grpc/grpc-js'
import protoLoader from '@grpc/proto-loader'
import path from 'path'

const PROTO_PATH = path.join(process.cwd(), 'grpc/proto/userInfo.proto')

const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
})

const userProto = grpc.loadPackageDefinition(packageDefinition).user

export default async function userInfoServer(fastify) {
  const server = new grpc.Server()

  // 实现UserInfoService
  server.addService(userProto.UserInfoService.service, {
    // 获取用户信息
    GetUserInfo: async (call, callback) => {
      try {
        const { userId, includeRoles, includePermissions } = call.request
        
        // 从MongoDB查询用户
        const user = await fastify.mongo.db
          .collection('users')
          .findOne({ userId })
        
        if (!user) {
          return callback({
            code: grpc.status.NOT_FOUND,
            message: 'User not found',
          })
        }

        // 构建响应
        const userInfo = {
          userId: user.userId,
          username: user.username,
          email: user.email,
          phone: user.phone || '',
          avatar: user.avatar || '',
          realName: user.realName || '',
          status: user.status,
          createdAt: user.createdAt?.toISOString() || '',
          updatedAt: user.updatedAt?.toISOString() || '',
        }

        // 获取角色和权限
        if (includeRoles || includePermissions) {
          const userRoles = await fastify.mongo.db
            .collection('userRoles')
            .find({ userId })
            .toArray()
          
          if (includeRoles) {
            userInfo.roles = userRoles.map(ur => ur.roleCode)
          }
          
          if (includePermissions) {
            const permissions = await fastify.mongo.db
              .collection('rolePermissions')
              .find({ roleCode: { $in: userRoles.map(ur => ur.roleCode) } })
              .toArray()
            userInfo.permissions = permissions.map(p => p.permissionCode)
          }
        }

        callback(null, {
          success: true,
          message: 'Success',
          user: userInfo,
        })
      } catch (error) {
        callback({
          code: grpc.status.INTERNAL,
          message: error.message,
        })
      }
    },

    // 批量获取用户信息
    GetUserInfoBatch: async (call, callback) => {
      try {
        const { userIds } = call.request
        
        const users = await fastify.mongo.db
          .collection('users')
          .find({ userId: { $in: userIds } })
          .toArray()

        const userInfos = users.map(user => ({
          userId: user.userId,
          username: user.username,
          email: user.email,
          phone: user.phone || '',
          avatar: user.avatar || '',
          realName: user.realName || '',
          status: user.status,
          createdAt: user.createdAt?.toISOString() || '',
          updatedAt: user.updatedAt?.toISOString() || '',
        }))

        callback(null, {
          success: true,
          message: 'Success',
          users: userInfos,
        })
      } catch (error) {
        callback({
          code: grpc.status.INTERNAL,
          message: error.message,
        })
      }
    },

    // 流式获取用户列表
    StreamUsers: async (call) => {
      try {
        const { batchSize } = call.request
        let processed = 0
        
        // 使用游标分批获取
        const cursor = fastify.mongo.db
          .collection('users')
          .find({ status: 1 })
          .limit(batchSize)

        for await (const user of cursor) {
          call.write({
            userId: user.userId,
            username: user.username,
            email: user.email,
          })
          
          processed++
          if (processed >= batchSize) break
        }

        call.end()
      } catch (error) {
        call.destroy(error)
      }
    },
  })

  return server
}

gRPC客户端

// rag.api/grpc/clients/userInfo.js
import grpc from '@grpc/grpc-js'
import protoLoader from '@grpc/proto-loader'
import path from 'path'

const PROTO_PATH = path.join(process.cwd(), 'grpc/proto/userInfo.proto')

const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
})

const userProto = grpc.loadPackageDefinition(packageDefinition).user

export class UserInfoClient {
  constructor(serviceUrl) {
    this.client = new userProto.UserInfoService(
      serviceUrl,
      grpc.credentials.createInsecure()
    )
  }

  // 获取用户信息
  async getUserInfo(userId, options = {}) {
    return new Promise((resolve, reject) => {
      this.client.GetUserInfo({
        userId,
        includeRoles: options.includeRoles ?? false,
        includePermissions: options.includePermissions ?? false,
      }, (error, response) => {
        if (error) reject(error)
        else resolve(response)
      })
    })
  }

  // 批量获取用户信息
  async getUserInfoBatch(userIds) {
    return new Promise((resolve, reject) => {
      this.client.GetUserInfoBatch({
        userIds,
      }, (error, response) => {
        if (error) reject(error)
        else resolve(response)
      })
    })
  }

  // 流式获取用户
  streamUsers(batchSize = 100) {
    const stream = this.client.StreamUsers({ batchSize })
    const users = []

    return new Promise((resolve, reject) => {
      stream.on('data', (user) => {
        users.push(user)
      })

      stream.on('end', () => {
        resolve(users)
      })

      stream.on('error', (error) => {
        reject(error)
      })
    })
  }

  // 关闭连接
  close() {
    this.client.close()
  }
}

客户端连接池

// grpc/clients/pool.js
import { UserInfoClient } from './userInfo.js'
import { RagSearchClient } from './ragSearch.js'

class GRPCClientPool {
  constructor() {
    this.clients = new Map()
    this.configs = {
      userInfo: {
        url: process.env.USERINFO_GRPC_URL || 'localhost:13001',
        maxConnections: 10,
      },
      ragSearch: {
        url: process.env.RAG_GRPC_URL || 'localhost:13003',
        maxConnections: 10,
      },
    }
  }

  // 获取或创建客户端
  getUserInfoClient() {
    if (!this.clients.has('userInfo')) {
      const client = new UserInfoClient(this.configs.userInfo.url)
      this.clients.set('userInfo', client)
    }
    return this.clients.get('userInfo')
  }

  getRagSearchClient() {
    if (!this.clients.has('ragSearch')) {
      const client = new RagSearchClient(this.configs.ragSearch.url)
      this.clients.set('ragSearch', client)
    }
    return this.clients.get('ragSearch')
  }

  // 关闭所有连接
  closeAll() {
    for (const [name, client] of this.clients) {
      try {
        client.close()
        console.log(`Closed gRPC client: ${name}`)
      } catch (error) {
        console.error(`Error closing gRPC client ${name}:`, error)
      }
    }
    this.clients.clear()
  }
}

// 单例导出
export const grpcPool = new GRPCClientPool()

服务端启动与集成

服务端启动

// llm.api/grpc/server.js
import grpc from '@grpc/grpc-js'
import userInfoServer from './servers/userInfo.js'
import tokenServer from './servers/token.js'

export async function startGRPCServer(fastify) {
  const GRPC_PORT = process.env.GRPC_PORT || 13001
  const GRPC_ADDRESS = process.env.GRPC_ADDRESS || '0.0.0.0'

  // 创建gRPC服务器
  const server = new grpc.Server()

  // 注册所有服务
  const userServer = await userInfoServer(fastify)
  const tokenSvcServer = await tokenServer(fastify)

  // 绑定端口
  server.bindAsync(
    `${GRPC_ADDRESS}:${GRPC_PORT}`,
    grpc.ServerCredentials.createInsecure(),
    (error, port) => {
      if (error) {
        fastify.log.error(`gRPC server bind failed: ${error}`)
        return
      }
      
      server.start()
      fastify.log.info(`gRPC server started on ${GRPC_ADDRESS}:${port}`)
    }
  )

  // 优雅关闭
  process.on('SIGTERM', () => {
    console.log('Shutting down gRPC server...')
    server.tryShutdown(() => {
      console.log('gRPC server shut down')
    })
  })

  return server
}

Fastify集成

// llm.api/app.js
import Fastify from 'fastify'
import { startGRPCServer } from './grpc/server.js'

const app = Fastify({
  logger: { level: 'info' },
})

// 注册MongoDB等插件
await app.register(import('@fastify/mongodb'), {
  url: process.env.MONGO_URI,
})

// 启动gRPC服务器
await startGRPCServer(app)

// 启动HTTP服务器
await app.listen({
  port: process.env.PORT || 12001,
  host: process.env.FASTIFY_ADDRESS || '0.0.0.0',
})

调用示例

服务间调用

// rag.api/services/ragService.js
import { grpcPool } from '../grpc/clients/pool.js'

class RagService {
  async searchWithUserContext(ragCode, query, userId) {
    // 1. 获取用户信息(gRPC调用)
    const userInfoClient = grpcPool.getUserInfoClient()
    const userInfo = await userInfoClient.getUserInfo(userId, {
      includeRoles: true,
    })

    if (!userInfo.success) {
      throw new Error('Failed to get user info')
    }

    // 2. 检查权限
    const hasAccess = await this.checkRagAccess(ragCode, userInfo.user)
    if (!hasAccess) {
      throw new Error('Access denied')
    }

    // 3. 执行搜索
    const results = await this.search(ragCode, query)

    // 4. 记录日志
    await this.logSearch(userId, ragCode, query, results.length)

    return results
  }

  async search(ragCode, query, limit = 5) {
    // 向量化查询
    const queryVector = await this.embedText(query)

    // 从Milvus检索
    const chunks = await this.chunkDac.searchSimilarChunks(
      ragCode,
      queryVector,
      limit
    )

    return chunks
  }
}

性能优化

连接复用

// 客户端复用连接
const client = grpcPool.getUserInfoClient()

// 多次调用使用同一连接
const user1 = await client.getUserInfo('user-1')
const user2 = await client.getUserInfo('user-2')
const batch = await client.getUserInfoBatch(['user-3', 'user-4'])

批量操作

// 批量获取优于多次单条获取
// ❌ 不推荐
const users = []
for (const id of userIds) {
  const user = await client.getUserInfo(id)  // N次调用
  users.push(user)
}

// ✅ 推荐
const response = await client.getUserInfoBatch(userIds)  // 1次调用
const users = response.users

超时控制

// 设置调用超时
const client = new UserInfoService(
  serviceUrl,
  grpc.credentials.createInsecure(),
  {
    'grpc.keepalive_time_ms': 10000,
    'grpc.keepalive_timeout_ms': 5000,
    'grpc.service_config': JSON.stringify({
      methodConfig: [{
        name: [{}],
        timeout: { seconds: 10 },
      }],
    }),
  }
)

错误处理

// 统一错误处理
class GRPCError extends Error {
  constructor(code, message) {
    super(message)
    this.code = code
    this.grpcCode = code
  }
}

// 错误码映射
const GRPC_STATUS = {
  OK: 0,
  CANCELLED: 1,
  UNKNOWN: 2,
  INVALID_ARGUMENT: 3,
  DEADLINE_EXCEEDED: 4,
  NOT_FOUND: 5,
  ALREADY_EXISTS: 6,
  PERMISSION_DENIED: 7,
  RESOURCE_EXHAUSTED: 8,
  FAILED_PRECONDITION: 9,
  ABORTED: 10,
  OUT_OF_RANGE: 11,
  UNIMPLEMENTED: 12,
  INTERNAL: 13,
  UNAVAILABLE: 14,
  DATA_LOSS: 15,
  UNAUTHENTICATED: 16,
}

// 服务端错误处理
function handleGRPCError(error) {
  if (error.code === grpc.status.NOT_FOUND) {
    return { code: 404, message: 'Resource not found' }
  }
  if (error.code === grpc.status.PERMISSION_DENIED) {
    return { code: 403, message: 'Permission denied' }
  }
  if (error.code === grpc.status.UNAUTHENTICATED) {
    return { code: 401, message: 'Unauthorized' }
  }
  return { code: 500, message: error.message }
}

总结

gRPC服务通信的关键点:

  1. 协议定义:使用Protobuf定义清晰的服务契约
  2. 代码生成:自动生成客户端和服务端代码
  3. 连接管理:复用连接,使用连接池
  4. 流式通信:支持服务端流、客户端流、双向流
  5. 批量操作:减少网络往返次数
  6. 错误处理:统一的错误码和错误处理机制
  7. 超时控制:设置合理的超时时间

下一篇将介绍插件化架构设计模式。

阅读更多

Skills系统:可扩展AI能力设计

Skills系统:可扩展AI能力设计

概述 Skills系统是AI-Native架构中的重要组件,它允许通过声明式配置扩展AI的能力。本文将介绍Skills系统的设计与实现,让大模型能够像人类专家一样具备特定领域的能力。 什么是Skills系统 概念 Skills(技能)是一种声明式的AI能力扩展机制,类似于人类的"专业技能": 通用AI助手 专业AI助手(带Skills) ┌──────────────────────┐ ┌──────────────────────────────┐ │ │ │ │ │ 用户:请帮我写代码 │ │ 用户:请帮我审查这段代码 │ │ │ │ │ │ AI:我是一个AI助手 │ │ AI:[激活

By 菱角
插件化架构设计模式

插件化架构设计模式

概述 插件化架构是一种将核心功能与扩展功能分离的设计模式,允许系统在运行时动态加载和卸载功能模块。本文将介绍如何在微服务平台中设计和实现插件化架构。 为什么需要插件化 插件化优势 1. 模块化:功能独立,边界清晰 2. 可扩展:按需加载,动态增删 3. 隔离性:插件间互不干扰 4. 可维护:独立开发、测试、部署 5. 可定制:用户按需选择功能 核心设计 架构概览 核心组件实现 1. 插件接口定义 // core/plugin.interface.ts // 插件接口 export interface IPlugin { // 插件名称 readonly name: string // 插件版本 readonly version: string // 插件配置 getConfig(): PluginConfig // 插件清单

By 菱角
多语言微服务架构:Node.js与Python协作

多语言微服务架构:Node.js与Python协作

概述 在微服务架构中,根据场景选择最适合的编程语言是最佳实践。本文将介绍如何在微服务平台中实现Node.js与Python的协作,发挥各自技术优势。 技术选型策略 为什么混合使用 服务划分 Node.js服务(7个) 服务 功能 选择Node.js的原因 llm.api 大模型服务 高并发SSE流式响应 ucenter.api 用户中心 RESTful API标准实践 doc.api 文件服务 流式上传下载处理 resource.api 资源管理 gRPC高性能通信 rag.api 知识库服务 MongoDB集成便利 statistic.api 统计分析 事件驱动架构 pptonline.api PPT服务 与前端技术栈统一 Python服务(1个) 服务 功能 选择Python的原因

By 菱角
CI/CD流水线:Jenkins + APISIX微服务部署实践

CI/CD流水线:Jenkins + APISIX微服务部署实践

概述 微服务架构的CI/CD流水线设计是DevOps实践的核心。本文将介绍如何使用Jenkins Pipeline、Docker和APISIX构建完整的微服务持续集成与持续部署系统。 架构概览 CI/CD流程 Jenkins Pipeline设计 流水线流程 ┌─────────────────────────────────────────────────────────────────────────────┐ │ Jenkins Pipeline 标准流程 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌─────────

By 菱角