跳转至

快速开始指南

本指南将帮助你在 5 分钟内上手 SAGE Kernel，并开始编写你的第一个流数据处理应用。

📦 安装

使用 pip 安装

# Install the core package
pip install intsage-kernel

# Install development dependencies (optional).
# The extras are quoted: in zsh an unquoted [dev] is a glob pattern and the
# command fails with "no matches found".
pip install "intsage-kernel[dev]"

# Install enterprise features (requires a license)
pip install "intsage-kernel[enterprise]"

从源码安装

# Clone the monorepo, enter the sage-kernel package, and install it in
# editable mode (-e) so local source changes take effect without reinstalling.
git clone https://github.com/intellistream/SAGE.git
cd SAGE/packages/sage-kernel
pip install -e .

🚀 第一个例子

1. 简单的数据转换

from sage.core.api.local_environment import LocalEnvironment

# A local (single-machine) environment owns the job and its pipeline.
env = LocalEnvironment("hello_sage")

# Source: a bounded batch of the integers 1..10.
numbers = env.from_batch([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Pipeline, one named stage per step: square every value, keep the even
# squares, and print whatever survives.
squares = numbers.map(lambda x: x * x)  # 1, 4, 9, 16, 25, 36, 49, 64, 81, 100
even_squares = squares.filter(lambda x: x % 2 == 0)  # 4, 16, 36, 64, 100
result = even_squares.sink(print)

# Submit the job so the pipeline built above is executed.
env.submit()

2. 文本处理示例

from sage.core.api.local_environment import LocalEnvironment

env = LocalEnvironment("text_processing")

# Simulated input: a small, fixed batch of sentences.
SENTENCES = [
    "Hello SAGE Kernel",
    "Stream processing made easy",
    "Build powerful data pipelines",
    "Real-time analytics with Python",
]
sentences = env.from_batch(SENTENCES)

# Word-count pipeline, each stage bound to its own name.
tokens = sentences.flat_map(lambda s: s.lower().split())  # lowercase + split into words
pairs = tokens.map(lambda word: (word, 1))  # word -> (word, 1)
by_word = pairs.key_by(lambda pair: pair[0])  # group by the word itself
counts = by_word.reduce(lambda a, b: (a[0], a[1] + b[1]))  # add up the 1s per word
word_count = counts.sink(print)  # print the results

env.submit()

3. 实时数据处理

from sage.core.api.local_environment import LocalEnvironment
from sage.core.api.function import SourceFunction, SourceContext
import time
import random


# 自定义数据源
class SensorDataSource(SourceFunction[dict]):
    """Custom source that emits simulated readings for one sensor until cancelled.

    Each reading is a dict with ``sensor_id``, ``temperature``, ``humidity``
    and an integer Unix ``timestamp``; one reading is emitted, then the loop
    sleeps for one second.
    """

    def __init__(self, sensor_id: str):
        self.sensor_id = sensor_id
        # Cleared by cancel(); checked before every emission.
        self.running = True

    def _read_sensor(self) -> dict:
        """Build one fake reading with random temperature/humidity values."""
        return {
            "sensor_id": self.sensor_id,
            "temperature": random.uniform(20.0, 35.0),
            "humidity": random.uniform(30.0, 80.0),
            "timestamp": int(time.time()),
        }

    def run(self, ctx: SourceContext[dict]):
        """Emit readings into ``ctx`` until cancel() clears the running flag."""
        while self.running:
            ctx.emit(self._read_sensor())
            time.sleep(1)  # throttle: one reading per second

    def cancel(self):
        """Ask run() to stop after its current iteration."""
        self.running = False


# Create the environment
env = LocalEnvironment("sensor_monitoring")

# Attach the custom source defined above
sensor_stream = env.add_source(SensorDataSource("sensor_001"))

# Processing: raise an alert for readings hotter than 30 °C
alerts = (
    sensor_stream.filter(lambda data: data["temperature"] > 30.0)  # temperature too high
    .map(
        lambda data: f"🚨 Alert: High temperature {data['temperature']:.1f}°C from {data['sensor_id']}"
    )
    .sink(print)
)

# Start processing. SensorDataSource.run() keeps emitting readings until its
# cancel() method is called; nothing in this snippet stops the job after a
# fixed amount of time.
env.submit()

🔧 基本概念

Environment (环境)

  • 管理计算资源和执行上下文
  • LocalEnvironment：单机环境，适合开发和测试
  • RemoteEnvironment：分布式环境，适合生产部署

DataStream (数据流)

  • 代表数据的流动序列
  • 支持链式调用的转换操作
  • 延迟执行：转换操作只负责构建计算图，调用 env.submit() 提交后才真正执行

Function (函数)

  • 用户定义的数据处理逻辑
  • 类型安全：通过泛型参数声明输入/输出类型（如 MapFunction[str, dict]）
  • 内置常用函数类型（如 MapFunction、FilterFunction、SourceFunction）

🛠️ 常用操作

数据源创建

# Each line shows one way to create a stream from an environment `env`
# (e.g. a LocalEnvironment as in the examples above).

# From an in-memory collection
stream = env.from_batch([1, 2, 3, 4, 5])

# From a text file (presumably one element per line — TODO confirm)
stream = env.from_text_file("data.txt")

# From several files; the argument looks like a glob pattern
stream = env.from_text_files("data/*.txt")

# From a custom source; MySourceFunction is a placeholder for your own
# SourceFunction subclass (compare SensorDataSource above)
stream = env.add_source(MySourceFunction())

数据转换

# `numbers` and `sentences` are the streams built in the examples above.

# Map: one-to-one transformation
doubled = numbers.map(lambda x: x * 2)

# Filter: keep only the elements that match
evens = numbers.filter(lambda x: x % 2 == 0)

# FlatMap: one-to-many transformation
words = sentences.flat_map(lambda s: s.split())

# KeyBy: group by key; here (first letter, 1) pairs are keyed by the letter
grouped = words.map(lambda word: (word[0], 1)).key_by(lambda pair: pair[0])

# Reduce: combine elements that share a key; adding up the counts yields the
# number of words per first letter (same pattern as the word-count example)
sums = grouped.reduce(lambda a, b: (a[0], a[1] + b[1]))

数据输出

# Print every element
stream.sink(print)

# Write elements to a file
stream.sink_to_file("output.txt")

# Custom sink; MySinkFunction is a placeholder for your own sink function
stream.sink(MySinkFunction())

📊 完整示例:网站访问日志分析

from sage.core.api.local_environment import LocalEnvironment
from sage.core.api.function import MapFunction, FilterFunction
import json
from datetime import datetime


class LogParser(MapFunction[str, dict]):
    """Parse one log line, expected to be a JSON object, into a dict.

    Lines that are not valid JSON, or whose JSON value is not an object, are
    returned as ``{"error": True, "raw": log_line}`` instead of raising, so a
    downstream filter can drop them.
    """

    def map(self, log_line: str) -> dict:
        try:
            # Logs are assumed to be one JSON object per line.
            parsed = json.loads(log_line)
        except (ValueError, TypeError, RecursionError):
            # ValueError covers json.JSONDecodeError, TypeError covers non-str
            # input, RecursionError covers pathologically nested JSON. A bare
            # ``except:`` would also swallow KeyboardInterrupt and SystemExit.
            return {"error": True, "raw": log_line}
        if not isinstance(parsed, dict):
            # Valid JSON that is not an object (e.g. "123" or "[1, 2]") cannot
            # be handled by downstream code that calls .get() / indexes by key.
            return {"error": True, "raw": log_line}
        return parsed


class ValidLogFilter(FilterFunction[dict]):
    """Keep records that parsed cleanly and carry a ``url`` field."""

    def filter(self, log: dict) -> bool:
        # Records flagged by the parser are rejected regardless of their contents.
        if log.get("error", False):
            return False
        return "url" in log


class PageViewExtractor(MapFunction[dict, tuple]):
    """Turn a parsed log record into a ``(url, 1)`` page-view pair."""

    def map(self, log: dict) -> tuple:
        url = log["url"]
        return url, 1


def main():
    """Count successful (HTTP 200) page views per URL from a small batch of JSON log lines."""
    env = LocalEnvironment("log_analysis")

    # Simulated log data; one line is deliberately not JSON.
    log_lines = env.from_batch(
        [
            '{"timestamp": "2025-01-01T10:00:00", "url": "/home", "status": 200, "user_id": "user1"}',
            '{"timestamp": "2025-01-01T10:01:00", "url": "/about", "status": 200, "user_id": "user2"}',
            '{"timestamp": "2025-01-01T10:02:00", "url": "/home", "status": 200, "user_id": "user3"}',
            '{"timestamp": "2025-01-01T10:03:00", "url": "/contact", "status": 404, "user_id": "user1"}',
            "invalid log line",
            '{"timestamp": "2025-01-01T10:04:00", "url": "/home", "status": 200, "user_id": "user2"}',
        ]
    )

    # Log-processing pipeline
    (
        log_lines.map(LogParser())  # parse each line
        .filter(ValidLogFilter())  # drop unparseable records and records without a URL
        # ValidLogFilter only guarantees "url"; .get() drops a record that lacks
        # "status" instead of raising KeyError mid-stream.
        .filter(lambda log: log.get("status") == 200)  # successful requests only
        .map(PageViewExtractor())  # -> (url, 1)
        .key_by(lambda pair: pair[0])  # group by URL
        .reduce(lambda a, b: (a[0], a[1] + b[1]))  # add up views per URL
        .map(lambda pair: f"Page {pair[0]}: {pair[1]} views")
        .sink(print)
    )

    env.submit()


if __name__ == "__main__":
    main()

🎯 最佳实践

1. 错误处理

class RobustMapFunction(MapFunction[str, dict]):
    """Parse a JSON string, reporting any failure as a record instead of raising."""

    def map(self, input_str: str) -> dict:
        try:
            parsed = json.loads(input_str)
        except Exception as exc:
            # Hand the error downstream as data so one bad input does not
            # abort the whole stream.
            return {"error": True, "message": str(exc), "input": input_str}
        return parsed

2. 配置管理

# config.py
# Settings handed to the environment; which keys LocalEnvironment actually
# reads is not shown here — check the API reference before relying on them.
CONFIG = dict(
    parallelism=4,
    buffer_size=10000,
    checkpoint_interval="30s",
)

# main.py
env = LocalEnvironment("my_app", config=CONFIG)

3. 日志记录

import logging

# Configure logging once, at program start-up
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class LoggedMapFunction(MapFunction[str, str]):
    """Wrap ``process`` with INFO-level logging of each input and its result."""

    def map(self, value: str) -> str:
        # Pass arguments to the logger rather than pre-formatting an f-string:
        # the message is only built when INFO is enabled, which matters in a
        # function that runs once per stream element.
        logger.info("Processing: %s", value)
        result = process(value)  # `process` is a placeholder for your own logic
        logger.info("Result: %s", result)
        return result

4. 性能监控

import time


class TimedMapFunction(MapFunction[str, str]):
    """Wrap ``expensive_operation`` and periodically print its average latency.

    Args:
        report_every: print a summary after every ``report_every`` processed
            items (default 1000).

    Raises:
        ValueError: if ``report_every`` is not positive.
    """

    def __init__(self, report_every: int = 1000):
        if report_every <= 0:
            raise ValueError("report_every must be a positive integer")
        self.total_time = 0.0  # cumulative seconds spent in expensive_operation
        self.count = 0  # number of items processed so far
        self.report_every = report_every

    def map(self, value: str) -> str:
        # perf_counter() is monotonic and high-resolution; time.time() follows
        # the wall clock, which can jump (NTP sync, manual changes) and skew
        # measured durations.
        start = time.perf_counter()
        result = expensive_operation(value)
        self.total_time += time.perf_counter() - start
        self.count += 1

        if self.count % self.report_every == 0:
            avg_time = self.total_time / self.count
            print(f"Processed {self.count} items, avg time: {avg_time:.3f}s")

        return result

🚀 下一步

恭喜！你已经掌握了 SAGE Kernel 的基础用法。接下来你可以：

  1. 📖 阅读 API 详细文档
  2. 🌐 学习核心概念
  3. ⚡ 了解数据流开发指南
  4. 🔍 查看更多示例

📞 获取帮助