Environment Management

An Environment is the core concept of the SAGE Kernel: it defines the execution context of a dataflow application. The environment manages compute resources, schedules tasks, and handles service registration.

🌍 Environment Types

1. LocalEnvironment

Suitable for single-machine development, testing, and small-scale data processing.

from sage.core.api.local_environment import LocalEnvironment

# Create a local environment
env = LocalEnvironment("my_local_app")

# Configuration options
env = LocalEnvironment(
    name="my_app",
    config={
        "parallelism": 4,          # 并行度
        "buffer_size": 10000,      # 缓冲区大小
        "checkpoint_interval": 30  # 检查点间隔(秒)
    }
)

2. RemoteEnvironment

Suitable for production environments and distributed cluster deployments.

from sage.core.api.remote_environment import RemoteEnvironment

# Create a remote environment
env = RemoteEnvironment(
    name="my_cluster_app",
    config={
        "jobmanager_host": "cluster-master",
        "jobmanager_port": 8081,
        "taskmanager_slots": 8
    }
)

🔧 Environment Configuration

Basic Configuration

config = {
    # Execution settings
    "parallelism": 4,              # Default parallelism
    "max_parallelism": 128,        # Maximum parallelism
    "buffer_size": 10000,          # Data buffer size

    # Fault-tolerance settings
    "restart_strategy": "fixed-delay",
    "restart_attempts": 3,
    "restart_delay": "10s",

    # Checkpoint settings
    "checkpointing_enabled": True,
    "checkpoint_interval": "30s",
    "checkpoint_timeout": "10m",

    # Logging settings
    "log_level": "INFO",
    "log_file": "./logs/sage.log"
}

env = LocalEnvironment("my_app", config=config)

Advanced Configuration

# Performance-tuning settings
performance_config = {
    "network_buffer_size": "64mb",
    "sort_buffer_size": "64mb", 
    "hash_table_size": "1gb",
    "managed_memory_fraction": 0.7,
    "network_memory_fraction": 0.1,
    "jvm_heap_size": "2g"
}

# Security settings
security_config = {
    "security_enabled": True,
    "kerberos_principal": "sage@REALM.COM",
    "ssl_enabled": True,
    "ssl_keystore": "./ssl/keystore.jks"
}
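
How these dictionaries are applied depends on the deployment; the following is a minimal sketch, assuming the extra keys can simply be merged into the single config dict accepted by the environment constructors shown above.

# Merge the tuning and security settings into one config dict
# (assumption: the environment accepts them as flat config keys)
tuned_config = {
    "parallelism": 4,
    **performance_config,
    **security_config,
}

env = LocalEnvironment("tuned_app", config=tuned_config)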

📊 Creating Data Sources

Batch Data Sources

# Create from an in-memory collection
stream = env.from_batch([1, 2, 3, 4, 5])

# Create from a file
stream = env.from_text_file("./data/input.txt")

# Create from multiple files
stream = env.from_text_files("./data/*.txt")

Streaming Data Sources

# Kafka source
stream = env.from_kafka_source(
    bootstrap_servers="localhost:9092",
    topic="my_topic",
    group_id="my_consumer_group",
    auto_offset_reset="latest"
)

# Socket source
stream = env.from_socket_text_stream("localhost", 9999)

# Custom source (requires the standard time module alongside SourceFunction)
import time

class MySource(SourceFunction[str]):
    def run(self, ctx):
        # Emit 100 messages, one per second
        for i in range(100):
            ctx.emit(f"Message {i}")
            time.sleep(1)

stream = env.add_source(MySource())
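
The same run/ctx.emit contract extends to other backends. Below is a minimal sketch of a file-backed source, assuming the SourceFunction base class needs no special initialization; the path is illustrative.

# Sketch: emit each line of a text file as a record
class FileSource(SourceFunction[str]):
    def __init__(self, path: str):
        self.path = path  # illustrative constructor parameter

    def run(self, ctx):
        with open(self.path) as f:
            for line in f:
                ctx.emit(line.rstrip("\n"))

stream = env.add_source(FileSource("./data/events.log"))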

🛠️ Service Management

Service Registration

# Register a service class
env.register_service("cache", RedisCacheService, 
                    host="localhost", port=6379)

# Register a service factory
from sage.middleware import create_kv_service_factory

kv_factory = create_kv_service_factory("my_kv", backend_type="memory")
env.register_service_factory("my_kv", kv_factory)

Using Services

# Use a service inside a processing function
class ProcessFunction(MapFunction[str, str]):
    def map(self, value: str) -> str:
        # Get a service proxy
        cache = self.get_runtime_context().get_service("cache")

        # Use the service
        result = cache.get(value)
        if result is None:
            result = expensive_computation(value)
            cache.put(value, result)

        return result

stream.map(ProcessFunction())
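
Registration and usage come together when the pipeline is assembled. The following sketch reuses only the calls shown above; expensive_computation and output remain placeholders.

# Register the cache service, then run records through the pipeline
env.register_service("cache", RedisCacheService, host="localhost", port=6379)

stream = env.from_batch(["key_a", "key_b", "key_c"])
stream.map(ProcessFunction()).sink(output)  # 'output' is a placeholder sink
env.submit()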

🚀 Job Submission and Management

Submitting Jobs

# Synchronous submission (blocking)
env.submit()

# Asynchronous submission (non-blocking)
job_id = env.submit_async()

# Submit with parameters
env.submit(
    job_name="my_processing_job",
    save_point_path="./savepoints/sp_001",
    allow_non_restored_state=False
)

Job Control

# Stop the job
env.stop()

# Cancel the job
env.cancel()

# Pause the job
env.pause()

# Resume the job
env.resume()

# Create a savepoint
savepoint_path = env.create_savepoint()

# Restore from a savepoint
env.restore_from_savepoint("./savepoints/sp_001")
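
Put together, a controlled shutdown-and-restore flow can be built from just the calls above. This is a sketch; whether stop() drains in-flight records before returning is an assumption.

# Sketch: run asynchronously, snapshot state, stop, then resume later
job_id = env.submit_async()

# ... let the job run for a while ...

savepoint_path = env.create_savepoint()  # snapshot the current state
env.stop()                               # stop the running job

# Later: restore from the snapshot and resubmit
env.restore_from_savepoint(savepoint_path)
env.submit_async()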

📊 Monitoring and Debugging

Performance Monitoring

# Enable metrics collection
env.enable_metrics(
    reporters=["jmx", "prometheus"],
    interval="10s"
)

# Custom metrics
counter = env.get_metric_group().counter("my_counter")
histogram = env.get_metric_group().histogram("my_histogram")

class MyMapFunction(MapFunction[str, str]):
    def map(self, value: str) -> str:
        counter.inc()  # Increment the counter

        start_time = time.time()
        result = process(value)
        histogram.update(time.time() - start_time)  # Record the processing time

        return result

Logging Configuration

# Configure logging
env.set_log_level("DEBUG")
env.set_log_file("./logs/my_app.log")

# Structured logging
logger = env.get_logger("MyFunction")
logger.info("Processing record", extra={"record_id": 123})

🔧 Best Practices

1. Environment Lifecycle Management

def main():
    env = None
    try:
        env = LocalEnvironment("my_app")

        # Build the dataflow pipeline
        stream = env.from_batch(data)
        stream.map(process).sink(output)

        # Submit for execution
        env.submit()

    except Exception as e:
        logger.error(f"Job failed: {e}")
    finally:
        if env:
            env.close()  # Make sure resources are released

2. Configuration Externalization

# config.yaml
parallelism: 4
buffer_size: 10000
checkpoint_interval: 30s

# Python code
import yaml

with open("config.yaml") as f:
    config = yaml.safe_load(f)

env = LocalEnvironment("my_app", config=config)

3. Error Handling

# Configure the restart strategy
env.set_restart_strategy(
    strategy="exponential-delay",
    max_attempts=5,
    initial_delay="1s",
    max_delay="1m",
    backoff_multiplier=2.0
)

# Custom error handling
class ErrorHandler(ProcessFunction[str, str]):
    def process(self, value: str, ctx: ProcessContext) -> str:
        try:
            return risky_operation(value)
        except Exception as e:
            # Send to the error side-output stream
            ctx.output_to_side("errors", f"Error: {e}, Value: {value}")
            return None  # Drop the failed record

main_stream, error_stream = stream.process(ErrorHandler()).split()
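
Each branch still needs a terminal sink before the job is submitted. A brief continuation, reusing the sink call shown earlier; result_output and error_output are placeholder sink targets.

# Route each branch to its own sink (sink targets are placeholders)
main_stream.sink(result_output)
error_stream.sink(error_output)
env.submit()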

4. Resource Optimization

# Set parallelism sensibly (input_partitions is a placeholder for the input split count)
from os import cpu_count
env.set_parallelism(min(cpu_count(), len(input_partitions)))

# Enable object reuse
env.enable_object_reuse()

# Configure memory management
env.set_managed_memory_fraction(0.7)
env.set_network_memory_fraction(0.1)

📚 Related Documentation