SAGE Kernel API 模块¶
SAGE Kernel API 模块为流数据处理提供了简洁而强大的编程接口。采用声明式编程范式,让开发者能够专注于业务逻辑而不是底层实现细节。
📋 API 模块概览¶
sage.core.api/
├── __init__.py # API 主入口,导出所有公共接口
├── base_environment.py # 环境基类,定义通用接口
├── local_environment.py # 本地环境实现
├── remote_environment.py # 远程/分布式环境实现
├── datastream.py # 数据流核心类
├── connected_streams.py # 连接流管理
└── function/ # 函数接口目录
├── __init__.py
├── base_function.py # 函数基类
├── map_function.py # Map函数
├── filter_function.py # Filter函数
├── sink_function.py # Sink函数
├── source_function.py # Source函数
├── keyby_function.py # KeyBy函数
├── flatmap_function.py # FlatMap函数
├── comap_function.py # CoMap函数
└── join_function.py # Join函数
🎯 核心组件¶
1. 环境管理 (Environments)¶
- BaseEnvironment: 环境抽象基类
- LocalEnvironment: 本地单机环境
- RemoteEnvironment: 分布式集群环境
2. 数据流处理 (DataStreams)¶
- DataStream: 核心数据流类,支持链式操作
- ConnectedStreams: 多流连接和协同处理
3. 函数接口 (Functions)¶
- 转换函数: Map, FlatMap, Filter等
- 路由函数: KeyBy, Partition等
- 输出函数: Sink, Print等
- 连接函数: Join, CoMap等
🔄 典型工作流程¶
# 1. 创建环境
env = LocalEnvironment("my_app")
# 2. 创建数据源
stream = env.from_batch([1, 2, 3, 4, 5])
# 3. 数据转换 (构建计算图)
result = (stream
.map(lambda x: x * 2)
.filter(lambda x: x > 5)
.sink(print))
# 4. 提交执行
env.submit()
📚 详细文档¶
🌟 设计特性¶
类型安全¶
from typing import TypeVar
T = TypeVar('T')
U = TypeVar('U')
class DataStream(Generic[T]):
def map(self, func: Callable[[T], U]) -> DataStream[U]:
# 编译时类型检查
pass
延迟执行¶
# 构建阶段 - 只创建计算图,不执行
stream = env.from_kafka("topic").map(process).sink(output)
# 执行阶段 - 调用submit()时才开始处理数据
env.submit()
链式调用¶
# 支持流畅的链式API
result = (data_stream
.map(transform1)
.filter(condition)
.map(transform2)
.sink(output))
🔌 扩展机制¶
自定义函数¶
class MyMapFunction(MapFunction[int, str]):
def map(self, value: int) -> str:
return f"Value: {value}"
stream.map(MyMapFunction())
自定义数据源¶
class MySourceFunction(SourceFunction[dict]):
def run(self, ctx: SourceContext[dict]):
# 自定义数据源逻辑
pass
stream = env.add_source(MySourceFunction())
📖 最佳实践¶
- 环境隔离: 不同应用使用独立环境
- 资源管理: 及时关闭环境释放资源
- 错误处理: 实现适当的错误处理逻辑
- 性能优化: 合理设置并行度和缓冲区大小
- 类型注解: 使用类型注解提高代码可读性
🚀 快速开始¶
查看 快速开始指南 了解如何快速上手 SAGE Kernel API。