☕ ⚡ Reactive Programming with TuskLang Java
"We don't bow to any king" - Reactive Java Edition
TuskLang Java supports building reactive applications with Spring WebFlux and Project Reactor, using non-blocking I/O and backpressure management to handle high concurrency.
🎯 Reactive Architecture Overview
Reactive Configuration
// reactive-app.tsk
[reactive_system]
name: "Reactive TuskLang App"
version: "2.0.0"
paradigm: "reactive"
backpressure_strategy: "buffer"
[webflux]
port: 8080
host: "0.0.0.0"
thread_pool: {
core_size: 4
max_size: 8
queue_capacity: 1000
keep_alive: "60s"
}
[reactor]
scheduler: {
parallel: {
threads: 4
name_prefix: "parallel-"
}
elastic: {
threads: 8
name_prefix: "elastic-"
ttl: "60s"
}
bounded_elastic: {
threads: 4
name_prefix: "bounded-elastic-"
queue_capacity: 1000
ttl: "60s"
}
}
backpressure: {
buffer_size: 256
timeout: "5s"
retry_attempts: 3
retry_backoff: "exponential"
}
[databases]
reactive_postgres: {
type: "r2dbc"
host: @env("DB_HOST", "localhost")
port: @env("DB_PORT", "5432")
database: @env("DB_NAME", "reactive_app")
username: @env("DB_USER", "reactive_user")
password: @env.secure("DB_PASSWORD")
pool_size: 20
connection_timeout: "30s"
statement_timeout: "60s"
max_lifetime: "30m"
}
reactive_redis: {
type: "lettuce"
host: @env("REDIS_HOST", "localhost")
port: @env("REDIS_PORT", "6379")
password: @env.secure("REDIS_PASSWORD")
pool_size: 20
connection_timeout: "5s"
command_timeout: "3s"
ttl: "1h"
}
[streaming]
kafka: {
bootstrap_servers: [
@env("KAFKA_BROKER_1", "localhost:9092"),
@env("KAFKA_BROKER_2", "localhost:9093")
]
consumer: {
group_id: "reactive-app-group"
auto_offset_reset: "earliest"
enable_auto_commit: false
max_poll_records: 500
fetch_min_size: 1
fetch_max_wait: "500ms"
}
producer: {
acks: "all"
retries: 3
batch_size: 16384
linger_ms: 5
buffer_memory: 33554432
}
}
[circuit_breaker]
resilience4j: {
enabled: true
sliding_window_size: 10
minimum_number_of_calls: 5
failure_rate_threshold: 50
wait_duration_in_open_state: "60s"
permitted_number_of_calls_in_half_open_state: 3
automatic_transition_from_open_to_half_open_enabled: true
}
[monitoring]
micrometer: {
enabled: true
prometheus: true
jvm_metrics: true
reactor_metrics: true
}
[streaming_endpoints]
user_stream: {
path: "/stream/users"
buffer_size: 100
timeout: "30s"
backpressure: "drop"
}
order_stream: {
path: "/stream/orders"
buffer_size: 200
timeout: "60s"
backpressure: "buffer"
}
notification_stream: {
path: "/stream/notifications"
buffer_size: 50
timeout: "15s"
backpressure: "latest"
}
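The [streaming] Kafka settings have no companion code later in this guide. The sketch below shows one way the consumer settings could feed a reactive pipeline using reactor-kafka; the topic name "orders" and the hardcoded property values (which mirror the configuration above) are illustrative assumptions rather than part of the TuskLang API.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

public class ReactiveOrderConsumer {

    // Consumer properties mirror the [streaming] section above; in a real application
    // they would be read from the bound configuration instead of being hardcoded.
    public Flux<ReceiverRecord<String, String>> consumeOrders() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-app-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
                .subscription(List.of("orders")); // hypothetical topic name

        // receive() emits records as they arrive; acknowledging the offset commits it
        // asynchronously, since auto-commit is disabled in the configuration.
        return KafkaReceiver.create(options)
                .receive()
                .doOnNext(record -> record.receiverOffset().acknowledge());
    }
}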
🔄 Spring WebFlux Integration
WebFlux Configuration
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.tusklang.java.annotations.TuskConfig;
import reactor.core.scheduler.Schedulers;
import org.springframework.web.reactive.function.server.RouterFunctions;

@Configuration
@EnableWebFlux
@TuskConfig
public class WebFluxConfiguration implements WebFluxConfigurer {
private final ReactiveConfig reactiveConfig;
public WebFluxConfiguration(ReactiveConfig reactiveConfig) {
this.reactiveConfig = reactiveConfig;
}
@Bean
public RouterFunction<ServerResponse> reactiveRoutes(
UserHandler userHandler,
OrderHandler orderHandler,
NotificationHandler notificationHandler) {
return RouterFunctions.route()
// User routes
.GET("/api/users", userHandler::getAllUsers)
.GET("/api/users/{id}", userHandler::getUserById)
.POST("/api/users", userHandler::createUser)
.PUT("/api/users/{id}", userHandler::updateUser)
.DELETE("/api/users/{id}", userHandler::deleteUser)
// Order routes
.GET("/api/orders", orderHandler::getAllOrders)
.GET("/api/orders/{id}", orderHandler::getOrderById)
.POST("/api/orders", orderHandler::createOrder)
.PUT("/api/orders/{id}", orderHandler::updateOrder)
.DELETE("/api/orders/{id}", orderHandler::deleteOrder)
// Streaming routes
.GET("/stream/users", userHandler::streamUsers)
.GET("/stream/orders", orderHandler::streamOrders)
.GET("/stream/notifications", notificationHandler::streamNotifications)
.build();
}
@Bean
public ReactorSchedulers reactorSchedulers() {
// Builds the application schedulers from the [reactor] scheduler settings.
// ReactorSchedulers is a simple holder sketched after this class. The deprecated
// elastic scheduler is omitted; recent Reactor versions recommend boundedElastic.
SchedulerConfig config = reactiveConfig.getReactor().getScheduler();
// TTL values such as "60s" are plain strings in the config; this assumes seconds.
int ttlSeconds = Integer.parseInt(config.getBoundedElastic().getTtl().replaceAll("[^0-9]", ""));
return new ReactorSchedulers(
Schedulers.newParallel(
config.getParallel().getNamePrefix(),
config.getParallel().getThreads()),
Schedulers.newBoundedElastic(
config.getBoundedElastic().getThreads(),
config.getBoundedElastic().getQueueCapacity(),
config.getBoundedElastic().getNamePrefix(),
ttlSeconds));
}
}
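The bean above returns a small holder for the constructed schedulers. ReactorSchedulers is not part of TuskLang or Reactor; this is a minimal sketch, assuming it only needs to expose the two schedulers to other beans.
import reactor.core.scheduler.Scheduler;

// Hypothetical holder used by reactorSchedulers() above; it exposes the schedulers
// built from the [reactor] configuration so other components can inject them.
public class ReactorSchedulers {

    private final Scheduler parallel;
    private final Scheduler boundedElastic;

    public ReactorSchedulers(Scheduler parallel, Scheduler boundedElastic) {
        this.parallel = parallel;
        this.boundedElastic = boundedElastic;
    }

    public Scheduler parallel() { return parallel; }
    public Scheduler boundedElastic() { return boundedElastic; }
}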
@TuskConfig
public class ReactiveConfig {
private String name;
private String version;
private String paradigm;
private String backpressureStrategy;
private WebFluxConfig webflux;
private ReactorConfig reactor;
private Map<String, DatabaseConfig> databases;
private StreamingConfig streaming;
private CircuitBreakerConfig circuitBreaker;
private MonitoringConfig monitoring;
private Map<String, StreamingEndpointConfig> streamingEndpoints;
// Getters and setters
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getVersion() { return version; }
public void setVersion(String version) { this.version = version; }
public String getParadigm() { return paradigm; }
public void setParadigm(String paradigm) { this.paradigm = paradigm; }
public String getBackpressureStrategy() { return backpressureStrategy; }
public void setBackpressureStrategy(String backpressureStrategy) { this.backpressureStrategy = backpressureStrategy; }
public WebFluxConfig getWebflux() { return webflux; }
public void setWebflux(WebFluxConfig webflux) { this.webflux = webflux; }
public ReactorConfig getReactor() { return reactor; }
public void setReactor(ReactorConfig reactor) { this.reactor = reactor; }
public Map<String, DatabaseConfig> getDatabases() { return databases; }
public void setDatabases(Map<String, DatabaseConfig> databases) { this.databases = databases; }
public StreamingConfig getStreaming() { return streaming; }
public void setStreaming(StreamingConfig streaming) { this.streaming = streaming; }
public CircuitBreakerConfig getCircuitBreaker() { return circuitBreaker; }
public void setCircuitBreaker(CircuitBreakerConfig circuitBreaker) { this.circuitBreaker = circuitBreaker; }
public MonitoringConfig getMonitoring() { return monitoring; }
public void setMonitoring(MonitoringConfig monitoring) { this.monitoring = monitoring; }
public Map<String, StreamingEndpointConfig> getStreamingEndpoints() { return streamingEndpoints; }
public void setStreamingEndpoints(Map<String, StreamingEndpointConfig> streamingEndpoints) { this.streamingEndpoints = streamingEndpoints; }
}
@TuskConfig
public class WebFluxConfig {
private int port;
private String host;
private ThreadPoolConfig threadPool;
// Getters and setters
public int getPort() { return port; }
public void setPort(int port) { this.port = port; }
public String getHost() { return host; }
public void setHost(String host) { this.host = host; }
public ThreadPoolConfig getThreadPool() { return threadPool; }
public void setThreadPool(ThreadPoolConfig threadPool) { this.threadPool = threadPool; }
}
@TuskConfig
public class ThreadPoolConfig {
private int coreSize;
private int maxSize;
private int queueCapacity;
private String keepAlive;
// Getters and setters
public int getCoreSize() { return coreSize; }
public void setCoreSize(int coreSize) { this.coreSize = coreSize; }
public int getMaxSize() { return maxSize; }
public void setMaxSize(int maxSize) { this.maxSize = maxSize; }
public int getQueueCapacity() { return queueCapacity; }
public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; }
public String getKeepAlive() { return keepAlive; }
public void setKeepAlive(String keepAlive) { this.keepAlive = keepAlive; }
}
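The handlers below read per-endpoint streaming settings through a StreamingEndpointConfig binding that is referenced but not shown in this guide. A minimal sketch, following the same getter/setter pattern as the config classes above and mirroring the [streaming_endpoints] fields:
@TuskConfig
public class StreamingEndpointConfig {
    private String path;
    private int bufferSize;
    private String timeout;
    private String backpressure;

    // Getters and setters
    public String getPath() { return path; }
    public void setPath(String path) { this.path = path; }
    public int getBufferSize() { return bufferSize; }
    public void setBufferSize(int bufferSize) { this.bufferSize = bufferSize; }
    public String getTimeout() { return timeout; }
    public void setTimeout(String timeout) { this.timeout = timeout; }
    public String getBackpressure() { return backpressure; }
    public void setBackpressure(String backpressure) { this.backpressure = backpressure; }
}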
Reactive User Handler
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import org.springframework.cloud.client.circuitbreaker.ReactiveCircuitBreaker;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;

@Component
public class UserHandler {
private static final Logger log = LoggerFactory.getLogger(UserHandler.class);
private final ReactiveUserService userService;
private final ReactiveConfig reactiveConfig;
// Spring Cloud's ReactiveCircuitBreaker provides the Mono-based run(..) used below;
// the factory bean that creates it (e.g. a Resilience4j-backed factory) is not shown here.
private final ReactiveCircuitBreaker circuitBreaker;
public UserHandler(ReactiveUserService userService,
ReactiveConfig reactiveConfig,
ReactiveCircuitBreaker circuitBreaker) {
this.userService = userService;
this.reactiveConfig = reactiveConfig;
this.circuitBreaker = circuitBreaker;
}
public Mono<ServerResponse> getAllUsers(ServerRequest request) {
return request.queryParam("page")
.map(page -> {
int pageNum = Integer.parseInt(page);
int size = request.queryParam("size")
.map(Integer::parseInt)
.orElse(20);
String search = request.queryParam("search").orElse("");
return userService.getUsers(pageNum, size, search)
.collectList()
.map(users -> Map.<String, Object>of(
"users", users,
"page", pageNum,
"size", size,
"totalElements", users.size()
));
})
// Fall back to the unpaged listing when no page parameter is supplied; orElseGet keeps
// the fallback lazy, and both branches produce a Mono<Map<String, Object>>.
.orElseGet(() -> userService.getAllUsers()
.collectList()
.map(users -> Map.<String, Object>of("users", users, "totalElements", users.size())))
.flatMap(response -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(response))
.onErrorResume(e -> {
log.error("Error getting users", e);
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(Map.of("error", "Failed to retrieve users"));
});
}
public Mono<ServerResponse> getUserById(ServerRequest request) {
String userId = request.pathVariable("id");
return circuitBreaker.run(
userService.getUserById(userId)
.subscribeOn(Schedulers.boundedElastic()),
throwable -> {
log.error("Circuit breaker opened for user service", throwable);
return Mono.error(new ServiceUnavailableException("User service temporarily unavailable"));
}
)
.flatMap(user -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build())
.onErrorResume(ServiceUnavailableException.class, e ->
ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE)
.bodyValue(Map.of("error", e.getMessage()))
)
.onErrorResume(e -> {
log.error("Error getting user by ID: {}", userId, e);
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(Map.of("error", "Failed to retrieve user"));
});
}
public Mono<ServerResponse> createUser(ServerRequest request) {
return request.bodyToMono(CreateUserRequest.class)
.flatMap(userService::createUser)
.flatMap(user -> ServerResponse.status(HttpStatus.CREATED)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.onErrorResume(IllegalArgumentException.class, e ->
ServerResponse.badRequest()
.bodyValue(Map.of("error", e.getMessage()))
)
.onErrorResume(e -> {
log.error("Error creating user", e);
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(Map.of("error", "Failed to create user"));
});
}
public Mono<ServerResponse> updateUser(ServerRequest request) {
String userId = request.pathVariable("id");
return request.bodyToMono(UpdateUserRequest.class)
.flatMap(updateRequest -> userService.updateUser(userId, updateRequest))
.flatMap(user -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build())
.onErrorResume(IllegalArgumentException.class, e ->
ServerResponse.badRequest()
.bodyValue(Map.of("error", e.getMessage()))
)
.onErrorResume(e -> {
log.error("Error updating user: {}", userId, e);
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(Map.of("error", "Failed to update user"));
});
}
public Mono<ServerResponse> deleteUser(ServerRequest request) {
String userId = request.pathVariable("id");
return userService.deleteUser(userId)
.flatMap(deleted -> {
if (deleted) {
return ServerResponse.ok()
.bodyValue(Map.of("message", "User deleted successfully"));
} else {
return ServerResponse.notFound().build();
}
})
.onErrorResume(e -> {
log.error("Error deleting user: {}", userId, e);
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(Map.of("error", "Failed to delete user"));
});
}
public Mono<ServerResponse> streamUsers(ServerRequest request) {
StreamingEndpointConfig config = reactiveConfig.getStreamingEndpoints().get("user_stream");
// A streaming handler returns a single ServerResponse whose body is the Flux itself;
// mapping each element to its own ServerResponse would not satisfy the
// Mono<ServerResponse> contract. Errors after the response starts are logged.
Flux<User> users = userService.streamUsers()
.onBackpressureBuffer(config.getBufferSize())
.subscribeOn(Schedulers.parallel())
.doOnError(e -> log.error("Error streaming users", e));
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_NDJSON)
.body(users, User.class);
}
}
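Because /stream/users emits newline-delimited JSON, a client can consume it as an unbounded Flux. A minimal sketch using Spring's WebClient; the base URL and class name are assumptions for illustration.
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

public class UserStreamClient {

    private final WebClient webClient = WebClient.create("http://localhost:8080"); // assumed base URL

    public Flux<User> streamUsers() {
        // Each NDJSON line is decoded into a User as it arrives; demand from this
        // subscriber propagates back to the server over the reactive connection.
        return webClient.get()
                .uri("/stream/users")
                .accept(MediaType.APPLICATION_NDJSON)
                .retrieve()
                .bodyToFlux(User.class);
    }
}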
🔄 Project Reactor Patterns
Reactive Service Implementation
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
public class ReactiveUserService {
private static final Logger log = LoggerFactory.getLogger(ReactiveUserService.class);
private final ReactiveUserRepository userRepository;
private final ReactiveCacheService cacheService;
private final ReactiveNotificationService notificationService;
private final ReactiveConfig reactiveConfig;
public ReactiveUserService(ReactiveUserRepository userRepository,
ReactiveCacheService cacheService,
ReactiveNotificationService notificationService,
ReactiveConfig reactiveConfig) {
this.userRepository = userRepository;
this.cacheService = cacheService;
this.notificationService = notificationService;
this.reactiveConfig = reactiveConfig;
}
public Flux<User> getAllUsers() {
return userRepository.findAll()
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(user -> log.debug("Retrieved user: {}", user.getId()))
.doOnError(error -> log.error("Error retrieving all users", error));
}
public Flux<User> getUsers(int page, int size, String search) {
return userRepository.findBySearchCriteria(search, page, size)
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(user -> log.debug("Retrieved user: {}", user.getId()))
.doOnError(error -> log.error("Error retrieving users with search: {}", search, error));
}
@CircuitBreaker(name = "userService")
public Mono<User> getUserById(String userId) {
return cacheService.getUser(userId)
.switchIfEmpty(
userRepository.findById(userId)
.subscribeOn(Schedulers.boundedElastic())
.flatMap(user -> cacheService.cacheUser(user).thenReturn(user))
)
.doOnNext(user -> log.debug("Retrieved user by ID: {}", userId))
.doOnError(error -> log.error("Error retrieving user by ID: {}", userId, error));
}
public Mono<User> createUser(CreateUserRequest request) {
return Mono.fromCallable(() -> validateCreateUserRequest(request))
.subscribeOn(Schedulers.boundedElastic())
.flatMap(validated -> userRepository.save(validated))
.flatMap(user -> cacheService.cacheUser(user).thenReturn(user))
.flatMap(user -> notificationService.sendUserCreatedNotification(user).thenReturn(user))
.doOnNext(user -> log.info("Created user: {}", user.getId()))
.doOnError(error -> log.error("Error creating user", error));
}
public Mono<User> updateUser(String userId, UpdateUserRequest request) {
return userRepository.findById(userId)
.subscribeOn(Schedulers.boundedElastic())
.switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + userId)))
.flatMap(existingUser -> {
User updatedUser = updateUserFields(existingUser, request);
return userRepository.save(updatedUser);
})
.flatMap(user -> cacheService.cacheUser(user).thenReturn(user))
.flatMap(user -> notificationService.sendUserUpdatedNotification(user).thenReturn(user))
.doOnNext(user -> log.info("Updated user: {}", userId))
.doOnError(error -> log.error("Error updating user: {}", userId, error));
}
public Mono<Boolean> deleteUser(String userId) {
return userRepository.findById(userId)
.subscribeOn(Schedulers.boundedElastic())
.flatMap(user -> userRepository.deleteById(userId).thenReturn(true))
.flatMap(deleted -> cacheService.evictUser(userId).thenReturn(deleted))
.flatMap(deleted -> notificationService.sendUserDeletedNotification(userId).thenReturn(deleted))
.defaultIfEmpty(false)
.doOnNext(deleted -> {
if (deleted) {
log.info("Deleted user: {}", userId);
} else {
log.warn("User not found for deletion: {}", userId);
}
})
.doOnError(error -> log.error("Error deleting user: {}", userId, error));
}
public Flux<User> streamUsers() {
return userRepository.findAll()
.subscribeOn(Schedulers.parallel())
.delayElements(Duration.ofMillis(100)) // Simulate processing time
.doOnNext(user -> log.debug("Streaming user: {}", user.getId()))
.doOnError(error -> log.error("Error streaming users", error));
}
private User validateCreateUserRequest(CreateUserRequest request) {
if (request.getEmail() == null || request.getEmail().trim().isEmpty()) {
throw new IllegalArgumentException("Email is required");
}
if (request.getFirstName() == null || request.getFirstName().trim().isEmpty()) {
throw new IllegalArgumentException("First name is required");
}
if (request.getLastName() == null || request.getLastName().trim().isEmpty()) {
throw new IllegalArgumentException("Last name is required");
}
User user = new User();
user.setId(UUID.randomUUID().toString());
user.setEmail(request.getEmail());
user.setFirstName(request.getFirstName());
user.setLastName(request.getLastName());
user.setCreatedAt(Instant.now());
user.setUpdatedAt(Instant.now());
return user;
}
private User updateUserFields(User existingUser, UpdateUserRequest request) {
if (request.getEmail() != null) {
existingUser.setEmail(request.getEmail());
}
if (request.getFirstName() != null) {
existingUser.setFirstName(request.getFirstName());
}
if (request.getLastName() != null) {
existingUser.setLastName(request.getLastName());
}
existingUser.setUpdatedAt(Instant.now());
return existingUser;
}
}
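Reactive pipelines like this service are usually verified with reactor-test's StepVerifier, as recommended in the testing best practices below. A sketch of one such test, assuming JUnit 5 and Mockito are on the classpath:
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class ReactiveUserServiceTest {

    @Test
    void getUserByIdFallsBackToRepositoryWhenCacheIsEmpty() {
        ReactiveUserRepository repository = mock(ReactiveUserRepository.class);
        ReactiveCacheService cache = mock(ReactiveCacheService.class);
        ReactiveNotificationService notifications = mock(ReactiveNotificationService.class);
        ReactiveConfig config = new ReactiveConfig();

        User user = new User();
        user.setId("42");
        when(cache.getUser("42")).thenReturn(Mono.empty());
        when(repository.findById("42")).thenReturn(Mono.just(user));
        when(cache.cacheUser(user)).thenReturn(Mono.empty());

        ReactiveUserService service =
                new ReactiveUserService(repository, cache, notifications, config);

        // StepVerifier subscribes, asserts the single emitted element, and verifies completion.
        StepVerifier.create(service.getUserById("42"))
                .expectNextMatches(found -> "42".equals(found.getId()))
                .verifyComplete();
    }
}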
Reactive Repository Implementation
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.data.relational.core.query.Query;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import org.springframework.data.relational.core.query.Update;
import org.springframework.r2dbc.core.DatabaseClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import static org.springframework.data.relational.core.query.Criteria.where;

@Repository
public class ReactiveUserRepository {
private static final Logger log = LoggerFactory.getLogger(ReactiveUserRepository.class);
private final R2dbcEntityTemplate template;
private final DatabaseClient databaseClient;
public ReactiveUserRepository(R2dbcEntityTemplate template, DatabaseClient databaseClient) {
this.template = template;
this.databaseClient = databaseClient;
}
public Flux<User> findAll() {
return template.select(User.class)
.all()
.doOnError(error -> log.error("Error finding all users", error));
}
public Mono<User> findById(String id) {
return template.select(User.class)
.matching(Query.query(where("id").is(id)))
.one()
.doOnError(error -> log.error("Error finding user by ID: {}", id, error));
}
public Flux<User> findBySearchCriteria(String search, int page, int size) {
Query query = Query.query(where("first_name").like("%" + search + "%")
.or("last_name").like("%" + search + "%")
.or("email").like("%" + search + "%"))
.limit(size)
.offset(page * size);
return template.select(User.class)
.matching(query)
.all()
.doOnError(error -> log.error("Error finding users with search criteria: {}", search, error));
}
public Mono<User> save(User user) {
if (user.getId() == null) {
return template.insert(User.class)
.using(user)
.doOnSuccess(savedUser -> log.debug("Inserted user: {}", savedUser.getId()))
.doOnError(error -> log.error("Error inserting user", error));
} else {
return template.update(User.class)
.matching(Query.query(where("id").is(user.getId())))
.apply(Update.update("email", user.getEmail())
.set("first_name", user.getFirstName())
.set("last_name", user.getLastName())
.set("updated_at", user.getUpdatedAt()))
.thenReturn(user)
.doOnSuccess(updatedUser -> log.debug("Updated user: {}", updatedUser.getId()))
.doOnError(error -> log.error("Error updating user: {}", user.getId(), error));
}
}
public Mono<Void> deleteById(String id) {
return template.delete(User.class)
.matching(Query.query(where("id").is(id)))
.all()
.then()
.doOnSuccess(v -> log.debug("Deleted user: {}", id))
.doOnError(error -> log.error("Error deleting user: {}", id, error));
}
public Flux<User> streamAll() {
return databaseClient.sql("SELECT * FROM users ORDER BY created_at DESC")
.map(row -> {
User user = new User();
user.setId(row.get("id", String.class));
user.setEmail(row.get("email", String.class));
user.setFirstName(row.get("first_name", String.class));
user.setLastName(row.get("last_name", String.class));
user.setCreatedAt(row.get("created_at", Instant.class));
user.setUpdatedAt(row.get("updated_at", Instant.class));
return user;
})
.all()
.doOnError(error -> log.error("Error streaming users", error));
}
}
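The repository depends on an R2dbcEntityTemplate and DatabaseClient built from the [databases] reactive_postgres settings. A minimal wiring sketch with r2dbc-pool; the hardcoded values mirror the configuration above and would normally come from the bound DatabaseConfig, and the class name is an assumption.
import java.time.Duration;

import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.r2dbc.core.DatabaseClient;

@Configuration
public class ReactivePostgresConfiguration {

    @Bean
    public ConnectionFactory connectionFactory() {
        // Values mirror the reactive_postgres entry; credentials should come from the
        // secure environment lookup, not literals.
        ConnectionFactory base = ConnectionFactories.get(ConnectionFactoryOptions.builder()
                .option(ConnectionFactoryOptions.DRIVER, "postgresql")
                .option(ConnectionFactoryOptions.HOST, "localhost")
                .option(ConnectionFactoryOptions.PORT, 5432)
                .option(ConnectionFactoryOptions.DATABASE, "reactive_app")
                .option(ConnectionFactoryOptions.USER, "reactive_user")
                .option(ConnectionFactoryOptions.PASSWORD, "change-me")
                .build());

        return new ConnectionPool(ConnectionPoolConfiguration.builder(base)
                .maxSize(20)                             // pool_size
                .maxAcquireTime(Duration.ofSeconds(30))  // connection_timeout
                .maxLifeTime(Duration.ofMinutes(30))     // max_lifetime
                .build());
    }

    @Bean
    public R2dbcEntityTemplate r2dbcEntityTemplate(ConnectionFactory connectionFactory) {
        return new R2dbcEntityTemplate(connectionFactory);
    }

    @Bean
    public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
        return DatabaseClient.create(connectionFactory);
    }
}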
🔄 Backpressure Management
Backpressure Strategies
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;

@Service
public class BackpressureService {
private static final Logger log = LoggerFactory.getLogger(BackpressureService.class);
private final ReactiveConfig reactiveConfig;
public BackpressureService(ReactiveConfig reactiveConfig) {
this.reactiveConfig = reactiveConfig;
}
// Buffer strategy - accumulates items and processes them in batches
public Flux<String> processWithBuffer(Flux<String> source) {
return source
.bufferTimeout(100, Duration.ofSeconds(1))
.flatMap(batch -> Flux.fromIterable(batch)
.map(this::processItem)
.subscribeOn(Schedulers.parallel()))
.onBackpressureBuffer(1000)
.doOnNext(item -> log.debug("Processed item: {}", item))
.doOnError(error -> log.error("Error processing with buffer", error));
}
// Drop strategy - drops items when buffer is full
public Flux<String> processWithDrop(Flux<String> source) {
return source
.onBackpressureDrop(dropped -> log.warn("Dropped item: {}", dropped))
.map(this::processItem)
.subscribeOn(Schedulers.parallel())
.doOnNext(item -> log.debug("Processed item: {}", item))
.doOnError(error -> log.error("Error processing with drop", error));
}
// Latest strategy - keeps only the latest item
public Flux<String> processWithLatest(Flux<String> source) {
return source
.onBackpressureLatest()
.map(this::processItem)
.subscribeOn(Schedulers.parallel())
.doOnNext(item -> log.debug("Processed item: {}", item))
.doOnError(error -> log.error("Error processing with latest", error));
}
// Error strategy - emits error when buffer is full
public Flux<String> processWithError(Flux<String> source) {
return source
.onBackpressureError()
.map(this::processItem)
.subscribeOn(Schedulers.parallel())
.doOnNext(item -> log.debug("Processed item: {}", item))
.doOnError(error -> log.error("Error processing with error strategy", error));
}
// Adaptive strategy - switches between strategies based on load
public Flux<String> processWithAdaptive(Flux<String> source) {
return source
.window(Duration.ofSeconds(1))
.flatMap(window -> window
.collectList()
.flatMapMany(items -> {
if (items.size() > 1000) {
// High load - use drop strategy
return Flux.fromIterable(items)
.onBackpressureDrop(dropped -> log.warn("Dropped item: {}", dropped))
.map(this::processItem);
} else if (items.size() > 500) {
// Medium load - use buffer strategy
return Flux.fromIterable(items)
.onBackpressureBuffer(100)
.map(this::processItem);
} else {
// Low load - process normally
return Flux.fromIterable(items)
.map(this::processItem);
}
}))
.subscribeOn(Schedulers.parallel())
.doOnNext(item -> log.debug("Processed item: {}", item))
.doOnError(error -> log.error("Error processing with adaptive strategy", error));
}
private String processItem(String item) {
// Simulate processing time
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Processed: " + item;
}
}
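The [reactor] backpressure settings (backpressure_strategy, retry_attempts, retry_backoff) can also be applied when constructing sources. A small sketch, assuming the strategy string maps onto FluxSink overflow strategies (for use with Flux.create) and treating the 500 ms initial delay as a placeholder:
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.util.retry.Retry;

public class BackpressureWiring {

    // Maps the configured backpressure_strategy ("buffer", "drop", "latest", "error")
    // to the overflow strategy used when creating a Flux from a push-based source.
    static FluxSink.OverflowStrategy overflowStrategy(String configured) {
        switch (configured) {
            case "drop":   return FluxSink.OverflowStrategy.DROP;
            case "latest": return FluxSink.OverflowStrategy.LATEST;
            case "error":  return FluxSink.OverflowStrategy.ERROR;
            default:       return FluxSink.OverflowStrategy.BUFFER;
        }
    }

    // retry_attempts: 3 with retry_backoff: "exponential" maps naturally onto Reactor's
    // exponential backoff retry; the 500 ms initial delay here is a placeholder value.
    static <T> Flux<T> withExponentialRetry(Flux<T> source, int attempts) {
        return source.retryWhen(Retry.backoff(attempts, Duration.ofMillis(500)));
    }
}
The overflow strategy returned here is intended as the second argument to Flux.create, so the producer-side behavior follows the same configuration that drives the operator-based strategies shown above.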
🔧 Best Practices
1. Reactive Streams
- Use appropriate backpressure strategies
- Avoid blocking operations in reactive streams
- Use proper schedulers for different types of work
- Handle errors gracefully
2. Performance Optimization
- Use connection pooling for databases
- Implement proper caching strategies
- Use circuit breakers for external services
- Monitor and optimize memory usage
3. Error Handling
- Use onErrorResume for fallback strategies
- Implement retry mechanisms with backoff
- Use circuit breakers for resilience
- Log errors appropriately
4. Testing
- Use StepVerifier for testing reactive streams
- Test backpressure scenarios
- Mock external dependencies
- Test error conditions
5. Monitoring
- Use Micrometer for metrics
- Monitor reactive stream performance
- Track backpressure events
- Monitor circuit breaker states
🎯 Summary
TuskLang Java reactive programming provides:
- Spring WebFlux Integration: Non-blocking web framework
- Project Reactor: Reactive streams implementation
- Backpressure Management: Multiple strategies for handling overload
- Circuit Breakers: Resilience patterns for external services
- Performance Optimization: Efficient resource utilization
The combination of TuskLang's executable configuration with Java's reactive programming capabilities creates a powerful platform for building high-performance, scalable, and responsive applications.
"We don't bow to any king" - Build reactive applications that handle high concurrency!