☕ 📡 Event-Driven Architecture with TuskLang Java
📡 Event-Driven Architecture with TuskLang Java
"We don't bow to any king" - Event Edition
TuskLang Java enables sophisticated event-driven architectures with built-in support for event sourcing, CQRS, message queues, and real-time event processing. Build reactive systems that respond to events and maintain data consistency across distributed services.
🎯 Event-Driven Architecture Overview
Event Sourcing Configuration
import org.tusklang.java.TuskLang;
import org.tusklang.java.config.TuskConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class EventDrivenApplication {
@Bean
public TuskConfig tuskConfig() {
TuskLang parser = new TuskLang();
return parser.parseFile("event-driven.tsk", TuskConfig.class);
}
public static void main(String[] args) {
SpringApplication.run(EventDrivenApplication.class, args);
}
}
// Event-driven configuration
@TuskConfig
public class EventDrivenConfig {
private String applicationName;
private String version;
private EventStoreConfig eventStore;
private MessageQueueConfig messageQueue;
private EventHandlersConfig eventHandlers;
private ProjectionConfig projections;
private CqrsConfig cqrs;
private MonitoringConfig monitoring;
// Getters and setters
public String getApplicationName() { return applicationName; }
public void setApplicationName(String applicationName) { this.applicationName = applicationName; }
public String getVersion() { return version; }
public void setVersion(String version) { this.version = version; }
public EventStoreConfig getEventStore() { return eventStore; }
public void setEventStore(EventStoreConfig eventStore) { this.eventStore = eventStore; }
public MessageQueueConfig getMessageQueue() { return messageQueue; }
public void setMessageQueue(MessageQueueConfig messageQueue) { this.messageQueue = messageQueue; }
public EventHandlersConfig getEventHandlers() { return eventHandlers; }
public void setEventHandlers(EventHandlersConfig eventHandlers) { this.eventHandlers = eventHandlers; }
public ProjectionConfig getProjections() { return projections; }
public void setProjections(ProjectionConfig projections) { this.projections = projections; }
public CqrsConfig getCqrs() { return cqrs; }
public void setCqrs(CqrsConfig cqrs) { this.cqrs = cqrs; }
public MonitoringConfig getMonitoring() { return monitoring; }
public void setMonitoring(MonitoringConfig monitoring) { this.monitoring = monitoring; }
}
@TuskConfig
public class EventStoreConfig {
private String type;
private String connectionString;
private String database;
private String collection;
private SnapshotConfig snapshot;
private SerializationConfig serialization;
// Getters and setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getConnectionString() { return connectionString; }
public void setConnectionString(String connectionString) { this.connectionString = connectionString; }
public String getDatabase() { return database; }
public void setDatabase(String database) { this.database = database; }
public String getCollection() { return collection; }
public void setCollection(String collection) { this.collection = collection; }
public SnapshotConfig getSnapshot() { return snapshot; }
public void setSnapshot(SnapshotConfig snapshot) { this.snapshot = snapshot; }
public SerializationConfig getSerialization() { return serialization; }
public void setSerialization(SerializationConfig serialization) { this.serialization = serialization; }
}
@TuskConfig
public class SnapshotConfig {
private boolean enabled;
private int interval;
private String storage;
private CompressionConfig compression;
// Getters and setters
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public int getInterval() { return interval; }
public void setInterval(int interval) { this.interval = interval; }
public String getStorage() { return storage; }
public void setStorage(String storage) { this.storage = storage; }
public CompressionConfig getCompression() { return compression; }
public void setCompression(CompressionConfig compression) { this.compression = compression; }
}
@TuskConfig
public class CompressionConfig {
private boolean enabled;
private String algorithm;
private int level;
// Getters and setters
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public String getAlgorithm() { return algorithm; }
public void setAlgorithm(String algorithm) { this.algorithm = algorithm; }
public int getLevel() { return level; }
public void setLevel(int level) { this.level = level; }
}
@TuskConfig
public class SerializationConfig {
private String format;
private String schemaRegistry;
private boolean schemaValidation;
private Map<String, String> customSerializers;
// Getters and setters
public String getFormat() { return format; }
public void setFormat(String format) { this.format = format; }
public String getSchemaRegistry() { return schemaRegistry; }
public void setSchemaRegistry(String schemaRegistry) { this.schemaRegistry = schemaRegistry; }
public boolean isSchemaValidation() { return schemaValidation; }
public void setSchemaValidation(boolean schemaValidation) { this.schemaValidation = schemaValidation; }
public Map<String, String> getCustomSerializers() { return customSerializers; }
public void setCustomSerializers(Map<String, String> customSerializers) { this.customSerializers = customSerializers; }
}
@TuskConfig
public class MessageQueueConfig {
private String broker;
private List<String> brokers;
private String topic;
private String groupId;
private int partitions;
private int replicationFactor;
private RetentionConfig retention;
private ConsumerConfig consumer;
private ProducerConfig producer;
// Getters and setters
public String getBroker() { return broker; }
public void setBroker(String broker) { this.broker = broker; }
public List<String> getBrokers() { return brokers; }
public void setBrokers(List<String> brokers) { this.brokers = brokers; }
public String getTopic() { return topic; }
public void setTopic(String topic) { this.topic = topic; }
public String getGroupId() { return groupId; }
public void setGroupId(String groupId) { this.groupId = groupId; }
public int getPartitions() { return partitions; }
public void setPartitions(int partitions) { this.partitions = partitions; }
public int getReplicationFactor() { return replicationFactor; }
public void setReplicationFactor(int replicationFactor) { this.replicationFactor = replicationFactor; }
public RetentionConfig getRetention() { return retention; }
public void setRetention(RetentionConfig retention) { this.retention = retention; }
public ConsumerConfig getConsumer() { return consumer; }
public void setConsumer(ConsumerConfig consumer) { this.consumer = consumer; }
public ProducerConfig getProducer() { return producer; }
public void setProducer(ProducerConfig producer) { this.producer = producer; }
}
@TuskConfig
public class RetentionConfig {
private long timeMs;
private long sizeBytes;
private String cleanupPolicy;
// Getters and setters
public long getTimeMs() { return timeMs; }
public void setTimeMs(long timeMs) { this.timeMs = timeMs; }
public long getSizeBytes() { return sizeBytes; }
public void setSizeBytes(long sizeBytes) { this.sizeBytes = sizeBytes; }
public String getCleanupPolicy() { return cleanupPolicy; }
public void setCleanupPolicy(String cleanupPolicy) { this.cleanupPolicy = cleanupPolicy; }
}
@TuskConfig
public class ConsumerConfig {
private String autoOffsetReset;
private int maxPollRecords;
private int maxPollInterval;
private int sessionTimeout;
private int heartbeatInterval;
private boolean enableAutoCommit;
private int autoCommitInterval;
// Getters and setters
public String getAutoOffsetReset() { return autoOffsetReset; }
public void setAutoOffsetReset(String autoOffsetReset) { this.autoOffsetReset = autoOffsetReset; }
public int getMaxPollRecords() { return maxPollRecords; }
public void setMaxPollRecords(int maxPollRecords) { this.maxPollRecords = maxPollRecords; }
public int getMaxPollInterval() { return maxPollInterval; }
public void setMaxPollInterval(int maxPollInterval) { this.maxPollInterval = maxPollInterval; }
public int getSessionTimeout() { return sessionTimeout; }
public void setSessionTimeout(int sessionTimeout) { this.sessionTimeout = sessionTimeout; }
public int getHeartbeatInterval() { return heartbeatInterval; }
public void setHeartbeatInterval(int heartbeatInterval) { this.heartbeatInterval = heartbeatInterval; }
public boolean isEnableAutoCommit() { return enableAutoCommit; }
public void setEnableAutoCommit(boolean enableAutoCommit) { this.enableAutoCommit = enableAutoCommit; }
public int getAutoCommitInterval() { return autoCommitInterval; }
public void setAutoCommitInterval(int autoCommitInterval) { this.autoCommitInterval = autoCommitInterval; }
}
@TuskConfig
public class ProducerConfig {
private String acks;
private int retries;
private int batchSize;
private int lingerMs;
private int bufferMemory;
private String compressionType;
// Getters and setters
public String getAcks() { return acks; }
public void setAcks(String acks) { this.acks = acks; }
public int getRetries() { return retries; }
public void setRetries(int retries) { this.retries = retries; }
public int getBatchSize() { return batchSize; }
public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
public int getLingerMs() { return lingerMs; }
public void setLingerMs(int lingerMs) { this.lingerMs = lingerMs; }
public int getBufferMemory() { return bufferMemory; }
public void setBufferMemory(int bufferMemory) { this.bufferMemory = bufferMemory; }
public String getCompressionType() { return compressionType; }
public void setCompressionType(String compressionType) { this.compressionType = compressionType; }
}
@TuskConfig
public class EventHandlersConfig {
private Map<String, String> handlers;
private ThreadPoolConfig threadPool;
private RetryConfig retry;
private DeadLetterConfig deadLetter;
// Getters and setters
public Map<String, String> getHandlers() { return handlers; }
public void setHandlers(Map<String, String> handlers) { this.handlers = handlers; }
public ThreadPoolConfig getThreadPool() { return threadPool; }
public void setThreadPool(ThreadPoolConfig threadPool) { this.threadPool = threadPool; }
public RetryConfig getRetry() { return retry; }
public void setRetry(RetryConfig retry) { this.retry = retry; }
public DeadLetterConfig getDeadLetter() { return deadLetter; }
public void setDeadLetter(DeadLetterConfig deadLetter) { this.deadLetter = deadLetter; }
}
@TuskConfig
public class ThreadPoolConfig {
private int coreSize;
private int maxSize;
private int queueCapacity;
private int keepAliveSeconds;
private String threadNamePrefix;
// Getters and setters
public int getCoreSize() { return coreSize; }
public void setCoreSize(int coreSize) { this.coreSize = coreSize; }
public int getMaxSize() { return maxSize; }
public void setMaxSize(int maxSize) { this.maxSize = maxSize; }
public int getQueueCapacity() { return queueCapacity; }
public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; }
public int getKeepAliveSeconds() { return keepAliveSeconds; }
public void setKeepAliveSeconds(int keepAliveSeconds) { this.keepAliveSeconds = keepAliveSeconds; }
public String getThreadNamePrefix() { return threadNamePrefix; }
public void setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; }
}
@TuskConfig
public class RetryConfig {
private int maxAttempts;
private long initialDelay;
private long maxDelay;
private double multiplier;
private List<String> retryableExceptions;
// Getters and setters
public int getMaxAttempts() { return maxAttempts; }
public void setMaxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; }
public long getInitialDelay() { return initialDelay; }
public void setInitialDelay(long initialDelay) { this.initialDelay = initialDelay; }
public long getMaxDelay() { return maxDelay; }
public void setMaxDelay(long maxDelay) { this.maxDelay = maxDelay; }
public double getMultiplier() { return multiplier; }
public void setMultiplier(double multiplier) { this.multiplier = multiplier; }
public List<String> getRetryableExceptions() { return retryableExceptions; }
public void setRetryableExceptions(List<String> retryableExceptions) { this.retryableExceptions = retryableExceptions; }
}
@TuskConfig
public class DeadLetterConfig {
private boolean enabled;
private String topic;
private int maxRetries;
private String handler;
// Getters and setters
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public String getTopic() { return topic; }
public void setTopic(String topic) { this.topic = topic; }
public int getMaxRetries() { return maxRetries; }
public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; }
public String getHandler() { return handler; }
public void setHandler(String handler) { this.handler = handler; }
}
@TuskConfig
public class ProjectionConfig {
private boolean enabled;
private String type;
private String database;
private String collection;
private Map<String, String> handlers;
private BatchConfig batch;
// Getters and setters
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getDatabase() { return database; }
public void setDatabase(String database) { this.database = database; }
public String getCollection() { return collection; }
public void setCollection(String collection) { this.collection = collection; }
public Map<String, String> getHandlers() { return handlers; }
public void setHandlers(Map<String, String> handlers) { this.handlers = handlers; }
public BatchConfig getBatch() { return batch; }
public void setBatch(BatchConfig batch) { this.batch = batch; }
}
@TuskConfig
public class BatchConfig {
private int size;
private long timeout;
private boolean enabled;
// Getters and setters
public int getSize() { return size; }
public void setSize(int size) { this.size = size; }
public long getTimeout() { return timeout; }
public void setTimeout(long timeout) { this.timeout = timeout; }
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
}
@TuskConfig
public class CqrsConfig {
private boolean enabled;
private CommandConfig command;
private QueryConfig query;
private EventBusConfig eventBus;
// Getters and setters
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public CommandConfig getCommand() { return command; }
public void setCommand(CommandConfig command) { this.command = command; }
public QueryConfig getQuery() { return query; }
public void setQuery(QueryConfig query) { this.query = query; }
public EventBusConfig getEventBus() { return eventBus; }
public void setEventBus(EventBusConfig eventBus) { this.eventBus = eventBus; }
}
@TuskConfig
public class CommandConfig {
private String handlerPackage;
private boolean validationEnabled;
private String validationPackage;
private Map<String, String> commandHandlers;
// Getters and setters
public String getHandlerPackage() { return handlerPackage; }
public void setHandlerPackage(String handlerPackage) { this.handlerPackage = handlerPackage; }
public boolean isValidationEnabled() { return validationEnabled; }
public void setValidationEnabled(boolean validationEnabled) { this.validationEnabled = validationEnabled; }
public String getValidationPackage() { return validationPackage; }
public void setValidationPackage(String validationPackage) { this.validationPackage = validationPackage; }
public Map<String, String> getCommandHandlers() { return commandHandlers; }
public void setCommandHandlers(Map<String, String> commandHandlers) { this.commandHandlers = commandHandlers; }
}
@TuskConfig
public class QueryConfig {
private String handlerPackage;
private boolean cachingEnabled;
private String cacheProvider;
private int cacheTtl;
private Map<String, String> queryHandlers;
// Getters and setters
public String getHandlerPackage() { return handlerPackage; }
public void setHandlerPackage(String handlerPackage) { this.handlerPackage = handlerPackage; }
public boolean isCachingEnabled() { return cachingEnabled; }
public void setCachingEnabled(boolean cachingEnabled) { this.cachingEnabled = cachingEnabled; }
public String getCacheProvider() { return cacheProvider; }
public void setCacheProvider(String cacheProvider) { this.cacheProvider = cacheProvider; }
public int getCacheTtl() { return cacheTtl; }
public void setCacheTtl(int cacheTtl) { this.cacheTtl = cacheTtl; }
public Map<String, String> getQueryHandlers() { return queryHandlers; }
public void setQueryHandlers(Map<String, String> queryHandlers) { this.queryHandlers = queryHandlers; }
}
@TuskConfig
public class EventBusConfig {
private String type;
private String connectionString;
private int maxConcurrency;
private boolean asyncEnabled;
// Getters and setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getConnectionString() { return connectionString; }
public void setConnectionString(String connectionString) { this.connectionString = connectionString; }
public int getMaxConcurrency() { return maxConcurrency; }
public void setMaxConcurrency(int maxConcurrency) { this.maxConcurrency = maxConcurrency; }
public boolean isAsyncEnabled() { return asyncEnabled; }
public void setAsyncEnabled(boolean asyncEnabled) { this.asyncEnabled = asyncEnabled; }
}
@TuskConfig
public class MonitoringConfig {
private String prometheusEndpoint;
private boolean enabled;
private Map<String, String> labels;
private int scrapeInterval;
private AlertingConfig alerting;
// Getters and setters
public String getPrometheusEndpoint() { return prometheusEndpoint; }
public void setPrometheusEndpoint(String prometheusEndpoint) { this.prometheusEndpoint = prometheusEndpoint; }
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public Map<String, String> getLabels() { return labels; }
public void setLabels(Map<String, String> labels) { this.labels = labels; }
public int getScrapeInterval() { return scrapeInterval; }
public void setScrapeInterval(int scrapeInterval) { this.scrapeInterval = scrapeInterval; }
public AlertingConfig getAlerting() { return alerting; }
public void setAlerting(AlertingConfig alerting) { this.alerting = alerting; }
}
@TuskConfig
public class AlertingConfig {
private String slackWebhook;
private String emailEndpoint;
private Map<String, AlertRule> rules;
// Getters and setters
public String getSlackWebhook() { return slackWebhook; }
public void setSlackWebhook(String slackWebhook) { this.slackWebhook = slackWebhook; }
public String getEmailEndpoint() { return emailEndpoint; }
public void setEmailEndpoint(String emailEndpoint) { this.emailEndpoint = emailEndpoint; }
public Map<String, AlertRule> getRules() { return rules; }
public void setRules(Map<String, AlertRule> rules) { this.rules = rules; }
}
@TuskConfig
public class AlertRule {
private String condition;
private String threshold;
private String duration;
private List<String> channels;
private String severity;
// Getters and setters
public String getCondition() { return condition; }
public void setCondition(String condition) { this.condition = condition; }
public String getThreshold() { return threshold; }
public void setThreshold(String threshold) { this.threshold = threshold; }
public String getDuration() { return duration; }
public void setDuration(String duration) { this.duration = duration; }
public List<String> getChannels() { return channels; }
public void setChannels(List<String> channels) { this.channels = channels; }
public String getSeverity() { return severity; }
public void setSeverity(String severity) { this.severity = severity; }
}
🏗️ Event-Driven TuskLang Configuration
event-driven.tsk
Event-Driven Architecture Configuration
[application]
name: "user-service"
version: "2.1.0"
environment: @env("ENVIRONMENT", "production")[event_store]
type: "mongodb"
connection_string: @env("EVENT_STORE_CONNECTION", "mongodb://eventstore:27017")
database: "eventstore"
collection: "events"
[snapshot]
enabled: true
interval: 1000
storage: "mongodb"
[compression]
enabled: true
algorithm: "gzip"
level: 6
[serialization]
format: "json"
schema_registry: @env("SCHEMA_REGISTRY_URL", "http://schema-registry:8081")
schema_validation: true
custom_serializers {
"UserCreated": "com.example.serializers.UserCreatedSerializer"
"UserUpdated": "com.example.serializers.UserUpdatedSerializer"
"UserDeleted": "com.example.serializers.UserDeletedSerializer"
}
[message_queue]
broker: "kafka"
brokers: [
"kafka-1:9092",
"kafka-2:9092",
"kafka-3:9092"
]
topic: "user-events"
group_id: "user-service-group"
partitions: 6
replication_factor: 3
[retention]
time_ms: 604800000
size_bytes: 1073741824
cleanup_policy: "delete"
[consumer]
auto_offset_reset: "earliest"
max_poll_records: 500
max_poll_interval: 300000
session_timeout: 30000
heartbeat_interval: 3000
enable_auto_commit: false
auto_commit_interval: 5000
[producer]
acks: "all"
retries: 3
batch_size: 16384
linger_ms: 100
buffer_memory: 33554432
compression_type: "gzip"
[event_handlers]
handlers {
"UserCreated": "com.example.handlers.UserCreatedHandler"
"UserUpdated": "com.example.handlers.UserUpdatedHandler"
"UserDeleted": "com.example.handlers.UserDeletedHandler"
"UserActivated": "com.example.handlers.UserActivatedHandler"
"UserDeactivated": "com.example.handlers.UserDeactivatedHandler"
}
[thread_pool]
core_size: 10
max_size: 50
queue_capacity: 1000
keep_alive_seconds: 60
thread_name_prefix: "event-handler-"
[retry]
max_attempts: 3
initial_delay: 1000
max_delay: 10000
multiplier: 2.0
retryable_exceptions: [
"java.net.SocketTimeoutException",
"java.net.ConnectException",
"org.apache.kafka.common.errors.TimeoutException"
]
[dead_letter]
enabled: true
topic: "user-events-dlq"
max_retries: 3
handler: "com.example.handlers.DeadLetterHandler"
[projections]
enabled: true
type: "mongodb"
database: "projections"
collection: "user_projections"
handlers {
"UserCreated": "com.example.projections.UserCreatedProjection"
"UserUpdated": "com.example.projections.UserUpdatedProjection"
"UserDeleted": "com.example.projections.UserDeletedProjection"
}
[batch]
size: 100
timeout: 5000
enabled: true
[cqrs]
enabled: true
[command]
handler_package: "com.example.commands"
validation_enabled: true
validation_package: "com.example.validators"
command_handlers {
"CreateUser": "com.example.commands.CreateUserHandler"
"UpdateUser": "com.example.commands.UpdateUserHandler"
"DeleteUser": "com.example.commands.DeleteUserHandler"
"ActivateUser": "com.example.commands.ActivateUserHandler"
}
[query]
handler_package: "com.example.queries"
caching_enabled: true
cache_provider: "redis"
cache_ttl: 300
query_handlers {
"GetUser": "com.example.queries.GetUserHandler"
"GetUsers": "com.example.queries.GetUsersHandler"
"SearchUsers": "com.example.queries.SearchUsersHandler"
}
[event_bus]
type: "kafka"
connection_string: @env("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
max_concurrency: 10
async_enabled: true
[monitoring]
prometheus_endpoint: "/actuator/prometheus"
enabled: true
labels {
service: "user-service"
version: "2.1.0"
environment: @env("ENVIRONMENT", "production")
}
scrape_interval: 15
[alerting]
slack_webhook: @env.secure("SLACK_WEBHOOK")
email_endpoint: @env("ALERT_EMAIL")
[rules]
high_event_lag {
condition: "event_lag > 1000"
threshold: "1000 events"
duration: "5m"
channels: ["slack", "email"]
severity: "warning"
}
event_processing_failure {
condition: "event_processing_errors > 0.05"
threshold: "5%"
duration: "3m"
channels: ["slack", "email"]
severity: "critical"
}
dead_letter_queue_growth {
condition: "dlq_size > 100"
threshold: "100 messages"
duration: "10m"
channels: ["slack"]
severity: "warning"
}
Dynamic event-driven configuration
[monitoring]
event_count: @query("SELECT COUNT(*) FROM events WHERE aggregate_type = 'User'")
snapshot_count: @query("SELECT COUNT(*) FROM snapshots WHERE aggregate_type = 'User'")
projection_lag: @metrics("projection_lag_events", 0)
event_size_avg: @metrics("event_size_bytes", 0)
consumer_lag: @metrics("kafka_consumer_lag", 0)
processing_rate: @metrics("events_processed_per_second", 0)
error_rate: @metrics("event_processing_errors_total", 0)
📊 Event Sourcing Implementation
Event Store Configuration
import org.tusklang.java.TuskLang;
import org.tusklang.java.config.TuskConfig;@TuskConfig
public class EventStoreConfig {
private String type;
private String connectionString;
private String database;
private String collection;
private SnapshotConfig snapshot;
private SerializationConfig serialization;
private IndexConfig indexes;
// Getters and setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getConnectionString() { return connectionString; }
public void setConnectionString(String connectionString) { this.connectionString = connectionString; }
public String getDatabase() { return database; }
public void setDatabase(String database) { this.database = database; }
public String getCollection() { return collection; }
public void setCollection(String collection) { this.collection = collection; }
public SnapshotConfig getSnapshot() { return snapshot; }
public void setSnapshot(SnapshotConfig snapshot) { this.snapshot = snapshot; }
public SerializationConfig getSerialization() { return serialization; }
public void setSerialization(SerializationConfig serialization) { this.serialization = serialization; }
public IndexConfig getIndexes() { return indexes; }
public void setIndexes(IndexConfig indexes) { this.indexes = indexes; }
}
@TuskConfig
public class IndexConfig {
private List<String> indexes;
private boolean autoCreate;
private String indexType;
// Getters and setters
public List<String> getIndexes() { return indexes; }
public void setIndexes(List<String> indexes) { this.indexes = indexes; }
public boolean isAutoCreate() { return autoCreate; }
public void setAutoCreate(boolean autoCreate) { this.autoCreate = autoCreate; }
public String getIndexType() { return indexType; }
public void setIndexType(String indexType) { this.indexType = indexType; }
}
event-store.tsk
[event_store]
type: "mongodb"
connection_string: @env("EVENT_STORE_CONNECTION", "mongodb://eventstore:27017")
database: "eventstore"
collection: "events"[snapshot]
enabled: true
interval: 1000
storage: "mongodb"
[compression]
enabled: true
algorithm: "gzip"
level: 6
[serialization]
format: "json"
schema_registry: @env("SCHEMA_REGISTRY_URL", "http://schema-registry:8081")
schema_validation: true
[indexes]
indexes: [
"aggregate_id",
"aggregate_type",
"event_type",
"timestamp",
"version"
]
auto_create: true
index_type: "btree"
Event store monitoring
[monitoring]
event_count: @query("SELECT COUNT(*) FROM events WHERE aggregate_type = 'User'")
snapshot_count: @query("SELECT COUNT(*) FROM snapshots WHERE aggregate_type = 'User'")
storage_size: @metrics("event_store_size_bytes", 0)
write_rate: @metrics("events_written_per_second", 0)
read_rate: @metrics("events_read_per_second", 0)
🔄 CQRS Implementation
Command and Query Separation
import org.tusklang.java.TuskLang;
import org.tusklang.java.config.TuskConfig;@TuskConfig
public class CqrsConfig {
private boolean enabled;
private CommandConfig command;
private QueryConfig query;
private EventBusConfig eventBus;
private ReadModelConfig readModel;
// Getters and setters
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public CommandConfig getCommand() { return command; }
public void setCommand(CommandConfig command) { this.command = command; }
public QueryConfig getQuery() { return query; }
public void setQuery(QueryConfig query) { this.query = query; }
public EventBusConfig getEventBus() { return eventBus; }
public void setEventBus(EventBusConfig eventBus) { this.eventBus = eventBus; }
public ReadModelConfig getReadModel() { return readModel; }
public void setReadModel(ReadModelConfig readModel) { this.readModel = readModel; }
}
@TuskConfig
public class ReadModelConfig {
private String type;
private String connectionString;
private String database;
private String collection;
private CacheConfig cache;
private ReplicationConfig replication;
// Getters and setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getConnectionString() { return connectionString; }
public void setConnectionString(String connectionString) { this.connectionString = connectionString; }
public String getDatabase() { return database; }
public void setDatabase(String database) { this.database = database; }
public String getCollection() { return collection; }
public void setCollection(String collection) { this.collection = collection; }
public CacheConfig getCache() { return cache; }
public void setCache(CacheConfig cache) { this.cache = cache; }
public ReplicationConfig getReplication() { return replication; }
public void setReplication(ReplicationConfig replication) { this.replication = replication; }
}
@TuskConfig
public class CacheConfig {
private boolean enabled;
private String provider;
private String host;
private int port;
private int ttl;
private int maxSize;
// Getters and setters
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public String getProvider() { return provider; }
public void setProvider(String provider) { this.provider = provider; }
public String getHost() { return host; }
public void setHost(String host) { this.host = host; }
public int getPort() { return port; }
public void setPort(int port) { this.port = port; }
public int getTtl() { return ttl; }
public void setTtl(int ttl) { this.ttl = ttl; }
public int getMaxSize() { return maxSize; }
public void setMaxSize(int maxSize) { this.maxSize = maxSize; }
}
@TuskConfig
public class ReplicationConfig {
private boolean enabled;
private String strategy;
private int factor;
private boolean syncReplication;
// Getters and setters
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public String getStrategy() { return strategy; }
public void setStrategy(String strategy) { this.strategy = strategy; }
public int getFactor() { return factor; }
public void setFactor(int factor) { this.factor = factor; }
public boolean isSyncReplication() { return syncReplication; }
public void setSyncReplication(boolean syncReplication) { this.syncReplication = syncReplication; }
}
cqrs.tsk
[cqrs]
enabled: true[command]
handler_package: "com.example.commands"
validation_enabled: true
validation_package: "com.example.validators"
command_handlers {
"CreateUser": "com.example.commands.CreateUserHandler"
"UpdateUser": "com.example.commands.UpdateUserHandler"
"DeleteUser": "com.example.commands.DeleteUserHandler"
"ActivateUser": "com.example.commands.ActivateUserHandler"
}
[query]
handler_package: "com.example.queries"
caching_enabled: true
cache_provider: "redis"
cache_ttl: 300
query_handlers {
"GetUser": "com.example.queries.GetUserHandler"
"GetUsers": "com.example.queries.GetUsersHandler"
"SearchUsers": "com.example.queries.SearchUsersHandler"
}
[read_model]
type: "mongodb"
connection_string: @env("READ_MODEL_CONNECTION", "mongodb://readmodel:27017")
database: "readmodel"
collection: "users"
[cache]
enabled: true
provider: "redis"
host: @env("REDIS_HOST", "redis")
port: @env("REDIS_PORT", "6379")
ttl: 300
max_size: 10000
[replication]
enabled: true
strategy: "eventual"
factor: 3
sync_replication: false
[event_bus]
type: "kafka"
connection_string: @env("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
max_concurrency: 10
async_enabled: true
CQRS monitoring
[monitoring]
command_count: @metrics("commands_processed_total", 0)
query_count: @metrics("queries_processed_total", 0)
cache_hit_rate: @metrics("cache_hit_rate_percent", 85)
read_model_lag: @metrics("read_model_lag_events", 0)
write_model_lag: @metrics("write_model_lag_events", 0)
🎯 Best Practices
1. Event Design
- Use descriptive event names - Include all necessary data - Version events properly - Keep events immutable2. Event Sourcing
- Use snapshots for performance - Implement proper projections - Handle event versioning - Monitor event store performance3. CQRS
- Separate read and write models - Use appropriate consistency levels - Implement proper caching - Monitor read model lag4. Message Queues
- Use appropriate partitioning - Configure replication - Implement dead letter queues - Monitor consumer lag5. Event Processing
- Handle failures gracefully - Implement retry mechanisms - Use idempotent handlers - Monitor processing performance🔧 Troubleshooting
Common Issues
1. Event Processing Failures - Check event handler configuration - Review retry settings - Monitor dead letter queue - Verify event schema
2. Read Model Lag - Check projection performance - Monitor event processing rate - Review database performance - Verify network connectivity
3. Message Queue Issues - Monitor consumer lag - Check partition distribution - Review retention policies - Verify broker connectivity
4. Event Store Problems - Check storage capacity - Monitor write performance - Review index configuration - Verify backup procedures
Debug Commands
Check event store
curl -X GET http://eventstore:2113/streams/user-eventsMonitor Kafka consumer
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group user-service-groupCheck projections
curl -X GET http://projections:8080/projectionsMonitor event processing
curl -X GET http://user-service:8080/actuator/metrics/events.processed
🚀 Next Steps
1. Implement event sourcing for data consistency 2. Set up CQRS for read/write separation 3. Configure message queues for async processing 4. Implement projections for read models 5. Monitor event processing for reliability
---
Ready to build reactive event-driven systems with TuskLang Java? The future of distributed systems is here, and it's event-driven!