💎 Event Sourcing with TuskLang and Ruby
📡 Build Systems That Remember Everything
TuskLang supports event sourcing in Ruby applications through configuration for event stores, aggregates, projections, and event-driven handlers. The result is a system that keeps a complete audit trail and can reconstruct its state as of any past point in time.
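The core idea, that current state is a fold over the event history, can be sketched without any infrastructure. The snippet below is a minimal illustration rather than TuskLang API: the event shape and the `apply` and `state_at` helpers are assumptions made for this example.

```ruby
# Hypothetical event shape for illustration: { type:, data:, version: }
EVENTS = [
  { type: 'UserCreated',        data: { email: 'a@example.com', name: 'Ann' }, version: 0 },
  { type: 'UserProfileUpdated', data: { name: 'Ann B.' },                      version: 1 },
  { type: 'UserDeactivated',    data: { reason: 'requested' },                 version: 2 }
].freeze

# Pure function: returns the state that results from applying one event.
def apply(state, event)
  case event[:type]
  when 'UserCreated'        then state.merge(event[:data]).merge(active: true)
  when 'UserProfileUpdated' then state.merge(event[:data])
  when 'UserDeactivated'    then state.merge(active: false, deactivation_reason: event[:data][:reason])
  else state
  end
end

# Rebuild state as of a given version by folding over the history.
def state_at(events, version)
  events.take_while { |e| e[:version] <= version }
        .reduce({}) { |state, e| apply(state, e) }
end

puts state_at(EVENTS, 1).inspect
puts state_at(EVENTS, 2).inspect
```

Because `apply` never mutates its input, the same history can be folded up to any version to answer "what did this look like then?" questions.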
🚀 Quick Start: Event Store
Basic Event Sourcing Configuration
config/event_sourcing.tsk
[event_sourcing]
enabled: @env("EVENT_SOURCING_ENABLED", "true")
event_store_type: @env("EVENT_STORE_TYPE", "postgresql") # postgresql, redis, mongodb
snapshot_frequency: @env("SNAPSHOT_FREQUENCY", "100")
event_serialization: @env("EVENT_SERIALIZATION", "json") # json, msgpack, protobuf
[event_store]
host: @env("EVENT_STORE_HOST", "localhost")
port: @env("EVENT_STORE_PORT", "5432")
database: @env("EVENT_STORE_DATABASE", "event_store")
username: @env("EVENT_STORE_USERNAME", "postgres")
password: @env.secure("EVENT_STORE_PASSWORD")
[projections]
enabled: @env("PROJECTIONS_ENABLED", "true")
real_time: @env("REAL_TIME_PROJECTIONS", "true")
batch_size: @env("PROJECTION_BATCH_SIZE", "1000")
parallel_processing: @env("PARALLEL_PROJECTIONS", "true")
[aggregates]
snapshot_enabled: @env("AGGREGATE_SNAPSHOTS_ENABLED", "true")
snapshot_retention: @env("SNAPSHOT_RETENTION_PERIOD", "30d")
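optimistic_concurrency: @env("OPTIMISTIC_CONCURRENCY", "true")
[redis]
# Read by the Ruby classes as @config['redis']['url']; the REDIS_URL variable name and default are assumptions
url: @env("REDIS_URL", "redis://localhost:6379/0")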
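Every value in this file comes from `@env` with a string default, which is why the Ruby code later compares against `'true'` and calls `.to_i`. One way to centralize that is a small coercion helper. This is a sketch that assumes the loaded configuration behaves like a nested Hash of strings; the `ConfigReader` class and its methods are hypothetical and not part of TuskLang.

```ruby
# Hypothetical helper: wraps a nested Hash of string values (a stand-in for
# whatever the config loader returns) and coerces types on read.
class ConfigReader
  def initialize(hash)
    @hash = hash
  end

  # True only for the literal string "true" (case-insensitive).
  def flag?(section, key)
    @hash.dig(section, key).to_s.strip.casecmp?('true')
  end

  # Integer value, or the default when the key is missing or not numeric.
  def integer(section, key, default = 0)
    Integer(@hash.dig(section, key).to_s, 10)
  rescue ArgumentError
    default
  end
end

config = ConfigReader.new({
  'event_sourcing' => { 'enabled' => 'true', 'snapshot_frequency' => '100' },
  'projections'    => { 'enabled' => 'false' }
})

puts config.flag?('event_sourcing', 'enabled')
puts config.integer('event_sourcing', 'snapshot_frequency')
```

Reading flags through one helper avoids scattering `== 'true'` comparisons across classes and makes a missing key fall back to a known default.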
Event Store Implementation
lib/event_store.rb
require 'tusk'
require 'redis'
require 'json'
require 'securerandom'
require 'time' # provides Time#iso8601 and Time.parse
class EventStore
def initialize(config_path = 'config/event_sourcing.tsk')
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
setup_event_store
end
def append_events(stream_id, events, expected_version = nil)
return { success: false, error: 'Event sourcing disabled' } unless @config['event_sourcing']['enabled'] == 'true'
# Validate expected version for optimistic concurrency
if expected_version && @config['aggregates']['optimistic_concurrency'] == 'true'
current_version = get_stream_version(stream_id)
if current_version != expected_version
return {
success: false,
error: "Concurrency conflict: expected version #{expected_version}, got #{current_version}"
}
end
end
# Prepare events for storage
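# Assign consecutive versions here: stream metadata is only updated after the
# whole batch is stored, so every event would otherwise get the same version
base_version = get_next_version(stream_id)
prepared_events = events.each_with_index.map do |event, offset|
prepare_event_for_storage(event, stream_id).merge(version: base_version + offset)
end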
# Store events atomically
begin
store_events(stream_id, prepared_events)
update_stream_metadata(stream_id, prepared_events.length)
# Trigger projections
trigger_projections(prepared_events) if @config['projections']['enabled'] == 'true'
# Create snapshot if needed
create_snapshot_if_needed(stream_id) if @config['aggregates']['snapshot_enabled'] == 'true'
{
success: true,
stream_id: stream_id,
events_count: prepared_events.length,
next_expected_version: get_stream_version(stream_id)
}
rescue => e
{
success: false,
error: "Failed to append events: #{e.message}"
}
end
end
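def get_events(stream_id, from_version = 0, to_version = nil)
events = []
current_version = from_version
loop do
batch = get_events_batch(stream_id, current_version, 100)
break if batch.empty?
events.concat(batch)
# Batches are parsed from JSON, so event keys are strings
current_version = batch.last['version'] + 1
break if to_version && current_version > to_version
end
# Drop anything read past the requested upper bound
to_version ? events.select { |event| event['version'] <= to_version } : events
end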
def get_stream_metadata(stream_id)
metadata = @redis.hgetall("stream_metadata:#{stream_id}")
return nil if metadata.empty?
{
stream_id: stream_id,
version: metadata['version'].to_i,
event_count: metadata['event_count'].to_i,
created_at: metadata['created_at'],
last_updated: metadata['last_updated']
}
end
def create_snapshot(stream_id, aggregate_state, version)
snapshot = {
id: SecureRandom.uuid,
stream_id: stream_id,
version: version,
state: aggregate_state,
created_at: Time.now.iso8601
}
@redis.hset("snapshots:#{stream_id}", version.to_s, snapshot.to_json)
snapshot
end
def get_latest_snapshot(stream_id)
snapshots = @redis.hgetall("snapshots:#{stream_id}")
return nil if snapshots.empty?
latest_version = snapshots.keys.map(&:to_i).max
snapshot_data = snapshots[latest_version.to_s]
JSON.parse(snapshot_data) if snapshot_data
end
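def subscribe_to_events(event_types = nil, &block)
subscription_id = SecureRandom.uuid
subscription = {
id: subscription_id,
event_types: event_types,
created_at: Time.now.iso8601,
active: true
}
# A Proc cannot be serialized to JSON, so the callback stays in process
# memory and only the subscription metadata is persisted to Redis
@subscription_callbacks ||= {}
@subscription_callbacks[subscription_id] = block
@redis.hset('event_subscriptions', subscription_id, subscription.to_json)
subscription
end
def unsubscribe_from_events(subscription_id)
@subscription_callbacks&.delete(subscription_id)
@redis.hdel('event_subscriptions', subscription_id)
end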
def get_event_statistics
{
total_events: get_total_event_count,
total_streams: get_total_stream_count,
events_by_type: get_events_by_type,
recent_events: get_recent_events(100)
}
end
def replay_events(from_timestamp = nil, to_timestamp = nil)
events = get_events_in_timerange(from_timestamp, to_timestamp)
replay_result = {
events_processed: 0,
errors: [],
start_time: Time.now.iso8601
}
events.each do |event|
begin
process_event_for_replay(event)
replay_result[:events_processed] += 1
rescue => e
replay_result[:errors] << {
event_id: event['id'], # replayed events are parsed JSON, so keys are strings
error: e.message
}
end
end
replay_result[:end_time] = Time.now.iso8601
replay_result[:duration] = Time.parse(replay_result[:end_time]) - Time.parse(replay_result[:start_time])
replay_result
end
private
def setup_event_store
# Initialize event store tables/collections if needed
case @config['event_sourcing']['event_store_type']
when 'postgresql'
setup_postgresql_event_store
when 'redis'
setup_redis_event_store
when 'mongodb'
setup_mongodb_event_store
end
end
def setup_postgresql_event_store
# Implementation for PostgreSQL event store setup
end
def setup_redis_event_store
# Implementation for Redis event store setup
end
def setup_mongodb_event_store
# Implementation for MongoDB event store setup
end
def prepare_event_for_storage(event, stream_id)
{
id: SecureRandom.uuid,
stream_id: stream_id,
type: event[:type],
data: event[:data],
metadata: event[:metadata] || {},
version: get_next_version(stream_id),
timestamp: Time.now.iso8601,
correlation_id: event[:correlation_id] || SecureRandom.uuid,
causation_id: event[:causation_id]
}
end
def store_events(stream_id, events)
events.each do |event|
event_key = "event:#{stream_id}:#{event[:version]}"
@redis.setex(event_key, 86400 * 365, event.to_json) # 1 year retention
# Store event in global event log
@redis.lpush('event_log', event.to_json)
@redis.ltrim('event_log', 0, 999999) # Keep last 1M events
end
end
def update_stream_metadata(stream_id, events_count)
metadata_key = "stream_metadata:#{stream_id}"
current_metadata = @redis.hgetall(metadata_key)
if current_metadata.empty?
@redis.hmset(metadata_key,
'version', events_count - 1,
'event_count', events_count,
'created_at', Time.now.iso8601,
'last_updated', Time.now.iso8601
)
else
current_version = current_metadata['version'].to_i
current_event_count = current_metadata['event_count'].to_i
@redis.hmset(metadata_key,
'version', current_version + events_count,
'event_count', current_event_count + events_count,
'last_updated', Time.now.iso8601
)
end
end
def get_stream_version(stream_id)
metadata = get_stream_metadata(stream_id)
metadata ? metadata[:version] : -1
end
def get_next_version(stream_id)
get_stream_version(stream_id) + 1
end
def get_events_batch(stream_id, from_version, limit)
events = []
current_version = from_version
limit.times do
event_key = "event:#{stream_id}:#{current_version}"
event_data = @redis.get(event_key)
break unless event_data
events << JSON.parse(event_data)
current_version += 1
end
events
end
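def trigger_projections(events)
subscriptions = @redis.hgetall('event_subscriptions')
subscriptions.each do |subscription_id, subscription_data|
subscription = JSON.parse(subscription_data)
next unless subscription['active']
# Callbacks are Ruby procs, which cannot round-trip through JSON; look them
# up in process memory and skip subscriptions registered by other processes
callback = (@subscription_callbacks ||= {})[subscription_id]
next unless callback
events.each do |event|
next unless should_process_event_for_subscription(event, subscription)
begin
callback.call(event)
rescue => e
Rails.logger.error "Projection error: #{e.message}"
end
end
end
end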
def should_process_event_for_subscription(event, subscription)
return true unless subscription['event_types']
subscription['event_types'].include?(event[:type])
end
def create_snapshot_if_needed(stream_id)
frequency = @config['event_sourcing']['snapshot_frequency'].to_i
metadata = get_stream_metadata(stream_id)
return unless metadata && metadata[:version] % frequency == 0
# This would typically reconstruct the aggregate state
# For now, we'll create a placeholder snapshot
create_snapshot(stream_id, { placeholder: true }, metadata[:version])
end
def get_total_event_count
@redis.llen('event_log')
end
def get_total_stream_count
# SCAN iterates incrementally instead of blocking Redis the way KEYS does
@redis.scan_each(match: 'stream_metadata:*').count
end
def get_events_by_type
events = @redis.lrange('event_log', 0, 9999)
event_types = {}
events.each do |event_data|
event = JSON.parse(event_data)
event_types[event['type']] ||= 0
event_types[event['type']] += 1
end
event_types
end
def get_recent_events(limit)
events = @redis.lrange('event_log', 0, limit - 1)
events.map { |event_data| JSON.parse(event_data) }
end
def get_events_in_timerange(from_timestamp, to_timestamp)
# lpush stores the newest event first; reverse so replay runs oldest first
events = @redis.lrange('event_log', 0, -1).reverse
filtered_events = []
events.each do |event_data|
event = JSON.parse(event_data)
event_time = Time.parse(event['timestamp'])
if from_timestamp && event_time < Time.parse(from_timestamp)
next
end
if to_timestamp && event_time > Time.parse(to_timestamp)
next
end
filtered_events << event
end
filtered_events
end
def process_event_for_replay(event)
# Implementation for event replay processing
Rails.logger.info "Replaying event: #{event['id']}"
end
end
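In the Redis-backed class above, the version check and the write are separate commands, so two concurrent writers could in principle both pass the check. The sketch below shows the intended optimistic-concurrency rule in isolation, using an in-memory store guarded by a mutex. `InMemoryEventStore` and its method names are hypothetical and exist only for this illustration.

```ruby
# In-memory stand-in for an event store, used only to illustrate the
# expected-version check. Versions are zero-based; an empty stream is -1.
class InMemoryEventStore
  ConcurrencyError = Class.new(StandardError)

  def initialize
    @streams = Hash.new { |hash, key| hash[key] = [] }
    @lock = Mutex.new
  end

  def version(stream_id)
    @streams[stream_id].length - 1
  end

  # Appends events only if the stream is still at expected_version.
  def append(stream_id, events, expected_version)
    @lock.synchronize do
      current = version(stream_id)
      unless current == expected_version
        raise ConcurrencyError, "expected #{expected_version}, got #{current}"
      end

      events.each_with_index do |event, offset|
        @streams[stream_id] << event.merge(version: current + 1 + offset)
      end
      version(stream_id)
    end
  end

  def read(stream_id)
    @streams[stream_id].dup
  end
end

store = InMemoryEventStore.new
store.append('user-1', [{ type: 'UserCreated' }], -1)

# Two writers both loaded the stream at version 0; only the first one wins.
store.append('user-1', [{ type: 'UserProfileUpdated' }], 0)
begin
  store.append('user-1', [{ type: 'UserDeactivated' }], 0)
rescue InMemoryEventStore::ConcurrencyError => e
  puts "rejected: #{e.message}"
end
```

The losing writer is expected to reload the stream, re-run its command against the new state, and retry. A Redis implementation would need an equivalent atomic step, for example a transaction or a server-side script.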
🏗️ Aggregate Implementation
Event-Sourced Aggregates
lib/aggregate.rb
require 'tusk'
require 'json'
require 'securerandom'
require 'active_support/core_ext/string/inflections' # String#underscore
class Aggregate
def initialize(id, event_store)
@id = id
@event_store = event_store
@version = -1 # matches the event store's version for an empty stream
@uncommitted_events = []
@state = initial_state
end
def load_from_history
# Load from snapshot if available
snapshot = @event_store.get_latest_snapshot(@id)
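if snapshot
# Snapshots are parsed from JSON, so restore the symbol keys that @state uses
@state = snapshot['state'].transform_keys(&:to_sym)
@version = snapshot['version']
end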
# Load remaining events
events = @event_store.get_events(@id, @version + 1)
events.each do |event|
apply_event(event)
@version = event['version']
end
self
end
def save
return { success: true, events_count: 0 } if @uncommitted_events.empty?
result = @event_store.append_events(@id, @uncommitted_events, @version)
if result[:success]
@version = result[:next_expected_version]
@uncommitted_events.clear
end
result
end
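def apply_event(event)
# Normalize to string keys so freshly recorded events (symbol keys) and
# stored events (parsed from JSON) go through the same handlers
event = JSON.parse(JSON.generate(event))
handler_method = "apply_#{event['type'].underscore}"
if respond_to?(handler_method, true)
send(handler_method, event['data'])
else
Rails.logger.warn "No handler found for event: #{event['type']}"
end
end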
def record_event(event_type, data, metadata = {})
event = {
type: event_type,
data: data,
metadata: metadata,
correlation_id: get_correlation_id,
causation_id: get_causation_id
}
@uncommitted_events << event
apply_event(event)
end
def get_state
@state.dup
end
def get_version
@version
end
def get_uncommitted_events
@uncommitted_events.dup
end
protected
def initial_state
{}
end
def get_correlation_id
Thread.current[:correlation_id] || SecureRandom.uuid
end
def get_causation_id
Thread.current[:causation_id] || SecureRandom.uuid
end
end
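The `apply_event` method above routes each event to a handler by naming convention: an event type such as `UserProfileUpdated` maps to a method called `apply_user_profile_updated`. The following sketch shows that dispatch in isolation. `Naming.underscore` is a simplified stand-in for ActiveSupport's `String#underscore`, and `CounterAggregate` is a hypothetical class used only for this example.

```ruby
module Naming
  # Converts a CamelCase event type to snake_case. Simplified: it does not
  # split acronyms such as "HTTPRequest" the way ActiveSupport does.
  def self.underscore(name)
    name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
  end
end

# Hypothetical aggregate used only to show convention-based dispatch.
class CounterAggregate
  attr_reader :state, :unhandled

  def initialize
    @state = { count: 0 }
    @unhandled = []
  end

  def apply_event(event)
    handler = "apply_#{Naming.underscore(event['type'])}"
    if respond_to?(handler, true) # true: include private methods
      send(handler, event['data'])
    else
      @unhandled << event['type']
    end
  end

  private

  def apply_counter_incremented(data)
    @state[:count] += data['by']
  end
end

counter = CounterAggregate.new
counter.apply_event({ 'type' => 'CounterIncremented', 'data' => { 'by' => 2 } })
counter.apply_event({ 'type' => 'CounterReset', 'data' => {} })
puts counter.state.inspect
puts counter.unhandled.inspect
```

Passing `true` to `respond_to?` matters because the handlers are private; without it every event would fall through to the unhandled branch.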
Example User Aggregate
class UserAggregate < Aggregate
def initialize(id, event_store)
super(id, event_store)
end
def create_user(email, name)
return { success: false, error: 'User already exists' } if @state[:email]
record_event('UserCreated', {
email: email,
name: name,
created_at: Time.now.iso8601
})
{ success: true }
end
def update_profile(updates)
return { success: false, error: 'User does not exist' } unless @state[:email]
record_event('UserProfileUpdated', {
updates: updates,
updated_at: Time.now.iso8601
})
{ success: true }
end
def deactivate_user(reason)
return { success: false, error: 'User does not exist' } unless @state[:email]
return { success: false, error: 'User already deactivated' } if @state[:deactivated]
record_event('UserDeactivated', {
reason: reason,
deactivated_at: Time.now.iso8601
})
{ success: true }
end
private
def apply_user_created(data)
@state.merge!(
email: data['email'],
name: data['name'],
created_at: data['created_at'],
active: true
)
end
def apply_user_profile_updated(data)
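# Event data has string keys; convert so updates land on the symbol keys used by @state
@state.merge!(data['updates'].transform_keys(&:to_sym))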
@state[:updated_at] = data['updated_at']
end
def apply_user_deactivated(data)
@state[:deactivated] = true
@state[:deactivated_at] = data['deactivated_at']
@state[:deactivation_reason] = data['reason']
end
end
Example Order Aggregate
class OrderAggregate < Aggregate
def initialize(id, event_store)
super(id, event_store)
end
def create_order(user_id, items)
record_event('OrderCreated', {
user_id: user_id,
items: items,
total: calculate_total(items),
created_at: Time.now.iso8601
})
{ success: true }
end
def add_item(item)
return { success: false, error: 'Order cannot be modified' } unless can_modify?
record_event('ItemAdded', {
item: item,
added_at: Time.now.iso8601
})
{ success: true }
end
def remove_item(item_id)
return { success: false, error: 'Order cannot be modified' } unless can_modify?
record_event('ItemRemoved', {
item_id: item_id,
removed_at: Time.now.iso8601
})
{ success: true }
end
def confirm_order
return { success: false, error: 'Order already confirmed' } if @state[:confirmed]
# Treat a missing items list (order never created) the same as an empty one
return { success: false, error: 'Order has no items' } if @state[:items].nil? || @state[:items].empty?
record_event('OrderConfirmed', {
confirmed_at: Time.now.iso8601
})
{ success: true }
end
def cancel_order(reason)
return { success: false, error: 'Order already cancelled' } if @state[:cancelled]
return { success: false, error: 'Order already shipped' } if @state[:shipped]
record_event('OrderCancelled', {
reason: reason,
cancelled_at: Time.now.iso8601
})
{ success: true }
end
private
def can_modify?
!@state[:confirmed] && !@state[:cancelled] && !@state[:shipped]
end
def calculate_total(items)
items.sum { |item| item['price'] * item['quantity'] }
end
def apply_order_created(data)
@state.merge!(
user_id: data['user_id'],
items: data['items'],
total: data['total'],
created_at: data['created_at'],
status: 'pending'
)
end
def apply_item_added(data)
@state[:items] ||= []
@state[:items] << data['item']
@state[:total] = calculate_total(@state[:items])
@state[:updated_at] = data['added_at']
end
def apply_item_removed(data)
@state[:items].reject! { |item| item['id'] == data['item_id'] }
@state[:total] = calculate_total(@state[:items])
@state[:updated_at] = data['removed_at']
end
def apply_order_confirmed(data)
@state[:confirmed] = true
@state[:confirmed_at] = data['confirmed_at']
@state[:status] = 'confirmed'
end
def apply_order_cancelled(data)
@state[:cancelled] = true
@state[:cancelled_at] = data['cancelled_at']
@state[:cancellation_reason] = data['reason']
@state[:status] = 'cancelled'
end
end
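Orders that accumulate many item changes are where the snapshot path in `load_from_history` pays off: state is restored from the latest snapshot and only the events recorded after it are replayed. The sketch below compares a full replay with a snapshot-based one. The reducer, the event shape, and the `load_state` helper are assumptions for illustration, not the classes defined above.

```ruby
# Pure reducer for a hypothetical order that only tracks an item count.
def apply_order_event(state, event)
  case event[:type]
  when 'ItemAdded'   then state.merge(items: state[:items] + 1)
  when 'ItemRemoved' then state.merge(items: state[:items] - 1)
  else state
  end
end

# Rebuilds state from an optional snapshot plus the events recorded after it.
# Returns [state, version, events_replayed].
def load_state(events, snapshot = nil)
  state   = snapshot ? snapshot[:state] : { items: 0 }
  version = snapshot ? snapshot[:version] : -1
  tail    = events.select { |e| e[:version] > version }
  final   = tail.reduce(state) { |s, e| apply_order_event(s, e) }
  [final, tail.empty? ? version : tail.last[:version], tail.length]
end

# 250 events: every fifth one removes an item, the rest add one.
events = (0...250).map do |v|
  { type: v % 5 == 4 ? 'ItemRemoved' : 'ItemAdded', version: v }
end

full_state, full_version, full_replayed = load_state(events)

# Take a snapshot after the first 200 events and reload from it.
snap_state, snap_version, _ = load_state(events.first(200))
snapshot = { state: snap_state, version: snap_version }
fast_state, fast_version, fast_replayed = load_state(events, snapshot)

puts "full replay: #{full_replayed} events, snapshot replay: #{fast_replayed} events"
puts full_state == fast_state
```

Both paths must arrive at the same state; the snapshot only changes how many events are read to get there.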
📊 Projections
Event Projections
lib/projection.rb
require 'tusk'
require 'redis'
require 'json'
require 'active_support/core_ext/string/inflections' # String#underscore
class Projection
def initialize(name, event_store, config_path = 'config/event_sourcing.tsk')
@name = name
@event_store = event_store
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
@state = {}
end
def process_event(event)
handler_method = "handle_#{event['type'].underscore}"
if respond_to?(handler_method, true)
send(handler_method, event)
update_projection_state
end
end
def get_state
@state
end
def reset
@state = {}
@redis.del("projection_state:#{@name}")
end
def rebuild
reset
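# get_events_in_timerange is private on EventStore, so call it via send here;
# exposing it as a public method would be the cleaner option
events = @event_store.send(:get_events_in_timerange, nil, nil)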
events.each do |event|
process_event(event)
end
end
protected
def update_projection_state
@redis.set("projection_state:#{@name}", @state.to_json)
end
def load_projection_state
state_data = @redis.get("projection_state:#{@name}")
@state = state_data ? JSON.parse(state_data) : {}
end
end
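A projection does not have to be rebuilt from scratch on every run. If it remembers the position of the last event it processed, catching up becomes incremental and safe to repeat. The sketch below illustrates that idea with an in-memory log. `UserCountProjection`, its `position` checkpoint, and the log shape are assumptions for this example and are not part of the `Projection` class above.

```ruby
# Hypothetical read model: counts users by status and remembers the last
# log position it processed, so catch-up runs are incremental and idempotent.
class UserCountProjection
  attr_reader :counts, :position

  def initialize
    @counts = { active: 0, deactivated: 0 }
    @position = -1 # index of the last processed entry in the global log
  end

  # Processes only log entries after the stored position.
  def catch_up(log)
    log.each_with_index do |event, index|
      next if index <= @position

      handle(event)
      @position = index
    end
    self
  end

  private

  def handle(event)
    case event[:type]
    when 'UserCreated'
      @counts[:active] += 1
    when 'UserDeactivated'
      @counts[:active] -= 1
      @counts[:deactivated] += 1
    end
  end
end

log = [
  { type: 'UserCreated', stream_id: 'u1' },
  { type: 'UserCreated', stream_id: 'u2' }
]
projection = UserCountProjection.new.catch_up(log)

log << { type: 'UserDeactivated', stream_id: 'u1' }
projection.catch_up(log) # only the new entry is applied
projection.catch_up(log) # no-op: nothing new in the log
puts projection.counts.inspect
```

Persisting the checkpoint together with the projection state keeps the two consistent after a restart.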
Example User Projection
class UserProjection < Projection
def initialize(event_store, config_path)
super('user_projection', event_store, config_path)
load_projection_state
end
private
def handle_user_created(event)
user_id = event['stream_id']
data = event['data']
@state[user_id] = {
id: user_id,
email: data['email'],
name: data['name'],
created_at: data['created_at'],
active: true,
updated_at: data['created_at']
}
end
def handle_user_profile_updated(event)
user_id = event['stream_id']
data = event['data']
return unless @state[user_id]
@state[user_id].merge!(data['updates'])
@state[user_id][:updated_at] = data['updated_at']
end
def handle_user_deactivated(event)
user_id = event['stream_id']
data = event['data']
return unless @state[user_id]
@state[user_id].merge!(
deactivated: true,
deactivated_at: data['deactivated_at'],
deactivation_reason: data['reason'],
active: false,
updated_at: data['deactivated_at']
)
end
end
Example Order Projection
class OrderProjection < Projection
def initialize(event_store, config_path)
super('order_projection', event_store, config_path)
load_projection_state
end
def get_user_orders(user_id)
@state.values.select { |order| order['user_id'] == user_id }
end
def get_active_orders
@state.values.select { |order| order['status'] == 'pending' }
end
def get_order_statistics
total_orders = @state.length
confirmed_orders = @state.values.count { |order| order['confirmed'] }
cancelled_orders = @state.values.count { |order| order['cancelled'] }
total_revenue = @state.values.sum { |order| order['total'] || 0 }
{
total_orders: total_orders,
confirmed_orders: confirmed_orders,
cancelled_orders: cancelled_orders,
total_revenue: total_revenue,
conversion_rate: total_orders > 0 ? (confirmed_orders.to_f / total_orders * 100).round(2) : 0
}
end
private
def handle_order_created(event)
order_id = event['stream_id']
data = event['data']
@state[order_id] = {
id: order_id,
user_id: data['user_id'],
items: data['items'],
total: data['total'],
created_at: data['created_at'],
status: 'pending',
updated_at: data['created_at']
}
end
def handle_item_added(event)
order_id = event['stream_id']
data = event['data']
return unless @state[order_id]
@state[order_id][:items] ||= []
@state[order_id][:items] << data['item']
@state[order_id][:total] = calculate_total(@state[order_id][:items])
@state[order_id][:updated_at] = data['added_at']
end
def handle_item_removed(event)
order_id = event['stream_id']
data = event['data']
return unless @state[order_id]
@state[order_id][:items].reject! { |item| item['id'] == data['item_id'] }
@state[order_id][:total] = calculate_total(@state[order_id][:items])
@state[order_id][:updated_at] = data['removed_at']
end
def handle_order_confirmed(event)
order_id = event['stream_id']
data = event['data']
return unless @state[order_id]
@state[order_id].merge!(
confirmed: true,
confirmed_at: data['confirmed_at'],
status: 'confirmed',
updated_at: data['confirmed_at']
)
end
def handle_order_cancelled(event)
order_id = event['stream_id']
data = event['data']
return unless @state[order_id]
@state[order_id].merge!(
cancelled: true,
cancelled_at: data['cancelled_at'],
cancellation_reason: data['reason'],
status: 'cancelled',
updated_at: data['cancelled_at']
)
end
def calculate_total(items)
items.sum { |item| item['price'] * item['quantity'] }
end
end
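One detail worth checking in the projection above: the handlers write symbol keys (`status:`), the query methods read string keys (`order['status']`), and the state is reloaded from JSON, which always yields string keys. The sketch below shows the mismatch and one possible remedy, normalizing state through a JSON round trip. The `normalize` helper is hypothetical.

```ruby
require 'json'

# Round-trips a value through JSON so every hash key becomes a string.
def normalize(value)
  JSON.parse(JSON.generate(value))
end

# State as a handler would build it: symbol keys inside each order.
in_memory = { 'order-1' => { status: 'pending', total: 30 } }

# State as it looks after being persisted and reloaded.
reloaded = normalize(in_memory)

puts in_memory['order-1']['status'].inspect # nil, because the key is :status
puts reloaded['order-1']['status'].inspect  # "pending"

# Once state is normalized, queries behave the same before and after a reload.
pending = reloaded.values.select { |order| order['status'] == 'pending' }
puts pending.length
```

Picking one key style, strings or symbols, and enforcing it at the boundary where state is written avoids queries that silently return nothing.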
🔄 Event Handlers
Event Processing Pipeline
lib/event_handler.rb
require 'tusk'
require 'redis'
require 'json'
class EventHandler
def initialize(config_path = 'config/event_sourcing.tsk')
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
@handlers = {}
@middleware = []
end
def register_handler(event_type, handler)
@handlers[event_type] ||= []
@handlers[event_type] << handler
end
def add_middleware(middleware)
@middleware << middleware
end
def process_event(event)
return unless @config['event_sourcing']['enabled'] == 'true'
# Apply middleware
processed_event = apply_middleware(event)
return unless processed_event
# Get handlers for this event type
handlers = @handlers[event['type']] || []
# Process with all registered handlers
results = handlers.map do |handler|
process_with_handler(handler, processed_event)
end
{
event_id: event['id'],
event_type: event['type'],
handlers_processed: handlers.length,
results: results
}
end
def process_events_batch(events)
return { processed: 0, errors: [] } if events.empty?
batch_size = @config['projections']['batch_size'].to_i
processed = 0
errors = []
events.each_slice(batch_size) do |batch|
batch.each do |event|
begin
process_event(event)
processed += 1
rescue => e
errors << {
event_id: event['id'],
error: e.message
}
end
end
end
{
processed: processed,
errors: errors,
total_events: events.length
}
end
def get_handler_statistics
stats = {}
@handlers.each do |event_type, handlers|
stats[event_type] = {
handler_count: handlers.length,
handler_types: handlers.map(&:class).map(&:name)
}
end
stats
end
private
def apply_middleware(event)
processed_event = event.dup
@middleware.each do |middleware|
begin
processed_event = middleware.process(processed_event)
break unless processed_event
rescue => e
Rails.logger.error "Middleware error: #{e.message}"
return nil
end
end
processed_event
end
def process_with_handler(handler, event)
start_time = Time.now
begin
if handler.respond_to?(:call)
result = handler.call(event)
elsif handler.respond_to?(:process_event)
result = handler.process_event(event)
else
raise "Handler does not respond to call or process_event"
end
{
handler: handler.class.name,
success: true,
result: result,
duration: Time.now - start_time
}
rescue => e
{
handler: handler.class.name,
success: false,
error: e.message,
duration: Time.now - start_time
}
end
end
end
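The middleware contract used by `apply_middleware` is small: each middleware responds to `process(event)` and returns the event, possibly modified, or `nil` to drop it. The sketch below shows two middleware objects and a pipeline runner under that contract. Both classes and the `run_pipeline` helper are hypothetical examples.

```ruby
# Adds a processing timestamp to every event that passes through.
class TimestampMiddleware
  def process(event)
    event.merge('processed_at' => Time.now.to_i)
  end
end

# Drops events flagged as test traffic in their metadata.
class DropTestEventsMiddleware
  def process(event)
    event.dig('metadata', 'test') ? nil : event
  end
end

# Runs the event through each middleware in order; stops at the first nil.
def run_pipeline(middleware, event)
  middleware.reduce(event) do |current, step|
    break nil if current.nil?

    step.process(current)
  end
end

pipeline = [DropTestEventsMiddleware.new, TimestampMiddleware.new]

kept = run_pipeline(pipeline, { 'type' => 'OrderConfirmed', 'metadata' => {} })
dropped = run_pipeline(
  pipeline,
  { 'type' => 'OrderConfirmed', 'metadata' => { 'test' => true } }
)

puts kept.key?('processed_at')
puts dropped.inspect
```

Ordering matters: placing the filter first means dropped events never reach later, possibly more expensive, steps.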
Example Email Notification Handler
class EmailNotificationHandler
def process_event(event)
case event['type']
when 'UserCreated'
send_welcome_email(event['data'])
when 'OrderConfirmed'
send_order_confirmation_email(event['data'])
when 'OrderCancelled'
send_order_cancellation_email(event['data'])
end
end
private
def send_welcome_email(user_data)
# Implementation for sending welcome email
Rails.logger.info "Sending welcome email to #{user_data['email']}"
end
def send_order_confirmation_email(order_data)
# Implementation for sending order confirmation email
Rails.logger.info "Sending order confirmation email"
end
def send_order_cancellation_email(order_data)
# Implementation for sending order cancellation email
Rails.logger.info "Sending order cancellation email"
end
end
Example Audit Log Handler
class AuditLogHandler
def process_event(event)
audit_entry = {
event_id: event['id'],
event_type: event['type'],
stream_id: event['stream_id'],
timestamp: event['timestamp'],
data: event['data'],
metadata: event['metadata']
}
store_audit_entry(audit_entry)
end
private
def store_audit_entry(audit_entry)
# Implementation for storing audit entry
Rails.logger.info "Audit log: #{audit_entry.to_json}"
end
end
🎯 Configuration Management
Event Sourcing Configuration
config/event_sourcing_features.tsk
[event_sourcing]
enabled: @env("EVENT_SOURCING_ENABLED", "true")
event_store_type: @env("EVENT_STORE_TYPE", "postgresql")
snapshot_frequency: @env("SNAPSHOT_FREQUENCY", "100")
event_serialization: @env("EVENT_SERIALIZATION", "json")
[event_store]
host: @env("EVENT_STORE_HOST", "localhost")
port: @env("EVENT_STORE_PORT", "5432")
database: @env("EVENT_STORE_DATABASE", "event_store")
username: @env("EVENT_STORE_USERNAME", "postgres")
password: @env.secure("EVENT_STORE_PASSWORD")
connection_pool: @env("EVENT_STORE_CONNECTION_POOL", "10")
[projections]
enabled: @env("PROJECTIONS_ENABLED", "true")
real_time: @env("REAL_TIME_PROJECTIONS", "true")
batch_size: @env("PROJECTION_BATCH_SIZE", "1000")
parallel_processing: @env("PARALLEL_PROJECTIONS", "true")
retry_attempts: @env("PROJECTION_RETRY_ATTEMPTS", "3")
[aggregates]
snapshot_enabled: @env("AGGREGATE_SNAPSHOTS_ENABLED", "true")
snapshot_retention: @env("SNAPSHOT_RETENTION_PERIOD", "30d")
optimistic_concurrency: @env("OPTIMISTIC_CONCURRENCY", "true")
max_events_per_aggregate: @env("MAX_EVENTS_PER_AGGREGATE", "10000")
[handlers]
enabled: @env("EVENT_HANDLERS_ENABLED", "true")
async_processing: @env("ASYNC_EVENT_PROCESSING", "true")
error_handling: @env("EVENT_HANDLER_ERROR_HANDLING", "true")
retry_failed_events: @env("RETRY_FAILED_EVENTS", "true")
[monitoring]
metrics_enabled: @env("EVENT_SOURCING_METRICS_ENABLED", "true")
event_tracking: @env("EVENT_TRACKING_ENABLED", "true")
performance_monitoring: @env("EVENT_PERFORMANCE_MONITORING", "true")
🎯 Summary
This comprehensive guide covers event sourcing with TuskLang and Ruby, including:
- Event Store: Complete event storage and retrieval system
- Aggregates: Event-sourced aggregates with state reconstruction
- Projections: Real-time and batch event projections
- Event Handlers: Event processing pipeline with middleware
- Configuration Management: Enterprise-grade event sourcing configuration
- Snapshot Management: Performance optimization through snapshots
- Event Replay: Complete system state reconstruction capabilities
Together, these pieces provide a foundation for systems that keep a complete audit trail, answer temporal queries by replaying events, and support event-driven architectures.