🐍 🏗️ Microservices - Python
🏗️ Microservices - Python
"We don't bow to any king" - Microservices Edition
TuskLang provides powerful tools for building and orchestrating microservices architectures with seamless communication and service discovery.
🚀 Service Architecture
Basic Microservice Structure
from tsk import TSK
from flask import Flask, request, jsonify
import requests
import jsonapp = Flask(__name__)
Microservice configuration
service_config = TSK.from_string("""
[microservice]
Service metadata
name: "user-service"
version: "1.0.0"
port: 8001
environment: @env("SERVICE_ENV", "development")Service dependencies
dependencies: {
auth_service: "http://auth-service:8002",
notification_service: "http://notification-service:8003",
database_service: "http://database-service:8004"
}Service discovery
service_registry: @env("SERVICE_REGISTRY", "http://registry:8000")Health check configuration
health_check: {
endpoint: "/health",
interval: 30,
timeout: 5
}Service communication
service_client_fujsen = '''
def service_client(service_name, endpoint, method='GET', data=None, headers=None):
# Get service URL from registry or dependencies
service_url = get_service_url(service_name)
if not service_url:
raise ServiceUnavailableError(f"Service {service_name} not available")
# Make service call
url = f"{service_url}{endpoint}"
try:
if method.upper() == 'GET':
response = requests.get(url, headers=headers, timeout=10)
elif method.upper() == 'POST':
response = requests.post(url, json=data, headers=headers, timeout=10)
elif method.upper() == 'PUT':
response = requests.put(url, json=data, headers=headers, timeout=10)
elif method.upper() == 'DELETE':
response = requests.delete(url, headers=headers, timeout=10)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
log_error(f"Service call failed: {service_name}", e)
raise ServiceCommunicationError(f"Failed to communicate with {service_name}: {str(e)}")
'''Service discovery
get_service_url_fujsen = '''
def get_service_url(service_name):
# First check dependencies
if service_name in dependencies:
return dependencies[service_name]
# Then check service registry
try:
response = requests.get(f"{service_registry}/services/{service_name}")
if response.status_code == 200:
service_info = response.json()
return service_info['url']
except:
pass
return None
'''Circuit breaker for service calls
circuit_breaker_fujsen = '''
class ServiceCircuitBreaker:
def __init__(self, service_name, failure_threshold=5, recovery_timeout=60):
self.service_name = service_name
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED'
def call(self, service_func, args, *kwargs):
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = 'HALF_OPEN'
else:
raise ServiceUnavailableError(f"Circuit breaker for {self.service_name} is OPEN")
try:
result = service_func(args, *kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise e
def on_success(self):
self.failure_count = 0
self.state = 'CLOSED'
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
'''Service health check
health_check_fujsen = '''
def health_check():
health_status = {
'service': name,
'version': version,
'status': 'healthy',
'timestamp': time.time(),
'dependencies': {}
}
# Check dependencies
for dep_name, dep_url in dependencies.items():
try:
response = requests.get(f"{dep_url}/health", timeout=5)
health_status['dependencies'][dep_name] = {
'status': 'healthy' if response.status_code == 200 else 'unhealthy',
'response_time': response.elapsed.total_seconds()
}
except Exception as e:
health_status['dependencies'][dep_name] = {
'status': 'unhealthy',
'error': str(e)
}
# Determine overall status
all_healthy = all(dep['status'] == 'healthy' for dep in health_status['dependencies'].values())
health_status['status'] = 'healthy' if all_healthy else 'degraded'
return health_status
'''
""")User service endpoints
@app.route('/users', methods=['GET'])
def get_users():
try:
# Get users from database service
users = service_config.execute_fujsen('microservice', 'service_client',
'database_service', '/users', 'GET')
return jsonify({
'status': 'success',
'data': users,
'service': service_config.get('microservice.name')
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e),
'service': service_config.get('microservice.name')
}), 500@app.route('/users', methods=['POST'])
def create_user():
try:
data = request.get_json()
# Validate user data
if not data.get('username') or not data.get('email'):
return jsonify({'status': 'error', 'message': 'Missing required fields'}), 400
# Create user in database service
user = service_config.execute_fujsen('microservice', 'service_client',
'database_service', '/users', 'POST', data)
# Send notification
try:
service_config.execute_fujsen('microservice', 'service_client',
'notification_service', '/notifications', 'POST', {
'type': 'user_created',
'user_id': user['id'],
'email': user['email']
})
except Exception as e:
# Log notification failure but don't fail the request
print(f"Notification failed: {e}")
return jsonify({
'status': 'success',
'data': user,
'service': service_config.get('microservice.name')
}), 201
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e),
'service': service_config.get('microservice.name')
}), 500
@app.route('/health', methods=['GET'])
def health():
"""Health check endpoint"""
health_status = service_config.execute_fujsen('microservice', 'health_check')
status_code = 200 if health_status['status'] == 'healthy' else 503
return jsonify(health_status), status_code
🔄 Service Communication
Inter-Service Communication
Service communication configuration
communication_config = TSK.from_string("""
[service_communication]
Message queue configuration
message_queue: @env("MESSAGE_QUEUE", "redis://localhost:6379")
queue_name: "service_events"Event-driven communication
publish_event_fujsen = '''
def publish_event(event_type, event_data, target_service=None):
import redis
# Connect to Redis
r = redis.from_url(message_queue)
# Create event message
event = {
'id': generate_event_id(),
'type': event_type,
'data': event_data,
'source_service': name,
'target_service': target_service,
'timestamp': time.time()
}
# Publish to queue
r.lpush(queue_name, json.dumps(event))
return event
'''subscribe_events_fujsen = '''
def subscribe_events(event_types=None):
import redis
# Connect to Redis
r = redis.from_url(message_queue)
while True:
# Pop event from queue
event_data = r.brpop(queue_name, timeout=1)
if event_data:
event = json.loads(event_data[1])
# Filter by event type if specified
if event_types and event['type'] not in event_types:
continue
# Process event
process_event(event)
'''
process_event_fujsen = '''
def process_event(event):
# Route event to appropriate handler
handlers = {
'user_created': handle_user_created,
'user_updated': handle_user_updated,
'user_deleted': handle_user_deleted,
'order_created': handle_order_created
}
handler = handlers.get(event['type'])
if handler:
try:
handler(event['data'])
except Exception as e:
log_error(f"Event processing failed: {event['type']}", e)
else:
log_warning(f"No handler for event type: {event['type']}")
'''
gRPC communication
grpc_client_fujsen = '''
def grpc_client(service_name, method, request_data):
import grpc
# Get service address
service_address = get_service_url(service_name)
# Create gRPC channel
channel = grpc.insecure_channel(service_address)
try:
# Import service stub
if service_name == 'user_service':
from user_service_pb2_grpc import UserServiceStub
from user_service_pb2 import UserRequest
stub = UserServiceStub(channel)
request = UserRequest(**request_data)
if method == 'get_user':
response = stub.GetUser(request)
elif method == 'create_user':
response = stub.CreateUser(request)
else:
raise ValueError(f"Unknown method: {method}")
return response
except grpc.RpcError as e:
log_error(f"gRPC call failed: {service_name}.{method}", e)
raise ServiceCommunicationError(f"gRPC call failed: {str(e)}")
finally:
channel.close()
'''Service mesh integration
service_mesh_fujsen = '''
def service_mesh_request(service_name, endpoint, method='GET', data=None):
# Add service mesh headers
headers = {
'X-Request-ID': generate_request_id(),
'X-Service-Name': name,
'X-Service-Version': version
}
# Make request through service mesh
service_url = get_service_url(service_name)
try:
if method.upper() == 'GET':
response = requests.get(f"{service_url}{endpoint}", headers=headers)
elif method.upper() == 'POST':
response = requests.post(f"{service_url}{endpoint}", json=data, headers=headers)
else:
raise ValueError(f"Unsupported method: {method}")
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
log_error(f"Service mesh request failed: {service_name}", e)
raise ServiceCommunicationError(f"Service mesh request failed: {str(e)}")
'''
""")
🔍 Service Discovery & Registry
Service Registry Implementation
Service registry configuration
registry_config = TSK.from_string("""
[service_registry]
Registry storage
registry_db: @env("REGISTRY_DB", "sqlite:///registry.db")Service registration
register_service_fujsen = '''
def register_service(service_info):
# Validate service info
required_fields = ['name', 'version', 'url', 'health_endpoint']
for field in required_fields:
if field not in service_info:
raise ValueError(f"Missing required field: {field}")
# Check if service is healthy
try:
response = requests.get(f"{service_info['url']}{service_info['health_endpoint']}", timeout=5)
if response.status_code != 200:
raise ValueError("Service health check failed")
except Exception as e:
raise ValueError(f"Service health check failed: {str(e)}")
# Register service
service_id = f"{service_info['name']}-{service_info['version']}"
execute("""
INSERT OR REPLACE INTO services (
service_id, name, version, url, health_endpoint,
metadata, registered_at, last_health_check
) VALUES (?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
""", service_id, service_info['name'], service_info['version'],
service_info['url'], service_info['health_endpoint'],
json.dumps(service_info.get('metadata', {})))
return {'service_id': service_id, 'status': 'registered'}
'''Service discovery
discover_service_fujsen = '''
def discover_service(service_name, version=None):
if version:
service_id = f"{service_name}-{version}"
service = query("""
SELECT * FROM services
WHERE service_id = ? AND active = 1
ORDER BY last_health_check DESC
""", service_id)
else:
service = query("""
SELECT * FROM services
WHERE name = ? AND active = 1
ORDER BY last_health_check DESC, version DESC
""", service_name)
if not service:
return None
return {
'service_id': service[0][0],
'name': service[0][1],
'version': service[0][2],
'url': service[0][3],
'health_endpoint': service[0][4],
'metadata': json.loads(service[0][5]),
'registered_at': service[0][6],
'last_health_check': service[0][7]
}
'''Health check monitoring
monitor_services_fujsen = '''
def monitor_services():
# Get all registered services
services = query("SELECT * FROM services WHERE active = 1")
for service in services:
service_url = service[3]
health_endpoint = service[4]
try:
# Check service health
response = requests.get(f"{service_url}{health_endpoint}", timeout=5)
if response.status_code == 200:
# Service is healthy
execute("""
UPDATE services
SET last_health_check = datetime('now'), health_status = 'healthy'
WHERE service_id = ?
""", service[0])
else:
# Service is unhealthy
execute("""
UPDATE services
SET last_health_check = datetime('now'), health_status = 'unhealthy'
WHERE service_id = ?
""", service[0])
except Exception as e:
# Service is unreachable
execute("""
UPDATE services
SET last_health_check = datetime('now'), health_status = 'unreachable'
WHERE service_id = ?
""", service[0])
log_error(f"Service health check failed: {service[1]}", e)
'''Load balancing
load_balance_fujsen = '''
def load_balance(service_name, strategy='round_robin'):
# Get all healthy instances of the service
services = query("""
SELECT * FROM services
WHERE name = ? AND health_status = 'healthy' AND active = 1
ORDER BY last_health_check DESC
""", service_name)
if not services:
return None
if strategy == 'round_robin':
# Simple round-robin
return services[0][3] # Return URL of first service
elif strategy == 'least_connections':
# Return service with least active connections
# This would require tracking connection counts
return services[0][3]
elif strategy == 'random':
import random
return random.choice(services)[3]
else:
return services[0][3]
'''
""")
📊 Service Monitoring & Observability
Distributed Tracing
Service monitoring configuration
monitoring_config = TSK.from_string("""
[service_monitoring]
Distributed tracing
trace_request_fujsen = '''
def trace_request(request_id, service_name, operation, duration, status='success'):
# Create trace span
span = {
'trace_id': request_id,
'service_name': service_name,
'operation': operation,
'start_time': time.time() - duration,
'end_time': time.time(),
'duration': duration,
'status': status,
'metadata': {
'request_id': request_id,
'service_version': version
}
}
# Store trace
execute("""
INSERT INTO traces (
trace_id, service_name, operation, start_time,
end_time, duration, status, metadata
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", span['trace_id'], span['service_name'], span['operation'],
span['start_time'], span['end_time'], span['duration'],
span['status'], json.dumps(span['metadata']))
return span
'''Service metrics
collect_metrics_fujsen = '''
def collect_metrics():
# Collect service metrics
metrics = {
'requests_total': get_request_count(),
'requests_per_second': calculate_rps(),
'average_response_time': get_avg_response_time(),
'error_rate': get_error_rate(),
'active_connections': get_active_connections(),
'memory_usage': get_memory_usage(),
'cpu_usage': get_cpu_usage()
}
# Store metrics
for metric_name, value in metrics.items():
record_metric(metric_name, value, {'service': name})
return metrics
'''Service alerts
check_alerts_fujsen = '''
def check_alerts():
alerts = []
# Check error rate
error_rate = get_error_rate()
if error_rate > 5: # 5% error rate threshold
alerts.append({
'type': 'high_error_rate',
'severity': 'warning',
'message': f'Error rate is {error_rate}%',
'value': error_rate,
'threshold': 5
})
# Check response time
avg_response_time = get_avg_response_time()
if avg_response_time > 2: # 2 second threshold
alerts.append({
'type': 'high_response_time',
'severity': 'warning',
'message': f'Average response time is {avg_response_time}s',
'value': avg_response_time,
'threshold': 2
})
# Check memory usage
memory_usage = get_memory_usage()
if memory_usage > 80: # 80% memory threshold
alerts.append({
'type': 'high_memory_usage',
'severity': 'critical',
'message': f'Memory usage is {memory_usage}%',
'value': memory_usage,
'threshold': 80
})
return alerts
'''Service dashboard
generate_dashboard_fujsen = '''
def generate_dashboard():
# Generate service dashboard data
dashboard_data = {
'service_info': {
'name': name,
'version': version,
'status': 'healthy',
'uptime': get_uptime()
},
'metrics': collect_metrics(),
'recent_requests': get_recent_requests(),
'error_breakdown': get_error_breakdown(),
'dependencies': get_dependency_status()
}
return dashboard_data
'''
""")
🔧 Service Configuration Management
Configuration Management
Configuration management
config_management = TSK.from_string("""
[configuration_management]
Configuration store
config_store: @env("CONFIG_STORE", "http://config-service:8005")Load configuration
load_config_fujsen = '''
def load_config(service_name, environment=None):
if not environment:
environment = @env("SERVICE_ENV", "development")
try:
# Load from config service
response = requests.get(f"{config_store}/config/{service_name}/{environment}")
if response.status_code == 200:
return response.json()
except:
pass
# Fallback to local config
return load_local_config(service_name, environment)
'''Configuration validation
validate_config_fujsen = '''
def validate_config(config, schema):
errors = []
for field, rules in schema.items():
if field not in config:
if rules.get('required', False):
errors.append(f"Missing required config: {field}")
continue
value = config[field]
# Type validation
if 'type' in rules:
if not isinstance(value, rules['type']):
errors.append(f"Config {field} must be {rules['type'].__name__}")
# Value validation
if 'min' in rules and value < rules['min']:
errors.append(f"Config {field} must be at least {rules['min']}")
if 'max' in rules and value > rules['max']:
errors.append(f"Config {field} must be at most {rules['max']}")
if errors:
raise ValueError("Configuration validation failed: " + "; ".join(errors))
return True
'''Hot reload configuration
hot_reload_config_fujsen = '''
def hot_reload_config():
# Watch for configuration changes
import time
last_config_hash = None
while True:
try:
# Get current config
current_config = load_config(name)
current_hash = hash(json.dumps(current_config, sort_keys=True))
if current_hash != last_config_hash:
# Configuration changed
validate_and_apply_config(current_config)
last_config_hash = current_hash
log_info("Configuration reloaded")
except Exception as e:
log_error("Configuration reload failed", e)
time.sleep(30) # Check every 30 seconds
'''Apply configuration
apply_config_fujsen = '''
def apply_config(config):
# Apply configuration changes
for key, value in config.items():
if key == 'database':
update_database_config(value)
elif key == 'cache':
update_cache_config(value)
elif key == 'logging':
update_logging_config(value)
elif key == 'security':
update_security_config(value)
log_info("Configuration applied successfully")
'''
""")
🎯 Microservices Best Practices
1. Service Design
- Design services around business capabilities - Keep services small and focused - Use appropriate communication patterns - Implement proper error handling2. Service Communication
- Use asynchronous communication when possible - Implement circuit breakers for resilience - Use service mesh for complex routing - Monitor service dependencies3. Data Management
- Use database per service pattern - Implement eventual consistency - Use event sourcing for data synchronization - Handle distributed transactions carefully4. Deployment & Operations
- Use containerization for consistency - Implement proper service discovery - Use health checks and monitoring - Implement proper logging and tracing5. Security
- Implement service-to-service authentication - Use secure communication protocols - Implement proper access controls - Monitor for security threats🚀 Next Steps
1. Design your service architecture with clear boundaries 2. Implement service discovery and registration 3. Set up inter-service communication with proper error handling 4. Add monitoring and tracing for observability 5. Implement configuration management for flexibility
---
"We don't bow to any king" - TuskLang provides powerful tools for building and orchestrating microservices architectures. Design with clear boundaries, implement proper communication, and build scalable, resilient systems!