🦀 Reactive Programming in TuskLang with Rust
Reactive Programming in TuskLang with Rust
⚡ Reactive Foundation
Reactive programming with TuskLang and Rust provides a powerful paradigm for building responsive, resilient, and scalable applications. This guide covers streams, observables, and reactive patterns for handling asynchronous data flows.
🏗️ Reactive Architecture
Reactive Principles
[reactive_principles]
data_flow: "stream_based"
asynchronous: true
non_blocking: true
backpressure: true

[reactive_patterns]
observable_stream: true
publisher_subscriber: true
reactive_streams: true
functional_programming: true
Reactive Components
[reactive_components]
observable: "data_source"
observer: "data_consumer"
operator: "data_transformation"
scheduler: "execution_context"
🔧 Observable Implementation
Basic Observable
[observable_implementation]
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use futures_util::{Sink, SinkExt};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::{Mutex, RwLock};
use tokio_stream::{Stream, StreamExt};
use std::task::{Context, Poll};pub struct Observable<T> {
stream: Pin<Box<dyn Stream<Item = T> + Send>>,
}
impl<T> Observable<T> {
pub fn new<S>(stream: S) -> Self
where
S: Stream<Item = T> + Send + 'static,
{
Self {
stream: Box::pin(stream),
}
}
pub fn from_iter<I>(iter: I) -> Self
where
I: IntoIterator<Item = T>,
T: Send + 'static,
{
let stream = tokio_stream::iter(iter);
Self::new(stream)
}
pub fn from_async_fn<F, Fut>(f: F) -> Self
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let stream = tokio_stream::unfold((), move |_| {
let fut = f();
async move {
let result = fut.await;
Some((result, ()))
}
});
Self::new(stream)
}
}
impl<T> Stream for Observable<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next(cx)
}
}
Observable Operators
[observable_operators]
transformation: true
filtering: true
combining: true
error_handling: true

[operator_implementation]
impl<T> Observable<T> {
pub fn map<U, F>(self, f: F) -> Observable<U>
where
F: Fn(T) -> U + Send + Sync + 'static,
T: Send + 'static,
U: Send + 'static,
{
let stream = self.map(f);
Observable::new(stream)
}
pub fn filter<F>(self, predicate: F) -> Observable<T>
where
F: Fn(&T) -> bool + Send + Sync + 'static,
T: Send + 'static,
{
let stream = self.filter(predicate);
Observable::new(stream)
}
pub fn take(self, n: usize) -> Observable<T>
where
T: Send + 'static,
{
let stream = self.take(n);
Observable::new(stream)
}
pub fn skip(self, n: usize) -> Observable<T>
where
T: Send + 'static,
{
let stream = self.skip(n);
Observable::new(stream)
}
pub fn debounce(self, duration: Duration) -> Observable<T>
where
T: Send + 'static,
{
let stream = self.debounce(duration);
Observable::new(stream)
}
pub fn throttle(self, duration: Duration) -> Observable<T>
where
T: Send + 'static,
{
let stream = self.throttle(duration);
Observable::new(stream)
}
}
🔄 Stream Processing
Data Streams
[data_streams]
real_time: true
batch_processing: true
window_operations: true

[stream_implementation]
/// A processing pipeline: an [`Observable`] source paired with the
/// configuration used by the `process*` methods.
pub struct DataStream<T> {
    /// Upstream observable whose items are processed.
    source: Observable<T>,
    /// Batch and window settings consulted by the processing methods.
    config: StreamConfig,
}
/// Tuning parameters for a `DataStream`.
#[derive(Clone)]
pub struct StreamConfig {
    /// Buffer capacity, counted in items.
    pub buffer_size: usize,
    /// Number of items grouped into one batch by batch processing.
    pub batch_size: usize,
    /// Time span covered by one window in windowed processing.
    pub window_size: Duration,
    /// Strategy to apply when the consumer is slower than the producer.
    // NOTE(review): no code visible in this guide reads this field — confirm where it is applied.
    pub backpressure_strategy: BackpressureStrategy,
}
/// How a stream should react when its consumer cannot keep up.
///
/// The variants are plain markers; their behaviour is defined by whichever
/// code interprets them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureStrategy {
    /// Presumably: discard items the consumer cannot accept.
    Drop,
    /// Presumably: hold items until the consumer catches up.
    Buffer,
    /// Presumably: make the producer wait for the consumer.
    Block,
}
impl<T> DataStream<T> {
pub fn new(source: Observable<T>, config: StreamConfig) -> Self {
Self { source, config }
}
pub async fn process<F, U>(self, processor: F) -> Observable<U>
where
F: Fn(T) -> U + Send + Sync + 'static,
T: Send + 'static,
U: Send + 'static,
{
let stream = self.source.map(processor);
Observable::new(stream)
}
pub async fn process_batch<F, U>(self, processor: F) -> Observable<Vec<U>>
where
F: Fn(Vec<T>) -> Vec<U> + Send + Sync + 'static,
T: Send + 'static,
U: Send + 'static,
{
let batch_size = self.config.batch_size;
let stream = self.source
.chunks(batch_size)
.map(processor);
Observable::new(stream)
}
pub async fn process_window<F, U>(self, processor: F) -> Observable<Vec<U>>
where
F: Fn(Vec<T>) -> Vec<U> + Send + Sync + 'static,
T: Send + 'static,
U: Send + 'static,
{
let window_size = self.config.window_size;
let stream = self.source
.timeout_window(window_size)
.map(processor);
Observable::new(stream)
}
}
Event Streams
[event_streams]
event_sourcing: true
event_processing: true
event_routing: true

[event_stream_implementation]
/// A stream of `Event`s together with its routing and error-handling settings.
pub struct EventStream {
    /// Upstream observable producing the events.
    events: Observable<Event>,
    /// Routing and error-handling configuration.
    config: EventStreamConfig,
}
/// Configuration for an `EventStream`.
#[derive(Clone)]
pub struct EventStreamConfig {
    /// Event type names; each one becomes a route key when events are routed.
    pub event_types: Vec<String>,
    /// How events are distributed across the routes.
    pub routing_strategy: RoutingStrategy,
    /// What to do when processing an event fails.
    pub error_handling: ErrorHandlingStrategy,
}
/// How an event stream distributes events across its routes.
///
/// The variants are plain markers; their behaviour is defined by the routing
/// code that matches on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingStrategy {
    /// Route by matching the event against each configured event type.
    Broadcast,
    /// Cycle through the configured routes, one event per route in turn.
    RoundRobin,
    /// Presumably: pick the route from a hash of the event.
    Hash,
    /// Presumably: pick the route from a topic carried by the event.
    Topic,
}
/// What to do when processing an event fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorHandlingStrategy {
    /// Try again, up to `max_attempts` times, with `backoff` as the delay setting.
    Retry { max_attempts: usize, backoff: Duration },
    /// Presumably: divert the failed event to a dead-letter destination.
    DeadLetter,
    /// Presumably: drop the failed event and continue.
    Skip,
}
impl EventStream {
pub fn new(events: Observable<Event>, config: EventStreamConfig) -> Self {
Self { events, config }
}
pub async fn route_events(self) -> HashMap<String, Observable<Event>> {
let mut routes = HashMap::new();
match self.config.routing_strategy {
RoutingStrategy::Broadcast => {
for event_type in &self.config.event_types {
let route = self.events.clone()
.filter(move |event| event.event_type == *event_type);
routes.insert(event_type.clone(), Observable::new(route));
}
}
RoutingStrategy::RoundRobin => {
let mut counter = 0;
let event_types = self.config.event_types.clone();
let stream = self.events
.map(move |event| {
let route = &event_types[counter % event_types.len()];
counter += 1;
(route.clone(), event)
});
// Group by route
// Implementation details...
}
_ => {
// Other routing strategies
}
}
routes
}
pub async fn process_with_retry<F>(self, processor: F) -> Observable<ProcessedEvent>
where
F: Fn(Event) -> Result<ProcessedEvent, EventError> + Send + Sync + 'static,
{
let max_attempts = match self.config.error_handling {
ErrorHandlingStrategy::Retry { max_attempts, .. } => max_attempts,
_ => 1,
};
let stream = self.events
.map(move |event| {
let mut attempts = 0;
loop {
match processor(event.clone()) {
Ok(result) => break result,
Err(_) if attempts < max_attempts => {
attempts += 1;
continue;
}
Err(e) => break ProcessedEvent::Error { event, error: e },
}
}
});
Observable::new(stream)
}
}
🔄 Reactive Patterns
Publisher-Subscriber
[publisher_subscriber]
pub_sub: true
topic_based: true
filtered_subscription: true

[pubsub_implementation]
use tokio::sync::broadcast;
/// Sending side of a publish/subscribe channel backed by
/// `tokio::sync::broadcast`.
pub struct Publisher<T> {
    sender: broadcast::Sender<T>,
}

impl<T> Publisher<T>
where
    T: Clone + Send + 'static,
{
    /// Creates a publisher whose broadcast channel has the given `capacity`.
    pub fn new(capacity: usize) -> Self {
        // Only the sending half is kept; receivers are created on demand by
        // `subscribe`.
        Self {
            sender: broadcast::channel(capacity).0,
        }
    }

    /// Sends `message` to the current subscribers, forwarding the result of
    /// `broadcast::Sender::send`.
    pub async fn publish(&self, message: T) -> Result<usize, broadcast::error::SendError<T>> {
        let outcome = self.sender.send(message);
        outcome
    }

    /// Creates a new subscriber attached to this publisher's channel.
    pub fn subscribe(&self) -> Subscriber<T> {
        Subscriber::new(self.sender.subscribe())
    }
}
pub struct Subscriber<T> {
receiver: broadcast::Receiver<T>,
}
impl<T> Subscriber<T> {
pub fn new(receiver: broadcast::Receiver<T>) -> Self {
Self { receiver }
}
pub async fn receive(&mut self) -> Result<T, broadcast::error::RecvError> {
self.receiver.recv().await
}
pub fn into_stream(self) -> impl Stream<Item = Result<T, broadcast::error::RecvError>> {
tokio_stream::wrappers::BroadcastStream::new(self.receiver)
}
}
Subject Pattern
[subject_pattern]
behavior_subject: true
replay_subject: true
async_subject: true

[subject_implementation]
/// A multicast source: each value passed to `next` is cloned to every
/// subscriber registered at that moment.
pub struct Subject<T> {
    subscribers: Arc<Mutex<Vec<UnboundedSender<T>>>>,
}

impl<T> Subject<T>
where
    T: Clone + Send + 'static,
{
    /// Creates a subject with no subscribers.
    pub fn new() -> Self {
        let registry = Mutex::new(Vec::new());
        Self {
            subscribers: Arc::new(registry),
        }
    }

    /// Delivers `value` to every subscriber and forgets subscribers whose
    /// channel can no longer accept it.
    pub async fn next(&self, value: T) {
        let mut registry = self.subscribers.lock().await;
        registry.retain(|subscriber| match subscriber.send(value.clone()) {
            Ok(_) => true,
            // The send failed, so this subscriber is pruned.
            Err(_) => false,
        });
    }

    /// Registers a new subscriber and returns the stream of values sent to it
    /// from now on.
    pub async fn subscribe(&self) -> impl Stream<Item = T> {
        let (sender, receiver) = unbounded_channel();
        {
            let mut registry = self.subscribers.lock().await;
            registry.push(sender);
        }
        tokio_stream::wrappers::UnboundedReceiverStream::new(receiver)
    }
}
pub struct BehaviorSubject<T> {
current_value: Arc<RwLock<Option<T>>>,
subject: Subject<T>,
}
impl<T> BehaviorSubject<T>
where
T: Clone + Send + 'static,
{
pub fn new(initial_value: T) -> Self {
let current_value = Arc::new(RwLock::new(Some(initial_value)));
let subject = Subject::new();
Self { current_value, subject }
}
pub async fn next(&self, value: T) {
*self.current_value.write().await = Some(value.clone());
self.subject.next(value).await;
}
pub async fn subscribe(&self) -> impl Stream<Item = T> {
let current_value = self.current_value.read().await.clone();
let subject_stream = self.subject.subscribe().await;
let initial_stream = if let Some(value) = current_value {
tokio_stream::once(value)
} else {
tokio_stream::empty()
};
initial_stream.chain(subject_stream)
}
}
🔄 Backpressure Handling
Backpressure Strategies
[backpressure_strategies]
drop: true
buffer: true
throttle: true
sample: true

[backpressure_implementation]
/// Parameterised backpressure strategies accepted by
/// `Observable::with_backpressure`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureStrategy {
    /// Discard items instead of queueing them.
    Drop,
    /// Queue up to `capacity` items between producer and consumer.
    Buffer { capacity: usize },
    /// Space items at least `rate` apart.
    Throttle { rate: Duration },
    /// Keep only the most recent item per `interval`.
    Sample { interval: Duration },
}
impl<T> Observable<T> {
pub fn with_backpressure(self, strategy: BackpressureStrategy) -> Observable<T>
where
T: Send + 'static,
{
match strategy {
BackpressureStrategy::Drop => {
let stream = self.filter(|_| true); // No-op for now
Observable::new(stream)
}
BackpressureStrategy::Buffer { capacity } => {
let stream = self.buffer_unordered(capacity);
Observable::new(stream)
}
BackpressureStrategy::Throttle { rate } => {
let stream = self.throttle(rate);
Observable::new(stream)
}
BackpressureStrategy::Sample { interval } => {
let stream = self.sample(interval);
Observable::new(stream)
}
}
}
}
🔄 Error Handling
Error Recovery
[error_recovery]
retry: true
fallback: true
circuit_breaker: true

[error_handling_implementation]
impl<T> Observable<T> {
pub fn retry(self, max_attempts: usize) -> Observable<T>
where
T: Send + 'static,
{
let stream = self.retry(max_attempts);
Observable::new(stream)
}
pub fn retry_with_backoff(self, max_attempts: usize, backoff: Duration) -> Observable<T>
where
T: Send + 'static,
{
let stream = self.retry_with_backoff(max_attempts, backoff);
Observable::new(stream)
}
pub fn fallback<F>(self, fallback: F) -> Observable<T>
where
F: Fn() -> T + Send + Sync + 'static,
T: Send + 'static,
{
let stream = self.or_else(move |_| {
let fallback = fallback.clone();
async move { fallback() }
});
Observable::new(stream)
}
pub fn circuit_breaker(self, failure_threshold: usize, timeout: Duration) -> Observable<T>
where
T: Send + 'static,
{
let stream = self.circuit_breaker(failure_threshold, timeout);
Observable::new(stream)
}
}
🔄 Schedulers
Execution Contexts
[schedulers]
immediate: true
current_thread: true
thread_pool: true
custom: true

[scheduler_implementation]
/// Execution context selector passed to `Observable::observe_on`.
pub enum Scheduler {
    /// No rescheduling.
    Immediate,
    /// Stay on the current thread.
    CurrentThread,
    /// Run on a pool with `threads` worker threads.
    ThreadPool { threads: usize },
    /// Run on a caller-supplied executor.
    // NOTE(review): `Executor` is not defined in this guide — confirm which trait is meant.
    Custom(Box<dyn Executor + Send + Sync>),
}
impl<T> Observable<T> {
    /// Selects the execution context on which items are observed.
    ///
    /// `Immediate` and `CurrentThread` need no rescheduling. `ThreadPool` and
    /// `Custom` scheduling are not implemented yet, so for now every variant
    /// returns the observable unchanged.
    pub fn observe_on(self, scheduler: Scheduler) -> Observable<T>
    where
        T: Send + 'static,
    {
        match scheduler {
            Scheduler::Immediate | Scheduler::CurrentThread => self,
            // TODO: move polling onto a pool with `threads` workers. The
            // previous code built a multi-thread runtime here and dropped it
            // straight away: wasted work, an `unwrap` on a fallible build, and
            // a panic when a runtime is dropped inside an async context.
            Scheduler::ThreadPool { threads: _ } => self,
            // TODO: drive the stream on the supplied executor.
            Scheduler::Custom(_executor) => self,
        }
    }
}
🔄 Testing
Reactive Testing
[reactive_testing]
stream_testing: true
marble_testing: true
virtual_time: true

[testing_implementation]
#[cfg(test)]
mod tests {
    use super::*;

    /// Drains `observable` to completion and returns everything it yielded.
    async fn drain<T>(observable: Observable<T>) -> Vec<T> {
        observable.collect::<Vec<T>>().await
    }

    /// `map` transforms every item and preserves order.
    #[tokio::test]
    async fn test_observable_map() {
        let observable = Observable::from_iter(vec![1, 2, 3, 4, 5]).map(|x| x * 2);
        assert_eq!(drain(observable).await, vec![2, 4, 6, 8, 10]);
    }

    /// `filter` keeps only the items matching the predicate.
    #[tokio::test]
    async fn test_observable_filter() {
        let observable = Observable::from_iter(vec![1, 2, 3, 4, 5]).filter(|x| x % 2 == 0);
        assert_eq!(drain(observable).await, vec![2, 4]);
    }

    /// A subscriber created before publishing receives messages in order.
    #[tokio::test]
    async fn test_publisher_subscriber() {
        let publisher = Publisher::new(10);
        let mut subscriber = publisher.subscribe();
        publisher.publish("Hello").await.unwrap();
        publisher.publish("World").await.unwrap();
        assert_eq!(subscriber.receive().await.unwrap(), "Hello");
        assert_eq!(subscriber.receive().await.unwrap(), "World");
    }
}
🎯 Best Practices
1. Stream Design
- Design streams for your use cases
- Use appropriate backpressure strategies
- Implement proper error handling
- Consider memory usage

2. Performance
- Use appropriate schedulers
- Implement efficient operators
- Monitor stream performance
- Handle backpressure properly

3. Error Handling
- Implement retry mechanisms
- Use circuit breakers
- Provide fallback strategies
- Log errors appropriately

4. Testing
- Test stream behavior
- Use marble testing
- Test error scenarios
- Verify backpressure handling

5. Monitoring
- Track stream metrics
- Monitor error rates
- Use distributed tracing
- Set up alerting

Reactive programming with TuskLang and Rust provides a powerful foundation for building responsive, resilient, and scalable applications that can handle complex asynchronous data flows efficiently.