Naive RAG实现指南¶
本文档面向熟悉RAG基本流程的开发者。如果您对RAG概念不了解,请先阅读 RAG - 检索增强生成技术介绍。
为什么选择SAGE构建RAG?¶
SAGE流处理框架为RAG系统提供了强大的技术基础:
- 统一的内存服务:提供高效的向量存储和检索
- 流式处理能力:支持实时数据摄入和查询处理
- 模块化设计:可灵活组合不同的处理算子
- 服务化架构:便于系统集成和扩展
Naive RAG Pipeline示例¶
我们将通过 SAGE Lib 中预定义的算子向您展示一个完整的 Naive RAG 查询处理流水线的创建:
from sage.core.api.local_environment import LocalEnvironment
from sage.service.memory.memory_service import MemoryService
from sage.lib.io.source import QuerySource
from sage.lib.rag.retriever import DenseRetriever
from sage.lib.rag.promptor import QAPromptor
from sage.lib.rag.generator import OpenAIGenerator
from sage.lib.io.sink import TerminalSink
def naive_rag_example():
env = LocalEnvironment()
# 请求使用知识库服务
env.register_service("memory_service", MemoryService())
# 创建 Naive RAG Pipeline
env.from_source(QuerySource).map(DenseRetriever).map(QAPromptor).map(OpenAIGenerator).sink(AnswerSink)
env.submit()
在完成了基础的 Naive RAG Pipeline 构建后,接下来我们需要准备离线知识库,以便为查询提供可检索的信息支持。以下是知识库构建的关键步骤与示例代码:
离线知识库构建¶
1. 基础知识库创建¶
使用SAGE Memory Service创建知识库:
from sage.service.memory.memory_service import MemoryService
from sage.utils.embedding_methods.embedding_api import apply_embedding_model
def create_knowledge_base():
# 初始化服务
embedding_model = apply_embedding_model("default")
memory_service = MemoryService()
# 创建知识库集合
collection_result = memory_service.create_collection(
name="rag_knowledge",
backend_type="VDB",
description="RAG知识库",
embedding_model=embedding_model,
dim=embedding_model.get_dim()
)
# 批量插入知识
knowledge_data = [
"机器学习是人工智能的一个分支,通过算法让计算机从数据中自动学习模式。",
"深度学习使用多层神经网络来模拟人脑的学习过程,特别擅长处理图像和语音。",
# ... 更多知识条目
]
for i, text in enumerate(knowledge_data):
memory_service.insert_data(
collection_name="rag_knowledge",
text=text,
metadata={
"id": i + 1,
"topic": "AI/ML",
"type": "knowledge",
"source": "manual"
}
)
# 创建检索索引
memory_service.create_index(
collection_name="rag_knowledge",
index_name="knowledge_index",
description="知识检索索引"
)
# 持久化存储
memory_service.store()
print("✅ 知识库构建完成")
2. 运行示例¶
# 构建知识库
python app/memory_app/mem_offline_write.py
# 测试检索效果
python app/memory_app/mem_offline_write_test.py
流式RAG查询处理¶
1. 实时查询处理流水线¶
SAGE支持构建流式RAG系统,可以实时处理用户查询:
class StreamingRAGSource(BatchFunction):
"""模拟实时查询流"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.test_queries = [
"什么是深度学习?",
"监督学习是怎么工作的?",
"Transformer有什么特点?",
"强化学习的核心机制是什么?"
]
self.counter = 0
def execute(self):
if self.counter >= len(self.test_queries):
return None
query_data = {
"query": self.test_queries[self.counter],
"query_id": self.counter + 1,
"timestamp": time.time()
}
self.counter += 1
return query_data
class KnowledgeRetriever(MapFunction):
"""知识检索算子"""
def execute(self, data):
result = self.call_service["memory_service"].retrieve_data(
collection_name="rag_knowledge",
query_text=data['query'],
topk=2,
index_name="knowledge_index",
with_metadata=True
)
if result['status'] == 'success':
data['retrieved_knowledge'] = result['results']
else:
data['retrieved_knowledge'] = []
return data
class RAGResultSink(SinkFunction):
"""结果展示算子"""
def execute(self, data):
print(f"\n🔍 查询 {data['query_id']}: {data['query']}")
if data['retrieved_knowledge']:
print(f"📖 检索到 {len(data['retrieved_knowledge'])} 条相关知识:")
for i, knowledge in enumerate(data['retrieved_knowledge'], 1):
print(f" {i}. {knowledge.get('text', 'N/A')}")
else:
print("❌ 未检索到相关知识")
return data
2. 创建流式处理环境¶
def create_streaming_rag():
env = LocalEnvironment()
# 注册Memory Service
def memory_service_factory():
memory_service = MemoryService()
# 连接现有知识库
return memory_service
env.register_service("memory_service", memory_service_factory)
# 构建流水线
env.from_source(StreamingRAGSource).map(KnowledgeRetriever).sink(RAGResultSink)
env.submit()
系统特性¶
内存服务优势¶
- 高效向量存储:支持大规模向量数据存储
- 快速相似度检索:毫秒级检索响应
- 元数据过滤:支持基于条件的精确检索
- 持久化存储:数据可靠持久化到磁盘
流处理能力¶
- 实时处理:支持实时查询流处理
- 弹性扩展:可根据负载动态调整
- 故障恢复:提供完善的错误处理机制
- 监控观测:全链路处理状态可观测
扩展方向¶
- Prompt工程:优化上下文组装和prompt模板
- LLM集成:接入各种大语言模型API
- 结果优化:添加答案质量评估和优化
- 多模态支持:扩展到图像、音频等多模态检索
快速开始¶
- 克隆项目并安装依赖
- 运行
python app/memory_app/mem_offline_write.py
构建知识库 - 运行
python app/memory_app/mem_offline_write_test.py
测试检索效果 - 根据需要扩展prompt和generation功能
通过SAGE构建的RAG系统具备生产级的稳定性和扩展性,是企业级RAG应用的理想选择。