💎 Distributed Systems with TuskLang and Ruby

Ruby Documentation

Distributed Systems with TuskLang and Ruby

🌐 Scale Across the Digital Universe

TuskLang enables sophisticated distributed systems for Ruby applications, providing service discovery, load balancing, fault tolerance, and distributed coordination. Build systems that span multiple nodes, regions, and clouds with seamless coordination.

🚀 Quick Start: Service Discovery

Basic Distributed System Configuration

config/distributed_systems.tsk

[distributed_systems] enabled: @env("DISTRIBUTED_SYSTEMS_ENABLED", "true") cluster_name: @env("CLUSTER_NAME", "production-cluster") node_id: @env("NODE_ID", "node-1") region: @env("REGION", "us-east-1") zone: @env("ZONE", "us-east-1a")

[service_discovery] enabled: @env("SERVICE_DISCOVERY_ENABLED", "true") provider: @env("SERVICE_DISCOVERY_PROVIDER", "consul") # consul, etcd, zookeeper refresh_interval: @env("SERVICE_DISCOVERY_REFRESH", "30") health_check_interval: @env("HEALTH_CHECK_INTERVAL", "10")

[load_balancing] enabled: @env("LOAD_BALANCING_ENABLED", "true") algorithm: @env("LOAD_BALANCING_ALGORITHM", "round_robin") # round_robin, least_connections, weighted health_check_enabled: @env("LOAD_BALANCER_HEALTH_CHECK", "true") failover_enabled: @env("LOAD_BALANCER_FAILOVER", "true")

[fault_tolerance] enabled: @env("FAULT_TOLERANCE_ENABLED", "true") circuit_breaker_enabled: @env("CIRCUIT_BREAKER_ENABLED", "true") retry_attempts: @env("RETRY_ATTEMPTS", "3") timeout: @env("DISTRIBUTED_TIMEOUT", "30")

Service Discovery Implementation

lib/service_discovery.rb

require 'tusk' require 'redis' require 'json' require 'securerandom' require 'net/http' require 'uri'

class ServiceDiscovery def initialize(config_path = 'config/distributed_systems.tsk') @config = Tusk.load(config_path) @redis = Redis.new(url: @config['redis']['url']) @services = {} @health_checks = {} setup_service_discovery end

def register_service(service_name, service_info) return { success: false, error: 'Service discovery disabled' } unless @config['service_discovery']['enabled'] == 'true'

service_id = SecureRandom.uuid service_data = { id: service_id, name: service_name, host: service_info[:host], port: service_info[:port], protocol: service_info[:protocol] || 'http', health_endpoint: service_info[:health_endpoint] || '/health', metadata: service_info[:metadata] || {}, node_id: @config['distributed_systems']['node_id'], region: @config['distributed_systems']['region'], zone: @config['distributed_systems']['zone'], registered_at: Time.now.iso8601, last_health_check: Time.now.iso8601, status: 'healthy' }

# Store service registration @redis.hset("services:#{service_name}", service_id, service_data.to_json) @redis.sadd("service_names", service_name) # Start health checking start_health_check(service_name, service_id, service_data) { success: true, service_id: service_id, service_name: service_name } end

def deregister_service(service_name, service_id) @redis.hdel("services:#{service_name}", service_id) # Stop health check stop_health_check(service_name, service_id) # Remove service if no instances remain if @redis.hlen("services:#{service_name}") == 0 @redis.srem("service_names", service_name) end

{ success: true } end

def discover_service(service_name, filters = {}) services_data = @redis.hgetall("services:#{service_name}") services = []

services_data.each do |service_id, service_json| service = JSON.parse(service_json) # Apply filters if filters[:region] && service['region'] != filters[:region] next end if filters[:zone] && service['zone'] != filters[:zone] next end if filters[:status] && service['status'] != filters[:status] next end

services << service end

# Filter healthy services only healthy_services = services.select { |s| s['status'] == 'healthy' } { service_name: service_name, total_instances: services.length, healthy_instances: healthy_services.length, services: healthy_services } end

def get_service_instances(service_name) services_data = @redis.hgetall("services:#{service_name}") services_data.map { |service_id, service_json| JSON.parse(service_json) } end

def update_service_health(service_name, service_id, status) service_data = @redis.hget("services:#{service_name}", service_id) return { success: false, error: 'Service not found' } unless service_data

service = JSON.parse(service_data) service['status'] = status service['last_health_check'] = Time.now.iso8601

@redis.hset("services:#{service_name}", service_id, service.to_json) { success: true } end

def get_cluster_health service_names = @redis.smembers("service_names") cluster_health = { total_services: service_names.length, total_instances: 0, healthy_instances: 0, unhealthy_instances: 0, services: {} }

service_names.each do |service_name| services = get_service_instances(service_name) healthy_count = services.count { |s| s['status'] == 'healthy' } cluster_health[:services][service_name] = { total_instances: services.length, healthy_instances: healthy_count, unhealthy_instances: services.length - healthy_count } cluster_health[:total_instances] += services.length cluster_health[:healthy_instances] += healthy_count cluster_health[:unhealthy_instances] += (services.length - healthy_count) end

cluster_health end

def get_service_statistics service_names = @redis.smembers("service_names") statistics = {}

service_names.each do |service_name| services = get_service_instances(service_name) statistics[service_name] = { total_instances: services.length, healthy_instances: services.count { |s| s['status'] == 'healthy' }, regions: services.map { |s| s['region'] }.uniq, zones: services.map { |s| s['zone'] }.uniq, average_uptime: calculate_average_uptime(services) } end

statistics end

private

def setup_service_discovery case @config['service_discovery']['provider'] when 'consul' setup_consul_discovery when 'etcd' setup_etcd_discovery when 'zookeeper' setup_zookeeper_discovery end end

def setup_consul_discovery # Implementation for Consul service discovery end

def setup_etcd_discovery # Implementation for etcd service discovery end

def setup_zookeeper_discovery # Implementation for ZooKeeper service discovery end

def start_health_check(service_name, service_id, service_data) return unless @config['service_discovery']['health_check_interval']

interval = @config['service_discovery']['health_check_interval'].to_i @health_checks[service_id] = Thread.new do loop do begin health_status = check_service_health(service_data) update_service_health(service_name, service_id, health_status) rescue => e Rails.logger.error "Health check error for #{service_name}:#{service_id}: #{e.message}" update_service_health(service_name, service_id, 'unhealthy') end sleep interval end end end

def stop_health_check(service_name, service_id) thread = @health_checks[service_id] thread&.exit @health_checks.delete(service_id) end

def check_service_health(service_data) health_url = "#{service_data['protocol']}://#{service_data['host']}:#{service_data['port']}#{service_data['health_endpoint']}" uri = URI(health_url) http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = uri.scheme == 'https' http.read_timeout = 5 http.open_timeout = 5

response = http.get(uri.request_uri) if response.code == '200' 'healthy' else 'unhealthy' end rescue 'unhealthy' end

def calculate_average_uptime(services) return 0 if services.empty?

total_uptime = services.sum do |service| registered_time = Time.parse(service['registered_at']) last_check_time = Time.parse(service['last_health_check']) if service['status'] == 'healthy' (last_check_time - registered_time).to_i else 0 end end

(total_uptime.to_f / services.length / 3600).round(2) # Hours end end

⚖️ Load Balancer Implementation

Intelligent Load Balancing

lib/load_balancer.rb

require 'tusk' require 'redis' require 'json'

class LoadBalancer def initialize(config_path = 'config/distributed_systems.tsk') @config = Tusk.load(config_path) @redis = Redis.new(url: @config['redis']['url']) @service_discovery = ServiceDiscovery.new(config_path) @algorithm = @config['load_balancing']['algorithm'] @health_check_enabled = @config['load_balancing']['health_check_enabled'] == 'true' @failover_enabled = @config['load_balancing']['failover_enabled'] == 'true' end

def get_service_instance(service_name, filters = {}) return nil unless @config['load_balancing']['enabled'] == 'true'

# Discover service instances discovery_result = @service_discovery.discover_service(service_name, filters) services = discovery_result[:services]

return nil if services.empty?

# Apply load balancing algorithm selected_service = case @algorithm when 'round_robin' round_robin_select(service_name, services) when 'least_connections' least_connections_select(service_name, services) when 'weighted' weighted_select(service_name, services) else round_robin_select(service_name, services) end

# Update connection count increment_connection_count(service_name, selected_service['id'])

selected_service end

def release_connection(service_name, service_id) decrement_connection_count(service_name, service_id) end

def get_load_balancer_statistics service_names = @service_discovery.redis.smembers("service_names") statistics = {}

service_names.each do |service_name| services = @service_discovery.get_service_instances(service_name) statistics[service_name] = { algorithm: @algorithm, total_instances: services.length, healthy_instances: services.count { |s| s['status'] == 'healthy' }, connection_counts: get_connection_counts(service_name), average_connections: calculate_average_connections(service_name) } end

statistics end

def update_service_weight(service_name, service_id, weight) weight_key = "service_weight:#{service_name}:#{service_id}" @redis.set(weight_key, weight) { success: true } end

def get_service_weight(service_name, service_id) weight_key = "service_weight:#{service_name}:#{service_id}" @redis.get(weight_key).to_f || 1.0 end

private

def round_robin_select(service_name, services) current_index_key = "round_robin_index:#{service_name}" current_index = @redis.get(current_index_key).to_i selected_service = services[current_index % services.length] # Update index for next selection @redis.set(current_index_key, (current_index + 1) % services.length) selected_service end

def least_connections_select(service_name, services) services.min_by do |service| get_connection_count(service_name, service['id']) end end

def weighted_select(service_name, services) total_weight = services.sum { |service| get_service_weight(service_name, service['id']) } random_value = rand * total_weight current_weight = 0 services.each do |service| current_weight += get_service_weight(service_name, service['id']) return service if random_value <= current_weight end services.last end

def get_connection_count(service_name, service_id) connection_key = "connection_count:#{service_name}:#{service_id}" @redis.get(connection_key).to_i end

def increment_connection_count(service_name, service_id) connection_key = "connection_count:#{service_name}:#{service_id}" @redis.incr(connection_key) end

def decrement_connection_count(service_name, service_id) connection_key = "connection_count:#{service_name}:#{service_id}" current_count = @redis.get(connection_key).to_i if current_count > 0 @redis.set(connection_key, current_count - 1) end end

def get_connection_counts(service_name) services = @service_discovery.get_service_instances(service_name) connection_counts = {}

services.each do |service| connection_counts[service['id']] = get_connection_count(service_name, service['id']) end

connection_counts end

def calculate_average_connections(service_name) connection_counts = get_connection_counts(service_name) return 0 if connection_counts.empty?

total_connections = connection_counts.values.sum (total_connections.to_f / connection_counts.length).round(2) end end

🛡️ Fault Tolerance Implementation

Circuit Breaker and Retry Logic

lib/fault_tolerance.rb

require 'tusk' require 'redis' require 'json'

class FaultTolerance def initialize(config_path = 'config/distributed_systems.tsk') @config = Tusk.load(config_path) @redis = Redis.new(url: @config['redis']['url']) @circuit_breakers = {} setup_fault_tolerance end

def execute_with_fault_tolerance(service_name, operation, max_retries = nil) return { success: false, error: 'Fault tolerance disabled' } unless @config['fault_tolerance']['enabled'] == 'true'

max_retries ||= @config['fault_tolerance']['retry_attempts'].to_i timeout = @config['fault_tolerance']['timeout'].to_i

# Check circuit breaker circuit_breaker = get_circuit_breaker(service_name) if circuit_breaker.open? return { success: false, error: 'Circuit breaker is open', circuit_breaker_state: circuit_breaker.state } end

# Execute with retry logic retry_count = 0 last_error = nil

loop do begin result = execute_with_timeout(operation, timeout) # Success - close circuit breaker circuit_breaker.on_success return { success: true, result: result, retry_count: retry_count } rescue => e last_error = e retry_count += 1 # Update circuit breaker circuit_breaker.on_failure # Check if we should retry break if retry_count > max_retries # Exponential backoff sleep(2 ** retry_count) end end

{ success: false, error: last_error.message, retry_count: retry_count, circuit_breaker_state: circuit_breaker.state } end

def get_circuit_breaker(service_name) @circuit_breakers[service_name] ||= CircuitBreaker.new( service_name: service_name, failure_threshold: 5, timeout: 60, redis: @redis ) end

def get_fault_tolerance_statistics statistics = { circuit_breakers: {}, total_operations: get_total_operations, successful_operations: get_successful_operations, failed_operations: get_failed_operations, average_retry_count: get_average_retry_count }

@circuit_breakers.each do |service_name, circuit_breaker| statistics[:circuit_breakers][service_name] = { state: circuit_breaker.state, failure_count: circuit_breaker.failure_count, success_count: circuit_breaker.success_count, last_failure_time: circuit_breaker.last_failure_time, last_success_time: circuit_breaker.last_success_time } end

statistics end

def reset_circuit_breaker(service_name) circuit_breaker = get_circuit_breaker(service_name) circuit_breaker.reset { success: true } end

def force_open_circuit_breaker(service_name) circuit_breaker = get_circuit_breaker(service_name) circuit_breaker.force_open { success: true } end

def force_close_circuit_breaker(service_name) circuit_breaker = get_circuit_breaker(service_name) circuit_breaker.force_close { success: true } end

private

def setup_fault_tolerance # Initialize fault tolerance components end

def execute_with_timeout(operation, timeout) Timeout::timeout(timeout) do operation.call end end

def get_total_operations @redis.get('total_operations').to_i end

def get_successful_operations @redis.get('successful_operations').to_i end

def get_failed_operations @redis.get('failed_operations').to_i end

def get_average_retry_count total_retries = @redis.get('total_retries').to_i total_operations = get_total_operations return 0 if total_operations == 0 (total_retries.to_f / total_operations).round(2) end end

class CircuitBreaker def initialize(service_name:, failure_threshold: 5, timeout: 60, redis:) @service_name = service_name @failure_threshold = failure_threshold @timeout = timeout @redis = redis @state = :closed @failure_count = 0 @last_failure_time = nil @last_success_time = nil end

def call(&block) case @state when :open if Time.now - @last_failure_time >= @timeout @state = :half_open else raise CircuitBreakerOpenError, "Circuit breaker is open for #{@service_name}" end end

result = yield on_success result rescue => e on_failure raise e end

def open? @state == :open end

def closed? @state == :closed end

def half_open? @state == :half_open end

def on_success @failure_count = 0 @last_success_time = Time.now @state = :closed update_redis_state end

def on_failure @failure_count += 1 @last_failure_time = Time.now if @failure_count >= @failure_threshold @state = :open end update_redis_state end

def reset @failure_count = 0 @state = :closed @last_failure_time = nil @last_success_time = nil update_redis_state end

def force_open @state = :open @last_failure_time = Time.now update_redis_state end

def force_close @state = :closed @failure_count = 0 update_redis_state end

def state @state end

def failure_count @failure_count end

def success_count @redis.get("circuit_breaker_success:#{@service_name}").to_i end

def last_failure_time @last_failure_time end

def last_success_time @last_success_time end

private

def update_redis_state state_data = { state: @state, failure_count: @failure_count, last_failure_time: @last_failure_time&.iso8601, last_success_time: @last_success_time&.iso8601 }

@redis.set("circuit_breaker:#{@service_name}", state_data.to_json) end end

class CircuitBreakerOpenError < StandardError; end

🔄 Distributed Coordination

Leader Election and Consensus

lib/distributed_coordination.rb

require 'tusk' require 'redis' require 'json' require 'securerandom'

class DistributedCoordination def initialize(config_path = 'config/distributed_systems.tsk') @config = Tusk.load(config_path) @redis = Redis.new(url: @config['redis']['url']) @node_id = @config['distributed_systems']['node_id'] @cluster_name = @config['distributed_systems']['cluster_name'] @election_timeout = 30 @heartbeat_interval = 10 @is_leader = false @leader_id = nil setup_coordination end

def start_leader_election return { success: false, error: 'Already participating in election' } if @election_thread

@election_thread = Thread.new do participate_in_election end

{ success: true } end

def stop_leader_election @election_thread&.exit @election_thread = nil { success: true } end

def is_leader? @is_leader end

def get_leader_info leader_data = @redis.get("leader:#{@cluster_name}") return nil unless leader_data

leader_info = JSON.parse(leader_data) { leader_id: leader_info['leader_id'], elected_at: leader_info['elected_at'], term: leader_info['term'] } end

def get_cluster_members members_data = @redis.hgetall("cluster_members:#{@cluster_name}") members = []

members_data.each do |node_id, member_json| member = JSON.parse(member_json) members << { node_id: member['node_id'], region: member['region'], zone: member['zone'], last_heartbeat: member['last_heartbeat'], status: member['status'] } end

members end

def join_cluster member_info = { node_id: @node_id, region: @config['distributed_systems']['region'], zone: @config['distributed_systems']['zone'], joined_at: Time.now.iso8601, last_heartbeat: Time.now.iso8601, status: 'active' }

@redis.hset("cluster_members:#{@cluster_name}", @node_id, member_info.to_json) start_heartbeat { success: true } end

def leave_cluster @redis.hdel("cluster_members:#{@cluster_name}", @node_id) stop_heartbeat stop_leader_election { success: true } end

def get_coordination_statistics { node_id: @node_id, cluster_name: @cluster_name, is_leader: @is_leader, leader_id: @leader_id, cluster_members: get_cluster_members, total_members: get_cluster_members.length, active_members: get_cluster_members.count { |m| m[:status] == 'active' } } end

private

def setup_coordination # Initialize coordination components end

def participate_in_election loop do begin # Check if leader exists current_leader = get_leader_info if current_leader.nil? # No leader exists, start election start_election elsif Time.now - Time.parse(current_leader['elected_at']) > @election_timeout # Leader timeout, start election start_election else # Leader exists and is active @is_leader = false @leader_id = current_leader['leader_id'] end sleep 5 rescue => e Rails.logger.error "Election error: #{e.message}" sleep 5 end end end

def start_election # Increment term current_term = @redis.get("term:#{@cluster_name}").to_i + 1 @redis.set("term:#{@cluster_name}", current_term)

# Vote for self votes_received = 1 @redis.set("votes:#{@cluster_name}:#{@node_id}", current_term)

# Request votes from other members members = get_cluster_members members.each do |member| next if member[:node_id] == @node_id

if request_vote(member[:node_id], current_term) votes_received += 1 end end

# Check if we won the election if votes_received > members.length / 2 become_leader(current_term) end end

def request_vote(candidate_id, term) # Implementation for requesting votes # This would typically involve RPC calls to other nodes true end

def become_leader(term) leader_info = { leader_id: @node_id, elected_at: Time.now.iso8601, term: term }

@redis.set("leader:#{@cluster_name}", leader_info.to_json) @is_leader = true @leader_id = @node_id

Rails.logger.info "Became leader for term #{term}" end

def start_heartbeat @heartbeat_thread = Thread.new do loop do begin update_heartbeat sleep @heartbeat_interval rescue => e Rails.logger.error "Heartbeat error: #{e.message}" sleep @heartbeat_interval end end end end

def stop_heartbeat @heartbeat_thread&.exit @heartbeat_thread = nil end

def update_heartbeat member_data = @redis.hget("cluster_members:#{@cluster_name}", @node_id) return unless member_data

member = JSON.parse(member_data) member['last_heartbeat'] = Time.now.iso8601 member['status'] = 'active'

@redis.hset("cluster_members:#{@cluster_name}", @node_id, member.to_json) end end

🎯 Configuration Management

Distributed Systems Configuration

config/distributed_systems_features.tsk

[distributed_systems] enabled: @env("DISTRIBUTED_SYSTEMS_ENABLED", "true") cluster_name: @env("CLUSTER_NAME", "production-cluster") node_id: @env("NODE_ID", "node-1") region: @env("REGION", "us-east-1") zone: @env("ZONE", "us-east-1a") datacenter: @env("DATACENTER", "dc1")

[service_discovery] enabled: @env("SERVICE_DISCOVERY_ENABLED", "true") provider: @env("SERVICE_DISCOVERY_PROVIDER", "consul") refresh_interval: @env("SERVICE_DISCOVERY_REFRESH", "30") health_check_interval: @env("HEALTH_CHECK_INTERVAL", "10") deregister_after: @env("DEREGISTER_AFTER", "300")

[load_balancing] enabled: @env("LOAD_BALANCING_ENABLED", "true") algorithm: @env("LOAD_BALANCING_ALGORITHM", "round_robin") health_check_enabled: @env("LOAD_BALANCER_HEALTH_CHECK", "true") failover_enabled: @env("LOAD_BALANCER_FAILOVER", "true") sticky_sessions: @env("STICKY_SESSIONS_ENABLED", "false")

[fault_tolerance] enabled: @env("FAULT_TOLERANCE_ENABLED", "true") circuit_breaker_enabled: @env("CIRCUIT_BREAKER_ENABLED", "true") retry_attempts: @env("RETRY_ATTEMPTS", "3") timeout: @env("DISTRIBUTED_TIMEOUT", "30") backoff_strategy: @env("BACKOFF_STRATEGY", "exponential")

[coordination] enabled: @env("COORDINATION_ENABLED", "true") leader_election: @env("LEADER_ELECTION_ENABLED", "true") consensus_protocol: @env("CONSENSUS_PROTOCOL", "raft") election_timeout: @env("ELECTION_TIMEOUT", "30") heartbeat_interval: @env("HEARTBEAT_INTERVAL", "10")

[monitoring] metrics_enabled: @env("DISTRIBUTED_METRICS_ENABLED", "true") health_checks_enabled: @env("DISTRIBUTED_HEALTH_CHECKS", "true") alerting_enabled: @env("DISTRIBUTED_ALERTING_ENABLED", "true") tracing_enabled: @env("DISTRIBUTED_TRACING_ENABLED", "true")

🎯 Summary

This comprehensive guide covers distributed systems implementation with TuskLang and Ruby, including:

- Service Discovery: Automatic service registration, discovery, and health checking - Load Balancing: Multiple load balancing algorithms with health checks and failover - Fault Tolerance: Circuit breakers, retry logic, and timeout handling - Distributed Coordination: Leader election, consensus protocols, and cluster management - Configuration Management: Enterprise-grade distributed systems configuration

The distributed systems features with TuskLang provide a robust foundation for building systems that can scale across multiple nodes, regions, and clouds with seamless coordination, fault tolerance, and high availability.