🦀 Async Operations in TuskLang for Rust


Async operations in TuskLang for Rust build on Rust's async/await syntax and the tokio ecosystem to provide high-performance, non-blocking I/O while preserving type and memory safety.

Basic Async Operations

// Simple async function
async fn fetch_user_data(user_id: u32) -> Result<User, Box<dyn std::error::Error + Send + Sync>> {
    let response = @http.get(&format!("https://api.example.com/users/{}", user_id)).await?;
    let user: User = serde_json::from_str(&response.text().await?)?;
    Ok(user)
}
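
To run a function like this you only need a tokio entry point. The sketch below is a minimal driver, assuming a `User` type that derives `Deserialize`; the struct fields shown here are placeholders, not the project's real definition.

use serde::Deserialize;

// Assumed shape of the User type; the real definition lives elsewhere in the project
#[derive(Debug, Deserialize)]
struct User {
    id: u32,
    name: String,
}

// Minimal driver: start the tokio runtime and await the fetch
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let user = fetch_user_data(42).await?;
    println!("fetched user: {:?}", user);
    Ok(())
}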

// Async function with TuskLang @ operators
async fn process_user_data(user_id: u32) -> Result<ProcessedData, Box<dyn std::error::Error + Send + Sync>> {
    // Parallel async operations
    let (user, preferences, history) = tokio::try_join!(
        @db.users.find(user_id),
        @cache.get(&format!("user_prefs:{}", user_id)),
        @api.user_history(user_id)
    )?;

    // Async database operations
    let processed = @db.transaction(async |tx| {
        let updated_user = @db.users.update(user_id, &user).await?;
        let analytics = @analytics.track("user_processed", &updated_user).await?;
        Ok((updated_user, analytics))
    }).await?;

    Ok(ProcessedData::new(processed.0, processed.1))
}
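
`tokio::try_join!` is what makes the three lookups above run concurrently: all branches are polled on the same task, and the macro returns early with the first `Err`. A self-contained sketch of that behavior, using plain async functions instead of TuskLang @ operators:

use std::time::Duration;

async fn quick_ok() -> Result<u32, String> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok(1)
}

async fn slow_err() -> Result<u32, String> {
    tokio::time::sleep(Duration::from_millis(50)).await;
    Err("lookup failed".to_string())
}

#[tokio::main]
async fn main() {
    // Both futures run concurrently; the first Err cancels the combined future
    match tokio::try_join!(quick_ok(), slow_err()) {
        Ok((a, b)) => println!("both succeeded: {} {}", a, b),
        Err(e) => println!("first error wins: {}", e),
    }
}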

Async Database Operations

// Async database queries with TuskLang
struct UserRepository {
    db: Database,
}

impl UserRepository {
    // Async find with caching
    async fn find_user(&self, id: u32) -> Result<Option<User>, Box<dyn std::error::Error + Send + Sync>> {
        // Check cache first
        if let Some(cached_user) = @cache.get(&format!("user:{}", id)).await? {
            return Ok(Some(cached_user));
        }

        // Database query
        let user = @db.table("users")
            .where("id", id)
            .first()
            .await?;

        // Cache the result
        if let Some(ref user) = user {
            @cache.set(&format!("user:{}", id), user, 3600).await?;
        }

        Ok(user)
    }

    // Batch async operations
    async fn find_users_by_ids(&self, ids: &[u32]) -> Result<Vec<User>, Box<dyn std::error::Error + Send + Sync>> {
        let mut users = Vec::new();
        let mut futures = Vec::new();

        // Create futures for parallel execution
        for &id in ids {
            futures.push(self.find_user(id));
        }

        // Execute all futures concurrently
        let results = futures::future::join_all(futures).await;

        for result in results {
            if let Ok(Some(user)) = result {
                users.push(user);
            }
        }

        Ok(users)
    }

    // Async transactions
    async fn create_user_with_profile(&self, user_data: UserData, profile_data: ProfileData) -> Result<User, Box<dyn std::error::Error + Send + Sync>> {
        @db.transaction(async |tx| {
            // Create user
            let user_id = @db.table("users")
                .insert_get_id(&user_data)
                .await?;

            // Create profile
            let profile = Profile { user_id, ..profile_data };
            @db.table("profiles")
                .insert(&profile)
                .await?;

            // Update cache
            @cache.set(&format!("user:{}", user_id), &user_data, 3600).await?;

            Ok(User::new(user_id, user_data))
        }).await
    }
}
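
`join_all` in `find_users_by_ids` starts one future per id with no upper bound on concurrency. For very large id lists it can be safer to cap how many lookups run at once; below is a hedged sketch using `buffer_unordered` from the futures crate, with a hypothetical `fetch_one` standing in for the repository call:

use futures::stream::{self, StreamExt};

// Hypothetical single-item fetch standing in for UserRepository::find_user
async fn fetch_one(id: u32) -> Result<u32, String> {
    tokio::time::sleep(std::time::Duration::from_millis(5)).await;
    Ok(id * 10)
}

#[tokio::main]
async fn main() {
    let ids: Vec<u32> = (1..=100).collect();

    // At most 8 fetches are in flight at any time
    let results: Vec<Result<u32, String>> = stream::iter(ids)
        .map(fetch_one)
        .buffer_unordered(8)
        .collect()
        .await;

    println!("fetched {} results", results.len());
}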

Async HTTP Operations

// Async HTTP client with TuskLang
struct ApiClient {
    client: reqwest::Client,
    base_url: String,
}

impl ApiClient {
    // Async GET request
    async fn get<T>(&self, endpoint: &str) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        T: for<'de> serde::Deserialize<'de>,
    {
        let url = format!("{}{}", self.base_url, endpoint);
        let response = @http.get(&url)
            .header("Authorization", &format!("Bearer {}", @env.get("API_TOKEN")?))
            .timeout(std::time::Duration::from_secs(30))
            .send()
            .await?;

        if response.status().is_success() {
            let data = response.json::<T>().await?;
            Ok(data)
        } else {
            Err(format!("HTTP error: {}", response.status()).into())
        }
    }

    // Async POST request with retry logic
    async fn post<T, U>(&self, endpoint: &str, data: &T) -> Result<U, Box<dyn std::error::Error + Send + Sync>>
    where
        T: serde::Serialize,
        U: for<'de> serde::Deserialize<'de>,
    {
        let url = format!("{}{}", self.base_url, endpoint);

        // Retry logic with exponential backoff
        let mut attempt = 0;
        let max_attempts = 3;

        loop {
            match @http.post(&url)
                .json(data)
                .header("Content-Type", "application/json")
                .timeout(std::time::Duration::from_secs(30))
                .send()
                .await
            {
                Ok(response) => {
                    if response.status().is_success() {
                        return Ok(response.json::<U>().await?);
                    } else {
                        return Err(format!("HTTP error: {}", response.status()).into());
                    }
                }
                Err(e) => {
                    attempt += 1;
                    if attempt >= max_attempts {
                        return Err(e.into());
                    }

                    // Exponential backoff
                    let delay = std::time::Duration::from_secs(2_u64.pow(attempt));
                    tokio::time::sleep(delay).await;
                }
            }
        }
    }

    // Concurrent API calls
    async fn fetch_multiple_endpoints(&self, endpoints: &[&str]) -> Result<Vec<serde_json::Value>, Box<dyn std::error::Error + Send + Sync>> {
        let mut futures = Vec::new();

        for endpoint in endpoints {
            futures.push(self.get::<serde_json::Value>(endpoint));
        }

        let results = futures::future::join_all(futures).await;
        let mut responses = Vec::new();

        for result in results {
            match result {
                Ok(data) => responses.push(data),
                Err(e) => {
                    @log.error(&format!("Failed to fetch endpoint: {}", e));
                    // Continue with other responses
                }
            }
        }

        Ok(responses)
    }
}
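
A short usage sketch for the client above; the struct-literal construction and the endpoint paths are assumptions for illustration, not part of the code shown here.

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Hypothetical construction; the struct above only declares the fields
    let client = ApiClient {
        client: reqwest::Client::new(),
        base_url: "https://api.example.com".to_string(),
    };

    // Fetch three endpoints concurrently; failures are logged and skipped
    let responses = client
        .fetch_multiple_endpoints(&["/users", "/orders", "/metrics"])
        .await?;

    println!("got {} responses", responses.len());
    Ok(())
}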

Async File Operations

// Async file operations with TuskLang
struct FileProcessor {
    base_path: std::path::PathBuf,
}

impl FileProcessor {
    // Async file reading
    async fn read_file(&self, filename: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        let file_path = self.base_path.join(filename);

        // Use TuskLang file operations
        let content = @file.read(&file_path.to_string_lossy()).await?;
        Ok(content)
    }

    // Async file writing with atomic operations
    async fn write_file_atomic(&self, filename: &str, content: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let file_path = self.base_path.join(filename);
        let temp_path = file_path.with_extension("tmp");

        // Write to temporary file first
        @file.write(&temp_path.to_string_lossy(), content).await?;

        // Atomic rename
        tokio::fs::rename(&temp_path, &file_path).await?;
        Ok(())
    }

    // Async directory processing
    async fn process_directory(&self, dir_path: &str) -> Result<Vec<ProcessedFile>, Box<dyn std::error::Error + Send + Sync>> {
        let entries = @file.read_dir(dir_path).await?;
        let mut futures = Vec::new();

        for entry in entries {
            if entry.is_file() {
                let file_path = entry.path();
                futures.push(self.process_single_file(&file_path));
            }
        }

        let results = futures::future::join_all(futures).await;
        let mut processed_files = Vec::new();

        for result in results {
            match result {
                Ok(processed) => processed_files.push(processed),
                Err(e) => {
                    @log.error(&format!("Failed to process file: {}", e));
                }
            }
        }

        Ok(processed_files)
    }

    async fn process_single_file(&self, file_path: &std::path::Path) -> Result<ProcessedFile, Box<dyn std::error::Error + Send + Sync>> {
        let content = @file.read(&file_path.to_string_lossy()).await?;
        let metadata = @file.metadata(&file_path.to_string_lossy()).await?;

        // Process content asynchronously
        let processed_content = self.process_content(&content).await?;

        Ok(ProcessedFile {
            path: file_path.to_path_buf(),
            content: processed_content,
            size: metadata.len(),
            modified: metadata.modified()?,
        })
    }

    async fn process_content(&self, content: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        // Simulate async processing
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        // Apply transformations
        let processed = content
            .lines()
            .map(|line| line.trim())
            .filter(|line| !line.is_empty())
            .collect::<Vec<_>>()
            .join("\n");

        Ok(processed)
    }
}
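
The temp-file-plus-rename idiom in `write_file_atomic` relies on `rename` being atomic within a single filesystem. For reference, here is the same pattern written directly against `tokio::fs`, without the @file operator:

use std::path::Path;

// Minimal sketch: write to a sibling temp file, then atomically rename into place
async fn write_atomic(path: &Path, content: &str) -> std::io::Result<()> {
    let tmp = path.with_extension("tmp");
    tokio::fs::write(&tmp, content).await?;
    tokio::fs::rename(&tmp, path).await?;
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    write_atomic(Path::new("config.json"), "{\"ok\": true}").await
}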

Async Stream Processing

// Async stream processing with TuskLang
use futures::{stream, Stream, StreamExt};

struct StreamProcessor {
    batch_size: usize,
}

impl StreamProcessor {
    // Process data streams asynchronously
    async fn process_data_stream(&self, data_stream: impl Stream<Item = DataItem> + Unpin) -> Result<Vec<ProcessedItem>, Box<dyn std::error::Error + Send + Sync>> {
        let mut processed_items = Vec::new();
        let mut batch = Vec::new();

        tokio::pin!(data_stream);

        while let Some(item) = data_stream.next().await {
            batch.push(item);

            if batch.len() >= self.batch_size {
                let processed_batch = self.process_batch(&batch).await?;
                processed_items.extend(processed_batch);
                batch.clear();
            }
        }

        // Process remaining items
        if !batch.is_empty() {
            let processed_batch = self.process_batch(&batch).await?;
            processed_items.extend(processed_batch);
        }

        Ok(processed_items)
    }

    async fn process_batch(&self, batch: &[DataItem]) -> Result<Vec<ProcessedItem>, Box<dyn std::error::Error + Send + Sync>> {
        let mut futures = Vec::new();

        for item in batch {
            futures.push(self.process_single_item(item));
        }

        let results = futures::future::join_all(futures).await;
        let mut processed_items = Vec::new();

        for result in results {
            match result {
                Ok(processed) => processed_items.push(processed),
                Err(e) => {
                    @log.error(&format!("Failed to process item: {}", e));
                }
            }
        }

        Ok(processed_items)
    }

    async fn process_single_item(&self, item: &DataItem) -> Result<ProcessedItem, Box<dyn std::error::Error + Send + Sync>> {
        // Async processing with TuskLang operators
        let enriched_data = @cache.remember(&format!("item:{}", item.id), 3600, || async {
            @api.enrich_data(&item.raw_data).await
        }).await?;

        let validated_data = @validator.validate(&enriched_data).await?;

        Ok(ProcessedItem {
            id: item.id,
            data: validated_data,
            processed_at: @date.now(),
        })
    }

    // Generate async streams
    fn generate_data_stream(&self, source: &str) -> impl Stream<Item = DataItem> {
        // Own the source string so the returned stream borrows nothing
        let source = source.to_string();

        stream::unfold(0, move |offset| {
            let source = source.clone();
            async move {
                let items = @api.fetch_data(&source, offset, 100).await.ok()?;
                if items.is_empty() {
                    None
                } else {
                    // Compute the next offset before the batch is moved into the tuple
                    let next_offset = offset + items.len();
                    Some((items, next_offset))
                }
            }
        })
        .flat_map(|items| stream::iter(items))
    }
}
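
The hand-rolled batching loop in `process_data_stream` can also be expressed with the `chunks` adapter from the futures crate, which groups a stream into `Vec`s of at most the requested size. A small sketch, independent of the types above:

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let items = stream::iter(1..=10_u32);

    // Group the stream into batches of 4 (the last batch may be smaller)
    let batches: Vec<Vec<u32>> = items.chunks(4).collect().await;

    for batch in &batches {
        println!("batch of {}: {:?}", batch.len(), batch);
    }
}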

Async Background Tasks

// Async background task processing
struct BackgroundTaskProcessor {
    task_queue: tokio::sync::mpsc::UnboundedSender<BackgroundTask>,
    worker_handles: Vec<tokio::task::JoinHandle<()>>,
}

impl BackgroundTaskProcessor {
    fn new(worker_count: usize) -> Self {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        // An mpsc receiver cannot be cloned, so workers share it behind an async mutex
        let rx = std::sync::Arc::new(tokio::sync::Mutex::new(rx));
        let mut worker_handles = Vec::new();

        for worker_id in 0..worker_count {
            let rx = std::sync::Arc::clone(&rx);
            let handle = tokio::spawn(async move {
                Self::worker_loop(worker_id, rx).await;
            });
            worker_handles.push(handle);
        }

        Self {
            task_queue: tx,
            worker_handles,
        }
    }

    async fn worker_loop(
        worker_id: usize,
        rx: std::sync::Arc<tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<BackgroundTask>>>,
    ) {
        loop {
            // Hold the lock only long enough to pull the next task off the queue
            let task = rx.lock().await.recv().await;
            let Some(task) = task else { break };
            let task_id = format!("{:?}", task.id);

            @log.info(&format!("Worker {} processing task: {}", worker_id, task_id));

            match Self::process_task(task).await {
                Ok(_) => {
                    @log.info(&format!("Worker {} completed task: {}", worker_id, task_id));
                }
                Err(e) => {
                    @log.error(&format!("Worker {} failed task: {}: {}", worker_id, task_id, e));
                }
            }
        }
    }

    async fn process_task(task: BackgroundTask) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        match task.task_type {
            TaskType::EmailSend { to, subject, body } => {
                @email.send(to, subject, body).await?;
            }
            TaskType::DataProcessing { data_id } => {
                let data = @db.data.find(data_id).await?;
                let processed = @processor.process(&data).await?;
                @db.processed_data.insert(&processed).await?;
            }
            TaskType::ReportGeneration { report_id } => {
                let report = @reporter.generate(report_id).await?;
                @file.write(&format!("reports/{}.pdf", report_id), &report).await?;
            }
        }
        Ok(())
    }

    async fn enqueue_task(&self, task: BackgroundTask) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.task_queue.send(task)?;
        Ok(())
    }

    async fn shutdown(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Drop the sender to close the channel
        drop(self.task_queue);

        // Wait for all workers to finish
        for handle in self.worker_handles {
            handle.await?;
        }

        Ok(())
    }
}
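
Typical usage of the processor: enqueue work, then shut down so the workers drain the queue and exit. The `BackgroundTask` and `TaskType` field names below are inferred from the match arms above and may differ from the real definitions.

// Hypothetical usage; BackgroundTask/TaskType field names are assumptions
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let processor = BackgroundTaskProcessor::new(4);

    processor.enqueue_task(BackgroundTask {
        id: 1,
        task_type: TaskType::EmailSend {
            to: "user@example.com".to_string(),
            subject: "Welcome".to_string(),
            body: "Hello!".to_string(),
        },
    }).await?;

    // Dropping the sender closes the channel and lets workers drain and exit
    processor.shutdown().await?;
    Ok(())
}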

Async Error Handling

// Comprehensive async error handling
struct AsyncErrorHandler;

impl AsyncErrorHandler {
    // Retry with exponential backoff
    async fn retry_with_backoff<F, T, E>(mut operation: F, max_retries: usize) -> Result<T, E>
    where
        F: FnMut() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
        E: std::fmt::Display,
    {
        let mut attempt = 0;

        loop {
            match operation().await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    attempt += 1;
                    if attempt >= max_retries {
                        return Err(e);
                    }

                    @log.warn(&format!("Attempt {} failed: {}. Retrying...", attempt, e));

                    let delay = std::time::Duration::from_secs(2_u64.pow(attempt as u32));
                    tokio::time::sleep(delay).await;
                }
            }
        }
    }

    // Timeout wrapper
    async fn with_timeout<F, T>(future: F, timeout: std::time::Duration) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        F: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
    {
        match tokio::time::timeout(timeout, future).await {
            Ok(result) => result,
            Err(_) => Err("Operation timed out".into()),
        }
    }

    // Circuit breaker pattern
    async fn with_circuit_breaker<F, T>(operation: F, breaker: &CircuitBreaker) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        F: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
    {
        if breaker.is_open() {
            return Err("Circuit breaker is open".into());
        }

        match operation.await {
            Ok(result) => {
                breaker.record_success();
                Ok(result)
            }
            Err(e) => {
                breaker.record_failure();
                Err(e)
            }
        }
    }
}
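
A quick usage sketch for `with_timeout`; `slow_lookup` is a stand-in for any async operation returning the boxed error type used throughout this section.

use std::time::Duration;

// Placeholder operation that takes longer than the allowed timeout
async fn slow_lookup() -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    tokio::time::sleep(Duration::from_secs(5)).await;
    Ok("done".to_string())
}

#[tokio::main]
async fn main() {
    // The lookup takes 5s but we only allow 1s, so this reports a timeout
    match AsyncErrorHandler::with_timeout(slow_lookup(), Duration::from_secs(1)).await {
        Ok(value) => println!("completed: {}", value),
        Err(e) => println!("failed: {}", e),
    }
}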

// Circuit breaker implementation
struct CircuitBreaker {
    failure_threshold: usize,
    failure_count: std::sync::atomic::AtomicUsize,
    last_failure_time: std::sync::Mutex<Option<std::time::Instant>>,
    timeout: std::time::Duration,
}

impl CircuitBreaker {
    fn new(failure_threshold: usize, timeout: std::time::Duration) -> Self {
        Self {
            failure_threshold,
            failure_count: std::sync::atomic::AtomicUsize::new(0),
            last_failure_time: std::sync::Mutex::new(None),
            timeout,
        }
    }

    fn is_open(&self) -> bool {
        let failure_count = self.failure_count.load(std::sync::atomic::Ordering::Relaxed);

        if failure_count >= self.failure_threshold {
            if let Ok(last_failure) = self.last_failure_time.lock() {
                if let Some(time) = *last_failure {
                    if time.elapsed() < self.timeout {
                        return true;
                    }
                }
            }

            // Reset if the timeout has passed
            self.failure_count.store(0, std::sync::atomic::Ordering::Relaxed);
        }

        false
    }

    fn record_success(&self) {
        self.failure_count.store(0, std::sync::atomic::Ordering::Relaxed);
    }

    fn record_failure(&self) {
        self.failure_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        if let Ok(mut last_failure) = self.last_failure_time.lock() {
            *last_failure = Some(std::time::Instant::now());
        }
    }
}
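
Putting the breaker together with `with_circuit_breaker`, assuming both definitions above are in scope; `unreliable_call` is a placeholder that always fails, so the circuit opens after the configured threshold.

use std::time::Duration;

// Placeholder upstream call that always fails
async fn unreliable_call() -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    Err("upstream unavailable".into())
}

#[tokio::main]
async fn main() {
    // Open the circuit after 3 consecutive failures; probe again after 30 seconds
    let breaker = CircuitBreaker::new(3, Duration::from_secs(30));

    for attempt in 0..5 {
        let result = AsyncErrorHandler::with_circuit_breaker(unreliable_call(), &breaker).await;
        println!("attempt {}: {:?}", attempt, result.err().map(|e| e.to_string()));
    }
}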

Async Performance Optimization

// Async performance optimization techniques
struct AsyncOptimizer;

impl AsyncOptimizer {
    // Connection pooling
    async fn with_connection_pool<F, T>(pool: &ConnectionPool, operation: F) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        F: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
    {
        let connection = pool.acquire().await?;
        let result = operation.await;
        pool.release(connection).await;
        result
    }

    // Async caching with TTL
    async fn cached_operation<F, T>(cache: &Cache, key: &str, ttl: u64, operation: F) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        F: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
        T: Clone + serde::Serialize + for<'de> serde::Deserialize<'de>,
    {
        if let Some(cached) = @cache.get(key).await? {
            return Ok(cached);
        }

        let result = operation.await?;
        @cache.set(key, &result, ttl).await?;
        Ok(result)
    }

    // Batch processing
    async fn batch_process<T, U, F>(items: Vec<T>, batch_size: usize, processor: F) -> Result<Vec<U>, Box<dyn std::error::Error + Send + Sync>>
    where
        T: Clone,
        F: Fn(Vec<T>) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<U>, Box<dyn std::error::Error + Send + Sync>>> + Send>> + Send + Sync,
    {
        let mut results = Vec::new();

        for batch in items.chunks(batch_size) {
            let batch_result = processor(batch.to_vec()).await?;
            results.extend(batch_result);
        }

        Ok(results)
    }

    // Rate limiting
    async fn rate_limited_operation<F, T>(limiter: &RateLimiter, operation: F) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        F: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
    {
        limiter.acquire().await?;
        operation.await
    }
}
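
`batch_process` expects the processor to hand back a boxed future; a minimal usage sketch with a trivial doubling closure:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let items: Vec<u32> = (1..=10).collect();

    // Process in batches of 3; the closure doubles every item in its batch
    let doubled = AsyncOptimizer::batch_process(items, 3, |batch: Vec<u32>| {
        Box::pin(async move {
            let doubled: Vec<u32> = batch.into_iter().map(|n| n * 2).collect();
            Ok::<_, Box<dyn std::error::Error + Send + Sync>>(doubled)
        }) as std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u32>, Box<dyn std::error::Error + Send + Sync>>> + Send>>
    })
    .await?;

    println!("{:?}", doubled);
    Ok(())
}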

// Rate limiter implementation
struct RateLimiter {
    tokens: std::sync::atomic::AtomicU32,
    max_tokens: u32,
    refill_rate: f64, // tokens per second
    last_refill: std::sync::Mutex<std::time::Instant>,
}

impl RateLimiter {
    fn new(max_tokens: u32, refill_rate: f64) -> Self {
        Self {
            tokens: std::sync::atomic::AtomicU32::new(max_tokens),
            max_tokens,
            refill_rate,
            last_refill: std::sync::Mutex::new(std::time::Instant::now()),
        }
    }

    async fn acquire(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        loop {
            self.refill_tokens();

            let current_tokens = self.tokens.load(std::sync::atomic::Ordering::Relaxed);
            if current_tokens > 0 {
                if self.tokens.compare_exchange(
                    current_tokens,
                    current_tokens - 1,
                    std::sync::atomic::Ordering::Relaxed,
                    std::sync::atomic::Ordering::Relaxed,
                ).is_ok() {
                    return Ok(());
                }
            }

            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }

    fn refill_tokens(&self) {
        if let Ok(mut last_refill) = self.last_refill.lock() {
            let now = std::time::Instant::now();
            let elapsed = now.duration_since(*last_refill).as_secs_f64();
            let tokens_to_add = (elapsed * self.refill_rate) as u32;

            if tokens_to_add > 0 {
                let current_tokens = self.tokens.load(std::sync::atomic::Ordering::Relaxed);
                let new_tokens = std::cmp::min(current_tokens + tokens_to_add, self.max_tokens);
                self.tokens.store(new_tokens, std::sync::atomic::Ordering::Relaxed);
                *last_refill = now;
            }
        }
    }
}
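
A usage sketch for the rate limiter above, throttling a burst of hypothetical requests to roughly five per second:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // 5-token bucket refilled at 5 tokens per second
    let limiter = RateLimiter::new(5, 5.0);

    for i in 0..20 {
        limiter.acquire().await?;
        // Stand-in for a real request
        println!("request {} allowed at {:?}", i, std::time::Instant::now());
    }

    Ok(())
}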

Async Testing

#[cfg(test)]
mod async_tests {
    use super::*;
    
    #[tokio::test]
    async fn test_async_database_operations() {
        let repo = UserRepository::new();
        let user = repo.find_user(1).await.unwrap();
        
        assert!(user.is_some());
    }
    
    #[tokio::test]
    async fn test_async_http_operations() {
        let client = ApiClient::new("https://api.example.com".to_string());
        let data: serde_json::Value = client.get("/test").await.unwrap();
        
        assert!(data.is_object());
    }
    
    #[tokio::test]
    async fn test_async_stream_processing() {
        let processor = StreamProcessor::new(10);
        let data_stream = stream::iter(vec![
            DataItem::new(1, "data1"),
            DataItem::new(2, "data2"),
            DataItem::new(3, "data3"),
        ]);
        
        let processed = processor.process_data_stream(data_stream).await.unwrap();
        
        assert_eq!(processed.len(), 3);
    }
    
    #[tokio::test]
    async fn test_async_error_handling() {
        // retry_with_backoff is an associated function, so call it on the type
        let operation = || {
            Box::pin(async {
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                Ok::<i32, String>(42)
            }) as std::pin::Pin<Box<dyn std::future::Future<Output = Result<i32, String>> + Send>>
        };
        
        let result = AsyncErrorHandler::retry_with_backoff(operation, 3).await.unwrap();
        assert_eq!(result, 42);
    }
}

This comprehensive guide covers Rust-specific async operations, leveraging Rust's async/await syntax and the tokio ecosystem while integrating seamlessly with TuskLang's @ operators and features.