🦀 Microservices Architecture in TuskLang with Rust
Microservices Architecture in TuskLang with Rust
🏗️ Microservices Foundation
Microservices architecture with TuskLang and Rust provides scalable, maintainable, and resilient distributed systems. This guide covers service decomposition, inter-service communication, and advanced microservices patterns.
🏗️ Architecture Overview
Microservices Stack
[microservices_architecture]
service_mesh: true
api_gateway: true
event_driven: true
distributed_tracing: true[microservices_components]
tonic: "grpc_framework"
tokio: "async_runtime"
serde: "serialization"
tracing: "observability"
Service Configuration
[microservices_configuration]
enable_service_mesh: true
enable_circuit_breaker: true
enable_distributed_tracing: true
enable_metrics: true[microservices_implementation]
use tonic::{transport::Server, Request, Response, Status};
use tokio::sync::mpsc;
use serde::{Deserialize, Serialize};
use tracing::{info, error, instrument};
use std::sync::Arc;
use tokio::sync::RwLock;
// Service registry
pub struct ServiceRegistry {
services: Arc<RwLock<HashMap<String, ServiceInfo>>>,
health_checks: Arc<RwLock<HashMap<String, HealthCheck>>>,
}
#[derive(Debug, Clone)]
pub struct ServiceInfo {
pub name: String,
pub version: String,
pub host: String,
pub port: u16,
pub health_endpoint: String,
pub status: ServiceStatus,
pub last_heartbeat: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone)]
pub enum ServiceStatus {
Healthy,
Unhealthy,
Unknown,
}
#[derive(Debug, Clone)]
pub struct HealthCheck {
pub service_name: String,
pub interval: Duration,
pub timeout: Duration,
pub retries: u32,
}
impl ServiceRegistry {
pub fn new() -> Self {
Self {
services: Arc::new(RwLock::new(HashMap::new())),
health_checks: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn register_service(&self, service: ServiceInfo) -> Result<(), RegistryError> {
let mut services = self.services.write().await;
services.insert(service.name.clone(), service);
info!("Service registered: {}", service.name);
Ok(())
}
pub async fn deregister_service(&self, service_name: &str) -> Result<(), RegistryError> {
let mut services = self.services.write().await;
services.remove(service_name);
info!("Service deregistered: {}", service_name);
Ok(())
}
pub async fn get_service(&self, service_name: &str) -> Option<ServiceInfo> {
let services = self.services.read().await;
services.get(service_name).cloned()
}
pub async fn list_services(&self) -> Vec<ServiceInfo> {
let services = self.services.read().await;
services.values().cloned().collect()
}
pub async fn update_service_status(&self, service_name: &str, status: ServiceStatus) -> Result<(), RegistryError> {
let mut services = self.services.write().await;
if let Some(service) = services.get_mut(service_name) {
service.status = status;
service.last_heartbeat = chrono::Utc::now();
}
Ok(())
}
pub async fn add_health_check(&self, health_check: HealthCheck) {
let mut health_checks = self.health_checks.write().await;
health_checks.insert(health_check.service_name.clone(), health_check);
}
pub async fn start_health_monitoring(&self) {
let health_checks = self.health_checks.read().await;
let services = self.services.clone();
for health_check in health_checks.values() {
let health_check = health_check.clone();
let services = services.clone();
tokio::spawn(async move {
loop {
if let Some(service) = services.read().await.get(&health_check.service_name) {
let status = Self::perform_health_check(service).await;
let _ = Self::update_service_status(&services, &health_check.service_name, status).await;
}
tokio::time::sleep(health_check.interval).await;
}
});
}
}
async fn perform_health_check(service: &ServiceInfo) -> ServiceStatus {
let client = reqwest::Client::new();
let url = format!("http://{}:{}{}", service.host, service.port, service.health_endpoint);
match client.get(&url).timeout(Duration::from_secs(5)).send().await {
Ok(response) => {
if response.status().is_success() {
ServiceStatus::Healthy
} else {
ServiceStatus::Unhealthy
}
}
Err(_) => ServiceStatus::Unhealthy,
}
}
async fn update_service_status(
services: &Arc<RwLock<HashMap<String, ServiceInfo>>>,
service_name: &str,
status: ServiceStatus,
) -> Result<(), RegistryError> {
let mut services = services.write().await;
if let Some(service) = services.get_mut(service_name) {
service.status = status;
service.last_heartbeat = chrono::Utc::now();
}
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
pub enum RegistryError {
#[error("Service not found: {service_name}")]
ServiceNotFound { service_name: String },
#[error("Service already exists: {service_name}")]
ServiceAlreadyExists { service_name: String },
#[error("Health check failed: {message}")]
HealthCheckFailed { message: String },
}
🔌 Service Communication
gRPC Service Communication
[service_communication]
grpc: true
http: true
message_queue: true
event_streaming: true[communication_implementation]
// User service
pub struct UserService {
db: Arc<RwLock<HashMap<u64, User>>>,
event_publisher: Arc<EventPublisher>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct User {
pub id: u64,
pub username: String,
pub email: String,
pub created_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateUserRequest {
pub username: String,
pub email: String,
pub password: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserCreatedEvent {
pub user_id: u64,
pub username: String,
pub email: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
}
impl UserService {
pub fn new(event_publisher: Arc<EventPublisher>) -> Self {
Self {
db: Arc::new(RwLock::new(HashMap::new())),
event_publisher,
}
}
#[instrument(skip(self))]
pub async fn create_user(&self, request: CreateUserRequest) -> Result<User, ServiceError> {
info!("Creating user: {}", request.username);
// Validate input
if request.username.is_empty() || request.email.is_empty() {
return Err(ServiceError::ValidationError {
message: "Username and email are required".to_string()
});
}
let mut db = self.db.write().await;
let user_id = db.len() as u64 + 1;
let user = User {
id: user_id,
username: request.username.clone(),
email: request.email.clone(),
created_at: chrono::Utc::now(),
};
db.insert(user.id, user.clone());
// Publish event
let event = UserCreatedEvent {
user_id: user.id,
username: user.username.clone(),
email: user.email.clone(),
timestamp: user.created_at,
};
self.event_publisher.publish("user.created", &event).await?;
info!("User created successfully: {}", user.id);
Ok(user)
}
#[instrument(skip(self))]
pub async fn get_user(&self, id: u64) -> Result<Option<User>, ServiceError> {
let db = self.db.read().await;
Ok(db.get(&id).cloned())
}
}
// Order service
pub struct OrderService {
db: Arc<RwLock<HashMap<u64, Order>>>,
user_service_client: Arc<UserServiceClient>,
event_publisher: Arc<EventPublisher>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Order {
pub id: u64,
pub user_id: u64,
pub items: Vec<OrderItem>,
pub total_amount: f64,
pub status: OrderStatus,
pub created_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderItem {
pub product_id: u64,
pub quantity: u32,
pub price: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderStatus {
Pending,
Confirmed,
Shipped,
Delivered,
Cancelled,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateOrderRequest {
pub user_id: u64,
pub items: Vec<OrderItem>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderCreatedEvent {
pub order_id: u64,
pub user_id: u64,
pub total_amount: f64,
pub timestamp: chrono::DateTime<chrono::Utc>,
}
impl OrderService {
pub fn new(
user_service_client: Arc<UserServiceClient>,
event_publisher: Arc<EventPublisher>,
) -> Self {
Self {
db: Arc::new(RwLock::new(HashMap::new())),
user_service_client,
event_publisher,
}
}
#[instrument(skip(self))]
pub async fn create_order(&self, request: CreateOrderRequest) -> Result<Order, ServiceError> {
info!("Creating order for user: {}", request.user_id);
// Verify user exists
let user = self.user_service_client.get_user(request.user_id).await?;
if user.is_none() {
return Err(ServiceError::ValidationError {
message: "User not found".to_string()
});
}
// Calculate total
let total_amount: f64 = request.items.iter().map(|item| item.price * item.quantity as f64).sum();
let mut db = self.db.write().await;
let order_id = db.len() as u64 + 1;
let order = Order {
id: order_id,
user_id: request.user_id,
items: request.items,
total_amount,
status: OrderStatus::Pending,
created_at: chrono::Utc::now(),
};
db.insert(order.id, order.clone());
// Publish event
let event = OrderCreatedEvent {
order_id: order.id,
user_id: order.user_id,
total_amount: order.total_amount,
timestamp: order.created_at,
};
self.event_publisher.publish("order.created", &event).await?;
info!("Order created successfully: {}", order.id);
Ok(order)
}
#[instrument(skip(self))]
pub async fn get_order(&self, id: u64) -> Result<Option<Order>, ServiceError> {
let db = self.db.read().await;
Ok(db.get(&id).cloned())
}
#[instrument(skip(self))]
pub async fn update_order_status(&self, id: u64, status: OrderStatus) -> Result<Order, ServiceError> {
let mut db = self.db.write().await;
if let Some(order) = db.get_mut(&id) {
order.status = status;
Ok(order.clone())
} else {
Err(ServiceError::NotFound { message: "Order not found".to_string() })
}
}
}
// User service client
pub struct UserServiceClient {
client: reqwest::Client,
base_url: String,
}
impl UserServiceClient {
pub fn new(base_url: String) -> Self {
Self {
client: reqwest::Client::new(),
base_url,
}
}
pub async fn get_user(&self, id: u64) -> Result<Option<User>, ServiceError> {
let url = format!("{}/users/{}", self.base_url, id);
match self.client.get(&url).send().await {
Ok(response) => {
if response.status().is_success() {
let user: User = response.json().await
.map_err(|e| ServiceError::CommunicationError { message: e.to_string() })?;
Ok(Some(user))
} else if response.status() == reqwest::StatusCode::NOT_FOUND {
Ok(None)
} else {
Err(ServiceError::CommunicationError {
message: format!("HTTP {}: {}", response.status(), response.text().await.unwrap_or_default())
})
}
}
Err(e) => Err(ServiceError::CommunicationError { message: e.to_string() }),
}
}
}
📡 Event-Driven Architecture
Event Publishing and Consumption
[event_driven_architecture]
event_publishing: true
event_consumption: true
event_sourcing: true
cqrs: true[event_implementation]
// Event publisher
pub struct EventPublisher {
event_store: Arc<EventStore>,
message_queue: Arc<MessageQueue>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
pub id: String,
pub event_type: String,
pub aggregate_id: String,
pub data: serde_json::Value,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub version: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventMetadata {
pub correlation_id: String,
pub causation_id: String,
pub user_id: Option<String>,
}
impl EventPublisher {
pub fn new(event_store: Arc<EventStore>, message_queue: Arc<MessageQueue>) -> Self {
Self {
event_store,
message_queue,
}
}
pub async fn publish<T: Serialize>(&self, event_type: &str, data: &T) -> Result<(), EventError> {
let event = Event {
id: uuid::Uuid::new_v4().to_string(),
event_type: event_type.to_string(),
aggregate_id: "".to_string(), // Would be set based on context
data: serde_json::to_value(data)
.map_err(|e| EventError::SerializationError { message: e.to_string() })?,
timestamp: chrono::Utc::now(),
version: 1,
};
// Store event
self.event_store.store_event(&event).await?;
// Publish to message queue
self.message_queue.publish(event_type, &event).await?;
info!("Event published: {} - {}", event_type, event.id);
Ok(())
}
}
// Event store
pub struct EventStore {
events: Arc<RwLock<Vec<Event>>>,
}
impl EventStore {
pub fn new() -> Self {
Self {
events: Arc::new(RwLock::new(Vec::new())),
}
}
pub async fn store_event(&self, event: &Event) -> Result<(), EventError> {
let mut events = self.events.write().await;
events.push(event.clone());
Ok(())
}
pub async fn get_events(&self, aggregate_id: &str) -> Result<Vec<Event>, EventError> {
let events = self.events.read().await;
Ok(events.iter()
.filter(|e| e.aggregate_id == aggregate_id)
.cloned()
.collect())
}
pub async fn get_events_by_type(&self, event_type: &str) -> Result<Vec<Event>, EventError> {
let events = self.events.read().await;
Ok(events.iter()
.filter(|e| e.event_type == event_type)
.cloned()
.collect())
}
}
// Message queue
pub struct MessageQueue {
subscribers: Arc<RwLock<HashMap<String, Vec<Box<dyn EventHandler>>>>>,
}
impl MessageQueue {
pub fn new() -> Self {
Self {
subscribers: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn subscribe(&self, event_type: &str, handler: Box<dyn EventHandler>) {
let mut subscribers = self.subscribers.write().await;
subscribers.entry(event_type.to_string())
.or_insert_with(Vec::new)
.push(handler);
}
pub async fn publish(&self, event_type: &str, event: &Event) -> Result<(), EventError> {
let subscribers = self.subscribers.read().await;
if let Some(handlers) = subscribers.get(event_type) {
for handler in handlers {
if let Err(e) = handler.handle(event).await {
error!("Event handler error: {}", e);
}
}
}
Ok(())
}
}
// Event handler trait
#[async_trait::async_trait]
pub trait EventHandler: Send + Sync {
async fn handle(&self, event: &Event) -> Result<(), EventError>;
}
// Notification service event handler
pub struct NotificationEventHandler {
notification_service: Arc<NotificationService>,
}
#[async_trait::async_trait]
impl EventHandler for NotificationEventHandler {
async fn handle(&self, event: &Event) -> Result<(), EventError> {
match event.event_type.as_str() {
"user.created" => {
if let Ok(user_data) = serde_json::from_value::<UserCreatedEvent>(event.data.clone()) {
self.notification_service.send_welcome_email(&user_data).await?;
}
}
"order.created" => {
if let Ok(order_data) = serde_json::from_value::<OrderCreatedEvent>(event.data.clone()) {
self.notification_service.send_order_confirmation(&order_data).await?;
}
}
_ => {}
}
Ok(())
}
}
// Notification service
pub struct NotificationService {
email_client: Arc<EmailClient>,
}
impl NotificationService {
pub fn new(email_client: Arc<EmailClient>) -> Self {
Self { email_client }
}
pub async fn send_welcome_email(&self, event: &UserCreatedEvent) -> Result<(), ServiceError> {
let email = Email {
to: event.email.clone(),
subject: "Welcome to our platform!".to_string(),
body: format!("Hello {}, welcome to our platform!", event.username),
};
self.email_client.send_email(&email).await?;
info!("Welcome email sent to: {}", event.email);
Ok(())
}
pub async fn send_order_confirmation(&self, event: &OrderCreatedEvent) -> Result<(), ServiceError> {
// Implementation would send order confirmation email
info!("Order confirmation sent for order: {}", event.order_id);
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct Email {
pub to: String,
pub subject: String,
pub body: String,
}
pub struct EmailClient;
impl EmailClient {
pub async fn send_email(&self, email: &Email) -> Result<(), ServiceError> {
// Implementation would send actual email
info!("Email sent to {}: {}", email.to, email.subject);
Ok(())
}
}
🔄 Circuit Breaker Pattern
Circuit Breaker Implementation
[circuit_breaker]
failure_threshold: true
recovery_timeout: true
half_open_state: true[circuit_breaker_implementation]
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};
// Circuit breaker
pub struct CircuitBreaker {
failure_threshold: u32,
recovery_timeout: Duration,
state: Arc<RwLock<CircuitState>>,
failure_count: Arc<AtomicU32>,
last_failure_time: Arc<RwLock<Option<Instant>>>,
}
#[derive(Debug, Clone)]
pub enum CircuitState {
Closed,
Open,
HalfOpen,
}
impl CircuitBreaker {
pub fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self {
Self {
failure_threshold,
recovery_timeout,
state: Arc::new(RwLock::new(CircuitState::Closed)),
failure_count: Arc::new(AtomicU32::new(0)),
last_failure_time: Arc::new(RwLock::new(None)),
}
}
pub async fn call<F, Fut, T, E>(&self, f: F) -> Result<T, CircuitBreakerError<E>>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: std::fmt::Debug,
{
let state = self.state.read().await;
match *state {
CircuitState::Open => {
let last_failure = self.last_failure_time.read().await;
if let Some(failure_time) = *last_failure {
if failure_time.elapsed() >= self.recovery_timeout {
// Transition to half-open
drop(state);
let mut state = self.state.write().await;
*state = CircuitState::HalfOpen;
drop(state);
// Try the call
self.try_call(f).await
} else {
Err(CircuitBreakerError::CircuitOpen)
}
} else {
Err(CircuitBreakerError::CircuitOpen)
}
}
CircuitState::HalfOpen | CircuitState::Closed => {
drop(state);
self.try_call(f).await
}
}
}
async fn try_call<F, Fut, T, E>(&self, f: F) -> Result<T, CircuitBreakerError<E>>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: std::fmt::Debug,
{
match f().await {
Ok(result) => {
// Success - reset failure count and close circuit
self.failure_count.store(0, Ordering::SeqCst);
let mut state = self.state.write().await;
*state = CircuitState::Closed;
Ok(result)
}
Err(error) => {
// Failure - increment count and potentially open circuit
let failure_count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
let mut last_failure_time = self.last_failure_time.write().await;
*last_failure_time = Some(Instant::now());
if failure_count >= self.failure_threshold {
let mut state = self.state.write().await;
*state = CircuitState::Open;
Err(CircuitBreakerError::CircuitOpen)
} else {
Err(CircuitBreakerError::ServiceError(error))
}
}
}
}
pub async fn get_state(&self) -> CircuitState {
self.state.read().await.clone()
}
}
#[derive(Debug, thiserror::Error)]
pub enum CircuitBreakerError<E> {
#[error("Circuit breaker is open")]
CircuitOpen,
#[error("Service error: {0:?}")]
ServiceError(E),
}
// Service with circuit breaker
pub struct ResilientUserServiceClient {
client: UserServiceClient,
circuit_breaker: CircuitBreaker,
}
impl ResilientUserServiceClient {
pub fn new(client: UserServiceClient) -> Self {
Self {
circuit_breaker: CircuitBreaker::new(5, Duration::from_secs(30)),
client,
}
}
pub async fn get_user(&self, id: u64) -> Result<Option<User>, ServiceError> {
self.circuit_breaker.call(|| self.client.get_user(id)).await
.map_err(|e| match e {
CircuitBreakerError::CircuitOpen => ServiceError::ServiceUnavailable {
message: "Service temporarily unavailable".to_string()
},
CircuitBreakerError::ServiceError(e) => e,
})
}
}
🎯 Best Practices
1. Service Decomposition
- Decompose by business capability - Keep services focused and cohesive - Use bounded contexts for domain modeling - Implement proper service boundaries2. Inter-Service Communication
- Use asynchronous communication when possible - Implement circuit breakers for resilience - Use event-driven patterns for loose coupling - Implement proper error handling and retries3. Data Management
- Use database per service pattern - Implement eventual consistency - Use event sourcing for audit trails - Implement proper data versioning4. Observability
- Implement distributed tracing - Use structured logging - Monitor service health and metrics - Implement proper alerting5. TuskLang Integration
- Use TuskLang configuration for service parameters - Implement service discovery with TuskLang - Configure circuit breakers through TuskLang - Use TuskLang for event routing and filteringMicroservices architecture with TuskLang and Rust provides scalable, maintainable, and resilient distributed systems with comprehensive patterns for service communication and coordination.