# 🦀 Async Operations in TuskLang for Rust
Async operations in TuskLang for Rust leverage Rust's powerful async/await syntax and the tokio ecosystem to provide high-performance, non-blocking I/O operations while maintaining type safety and memory safety.
Basic Async Operations
// Simple async function
/// Fetches one user from the example API and decodes the JSON response body.
///
/// # Errors
/// Propagates any failure from the HTTP request, from reading the response
/// text, or from JSON deserialization.
async fn fetch_user_data(user_id: u32) -> Result<User, Box<dyn std::error::Error + Send + Sync>> {
    let url = format!("https://api.example.com/users/{}", user_id);
    let response = @http.get(&url).await?;
    let body = response.text().await?;
    let user = serde_json::from_str::<User>(&body)?;
    Ok(user)
}
// Async function with TuskLang @ operators
/// Loads a user's record, cached preferences and API history concurrently,
/// then updates the user and tracks an analytics event inside a database
/// transaction.
///
/// # Errors
/// Returns the first error produced by any lookup, by the update, by the
/// analytics call, or by the transaction itself.
async fn process_user_data(user_id: u32) -> Result<ProcessedData, Box<dyn std::error::Error + Send + Sync>> {
    // The three lookups are awaited together; `try_join!` stops at the first error.
    // Only `user` is used afterwards; the other two results are discarded.
    let prefs_key = format!("user_prefs:{}", user_id);
    let (user, _preferences, _history) = tokio::try_join!(
        @db.users.find(user_id),
        @cache.get(&prefs_key),
        @api.user_history(user_id)
    )?;

    // NOTE(review): the closure never touches `tx`; presumably the `@db`
    // calls are expected to join the surrounding transaction — confirm.
    let (updated_user, analytics) = @db.transaction(async |tx| {
        let updated_user = @db.users.update(user_id, &user).await?;
        let analytics = @analytics.track("user_processed", &updated_user).await?;
        Ok((updated_user, analytics))
    }).await?;

    Ok(ProcessedData::new(updated_user, analytics))
}
Async Database Operations
// Async database queries with TuskLang
struct UserRepository {
db: Database,
}impl UserRepository {
// Async find with caching
async fn find_user(&self, id: u32) -> Result<Option<User>, Box<dyn std::error::Error + Send + Sync>> {
// Check cache first
if let Some(cached_user) = @cache.get(&format!("user:{}", id)).await? {
return Ok(Some(cached_user));
}
// Database query
let user = @db.table("users")
.where("id", id)
.first()
.await?;
// Cache the result
if let Some(ref user) = user {
@cache.set(&format!("user:{}", id), user, 3600).await?;
}
Ok(user)
}
// Batch async operations
async fn find_users_by_ids(&self, ids: &[u32]) -> Result<Vec<User>, Box<dyn std::error::Error + Send + Sync>> {
let mut users = Vec::new();
let mut futures = Vec::new();
// Create futures for parallel execution
for &id in ids {
futures.push(self.find_user(id));
}
// Execute all futures concurrently
let results = futures::future::join_all(futures).await;
for result in results {
if let Ok(Some(user)) = result {
users.push(user);
}
}
Ok(users)
}
// Async transactions
async fn create_user_with_profile(&self, user_data: UserData, profile_data: ProfileData)
-> Result<User, Box<dyn std::error::Error + Send + Sync>> {
@db.transaction(async |tx| {
// Create user
let user_id = @db.table("users")
.insert_get_id(&user_data)
.await?;
// Create profile
let profile = Profile {
user_id,
..profile_data
};
@db.table("profiles")
.insert(&profile)
.await?;
// Update cache
@cache.set(&format!("user:{}", user_id), &user_data, 3600).await?;
Ok(User::new(user_id, user_data))
}).await
}
}
Async HTTP Operations
// Async HTTP client with TuskLang
struct ApiClient {
client: reqwest::Client,
base_url: String,
}impl ApiClient {
// Async GET request
async fn get<T>(&self, endpoint: &str) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
where
T: for<'de> serde::Deserialize<'de>,
{
let url = format!("{}{}", self.base_url, endpoint);
let response = @http.get(&url)
.header("Authorization", &format!("Bearer {}", @env.get("API_TOKEN")?))
.timeout(std::time::Duration::from_secs(30))
.send()
.await?;
if response.status().is_success() {
let data = response.json::<T>().await?;
Ok(data)
} else {
Err(format!("HTTP error: {}", response.status()).into())
}
}
// Async POST request with retry logic
async fn post<T, U>(&self, endpoint: &str, data: &T) -> Result<U, Box<dyn std::error::Error + Send + Sync>>
where
T: serde::Serialize,
U: for<'de> serde::Deserialize<'de>,
{
let url = format!("{}{}", self.base_url, endpoint);
// Retry logic with exponential backoff
let mut attempt = 0;
let max_attempts = 3;
loop {
match @http.post(&url)
.json(data)
.header("Content-Type", "application/json")
.timeout(std::time::Duration::from_secs(30))
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
return Ok(response.json::<U>().await?);
} else {
return Err(format!("HTTP error: {}", response.status()).into());
}
}
Err(e) => {
attempt += 1;
if attempt >= max_attempts {
return Err(e.into());
}
// Exponential backoff
let delay = std::time::Duration::from_secs(2_u64.pow(attempt));
tokio::time::sleep(delay).await;
}
}
}
}
// Concurrent API calls
async fn fetch_multiple_endpoints(&self, endpoints: &[&str]) -> Result<Vec<serde_json::Value>, Box<dyn std::error::Error + Send + Sync>> {
let mut futures = Vec::new();
for endpoint in endpoints {
futures.push(self.get::<serde_json::Value>(endpoint));
}
let results = futures::future::join_all(futures).await;
let mut responses = Vec::new();
for result in results {
match result {
Ok(data) => responses.push(data),
Err(e) => {
@log.error(&format!("Failed to fetch endpoint: {}", e));
// Continue with other responses
}
}
}
Ok(responses)
}
}
Async File Operations
// Async file operations with TuskLang
/// File helper that resolves file names against a base directory.
struct FileProcessor {
    base_path: std::path::PathBuf,
}

impl FileProcessor {
    /// Reads `filename`, resolved against the base path, into a string.
    ///
    /// # Errors
    /// Propagates the failure reported by `@file.read`.
    async fn read_file(&self, filename: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        let file_path = self.base_path.join(filename);
        // Use TuskLang file operations
        let content = @file.read(&file_path.to_string_lossy()).await?;
        Ok(content)
    }

    /// Writes `content` to a temporary sibling file and renames it over the
    /// target.
    ///
    /// The temporary name is the full target name with `.tmp` appended.
    /// Replacing the extension instead would make `a.txt` and `a.json` share
    /// the same temporary file.
    ///
    /// # Errors
    /// Propagates write and rename failures. When the rename fails, removal
    /// of the temporary file is attempted before the error is returned.
    async fn write_file_atomic(&self, filename: &str, content: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let file_path = self.base_path.join(filename);
        let mut temp_name = file_path.clone().into_os_string();
        temp_name.push(".tmp");
        let temp_path = std::path::PathBuf::from(temp_name);

        // Write to temporary file first
        @file.write(&temp_path.to_string_lossy(), content).await?;

        // Atomic rename
        if let Err(e) = tokio::fs::rename(&temp_path, &file_path).await {
            // Best effort: do not leave the temporary file behind.
            let _ = tokio::fs::remove_file(&temp_path).await;
            return Err(e.into());
        }
        Ok(())
    }

    /// Processes every file entry of `dir_path` concurrently.
    ///
    /// Files that fail to process are logged and skipped.
    ///
    /// # Errors
    /// Propagates the failure reported by `@file.read_dir`.
    async fn process_directory(&self, dir_path: &str) -> Result<Vec<ProcessedFile>, Box<dyn std::error::Error + Send + Sync>> {
        let entries = @file.read_dir(dir_path).await?;

        // Collect the paths first: the futures borrow them, so they must
        // outlive the loop that creates the futures.
        let paths: Vec<_> = entries
            .into_iter()
            .filter(|entry| entry.is_file())
            .map(|entry| entry.path())
            .collect();

        let tasks = paths.iter().map(|path| self.process_single_file(path));
        let results = futures::future::join_all(tasks).await;

        let mut processed_files = Vec::with_capacity(paths.len());
        for result in results {
            match result {
                Ok(processed) => processed_files.push(processed),
                Err(e) => {
                    @log.error(&format!("Failed to process file: {}", e));
                }
            }
        }
        Ok(processed_files)
    }

    /// Reads one file and its metadata and returns the processed content
    /// together with the file's size and modification time.
    ///
    /// # Errors
    /// Propagates read, metadata and processing failures.
    async fn process_single_file(&self, file_path: &std::path::Path) -> Result<ProcessedFile, Box<dyn std::error::Error + Send + Sync>> {
        let path_text = file_path.to_string_lossy();
        let content = @file.read(&path_text).await?;
        let metadata = @file.metadata(&path_text).await?;

        // Process content asynchronously
        let processed_content = self.process_content(&content).await?;

        Ok(ProcessedFile {
            path: file_path.to_path_buf(),
            content: processed_content,
            size: metadata.len(),
            modified: metadata.modified()?,
        })
    }

    /// Trims every line of `content`, drops the empty ones and joins the
    /// rest with `\n`.
    async fn process_content(&self, content: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        // Simulate async processing
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        // Apply transformations
        let processed = content
            .lines()
            .map(str::trim)
            .filter(|line| !line.is_empty())
            .collect::<Vec<_>>()
            .join("\n");
        Ok(processed)
    }
}
Async Stream Processing
// Async stream processing with TuskLang
use tokio_stream::StreamExt;
use futures::stream;struct StreamProcessor {
batch_size: usize,
}
impl StreamProcessor {
// Process data streams asynchronously
async fn process_data_stream(&self, data_stream: impl Stream<Item = DataItem> + Unpin)
-> Result<Vec<ProcessedItem>, Box<dyn std::error::Error + Send + Sync>> {
let mut processed_items = Vec::new();
let mut batch = Vec::new();
tokio::pin!(data_stream);
while let Some(item) = data_stream.next().await {
batch.push(item);
if batch.len() >= self.batch_size {
let processed_batch = self.process_batch(&batch).await?;
processed_items.extend(processed_batch);
batch.clear();
}
}
// Process remaining items
if !batch.is_empty() {
let processed_batch = self.process_batch(&batch).await?;
processed_items.extend(processed_batch);
}
Ok(processed_items)
}
async fn process_batch(&self, batch: &[DataItem]) -> Result<Vec<ProcessedItem>, Box<dyn std::error::Error + Send + Sync>> {
let mut futures = Vec::new();
for item in batch {
futures.push(self.process_single_item(item));
}
let results = futures::future::join_all(futures).await;
let mut processed_items = Vec::new();
for result in results {
match result {
Ok(processed) => processed_items.push(processed),
Err(e) => {
@log.error(&format!("Failed to process item: {}", e));
}
}
}
Ok(processed_items)
}
async fn process_single_item(&self, item: &DataItem) -> Result<ProcessedItem, Box<dyn std::error::Error + Send + Sync>> {
// Async processing with TuskLang operators
let enriched_data = @cache.remember(&format!("item:{}", item.id), 3600, || async {
@api.enrich_data(&item.raw_data).await
}).await?;
let validated_data = @validator.validate(&enriched_data).await?;
Ok(ProcessedItem {
id: item.id,
data: validated_data,
processed_at: @date.now(),
})
}
// Generate async streams
async fn generate_data_stream(&self, source: &str) -> impl Stream<Item = DataItem> {
stream::unfold(0, move |offset| {
let source = source.to_string();
async move {
let items = @api.fetch_data(&source, offset, 100).await.ok()?;
if items.is_empty() {
None
} else {
Some((items, offset + items.len()))
}
}
})
.flat_map(|items| stream::iter(items))
}
}
Async Background Tasks
// Async background task processing
/// Pool of worker tasks that consume `BackgroundTask`s from one shared queue.
struct BackgroundTaskProcessor {
    task_queue: tokio::sync::mpsc::UnboundedSender<BackgroundTask>,
    worker_handles: Vec<tokio::task::JoinHandle<()>>,
}

impl BackgroundTaskProcessor {
    /// Spawns `worker_count` workers that pull tasks from a shared queue.
    ///
    /// An mpsc receiver cannot be cloned, so the single receiver is shared
    /// between the workers behind an `Arc<tokio::sync::Mutex<..>>`.
    fn new(worker_count: usize) -> Self {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let rx = std::sync::Arc::new(tokio::sync::Mutex::new(rx));

        let worker_handles = (0..worker_count)
            .map(|worker_id| {
                let rx = std::sync::Arc::clone(&rx);
                tokio::spawn(async move {
                    Self::worker_loop(worker_id, rx).await;
                })
            })
            .collect();

        Self {
            task_queue: tx,
            worker_handles,
        }
    }

    /// Receives and processes tasks until the queue is closed and drained.
    async fn worker_loop(
        worker_id: usize,
        rx: std::sync::Arc<tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<BackgroundTask>>>,
    ) {
        loop {
            // The lock guard is a temporary that is dropped at the end of
            // this statement, so other workers can receive while this one
            // is busy processing.
            let next = rx.lock().await.recv().await;
            let Some(task) = next else { break };

            // Render the id up front: `task` is moved into `process_task`.
            let task_id = format!("{:?}", task.id);
            @log.info(&format!("Worker {} processing task: {}", worker_id, task_id));

            match Self::process_task(task).await {
                Ok(_) => {
                    @log.info(&format!("Worker {} completed task: {}", worker_id, task_id));
                }
                Err(e) => {
                    @log.error(&format!("Worker {} failed task: {}: {}", worker_id, task_id, e));
                }
            }
        }
    }

    /// Runs one task according to its `task_type`.
    ///
    /// # Errors
    /// Propagates the failure of whichever operation the task performs.
    async fn process_task(task: BackgroundTask) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        match task.task_type {
            TaskType::EmailSend { to, subject, body } => {
                @email.send(to, subject, body).await?;
            }
            TaskType::DataProcessing { data_id } => {
                let data = @db.data.find(data_id).await?;
                let processed = @processor.process(&data).await?;
                @db.processed_data.insert(&processed).await?;
            }
            TaskType::ReportGeneration { report_id } => {
                let report = @reporter.generate(report_id).await?;
                @file.write(&format!("reports/{}.pdf", report_id), &report).await?;
            }
        }
        Ok(())
    }

    /// Queues `task` for the workers.
    ///
    /// # Errors
    /// Fails when the channel is closed.
    async fn enqueue_task(&self, task: BackgroundTask) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.task_queue.send(task)?;
        Ok(())
    }

    /// Closes the queue and waits for every worker to finish.
    ///
    /// # Errors
    /// Returns the join error of the first worker that failed to complete.
    async fn shutdown(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let Self { task_queue, worker_handles } = self;

        // Drop the sender to close the channel
        drop(task_queue);

        // Wait for all workers to finish
        for handle in worker_handles {
            handle.await?;
        }
        Ok(())
    }
}
Async Error Handling
// Comprehensive async error handling
struct AsyncErrorHandler;impl AsyncErrorHandler {
// Retry with exponential backoff
async fn retry_with_backoff<F, T, E>(mut operation: F, max_retries: usize) -> Result<T, E>
where
F: FnMut() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
E: std::fmt::Display,
{
let mut attempt = 0;
loop {
match operation().await {
Ok(result) => return Ok(result),
Err(e) => {
attempt += 1;
if attempt >= max_retries {
return Err(e);
}
@log.warn(&format!("Attempt {} failed: {}. Retrying...", attempt, e));
let delay = std::time::Duration::from_secs(2_u64.pow(attempt as u32));
tokio::time::sleep(delay).await;
}
}
}
}
// Timeout wrapper
async fn with_timeout<F, T>(future: F, timeout: std::time::Duration) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
where
F: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
{
match tokio::time::timeout(timeout, future).await {
Ok(result) => result,
Err(_) => Err("Operation timed out".into()),
}
}
// Circuit breaker pattern
async fn with_circuit_breaker<F, T>(operation: F, breaker: &CircuitBreaker) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
where
F: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
{
if breaker.is_open() {
return Err("Circuit breaker is open".into());
}
match operation.await {
Ok(result) => {
breaker.record_success();
Ok(result)
}
Err(e) => {
breaker.record_failure();
Err(e)
}
}
}
}
// Circuit breaker implementation
/// Counts failures and reports the circuit as open once the count reaches
/// `failure_threshold`, until `timeout` has elapsed since the last failure.
struct CircuitBreaker {
    failure_threshold: usize,
    failure_count: std::sync::atomic::AtomicUsize,
    last_failure_time: std::sync::Mutex<Option<std::time::Instant>>,
    timeout: std::time::Duration,
}

impl CircuitBreaker {
    /// Creates a closed breaker with no recorded failures.
    fn new(failure_threshold: usize, timeout: std::time::Duration) -> Self {
        Self {
            failure_threshold,
            failure_count: std::sync::atomic::AtomicUsize::new(0),
            last_failure_time: std::sync::Mutex::new(None),
            timeout,
        }
    }

    /// Returns `true` while the failure count has reached the threshold and
    /// the last failure is more recent than `timeout`.
    ///
    /// Once the threshold is reached but the timeout has passed (or no
    /// failure time is recorded), the failure count is reset to zero and
    /// `false` is returned.
    fn is_open(&self) -> bool {
        use std::sync::atomic::Ordering;

        if self.failure_count.load(Ordering::Relaxed) < self.failure_threshold {
            return false;
        }

        // A poisoned lock still holds a valid timestamp. Recover it instead
        // of treating the breaker as closed and clearing the counter.
        let last_failure = *self
            .last_failure_time
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        if matches!(last_failure, Some(time) if time.elapsed() < self.timeout) {
            return true;
        }

        // Reset if timeout has passed
        self.failure_count.store(0, Ordering::Relaxed);
        false
    }

    /// Clears the failure count.
    fn record_success(&self) {
        self.failure_count.store(0, std::sync::atomic::Ordering::Relaxed);
    }

    /// Increments the failure count and stamps the failure time with now.
    fn record_failure(&self) {
        self.failure_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        // Recover from poisoning so the timestamp always follows the counter.
        let mut last_failure = self
            .last_failure_time
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *last_failure = Some(std::time::Instant::now());
    }
}
Async Performance Optimization
// Async performance optimization techniques
/// Namespace for wrappers that combine an async operation with pooling,
/// caching, batching or rate limiting.
struct AsyncOptimizer;

impl AsyncOptimizer {
    /// Acquires a connection from `pool`, awaits `operation`, and releases
    /// the connection whether or not the operation succeeded.
    ///
    /// # Errors
    /// Propagates a failed acquire or the operation's own error.
    async fn with_connection_pool<F, T>(pool: &ConnectionPool, operation: F) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        F: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
    {
        let connection = pool.acquire().await?;
        // Hold the result so the connection is released on the error path too.
        let result = operation.await;
        pool.release(connection).await;
        result
    }

    /// Returns the cached value for `key` if present; otherwise awaits
    /// `operation`, stores its result with `ttl`, and returns it.
    ///
    /// NOTE(review): the `cache` parameter is not read; lookups go through
    /// the `@cache` operator — confirm that is intended.
    ///
    /// # Errors
    /// Propagates cache failures and the operation's own error.
    async fn cached_operation<F, T>(cache: &Cache, key: &str, ttl: u64, operation: F) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        F: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
        T: Clone + serde::Serialize + for<'de> serde::Deserialize<'de>,
    {
        if let Some(cached) = @cache.get(key).await? {
            return Ok(cached);
        }

        let result = operation.await?;
        @cache.set(key, &result, ttl).await?;
        Ok(result)
    }

    /// Feeds `items` to `processor` in consecutive batches of at most
    /// `batch_size` items and concatenates the outputs in order.
    ///
    /// Items are moved into each batch, so `T` does not have to be `Clone`.
    ///
    /// # Errors
    /// Returns an error when `batch_size` is zero, or the first error
    /// returned by `processor`.
    async fn batch_process<T, U, F>(items: Vec<T>, batch_size: usize, processor: F) -> Result<Vec<U>, Box<dyn std::error::Error + Send + Sync>>
    where
        F: Fn(Vec<T>) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<U>, Box<dyn std::error::Error + Send + Sync>>> + Send>> + Send + Sync,
    {
        if batch_size == 0 {
            return Err("batch_size must be greater than zero".into());
        }

        let mut results = Vec::new();
        let mut pending = items.into_iter();
        loop {
            // `by_ref` keeps the iterator alive so each pass takes the next
            // `batch_size` items by value.
            let batch: Vec<T> = pending.by_ref().take(batch_size).collect();
            if batch.is_empty() {
                break;
            }
            results.extend(processor(batch).await?);
        }
        Ok(results)
    }

    /// Waits for `limiter` to grant a token, then awaits `operation`.
    ///
    /// # Errors
    /// Propagates a failed acquire or the operation's own error.
    async fn rate_limited_operation<F, T>(limiter: &RateLimiter, operation: F) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        F: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
    {
        limiter.acquire().await?;
        operation.await
    }
}
// Rate limiter implementation
/// Token-bucket rate limiter holding at most `max_tokens` tokens and
/// refilling at `refill_rate` tokens per second.
struct RateLimiter {
    tokens: std::sync::atomic::AtomicU32,
    max_tokens: u32,
    refill_rate: f64, // tokens per second
    last_refill: std::sync::Mutex<std::time::Instant>,
}

impl RateLimiter {
    /// Creates a limiter that starts with a full bucket.
    fn new(max_tokens: u32, refill_rate: f64) -> Self {
        Self {
            tokens: std::sync::atomic::AtomicU32::new(max_tokens),
            max_tokens,
            refill_rate,
            last_refill: std::sync::Mutex::new(std::time::Instant::now()),
        }
    }

    /// Waits until a token is available and takes it.
    ///
    /// The bucket is re-checked every 10 ms while it is empty. The current
    /// implementation never returns `Err`.
    async fn acquire(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        use std::sync::atomic::Ordering;

        loop {
            self.refill_tokens();

            // Atomically take one token; the update fails only when the
            // bucket is empty.
            let taken = self
                .tokens
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                    current.checked_sub(1)
                });
            if taken.is_ok() {
                return Ok(());
            }

            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }

    /// Adds the whole tokens earned since the last refill, capped at
    /// `max_tokens`.
    ///
    /// Does nothing when the refill lock is poisoned or when less than one
    /// whole token has been earned.
    fn refill_tokens(&self) {
        use std::sync::atomic::Ordering;

        let Ok(mut last_refill) = self.last_refill.lock() else {
            return;
        };

        let now = std::time::Instant::now();
        let elapsed = now.duration_since(*last_refill).as_secs_f64();
        // Float-to-int `as` saturates, so this is at most `u32::MAX` and is
        // zero for a non-positive rate.
        let tokens_to_add = (elapsed * self.refill_rate) as u32;
        if tokens_to_add == 0 {
            return;
        }

        // Update atomically so a concurrent `acquire` is not overwritten,
        // and saturate so a long idle period cannot overflow the counter.
        let max_tokens = self.max_tokens;
        let _ = self
            .tokens
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                Some(current.saturating_add(tokens_to_add).min(max_tokens))
            });

        // Advance the clock only by the time the added tokens account for,
        // so the fractional remainder counts toward the next refill.
        let credited = f64::from(tokens_to_add) / self.refill_rate;
        *last_refill = if credited.is_finite() && credited < elapsed {
            *last_refill + std::time::Duration::from_secs_f64(credited)
        } else {
            now
        };
    }
}
Async Testing
#[cfg(test)]
mod async_tests {
    use super::*;
    use tokio_test;

    // NOTE(review): `UserRepository::new` and `DataItem::new` are not defined
    // in the visible source, and the first two tests need a live database
    // and HTTP endpoint — confirm the fixtures they rely on.

    /// Looks up user 1 through the repository and expects it to exist.
    #[tokio::test]
    async fn test_async_database_operations() {
        let repo = UserRepository::new();
        let user = repo.find_user(1).await.unwrap();
        assert!(user.is_some());
    }

    /// Fetches `/test` and expects a JSON object in the response.
    #[tokio::test]
    async fn test_async_http_operations() {
        let client = ApiClient::new("https://api.example.com".to_string());
        let data: serde_json::Value = client.get("/test").await.unwrap();
        assert!(data.is_object());
    }

    /// Feeds three items through the stream processor and expects three
    /// results.
    #[tokio::test]
    async fn test_async_stream_processing() {
        let processor = StreamProcessor::new(10);
        let data_stream = stream::iter(vec![
            DataItem::new(1, "data1"),
            DataItem::new(2, "data2"),
            DataItem::new(3, "data3"),
        ]);
        let processed = processor.process_data_stream(data_stream).await.unwrap();
        assert_eq!(processed.len(), 3);
    }

    /// Runs an operation that succeeds on the first attempt through
    /// `retry_with_backoff` and checks its value.
    #[tokio::test]
    async fn test_async_error_handling() {
        // The explicit return type coerces the boxed future to the
        // `dyn Future` type that `retry_with_backoff` requires.
        let operation = || -> std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<i32, String>> + Send>,
        > {
            Box::pin(async {
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                Ok::<i32, String>(42)
            })
        };

        // `retry_with_backoff` is an associated function without `self`, so
        // it is called on the type rather than on a value.
        let result = AsyncErrorHandler::retry_with_backoff(operation, 3)
            .await
            .unwrap();
        assert_eq!(result, 42);
    }
}
This comprehensive guide covers Rust-specific async operations, leveraging Rust's async/await syntax and the tokio ecosystem while integrating seamlessly with TuskLang's @ operators and features.