无界流处理 (Infinite Stream)¶
本例位于 SAGE Repo 下的
examples/tutorials/core-api/hello_wordcount_source_example.py
graph LR
A[SentenceBatch] --> B[SplitWords]
B --> C[WordToPair]
C --> D[PrintResult]
实例解析¶
无界流处理模拟真实世界中的 连续数据流,如传感器数据、用户点击流、消息队列等。它强调实时性和响应性,是构建现代数据驱动应用的核心模式。
示例程序如下:
本例由有界流改动而来,下面我们将对这些改动部分逐一进行概述,以便理解 SAGE 无界流:
A. SentenceSource
:无界数据源¶
class SentenceSource(SourceFunction):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sentences = [
"hello world",
"hello sage",
"hello chatgpt",
"world of ai",
"sage world"
]
self.index = 0
def execute(self):
# 无限流:每次输出一句话,模拟流数据源
if self.index >= len(self.sentences):
self.index = 0 # 重置索引,实现循环输出
sentence = self.sentences[self.index]
self.index += 1
return sentence
说明:
- 继承自
SourceFunction
,需要实例化抽象接口execute()
,用于生成数据,配合env.from_source
实现无界数据输入。
B. sleep
:阻塞线程¶
说明:
- 在无界流中,
autostop=True
无法暂停 Pipeline。 submit
提交的任务由后台线程执行,主线程不阻塞的话,会自动停止。
C.运行效果¶
控制台输出效果(会有很多输出不断生成):
JobManager logs: ***/.sage/logs/jobmanager/session_***
当前单词计数:
hello: 527
world: 526
sage: 350
chatgpt: 175
of: 175
ai: 175
------
当前单词计数:
hello: 527
world: 526
sage: 351
chatgpt: 175
of: 175
ai: 175
...
结语¶
env.from_source()
和 SourceFunction
共同组成了有界流处理的基础。SAGE的 LocalEnvironment
中,主线程会将提交的任务分配给 Python 从线程执行,需要用户继续显式的阻塞,否则会自动结束从而关闭从线程的 Pipeline。有界流处理的相关接口可以处理持续产生的数据流,适合实时数据分析、监控和交互式应用。