🦀 Event-Driven Architecture in TuskLang with Rust
Event-Driven Architecture in TuskLang with Rust
📡 Event-Driven Foundation
Event-driven architecture with TuskLang and Rust provides scalable, loosely-coupled, and reactive systems. This guide covers event sourcing, CQRS, message queues, and advanced event-driven patterns.
🏗️ Architecture Overview
Event-Driven Stack
[event_driven_architecture]
event_sourcing: true
cqrs: true
message_queues: true
reactive_streams: true

[event_components]
tokio: "async_runtime"
serde: "serialization"
tracing: "observability"
uuid: "event_identification"
Event Configuration
[event_configuration]
enable_event_sourcing: true
enable_cqrs: true
enable_message_queues: true
enable_streaming: true

[event_implementation]
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tracing::{info, error, instrument};
use uuid::Uuid;
// In-memory event store: an append-only event log plus snapshot and
// projection caches, each behind an async RwLock so reads don't block reads.
pub struct EventStore {
    // Append-only log of all events across every aggregate.
    events: Arc<RwLock<Vec<StoredEvent>>>,
    // Latest snapshot per aggregate id, used to shortcut event replay.
    snapshots: Arc<RwLock<HashMap<String, Snapshot>>>,
    // Read-model projections keyed by projection name.
    projections: Arc<RwLock<HashMap<String, Projection>>>,
}
/// A single persisted event as it sits in the event log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredEvent {
    /// Unique id of this event record (UUID v4 string).
    pub id: String,
    /// Id of the aggregate this event belongs to.
    pub aggregate_id: String,
    /// Dotted event name, e.g. "user.created".
    pub event_type: String,
    /// Event payload serialized to JSON.
    pub data: serde_json::Value,
    /// Tracing/audit metadata captured when the event was produced.
    pub metadata: EventMetadata,
    /// Per-aggregate sequence number, 1-based (see `EventStore::append_events`).
    pub version: u64,
    /// UTC time the event was appended to the store.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
/// Audit/tracing metadata attached to every event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventMetadata {
    /// Groups events belonging to one logical workflow.
    pub correlation_id: String,
    /// Id of the command/event that directly caused this one.
    pub causation_id: String,
    /// User on whose behalf the event was produced, if any.
    pub user_id: Option<String>,
    /// Originating component or service.
    pub source: String,
}
/// Point-in-time serialized aggregate state, used to shortcut event replay.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    /// Aggregate the snapshot captures.
    pub aggregate_id: String,
    /// Aggregate state serialized to JSON (see `Aggregate::snapshot`).
    pub data: serde_json::Value,
    /// Aggregate version at the moment the snapshot was taken.
    pub version: u64,
    /// UTC time the snapshot was saved.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
/// A named read-model projection built from the event stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Projection {
    /// Unique projection name (the store key).
    pub name: String,
    /// Current projection state as JSON.
    pub data: serde_json::Value,
    /// Id of the last event folded into this projection.
    pub last_event_id: String,
    /// UTC time of the most recent update.
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
impl EventStore {
    /// Creates an empty store (no events, snapshots, or projections).
    pub fn new() -> Self {
        Self {
            events: Arc::new(RwLock::new(Vec::new())),
            snapshots: Arc::new(RwLock::new(HashMap::new())),
            projections: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Appends `events` to the log for `aggregate_id`, numbering them
    /// sequentially after the aggregate's current highest version.
    ///
    /// NOTE(review): `Vec<Event>` uses the `Event` trait as an unsized type
    /// and will not compile as written — it needs to be generic
    /// (`Vec<E: Event>`) or hold boxed events. `event.event_type` and
    /// `event.metadata` are also field accesses, but `Event` only declares
    /// methods — confirm the intended trait shape.
    pub async fn append_events(&self, aggregate_id: &str, events: Vec<Event>) -> Result<(), EventStoreError> {
        let mut stored_events = self.events.write().await;
        // Highest version already stored for this aggregate (0 if none).
        let current_version = stored_events.iter()
            .filter(|e| e.aggregate_id == aggregate_id)
            .map(|e| e.version)
            .max()
            .unwrap_or(0);
        for (index, event) in events.iter().enumerate() {
            let stored_event = StoredEvent {
                id: Uuid::new_v4().to_string(),
                aggregate_id: aggregate_id.to_string(),
                event_type: event.event_type.clone(),
                data: serde_json::to_value(event)
                    .map_err(|e| EventStoreError::SerializationError { message: e.to_string() })?,
                metadata: event.metadata.clone(),
                // Continue the per-aggregate sequence, 1-based.
                version: current_version + index as u64 + 1,
                timestamp: chrono::Utc::now(),
            };
            stored_events.push(stored_event);
        }
        info!("Appended {} events for aggregate: {}", events.len(), aggregate_id);
        Ok(())
    }

    /// Returns the aggregate's events, optionally only those with a version
    /// strictly greater than `from_version` (i.e. events after a snapshot).
    pub async fn get_events(&self, aggregate_id: &str, from_version: Option<u64>) -> Result<Vec<StoredEvent>, EventStoreError> {
        let events = self.events.read().await;
        let filtered_events: Vec<StoredEvent> = events.iter()
            .filter(|e| e.aggregate_id == aggregate_id)
            .filter(|e| from_version.map_or(true, |v| e.version > v))
            .cloned()
            .collect();
        Ok(filtered_events)
    }

    /// Stores (or replaces) the snapshot for its aggregate.
    pub async fn save_snapshot(&self, snapshot: Snapshot) -> Result<(), EventStoreError> {
        let mut snapshots = self.snapshots.write().await;
        snapshots.insert(snapshot.aggregate_id.clone(), snapshot);
        Ok(())
    }

    /// Latest snapshot for `aggregate_id`, if one has been saved.
    pub async fn get_snapshot(&self, aggregate_id: &str) -> Option<Snapshot> {
        let snapshots = self.snapshots.read().await;
        snapshots.get(aggregate_id).cloned()
    }

    /// Stores (or replaces) a projection under its name.
    pub async fn update_projection(&self, projection: Projection) -> Result<(), EventStoreError> {
        let mut projections = self.projections.write().await;
        projections.insert(projection.name.clone(), projection);
        Ok(())
    }

    /// Fetches a projection by name.
    pub async fn get_projection(&self, name: &str) -> Option<Projection> {
        let projections = self.projections.read().await;
        projections.get(name).cloned()
    }
}
/// Failures raised by `EventStore` operations.
#[derive(Debug, thiserror::Error)]
pub enum EventStoreError {
    /// An event payload could not be serialized to JSON.
    #[error("Serialization error: {message}")]
    SerializationError { message: String },
    /// No events exist for the requested aggregate.
    #[error("Aggregate not found: {aggregate_id}")]
    AggregateNotFound { aggregate_id: String },
    /// An optimistic-concurrency version check failed.
    #[error("Concurrency error: {message}")]
    ConcurrencyError { message: String },
}
📝 Event Sourcing
Event Sourcing Implementation
[event_sourcing]
aggregate_pattern: true
event_replay: true
snapshot_pattern: true
projection_pattern: true

[event_sourcing_implementation]
// Base trait for domain events: serializable, deserializable, and safe to
// move across tasks.
pub trait Event: Serialize + DeserializeOwned + Send + Sync {
    /// Dotted event name, e.g. "user.created".
    fn event_type(&self) -> &str;
    /// Id of the aggregate the event applies to.
    fn aggregate_id(&self) -> &str;
}
// Base trait for event-sourced aggregates: state is rebuilt by replaying
// events in order.
pub trait Aggregate: Send + Sync {
    type Event: Event;
    type Error: std::error::Error + Send + Sync;
    /// Stable identity of this aggregate instance.
    fn aggregate_id(&self) -> &str;
    /// Number of events applied so far (used for optimistic concurrency).
    fn version(&self) -> u64;
    /// Folds one event into the current state; must be deterministic so
    /// replay always reproduces the same state.
    fn apply_event(&mut self, event: &Self::Event) -> Result<(), Self::Error>;
    /// Serializes current state for snapshot storage.
    fn snapshot(&self) -> serde_json::Value;
    /// Restores state from a previously taken snapshot.
    fn load_from_snapshot(&mut self, snapshot: serde_json::Value) -> Result<(), Self::Error>;
}
// User aggregate: the event-sourced write model for a user account.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct User {
    /// Aggregate id.
    pub id: String,
    pub username: String,
    pub email: String,
    /// Current lifecycle state of the account.
    pub status: UserStatus,
    /// Count of events applied (incremented by `apply_event`).
    pub version: u64,
}
/// Lifecycle state of a user account.
///
/// `PartialEq`/`Eq` are derived so the read side can filter by status with
/// `==` (the original derive list omitted them, which breaks the
/// `user.status == *s` comparison in `list_users`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum UserStatus {
    Active,
    Inactive,
    Suspended,
}
/// Domain events emitted by the `User` aggregate.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UserEvent {
    /// A new user was created ("user.created").
    UserCreated {
        id: String,
        username: String,
        email: String,
        metadata: EventMetadata,
    },
    /// Profile fields changed; `None` means "leave unchanged" ("user.updated").
    UserUpdated {
        username: Option<String>,
        email: Option<String>,
        metadata: EventMetadata,
    },
    /// The account status transitioned ("user.status_changed").
    UserStatusChanged {
        status: UserStatus,
        metadata: EventMetadata,
    },
}
impl Event for UserEvent {
    /// Dotted event name used for routing and storage.
    fn event_type(&self) -> &str {
        match self {
            UserEvent::UserCreated { .. } => "user.created",
            UserEvent::UserUpdated { .. } => "user.updated",
            UserEvent::UserStatusChanged { .. } => "user.status_changed",
        }
    }

    /// Id of the user aggregate this event targets.
    ///
    /// BUG FIX: the original delegated `UserUpdated`/`UserStatusChanged` back
    /// to `self.aggregate_id()`, recursing forever (stack overflow at
    /// runtime). Those variants carry no aggregate id at all, so an empty id
    /// is returned for them instead.
    /// TODO(review): add an `id` field to both variants so this is meaningful.
    fn aggregate_id(&self) -> &str {
        match self {
            UserEvent::UserCreated { id, .. } => id,
            UserEvent::UserUpdated { .. } | UserEvent::UserStatusChanged { .. } => "",
        }
    }
}
impl Aggregate for User {
    type Event = UserEvent;
    type Error = UserError;

    /// The user's own id doubles as the aggregate id.
    fn aggregate_id(&self) -> &str {
        &self.id
    }

    /// Number of events applied to this instance so far.
    fn version(&self) -> u64 {
        self.version
    }

    /// Folds one event into the current state and bumps the version.
    fn apply_event(&mut self, event: &Self::Event) -> Result<(), Self::Error> {
        match event {
            UserEvent::UserCreated { id, username, email, .. } => {
                self.id = id.clone();
                self.username = username.clone();
                self.email = email.clone();
                self.status = UserStatus::Active;
            }
            UserEvent::UserUpdated { username, email, .. } => {
                // `None` fields mean "leave unchanged".
                if let Some(new_username) = username {
                    self.username = new_username.clone();
                }
                if let Some(new_email) = email {
                    self.email = new_email.clone();
                }
            }
            UserEvent::UserStatusChanged { status, .. } => {
                self.status = status.clone();
            }
        }
        // Every event advances the aggregate version by exactly one.
        self.version += 1;
        Ok(())
    }

    /// Serializes the whole aggregate for snapshot storage.
    fn snapshot(&self) -> serde_json::Value {
        serde_json::to_value(self).unwrap()
    }

    /// Replaces the current state with the deserialized snapshot contents.
    fn load_from_snapshot(&mut self, snapshot: serde_json::Value) -> Result<(), Self::Error> {
        *self = serde_json::from_value::<User>(snapshot)
            .map_err(|e| UserError::DeserializationError { message: e.to_string() })?;
        Ok(())
    }
}
// Repository that persists `User` aggregates as event streams in the
// event store.
pub struct UserRepository {
    event_store: Arc<EventStore>,
}
impl UserRepository {
pub fn new(event_store: Arc<EventStore>) -> Self {
Self { event_store }
}
pub async fn save(&self, user: &User, events: Vec<UserEvent>) -> Result<(), UserError> {
// Check for concurrency conflicts
let stored_events = self.event_store.get_events(&user.id, None).await
.map_err(|e| UserError::EventStoreError { message: e.to_string() })?;
let expected_version = stored_events.len() as u64;
if user.version() != expected_version {
return Err(UserError::ConcurrencyError {
message: format!("Expected version {}, got {}", expected_version, user.version())
});
}
// Append events
self.event_store.append_events(&user.id, events).await
.map_err(|e| UserError::EventStoreError { message: e.to_string() })?;
// Save snapshot every 10 events
if (user.version() + 1) % 10 == 0 {
let snapshot = Snapshot {
aggregate_id: user.id.clone(),
data: user.snapshot(),
version: user.version(),
timestamp: chrono::Utc::now(),
};
self.event_store.save_snapshot(snapshot).await
.map_err(|e| UserError::EventStoreError { message: e.to_string() })?;
}
Ok(())
}
pub async fn load(&self, id: &str) -> Result<Option<User>, UserError> {
// Try to load from snapshot first
if let Some(snapshot) = self.event_store.get_snapshot(id).await {
let mut user = User {
id: id.to_string(),
username: String::new(),
email: String::new(),
status: UserStatus::Inactive,
version: 0,
};
user.load_from_snapshot(snapshot.data)?;
// Load events after snapshot
let events = self.event_store.get_events(id, Some(snapshot.version)).await
.map_err(|e| UserError::EventStoreError { message: e.to_string() })?;
for stored_event in events {
let event: UserEvent = serde_json::from_value(stored_event.data)
.map_err(|e| UserError::DeserializationError { message: e.to_string() })?;
user.apply_event(&event)?;
}
Ok(Some(user))
} else {
// Load all events
let events = self.event_store.get_events(id, None).await
.map_err(|e| UserError::EventStoreError { message: e.to_string() })?;
if events.is_empty() {
Ok(None)
} else {
let mut user = User {
id: id.to_string(),
username: String::new(),
email: String::new(),
status: UserStatus::Inactive,
version: 0,
};
for stored_event in events {
let event: UserEvent = serde_json::from_value(stored_event.data)
.map_err(|e| UserError::DeserializationError { message: e.to_string() })?;
user.apply_event(&event)?;
}
Ok(Some(user))
}
}
}
}
/// Failures raised while loading, validating, or saving `User` aggregates.
#[derive(Debug, thiserror::Error)]
pub enum UserError {
    /// The underlying event store operation failed.
    #[error("Event store error: {message}")]
    EventStoreError { message: String },
    /// A stored event or snapshot could not be deserialized.
    #[error("Deserialization error: {message}")]
    DeserializationError { message: String },
    /// Optimistic-concurrency version check failed.
    #[error("Concurrency error: {message}")]
    ConcurrencyError { message: String },
    /// Command input failed validation.
    #[error("Validation error: {message}")]
    ValidationError { message: String },
}
🔄 CQRS (Command Query Responsibility Segregation)
CQRS Implementation
[cqrs_architecture]
command_handlers: true
query_handlers: true
read_models: true
write_models: true

[cqrs_implementation]
// A state-changing request targeting exactly one aggregate; executing it
// produces the events to persist.
pub trait Command: Send + Sync {
    type Aggregate: Aggregate;
    type Error: std::error::Error + Send + Sync;
    /// Id of the aggregate this command targets.
    fn aggregate_id(&self) -> &str;
    /// Validates the command against current state and returns the events
    /// it produces (the aggregate is not mutated here; events are applied
    /// separately via `Aggregate::apply_event`).
    fn execute(&self, aggregate: &mut Self::Aggregate) -> Result<Vec<<Self::Aggregate as Aggregate>::Event>, Self::Error>;
}
// A read-only request; declares its result and error types. Queries never
// touch the write model.
pub trait Query: Send + Sync {
    type Result;
    type Error: std::error::Error + Send + Sync;
}
// Executes commands against the user repository and publishes the
// resulting events.
//
// NOTE(review): the type parameter `C` is not used by any field, which is a
// compile error (E0392); a `PhantomData<C>` field (initialized in `new`) is
// needed. `EventPublisher` is not defined in this file — presumably declared
// elsewhere; verify.
pub struct CommandHandler<C: Command> {
    repository: Arc<UserRepository>,
    event_publisher: Arc<EventPublisher>,
}
impl<C: Command<Aggregate = User>> CommandHandler<C> {
    pub fn new(repository: Arc<UserRepository>, event_publisher: Arc<EventPublisher>) -> Self {
        Self {
            repository,
            event_publisher,
        }
    }

    /// Loads (or initializes) the aggregate, executes the command, persists
    /// the produced events, then publishes them.
    ///
    /// NOTE(review): `C::Error::from(e)` requires `C::Error: From<UserError>`
    /// (and a `From` impl for the publisher's error type), but no such bounds
    /// are declared — this will not compile as written.
    pub async fn handle(&self, command: C) -> Result<(), C::Error> {
        // Load the aggregate, or start from a blank one if it doesn't exist.
        let mut aggregate = self.repository.load(command.aggregate_id()).await
            .map_err(|e| C::Error::from(e))?;
        let aggregate = aggregate.get_or_insert_with(|| User {
            id: command.aggregate_id().to_string(),
            username: String::new(),
            email: String::new(),
            status: UserStatus::Inactive,
            version: 0,
        });
        // Execute the command against current state to produce events.
        let events = command.execute(aggregate)?;
        // Persist the new events (repository enforces concurrency checks).
        self.repository.save(aggregate, events.clone()).await
            .map_err(|e| C::Error::from(e))?;
        // Publish so projections and subscribers can react.
        for event in events {
            self.event_publisher.publish(&event).await
                .map_err(|e| C::Error::from(e))?;
        }
        Ok(())
    }
}
// User commands (write-side requests).

/// Creates a new user aggregate.
#[derive(Debug, Clone)]
pub struct CreateUserCommand {
    /// Id the new aggregate will use.
    pub id: String,
    pub username: String,
    pub email: String,
    pub metadata: EventMetadata,
}
/// Updates profile fields; `None` fields are left unchanged.
#[derive(Debug, Clone)]
pub struct UpdateUserCommand {
    pub id: String,
    pub username: Option<String>,
    pub email: Option<String>,
    pub metadata: EventMetadata,
}
/// Transitions a user's account status.
#[derive(Debug, Clone)]
pub struct ChangeUserStatusCommand {
    pub id: String,
    pub status: UserStatus,
    pub metadata: EventMetadata,
}
impl Command for CreateUserCommand {
    type Aggregate = User;
    type Error = UserError;

    fn aggregate_id(&self) -> &str {
        &self.id
    }

    /// Emits a single `UserCreated` event; current state is not consulted.
    fn execute(&self, _aggregate: &mut Self::Aggregate) -> Result<Vec<UserEvent>, Self::Error> {
        Ok(vec![UserEvent::UserCreated {
            id: self.id.clone(),
            username: self.username.clone(),
            email: self.email.clone(),
            metadata: self.metadata.clone(),
        }])
    }
}
impl Command for UpdateUserCommand {
type Aggregate = User;
type Error = UserError;
fn aggregate_id(&self) -> &str {
&self.id
}
fn execute(&self, aggregate: &mut Self::Aggregate) -> Result<Vec<UserEvent>, Self::Error> {
let event = UserEvent::UserUpdated {
username: self.username.clone(),
email: self.email.clone(),
metadata: self.metadata.clone(),
};
Ok(vec![event])
}
}
impl Command for ChangeUserStatusCommand {
    type Aggregate = User;
    type Error = UserError;

    fn aggregate_id(&self) -> &str {
        &self.id
    }

    /// Emits a single `UserStatusChanged` event; current state is not
    /// consulted.
    fn execute(&self, _aggregate: &mut Self::Aggregate) -> Result<Vec<UserEvent>, Self::Error> {
        Ok(vec![UserEvent::UserStatusChanged {
            status: self.status.clone(),
            metadata: self.metadata.clone(),
        }])
    }
}
// Query handlers: dispatch read-side queries against the read model.
pub struct QueryHandler {
    read_model: Arc<UserReadModel>,
}
impl QueryHandler {
    pub fn new(read_model: Arc<UserReadModel>) -> Self {
        Self { read_model }
    }

    /// Routes a query to the matching read-model lookup.
    ///
    /// Unimplemented placeholder — panics via `todo!` if called.
    pub async fn handle<Q: Query>(&self, query: Q) -> Result<Q::Result, Q::Error> {
        // Route query to appropriate handler
        // This is a simplified implementation
        todo!("Implement query routing")
    }
}
// User queries (read-side requests).

/// Looks up a single user by id.
#[derive(Debug, Clone)]
pub struct GetUserQuery {
    pub id: String,
}
/// Lists users, paged, optionally filtered by status.
#[derive(Debug, Clone)]
pub struct ListUsersQuery {
    /// Zero-based page index.
    pub page: u32,
    pub per_page: u32,
    /// When set, only users with this status are returned.
    pub status: Option<UserStatus>,
}
/// A single-user lookup yields at most one read-model row.
impl Query for GetUserQuery {
    type Result = Option<UserReadModel>;
    type Error = UserError;
}

/// A listing yields one page of read-model rows.
impl Query for ListUsersQuery {
    type Result = Vec<UserReadModel>;
    type Error = UserError;
}
// Read model: the denormalized user row served to queries (CQRS read side).
//
// NOTE(review): a second `pub struct UserReadModel` is declared immediately
// below as the backing store — two types with the same name in one module
// will not compile; one of them must be renamed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserReadModel {
    pub id: String,
    pub username: String,
    pub email: String,
    pub status: UserStatus,
    pub created_at: chrono::DateTime<chrono::Utc>,
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
pub struct UserReadModel {
users: Arc<RwLock<HashMap<String, UserReadModel>>>,
}
impl UserReadModel {
pub fn new() -> Self {
Self {
users: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn get_user(&self, id: &str) -> Option<UserReadModel> {
let users = self.users.read().await;
users.get(id).cloned()
}
pub async fn list_users(&self, page: u32, per_page: u32, status: Option<UserStatus>) -> Vec<UserReadModel> {
let users = self.users.read().await;
let mut filtered_users: Vec<UserReadModel> = users.values()
.filter(|user| status.as_ref().map_or(true, |s| user.status == *s))
.cloned()
.collect();
// Sort by created_at
filtered_users.sort_by(|a, b| b.created_at.cmp(&a.created_at));
// Paginate
let start = (page as usize * per_page as usize).min(filtered_users.len());
let end = (start + per_page as usize).min(filtered_users.len());
filtered_users[start..end].to_vec()
}
pub async fn update_user(&self, user: UserReadModel) {
let mut users = self.users.write().await;
users.insert(user.id.clone(), user);
}
}
📨 Message Queues
Message Queue Implementation
[message_queues]
publish_subscribe: true
point_to_point: true
dead_letter_queues: true
message_persistence: true

[message_queue_implementation]
// In-memory message queue: named topics, per-topic subscriptions, and a
// dead letter queue for messages whose delivery exhausted its retries.
pub struct MessageQueue {
    // Topics keyed by name, each retaining its published messages.
    topics: Arc<RwLock<HashMap<String, Topic>>>,
    // Subscriptions grouped by topic name.
    subscriptions: Arc<RwLock<HashMap<String, Vec<Subscription>>>>,
    // Messages that failed delivery after all retries.
    dead_letter_queue: Arc<RwLock<Vec<DeadLetterMessage>>>,
}
/// A named channel that retains its published messages.
#[derive(Debug, Clone)]
pub struct Topic {
    pub name: String,
    /// Messages published to this topic (retained in publish order).
    pub messages: Vec<Message>,
    pub max_retries: u32,
    /// How long messages should be retained.
    /// NOTE(review): not enforced anywhere in this file — confirm intent.
    pub retention_period: Duration,
}
/// A single message published to a topic.
#[derive(Debug, Clone)]
pub struct Message {
    /// Unique message id (UUID v4 string).
    pub id: String,
    /// Name of the topic it was published to.
    pub topic: String,
    /// Payload serialized to JSON.
    pub data: serde_json::Value,
    pub metadata: MessageMetadata,
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// How many delivery attempts have been made.
    pub delivery_count: u32,
}
/// Tracing and delivery metadata attached to each message.
#[derive(Debug, Clone)]
pub struct MessageMetadata {
    /// Groups messages belonging to one logical workflow.
    pub correlation_id: String,
    /// Id of the message/event that directly caused this one.
    pub causation_id: String,
    pub user_id: Option<String>,
    pub priority: MessagePriority,
    /// Optional time-to-live.
    /// NOTE(review): not enforced anywhere in this file — confirm intent.
    pub ttl: Option<Duration>,
}
/// Relative message priority, lowest to highest.
/// NOTE(review): no code in this file orders deliveries by priority yet.
#[derive(Debug, Clone)]
pub enum MessagePriority {
    Low,
    Normal,
    High,
    Critical,
}
/// A registered consumer of one topic's messages.
///
/// NOTE(review): `#[derive(Debug, Clone)]` cannot be derived here —
/// `Box<dyn MessageHandler>` implements neither `Debug` nor `Clone`, yet
/// `deliver_message` clones subscriptions. The handler likely needs to be an
/// `Arc<dyn MessageHandler>` instead.
#[derive(Debug, Clone)]
pub struct Subscription {
    pub id: String,
    pub topic: String,
    /// Callback invoked for each delivered message.
    pub handler: Box<dyn MessageHandler>,
    /// Optional payload filter; `None` means deliver everything.
    pub filter: Option<MessageFilter>,
    pub max_retries: u32,
    /// Pause between delivery retries.
    pub retry_delay: Duration,
}
/// Predicate over one top-level field of a message's JSON payload.
#[derive(Debug, Clone)]
pub struct MessageFilter {
    /// Name of the payload field to test.
    pub field: String,
    pub operator: FilterOperator,
    /// Value the field is compared against.
    pub value: serde_json::Value,
}
/// Comparison applied by a `MessageFilter`.
#[derive(Debug, Clone)]
pub enum FilterOperator {
    Equals,
    NotEquals,
    /// Substring match; only meaningful for string values.
    Contains,
    GreaterThan,
    LessThan,
}
/// A message whose delivery failed after exhausting all retries.
#[derive(Debug, Clone)]
pub struct DeadLetterMessage {
    pub message: Message,
    /// Last delivery error, rendered as text.
    pub error: String,
    pub failed_at: chrono::DateTime<chrono::Utc>,
}
impl MessageQueue {
pub fn new() -> Self {
Self {
topics: Arc::new(RwLock::new(HashMap::new())),
subscriptions: Arc::new(RwLock::new(HashMap::new())),
dead_letter_queue: Arc::new(RwLock::new(Vec::new())),
}
}
pub async fn create_topic(&self, name: &str, max_retries: u32, retention_period: Duration) -> Result<(), MessageQueueError> {
let mut topics = self.topics.write().await;
if topics.contains_key(name) {
return Err(MessageQueueError::TopicAlreadyExists { name: name.to_string() });
}
let topic = Topic {
name: name.to_string(),
messages: Vec::new(),
max_retries,
retention_period,
};
topics.insert(name.to_string(), topic);
info!("Topic created: {}", name);
Ok(())
}
pub async fn publish<T: Serialize>(&self, topic: &str, data: &T, metadata: MessageMetadata) -> Result<(), MessageQueueError> {
let message = Message {
id: Uuid::new_v4().to_string(),
topic: topic.to_string(),
data: serde_json::to_value(data)
.map_err(|e| MessageQueueError::SerializationError { message: e.to_string() })?,
metadata,
created_at: chrono::Utc::now(),
delivery_count: 0,
};
// Add to topic
{
let mut topics = self.topics.write().await;
if let Some(topic_data) = topics.get_mut(topic) {
topic_data.messages.push(message.clone());
} else {
return Err(MessageQueueError::TopicNotFound { name: topic.to_string() });
}
}
// Deliver to subscribers
self.deliver_message(&message).await?;
info!("Message published to topic {}: {}", topic, message.id);
Ok(())
}
pub async fn subscribe(
&self,
topic: &str,
handler: Box<dyn MessageHandler>,
filter: Option<MessageFilter>,
) -> Result<String, MessageQueueError> {
let subscription_id = Uuid::new_v4().to_string();
let subscription = Subscription {
id: subscription_id.clone(),
topic: topic.to_string(),
handler,
filter,
max_retries: 3,
retry_delay: Duration::from_secs(5),
};
let mut subscriptions = self.subscriptions.write().await;
subscriptions.entry(topic.to_string())
.or_insert_with(Vec::new)
.push(subscription);
info!("Subscription created for topic {}: {}", topic, subscription_id);
Ok(subscription_id)
}
async fn deliver_message(&self, message: &Message) -> Result<(), MessageQueueError> {
let subscriptions = self.subscriptions.read().await;
if let Some(topic_subscriptions) = subscriptions.get(&message.topic) {
for subscription in topic_subscriptions {
// Check filter
if let Some(filter) = &subscription.filter {
if !self.matches_filter(&message.data, filter) {
continue;
}
}
// Deliver message
let message_clone = message.clone();
let subscription_clone = subscription.clone();
let dead_letter_queue = self.dead_letter_queue.clone();
tokio::spawn(async move {
let mut retry_count = 0;
while retry_count < subscription_clone.max_retries {
match subscription_clone.handler.handle(&message_clone).await {
Ok(_) => {
info!("Message delivered successfully: {}", message_clone.id);
break;
}
Err(e) => {
retry_count += 1;
error!("Message delivery failed (attempt {}): {}", retry_count, e);
if retry_count >= subscription_clone.max_retries {
// Send to dead letter queue
let dead_letter = DeadLetterMessage {
message: message_clone.clone(),
error: e.to_string(),
failed_at: chrono::Utc::now(),
};
let mut dlq = dead_letter_queue.write().await;
dlq.push(dead_letter);
break;
}
tokio::time::sleep(subscription_clone.retry_delay).await;
}
}
}
});
}
}
Ok(())
}
fn matches_filter(&self, data: &serde_json::Value, filter: &MessageFilter) -> bool {
if let Some(value) = data.get(&filter.field) {
match filter.operator {
FilterOperator::Equals => value == &filter.value,
FilterOperator::NotEquals => value != &filter.value,
FilterOperator::Contains => {
if let Some(str_value) = value.as_str() {
if let Some(filter_str) = filter.value.as_str() {
return str_value.contains(filter_str);
}
}
false
}
FilterOperator::GreaterThan => value > &filter.value,
FilterOperator::LessThan => value < &filter.value,
}
} else {
false
}
}
pub async fn get_dead_letter_messages(&self) -> Vec<DeadLetterMessage> {
let dlq = self.dead_letter_queue.read().await;
dlq.clone()
}
}
// Callback invoked for each message delivered to a subscription.
#[async_trait::async_trait]
pub trait MessageHandler: Send + Sync {
    /// Processes one message; returning `Err` triggers the retry/DLQ path.
    async fn handle(&self, message: &Message) -> Result<(), MessageHandlerError>;
}
/// Failures a `MessageHandler` can report back to the queue.
#[derive(Debug, thiserror::Error)]
pub enum MessageHandlerError {
    /// The handler could not process the message.
    #[error("Processing error: {message}")]
    ProcessingError { message: String },
    /// The message payload failed validation.
    #[error("Validation error: {message}")]
    ValidationError { message: String },
}
/// Failures raised by `MessageQueue` operations.
#[derive(Debug, thiserror::Error)]
pub enum MessageQueueError {
    /// Publish targeted a topic that was never created.
    #[error("Topic not found: {name}")]
    TopicNotFound { name: String },
    /// `create_topic` was called with a name already in use.
    #[error("Topic already exists: {name}")]
    TopicAlreadyExists { name: String },
    /// The payload could not be serialized to JSON.
    #[error("Serialization error: {message}")]
    SerializationError { message: String },
    /// Delivery to a subscriber failed.
    #[error("Delivery error: {message}")]
    DeliveryError { message: String },
}
🎯 Best Practices
1. Event Design
- Design events to be immutable and self-contained
- Use descriptive event names and clear data structures
- Include correlation and causation IDs for tracing
- Version events for backward compatibility

2. Event Sourcing
- Use snapshots for performance optimization
- Implement proper concurrency control
- Design aggregates with clear boundaries
- Use event replay for debugging and testing

3. CQRS Implementation
- Separate read and write models completely
- Use projections for complex queries
- Implement eventual-consistency patterns
- Optimize read models for specific use cases

4. Message Queues
- Implement proper error handling and retries
- Use dead letter queues for failed messages
- Implement message filtering and routing
- Monitor queue performance and health

5. TuskLang Integration
- Use TuskLang configuration for event store settings
- Configure message queue parameters through TuskLang
- Implement event routing with TuskLang
- Use TuskLang for CQRS configuration

Event-driven architecture with TuskLang and Rust provides scalable, loosely coupled, and reactive systems with comprehensive patterns for event sourcing, CQRS, and message queuing.