Best Practices

Recommended patterns and practices for SAGE development.

Pipeline Design

1. Single Responsibility

Keep operators focused:

# ❌ Bad: Operator does too much
class ProcessEverything(MapFunction):
    def map(self, record):
        # Load data, transform, validate, save...
        pass


# ✅ Good: Separate concerns
class LoadData(MapFunction):
    def map(self, record):
        return load_data(record)


class TransformData(MapFunction):
    def map(self, record):
        return transform(record)


class ValidateData(FilterFunction):
    def filter(self, record):
        return is_valid(record)

2. Stateless Operators

Avoid shared state:

# ❌ Bad: Shared state between records
class StatefulOperator(MapFunction):
    def __init__(self):
        self.cache = {}  # Shared across records

    def map(self, record):
        if record.id in self.cache:
            return self.cache[record.id]
        result = process(record)
        self.cache[record.id] = result
        return result


# ✅ Good: Stateless processing
class StatelessOperator(MapFunction):
    def map(self, record):
        return process(record)

3. Clear Data Flow

Make pipelines readable:

# ✅ Good: Clear, linear flow
stream = (
    env.from_source(source)
    .map(load_operator)
    .filter(validate_operator)
    .map(transform_operator)
    .map(enrich_operator)
    .to_sink(sink)
)

# Better: Named intermediate streams
loaded = env.from_source(source).map(load_operator)
validated = loaded.filter(validate_operator)
transformed = validated.map(transform_operator)
enriched = transformed.map(enrich_operator)
enriched.to_sink(sink)

4. Error Handling

Handle errors gracefully:

class RobustOperator(MapFunction):
    def map(self, record):
        try:
            return process(record)
        except ValueError as e:
            # Log and skip invalid records
            logger.warning(f"Invalid record: {e}")
            return None
        except Exception as e:
            # Log and raise unexpected errors
            logger.error(f"Unexpected error: {e}")
            raise
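
Returning None for invalid records means downstream operators will see those None values unless they are dropped. A minimal sketch, assuming the framework forwards None like any other record:

# Drop the records RobustOperator skipped before they reach the sink
stream.map(RobustOperator()).filter(lambda record: record is not None).to_sink(sink)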

Performance Optimization

1. Batch Processing

Process records in batches:

class BatchOperator(MapFunction):
    def __init__(self, batch_size=32):
        self.batch_size = batch_size
        self.buffer = []

    def map(self, record):
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:
            results = process_batch(self.buffer)
            self.buffer = []
            return results
        return None
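
One caveat with the buffering pattern above: records still sitting in the buffer when the stream ends are never processed. A minimal sketch, assuming the framework invokes close() at shutdown (see Resource Management below), is to flush the remainder there:

class BatchOperator(MapFunction):
    # ... __init__ and map as above ...

    def close(self):
        # Flush whatever never filled a complete batch
        if self.buffer:
            process_batch(self.buffer)
            self.buffer = []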

2. Parallel Execution

Use parallelism wisely:

# CPU-bound operations
stream.map(cpu_intensive_op, parallelism=8)

# I/O-bound operations
stream.map(io_intensive_op, parallelism=16)

# GPU operations
stream.map(gpu_op, parallelism=1)  # Typically 1 per GPU
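
Rather than hard-coding these numbers, a reasonable starting point is to derive CPU-bound parallelism from the host and then tune from measurements. A small sketch, assuming parallelism accepts any positive integer:

import os

# One task per core for CPU-bound work; fall back to 4 if the count is unknown
stream.map(cpu_intensive_op, parallelism=os.cpu_count() or 4)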

3. Resource Management

Manage resources carefully:

class ResourceAwareOperator(MapFunction):
    def open(self, context):
        # Initialize expensive resources once
        self.model = load_model()
        self.db_connection = connect_db()

    def map(self, record):
        # Reuse resources
        return self.model.process(record)

    def close(self):
        # Clean up resources
        self.db_connection.close()

4. Caching

Cache expensive computations:

from functools import lru_cache


class CachingOperator(MapFunction):
    @lru_cache(maxsize=1000)
    def expensive_computation(self, key):
        return compute(key)

    def map(self, record):
        return self.expensive_computation(record.key)
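
Note that lru_cache on an instance method includes self in every cache key and shares one maxsize across all instances of the class. A per-instance alternative, sketched here with a plain dict (no eviction, so bound the key space yourself):

class DictCachingOperator(MapFunction):
    def open(self, context):
        # One cache per operator instance, created alongside other resources
        self._cache = {}

    def map(self, record):
        if record.key not in self._cache:
            self._cache[record.key] = compute(record.key)
        return self._cache[record.key]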

Code Organization

1. Project Structure

Organize your SAGE project:

my-sage-app/
├── src/
│   ├── __init__.py
│   ├── operators/
│   │   ├── __init__.py
│   │   ├── load.py
│   │   ├── transform.py
│   │   └── validate.py
│   ├── pipelines/
│   │   ├── __init__.py
│   │   └── main_pipeline.py
│   ├── sources/
│   │   └── data_source.py
│   └── sinks/
│       └── output_sink.py
├── tests/
│   ├── test_operators.py
│   └── test_pipeline.py
├── config/
│   └── production.yaml
├── requirements.txt
└── main.py
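
The entry point then only wires these pieces together. A sketch of main.py using the layout above (the class names inside the sources and sinks modules are illustrative, and the import path for LocalStreamEnvironment depends on your SAGE installation):

# main.py -- illustrative wiring of the layout above
from src.operators.load import LoadData
from src.operators.transform import TransformData
from src.operators.validate import ValidateData
from src.sources.data_source import DataSource
from src.sinks.output_sink import OutputSink


def main():
    env = LocalStreamEnvironment("my-sage-app")
    (
        env.from_source(DataSource())
        .map(LoadData())
        .filter(ValidateData())
        .map(TransformData())
        .to_sink(OutputSink())
    )
    env.execute()


if __name__ == "__main__":
    main()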

2. Configuration Management

Separate code from configuration:

# config/settings.py
from pydantic import BaseModel


class PipelineConfig(BaseModel):
    batch_size: int = 32
    parallelism: int = 4
    checkpoint_interval: float = 60.0


# main.py
from config.settings import PipelineConfig

config = PipelineConfig()
stream.map(operator, parallelism=config.parallelism)
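
To source those values from the config/production.yaml file shown in the project layout, the model can load the YAML before validating it. A sketch (from_yaml is a helper introduced here, not a pydantic feature):

# config/settings.py
import yaml
from pydantic import BaseModel


class PipelineConfig(BaseModel):
    batch_size: int = 32
    parallelism: int = 4
    checkpoint_interval: float = 60.0

    @classmethod
    def from_yaml(cls, path: str) -> "PipelineConfig":
        # Validate whatever the YAML file provides against the defaults above
        with open(path) as f:
            return cls(**yaml.safe_load(f))


# main.py
config = PipelineConfig.from_yaml("config/production.yaml")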

3. Reusable Components

Create reusable operator libraries:

# src/operators/base.py
class BaseOperator(MapFunction):
    def __init__(self, config):
        self.config = config

    def open(self, context):
        self.setup()

    def setup(self):
        """Override in subclasses"""
        pass


# src/operators/llm.py
from openai import OpenAI


class LLMOperator(BaseOperator):
    def setup(self):
        self.client = OpenAI(api_key=self.config.api_key)

    def map(self, record):
        response = self.client.chat.completions.create(
            model=self.config.model,
            messages=[{"role": "user", "content": record.text}],
        )
        return response.choices[0].message.content

Testing

1. Unit Tests

Test operators in isolation:

import pytest
from src.operators.transform import TransformOperator


def test_transform_operator():
    operator = TransformOperator()

    # Test with valid input
    result = operator.map({"value": 10})
    assert result["value"] == 20

    # Test with invalid input
    with pytest.raises(ValueError):
        operator.map({"value": -1})

2. Integration Tests

Test pipelines end-to-end:

def test_pipeline():
    env = LocalStreamEnvironment("test")

    # Create test data
    test_data = [{"id": 1}, {"id": 2}]
    source = ListSource(test_data)
    sink = CollectSink()

    # Build and execute pipeline
    stream = env.from_source(source).map(TransformOperator()).to_sink(sink)

    env.execute()

    # Verify results
    results = sink.get_results()
    assert len(results) == 2

3. Mock External Services

Mock LLMs and APIs:

from unittest.mock import Mock, patch


def test_llm_operator():
    # Patch OpenAI where the operator imports it, not in the openai package
    with patch("src.operators.llm.OpenAI") as mock_openai:
        mock_client = Mock()
        mock_openai.return_value = mock_client
        mock_client.chat.completions.create.return_value = Mock(
            choices=[Mock(message=Mock(content="Test response"))]
        )

        operator = LLMOperator(Mock(api_key="test-key", model="gpt-4"))
        operator.open(None)  # Runs setup() and creates the mocked client
        result = operator.map(Mock(text="Test input"))
        assert result == "Test response"

4. Test Data Management

Use conftest.py for test fixtures:

# tests/conftest.py
import pytest


@pytest.fixture(scope="session")
def test_data_dir(tmp_path_factory):
    data_dir = tmp_path_factory.mktemp("data")
    # Create test data
    (data_dir / "test.csv").write_text("id,value\n1,10\n2,20\n")
    return data_dir


# tests/test_pipeline.py
def test_with_real_data(test_data_dir):
    source = CSVSource(test_data_dir / "test.csv")
    # Test with real data structure

Debugging

1. Logging

Use structured logging:

import logging

logger = logging.getLogger(__name__)


class DebugOperator(MapFunction):
    def map(self, record):
        logger.info(
            "Processing record", extra={"record_id": record.id, "stage": "transform"}
        )
        try:
            result = process(record)
            logger.debug(
                "Processed successfully",
                extra={"record_id": record.id, "output_size": len(result)},
            )
            return result
        except Exception as e:
            logger.error(
                "Processing failed",
                extra={"record_id": record.id, "error": str(e)},
                exc_info=True,
            )
            raise
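
The extra fields above only reach your logs if the formatter emits them; the default formatter ignores anything outside its format string. A small stdlib-only sketch of a JSON formatter (the class and field selection are our own, not part of SAGE):

import json
import logging


class JsonFormatter(logging.Formatter):
    # Attribute names present on every LogRecord, used to isolate extra={} fields
    STANDARD_FIELDS = set(logging.makeLogRecord({}).__dict__)

    def format(self, record):
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            **{k: v for k, v in record.__dict__.items() if k not in self.STANDARD_FIELDS},
        }
        return json.dumps(payload, default=str)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger().addHandler(handler)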

2. Profiling

Profile performance:

import cProfile
import pstats


def profile_pipeline():
    profiler = cProfile.Profile()
    profiler.enable()

    # Run pipeline
    env.execute()

    profiler.disable()
    stats = pstats.Stats(profiler)
    stats.sort_stats("cumulative")
    stats.print_stats(20)

3. Checkpointing

Enable checkpointing for debugging:

env = LocalStreamEnvironment(
    "debug_app",
    config={
        "fault_tolerance": {
            "strategy": "checkpoint",
            "checkpoint_interval": 10.0,  # Checkpoint frequently
            "checkpoint_dir": "./checkpoints",
        }
    },
)

4. Incremental Development

Develop incrementally:

# 1. Start with simple pipeline
stream = env.from_source(source).to_sink(sink)
env.execute()  # Verify data flow

# 2. Add one operator at a time
stream = env.from_source(source).map(op1).to_sink(sink)
env.execute()  # Verify op1

# 3. Continue adding operators
stream = env.from_source(source).map(op1).map(op2).to_sink(sink)
env.execute()  # Verify op1 + op2

LLM Integration

1. Prompt Engineering

Structure prompts clearly:

class LLMPromptOperator(MapFunction):
    PROMPT_TEMPLATE = """
Task: {task}

Context:
{context}

Input:
{input}

Instructions:
- Be concise
- Use examples
- Cite sources

Output:
"""

    def map(self, record):
        prompt = self.PROMPT_TEMPLATE.format(
            task=record.task, context=record.context, input=record.input
        )
        return self.llm.generate(prompt)

2. Rate Limiting

Respect API rate limits:

import time
from threading import Lock


class RateLimitedOperator(MapFunction):
    def __init__(self, calls_per_second=10):
        self.min_interval = 1.0 / calls_per_second
        self.last_call = 0
        self.lock = Lock()

    def map(self, record):
        with self.lock:
            elapsed = time.time() - self.last_call
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
            self.last_call = time.time()

        return self.llm.generate(record.text)
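
Client-side pacing does not remove the need to handle the provider's own rejections. A sketch of retrying with exponential backoff (RateLimitError stands in for whatever exception your client raises on HTTP 429; the retry counts are illustrative):

import random
import time


class RetryingLLMOperator(MapFunction):
    def __init__(self, max_retries=5, base_delay=1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def map(self, record):
        for attempt in range(self.max_retries):
            try:
                return self.llm.generate(record.text)
            except RateLimitError:
                # Exponential backoff with jitter before the next attempt
                time.sleep(self.base_delay * (2 ** attempt) + random.random())
        raise RuntimeError("Rate limit retries exhausted")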

3. Cost Monitoring

Track API costs:

class CostAwareOperator(MapFunction):
    def __init__(self):
        self.total_tokens = 0
        self.total_cost = 0.0

    def map(self, record):
        response = self.llm.generate(record.text)

        # Track usage
        tokens = response.usage.total_tokens
        cost = tokens * 0.000002  # $0.002 per 1K tokens

        self.total_tokens += tokens
        self.total_cost += cost

        if self.total_cost > 10.0:  # Alert if cost > $10
            logger.warning(f"High cost: ${self.total_cost:.2f}")

        return response.text

See Also