🦀 Async/Await Patterns in TuskLang with Rust
Async/Await Patterns in TuskLang with Rust
⚡ Async Foundation
Async/await patterns with TuskLang and Rust provide powerful concurrency primitives for building high-performance, non-blocking applications. This guide covers futures, streams, and advanced async programming techniques.
🏗️ Async Architecture
Async Runtime
[async_runtime]
tokio: true
async_std: true
smol: true
custom_runtime: true[async_components]
futures: "async_computations"
streams: "async_iterators"
channels: "async_communication"
tasks: "async_execution"
Runtime Configuration
[runtime_configuration]
thread_pool: true
io_driver: true
time_driver: true
signal_driver: true[runtime_implementation]
use tokio::runtime::{Runtime, Builder};
pub struct AsyncRuntime {
runtime: Runtime,
}
impl AsyncRuntime {
pub fn new() -> Result<Self, std::io::Error> {
let runtime = Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()?;
Ok(Self { runtime })
}
pub fn block_on<F>(&self, future: F) -> F::Output
where
F: std::future::Future,
{
self.runtime.block_on(future)
}
pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
where
F: std::future::Future + Send + 'static,
F::Output: Send + 'static,
{
self.runtime.spawn(future)
}
}
🔄 Basic Async Patterns
Simple Async Functions
[basic_async_patterns]
async_functions: true
future_traits: true
await_keyword: true[basic_implementation]
use std::time::Duration;
use tokio::time::sleep;
// Basic async function
pub async fn simple_async_function() -> String {
sleep(Duration::from_millis(100)).await;
"Hello from async function".to_string()
}
// Async function with parameters
pub async fn async_with_params(name: &str, delay_ms: u64) -> String {
sleep(Duration::from_millis(delay_ms)).await;
format!("Hello, {}!", name)
}
// Async function returning Result
pub async fn async_result_function(input: i32) -> Result<i32, String> {
if input < 0 {
return Err("Input must be positive".to_string());
}
sleep(Duration::from_millis(50)).await;
Ok(input * 2)
}
// Async function with multiple awaits
pub async fn multiple_awaits() -> Vec<String> {
let mut results = Vec::new();
// First async operation
let result1 = async_with_params("Alice", 100).await;
results.push(result1);
// Second async operation
let result2 = async_with_params("Bob", 200).await;
results.push(result2);
// Third async operation
let result3 = async_with_params("Charlie", 300).await;
results.push(result3);
results
}
// Async function with conditional awaits
pub async fn conditional_async(condition: bool) -> String {
if condition {
sleep(Duration::from_millis(100)).await;
"Condition was true".to_string()
} else {
"Condition was false".to_string()
}
}
Future Combinators
[future_combinators]
map: true
and_then: true
or_else: true
select: true[combinator_implementation]
use futures::future::{self, FutureExt, TryFutureExt};
use std::pin::Pin;
// Map combinator
pub async fn future_map_example() -> i32 {
let future = future::ready(5);
future.map(|x| x * 2).await
}
// And then combinator
pub async fn future_and_then_example() -> Result<i32, String> {
let future = future::ready(Ok(5));
future
.and_then(|x| async move { Ok(x * 2) })
.await
}
// Or else combinator
pub async fn future_or_else_example() -> Result<i32, String> {
let future = future::ready(Err("Original error".to_string()));
future
.or_else(|_| async move { Ok(42) })
.await
}
// Select combinator
pub async fn future_select_example() -> String {
let future1 = async {
sleep(Duration::from_millis(100)).await;
"Future 1 completed"
};
let future2 = async {
sleep(Duration::from_millis(200)).await;
"Future 2 completed"
};
tokio::select! {
result = future1 => result,
result = future2 => result,
}
}
// Join combinator
pub async fn future_join_example() -> (String, i32) {
let future1 = async { "Hello".to_string() };
let future2 = async { 42 };
future::join(future1, future2).await
}
// Try join combinator
pub async fn future_try_join_example() -> Result<(String, i32), String> {
let future1 = async { Ok("Hello".to_string()) };
let future2 = async { Ok(42) };
future::try_join(future1, future2).await
}
🌊 Streams and Async Iterators
Basic Streams
[stream_patterns]
stream_traits: true
stream_processing: true
async_iterators: true[stream_implementation]
use futures::stream::{self, Stream, StreamExt};
use std::pin::Pin;
// Basic stream
pub fn create_number_stream() -> impl Stream<Item = i32> {
stream::iter(1..=10)
}
// Async stream
pub fn create_async_stream() -> impl Stream<Item = String> {
stream::unfold(0, |state| async move {
if state < 5 {
let item = format!("Item {}", state);
Some((item, state + 1))
} else {
None
}
})
}
// Stream processing
pub async fn process_stream() -> Vec<i32> {
let stream = create_number_stream();
stream
.map(|x| x * 2)
.filter(|&x| x % 4 == 0)
.collect::<Vec<_>>()
.await
}
// Stream with side effects
pub async fn stream_with_side_effects() {
let stream = create_async_stream();
stream
.for_each(|item| async move {
println!("Processing: {}", item);
sleep(Duration::from_millis(100)).await;
})
.await;
}
// Stream batching
pub async fn stream_batching() -> Vec<Vec<i32>> {
let stream = create_number_stream();
stream
.chunks(3)
.collect::<Vec<_>>()
.await
}
// Stream timeout
pub async fn stream_with_timeout() -> Result<Vec<i32>, tokio::time::error::Elapsed> {
let stream = create_number_stream();
tokio::time::timeout(
Duration::from_millis(500),
stream.collect::<Vec<_>>()
).await
}
Advanced Stream Patterns
[advanced_stream_patterns]
stream_merging: true
stream_splitting: true
backpressure: true[advanced_stream_implementation]
use futures::stream::{self, StreamExt};
use tokio::sync::mpsc;
// Merge multiple streams
pub async fn merge_streams() -> Vec<i32> {
let stream1 = stream::iter(1..=5);
let stream2 = stream::iter(6..=10);
let stream3 = stream::iter(11..=15);
stream::select_all(vec![stream1, stream2, stream3])
.map(|(item, _index, _remaining)| item)
.collect::<Vec<_>>()
.await
}
// Stream with backpressure
pub async fn stream_with_backpressure() {
let (tx, mut rx) = mpsc::channel(10);
// Producer task
let producer = tokio::spawn(async move {
for i in 1..=100 {
tx.send(i).await.unwrap();
sleep(Duration::from_millis(10)).await;
}
});
// Consumer task
let consumer = tokio::spawn(async move {
while let Some(item) = rx.recv().await {
println!("Received: {}", item);
sleep(Duration::from_millis(50)).await; // Slower consumer
}
});
// Wait for both tasks
let _ = tokio::join!(producer, consumer);
}
// Stream splitting
pub async fn stream_splitting() {
let stream = create_number_stream();
let (mut tx1, mut tx2) = stream.split();
let task1 = tokio::spawn(async move {
while let Some(item) = tx1.next().await {
println!("Task 1: {}", item);
}
});
let task2 = tokio::spawn(async move {
while let Some(item) = tx2.next().await {
println!("Task 2: {}", item);
}
});
tokio::join!(task1, task2);
}
// Stream with error handling
pub async fn stream_error_handling() -> Result<Vec<i32>, String> {
let stream = stream::iter(1..=10)
.map(|x| {
if x == 5 {
Err("Error at 5".to_string())
} else {
Ok(x)
}
});
stream
.try_collect::<Vec<_>>()
.await
}
🔗 Channels and Communication
Async Channels
[channel_patterns]
mpsc_channels: true
broadcast_channels: true
oneshot_channels: true[channel_implementation]
use tokio::sync::{mpsc, broadcast, oneshot};
// MPSC channel
pub async fn mpsc_example() {
let (tx, mut rx) = mpsc::channel(100);
// Sender task
let sender = tokio::spawn(async move {
for i in 1..=10 {
tx.send(i).await.unwrap();
sleep(Duration::from_millis(100)).await;
}
});
// Receiver task
let receiver = tokio::spawn(async move {
while let Some(item) = rx.recv().await {
println!("Received: {}", item);
}
});
tokio::join!(sender, receiver);
}
// Broadcast channel
pub async fn broadcast_example() {
let (tx, mut rx1) = broadcast::channel(10);
let mut rx2 = tx.subscribe();
// Sender task
let sender = tokio::spawn(async move {
for i in 1..=5 {
tx.send(i).unwrap();
sleep(Duration::from_millis(200)).await;
}
});
// Receiver 1
let receiver1 = tokio::spawn(async move {
while let Ok(item) = rx1.recv().await {
println!("Receiver 1: {}", item);
}
});
// Receiver 2
let receiver2 = tokio::spawn(async move {
while let Ok(item) = rx2.recv().await {
println!("Receiver 2: {}", item);
}
});
tokio::join!(sender, receiver1, receiver2);
}
// Oneshot channel
pub async fn oneshot_example() {
let (tx, rx) = oneshot::channel();
// Sender task
let sender = tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
tx.send("Hello from oneshot".to_string()).unwrap();
});
// Receiver task
let receiver = tokio::spawn(async move {
let message = rx.await.unwrap();
println!("Received: {}", message);
});
tokio::join!(sender, receiver);
}
// Channel with timeout
pub async fn channel_with_timeout() -> Result<String, tokio::time::error::Elapsed> {
let (tx, rx) = oneshot::channel();
// Sender task
tokio::spawn(async move {
sleep(Duration::from_millis(200)).await;
let _ = tx.send("Delayed message".to_string());
});
// Receiver with timeout
tokio::time::timeout(Duration::from_millis(100), rx).await
.map_err(|_| tokio::time::error::Elapsed(()))
.and_then(|result| result.map_err(|_| tokio::time::error::Elapsed(())))
}
🎯 Task Management
Task Spawning and Management
[task_management]
task_spawning: true
task_cancellation: true
task_supervision: true[task_implementation]
use tokio::task::{JoinHandle, spawn, spawn_blocking};
use std::sync::Arc;
use tokio::sync::Mutex;
// Basic task spawning
pub async fn basic_task_spawning() {
let handle = spawn(async {
sleep(Duration::from_millis(100)).await;
"Task completed".to_string()
});
let result = handle.await.unwrap();
println!("{}", result);
}
// Task with cancellation
pub async fn task_with_cancellation() {
let handle = spawn(async {
loop {
sleep(Duration::from_millis(100)).await;
println!("Task running...");
}
});
// Cancel after 1 second
sleep(Duration::from_secs(1)).await;
handle.abort();
match handle.await {
Ok(_) => println!("Task completed normally"),
Err(err) if err.is_cancelled() => println!("Task was cancelled"),
Err(err) => println!("Task failed: {:?}", err),
}
}
// Task supervision
pub async fn task_supervision() {
let shared_state = Arc::new(Mutex::new(0));
let mut handles = Vec::new();
for i in 0..5 {
let state = Arc::clone(&shared_state);
let handle = spawn(async move {
loop {
{
let mut count = state.lock().await;
*count += 1;
println!("Task {}: count = {}", i, *count);
}
if let Err(_) = tokio::time::timeout(
Duration::from_millis(100),
sleep(Duration::from_millis(100))
).await {
break;
}
}
});
handles.push(handle);
}
// Wait for all tasks
for handle in handles {
let _ = handle.await;
}
}
// Blocking task
pub async fn blocking_task_example() {
let handle = spawn_blocking(|| {
// CPU-intensive work
let mut sum = 0;
for i in 1..=1_000_000 {
sum += i;
}
sum
});
let result = handle.await.unwrap();
println!("Sum: {}", result);
}
// Task with error handling
pub async fn task_error_handling() {
let handle = spawn(async {
if rand::random::<bool>() {
Ok("Success".to_string())
} else {
Err("Random error".to_string())
}
});
match handle.await {
Ok(Ok(result)) => println!("Task succeeded: {}", result),
Ok(Err(err)) => println!("Task failed: {}", err),
Err(err) => println!("Task panicked: {:?}", err),
}
}
🔄 Advanced Async Patterns
Async Traits and Generics
[advanced_async_patterns]
async_traits: true
async_generics: true
async_closures: true[advanced_implementation]
use std::future::Future;
use async_trait::async_trait;
// Async trait
#[async_trait]
pub trait AsyncProcessor {
async fn process(&self, input: String) -> Result<String, String>;
async fn batch_process(&self, inputs: Vec<String>) -> Vec<Result<String, String>>;
}
// Implementation
pub struct SimpleProcessor;
#[async_trait]
impl AsyncProcessor for SimpleProcessor {
async fn process(&self, input: String) -> Result<String, String> {
sleep(Duration::from_millis(100)).await;
Ok(format!("Processed: {}", input))
}
async fn batch_process(&self, inputs: Vec<String>) -> Vec<Result<String, String>> {
let mut results = Vec::new();
for input in inputs {
let result = self.process(input).await;
results.push(result);
}
results
}
}
// Async generic function
pub async fn async_generic<T, F, Fut>(items: Vec<T>, processor: F) -> Vec<T>
where
F: Fn(T) -> Fut + Send + Sync,
Fut: Future<Output = T> + Send,
T: Send + 'static,
{
let mut results = Vec::new();
for item in items {
let processed = processor(item).await;
results.push(processed);
}
results
}
// Async closure
pub async fn async_closure_example() {
let processor = |input: i32| async move {
sleep(Duration::from_millis(100)).await;
input * 2
};
let items = vec![1, 2, 3, 4, 5];
let results = async_generic(items, processor).await;
println!("Results: {:?}", results);
}
// Async function with lifetime
pub async fn async_with_lifetime<'a>(data: &'a str) -> String {
sleep(Duration::from_millis(100)).await;
format!("Processed: {}", data)
}
Async Resource Management
[async_resource_management]
async_drop: true
resource_pools: true
connection_pools: true[resource_implementation]
use std::sync::Arc;
use tokio::sync::Semaphore;
// Async resource with custom drop
pub struct AsyncResource {
data: String,
}
impl AsyncResource {
pub async fn new(data: String) -> Self {
sleep(Duration::from_millis(50)).await;
Self { data }
}
pub async fn process(&self) -> String {
sleep(Duration::from_millis(100)).await;
format!("Processed: {}", self.data)
}
}
impl Drop for AsyncResource {
fn drop(&mut self) {
// Note: Drop cannot be async, but we can spawn a task
let data = self.data.clone();
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await;
println!("Cleaned up resource: {}", data);
});
}
}
// Resource pool
pub struct ResourcePool {
semaphore: Arc<Semaphore>,
resources: Arc<Mutex<Vec<AsyncResource>>>,
}
impl ResourcePool {
pub fn new(capacity: usize) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(capacity)),
resources: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn acquire(&self) -> Option<AsyncResource> {
let _permit = self.semaphore.acquire().await.ok()?;
let mut resources = self.resources.lock().await;
resources.pop()
}
pub async fn release(&self, resource: AsyncResource) {
let mut resources = self.resources.lock().await;
resources.push(resource);
}
}
// Connection pool example
pub struct DatabaseConnection {
id: u32,
}
impl DatabaseConnection {
pub async fn new(id: u32) -> Self {
sleep(Duration::from_millis(100)).await; // Simulate connection time
Self { id }
}
pub async fn query(&self, sql: &str) -> String {
sleep(Duration::from_millis(50)).await; // Simulate query time
format!("Result from connection {}: {}", self.id, sql)
}
}
pub struct ConnectionPool {
connections: Arc<Mutex<Vec<DatabaseConnection>>>,
max_connections: usize,
}
impl ConnectionPool {
pub async fn new(max_connections: usize) -> Self {
let mut connections = Vec::new();
for i in 0..max_connections {
connections.push(DatabaseConnection::new(i).await);
}
Self {
connections: Arc::new(Mutex::new(connections)),
max_connections,
}
}
pub async fn get_connection(&self) -> Option<DatabaseConnection> {
let mut connections = self.connections.lock().await;
connections.pop()
}
pub async fn return_connection(&self, connection: DatabaseConnection) {
let mut connections = self.connections.lock().await;
if connections.len() < self.max_connections {
connections.push(connection);
}
}
}
🎯 Best Practices
1. Async Function Design
- Keep async functions focused and composable - Use proper error types and Result handling - Avoid blocking operations in async contexts - Use appropriate timeouts and cancellation2. Resource Management
- Implement proper cleanup in async contexts - Use connection pools for expensive resources - Handle backpressure in streams - Monitor task lifecycle and cancellation3. Performance Optimization
- Use appropriate concurrency levels - Avoid spawning too many tasks - Use blocking tasks for CPU-intensive work - Profile async performance regularly4. Error Handling
- Implement comprehensive error types - Use proper error propagation - Handle task cancellation gracefully - Monitor and log async errors5. Testing
- Test async functions thoroughly - Use async test utilities - Mock async dependencies - Test error conditions and timeoutsAsync/await patterns with TuskLang and Rust provide powerful concurrency primitives for building high-performance, non-blocking applications with excellent resource management and error handling.