💎 Data Governance with TuskLang and Ruby
🏛️ Govern Your Data with Precision and Control
TuskLang enables comprehensive data governance for Ruby applications, providing data classification, lineage tracking, quality management, and compliance frameworks. Build applications that maintain data integrity, traceability, and regulatory compliance.
🚀 Quick Start: Data Classification
Basic Data Governance Configuration
config/data_governance.tsk
[data_governance]
enabled: @env("DATA_GOVERNANCE_ENABLED", "true")
organization: @env("ORGANIZATION_NAME", "Acme Corp")
data_steward: @env("DATA_STEWARD", "data.team@acme.com")
retention_policy: @env("DATA_RETENTION_POLICY", "7y")
[classification]
auto_classification: @env("AUTO_CLASSIFICATION_ENABLED", "true")
sensitive_patterns: @env("SENSITIVE_PATTERNS", "ssn,credit_card,email,phone")
classification_levels: @env("CLASSIFICATION_LEVELS", "public,internal,confidential,restricted")
[lineage]
tracking_enabled: @env("LINEAGE_TRACKING_ENABLED", "true")
retention_period: @env("LINEAGE_RETENTION_PERIOD", "10y")
real_time_tracking: @env("REAL_TIME_LINEAGE_TRACKING", "true")
[quality]
monitoring_enabled: @env("QUALITY_MONITORING_ENABLED", "true")
validation_rules: @env("QUALITY_VALIDATION_RULES", "required,format,range,uniqueness")
quality_threshold: @env("QUALITY_THRESHOLD", "95")
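Once this file is in place, the settings can be read anywhere in the application. A minimal loading sketch, assuming `Tusk.load` parses each `[section]` into a nested hash of string values:

require 'tusk'

# Load the governance settings; every value arrives as a string
# because the defaults above are quoted.
config = Tusk.load('config/data_governance.tsk')

if config['data_governance']['enabled'] == 'true'
  puts "Governance active for #{config['data_governance']['organization']}"
  puts "Quality threshold: #{config['quality']['quality_threshold']}%"
end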
Data Classification Engine
lib/data_classification_engine.rb
require 'tusk'
require 'redis'
require 'json'
require 'digest'

class DataClassificationEngine
def initialize(config_path = 'config/data_governance.tsk')
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
setup_classification_rules
end
def classify_data(data, context = {})
return { classification: 'public' } unless @config['data_governance']['enabled'] == 'true'
classification_result = {
data_id: generate_data_id(data),
classification: determine_classification(data, context),
confidence: calculate_confidence(data, context),
patterns_found: find_sensitive_patterns(data),
classification_reason: determine_classification_reason(data, context),
timestamp: Time.now.iso8601,
context: context
}
store_classification_result(classification_result)
apply_classification_policies(classification_result)
classification_result
end
def classify_database_table(table_name, columns)
table_classification = {
table_name: table_name,
overall_classification: 'public',
column_classifications: {},
timestamp: Time.now.iso8601
}
columns.each do |column|
column_classification = classify_column(column)
table_classification[:column_classifications][column[:name]] = column_classification
# Determine overall table classification based on highest sensitivity
if column_classification[:classification] == 'restricted'
table_classification[:overall_classification] = 'restricted'
elsif column_classification[:classification] == 'confidential' &&
table_classification[:overall_classification] != 'restricted'
table_classification[:overall_classification] = 'confidential'
elsif column_classification[:classification] == 'internal' &&
table_classification[:overall_classification] == 'public'
table_classification[:overall_classification] = 'internal'
end
end
store_table_classification(table_classification)
table_classification
end
def classify_file(file_path, content = nil)
file_classification = {
file_path: file_path,
file_type: determine_file_type(file_path),
content_classification: nil,
metadata_classification: classify_file_metadata(file_path),
timestamp: Time.now.iso8601
}
if content
file_classification[:content_classification] = classify_data(content)
end
# Determine overall file classification
classifications = [
file_classification[:content_classification]&.dig(:classification),
file_classification[:metadata_classification]&.dig(:classification)
].compact
file_classification[:overall_classification] = determine_highest_classification(classifications)
store_file_classification(file_classification)
file_classification
end
def get_classification_policies(classification_level)
policies = {
'public' => {
access_control: 'open',
encryption: false,
audit_logging: false,
retention_period: '1y'
},
'internal' => {
access_control: 'authenticated',
encryption: false,
audit_logging: true,
retention_period: '3y'
},
'confidential' => {
access_control: 'authorized',
encryption: true,
audit_logging: true,
retention_period: '7y'
},
'restricted' => {
access_control: 'strict',
encryption: true,
audit_logging: true,
retention_period: '10y',
additional_controls: ['mfa_required', 'data_masking']
}
}
policies[classification_level] || policies['public']
end
def update_classification(data_id, new_classification, reason)
classification_update = {
data_id: data_id,
previous_classification: get_current_classification(data_id),
new_classification: new_classification,
reason: reason,
updated_by: get_current_user,
timestamp: Time.now.iso8601
}
store_classification_update(classification_update)
apply_classification_policies({ data_id: data_id, classification: new_classification })
classification_update
end
private
def setup_classification_rules
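# Regex heuristics for common PII and secret shapes. These are starting
# points: the broad api_key pattern, for example, matches any run of 32+
# alphanumeric characters, so expect false positives until tuned.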
@sensitive_patterns = {
ssn: /\b\d{3}-\d{2}-\d{4}\b/,
credit_card: /\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b/,
email: /\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/,
phone: /\b\d{3}[-.]?\d{3}[-.]?\d{4}\b/,
ip_address: /\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b/,
api_key: /\b[A-Za-z0-9]{32,}\b/,
password: /\bpassword\s*[:=]\s*\S+/i
}
@classification_keywords = {
'restricted' => ['secret', 'password', 'key', 'token', 'credential'],
'confidential' => ['private', 'personal', 'sensitive', 'internal'],
'internal' => ['company', 'business', 'corporate', 'employee'],
'public' => ['public', 'general', 'open', 'shared']
}
end
def determine_classification(data, context)
data_string = data.to_s.downcase
# Check for restricted patterns
if find_restricted_patterns(data_string).any?
return 'restricted'
end
# Check for confidential patterns
if find_confidential_patterns(data_string).any?
return 'confidential'
end
# Check for internal patterns
if find_internal_patterns(data_string).any?
return 'internal'
end
# Check context for additional classification hints
if context[:business_critical] || context[:financial_data]
return 'confidential'
end
'public'
end
def calculate_confidence(data, context)
patterns_found = find_sensitive_patterns(data)
keyword_matches = find_keyword_matches(data)
context_score = calculate_context_score(context)
# Weighted confidence calculation
pattern_score = patterns_found.length * 0.4
keyword_score = keyword_matches.length * 0.3
context_score = context_score * 0.3
total_score = pattern_score + keyword_score + context_score
[total_score * 100, 100].min.round(2)
end
def find_sensitive_patterns(data)
data_string = data.to_s
found_patterns = []
@sensitive_patterns.each do |pattern_name, pattern|
if data_string.match?(pattern)
found_patterns << {
pattern: pattern_name,
matches: data_string.scan(pattern).length
}
end
end
found_patterns
end
def find_restricted_patterns(data)
data.scan(/secret|password|key|token|credential/i)
end
def find_confidential_patterns(data)
data.scan(/private|personal|sensitive|internal/i)
end
def find_internal_patterns(data)
data.scan(/company|business|corporate|employee/i)
end
def find_keyword_matches(data)
data_string = data.to_s.downcase
matches = []
@classification_keywords.each do |classification, keywords|
keywords.each do |keyword|
if data_string.include?(keyword)
matches << { classification: classification, keyword: keyword }
end
end
end
matches
end
def calculate_context_score(context)
score = 0
score += 0.5 if context[:business_critical]
score += 0.3 if context[:financial_data]
score += 0.2 if context[:personal_data]
score += 0.1 if context[:internal_use]
score
end
def determine_classification_reason(data, context)
patterns = find_sensitive_patterns(data)
keywords = find_keyword_matches(data)
reasons = []
reasons << "Contains sensitive patterns: #{patterns.map { |p| p[:pattern] }.join(', ')}" if patterns.any?
reasons << "Contains classification keywords: #{keywords.map { |k| k[:keyword] }.join(', ')}" if keywords.any?
reasons << "Business critical data" if context[:business_critical]
reasons << "Financial data" if context[:financial_data]
reasons.join('; ')
end
def classify_column(column)
{
name: column[:name],
classification: determine_classification(column[:name], {}),
data_type: column[:type],
nullable: column[:nullable],
unique: column[:unique],
patterns_found: find_sensitive_patterns(column[:name])
}
end
def determine_file_type(file_path)
extension = File.extname(file_path).downcase
case extension
when '.csv', '.json', '.xml'
'data'
when '.log'
'log'
when '.txt', '.md'
'text'
when '.jpg', '.png', '.gif'
'image'
when '.pdf'
'document'
else
'unknown'
end
end
def classify_file_metadata(file_path)
{
classification: 'internal',
reason: 'File metadata classification',
patterns_found: []
}
end
def determine_highest_classification(classifications)
priority = { 'restricted' => 4, 'confidential' => 3, 'internal' => 2, 'public' => 1 }
highest = classifications.max_by { |c| priority[c] || 0 }
highest || 'public'
end
def generate_data_id(data)
Digest::SHA256.hexdigest(data.to_s)
end
def store_classification_result(result)
@redis.hset('data_classifications', result[:data_id], result.to_json)
end
def store_table_classification(classification)
@redis.hset('table_classifications', classification[:table_name], classification.to_json)
end
def store_file_classification(classification)
@redis.hset('file_classifications', classification[:file_path], classification.to_json)
end
def store_classification_update(update)
@redis.lpush('classification_updates', update.to_json)
@redis.ltrim('classification_updates', 0, 9999)
end
def get_current_classification(data_id)
classification_data = @redis.hget('data_classifications', data_id)
return 'public' unless classification_data
JSON.parse(classification_data)['classification']
end
def get_current_user
# Implementation to get current user
'system'
end
def apply_classification_policies(classification_result)
policies = get_classification_policies(classification_result[:classification])
# Apply encryption if required
if policies[:encryption]
apply_encryption(classification_result[:data_id])
end
# Apply access controls
apply_access_controls(classification_result[:data_id], policies[:access_control])
# Enable audit logging
if policies[:audit_logging]
enable_audit_logging(classification_result[:data_id])
end
end
def apply_encryption(data_id)
# Implementation to apply encryption
end
def apply_access_controls(data_id, access_control)
# Implementation to apply access controls
end
def enable_audit_logging(data_id)
# Implementation to enable audit logging
end
end
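A short usage sketch for the engine (the record and context below are hypothetical, and Redis must be reachable at the configured URL):

engine = DataClassificationEngine.new

# Classify a single record with business-context hints.
result = engine.classify_data(
  { name: 'Jane Doe', ssn: '123-45-6789', email: 'jane@example.com' },
  financial_data: true
)
puts result[:classification]        # "confidential": the financial_data hint applies
puts result[:patterns_found]        # the SSN and email regexes both match
puts result[:classification_reason]

# Classify a table definition column by column.
table = engine.classify_database_table('users', [
  { name: 'id', type: 'integer', nullable: false, unique: true },
  { name: 'password_digest', type: 'string', nullable: false, unique: false }
])
puts table[:overall_classification] # "restricted": the column name contains "password"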
🔗 Data Lineage Tracking
Data Lineage Tracker
lib/data_lineage_tracker.rb
require 'tusk'
require 'redis'
require 'json'
require 'securerandom'
require 'set'
require 'time'

class DataLineageTracker
def initialize(config_path = 'config/data_governance.tsk')
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
end
def track_data_flow(source_id, target_id, transformation = nil, context = {})
return unless @config['lineage']['tracking_enabled'] == 'true'
lineage_record = {
id: SecureRandom.uuid,
source_id: source_id,
target_id: target_id,
transformation: transformation,
context: context,
timestamp: Time.now.iso8601,
user: get_current_user,
session_id: get_session_id
}
store_lineage_record(lineage_record)
update_lineage_graph(source_id, target_id, lineage_record)
lineage_record
end
def track_database_operation(operation_type, table_name, query, affected_rows = nil)
lineage_record = {
id: SecureRandom.uuid,
operation_type: operation_type,
table_name: table_name,
query: sanitize_query(query),
affected_rows: affected_rows,
timestamp: Time.now.iso8601,
user: get_current_user,
session_id: get_session_id
}
store_database_lineage(lineage_record)
lineage_record
end
def track_file_operation(operation_type, file_path, source_path = nil)
lineage_record = {
id: SecureRandom.uuid,
operation_type: operation_type,
file_path: file_path,
source_path: source_path,
timestamp: Time.now.iso8601,
user: get_current_user,
session_id: get_session_id
}
store_file_lineage(lineage_record)
lineage_record
end
def get_data_lineage(data_id, depth = 5)
lineage = {
data_id: data_id,
upstream: get_upstream_lineage(data_id, depth),
downstream: get_downstream_lineage(data_id, depth),
transformations: get_transformations(data_id),
metadata: get_lineage_metadata(data_id)
}
lineage
end
def get_lineage_graph(start_date = nil, end_date = nil)
lineage_records = get_lineage_records_in_range(start_date, end_date)
graph = {
nodes: extract_nodes(lineage_records),
edges: extract_edges(lineage_records),
metadata: {
total_nodes: 0,
total_edges: 0,
date_range: { start: start_date, end: end_date }
}
}
graph[:metadata][:total_nodes] = graph[:nodes].length
graph[:metadata][:total_edges] = graph[:edges].length
graph
end
def analyze_data_impact(data_id)
impact_analysis = {
data_id: data_id,
direct_dependents: get_direct_dependents(data_id),
indirect_dependents: get_indirect_dependents(data_id),
impact_score: calculate_impact_score(data_id),
risk_assessment: assess_impact_risk(data_id),
recommendations: generate_impact_recommendations(data_id)
}
impact_analysis
end
def generate_lineage_report(start_date, end_date, report_type = 'comprehensive')
case report_type
when 'comprehensive'
generate_comprehensive_lineage_report(start_date, end_date)
when 'data_flow'
generate_data_flow_report(start_date, end_date)
when 'transformation'
generate_transformation_report(start_date, end_date)
when 'compliance'
generate_compliance_lineage_report(start_date, end_date)
else
raise ArgumentError, "Unknown report type: #{report_type}"
end
end
def validate_data_lineage(data_id)
lineage = get_data_lineage(data_id)
validation_result = {
data_id: data_id,
validation_status: 'valid',
issues: [],
recommendations: []
}
# Check for broken lineage
if lineage[:upstream].empty? && lineage[:downstream].empty?
validation_result[:validation_status] = 'warning'
validation_result[:issues] << 'No lineage information found'
end
# Check for circular dependencies
if has_circular_dependencies(data_id)
validation_result[:validation_status] = 'error'
validation_result[:issues] << 'Circular dependencies detected'
end
# Check for orphaned data
if is_orphaned_data(data_id)
validation_result[:validation_status] = 'warning'
validation_result[:issues] << 'Orphaned data detected'
end
validation_result
end
private
def store_lineage_record(record)
@redis.lpush('data_lineage', record.to_json)
@redis.ltrim('data_lineage', 0, 99999)
end
def store_database_lineage(record)
@redis.lpush('database_lineage', record.to_json)
@redis.ltrim('database_lineage', 0, 99999)
end
def store_file_lineage(record)
@redis.lpush('file_lineage', record.to_json)
@redis.ltrim('file_lineage', 0, 99999)
end
def update_lineage_graph(source_id, target_id, record)
# Update graph relationships
@redis.sadd("lineage:upstream:#{target_id}", source_id)
@redis.sadd("lineage:downstream:#{source_id}", target_id)
@redis.hset("lineage:relationships", "#{source_id}:#{target_id}", record.to_json)
end
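# Breadth-first walk over the stored edge sets, bounded by the requested
# depth and guarded by a visited set so shared ancestors appear once.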
def get_upstream_lineage(data_id, depth)
upstream = []
visited = Set.new
queue = [[data_id, 0]]
while queue.any?
current_id, current_depth = queue.shift
next if current_depth >= depth || visited.include?(current_id)
visited.add(current_id)
upstream_sources = @redis.smembers("lineage:upstream:#{current_id}")
upstream_sources.each do |source_id|
upstream << {
data_id: source_id,
depth: current_depth,
relationship: get_relationship_details(current_id, source_id)
}
queue << [source_id, current_depth + 1]
end
end
upstream
end
def get_downstream_lineage(data_id, depth)
downstream = []
visited = Set.new
queue = [[data_id, 0]]
while queue.any?
current_id, current_depth = queue.shift
next if current_depth >= depth || visited.include?(current_id)
visited.add(current_id)
downstream_targets = @redis.smembers("lineage:downstream:#{current_id}")
downstream_targets.each do |target_id|
downstream << {
data_id: target_id,
depth: current_depth,
relationship: get_relationship_details(current_id, target_id)
}
queue << [target_id, current_depth + 1]
end
end
downstream
end
def get_transformations(data_id)
transformations = []
lineage_records = @redis.lrange('data_lineage', 0, -1)
lineage_records.each do |record|
parsed_record = JSON.parse(record)
if parsed_record['source_id'] == data_id || parsed_record['target_id'] == data_id
transformations << parsed_record['transformation'] if parsed_record['transformation']
end
end
transformations.uniq
end
def get_lineage_metadata(data_id)
{
created_at: get_data_creation_time(data_id),
last_modified: get_data_last_modified(data_id),
classification: get_data_classification(data_id),
owner: get_data_owner(data_id)
}
end
def get_lineage_records_in_range(start_date, end_date)
records = @redis.lrange('data_lineage', 0, -1)
if start_date && end_date
records.select do |record|
timestamp = Time.parse(JSON.parse(record)['timestamp'])
timestamp >= Time.parse(start_date) && timestamp <= Time.parse(end_date)
end
else
records
end
end
def extract_nodes(records)
nodes = Set.new
records.each do |record|
parsed_record = JSON.parse(record)
nodes.add(parsed_record['source_id'])
nodes.add(parsed_record['target_id'])
end
nodes.map { |node_id| { id: node_id, type: determine_node_type(node_id) } }
end
def extract_edges(records)
records.map do |record|
parsed_record = JSON.parse(record)
{
source: parsed_record['source_id'],
target: parsed_record['target_id'],
transformation: parsed_record['transformation'],
timestamp: parsed_record['timestamp']
}
end
end
def get_direct_dependents(data_id)
@redis.smembers("lineage:downstream:#{data_id}")
end
def get_indirect_dependents(data_id)
indirect = Set.new
queue = [data_id]
while queue.any?
current_id = queue.shift
direct_dependents = @redis.smembers("lineage:downstream:#{current_id}")
direct_dependents.each do |dependent_id|
unless indirect.include?(dependent_id)
indirect.add(dependent_id)
queue << dependent_id
end
end
end
indirect.to_a
end
def calculate_impact_score(data_id)
direct_count = get_direct_dependents(data_id).length
indirect_count = get_indirect_dependents(data_id).length
# Weighted impact score
(direct_count * 0.7 + indirect_count * 0.3).round(2)
end
def assess_impact_risk(data_id)
impact_score = calculate_impact_score(data_id)
case impact_score
when 0..5
'low'
when 5..20
'medium'
when 20..50
'high'
else
'critical'
end
end
def generate_impact_recommendations(data_id)
recommendations = []
impact_score = calculate_impact_score(data_id)
if impact_score > 50
recommendations << "High impact data - consider implementing additional safeguards"
end
if impact_score > 20
recommendations << "Medium impact data - review access controls and monitoring"
end
recommendations
end
def has_circular_dependencies(data_id)
visited = Set.new
rec_stack = Set.new
has_circular_dependencies_dfs(data_id, visited, rec_stack)
end
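# Standard DFS cycle detection: a neighbor that is still on the current
# recursion stack means the path loops back on itself.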
def has_circular_dependencies_dfs(node, visited, rec_stack)
visited.add(node)
rec_stack.add(node)
downstream = @redis.smembers("lineage:downstream:#{node}")
downstream.each do |neighbor|
if !visited.include?(neighbor)
return true if has_circular_dependencies_dfs(neighbor, visited, rec_stack)
elsif rec_stack.include?(neighbor)
return true
end
end
rec_stack.delete(node)
false
end
def is_orphaned_data(data_id)
upstream = @redis.smembers("lineage:upstream:#{data_id}")
downstream = @redis.smembers("lineage:downstream:#{data_id}")
upstream.empty? && downstream.empty?
end
def get_relationship_details(source_id, target_id)
relationship_data = @redis.hget("lineage:relationships", "#{source_id}:#{target_id}")
return {} unless relationship_data
JSON.parse(relationship_data)
end
def sanitize_query(query)
# Remove sensitive information from queries
query.gsub(/\b(password|secret|key|token)\s*[:=]\s*['"][^'"]*['"]/i, '\1 = [REDACTED]')
end
def get_current_user
# Implementation to get current user
'system'
end
def get_session_id
# Implementation to get session ID
SecureRandom.uuid
end
def determine_node_type(node_id)
# Implementation to determine node type
'data'
end
def get_data_creation_time(data_id)
# Implementation to get data creation time
Time.now.iso8601
end
def get_data_last_modified(data_id)
# Implementation to get data last modified time
Time.now.iso8601
end
def get_data_classification(data_id)
# Implementation to get data classification
'internal'
end
def get_data_owner(data_id)
# Implementation to get data owner
'unknown'
end
# Report generation methods
def generate_comprehensive_lineage_report(start_date, end_date)
{
report_type: 'comprehensive',
start_date: start_date,
end_date: end_date,
generated_at: Time.now.iso8601,
lineage_graph: get_lineage_graph(start_date, end_date),
summary: generate_lineage_summary(start_date, end_date)
}
end
def generate_data_flow_report(start_date, end_date)
{
report_type: 'data_flow',
start_date: start_date,
end_date: end_date,
generated_at: Time.now.iso8601,
data_flows: analyze_data_flows(start_date, end_date)
}
end
def generate_transformation_report(start_date, end_date)
{
report_type: 'transformation',
start_date: start_date,
end_date: end_date,
generated_at: Time.now.iso8601,
transformations: analyze_transformations(start_date, end_date)
}
end
def generate_compliance_lineage_report(start_date, end_date)
{
report_type: 'compliance',
start_date: start_date,
end_date: end_date,
generated_at: Time.now.iso8601,
compliance_status: assess_lineage_compliance(start_date, end_date)
}
end
def generate_lineage_summary(start_date, end_date)
# Implementation to generate lineage summary
{}
end
def analyze_data_flows(start_date, end_date)
# Implementation to analyze data flows
[]
end
def analyze_transformations(start_date, end_date)
# Implementation to analyze transformations
[]
end
def assess_lineage_compliance(start_date, end_date)
# Implementation to assess lineage compliance
{}
end
end
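A usage sketch for the tracker (the dataset identifiers are hypothetical; any stable IDs for your sources and targets work):

tracker = DataLineageTracker.new

# Record a transformation from a raw table to a derived aggregate.
tracker.track_data_flow('raw.orders', 'analytics.daily_revenue',
                        'aggregate: SUM(amount) GROUP BY order_date')

# Record the database write that produced the aggregate.
tracker.track_database_operation('INSERT', 'analytics.daily_revenue',
                                 'INSERT INTO daily_revenue ...', 365)

# Walk the graph and assess blast radius before changing the source.
lineage = tracker.get_data_lineage('raw.orders', 3)
impact  = tracker.analyze_data_impact('raw.orders')
puts impact[:risk_assessment]   # 'low' through 'critical', from the impact score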
📊 Data Quality Management
Data Quality Manager
lib/data_quality_manager.rb
require 'tusk'
require 'redis'
require 'json'
require 'digest'
require 'date'
require 'uri'

class DataQualityManager
def initialize(config_path = 'config/data_governance.tsk')
@config = Tusk.load(config_path)
@redis = Redis.new(url: @config['redis']['url'])
setup_quality_rules
end
def validate_data_quality(data, rules = nil)
rules ||= get_default_quality_rules
validation_results = []
rules.each do |rule|
result = apply_quality_rule(data, rule)
validation_results << result
end
quality_score = calculate_quality_score(validation_results)
quality_report = {
data_id: generate_data_id(data),
quality_score: quality_score,
validation_results: validation_results,
passed: quality_score >= @config['quality']['quality_threshold'].to_f,
timestamp: Time.now.iso8601
}
store_quality_report(quality_report)
quality_report
end
def monitor_data_quality(dataset_id, schedule = 'daily')
monitoring_config = {
dataset_id: dataset_id,
schedule: schedule,
rules: get_dataset_quality_rules(dataset_id),
threshold: @config['quality']['quality_threshold'].to_f,
notifications: get_quality_notifications(dataset_id)
}
store_monitoring_config(monitoring_config)
schedule_quality_monitoring(monitoring_config)
end
def generate_quality_report(dataset_id, start_date = nil, end_date = nil)
quality_reports = get_quality_reports(dataset_id, start_date, end_date)
report = {
dataset_id: dataset_id,
period: { start: start_date, end: end_date },
generated_at: Time.now.iso8601,
summary: generate_quality_summary(quality_reports),
trends: analyze_quality_trends(quality_reports),
issues: identify_quality_issues(quality_reports),
recommendations: generate_quality_recommendations(quality_reports)
}
store_generated_report(report)
report
end
def fix_data_quality_issues(dataset_id, issues)
fixes_applied = []
issues.each do |issue|
fix_result = apply_data_fix(dataset_id, issue)
fixes_applied << fix_result if fix_result[:success]
end
{
dataset_id: dataset_id,
fixes_applied: fixes_applied,
total_issues: issues.length,
successful_fixes: fixes_applied.length,
timestamp: Time.now.iso8601
}
end
private
def setup_quality_rules
@quality_rules = {
'required' => ->(data, field) { !data[field].nil? && !data[field].to_s.empty? },
'format' => ->(data, field, format) { validate_format(data[field], format) },
'range' => ->(data, field, min, max) { validate_range(data[field], min, max) },
'uniqueness' => ->(data, field) { validate_uniqueness(data, field) },
'completeness' => ->(data, fields) { validate_completeness(data, fields) },
'consistency' => ->(data, rules) { validate_consistency(data, rules) },
'accuracy' => ->(data, field, reference) { validate_accuracy(data, field, reference) }
}
end
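# Looks up the lambda registered for the rule type and splats the rule's
# parameter values into it; unknown rule types and raised exceptions are
# both reported as failed validations rather than crashing the run.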
def apply_quality_rule(data, rule)
rule_type = rule[:type]
field = rule[:field]
parameters = rule[:parameters] || {}
begin
validation_function = @quality_rules[rule_type]
if validation_function
passed = validation_function.call(data, field, *parameters.values)
{
rule_type: rule_type,
field: field,
passed: passed,
error_message: passed ? nil : rule[:error_message],
severity: rule[:severity] || 'medium'
}
else
{
rule_type: rule_type,
field: field,
passed: false,
error_message: "Unknown rule type: #{rule_type}",
severity: 'high'
}
end
rescue => e
{
rule_type: rule_type,
field: field,
passed: false,
error_message: "Validation error: #{e.message}",
severity: 'high'
}
end
end
def validate_format(value, format)
return true if value.nil? || value.to_s.empty?
case format
when 'email'
value.match?(/\A[\w+\-.]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+\z/i)
when 'phone'
value.match?(/\b\d{3}[-.]?\d{3}[-.]?\d{4}\b/)
when 'date'
begin
Date.parse(value.to_s)
true
rescue
false
end
when 'numeric'
value.to_s.match?(/^\d+(\.\d+)?$/)
when 'url'
begin
URI.parse(value.to_s)
true
rescue
false
end
else
true
end
end
def validate_range(value, min, max)
return true if value.nil?
numeric_value = value.to_f
numeric_value >= min.to_f && numeric_value <= max.to_f
end
def validate_uniqueness(data, field)
# This would need to be implemented based on your data storage
true
end
def validate_completeness(data, fields)
fields.all? { |field| !data[field].nil? && !data[field].to_s.empty? }
end
def validate_consistency(data, rules)
# Implementation for consistency validation
true
end
def validate_accuracy(data, field, reference)
# Implementation for accuracy validation
true
end
def calculate_quality_score(validation_results)
return 100 if validation_results.empty?
passed_rules = validation_results.count { |result| result[:passed] }
total_rules = validation_results.length
(passed_rules.to_f / total_rules * 100).round(2)
end
def generate_data_id(data)
Digest::SHA256.hexdigest(data.to_json)
end
def store_quality_report(report)
@redis.lpush("quality_reports:#{report[:data_id]}", report.to_json)
@redis.ltrim("quality_reports:#{report[:data_id]}", 0, 99)
end
def get_default_quality_rules
[
{ type: 'required', field: 'id', error_message: 'ID is required' },
{ type: 'format', field: 'email', parameters: { format: 'email' }, error_message: 'Invalid email format' },
{ type: 'range', field: 'age', parameters: { min: 0, max: 150 }, error_message: 'Age must be between 0 and 150' }
]
end
def get_dataset_quality_rules(dataset_id)
# Implementation to get dataset-specific quality rules
get_default_quality_rules
end
def get_quality_notifications(dataset_id)
# Implementation to get quality notifications
[]
end
def store_monitoring_config(config)
@redis.hset('quality_monitoring', config[:dataset_id], config.to_json)
end
def schedule_quality_monitoring(config)
# Implementation to schedule quality monitoring
end
def get_quality_reports(dataset_id, start_date, end_date)
# Implementation to get quality reports
[]
end
def generate_quality_summary(reports)
# Implementation to generate quality summary
{}
end
def analyze_quality_trends(reports)
# Implementation to analyze quality trends
{}
end
def identify_quality_issues(reports)
# Implementation to identify quality issues
[]
end
def generate_quality_recommendations(reports)
# Implementation to generate quality recommendations
[]
end
def store_generated_report(report)
@redis.hset('generated_quality_reports', report[:dataset_id], report.to_json)
end
def apply_data_fix(dataset_id, issue)
# Implementation to apply data fixes
{ success: true, issue: issue, fix_applied: 'placeholder' }
end
end
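A usage sketch against the default rules (the record fields are hypothetical):

manager = DataQualityManager.new

report = manager.validate_data_quality(
  { 'id' => 42, 'email' => 'not-an-email', 'age' => 30 }
)

puts report[:quality_score]   # 66.67: one of the three default rules failed
puts report[:passed]          # false against the default 95% threshold
report[:validation_results].reject { |r| r[:passed] }.each do |failure|
  puts "#{failure[:field]}: #{failure[:error_message]}"
end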
🎯 Configuration Management
Data Governance Configuration
config/data_governance_features.tsk
[data_governance]
enabled: @env("DATA_GOVERNANCE_ENABLED", "true")
organization: @env("ORGANIZATION_NAME", "Acme Corp")
data_steward: @env("DATA_STEWARD", "data.team@acme.com")
retention_policy: @env("DATA_RETENTION_POLICY", "7y")
[classification]
auto_classification: @env("AUTO_CLASSIFICATION_ENABLED", "true")
sensitive_patterns: @env("SENSITIVE_PATTERNS", "ssn,credit_card,email,phone")
classification_levels: @env("CLASSIFICATION_LEVELS", "public,internal,confidential,restricted")
confidence_threshold: @env("CLASSIFICATION_CONFIDENCE_THRESHOLD", "80")
[lineage]
tracking_enabled: @env("LINEAGE_TRACKING_ENABLED", "true")
retention_period: @env("LINEAGE_RETENTION_PERIOD", "10y")
real_time_tracking: @env("REAL_TIME_LINEAGE_TRACKING", "true")
graph_depth_limit: @env("LINEAGE_GRAPH_DEPTH_LIMIT", "10")
[quality]
monitoring_enabled: @env("QUALITY_MONITORING_ENABLED", "true")
validation_rules: @env("QUALITY_VALIDATION_RULES", "required,format,range,uniqueness")
quality_threshold: @env("QUALITY_THRESHOLD", "95")
monitoring_schedule: @env("QUALITY_MONITORING_SCHEDULE", "daily")
[compliance]
gdpr_enabled: @env("GDPR_ENABLED", "true")
data_retention_policy: @env("DATA_RETENTION_POLICY", "7y")
privacy_controls: @env("PRIVACY_CONTROLS_ENABLED", "true")
consent_management: @env("CONSENT_MANAGEMENT_ENABLED", "true")
data_subject_rights: @env("DATA_SUBJECT_RIGHTS_ENABLED", "true")
[monitoring]
quality_monitoring: @env("QUALITY_MONITORING_ENABLED", "true")
lineage_monitoring: @env("LINEAGE_MONITORING_ENABLED", "true")
classification_monitoring: @env("CLASSIFICATION_MONITORING_ENABLED", "true")
alerting_enabled: @env("DATA_GOVERNANCE_ALERTING_ENABLED", "true")
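Because @env values arrive as strings, feature gates should compare against "true" explicitly. A hedged sketch of gating GDPR behavior on the [compliance] section (the enable_* hooks are hypothetical application methods):

config = Tusk.load('config/data_governance_features.tsk')

compliance = config['compliance']
if compliance['gdpr_enabled'] == 'true'
  # Hypothetical hooks; wire these to your own consent and erasure flows.
  enable_consent_capture  if compliance['consent_management'] == 'true'
  enable_erasure_requests if compliance['data_subject_rights'] == 'true'
end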
🎯 Summary
This comprehensive guide covers data governance with TuskLang and Ruby, including:
- Data Classification: Automatic classification of data based on content and context
- Data Lineage Tracking: Comprehensive tracking of data flow and transformations
- Data Quality Management: Quality validation, monitoring, and issue resolution
- Configuration Management: Enterprise-grade data governance configuration
- Compliance Frameworks: GDPR and other regulatory compliance features
- Monitoring and Alerting: Real-time monitoring of data governance metrics
The data governance features with TuskLang provide a robust foundation for building applications that maintain data integrity, traceability, and regulatory compliance while ensuring data quality and proper classification.