跳转至

SAGE Kernel 架构概览

SAGE Kernel 是一个现代化的流数据处理框架,采用分层架构设计,提供从API到底层运行时的完整解决方案。

🏗️ 整体架构

┌─────────────────────────────────────────────────────┐
│                   用户应用层                        │
├─────────────────────────────────────────────────────┤
│              API 层 (sage.core.api)              │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐   │
│  │  环境管理   │ │  数据流API  │ │  函数接口   │   │
│  └─────────────┘ └─────────────┘ └─────────────┘   │
├─────────────────────────────────────────────────────┤
│            核心层 (sage.kernel.kernels)            │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐   │
│  │  核心引擎   │ │  变换算子   │ │  执行图     │   │
│  └─────────────┘ └─────────────┘ └─────────────┘   │
├─────────────────────────────────────────────────────┤
│           运行时层 (sage.kernel.runtime)           │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐   │
│  │  任务调度   │ │  通信机制   │ │  服务管理   │   │
│  └─────────────┘ └─────────────┘ └─────────────┘   │
├─────────────────────────────────────────────────────┤
│            工具层 (sage.kernel.utils)              │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐   │
│  │  日志系统   │ │  配置管理   │ │  性能监控   │   │
│  └─────────────┘ └─────────────┘ └─────────────┘   │
├─────────────────────────────────────────────────────┤
│             CLI层 (sage.cli)                │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐   │
│  │  命令工具   │ │  集群管理   │ │  部署工具   │   │
│  └─────────────┘ └─────────────┘ └─────────────┘   │
└─────────────────────────────────────────────────────┘

📦 核心模块

1. API 层 (sage.core.api)

职责: 为用户提供简洁且功能强大的编程接口

  • 环境管理:
  • LocalEnvironment, RemoteEnvironment
  • 管理本地与远程环境配置和连接。

  • 数据流 API:

  • DataStream, ConnectedStreams
  • 提供流数据的表示和处理能力,支持数据管道的构建。

  • 函数接口:

  • 提供用户自定义函数基类的支持,允许用户定义特定的流处理函数。

  • 类型系统:

  • 基于泛型的类型安全保证,编译时错误检查,确保类型安全。

设计特点: - 声明式 API,通过链式调用进行流数据操作。 - 延迟执行:构建计算图时仅定义任务,执行时才会触发计算。 - 类型安全:通过泛型和编译时检查减少运行时错误。


2. 核心层 (sage.kernel.kernels)

职责: 实现核心的流处理引擎和计算逻辑

  • 核心引擎:
  • 流处理引擎,负责流数据的执行和计算调度。

  • 变换算子:

  • 提供常见的流数据变换操作,如 Map, Filter, Join 等。

  • 执行图:

  • 构建和优化流数据的 DAG(有向无环图),决定执行顺序和依赖关系。

  • 任务管理:

  • 负责作业的调度和管理,确保计算任务高效执行。

设计特点: - 高性能 C++ 内核:核心计算模块采用高效的 C++ 编写。 - 内存零拷贝优化:优化内存传输,避免不必要的数据拷贝。 - 动态负载均衡:根据节点负载动态调整任务分配。


3. 运行时层 (sage.kernel.runtime)

职责: 提供底层的资源和任务调度支持

  • 任务调度:
  • 提供分布式任务调度功能,确保任务合理调度。

  • 通信机制:

  • 提供进程间和跨节点的高效通信支持。

  • 服务管理:

  • 管理微服务生命周期,处理服务的启动、停止和监控。

  • 资源管理:

  • 管理内存、CPU、网络等资源,保证资源的合理分配和使用。

设计特点: - 异步 I/O 模型:支持异步非阻塞操作,提高系统响应能力。 - 容错和故障恢复:支持任务失败重试,确保高可靠性。 - 弹性伸缩:根据负载情况动态调整资源。


4. 工具层 (sage.kernel.utils)

职责: 提供通用的工具和辅助功能

  • 日志系统:
  • 提供结构化日志和监控功能,支持实时日志分析。

  • 配置管理:

  • 管理配置文件和环境变量,支持灵活的配置方式。

  • 性能监控:

  • 收集并分析性能指标,帮助优化系统性能。

  • 安全认证:

  • 提供认证和授权机制,确保系统安全性。

5. CLI 层 (sage.cli)

职责: 提供命令行工具和运维支持

  • 开发工具:
  • 提供项目脚手架、调试工具、代码生成工具等。

  • 集群管理:

  • 提供节点管理、服务发现、集群状态监控等功能。

  • 部署工具:

  • 支持应用打包、容器化部署等,简化部署流程。

  • 监控工具:

  • 提供实时监控和日志查看功能,帮助运维人员监控系统健康。

---实时监控、日志查看

🔄 数据流向

用户代码 → API抽象 → 执行图 → 任务调度 → 分布式执行
    ↓         ↓        ↓        ↓         ↓
   DSL    变换算子   DAG优化   资源分配   数据处理

🌟 设计原则

1. 分层解耦

  • 清晰的模块边界,确保各层次之间的职责明确。
  • 接口与实现分离,提高模块的可测试性和可维护性。
  • 易于测试和维护:每层都可以单独进行单元测试,确保模块功能的独立性。

2. 性能优先

  • 零拷贝数据传输:优化数据流传输,减少内存消耗。
  • 异步并发处理:提高任务的并发执行效率。
  • 内存池管理:通过内存池管理优化内存分配和回收,避免内存泄漏。

3. 可扩展性

  • 插件化架构:允许用户根据需求扩展框架功能。
  • 服务化组件:各模块服务化,支持灵活的扩展和替换。
  • 水平扩展支持:支持通过增加节点扩展系统处理能力。

4. 易用性

  • 声明式 API:简洁的代码编写方式,提高开发效率。
  • 自动类型推导:通过类型推导减少显式类型声明,提高代码简洁性。
  • 丰富的工具支持:集成了各种开发、调试、监控工具,简化运维工作。

🔗 模块依赖关系

API层 → 核心层 → 运行时层 → 工具层
 ↓       ↓       ↓       ↓
CLI层 ←─────────────────────┘

Pipeline as Service 架构

随着 SAGE 内核的演进,流水线(Pipeline)与服务(Service)的关系也逐步从分离走向统一。本节总结了最新的 “Pipeline as Service” 设计,重点讲解为什么要演进、核心设计原则以及开发者应该如何使用和迁移。

📜 背景回顾

  • 传统 Flink 风格接口:流水线与服务完全分离。流水线用于数据处理,服务需要单独声明与注册,两者在运行时走不同的代码路径。
  • 存在的问题
  • 代码路径分叉导致维护复杂;
  • 流批/服务之间的组合能力有限;
  • 开发者需要维护重复的服务注册逻辑。

🚀 新的统一式设计

  1. 流水线即服务
  2. 部署 Pipeline 的同时完成服务注册,无需额外的声明步骤。
  3. 服务名与流水线名保持一致,统一通过 process() 接口对外暴露。
  4. 所有服务调用通过标准的 call_service / call_service_async API 完成。

  5. 统一的访问入口

  6. 同一套 API 可以在算子、服务实现、用户脚本甚至批处理作业中复用。
  7. 用户脚本可以通过 bind_runtime_context/call_service 语法糖完成调用。

  8. 透明通信

  9. 请求/响应流程完全统一,底层的队列发现、路由、超时控制由 ProxyManagerServiceManager 负责。
  10. 支持同步 / 异步调用,异步调用返回 Future 对象。

🧱 核心组件

  • BaseRuntimeContext (sage.kernel.runtime.context.base_context)
  • 所有运行时上下文的基础类。
  • 新增 call_service / call_service_async 方法,直接接入 ProxyManager

  • ProxyManager (sage.kernel.runtime.proxy.proxy_manager)

  • 负责服务发现、队列描述符缓存及调用调度。
  • 对下委托 ServiceManager,对上暴露统一的同步 / 异步接口,默认方法名为 process

  • ServiceManager (sage.kernel.runtime.service.service_caller)

  • 维护请求/响应队列、超时与结果匹配。
  • 新增 cache_service_descriptor,避免重复查询服务队列。

  • Sugar API (sage.runtime.sugar)

  • 提供 call_servicecall_service_asyncbind_runtime_context 等工具,方便在算子之外的脚本中复用服务调用能力。

🧰 开发者使用示例

from sage import call_service, call_service_async

# 同步调用另一个流水线
result = call_service("data_cleaning_pipeline", payload)

# 指定具体方法
cached = call_service("cache", key, method="get", timeout=5.0)

# 异步调用
future = call_service_async("ml_inference_pipeline", features)
prediction = future.result()

算子或服务内部可以通过上下文对象直接调用:

class DataProcessor(ProcessorOperator):
    def process(self, ctx: TaskContext, record):
        enriched = ctx.call_service("enrichment_pipeline", record)
        score = ctx.call_service("scoring", enriched, method="score")
        return score

✅ 带来的收益

  • 统一的编程模型:所有上下文都使用同一套调用接口,超时与错误处理逻辑一致。
  • 流水线灵活组合:可以任意组合流式、批处理和服务化组件,形成复合工作流。
  • 减少运维负担:部署流水线即完成服务注册,自动完成服务发现、队列缓存和通信。
  • 性能优化:Proxy 层缓存队列描述符,减少控制面开销;支持异步调用提高吞吐。

🔮 后续规划

  • 负载均衡与健康检查
  • 版本化服务调用与灰度发布
  • 链路追踪、指标采集以及依赖可视化
  • 服务调用鉴权与配额管理

新的架构将 Pipeline 视为一等服务对象,让构建复杂的数据处理与 AI 工作流变得更加简单。开发者只需要专注于业务逻辑,其余的服务发现、通信、超时与资源管理均由框架自动处理。

每一层都有清晰的职责边界,上层依赖下层,但下层不依赖上层,确保了架构的稳定性和可维护性。