💎 Distributed Systems with TuskLang and Ruby
Distributed Systems with TuskLang and Ruby
🌐 Scale Across the Digital Universe
TuskLang enables sophisticated distributed systems for Ruby applications, providing service discovery, load balancing, fault tolerance, and distributed coordination. Build systems that span multiple nodes, regions, and clouds with seamless coordination.
🚀 Quick Start: Service Discovery
Basic Distributed System Configuration
config/distributed_systems.tsk
[distributed_systems]
enabled: @env("DISTRIBUTED_SYSTEMS_ENABLED", "true")
cluster_name: @env("CLUSTER_NAME", "production-cluster")
node_id: @env("NODE_ID", "node-1")
region: @env("REGION", "us-east-1")
zone: @env("ZONE", "us-east-1a")[service_discovery]
enabled: @env("SERVICE_DISCOVERY_ENABLED", "true")
provider: @env("SERVICE_DISCOVERY_PROVIDER", "consul") # consul, etcd, zookeeper
refresh_interval: @env("SERVICE_DISCOVERY_REFRESH", "30")
health_check_interval: @env("HEALTH_CHECK_INTERVAL", "10")
[load_balancing]
enabled: @env("LOAD_BALANCING_ENABLED", "true")
algorithm: @env("LOAD_BALANCING_ALGORITHM", "round_robin") # round_robin, least_connections, weighted
health_check_enabled: @env("LOAD_BALANCER_HEALTH_CHECK", "true")
failover_enabled: @env("LOAD_BALANCER_FAILOVER", "true")
[fault_tolerance]
enabled: @env("FAULT_TOLERANCE_ENABLED", "true")
circuit_breaker_enabled: @env("CIRCUIT_BREAKER_ENABLED", "true")
retry_attempts: @env("RETRY_ATTEMPTS", "3")
timeout: @env("DISTRIBUTED_TIMEOUT", "30")
Service Discovery Implementation
lib/service_discovery.rb
require 'tusk'
require 'redis'
require 'json'
require 'securerandom'
require 'net/http'
require 'uri'class ServiceDiscovery
def initialize(config_path = 'config/distributed_systems.tsk')
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
@services = {}
@health_checks = {}
setup_service_discovery
end
def register_service(service_name, service_info)
return { success: false, error: 'Service discovery disabled' } unless @config['service_discovery']['enabled'] == 'true'
service_id = SecureRandom.uuid
service_data = {
id: service_id,
name: service_name,
host: service_info[:host],
port: service_info[:port],
protocol: service_info[:protocol] || 'http',
health_endpoint: service_info[:health_endpoint] || '/health',
metadata: service_info[:metadata] || {},
node_id: @config['distributed_systems']['node_id'],
region: @config['distributed_systems']['region'],
zone: @config['distributed_systems']['zone'],
registered_at: Time.now.iso8601,
last_health_check: Time.now.iso8601,
status: 'healthy'
}
# Store service registration
@redis.hset("services:#{service_name}", service_id, service_data.to_json)
@redis.sadd("service_names", service_name)
# Start health checking
start_health_check(service_name, service_id, service_data)
{
success: true,
service_id: service_id,
service_name: service_name
}
end
def deregister_service(service_name, service_id)
@redis.hdel("services:#{service_name}", service_id)
# Stop health check
stop_health_check(service_name, service_id)
# Remove service if no instances remain
if @redis.hlen("services:#{service_name}") == 0
@redis.srem("service_names", service_name)
end
{ success: true }
end
def discover_service(service_name, filters = {})
services_data = @redis.hgetall("services:#{service_name}")
services = []
services_data.each do |service_id, service_json|
service = JSON.parse(service_json)
# Apply filters
if filters[:region] && service['region'] != filters[:region]
next
end
if filters[:zone] && service['zone'] != filters[:zone]
next
end
if filters[:status] && service['status'] != filters[:status]
next
end
services << service
end
# Filter healthy services only
healthy_services = services.select { |s| s['status'] == 'healthy' }
{
service_name: service_name,
total_instances: services.length,
healthy_instances: healthy_services.length,
services: healthy_services
}
end
def get_service_instances(service_name)
services_data = @redis.hgetall("services:#{service_name}")
services_data.map { |service_id, service_json| JSON.parse(service_json) }
end
def update_service_health(service_name, service_id, status)
service_data = @redis.hget("services:#{service_name}", service_id)
return { success: false, error: 'Service not found' } unless service_data
service = JSON.parse(service_data)
service['status'] = status
service['last_health_check'] = Time.now.iso8601
@redis.hset("services:#{service_name}", service_id, service.to_json)
{ success: true }
end
def get_cluster_health
service_names = @redis.smembers("service_names")
cluster_health = {
total_services: service_names.length,
total_instances: 0,
healthy_instances: 0,
unhealthy_instances: 0,
services: {}
}
service_names.each do |service_name|
services = get_service_instances(service_name)
healthy_count = services.count { |s| s['status'] == 'healthy' }
cluster_health[:services][service_name] = {
total_instances: services.length,
healthy_instances: healthy_count,
unhealthy_instances: services.length - healthy_count
}
cluster_health[:total_instances] += services.length
cluster_health[:healthy_instances] += healthy_count
cluster_health[:unhealthy_instances] += (services.length - healthy_count)
end
cluster_health
end
def get_service_statistics
service_names = @redis.smembers("service_names")
statistics = {}
service_names.each do |service_name|
services = get_service_instances(service_name)
statistics[service_name] = {
total_instances: services.length,
healthy_instances: services.count { |s| s['status'] == 'healthy' },
regions: services.map { |s| s['region'] }.uniq,
zones: services.map { |s| s['zone'] }.uniq,
average_uptime: calculate_average_uptime(services)
}
end
statistics
end
private
def setup_service_discovery
case @config['service_discovery']['provider']
when 'consul'
setup_consul_discovery
when 'etcd'
setup_etcd_discovery
when 'zookeeper'
setup_zookeeper_discovery
end
end
def setup_consul_discovery
# Implementation for Consul service discovery
end
def setup_etcd_discovery
# Implementation for etcd service discovery
end
def setup_zookeeper_discovery
# Implementation for ZooKeeper service discovery
end
def start_health_check(service_name, service_id, service_data)
return unless @config['service_discovery']['health_check_interval']
interval = @config['service_discovery']['health_check_interval'].to_i
@health_checks[service_id] = Thread.new do
loop do
begin
health_status = check_service_health(service_data)
update_service_health(service_name, service_id, health_status)
rescue => e
Rails.logger.error "Health check error for #{service_name}:#{service_id}: #{e.message}"
update_service_health(service_name, service_id, 'unhealthy')
end
sleep interval
end
end
end
def stop_health_check(service_name, service_id)
thread = @health_checks[service_id]
thread&.exit
@health_checks.delete(service_id)
end
def check_service_health(service_data)
health_url = "#{service_data['protocol']}://#{service_data['host']}:#{service_data['port']}#{service_data['health_endpoint']}"
uri = URI(health_url)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = uri.scheme == 'https'
http.read_timeout = 5
http.open_timeout = 5
response = http.get(uri.request_uri)
if response.code == '200'
'healthy'
else
'unhealthy'
end
rescue
'unhealthy'
end
def calculate_average_uptime(services)
return 0 if services.empty?
total_uptime = services.sum do |service|
registered_time = Time.parse(service['registered_at'])
last_check_time = Time.parse(service['last_health_check'])
if service['status'] == 'healthy'
(last_check_time - registered_time).to_i
else
0
end
end
(total_uptime.to_f / services.length / 3600).round(2) # Hours
end
end
⚖️ Load Balancer Implementation
Intelligent Load Balancing
lib/load_balancer.rb
require 'tusk'
require 'redis'
require 'json'class LoadBalancer
def initialize(config_path = 'config/distributed_systems.tsk')
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
@service_discovery = ServiceDiscovery.new(config_path)
@algorithm = @config['load_balancing']['algorithm']
@health_check_enabled = @config['load_balancing']['health_check_enabled'] == 'true'
@failover_enabled = @config['load_balancing']['failover_enabled'] == 'true'
end
def get_service_instance(service_name, filters = {})
return nil unless @config['load_balancing']['enabled'] == 'true'
# Discover service instances
discovery_result = @service_discovery.discover_service(service_name, filters)
services = discovery_result[:services]
return nil if services.empty?
# Apply load balancing algorithm
selected_service = case @algorithm
when 'round_robin'
round_robin_select(service_name, services)
when 'least_connections'
least_connections_select(service_name, services)
when 'weighted'
weighted_select(service_name, services)
else
round_robin_select(service_name, services)
end
# Update connection count
increment_connection_count(service_name, selected_service['id'])
selected_service
end
def release_connection(service_name, service_id)
decrement_connection_count(service_name, service_id)
end
def get_load_balancer_statistics
service_names = @service_discovery.redis.smembers("service_names")
statistics = {}
service_names.each do |service_name|
services = @service_discovery.get_service_instances(service_name)
statistics[service_name] = {
algorithm: @algorithm,
total_instances: services.length,
healthy_instances: services.count { |s| s['status'] == 'healthy' },
connection_counts: get_connection_counts(service_name),
average_connections: calculate_average_connections(service_name)
}
end
statistics
end
def update_service_weight(service_name, service_id, weight)
weight_key = "service_weight:#{service_name}:#{service_id}"
@redis.set(weight_key, weight)
{ success: true }
end
def get_service_weight(service_name, service_id)
weight_key = "service_weight:#{service_name}:#{service_id}"
@redis.get(weight_key).to_f || 1.0
end
private
def round_robin_select(service_name, services)
current_index_key = "round_robin_index:#{service_name}"
current_index = @redis.get(current_index_key).to_i
selected_service = services[current_index % services.length]
# Update index for next selection
@redis.set(current_index_key, (current_index + 1) % services.length)
selected_service
end
def least_connections_select(service_name, services)
services.min_by do |service|
get_connection_count(service_name, service['id'])
end
end
def weighted_select(service_name, services)
total_weight = services.sum { |service| get_service_weight(service_name, service['id']) }
random_value = rand * total_weight
current_weight = 0
services.each do |service|
current_weight += get_service_weight(service_name, service['id'])
return service if random_value <= current_weight
end
services.last
end
def get_connection_count(service_name, service_id)
connection_key = "connection_count:#{service_name}:#{service_id}"
@redis.get(connection_key).to_i
end
def increment_connection_count(service_name, service_id)
connection_key = "connection_count:#{service_name}:#{service_id}"
@redis.incr(connection_key)
end
def decrement_connection_count(service_name, service_id)
connection_key = "connection_count:#{service_name}:#{service_id}"
current_count = @redis.get(connection_key).to_i
if current_count > 0
@redis.set(connection_key, current_count - 1)
end
end
def get_connection_counts(service_name)
services = @service_discovery.get_service_instances(service_name)
connection_counts = {}
services.each do |service|
connection_counts[service['id']] = get_connection_count(service_name, service['id'])
end
connection_counts
end
def calculate_average_connections(service_name)
connection_counts = get_connection_counts(service_name)
return 0 if connection_counts.empty?
total_connections = connection_counts.values.sum
(total_connections.to_f / connection_counts.length).round(2)
end
end
🛡️ Fault Tolerance Implementation
Circuit Breaker and Retry Logic
lib/fault_tolerance.rb
require 'tusk'
require 'redis'
require 'json'class FaultTolerance
def initialize(config_path = 'config/distributed_systems.tsk')
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
@circuit_breakers = {}
setup_fault_tolerance
end
def execute_with_fault_tolerance(service_name, operation, max_retries = nil)
return { success: false, error: 'Fault tolerance disabled' } unless @config['fault_tolerance']['enabled'] == 'true'
max_retries ||= @config['fault_tolerance']['retry_attempts'].to_i
timeout = @config['fault_tolerance']['timeout'].to_i
# Check circuit breaker
circuit_breaker = get_circuit_breaker(service_name)
if circuit_breaker.open?
return {
success: false,
error: 'Circuit breaker is open',
circuit_breaker_state: circuit_breaker.state
}
end
# Execute with retry logic
retry_count = 0
last_error = nil
loop do
begin
result = execute_with_timeout(operation, timeout)
# Success - close circuit breaker
circuit_breaker.on_success
return { success: true, result: result, retry_count: retry_count }
rescue => e
last_error = e
retry_count += 1
# Update circuit breaker
circuit_breaker.on_failure
# Check if we should retry
break if retry_count > max_retries
# Exponential backoff
sleep(2 ** retry_count)
end
end
{
success: false,
error: last_error.message,
retry_count: retry_count,
circuit_breaker_state: circuit_breaker.state
}
end
def get_circuit_breaker(service_name)
@circuit_breakers[service_name] ||= CircuitBreaker.new(
service_name: service_name,
failure_threshold: 5,
timeout: 60,
redis: @redis
)
end
def get_fault_tolerance_statistics
statistics = {
circuit_breakers: {},
total_operations: get_total_operations,
successful_operations: get_successful_operations,
failed_operations: get_failed_operations,
average_retry_count: get_average_retry_count
}
@circuit_breakers.each do |service_name, circuit_breaker|
statistics[:circuit_breakers][service_name] = {
state: circuit_breaker.state,
failure_count: circuit_breaker.failure_count,
success_count: circuit_breaker.success_count,
last_failure_time: circuit_breaker.last_failure_time,
last_success_time: circuit_breaker.last_success_time
}
end
statistics
end
def reset_circuit_breaker(service_name)
circuit_breaker = get_circuit_breaker(service_name)
circuit_breaker.reset
{ success: true }
end
def force_open_circuit_breaker(service_name)
circuit_breaker = get_circuit_breaker(service_name)
circuit_breaker.force_open
{ success: true }
end
def force_close_circuit_breaker(service_name)
circuit_breaker = get_circuit_breaker(service_name)
circuit_breaker.force_close
{ success: true }
end
private
def setup_fault_tolerance
# Initialize fault tolerance components
end
def execute_with_timeout(operation, timeout)
Timeout::timeout(timeout) do
operation.call
end
end
def get_total_operations
@redis.get('total_operations').to_i
end
def get_successful_operations
@redis.get('successful_operations').to_i
end
def get_failed_operations
@redis.get('failed_operations').to_i
end
def get_average_retry_count
total_retries = @redis.get('total_retries').to_i
total_operations = get_total_operations
return 0 if total_operations == 0
(total_retries.to_f / total_operations).round(2)
end
end
class CircuitBreaker
def initialize(service_name:, failure_threshold: 5, timeout: 60, redis:)
@service_name = service_name
@failure_threshold = failure_threshold
@timeout = timeout
@redis = redis
@state = :closed
@failure_count = 0
@last_failure_time = nil
@last_success_time = nil
end
def call(&block)
case @state
when :open
if Time.now - @last_failure_time >= @timeout
@state = :half_open
else
raise CircuitBreakerOpenError, "Circuit breaker is open for #{@service_name}"
end
end
result = yield
on_success
result
rescue => e
on_failure
raise e
end
def open?
@state == :open
end
def closed?
@state == :closed
end
def half_open?
@state == :half_open
end
def on_success
@failure_count = 0
@last_success_time = Time.now
@state = :closed
update_redis_state
end
def on_failure
@failure_count += 1
@last_failure_time = Time.now
if @failure_count >= @failure_threshold
@state = :open
end
update_redis_state
end
def reset
@failure_count = 0
@state = :closed
@last_failure_time = nil
@last_success_time = nil
update_redis_state
end
def force_open
@state = :open
@last_failure_time = Time.now
update_redis_state
end
def force_close
@state = :closed
@failure_count = 0
update_redis_state
end
def state
@state
end
def failure_count
@failure_count
end
def success_count
@redis.get("circuit_breaker_success:#{@service_name}").to_i
end
def last_failure_time
@last_failure_time
end
def last_success_time
@last_success_time
end
private
def update_redis_state
state_data = {
state: @state,
failure_count: @failure_count,
last_failure_time: @last_failure_time&.iso8601,
last_success_time: @last_success_time&.iso8601
}
@redis.set("circuit_breaker:#{@service_name}", state_data.to_json)
end
end
class CircuitBreakerOpenError < StandardError; end
🔄 Distributed Coordination
Leader Election and Consensus
lib/distributed_coordination.rb
require 'tusk'
require 'redis'
require 'json'
require 'securerandom'class DistributedCoordination
def initialize(config_path = 'config/distributed_systems.tsk')
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
@node_id = @config['distributed_systems']['node_id']
@cluster_name = @config['distributed_systems']['cluster_name']
@election_timeout = 30
@heartbeat_interval = 10
@is_leader = false
@leader_id = nil
setup_coordination
end
def start_leader_election
return { success: false, error: 'Already participating in election' } if @election_thread
@election_thread = Thread.new do
participate_in_election
end
{ success: true }
end
def stop_leader_election
@election_thread&.exit
@election_thread = nil
{ success: true }
end
def is_leader?
@is_leader
end
def get_leader_info
leader_data = @redis.get("leader:#{@cluster_name}")
return nil unless leader_data
leader_info = JSON.parse(leader_data)
{
leader_id: leader_info['leader_id'],
elected_at: leader_info['elected_at'],
term: leader_info['term']
}
end
def get_cluster_members
members_data = @redis.hgetall("cluster_members:#{@cluster_name}")
members = []
members_data.each do |node_id, member_json|
member = JSON.parse(member_json)
members << {
node_id: member['node_id'],
region: member['region'],
zone: member['zone'],
last_heartbeat: member['last_heartbeat'],
status: member['status']
}
end
members
end
def join_cluster
member_info = {
node_id: @node_id,
region: @config['distributed_systems']['region'],
zone: @config['distributed_systems']['zone'],
joined_at: Time.now.iso8601,
last_heartbeat: Time.now.iso8601,
status: 'active'
}
@redis.hset("cluster_members:#{@cluster_name}", @node_id, member_info.to_json)
start_heartbeat
{ success: true }
end
def leave_cluster
@redis.hdel("cluster_members:#{@cluster_name}", @node_id)
stop_heartbeat
stop_leader_election
{ success: true }
end
def get_coordination_statistics
{
node_id: @node_id,
cluster_name: @cluster_name,
is_leader: @is_leader,
leader_id: @leader_id,
cluster_members: get_cluster_members,
total_members: get_cluster_members.length,
active_members: get_cluster_members.count { |m| m[:status] == 'active' }
}
end
private
def setup_coordination
# Initialize coordination components
end
def participate_in_election
loop do
begin
# Check if leader exists
current_leader = get_leader_info
if current_leader.nil?
# No leader exists, start election
start_election
elsif Time.now - Time.parse(current_leader['elected_at']) > @election_timeout
# Leader timeout, start election
start_election
else
# Leader exists and is active
@is_leader = false
@leader_id = current_leader['leader_id']
end
sleep 5
rescue => e
Rails.logger.error "Election error: #{e.message}"
sleep 5
end
end
end
def start_election
# Increment term
current_term = @redis.get("term:#{@cluster_name}").to_i + 1
@redis.set("term:#{@cluster_name}", current_term)
# Vote for self
votes_received = 1
@redis.set("votes:#{@cluster_name}:#{@node_id}", current_term)
# Request votes from other members
members = get_cluster_members
members.each do |member|
next if member[:node_id] == @node_id
if request_vote(member[:node_id], current_term)
votes_received += 1
end
end
# Check if we won the election
if votes_received > members.length / 2
become_leader(current_term)
end
end
def request_vote(candidate_id, term)
# Implementation for requesting votes
# This would typically involve RPC calls to other nodes
true
end
def become_leader(term)
leader_info = {
leader_id: @node_id,
elected_at: Time.now.iso8601,
term: term
}
@redis.set("leader:#{@cluster_name}", leader_info.to_json)
@is_leader = true
@leader_id = @node_id
Rails.logger.info "Became leader for term #{term}"
end
def start_heartbeat
@heartbeat_thread = Thread.new do
loop do
begin
update_heartbeat
sleep @heartbeat_interval
rescue => e
Rails.logger.error "Heartbeat error: #{e.message}"
sleep @heartbeat_interval
end
end
end
end
def stop_heartbeat
@heartbeat_thread&.exit
@heartbeat_thread = nil
end
def update_heartbeat
member_data = @redis.hget("cluster_members:#{@cluster_name}", @node_id)
return unless member_data
member = JSON.parse(member_data)
member['last_heartbeat'] = Time.now.iso8601
member['status'] = 'active'
@redis.hset("cluster_members:#{@cluster_name}", @node_id, member.to_json)
end
end
🎯 Configuration Management
Distributed Systems Configuration
config/distributed_systems_features.tsk
[distributed_systems]
enabled: @env("DISTRIBUTED_SYSTEMS_ENABLED", "true")
cluster_name: @env("CLUSTER_NAME", "production-cluster")
node_id: @env("NODE_ID", "node-1")
region: @env("REGION", "us-east-1")
zone: @env("ZONE", "us-east-1a")
datacenter: @env("DATACENTER", "dc1")[service_discovery]
enabled: @env("SERVICE_DISCOVERY_ENABLED", "true")
provider: @env("SERVICE_DISCOVERY_PROVIDER", "consul")
refresh_interval: @env("SERVICE_DISCOVERY_REFRESH", "30")
health_check_interval: @env("HEALTH_CHECK_INTERVAL", "10")
deregister_after: @env("DEREGISTER_AFTER", "300")
[load_balancing]
enabled: @env("LOAD_BALANCING_ENABLED", "true")
algorithm: @env("LOAD_BALANCING_ALGORITHM", "round_robin")
health_check_enabled: @env("LOAD_BALANCER_HEALTH_CHECK", "true")
failover_enabled: @env("LOAD_BALANCER_FAILOVER", "true")
sticky_sessions: @env("STICKY_SESSIONS_ENABLED", "false")
[fault_tolerance]
enabled: @env("FAULT_TOLERANCE_ENABLED", "true")
circuit_breaker_enabled: @env("CIRCUIT_BREAKER_ENABLED", "true")
retry_attempts: @env("RETRY_ATTEMPTS", "3")
timeout: @env("DISTRIBUTED_TIMEOUT", "30")
backoff_strategy: @env("BACKOFF_STRATEGY", "exponential")
[coordination]
enabled: @env("COORDINATION_ENABLED", "true")
leader_election: @env("LEADER_ELECTION_ENABLED", "true")
consensus_protocol: @env("CONSENSUS_PROTOCOL", "raft")
election_timeout: @env("ELECTION_TIMEOUT", "30")
heartbeat_interval: @env("HEARTBEAT_INTERVAL", "10")
[monitoring]
metrics_enabled: @env("DISTRIBUTED_METRICS_ENABLED", "true")
health_checks_enabled: @env("DISTRIBUTED_HEALTH_CHECKS", "true")
alerting_enabled: @env("DISTRIBUTED_ALERTING_ENABLED", "true")
tracing_enabled: @env("DISTRIBUTED_TRACING_ENABLED", "true")
🎯 Summary
This comprehensive guide covers distributed systems implementation with TuskLang and Ruby, including:
- Service Discovery: Automatic service registration, discovery, and health checking - Load Balancing: Multiple load balancing algorithms with health checks and failover - Fault Tolerance: Circuit breakers, retry logic, and timeout handling - Distributed Coordination: Leader election, consensus protocols, and cluster management - Configuration Management: Enterprise-grade distributed systems configuration
The distributed systems features with TuskLang provide a robust foundation for building systems that can scale across multiple nodes, regions, and clouds with seamless coordination, fault tolerance, and high availability.