🦀 Serverless Architecture in TuskLang with Rust
Serverless Architecture in TuskLang with Rust
☁️ Serverless Foundation
Serverless architecture with TuskLang and Rust provides scalable, cost-effective, and event-driven computing solutions. This guide covers AWS Lambda, Azure Functions, and advanced serverless patterns.
🏗️ Architecture Overview
Serverless Stack
[serverless_architecture]
aws_lambda: true
azure_functions: true
google_cloud_functions: true
knative: true[serverless_components]
lambda_runtime: "aws_lambda"
azure_functions: "azure_functions"
serde: "serialization"
tokio: "async_runtime"
Serverless Configuration
[serverless_configuration]
enable_aws_lambda: true
enable_azure_functions: true
enable_cold_start_optimization: true
enable_monitoring: true[serverless_implementation]
use lambda_runtime::{service_fn, LambdaEvent, Error};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use std::sync::Arc;
// Serverless function manager
pub struct ServerlessManager {
config: ServerlessConfig,
functions: Arc<RwLock<HashMap<String, FunctionInfo>>>,
metrics: Arc<RwLock<FunctionMetrics>>,
}
#[derive(Debug, Clone)]
pub struct ServerlessConfig {
pub provider: CloudProvider,
pub region: String,
pub memory_size: u32,
pub timeout: Duration,
pub environment_variables: HashMap<String, String>,
pub vpc_config: Option<VpcConfig>,
}
#[derive(Debug, Clone)]
pub enum CloudProvider {
AWS,
Azure,
GoogleCloud,
}
#[derive(Debug, Clone)]
pub struct VpcConfig {
pub subnet_ids: Vec<String>,
pub security_group_ids: Vec<String>,
}
#[derive(Debug, Clone)]
pub struct FunctionInfo {
pub name: String,
pub handler: String,
pub runtime: String,
pub memory_size: u32,
pub timeout: Duration,
pub environment_variables: HashMap<String, String>,
pub last_updated: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone)]
pub struct FunctionMetrics {
pub invocations: u64,
pub errors: u64,
pub duration_ms: u64,
pub cold_starts: u64,
}
impl ServerlessManager {
pub fn new(config: ServerlessConfig) -> Self {
Self {
config,
functions: Arc::new(RwLock::new(HashMap::new())),
metrics: Arc::new(RwLock::new(FunctionMetrics {
invocations: 0,
errors: 0,
duration_ms: 0,
cold_starts: 0,
})),
}
}
pub async fn deploy_function(&self, function: FunctionInfo) -> Result<(), ServerlessError> {
let mut functions = self.functions.write().await;
functions.insert(function.name.clone(), function);
info!("Function deployed: {}", function.name);
Ok(())
}
pub async fn invoke_function(&self, name: &str, payload: serde_json::Value) -> Result<serde_json::Value, ServerlessError> {
let start_time = std::time::Instant::now();
// Update metrics
{
let mut metrics = self.metrics.write().await;
metrics.invocations += 1;
}
// Simulate function invocation
let result = match name {
"user-service" => self.handle_user_service(payload).await,
"order-service" => self.handle_order_service(payload).await,
"notification-service" => self.handle_notification_service(payload).await,
_ => Err(ServerlessError::FunctionNotFound { name: name.to_string() }),
};
let duration = start_time.elapsed();
// Update metrics
{
let mut metrics = self.metrics.write().await;
metrics.duration_ms += duration.as_millis() as u64;
if result.is_err() {
metrics.errors += 1;
}
}
result
}
async fn handle_user_service(&self, payload: serde_json::Value) -> Result<serde_json::Value, ServerlessError> {
// Simulate user service logic
let user_id = payload["user_id"].as_str().unwrap_or("unknown");
let response = serde_json::json!({
"user_id": user_id,
"username": format!("user_{}", user_id),
"email": format!("user_{}@example.com", user_id),
"status": "active"
});
Ok(response)
}
async fn handle_order_service(&self, payload: serde_json::Value) -> Result<serde_json::Value, ServerlessError> {
// Simulate order service logic
let order_id = payload["order_id"].as_str().unwrap_or("unknown");
let response = serde_json::json!({
"order_id": order_id,
"status": "processing",
"total": 99.99,
"items": vec![
{"product_id": "prod_1", "quantity": 2, "price": 49.99}
]
});
Ok(response)
}
async fn handle_notification_service(&self, payload: serde_json::Value) -> Result<serde_json::Value, ServerlessError> {
// Simulate notification service logic
let message = payload["message"].as_str().unwrap_or("Hello!");
let recipient = payload["recipient"].as_str().unwrap_or("user@example.com");
let response = serde_json::json!({
"status": "sent",
"message_id": uuid::Uuid::new_v4().to_string(),
"recipient": recipient,
"message": message
});
Ok(response)
}
pub async fn get_metrics(&self) -> FunctionMetrics {
self.metrics.read().await.clone()
}
}
#[derive(Debug, thiserror::Error)]
pub enum ServerlessError {
#[error("Function not found: {name}")]
FunctionNotFound { name: String },
#[error("Deployment failed: {message}")]
DeploymentFailed { message: String },
#[error("Invocation failed: {message}")]
InvocationFailed { message: String },
#[error("Configuration error: {message}")]
ConfigError { message: String },
}
🚀 AWS Lambda
AWS Lambda Implementation
[aws_lambda]
lambda_runtime: true
api_gateway: true
dynamodb: true
s3: true[lambda_implementation]
use lambda_runtime::{service_fn, LambdaEvent, Error};
use aws_lambda_events::{
apigw::{ApiGatewayProxyRequest, ApiGatewayProxyResponse},
dynamodb::DynamoDbEvent,
s3::S3Event,
};
use serde::{Deserialize, Serialize};
// Lambda handler for API Gateway
pub async fn api_gateway_handler(
event: LambdaEvent<ApiGatewayProxyRequest>,
) -> Result<ApiGatewayProxyResponse, Error> {
let request = event.payload;
let path = request.path.unwrap_or_default();
let method = request.http_method.as_str();
info!("Received {} request to {}", method, path);
let response = match (method, path.as_str()) {
("GET", "/users") => handle_get_users().await,
("POST", "/users") => handle_create_user(request).await,
("GET", path) if path.starts_with("/users/") => {
let user_id = path.strip_prefix("/users/").unwrap_or("");
handle_get_user(user_id).await
}
_ => {
let response = ApiGatewayProxyResponse {
status_code: 404,
headers: HashMap::new(),
multi_value_headers: HashMap::new(),
body: Some("Not Found".into()),
is_base64_encoded: false,
};
Ok(response)
}
};
response.map_err(|e| Error::from(e))
}
async fn handle_get_users() -> Result<ApiGatewayProxyResponse, Box<dyn std::error::Error>> {
let users = vec![
serde_json::json!({
"id": "1",
"username": "john_doe",
"email": "john@example.com"
}),
serde_json::json!({
"id": "2",
"username": "jane_smith",
"email": "jane@example.com"
})
];
let response = ApiGatewayProxyResponse {
status_code: 200,
headers: HashMap::new(),
multi_value_headers: HashMap::new(),
body: Some(serde_json::to_string(&users)?.into()),
is_base64_encoded: false,
};
Ok(response)
}
async fn handle_create_user(request: ApiGatewayProxyRequest) -> Result<ApiGatewayProxyResponse, Box<dyn std::error::Error>> {
let body = request.body.unwrap_or_default();
let user_data: CreateUserRequest = serde_json::from_str(&body)?;
// Validate input
if user_data.username.is_empty() || user_data.email.is_empty() {
let error_response = serde_json::json!({
"error": "Username and email are required"
});
let response = ApiGatewayProxyResponse {
status_code: 400,
headers: HashMap::new(),
multi_value_headers: HashMap::new(),
body: Some(serde_json::to_string(&error_response)?.into()),
is_base64_encoded: false,
};
return Ok(response);
}
// Create user (simulated)
let user = serde_json::json!({
"id": uuid::Uuid::new_v4().to_string(),
"username": user_data.username,
"email": user_data.email,
"created_at": chrono::Utc::now().to_rfc3339()
});
let response = ApiGatewayProxyResponse {
status_code: 201,
headers: HashMap::new(),
multi_value_headers: HashMap::new(),
body: Some(serde_json::to_string(&user)?.into()),
is_base64_encoded: false,
};
Ok(response)
}
async fn handle_get_user(user_id: &str) -> Result<ApiGatewayProxyResponse, Box<dyn std::error::Error>> {
if user_id.is_empty() {
let error_response = serde_json::json!({
"error": "User ID is required"
});
let response = ApiGatewayProxyResponse {
status_code: 400,
headers: HashMap::new(),
multi_value_headers: HashMap::new(),
body: Some(serde_json::to_string(&error_response)?.into()),
is_base64_encoded: false,
};
return Ok(response);
}
// Simulate user lookup
let user = serde_json::json!({
"id": user_id,
"username": format!("user_{}", user_id),
"email": format!("user_{}@example.com", user_id),
"status": "active"
});
let response = ApiGatewayProxyResponse {
status_code: 200,
headers: HashMap::new(),
multi_value_headers: HashMap::new(),
body: Some(serde_json::to_string(&user)?.into()),
is_base64_encoded: false,
};
Ok(response)
}
#[derive(Debug, Deserialize)]
pub struct CreateUserRequest {
pub username: String,
pub email: String,
}
// Lambda handler for DynamoDB events
pub async fn dynamodb_handler(event: LambdaEvent<DynamoDbEvent>) -> Result<(), Error> {
for record in event.payload.records {
info!("Processing DynamoDB record: {:?}", record);
match record.event_name.as_str() {
"INSERT" => handle_dynamodb_insert(record).await?,
"MODIFY" => handle_dynamodb_modify(record).await?,
"REMOVE" => handle_dynamodb_remove(record).await?,
_ => warn!("Unknown event type: {}", record.event_name),
}
}
Ok(())
}
async fn handle_dynamodb_insert(record: aws_lambda_events::dynamodb::DynamoDbStreamRecord) -> Result<(), Box<dyn std::error::Error>> {
info!("Handling DynamoDB INSERT event");
if let Some(new_image) = record.new_image {
// Process new item
if let Some(user_id) = new_image.get("id").and_then(|v| v.as_str()) {
info!("New user created: {}", user_id);
// Send notification
let notification = serde_json::json!({
"type": "user_created",
"user_id": user_id,
"timestamp": chrono::Utc::now().to_rfc3339()
});
// Publish to SNS or send to another Lambda
info!("Notification sent: {:?}", notification);
}
}
Ok(())
}
async fn handle_dynamodb_modify(record: aws_lambda_events::dynamodb::DynamoDbStreamRecord) -> Result<(), Box<dyn std::error::Error>> {
info!("Handling DynamoDB MODIFY event");
if let (Some(old_image), Some(new_image)) = (record.old_image, record.new_image) {
// Process modified item
if let Some(user_id) = new_image.get("id").and_then(|v| v.as_str()) {
info!("User modified: {}", user_id);
// Check what changed
if let Some(old_status) = old_image.get("status").and_then(|v| v.as_str()) {
if let Some(new_status) = new_image.get("status").and_then(|v| v.as_str()) {
if old_status != new_status {
info!("User status changed from {} to {}", old_status, new_status);
}
}
}
}
}
Ok(())
}
async fn handle_dynamodb_remove(record: aws_lambda_events::dynamodb::DynamoDbStreamRecord) -> Result<(), Box<dyn std::error::Error>> {
info!("Handling DynamoDB REMOVE event");
if let Some(old_image) = record.old_image {
// Process deleted item
if let Some(user_id) = old_image.get("id").and_then(|v| v.as_str()) {
info!("User deleted: {}", user_id);
// Clean up related data
let cleanup = serde_json::json!({
"type": "user_deleted",
"user_id": user_id,
"timestamp": chrono::Utc::now().to_rfc3339()
});
info!("Cleanup initiated: {:?}", cleanup);
}
}
Ok(())
}
// Lambda handler for S3 events
pub async fn s3_handler(event: LambdaEvent<S3Event>) -> Result<(), Error> {
for record in event.payload.records {
info!("Processing S3 event: {:?}", record);
let bucket_name = record.s3.bucket.name;
let object_key = record.s3.object.key.unwrap_or_default();
let event_name = record.event_name;
match event_name.as_str() {
"ObjectCreated:Put" => handle_s3_upload(bucket_name, object_key).await?,
"ObjectRemoved:Delete" => handle_s3_delete(bucket_name, object_key).await?,
_ => warn!("Unknown S3 event: {}", event_name),
}
}
Ok(())
}
async fn handle_s3_upload(bucket_name: String, object_key: String) -> Result<(), Box<dyn std::error::Error>> {
info!("Handling S3 upload: {} in bucket {}", object_key, bucket_name);
// Process uploaded file
if object_key.ends_with(".jpg") || object_key.ends_with(".png") {
// Image processing
let image_processing = serde_json::json!({
"type": "image_uploaded",
"bucket": bucket_name,
"key": object_key,
"timestamp": chrono::Utc::now().to_rfc3339()
});
info!("Image processing initiated: {:?}", image_processing);
} else if object_key.ends_with(".json") {
// JSON data processing
let data_processing = serde_json::json!({
"type": "data_uploaded",
"bucket": bucket_name,
"key": object_key,
"timestamp": chrono::Utc::now().to_rfc3339()
});
info!("Data processing initiated: {:?}", data_processing);
}
Ok(())
}
async fn handle_s3_delete(bucket_name: String, object_key: String) -> Result<(), Box<dyn std::error::Error>> {
info!("Handling S3 delete: {} from bucket {}", object_key, bucket_name);
// Clean up related data
let cleanup = serde_json::json!({
"type": "file_deleted",
"bucket": bucket_name,
"key": object_key,
"timestamp": chrono::Utc::now().to_rfc3339()
});
info!("Cleanup initiated: {:?}", cleanup);
Ok(())
}
// Lambda main function
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_target(false)
.without_time()
.init();
lambda_runtime::run(service_fn(api_gateway_handler)).await
}
🔷 Azure Functions
Azure Functions Implementation
[azure_functions]
http_triggers: true
timer_triggers: true
blob_triggers: true
cosmos_db_triggers: true[azure_implementation]
use azure_functions::{
bindings::{HttpRequest, HttpResponse, HttpTrigger},
func,
FunctionContext,
};
use serde::{Deserialize, Serialize};
// HTTP trigger function
#[func(name = "user-service")]
#[binding(name = "req", route = "users/{id?}", auth_level = "anonymous")]
pub fn user_service(req: HttpRequest) -> HttpResponse {
let path = req.uri().path();
let method = req.method().as_str();
info!("Received {} request to {}", method, path);
match (method, path) {
("GET", "/api/users") => handle_get_users(),
("POST", "/api/users") => handle_create_user(req),
(method, path) if path.starts_with("/api/users/") && method == "GET" => {
let user_id = path.strip_prefix("/api/users/").unwrap_or("");
handle_get_user(user_id)
}
_ => {
HttpResponse::builder()
.status(404)
.body("Not Found")
.unwrap()
}
}
}
fn handle_get_users() -> HttpResponse {
let users = vec![
serde_json::json!({
"id": "1",
"username": "john_doe",
"email": "john@example.com"
}),
serde_json::json!({
"id": "2",
"username": "jane_smith",
"email": "jane@example.com"
})
];
HttpResponse::builder()
.status(200)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&users).unwrap())
.unwrap()
}
fn handle_create_user(req: HttpRequest) -> HttpResponse {
// Parse request body
let body = req.body();
let user_data: Result<CreateUserRequest, _> = serde_json::from_slice(body);
match user_data {
Ok(user_data) => {
// Validate input
if user_data.username.is_empty() || user_data.email.is_empty() {
let error_response = serde_json::json!({
"error": "Username and email are required"
});
return HttpResponse::builder()
.status(400)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&error_response).unwrap())
.unwrap();
}
// Create user (simulated)
let user = serde_json::json!({
"id": uuid::Uuid::new_v4().to_string(),
"username": user_data.username,
"email": user_data.email,
"created_at": chrono::Utc::now().to_rfc3339()
});
HttpResponse::builder()
.status(201)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&user).unwrap())
.unwrap()
}
Err(_) => {
let error_response = serde_json::json!({
"error": "Invalid request body"
});
HttpResponse::builder()
.status(400)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&error_response).unwrap())
.unwrap()
}
}
}
fn handle_get_user(user_id: &str) -> HttpResponse {
if user_id.is_empty() {
let error_response = serde_json::json!({
"error": "User ID is required"
});
return HttpResponse::builder()
.status(400)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&error_response).unwrap())
.unwrap();
}
// Simulate user lookup
let user = serde_json::json!({
"id": user_id,
"username": format!("user_{}", user_id),
"email": format!("user_{}@example.com", user_id),
"status": "active"
});
HttpResponse::builder()
.status(200)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&user).unwrap())
.unwrap()
}
// Timer trigger function
#[func(name = "scheduled-task")]
#[binding(name = "timer", schedule = "0 /5 *")] // Every 5 minutes
pub fn scheduled_task(timer: azure_functions::bindings::TimerInfo) -> () {
info!("Scheduled task triggered at: {}", timer.schedule_status.last);
// Perform scheduled task
let task_result = perform_scheduled_task();
match task_result {
Ok(_) => info!("Scheduled task completed successfully"),
Err(e) => error!("Scheduled task failed: {}", e),
}
}
fn perform_scheduled_task() -> Result<(), Box<dyn std::error::Error>> {
// Simulate scheduled task
info!("Performing scheduled task...");
// Database cleanup
info!("Cleaning up old records...");
// Data processing
info!("Processing batch data...");
// Send notifications
info!("Sending scheduled notifications...");
Ok(())
}
// Blob trigger function
#[func(name = "blob-processor")]
#[binding(name = "blob", path = "uploads/{name}", connection = "AzureWebJobsStorage")]
pub fn blob_processor(blob: azure_functions::bindings::Blob) -> () {
let blob_name = blob.name;
let blob_size = blob.data.len();
info!("Processing blob: {} (size: {} bytes)", blob_name, blob_size);
// Process blob based on type
if blob_name.ends_with(".jpg") || blob_name.ends_with(".png") {
process_image_blob(&blob);
} else if blob_name.ends_with(".json") {
process_json_blob(&blob);
} else if blob_name.ends_with(".csv") {
process_csv_blob(&blob);
} else {
warn!("Unknown blob type: {}", blob_name);
}
}
fn process_image_blob(blob: &azure_functions::bindings::Blob) {
info!("Processing image blob: {}", blob.name);
// Image processing logic
let image_processing = serde_json::json!({
"type": "image_processing",
"blob_name": blob.name,
"size": blob.data.len(),
"timestamp": chrono::Utc::now().to_rfc3339()
});
info!("Image processing result: {:?}", image_processing);
}
fn process_json_blob(blob: &azure_functions::bindings::Blob) {
info!("Processing JSON blob: {}", blob.name);
// Parse JSON data
match serde_json::from_slice::<serde_json::Value>(&blob.data) {
Ok(data) => {
info!("JSON data processed: {:?}", data);
// Process the data
let processing_result = serde_json::json!({
"type": "json_processing",
"blob_name": blob.name,
"records_processed": 42,
"timestamp": chrono::Utc::now().to_rfc3339()
});
info!("JSON processing result: {:?}", processing_result);
}
Err(e) => {
error!("Failed to parse JSON blob: {}", e);
}
}
}
fn process_csv_blob(blob: &azure_functions::bindings::Blob) {
info!("Processing CSV blob: {}", blob.name);
// Parse CSV data
let csv_data = String::from_utf8_lossy(&blob.data);
let lines: Vec<&str> = csv_data.lines().collect();
info!("CSV has {} lines", lines.len());
// Process CSV data
let processing_result = serde_json::json!({
"type": "csv_processing",
"blob_name": blob.name,
"lines_processed": lines.len(),
"timestamp": chrono::Utc::now().to_rfc3339()
});
info!("CSV processing result: {:?}", processing_result);
}
#[derive(Debug, Deserialize)]
pub struct CreateUserRequest {
pub username: String,
pub email: String,
}
🎯 Best Practices
1. Function Design
- Keep functions small and focused - Use cold start optimization techniques - Implement proper error handling - Use environment variables for configuration2. Performance Optimization
- Minimize cold start times - Use connection pooling for databases - Implement caching strategies - Optimize memory usage3. Security
- Use least privilege access - Implement proper authentication - Validate all inputs - Use secure environment variables4. Monitoring and Observability
- Implement comprehensive logging - Use distributed tracing - Monitor function metrics - Set up proper alerting5. TuskLang Integration
- Use TuskLang configuration for function parameters - Configure environment variables through TuskLang - Implement function routing with TuskLang - Use TuskLang for serverless deployment configurationServerless architecture with TuskLang and Rust provides scalable, cost-effective, and event-driven computing solutions with comprehensive patterns for cloud-native applications.