💎 IoT Integration with TuskLang and Ruby
IoT Integration with TuskLang and Ruby
🔌 Connect the Physical and Digital Worlds
TuskLang enables sophisticated IoT integration for Ruby applications, providing device management, sensor data processing, and IoT platform connectivity. Build applications that bridge the gap between physical devices and digital systems.
🚀 Quick Start: IoT Platform Setup
Basic IoT Configuration
config/iot.tsk
[iot]
enabled: @env("IOT_ENABLED", "true")
platform: @env("IOT_PLATFORM", "aws_iot") # aws_iot, azure_iot, google_iot, custom
mqtt_broker: @env("MQTT_BROKER_URL", "mqtt://localhost:1883")
websocket_enabled: @env("IOT_WEBSOCKET_ENABLED", "true")
websocket_port: @env("IOT_WEBSOCKET_PORT", "8080")[device_management]
enabled: @env("DEVICE_MANAGEMENT_ENABLED", "true")
auto_registration: @env("DEVICE_AUTO_REGISTRATION", "true")
device_authentication: @env("DEVICE_AUTHENTICATION", "true")
firmware_updates: @env("FIRMWARE_UPDATES_ENABLED", "true")
[sensor_data]
enabled: @env("SENSOR_DATA_ENABLED", "true")
data_retention: @env("SENSOR_DATA_RETENTION", "30d")
real_time_processing: @env("REAL_TIME_SENSOR_PROCESSING", "true")
data_aggregation: @env("SENSOR_DATA_AGGREGATION", "true")
IoT Platform Implementation
lib/iot_platform.rb
require 'tusk'
require 'redis'
require 'json'
require 'securerandom'
require 'mqtt'class IoTPlatform
def initialize(config_path = 'config/iot.tsk')
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
@devices = {}
@sensors = {}
@mqtt_client = nil
setup_iot_platform
end
def register_device(device_info)
return { success: false, error: 'IoT disabled' } unless @config['iot']['enabled'] == 'true'
device_id = SecureRandom.uuid
device = {
id: device_id,
name: device_info[:name],
type: device_info[:type],
model: device_info[:model],
location: device_info[:location] || {},
capabilities: device_info[:capabilities] || [],
status: 'offline',
registered_at: Time.now.iso8601,
last_seen: nil,
firmware_version: device_info[:firmware_version] || '1.0.0',
metadata: device_info[:metadata] || {}
}
@devices[device_id] = device
@redis.hset('iot_devices', device_id, device.to_json)
# Generate device credentials
credentials = generate_device_credentials(device_id)
{
success: true,
device_id: device_id,
device: device,
credentials: credentials
}
end
def connect_device(device_id, credentials)
return { success: false, error: 'Device not found' } unless @devices[device_id]
device = @devices[device_id]
# Validate credentials
unless validate_device_credentials(device_id, credentials)
return { success: false, error: 'Invalid credentials' }
end
# Update device status
device[:status] = 'online'
device[:last_seen] = Time.now.iso8601
device[:connection_id] = SecureRandom.uuid
@redis.hset('iot_devices', device_id, device.to_json)
# Subscribe to device topics
subscribe_to_device_topics(device_id)
{
success: true,
device_id: device_id,
connection_id: device[:connection_id],
topics: get_device_topics(device_id)
}
end
def disconnect_device(device_id)
return { success: false, error: 'Device not found' } unless @devices[device_id]
device = @devices[device_id]
device[:status] = 'offline'
device[:last_seen] = Time.now.iso8601
device[:connection_id] = nil
@redis.hset('iot_devices', device_id, device.to_json)
# Unsubscribe from device topics
unsubscribe_from_device_topics(device_id)
{
success: true,
device_id: device_id
}
end
def publish_sensor_data(device_id, sensor_data)
return { success: false, error: 'Device not found' } unless @devices[device_id]
device = @devices[device_id]
return { success: false, error: 'Device offline' } unless device[:status] == 'online'
# Process and store sensor data
processed_data = process_sensor_data(device_id, sensor_data)
# Publish to MQTT
publish_to_mqtt(device_id, processed_data)
# Store in database
store_sensor_data(device_id, processed_data)
# Trigger real-time processing
trigger_real_time_processing(device_id, processed_data) if @config['sensor_data']['real_time_processing'] == 'true'
{
success: true,
device_id: device_id,
data_id: processed_data[:id],
timestamp: processed_data[:timestamp]
}
end
def get_device_status(device_id)
return nil unless @devices[device_id]
device = @devices[device_id]
{
device_id: device_id,
name: device[:name],
status: device[:status],
last_seen: device[:last_seen],
firmware_version: device[:firmware_version],
sensor_count: get_device_sensor_count(device_id),
data_points: get_device_data_points(device_id)
}
end
def update_device_firmware(device_id, firmware_info)
return { success: false, error: 'Device not found' } unless @devices[device_id]
device = @devices[device_id]
firmware_update = {
id: SecureRandom.uuid,
device_id: device_id,
current_version: device[:firmware_version],
target_version: firmware_info[:version],
download_url: firmware_info[:download_url],
checksum: firmware_info[:checksum],
status: 'pending',
created_at: Time.now.iso8601
}
@redis.hset('firmware_updates', firmware_update[:id], firmware_update.to_json)
# Notify device of firmware update
notify_device_firmware_update(device_id, firmware_update)
{
success: true,
firmware_update_id: firmware_update[:id],
device_id: device_id
}
end
def get_sensor_data(device_id, sensor_type = nil, start_time = nil, end_time = nil)
return { success: false, error: 'Device not found' } unless @devices[device_id]
# Get sensor data from database
data = retrieve_sensor_data(device_id, sensor_type, start_time, end_time)
{
success: true,
device_id: device_id,
sensor_type: sensor_type,
data_points: data.length,
data: data
}
end
def create_alert_rule(rule_config)
rule_id = SecureRandom.uuid
rule = {
id: rule_id,
name: rule_config[:name],
device_id: rule_config[:device_id],
sensor_type: rule_config[:sensor_type],
condition: rule_config[:condition], # >, <, >=, <=, ==, !=
threshold: rule_config[:threshold],
action: rule_config[:action], # email, webhook, mqtt
enabled: rule_config[:enabled] || true,
created_at: Time.now.iso8601
}
@redis.hset('alert_rules', rule_id, rule.to_json)
{
success: true,
rule_id: rule_id,
rule: rule
}
end
def get_iot_statistics
{
total_devices: @devices.length,
online_devices: @devices.count { |_, device| device[:status] == 'online' },
offline_devices: @devices.count { |_, device| device[:status] == 'offline' },
total_sensors: get_total_sensors,
total_data_points: get_total_data_points,
alert_rules: get_alert_rules_count
}
end
private
def setup_iot_platform
# Initialize IoT platform components
setup_mqtt_client if @config['iot']['mqtt_broker']
end
def setup_mqtt_client
begin
@mqtt_client = MQTT::Client.connect(@config['iot']['mqtt_broker'])
# Subscribe to system topics
@mqtt_client.subscribe('iot/+/status')
@mqtt_client.subscribe('iot/+/data')
@mqtt_client.subscribe('iot/+/firmware')
# Start MQTT message handling
Thread.new { handle_mqtt_messages }
rescue => e
Rails.logger.error "MQTT connection failed: #{e.message}"
end
end
def generate_device_credentials(device_id)
{
client_id: "device_#{device_id}",
username: "device_#{device_id}",
password: SecureRandom.hex(32),
certificate: generate_device_certificate(device_id)
}
end
def generate_device_certificate(device_id)
# Implementation for generating device certificate
"certificate_placeholder_for_#{device_id}"
end
def validate_device_credentials(device_id, credentials)
# Implementation for validating device credentials
true
end
def subscribe_to_device_topics(device_id)
topics = get_device_topics(device_id)
topics.each do |topic|
@mqtt_client&.subscribe(topic)
end
end
def unsubscribe_from_device_topics(device_id)
topics = get_device_topics(device_id)
topics.each do |topic|
@mqtt_client&.unsubscribe(topic)
end
end
def get_device_topics(device_id)
[
"iot/#{device_id}/status",
"iot/#{device_id}/data",
"iot/#{device_id}/firmware",
"iot/#{device_id}/commands"
]
end
def process_sensor_data(device_id, sensor_data)
processed_data = {
id: SecureRandom.uuid,
device_id: device_id,
timestamp: Time.now.iso8601,
sensors: {}
}
sensor_data.each do |sensor_type, value|
processed_data[:sensors][sensor_type] = {
value: value,
unit: get_sensor_unit(sensor_type),
quality: assess_data_quality(value, sensor_type)
}
end
processed_data
end
def get_sensor_unit(sensor_type)
units = {
'temperature' => '°C',
'humidity' => '%',
'pressure' => 'hPa',
'light' => 'lux',
'motion' => 'boolean'
}
units[sensor_type] || 'unknown'
end
def assess_data_quality(value, sensor_type)
# Implementation for data quality assessment
'good'
end
def publish_to_mqtt(device_id, data)
topic = "iot/#{device_id}/data"
payload = data.to_json
@mqtt_client&.publish(topic, payload)
end
def store_sensor_data(device_id, data)
# Store in Redis for real-time access
@redis.lpush("sensor_data:#{device_id}", data.to_json)
@redis.ltrim("sensor_data:#{device_id}", 0, 9999) # Keep last 10k data points
# Store in database for long-term storage
store_in_database(device_id, data)
end
def store_in_database(device_id, data)
# Implementation for storing in database
end
def trigger_real_time_processing(device_id, data)
# Implementation for real-time processing
check_alert_rules(device_id, data)
end
def handle_mqtt_messages
@mqtt_client&.get do |topic, message|
begin
data = JSON.parse(message)
handle_device_message(topic, data)
rescue => e
Rails.logger.error "MQTT message handling error: #{e.message}"
end
end
end
def handle_device_message(topic, data)
device_id = extract_device_id_from_topic(topic)
message_type = extract_message_type_from_topic(topic)
case message_type
when 'status'
handle_device_status_update(device_id, data)
when 'data'
handle_sensor_data(device_id, data)
when 'firmware'
handle_firmware_update_response(device_id, data)
end
end
def extract_device_id_from_topic(topic)
topic.split('/')[1]
end
def extract_message_type_from_topic(topic)
topic.split('/')[2]
end
def handle_device_status_update(device_id, data)
return unless @devices[device_id]
device = @devices[device_id]
device[:status] = data['status']
device[:last_seen] = Time.now.iso8601
@redis.hset('iot_devices', device_id, device.to_json)
end
def handle_sensor_data(device_id, data)
publish_sensor_data(device_id, data['sensors'])
end
def handle_firmware_update_response(device_id, data)
# Implementation for handling firmware update responses
end
def notify_device_firmware_update(device_id, firmware_update)
topic = "iot/#{device_id}/firmware"
payload = {
action: 'update',
firmware_id: firmware_update[:id],
version: firmware_update[:target_version],
download_url: firmware_update[:download_url],
checksum: firmware_update[:checksum]
}.to_json
@mqtt_client&.publish(topic, payload)
end
def retrieve_sensor_data(device_id, sensor_type, start_time, end_time)
# Implementation for retrieving sensor data
[]
end
def check_alert_rules(device_id, data)
rules = get_device_alert_rules(device_id)
rules.each do |rule|
if should_trigger_alert(rule, data)
trigger_alert(rule, data)
end
end
end
def get_device_alert_rules(device_id)
rules_data = @redis.hgetall('alert_rules')
rules = []
rules_data.each do |rule_id, rule_json|
rule = JSON.parse(rule_json)
if rule['device_id'] == device_id && rule['enabled']
rules << rule
end
end
rules
end
def should_trigger_alert(rule, data)
sensor_value = data[:sensors][rule['sensor_type']]&.dig(:value)
return false unless sensor_value
case rule['condition']
when '>'
sensor_value > rule['threshold']
when '<'
sensor_value < rule['threshold']
when '>='
sensor_value >= rule['threshold']
when '<='
sensor_value <= rule['threshold']
when '=='
sensor_value == rule['threshold']
when '!='
sensor_value != rule['threshold']
else
false
end
end
def trigger_alert(rule, data)
alert = {
id: SecureRandom.uuid,
rule_id: rule['id'],
device_id: rule['device_id'],
sensor_type: rule['sensor_type'],
threshold: rule['threshold'],
actual_value: data[:sensors][rule['sensor_type']][:value],
triggered_at: Time.now.iso8601
}
@redis.lpush('iot_alerts', alert.to_json)
# Execute alert action
execute_alert_action(rule['action'], alert)
end
def execute_alert_action(action, alert)
case action
when 'email'
send_alert_email(alert)
when 'webhook'
send_alert_webhook(alert)
when 'mqtt'
publish_alert_mqtt(alert)
end
end
def send_alert_email(alert)
# Implementation for sending alert email
end
def send_alert_webhook(alert)
# Implementation for sending alert webhook
end
def publish_alert_mqtt(alert)
topic = "iot/alerts"
@mqtt_client&.publish(topic, alert.to_json)
end
def get_device_sensor_count(device_id)
# Implementation for getting device sensor count
0
end
def get_device_data_points(device_id)
# Implementation for getting device data points
0
end
def get_total_sensors
# Implementation for getting total sensors
0
end
def get_total_data_points
# Implementation for getting total data points
0
end
def get_alert_rules_count
@redis.hlen('alert_rules')
end
end
📊 Sensor Data Processing
Real-Time Data Analytics
lib/sensor_data_processor.rb
require 'tusk'
require 'redis'
require 'json'class SensorDataProcessor
def initialize(config_path = 'config/iot.tsk')
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
@processors = {}
@aggregators = {}
setup_data_processor
end
def process_sensor_data(data)
return { success: false, error: 'Sensor data processing disabled' } unless @config['sensor_data']['enabled'] == 'true'
processed_data = data.dup
# Apply data processors
@processors.each do |processor_name, processor|
result = apply_processor(processor, processed_data)
processed_data = result if result
end
# Apply aggregators if enabled
if @config['sensor_data']['data_aggregation'] == 'true'
aggregated_data = apply_aggregators(processed_data)
processed_data.merge!(aggregated_data)
end
# Store processed data
store_processed_data(processed_data)
{
success: true,
processed_data: processed_data
}
end
def add_data_processor(processor_name, processor_config)
processor_id = SecureRandom.uuid
processor = {
id: processor_id,
name: processor_name,
type: processor_config[:type],
config: processor_config,
enabled: processor_config[:enabled] || true,
created_at: Time.now.iso8601
}
@processors[processor_name] = processor
@redis.hset('data_processors', processor_name, processor.to_json)
{
success: true,
processor_id: processor_id,
processor_name: processor_name
}
end
def add_data_aggregator(aggregator_name, aggregator_config)
aggregator_id = SecureRandom.uuid
aggregator = {
id: aggregator_id,
name: aggregator_name,
type: aggregator_config[:type],
config: aggregator_config,
enabled: aggregator_config[:enabled] || true,
created_at: Time.now.iso8601
}
@aggregators[aggregator_name] = aggregator
@redis.hset('data_aggregators', aggregator_name, aggregator.to_json)
{
success: true,
aggregator_id: aggregator_id,
aggregator_name: aggregator_name
}
end
def get_data_analytics(device_id, time_range = '24h')
# Get sensor data for analysis
sensor_data = get_sensor_data_for_analysis(device_id, time_range)
analytics = {
device_id: device_id,
time_range: time_range,
data_points: sensor_data.length,
statistics: calculate_statistics(sensor_data),
trends: analyze_trends(sensor_data),
anomalies: detect_anomalies(sensor_data)
}
analytics
end
def create_data_visualization(device_id, sensor_type, time_range = '24h')
sensor_data = get_sensor_data_for_analysis(device_id, time_range)
visualization = {
device_id: device_id,
sensor_type: sensor_type,
time_range: time_range,
data_points: sensor_data.length,
chart_data: prepare_chart_data(sensor_data, sensor_type),
summary: generate_data_summary(sensor_data, sensor_type)
}
visualization
end
def export_sensor_data(device_id, start_time, end_time, format = 'json')
sensor_data = get_sensor_data_for_analysis(device_id, "#{start_time}_#{end_time}")
case format
when 'json'
export_as_json(sensor_data)
when 'csv'
export_as_csv(sensor_data)
when 'xml'
export_as_xml(sensor_data)
else
{ success: false, error: "Unsupported format: #{format}" }
end
end
private
def setup_data_processor
# Initialize data processor components
end
def apply_processor(processor, data)
case processor[:type]
when 'filter'
apply_filter_processor(processor, data)
when 'transform'
apply_transform_processor(processor, data)
when 'validate'
apply_validation_processor(processor, data)
when 'enrich'
apply_enrichment_processor(processor, data)
else
data
end
end
def apply_filter_processor(processor, data)
config = processor[:config]
data[:sensors].select! do |sensor_type, sensor_data|
value = sensor_data[:value]
if config[:min_value] && value < config[:min_value]
false
elsif config[:max_value] && value > config[:max_value]
false
else
true
end
end
data
end
def apply_transform_processor(processor, data)
config = processor[:config]
data[:sensors].transform_values! do |sensor_data|
value = sensor_data[:value]
case config[:transformation]
when 'scale'
sensor_data[:value] = value * config[:scale_factor]
when 'offset'
sensor_data[:value] = value + config[:offset]
when 'convert'
sensor_data[:value] = convert_units(value, config[:from_unit], config[:to_unit])
end
sensor_data
end
data
end
def apply_validation_processor(processor, data)
config = processor[:config]
data[:sensors].each do |sensor_type, sensor_data|
value = sensor_data[:value]
if config[:valid_range] && (value < config[:valid_range][:min] || value > config[:valid_range][:max])
sensor_data[:quality] = 'invalid'
elsif config[:warning_range] && (value < config[:warning_range][:min] || value > config[:warning_range][:max])
sensor_data[:quality] = 'warning'
else
sensor_data[:quality] = 'good'
end
end
data
end
def apply_enrichment_processor(processor, data)
config = processor[:config]
data[:sensors].each do |sensor_type, sensor_data|
# Add metadata
sensor_data[:metadata] = {
processed_at: Time.now.iso8601,
processor: processor[:name],
location: config[:location],
unit: sensor_data[:unit]
}
# Add derived values
if config[:derived_values]
sensor_data[:derived] = calculate_derived_values(sensor_data[:value], config[:derived_values])
end
end
data
end
def apply_aggregators(data)
aggregated_data = {}
@aggregators.each do |aggregator_name, aggregator|
result = apply_aggregator(aggregator, data)
aggregated_data.merge!(result) if result
end
aggregated_data
end
def apply_aggregator(aggregator, data)
case aggregator[:type]
when 'average'
calculate_average(aggregator, data)
when 'min_max'
calculate_min_max(aggregator, data)
when 'count'
calculate_count(aggregator, data)
when 'sum'
calculate_sum(aggregator, data)
end
end
def calculate_average(aggregator, data)
config = aggregator[:config]
sensor_type = config[:sensor_type]
values = data[:sensors][sensor_type]&.dig(:value)
return nil unless values
{
"#{sensor_type}_average" => values.sum / values.length
}
end
def calculate_min_max(aggregator, data)
config = aggregator[:config]
sensor_type = config[:sensor_type]
values = data[:sensors][sensor_type]&.dig(:value)
return nil unless values
{
"#{sensor_type}_min" => values.min,
"#{sensor_type}_max" => values.max
}
end
def calculate_count(aggregator, data)
config = aggregator[:config]
sensor_type = config[:sensor_type]
count = data[:sensors][sensor_type] ? 1 : 0
{
"#{sensor_type}_count" => count
}
end
def calculate_sum(aggregator, data)
config = aggregator[:config]
sensor_type = config[:sensor_type]
value = data[:sensors][sensor_type]&.dig(:value) || 0
{
"#{sensor_type}_sum" => value
}
end
def store_processed_data(data)
@redis.lpush("processed_sensor_data", data.to_json)
@redis.ltrim("processed_sensor_data", 0, 9999)
end
def get_sensor_data_for_analysis(device_id, time_range)
# Implementation for getting sensor data for analysis
[]
end
def calculate_statistics(sensor_data)
# Implementation for calculating statistics
{}
end
def analyze_trends(sensor_data)
# Implementation for analyzing trends
{}
end
def detect_anomalies(sensor_data)
# Implementation for detecting anomalies
[]
end
def prepare_chart_data(sensor_data, sensor_type)
# Implementation for preparing chart data
[]
end
def generate_data_summary(sensor_data, sensor_type)
# Implementation for generating data summary
{}
end
def convert_units(value, from_unit, to_unit)
# Implementation for unit conversion
value
end
def calculate_derived_values(value, derived_config)
# Implementation for calculating derived values
{}
end
def export_as_json(data)
{
success: true,
format: 'json',
data: data
}
end
def export_as_csv(data)
# Implementation for CSV export
{
success: true,
format: 'csv',
data: 'csv_data_placeholder'
}
end
def export_as_xml(data)
# Implementation for XML export
{
success: true,
format: 'xml',
data: 'xml_data_placeholder'
}
end
end
🎯 Configuration Management
IoT Configuration
config/iot_features.tsk
[iot]
enabled: @env("IOT_ENABLED", "true")
platform: @env("IOT_PLATFORM", "aws_iot")
mqtt_broker: @env("MQTT_BROKER_URL", "mqtt://localhost:1883")
websocket_enabled: @env("IOT_WEBSOCKET_ENABLED", "true")
websocket_port: @env("IOT_WEBSOCKET_PORT", "8080")
ssl_enabled: @env("IOT_SSL_ENABLED", "true")[device_management]
enabled: @env("DEVICE_MANAGEMENT_ENABLED", "true")
auto_registration: @env("DEVICE_AUTO_REGISTRATION", "true")
device_authentication: @env("DEVICE_AUTHENTICATION", "true")
firmware_updates: @env("FIRMWARE_UPDATES_ENABLED", "true")
device_monitoring: @env("DEVICE_MONITORING_ENABLED", "true")
[sensor_data]
enabled: @env("SENSOR_DATA_ENABLED", "true")
data_retention: @env("SENSOR_DATA_RETENTION", "30d")
real_time_processing: @env("REAL_TIME_SENSOR_PROCESSING", "true")
data_aggregation: @env("SENSOR_DATA_AGGREGATION", "true")
data_compression: @env("SENSOR_DATA_COMPRESSION", "true")
[analytics]
enabled: @env("IOT_ANALYTICS_ENABLED", "true")
real_time_analytics: @env("REAL_TIME_ANALYTICS_ENABLED", "true")
predictive_analytics: @env("PREDICTIVE_ANALYTICS_ENABLED", "true")
machine_learning: @env("IOT_ML_ENABLED", "true")
[security]
encryption_enabled: @env("IOT_ENCRYPTION_ENABLED", "true")
certificate_management: @env("CERTIFICATE_MANAGEMENT_ENABLED", "true")
access_control: @env("IOT_ACCESS_CONTROL_ENABLED", "true")
audit_logging: @env("IOT_AUDIT_LOGGING_ENABLED", "true")
[monitoring]
device_monitoring: @env("DEVICE_MONITORING_ENABLED", "true")
performance_monitoring: @env("IOT_PERFORMANCE_MONITORING_ENABLED", "true")
alerting_enabled: @env("IOT_ALERTING_ENABLED", "true")
dashboard_enabled: @env("IOT_DASHBOARD_ENABLED", "true")
🎯 Summary
This comprehensive guide covers IoT integration with TuskLang and Ruby, including:
- IoT Platform: Complete device management and connectivity - Sensor Data Processing: Real-time data processing and analytics - Device Management: Device registration, authentication, and firmware updates - Data Analytics: Real-time analytics and data visualization - Configuration Management: Enterprise-grade IoT configuration - Security Features: Device authentication and data encryption - Monitoring: Device and performance monitoring capabilities
The IoT features with TuskLang provide a robust foundation for building applications that connect physical devices to digital systems, enabling real-time data processing, device management, and intelligent analytics.