批处理示例:Hello World¶
通过一个简单的批处理任务,快速了解 SAGE 的基本用法
什么是批处理?¶
批处理(Batch Processing) 是指处理 有界流(Bounded Stream) 数据的任务。与持续运行的流处理不同,批处理任务:
- ✅ 数据量是有限的
- ✅ 任务可以自动结束
- ✅ 适合一次性数据处理场景
常见的批处理场景包括: - 处理文件中的数据 - 执行一次性的数据转换 - 批量导入/导出数据
Hello World 示例¶
让我们通过一个完整的示例来学习如何使用 SAGE 进行批处理。
完整代码¶
创建文件 hello_batch.py:
运行示例¶
预期输出:
HELLO, WORLD! #1
HELLO, WORLD! #2
HELLO, WORLD! #3
HELLO, WORLD! #4
HELLO, WORLD! #5
HELLO, WORLD! #6
HELLO, WORLD! #7
HELLO, WORLD! #8
HELLO, WORLD! #9
HELLO, WORLD! #10
Hello World 批处理示例结束
代码详解¶
SAGE 批处理程序由以下几个核心部分组成:
1️⃣ 数据源(BatchFunction)¶
class HelloBatch(BatchFunction):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.counter = 0
self.max_count = 10
def execute(self):
if self.counter >= self.max_count:
return None # 通知框架数据已生成完毕
self.counter += 1
return f"Hello, World! #{self.counter}"
关键点:
- 继承自 BatchFunction
- 实现 execute() 方法,每次调用返回一条数据
- 返回 None 表示批处理结束,框架会自动停止数据源
2️⃣ 处理算子(MapFunction)¶
关键点:
- 继承自 MapFunction
- 实现 execute(data) 方法,接收上游数据,返回处理后的结果
- 一对一转换:输入一条数据,输出一条数据
3️⃣ 输出目标(SinkFunction)¶
关键点:
- 继承自 SinkFunction
- 实现 execute(data) 方法,定义如何输出数据
- 不需要返回值
4️⃣ 构建 Pipeline¶
env = LocalEnvironment("Hello_World")
env.from_batch(HelloBatch).map(UpperCaseMap).sink(PrintSink)
env.submit(autostop=True)
关键点:
- LocalEnvironment 创建本地执行环境
- from_batch() 指定批处理数据源
- map() 添加数据转换算子
- sink() 指定输出目标
- submit(autostop=True) 提交并执行,批处理完成后自动停止
数据流动过程¶
HelloBatch (数据源)
│
├─ "Hello, World! #1"
├─ "Hello, World! #2"
├─ ...
└─ None (结束信号)
↓
UpperCaseMap (转换)
│
├─ "HELLO, WORLD! #1"
├─ "HELLO, WORLD! #2"
└─ ...
↓
PrintSink (输出)
│
└─ 打印到控制台
常见批处理场景¶
场景 1:处理文件数据¶
from sage.core.api.function.batch_function import BatchFunction
class FileBatch(BatchFunction):
def __init__(self, file_path, **kwargs):
super().__init__(**kwargs)
self.file = open(file_path, 'r')
self.lines = self.file.readlines()
self.index = 0
def execute(self):
if self.index >= len(self.lines):
self.file.close()
return None
line = self.lines[self.index].strip()
self.index += 1
return line
场景 2:批量处理列表数据¶
class ListBatch(BatchFunction):
def __init__(self, data_list, **kwargs):
super().__init__(**kwargs)
self.data = data_list
self.index = 0
def execute(self):
if self.index >= len(self.data):
return None
item = self.data[self.index]
self.index += 1
return item
# 使用示例
data = ["apple", "banana", "cherry", "date"]
env.from_batch(ListBatch, data_list=data).map(processor).sink(output)
场景 3:分批处理大数据集¶
class ChunkedBatch(BatchFunction):
def __init__(self, total_records, chunk_size=100, **kwargs):
super().__init__(**kwargs)
self.total = total_records
self.chunk_size = chunk_size
self.processed = 0
def execute(self):
if self.processed >= self.total:
return None
# 返回一批数据
chunk = []
for i in range(self.chunk_size):
if self.processed >= self.total:
break
chunk.append(self.fetch_data(self.processed))
self.processed += 1
return chunk
def fetch_data(self, index):
# 从数据库或其他源获取数据
return {"id": index, "data": f"record_{index}"}
批处理 vs 流处理¶
| 特性 | 批处理(Batch) | 流处理(Stream) |
|---|---|---|
| 数据量 | 有限(有界流) | 无限(无界流) |
| 执行时间 | 自动结束 | 持续运行 |
| 数据源 | from_batch() |
from_stream() |
| 结束标志 | 返回 None |
不主动结束 |
| 适用场景 | 文件处理、一次性任务 | 实时数据、持续监控 |
最佳实践¶
✅ DO(推荐做法)¶
-
使用
autostop=True- 批处理完成后自动停止 -
明确返回
None- 清晰地表示批处理结束 -
资源清理 - 在返回
None前关闭文件/连接 -
关闭调试日志 - 生产环境减少日志噪音
❌ DON'T(避免做法)¶
-
忘记返回
None- 会导致程序无法自动结束 -
未使用
autostop- 批处理结束后仍保持运行 -
忘记调用
super().__init__()
下一步¶
- 📖 学习 流式处理 101 了解无界流处理
- 🔧 探索 Filter 算子 学习数据过滤
- 🚀 查看 MapFunction 了解更多数据转换方式
- 📚 阅读 高级教程 学习复杂场景处理
总结¶
本教程介绍了:
- ✅ 批处理的概念和适用场景
- ✅ 如何创建 BatchFunction 数据源
- ✅ 如何构建批处理 Pipeline
- ✅ 批处理的生命周期管理
- ✅ 常见批处理模式和最佳实践
现在您已经掌握了 SAGE 批处理的基础!试着修改示例代码,创建您自己的批处理应用吧!🚀