有界流处理 (Limited Streaming)¶
处理固定大小的数据集,适合离线批量分析和一次性计算任务。
有界流处理模拟真实世界中的**批量数据分析**场景,如日志分析、报表生成、数据迁移等。它强调完整性和确定性,是构建可靠数据处理系统的基础模式。
技术架构¶
graph LR
A[BatchFunction] --> B[map/filter/flatmap]
B --> C[stateful operator]
C --> D[sink]
D --> E[自动结束]
示例1:WordCount 批处理¶
WordCount(词频统计)是大数据处理领域的经典示例,它通过统计文本中每个单词出现的次数,展示了完整的数据处理流程:数据读取、清洗、转换、聚合和输出。
在批处理模式下,WordCount处理固定的文本数据集,非常适合离线分析、报告生成和数据挖掘等场景。
数据源定义¶
from sage.core.function.batch_function import BatchFunction
class TextDataBatch(BatchFunction):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sentences = [
"hello world sage framework",
"this is a streaming data processing example",
"lambda functions make the code much cleaner",
"word count is a classic big data example",
"sage provides powerful stream processing capabilities"
]
self.counter = 0
def execute(self):
if self.counter >= len(self.sentences):
return None # 返回None表示批处理完成
sentence = self.sentences[self.counter]
self.counter += 1
return sentence
批处理管道¶
from sage.core.api.local_environment import LocalEnvironment
from collections import Counter
import time
def main():
# 创建本地执行环境
env = LocalEnvironment("batch_wordcount")
# 准备状态管理
word_counts = Counter()
total_processed = 0
def update_word_count(words_with_count):
"""更新全局词汇计数"""
nonlocal word_counts, total_processed
word, count = words_with_count
word_counts[word] += count
total_processed += count
return words_with_count
# 构建批处理管道
(env
.from_batch(TextDataBatch) # 批数据源
# 数据清洗和预处理
.map(lambda sentence: sentence.lower()) # 转小写
.map(lambda sentence: sentence.strip()) # 去除首尾空白
.filter(lambda sentence: len(sentence) > 0) # 过滤空字符串
# 分词处理
.flatmap(lambda sentence: sentence.split()) # 按空格分词
.filter(lambda word: len(word) > 2) # 过滤长度小于3的词
.map(lambda word: word.replace(",", "").replace(".", "")) # 去除标点
# 词汇统计
.map(lambda word: (word, 1)) # 转换为(word, count)格式
.map(update_word_count) # 更新计数器
.sink(lambda x: None) # 添加sink确保数据流完整
)
print("🚀 Starting Batch WordCount Example")
try:
# 提交并运行批处理作业
env.submit()
time.sleep(2) # 等待批处理完成
# 打印最终统计结果
print("\n📊 Final Word Count Results:")
print("=" * 60)
for word, count in word_counts.most_common():
print(f"{word:20}: {count:3d}")
print("=" * 60)
print(f"Total words processed: {total_processed}")
except Exception as e:
print(f"❌ 批处理执行失败: {str(e)}")
finally:
env.close()
if __name__ == "__main__":
main()
关键技术特点¶
1. 批量状态累积¶
- 使用nonlocal
关键字访问外部作用域变量
- Counter
对象自动处理词频统计
- 所有数据处理完成后才输出最终结果
2. 确定性处理流程¶
- 与无界流不同,批处理有明确的开始和结束 - 数据源返回None
时自动结束处理
- 必须使用 .sink()
确保管道完整执行
示例2:RAG问答批处理¶
在第一个WordCount示例展示了基础的文本处理和统计功能后,我们进入一个更复杂的应用场景:基于检索增强生成(RAG)的问答系统。
这个示例演示了如何使用SAGE框架构建智能问答管道,包括问题批量处理、知识检索、提示词生成和大模型回答等关键组件的协作。
技术架构¶
graph LR
A[问题文件] --> B[QABatch]
B --> C[BiologyRetriever]
C --> D[知识库检索]
D --> E[QAPromptor]
E --> F[OpenAIGenerator]
F --> G[TerminalSink]
H[MemoryService] --> D
I[配置文件] --> C
I --> E
I --> F
数据源定义¶
基于实际的qa_batch.py代码,从文件读取问题进行批处理:
from sage.core.function.batch_function import BatchFunction
class QABatch(BatchFunction):
"""QA批处理数据源:从配置文件中读取数据文件并逐行返回"""
def __init__(self, config, **kwargs):
super().__init__(**kwargs)
self.data_path = config["data_path"]
self.counter = 0
self.questions = []
self._load_questions()
def _load_questions(self):
"""从文件加载问题"""
try:
with open(self.data_path, 'r', encoding='utf-8') as file:
self.questions = [line.strip() for line in file.readlines() if line.strip()]
except Exception as e:
print(f"Error loading file {self.data_path}: {e}")
self.questions = []
def execute(self):
"""返回下一个问题,如果没有更多问题则返回None"""
if self.counter >= len(self.questions):
return None # 返回None表示批处理完成
question = self.questions[self.counter]
self.counter += 1
return question
知识检索组件¶
知识检索是RAG系统的核心组件之一,负责从向量数据库中检索与用户问题相关的知识片段。以下是检索器的配置:
from sage.core.function.map_function import MapFunction
class BiologyRetriever(MapFunction):
"""生物学知识检索器"""
def __init__(self, config, **kwargs):
super().__init__(**kwargs)
self.config = config
self.collection_name = config.get("collection_name", "biology_rag_knowledge")
self.index_name = config.get("index_name", "biology_index")
self.topk = config.get("ltm", {}).get("topk", 3)
def execute(self, data):
if not data:
return None
query = data
# 从生物学知识库检索相关知识
try:
result = self.call_service["memory_service"].retrieve_data(
collection_name=self.collection_name,
query_text=query,
topk=self.topk,
index_name=self.index_name,
with_metadata=True
)
if result['status'] == 'success':
# 返回包含查询和检索结果的元组
retrieved_texts = [item.get('text', '') for item in result['results']]
return (query, retrieved_texts)
else:
return (query, [])
except Exception as e:
return (query, [])
RAG批处理管道¶
RAG管道将问题处理、知识检索、提示词构造和答案生成串联成完整的问答流程。与WordCount的简单文本处理不同,这里涉及复杂的服务依赖和AI模型调用:
from sage.core.api.local_environment import LocalEnvironment
from sage.lib.rag.generator import OpenAIGenerator
from sage.lib.rag.promptor import QAPromptor
from sage.lib.io.sink import TerminalSink
from sage.service.memory.memory_service import MemoryService
from sage.utils.embedding_methods.embedding_api import apply_embedding_model
def pipeline_run(config: dict) -> None:
"""创建并运行数据处理管道"""
env = LocalEnvironment()
# 注册memory service并连接到生物学知识库
def memory_service_factory():
# 创建memory service实例
embedding_model = apply_embedding_model("default")
memory_service = MemoryService()
# 检查生物学知识库是否存在
try:
collections = memory_service.list_collections()
if collections["status"] != "success":
return None
collection_names = [c["name"] for c in collections["collections"]]
if "biology_rag_knowledge" not in collection_names:
return None
# 连接到现有的知识库
collection = memory_service.manager.connect_collection(
"biology_rag_knowledge", embedding_model
)
if not collection:
return None
except Exception as e:
return None
return memory_service
# 注册服务到环境中
env.register_service("memory_service", memory_service_factory)
# 构建数据处理流程 - 使用自定义的生物学检索器
(env
.from_batch(QABatch, config["source"])
.map(BiologyRetriever, config["retriever"])
.map(QAPromptor, config["promptor"])
.map(OpenAIGenerator, config["generator"]["vllm"])
.sink(TerminalSink, config["sink"])
)
env.submit()
time.sleep(10) # 增加等待时间确保处理完成
env.close()
关键说明¶
.from_batch(QABatch, config["source"])
:从文件批量读取问题BiologyRetriever
:从知识库检索相关生物学知识QAPromptor
:将问题和知识组合成提示词OpenAIGenerator
:调用大模型生成答案TerminalSink
:将结果输出到终端
代码关键细节解析¶
前面展示了两个不同复杂度的批处理示例,现在我们深入分析实现这些功能的关键技术细节。
1. 状态管理机制¶
def update_word_count(words_with_count):
nonlocal word_counts, total_processed # 关键:访问外部作用域变量
word, count = words_with_count
word_counts[word] += count # Counter对象自动处理键不存在的情况
total_processed += count
return words_with_count # 重要:必须返回数据继续流转
设计要点:
- nonlocal
声明允许修改外部作用域的变量
- Counter()
对象在键不存在时自动初始化为0
- 函数必须返回数据以保持流式处理的连续性
2. 数据源生命周期¶
重要约定:返回 None
是告诉 SAGE 框架批处理已完成的**唯一方式**,任何其他返回值都会被视为有效数据。
3. 服务注册机制¶
框架特性:
- SAGE支持服务注册,实现组件间的依赖注入
- self.call_service["memory_service"]
可访问注册的服务
- memory_service_factory负责创建和配置知识库连接
小结¶
有界流处理通过**固定数据源**、明确结束信号**和**状态累积**机制,实现**可复现、**自动结束**的批量数据处理流程。
关键特点:
- 确定性:相同输入产生相同输出,便于调试和测试
- 完整性:确保所有数据都被处理,不会遗漏
- 自动结束:数据源返回 None
时管道自动停止
- 状态聚合:支持跨数据项的状态累积和最终结果汇总
- 服务集成:支持复杂的依赖注入和组件协作
适用场景:数据分析、报表生成、模型训练、批量数据处理、批量RAG问答等需要处理完整数据集的场景。