🦀 Async/Await Patterns in TuskLang with Rust

⚡ Async Foundation

Async/await patterns with TuskLang and Rust provide powerful concurrency primitives for building high-performance, non-blocking applications. This guide covers futures, streams, and advanced async programming techniques.

🏗️ Async Architecture

Async Runtime

[async_runtime]
tokio: true
async_std: true
smol: true
custom_runtime: true
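
Any of the runtimes above can drive the async code in this guide; as a minimal sketch (assuming the tokio crate with its macros and multi-threaded runtime features enabled), a #[tokio::main] entry point looks like this:

// #[tokio::main] generates a main() that builds a multi-threaded runtime
// and runs this async body on it with block_on
#[tokio::main]
async fn main() {
    let message = tokio::spawn(async { "hello from a spawned task" })
        .await
        .expect("task panicked");
    println!("{}", message);
}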

[async_components]
futures: "async_computations"
streams: "async_iterators"
channels: "async_communication"
tasks: "async_execution"

Runtime Configuration

[runtime_configuration]
thread_pool: true
io_driver: true
time_driver: true
signal_driver: true

[runtime_implementation]
use tokio::runtime::{Runtime, Builder};

pub struct AsyncRuntime {
    runtime: Runtime,
}

impl AsyncRuntime {
    pub fn new() -> Result<Self, std::io::Error> {
        let runtime = Builder::new_multi_thread()
            .worker_threads(4)
            .enable_all()
            .build()?;

        Ok(Self { runtime })
    }

    pub fn block_on<F>(&self, future: F) -> F::Output
    where
        F: std::future::Future,
    {
        self.runtime.block_on(future)
    }

    pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
    where
        F: std::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.runtime.spawn(future)
    }
}
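
A brief usage sketch, assuming the AsyncRuntime definition above and a Tokio dependency:

fn main() -> Result<(), std::io::Error> {
    // Build the wrapper runtime defined above
    let runtime = AsyncRuntime::new()?;

    // Drive an async computation to completion from synchronous code
    let greeting = runtime.block_on(async { "Hello from the runtime".to_string() });
    println!("{}", greeting);

    // Spawn a background task and wait for its result
    let handle = runtime.spawn(async { 21 * 2 });
    let answer = runtime.block_on(handle).expect("task panicked");
    println!("answer = {}", answer);
    Ok(())
}

If only some of the drivers listed in [runtime_configuration] are needed, enable_all() in the builder can be replaced with enable_io() and/or enable_time().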

🔄 Basic Async Patterns

Simple Async Functions

[basic_async_patterns]
async_functions: true
future_traits: true
await_keyword: true

[basic_implementation]
use std::time::Duration;
use tokio::time::sleep;

// Basic async function
pub async fn simple_async_function() -> String {
    sleep(Duration::from_millis(100)).await;
    "Hello from async function".to_string()
}

// Async function with parameters
pub async fn async_with_params(name: &str, delay_ms: u64) -> String {
    sleep(Duration::from_millis(delay_ms)).await;
    format!("Hello, {}!", name)
}

// Async function returning Result
pub async fn async_result_function(input: i32) -> Result<i32, String> {
    if input < 0 {
        return Err("Input must be positive".to_string());
    }
    sleep(Duration::from_millis(50)).await;
    Ok(input * 2)
}

// Async function with multiple awaits
pub async fn multiple_awaits() -> Vec<String> {
    let mut results = Vec::new();

    // First async operation
    let result1 = async_with_params("Alice", 100).await;
    results.push(result1);

    // Second async operation
    let result2 = async_with_params("Bob", 200).await;
    results.push(result2);

    // Third async operation
    let result3 = async_with_params("Charlie", 300).await;
    results.push(result3);

    results
}

// Async function with conditional awaits
pub async fn conditional_async(condition: bool) -> String {
    if condition {
        sleep(Duration::from_millis(100)).await;
        "Condition was true".to_string()
    } else {
        "Condition was false".to_string()
    }
}

Future Combinators

[future_combinators]
map: true
and_then: true
or_else: true
select: true

[combinator_implementation]
use futures::future::{self, FutureExt, TryFutureExt};

// Map combinator
pub async fn future_map_example() -> i32 {
    let future = future::ready(5);
    future.map(|x| x * 2).await
}

// And then combinator
pub async fn future_and_then_example() -> Result<i32, String> {
    let future = future::ready(Ok(5));
    future
        .and_then(|x| async move { Ok::<i32, String>(x * 2) })
        .await
}

// Or else combinator
pub async fn future_or_else_example() -> Result<i32, String> {
    let future = future::ready(Err::<i32, String>("Original error".to_string()));
    future
        .or_else(|_| async move { Ok::<i32, String>(42) })
        .await
}

// Select combinator
pub async fn future_select_example() -> String {
    let future1 = async {
        sleep(Duration::from_millis(100)).await;
        "Future 1 completed".to_string()
    };
    let future2 = async {
        sleep(Duration::from_millis(200)).await;
        "Future 2 completed".to_string()
    };

    tokio::select! {
        result = future1 => result,
        result = future2 => result,
    }
}

// Join combinator
pub async fn future_join_example() -> (String, i32) {
    let future1 = async { "Hello".to_string() };
    let future2 = async { 42 };
    future::join(future1, future2).await
}

// Try join combinator
pub async fn future_try_join_example() -> Result<(String, i32), String> {
    let future1 = async { Ok::<String, String>("Hello".to_string()) };
    let future2 = async { Ok::<i32, String>(42) };
    future::try_join(future1, future2).await
}

🌊 Streams and Async Iterators

Basic Streams

[stream_patterns]
stream_traits: true
stream_processing: true
async_iterators: true

[stream_implementation]
use futures::stream::{self, Stream, StreamExt};

// Basic stream
pub fn create_number_stream() -> impl Stream<Item = i32> {
    stream::iter(1..=10)
}

// Async stream
pub fn create_async_stream() -> impl Stream<Item = String> {
    stream::unfold(0, |state| async move {
        if state < 5 {
            let item = format!("Item {}", state);
            Some((item, state + 1))
        } else {
            None
        }
    })
}

// Stream processing
pub async fn process_stream() -> Vec<i32> {
    let stream = create_number_stream();
    stream
        .map(|x| x * 2)
        // StreamExt::filter expects the predicate to return a future
        .filter(|&x| async move { x % 4 == 0 })
        .collect::<Vec<_>>()
        .await
}

// Stream with side effects
pub async fn stream_with_side_effects() {
    let stream = create_async_stream();
    stream
        .for_each(|item| async move {
            println!("Processing: {}", item);
            sleep(Duration::from_millis(100)).await;
        })
        .await;
}

// Stream batching
pub async fn stream_batching() -> Vec<Vec<i32>> {
    let stream = create_number_stream();
    stream
        .chunks(3)
        .collect::<Vec<_>>()
        .await
}

// Stream timeout
pub async fn stream_with_timeout() -> Result<Vec<i32>, tokio::time::error::Elapsed> {
    let stream = create_number_stream();
    tokio::time::timeout(
        Duration::from_millis(500),
        stream.collect::<Vec<_>>(),
    )
    .await
}

Advanced Stream Patterns

[advanced_stream_patterns]
stream_merging: true
stream_splitting: true
backpressure: true

[advanced_stream_implementation]
use futures::stream::{self, StreamExt, TryStreamExt};
use tokio::sync::mpsc;

// Merge multiple streams
pub async fn merge_streams() -> Vec<i32> {
    let stream1 = stream::iter(1..=5);
    let stream2 = stream::iter(6..=10);
    let stream3 = stream::iter(11..=15);

    // stream::select_all yields items directly as each source stream produces them
    stream::select_all(vec![stream1, stream2, stream3])
        .collect::<Vec<_>>()
        .await
}

// Stream with backpressure
pub async fn stream_with_backpressure() {
    let (tx, mut rx) = mpsc::channel(10);

    // Producer task
    let producer = tokio::spawn(async move {
        for i in 1..=100 {
            tx.send(i).await.unwrap();
            sleep(Duration::from_millis(10)).await;
        }
    });

    // Consumer task
    let consumer = tokio::spawn(async move {
        while let Some(item) = rx.recv().await {
            println!("Received: {}", item);
            sleep(Duration::from_millis(50)).await; // Slower consumer
        }
    });

    // Wait for both tasks
    let _ = tokio::join!(producer, consumer);
}

// Stream splitting (fan-out): forward each item from one stream to two consumers.
// (StreamExt::split only applies to combined Stream + Sink types, so channels are used here.)
pub async fn stream_splitting() {
    let stream = create_number_stream();
    let (tx1, mut rx1) = mpsc::channel(10);
    let (tx2, mut rx2) = mpsc::channel(10);

    // Producer reads the stream once and sends every item to both channels
    let producer = tokio::spawn(async move {
        tokio::pin!(stream);
        while let Some(item) = stream.next().await {
            let _ = tx1.send(item).await;
            let _ = tx2.send(item).await;
        }
    });

    let task1 = tokio::spawn(async move {
        while let Some(item) = rx1.recv().await {
            println!("Task 1: {}", item);
        }
    });

    let task2 = tokio::spawn(async move {
        while let Some(item) = rx2.recv().await {
            println!("Task 2: {}", item);
        }
    });

    let _ = tokio::join!(producer, task1, task2);
}

// Stream with error handling
pub async fn stream_error_handling() -> Result<Vec<i32>, String> {
    let stream = stream::iter(1..=10).map(|x| {
        if x == 5 {
            Err("Error at 5".to_string())
        } else {
            Ok(x)
        }
    });

    stream
        .try_collect::<Vec<_>>()
        .await
}

🔗 Channels and Communication

Async Channels

[channel_patterns]
mpsc_channels: true
broadcast_channels: true
oneshot_channels: true

[channel_implementation]
use tokio::sync::{mpsc, broadcast, oneshot};

// MPSC channel
pub async fn mpsc_example() {
    let (tx, mut rx) = mpsc::channel(100);

    // Sender task
    let sender = tokio::spawn(async move {
        for i in 1..=10 {
            tx.send(i).await.unwrap();
            sleep(Duration::from_millis(100)).await;
        }
    });

    // Receiver task
    let receiver = tokio::spawn(async move {
        while let Some(item) = rx.recv().await {
            println!("Received: {}", item);
        }
    });

    let _ = tokio::join!(sender, receiver);
}

// Broadcast channel
pub async fn broadcast_example() {
    let (tx, mut rx1) = broadcast::channel(10);
    let mut rx2 = tx.subscribe();

    // Sender task
    let sender = tokio::spawn(async move {
        for i in 1..=5 {
            tx.send(i).unwrap();
            sleep(Duration::from_millis(200)).await;
        }
    });

    // Receiver 1
    let receiver1 = tokio::spawn(async move {
        while let Ok(item) = rx1.recv().await {
            println!("Receiver 1: {}", item);
        }
    });

    // Receiver 2
    let receiver2 = tokio::spawn(async move {
        while let Ok(item) = rx2.recv().await {
            println!("Receiver 2: {}", item);
        }
    });

    let _ = tokio::join!(sender, receiver1, receiver2);
}

// Oneshot channel
pub async fn oneshot_example() {
    let (tx, rx) = oneshot::channel();

    // Sender task
    let sender = tokio::spawn(async move {
        sleep(Duration::from_millis(100)).await;
        tx.send("Hello from oneshot".to_string()).unwrap();
    });

    // Receiver task
    let receiver = tokio::spawn(async move {
        let message = rx.await.unwrap();
        println!("Received: {}", message);
    });

    let _ = tokio::join!(sender, receiver);
}

// Channel with timeout (tokio::time::error::Elapsed cannot be constructed directly,
// so both failure modes are mapped into a String error)
pub async fn channel_with_timeout() -> Result<String, String> {
    let (tx, rx) = oneshot::channel();

    // Sender task
    tokio::spawn(async move {
        sleep(Duration::from_millis(200)).await;
        let _ = tx.send("Delayed message".to_string());
    });

    // Receiver with timeout: the message arrives after the deadline, so this returns an error
    match tokio::time::timeout(Duration::from_millis(100), rx).await {
        Ok(Ok(message)) => Ok(message),
        Ok(Err(_)) => Err("Sender was dropped".to_string()),
        Err(_) => Err("Timed out waiting for message".to_string()),
    }
}

🎯 Task Management

Task Spawning and Management

[task_management]
task_spawning: true
task_cancellation: true
task_supervision: true

[task_implementation]
use tokio::task::{JoinHandle, spawn, spawn_blocking};
use std::sync::Arc;
use tokio::sync::Mutex;

// Basic task spawning
pub async fn basic_task_spawning() {
    let handle = spawn(async {
        sleep(Duration::from_millis(100)).await;
        "Task completed".to_string()
    });

    let result = handle.await.unwrap();
    println!("{}", result);
}

// Task with cancellation
pub async fn task_with_cancellation() {
    let handle = spawn(async {
        loop {
            sleep(Duration::from_millis(100)).await;
            println!("Task running...");
        }
    });

    // Cancel after 1 second
    sleep(Duration::from_secs(1)).await;
    handle.abort();

    match handle.await {
        Ok(_) => println!("Task completed normally"),
        Err(err) if err.is_cancelled() => println!("Task was cancelled"),
        Err(err) => println!("Task failed: {:?}", err),
    }
}

// Task supervision
pub async fn task_supervision() {
    let shared_state = Arc::new(Mutex::new(0));
    let mut handles = Vec::new();

    for i in 0..5 {
        let state = Arc::clone(&shared_state);
        let handle = spawn(async move {
            // Each supervised task increments the shared counter a fixed number of times
            for _ in 0..10 {
                {
                    let mut count = state.lock().await;
                    *count += 1;
                    println!("Task {}: count = {}", i, *count);
                }
                sleep(Duration::from_millis(100)).await;
            }
        });
        handles.push(handle);
    }

    // Supervise: wait for every task and report any that panicked or were cancelled
    for handle in handles {
        if let Err(err) = handle.await {
            println!("Supervised task failed: {:?}", err);
        }
    }
}

// Blocking task
pub async fn blocking_task_example() {
    let handle = spawn_blocking(|| {
        // CPU-intensive work (u64 avoids overflowing a 32-bit sum)
        let mut sum: u64 = 0;
        for i in 1..=1_000_000u64 {
            sum += i;
        }
        sum
    });

    let result = handle.await.unwrap();
    println!("Sum: {}", result);
}

// Task with error handling (assumes the rand crate for the random outcome)
pub async fn task_error_handling() {
    let handle = spawn(async {
        if rand::random::<bool>() {
            Ok("Success".to_string())
        } else {
            Err("Random error".to_string())
        }
    });

    match handle.await {
        Ok(Ok(result)) => println!("Task succeeded: {}", result),
        Ok(Err(err)) => println!("Task failed: {}", err),
        Err(err) => println!("Task panicked: {:?}", err),
    }
}

🔄 Advanced Async Patterns

Async Traits and Generics

[advanced_async_patterns]
async_traits: true
async_generics: true
async_closures: true

[advanced_implementation]
use std::future::Future;
use async_trait::async_trait;

// Async trait
#[async_trait]
pub trait AsyncProcessor {
    async fn process(&self, input: String) -> Result<String, String>;
    async fn batch_process(&self, inputs: Vec<String>) -> Vec<Result<String, String>>;
}

// Implementation
pub struct SimpleProcessor;

#[async_trait]
impl AsyncProcessor for SimpleProcessor {
    async fn process(&self, input: String) -> Result<String, String> {
        sleep(Duration::from_millis(100)).await;
        Ok(format!("Processed: {}", input))
    }

    async fn batch_process(&self, inputs: Vec<String>) -> Vec<Result<String, String>> {
        let mut results = Vec::new();
        for input in inputs {
            let result = self.process(input).await;
            results.push(result);
        }
        results
    }
}

// Async generic function
pub async fn async_generic<T, F, Fut>(items: Vec<T>, processor: F) -> Vec<T>
where
    F: Fn(T) -> Fut + Send + Sync,
    Fut: Future<Output = T> + Send,
    T: Send + 'static,
{
    let mut results = Vec::new();
    for item in items {
        let processed = processor(item).await;
        results.push(processed);
    }
    results
}

// Async closure
pub async fn async_closure_example() {
    let processor = |input: i32| async move {
        sleep(Duration::from_millis(100)).await;
        input * 2
    };

    let items = vec![1, 2, 3, 4, 5];
    let results = async_generic(items, processor).await;
    println!("Results: {:?}", results);
}

// Async function with lifetime
pub async fn async_with_lifetime<'a>(data: &'a str) -> String {
    sleep(Duration::from_millis(100)).await;
    format!("Processed: {}", data)
}

Async Resource Management

[async_resource_management]
async_drop: true
resource_pools: true
connection_pools: true

[resource_implementation]
use std::sync::Arc;
use tokio::sync::{Mutex, Semaphore};

// Async resource with custom drop
pub struct AsyncResource {
    data: String,
}

impl AsyncResource {
    pub async fn new(data: String) -> Self {
        sleep(Duration::from_millis(50)).await;
        Self { data }
    }

    pub async fn process(&self) -> String {
        sleep(Duration::from_millis(100)).await;
        format!("Processed: {}", self.data)
    }
}

impl Drop for AsyncResource {
    fn drop(&mut self) {
        // Note: Drop cannot be async, but we can spawn a cleanup task
        // (this requires an active Tokio runtime when the value is dropped)
        let data = self.data.clone();
        tokio::spawn(async move {
            sleep(Duration::from_millis(50)).await;
            println!("Cleaned up resource: {}", data);
        });
    }
}
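
Because Drop cannot await, a common alternative is an explicit async cleanup method that callers invoke before the value goes out of scope. The close() method below is a hypothetical addition shown only for illustration:

impl AsyncResource {
    // Hypothetical explicit async cleanup; call before the resource is dropped
    pub async fn close(self) {
        sleep(Duration::from_millis(50)).await;
        println!("Closed resource: {}", self.data);
    }
}

Callers would then write resource.close().await at the end of the async scope; the Drop impl above still runs afterwards as a fallback.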

// Resource pool
pub struct ResourcePool {
    semaphore: Arc<Semaphore>,
    resources: Arc<Mutex<Vec<AsyncResource>>>,
}

impl ResourcePool {
    pub fn new(capacity: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(capacity)),
            resources: Arc::new(Mutex::new(Vec::new())),
        }
    }

    pub async fn acquire(&self) -> Option<AsyncResource> {
        // The permit is released when this function returns; a production pool would
        // hold the permit alongside the resource (e.g. via acquire_owned) so the
        // capacity limit applies for as long as the resource is in use.
        let _permit = self.semaphore.acquire().await.ok()?;
        let mut resources = self.resources.lock().await;
        resources.pop()
    }

    pub async fn release(&self, resource: AsyncResource) {
        let mut resources = self.resources.lock().await;
        resources.push(resource);
    }
}

// Connection pool example
pub struct DatabaseConnection {
    id: u32,
}

impl DatabaseConnection {
    pub async fn new(id: u32) -> Self {
        sleep(Duration::from_millis(100)).await; // Simulate connection time
        Self { id }
    }

    pub async fn query(&self, sql: &str) -> String {
        sleep(Duration::from_millis(50)).await; // Simulate query time
        format!("Result from connection {}: {}", self.id, sql)
    }
}

pub struct ConnectionPool {
    connections: Arc<Mutex<Vec<DatabaseConnection>>>,
    max_connections: usize,
}

impl ConnectionPool {
    pub async fn new(max_connections: usize) -> Self {
        let mut connections = Vec::new();
        for i in 0..max_connections {
            connections.push(DatabaseConnection::new(i as u32).await);
        }
        Self {
            connections: Arc::new(Mutex::new(connections)),
            max_connections,
        }
    }

    pub async fn get_connection(&self) -> Option<DatabaseConnection> {
        let mut connections = self.connections.lock().await;
        connections.pop()
    }

    pub async fn return_connection(&self, connection: DatabaseConnection) {
        let mut connections = self.connections.lock().await;
        if connections.len() < self.max_connections {
            connections.push(connection);
        }
    }
}
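
A brief usage sketch under the definitions above (the pool size and SQL string are arbitrary):

pub async fn connection_pool_example() {
    // Create a pool with two pre-built connections
    let pool = ConnectionPool::new(2).await;

    if let Some(connection) = pool.get_connection().await {
        // Run a query, then hand the connection back to the pool
        let result = connection.query("SELECT * FROM users").await;
        println!("{}", result);
        pool.return_connection(connection).await;
    }
}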

🎯 Best Practices

1. Async Function Design

- Keep async functions focused and composable
- Use proper error types and Result handling
- Avoid blocking operations in async contexts
- Use appropriate timeouts and cancellation (see the sketch below)
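
A minimal sketch of the timeout guideline, assuming Tokio; fetch_user is a hypothetical operation used only for illustration:

use std::time::Duration;
use tokio::time::timeout;

// Hypothetical async operation, standing in for real I/O
async fn fetch_user(id: u32) -> Result<String, String> {
    tokio::time::sleep(Duration::from_millis(50)).await;
    Ok(format!("user-{}", id))
}

// Keep the function focused: one call, one timeout, one error type
pub async fn fetch_user_with_timeout(id: u32) -> Result<String, String> {
    match timeout(Duration::from_millis(100), fetch_user(id)).await {
        Ok(result) => result,
        Err(_) => Err("fetch_user timed out".to_string()),
    }
}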

2. Resource Management

- Implement proper cleanup in async contexts (see the sketch below)
- Use connection pools for expensive resources
- Handle backpressure in streams
- Monitor task lifecycle and cancellation
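
One possible cleanup pattern, sketched under the assumption that the tokio-util crate is available for CancellationToken; cancelling through a token lets the task reach its cleanup code, unlike abort():

use std::time::Duration;
use tokio_util::sync::CancellationToken;

pub async fn worker_with_cleanup() {
    let token = CancellationToken::new();
    let child = token.child_token();

    let worker = tokio::spawn(async move {
        loop {
            tokio::select! {
                // Stop when the parent requests shutdown
                _ = child.cancelled() => break,
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    println!("working...");
                }
            }
        }
        // Runs on the normal exit path, so cleanup is not skipped
        println!("worker cleaned up");
    });

    tokio::time::sleep(Duration::from_millis(350)).await;
    token.cancel();
    let _ = worker.await;
}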

3. Performance Optimization

- Use appropriate concurrency levels (see the sketch below)
- Avoid spawning too many tasks
- Use blocking tasks for CPU-intensive work
- Profile async performance regularly
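
A sketch of bounding concurrency with buffer_unordered from the futures crate; process_job and the limit of 8 are illustrative assumptions:

use futures::stream::{self, StreamExt};

// Hypothetical async job, standing in for real work
async fn process_job(id: u32) -> u32 {
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    id * 2
}

// Run many jobs with at most 8 in flight instead of spawning one task per job
pub async fn bounded_concurrency() -> Vec<u32> {
    stream::iter(0..100u32)
        .map(process_job)
        .buffer_unordered(8)
        .collect()
        .await
}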

4. Error Handling

- Implement comprehensive error types (see the sketch below)
- Use proper error propagation
- Handle task cancellation gracefully
- Monitor and log async errors
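
A sketch of a dedicated error type with explicit propagation; FetchError and its variants are illustrative, and crates such as thiserror can generate the Display/Error boilerplate:

use std::fmt;

#[derive(Debug)]
pub enum FetchError {
    Timeout,
    Connection(String),
}

impl fmt::Display for FetchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FetchError::Timeout => write!(f, "operation timed out"),
            FetchError::Connection(msg) => write!(f, "connection failed: {}", msg),
        }
    }
}

impl std::error::Error for FetchError {}

// Propagate with ? and convert a timeout into a domain error
pub async fn fetch_with_errors() -> Result<String, FetchError> {
    let fut = async { Ok::<String, FetchError>("payload".to_string()) };
    tokio::time::timeout(std::time::Duration::from_millis(100), fut)
        .await
        .map_err(|_| FetchError::Timeout)?
}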

5. Testing

- Test async functions thoroughly (see the sketch below)
- Use async test utilities
- Mock async dependencies
- Test error conditions and timeouts
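
A sketch of an async test that uses Tokio's paused clock (requires the tokio test-util feature); the durations are illustrative:

#[cfg(test)]
mod tests {
    use std::time::Duration;

    // start_paused auto-advances the mock clock, so the test finishes instantly
    #[tokio::test(start_paused = true)]
    async fn timeout_fires_without_real_waiting() {
        let slow = async {
            tokio::time::sleep(Duration::from_secs(60)).await;
            "done"
        };

        let result = tokio::time::timeout(Duration::from_secs(1), slow).await;
        assert!(result.is_err(), "expected the slow operation to time out");
    }
}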

Async/await patterns with TuskLang and Rust provide powerful concurrency primitives for building high-performance, non-blocking applications with excellent resource management and error handling.