💎 Data Governance with TuskLang and Ruby

🏛️ Govern Your Data with Precision and Control

TuskLang enables comprehensive data governance for Ruby applications, providing data classification, lineage tracking, quality management, and compliance frameworks. Build applications that maintain data integrity, traceability, and regulatory compliance.

🚀 Quick Start: Data Classification

Basic Data Governance Configuration

config/data_governance.tsk

```tsk
[data_governance]
enabled: @env("DATA_GOVERNANCE_ENABLED", "true")
organization: @env("ORGANIZATION_NAME", "Acme Corp")
data_steward: @env("DATA_STEWARD", "data.team@acme.com")
retention_policy: @env("DATA_RETENTION_POLICY", "7y")

[classification]
auto_classification: @env("AUTO_CLASSIFICATION_ENABLED", "true")
sensitive_patterns: @env("SENSITIVE_PATTERNS", "ssn,credit_card,email,phone")
classification_levels: @env("CLASSIFICATION_LEVELS", "public,internal,confidential,restricted")

[lineage]
tracking_enabled: @env("LINEAGE_TRACKING_ENABLED", "true")
retention_period: @env("LINEAGE_RETENTION_PERIOD", "10y")
real_time_tracking: @env("REAL_TIME_LINEAGE_TRACKING", "true")

[quality]
monitoring_enabled: @env("QUALITY_MONITORING_ENABLED", "true")
validation_rules: @env("QUALITY_VALIDATION_RULES", "required,format,range,uniqueness")
quality_threshold: @env("QUALITY_THRESHOLD", "95")
```
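
With these settings in place, the governance config loads like any other TuskLang file. A minimal sketch, assuming `Tusk.load` parses sections into a nested hash of strings (the same shape the engine classes below rely on):

```ruby
require 'tusk'

# @env() falls back to the quoted default when the variable is unset.
config = Tusk.load('config/data_governance.tsk')

puts config['data_governance']['organization']  # => "Acme Corp"
puts config['quality']['quality_threshold']     # => "95" (string; cast with .to_f)
```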

Data Classification Engine

lib/data_classification_engine.rb

```ruby
require 'tusk'
require 'redis'
require 'json'
require 'digest'

class DataClassificationEngine
  def initialize(config_path = 'config/data_governance.tsk')
    @config = Tusk.load(config_path)
    @redis = Redis.new(url: @config['redis']['url'])
    setup_classification_rules
  end

  def classify_data(data, context = {})
    return { classification: 'public' } unless @config['data_governance']['enabled'] == 'true'

    classification_result = {
      data_id: generate_data_id(data),
      classification: determine_classification(data, context),
      confidence: calculate_confidence(data, context),
      patterns_found: find_sensitive_patterns(data),
      classification_reason: determine_classification_reason(data, context),
      timestamp: Time.now.iso8601,
      context: context
    }

    store_classification_result(classification_result)
    apply_classification_policies(classification_result)
    classification_result
  end

  def classify_database_table(table_name, columns)
    table_classification = {
      table_name: table_name,
      overall_classification: 'public',
      column_classifications: {},
      timestamp: Time.now.iso8601
    }

    columns.each do |column|
      column_classification = classify_column(column)
      table_classification[:column_classifications][column[:name]] = column_classification

      # The table inherits the highest sensitivity found among its columns
      if column_classification[:classification] == 'restricted'
        table_classification[:overall_classification] = 'restricted'
      elsif column_classification[:classification] == 'confidential' &&
            table_classification[:overall_classification] != 'restricted'
        table_classification[:overall_classification] = 'confidential'
      elsif column_classification[:classification] == 'internal' &&
            table_classification[:overall_classification] == 'public'
        table_classification[:overall_classification] = 'internal'
      end
    end

    store_table_classification(table_classification)
    table_classification
  end

  def classify_file(file_path, content = nil)
    file_classification = {
      file_path: file_path,
      file_type: determine_file_type(file_path),
      content_classification: nil,
      metadata_classification: classify_file_metadata(file_path),
      timestamp: Time.now.iso8601
    }

    file_classification[:content_classification] = classify_data(content) if content

    # Overall file classification is the highest of content and metadata
    classifications = [
      file_classification[:content_classification]&.dig(:classification),
      file_classification[:metadata_classification]&.dig(:classification)
    ].compact

    file_classification[:overall_classification] = determine_highest_classification(classifications)
    store_file_classification(file_classification)
    file_classification
  end

  def get_classification_policies(classification_level)
    policies = {
      'public' => {
        access_control: 'open', encryption: false,
        audit_logging: false, retention_period: '1y'
      },
      'internal' => {
        access_control: 'authenticated', encryption: false,
        audit_logging: true, retention_period: '3y'
      },
      'confidential' => {
        access_control: 'authorized', encryption: true,
        audit_logging: true, retention_period: '7y'
      },
      'restricted' => {
        access_control: 'strict', encryption: true,
        audit_logging: true, retention_period: '10y',
        additional_controls: ['mfa_required', 'data_masking']
      }
    }

    policies[classification_level] || policies['public']
  end

  def update_classification(data_id, new_classification, reason)
    classification_update = {
      data_id: data_id,
      previous_classification: get_current_classification(data_id),
      new_classification: new_classification,
      reason: reason,
      updated_by: get_current_user,
      timestamp: Time.now.iso8601
    }

    store_classification_update(classification_update)
    apply_classification_policies({ classification: new_classification })
    classification_update
  end

  private

  def setup_classification_rules
    @sensitive_patterns = {
      ssn: /\b\d{3}-\d{2}-\d{4}\b/,
      credit_card: /\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b/,
      email: /\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/,
      phone: /\b\d{3}[-.]?\d{3}[-.]?\d{4}\b/,
      ip_address: /\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b/,
      api_key: /\b[A-Za-z0-9]{32,}\b/,
      password: /\bpassword\s*[:=]\s*\S+/i
    }

    @classification_keywords = {
      'restricted' => ['secret', 'password', 'key', 'token', 'credential'],
      'confidential' => ['private', 'personal', 'sensitive', 'internal'],
      'internal' => ['company', 'business', 'corporate', 'employee'],
      'public' => ['public', 'general', 'open', 'shared']
    }
  end

  def determine_classification(data, context)
    data_string = data.to_s.downcase

    # Keyword scans, from most to least sensitive
    return 'restricted' if find_restricted_patterns(data_string).any?
    return 'confidential' if find_confidential_patterns(data_string).any?
    return 'internal' if find_internal_patterns(data_string).any?

    # Context hints can escalate otherwise unremarkable data
    return 'confidential' if context[:business_critical] || context[:financial_data]

    'public'
  end

  def calculate_confidence(data, context)
    patterns_found = find_sensitive_patterns(data)
    keyword_matches = find_keyword_matches(data)
    context_score = calculate_context_score(context)

    # Weighted confidence: patterns 40%, keywords 30%, context 30%
    pattern_score = patterns_found.length * 0.4
    keyword_score = keyword_matches.length * 0.3
    context_score = context_score * 0.3

    total_score = pattern_score + keyword_score + context_score
    [total_score * 100, 100].min.round(2)
  end

  def find_sensitive_patterns(data)
    data_string = data.to_s
    found_patterns = []

    @sensitive_patterns.each do |pattern_name, pattern|
      if data_string.match?(pattern)
        found_patterns << { pattern: pattern_name, matches: data_string.scan(pattern).length }
      end
    end

    found_patterns
  end

  def find_restricted_patterns(data)
    data.scan(/secret|password|key|token|credential/i)
  end

  def find_confidential_patterns(data)
    data.scan(/private|personal|sensitive|internal/i)
  end

  def find_internal_patterns(data)
    data.scan(/company|business|corporate|employee/i)
  end

  def find_keyword_matches(data)
    data_string = data.to_s.downcase
    matches = []

    @classification_keywords.each do |classification, keywords|
      keywords.each do |keyword|
        matches << { classification: classification, keyword: keyword } if data_string.include?(keyword)
      end
    end

    matches
  end

  def calculate_context_score(context)
    score = 0
    score += 0.5 if context[:business_critical]
    score += 0.3 if context[:financial_data]
    score += 0.2 if context[:personal_data]
    score += 0.1 if context[:internal_use]
    score
  end

  def determine_classification_reason(data, context)
    patterns = find_sensitive_patterns(data)
    keywords = find_keyword_matches(data)

    reasons = []
    reasons << "Contains sensitive patterns: #{patterns.map { |p| p[:pattern] }.join(', ')}" if patterns.any?
    reasons << "Contains classification keywords: #{keywords.map { |k| k[:keyword] }.join(', ')}" if keywords.any?
    reasons << 'Business critical data' if context[:business_critical]
    reasons << 'Financial data' if context[:financial_data]

    reasons.join('; ')
  end

  def classify_column(column)
    {
      name: column[:name],
      classification: determine_classification(column[:name], {}),
      data_type: column[:type],
      nullable: column[:nullable],
      unique: column[:unique],
      patterns_found: find_sensitive_patterns(column[:name])
    }
  end

  def determine_file_type(file_path)
    case File.extname(file_path).downcase
    when '.csv', '.json', '.xml' then 'data'
    when '.log' then 'log'
    when '.txt', '.md' then 'text'
    when '.jpg', '.png', '.gif' then 'image'
    when '.pdf' then 'document'
    else 'unknown'
    end
  end

  def classify_file_metadata(file_path)
    { classification: 'internal', reason: 'File metadata classification', patterns_found: [] }
  end

  def determine_highest_classification(classifications)
    priority = { 'restricted' => 4, 'confidential' => 3, 'internal' => 2, 'public' => 1 }
    classifications.max_by { |c| priority[c] || 0 } || 'public'
  end

  def generate_data_id(data)
    Digest::SHA256.hexdigest(data.to_s)
  end

  def store_classification_result(result)
    @redis.hset('data_classifications', result[:data_id], result.to_json)
  end

  def store_table_classification(classification)
    @redis.hset('table_classifications', classification[:table_name], classification.to_json)
  end

  def store_file_classification(classification)
    @redis.hset('file_classifications', classification[:file_path], classification.to_json)
  end

  def store_classification_update(update)
    @redis.lpush('classification_updates', update.to_json)
    @redis.ltrim('classification_updates', 0, 9999)
  end

  def get_current_classification(data_id)
    classification_data = @redis.hget('data_classifications', data_id)
    return 'public' unless classification_data

    JSON.parse(classification_data)['classification']
  end

  def get_current_user
    # Implementation to get current user
    'system'
  end

  def apply_classification_policies(classification_result)
    policies = get_classification_policies(classification_result[:classification])

    apply_encryption(classification_result[:data_id]) if policies[:encryption]
    apply_access_controls(classification_result[:data_id], policies[:access_control])
    enable_audit_logging(classification_result[:data_id]) if policies[:audit_logging]
  end

  def apply_encryption(data_id)
    # Implementation to apply encryption
  end

  def apply_access_controls(data_id, access_control)
    # Implementation to apply access controls
  end

  def enable_audit_logging(data_id)
    # Implementation to enable audit logging
  end
end
```
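
A quick usage sketch of the engine (hypothetical inputs; assumes a reachable Redis at the configured URL):

```ruby
engine = DataClassificationEngine.new

result = engine.classify_data('Contact: jane@example.com, SSN 123-45-6789',
                              { financial_data: true })
puts result[:classification]  # => "confidential" (escalated by the financial_data context)
p result[:patterns_found]     # => [{ pattern: :ssn, matches: 1 }, { pattern: :email, matches: 1 }]

table = engine.classify_database_table('users', [
  { name: 'id',       type: 'integer', nullable: false, unique: true },
  { name: 'password', type: 'string',  nullable: false, unique: false }
])
puts table[:overall_classification]  # => "restricted" ("password" is a restricted keyword)
```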

🔗 Data Lineage Tracking

Data Lineage Tracker

lib/data_lineage_tracker.rb

```ruby
require 'tusk'
require 'redis'
require 'json'
require 'securerandom'
require 'set'
require 'time'

class DataLineageTracker
  def initialize(config_path = 'config/data_governance.tsk')
    @config = Tusk.load(config_path)
    @redis = Redis.new(url: @config['redis']['url'])
  end

  def track_data_flow(source_id, target_id, transformation = nil, context = {})
    return unless @config['lineage']['tracking_enabled'] == 'true'

    lineage_record = {
      id: SecureRandom.uuid,
      source_id: source_id,
      target_id: target_id,
      transformation: transformation,
      context: context,
      timestamp: Time.now.iso8601,
      user: get_current_user,
      session_id: get_session_id
    }

    store_lineage_record(lineage_record)
    update_lineage_graph(source_id, target_id, lineage_record)
    lineage_record
  end

  def track_database_operation(operation_type, table_name, query, affected_rows = nil)
    lineage_record = {
      id: SecureRandom.uuid,
      operation_type: operation_type,
      table_name: table_name,
      query: sanitize_query(query),
      affected_rows: affected_rows,
      timestamp: Time.now.iso8601,
      user: get_current_user,
      session_id: get_session_id
    }

    store_database_lineage(lineage_record)
    lineage_record
  end

  def track_file_operation(operation_type, file_path, source_path = nil)
    lineage_record = {
      id: SecureRandom.uuid,
      operation_type: operation_type,
      file_path: file_path,
      source_path: source_path,
      timestamp: Time.now.iso8601,
      user: get_current_user,
      session_id: get_session_id
    }

    store_file_lineage(lineage_record)
    lineage_record
  end

  def get_data_lineage(data_id, depth = 5)
    {
      data_id: data_id,
      upstream: get_upstream_lineage(data_id, depth),
      downstream: get_downstream_lineage(data_id, depth),
      transformations: get_transformations(data_id),
      metadata: get_lineage_metadata(data_id)
    }
  end

  def get_lineage_graph(start_date = nil, end_date = nil)
    lineage_records = get_lineage_records_in_range(start_date, end_date)

    graph = {
      nodes: extract_nodes(lineage_records),
      edges: extract_edges(lineage_records),
      metadata: {
        total_nodes: 0,
        total_edges: 0,
        date_range: { start: start_date, end: end_date }
      }
    }

    graph[:metadata][:total_nodes] = graph[:nodes].length
    graph[:metadata][:total_edges] = graph[:edges].length
    graph
  end

  def analyze_data_impact(data_id)
    {
      data_id: data_id,
      direct_dependents: get_direct_dependents(data_id),
      indirect_dependents: get_indirect_dependents(data_id),
      impact_score: calculate_impact_score(data_id),
      risk_assessment: assess_impact_risk(data_id),
      recommendations: generate_impact_recommendations(data_id)
    }
  end

  def generate_lineage_report(start_date, end_date, report_type = 'comprehensive')
    case report_type
    when 'comprehensive'
      generate_comprehensive_lineage_report(start_date, end_date)
    when 'data_flow'
      generate_data_flow_report(start_date, end_date)
    when 'transformation'
      generate_transformation_report(start_date, end_date)
    when 'compliance'
      generate_compliance_lineage_report(start_date, end_date)
    else
      raise ArgumentError, "Unknown report type: #{report_type}"
    end
  end

  def validate_data_lineage(data_id)
    lineage = get_data_lineage(data_id)

    validation_result = {
      data_id: data_id,
      validation_status: 'valid',
      issues: [],
      recommendations: []
    }

    # Broken lineage: no recorded relationships at all
    if lineage[:upstream].empty? && lineage[:downstream].empty?
      validation_result[:validation_status] = 'warning'
      validation_result[:issues] << 'No lineage information found'
    end

    # Circular dependencies make the graph unusable for impact analysis
    if has_circular_dependencies(data_id)
      validation_result[:validation_status] = 'error'
      validation_result[:issues] << 'Circular dependencies detected'
    end

    # Orphaned data has no recorded producers or consumers
    if is_orphaned_data(data_id)
      validation_result[:validation_status] = 'warning'
      validation_result[:issues] << 'Orphaned data detected'
    end

    validation_result
  end

  private

  def store_lineage_record(record)
    @redis.lpush('data_lineage', record.to_json)
    @redis.ltrim('data_lineage', 0, 99999)
  end

  def store_database_lineage(record)
    @redis.lpush('database_lineage', record.to_json)
    @redis.ltrim('database_lineage', 0, 99999)
  end

  def store_file_lineage(record)
    @redis.lpush('file_lineage', record.to_json)
    @redis.ltrim('file_lineage', 0, 99999)
  end

  def update_lineage_graph(source_id, target_id, record)
    # Maintain adjacency sets in both directions plus edge details
    @redis.sadd("lineage:upstream:#{target_id}", source_id)
    @redis.sadd("lineage:downstream:#{source_id}", target_id)
    @redis.hset('lineage:relationships', "#{source_id}:#{target_id}", record.to_json)
  end

  def get_upstream_lineage(data_id, depth)
    upstream = []
    visited = Set.new
    queue = [[data_id, 0]]

    # Breadth-first traversal of upstream sources, bounded by depth
    while queue.any?
      current_id, current_depth = queue.shift
      next if current_depth >= depth || visited.include?(current_id)

      visited.add(current_id)
      upstream_sources = @redis.smembers("lineage:upstream:#{current_id}")

      upstream_sources.each do |source_id|
        upstream << {
          data_id: source_id,
          depth: current_depth,
          relationship: get_relationship_details(current_id, source_id)
        }
        queue << [source_id, current_depth + 1]
      end
    end

    upstream
  end

  def get_downstream_lineage(data_id, depth)
    downstream = []
    visited = Set.new
    queue = [[data_id, 0]]

    while queue.any?
      current_id, current_depth = queue.shift
      next if current_depth >= depth || visited.include?(current_id)

      visited.add(current_id)
      downstream_targets = @redis.smembers("lineage:downstream:#{current_id}")

      downstream_targets.each do |target_id|
        downstream << {
          data_id: target_id,
          depth: current_depth,
          relationship: get_relationship_details(current_id, target_id)
        }
        queue << [target_id, current_depth + 1]
      end
    end

    downstream
  end

  def get_transformations(data_id)
    transformations = []
    lineage_records = @redis.lrange('data_lineage', 0, -1)

    lineage_records.each do |record|
      parsed_record = JSON.parse(record)
      if parsed_record['source_id'] == data_id || parsed_record['target_id'] == data_id
        transformations << parsed_record['transformation'] if parsed_record['transformation']
      end
    end

    transformations.uniq
  end

  def get_lineage_metadata(data_id)
    {
      created_at: get_data_creation_time(data_id),
      last_modified: get_data_last_modified(data_id),
      classification: get_data_classification(data_id),
      owner: get_data_owner(data_id)
    }
  end

  def get_lineage_records_in_range(start_date, end_date)
    records = @redis.lrange('data_lineage', 0, -1)
    return records unless start_date && end_date

    records.select do |record|
      timestamp = Time.parse(JSON.parse(record)['timestamp'])
      timestamp >= Time.parse(start_date) && timestamp <= Time.parse(end_date)
    end
  end

  def extract_nodes(records)
    nodes = Set.new

    records.each do |record|
      parsed_record = JSON.parse(record)
      nodes.add(parsed_record['source_id'])
      nodes.add(parsed_record['target_id'])
    end

    nodes.map { |node_id| { id: node_id, type: determine_node_type(node_id) } }
  end

  def extract_edges(records)
    records.map do |record|
      parsed_record = JSON.parse(record)
      {
        source: parsed_record['source_id'],
        target: parsed_record['target_id'],
        transformation: parsed_record['transformation'],
        timestamp: parsed_record['timestamp']
      }
    end
  end

  def get_direct_dependents(data_id)
    @redis.smembers("lineage:downstream:#{data_id}")
  end

  def get_indirect_dependents(data_id)
    indirect = Set.new
    queue = [data_id]

    while queue.any?
      current_id = queue.shift
      direct_dependents = @redis.smembers("lineage:downstream:#{current_id}")

      direct_dependents.each do |dependent_id|
        unless indirect.include?(dependent_id)
          indirect.add(dependent_id)
          queue << dependent_id
        end
      end
    end

    indirect.to_a
  end

  def calculate_impact_score(data_id)
    direct_count = get_direct_dependents(data_id).length
    indirect_count = get_indirect_dependents(data_id).length

    # Weighted impact score: direct dependents count more than indirect ones
    (direct_count * 0.7 + indirect_count * 0.3).round(2)
  end

  def assess_impact_risk(data_id)
    score = calculate_impact_score(data_id)

    # Thresholds are inclusive upper bounds; scores can be fractional
    if score <= 5
      'low'
    elsif score <= 20
      'medium'
    elsif score <= 50
      'high'
    else
      'critical'
    end
  end

  def generate_impact_recommendations(data_id)
    recommendations = []
    impact_score = calculate_impact_score(data_id)

    if impact_score > 50
      recommendations << 'High impact data - consider implementing additional safeguards'
    end

    if impact_score > 20
      recommendations << 'Medium impact data - review access controls and monitoring'
    end

    recommendations
  end

  def has_circular_dependencies(data_id)
    visited = Set.new
    rec_stack = Set.new

    has_circular_dependencies_dfs(data_id, visited, rec_stack)
  end

  def has_circular_dependencies_dfs(node, visited, rec_stack)
    visited.add(node)
    rec_stack.add(node)

    downstream = @redis.smembers("lineage:downstream:#{node}")
    downstream.each do |neighbor|
      if !visited.include?(neighbor)
        return true if has_circular_dependencies_dfs(neighbor, visited, rec_stack)
      elsif rec_stack.include?(neighbor)
        return true
      end
    end

    rec_stack.delete(node)
    false
  end

  def is_orphaned_data(data_id)
    upstream = @redis.smembers("lineage:upstream:#{data_id}")
    downstream = @redis.smembers("lineage:downstream:#{data_id}")
    upstream.empty? && downstream.empty?
  end

  def get_relationship_details(source_id, target_id)
    relationship_data = @redis.hget('lineage:relationships', "#{source_id}:#{target_id}")
    return {} unless relationship_data

    JSON.parse(relationship_data)
  end

  def sanitize_query(query)
    # Redact credential-like values before storing queries
    query.gsub(/\b(password|secret|key|token)\s*[:=]\s*['"][^'"]*['"]/i, '\1 = [REDACTED]')
  end

  def get_current_user
    # Implementation to get current user
    'system'
  end

  def get_session_id
    # Implementation to get session ID
    SecureRandom.uuid
  end

  def determine_node_type(node_id)
    # Implementation to determine node type
    'data'
  end

  def get_data_creation_time(data_id)
    # Implementation to get data creation time
    Time.now.iso8601
  end

  def get_data_last_modified(data_id)
    # Implementation to get data last modified time
    Time.now.iso8601
  end

  def get_data_classification(data_id)
    # Implementation to get data classification
    'internal'
  end

  def get_data_owner(data_id)
    # Implementation to get data owner
    'unknown'
  end

  # Report generation methods

  def generate_comprehensive_lineage_report(start_date, end_date)
    {
      report_type: 'comprehensive',
      start_date: start_date,
      end_date: end_date,
      generated_at: Time.now.iso8601,
      lineage_graph: get_lineage_graph(start_date, end_date),
      summary: generate_lineage_summary(start_date, end_date)
    }
  end

  def generate_data_flow_report(start_date, end_date)
    {
      report_type: 'data_flow',
      start_date: start_date,
      end_date: end_date,
      generated_at: Time.now.iso8601,
      data_flows: analyze_data_flows(start_date, end_date)
    }
  end

  def generate_transformation_report(start_date, end_date)
    {
      report_type: 'transformation',
      start_date: start_date,
      end_date: end_date,
      generated_at: Time.now.iso8601,
      transformations: analyze_transformations(start_date, end_date)
    }
  end

  def generate_compliance_lineage_report(start_date, end_date)
    {
      report_type: 'compliance',
      start_date: start_date,
      end_date: end_date,
      generated_at: Time.now.iso8601,
      compliance_status: assess_lineage_compliance(start_date, end_date)
    }
  end

  def generate_lineage_summary(start_date, end_date)
    # Implementation to generate lineage summary
    {}
  end

  def analyze_data_flows(start_date, end_date)
    # Implementation to analyze data flows
    []
  end

  def analyze_transformations(start_date, end_date)
    # Implementation to analyze transformations
    []
  end

  def assess_lineage_compliance(start_date, end_date)
    # Implementation to assess lineage compliance
    {}
  end
end
```
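
A usage sketch with hypothetical dataset IDs (assumes Redis and the config above). Note the impact-score weighting: direct dependents count at 0.7, each node reachable downstream at 0.3:

```ruby
tracker = DataLineageTracker.new

# Record two ETL hops: raw table -> cleaned table -> metrics mart
tracker.track_data_flow('raw.users', 'clean.users', 'dedupe + normalize emails')
tracker.track_data_flow('clean.users', 'marts.user_metrics', 'daily aggregation')

lineage = tracker.get_data_lineage('clean.users')
p lineage[:upstream].map { |u| u[:data_id] }    # => ["raw.users"]
p lineage[:downstream].map { |d| d[:data_id] }  # => ["marts.user_metrics"]

impact = tracker.analyze_data_impact('clean.users')
# One direct dependent; the reachability walk also finds just that node,
# so the score is 1 * 0.7 + 1 * 0.3 = 1.0
puts impact[:impact_score]     # => 1.0
puts impact[:risk_assessment]  # => "low"
```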

📊 Data Quality Management

Data Quality Manager

lib/data_quality_manager.rb

```ruby
require 'tusk'
require 'redis'
require 'json'
require 'digest'
require 'date'
require 'uri'

class DataQualityManager
  def initialize(config_path = 'config/data_governance.tsk')
    @config = Tusk.load(config_path)
    @redis = Redis.new(url: @config['redis']['url'])
    setup_quality_rules
  end

  def validate_data_quality(data, rules = nil)
    rules ||= get_default_quality_rules
    validation_results = []

    rules.each do |rule|
      validation_results << apply_quality_rule(data, rule)
    end

    quality_score = calculate_quality_score(validation_results)

    quality_report = {
      data_id: generate_data_id(data),
      quality_score: quality_score,
      validation_results: validation_results,
      passed: quality_score >= @config['quality']['quality_threshold'].to_f,
      timestamp: Time.now.iso8601
    }

    store_quality_report(quality_report)
    quality_report
  end

  def monitor_data_quality(dataset_id, schedule = 'daily')
    monitoring_config = {
      dataset_id: dataset_id,
      schedule: schedule,
      rules: get_dataset_quality_rules(dataset_id),
      threshold: @config['quality']['quality_threshold'].to_f,
      notifications: get_quality_notifications(dataset_id)
    }

    store_monitoring_config(monitoring_config)
    schedule_quality_monitoring(monitoring_config)
  end

  def generate_quality_report(dataset_id, start_date = nil, end_date = nil)
    quality_reports = get_quality_reports(dataset_id, start_date, end_date)

    report = {
      dataset_id: dataset_id,
      period: { start: start_date, end: end_date },
      generated_at: Time.now.iso8601,
      summary: generate_quality_summary(quality_reports),
      trends: analyze_quality_trends(quality_reports),
      issues: identify_quality_issues(quality_reports),
      recommendations: generate_quality_recommendations(quality_reports)
    }

    store_generated_report(report)
    report
  end

  def fix_data_quality_issues(dataset_id, issues)
    fixes_applied = []

    issues.each do |issue|
      fix_result = apply_data_fix(dataset_id, issue)
      fixes_applied << fix_result if fix_result[:success]
    end

    {
      dataset_id: dataset_id,
      fixes_applied: fixes_applied,
      total_issues: issues.length,
      successful_fixes: fixes_applied.length,
      timestamp: Time.now.iso8601
    }
  end

  private

  def setup_quality_rules
    @quality_rules = {
      'required' => ->(data, field) { !data[field].nil? && !data[field].to_s.empty? },
      'format' => ->(data, field, format) { validate_format(data[field], format) },
      'range' => ->(data, field, min, max) { validate_range(data[field], min, max) },
      'uniqueness' => ->(data, field) { validate_uniqueness(data, field) },
      'completeness' => ->(data, fields) { validate_completeness(data, fields) },
      'consistency' => ->(data, rules) { validate_consistency(data, rules) },
      'accuracy' => ->(data, field, reference) { validate_accuracy(data, field, reference) }
    }
  end

  def apply_quality_rule(data, rule)
    rule_type = rule[:type]
    field = rule[:field]
    parameters = rule[:parameters] || {}

    begin
      validation_function = @quality_rules[rule_type]
      if validation_function
        passed = validation_function.call(data, field, *parameters.values)
        {
          rule_type: rule_type,
          field: field,
          passed: passed,
          error_message: passed ? nil : rule[:error_message],
          severity: rule[:severity] || 'medium'
        }
      else
        {
          rule_type: rule_type,
          field: field,
          passed: false,
          error_message: "Unknown rule type: #{rule_type}",
          severity: 'high'
        }
      end
    rescue => e
      {
        rule_type: rule_type,
        field: field,
        passed: false,
        error_message: "Validation error: #{e.message}",
        severity: 'high'
      }
    end
  end

  def validate_format(value, format)
    return true if value.nil? || value.to_s.empty?

    case format
    when 'email'
      value.match?(/\A[\w+\-.]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+\z/i)
    when 'phone'
      value.match?(/\b\d{3}[-.]?\d{3}[-.]?\d{4}\b/)
    when 'date'
      begin
        Date.parse(value.to_s)
        true
      rescue ArgumentError
        false
      end
    when 'numeric'
      value.to_s.match?(/\A\d+(\.\d+)?\z/)
    when 'url'
      begin
        URI.parse(value.to_s)
        true
      rescue URI::InvalidURIError
        false
      end
    else
      true
    end
  end

  def validate_range(value, min, max)
    return true if value.nil?

    numeric_value = value.to_f
    numeric_value >= min.to_f && numeric_value <= max.to_f
  end

  def validate_uniqueness(data, field)
    # This would need to be implemented against your data store
    true
  end

  def validate_completeness(data, fields)
    fields.all? { |field| !data[field].nil? && !data[field].to_s.empty? }
  end

  def validate_consistency(data, rules)
    # Implementation for consistency validation
    true
  end

  def validate_accuracy(data, field, reference)
    # Implementation for accuracy validation
    true
  end

  def calculate_quality_score(validation_results)
    return 100 if validation_results.empty?

    passed_rules = validation_results.count { |result| result[:passed] }
    total_rules = validation_results.length

    (passed_rules.to_f / total_rules * 100).round(2)
  end

  def generate_data_id(data)
    Digest::SHA256.hexdigest(data.to_json)
  end

  def store_quality_report(report)
    @redis.lpush("quality_reports:#{report[:data_id]}", report.to_json)
    @redis.ltrim("quality_reports:#{report[:data_id]}", 0, 99)
  end

  def get_default_quality_rules
    [
      { type: 'required', field: 'id', error_message: 'ID is required' },
      { type: 'format', field: 'email', parameters: { format: 'email' }, error_message: 'Invalid email format' },
      { type: 'range', field: 'age', parameters: { min: 0, max: 150 }, error_message: 'Age must be between 0 and 150' }
    ]
  end

  def get_dataset_quality_rules(dataset_id)
    # Implementation to get dataset-specific quality rules
    get_default_quality_rules
  end

  def get_quality_notifications(dataset_id)
    # Implementation to get quality notifications
    []
  end

  def store_monitoring_config(config)
    @redis.hset('quality_monitoring', config[:dataset_id], config.to_json)
  end

  def schedule_quality_monitoring(config)
    # Implementation to schedule quality monitoring
  end

  def get_quality_reports(dataset_id, start_date, end_date)
    # Implementation to get quality reports
    []
  end

  def generate_quality_summary(reports)
    # Implementation to generate quality summary
    {}
  end

  def analyze_quality_trends(reports)
    # Implementation to analyze quality trends
    {}
  end

  def identify_quality_issues(reports)
    # Implementation to identify quality issues
    []
  end

  def generate_quality_recommendations(reports)
    # Implementation to generate quality recommendations
    []
  end

  def store_generated_report(report)
    @redis.hset('generated_quality_reports', report[:dataset_id], report.to_json)
  end

  def apply_data_fix(dataset_id, issue)
    # Implementation to apply data fixes
    { success: true, issue: issue, fix_applied: 'placeholder' }
  end
end
```
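
A usage sketch against the default rules (hypothetical record; string keys match the default rules' field names). With one of three rules failing, the score is 2/3 × 100 ≈ 66.67, below the default threshold of 95:

```ruby
manager = DataQualityManager.new

record = { 'id' => 'u-123', 'email' => 'not-an-email', 'age' => 42 }
report = manager.validate_data_quality(record)

puts report[:quality_score]  # => 66.67 (2 of 3 rules passed)
puts report[:passed]         # => false (default threshold is 95)

report[:validation_results].reject { |r| r[:passed] }.each do |failure|
  puts "#{failure[:field]}: #{failure[:error_message]}"  # => "email: Invalid email format"
end
```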

🎯 Configuration Management

Data Governance Configuration

config/data_governance_features.tsk

```tsk
[data_governance]
enabled: @env("DATA_GOVERNANCE_ENABLED", "true")
organization: @env("ORGANIZATION_NAME", "Acme Corp")
data_steward: @env("DATA_STEWARD", "data.team@acme.com")
retention_policy: @env("DATA_RETENTION_POLICY", "7y")

[classification]
auto_classification: @env("AUTO_CLASSIFICATION_ENABLED", "true")
sensitive_patterns: @env("SENSITIVE_PATTERNS", "ssn,credit_card,email,phone")
classification_levels: @env("CLASSIFICATION_LEVELS", "public,internal,confidential,restricted")
confidence_threshold: @env("CLASSIFICATION_CONFIDENCE_THRESHOLD", "80")

[lineage]
tracking_enabled: @env("LINEAGE_TRACKING_ENABLED", "true")
retention_period: @env("LINEAGE_RETENTION_PERIOD", "10y")
real_time_tracking: @env("REAL_TIME_LINEAGE_TRACKING", "true")
graph_depth_limit: @env("LINEAGE_GRAPH_DEPTH_LIMIT", "10")

[quality]
monitoring_enabled: @env("QUALITY_MONITORING_ENABLED", "true")
validation_rules: @env("QUALITY_VALIDATION_RULES", "required,format,range,uniqueness")
quality_threshold: @env("QUALITY_THRESHOLD", "95")
monitoring_schedule: @env("QUALITY_MONITORING_SCHEDULE", "daily")

[compliance]
gdpr_enabled: @env("GDPR_ENABLED", "true")
data_retention_policy: @env("DATA_RETENTION_POLICY", "7y")
privacy_controls: @env("PRIVACY_CONTROLS_ENABLED", "true")
consent_management: @env("CONSENT_MANAGEMENT_ENABLED", "true")
data_subject_rights: @env("DATA_SUBJECT_RIGHTS_ENABLED", "true")

[monitoring]
quality_monitoring: @env("QUALITY_MONITORING_ENABLED", "true")
lineage_monitoring: @env("LINEAGE_MONITORING_ENABLED", "true")
classification_monitoring: @env("CLASSIFICATION_MONITORING_ENABLED", "true")
alerting_enabled: @env("DATA_GOVERNANCE_ALERTING_ENABLED", "true")
```
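
Because every setting is wrapped in `@env()` with a default, behavior can be tuned per environment without editing the file. A minimal sketch (hypothetical deployment code; assumes the variables are set before the config is loaded):

```ruby
require 'tusk'

# Tighten governance for production before loading the config.
ENV['QUALITY_THRESHOLD'] = '99'
ENV['LINEAGE_GRAPH_DEPTH_LIMIT'] = '5'

config = Tusk.load('config/data_governance_features.tsk')
puts config['quality']['quality_threshold']  # => "99" (env value wins over the "95" default)
```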

🎯 Summary

This comprehensive guide covers data governance with TuskLang and Ruby, including:

- Data Classification: Automatic classification of data based on content and context
- Data Lineage Tracking: Comprehensive tracking of data flow and transformations
- Data Quality Management: Quality validation, monitoring, and issue resolution
- Configuration Management: Enterprise-grade data governance configuration
- Compliance Frameworks: GDPR and other regulatory compliance features
- Monitoring and Alerting: Real-time monitoring of data governance metrics

The data governance features with TuskLang provide a robust foundation for building applications that maintain data integrity, traceability, and regulatory compliance while ensuring data quality and proper classification.