RAG知识库系统设计与实现

RAG知识库系统设计与实现

概述

RAG (Retrieval-Augmented Generation,检索增强生成) 是当前大模型应用的核心技术之一。本文将介绍如何在微服务平台中设计和实现完整的RAG知识库系统,包括文档处理、向量化存储、语义检索等关键环节。

RAG系统架构

整体流程

RAG系统架构

服务结构

rag.api/
├── app.js
├── routes/
│   ├── rag/root.js           # 知识库管理API
│   ├── material/root.js      # 材料管理API
│   └── search/root.js        # 检索API
├── services/
│   ├── ragService.js         # 知识库服务
│   ├── materialService.js    # 材料服务
│   ├── embeddingService.js   # 向量化服务
│   └── searchService.js      # 检索服务
├── daos/
│   ├── milvus/               # Milvus向量数据库访问
│   │   ├── schema/
│   │   │   └── chunkSchema.js
│   │   └── dac/
│   │       └── chunkDac.js
│   └── mongo/                # MongoDB业务数据
└── grpc/
    └── servers/
        └── ragSearch.js      # gRPC检索服务

核心数据模型

知识库(RAG)模型

// MongoDB Schema - rag.api/daos/core/schema/ragSchema.js
{
  ragCode: "course-ai-101",         // 知识库唯一标识
  title: "人工智能基础课程",         // 知识库标题
  ragType: "course",                // 类型:course/knowledge/project
  description: "人工智能入门课程资料",
  
  // 向量模型配置
  embeddingModel: "bge-large-zh",   // 使用的嵌入模型
  vectorDimension: 1024,            // 向量维度
  
  // 权限控制
  permission: "private",            // private/public/organization
  members: [                        // 成员列表
    { userId: "user-1", role: "owner" },
    { userId: "user-2", role: "editor" }
  ],
  
  // 处理配置
  chunkSize: 500,                   // 分段大小(字符)
  chunkOverlap: 50,                 // 分段重叠
  needTrans: 1,                     // 是否需要翻译
  
  // 统计信息
  materialCount: 10,                // 材料数量
  chunkCount: 150,                  // 分片总数
  totalTokens: 50000,               // 总Token数
  
  createdAt: Date,
  updatedAt: Date
}

材料(Material)模型

// MongoDB Schema - materialSchema.js
{
  materialCode: "mat-uuid",
  ragCode: "course-ai-101",
  
  // 基本信息
  title: "第一章:机器学习概述",
  materialType: "pdf",              // pdf/docx/txt/html
  
  // 文件信息
  fileInfo: {
    originalName: "chapter1.pdf",
    fileSize: 2048000,              // 字节
    mimeType: "application/pdf",
    filePath: "/rag/course-ai-101/chapter1.pdf"
  },
  
  // 处理状态
  status: "completed",              // pending/processing/completed/failed
  processProgress: 100,             // 处理进度
  
  // 内容摘要
  summary: "本章介绍了机器学习的基本概念...",
  keywords: ["机器学习", "监督学习", "无监督学习"],
  
  // 统计
  chunkCount: 15,
  totalTokens: 5000,
  
  createdAt: Date
}

向量存储模型 (Milvus)

// Milvus Collection Schema - chunkSchema.js
{
  // 主键
  ragChunkCode: "chunk-uuid",       // 分片唯一标识
  
  // 关联信息
  ragCode: "course-ai-101",         // 所属知识库
  materialCode: "mat-uuid",         // 所属材料
  
  // 内容信息
  content: "机器学习是人工智能的一个分支...",
  contentEn: "Machine learning is a branch of AI...",  // 翻译内容
  
  // 向量字段 (1024维)
  contentVector: [0.023, -0.156, ...],      // 中文内容向量
  contentEnVector: [0.045, -0.089, ...],    // 英文内容向量
  
  // 位置信息
  chunkIndex: 5,                    // 分段序号
  startPosition: 2500,              // 起始位置
  endPosition: 3000,                // 结束位置
  
  // 元数据
  tokenCount: 450,
  createdAt: Date
}

// Milvus索引配置
{
  fieldName: "contentVector",
  indexType: "IVF_FLAT",            // 索引类型
  metricType: "COSINE",             // 距离度量:余弦相似度
  params: { nlist: 128 }            // 聚类中心数
}

文档处理流程

1. 文档上传与解析

// services/materialService.js
class MaterialService {
  constructor(fastify) {
    this.fastify = fastify
  }

  // 上传材料
  async uploadMaterial(ragCode, file, userId) {
    // 保存原始文件
    const fileInfo = await this.saveFile(file, ragCode)
    
    // 创建材料记录
    const material = {
      materialCode: generateUUID(),
      ragCode,
      title: file.originalname,
      materialType: this.getFileType(file.mimetype),
      fileInfo,
      status: 'pending',
      createdBy: userId,
      createdAt: new Date()
    }
    
    await this.fastify.mongo.db
      .collection('materials')
      .insertOne(material)
    
    // 异步处理文档
    await this.processMaterialAsync(material.materialCode)
    
    return material
  }

  // 异步处理文档
  async processMaterialAsync(materialCode) {
    // 发送到Kafka队列异步处理
    await this.fastify.kafka.producer.send({
      topic: 'rag-material-process',
      messages: [{
        key: materialCode,
        value: JSON.stringify({ materialCode })
      }]
    })
  }
}

2. 文档内容提取 (transform.api)

# transform.api/services/pdf.py
import pypdfium2 as pdfium
from typing import List, Dict

class PDFProcessor:
    def __init__(self):
        self.ocr_service = OCRService()
    
    def extract_text(self, file_path: str) -> Dict:
        """提取PDF文本内容"""
        pdf = pdfium.PdfDocument(file_path)
        
        pages = []
        for page_num in range(len(pdf)):
            page = pdf[page_num]
            
            # 提取文本
            text_page = page.get_textpage()
            text = text_page.get_text_range()
            
            # 提取页面图片(用于OCR)
            images = self.extract_page_images(page)
            
            pages.append({
                'pageNum': page_num + 1,
                'text': text,
                'images': images,
                'charCount': len(text)
            })
        
        return {
            'totalPages': len(pdf),
            'pages': pages,
            'fullText': '\n'.join([p['text'] for p in pages])
        }
    
    def extract_page_images(self, page) -> List[Dict]:
        """提取页面中的图片"""
        # 获取页面渲染为图片
        bitmap = page.render(
            scale=2.0,  # 2倍分辨率
            rotation=0,
            crop=(0, 0, page.get_width(), page.get_height())
        )
        
        # 保存图片并返回路径
        image_path = f"/tmp/page_{page.get_index()}.png"
        bitmap.save_as_png(image_path)
        
        return [{'path': image_path, 'width': bitmap.width, 'height': bitmap.height}]

3. OCR文本识别

# transform.api/services/ocr.py
from xinference_client import Client as XinferenceClient
import openai

class OCRService:
    def __init__(self):
        self.xinference = XinferenceClient("http://xinference-server:9997")
        self.openai = openai.OpenAI()
    
    async def recognize_image(self, image_path: str) -> str:
        """识别图片中的文字"""
        # 使用Xinference OCR模型
        model = self.xinference.get_model("ocr-model")
        result = model.chat({
            'image': image_path,
            'prompt': 'Extract all text from this image'
        })
        
        return result['choices'][0]['message']['content']
    
    async def recognize_with_multimodal(self, image_path: str) -> str:
        """使用多模态模型识别"""
        with open(image_path, 'rb') as f:
            import base64
            image_base64 = base64.b64encode(f.read()).decode()
        
        response = self.openai.chat.completions.create(
            model="gpt-4-vision-preview",
            messages=[{
                "role": "user",
                "content": [
                    {"type": "text", "text": "Extract all text from this image:"},
                    {"type": "image_url", "image_url": f"data:image/png;base64,{image_base64}"}
                ]
            }]
        )
        
        return response.choices[0].message.content

4. 语义分段

# transform.api/services/chunker.py
import spacy
from typing import List, Dict

class SemanticChunker:
    def __init__(self):
        # 加载中文NLP模型
        self.nlp = spacy.load("zh_core_web_sm")
    
    def chunk_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[Dict]:
        """语义分段"""
        doc = self.nlp(text)
        
        chunks = []
        current_chunk = []
        current_size = 0
        
        for sent in doc.sents:
            sent_text = sent.text.strip()
            sent_length = len(sent_text)
            
            # 如果当前块加上新句子超过限制,保存当前块
            if current_size + sent_length > chunk_size and current_chunk:
                chunk_text = ' '.join(current_chunk)
                chunks.append({
                    'text': chunk_text,
                    'tokenCount': len(chunk_text),
                    'sentenceCount': len(current_chunk)
                })
                
                # 保留重叠部分
                overlap_text = ' '.join(current_chunk[-2:]) if len(current_chunk) >= 2 else current_chunk[-1]
                current_chunk = [overlap_text, sent_text]
                current_size = len(overlap_text) + sent_length
            else:
                current_chunk.append(sent_text)
                current_size += sent_length
        
        # 处理最后一个块
        if current_chunk:
            chunks.append({
                'text': ' '.join(current_chunk),
                'tokenCount': len(' '.join(current_chunk)),
                'sentenceCount': len(current_chunk)
            })
        
        return chunks

向量化与存储

1. Embedding服务

// services/embeddingService.js
class EmbeddingService {
  constructor(fastify) {
    this.fastify = fastify
    this.embeddingModel = process.env.EMBEDDING_MODEL || 'bge-large-zh'
    this.embeddingEndpoint = process.env.EMBEDDING_ENDPOINT
  }

  // 文本向量化
  async embedText(text, language = 'zh') {
    const response = await fetch(`${this.embeddingEndpoint}/embeddings`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        model: this.embeddingModel,
        input: text,
        language
      })
    })
    
    const result = await response.json()
    return result.data[0].embedding
  }

  // 批量向量化
  async embedTexts(texts, language = 'zh') {
    const response = await fetch(`${this.embeddingEndpoint}/embeddings`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        model: this.embeddingModel,
        input: texts,
        language
      })
    })
    
    const result = await response.json()
    return result.data.map(d => d.embedding)
  }
}

2. Milvus向量存储

// daos/milvus/dac/chunkDac.js
class ChunkDac {
  constructor(milvusClient) {
    this.client = milvusClient
    this.collectionName = 'rag_chunks'
  }

  // 插入向量
  async insertChunks(chunks) {
    const entities = chunks.map(chunk => ({
      ragChunkCode: chunk.ragChunkCode,
      ragCode: chunk.ragCode,
      materialCode: chunk.materialCode,
      content: chunk.content,
      contentEn: chunk.contentEn,
      contentVector: chunk.contentVector,
      contentEnVector: chunk.contentEnVector,
      chunkIndex: chunk.chunkIndex,
      tokenCount: chunk.tokenCount
    }))
    
    const result = await this.client.insert({
      collectionName: this.collectionName,
      data: entities
    })
    
    return result
  }

  // 向量检索
  async searchSimilarChunks(ragCode, queryVector, limit = 5, filters = {}) {
    const searchParams = {
      collectionName: this.collectionName,
      data: [queryVector],
      annsField: 'contentVector',
      searchParams: {
        metricType: 'COSINE',
        params: { nprobe: 128 }
      },
      limit,
      outputFields: ['ragChunkCode', 'ragCode', 'materialCode', 'content', 'contentEn'],
      expr: `ragCode == "${ragCode}"`  // 过滤条件
    }
    
    const results = await this.client.search(searchParams)
    
    return results.map(result => ({
      ragChunkCode: result.entity.ragChunkCode,
      ragCode: result.entity.ragCode,
      materialCode: result.entity.materialCode,
      content: result.entity.content,
      contentEn: result.entity.contentEn,
      score: result.score,  // 相似度分数
      distance: result.distance
    }))
  }
}

检索服务

1. 检索API

// routes/search/root.js
export default async function searchRoutes(fastify, opts) {
  
  // 知识库检索
  fastify.post('/rag', async (request, reply) => {
    const { ragCode, query, limit = 5 } = request.body
    
    // 权限检查
    await fastify.ragService.checkPermission(ragCode, request.user.userId)
    
    // 执行检索
    const results = await fastify.searchService.search(ragCode, query, limit)
    
    return {
      success: true,
      data: results
    }
  })
}

// services/searchService.js
class SearchService {
  constructor(fastify) {
    this.fastify = fastify
  }

  async search(ragCode, query, limit = 5) {
    // 1. 向量化查询
    const queryVector = await this.fastify.embeddingService.embedText(query)
    
    // 2. 向量检索
    const chunks = await this.fastify.chunkDac.searchSimilarChunks(
      ragCode,
      queryVector,
      limit * 2  // 检索更多用于重排序
    )
    
    // 3. 重排序 (可选)
    const reranked = await this.rerank(query, chunks, limit)
    
    // 4. 获取材料信息
    const results = await this.enrichResults(reranked)
    
    return results
  }

  // 重排序
  async rerank(query, chunks, limit) {
    // 使用交叉编码器进行重排序
    const scores = await Promise.all(
      chunks.map(async chunk => {
        const score = await this.calculateRelevance(query, chunk.content)
        return { ...chunk, relevanceScore: score }
      })
    )
    
    // 按相关性排序
    scores.sort((a, b) => b.relevanceScore - a.relevanceScore)
    
    return scores.slice(0, limit)
  }
}

2. gRPC检索服务

// grpc/servers/ragSearch.js
import grpc from '@grpc/grpc-js'

const ragSearchProto = `
syntax = "proto3";

service RagSearchService {
  rpc SearchRag(SearchRequest) returns (SearchResponse);
}

message SearchRequest {
  string ragCode = 1;
  string query = 2;
  int32 limit = 3;
}

message SearchResult {
  string ragChunkCode = 1;
  string content = 2;
  float score = 3;
}

message SearchResponse {
  repeated SearchResult results = 1;
  int32 total = 2;
}
`

export default async function ragSearchServer(fastify) {
  const server = new grpc.Server()
  
  server.addService(RagSearchService.service, {
    searchRag: async (call, callback) => {
      try {
        const { ragCode, query, limit } = call.request
        
        const results = await fastify.searchService.search(
          ragCode,
          query,
          limit
        )
        
        callback(null, {
          results: results.map(r => ({
            ragChunkCode: r.ragChunkCode,
            content: r.content,
            score: r.score
          })),
          total: results.length
        })
      } catch (error) {
        callback(error)
      }
    }
  })
  
  return server
}

RAG与LLM集成

完整RAG流程

// 在llm.api中集成RAG
class LLMService {
  async chatWithRAG(channelCode, message, ragCodes = []) {
    // 1. 检索相关上下文
    let contexts = []
    for (const ragCode of ragCodes) {
      const results = await this.fastify.grpcClients.ragSearch.search({
        ragCode,
        query: message,
        limit: 3
      })
      contexts.push(...results.results)
    }
    
    // 2. 构建增强提示
    const contextText = contexts
      .map((ctx, i) => `[${i + 1}] ${ctx.content}`)
      .join('\n\n')
    
    const augmentedPrompt = `基于以下参考资料回答问题:

${contextText}

用户问题:${message}

请基于参考资料回答,如果参考资料中没有相关信息,请明确说明。`
    
    // 3. 调用LLM生成回答
    const response = await this.fastify.openai.chat.completions.create({
      model: 'gpt-4',
      messages: [
        { role: 'system', content: '你是一个基于知识库回答问题的助手。' },
        { role: 'user', content: augmentedPrompt }
      ],
      stream: true
    })
    
    return {
      stream: response,
      contexts: contexts.map(c => ({
        content: c.content,
        score: c.score,
        source: c.ragChunkCode
      }))
    }
  }
}

性能优化

1. 索引优化

// Milvus索引配置优化
const indexParams = {
  collectionName: 'rag_chunks',
  fieldName: 'contentVector',
  indexType: 'IVF_FLAT',  // 或 HNSW 用于更高性能
  metricType: 'COSINE',
  params: {
    nlist: 128,  // 聚类中心数,根据数据量调整
    nprobe: 32   // 查询时搜索的聚类数
  }
}

// HNSW配置(更高召回率)
const hnswParams = {
  indexType: 'HNSW',
  params: {
    M: 16,        // 图的最大度数
    efConstruction: 200,  // 构建时的搜索范围
    ef: 128       // 查询时的搜索范围
  }
}

2. 缓存策略

// 缓存查询结果
async searchWithCache(ragCode, query, limit) {
  const cacheKey = `rag:search:${hashString(ragCode + query)}`
  const cached = await this.fastify.redis.get(cacheKey)
  
  if (cached) {
    return JSON.parse(cached)
  }
  
  const results = await this.search(ragCode, query, limit)
  
  // 缓存5分钟
  await this.fastify.redis.setex(cacheKey, 300, JSON.stringify(results))
  
  return results
}

总结

RAG知识库系统的设计要点:

  1. 文档处理链路:PDF解析 → OCR识别 → 语义分段
  2. 向量化存储:选择合适的Embedding模型和向量数据库
  3. 检索优化:向量检索 + 重排序提升准确性
  4. 权限控制:知识库级别的访问控制
  5. 性能优化:索引、缓存、批量处理

下一篇将介绍微前端架构的设计与实践。

阅读更多

Skills系统:可扩展AI能力设计

Skills系统:可扩展AI能力设计

概述 Skills系统是AI-Native架构中的重要组件,它允许通过声明式配置扩展AI的能力。本文将介绍Skills系统的设计与实现,让大模型能够像人类专家一样具备特定领域的能力。 什么是Skills系统 概念 Skills(技能)是一种声明式的AI能力扩展机制,类似于人类的"专业技能": 通用AI助手 专业AI助手(带Skills) ┌──────────────────────┐ ┌──────────────────────────────┐ │ │ │ │ │ 用户:请帮我写代码 │ │ 用户:请帮我审查这段代码 │ │ │ │ │ │ AI:我是一个AI助手 │ │ AI:[激活

By 菱角
插件化架构设计模式

插件化架构设计模式

概述 插件化架构是一种将核心功能与扩展功能分离的设计模式,允许系统在运行时动态加载和卸载功能模块。本文将介绍如何在微服务平台中设计和实现插件化架构。 为什么需要插件化 插件化优势 1. 模块化:功能独立,边界清晰 2. 可扩展:按需加载,动态增删 3. 隔离性:插件间互不干扰 4. 可维护:独立开发、测试、部署 5. 可定制:用户按需选择功能 核心设计 架构概览 核心组件实现 1. 插件接口定义 // core/plugin.interface.ts // 插件接口 export interface IPlugin { // 插件名称 readonly name: string // 插件版本 readonly version: string // 插件配置 getConfig(): PluginConfig // 插件清单

By 菱角
gRPC服务通信设计与实践

gRPC服务通信设计与实践

概述 在微服务架构中,服务间通信是关键环节。相比REST API,gRPC提供了更高的性能和更强的类型安全。本文将介绍如何在微服务平台中设计和实现gRPC服务通信。 为什么选择gRPC gRPC vs REST对比 特性 gRPC REST 协议 HTTP/2 HTTP/1.1 序列化 Protocol Buffers (二进制) JSON (文本) 性能 高(二进制+压缩) 中(文本开销) 类型安全 强(代码生成) 弱(运行时检查) 流式通信 原生支持(双向流) 需额外实现(SSE/WebSocket) 代码生成 自动生成 手动编写 浏览器支持 需gRPC-Web 原生支持 调试难度

By 菱角
多语言微服务架构:Node.js与Python协作

多语言微服务架构:Node.js与Python协作

概述 在微服务架构中,根据场景选择最适合的编程语言是最佳实践。本文将介绍如何在微服务平台中实现Node.js与Python的协作,发挥各自技术优势。 技术选型策略 为什么混合使用 服务划分 Node.js服务(7个) 服务 功能 选择Node.js的原因 llm.api 大模型服务 高并发SSE流式响应 ucenter.api 用户中心 RESTful API标准实践 doc.api 文件服务 流式上传下载处理 resource.api 资源管理 gRPC高性能通信 rag.api 知识库服务 MongoDB集成便利 statistic.api 统计分析 事件驱动架构 pptonline.api PPT服务 与前端技术栈统一 Python服务(1个) 服务 功能 选择Python的原因

By 菱角