🦀 Event-Driven Architecture in TuskLang with Rust
Event-Driven Architecture in TuskLang with Rust
🎯 Event-Driven Foundation
Event-driven architecture with TuskLang and Rust provides a powerful foundation for building scalable, loosely-coupled systems that can handle complex business workflows and real-time data processing. This guide covers event sourcing, CQRS, and event streaming patterns.
🏗️ Event Architecture
Event System Design
[event_architecture]
pattern: "event_sourcing"
storage: "event_store"
processing: "stream_processing"
projections: "real_time"[architecture_principles]
event_first: true
immutable_events: true
temporal_ordering: true
causal_consistency: true
Event Types
[event_types]
domain_events: true
integration_events: true
system_events: true
audit_events: true[event_categories]
user_events:
- "UserCreated"
- "UserUpdated"
- "UserDeleted"
order_events:
- "OrderPlaced"
- "OrderConfirmed"
- "OrderShipped"
- "OrderDelivered"
payment_events:
- "PaymentInitiated"
- "PaymentProcessed"
- "PaymentFailed"
- "PaymentRefunded"
🔧 Event Implementation
Event Structure
[event_structure]
event_id: "uuid"
event_type: "string"
aggregate_id: "uuid"
version: "integer"
timestamp: "datetime"
data: "json"
metadata: "json"[event_implementation]
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use chrono::{DateTime, Utc};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
pub event_id: Uuid,
pub event_type: String,
pub aggregate_id: Uuid,
pub version: u64,
pub timestamp: DateTime<Utc>,
pub data: serde_json::Value,
pub metadata: EventMetadata,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventMetadata {
pub user_id: Option<Uuid>,
pub correlation_id: Option<Uuid>,
pub causation_id: Option<Uuid>,
pub source: String,
pub version: String,
}
Event Store
[event_store]
backend: "postgresql"
streaming: "kafka"
caching: "redis"[store_config]
postgresql:
connection_string: "@env('EVENT_STORE_DB_URL')"
table_name: "events"
partitioning: "by_aggregate"
kafka:
brokers: ["kafka-1:9092", "kafka-2:9092"]
topic_prefix: "events"
retention: "7d"
redis:
connection_string: "@env('REDIS_URL')"
cache_ttl: "1h"
event_cache: true
🔄 Event Sourcing
Aggregate Design
[aggregate_design]
pattern: "domain_aggregate"
state_management: "event_based"
command_handling: "command_bus"
event_application: "state_machine"[aggregate_implementation]
pub trait Aggregate {
type Command;
type Event;
type Error;
fn apply(&mut self, event: &Self::Event);
fn handle(&self, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error>;
}
pub struct UserAggregate {
id: Uuid,
email: String,
name: String,
status: UserStatus,
version: u64,
}
impl Aggregate for UserAggregate {
type Command = UserCommand;
type Event = UserEvent;
type Error = UserError;
fn apply(&mut self, event: &Self::Event) {
match event {
UserEvent::UserCreated { id, email, name } => {
self.id = *id;
self.email = email.clone();
self.name = name.clone();
self.status = UserStatus::Active;
}
UserEvent::UserUpdated { name, .. } => {
self.name = name.clone();
}
UserEvent::UserDeleted { .. } => {
self.status = UserStatus::Deleted;
}
}
self.version += 1;
}
fn handle(&self, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
match command {
UserCommand::CreateUser { email, name } => {
Ok(vec![UserEvent::UserCreated {
id: Uuid::new_v4(),
email,
name,
}])
}
UserCommand::UpdateUser { name } => {
Ok(vec![UserEvent::UserUpdated {
id: self.id,
name,
}])
}
UserCommand::DeleteUser => {
Ok(vec![UserEvent::UserDeleted {
id: self.id,
}])
}
}
}
}
Event Store Operations
[event_store_operations]
append_events: true
read_events: true
snapshots: true
projections: true[store_operations]
pub struct EventStore {
db: PgPool,
kafka_producer: KafkaProducer,
redis: RedisPool,
}
impl EventStore {
pub async fn append_events(
&self,
aggregate_id: Uuid,
expected_version: u64,
events: Vec<Event>,
) -> Result<(), EventStoreError> {
let mut transaction = self.db.begin().await?;
// Check version
let current_version = self.get_current_version(&mut transaction, aggregate_id).await?;
if current_version != expected_version {
return Err(EventStoreError::ConcurrencyConflict);
}
// Store events
for event in events {
self.store_event(&mut transaction, &event).await?;
}
// Publish to Kafka
for event in &events {
self.publish_event(event).await?;
}
transaction.commit().await?;
Ok(())
}
pub async fn read_events(
&self,
aggregate_id: Uuid,
from_version: u64,
) -> Result<Vec<Event>, EventStoreError> {
let events = sqlx::query_as!(
Event,
"SELECT * FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version",
aggregate_id,
from_version
)
.fetch_all(&self.db)
.await?;
Ok(events)
}
}
📊 CQRS Implementation
Command Side
[command_side]
command_bus: "in_memory"
command_handlers: "async"
validation: "command_validators"[command_implementation]
pub struct CommandBus {
handlers: HashMap<TypeId, Box<dyn CommandHandler>>,
}
impl CommandBus {
pub async fn dispatch<C: Command>(&self, command: C) -> Result<Vec<Event>, CommandError> {
let handler = self.handlers.get(&TypeId::of::<C>())
.ok_or(CommandError::HandlerNotFound)?;
handler.handle(command).await
}
}
pub trait CommandHandler: Send + Sync {
type Command: Command;
type Event: Event;
type Error;
async fn handle(&self, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error>;
}
Query Side
[query_side]
read_models: "optimized"
query_handlers: "async"
caching: "redis"[query_implementation]
pub struct QueryBus {
handlers: HashMap<TypeId, Box<dyn QueryHandler>>,
cache: RedisPool,
}
impl QueryBus {
pub async fn dispatch<Q: Query>(&self, query: Q) -> Result<Q::Result, QueryError> {
// Check cache first
if let Some(cached) = self.cache.get(&query.cache_key()).await? {
return Ok(cached);
}
let handler = self.handlers.get(&TypeId::of::<Q>())
.ok_or(QueryError::HandlerNotFound)?;
let result = handler.handle(query).await?;
// Cache result
self.cache.set(&query.cache_key(), &result, Duration::from_secs(300)).await?;
Ok(result)
}
}
pub trait QueryHandler: Send + Sync {
type Query: Query;
type Result;
type Error;
async fn handle(&self, query: Self::Query) -> Result<Self::Result, Self::Error>;
}
🔄 Event Streaming
Kafka Integration
[kafka_integration]
producer: "async"
consumer: "consumer_groups"
streaming: "kafka_streams"[kafka_config]
brokers: ["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"]
topics: ["user_events", "order_events", "payment_events"]
consumer_groups: ["user_service", "order_service", "notification_service"]
Event Producer
[event_producer]
async_producer: true
batch_publishing: true
error_handling: true[producer_implementation]
pub struct EventProducer {
producer: KafkaProducer,
topic: String,
}
impl EventProducer {
pub async fn publish_event(&self, event: &Event) -> Result<(), ProducerError> {
let key = event.aggregate_id.to_string();
let value = serde_json::to_string(event)?;
self.producer
.send(ProducerRecord::new(&self.topic, &key, &value))
.await?;
Ok(())
}
pub async fn publish_events(&self, events: &[Event]) -> Result<(), ProducerError> {
let records: Vec<ProducerRecord<_, _>> = events
.iter()
.map(|event| {
let key = event.aggregate_id.to_string();
let value = serde_json::to_string(event).unwrap();
ProducerRecord::new(&self.topic, &key, &value)
})
.collect();
for record in records {
self.producer.send(record).await?;
}
Ok(())
}
}
Event Consumer
[event_consumer]
consumer_groups: true
offset_management: true
error_handling: true[consumer_implementation]
pub struct EventConsumer {
consumer: KafkaConsumer,
handlers: HashMap<String, Box<dyn EventHandler>>,
}
impl EventConsumer {
pub async fn start_consuming(&mut self) -> Result<(), ConsumerError> {
loop {
let records = self.consumer.poll(Duration::from_millis(100)).await?;
for record in records {
let event: Event = serde_json::from_slice(record.value())?;
if let Some(handler) = self.handlers.get(&event.event_type) {
handler.handle(&event).await?;
}
}
}
}
}
pub trait EventHandler: Send + Sync {
async fn handle(&self, event: &Event) -> Result<(), HandlerError>;
}
📈 Projections
Read Model Projections
[read_model_projections]
real_time: true
batch_processing: true
materialized_views: true[projection_implementation]
pub struct UserProjection {
db: PgPool,
cache: RedisPool,
}
impl EventHandler for UserProjection {
async fn handle(&self, event: &Event) -> Result<(), HandlerError> {
match event.event_type.as_str() {
"UserCreated" => {
let data: UserCreatedData = serde_json::from_value(event.data.clone())?;
self.create_user(&data).await?;
}
"UserUpdated" => {
let data: UserUpdatedData = serde_json::from_value(event.data.clone())?;
self.update_user(&data).await?;
}
"UserDeleted" => {
let data: UserDeletedData = serde_json::from_value(event.data.clone())?;
self.delete_user(&data).await?;
}
_ => {}
}
Ok(())
}
}
impl UserProjection {
async fn create_user(&self, data: &UserCreatedData) -> Result<(), ProjectionError> {
sqlx::query!(
"INSERT INTO users (id, email, name, status, created_at) VALUES ($1, $2, $3, $4, $5)",
data.id,
data.email,
data.name,
"active",
chrono::Utc::now()
)
.execute(&self.db)
.await?;
// Invalidate cache
self.cache.del(&format!("user:{}", data.id)).await?;
Ok(())
}
}
Materialized Views
[materialized_views]
refresh_strategy: "incremental"
indexing: "optimized"
partitioning: "by_date"[view_implementation]
pub struct UserStatsView {
db: PgPool,
}
impl UserStatsView {
pub async fn refresh(&self) -> Result<(), ViewError> {
sqlx::query!(
r#"
REFRESH MATERIALIZED VIEW CONCURRENTLY user_stats
"#
)
.execute(&self.db)
.await?;
Ok(())
}
pub async fn get_user_stats(&self) -> Result<UserStats, ViewError> {
let stats = sqlx::query_as!(
UserStats,
"SELECT * FROM user_stats"
)
.fetch_one(&self.db)
.await?;
Ok(stats)
}
}
🔄 Saga Pattern
Saga Implementation
[saga_implementation]
coordination: "choreography"
compensation: "automatic"
monitoring: "distributed"[saga_pattern]
pub struct OrderSaga {
order_id: Uuid,
steps: Vec<SagaStep>,
current_step: usize,
status: SagaStatus,
}
impl OrderSaga {
pub async fn execute(&mut self) -> Result<(), SagaError> {
for (i, step) in self.steps.iter().enumerate() {
self.current_step = i;
match step.execute().await {
Ok(_) => {
self.status = SagaStatus::InProgress;
}
Err(error) => {
self.status = SagaStatus::Failed;
self.compensate().await?;
return Err(error);
}
}
}
self.status = SagaStatus::Completed;
Ok(())
}
async fn compensate(&mut self) -> Result<(), SagaError> {
for step in self.steps[..self.current_step].iter().rev() {
step.compensate().await?;
}
Ok(())
}
}
🔍 Event Monitoring
Event Metrics
[event_metrics]
event_count: true
processing_latency: true
error_rate: true
throughput: true[metrics_config]
prometheus_metrics: true
custom_metrics: true
business_metrics: true
Event Tracing
[event_tracing]
distributed_tracing: true
correlation_ids: true
causation_ids: true[tracing_config]
jaeger_endpoint: "@env('JAEGER_ENDPOINT')"
trace_sampling: 0.1
correlation_propagation: true
🚀 Performance Optimization
Event Processing
[event_processing]
parallel_processing: true
batch_processing: true
streaming: true[processing_config]
worker_threads: 4
batch_size: 100
processing_timeout: "30s"
Caching Strategy
[caching_strategy]
read_through: true
write_through: true
cache_aside: true[cache_config]
redis_cache: "distributed"
cache_ttl: "1h"
cache_invalidation: "event_driven"
🔧 Development Tools
Event Testing
[event_testing]
unit_tests: true
integration_tests: true
event_replay: true[testing_config]
test_event_store: "in_memory"
event_replay: "from_snapshot"
integration_testing: "docker_compose"
Event Debugging
[event_debugging]
event_logging: true
event_inspection: true
event_replay: true[debug_config]
event_log_level: "debug"
event_inspection: "web_ui"
event_replay: "cli_tool"
🎯 Best Practices
1. Event Design
- Design events for business meaning - Keep events immutable - Use semantic event names - Include all necessary data2. Event Sourcing
- Use aggregates for consistency - Implement proper versioning - Handle concurrency conflicts - Use snapshots for performance3. CQRS
- Separate read and write models - Optimize for specific use cases - Use appropriate consistency levels - Implement proper caching4. Event Streaming
- Use appropriate partitioning - Handle backpressure - Implement proper error handling - Monitor event processing5. Monitoring
- Track event processing metrics - Monitor event store performance - Implement proper alerting - Use distributed tracingEvent-driven architecture with TuskLang and Rust provides a powerful foundation for building scalable, maintainable, and performant systems that can handle complex business workflows and real-time data processing.