💎 CQRS with TuskLang and Ruby

Ruby Documentation

CQRS with TuskLang and Ruby

Separate Commands from Queries for Maximum Performance

TuskLang enables sophisticated CQRS (Command Query Responsibility Segregation) for Ruby applications, providing separate command and query handlers, optimized read models, and scalable write models. Build systems that can handle massive read and write loads independently.

🚀 Quick Start: CQRS Setup

Basic CQRS Configuration

config/cqrs.tsk

[cqrs] enabled: @env("CQRS_ENABLED", "true") command_bus: @env("COMMAND_BUS_TYPE", "synchronous") # synchronous, asynchronous, distributed query_bus: @env("QUERY_BUS_TYPE", "synchronous") # synchronous, asynchronous, distributed event_sourcing: @env("CQRS_EVENT_SOURCING", "true")

[commands] validation_enabled: @env("COMMAND_VALIDATION_ENABLED", "true") authorization_enabled: @env("COMMAND_AUTHORIZATION_ENABLED", "true") logging_enabled: @env("COMMAND_LOGGING_ENABLED", "true") retry_attempts: @env("COMMAND_RETRY_ATTEMPTS", "3")

[queries] caching_enabled: @env("QUERY_CACHING_ENABLED", "true") cache_ttl: @env("QUERY_CACHE_TTL", "300") authorization_enabled: @env("QUERY_AUTHORIZATION_ENABLED", "true") pagination_enabled: @env("QUERY_PAGINATION_ENABLED", "true")

[read_models] database: @env("READ_MODEL_DATABASE", "postgresql") optimization_enabled: @env("READ_MODEL_OPTIMIZATION_ENABLED", "true") denormalization_enabled: @env("READ_MODEL_DENORMALIZATION_ENABLED", "true")

Command Bus Implementation

lib/command_bus.rb

require 'tusk' require 'redis' require 'json' require 'securerandom'

class CommandBus def initialize(config_path = 'config/cqrs.tsk') @config = Tusk.load(config_path) @redis = Redis.new(url: @config['redis']['url']) @handlers = {} @middleware = [] setup_command_bus end

def register_handler(command_class, handler) @handlers[command_class] = handler end

def add_middleware(middleware) @middleware << middleware end

def execute(command) return { success: false, error: 'CQRS disabled' } unless @config['cqrs']['enabled'] == 'true'

command_id = SecureRandom.uuid start_time = Time.now

begin # Apply middleware processed_command = apply_middleware(command, command_id) return processed_command unless processed_command[:success]

# Validate command validation_result = validate_command(command) unless validation_result[:valid] return { success: false, error: 'Command validation failed', details: validation_result[:errors], command_id: command_id } end

# Authorize command authorization_result = authorize_command(command) unless authorization_result[:authorized] return { success: false, error: 'Command authorization failed', details: authorization_result[:reason], command_id: command_id } end

# Execute command handler = get_handler(command.class) unless handler return { success: false, error: "No handler registered for command: #{command.class}", command_id: command_id } end

result = execute_with_handler(handler, command, command_id) # Log command execution log_command_execution(command, command_id, result, Time.now - start_time) result rescue => e error_result = { success: false, error: e.message, command_id: command_id, duration: Time.now - start_time } log_command_error(command, command_id, e) error_result end end

def execute_batch(commands) return { success: false, error: 'No commands provided' } if commands.empty?

results = [] batch_id = SecureRandom.uuid

commands.each_with_index do |command, index| result = execute(command) result[:batch_id] = batch_id result[:batch_index] = index results << result end

{ batch_id: batch_id, total_commands: commands.length, successful: results.count { |r| r[:success] }, failed: results.count { |r| !r[:success] }, results: results } end

def get_command_statistics { total_commands: get_total_command_count, successful_commands: get_successful_command_count, failed_commands: get_failed_command_count, average_execution_time: get_average_execution_time, commands_by_type: get_commands_by_type } end

private

def setup_command_bus case @config['cqrs']['command_bus'] when 'asynchronous' setup_asynchronous_bus when 'distributed' setup_distributed_bus end end

def setup_asynchronous_bus # Implementation for asynchronous command bus end

def setup_distributed_bus # Implementation for distributed command bus end

def apply_middleware(command, command_id) processed_command = command.dup

@middleware.each do |middleware| begin result = middleware.process(processed_command, command_id) return result unless result[:success] processed_command = result[:command] rescue => e return { success: false, error: "Middleware error: #{e.message}", command_id: command_id } end end

{ success: true, command: processed_command } end

def validate_command(command) return { valid: true } unless @config['commands']['validation_enabled'] == 'true'

if command.respond_to?(:valid?) { valid: command.valid?, errors: command.errors.full_messages } else { valid: true } end end

def authorize_command(command) return { authorized: true } unless @config['commands']['authorization_enabled'] == 'true'

if command.respond_to?(:authorize) command.authorize else { authorized: true } end end

def get_handler(command_class) @handlers[command_class] end

def execute_with_handler(handler, command, command_id) start_time = Time.now

begin if handler.respond_to?(:call) result = handler.call(command) elsif handler.respond_to?(:handle) result = handler.handle(command) else raise "Handler does not respond to call or handle" end

{ success: true, result: result, command_id: command_id, duration: Time.now - start_time } rescue => e { success: false, error: e.message, command_id: command_id, duration: Time.now - start_time } end end

def log_command_execution(command, command_id, result, duration) return unless @config['commands']['logging_enabled'] == 'true'

log_entry = { command_id: command_id, command_type: command.class.name, success: result[:success], duration: duration, timestamp: Time.now.iso8601 }

@redis.lpush('command_log', log_entry.to_json) @redis.ltrim('command_log', 0, 9999) end

def log_command_error(command, command_id, error) error_entry = { command_id: command_id, command_type: command.class.name, error: error.message, backtrace: error.backtrace.first(5), timestamp: Time.now.iso8601 }

@redis.lpush('command_errors', error_entry.to_json) @redis.ltrim('command_errors', 0, 9999) end

def get_total_command_count @redis.llen('command_log') end

def get_successful_command_count commands = @redis.lrange('command_log', 0, -1) commands.count { |cmd| JSON.parse(cmd)['success'] } end

def get_failed_command_count commands = @redis.lrange('command_log', 0, -1) commands.count { |cmd| !JSON.parse(cmd)['success'] } end

def get_average_execution_time commands = @redis.lrange('command_log', 0, -1) return 0 if commands.empty?

total_time = commands.sum { |cmd| JSON.parse(cmd)['duration'] } (total_time / commands.length).round(3) end

def get_commands_by_type commands = @redis.lrange('command_log', 0, -1) command_types = {}

commands.each do |cmd| cmd_data = JSON.parse(cmd) command_type = cmd_data['command_type'] command_types[command_type] ||= 0 command_types[command_type] += 1 end

command_types end end

🔍 Query Bus Implementation

Optimized Query Handling

lib/query_bus.rb

require 'tusk' require 'redis' require 'json' require 'securerandom'

class QueryBus def initialize(config_path = 'config/cqrs.tsk') @config = Tusk.load(config_path) @redis = Redis.new(url: @config['redis']['url']) @handlers = {} @middleware = [] setup_query_bus end

def register_handler(query_class, handler) @handlers[query_class] = handler end

def add_middleware(middleware) @middleware << middleware end

def execute(query) return { success: false, error: 'CQRS disabled' } unless @config['cqrs']['enabled'] == 'true'

query_id = SecureRandom.uuid start_time = Time.now

begin # Check cache first if @config['queries']['caching_enabled'] == 'true' cached_result = get_cached_result(query) if cached_result return { success: true, result: cached_result, cached: true, query_id: query_id, duration: Time.now - start_time } end end

# Apply middleware processed_query = apply_middleware(query, query_id) return processed_query unless processed_query[:success]

# Authorize query authorization_result = authorize_query(query) unless authorization_result[:authorized] return { success: false, error: 'Query authorization failed', details: authorization_result[:reason], query_id: query_id } end

# Execute query handler = get_handler(query.class) unless handler return { success: false, error: "No handler registered for query: #{query.class}", query_id: query_id } end

result = execute_with_handler(handler, query, query_id) # Cache result if successful if result[:success] && @config['queries']['caching_enabled'] == 'true' cache_result(query, result[:result]) end # Log query execution log_query_execution(query, query_id, result, Time.now - start_time) result rescue => e error_result = { success: false, error: e.message, query_id: query_id, duration: Time.now - start_time } log_query_error(query, query_id, e) error_result end end

def execute_batch(queries) return { success: false, error: 'No queries provided' } if queries.empty?

results = [] batch_id = SecureRandom.uuid

queries.each_with_index do |query, index| result = execute(query) result[:batch_id] = batch_id result[:batch_index] = index results << result end

{ batch_id: batch_id, total_queries: queries.length, successful: results.count { |r| r[:success] }, failed: results.count { |r| !r[:success] }, results: results } end

def invalidate_cache(pattern = nil) if pattern keys = @redis.keys("query_cache:#{pattern}") @redis.del(*keys) if keys.any? else keys = @redis.keys('query_cache:*') @redis.del(*keys) if keys.any? end

{ invalidated_keys: keys&.length || 0 } end

def get_query_statistics { total_queries: get_total_query_count, successful_queries: get_successful_query_count, failed_queries: get_failed_query_count, cache_hits: get_cache_hit_count, cache_miss_rate: get_cache_miss_rate, average_execution_time: get_average_execution_time, queries_by_type: get_queries_by_type } end

private

def setup_query_bus case @config['cqrs']['query_bus'] when 'asynchronous' setup_asynchronous_bus when 'distributed' setup_distributed_bus end end

def setup_asynchronous_bus # Implementation for asynchronous query bus end

def setup_distributed_bus # Implementation for distributed query bus end

def get_cached_result(query) cache_key = generate_cache_key(query) cached_data = @redis.get(cache_key) if cached_data increment_cache_hits JSON.parse(cached_data) else increment_cache_misses nil end end

def cache_result(query, result) cache_key = generate_cache_key(query) ttl = @config['queries']['cache_ttl'].to_i @redis.setex(cache_key, ttl, result.to_json) end

def generate_cache_key(query) query_data = query.to_h rescue query.to_s "query_cache:#{query.class.name}:#{Digest::SHA256.hexdigest(query_data.to_json)}" end

def apply_middleware(query, query_id) processed_query = query.dup

@middleware.each do |middleware| begin result = middleware.process(processed_query, query_id) return result unless result[:success] processed_query = result[:query] rescue => e return { success: false, error: "Middleware error: #{e.message}", query_id: query_id } end end

{ success: true, query: processed_query } end

def authorize_query(query) return { authorized: true } unless @config['queries']['authorization_enabled'] == 'true'

if query.respond_to?(:authorize) query.authorize else { authorized: true } end end

def get_handler(query_class) @handlers[query_class] end

def execute_with_handler(handler, query, query_id) start_time = Time.now

begin if handler.respond_to?(:call) result = handler.call(query) elsif handler.respond_to?(:handle) result = handler.handle(query) else raise "Handler does not respond to call or handle" end

{ success: true, result: result, query_id: query_id, duration: Time.now - start_time } rescue => e { success: false, error: e.message, query_id: query_id, duration: Time.now - start_time } end end

def log_query_execution(query, query_id, result, duration) log_entry = { query_id: query_id, query_type: query.class.name, success: result[:success], cached: result[:cached] || false, duration: duration, timestamp: Time.now.iso8601 }

@redis.lpush('query_log', log_entry.to_json) @redis.ltrim('query_log', 0, 9999) end

def log_query_error(query, query_id, error) error_entry = { query_id: query_id, query_type: query.class.name, error: error.message, backtrace: error.backtrace.first(5), timestamp: Time.now.iso8601 }

@redis.lpush('query_errors', error_entry.to_json) @redis.ltrim('query_errors', 0, 9999) end

def increment_cache_hits @redis.incr('cache_hits') end

def increment_cache_misses @redis.incr('cache_misses') end

def get_total_query_count @redis.llen('query_log') end

def get_successful_query_count queries = @redis.lrange('query_log', 0, -1) queries.count { |q| JSON.parse(q)['success'] } end

def get_failed_query_count queries = @redis.lrange('query_log', 0, -1) queries.count { |q| !JSON.parse(q)['success'] } end

def get_cache_hit_count @redis.get('cache_hits').to_i end

def get_cache_miss_rate hits = get_cache_hit_count misses = @redis.get('cache_misses').to_i total = hits + misses return 0 if total == 0 (misses.to_f / total * 100).round(2) end

def get_average_execution_time queries = @redis.lrange('query_log', 0, -1) return 0 if queries.empty?

total_time = queries.sum { |q| JSON.parse(q)['duration'] } (total_time / queries.length).round(3) end

def get_queries_by_type queries = @redis.lrange('query_log', 0, -1) query_types = {}

queries.each do |q| q_data = JSON.parse(q) query_type = q_data['query_type'] query_types[query_type] ||= 0 query_types[query_type] += 1 end

query_types end end

📝 Command and Query Classes

Command Definitions

lib/commands.rb

require 'active_model'

class CreateUserCommand include ActiveModel::Model include ActiveModel::Validations

attr_accessor :email, :name, :password, :role

validates :email, presence: true, format: { with: URI::MailTo::EMAIL_REGEXP } validates :name, presence: true, length: { minimum: 2 } validates :password, presence: true, length: { minimum: 8 } validates :role, inclusion: { in: %w[user admin moderator] }

def initialize(attributes = {}) super @role ||= 'user' end

def authorize # Implementation for command authorization { authorized: true } end end

class UpdateUserCommand include ActiveModel::Model include ActiveModel::Validations

attr_accessor :user_id, :email, :name, :role

validates :user_id, presence: true validates :email, format: { with: URI::MailTo::EMAIL_REGEXP }, allow_blank: true validates :name, length: { minimum: 2 }, allow_blank: true validates :role, inclusion: { in: %w[user admin moderator] }, allow_blank: true

def authorize # Implementation for command authorization { authorized: true } end end

class DeleteUserCommand include ActiveModel::Model include ActiveModel::Validations

attr_accessor :user_id, :reason

validates :user_id, presence: true validates :reason, presence: true, length: { minimum: 10 }

def authorize # Implementation for command authorization { authorized: true } end end

class CreateOrderCommand include ActiveModel::Model include ActiveModel::Validations

attr_accessor :user_id, :items, :shipping_address

validates :user_id, presence: true validates :items, presence: true validates :shipping_address, presence: true

def authorize # Implementation for command authorization { authorized: true } end end

class UpdateOrderCommand include ActiveModel::Model include ActiveModel::Validations

attr_accessor :order_id, :status, :notes

validates :order_id, presence: true validates :status, inclusion: { in: %w[pending confirmed shipped delivered cancelled] }

def authorize # Implementation for command authorization { authorized: true } end end

Query Definitions

lib/queries.rb

require 'active_model'

class GetUserQuery include ActiveModel::Model include ActiveModel::Validations

attr_accessor :user_id

validates :user_id, presence: true

def authorize # Implementation for query authorization { authorized: true } end end

class GetUsersQuery include ActiveModel::Model include ActiveModel::Validations

attr_accessor :page, :per_page, :filters, :sort_by, :sort_order

validates :page, numericality: { greater_than: 0 }, allow_blank: true validates :per_page, numericality: { greater_than: 0, less_than_or_equal_to: 100 }, allow_blank: true validates :sort_order, inclusion: { in: %w[asc desc] }, allow_blank: true

def initialize(attributes = {}) super @page ||= 1 @per_page ||= 20 @filters ||= {} @sort_by ||= 'created_at' @sort_order ||= 'desc' end

def authorize # Implementation for query authorization { authorized: true } end end

class GetUserOrdersQuery include ActiveModel::Model include ActiveModel::Validations

attr_accessor :user_id, :page, :per_page, :status

validates :user_id, presence: true validates :page, numericality: { greater_than: 0 }, allow_blank: true validates :per_page, numericality: { greater_than: 0, less_than_or_equal_to: 100 }, allow_blank: true validates :status, inclusion: { in: %w[pending confirmed shipped delivered cancelled] }, allow_blank: true

def initialize(attributes = {}) super @page ||= 1 @per_page ||= 20 end

def authorize # Implementation for query authorization { authorized: true } end end

class GetOrderQuery include ActiveModel::Model include ActiveModel::Validations

attr_accessor :order_id

validates :order_id, presence: true

def authorize # Implementation for query authorization { authorized: true } end end

class GetOrdersQuery include ActiveModel::Model include ActiveModel::Validations

attr_accessor :page, :per_page, :filters, :sort_by, :sort_order

validates :page, numericality: { greater_than: 0 }, allow_blank: true validates :per_page, numericality: { greater_than: 0, less_than_or_equal_to: 100 }, allow_blank: true validates :sort_order, inclusion: { in: %w[asc desc] }, allow_blank: true

def initialize(attributes = {}) super @page ||= 1 @per_page ||= 20 @filters ||= {} @sort_by ||= 'created_at' @sort_order ||= 'desc' end

def authorize # Implementation for query authorization { authorized: true } end end

🎯 Command and Query Handlers

Command Handlers

lib/command_handlers.rb

require 'tusk' require 'redis'

class CreateUserCommandHandler def initialize(event_store, read_model) @event_store = event_store @read_model = read_model end

def handle(command) # Check if user already exists existing_user = @read_model.find_user_by_email(command.email) if existing_user return { success: false, error: 'User with this email already exists' } end

# Create user aggregate user_aggregate = UserAggregate.new(SecureRandom.uuid, @event_store) # Execute command on aggregate result = user_aggregate.create_user(command.email, command.name, command.password, command.role) if result[:success] # Save aggregate save_result = user_aggregate.save if save_result[:success] { success: true, user_id: user_aggregate.id } else { success: false, error: save_result[:error] } end else result end end end

class UpdateUserCommandHandler def initialize(event_store, read_model) @event_store = event_store @read_model = read_model end

def handle(command) # Load user aggregate user_aggregate = UserAggregate.new(command.user_id, @event_store) user_aggregate.load_from_history

# Check if user exists unless user_aggregate.get_state[:email] return { success: false, error: 'User not found' } end

# Execute command on aggregate result = user_aggregate.update_profile({ email: command.email, name: command.name, role: command.role })

if result[:success] # Save aggregate save_result = user_aggregate.save if save_result[:success] { success: true, user_id: user_aggregate.id } else { success: false, error: save_result[:error] } end else result end end end

class DeleteUserCommandHandler def initialize(event_store, read_model) @event_store = event_store @read_model = read_model end

def handle(command) # Load user aggregate user_aggregate = UserAggregate.new(command.user_id, @event_store) user_aggregate.load_from_history

# Check if user exists unless user_aggregate.get_state[:email] return { success: false, error: 'User not found' } end

# Execute command on aggregate result = user_aggregate.deactivate_user(command.reason)

if result[:success] # Save aggregate save_result = user_aggregate.save if save_result[:success] { success: true, user_id: user_aggregate.id } else { success: false, error: save_result[:error] } end else result end end end

class CreateOrderCommandHandler def initialize(event_store, read_model) @event_store = event_store @read_model = read_model end

def handle(command) # Validate user exists user = @read_model.find_user(command.user_id) unless user return { success: false, error: 'User not found' } end

# Create order aggregate order_aggregate = OrderAggregate.new(SecureRandom.uuid, @event_store) # Execute command on aggregate result = order_aggregate.create_order(command.user_id, command.items, command.shipping_address) if result[:success] # Save aggregate save_result = order_aggregate.save if save_result[:success] { success: true, order_id: order_aggregate.id } else { success: false, error: save_result[:error] } end else result end end end

class UpdateOrderCommandHandler def initialize(event_store, read_model) @event_store = event_store @read_model = read_model end

def handle(command) # Load order aggregate order_aggregate = OrderAggregate.new(command.order_id, @event_store) order_aggregate.load_from_history

# Check if order exists unless order_aggregate.get_state[:user_id] return { success: false, error: 'Order not found' } end

# Execute command on aggregate result = order_aggregate.update_status(command.status, command.notes)

if result[:success] # Save aggregate save_result = order_aggregate.save if save_result[:success] { success: true, order_id: order_aggregate.id } else { success: false, error: save_result[:error] } end else result end end end

Query Handlers

lib/query_handlers.rb

require 'tusk' require 'redis'

class GetUserQueryHandler def initialize(read_model) @read_model = read_model end

def handle(query) user = @read_model.find_user(query.user_id) if user { success: true, user: user } else { success: false, error: 'User not found' } end end end

class GetUsersQueryHandler def initialize(read_model) @read_model = read_model end

def handle(query) users = @read_model.find_users( page: query.page, per_page: query.per_page, filters: query.filters, sort_by: query.sort_by, sort_order: query.sort_order )

total_count = @read_model.count_users(query.filters)

{ success: true, users: users, pagination: { page: query.page, per_page: query.per_page, total_count: total_count, total_pages: (total_count.to_f / query.per_page).ceil } } end end

class GetUserOrdersQueryHandler def initialize(read_model) @read_model = read_model end

def handle(query) orders = @read_model.find_user_orders( user_id: query.user_id, page: query.page, per_page: query.per_page, status: query.status )

total_count = @read_model.count_user_orders(query.user_id, query.status)

{ success: true, orders: orders, pagination: { page: query.page, per_page: query.per_page, total_count: total_count, total_pages: (total_count.to_f / query.per_page).ceil } } end end

class GetOrderQueryHandler def initialize(read_model) @read_model = read_model end

def handle(query) order = @read_model.find_order(query.order_id) if order { success: true, order: order } else { success: false, error: 'Order not found' } end end end

class GetOrdersQueryHandler def initialize(read_model) @read_model = read_model end

def handle(query) orders = @read_model.find_orders( page: query.page, per_page: query.per_page, filters: query.filters, sort_by: query.sort_by, sort_order: query.sort_order )

total_count = @read_model.count_orders(query.filters)

{ success: true, orders: orders, pagination: { page: query.page, per_page: query.per_page, total_count: total_count, total_pages: (total_count.to_f / query.per_page).ceil } } end end

🎯 Configuration Management

CQRS Configuration

config/cqrs_features.tsk

[cqrs] enabled: @env("CQRS_ENABLED", "true") command_bus: @env("COMMAND_BUS_TYPE", "synchronous") query_bus: @env("QUERY_BUS_TYPE", "synchronous") event_sourcing: @env("CQRS_EVENT_SOURCING", "true") read_write_separation: @env("READ_WRITE_SEPARATION", "true")

[commands] validation_enabled: @env("COMMAND_VALIDATION_ENABLED", "true") authorization_enabled: @env("COMMAND_AUTHORIZATION_ENABLED", "true") logging_enabled: @env("COMMAND_LOGGING_ENABLED", "true") retry_attempts: @env("COMMAND_RETRY_ATTEMPTS", "3") async_processing: @env("COMMAND_ASYNC_PROCESSING", "false")

[queries] caching_enabled: @env("QUERY_CACHING_ENABLED", "true") cache_ttl: @env("QUERY_CACHE_TTL", "300") authorization_enabled: @env("QUERY_AUTHORIZATION_ENABLED", "true") pagination_enabled: @env("QUERY_PAGINATION_ENABLED", "true") optimization_enabled: @env("QUERY_OPTIMIZATION_ENABLED", "true")

[read_models] database: @env("READ_MODEL_DATABASE", "postgresql") optimization_enabled: @env("READ_MODEL_OPTIMIZATION_ENABLED", "true") denormalization_enabled: @env("READ_MODEL_DENORMALIZATION_ENABLED", "true") indexing_enabled: @env("READ_MODEL_INDEXING_ENABLED", "true")

[write_models] database: @env("WRITE_MODEL_DATABASE", "postgresql") event_store_enabled: @env("WRITE_MODEL_EVENT_STORE", "true") snapshot_enabled: @env("WRITE_MODEL_SNAPSHOTS", "true") optimistic_concurrency: @env("WRITE_MODEL_OPTIMISTIC_CONCURRENCY", "true")

[performance] command_timeout: @env("COMMAND_TIMEOUT", "30") query_timeout: @env("QUERY_TIMEOUT", "10") max_concurrent_commands: @env("MAX_CONCURRENT_COMMANDS", "100") max_concurrent_queries: @env("MAX_CONCURRENT_QUERIES", "200")

🎯 Summary

This comprehensive guide covers CQRS implementation with TuskLang and Ruby, including:

- Command Bus: Command handling with validation, authorization, and middleware - Query Bus: Query handling with caching, authorization, and optimization - Command Classes: Structured command definitions with validation - Query Classes: Structured query definitions with pagination - Command Handlers: Business logic for command processing - Query Handlers: Optimized query processing with read models - Configuration Management: Enterprise-grade CQRS configuration

The CQRS features with TuskLang provide a robust foundation for building systems that can handle massive read and write loads independently, optimize performance through read/write separation, and maintain data consistency through event sourcing.