SAGE Kernel 架构概览¶
SAGE Kernel 是一个现代化的流数据处理框架,采用分层架构设计,提供从API到底层运行时的完整解决方案。
🏗️ 整体架构¶
┌─────────────────────────────────────────────────────┐
│ 用户应用层 │
├─────────────────────────────────────────────────────┤
│ API 层 (sage.core.api) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 环境管理 │ │ 数据流API │ │ 函数接口 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ 核心层 (sage.kernel.kernels) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 核心引擎 │ │ 变换算子 │ │ 执行图 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ 运行时层 (sage.kernel.runtime) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 任务调度 │ │ 通信机制 │ │ 服务管理 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ 工具层 (sage.kernel.utils) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 日志系统 │ │ 配置管理 │ │ 性能监控 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ CLI层 (sage.cli) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 命令工具 │ │ 集群管理 │ │ 部署工具 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────┘
📦 核心模块¶
1. API 层 (sage.core.api
)¶
职责: 为用户提供简洁且功能强大的编程接口
- 环境管理:
LocalEnvironment
,RemoteEnvironment
-
管理本地与远程环境配置和连接。
-
数据流 API:
DataStream
,ConnectedStreams
-
提供流数据的表示和处理能力,支持数据管道的构建。
-
函数接口:
-
提供用户自定义函数基类的支持,允许用户定义特定的流处理函数。
-
类型系统:
- 基于泛型的类型安全保证,编译时错误检查,确保类型安全。
设计特点: - 声明式 API,通过链式调用进行流数据操作。 - 延迟执行:构建计算图时仅定义任务,执行时才会触发计算。 - 类型安全:通过泛型和编译时检查减少运行时错误。
2. 核心层 (sage.kernel.kernels
)¶
职责: 实现核心的流处理引擎和计算逻辑
- 核心引擎:
-
流处理引擎,负责流数据的执行和计算调度。
-
变换算子:
-
提供常见的流数据变换操作,如
Map
,Filter
,Join
等。 -
执行图:
-
构建和优化流数据的 DAG(有向无环图),决定执行顺序和依赖关系。
-
任务管理:
- 负责作业的调度和管理,确保计算任务高效执行。
设计特点: - 高性能 C++ 内核:核心计算模块采用高效的 C++ 编写。 - 内存零拷贝优化:优化内存传输,避免不必要的数据拷贝。 - 动态负载均衡:根据节点负载动态调整任务分配。
3. 运行时层 (sage.kernel.runtime
)¶
职责: 提供底层的资源和任务调度支持
- 任务调度:
-
提供分布式任务调度功能,确保任务合理调度。
-
通信机制:
-
提供进程间和跨节点的高效通信支持。
-
服务管理:
-
管理微服务生命周期,处理服务的启动、停止和监控。
-
资源管理:
- 管理内存、CPU、网络等资源,保证资源的合理分配和使用。
设计特点: - 异步 I/O 模型:支持异步非阻塞操作,提高系统响应能力。 - 容错和故障恢复:支持任务失败重试,确保高可靠性。 - 弹性伸缩:根据负载情况动态调整资源。
4. 工具层 (sage.kernel.utils
)¶
职责: 提供通用的工具和辅助功能
- 日志系统:
-
提供结构化日志和监控功能,支持实时日志分析。
-
配置管理:
-
管理配置文件和环境变量,支持灵活的配置方式。
-
性能监控:
-
收集并分析性能指标,帮助优化系统性能。
-
安全认证:
- 提供认证和授权机制,确保系统安全性。
5. CLI 层 (sage.cli
)¶
职责: 提供命令行工具和运维支持
- 开发工具:
-
提供项目脚手架、调试工具、代码生成工具等。
-
集群管理:
-
提供节点管理、服务发现、集群状态监控等功能。
-
部署工具:
-
支持应用打包、容器化部署等,简化部署流程。
-
监控工具:
- 提供实时监控和日志查看功能,帮助运维人员监控系统健康。
---实时监控、日志查看
🔄 数据流向¶
🌟 设计原则¶
1. 分层解耦¶
- 清晰的模块边界,确保各层次之间的职责明确。
- 接口与实现分离,提高模块的可测试性和可维护性。
- 易于测试和维护:每层都可以单独进行单元测试,确保模块功能的独立性。
2. 性能优先¶
- 零拷贝数据传输:优化数据流传输,减少内存消耗。
- 异步并发处理:提高任务的并发执行效率。
- 内存池管理:通过内存池管理优化内存分配和回收,避免内存泄漏。
3. 可扩展性¶
- 插件化架构:允许用户根据需求扩展框架功能。
- 服务化组件:各模块服务化,支持灵活的扩展和替换。
- 水平扩展支持:支持通过增加节点扩展系统处理能力。
4. 易用性¶
- 声明式 API:简洁的代码编写方式,提高开发效率。
- 自动类型推导:通过类型推导减少显式类型声明,提高代码简洁性。
- 丰富的工具支持:集成了各种开发、调试、监控工具,简化运维工作。
🔗 模块依赖关系¶
Pipeline as Service 架构¶
随着 SAGE 内核的演进,流水线(Pipeline)与服务(Service)的关系也逐步从分离走向统一。本节总结了最新的 “Pipeline as Service” 设计,重点讲解为什么要演进、核心设计原则以及开发者应该如何使用和迁移。
📜 背景回顾¶
- 传统 Flink 风格接口:流水线与服务完全分离。流水线用于数据处理,服务需要单独声明与注册,两者在运行时走不同的代码路径。
- 存在的问题:
- 代码路径分叉导致维护复杂;
- 流批/服务之间的组合能力有限;
- 开发者需要维护重复的服务注册逻辑。
🚀 新的统一式设计¶
- 流水线即服务
- 部署 Pipeline 的同时完成服务注册,无需额外的声明步骤。
- 服务名与流水线名保持一致,统一通过
process()
接口对外暴露。 -
所有服务调用通过标准的
call_service
/call_service_async
API 完成。 -
统一的访问入口
- 同一套 API 可以在算子、服务实现、用户脚本甚至批处理作业中复用。
-
用户脚本可以通过
bind_runtime_context
/call_service
语法糖完成调用。 -
透明通信
- 请求/响应流程完全统一,底层的队列发现、路由、超时控制由
ProxyManager
与ServiceManager
负责。 - 支持同步 / 异步调用,异步调用返回
Future
对象。
🧱 核心组件¶
- BaseRuntimeContext (
sage.kernel.runtime.context.base_context
) - 所有运行时上下文的基础类。
-
新增
call_service
/call_service_async
方法,直接接入ProxyManager
。 -
ProxyManager (
sage.kernel.runtime.proxy.proxy_manager
) - 负责服务发现、队列描述符缓存及调用调度。
-
对下委托
ServiceManager
,对上暴露统一的同步 / 异步接口,默认方法名为process
。 -
ServiceManager (
sage.kernel.runtime.service.service_caller
) - 维护请求/响应队列、超时与结果匹配。
-
新增
cache_service_descriptor
,避免重复查询服务队列。 -
Sugar API (
sage.runtime.sugar
) - 提供
call_service
、call_service_async
、bind_runtime_context
等工具,方便在算子之外的脚本中复用服务调用能力。
🧰 开发者使用示例¶
from sage import call_service, call_service_async
# 同步调用另一个流水线
result = call_service("data_cleaning_pipeline", payload)
# 指定具体方法
cached = call_service("cache", key, method="get", timeout=5.0)
# 异步调用
future = call_service_async("ml_inference_pipeline", features)
prediction = future.result()
算子或服务内部可以通过上下文对象直接调用:
class DataProcessor(ProcessorOperator):
def process(self, ctx: TaskContext, record):
enriched = ctx.call_service("enrichment_pipeline", record)
score = ctx.call_service("scoring", enriched, method="score")
return score
✅ 带来的收益¶
- 统一的编程模型:所有上下文都使用同一套调用接口,超时与错误处理逻辑一致。
- 流水线灵活组合:可以任意组合流式、批处理和服务化组件,形成复合工作流。
- 减少运维负担:部署流水线即完成服务注册,自动完成服务发现、队列缓存和通信。
- 性能优化:Proxy 层缓存队列描述符,减少控制面开销;支持异步调用提高吞吐。
🔮 后续规划¶
- 负载均衡与健康检查
- 版本化服务调用与灰度发布
- 链路追踪、指标采集以及依赖可视化
- 服务调用鉴权与配额管理
新的架构将 Pipeline 视为一等服务对象,让构建复杂的数据处理与 AI 工作流变得更加简单。开发者只需要专注于业务逻辑,其余的服务发现、通信、超时与资源管理均由框架自动处理。
每一层都有清晰的职责边界,上层依赖下层,但下层不依赖上层,确保了架构的稳定性和可维护性。