分布式 Pipeline¶
目标:学习如何构建可扩展的分布式流式处理应用
概述¶
SAGE 基于 Ray 构建分布式执行能力,支持在多节点集群上运行大规模流式处理任务。
分布式环境配置¶
启动 Ray 集群¶
# 启动 Head 节点
sage cluster start --head
# 在其他机器上启动 Worker 节点
sage cluster start --worker --head-address=<head-node-ip>:10001
配置分布式环境¶
from sage.kernel.api import RemoteEnvironment
# 创建远程分布式执行环境
env = RemoteEnvironment(
name="distributed_app",
host="127.0.0.1", # JobManager 服务地址
port=19001, # JobManager 服务端口
config={
"ray": {
"address": "ray://localhost:10001", # Ray 集群地址
"num_cpus": 16, # 总 CPU 数
"num_gpus": 4, # 总 GPU 数
}
}
)
注意:分布式执行需要使用
RemoteEnvironment,它会将作业提交到远程的 JobManager 服务。
并行处理¶
设置并行度¶
from sage.libs.io import FileSource, ConsoleSink
# 并行读取和处理
stream = (
env.from_source(FileSource("large_dataset/"))
.map(
ProcessFunction(),
parallelism=8 # 8 个并行实例
)
.filter(FilterFunction(), parallelism=4)
.sink(ConsoleSink())
)
env.execute()
资源分配¶
⚠️ 功能开发中:当前版本的
map()方法仅支持parallelism参数。细粒度的资源分配功能(如
num_cpus、memory、num_gpus)正在开发中。相关 Issue: #TODO: 添加算子级别的资源配置支持
当前可用的并行度配置:
# 当前支持:设置并行度
stream = (
env.from_source(source)
.map(HeavyComputeOperator(), parallelism=4) # 4 个并行实例
.map(GPUInferenceOperator(), parallelism=2) # 2 个并行实例
.sink(sink)
)
未来计划支持的资源配置(开发中):
# 计划支持:细粒度资源分配
stream = (
env.from_source(source)
.map(
HeavyComputeOperator(),
parallelism=4,
resources={
"num_cpus": 4, # 每个实例 4 核
"memory": "8GB" # 每个实例 8GB 内存
}
)
.map(
GPUInferenceOperator(),
parallelism=2,
resources={
"num_gpus": 1 # 每个实例 1 个 GPU
}
)
.sink(sink)
)
分布式 RAG Pipeline¶
并行 Embedding¶
from sage.middleware.operators.rag import VLLMEmbeddingOperator, ChromaUpsertOperator
# 大规模文档并行嵌入
stream = (
env.from_source(ChunkedFileSource("documents/"))
.map(
VLLMEmbeddingOperator(
model="sentence-transformers/all-MiniLM-L6-v2"
),
parallelism=8 # 8 个并行 embedding 实例
)
.sink(
ChromaUpsertOperator(collection="distributed_docs")
)
)
env.execute()
并行检索和生成¶
from sage.middleware.operators.rag import ChromaRetrieverOperator, OpenAIGeneratorOperator
# 高并发查询处理
stream = (
env.from_source(QuerySource())
.map(
ChromaRetrieverOperator(collection="docs", top_k=5),
parallelism=4 # 4 个并行检索实例
)
.map(
OpenAIGeneratorOperator(model="gpt-4"),
parallelism=8 # 8 个并行生成实例
)
.sink(ResponseSink())
)
env.execute()
数据分区策略¶
Key-Based 分区¶
# 按 key 分区,确保相同 key 的数据到同一个实例
stream = (
env.from_source(source)
.key_by(lambda record: record["user_id"]) # 按用户 ID 分区
.map(UserSessionOperator(), parallelism=4)
.sink(sink)
)
自定义分区¶
from sage.kernel.api.partitioner import Partitioner
class CustomPartitioner(Partitioner):
def partition(self, record, num_partitions):
# 自定义分区逻辑
hash_value = hash(record["key"])
return hash_value % num_partitions
stream = (
env.from_source(source)
.partition_custom(CustomPartitioner(), parallelism=4)
.map(operator)
.sink(sink)
)
监控和调试¶
查看集群状态¶
资源使用监控¶
# 在代码中获取资源使用情况
from sage.kernel.api.runtime import RuntimeContext
class MonitoredOperator(MapFunction):
def open(self, context: RuntimeContext):
self.metrics = context.get_metrics()
def map(self, record):
# 记录处理时间
start = time.time()
result = self.process(record)
duration = time.time() - start
self.metrics.record("processing_time", duration)
return result
最佳实践¶
✅ 推荐做法¶
- 合理设置并行度 - 根据数据量和资源情况设置,避免过度并行
- 资源预估 - 提前评估每个算子的资源需求
- 数据分区 - 使用 key_by 保证有状态操作的正确性
- 监控指标 - 持续监控资源使用和处理延迟
❌ 避免的问题¶
- 并行度设置过高导致调度开销增加
- 未考虑数据倾斜导致部分节点过载
- GPU 资源分配不均导致利用率低
- 忽略网络传输开销
故障排查¶
常见问题¶
问题 1:任务启动慢
- 检查 Ray 集群连接状态
- 确认资源配置是否合理
- 查看是否有资源争用
问题 2:部分节点空闲
- 检查数据分区是否均衡
- 调整并行度配置
- 使用 key-based 分区避免数据倾斜
问题 3:内存溢出
- 减少单个算子实例的并行度
- 增加每个实例的内存配置
- 优化算子的内存使用(见性能调优)
相关阅读¶
- Kernel 用户指南 - 执行引擎详解
- 性能调优 - 优化分布式性能
- 容错与可靠性 - 分布式容错机制
下一步:学习 自定义算子 封装业务逻辑