💎 Message Queue Integration with TuskLang and Ruby

This guide covers integrating message queues with TuskLang and Ruby applications for asynchronous processing and distributed systems.

Table of Contents

1. Overview
2. Installation
3. Basic Setup
4. Producer Implementation
5. Consumer Implementation
6. Message Types
7. Error Handling
8. Monitoring
9. Testing
10. Deployment

Overview

Message queues enable asynchronous processing and decoupled communication between services. This guide shows how to put a common publish/subscribe interface in front of Redis, RabbitMQ, or Apache Kafka in a Ruby application configured through TuskLang.

Key Features

- Multiple queue backends (Redis, RabbitMQ, Apache Kafka)
- Reliable message delivery with acknowledgments
- Dead letter queues for failed messages
- Message routing and filtering
- Batch processing support
- Monitoring and metrics

Installation

Dependencies

Gemfile

gem 'sidekiq'
gem 'redis'
gem 'bunny'
gem 'ruby-kafka'
gem 'connection_pool'
gem 'json'

TuskLang Configuration

config/message_queue.tusk

message_queue:
  backend: "redis" # redis, rabbitmq, kafka

  redis:
    url: "redis://localhost:6379/1"
    pool_size: 10
    pool_timeout: 5
    retry_attempts: 3
    retry_delay: 1

  rabbitmq:
    host: "localhost"
    port: 5672
    username: "guest"
    password: "guest"
    vhost: "/"
    connection_pool_size: 5
    channel_pool_size: 10

  kafka:
    brokers: ["localhost:9092"]
    client_id: "tusk_ruby_client"
    group_id: "tusk_ruby_group"
    auto_offset_reset: "earliest"
    enable_auto_commit: true

  queues:
    default: "default"
    high_priority: "high_priority"
    low_priority: "low_priority"
    dead_letter: "dead_letter"
    retry: "retry"

  retry:
    max_attempts: 3
    backoff_multiplier: 2
    initial_delay: 1

  monitoring:
    enabled: true
    metrics_port: 9090
    health_check_interval: 30

Basic Setup

Message Queue Manager

app/message_queue/queue_manager.rb

require 'singleton'

# Single entry point for queue operations; delegates to the configured backend.
class QueueManager
  include Singleton

  def initialize
    @config = Rails.application.config.message_queue
    @backend = create_backend
  end

  def publish(queue_name, message, options = {})
    @backend.publish(queue_name, message, options)
  end

  def subscribe(queue_name, handler_class, options = {})
    @backend.subscribe(queue_name, handler_class, options)
  end

  def delete_queue(queue_name)
    @backend.delete_queue(queue_name)
  end

  def purge_queue(queue_name)
    @backend.purge_queue(queue_name)
  end

  def queue_size(queue_name)
    @backend.queue_size(queue_name)
  end

  private

  # Picks the backend class named by the `backend` setting.
  def create_backend
    case @config[:backend]
    when 'redis'
      RedisQueueBackend.new(@config[:redis])
    when 'rabbitmq'
      RabbitMQBackend.new(@config[:rabbitmq])
    when 'kafka'
      KafkaBackend.new(@config[:kafka])
    else
      raise "Unsupported message queue backend: #{@config[:backend]}"
    end
  end
end
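
The backend classes that `create_backend` instantiates are not shown in this guide. As a rough illustration of the interface `QueueManager` delegates to, here is a minimal in-memory sketch. The class name `InMemoryQueueBackend` is hypothetical, and the sketch assumes that a handler is any object responding to `process` and that it receives the raw payload; a real Redis, RabbitMQ, or Kafka backend would differ.

```ruby
# Hypothetical in-memory backend; the name and behaviour are illustrative
# assumptions, not part of TuskLang or any queue library.
class InMemoryQueueBackend
  def initialize(_config = {})
    # Each queue name maps to a FIFO array of raw payloads.
    @queues = Hash.new { |hash, name| hash[name] = [] }
    @mutex = Mutex.new
  end

  # Appends a payload to the named queue.
  def publish(queue_name, message, _options = {})
    @mutex.synchronize { @queues[queue_name] << message }
    true
  end

  # Drains the queue in FIFO order, passing each payload to handler.process.
  def subscribe(queue_name, handler, _options = {})
    loop do
      payload = @mutex.synchronize { @queues[queue_name].shift }
      break if payload.nil?

      handler.process(payload)
    end
  end

  def delete_queue(queue_name)
    @mutex.synchronize { @queues.delete(queue_name) }
    nil
  end

  def purge_queue(queue_name)
    @mutex.synchronize { @queues[queue_name].clear }
    nil
  end

  def queue_size(queue_name)
    @mutex.synchronize { @queues.key?(queue_name) ? @queues[queue_name].size : 0 }
  end
end

backend = InMemoryQueueBackend.new
backend.publish('default', '{"type":"UserCreated"}')
backend.queue_size('default') # => 1
```

A backend like this can be convenient in tests, because it exercises the same five methods without a running broker.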

Base Message

app/message_queue/base_message.rb

require 'json'
require 'securerandom'
require 'time'

class BaseMessage
  include ActiveModel::Model
  include ActiveModel::Serialization

  attr_accessor :id, :type, :data, :metadata, :created_at, :attempts

  def initialize(attributes = {})
    @id = attributes[:id] || SecureRandom.uuid
    @type = attributes[:type] || self.class.name
    # Normalize to string keys so accessors such as data['user_id'] behave
    # the same before and after a JSON round trip.
    @data = (attributes[:data] || {}).deep_stringify_keys
    @metadata = attributes[:metadata] || {}
    @created_at = attributes[:created_at] || Time.current
    @attempts = attributes[:attempts] || 0
  end

  # Accepts (and ignores) generator arguments so a message can also be
  # serialized when nested inside another structure.
  def to_json(*_args)
    {
      id: id,
      type: type,
      data: data,
      metadata: metadata,
      created_at: created_at.iso8601,
      attempts: attempts
    }.to_json
  end

  def self.from_json(json_string)
    data = JSON.parse(json_string)
    new(
      id: data['id'],
      type: data['type'],
      data: data['data'],
      metadata: data['metadata'],
      created_at: Time.parse(data['created_at']),
      attempts: data['attempts']
    )
  end

  def increment_attempts!
    @attempts += 1
  end

  def max_attempts_reached?
    attempts >= Rails.application.config.message_queue[:retry][:max_attempts]
  end
end
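
`from_json` relies on `JSON.parse`, which by default returns hashes with string keys, so a payload built with symbol keys comes back keyed by strings. The short sketch below illustrates this with plain hashes; the helper name `round_trip` is hypothetical and only serves the demonstration.

```ruby
require 'json'

# Serializes a hash to JSON and parses it back, as happens between
# a producer and a consumer.
def round_trip(hash)
  JSON.parse(JSON.generate(hash))
end

parsed = round_trip({ type: 'UserCreated', data: { user_id: 42 } })

parsed['data']['user_id'] # => 42
parsed['data'][:user_id]  # => nil, symbol keys do not survive the round trip
```

This is why message accessors read `data` with string keys. `JSON.parse` also accepts `symbolize_names: true` if symbol keys are preferred on the consuming side.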

Producer Implementation

Message Producer

app/message_queue/producer.rb

class MessageProducer
  include Singleton

  def initialize
    @queue_manager = QueueManager.instance
    @config = Rails.application.config.message_queue
  end

  def publish_user_created(user)
    message = UserCreatedMessage.new(
      data: {
        user_id: user.id,
        email: user.email,
        name: user.name
      }
    )
    publish(@config[:queues][:default], message)
  end

  def publish_user_updated(user)
    message = UserUpdatedMessage.new(
      data: {
        user_id: user.id,
        email: user.email,
        name: user.name,
        updated_at: user.updated_at
      }
    )
    publish(@config[:queues][:default], message)
  end

  def publish_post_created(post)
    message = PostCreatedMessage.new(
      data: {
        post_id: post.id,
        user_id: post.user_id,
        title: post.title,
        category: post.category
      }
    )
    publish(@config[:queues][:high_priority], message)
  end

  def publish_email_notification(user, template, data)
    message = EmailNotificationMessage.new(
      data: {
        user_id: user.id,
        email: user.email,
        template: template,
        data: data
      }
    )
    publish(@config[:queues][:low_priority], message)
  end

  def publish_data_export(user, format)
    message = DataExportMessage.new(
      data: {
        user_id: user.id,
        format: format,
        requested_at: Time.current
      }
    )
    publish(@config[:queues][:low_priority], message)
  end

  private

  def publish(queue_name, message, options = {})
    Rails.logger.info "Publishing message to #{queue_name}: #{message.type}"
    @queue_manager.publish(queue_name, message.to_json, options)

    # Track metrics
    track_metrics(:published, queue_name, message.type)
  rescue => e
    Rails.logger.error "Failed to publish message: #{e.message}"
    track_metrics(:publish_error, queue_name, message.type)
    raise
  end

  def track_metrics(action, queue_name, message_type)
    return unless @config[:monitoring][:enabled]

    # Implementation would send metrics to monitoring system
    Rails.logger.debug "Metric: #{action} - #{queue_name} - #{message_type}"
  end
end

Message Types

app/message_queue/messages/user_created_message.rb

class UserCreatedMessage < BaseMessage
  def initialize(attributes = {})
    super(attributes.merge(type: 'UserCreated'))
  end

  def user_id
    data['user_id']
  end

  def email
    data['email']
  end

  def name
    data['name']
  end
end

app/message_queue/messages/user_updated_message.rb

class UserUpdatedMessage < BaseMessage
  def initialize(attributes = {})
    super(attributes.merge(type: 'UserUpdated'))
  end

  def user_id
    data['user_id']
  end

  def email
    data['email']
  end

  def name
    data['name']
  end

  def updated_at
    Time.parse(data['updated_at']) if data['updated_at']
  end
end

app/message_queue/messages/post_created_message.rb

class PostCreatedMessage < BaseMessage
  def initialize(attributes = {})
    super(attributes.merge(type: 'PostCreated'))
  end

  def post_id
    data['post_id']
  end

  def user_id
    data['user_id']
  end

  def title
    data['title']
  end

  def category
    data['category']
  end
end

app/message_queue/messages/email_notification_message.rb

class EmailNotificationMessage < BaseMessage
  def initialize(attributes = {})
    super(attributes.merge(type: 'EmailNotification'))
  end

  def user_id
    data['user_id']
  end

  def email
    data['email']
  end

  def template
    data['template']
  end

  def notification_data
    data['data']
  end
end

app/message_queue/messages/data_export_message.rb

class DataExportMessage < BaseMessage
  def initialize(attributes = {})
    super(attributes.merge(type: 'DataExport'))
  end

  def user_id
    data['user_id']
  end

  def format
    data['format']
  end

  def requested_at
    Time.parse(data['requested_at']) if data['requested_at']
  end
end

Consumer Implementation

Base Consumer

app/message_queue/consumers/base_consumer.rb

class BaseConsumer
  include Singleton

  def initialize
    @config = Rails.application.config.message_queue
  end

  def process(message)
    Rails.logger.info "Processing message: #{message.type} - #{message.id}"
    start_time = Time.current

    begin
      handle_message(message)
      track_metrics(:processed, message.type, Time.current - start_time)
    rescue => e
      handle_error(message, e)
      track_metrics(:error, message.type, Time.current - start_time)
      raise
    end
  end

  protected

  def handle_message(message)
    raise NotImplementedError, "#{self.class} must implement handle_message"
  end

  # Retries the message until the configured attempt limit, then dead-letters it.
  def handle_error(message, error)
    Rails.logger.error "Error processing message #{message.id}: #{error.message}"
    message.increment_attempts!

    if message.max_attempts_reached?
      send_to_dead_letter_queue(message, error)
    else
      retry_message(message, error)
    end
  end

  # DeadLetterMessage and RetryMessage (below) are envelope classes that
  # this guide does not define.
  def send_to_dead_letter_queue(message, error)
    dead_letter_message = DeadLetterMessage.new(
      original_message: message,
      error: error.message,
      failed_at: Time.current
    )

    QueueManager.instance.publish(
      @config[:queues][:dead_letter],
      dead_letter_message.to_json
    )
    Rails.logger.warn "Message #{message.id} sent to dead letter queue"
  end

  def retry_message(message, error)
    delay = calculate_retry_delay(message.attempts)
    retry_message = RetryMessage.new(
      original_message: message,
      retry_at: Time.current + delay.seconds
    )

    QueueManager.instance.publish(
      @config[:queues][:retry],
      retry_message.to_json
    )
    Rails.logger.info "Message #{message.id} scheduled for retry in #{delay} seconds"
  end

  # Exponential backoff: initial_delay * multiplier^(attempts - 1).
  def calculate_retry_delay(attempts)
    initial_delay = @config[:retry][:initial_delay]
    multiplier = @config[:retry][:backoff_multiplier]
    initial_delay * (multiplier**(attempts - 1))
  end

  def track_metrics(action, message_type, duration = nil)
    return unless @config[:monitoring][:enabled]

    # Implementation would send metrics to monitoring system
    Rails.logger.debug "Metric: #{action} - #{message_type} - #{duration}"
  end
end
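
To make the retry schedule concrete, the sketch below works through the delays, assuming `calculate_retry_delay` is meant to implement exponential backoff with the `initial_delay` and `backoff_multiplier` settings from the configuration. The standalone method `retry_delay` is a hypothetical stand-in used only for this illustration.

```ruby
# Exponential backoff: initial_delay * multiplier^(attempts - 1).
# Defaults mirror the sample configuration (initial_delay: 1, backoff_multiplier: 2).
def retry_delay(attempts, initial_delay: 1, multiplier: 2)
  initial_delay * (multiplier**(attempts - 1))
end

delays = (1..3).map { |attempt| retry_delay(attempt) }
delays # => [1, 2, 4]
```

With `max_attempts: 3`, the third failure goes to the dead letter queue instead of being retried, so only the first two delays are used in practice.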

Specific Consumers

app/message_queue/consumers/user_consumer.rb

class UserConsumer < BaseConsumer
  def handle_message(message)
    case message.type
    when 'UserCreated'
      handle_user_created(message)
    when 'UserUpdated'
      handle_user_updated(message)
    else
      raise "Unknown message type: #{message.type}"
    end
  end

  private

  def handle_user_created(message)
    user = User.find(message.user_id)

    # Send welcome email
    UserMailer.welcome_email(user).deliver_later

    # Create user profile
    UserProfile.create!(
      user: user,
      bio: "Welcome to our platform!",
      avatar_url: nil
    )

    # Update user statistics
    update_user_statistics(user)

    Rails.logger.info "Processed user created: #{user.id}"
  end

  def handle_user_updated(message)
    user = User.find(message.user_id)

    # Update search index
    SearchIndex.update_user(user)

    # Notify followers
    notify_followers_of_update(user)

    # Update cache
    Rails.cache.delete("user:#{user.id}")

    Rails.logger.info "Processed user updated: #{user.id}"
  end

  def update_user_statistics(user)
    # Update global user count
    Rails.cache.increment('total_users')

    # Update daily signups
    today = Date.current
    Rails.cache.increment("signups:#{today}")
  end

  def notify_followers_of_update(user)
    user.followers.find_each do |follower|
      Notification.create!(
        user: follower,
        notifiable: user,
        action: 'profile_updated',
        data: { user_name: user.name }
      )
    end
  end
end

app/message_queue/consumers/post_consumer.rb

class PostConsumer < BaseConsumer
  def handle_message(message)
    case message.type
    when 'PostCreated'
      handle_post_created(message)
    else
      raise "Unknown message type: #{message.type}"
    end
  end

  private

  def handle_post_created(message)
    post = Post.find(message.post_id)

    # Update search index
    SearchIndex.index_post(post)

    # Send notifications to followers
    notify_followers_of_post(post)

    # Update user post count
    post.user.increment!(:posts_count)

    # Process hashtags
    process_hashtags(post)

    Rails.logger.info "Processed post created: #{post.id}"
  end

  def notify_followers_of_post(post)
    post.user.followers.find_each do |follower|
      Notification.create!(
        user: follower,
        notifiable: post,
        action: 'new_post',
        data: {
          user_name: post.user.name,
          post_title: post.title
        }
      )
    end
  end

  def process_hashtags(post)
    hashtags = post.content.scan(/#\w+/)
    hashtags.each do |hashtag|
      tag = Tag.find_or_create_by(name: hashtag.downcase)
      PostTag.create!(post: post, tag: tag)
    end
  end
end

app/message_queue/consumers/email_consumer.rb

class EmailConsumer < BaseConsumer
  def handle_message(message)
    case message.type
    when 'EmailNotification'
      handle_email_notification(message)
    else
      raise "Unknown message type: #{message.type}"
    end
  end

  private

  def handle_email_notification(message)
    user = User.find(message.user_id)

    case message.template
    when 'welcome'
      UserMailer.welcome_email(user).deliver_now
    when 'password_reset'
      UserMailer.password_reset_email(user, message.notification_data['token']).deliver_now
    when 'post_notification'
      UserMailer.post_notification_email(user, message.notification_data).deliver_now
    else
      raise "Unknown email template: #{message.template}"
    end

    Rails.logger.info "Sent email notification to user: #{user.id}"
  end
end

app/message_queue/consumers/data_export_consumer.rb

require 'csv'
require 'fileutils'

class DataExportConsumer < BaseConsumer
  def handle_message(message)
    case message.type
    when 'DataExport'
      handle_data_export(message)
    else
      raise "Unknown message type: #{message.type}"
    end
  end

  private

  def handle_data_export(message)
    user = User.find(message.user_id)

    case message.format
    when 'csv'
      export_to_csv(user)
    when 'json'
      export_to_json(user)
    when 'pdf'
      export_to_pdf(user)
    else
      raise "Unknown export format: #{message.format}"
    end

    # Notify user when export is complete
    Notification.create!(
      user: user,
      action: 'data_export_complete',
      data: { format: message.format }
    )

    Rails.logger.info "Completed data export for user: #{user.id}"
  end

  def export_to_csv(user)
    csv_data = generate_csv_data(user)
    save_export_file(user, csv_data, 'csv')
  end

  def export_to_json(user)
    json_data = generate_json_data(user)
    save_export_file(user, json_data, 'json')
  end

  def export_to_pdf(user)
    pdf_data = generate_pdf_data(user)
    save_export_file(user, pdf_data, 'pdf')
  end

  def generate_csv_data(user)
    CSV.generate do |csv|
      csv << ['ID', 'Email', 'Name', 'Created At']
      csv << [user.id, user.email, user.name, user.created_at]
    end
  end

  # Returns a JSON string; writing the as_json hash directly would store
  # Ruby's inspect output rather than valid JSON.
  def generate_json_data(user)
    user.to_json(include: :posts)
  end

  def generate_pdf_data(user)
    # Implementation would use a PDF generation library
    "PDF data for user #{user.id}"
  end

  def save_export_file(user, data, format)
    filename = "export_#{user.id}_#{Time.current.to_i}.#{format}"
    file_path = Rails.root.join('tmp', 'exports', filename)

    FileUtils.mkdir_p(File.dirname(file_path))
    File.write(file_path, data)

    # Store file reference in database
    DataExport.create!(
      user: user,
      file_path: file_path.to_s,
      format: format,
      completed_at: Time.current
    )
  end
end
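
Each consumer above handles a fixed set of message types, so something has to decide which consumer receives a given payload. This guide does not show that piece; the following is a minimal sketch of one possible approach, a routing table keyed by the `type` field. `MessageRouter` and `UnroutableMessage` are hypothetical names, and the handler here receives the parsed attribute hash rather than a message object, which is a simplification.

```ruby
require 'json'

# Hypothetical routing table: message type => object responding to #process.
class MessageRouter
  class UnroutableMessage < StandardError; end

  def initialize
    @routes = {}
  end

  # Registers one handler for one or more message types.
  def register(handler, *types)
    types.each { |type| @routes[type] = handler }
    self
  end

  # Parses the raw payload, looks up the handler by "type", and delegates.
  def dispatch(raw_payload)
    attributes = JSON.parse(raw_payload)
    handler = @routes.fetch(attributes['type']) do
      raise UnroutableMessage, "No consumer registered for #{attributes['type'].inspect}"
    end
    handler.process(attributes)
  end
end

# Stand-in handler that records the types it sees.
recorder = Struct.new(:seen) do
  def process(attributes)
    seen << attributes['type']
  end
end.new([])

router = MessageRouter.new.register(recorder, 'UserCreated', 'UserUpdated')
router.dispatch('{"type":"UserCreated","data":{"user_id":1}}')
recorder.seen # => ["UserCreated"]
```

In the guide's terms, `register` would be called with `UserConsumer.instance` and the like, and `dispatch` would build the message with the matching class's `from_json` before calling `process`.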

Error Handling

Dead Letter Queue Handler

app/message_queue/consumers/dead_letter_consumer.rb

class DeadLetterConsumer < BaseConsumer
  def handle_message(message)
    case message.type
    when 'DeadLetter'
      handle_dead_letter(message)
    else
      raise "Unknown message type: #{message.type}"
    end
  end

  private

  def handle_dead_letter(message)
    original_message = message.original_message
    error = message.error

    # Log the failure
    Rails.logger.error "Dead letter message: #{original_message.id} - #{error}"

    # Store in database for analysis. This call assumes a persisted model;
    # it cannot share a constant with the queue message class of the same name.
    DeadLetterMessage.create!(
      message_id: original_message.id,
      message_type: original_message.type,
      message_data: original_message.data,
      error_message: error,
      failed_at: message.failed_at,
      attempts: original_message.attempts
    )

    # Send alert to monitoring system
    send_alert(original_message, error)
  end

  def send_alert(message, error)
    # Implementation would send alert to monitoring system
    Rails.logger.warn "Alert: Dead letter message #{message.id} - #{error}"
  end
end

app/message_queue/consumers/retry_consumer.rb

class RetryConsumer < BaseConsumer
  def handle_message(message)
    case message.type
    when 'Retry'
      handle_retry(message)
    else
      raise "Unknown message type: #{message.type}"
    end
  end

  private

  def handle_retry(message)
    original_message = message.original_message

    # Wait until retry time (this blocks the consumer while it sleeps)
    sleep_time = message.retry_at - Time.current
    sleep(sleep_time) if sleep_time > 0

    # Re-publish to the default queue; the original queue name is not
    # carried on the message, so priority routing is lost on retry.
    QueueManager.instance.publish(
      @config[:queues][:default],
      original_message.to_json
    )

    Rails.logger.info "Retried message: #{original_message.id}"
  end
end

Monitoring

Queue Monitor

app/message_queue/monitoring/queue_monitor.rb

class QueueMonitor
  include Singleton

  def initialize
    @queue_manager = QueueManager.instance
    @config = Rails.application.config.message_queue
  end

  def monitor_queues
    queues = @config[:queues].values
    queues.each do |queue_name|
      size = @queue_manager.queue_size(queue_name)
      track_queue_metrics(queue_name, size)
    end
  end

  def health_check
    {
      status: 'healthy',
      queues: queue_statuses,
      timestamp: Time.current.iso8601
    }
  end

  private

  def queue_statuses
    @config[:queues].map do |name, queue_name|
      {
        name: name,
        queue: queue_name,
        size: @queue_manager.queue_size(queue_name),
        status: queue_healthy?(queue_name) ? 'healthy' : 'unhealthy'
      }
    end
  end

  def queue_healthy?(queue_name)
    size = @queue_manager.queue_size(queue_name)
    size < 1000 # Threshold for healthy queue
  end

  def track_queue_metrics(queue_name, size)
    return unless @config[:monitoring][:enabled]

    # Implementation would send metrics to monitoring system
    Rails.logger.debug "Queue metric: #{queue_name} - #{size} messages"
  end
end
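
A health endpoint is more useful when its top-level status reflects the individual queues. The sketch below shows one way to derive it from per-queue sizes in plain Ruby. The method name `overall_health` and the `'degraded'` label are assumptions for illustration; the default threshold mirrors the 1000-message limit in `queue_healthy?`.

```ruby
# Derives an overall status from a hash of queue name => current size.
def overall_health(queue_sizes, max_size: 1000)
  statuses = queue_sizes.map do |name, size|
    { name: name, size: size, status: size < max_size ? 'healthy' : 'unhealthy' }
  end
  all_healthy = statuses.all? { |entry| entry[:status] == 'healthy' }
  { status: all_healthy ? 'healthy' : 'degraded', queues: statuses }
end

report = overall_health({ 'default' => 12, 'retry' => 1500 })
report[:status] # => "degraded"
```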

Testing

Message Queue Test Helper

spec/support/message_queue_helper.rb

module MessageQueueHelper
  def publish_test_message(queue_name, message_type, data = {})
    message = BaseMessage.new(
      type: message_type,
      data: data
    )
    QueueManager.instance.publish(queue_name, message.to_json)
  end

  def clear_all_queues
    config = Rails.application.config.message_queue
    config[:queues].values.each do |queue_name|
      QueueManager.instance.purge_queue(queue_name)
    end
  end

  def expect_message_published(queue_name, message_type)
    # Simplified check: only verifies that the queue is non-empty,
    # not that a message of message_type was published.
    expect(QueueManager.instance.queue_size(queue_name)).to be > 0
  end
end

RSpec.configure do |config|
  config.include MessageQueueHelper, type: :message_queue

  config.before(:each, type: :message_queue) do
    clear_all_queues
  end
end

Message Queue Tests

spec/message_queue/producer_spec.rb

RSpec.describe MessageProducer, type: :message_queue do
  let(:producer) { MessageProducer.instance }
  let(:user) { create(:user) }
  let(:post) { create(:post, user: user) }

  describe '#publish_user_created' do
    it 'publishes user created message' do
      producer.publish_user_created(user)
      expect_message_published('default', 'UserCreated')
    end
  end

  describe '#publish_post_created' do
    it 'publishes post created message to high priority queue' do
      producer.publish_post_created(post)
      expect_message_published('high_priority', 'PostCreated')
    end
  end

  describe '#publish_email_notification' do
    it 'publishes email notification to low priority queue' do
      producer.publish_email_notification(user, 'welcome', {})
      expect_message_published('low_priority', 'EmailNotification')
    end
  end
end

spec/message_queue/consumers/user_consumer_spec.rb

RSpec.describe UserConsumer, type: :message_queue do
  let(:consumer) { UserConsumer.instance }
  let(:user) { create(:user) }

  describe '#handle_user_created' do
    it 'processes user created message' do
      # String keys match what a consumer sees after JSON parsing,
      # which is what the message accessors read.
      message = UserCreatedMessage.new(
        data: {
          'user_id' => user.id,
          'email' => user.email,
          'name' => user.name
        }
      )

      expect { consumer.process(message) }.to change { UserProfile.count }.by(1)
    end
  end
end

Deployment

Production Configuration

config/environments/production.rb

Rails.application.configure do
  # Message queue configuration.
  # ENV values are always strings, so numeric settings are converted explicitly.
  config.message_queue = {
    backend: ENV['MESSAGE_QUEUE_BACKEND'] || 'redis',
    redis: {
      url: ENV['REDIS_URL'] || 'redis://localhost:6379/1',
      pool_size: ENV.fetch('REDIS_POOL_SIZE', 10).to_i,
      pool_timeout: ENV.fetch('REDIS_POOL_TIMEOUT', 5).to_i,
      retry_attempts: ENV.fetch('REDIS_RETRY_ATTEMPTS', 3).to_i,
      retry_delay: ENV.fetch('REDIS_RETRY_DELAY', 1).to_i
    },
    rabbitmq: {
      host: ENV['RABBITMQ_HOST'] || 'localhost',
      port: ENV.fetch('RABBITMQ_PORT', 5672).to_i,
      username: ENV['RABBITMQ_USERNAME'] || 'guest',
      password: ENV['RABBITMQ_PASSWORD'] || 'guest',
      vhost: ENV['RABBITMQ_VHOST'] || '/',
      connection_pool_size: ENV.fetch('RABBITMQ_CONNECTION_POOL_SIZE', 5).to_i,
      channel_pool_size: ENV.fetch('RABBITMQ_CHANNEL_POOL_SIZE', 10).to_i
    },
    kafka: {
      brokers: ENV['KAFKA_BROKERS']&.split(',') || ['localhost:9092'],
      client_id: ENV['KAFKA_CLIENT_ID'] || 'tusk_ruby_client',
      group_id: ENV['KAFKA_GROUP_ID'] || 'tusk_ruby_group',
      auto_offset_reset: ENV['KAFKA_AUTO_OFFSET_RESET'] || 'earliest',
      enable_auto_commit: ENV['KAFKA_ENABLE_AUTO_COMMIT'] != 'false'
    },
    queues: {
      default: ENV['DEFAULT_QUEUE'] || 'default',
      high_priority: ENV['HIGH_PRIORITY_QUEUE'] || 'high_priority',
      low_priority: ENV['LOW_PRIORITY_QUEUE'] || 'low_priority',
      dead_letter: ENV['DEAD_LETTER_QUEUE'] || 'dead_letter',
      retry: ENV['RETRY_QUEUE'] || 'retry'
    },
    retry: {
      max_attempts: ENV.fetch('RETRY_MAX_ATTEMPTS', 3).to_i,
      backoff_multiplier: ENV.fetch('RETRY_BACKOFF_MULTIPLIER', 2).to_f,
      initial_delay: ENV.fetch('RETRY_INITIAL_DELAY', 1).to_i
    },
    monitoring: {
      enabled: ENV['MESSAGE_QUEUE_MONITORING_ENABLED'] != 'false',
      metrics_port: ENV.fetch('MESSAGE_QUEUE_METRICS_PORT', 9090).to_i,
      health_check_interval: ENV.fetch('MESSAGE_QUEUE_HEALTH_CHECK_INTERVAL', 30).to_i
    }
  }
end

Systemd Service

/etc/systemd/system/message-queue-consumer.service

[Unit]
Description=Message Queue Consumer
After=network.target

[Service]
Type=simple
User=deploy
WorkingDirectory=/var/www/tsk/sdk
Environment=RAILS_ENV=production
ExecStart=/usr/bin/bundle exec ruby app/message_queue/consumer_runner.rb
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target

Docker Configuration

Dockerfile.message-queue

FROM ruby:3.2-alpine

RUN apk add --no-cache \
    build-base \
    redis

WORKDIR /app

COPY Gemfile Gemfile.lock ./
RUN bundle install --jobs 4 --retry 3

COPY . .

CMD ["bundle", "exec", "ruby", "app/message_queue/consumer_runner.rb"]

docker-compose.message-queue.yml

version: '3.8'

services:
  message-queue-consumer:
    build:
      context: .
      dockerfile: Dockerfile.message-queue
    environment:
      - RAILS_ENV=production
      - REDIS_URL=redis://redis:6379/1
      - MESSAGE_QUEUE_BACKEND=redis
    depends_on:
      - redis
      - db

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

  db:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=message_queue_app
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  redis_data:
  postgres_data:

This guide covered the pieces of a message queue integration with TuskLang and Ruby: backend selection, message and producer classes, consumers with retry and dead letter handling, queue monitoring, tests, and deployment configuration.