🐍 🔄 Data Pipelines & ETL - Python
"We don't bow to any king" - Data Engineering Edition
TuskLang empowers you to build robust, scalable data pipelines and ETL (Extract, Transform, Load) workflows for analytics, machine learning, and automation.
🚀 Data Pipeline Concepts
- Extract: Ingest data from multiple sources (databases, APIs, files, streams)
- Transform: Clean, enrich, and reshape data (validation, mapping, aggregation)
- Load: Store processed data in target systems (databases, data lakes, warehouses)
- Orchestration: Schedule, monitor, and manage pipeline execution
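Before wiring these stages into TuskLang, here is a minimal plain-Python sketch of how they fit together; all function names and data are illustrative, not part of any library.
def extract():
    # Extract: pull raw records from a source (in-memory data for brevity)
    return [{'id': 1, 'name': 'Ada', 'active': True}, {'id': 2, 'name': '', 'active': False}]

def transform(records):
    # Transform: keep only active records and add a derived field
    return [{**r, 'label': r['name'].upper()} for r in records if r['active']]

def load(records):
    # Load: write to a target system (printed here instead of a database)
    print(f"loaded {len(records)} records")

def run_pipeline():
    # Orchestration: run the steps in order
    load(transform(extract()))

run_pipeline()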
🏗️ Building ETL Pipelines with TuskLang
Basic ETL Pipeline
from tsk import TSK
import pandas as pd

# ETL configuration
etl_config = TSK.from_string("""
[etl]
# Data sources
source_db: @env("SOURCE_DB", "sqlite:///source.db")
target_db: @env("TARGET_DB", "sqlite:///target.db")

# Extract step
extract_data_fujsen = '''
def extract_data(query):
    import pandas as pd
    from sqlalchemy import create_engine
    engine = create_engine(source_db)
    df = pd.read_sql(query, engine)
    return df.to_dict(orient='records')
'''

# Transform step
transform_data_fujsen = '''
def transform_data(records):
    # Example: filter, map, and enrich
    transformed = []
    for rec in records:
        if rec['active']:
            rec['full_name'] = f"{rec['first_name']} {rec['last_name']}"
            rec['signup_year'] = int(rec['signup_date'][:4])
            transformed.append(rec)
    return transformed
'''

# Load step
load_data_fujsen = '''
def load_data(records, table):
    import pandas as pd
    from sqlalchemy import create_engine
    engine = create_engine(target_db)
    df = pd.DataFrame(records)
    df.to_sql(table, engine, if_exists='replace', index=False)
    return {'rows_loaded': len(df)}
'''
""")Example ETL run
source_query = "SELECT * FROM users"
extracted = etl_config.execute_fujsen('etl', 'extract_data', source_query)
transformed = etl_config.execute_fujsen('etl', 'transform_data', extracted)
result = etl_config.execute_fujsen('etl', 'load_data', transformed, 'users_cleaned')
print(f"ETL complete: {result['rows_loaded']} rows loaded.")
🧩 Advanced Data Processing
Batch vs. Streaming
- Batch: Process large datasets on a schedule (nightly, hourly)
- Streaming: Process data in real-time as it arrives (Kafka, Redis Streams); a minimal Redis Streams sketch follows below
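A minimal streaming sketch, assuming records arrive on a Redis stream named 'events' as a JSON 'payload' field (both names are hypothetical) and reusing the transform/load fujsens from etl.tsk above:
import json
import redis
from tsk import TSK

etl_config = TSK.from_file('etl.tsk')
r = redis.Redis()

last_id = '0-0'
while True:
    # Block up to 5 seconds waiting for new entries on the 'events' stream
    entries = r.xread({'events': last_id}, count=100, block=5000)
    for stream, messages in entries:
        records = [json.loads(fields[b'payload']) for _, fields in messages]
        transformed = etl_config.execute_fujsen('etl', 'transform_data', records)
        # Note: for streaming, load_data would need if_exists='append' rather than 'replace'
        etl_config.execute_fujsen('etl', 'load_data', transformed, 'users_cleaned')
        last_id = messages[-1][0]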
Data Validation & Cleansing
Data validation configuration
data_validation = TSK.from_string("""
[data_validation]
validate_record_fujsen = '''
def validate_record(record):
    errors = []
    if not record.get('email') or '@' not in record['email']:
        errors.append('Invalid email')
    if record.get('age', 0) < 0:
        errors.append('Negative age')
    if not record.get('signup_date'):
        errors.append('Missing signup_date')
    return {'valid': not errors, 'errors': errors}
'''

cleanse_data_fujsen = '''
def cleanse_data(records):
    cleaned = []
    for rec in records:
        validation = validate_record(rec)
        if validation['valid']:
            cleaned.append(rec)
    return cleaned
'''
""")
Data Enrichment
Data enrichment configuration
data_enrichment = TSK.from_string("""
[data_enrichment]
enrich_with_geo_fujsen = '''
def enrich_with_geo(records):
    for rec in records:
        # Example: add country based on IP (mocked)
        rec['country'] = 'USA' if rec.get('ip', '').startswith('192.') else 'Unknown'
    return records
'''
""")
🔄 Orchestrating Pipelines
Scheduling with Airflow
airflow_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from tsk import TSK

def run_etl():
    etl_config = TSK.from_file('etl.tsk')
    source_query = "SELECT * FROM users"
    extracted = etl_config.execute_fujsen('etl', 'extract_data', source_query)
    transformed = etl_config.execute_fujsen('etl', 'transform_data', extracted)
    result = etl_config.execute_fujsen('etl', 'load_data', transformed, 'users_cleaned')
    print(f"ETL complete: {result['rows_loaded']} rows loaded.")
dag = DAG('tusk_etl', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
etl_task = PythonOperator(
    task_id='run_etl',
    python_callable=run_etl,
    dag=dag
)
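A hedged variant of the same DAG definition with retries and backfilling disabled, a common hardening step; it reuses the imports above and the retry values are illustrative:
from datetime import timedelta

default_args = {
    'retries': 2,                         # retry a failed ETL run twice
    'retry_delay': timedelta(minutes=5),  # wait five minutes between attempts
}

dag = DAG(
    'tusk_etl',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,                        # do not backfill missed runs
    default_args=default_args,
)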
Orchestrating with Prefect
from prefect import task, flow
from tsk import TSK

@task
def extract():
    etl_config = TSK.from_file('etl.tsk')
    return etl_config.execute_fujsen('etl', 'extract_data', "SELECT * FROM users")

@task
def transform(records):
    etl_config = TSK.from_file('etl.tsk')
    return etl_config.execute_fujsen('etl', 'transform_data', records)

@task
def load(records):
    etl_config = TSK.from_file('etl.tsk')
    return etl_config.execute_fujsen('etl', 'load_data', records, 'users_cleaned')

@flow
def tusk_etl_flow():
    data = extract()
    clean = transform(data)
    result = load(clean)
    print(result)
tusk_etl_flow()
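Prefect tasks also accept retry settings directly; a hedged sketch of a retrying load task, reusing the imports above (the retry values are illustrative):
@task(retries=3, retry_delay_seconds=60)  # retry transient load failures
def load_with_retries(records):
    etl_config = TSK.from_file('etl.tsk')
    return etl_config.execute_fujsen('etl', 'load_data', records, 'users_cleaned')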
📊 Data Pipeline Monitoring
- Track pipeline runs, failures, and data quality metrics
- Log row counts, error rates, and processing times
- Use TSK to store and query pipeline metadata (see the sketch below)
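A hedged sketch of the last point, logging run metadata through a fujsen in the same style as the earlier examples; the pipeline_runs table and metric fields are illustrative:
pipeline_monitor = TSK.from_string("""
[monitoring]
log_run_fujsen = '''
def log_run(pipeline, rows_loaded, status, duration_seconds):
    import sqlite3
    from datetime import datetime
    conn = sqlite3.connect('pipeline_metadata.db')
    conn.execute("CREATE TABLE IF NOT EXISTS pipeline_runs (pipeline TEXT, rows_loaded INTEGER, status TEXT, duration_seconds REAL, run_at TEXT)")
    conn.execute("INSERT INTO pipeline_runs VALUES (?, ?, ?, ?, ?)",
                 (pipeline, rows_loaded, status, duration_seconds, datetime.utcnow().isoformat()))
    conn.commit()
    conn.close()
    return {'logged': True}
'''
""")

meta = pipeline_monitor.execute_fujsen('monitoring', 'log_run', 'tusk_etl', 120, 'success', 42.5)
print(meta)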
🎯 Data Pipeline Best Practices
- Validate and cleanse all input data
- Use idempotent ETL steps for re-runs
- Log all pipeline activity and errors
- Parameterize pipeline configuration with TSK
- Monitor pipeline health and data quality
- Document pipeline logic and data contracts
🚀 Next Steps
1. Define your ETL steps in TSK
2. Build Python ETL scripts using TSK and pandas
3. Add validation, cleansing, and enrichment
4. Orchestrate pipelines with Airflow or Prefect
5. Monitor and document your data pipelines
---
"We don't bow to any king" - TuskLang empowers you to build robust, scalable data pipelines and ETL workflows. Extract, transform, load, and orchestrate your data with confidence!