SAGE Kernel Best Practices¶
This guide covers best practices for developing with SAGE Kernel, based on real-world usage patterns and performance considerations.
Architecture Best Practices¶
Environment Selection¶
Use LocalEnvironment for:
- Development and testing
- Single-machine deployments
- Prototyping and experimentation
- CPU-intensive tasks on local hardware

Use RemoteEnvironment for:
- Production deployments
- Distributed processing
- GPU-accelerated workloads
- High-availability requirements
# Development
dev_env = LocalEnvironment(config={
    "debug": True,
    "log_level": "DEBUG"
})

# Production
prod_env = RemoteEnvironment(
    endpoint="https://api.sage-cluster.com",
    config={
        "timeout": 30,
        "retry_attempts": 3,
        "failover_enabled": True
    }
)
Stream Design Patterns¶
1. Single Responsibility Streams¶
Keep each stream focused on a single type of data or operation:
# Good: Focused streams
user_events = env.create_stream("user_events")
system_metrics = env.create_stream("system_metrics")
error_logs = env.create_stream("error_logs")
# Avoid: Mixed-purpose streams
everything_stream = env.create_stream("everything") # Too broad
2. Hierarchical Stream Organization¶
Organize streams in a hierarchy for complex systems:
# Main data streams
raw_data = env.create_stream("raw_data")
# Processing stages
cleaned_data = raw_data.map(clean_data_func)
enriched_data = cleaned_data.map(enrich_func)
final_output = enriched_data.filter(quality_filter)
3. Error Stream Separation¶
Separate error handling into dedicated streams:
def process_with_errors(data):
    try:
        return process_data(data), None
    except Exception as e:
        return None, str(e)

results, errors = main_stream.map(process_with_errors).split()
errors.filter(lambda x: x is not None).sink(error_handler)
Performance Optimization¶
Memory Management¶
1. Stream Buffer Configuration¶
Configure appropriate buffer sizes based on your data volume:
# High-throughput streams
high_volume_stream = env.create_stream(
    "high_volume",
    config={
        "buffer_size": 10000,
        "buffer_policy": "drop_oldest"
    }
)

# Low-latency streams
low_latency_stream = env.create_stream(
    "low_latency",
    config={
        "buffer_size": 100,
        "buffer_policy": "block"
    }
)
2. Resource Cleanup¶
Always clean up resources properly:
stream = None
try:
    # Stream processing
    stream = env.create_stream("data")
    result = stream.process(processor_func)
finally:
    # Cleanup: close the stream (if it was created) and release environment resources
    if stream:
        stream.close()
    env.cleanup()
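Because streams expose close(), the standard library's contextlib.closing can express the same cleanup more compactly. A minimal sketch, assuming only the close() and cleanup() behavior shown above:

from contextlib import closing

# closing() guarantees stream.close() runs even if processing raises
with closing(env.create_stream("data")) as stream:
    result = stream.process(processor_func)
env.cleanup()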
3. Lazy Evaluation¶
Use lazy evaluation for large datasets:
# Good: Lazy evaluation
large_dataset = env.create_stream("large_data")
filtered = large_dataset.filter(filter_func)
result = filtered.take(1000) # Only process what's needed
# Avoid: Eager evaluation of large datasets
all_data = large_dataset.collect() # Loads everything into memory
Processing Optimization¶
1. Batch Processing¶
Process data in batches for better throughput:
def batch_processor(batch):
    # Process multiple items together
    return [process_item(item) for item in batch]

# Process in batches of 100
stream.batch(100).map(batch_processor)
2. Parallel Processing¶
Leverage parallelism where it pays off: thread pools work well for I/O-bound operations, where workers spend most of their time waiting, while CPU-bound work benefits from a process pool (see the sketch after this example):

from concurrent.futures import ThreadPoolExecutor

def parallel_map(stream, func, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return stream.map_parallel(func, executor)

# Use for I/O-bound operations (network calls, disk access)
result = parallel_map(stream, io_intensive_func)
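For CPU-bound functions, a process pool sidesteps Python's GIL at the cost of pickling each item sent to a worker. This is a sketch under the assumption that stream.map_parallel accepts any concurrent.futures executor; cpu_intensive_func is a placeholder:

from concurrent.futures import ProcessPoolExecutor

def parallel_map_cpu(stream, func, max_workers=4):
    # Workers are separate processes, so CPU-bound work scales across cores
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        return stream.map_parallel(func, executor)

# Use for CPU-bound operations (parsing, compression, numeric transforms)
result = parallel_map_cpu(stream, cpu_intensive_func)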
3. Pipeline Optimization¶
Optimize processing pipelines:
# Good: Combine operations
optimized = (stream
    .filter(expensive_filter)   # Filter early to shrink the working set
    .map(transform_func)        # Transform only the remaining items
    .batch(100)                 # Batch for efficiency
    .map(batch_process)         # Process batches
)

# Avoid: Inefficient ordering
inefficient = (stream
    .map(expensive_transform)   # Transform everything first
    .filter(simple_filter)      # Then filter (wasted work on dropped items)
)
Error Handling and Resilience¶
Exception Management¶
1. Graceful Degradation¶
Implement graceful degradation for non-critical failures:
def resilient_processor(data):
    try:
        return expensive_operation(data)
    except NonCriticalError as e:
        logger.warning(f"Non-critical error: {e}")
        return fallback_operation(data)
    except CriticalError:
        raise  # Let critical errors bubble up
2. Circuit Breaker Pattern¶
Implement circuit breakers for external dependencies:
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half-open"
            else:
                raise Exception("Circuit breaker is open")
        try:
            result = func(*args, **kwargs)
            if self.state == "half-open":
                self.state = "closed"
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise

# Usage
circuit_breaker = CircuitBreaker()
result = stream.map(lambda x: circuit_breaker.call(external_api, x))
3. Retry Mechanisms¶
Implement intelligent retry mechanisms:
import time
import random

def retry_with_backoff(func, max_retries=3, base_delay=1):
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt == max_retries - 1:
                    raise
                # Exponential backoff with jitter
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(delay)
    return wrapper

# Apply to stream operations
reliable_processor = retry_with_backoff(unreliable_operation)
stream.map(reliable_processor)
Security Best Practices¶
Data Protection¶
1. Sensitive Data Handling¶
def secure_processor(data):
    # Remove or mask sensitive fields
    if "ssn" in data:
        data["ssn"] = "***-**-" + data["ssn"][-4:]
    if "credit_card" in data:
        data["credit_card"] = mask_credit_card(data["credit_card"])
    return data

# Apply security processing early in the pipeline
secure_stream = raw_stream.map(secure_processor)
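The mask_credit_card helper is referenced but not defined above; a minimal sketch that keeps only the last four digits might look like this:

def mask_credit_card(number: str) -> str:
    # Hypothetical helper: strip separators, then mask all but the last four digits
    digits = number.replace(" ", "").replace("-", "")
    return "*" * (len(digits) - 4) + digits[-4:]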
2. Input Validation¶
def validate_input(data):
    required_fields = ["id", "timestamp", "value"]

    # Check required fields
    for field in required_fields:
        if field not in data:
            raise ValueError(f"Missing required field: {field}")

    # Validate data types
    if not isinstance(data["id"], str):
        raise ValueError("ID must be a string")
    if not isinstance(data["value"], (int, float)):
        raise ValueError("Value must be numeric")

    return data

validated_stream = input_stream.map(validate_input)
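Because validate_input raises on bad records, it pairs naturally with the error-stream pattern from earlier, so invalid records are routed for inspection instead of failing the pipeline. A sketch reusing that (result, error) convention:

def validate_or_error(data):
    # Return (validated_record, None) on success, (None, error_info) on failure
    try:
        return validate_input(data), None
    except ValueError as e:
        return None, {"record": data, "error": str(e)}

valid_records, validation_errors = input_stream.map(validate_or_error).split()
validation_errors.filter(lambda x: x is not None).sink(error_handler)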
Authentication and Authorization¶
def authenticate_request(request):
    token = request.get("auth_token")
    if not token or not verify_token(token):
        raise AuthenticationError("Invalid token")
    return request

def authorize_operation(request, required_permission):
    user = get_user_from_token(request["auth_token"])
    if not user.has_permission(required_permission):
        raise AuthorizationError("Insufficient permissions")
    return request

# Apply security checks
secure_stream = (input_stream
    .map(authenticate_request)
    .map(lambda req: authorize_operation(req, "data.read"))
    .map(process_authorized_request)
)
Testing Strategies¶
Unit Testing¶
import unittest
from sage.core.api import LocalEnvironment

class TestStreamProcessing(unittest.TestCase):
    def setUp(self):
        self.env = LocalEnvironment()
        self.test_data = [1, 2, 3, 4, 5]

    def tearDown(self):
        self.env.cleanup()

    def test_basic_mapping(self):
        stream = self.env.create_stream("test")

        # Add test data
        for item in self.test_data:
            stream.write(item)
        stream.close()

        # Test mapping
        result = stream.map(lambda x: x * 2).collect()
        expected = [x * 2 for x in self.test_data]
        self.assertEqual(result, expected)

    def test_error_handling(self):
        stream = self.env.create_stream("error_test")

        def failing_func(x):
            if x == 3:
                raise ValueError("Test error")
            return x

        # Write data that triggers the failure, then verify the error propagates
        for item in self.test_data:
            stream.write(item)
        stream.close()

        with self.assertRaises(ValueError):
            stream.map(failing_func).collect()
Integration Testing¶
class TestIntegration(unittest.TestCase):
    def test_end_to_end_pipeline(self):
        env = LocalEnvironment()

        # Create test pipeline
        input_stream = env.create_stream("input")
        processed = (input_stream
            .filter(lambda x: x > 0)
            .map(lambda x: x * 2)
            .reduce(lambda a, b: a + b)
        )

        # Test with known data
        test_data = [1, -1, 2, -2, 3]
        for item in test_data:
            input_stream.write(item)
        input_stream.close()

        result = processed.collect()
        expected = sum(x * 2 for x in test_data if x > 0)
        self.assertEqual(result, expected)
Performance Testing¶
import time
import psutil

def measure_performance(stream_operation, data_size=10000):
    start_time = time.time()
    start_memory = psutil.Process().memory_info().rss

    # Run operation
    result = stream_operation(data_size)

    end_time = time.time()
    end_memory = psutil.Process().memory_info().rss

    return {
        "execution_time": end_time - start_time,
        "memory_usage": end_memory - start_memory,
        "result_size": len(result)
    }

# Test different configurations
def test_buffer_sizes():
    configs = [100, 1000, 10000]
    results = {}

    for buffer_size in configs:
        env = LocalEnvironment(config={"buffer_size": buffer_size})

        def operation(size):
            stream = env.create_stream("test")
            return stream.map(lambda x: x * 2).take(size)

        results[buffer_size] = measure_performance(operation)

    return results
Monitoring and Observability¶
Logging Best Practices¶
import logging

# Configure a consistent log format for every component in the pipeline
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def logged_processor(data):
    try:
        logger.info(f"Processing item: {data.get('id', 'unknown')}")
        result = process_data(data)
        logger.info(f"Successfully processed item: {data.get('id')}")
        return result
    except Exception as e:
        logger.error(f"Failed to process item {data.get('id')}: {e}")
        raise

stream.map(logged_processor)
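For fully structured (machine-parseable) logs, the standard library is enough on its own; a minimal JSON formatter sketch:

import json
import logging

class JsonFormatter(logging.Formatter):
    # Emit each record as a single JSON object per line
    def format(self, record):
        payload = {
            "time": self.formatTime(record),
            "logger": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger().addHandler(handler)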
Metrics Collection¶
from collections import defaultdict
import time

class StreamMetrics:
    def __init__(self):
        self.counters = defaultdict(int)
        self.timers = defaultdict(list)
        self.gauges = defaultdict(float)

    def increment(self, name, value=1):
        self.counters[name] += value

    def time_operation(self, name, func, *args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            self.increment(f"{name}.success")
            return result
        except Exception:
            self.increment(f"{name}.error")
            raise
        finally:
            duration = time.time() - start_time
            self.timers[name].append(duration)

    def set_gauge(self, name, value):
        self.gauges[name] = value

# Usage
metrics = StreamMetrics()

def monitored_processor(data):
    return metrics.time_operation("process_data", process_data, data)

stream.map(monitored_processor)
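The collected counters and timer samples are most useful when summarized; a small reporting helper, sketched against the StreamMetrics class above:

import statistics

def metrics_snapshot(metrics: StreamMetrics) -> dict:
    # Summarize counters, gauges, and timing samples for logging or export
    return {
        "counters": dict(metrics.counters),
        "gauges": dict(metrics.gauges),
        "timers": {
            name: {
                "count": len(samples),
                "avg_s": statistics.mean(samples),
                "max_s": max(samples),
            }
            for name, samples in metrics.timers.items() if samples
        },
    }

logger.info(f"Metrics: {metrics_snapshot(metrics)}")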
Health Checks¶
import time
import psutil

class HealthChecker:
    def __init__(self, environment):
        self.environment = environment
        self.checks = []

    def add_check(self, name, check_func):
        self.checks.append((name, check_func))

    def run_checks(self):
        results = {}
        for name, check_func in self.checks:
            try:
                results[name] = {
                    "status": "healthy" if check_func() else "unhealthy",
                    "timestamp": time.time()
                }
            except Exception as e:
                results[name] = {
                    "status": "error",
                    "error": str(e),
                    "timestamp": time.time()
                }
        return results

# Set up health checks
health = HealthChecker(env)
health.add_check("stream_connectivity", lambda: env.test_connection())
health.add_check("memory_usage", lambda: psutil.virtual_memory().percent < 80)

# Run periodically
health_status = health.run_checks()
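The example above runs the checks once; how the periodic run is scheduled is left open. One lightweight option, sketched with a daemon threading.Timer:

import threading

def run_health_checks_periodically(checker, interval_seconds=30):
    # Re-run the checks every interval_seconds on a daemon timer thread
    def _run():
        logger.info(f"Health status: {checker.run_checks()}")
        timer = threading.Timer(interval_seconds, _run)
        timer.daemon = True
        timer.start()
    _run()

run_health_checks_periodically(health)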
Deployment Best Practices¶
Configuration Management¶
import os
from typing import Dict, Any

class Config:
    def __init__(self):
        self.config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        return {
            "environment": os.getenv("SAGE_ENV", "development"),
            "log_level": os.getenv("LOG_LEVEL", "INFO"),
            "buffer_size": int(os.getenv("BUFFER_SIZE", "1000")),
            "max_workers": int(os.getenv("MAX_WORKERS", "4")),
            "redis_url": os.getenv("REDIS_URL", "redis://localhost:6379"),
            "database_url": os.getenv("DATABASE_URL", "sqlite:///local.db")
        }

    def get(self, key: str, default: Any = None) -> Any:
        return self.config.get(key, default)

# Usage
config = Config()
env = LocalEnvironment(config={
    "buffer_size": config.get("buffer_size"),
    "log_level": config.get("log_level")
})
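The same Config object can also drive the environment choice from the Environment Selection section; a sketch in which SAGE_ENDPOINT is a hypothetical variable name for the cluster address:

def build_environment(config: Config):
    # Production deployments use RemoteEnvironment; everything else stays local
    if config.get("environment") == "production":
        return RemoteEnvironment(
            endpoint=os.getenv("SAGE_ENDPOINT", "https://api.sage-cluster.com"),
            config={"timeout": 30, "retry_attempts": 3}
        )
    return LocalEnvironment(config={
        "buffer_size": config.get("buffer_size"),
        "log_level": config.get("log_level")
    })

env = build_environment(config)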
Container Deployment¶
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy application
COPY . .
# Set environment variables
ENV SAGE_ENV=production
ENV LOG_LEVEL=INFO
# Run application
CMD ["python", "-m", "sage.cli", "serve"]
# docker-compose.yml
version: '3.8'
services:
  sage-kernel:
    build: .
    environment:
      - SAGE_ENV=production
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    ports:
      - "8000:8000"
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
Production Monitoring¶
# monitoring.py
import functools
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
REQUEST_COUNT = Counter('sage_requests_total', 'Total requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('sage_request_duration_seconds', 'Request duration')
ACTIVE_STREAMS = Gauge('sage_active_streams', 'Number of active streams')

def monitor_stream_operation(operation_name):
    def decorator(func):
        @functools.wraps(func)  # Preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            REQUEST_COUNT.labels(method='stream', endpoint=operation_name).inc()
            with REQUEST_DURATION.time():
                result = func(*args, **kwargs)
            return result
        return wrapper
    return decorator

# Usage in stream operations
@monitor_stream_operation('data_processing')
def process_data(data):
    return transform(data)
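To make these metrics scrapeable, prometheus_client can serve them over HTTP from within the process; the port below is an arbitrary choice:

from prometheus_client import start_http_server

# Expose a /metrics endpoint for Prometheus to scrape
start_http_server(9100)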
This comprehensive guide should help you build robust, performant, and maintainable applications with SAGE Kernel.