🐍 🔐 Security Hardening - Python

Python Documentation

🔐 Security Hardening - Python

"We don't bow to any king" - Security Edition

TuskLang provides comprehensive security features to protect your applications from common vulnerabilities and ensure data integrity.

🚀 Input Validation & Sanitization

Basic Input Validation

from tsk import TSK
import re

Security-focused configuration

security_config = TSK.from_string(""" [validation] validate_email_fujsen = ''' def validate_email(email): if not email or not isinstance(email, str): return {'valid': False, 'error': 'Email must be a non-empty string'} # Strict email validation pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(pattern, email): return {'valid': False, 'error': 'Invalid email format'} # Check for suspicious patterns if '..' in email or email.count('@') != 1: return {'valid': False, 'error': 'Suspicious email pattern'} return {'valid': True, 'email': email} '''

validate_password_fujsen = ''' def validate_password(password): if not password or not isinstance(password, str): return {'valid': False, 'error': 'Password must be a non-empty string'} if len(password) < 12: return {'valid': False, 'error': 'Password must be at least 12 characters'} if not re.search(r'[A-Z]', password): return {'valid': False, 'error': 'Password must contain uppercase letter'} if not re.search(r'[a-z]', password): return {'valid': False, 'error': 'Password must contain lowercase letter'} if not re.search(r'\\d', password): return {'valid': False, 'error': 'Password must contain digit'} if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password): return {'valid': False, 'error': 'Password must contain special character'} # Check for common weak passwords weak_passwords = ['password', '123456', 'qwerty', 'admin'] if password.lower() in weak_passwords: return {'valid': False, 'error': 'Password is too common'} return {'valid': True, 'password': password} '''

validate_sql_input_fujsen = ''' def validate_sql_input(input_data): if not input_data or not isinstance(input_data, str): return {'valid': False, 'error': 'Input must be a non-empty string'} # Check for SQL injection patterns sql_patterns = [ r'\\b(union|select|insert|update|delete|drop|create|alter)\\b', r'--|/\\|\\/', r'[;\\'"]', r'xp_|sp_', r'<script|javascript:', r'data:text/html' ] for pattern in sql_patterns: if re.search(pattern, input_data, re.IGNORECASE): return {'valid': False, 'error': 'Potentially malicious input detected'} return {'valid': True, 'input': input_data} ''' """)

Test validation functions

def test_input_validation(): # Test email validation result = security_config.execute_fujsen('validation', 'validate_email', 'test@example.com') assert result['valid'] == True result = security_config.execute_fujsen('validation', 'validate_email', 'invalid-email') assert result['valid'] == False # Test password validation result = security_config.execute_fujsen('validation', 'validate_password', 'StrongPass123!') assert result['valid'] == True result = security_config.execute_fujsen('validation', 'validate_password', 'weak') assert result['valid'] == False # Test SQL input validation result = security_config.execute_fujsen('validation', 'validate_sql_input', 'normal text') assert result['valid'] == True result = security_config.execute_fujsen('validation', 'validate_sql_input', "'; DROP TABLE users; --") assert result['valid'] == False

Advanced Input Validation

Advanced security validation

advanced_security = TSK.from_string(""" [advanced_validation] validate_file_upload_fujsen = ''' def validate_file_upload(file_data): if not file_data: return {'valid': False, 'error': 'No file data provided'} # Check file size (max 10MB) max_size = 10 1024 1024 if len(file_data) > max_size: return {'valid': False, 'error': 'File too large'} # Check file type by magic bytes allowed_types = { b'\\xff\\xd8\\xff': 'image/jpeg', b'\\x89PNG\\r\\n\\x1a\\n': 'image/png', b'GIF87a': 'image/gif', b'GIF89a': 'image/gif', b'%PDF': 'application/pdf' } file_header = file_data[:8] detected_type = None for magic_bytes, mime_type in allowed_types.items(): if file_header.startswith(magic_bytes): detected_type = mime_type break if not detected_type: return {'valid': False, 'error': 'Unsupported file type'} return { 'valid': True, 'file_type': detected_type, 'file_size': len(file_data) } '''

validate_json_input_fujsen = ''' def validate_json_input(json_data, schema): import json try: if isinstance(json_data, str): data = json.loads(json_data) else: data = json_data # Basic schema validation if not isinstance(data, dict): return {'valid': False, 'error': 'Data must be an object'} # Check required fields for field in schema.get('required', []): if field not in data: return {'valid': False, 'error': f'Missing required field: {field}'} # Validate field types for field, expected_type in schema.get('types', {}).items(): if field in data: if not isinstance(data[field], expected_type): return {'valid': False, 'error': f'Invalid type for {field}'} # Validate string lengths for field, max_length in schema.get('max_lengths', {}).items(): if field in data and isinstance(data[field], str): if len(data[field]) > max_length: return {'valid': False, 'error': f'{field} too long'} return {'valid': True, 'data': data} except json.JSONDecodeError: return {'valid': False, 'error': 'Invalid JSON format'} '''

validate_url_fujsen = ''' def validate_url(url): import urllib.parse if not url or not isinstance(url, str): return {'valid': False, 'error': 'URL must be a non-empty string'} try: parsed = urllib.parse.urlparse(url) # Check for valid scheme if parsed.scheme not in ['http', 'https']: return {'valid': False, 'error': 'Only HTTP and HTTPS schemes allowed'} # Check for valid netloc if not parsed.netloc: return {'valid': False, 'error': 'Invalid URL format'} # Check for suspicious patterns suspicious_patterns = [ 'javascript:', 'data:text/html', 'vbscript:', 'file://' ] for pattern in suspicious_patterns: if pattern in url.lower(): return {'valid': False, 'error': 'Suspicious URL pattern detected'} return {'valid': True, 'url': url, 'parsed': parsed} except Exception: return {'valid': False, 'error': 'Invalid URL format'} ''' """)

🔐 SQL Injection Prevention

Parameterized Queries

SQL injection prevention

sql_security = TSK.from_string(""" [database_security] safe_user_query_fujsen = ''' def safe_user_query(user_id): # Always use parameterized queries user = query("SELECT id, username, email FROM users WHERE id = ?", user_id) if not user: return None return { 'id': user[0][0], 'username': user[0][1], 'email': user[0][2] } '''

safe_search_fujsen = ''' def safe_search(search_term, limit=10): # Use parameterized queries with LIKE results = query(""" SELECT id, username, email FROM users WHERE username LIKE ? OR email LIKE ? LIMIT ? """, f'%{search_term}%', f'%{search_term}%', limit) return [{ 'id': row[0], 'username': row[1], 'email': row[2] } for row in results] '''

safe_insert_fujsen = ''' def safe_insert(username, email, password_hash): # Validate input first if not username or not email or not password_hash: raise ValueError("All fields are required") # Check for existing user existing = query("SELECT id FROM users WHERE username = ? OR email = ?", username, email) if existing: raise ValueError("User already exists") # Safe insert with parameters user_id = execute(""" INSERT INTO users (username, email, password_hash, created_at) VALUES (?, ?, ?, datetime('now')) """, username, email, password_hash) return {'id': user_id, 'username': username, 'email': email} '''

safe_update_fujsen = ''' def safe_update(user_id, **updates): # Validate user exists user = query("SELECT id FROM users WHERE id = ?", user_id) if not user: raise ValueError("User not found") # Build safe update query allowed_fields = ['username', 'email', 'status'] set_clauses = [] values = [] for field, value in updates.items(): if field in allowed_fields: set_clauses.append(f"{field} = ?") values.append(value) if not set_clauses: return {'message': 'No valid fields to update'} values.append(user_id) query_str = f"UPDATE users SET {', '.join(set_clauses)} WHERE id = ?" execute(query_str, *values) return {'message': 'User updated successfully'} ''' """)

Test SQL security

def test_sql_security(): # Test safe user query result = sql_security.execute_fujsen('database_security', 'safe_user_query', 1) # Should work safely with any user_id input # Test safe search results = sql_security.execute_fujsen('database_security', 'safe_search', 'test') # Should work safely with any search term # Test safe insert result = sql_security.execute_fujsen('database_security', 'safe_insert', 'newuser', 'new@example.com', 'hashed_password') # Should work safely with any input

🔐 Authentication & Authorization

Secure Authentication

Secure authentication system

auth_security = TSK.from_string(""" [authentication] hash_password_fujsen = ''' def hash_password(password): import bcrypt import secrets # Generate salt salt = bcrypt.gensalt(rounds=12) # Hash password password_bytes = password.encode('utf-8') hashed = bcrypt.hashpw(password_bytes, salt) return hashed.decode('utf-8') '''

verify_password_fujsen = ''' def verify_password(password, hashed_password): import bcrypt password_bytes = password.encode('utf-8') hashed_bytes = hashed_password.encode('utf-8') return bcrypt.checkpw(password_bytes, hashed_bytes) '''

generate_token_fujsen = ''' def generate_token(user_id, expires_in=3600): import jwt import time import secrets # Generate secure secret secret = secrets.token_urlsafe(32) payload = { 'user_id': user_id, 'exp': time.time() + expires_in, 'iat': time.time(), 'jti': secrets.token_urlsafe(16) # JWT ID for uniqueness } token = jwt.encode(payload, secret, algorithm='HS256') return { 'token': token, 'expires_in': expires_in, 'token_type': 'Bearer' } '''

verify_token_fujsen = ''' def verify_token(token): import jwt import time try: # Decode token payload = jwt.decode(token, secret_key, algorithms=['HS256']) # Check expiration if payload['exp'] < time.time(): return {'valid': False, 'error': 'Token expired'} # Get user info user = query("SELECT id, username, email, status FROM users WHERE id = ?", payload['user_id']) if not user: return {'valid': False, 'error': 'User not found'} if user[0][3] != 'active': return {'valid': False, 'error': 'User account inactive'} return { 'valid': True, 'user': { 'id': user[0][0], 'username': user[0][1], 'email': user[0][2] } } except jwt.ExpiredSignatureError: return {'valid': False, 'error': 'Token expired'} except jwt.InvalidTokenError: return {'valid': False, 'error': 'Invalid token'} '''

[authorization] check_permission_fujsen = ''' def check_permission(user_id, resource, action): # Get user roles roles = query(""" SELECT r.name FROM user_roles ur JOIN roles r ON ur.role_id = r.id WHERE ur.user_id = ? """, user_id) user_roles = [role[0] for role in roles] # Check permissions permissions = query(""" SELECT rp.permission FROM role_permissions rp JOIN roles r ON rp.role_id = r.id WHERE r.name IN ({}) AND rp.resource = ? AND rp.action = ? """.format(','.join(['?' for _ in user_roles])), *user_roles, resource, action) return len(permissions) > 0 '''

require_permission_fujsen = ''' def require_permission(user_id, resource, action): has_permission = check_permission(user_id, resource, action) if not has_permission: raise PermissionError(f"Access denied: {action} on {resource}") return True ''' """)

🔐 Data Encryption

Field-Level Encryption

Data encryption system

encryption_security = TSK.from_string(""" [encryption] encrypt_field_fujsen = ''' def encrypt_field(data, field_name): from cryptography.fernet import Fernet import base64 # Get encryption key for field key = get_field_key(field_name) fernet = Fernet(key) # Encrypt data data_bytes = str(data).encode('utf-8') encrypted = fernet.encrypt(data_bytes) return base64.b64encode(encrypted).decode('utf-8') '''

decrypt_field_fujsen = ''' def decrypt_field(encrypted_data, field_name): from cryptography.fernet import Fernet import base64 # Get encryption key for field key = get_field_key(field_name) fernet = Fernet(key) # Decrypt data encrypted_bytes = base64.b64decode(encrypted_data.encode('utf-8')) decrypted = fernet.decrypt(encrypted_bytes) return decrypted.decode('utf-8') '''

get_field_key_fujsen = ''' def get_field_key(field_name): # Get field-specific encryption key key_data = query("SELECT encryption_key FROM field_keys WHERE field_name = ?", field_name) if not key_data: # Generate new key from cryptography.fernet import Fernet key = Fernet.generate_key() execute("INSERT INTO field_keys (field_name, encryption_key) VALUES (?, ?)", field_name, key.decode('utf-8')) return key return key_data[0][0].encode('utf-8') '''

store_sensitive_data_fujsen = ''' def store_sensitive_data(user_id, data): # Encrypt sensitive fields encrypted_ssn = encrypt_field(data.get('ssn'), 'ssn') encrypted_credit_card = encrypt_field(data.get('credit_card'), 'credit_card') # Store encrypted data record_id = execute(""" INSERT INTO sensitive_data (user_id, ssn_encrypted, credit_card_encrypted) VALUES (?, ?, ?) """, user_id, encrypted_ssn, encrypted_credit_card) return {'id': record_id, 'encrypted': True} '''

retrieve_sensitive_data_fujsen = ''' def retrieve_sensitive_data(user_id): # Get encrypted data data = query(""" SELECT ssn_encrypted, credit_card_encrypted FROM sensitive_data WHERE user_id = ? """, user_id) if not data: return None # Decrypt data ssn = decrypt_field(data[0][0], 'ssn') credit_card = decrypt_field(data[0][1], 'credit_card') return { 'ssn': ssn, 'credit_card': credit_card } ''' """)

🔐 Rate Limiting & DDoS Protection

Rate Limiting Implementation

Rate limiting system

rate_limit_security = TSK.from_string(""" [rate_limiting] check_rate_limit_fujsen = ''' def check_rate_limit(identifier, action, limit, window): import time current_time = time.time() window_start = current_time - window # Count recent requests count = query(""" SELECT COUNT(*) FROM rate_limits WHERE identifier = ? AND action = ? AND timestamp > ? """, identifier, action, window_start) current_count = count[0][0] if current_count >= limit: return {'allowed': False, 'remaining': 0, 'reset_time': window_start + window} # Record this request execute(""" INSERT INTO rate_limits (identifier, action, timestamp) VALUES (?, ?, ?) """, identifier, action, current_time) return {'allowed': True, 'remaining': limit - current_count - 1, 'reset_time': window_start + window} '''

cleanup_rate_limits_fujsen = ''' def cleanup_rate_limits(): import time # Remove old rate limit records (older than 1 hour) cutoff_time = time.time() - 3600 execute("DELETE FROM rate_limits WHERE timestamp < ?", cutoff_time) return {'cleaned': True, 'cutoff_time': cutoff_time} '''

[ddos_protection] detect_anomaly_fujsen = ''' def detect_anomaly(ip_address, time_window=300): import time current_time = time.time() window_start = current_time - time_window # Count requests from IP count = query(""" SELECT COUNT(*) FROM request_log WHERE ip_address = ? AND timestamp > ? """, ip_address, window_start) request_count = count[0][0] # Check for anomaly (more than 100 requests in 5 minutes) if request_count > 100: # Log anomaly execute(""" INSERT INTO security_events (ip_address, event_type, details, timestamp) VALUES (?, 'ddos_attempt', ?, ?) """, ip_address, f'{request_count} requests in {time_window}s', current_time) return {'anomaly': True, 'request_count': request_count, 'action': 'block'} return {'anomaly': False, 'request_count': request_count} '''

block_ip_fujsen = ''' def block_ip(ip_address, reason, duration=3600): import time block_until = time.time() + duration execute(""" INSERT OR REPLACE INTO ip_blocks (ip_address, reason, blocked_until, created_at) VALUES (?, ?, ?, ?) """, ip_address, reason, block_until, time.time()) return {'blocked': True, 'ip': ip_address, 'until': block_until} '''

check_ip_blocked_fujsen = ''' def check_ip_blocked(ip_address): import time block = query(""" SELECT reason, blocked_until FROM ip_blocks WHERE ip_address = ? AND blocked_until > ? """, ip_address, time.time()) if block: return {'blocked': True, 'reason': block[0][0], 'until': block[0][1]} return {'blocked': False} ''' """)

🔐 Audit Logging

Comprehensive Audit System

Audit logging system

audit_security = TSK.from_string(""" [audit] log_security_event_fujsen = ''' def log_security_event(event_type, details, user_id=None, ip_address=None, severity='info'): import time import json event_data = { 'event_type': event_type, 'details': details, 'user_id': user_id, 'ip_address': ip_address, 'severity': severity, 'timestamp': time.time() } # Log to database event_id = execute(""" INSERT INTO security_audit_log (event_type, details, user_id, ip_address, severity, timestamp) VALUES (?, ?, ?, ?, ?, ?) """, event_type, json.dumps(details), user_id, ip_address, severity, time.time()) # Log to file for critical events if severity in ['critical', 'high']: log_to_file('security.log', event_data) return {'logged': True, 'event_id': event_id} '''

log_user_action_fujsen = ''' def log_user_action(user_id, action, resource, details=None, ip_address=None): import time import json # Log user action log_id = execute(""" INSERT INTO user_audit_log (user_id, action, resource, details, ip_address, timestamp) VALUES (?, ?, ?, ?, ?, ?) """, user_id, action, resource, json.dumps(details) if details else None, ip_address, time.time()) # Check for suspicious patterns check_suspicious_activity(user_id, action, resource) return {'logged': True, 'log_id': log_id} '''

check_suspicious_activity_fujsen = ''' def check_suspicious_activity(user_id, action, resource): import time # Check for rapid repeated actions recent_actions = query(""" SELECT COUNT(*) FROM user_audit_log WHERE user_id = ? AND action = ? AND timestamp > ? """, user_id, action, time.time() - 60) # Last minute if recent_actions[0][0] > 10: # More than 10 actions per minute log_security_event('suspicious_activity', { 'user_id': user_id, 'action': action, 'resource': resource, 'count': recent_actions[0][0] }, user_id, severity='warning') # Check for unusual resource access unusual_resources = ['admin', 'config', 'system', 'password'] if any(resource in unusual_resources for resource in [resource]): log_security_event('sensitive_resource_access', { 'user_id': user_id, 'action': action, 'resource': resource }, user_id, severity='info') '''

log_to_file_fujsen = ''' def log_to_file(filename, data): import json import time log_entry = { 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), 'data': data } with open(filename, 'a') as f: f.write(json.dumps(log_entry) + '\\n') '''

get_audit_report_fujsen = ''' def get_audit_report(start_time=None, end_time=None, user_id=None, event_type=None): import time if not start_time: start_time = time.time() - 86400 # Last 24 hours if not end_time: end_time = time.time() # Build query query_parts = ["SELECT * FROM security_audit_log WHERE timestamp BETWEEN ? AND ?"] params = [start_time, end_time] if user_id: query_parts.append("AND user_id = ?") params.append(user_id) if event_type: query_parts.append("AND event_type = ?") params.append(event_type) query_parts.append("ORDER BY timestamp DESC") results = query(" ".join(query_parts), *params) return [{ 'id': row[0], 'event_type': row[1], 'details': row[2], 'user_id': row[3], 'ip_address': row[4], 'severity': row[5], 'timestamp': row[6] } for row in results] ''' """)

🔐 Configuration Security

Secure Configuration Management

Secure configuration

config_security = TSK.from_string(""" [secure_config] load_encrypted_config_fujsen = ''' def load_encrypted_config(config_path, encryption_key): import json from cryptography.fernet import Fernet # Read encrypted config with open(config_path, 'rb') as f: encrypted_data = f.read() # Decrypt fernet = Fernet(encryption_key) decrypted_data = fernet.decrypt(encrypted_data) # Parse JSON config = json.loads(decrypted_data.decode('utf-8')) return config '''

save_encrypted_config_fujsen = ''' def save_encrypted_config(config_data, config_path, encryption_key): import json from cryptography.fernet import Fernet # Serialize config config_json = json.dumps(config_data, indent=2) # Encrypt fernet = Fernet(encryption_key) encrypted_data = fernet.encrypt(config_json.encode('utf-8')) # Save encrypted config with open(config_path, 'wb') as f: f.write(encrypted_data) return {'saved': True, 'path': config_path} '''

validate_config_security_fujsen = ''' def validate_config_security(config): security_issues = [] # Check for hardcoded secrets sensitive_keys = ['password', 'secret', 'key', 'token', 'api_key'] def check_dict(d, path=''): for key, value in d.items(): current_path = f"{path}.{key}" if path else key if isinstance(value, dict): check_dict(value, current_path) elif isinstance(value, str): key_lower = key.lower() if any(sensitive in key_lower for sensitive in sensitive_keys): if value and value != '*': security_issues.append(f"Hardcoded secret found: {current_path}") check_dict(config) # Check for insecure defaults if config.get('security', {}).get('debug', False): security_issues.append("Debug mode enabled in production") if config.get('database', {}).get('ssl', False) == False: security_issues.append("Database SSL not enabled") return { 'secure': len(security_issues) == 0, 'issues': security_issues } '''

rotate_secrets_fujsen = ''' def rotate_secrets(): import secrets import time # Generate new secrets new_secret_key = secrets.token_urlsafe(32) new_jwt_secret = secrets.token_urlsafe(32) new_api_key = secrets.token_urlsafe(32) # Update secrets in database execute(""" INSERT OR REPLACE INTO secrets (key_name, secret_value, rotated_at) VALUES ('secret_key', ?, ?), ('jwt_secret', ?, ?), ('api_key', ?, ?) """, new_secret_key, time.time(), new_jwt_secret, time.time(), new_api_key, time.time()) return { 'rotated': True, 'timestamp': time.time(), 'secrets': ['secret_key', 'jwt_secret', 'api_key'] } ''' """)

🎯 Security Best Practices

1. Input Validation

- Always validate and sanitize all input data - Use parameterized queries to prevent SQL injection - Implement strict type checking - Validate file uploads and URLs

2. Authentication & Authorization

- Use strong password hashing (bcrypt) - Implement proper session management - Use JWT tokens with short expiration - Implement role-based access control

3. Data Protection

- Encrypt sensitive data at rest - Use HTTPS for all communications - Implement field-level encryption - Regular secret rotation

4. Rate Limiting

- Implement rate limiting for all endpoints - Monitor for DDoS attacks - Block suspicious IP addresses - Log security events

5. Audit Logging

- Log all security-relevant events - Monitor for suspicious activity - Regular security audits - Compliance reporting

🚀 Next Steps

1. Implement input validation for all user inputs 2. Set up authentication with secure password hashing 3. Configure encryption for sensitive data 4. Enable rate limiting and DDoS protection 5. Set up audit logging for security monitoring

---

"We don't bow to any king" - TuskLang provides comprehensive security features to protect your applications from vulnerabilities. Implement proper validation, encryption, and monitoring to build secure, robust applications!