跳转至

基础教程

欢迎来到SAGE的基础教程!本教程将通过一系列循序渐进的示例,帮助您掌握SAGE数据流编程的基础知识。每个示例都包含完整的代码和详细的解释。

🎯 学习目标

完成本教程后,您将能够: - 理解SAGE数据流编程的基本概念 - 使用SAGE API构建简单的数据处理流水线 - 掌握常用的数据转换操作 - 了解如何处理不同类型的数据源和输出

📋 准备工作

安装SAGE

pip install sage-framework

导入必要的模块

from sage.core.api.local_environment import LocalEnvironment
from sage.core.api.remote_environment import RemoteEnvironment
from sage.lib.io_utils.source import FileSource, ListSource
from sage.lib.io_utils.sink import FileSink, ConsoleSink
import time

📚 教程1:第一个数据流程序

让我们从最简单的例子开始 - 处理一个数字列表:

def tutorial_01_hello_sage():
    """第一个SAGE程序:处理数字列表"""

    # 创建本地执行环境
    env = LocalEnvironment()

    # 创建数据源
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    # 构建数据流水线
    result = (env
        .from_batch(numbers)                    # 从列表创建数据流
         - [数据流开发指南](../guides/dataflow_development.md) - 更深入的开发技巧
   - [自定义算子开发](../guides/custom_operators.md) - 扩展SAGE功能map(lambda x: x * 2)                   # 每个数字乘以2
        .filter(lambda x: x > 10)               # 过滤大于10的数字
        .map(lambda x: f"Result: {x}")          # 格式化输出
        .sink(ConsoleSink())                    # 输出到控制台
    )

    # 执行流水线
    env.submit()
    print("第一个SAGE程序执行完成!")

# 运行示例
tutorial_01_hello_sage()

输出结果:

Result: 12
Result: 14
Result: 16
Result: 18
Result: 20
第一个SAGE程序执行完成!

解释: - from_batch(): 从Python列表创建数据流 - map(): 对每个数据项应用转换函数 - filter(): 根据条件过滤数据 - sink(): 指定数据输出目标 - submit(): 开始执行流水线

📚 教程2:文件处理

处理文本文件是常见需求,让我们学习如何读取和处理文件:

def tutorial_02_file_processing():
    """文件处理:读取文本文件并统计单词"""

    # 准备测试文件
    test_content = """Hello World
    This is SAGE tutorial
    We are learning dataflow programming
    SAGE makes data processing easy"""

    with open("test_input.txt", "w") as f:
        f.write(test_content)

    # 创建环境
    env = LocalEnvironment()

    # 文本处理函数
    def process_line(line):
        """处理单行文本"""
        # 清理和分词
        words = line.strip().lower().split()
        return {
            'line': line.strip(),
            'word_count': len(words),
            'words': words
        }

    def extract_words(processed_line):
        """提取单词列表"""
        return processed_line['words']

    # 构建文件处理流水线
    word_stream = (env
        .from_source(FileSource, "test_input.txt")  # 读取文件
        .filter(lambda line: line.strip())          # 过滤空行
        .map(process_line)                          # 处理每一行
        .flatmap(extract_words)                     # 展开单词列表
        .filter(lambda word: len(word) > 2)         # 过滤短单词
        .sink(FileSink, "processed_words.txt")      # 输出到文件
    )

    # 执行流水线
    env.submit()

    # 读取并显示结果
    with open("processed_words.txt", "r") as f:
        words = f.readlines()
        print(f"处理完成,共提取 {len(words)} 个单词")
        print("前10个单词:", words[:10])

# 运行示例
tutorial_02_file_processing()

新概念: - from_source(): 从文件源创建数据流 - flatmap(): 一对多映射,将列表展开为多个数据项 - FileSink: 文件输出汇

📚 教程3:数据聚合和统计

学习如何对数据进行聚合计算:

def tutorial_03_data_aggregation():
    """数据聚合:统计分析示例"""

    # 模拟用户行为数据
    user_events = [
        {'user_id': 'user1', 'action': 'click', 'timestamp': 1000, 'value': 10},
        {'user_id': 'user2', 'action': 'view', 'timestamp': 1001, 'value': 5},
        {'user_id': 'user1', 'action': 'purchase', 'timestamp': 1002, 'value': 100},
        {'user_id': 'user3', 'action': 'click', 'timestamp': 1003, 'value': 8},
        {'user_id': 'user2', 'action': 'click', 'timestamp': 1004, 'value': 15},
        {'user_id': 'user1', 'action': 'view', 'timestamp': 1005, 'value': 3},
    ]

    env = LocalEnvironment()

    # 用户活动统计函数
    def aggregate_by_user(events):
        """按用户聚合统计"""
        from collections import defaultdict

        user_stats = defaultdict(lambda: {
            'user_id': '', 'total_events': 0, 'total_value': 0, 'actions': []
        })

        for event in events:
            user_id = event['user_id']
            user_stats[user_id]['user_id'] = user_id
            user_stats[user_id]['total_events'] += 1
            user_stats[user_id]['total_value'] += event['value']
            user_stats[user_id]['actions'].append(event['action'])

        return list(user_stats.values())

    def format_user_stats(stats):
        """格式化用户统计信息"""
        return {
            'user_id': stats['user_id'],
            'total_events': stats['total_events'],
            'total_value': stats['total_value'],
            'avg_value': stats['total_value'] / stats['total_events'],
            'unique_actions': len(set(stats['actions']))
        }

    # 构建聚合流水线
    user_stats_stream = (env
        .from_batch(user_events)                    # 加载事件数据
        .keyBy(lambda event: event['user_id'])      # 按用户ID分组
        .window(count_window=100)                   # 窗口聚合(这里用计数窗口)
        .reduce(lambda acc, event: acc + [event] if isinstance(acc, list) else [acc, event])  # 收集事件
        .map(aggregate_by_user)                     # 聚合统计
        .flatmap(lambda user_list: user_list)       # 展开用户列表
        .map(format_user_stats)                     # 格式化结果
        .sink(ConsoleSink())                        # 输出结果
    )

    env.submit()

# 运行示例
tutorial_03_data_aggregation()

新概念: - keyBy(): 按键分组数据流 - window(): 创建时间或计数窗口 - reduce(): 聚合操作

📚 教程4:异步处理

学习如何处理需要异步操作的场景:

import asyncio
import aiohttp
import json

def tutorial_04_async_processing():
    """异步处理:模拟API调用"""

    # 模拟API处理函数
    class MockAPIProcessor:
        def __init__(self, delay=0.1):
            self.delay = delay

        async def __call__(self, data):
            """异步处理函数"""
            # 模拟API延迟
            await asyncio.sleep(self.delay)

            # 模拟API响应
            response = {
                'input': data,
                'processed': data * 2 if isinstance(data, (int, float)) else f"processed_{data}",
                'timestamp': time.time(),
                'status': 'success'
            }

            return response

    # 批量处理函数
    async def batch_api_call(batch_data):
        """批量API调用"""
        processor = MockAPIProcessor(delay=0.05)  # 批处理延迟更短
        results = []

        # 并行处理批次中的所有项目
        tasks = [processor(item) for item in batch_data]
        results = await asyncio.gather(*tasks)

        return results

    env = LocalEnvironment()

    # 测试数据
    test_data = list(range(1, 21))  # 1到20的数字

    # 构建异步处理流水线
    async_stream = (env
        .from_batch(test_data)                      # 加载测试数据
        .batch(size=5)                              # 5个一批
        .map(batch_api_call)                        # 批量异步处理
        .flatmap(lambda batch_result: batch_result) # 展开批处理结果
        .filter(lambda result: result['status'] == 'success')  # 过滤成功结果
        .map(lambda result: {                       # 提取有用信息
            'input': result['input'],
            'output': result['processed'],
            'processing_time': time.time() - result['timestamp']
        })
        .sink(ConsoleSink())                        # 输出结果
    )

    print("开始异步处理...")
    start_time = time.time()
    env.submit()
    end_time = time.time()

    print(f"异步处理完成,总用时: {end_time - start_time:.2f}秒")

# 运行示例
tutorial_04_async_processing()

新概念: - 异步函数:使用async def定义的处理函数 - batch(): 将数据打包成批次 - 并行处理:使用asyncio.gather()并行执行多个异步任务

📚 教程5:错误处理

学习如何在数据流中处理错误和异常:

def tutorial_05_error_handling():
    """错误处理:容错数据处理"""

    # 可能出错的处理函数
    def risky_processor(data):
        """可能抛出异常的处理器"""
        if data % 3 == 0:  # 模拟处理失败
            raise ValueError(f"处理失败: {data}")
        return data * 10

    # 容错处理函数
    def safe_processor(data):
        """安全的处理器"""
        try:
            result = risky_processor(data)
            return {'status': 'success', 'data': data, 'result': result}
        except Exception as e:
            return {'status': 'error', 'data': data, 'error': str(e)}

    # 重试处理器
    def retry_processor(data, max_retries=2):
        """带重试的处理器"""
        for attempt in range(max_retries + 1):
            try:
                result = risky_processor(data)
                return {'status': 'success', 'data': data, 'result': result, 'attempts': attempt + 1}
            except Exception as e:
                if attempt == max_retries:
                    return {'status': 'failed', 'data': data, 'error': str(e), 'attempts': attempt + 1}
                time.sleep(0.1 * (attempt + 1))  # 指数退避

    env = LocalEnvironment()

    # 测试数据(包含会导致错误的数据)
    test_data = list(range(1, 16))  # 3, 6, 9, 12, 15 会出错

    print("=== 基本错误处理示例 ===")
    basic_error_stream = (env
        .from_batch(test_data)
        .map(safe_processor)                        # 安全处理
        .sink(ConsoleSink())
    )
    env.submit()

    print("\n=== 重试机制示例 ===")
    env = LocalEnvironment()  # 创建新环境
    retry_stream = (env
        .from_batch(test_data)
        .map(retry_processor)                       # 重试处理
        .sink(ConsoleSink())
    )
    env.submit()

    print("\n=== 错误过滤和统计 ===")
    env = LocalEnvironment()  # 创建新环境

    def collect_stats(results):
        """统计处理结果"""
        success_count = len([r for r in results if r['status'] == 'success'])
        error_count = len([r for r in results if r['status'] == 'error'])

        return {
            'total': len(results),
            'success': success_count,
            'errors': error_count,
            'success_rate': success_count / len(results) if results else 0
        }

    stats_stream = (env
        .from_batch(test_data)
        .map(safe_processor)                        # 安全处理
        .collect()                                  # 收集所有结果
        .map(collect_stats)                         # 统计
        .sink(ConsoleSink())
    )
    env.submit()

# 运行示例
tutorial_05_error_handling()

新概念: - 异常处理:在处理函数中捕获和处理异常 - 重试机制:对失败的操作进行重试 - collect(): 收集所有数据项到一个列表中

📚 教程6:多流连接

学习如何处理多个数据流的连接:

def tutorial_06_stream_joining():
    """多流连接:用户数据关联"""

    # 用户基本信息
    users = [
        {'user_id': 1, 'name': 'Alice', 'age': 25},
        {'user_id': 2, 'name': 'Bob', 'age': 30},
        {'user_id': 3, 'name': 'Charlie', 'age': 35},
    ]

    # 用户订单信息
    orders = [
        {'order_id': 101, 'user_id': 1, 'amount': 100, 'product': 'Book'},
        {'order_id': 102, 'user_id': 2, 'amount': 200, 'product': 'Phone'},
        {'order_id': 103, 'user_id': 1, 'amount': 50, 'product': 'Coffee'},
        {'order_id': 104, 'user_id': 4, 'amount': 300, 'product': 'Laptop'},  # 用户不存在
    ]

    env = LocalEnvironment()

    # 创建用户流和订单流
    user_stream = env.from_batch(users)
    order_stream = env.from_batch(orders)

    # 连接两个流
    def join_user_orders(user_data, order_data):
        """连接用户和订单数据"""
        user_dict = {u['user_id']: u for u in user_data}

        joined_results = []
        for order in order_data:
            user_id = order['user_id']
            if user_id in user_dict:
                user = user_dict[user_id]
                joined_results.append({
                    'user_id': user_id,
                    'user_name': user['name'],
                    'user_age': user['age'],
                    'order_id': order['order_id'],
                    'amount': order['amount'],
                    'product': order['product']
                })
            else:
                # 处理找不到用户的订单
                joined_results.append({
                    'user_id': user_id,
                    'user_name': 'Unknown',
                    'user_age': None,
                    'order_id': order['order_id'],
                    'amount': order['amount'],
                    'product': order['product'],
                    'status': 'orphaned_order'
                })

        return joined_results

    # 使用connect进行流连接
    connected_stream = user_stream.connect(order_stream)

    # 处理连接后的数据
    result_stream = (connected_stream
        .comap(join_user_orders)                    # 协同处理两个流的数据
        .flatmap(lambda results: results)          # 展开结果列表
        .sink(ConsoleSink())                        # 输出结果
    )

    print("=== 流连接示例 ===")
    env.submit()

    # 另一种连接方式:使用窗口连接
    print("\n=== 窗口连接示例 ===")
    env = LocalEnvironment()

    # 模拟时间序列数据
    user_actions = [
        {'user_id': 1, 'action': 'login', 'timestamp': 1000},
        {'user_id': 2, 'action': 'view_product', 'timestamp': 1001},
        {'user_id': 1, 'action': 'add_to_cart', 'timestamp': 1002},
    ]

    user_profiles = [
        {'user_id': 1, 'segment': 'premium', 'timestamp': 999},
        {'user_id': 2, 'segment': 'regular', 'timestamp': 1000},
    ]

    action_stream = env.from_batch(user_actions)
    profile_stream = env.from_batch(user_profiles)

    # 基于时间窗口的连接
    def time_window_join(actions, profiles):
        """基于时间窗口连接用户行为和档案"""
        profile_dict = {p['user_id']: p for p in profiles}

        enriched_actions = []
        for action in actions:
            user_id = action['user_id']
            if user_id in profile_dict:
                profile = profile_dict[user_id]
                # 简单的时间匹配(实际应用中会更复杂)
                if abs(action['timestamp'] - profile['timestamp']) <= 10:
                    enriched_actions.append({
                        **action,
                        'user_segment': profile['segment']
                    })

        return enriched_actions

    windowed_stream = (action_stream.connect(profile_stream)
        .comap(time_window_join)
        .flatmap(lambda results: results)
        .sink(ConsoleSink())
    )

    env.submit()

# 运行示例
tutorial_06_stream_joining()

新概念: - connect(): 连接两个数据流 - comap(): 协同处理连接的数据流 - 时间窗口连接:基于时间戳进行数据关联

📚 教程7:状态管理

学习如何在数据流中维护状态:

def tutorial_07_stateful_processing():
    """有状态处理:运行统计和会话管理"""

    # 有状态计数器
    class StatefulCounter:
        def __init__(self):
            self.counts = {}

        def __call__(self, item):
            key = item.get('category', 'default')
            self.counts[key] = self.counts.get(key, 0) + 1

            return {
                'item': item,
                'category_count': self.counts[key],
                'total_categories': len(self.counts)
            }

    # 滑动窗口平均
    class MovingAverageCalculator:
        def __init__(self, window_size=5):
            self.window_size = window_size
            self.window = []

        def __call__(self, value):
            self.window.append(value)
            if len(self.window) > self.window_size:
                self.window.pop(0)  # 移除最旧的元素

            avg = sum(self.window) / len(self.window)
            return {
                'value': value,
                'moving_average': avg,
                'window_size': len(self.window)
            }

    # 会话管理器
    class SessionManager:
        def __init__(self, timeout=10):
            self.sessions = {}
            self.timeout = timeout

        def __call__(self, event):
            user_id = event['user_id']
            current_time = event['timestamp']

            # 检查现有会话
            if user_id in self.sessions:
                last_activity = self.sessions[user_id]['last_activity']
                if current_time - last_activity > self.timeout:
                    # 会话超时,开始新会话
                    self.sessions[user_id] = {
                        'session_id': f"session_{current_time}",
                        'start_time': current_time,
                        'last_activity': current_time,
                        'event_count': 1
                    }
                else:
                    # 更新现有会话
                    self.sessions[user_id]['last_activity'] = current_time
                    self.sessions[user_id]['event_count'] += 1
            else:
                # 新用户会话
                self.sessions[user_id] = {
                    'session_id': f"session_{current_time}",
                    'start_time': current_time,
                    'last_activity': current_time,
                    'event_count': 1
                }

            session_info = self.sessions[user_id].copy()
            session_info.update({
                'user_id': user_id,
                'event': event['action'],
                'session_duration': current_time - session_info['start_time']
            })

            return session_info

    env = LocalEnvironment()

    print("=== 有状态计数器示例 ===")
    counter = StatefulCounter()

    # 测试数据
    items = [
        {'name': 'item1', 'category': 'A'},
        {'name': 'item2', 'category': 'B'},
        {'name': 'item3', 'category': 'A'},
        {'name': 'item4', 'category': 'C'},
        {'name': 'item5', 'category': 'A'},
        {'name': 'item6', 'category': 'B'},
    ]

    counter_stream = (env
        .from_batch(items)
        .map(counter)                               # 有状态计数
        .sink(ConsoleSink())
    )
    env.submit()

    print("\n=== 滑动窗口平均示例 ===")
    env = LocalEnvironment()
    moving_avg = MovingAverageCalculator(window_size=3)

    values = [10, 15, 20, 12, 18, 25, 8, 30]

    avg_stream = (env
        .from_batch(values)
        .map(moving_avg)                            # 滑动窗口平均
        .sink(ConsoleSink())
    )
    env.submit()

    print("\n=== 会话管理示例 ===")
    env = LocalEnvironment()
    session_mgr = SessionManager(timeout=5)

    # 模拟用户事件(包含会话超时)
    events = [
        {'user_id': 'user1', 'action': 'login', 'timestamp': 1000},
        {'user_id': 'user1', 'action': 'view', 'timestamp': 1002},
        {'user_id': 'user2', 'action': 'login', 'timestamp': 1003},
        {'user_id': 'user1', 'action': 'click', 'timestamp': 1004},
        {'user_id': 'user1', 'action': 'purchase', 'timestamp': 1015},  # 超时,新会话
        {'user_id': 'user2', 'action': 'logout', 'timestamp': 1016},
    ]

    session_stream = (env
        .from_batch(events)
        .map(session_mgr)                           # 会话管理
        .sink(ConsoleSink())
    )
    env.submit()

# 运行示例
tutorial_07_stateful_processing()

新概念: - 有状态处理:处理函数维护内部状态 - 滑动窗口:维护固定大小的历史数据窗口 - 会话管理:基于时间和用户行为管理会话状态

📚 教程8:综合示例 - 简单的RAG系统

现在让我们构建一个综合示例,展示如何使用SAGE构建一个简单的RAG(检索增强生成)系统:

def tutorial_08_simple_rag_system():
    """综合示例:简单的RAG系统"""

    # 模拟知识库
    knowledge_base = [
        {"id": 1, "content": "Python是一种高级编程语言", "category": "programming"},
        {"id": 2, "content": "机器学习是人工智能的一个分支", "category": "ai"},
        {"id": 3, "content": "数据流编程是一种编程范式", "category": "programming"},
        {"id": 4, "content": "SAGE是一个数据流处理框架", "category": "framework"},
        {"id": 5, "content": "异步编程可以提高程序性能", "category": "programming"},
    ]

    # 查询嵌入器(简化版)
    class SimpleEmbedder:
        def __call__(self, text):
            """简单的文本嵌入(使用关键词匹配)"""
            keywords = text.lower().split()
            return {
                'text': text,
                'keywords': keywords,
                'embedding': keywords  # 简化:直接使用关键词作为嵌入
            }

    # 检索器
    class SimpleRetriever:
        def __init__(self, knowledge_base, top_k=2):
            self.knowledge_base = knowledge_base
            self.top_k = top_k

        def __call__(self, query_data):
            """检索相关文档"""
            query_keywords = set(query_data['keywords'])
            scores = []

            for doc in self.knowledge_base:
                doc_keywords = set(doc['content'].lower().split())
                # 简单的相似度计算(交集大小)
                similarity = len(query_keywords & doc_keywords)
                scores.append({
                    'document': doc,
                    'similarity': similarity
                })

            # 按相似度排序并返回top-k
            scores.sort(key=lambda x: x['similarity'], reverse=True)
            top_docs = scores[:self.top_k]

            return {
                'query': query_data['text'],
                'retrieved_docs': [item['document'] for item in top_docs],
                'similarities': [item['similarity'] for item in top_docs]
            }

    # 生成器
    class SimpleGenerator:
        def __call__(self, retrieval_result):
            """基于检索结果生成回答"""
            query = retrieval_result['query']
            docs = retrieval_result['retrieved_docs']

            if not docs or all(sim == 0 for sim in retrieval_result['similarities']):
                return {
                    'query': query,
                    'answer': "抱歉,我无法找到相关信息来回答您的问题。",
                    'sources': []
                }

            # 简单的答案生成
            context = " ".join([doc['content'] for doc in docs])
            answer = f"根据相关资料:{context}。这些信息可能对您的问题 '{query}' 有所帮助。"

            return {
                'query': query,
                'answer': answer,
                'sources': [doc['id'] for doc in docs],
                'context_length': len(context)
            }

    # 查询质量过滤器
    def query_filter(query):
        """过滤查询质量"""
        return len(query.strip()) > 3 and not query.strip().isdigit()

    # 结果格式化器
    def format_response(result):
        """格式化最终响应"""
        return f"""
问题: {result['query']}
回答: {result['answer']}
参考文档: {', '.join(map(str, result['sources'])) if result['sources'] else '无'}
上下文长度: {result.get('context_length', 0)} 字符
---"""

    env = LocalEnvironment()

    # 测试查询
    queries = [
        "什么是Python?",
        "机器学习是什么",
        "SAGE框架介绍",
        "异步编程",
        "xyz",  # 低质量查询
        "123",  # 数字查询
    ]

    # 创建处理组件
    embedder = SimpleEmbedder()
    retriever = SimpleRetriever(knowledge_base, top_k=2)
    generator = SimpleGenerator()

    print("=== 简单RAG系统示例 ===")

    # 构建RAG处理流水线
    rag_pipeline = (env
        .from_batch(queries)                        # 输入查询
        .filter(query_filter)                       # 过滤低质量查询
        .map(embedder)                              # 查询嵌入
        .map(retriever)                             # 文档检索
        .map(generator)                             # 答案生成
        .map(format_response)                       # 格式化响应
        .sink(ConsoleSink())                        # 输出结果
    )

    env.submit()

    print("\n=== 带错误处理的RAG系统 ===")

    # 带错误处理的RAG系统
    def safe_rag_step(step_func, step_name):
        """为RAG步骤添加错误处理"""
        def wrapper(data):
            try:
                result = step_func(data)
                return {'status': 'success', 'data': result, 'step': step_name}
            except Exception as e:
                return {'status': 'error', 'error': str(e), 'step': step_name, 'input': data}
        return wrapper

    def extract_successful_data(result):
        """提取成功处理的数据"""
        if result['status'] == 'success':
            return result['data']
        else:
            print(f"步骤 {result['step']} 失败: {result['error']}")
            return None

    env = LocalEnvironment()
    safe_rag_pipeline = (env
        .from_batch(queries)
        .filter(query_filter)
        .map(safe_rag_step(embedder, "embedding"))
        .map(extract_successful_data)
        .filter(lambda x: x is not None)
        .map(safe_rag_step(retriever, "retrieval"))
        .map(extract_successful_data)
        .filter(lambda x: x is not None)
        .map(safe_rag_step(generator, "generation"))
        .map(extract_successful_data)
        .filter(lambda x: x is not None)
        .map(format_response)
        .sink(ConsoleSink())
    )

    env.submit()

# 运行示例
tutorial_08_simple_rag_system()

🎓 总结

恭喜您完成了SAGE基础教程!您已经学会了:

核心概念

  • 数据流编程模型:声明式的数据处理方式
  • 算子链式调用:使用map、filter、sink等构建处理流水线
  • 执行环境:LocalEnvironment的使用

基本操作

  • 数据源:from_batch、from_source等创建数据流
  • 转换操作:map、filter、flatmap等数据转换
  • 输出操作:sink到不同目标(控制台、文件等)

高级特性

  • 批处理:提升处理效率
  • 异步处理:处理I/O密集型操作
  • 错误处理:构建健壮的数据处理系统
  • 多流连接:处理复杂的数据关联
  • 状态管理:维护处理状态和上下文

实际应用

  • 文件处理:读取和处理文本文件
  • 数据聚合:统计和分析数据
  • RAG系统:检索增强生成的完整实现

📚 下一步学习

现在您可以:

  1. 深入学习
  2. 数据流开发指南 - 更深入的开发技巧
  3. 自定义算子开发 - 扩展系统功能
  4. 性能调优 - 优化系统性能

  5. 实践项目

  6. 构建更复杂的RAG系统
  7. 实现实时数据处理流水线
  8. 开发多模态数据处理应用

  9. 探索高级特性

  10. 分布式处理
  11. 可视化监控
  12. 生产环境部署

继续探索SAGE的强大功能,构建属于您的智能数据处理应用吧!