🦀 Concurrent Programming in TuskLang with Rust
Concurrent Programming in TuskLang with Rust
🔄 Concurrent Foundation
Concurrent programming with TuskLang and Rust provides powerful tools for building high-performance, scalable applications that can handle multiple tasks simultaneously. This guide covers threads, async/await, and advanced concurrent patterns.
🏗️ Concurrent Architecture
Concurrency Models
[concurrency_models]
thread_based: true
async_await: true
actor_model: true
shared_state: true[concurrency_patterns]
message_passing: true
shared_memory: true
lock_free: true
atomic_operations: true
Concurrent Components
[concurrent_components]
threads: "std_thread"
async_runtime: "tokio"
channels: "message_passing"
locks: "mutex_rwlock"
🔧 Thread-Based Concurrency
Thread Management
[thread_management]
thread_creation: true
thread_pooling: true
thread_synchronization: true[thread_implementation]
use std::thread;
use std::sync::{Arc, Mutex};
use std::time::Duration;
// Basic thread creation
pub fn basic_thread_example() {
let handle = thread::spawn(|| {
for i in 1..=10 {
println!("Thread: {}", i);
thread::sleep(Duration::from_millis(100));
}
});
handle.join().unwrap();
}
// Thread with data
pub fn thread_with_data() {
let data = vec![1, 2, 3, 4, 5];
let handle = thread::spawn(move || {
let sum: i32 = data.iter().sum();
println!("Sum: {}", sum);
});
handle.join().unwrap();
}
// Thread pool implementation
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Message>>,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(Message::NewJob(job)).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
enum Message {
NewJob(Job),
Terminate,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
Thread Synchronization
[thread_synchronization]
mutex: true
rwlock: true
semaphore: true
barrier: true[synchronization_implementation]
use std::sync::{Arc, Mutex, RwLock, Condvar};
use std::collections::HashMap;
// Mutex example
pub fn mutex_example() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
// RwLock example
pub fn rwlock_example() {
let data = Arc::new(RwLock::new(HashMap::new()));
let mut handles = vec![];
// Writers
for i in 0..3 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
let mut map = data.write().unwrap();
map.insert(i, format!("value_{}", i));
});
handles.push(handle);
}
// Readers
for _ in 0..5 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
let map = data.read().unwrap();
println!("Read: {:?}", *map);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
// Condition variable example
pub fn condition_variable_example() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
thread::spawn(move || {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
});
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
}
⚡ Async/Await Concurrency
Async Runtime
[async_runtime]
tokio_runtime: true
async_functions: true
future_handling: true[async_implementation]
use tokio::time::{sleep, Duration};
use futures::future::{join, join_all};
// Basic async function
pub async fn async_function() -> String {
sleep(Duration::from_secs(1)).await;
"Hello from async".to_string()
}
// Async function with error handling
pub async fn async_with_result() -> Result<String, Box<dyn std::error::Error>> {
sleep(Duration::from_millis(100)).await;
Ok("Success".to_string())
}
// Concurrent execution
pub async fn concurrent_execution() {
let futures = vec![
async_function(),
async_function(),
async_function(),
];
let results = join_all(futures).await;
println!("Results: {:?}", results);
}
// Join multiple futures
pub async fn join_futures() {
let future1 = async_function();
let future2 = async_with_result();
let (result1, result2) = join(future1, future2).await;
println!("Result1: {}, Result2: {:?}", result1, result2);
}
// Async stream processing
pub async fn async_stream_processing() {
use tokio_stream::{self as stream, StreamExt};
let mut stream = stream::iter(1..=10)
.map(|x| async move {
sleep(Duration::from_millis(100)).await;
x * 2
})
.buffer_unordered(3);
while let Some(result) = stream.next().await {
println!("Processed: {}", result);
}
}
Async Channels
[async_channels]
mpsc: true
broadcast: true
watch: true[channel_implementation]
use tokio::sync::{mpsc, broadcast, watch};
// MPSC channel
pub async fn mpsc_example() {
let (tx, mut rx) = mpsc::channel(100);
// Sender task
let sender = tokio::spawn(async move {
for i in 0..10 {
tx.send(i).await.unwrap();
sleep(Duration::from_millis(100)).await;
}
});
// Receiver task
let receiver = tokio::spawn(async move {
while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
});
let _ = join(sender, receiver).await;
}
// Broadcast channel
pub async fn broadcast_example() {
let (tx, _) = broadcast::channel(16);
let mut receivers = Vec::new();
// Create multiple receivers
for i in 0..3 {
let mut rx = tx.subscribe();
let receiver = tokio::spawn(async move {
while let Ok(value) = rx.recv().await {
println!("Receiver {}: {}", i, value);
}
});
receivers.push(receiver);
}
// Send messages
for i in 0..5 {
tx.send(i).unwrap();
sleep(Duration::from_millis(100)).await;
}
// Wait for receivers
for receiver in receivers {
receiver.await.unwrap();
}
}
// Watch channel
pub async fn watch_example() {
let (tx, mut rx) = watch::channel(0);
// Watcher task
let watcher = tokio::spawn(async move {
while rx.changed().await.is_ok() {
println!("Value changed to: {}", *rx.borrow());
}
});
// Updater task
let updater = tokio::spawn(async move {
for i in 1..=5 {
tx.send(i).unwrap();
sleep(Duration::from_millis(200)).await;
}
});
let _ = join(watcher, updater).await;
}
🔄 Actor Model
Actor Implementation
[actor_model]
actor_system: true
message_passing: true
supervision: true[actor_implementation]
use tokio::sync::mpsc;
use std::collections::HashMap;
// Actor trait
pub trait Actor: Send + 'static {
type Message: Send + 'static;
type Response: Send + 'static;
async fn handle(&mut self, message: Self::Message) -> Option<Self::Response>;
}
// Actor system
pub struct ActorSystem {
actors: HashMap<String, ActorHandle>,
}
impl ActorSystem {
pub fn new() -> Self {
Self {
actors: HashMap::new(),
}
}
pub async fn spawn<A>(&mut self, name: String, actor: A) -> ActorRef<A::Message, A::Response>
where
A: Actor,
{
let (tx, mut rx) = mpsc::channel(100);
let mut actor = actor;
let handle = tokio::spawn(async move {
while let Some(message) = rx.recv().await {
if let Some(response) = actor.handle(message).await {
// Handle response
}
}
});
self.actors.insert(name.clone(), ActorHandle { handle });
ActorRef { sender: tx }
}
}
// Actor reference
pub struct ActorRef<M, R> {
sender: mpsc::Sender<M>,
}
impl<M, R> ActorRef<M, R> {
pub async fn send(&self, message: M) -> Result<(), mpsc::error::SendError<M>> {
self.sender.send(message).await
}
}
// Example actor
pub struct CounterActor {
count: i32,
}
impl Actor for CounterActor {
type Message = CounterMessage;
type Response = CounterResponse;
async fn handle(&mut self, message: Self::Message) -> Option<Self::Response> {
match message {
CounterMessage::Increment => {
self.count += 1;
Some(CounterResponse::Count(self.count))
}
CounterMessage::Get => {
Some(CounterResponse::Count(self.count))
}
CounterMessage::Reset => {
self.count = 0;
Some(CounterResponse::Count(self.count))
}
}
}
}
#[derive(Debug)]
pub enum CounterMessage {
Increment,
Get,
Reset,
}
#[derive(Debug)]
pub enum CounterResponse {
Count(i32),
}
// Usage example
pub async fn actor_example() {
let mut system = ActorSystem::new();
let counter_ref = system
.spawn("counter".to_string(), CounterActor { count: 0 })
.await;
counter_ref.send(CounterMessage::Increment).await.unwrap();
counter_ref.send(CounterMessage::Increment).await.unwrap();
counter_ref.send(CounterMessage::Get).await.unwrap();
}
🔒 Lock-Free Programming
Atomic Operations
[atomic_operations]
atomic_types: true
compare_exchange: true
memory_ordering: true[atomic_implementation]
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
// Atomic counter
pub struct AtomicCounter {
count: AtomicUsize,
}
impl AtomicCounter {
pub fn new() -> Self {
Self {
count: AtomicUsize::new(0),
}
}
pub fn increment(&self) -> usize {
self.count.fetch_add(1, Ordering::SeqCst)
}
pub fn get(&self) -> usize {
self.count.load(Ordering::SeqCst)
}
pub fn compare_exchange(&self, current: usize, new: usize) -> Result<usize, usize> {
self.count.compare_exchange(current, new, Ordering::SeqCst, Ordering::SeqCst)
}
}
// Lock-free queue
pub struct LockFreeQueue<T> {
head: AtomicPtr<Node<T>>,
tail: AtomicPtr<Node<T>>,
}
struct Node<T> {
data: Option<T>,
next: AtomicPtr<Node<T>>,
}
impl<T> LockFreeQueue<T> {
pub fn new() -> Self {
let dummy = Box::into_raw(Box::new(Node {
data: None,
next: AtomicPtr::new(std::ptr::null_mut()),
}));
Self {
head: AtomicPtr::new(dummy),
tail: AtomicPtr::new(dummy),
}
}
pub fn enqueue(&self, value: T) {
let new_node = Box::into_raw(Box::new(Node {
data: Some(value),
next: AtomicPtr::new(std::ptr::null_mut()),
}));
loop {
let tail = self.tail.load(Ordering::Acquire);
let next = unsafe { (*tail).next.load(Ordering::Acquire) };
if next.is_null() {
if unsafe { (*tail).next.compare_exchange(
std::ptr::null_mut(),
new_node,
Ordering::Release,
Ordering::Relaxed,
) }.is_ok() {
self.tail.compare_exchange(
tail,
new_node,
Ordering::Release,
Ordering::Relaxed,
).ok();
break;
}
} else {
self.tail.compare_exchange(
tail,
next,
Ordering::Release,
Ordering::Relaxed,
).ok();
}
}
}
pub fn dequeue(&self) -> Option<T> {
loop {
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Acquire);
let next = unsafe { (*head).next.load(Ordering::Acquire) };
if head == tail {
if next.is_null() {
return None;
}
self.tail.compare_exchange(
tail,
next,
Ordering::Release,
Ordering::Relaxed,
).ok();
} else {
if let Some(data) = unsafe { (*next).data.take() } {
self.head.compare_exchange(
head,
next,
Ordering::Release,
Ordering::Relaxed,
).ok();
return Some(data);
}
}
}
}
}
🔄 Concurrent Patterns
Producer-Consumer
[producer_consumer]
bounded_buffer: true
multiple_producers: true
multiple_consumers: true[producer_consumer_implementation]
use tokio::sync::mpsc;
pub struct ProducerConsumer<T> {
sender: mpsc::Sender<T>,
receiver: mpsc::Receiver<T>,
}
impl<T> ProducerConsumer<T> {
pub fn new(buffer_size: usize) -> Self {
let (sender, receiver) = mpsc::channel(buffer_size);
Self { sender, receiver }
}
pub fn producer(&self) -> mpsc::Sender<T> {
self.sender.clone()
}
pub fn consumer(&mut self) -> mpsc::Receiver<T> {
self.receiver.clone()
}
}
// Usage example
pub async fn producer_consumer_example() {
let mut pc = ProducerConsumer::new(10);
// Multiple producers
let producers = (0..3).map(|id| {
let sender = pc.producer();
tokio::spawn(async move {
for i in 0..5 {
sender.send(format!("Producer {}: {}", id, i)).await.unwrap();
sleep(Duration::from_millis(100)).await;
}
})
});
// Multiple consumers
let consumers = (0..2).map(|id| {
let mut receiver = pc.consumer();
tokio::spawn(async move {
while let Some(item) = receiver.recv().await {
println!("Consumer {}: {}", id, item);
}
})
});
// Wait for all tasks
let all_tasks = producers.chain(consumers);
for task in all_tasks {
task.await.unwrap();
}
}
Read-Write Lock
[read_write_lock]
rwlock_implementation: true
starvation_prevention: true
fairness: true[rwlock_implementation]
use std::sync::{Arc, Mutex, Condvar};
use std::collections::VecDeque;
pub struct FairRwLock<T> {
data: Arc<Mutex<T>>,
readers: Arc<Mutex<VecDeque<Condvar>>>,
writers: Arc<Mutex<VecDeque<Condvar>>>,
active_readers: Arc<Mutex<usize>>,
active_writers: Arc<Mutex<usize>>,
}
impl<T> FairRwLock<T> {
pub fn new(data: T) -> Self {
Self {
data: Arc::new(Mutex::new(data)),
readers: Arc::new(Mutex::new(VecDeque::new())),
writers: Arc::new(Mutex::new(VecDeque::new())),
active_readers: Arc::new(Mutex::new(0)),
active_writers: Arc::new(Mutex::new(0)),
}
}
pub async fn read<F, R>(&self, f: F) -> R
where
F: FnOnce(&T) -> R,
{
let (cv, _) = Condvar::new();
{
let mut readers = self.readers.lock().unwrap();
readers.push_back(cv.clone());
}
loop {
let mut active_writers = self.active_writers.lock().unwrap();
let mut readers = self.readers.lock().unwrap();
if *active_writers == 0 && readers.front() == Some(&cv) {
*self.active_readers.lock().unwrap() += 1;
readers.pop_front();
break;
}
cv = readers.pop_front().unwrap();
cv = active_writers.wait(cv).unwrap();
}
let result = f(&*self.data.lock().unwrap());
*self.active_readers.lock().unwrap() -= 1;
result
}
pub async fn write<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut T) -> R,
{
let (cv, _) = Condvar::new();
{
let mut writers = self.writers.lock().unwrap();
writers.push_back(cv.clone());
}
loop {
let mut active_readers = self.active_readers.lock().unwrap();
let mut active_writers = self.active_writers.lock().unwrap();
let mut writers = self.writers.lock().unwrap();
if active_readers == 0 && active_writers == 0 && writers.front() == Some(&cv) {
*active_writers += 1;
writers.pop_front();
break;
}
cv = writers.pop_front().unwrap();
cv = active_readers.wait(cv).unwrap();
}
let result = f(&mut *self.data.lock().unwrap());
*self.active_writers.lock().unwrap() -= 1;
result
}
}
🎯 Best Practices
1. Thread Safety
- Use appropriate synchronization primitives - Avoid data races - Use atomic operations when possible - Implement proper error handling2. Async Programming
- Use async/await for I/O operations - Avoid blocking in async contexts - Use appropriate runtime - Handle cancellation properly3. Performance
- Use thread pools for CPU-bound tasks - Implement backpressure handling - Monitor resource usage - Profile concurrent code4. Error Handling
- Implement proper error propagation - Use Result types consistently - Handle panics gracefully - Implement timeouts5. Testing
- Test concurrent code thoroughly - Use stress testing - Test race conditions - Verify thread safetyConcurrent programming with TuskLang and Rust provides powerful tools for building high-performance, scalable applications that can efficiently utilize modern multi-core systems.