🐍 🔐 Security Hardening - Python
🔐 Security Hardening - Python
"We don't bow to any king" - Security Edition
TuskLang provides comprehensive security features to protect your applications from common vulnerabilities and ensure data integrity.
🚀 Input Validation & Sanitization
Basic Input Validation
from tsk import TSK
import reSecurity-focused configuration
security_config = TSK.from_string("""
[validation]
validate_email_fujsen = '''
def validate_email(email):
if not email or not isinstance(email, str):
return {'valid': False, 'error': 'Email must be a non-empty string'}
# Strict email validation
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(pattern, email):
return {'valid': False, 'error': 'Invalid email format'}
# Check for suspicious patterns
if '..' in email or email.count('@') != 1:
return {'valid': False, 'error': 'Suspicious email pattern'}
return {'valid': True, 'email': email}
'''validate_password_fujsen = '''
def validate_password(password):
if not password or not isinstance(password, str):
return {'valid': False, 'error': 'Password must be a non-empty string'}
if len(password) < 12:
return {'valid': False, 'error': 'Password must be at least 12 characters'}
if not re.search(r'[A-Z]', password):
return {'valid': False, 'error': 'Password must contain uppercase letter'}
if not re.search(r'[a-z]', password):
return {'valid': False, 'error': 'Password must contain lowercase letter'}
if not re.search(r'\\d', password):
return {'valid': False, 'error': 'Password must contain digit'}
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
return {'valid': False, 'error': 'Password must contain special character'}
# Check for common weak passwords
weak_passwords = ['password', '123456', 'qwerty', 'admin']
if password.lower() in weak_passwords:
return {'valid': False, 'error': 'Password is too common'}
return {'valid': True, 'password': password}
'''
validate_sql_input_fujsen = '''
def validate_sql_input(input_data):
if not input_data or not isinstance(input_data, str):
return {'valid': False, 'error': 'Input must be a non-empty string'}
# Check for SQL injection patterns
sql_patterns = [
r'\\b(union|select|insert|update|delete|drop|create|alter)\\b',
r'--|/\\|\\/',
r'[;\\'"]',
r'xp_|sp_',
r'<script|javascript:',
r'data:text/html'
]
for pattern in sql_patterns:
if re.search(pattern, input_data, re.IGNORECASE):
return {'valid': False, 'error': 'Potentially malicious input detected'}
return {'valid': True, 'input': input_data}
'''
""")
Test validation functions
def test_input_validation():
# Test email validation
result = security_config.execute_fujsen('validation', 'validate_email', 'test@example.com')
assert result['valid'] == True
result = security_config.execute_fujsen('validation', 'validate_email', 'invalid-email')
assert result['valid'] == False
# Test password validation
result = security_config.execute_fujsen('validation', 'validate_password', 'StrongPass123!')
assert result['valid'] == True
result = security_config.execute_fujsen('validation', 'validate_password', 'weak')
assert result['valid'] == False
# Test SQL input validation
result = security_config.execute_fujsen('validation', 'validate_sql_input', 'normal text')
assert result['valid'] == True
result = security_config.execute_fujsen('validation', 'validate_sql_input', "'; DROP TABLE users; --")
assert result['valid'] == False
Advanced Input Validation
Advanced security validation
advanced_security = TSK.from_string("""
[advanced_validation]
validate_file_upload_fujsen = '''
def validate_file_upload(file_data):
if not file_data:
return {'valid': False, 'error': 'No file data provided'}
# Check file size (max 10MB)
max_size = 10 1024 1024
if len(file_data) > max_size:
return {'valid': False, 'error': 'File too large'}
# Check file type by magic bytes
allowed_types = {
b'\\xff\\xd8\\xff': 'image/jpeg',
b'\\x89PNG\\r\\n\\x1a\\n': 'image/png',
b'GIF87a': 'image/gif',
b'GIF89a': 'image/gif',
b'%PDF': 'application/pdf'
}
file_header = file_data[:8]
detected_type = None
for magic_bytes, mime_type in allowed_types.items():
if file_header.startswith(magic_bytes):
detected_type = mime_type
break
if not detected_type:
return {'valid': False, 'error': 'Unsupported file type'}
return {
'valid': True,
'file_type': detected_type,
'file_size': len(file_data)
}
'''validate_json_input_fujsen = '''
def validate_json_input(json_data, schema):
import json
try:
if isinstance(json_data, str):
data = json.loads(json_data)
else:
data = json_data
# Basic schema validation
if not isinstance(data, dict):
return {'valid': False, 'error': 'Data must be an object'}
# Check required fields
for field in schema.get('required', []):
if field not in data:
return {'valid': False, 'error': f'Missing required field: {field}'}
# Validate field types
for field, expected_type in schema.get('types', {}).items():
if field in data:
if not isinstance(data[field], expected_type):
return {'valid': False, 'error': f'Invalid type for {field}'}
# Validate string lengths
for field, max_length in schema.get('max_lengths', {}).items():
if field in data and isinstance(data[field], str):
if len(data[field]) > max_length:
return {'valid': False, 'error': f'{field} too long'}
return {'valid': True, 'data': data}
except json.JSONDecodeError:
return {'valid': False, 'error': 'Invalid JSON format'}
'''
validate_url_fujsen = '''
def validate_url(url):
import urllib.parse
if not url or not isinstance(url, str):
return {'valid': False, 'error': 'URL must be a non-empty string'}
try:
parsed = urllib.parse.urlparse(url)
# Check for valid scheme
if parsed.scheme not in ['http', 'https']:
return {'valid': False, 'error': 'Only HTTP and HTTPS schemes allowed'}
# Check for valid netloc
if not parsed.netloc:
return {'valid': False, 'error': 'Invalid URL format'}
# Check for suspicious patterns
suspicious_patterns = [
'javascript:',
'data:text/html',
'vbscript:',
'file://'
]
for pattern in suspicious_patterns:
if pattern in url.lower():
return {'valid': False, 'error': 'Suspicious URL pattern detected'}
return {'valid': True, 'url': url, 'parsed': parsed}
except Exception:
return {'valid': False, 'error': 'Invalid URL format'}
'''
""")
🔐 SQL Injection Prevention
Parameterized Queries
SQL injection prevention
sql_security = TSK.from_string("""
[database_security]
safe_user_query_fujsen = '''
def safe_user_query(user_id):
# Always use parameterized queries
user = query("SELECT id, username, email FROM users WHERE id = ?", user_id)
if not user:
return None
return {
'id': user[0][0],
'username': user[0][1],
'email': user[0][2]
}
'''safe_search_fujsen = '''
def safe_search(search_term, limit=10):
# Use parameterized queries with LIKE
results = query("""
SELECT id, username, email
FROM users
WHERE username LIKE ? OR email LIKE ?
LIMIT ?
""", f'%{search_term}%', f'%{search_term}%', limit)
return [{
'id': row[0],
'username': row[1],
'email': row[2]
} for row in results]
'''
safe_insert_fujsen = '''
def safe_insert(username, email, password_hash):
# Validate input first
if not username or not email or not password_hash:
raise ValueError("All fields are required")
# Check for existing user
existing = query("SELECT id FROM users WHERE username = ? OR email = ?", username, email)
if existing:
raise ValueError("User already exists")
# Safe insert with parameters
user_id = execute("""
INSERT INTO users (username, email, password_hash, created_at)
VALUES (?, ?, ?, datetime('now'))
""", username, email, password_hash)
return {'id': user_id, 'username': username, 'email': email}
'''
safe_update_fujsen = '''
def safe_update(user_id, **updates):
# Validate user exists
user = query("SELECT id FROM users WHERE id = ?", user_id)
if not user:
raise ValueError("User not found")
# Build safe update query
allowed_fields = ['username', 'email', 'status']
set_clauses = []
values = []
for field, value in updates.items():
if field in allowed_fields:
set_clauses.append(f"{field} = ?")
values.append(value)
if not set_clauses:
return {'message': 'No valid fields to update'}
values.append(user_id)
query_str = f"UPDATE users SET {', '.join(set_clauses)} WHERE id = ?"
execute(query_str, *values)
return {'message': 'User updated successfully'}
'''
""")
Test SQL security
def test_sql_security():
# Test safe user query
result = sql_security.execute_fujsen('database_security', 'safe_user_query', 1)
# Should work safely with any user_id input
# Test safe search
results = sql_security.execute_fujsen('database_security', 'safe_search', 'test')
# Should work safely with any search term
# Test safe insert
result = sql_security.execute_fujsen('database_security', 'safe_insert',
'newuser', 'new@example.com', 'hashed_password')
# Should work safely with any input
🔐 Authentication & Authorization
Secure Authentication
Secure authentication system
auth_security = TSK.from_string("""
[authentication]
hash_password_fujsen = '''
def hash_password(password):
import bcrypt
import secrets
# Generate salt
salt = bcrypt.gensalt(rounds=12)
# Hash password
password_bytes = password.encode('utf-8')
hashed = bcrypt.hashpw(password_bytes, salt)
return hashed.decode('utf-8')
'''verify_password_fujsen = '''
def verify_password(password, hashed_password):
import bcrypt
password_bytes = password.encode('utf-8')
hashed_bytes = hashed_password.encode('utf-8')
return bcrypt.checkpw(password_bytes, hashed_bytes)
'''
generate_token_fujsen = '''
def generate_token(user_id, expires_in=3600):
import jwt
import time
import secrets
# Generate secure secret
secret = secrets.token_urlsafe(32)
payload = {
'user_id': user_id,
'exp': time.time() + expires_in,
'iat': time.time(),
'jti': secrets.token_urlsafe(16) # JWT ID for uniqueness
}
token = jwt.encode(payload, secret, algorithm='HS256')
return {
'token': token,
'expires_in': expires_in,
'token_type': 'Bearer'
}
'''
verify_token_fujsen = '''
def verify_token(token):
import jwt
import time
try:
# Decode token
payload = jwt.decode(token, secret_key, algorithms=['HS256'])
# Check expiration
if payload['exp'] < time.time():
return {'valid': False, 'error': 'Token expired'}
# Get user info
user = query("SELECT id, username, email, status FROM users WHERE id = ?", payload['user_id'])
if not user:
return {'valid': False, 'error': 'User not found'}
if user[0][3] != 'active':
return {'valid': False, 'error': 'User account inactive'}
return {
'valid': True,
'user': {
'id': user[0][0],
'username': user[0][1],
'email': user[0][2]
}
}
except jwt.ExpiredSignatureError:
return {'valid': False, 'error': 'Token expired'}
except jwt.InvalidTokenError:
return {'valid': False, 'error': 'Invalid token'}
'''
[authorization]
check_permission_fujsen = '''
def check_permission(user_id, resource, action):
# Get user roles
roles = query("""
SELECT r.name FROM user_roles ur
JOIN roles r ON ur.role_id = r.id
WHERE ur.user_id = ?
""", user_id)
user_roles = [role[0] for role in roles]
# Check permissions
permissions = query("""
SELECT rp.permission FROM role_permissions rp
JOIN roles r ON rp.role_id = r.id
WHERE r.name IN ({})
AND rp.resource = ? AND rp.action = ?
""".format(','.join(['?' for _ in user_roles])), *user_roles, resource, action)
return len(permissions) > 0
'''
require_permission_fujsen = '''
def require_permission(user_id, resource, action):
has_permission = check_permission(user_id, resource, action)
if not has_permission:
raise PermissionError(f"Access denied: {action} on {resource}")
return True
'''
""")
🔐 Data Encryption
Field-Level Encryption
Data encryption system
encryption_security = TSK.from_string("""
[encryption]
encrypt_field_fujsen = '''
def encrypt_field(data, field_name):
from cryptography.fernet import Fernet
import base64
# Get encryption key for field
key = get_field_key(field_name)
fernet = Fernet(key)
# Encrypt data
data_bytes = str(data).encode('utf-8')
encrypted = fernet.encrypt(data_bytes)
return base64.b64encode(encrypted).decode('utf-8')
'''decrypt_field_fujsen = '''
def decrypt_field(encrypted_data, field_name):
from cryptography.fernet import Fernet
import base64
# Get encryption key for field
key = get_field_key(field_name)
fernet = Fernet(key)
# Decrypt data
encrypted_bytes = base64.b64decode(encrypted_data.encode('utf-8'))
decrypted = fernet.decrypt(encrypted_bytes)
return decrypted.decode('utf-8')
'''
get_field_key_fujsen = '''
def get_field_key(field_name):
# Get field-specific encryption key
key_data = query("SELECT encryption_key FROM field_keys WHERE field_name = ?", field_name)
if not key_data:
# Generate new key
from cryptography.fernet import Fernet
key = Fernet.generate_key()
execute("INSERT INTO field_keys (field_name, encryption_key) VALUES (?, ?)",
field_name, key.decode('utf-8'))
return key
return key_data[0][0].encode('utf-8')
'''
store_sensitive_data_fujsen = '''
def store_sensitive_data(user_id, data):
# Encrypt sensitive fields
encrypted_ssn = encrypt_field(data.get('ssn'), 'ssn')
encrypted_credit_card = encrypt_field(data.get('credit_card'), 'credit_card')
# Store encrypted data
record_id = execute("""
INSERT INTO sensitive_data (user_id, ssn_encrypted, credit_card_encrypted)
VALUES (?, ?, ?)
""", user_id, encrypted_ssn, encrypted_credit_card)
return {'id': record_id, 'encrypted': True}
'''
retrieve_sensitive_data_fujsen = '''
def retrieve_sensitive_data(user_id):
# Get encrypted data
data = query("""
SELECT ssn_encrypted, credit_card_encrypted
FROM sensitive_data
WHERE user_id = ?
""", user_id)
if not data:
return None
# Decrypt data
ssn = decrypt_field(data[0][0], 'ssn')
credit_card = decrypt_field(data[0][1], 'credit_card')
return {
'ssn': ssn,
'credit_card': credit_card
}
'''
""")
🔐 Rate Limiting & DDoS Protection
Rate Limiting Implementation
Rate limiting system
rate_limit_security = TSK.from_string("""
[rate_limiting]
check_rate_limit_fujsen = '''
def check_rate_limit(identifier, action, limit, window):
import time
current_time = time.time()
window_start = current_time - window
# Count recent requests
count = query("""
SELECT COUNT(*) FROM rate_limits
WHERE identifier = ? AND action = ? AND timestamp > ?
""", identifier, action, window_start)
current_count = count[0][0]
if current_count >= limit:
return {'allowed': False, 'remaining': 0, 'reset_time': window_start + window}
# Record this request
execute("""
INSERT INTO rate_limits (identifier, action, timestamp)
VALUES (?, ?, ?)
""", identifier, action, current_time)
return {'allowed': True, 'remaining': limit - current_count - 1, 'reset_time': window_start + window}
'''cleanup_rate_limits_fujsen = '''
def cleanup_rate_limits():
import time
# Remove old rate limit records (older than 1 hour)
cutoff_time = time.time() - 3600
execute("DELETE FROM rate_limits WHERE timestamp < ?", cutoff_time)
return {'cleaned': True, 'cutoff_time': cutoff_time}
'''
[ddos_protection]
detect_anomaly_fujsen = '''
def detect_anomaly(ip_address, time_window=300):
import time
current_time = time.time()
window_start = current_time - time_window
# Count requests from IP
count = query("""
SELECT COUNT(*) FROM request_log
WHERE ip_address = ? AND timestamp > ?
""", ip_address, window_start)
request_count = count[0][0]
# Check for anomaly (more than 100 requests in 5 minutes)
if request_count > 100:
# Log anomaly
execute("""
INSERT INTO security_events (ip_address, event_type, details, timestamp)
VALUES (?, 'ddos_attempt', ?, ?)
""", ip_address, f'{request_count} requests in {time_window}s', current_time)
return {'anomaly': True, 'request_count': request_count, 'action': 'block'}
return {'anomaly': False, 'request_count': request_count}
'''
block_ip_fujsen = '''
def block_ip(ip_address, reason, duration=3600):
import time
block_until = time.time() + duration
execute("""
INSERT OR REPLACE INTO ip_blocks (ip_address, reason, blocked_until, created_at)
VALUES (?, ?, ?, ?)
""", ip_address, reason, block_until, time.time())
return {'blocked': True, 'ip': ip_address, 'until': block_until}
'''
check_ip_blocked_fujsen = '''
def check_ip_blocked(ip_address):
import time
block = query("""
SELECT reason, blocked_until FROM ip_blocks
WHERE ip_address = ? AND blocked_until > ?
""", ip_address, time.time())
if block:
return {'blocked': True, 'reason': block[0][0], 'until': block[0][1]}
return {'blocked': False}
'''
""")
🔐 Audit Logging
Comprehensive Audit System
Audit logging system
audit_security = TSK.from_string("""
[audit]
log_security_event_fujsen = '''
def log_security_event(event_type, details, user_id=None, ip_address=None, severity='info'):
import time
import json
event_data = {
'event_type': event_type,
'details': details,
'user_id': user_id,
'ip_address': ip_address,
'severity': severity,
'timestamp': time.time()
}
# Log to database
event_id = execute("""
INSERT INTO security_audit_log
(event_type, details, user_id, ip_address, severity, timestamp)
VALUES (?, ?, ?, ?, ?, ?)
""", event_type, json.dumps(details), user_id, ip_address, severity, time.time())
# Log to file for critical events
if severity in ['critical', 'high']:
log_to_file('security.log', event_data)
return {'logged': True, 'event_id': event_id}
'''log_user_action_fujsen = '''
def log_user_action(user_id, action, resource, details=None, ip_address=None):
import time
import json
# Log user action
log_id = execute("""
INSERT INTO user_audit_log
(user_id, action, resource, details, ip_address, timestamp)
VALUES (?, ?, ?, ?, ?, ?)
""", user_id, action, resource, json.dumps(details) if details else None,
ip_address, time.time())
# Check for suspicious patterns
check_suspicious_activity(user_id, action, resource)
return {'logged': True, 'log_id': log_id}
'''
check_suspicious_activity_fujsen = '''
def check_suspicious_activity(user_id, action, resource):
import time
# Check for rapid repeated actions
recent_actions = query("""
SELECT COUNT(*) FROM user_audit_log
WHERE user_id = ? AND action = ? AND timestamp > ?
""", user_id, action, time.time() - 60) # Last minute
if recent_actions[0][0] > 10: # More than 10 actions per minute
log_security_event('suspicious_activity', {
'user_id': user_id,
'action': action,
'resource': resource,
'count': recent_actions[0][0]
}, user_id, severity='warning')
# Check for unusual resource access
unusual_resources = ['admin', 'config', 'system', 'password']
if any(resource in unusual_resources for resource in [resource]):
log_security_event('sensitive_resource_access', {
'user_id': user_id,
'action': action,
'resource': resource
}, user_id, severity='info')
'''
log_to_file_fujsen = '''
def log_to_file(filename, data):
import json
import time
log_entry = {
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
'data': data
}
with open(filename, 'a') as f:
f.write(json.dumps(log_entry) + '\\n')
'''
get_audit_report_fujsen = '''
def get_audit_report(start_time=None, end_time=None, user_id=None, event_type=None):
import time
if not start_time:
start_time = time.time() - 86400 # Last 24 hours
if not end_time:
end_time = time.time()
# Build query
query_parts = ["SELECT * FROM security_audit_log WHERE timestamp BETWEEN ? AND ?"]
params = [start_time, end_time]
if user_id:
query_parts.append("AND user_id = ?")
params.append(user_id)
if event_type:
query_parts.append("AND event_type = ?")
params.append(event_type)
query_parts.append("ORDER BY timestamp DESC")
results = query(" ".join(query_parts), *params)
return [{
'id': row[0],
'event_type': row[1],
'details': row[2],
'user_id': row[3],
'ip_address': row[4],
'severity': row[5],
'timestamp': row[6]
} for row in results]
'''
""")
🔐 Configuration Security
Secure Configuration Management
Secure configuration
config_security = TSK.from_string("""
[secure_config]
load_encrypted_config_fujsen = '''
def load_encrypted_config(config_path, encryption_key):
import json
from cryptography.fernet import Fernet
# Read encrypted config
with open(config_path, 'rb') as f:
encrypted_data = f.read()
# Decrypt
fernet = Fernet(encryption_key)
decrypted_data = fernet.decrypt(encrypted_data)
# Parse JSON
config = json.loads(decrypted_data.decode('utf-8'))
return config
'''save_encrypted_config_fujsen = '''
def save_encrypted_config(config_data, config_path, encryption_key):
import json
from cryptography.fernet import Fernet
# Serialize config
config_json = json.dumps(config_data, indent=2)
# Encrypt
fernet = Fernet(encryption_key)
encrypted_data = fernet.encrypt(config_json.encode('utf-8'))
# Save encrypted config
with open(config_path, 'wb') as f:
f.write(encrypted_data)
return {'saved': True, 'path': config_path}
'''
validate_config_security_fujsen = '''
def validate_config_security(config):
security_issues = []
# Check for hardcoded secrets
sensitive_keys = ['password', 'secret', 'key', 'token', 'api_key']
def check_dict(d, path=''):
for key, value in d.items():
current_path = f"{path}.{key}" if path else key
if isinstance(value, dict):
check_dict(value, current_path)
elif isinstance(value, str):
key_lower = key.lower()
if any(sensitive in key_lower for sensitive in sensitive_keys):
if value and value != '*':
security_issues.append(f"Hardcoded secret found: {current_path}")
check_dict(config)
# Check for insecure defaults
if config.get('security', {}).get('debug', False):
security_issues.append("Debug mode enabled in production")
if config.get('database', {}).get('ssl', False) == False:
security_issues.append("Database SSL not enabled")
return {
'secure': len(security_issues) == 0,
'issues': security_issues
}
'''
rotate_secrets_fujsen = '''
def rotate_secrets():
import secrets
import time
# Generate new secrets
new_secret_key = secrets.token_urlsafe(32)
new_jwt_secret = secrets.token_urlsafe(32)
new_api_key = secrets.token_urlsafe(32)
# Update secrets in database
execute("""
INSERT OR REPLACE INTO secrets (key_name, secret_value, rotated_at)
VALUES
('secret_key', ?, ?),
('jwt_secret', ?, ?),
('api_key', ?, ?)
""", new_secret_key, time.time(), new_jwt_secret, time.time(),
new_api_key, time.time())
return {
'rotated': True,
'timestamp': time.time(),
'secrets': ['secret_key', 'jwt_secret', 'api_key']
}
'''
""")
🎯 Security Best Practices
1. Input Validation
- Always validate and sanitize all input data - Use parameterized queries to prevent SQL injection - Implement strict type checking - Validate file uploads and URLs2. Authentication & Authorization
- Use strong password hashing (bcrypt) - Implement proper session management - Use JWT tokens with short expiration - Implement role-based access control3. Data Protection
- Encrypt sensitive data at rest - Use HTTPS for all communications - Implement field-level encryption - Regular secret rotation4. Rate Limiting
- Implement rate limiting for all endpoints - Monitor for DDoS attacks - Block suspicious IP addresses - Log security events5. Audit Logging
- Log all security-relevant events - Monitor for suspicious activity - Regular security audits - Compliance reporting🚀 Next Steps
1. Implement input validation for all user inputs 2. Set up authentication with secure password hashing 3. Configure encryption for sensitive data 4. Enable rate limiting and DDoS protection 5. Set up audit logging for security monitoring
---
"We don't bow to any king" - TuskLang provides comprehensive security features to protect your applications from vulnerabilities. Implement proper validation, encryption, and monitoring to build secure, robust applications!