🐍 📊 Monitoring & Observability - Python
📊 Monitoring & Observability - Python
"We don't bow to any king" - Monitoring Edition
TuskLang provides comprehensive monitoring and observability features to ensure your applications run smoothly and efficiently.
📈 Metrics Collection
Application Metrics
from tsk import TSK
import time
import psutilMonitoring configuration
monitoring_config = TSK.from_string("""
[metrics]
Application metrics
request_count: @metric("counter", "requests_total")
response_time: @metric("histogram", "response_time_seconds")
error_count: @metric("counter", "errors_total")
active_connections: @metric("gauge", "active_connections")Business metrics
user_registrations: @metric("counter", "user_registrations_total")
payment_volume: @metric("counter", "payment_volume_total")
api_calls: @metric("counter", "api_calls_total")collect_metrics_fujsen = '''
def collect_metrics():
# System metrics
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Application metrics
app_metrics = {
'cpu_usage': cpu_percent,
'memory_usage': memory.percent,
'disk_usage': disk.percent,
'memory_available': memory.available,
'disk_free': disk.free
}
# Record metrics
for metric_name, value in app_metrics.items():
record_metric(metric_name, value)
return app_metrics
'''
record_metric_fujsen = '''
def record_metric(metric_name, value, labels=None):
# Record metric to monitoring system
metric_data = {
'name': metric_name,
'value': value,
'timestamp': time.time(),
'labels': labels or {}
}
# Store in database
execute("""
INSERT INTO metrics (metric_name, value, timestamp, labels)
VALUES (?, ?, datetime('now'), ?)
""", metric_name, value, json.dumps(labels or {}))
return metric_data
'''
get_metrics_fujsen = '''
def get_metrics(metric_name, time_range='1h', aggregation='avg'):
# Get metrics from database
if time_range == '1h':
time_filter = "timestamp > datetime('now', '-1 hour')"
elif time_range == '24h':
time_filter = "timestamp > datetime('now', '-1 day')"
elif time_range == '7d':
time_filter = "timestamp > datetime('now', '-7 days')"
else:
time_filter = "1=1"
if aggregation == 'avg':
query_str = f"SELECT AVG(value) FROM metrics WHERE metric_name = ? AND {time_filter}"
elif aggregation == 'max':
query_str = f"SELECT MAX(value) FROM metrics WHERE metric_name = ? AND {time_filter}"
elif aggregation == 'min':
query_str = f"SELECT MIN(value) FROM metrics WHERE metric_name = ? AND {time_filter}"
elif aggregation == 'sum':
query_str = f"SELECT SUM(value) FROM metrics WHERE metric_name = ? AND {time_filter}"
else:
query_str = f"SELECT value FROM metrics WHERE metric_name = ? AND {time_filter} ORDER BY timestamp DESC LIMIT 100"
results = query(query_str, metric_name)
return [row[0] for row in results]
'''
""")
Test metrics collection
def test_metrics():
# Collect system metrics
metrics = monitoring_config.execute_fujsen('metrics', 'collect_metrics')
print(f"CPU Usage: {metrics['cpu_usage']}%")
print(f"Memory Usage: {metrics['memory_usage']}%")
# Record custom metric
monitoring_config.execute_fujsen('metrics', 'record_metric', 'custom_metric', 42.5, {'service': 'api'})
# Get metrics
cpu_avg = monitoring_config.execute_fujsen('metrics', 'get_metrics', 'cpu_usage', '1h', 'avg')
print(f"Average CPU (1h): {cpu_avg[0] if cpu_avg else 'N/A'}")
Custom Business Metrics
Business metrics configuration
business_metrics = TSK.from_string("""
[business_metrics]
User engagement metrics
daily_active_users: @metric("gauge", "daily_active_users")
user_session_duration: @metric("histogram", "user_session_duration_seconds")
feature_usage: @metric("counter", "feature_usage_total")Financial metrics
revenue_daily: @metric("counter", "revenue_daily_total")
transaction_volume: @metric("counter", "transaction_volume_total")
payment_success_rate: @metric("gauge", "payment_success_rate")track_user_activity_fujsen = '''
def track_user_activity(user_id, action, duration=None):
# Track user activity
activity_data = {
'user_id': user_id,
'action': action,
'timestamp': time.time(),
'duration': duration
}
# Record activity
execute("""
INSERT INTO user_activities (user_id, action, timestamp, duration)
VALUES (?, ?, datetime('now'), ?)
""", user_id, action, duration)
# Update metrics
if action == 'login':
increment_metric('daily_active_users', 1)
elif action == 'logout' and duration:
record_metric('user_session_duration', duration)
return activity_data
'''
track_feature_usage_fujsen = '''
def track_feature_usage(user_id, feature_name):
# Track feature usage
usage_data = {
'user_id': user_id,
'feature': feature_name,
'timestamp': time.time()
}
# Record usage
execute("""
INSERT INTO feature_usage (user_id, feature_name, timestamp)
VALUES (?, ?, datetime('now'))
""", user_id, feature_name)
# Increment counter
increment_metric('feature_usage', 1, {'feature': feature_name})
return usage_data
'''
track_payment_fujsen = '''
def track_payment(amount, success, payment_method):
# Track payment metrics
payment_data = {
'amount': amount,
'success': success,
'payment_method': payment_method,
'timestamp': time.time()
}
# Record payment
execute("""
INSERT INTO payments (amount, success, payment_method, timestamp)
VALUES (?, ?, ?, datetime('now'))
""", amount, success, payment_method)
# Update metrics
if success:
increment_metric('revenue_daily', amount)
increment_metric('transaction_volume', 1)
# Calculate success rate
success_rate = calculate_payment_success_rate()
record_metric('payment_success_rate', success_rate)
return payment_data
'''
calculate_payment_success_rate_fujsen = '''
def calculate_payment_success_rate():
# Calculate payment success rate for last 24 hours
total_payments = query("""
SELECT COUNT(*) FROM payments
WHERE timestamp > datetime('now', '-1 day')
""")[0][0]
successful_payments = query("""
SELECT COUNT(*) FROM payments
WHERE success = 1 AND timestamp > datetime('now', '-1 day')
""")[0][0]
if total_payments == 0:
return 0
return (successful_payments / total_payments) * 100
'''
""")
📊 Logging & Tracing
Structured Logging
Logging configuration
logging_config = TSK.from_string("""
[logging]
Log levels
log_level: @env("LOG_LEVEL", "info")
log_format: @env("LOG_FORMAT", "json")
log_file: @env("LOG_FILE", "/app/logs/app.log")Logging functions
log_info_fujsen = '''
def log_info(message, context=None):
log_entry = {
'level': 'INFO',
'message': message,
'timestamp': time.time(),
'context': context or {}
}
# Write to database
execute("""
INSERT INTO application_logs (level, message, timestamp, context)
VALUES (?, ?, datetime('now'), ?)
""", 'INFO', message, json.dumps(context or {}))
# Write to file if configured
if log_file:
write_log_to_file(log_entry)
return log_entry
'''log_error_fujsen = '''
def log_error(message, error=None, context=None):
log_entry = {
'level': 'ERROR',
'message': message,
'error': str(error) if error else None,
'timestamp': time.time(),
'context': context or {}
}
# Write to database
execute("""
INSERT INTO application_logs (level, message, error, timestamp, context)
VALUES (?, ?, ?, datetime('now'), ?)
""", 'ERROR', message, str(error) if error else None, json.dumps(context or {}))
# Write to file if configured
if log_file:
write_log_to_file(log_entry)
# Increment error counter
increment_metric('errors_total', 1)
return log_entry
'''
log_request_fujsen = '''
def log_request(request_id, method, path, status_code, duration, user_id=None):
log_entry = {
'level': 'INFO',
'message': 'HTTP Request',
'request_id': request_id,
'method': method,
'path': path,
'status_code': status_code,
'duration': duration,
'user_id': user_id,
'timestamp': time.time()
}
# Write to database
execute("""
INSERT INTO request_logs (request_id, method, path, status_code, duration, user_id, timestamp)
VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
""", request_id, method, path, status_code, duration, user_id)
# Update metrics
increment_metric('requests_total', 1)
record_metric('response_time', duration)
if status_code >= 400:
increment_metric('errors_total', 1)
return log_entry
'''
write_log_to_file_fujsen = '''
def write_log_to_file(log_entry):
import json
if log_format == 'json':
log_line = json.dumps(log_entry) + '\\n'
else:
log_line = f"[{log_entry['timestamp']}] {log_entry['level']}: {log_entry['message']}\\n"
with open(log_file, 'a') as f:
f.write(log_line)
'''
""")
Distributed Tracing
Tracing configuration
tracing_config = TSK.from_string("""
[tracing]
Trace configuration
trace_enabled: @env("TRACE_ENABLED", true)
trace_sampler: @env("TRACE_SAMPLER", 0.1)start_trace_fujsen = '''
def start_trace(operation_name, trace_id=None, parent_id=None):
if not trace_enabled:
return None
# Generate trace ID if not provided
if not trace_id:
trace_id = generate_trace_id()
# Create span
span = {
'trace_id': trace_id,
'span_id': generate_span_id(),
'parent_id': parent_id,
'operation_name': operation_name,
'start_time': time.time(),
'tags': {}
}
# Store span
execute("""
INSERT INTO traces (trace_id, span_id, parent_id, operation_name, start_time, tags)
VALUES (?, ?, ?, ?, datetime('now'), ?)
""", trace_id, span['span_id'], parent_id, operation_name, json.dumps({}))
return span
'''
end_trace_fujsen = '''
def end_trace(span, status='OK', error=None):
if not span:
return
end_time = time.time()
duration = end_time - span['start_time']
# Update span
execute("""
UPDATE traces
SET end_time = datetime('now'), duration = ?, status = ?, error = ?
WHERE trace_id = ? AND span_id = ?
""", duration, status, str(error) if error else None, span['trace_id'], span['span_id'])
# Record trace metric
record_metric('trace_duration', duration, {'operation': span['operation_name']})
return {
'trace_id': span['trace_id'],
'span_id': span['span_id'],
'duration': duration,
'status': status
}
'''
add_trace_tag_fujsen = '''
def add_trace_tag(span, key, value):
if not span:
return
# Update tags
current_tags = query("""
SELECT tags FROM traces WHERE trace_id = ? AND span_id = ?
""", span['trace_id'], span['span_id'])[0][0]
tags = json.loads(current_tags) if current_tags else {}
tags[key] = value
# Update database
execute("""
UPDATE traces SET tags = ? WHERE trace_id = ? AND span_id = ?
""", json.dumps(tags), span['trace_id'], span['span_id'])
return tags
'''
generate_trace_id_fujsen = '''
def generate_trace_id():
import uuid
return str(uuid.uuid4())
'''
generate_span_id_fujsen = '''
def generate_span_id():
import uuid
return str(uuid.uuid4())[:16]
'''
""")
🔍 Health Checks & Alerts
Health Check System
Health check configuration
health_config = TSK.from_string("""
[health_checks]
Health check endpoints
health_endpoint: "/health"
ready_endpoint: "/ready"
live_endpoint: "/live"Health check functions
check_health_fujsen = '''
def check_health():
health_status = {
'status': 'healthy',
'timestamp': time.time(),
'checks': {}
}
# Check database
db_health = check_database_health()
health_status['checks']['database'] = db_health
# Check cache
cache_health = check_cache_health()
health_status['checks']['cache'] = cache_health
# Check external services
external_health = check_external_services()
health_status['checks']['external'] = external_health
# Determine overall status
all_healthy = all(check['status'] == 'healthy' for check in health_status['checks'].values())
health_status['status'] = 'healthy' if all_healthy else 'unhealthy'
return health_status
'''check_database_health_fujsen = '''
def check_database_health():
try:
# Test database connection
result = query("SELECT 1")
# Check connection pool
pool_stats = connection_pool_stats()
return {
'status': 'healthy',
'details': {
'connection': 'OK',
'pool_size': pool_stats.get('total_connections', 0),
'active_connections': pool_stats.get('active_connections', 0)
}
}
except Exception as e:
return {
'status': 'unhealthy',
'error': str(e)
}
'''
check_cache_health_fujsen = '''
def check_cache_health():
try:
# Test cache connection
cache.set('health_check', 'ok', 10)
value = cache.get('health_check')
if value == 'ok':
return {
'status': 'healthy',
'details': {
'connection': 'OK',
'read_write': 'OK'
}
}
else:
return {
'status': 'unhealthy',
'error': 'Cache read/write test failed'
}
except Exception as e:
return {
'status': 'unhealthy',
'error': str(e)
}
'''
check_external_services_fujsen = '''
def check_external_services():
services = {
'api_gateway': 'https://api.example.com/health',
'payment_service': 'https://payments.example.com/health',
'email_service': 'https://email.example.com/health'
}
results = {}
for service_name, url in services.items():
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
results[service_name] = {
'status': 'healthy',
'response_time': response.elapsed.total_seconds()
}
else:
results[service_name] = {
'status': 'unhealthy',
'error': f'HTTP {response.status_code}'
}
except Exception as e:
results[service_name] = {
'status': 'unhealthy',
'error': str(e)
}
return results
'''
""")
Alerting System
Alerting configuration
alerting_config = TSK.from_string("""
[alerting]
Alert thresholds
cpu_threshold: @env("CPU_THRESHOLD", 80)
memory_threshold: @env("MEMORY_THRESHOLD", 85)
error_rate_threshold: @env("ERROR_RATE_THRESHOLD", 5)
response_time_threshold: @env("RESPONSE_TIME_THRESHOLD", 2.0)Alert functions
check_alerts_fujsen = '''
def check_alerts():
alerts = []
# Check CPU usage
cpu_usage = get_system_metrics('cpu_usage')
if cpu_usage > cpu_threshold:
alerts.append({
'type': 'high_cpu',
'severity': 'warning',
'message': f'CPU usage is {cpu_usage}% (threshold: {cpu_threshold}%)',
'value': cpu_usage,
'threshold': cpu_threshold
})
# Check memory usage
memory_usage = get_system_metrics('memory_usage')
if memory_usage > memory_threshold:
alerts.append({
'type': 'high_memory',
'severity': 'warning',
'message': f'Memory usage is {memory_usage}% (threshold: {memory_threshold}%)',
'value': memory_usage,
'threshold': memory_threshold
})
# Check error rate
error_rate = calculate_error_rate()
if error_rate > error_rate_threshold:
alerts.append({
'type': 'high_error_rate',
'severity': 'critical',
'message': f'Error rate is {error_rate}% (threshold: {error_rate_threshold}%)',
'value': error_rate,
'threshold': error_rate_threshold
})
# Check response time
avg_response_time = get_metrics('response_time', '5m', 'avg')
if avg_response_time and avg_response_time[0] > response_time_threshold:
alerts.append({
'type': 'slow_response',
'severity': 'warning',
'message': f'Average response time is {avg_response_time[0]:.2f}s (threshold: {response_time_threshold}s)',
'value': avg_response_time[0],
'threshold': response_time_threshold
})
return alerts
'''send_alert_fujsen = '''
def send_alert(alert):
# Store alert in database
execute("""
INSERT INTO alerts (type, severity, message, value, threshold, timestamp)
VALUES (?, ?, ?, ?, ?, datetime('now'))
""", alert['type'], alert['severity'], alert['message'],
alert['value'], alert['threshold'])
# Send notification based on severity
if alert['severity'] == 'critical':
send_critical_alert(alert)
elif alert['severity'] == 'warning':
send_warning_alert(alert)
return alert
'''
send_critical_alert_fujsen = '''
def send_critical_alert(alert):
# Send critical alert via multiple channels
# Email
send_email_alert(alert, 'critical')
# Slack
send_slack_alert(alert, 'critical')
# PagerDuty
send_pagerduty_alert(alert)
return True
'''
send_warning_alert_fujsen = '''
def send_warning_alert(alert):
# Send warning alert
# Email
send_email_alert(alert, 'warning')
# Slack
send_slack_alert(alert, 'warning')
return True
'''
calculate_error_rate_fujsen = '''
def calculate_error_rate():
# Calculate error rate for last 5 minutes
total_requests = query("""
SELECT COUNT(*) FROM request_logs
WHERE timestamp > datetime('now', '-5 minutes')
""")[0][0]
error_requests = query("""
SELECT COUNT(*) FROM request_logs
WHERE status_code >= 400 AND timestamp > datetime('now', '-5 minutes')
""")[0][0]
if total_requests == 0:
return 0
return (error_requests / total_requests) * 100
'''
""")
📊 Dashboard & Visualization
Metrics Dashboard
Dashboard configuration
dashboard_config = TSK.from_string("""
[dashboard]
Dashboard endpoints
metrics_endpoint: "/metrics"
dashboard_endpoint: "/dashboard"
status_endpoint: "/status"generate_dashboard_data_fujsen = '''
def generate_dashboard_data():
# Generate comprehensive dashboard data
# System metrics
system_metrics = {
'cpu_usage': get_metrics('cpu_usage', '1h', 'avg'),
'memory_usage': get_metrics('memory_usage', '1h', 'avg'),
'disk_usage': get_metrics('disk_usage', '1h', 'avg')
}
# Application metrics
app_metrics = {
'request_rate': calculate_request_rate(),
'error_rate': calculate_error_rate(),
'response_time': get_metrics('response_time', '1h', 'avg'),
'active_users': get_metrics('daily_active_users', '1h', 'max')
}
# Business metrics
business_metrics = {
'revenue_today': calculate_revenue_today(),
'user_registrations': get_metrics('user_registrations', '24h', 'sum'),
'payment_success_rate': get_metrics('payment_success_rate', '1h', 'avg')
}
# Recent alerts
recent_alerts = get_recent_alerts()
return {
'system': system_metrics,
'application': app_metrics,
'business': business_metrics,
'alerts': recent_alerts,
'generated_at': time.time()
}
'''
calculate_request_rate_fujsen = '''
def calculate_request_rate():
# Calculate requests per second for last minute
requests_last_minute = query("""
SELECT COUNT(*) FROM request_logs
WHERE timestamp > datetime('now', '-1 minute')
""")[0][0]
return requests_last_minute / 60 # requests per second
'''
calculate_revenue_today_fujsen = '''
def calculate_revenue_today():
# Calculate total revenue for today
revenue = query("""
SELECT COALESCE(SUM(amount), 0) FROM payments
WHERE success = 1 AND DATE(timestamp) = DATE('now')
""")[0][0]
return revenue
'''
get_recent_alerts_fujsen = '''
def get_recent_alerts():
# Get recent alerts (last hour)
alerts = query("""
SELECT type, severity, message, timestamp
FROM alerts
WHERE timestamp > datetime('now', '-1 hour')
ORDER BY timestamp DESC
LIMIT 10
""")
return [{
'type': alert[0],
'severity': alert[1],
'message': alert[2],
'timestamp': alert[3]
} for alert in alerts]
'''
export_metrics_fujsen = '''
def export_metrics():
# Export metrics in Prometheus format
metrics = []
# System metrics
cpu_usage = get_metrics('cpu_usage', '1m', 'avg')
if cpu_usage:
metrics.append(f'cpu_usage {cpu_usage[0]}')
memory_usage = get_metrics('memory_usage', '1m', 'avg')
if memory_usage:
metrics.append(f'memory_usage {memory_usage[0]}')
# Application metrics
request_count = get_metrics('requests_total', '1m', 'sum')
if request_count:
metrics.append(f'requests_total {request_count[0]}')
error_count = get_metrics('errors_total', '1m', 'sum')
if error_count:
metrics.append(f'errors_total {error_count[0]}')
return '\\n'.join(metrics)
'''
""")
🎯 Monitoring Best Practices
1. Metrics Collection
- Collect both system and business metrics - Use appropriate metric types (counter, gauge, histogram) - Implement proper labeling and tagging - Regular metric aggregation and cleanup2. Logging Strategy
- Use structured logging with consistent format - Include correlation IDs for request tracing - Implement log rotation and retention policies - Separate application and access logs3. Health Checks
- Implement comprehensive health checks - Check all dependencies and external services - Use different endpoints for liveness and readiness - Regular health check execution4. Alerting
- Set appropriate thresholds for alerts - Use different severity levels - Implement alert aggregation and deduplication - Regular alert review and tuning5. Visualization
- Create meaningful dashboards - Use appropriate chart types for different metrics - Implement real-time updates - Regular dashboard review and optimization🚀 Next Steps
1. Implement metrics collection for key application metrics 2. Set up structured logging with correlation IDs 3. Configure health checks for all dependencies 4. Create alerting rules with appropriate thresholds 5. Build monitoring dashboards for visualization
---
"We don't bow to any king" - TuskLang provides comprehensive monitoring and observability features to ensure your applications run smoothly. Implement proper metrics, logging, and alerting to maintain operational excellence!