SAGE - Streaming-Augmented Generative Execution¶
用于构建透明 LLM 系统的声明式、可组合框架
[](ht# 开发者安装
./quickstart.sh --dev --yes
核心运行时安装¶
./quickstart.sh --core --yes
标准安装 + vLLM 支持¶
./quickstart.sh --standard --vllm --yes
使用系统 Python 而非 conda¶
./quickstart.sh --core --pip --yes
查看所有标志¶
./quickstart.sh --help
```m/intellistream/SAGE/actions/workflows/ci.yml)
SAGE 是一个用于构建 AI 驱动数据处理流水线的高性能流处理框架。通过声明式数据流抽象,将复杂的 LLM 推理工作流转换为透明、可扩展且易于维护的系统。
为什么选择 SAGE?¶
生产就绪: 为企业级应用构建,提供开箱即用的分布式处理、容错机制和全面的监控功能。
开发体验: 使用直观的声明式 API,只需几行代码即可编写复杂的 AI 流水线,消除样板代码。
性能优化: 针对高吞吐量流式工作负载优化,具备智能内存管理和并行执行能力。
透明可观测: 内置可观测性和调试工具,提供执行路径和性能特征的完整可见性。
快速开始¶
将传统的命令式 LLM 应用转换为灵活、可观测的工作流。传统方法创建的系统脆弱且难以修改:
# 传统方法 - 僵化且难以修改
def traditional_rag(query):
docs = retriever.retrieve(query)
if len(docs) < 3:
docs = fallback_retriever.retrieve(query)
prompt = build_prompt(query, docs)
response = llm.generate(prompt)
return response
SAGE 将其转换为声明式、可组合的工作流:
from sage.core.api.local_environment import LocalEnvironment
from sage.libs.io.source import FileSource
from sage.libs.rag.retriever import DenseRetriever
from sage.libs.rag.promptor import QAPromptor
from sage.libs.rag.generator import OpenAIGenerator
from sage.libs.io.sink import TerminalSink
# 创建执行环境
env = LocalEnvironment("rag_pipeline")
# 构建声明式流水线
(
env.from_source(FileSource, {"file_path": "questions.txt"})
.map(DenseRetriever, {"model": "sentence-transformers/all-MiniLM-L6-v2"})
.map(QAPromptor, {"template": "基于上下文回答: {context}\n问: {query}\n答:"})
.map(OpenAIGenerator, {"model": "gpt-3.5-turbo"})
.sink(TerminalSink)
)
# 执行流水线
env.submit()
为什么这很重要¶
灵活性: 无需修改执行逻辑即可修改流水线结构。轻松替换组件、添加监控或更改部署目标。
透明性: 通过内置的可观测性和调试工具,清楚地了解每一步发生的事情。
性能: 基于数据流分析的自动优化、并行化和资源管理。
可靠性: 内置容错、检查点和错误恢复机制。
架构设计¶
系统架构¶
SAGE 基于分层架构构建,提供灵活性、可扩展性和可维护性。架构由五个主要层次组成:
- 用户层: 使用 SAGE 构建的应用(RAG、Agent、Memory、QA 系统)
- API 层: LocalEnvironment 和 RemoteEnvironment 用于不同的执行上下文
- 核心层: Dispatcher、Job Manager、Service Manager 和运行时执行引擎
- 库层: RAG 流水线、Agent 框架、Memory 存储、中间件组件
- 基础设施层: 计算后端(Ray、本地)、数据存储、模型服务、监控
模块化设计¶
SAGE 遵循清晰的关注点分离,具有无缝协作的可插拔组件:
- Core (sage-kernel): 流处理引擎和执行环境
- Libraries (sage-libs): 丰富的 AI、I/O、转换和工具算子
- Kernel (sage-kernel): 分布式计算原语和通信
- Middleware (sage-middleware): 服务发现、监控和管理
- Common (sage-common): 共享工具、配置和日志
生产级特性¶
为满足企业需求的实际部署而构建:
- 分布式执行: 通过自动负载均衡跨多个节点扩展
- 容错机制: 全面的错误处理和恢复机制
- 可观测性: 详细的指标、日志和性能监控
- 安全性: 身份验证、授权和数据加密支持
- 集成: 为流行的数据库、消息队列和 AI 服务提供原生连接器
🧩 核心原生扩展¶
SAGE 提供两个 C++ 原生扩展,覆盖向量存储与流式计算:
SAGE DB - 向量数据库¶
基于 FAISS 的高性能向量数据库,支持:
- 多模态数据: 文本、图像、音频等多种数据类型
- 元数据过滤: 基于元数据的精确过滤和检索
- Hybrid 检索: 结合向量检索和关键词检索
- 持久化存储: 数据持久化到磁盘并支持增量更新
- 多索引管理: 支持创建和管理多个索引
安装方式:
SAGE Flow - 流式处理引擎¶
向量级流式处理引擎,提供:
- 窗口化算子: 时间窗口、计数窗口、会话窗口
- 低延迟状态更新: 毫秒级状态更新和查询
- RAG 联动: 与向量数据库无缝集成
- 实时处理: 适合实时 Agent 和交互场景
安装方式:
扩展管理¶
# 安装所有扩展
sage extensions install all
# 检查扩展状态
sage extensions status
# 重新编译扩展
sage extensions install all --force
更多扩展正在规划中。您可以在 packages/sage-middleware/src/sage/middleware/components/ 下查看示例并提交提案。
安装¶
我们提供交互式安装器和明确的命令标志。推荐开发者使用开发模式。
克隆仓库并交互式安装¶
git clone https://github.com/intellistream/SAGE.git
cd SAGE
git checkout main-dev
./quickstart.sh # 打开交互式菜单
常用非交互式安装模式¶
# 开发者安装
./quickstart.sh --dev --yes
# 最小核心安装
./quickstart.sh --minimal --yes
# 标准安装 + vLLM 支持
./quickstart.sh --standard --vllm --yes
# 使用系统 Python 而非 conda
./quickstart.sh --minimal --pip --yes
# 查看所有标志
./quickstart.sh --help
快速 PyPI 安装¶
# 选择您的安装模式:
pip install isage[minimal] # 核心功能
pip install isage[standard] # 完整特性
pip install isage[dev] # 所有功能 + 开发工具
注意: PyPI 安装可能不包含所有系统依赖;使用 quickstart.sh 进行完整的环境设置。
关键安装特性¶
- 🎯 为首次用户提供交互式菜单
- 🤖 通过
--vllm集成 vLLM - 🐍 通过
--pip支持 conda 或系统 Python - ⚡ 三种模式: minimal / standard / dev
环境配置¶
安装后,配置您的 API 密钥和环境设置:
快速设置¶
手动设置¶
# 复制环境模板
cp .env.template .env
# 编辑 .env 并添加您的 API 密钥
# 大多数示例需要:
OPENAI_API_KEY=your_openai_api_key_here
HF_TOKEN=your_huggingface_token_here
核心概念¶
Environment(执行环境)¶
Environment 是 SAGE 的执行入口点,提供两种模式:
- LocalEnvironment: 本地多线程执行,适合开发和小规模任务
- RemoteEnvironment: 基于 Ray 的分布式执行,适合生产和大规模任务
from sage.core.api.local_environment import LocalEnvironment
from sage.core.api.remote_environment import RemoteEnvironment
# 本地环境
env = LocalEnvironment("my_pipeline")
# 分布式环境
env = RemoteEnvironment("distributed_pipeline")
DataStream(数据流)¶
DataStream 是 SAGE 的核心抽象,表示数据流。通过链式 API 构建流水线:
# 构建流水线
stream = (
env.from_source(FileSource, {"file_path": "input.txt"})
.map(ProcessFunction, {"param": "value"})
.filter(FilterFunction)
.sink(OutputSink)
)
Function(函数算子)¶
Function 是流水线中的处理单元。SAGE 提供多种函数类型:
- SourceFunction: 数据源(文件、API、数据库等)
- MapFunction: 一对一转换
- FlatMapFunction: 一对多转换
- FilterFunction: 过滤数据
- BatchFunction: 批处理数据源
- SinkFunction: 数据输出(终端、文件、数据库等)
Operator(算子)¶
Operator 封装 Function,提供执行逻辑。支持的算子:
| 算子方法 | 描述 |
|---|---|
from_source() |
从外部系统读取输入数据 |
from_batch() |
批处理数据源 |
map() |
一对一转换 |
flatmap() |
一对多转换 |
filter() |
过滤数据 |
sink() |
定义流的终端输出 |
功能库¶
SAGE 提供丰富的内置功能库,覆盖常见的 AI 应用场景:
RAG (检索增强生成)¶
位于 sage.libs.rag:
- Retriever: 密集检索器(DenseRetriever)、稀疏检索器、混合检索器
- Reranker: 基于交叉编码器的重排序
- Promptor: 提示词构建器(QAPromptor、ChatPromptor)
- Generator: LLM 生成器(OpenAI、vLLM、本地模型)
- Evaluator: 评估指标(BLEU、ROUGE、F1)
Agent (智能体)¶
位于 sage.libs.agent:
- Tool Calling: 工具调用和参数解析
- ReAct: 推理-行动循环
- Planning: 任务规划和分解
- Memory: 对话历史和上下文管理
Memory (内存管理)¶
位于 sage.middleware.components.sage_db:
- VectorDB: 基于 FAISS 的向量数据库
- Metadata Filtering: 元数据过滤
- Multi-Index: 多索引管理
- Persistence: 持久化存储
I/O (输入输出)¶
位于 sage.libs.io:
- Source: FileSource、APISource、StreamSource
- Sink: TerminalSink、FileSink、DatabaseSink
- Serialization: JSON、Pickle、自定义序列化
示例应用¶
完整的示例代码位于 examples 目录:
基础教程¶
- Hello World: 简单的批处理示例
- Stream Processing: 无限流处理
- Service Integration: 微服务集成
RAG 应用¶
- Basic RAG: 基础检索增强生成
- Multi-Document RAG: 多文档检索
- Conversational RAG: 对话式 RAG
Agent 应用¶
- Tool Agent: 工具调用 Agent
- ReAct Agent: 推理-行动 Agent
- Planning Agent: 任务规划 Agent
CLI 工具¶
SAGE 提供强大的命令行工具:
# 系统诊断
sage doctor
# 扩展管理
sage extensions install all
sage extensions status
# 环境配置
sage config env setup
sage config env show
# 开发工具
sage-dev status # 显示开发状态
sage-dev clean # 清理构建产物
# 聊天界面(实验性)
sage chat
下一步¶
许可证¶
SAGE 采用 MIT 许可证。