🦀 Concurrent Programming in TuskLang with Rust


🔄 Concurrent Foundation

Concurrent programming with TuskLang and Rust provides powerful tools for building high-performance, scalable applications that can handle multiple tasks simultaneously. This guide covers threads, async/await, and advanced concurrent patterns.

🏗️ Concurrent Architecture

Concurrency Models

[concurrency_models]
thread_based: true
async_await: true
actor_model: true
shared_state: true

[concurrency_patterns]
message_passing: true
shared_memory: true
lock_free: true
atomic_operations: true

Concurrent Components

[concurrent_components]
threads: "std_thread"
async_runtime: "tokio"
channels: "message_passing"
locks: "mutex_rwlock"

🔧 Thread-Based Concurrency

Thread Management

[thread_management]
thread_creation: true
thread_pooling: true
thread_synchronization: true

[thread_implementation]
use std::thread;
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;

// Basic thread creation
pub fn basic_thread_example() {
    let handle = thread::spawn(|| {
        for i in 1..=10 {
            println!("Thread: {}", i);
            thread::sleep(Duration::from_millis(100));
        }
    });

    handle.join().unwrap();
}

// Thread with data
pub fn thread_with_data() {
    let data = vec![1, 2, 3, 4, 5];

    let handle = thread::spawn(move || {
        let sum: i32 = data.iter().sum();
        println!("Sum: {}", sum);
    });

    handle.join().unwrap();
}

// Thread pool implementation
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Message>>,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender: Some(sender) }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.as_ref().unwrap().send(Message::NewJob(job)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Tell every worker to shut down, then close the channel.
        if let Some(sender) = self.sender.take() {
            for _ in &self.workers {
                sender.send(Message::Terminate).unwrap();
            }
        }

        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // If the channel has closed, stop instead of panicking.
            let message = match receiver.lock().unwrap().recv() {
                Ok(message) => message,
                Err(_) => break,
            };

            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);
                    break;
                }
            }
        });

        Worker { id, thread: Some(thread) }
    }
}

enum Message {
    NewJob(Job),
    Terminate,
}

type Job = Box<dyn FnOnce() + Send + 'static>;
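A minimal usage sketch for the pool defined above; the jobs shown are illustrative:

// Four workers pull jobs off the shared channel.
pub fn thread_pool_example() {
    let pool = ThreadPool::new(4);

    for i in 0..8 {
        pool.execute(move || {
            println!("Job {} running on the pool", i);
        });
    }

    // Dropping the pool sends Terminate to every worker and joins them.
}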

Thread Synchronization

[thread_synchronization]
mutex: true
rwlock: true
semaphore: true
barrier: true

[synchronization_implementation]
use std::thread;
use std::sync::{Arc, Mutex, RwLock, Condvar};
use std::collections::HashMap;

// Mutex example
pub fn mutex_example() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

// RwLock example
pub fn rwlock_example() {
    let data = Arc::new(RwLock::new(HashMap::new()));
    let mut handles = vec![];

    // Writers
    for i in 0..3 {
        let data = Arc::clone(&data);
        let handle = thread::spawn(move || {
            let mut map = data.write().unwrap();
            map.insert(i, format!("value_{}", i));
        });
        handles.push(handle);
    }

    // Readers
    for _ in 0..5 {
        let data = Arc::clone(&data);
        let handle = thread::spawn(move || {
            let map = data.read().unwrap();
            println!("Read: {:?}", *map);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

// Condition variable example
pub fn condition_variable_example() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        let mut started = lock.lock().unwrap();
        *started = true;
        cvar.notify_one();
    });

    // Wait until the spawned thread flips the flag.
    let (lock, cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        started = cvar.wait(started).unwrap();
    }
}
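The configuration above also lists barriers; a minimal sketch with std::sync::Barrier (the thread count of 5 is arbitrary). A semaphore sketch appears in the Best Practices section below.

// Barrier example: every thread blocks at wait() until all five arrive.
pub fn barrier_example() {
    use std::sync::Barrier;

    let barrier = Arc::new(Barrier::new(5));
    let mut handles = vec![];

    for i in 0..5 {
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            println!("Thread {} before barrier", i);
            barrier.wait();
            println!("Thread {} after barrier", i);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}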

⚡ Async/Await Concurrency

Async Runtime

[async_runtime]
tokio_runtime: true
async_functions: true
future_handling: true

[async_implementation]
use tokio::time::{sleep, Duration};
use futures::future::{join, join_all};

// Basic async function
pub async fn async_function() -> String {
    sleep(Duration::from_secs(1)).await;
    "Hello from async".to_string()
}

// Async function with error handling
pub async fn async_with_result() -> Result<String, Box<dyn std::error::Error>> {
    sleep(Duration::from_millis(100)).await;
    Ok("Success".to_string())
}

// Concurrent execution
pub async fn concurrent_execution() {
    let futures = vec![
        async_function(),
        async_function(),
        async_function(),
    ];

    let results = join_all(futures).await;
    println!("Results: {:?}", results);
}

// Join multiple futures
pub async fn join_futures() {
    let future1 = async_function();
    let future2 = async_with_result();

    let (result1, result2) = join(future1, future2).await;
    println!("Result1: {}, Result2: {:?}", result1, result2);
}

// Async stream processing
pub async fn async_stream_processing() {
    // buffer_unordered comes from the futures crate's StreamExt,
    // not from tokio_stream.
    use futures::stream::{self, StreamExt};

    let mut stream = stream::iter(1..=10)
        .map(|x| async move {
            sleep(Duration::from_millis(100)).await;
            x * 2
        })
        .buffer_unordered(3); // At most 3 items in flight at once

    while let Some(result) = stream.next().await {
        println!("Processed: {}", result);
    }
}
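All of the async functions above assume a running Tokio runtime; a minimal, hypothetical entry point for trying them out:

// Minimal Tokio entry point for exercising the async examples.
#[tokio::main]
async fn main() {
    concurrent_execution().await;
    join_futures().await;
    async_stream_processing().await;
}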

Async Channels

[async_channels]
mpsc: true
broadcast: true
watch: true

[channel_implementation]
use futures::future::join;
use tokio::sync::{mpsc, broadcast, watch};
use tokio::time::{sleep, Duration};

// MPSC channel
pub async fn mpsc_example() {
    let (tx, mut rx) = mpsc::channel(100);

    // Sender task
    let sender = tokio::spawn(async move {
        for i in 0..10 {
            tx.send(i).await.unwrap();
            sleep(Duration::from_millis(100)).await;
        }
    });

    // Receiver task: exits once the sender task finishes and drops tx.
    let receiver = tokio::spawn(async move {
        while let Some(value) = rx.recv().await {
            println!("Received: {}", value);
        }
    });

    let _ = join(sender, receiver).await;
}

// Broadcast channel
pub async fn broadcast_example() {
    let (tx, _) = broadcast::channel(16);
    let mut receivers = Vec::new();

    // Create multiple receivers
    for i in 0..3 {
        let mut rx = tx.subscribe();
        let receiver = tokio::spawn(async move {
            while let Ok(value) = rx.recv().await {
                println!("Receiver {}: {}", i, value);
            }
        });
        receivers.push(receiver);
    }

    // Send messages
    for i in 0..5 {
        tx.send(i).unwrap();
        sleep(Duration::from_millis(100)).await;
    }

    // Drop the sender so the receivers see a closed channel and exit.
    drop(tx);

    // Wait for receivers
    for receiver in receivers {
        receiver.await.unwrap();
    }
}

// Watch channel
pub async fn watch_example() {
    let (tx, mut rx) = watch::channel(0);

    // Watcher task
    let watcher = tokio::spawn(async move {
        while rx.changed().await.is_ok() {
            println!("Value changed to: {}", *rx.borrow());
        }
    });

    // Updater task; when it finishes, tx is dropped and the watcher ends.
    let updater = tokio::spawn(async move {
        for i in 1..=5 {
            tx.send(i).unwrap();
            sleep(Duration::from_millis(200)).await;
        }
    });

    let _ = join(watcher, updater).await;
}

🔄 Actor Model

Actor Implementation

[actor_model]
actor_system: true
message_passing: true
supervision: true

[actor_implementation]
// async-trait (assumed as a dependency) boxes the trait's futures and
// keeps them Send, which tokio::spawn requires.
use async_trait::async_trait;
use std::collections::HashMap;
use std::marker::PhantomData;
use tokio::sync::mpsc;

// Actor trait
#[async_trait]
pub trait Actor: Send + 'static {
    type Message: Send + 'static;
    type Response: Send + 'static;

    async fn handle(&mut self, message: Self::Message) -> Option<Self::Response>;
}

// Actor system
pub struct ActorSystem {
    actors: HashMap<String, ActorHandle>,
}

// Join handle for a spawned actor task.
struct ActorHandle {
    handle: tokio::task::JoinHandle<()>,
}

impl ActorSystem {
    pub fn new() -> Self {
        Self { actors: HashMap::new() }
    }

    pub async fn spawn<A>(&mut self, name: String, mut actor: A) -> ActorRef<A::Message, A::Response>
    where
        A: Actor,
    {
        let (tx, mut rx) = mpsc::channel(100);

        let handle = tokio::spawn(async move {
            while let Some(message) = rx.recv().await {
                if let Some(_response) = actor.handle(message).await {
                    // Responses are dropped here; see the request-reply
                    // sketch below for routing them back to callers.
                }
            }
        });

        self.actors.insert(name, ActorHandle { handle });

        ActorRef { sender: tx, _marker: PhantomData }
    }
}

// Actor reference
pub struct ActorRef<M, R> {
    sender: mpsc::Sender<M>,
    // R is otherwise unused; PhantomData keeps the reference typed
    // to the actor's response.
    _marker: PhantomData<R>,
}

impl<M, R> ActorRef<M, R> {
    pub async fn send(&self, message: M) -> Result<(), mpsc::error::SendError<M>> {
        self.sender.send(message).await
    }
}

// Example actor
pub struct CounterActor {
    count: i32,
}

#[async_trait]
impl Actor for CounterActor {
    type Message = CounterMessage;
    type Response = CounterResponse;

    async fn handle(&mut self, message: Self::Message) -> Option<Self::Response> {
        match message {
            CounterMessage::Increment => {
                self.count += 1;
                Some(CounterResponse::Count(self.count))
            }
            CounterMessage::Get => Some(CounterResponse::Count(self.count)),
            CounterMessage::Reset => {
                self.count = 0;
                Some(CounterResponse::Count(self.count))
            }
        }
    }
}

#[derive(Debug)]
pub enum CounterMessage {
    Increment,
    Get,
    Reset,
}

#[derive(Debug)]
pub enum CounterResponse {
    Count(i32),
}

// Usage example
pub async fn actor_example() {
    let mut system = ActorSystem::new();
    let counter_ref = system
        .spawn("counter".to_string(), CounterActor { count: 0 })
        .await;

    counter_ref.send(CounterMessage::Increment).await.unwrap();
    counter_ref.send(CounterMessage::Increment).await.unwrap();
    counter_ref.send(CounterMessage::Get).await.unwrap();
}
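As written, send is fire-and-forget and the actor's responses are dropped inside its task. A common extension, sketched here as an assumption rather than part of the system above, is to carry a oneshot reply channel inside each message:

// Hypothetical request-reply variant: the caller ships a oneshot sender
// in the message and awaits the actor's answer on the receiver.
use tokio::sync::oneshot;

pub enum CounterRequest {
    Get { reply: oneshot::Sender<i32> },
}

pub async fn ask_counter(tx: &mpsc::Sender<CounterRequest>) -> i32 {
    let (reply, rx) = oneshot::channel();
    tx.send(CounterRequest::Get { reply }).await.unwrap();
    rx.await.unwrap()
}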

🔒 Lock-Free Programming

Atomic Operations

[atomic_operations]
atomic_types: true
compare_exchange: true
memory_ordering: true

[atomic_implementation]
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

// Atomic counter
pub struct AtomicCounter {
    count: AtomicUsize,
}

impl AtomicCounter {
    pub fn new() -> Self {
        Self { count: AtomicUsize::new(0) }
    }

    pub fn increment(&self) -> usize {
        self.count.fetch_add(1, Ordering::SeqCst)
    }

    pub fn get(&self) -> usize {
        self.count.load(Ordering::SeqCst)
    }

    pub fn compare_exchange(&self, current: usize, new: usize) -> Result<usize, usize> {
        self.count.compare_exchange(current, new, Ordering::SeqCst, Ordering::SeqCst)
    }
}

// Lock-free queue (a simplified Michael-Scott queue, for illustration)
pub struct LockFreeQueue<T> {
    head: AtomicPtr<Node<T>>,
    tail: AtomicPtr<Node<T>>,
}

struct Node<T> {
    data: Option<T>,
    next: AtomicPtr<Node<T>>,
}

impl<T> LockFreeQueue<T> {
    pub fn new() -> Self {
        // A dummy node so head and tail are never null.
        let dummy = Box::into_raw(Box::new(Node {
            data: None,
            next: AtomicPtr::new(std::ptr::null_mut()),
        }));
        Self {
            head: AtomicPtr::new(dummy),
            tail: AtomicPtr::new(dummy),
        }
    }

    pub fn enqueue(&self, value: T) {
        let new_node = Box::into_raw(Box::new(Node {
            data: Some(value),
            next: AtomicPtr::new(std::ptr::null_mut()),
        }));

        loop {
            let tail = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*tail).next.load(Ordering::Acquire) };

            if next.is_null() {
                // Try to link the new node after the current tail.
                if unsafe {
                    (*tail).next.compare_exchange(
                        std::ptr::null_mut(),
                        new_node,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                }
                .is_ok()
                {
                    // Swing tail forward; failure means another thread helped.
                    self.tail
                        .compare_exchange(tail, new_node, Ordering::Release, Ordering::Relaxed)
                        .ok();
                    break;
                }
            } else {
                // Tail was lagging; help move it forward.
                self.tail
                    .compare_exchange(tail, next, Ordering::Release, Ordering::Relaxed)
                    .ok();
            }
        }
    }

    pub fn dequeue(&self) -> Option<T> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            let tail = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*head).next.load(Ordering::Acquire) };

            if head == tail {
                if next.is_null() {
                    return None; // Queue is empty
                }
                // Tail is lagging; help move it forward.
                self.tail
                    .compare_exchange(tail, next, Ordering::Release, Ordering::Relaxed)
                    .ok();
            } else if self
                .head
                .compare_exchange(head, next, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                // Only the thread that wins the head CAS may take the value.
                // Note: old nodes are deliberately leaked here; real code
                // needs safe memory reclamation (hazard pointers or epochs).
                return unsafe { (*next).data.take() };
            }
        }
    }
}
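Hand-rolled lock-free structures are easy to get subtly wrong, and the sketch above deliberately leaks nodes rather than solve safe memory reclamation. In production code a vetted implementation is usually the better choice; a minimal sketch, assuming the crossbeam crate as a dependency:

// Lock-free queue from crossbeam instead of a hand-rolled one.
use crossbeam::queue::SegQueue;

pub fn segqueue_example() {
    let queue = SegQueue::new();
    queue.push(1);
    queue.push(2);

    while let Some(value) = queue.pop() {
        println!("Popped: {}", value);
    }
}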

🔄 Concurrent Patterns

Producer-Consumer

[producer_consumer]
bounded_buffer: true
multiple_producers: true
multiple_consumers: true

[producer_consumer_implementation]
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio::time::{sleep, Duration};

pub struct ProducerConsumer<T> {
    sender: mpsc::Sender<T>,
    // tokio's mpsc::Receiver is not Clone, so multiple consumers share
    // the single receiver behind an async Mutex.
    receiver: Arc<Mutex<mpsc::Receiver<T>>>,
}

impl<T> ProducerConsumer<T> {
    pub fn new(buffer_size: usize) -> Self {
        let (sender, receiver) = mpsc::channel(buffer_size);
        Self {
            sender,
            receiver: Arc::new(Mutex::new(receiver)),
        }
    }

    pub fn producer(&self) -> mpsc::Sender<T> {
        self.sender.clone()
    }

    pub fn consumer(&self) -> Arc<Mutex<mpsc::Receiver<T>>> {
        Arc::clone(&self.receiver)
    }
}

// Usage example
pub async fn producer_consumer_example() {
    let pc = ProducerConsumer::new(10);

    // Multiple producers
    let producers: Vec<_> = (0..3)
        .map(|id| {
            let sender = pc.producer();
            tokio::spawn(async move {
                for i in 0..5 {
                    sender.send(format!("Producer {}: {}", id, i)).await.unwrap();
                    sleep(Duration::from_millis(100)).await;
                }
            })
        })
        .collect();

    // Multiple consumers
    let consumers: Vec<_> = (0..2)
        .map(|id| {
            let receiver = pc.consumer();
            tokio::spawn(async move {
                loop {
                    let item = receiver.lock().await.recv().await;
                    match item {
                        Some(item) => println!("Consumer {}: {}", id, item),
                        None => break,
                    }
                }
            })
        })
        .collect();

    // Drop the struct's own sender so the channel closes once the
    // producers finish and the consumers can exit.
    drop(pc);

    // Wait for all tasks
    for task in producers.into_iter().chain(consumers) {
        task.await.unwrap();
    }
}

Read-Write Lock

[read_write_lock]
rwlock_implementation: true
starvation_prevention: true
fairness: true

[rwlock_implementation]
use std::cell::UnsafeCell;
use std::sync::{Condvar, Mutex};

// A fair reader-writer lock built from a Mutex-guarded state machine, two
// condition variables, and an UnsafeCell holding the data. This is a
// blocking (non-async) sketch; waiting writers block new readers, which
// prevents writer starvation.
struct RwState {
    active_readers: usize,
    writer_active: bool,
    waiting_writers: usize,
}

pub struct FairRwLock<T> {
    state: Mutex<RwState>,
    readers_cv: Condvar,
    writers_cv: Condvar,
    data: UnsafeCell<T>,
}

// Safety: the admission protocol gives writers exclusive access and readers
// shared access, mirroring std::sync::RwLock's requirements.
unsafe impl<T: Send + Sync> Sync for FairRwLock<T> {}

impl<T> FairRwLock<T> {
    pub fn new(data: T) -> Self {
        Self {
            state: Mutex::new(RwState {
                active_readers: 0,
                writer_active: false,
                waiting_writers: 0,
            }),
            readers_cv: Condvar::new(),
            writers_cv: Condvar::new(),
            data: UnsafeCell::new(data),
        }
    }

    pub fn read<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&T) -> R,
    {
        let mut state = self.state.lock().unwrap();
        // Readers yield to queued writers so writers cannot starve.
        while state.writer_active || state.waiting_writers > 0 {
            state = self.readers_cv.wait(state).unwrap();
        }
        state.active_readers += 1;
        drop(state);

        let result = f(unsafe { &*self.data.get() });

        let mut state = self.state.lock().unwrap();
        state.active_readers -= 1;
        if state.active_readers == 0 {
            self.writers_cv.notify_one();
        }
        result
    }

    pub fn write<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        let mut state = self.state.lock().unwrap();
        state.waiting_writers += 1;
        while state.writer_active || state.active_readers > 0 {
            state = self.writers_cv.wait(state).unwrap();
        }
        state.waiting_writers -= 1;
        state.writer_active = true;
        drop(state);

        let result = f(unsafe { &mut *self.data.get() });

        let mut state = self.state.lock().unwrap();
        state.writer_active = false;
        if state.waiting_writers > 0 {
            // Hand off to the next queued writer first...
            self.writers_cv.notify_one();
        } else {
            // ...otherwise release all waiting readers at once.
            self.readers_cv.notify_all();
        }
        result
    }
}

🎯 Best Practices

1. Thread Safety

- Use appropriate synchronization primitives
- Avoid data races
- Use atomic operations when possible
- Implement proper error handling

2. Async Programming

- Use async/await for I/O operations
- Avoid blocking in async contexts (see the sketch below)
- Use appropriate runtime
- Handle cancellation properly
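A minimal sketch of keeping blocking work off the async executor with Tokio's spawn_blocking; the workload shown is a stand-in:

use tokio::task;

// Run blocking or CPU-heavy work on Tokio's dedicated blocking pool so
// it does not stall the async worker threads.
pub async fn checksum(data: Vec<u8>) -> usize {
    task::spawn_blocking(move || {
        // Stand-in for an expensive, blocking computation.
        data.iter().map(|b| *b as usize).sum()
    })
    .await
    .unwrap()
}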

3. Performance

- Use thread pools for CPU-bound tasks
- Implement backpressure handling (see the semaphore sketch below)
- Monitor resource usage
- Profile concurrent code
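One way to bound in-flight work is a counting semaphore; a minimal sketch with tokio::sync::Semaphore (the limit of 10 and task count of 100 are arbitrary):

use std::sync::Arc;
use tokio::sync::Semaphore;

// At most 10 tasks run at once; later spawns wait for a permit.
pub async fn bounded_tasks() {
    let semaphore = Arc::new(Semaphore::new(10));
    let mut handles = Vec::new();

    for i in 0..100 {
        let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // Released when the task finishes
            println!("Task {} running", i);
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}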

4. Error Handling

- Implement proper error propagation
- Use Result types consistently
- Handle panics gracefully
- Implement timeouts (see the sketch below)
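Wrapping any future with a deadline is one line with tokio::time::timeout; a minimal sketch reusing async_function from earlier:

use tokio::time::{timeout, Duration};

// Err(Elapsed) means the deadline passed before the future completed.
pub async fn with_timeout() {
    match timeout(Duration::from_millis(500), async_function()).await {
        Ok(value) => println!("Completed: {}", value),
        Err(_) => println!("Timed out"),
    }
}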

5. Testing

- Test concurrent code thoroughly
- Use stress testing (see the sketch below)
- Test race conditions
- Verify thread safety
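A minimal stress test for the AtomicCounter defined earlier, checking that no increments are lost under contention (thread and iteration counts are arbitrary):

#[test]
fn stress_test_atomic_counter() {
    use std::sync::Arc;
    use std::thread;

    let counter = Arc::new(AtomicCounter::new());
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..10_000 {
                    counter.increment();
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Every increment must be visible; atomics guarantee no lost updates.
    assert_eq!(counter.get(), 8 * 10_000);
}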

Concurrent programming with TuskLang and Rust provides powerful tools for building high-performance, scalable applications that can efficiently utilize modern multi-core systems.