🔷 📡 Event-Driven Architecture - TuskLang for C# - "Event Mastery"

C# Documentation

📡 Event-Driven Architecture - TuskLang for C# - "Event Mastery"

Master event-driven architecture with TuskLang in your C# applications!

Event-driven architecture enables loose coupling and scalability. This guide covers event sourcing, CQRS, message queues, and real-world event-driven scenarios for TuskLang in C# environments.

🎯 Event-Driven Philosophy

"We Don't Bow to Any King"

- Loose coupling - Services communicate via events - Event sourcing - Events as source of truth - CQRS - Separate read and write models - Scalability - Handle high event volumes - Replay capability - Rebuild state from events

📨 Event Sourcing

Example: Event Store with TuskLang

// EventStoreService.cs
using System.Text.Json;

public class EventStoreService { private readonly TuskLang _parser; private readonly ILogger<EventStoreService> _logger; private readonly List<Event> _events; public EventStoreService(ILogger<EventStoreService> logger) { _parser = new TuskLang(); _events = new List<Event>(); _logger = logger; } public async Task<Event> AppendEventAsync(string aggregateId, string eventType, Dictionary<string, object> data) { var @event = new Event { Id = Guid.NewGuid().ToString(), AggregateId = aggregateId, EventType = eventType, Data = data, Timestamp = DateTime.UtcNow, Version = await GetNextVersionAsync(aggregateId) }; _events.Add(@event); _logger.LogInformation("Appended event {EventType} for aggregate {AggregateId} at version {Version}", eventType, aggregateId, @event.Version); return @event; } public async Task<List<Event>> GetEventsAsync(string aggregateId, int fromVersion = 0) { var events = _events .Where(e => e.AggregateId == aggregateId && e.Version > fromVersion) .OrderBy(e => e.Version) .ToList(); _logger.LogInformation("Retrieved {Count} events for aggregate {AggregateId} from version {FromVersion}", events.Count, aggregateId, fromVersion); return events; } public async Task<Dictionary<string, object>> GetEventStoreConfigurationAsync() { var config = new Dictionary<string, object>(); // Event store statistics config["total_events"] = _events.Count; config["unique_aggregates"] = _events.Select(e => e.AggregateId).Distinct().Count(); config["event_types"] = _events.Select(e => e.EventType).Distinct().ToList(); // Recent events var recentEvents = _events .OrderByDescending(e => e.Timestamp) .Take(10) .Select(e => new Dictionary<string, object> { ["id"] = e.Id, ["aggregate_id"] = e.AggregateId, ["event_type"] = e.EventType, ["timestamp"] = e.Timestamp.ToString("yyyy-MM-dd HH:mm:ss"), ["version"] = e.Version }) .ToList(); config["recent_events"] = recentEvents; return config; } private async Task<int> GetNextVersionAsync(string aggregateId) { var lastEvent = _events .Where(e => e.AggregateId == aggregateId) .OrderByDescending(e => e.Version) .FirstOrDefault(); return lastEvent?.Version + 1 ?? 1; } }

public class Event { public string Id { get; set; } = string.Empty; public string AggregateId { get; set; } = string.Empty; public string EventType { get; set; } = string.Empty; public Dictionary<string, object> Data { get; set; } = new Dictionary<string, object>(); public DateTime Timestamp { get; set; } public int Version { get; set; } }

🔄 CQRS Implementation

Example: Command and Query Separation

// CqrsService.cs
public class CqrsService
{
    private readonly EventStoreService _eventStore;
    private readonly TuskLang _parser;
    private readonly ILogger<CqrsService> _logger;
    private readonly Dictionary<string, object> _readModels;
    
    public CqrsService(EventStoreService eventStore, ILogger<CqrsService> logger)
    {
        _eventStore = eventStore;
        _parser = new TuskLang();
        _readModels = new Dictionary<string, object>();
        _logger = logger;
    }
    
    // Command side - Write operations
    public async Task<CommandResult> ExecuteCommandAsync(ICommand command)
    {
        try
        {
            var result = new CommandResult { Success = true };
            
            switch (command)
            {
                case CreateUserCommand createUser:
                    result = await HandleCreateUserAsync(createUser);
                    break;
                case UpdateUserCommand updateUser:
                    result = await HandleUpdateUserAsync(updateUser);
                    break;
                case DeleteUserCommand deleteUser:
                    result = await HandleDeleteUserAsync(deleteUser);
                    break;
                default:
                    result.Success = false;
                    result.Error = $"Unknown command type: {command.GetType().Name}";
                    break;
            }
            
            if (result.Success)
            {
                // Update read models
                await UpdateReadModelsAsync(command.AggregateId);
            }
            
            return result;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to execute command {CommandType}", command.GetType().Name);
            return new CommandResult { Success = false, Error = ex.Message };
        }
    }
    
    // Query side - Read operations
    public async Task<Dictionary<string, object>> ExecuteQueryAsync(IQuery query)
    {
        try
        {
            switch (query)
            {
                case GetUserQuery getUser:
                    return await HandleGetUserAsync(getUser);
                case GetUsersQuery getUsers:
                    return await HandleGetUsersAsync(getUsers);
                case GetUserHistoryQuery getUserHistory:
                    return await HandleGetUserHistoryAsync(getUserHistory);
                default:
                    return new Dictionary<string, object> { ["error"] = $"Unknown query type: {query.GetType().Name}" };
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to execute query {QueryType}", query.GetType().Name);
            return new Dictionary<string, object> { ["error"] = ex.Message };
        }
    }
    
    private async Task<CommandResult> HandleCreateUserAsync(CreateUserCommand command)
    {
        var eventData = new Dictionary<string, object>
        {
            ["user_id"] = command.UserId,
            ["first_name"] = command.FirstName,
            ["last_name"] = command.LastName,
            ["email"] = command.Email
        };
        
        await _eventStore.AppendEventAsync(command.UserId, "UserCreated", eventData);
        
        return new CommandResult { Success = true };
    }
    
    private async Task<CommandResult> HandleUpdateUserAsync(UpdateUserCommand command)
    {
        var eventData = new Dictionary<string, object>
        {
            ["user_id"] = command.UserId,
            ["first_name"] = command.FirstName,
            ["last_name"] = command.LastName
        };
        
        await _eventStore.AppendEventAsync(command.UserId, "UserUpdated", eventData);
        
        return new CommandResult { Success = true };
    }
    
    private async Task<CommandResult> HandleDeleteUserAsync(DeleteUserCommand command)
    {
        var eventData = new Dictionary<string, object>
        {
            ["user_id"] = command.UserId
        };
        
        await _eventStore.AppendEventAsync(command.UserId, "UserDeleted", eventData);
        
        return new CommandResult { Success = true };
    }
    
    private async Task<Dictionary<string, object>> HandleGetUserAsync(GetUserQuery query)
    {
        if (_readModels.TryGetValue(query.UserId, out var user))
        {
            return new Dictionary<string, object> { ["user"] = user };
        }
        
        return new Dictionary<string, object> { ["error"] = "User not found" };
    }
    
    private async Task<Dictionary<string, object>> HandleGetUsersAsync(GetUsersQuery query)
    {
        var users = _readModels.Values.Where(u => u is Dictionary<string, object>).ToList();
        return new Dictionary<string, object> { ["users"] = users };
    }
    
    private async Task<Dictionary<string, object>> HandleGetUserHistoryAsync(GetUserHistoryQuery query)
    {
        var events = await _eventStore.GetEventsAsync(query.UserId);
        return new Dictionary<string, object> { ["events"] = events };
    }
    
    private async Task UpdateReadModelsAsync(string aggregateId)
    {
        var events = await _eventStore.GetEventsAsync(aggregateId);
        var user = new Dictionary<string, object>();
        
        foreach (var @event in events)
        {
            switch (@event.EventType)
            {
                case "UserCreated":
                    user["id"] = @event.Data["user_id"];
                    user["first_name"] = @event.Data["first_name"];
                    user["last_name"] = @event.Data["last_name"];
                    user["email"] = @event.Data["email"];
                    user["created_at"] = @event.Timestamp.ToString("yyyy-MM-dd HH:mm:ss");
                    break;
                case "UserUpdated":
                    user["first_name"] = @event.Data["first_name"];
                    user["last_name"] = @event.Data["last_name"];
                    user["updated_at"] = @event.Timestamp.ToString("yyyy-MM-dd HH:mm:ss");
                    break;
                case "UserDeleted":
                    user["deleted"] = true;
                    user["deleted_at"] = @event.Timestamp.ToString("yyyy-MM-dd HH:mm:ss");
                    break;
            }
        }
        
        if (user.ContainsKey("deleted") && (bool)user["deleted"])
        {
            _readModels.Remove(aggregateId);
        }
        else
        {
            _readModels[aggregateId] = user;
        }
    }
}

// Command and Query interfaces public interface ICommand { string AggregateId { get; } }

public interface IQuery { }

public class CreateUserCommand : ICommand { public string AggregateId => UserId; public string UserId { get; set; } = string.Empty; public string FirstName { get; set; } = string.Empty; public string LastName { get; set; } = string.Empty; public string Email { get; set; } = string.Empty; }

public class UpdateUserCommand : ICommand { public string AggregateId => UserId; public string UserId { get; set; } = string.Empty; public string FirstName { get; set; } = string.Empty; public string LastName { get; set; } = string.Empty; }

public class DeleteUserCommand : ICommand { public string AggregateId => UserId; public string UserId { get; set; } = string.Empty; }

public class GetUserQuery : IQuery { public string UserId { get; set; } = string.Empty; }

public class GetUsersQuery : IQuery { }

public class GetUserHistoryQuery : IQuery { public string UserId { get; set; } = string.Empty; }

public class CommandResult { public bool Success { get; set; } public string? Error { get; set; } }

📬 Message Queue Integration

Example: RabbitMQ Integration

// MessageQueueService.cs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

public class MessageQueueService : IDisposable { private readonly IConnection _connection; private readonly IModel _channel; private readonly TuskLang _parser; private readonly ILogger<MessageQueueService> _logger; public MessageQueueService(ILogger<MessageQueueService> logger) { _parser = new TuskLang(); _logger = logger; var factory = new ConnectionFactory { HostName = Environment.GetEnvironmentVariable("RABBITMQ_HOST") ?? "localhost", UserName = Environment.GetEnvironmentVariable("RABBITMQ_USER") ?? "guest", Password = Environment.GetEnvironmentVariable("RABBITMQ_PASS") ?? "guest" }; _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); // Declare exchanges and queues _channel.ExchangeDeclare("user_events", ExchangeType.Topic, durable: true); _channel.QueueDeclare("user_commands", durable: true, exclusive: false, autoDelete: false); _channel.QueueBind("user_commands", "user_events", "user.*"); } public void PublishEvent(string routingKey, Dictionary<string, object> eventData) { try { var message = JsonSerializer.Serialize(eventData); var body = Encoding.UTF8.GetBytes(message); _channel.BasicPublish( exchange: "user_events", routingKey: routingKey, basicProperties: null, body: body ); _logger.LogInformation("Published event with routing key {RoutingKey}", routingKey); } catch (Exception ex) { _logger.LogError(ex, "Failed to publish event with routing key {RoutingKey}", routingKey); throw; } } public void StartConsuming(string queueName, Action<Dictionary<string, object>> messageHandler) { var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { try { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); var eventData = JsonSerializer.Deserialize<Dictionary<string, object>>(message); messageHandler(eventData); _channel.BasicAck(ea.DeliveryTag, false); _logger.LogInformation("Processed message from queue {QueueName}", queueName); } catch (Exception ex) { _logger.LogError(ex, "Failed to process message from queue {QueueName}", queueName); _channel.BasicNack(ea.DeliveryTag, false, true); } }; _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); _logger.LogInformation("Started consuming from queue {QueueName}", queueName); } public async Task<Dictionary<string, object>> GetQueueConfigurationAsync() { var config = new Dictionary<string, object>(); // Queue statistics var queueInfo = _channel.QueueDeclarePassive("user_commands"); config["queue_name"] = "user_commands"; config["message_count"] = queueInfo.MessageCount; config["consumer_count"] = queueInfo.ConsumerCount; // Exchange information config["exchange_name"] = "user_events"; config["exchange_type"] = "topic"; return config; } public void Dispose() { _channel?.Dispose(); _connection?.Dispose(); } }

🛠️ Real-World Event-Driven Scenarios

- E-commerce: Order events, inventory events, payment events - Social media: Post events, like events, comment events - Banking: Transaction events, account events, fraud events - IoT: Sensor events, device events, alert events

🧩 Best Practices

- Use event sourcing for audit trails - Implement CQRS for complex domains - Use message queues for reliable event delivery - Design events for backward compatibility - Monitor event processing performance

🏁 You're Ready!

You can now: - Implement event sourcing with C# TuskLang - Use CQRS for complex domains - Integrate with message queues - Build scalable event-driven systems

Next: Caching Strategies

---

"We don't bow to any king" - Your event mastery, your architectural excellence, your scalability power.

Build event-driven systems. Scale with confidence. 📡