Skip to main content

EAF-Specific Patterns

This module covers advanced patterns specific to the EAF platform, including multi-tenancy strategies, security context handling, and NATS integration patterns.

📚 Learning Objectives

By the end of this module, you will be able to:

  • Implement advanced multi-tenant patterns with Axon
  • Handle security context propagation correctly
  • Design NATS integration for reliable event distribution
  • Use EAF SDKs effectively with Axon Framework
  • Implement cross-service communication patterns

🏢 Advanced Multi-Tenancy Patterns

Tenant-Aware Command Gateway

/**
 * Command gateway wrapper that attaches tenant and security metadata to every
 * outgoing command, after checking that the acting user may access the
 * current tenant.
 *
 * Fails fast with [IllegalStateException] when no tenant context or no
 * authenticated user is available.
 */
@Component
class EafTenantAwareCommandGateway(
    private val commandGateway: CommandGateway,
    private val tenantContextHolder: TenantContextHolder,
    private val securityContextAccessor: SecurityContextAccessor,
    // Needed by validateTenantAccess(); the original snippet used this
    // collaborator without declaring it.
    // NOTE(review): the type name is assumed from the property name - confirm
    // against the EAF IAM client SDK.
    private val iamClientService: IamClientService
) {

    private val logger = LoggerFactory.getLogger(EafTenantAwareCommandGateway::class.java)

    /**
     * Dispatches [command] asynchronously with tenant/user/correlation metadata attached.
     *
     * @throws IllegalStateException if there is no tenant context or no authenticated user
     * @throws TenantAccessDeniedException if the user has no access to the current tenant
     */
    fun <R> send(command: Any): CompletableFuture<R> {
        val tenantId = tenantContextHolder.getCurrentTenantId()
            ?: throw IllegalStateException("No tenant context available")

        val securityContext = securityContextAccessor.getCurrentSecurityContext()
        val userId = securityContext?.userId
            ?: throw IllegalStateException("No authenticated user")

        // Reject the command before it is dispatched if the user may not act on this tenant.
        validateTenantAccess(tenantId, userId)

        // A fresh correlation id is generated for every command sent through this gateway.
        val metadata = MetaData.with(
            mapOf(
                "tenant_id" to tenantId,
                "user_id" to userId,
                "correlation_id" to UUID.randomUUID().toString(),
                "timestamp" to Instant.now().toString(),
                "ip_address" to securityContext.ipAddress,
                "user_agent" to securityContext.userAgent
            )
        )

        logger.debug(
            "Sending command {} for tenant {} by user {}",
            command::class.simpleName,
            tenantId,
            userId
        )

        return commandGateway.send(command, metadata)
    }

    /** Sends [command] and blocks the calling thread until the result is available. */
    fun <R> sendAndWait(command: Any): R {
        return send<R>(command).get()
    }

    /** Sends [command] and blocks for at most [timeout] waiting for the result. */
    fun <R> sendAndWait(command: Any, timeout: Duration): R {
        return send<R>(command).get(timeout.toMillis(), TimeUnit.MILLISECONDS)
    }

    /**
     * Asks the IAM client whether [userId] may access [tenantId].
     *
     * @throws TenantAccessDeniedException when access is not granted
     */
    private fun validateTenantAccess(tenantId: String, userId: String) {
        val hasAccess = iamClientService.hasUserAccessToTenant(userId, tenantId)
        if (!hasAccess) {
            throw TenantAccessDeniedException("User $userId does not have access to tenant $tenantId")
        }
    }
}

Tenant-Isolated Event Processing

/**
 * Projection handler that refuses to process an event whose payload tenant
 * differs from the tenant recorded in the event metadata.
 */
@Component
@ProcessingGroup("tenant-isolated-projections")
class TenantIsolatedProjectionHandler(
    private val userProjectionRepository: UserProjectionRepository,
    private val tenantMetricsService: TenantMetricsService
) {

    // The original snippet logged through an undeclared `logger`; declared here
    // in the same way as the other components in this module.
    private val logger = LoggerFactory.getLogger(TenantIsolatedProjectionHandler::class.java)

    /**
     * Builds and stores the user projection for a newly created user.
     *
     * @param tenantId tenant taken from the `tenant_id` metadata entry
     * @param actingUserId user taken from the `user_id` metadata entry
     * @throws IllegalArgumentException if the event and metadata tenants disagree
     */
    @EventHandler
    fun on(
        event: UserCreatedEvent,
        @MetaData("tenant_id") tenantId: String,
        @MetaData("user_id") actingUserId: String
    ) {
        // Validate tenant isolation before touching any state.
        require(event.tenantId == tenantId) {
            "Event tenant ID (${event.tenantId}) does not match metadata tenant ID ($tenantId)"
        }

        // Create projection with explicit tenant isolation
        val projection = UserProjection(
            userId = event.userId,
            tenantId = tenantId, // Always use metadata tenant ID
            email = event.email,
            firstName = event.firstName,
            lastName = event.lastName,
            // ... other fields
        )

        // Save with tenant validation
        userProjectionRepository.saveWithTenantValidation(projection, tenantId)

        // Update tenant metrics
        tenantMetricsService.incrementUserCount(tenantId)

        // Log with tenant context
        logger.info(
            "Created user projection for user {} in tenant {} by user {}",
            event.userId,
            tenantId,
            actingUserId
        )
    }
}

/**
 * Repository extension that adds tenant checks on top of the plain
 * [UserProjectionRepository] save and lookup operations.
 */
interface TenantValidatedUserProjectionRepository : UserProjectionRepository {

    /**
     * Saves [projection] only if it belongs to [expectedTenantId].
     *
     * @throws IllegalArgumentException when the projection's tenant differs
     */
    fun saveWithTenantValidation(projection: UserProjection, expectedTenantId: String): UserProjection {
        if (projection.tenantId != expectedTenantId) {
            throw IllegalArgumentException("Projection tenant ID does not match expected tenant ID")
        }
        return save(projection)
    }

    /**
     * Looks up a projection by [id] and returns it only when it belongs to
     * [tenantId]; otherwise returns `null`.
     */
    fun findByIdWithTenantValidation(id: String, tenantId: String): UserProjection? {
        val candidate = findById(id)
        return if (candidate?.tenantId == tenantId) candidate else null
    }
}

Cross-Tenant Data Protection

/**
 * Aspect that blocks calls to functions annotated with
 * [RequiresTenantIsolation] when any [TenantAware] argument belongs to a
 * tenant other than the current one.
 *
 * The class carries `@Aspect` (fully qualified because this snippet shows no
 * import block) so that the `@Around` advice below is recognised as advice,
 * and receives the [TenantContextHolder] through its constructor; the
 * original snippet referenced the holder without declaring it.
 */
@org.aspectj.lang.annotation.Aspect
@Component
class TenantDataProtectionAspect(
    private val tenantContextHolder: TenantContextHolder
) {

    /**
     * Verifies every [TenantAware] argument against the current tenant, then
     * proceeds with the intercepted call.
     *
     * @throws IllegalStateException if no tenant context is set
     * @throws TenantIsolationViolationException on the first argument whose tenant differs
     */
    @Around("@annotation(RequiresTenantIsolation)")
    fun enforceTenantIsolation(joinPoint: ProceedingJoinPoint): Any? {
        val currentTenantId = tenantContextHolder.getCurrentTenantId()
            ?: throw IllegalStateException("No tenant context")

        // Arguments that do not implement TenantAware are not inspected.
        val offender = joinPoint.args
            .filterIsInstance<TenantAware>()
            .firstOrNull { it.tenantId != currentTenantId }

        if (offender != null) {
            throw TenantIsolationViolationException(
                "Cross-tenant access attempt: expected $currentTenantId, got ${offender.tenantId}"
            )
        }

        return joinPoint.proceed()
    }
}

/**
 * Marks a value that carries the identifier of the tenant it belongs to.
 * TenantDataProtectionAspect compares this identifier with the current tenant
 * for every argument that implements this interface.
 */
interface TenantAware {
/** Identifier of the owning tenant. */
val tenantId: String
}

/**
 * Function-level marker, retained at runtime, that the
 * `@Around("@annotation(RequiresTenantIsolation)")` advice in this module
 * matches on.
 */
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class RequiresTenantIsolation

🔒 Security Context Integration

Security-Aware Command Handlers

/**
 * Command handler that authorises, audits and then executes user creation.
 *
 * Changes from the original snippet:
 *  - `auditService` and `logger` were used but never declared; the audit
 *    service is now injected and the logger is a private field.
 *    NOTE(review): the `AuditService` type name is assumed from the property
 *    name - confirm against the actual EAF audit API.
 *  - Axon's `Repository<T>` offers `newInstance` / `load` but no `save`; the
 *    aggregate is therefore created through `repository.newInstance { ... }`.
 */
@Component
class SecureUserCommandHandler(
    private val repository: Repository<User>,
    private val securityContextAccessor: SecurityContextAccessor,
    private val authorizationService: AuthorizationService,
    private val auditService: AuditService
) {

    private val logger = LoggerFactory.getLogger(SecureUserCommandHandler::class.java)

    /**
     * Creates a user on behalf of [actingUserId] inside [tenantId].
     *
     * @throws SecurityException if no security context is available
     */
    @CommandHandler
    @RequiresTenantIsolation
    fun handle(
        command: CreateUserCommand,
        @MetaData("tenant_id") tenantId: String,
        @MetaData("user_id") actingUserId: String
    ) {
        // Validate security context
        val securityContext = securityContextAccessor.getCurrentSecurityContext()
            ?: throw SecurityException("No security context available")

        // Check permissions
        authorizationService.requirePermission(
            userId = actingUserId,
            tenantId = tenantId,
            permission = "USER_CREATE",
            resourceContext = mapOf("department" to command.department)
        )

        // Audit the command before executing it, so the attempt is recorded even if it fails.
        auditService.logCommand(
            command = command,
            actingUserId = actingUserId,
            tenantId = tenantId,
            ipAddress = securityContext.ipAddress,
            userAgent = securityContext.userAgent
        )

        try {
            // Let the Axon repository instantiate and track the new aggregate.
            repository.newInstance { User(command) }

            logger.info(
                "User {} created successfully by {} in tenant {}",
                command.userId,
                actingUserId,
                tenantId
            )
        } catch (e: Exception) {
            // Record the failure for auditing, then let the caller see the original error.
            auditService.logCommandFailure(
                command = command,
                actingUserId = actingUserId,
                tenantId = tenantId,
                error = e.message,
                securityContext = securityContext
            )
            throw e
        }
    }
}

Role-Based Event Filtering

/**
 * Notifies the admins and managers of a tenant when one of its users is suspended.
 */
@Component
@ProcessingGroup("role-based-notifications")
class RoleBasedNotificationHandler(
    private val notificationService: NotificationService,
    private val userProjectionRepository: UserProjectionRepository
) {

    /**
     * Builds one notification per distinct ADMIN/MANAGER recipient in
     * [tenantId] and hands each to the notification service.
     */
    @EventHandler
    fun on(
        event: UserSuspendedEvent,
        @MetaData("tenant_id") tenantId: String
    ) {
        // Recipients: admins first, then managers, without duplicates.
        val admins = userProjectionRepository.findByTenantIdAndRole(tenantId, "ADMIN")
        val managers = userProjectionRepository.findByTenantIdAndRole(tenantId, "MANAGER")
        val recipients = (admins + managers).distinct()

        // All notifications are built before any of them is sent.
        val pending = recipients.map { recipient ->
            val level = if ("ADMIN" in recipient.roles) {
                NotificationLevel.HIGH
            } else if ("MANAGER" in recipient.roles) {
                NotificationLevel.MEDIUM
            } else {
                NotificationLevel.LOW
            }

            UserSuspensionNotification(
                recipientUserId = recipient.userId,
                suspendedUserId = event.userId,
                reason = event.reason,
                suspendedBy = event.suspendedBy,
                tenantId = tenantId,
                notificationLevel = level
            )
        }

        for (notification in pending) {
            notificationService.sendAsync(notification)
        }
    }
}

📡 Advanced NATS Integration

Reliable Event Publishing

/**
 * Publishes every handled event to NATS with bounded retries; after the last
 * failed attempt the event is handed to the dead letter queue and processing
 * continues.
 */
@Component
@ProcessingGroup("nats-reliable-publisher")
class ReliableNatsEventPublisher(
    private val natsEventPublisher: NatsEventPublisher,
    private val eventPublishingMetricsService: EventPublishingMetricsService,
    private val deadLetterQueueService: DeadLetterQueueService
) {

    private val logger = LoggerFactory.getLogger(ReliableNatsEventPublisher::class.java)

    /**
     * Publishes [event] to the subject `events.<EventType>`, retrying up to
     * [MAX_ATTEMPTS] times.
     *
     * Failures are never rethrown: after the final attempt the failure is
     * recorded and the event is sent to the dead letter queue. Coroutine
     * cancellation is the one exception and is always propagated.
     */
    @EventHandler
    suspend fun on(
        event: Any,
        @MetaData("tenant_id") tenantId: String,
        @MetaData("correlation_id") correlationId: String,
        @MetaData("global_sequence_id") sequenceId: Long
    ) {
        // simpleName is null for anonymous/local classes; fall back to the JVM
        // class name instead of crashing with a NullPointerException.
        val eventType = event::class.simpleName ?: event.javaClass.name
        val subject = "events.$eventType"
        val startTime = System.currentTimeMillis()

        for (attempt in 1..MAX_ATTEMPTS) {
            try {
                natsEventPublisher.publish(
                    subject = subject,
                    tenantId = tenantId,
                    event = event,
                    metadata = mapOf(
                        "correlation_id" to correlationId,
                        "sequence_id" to sequenceId.toString(),
                        "attempt" to attempt.toString(),
                        "max_attempts" to MAX_ATTEMPTS.toString()
                    )
                )

                // Record successful publish
                eventPublishingMetricsService.recordSuccess(
                    eventType = eventType,
                    tenantId = tenantId,
                    duration = System.currentTimeMillis() - startTime,
                    attempt = attempt
                )

                logger.debug(
                    "Successfully published {} to {} for tenant {} (attempt {})",
                    eventType,
                    subject,
                    tenantId,
                    attempt
                )

                return // Success!
            } catch (e: kotlin.coroutines.cancellation.CancellationException) {
                // Never treat cancellation as a publish failure: no retry, no DLQ.
                throw e
            } catch (e: Exception) {
                logger.warn(
                    "Failed to publish {} to {} for tenant {} (attempt {}/{}): {}",
                    eventType,
                    subject,
                    tenantId,
                    attempt,
                    MAX_ATTEMPTS,
                    e.message
                )

                if (attempt >= MAX_ATTEMPTS) {
                    // Record failure and send to DLQ
                    eventPublishingMetricsService.recordFailure(
                        eventType = eventType,
                        tenantId = tenantId,
                        error = e.message ?: "Unknown error",
                        finalAttempt = attempt
                    )

                    deadLetterQueueService.sendToDeadLetter(
                        event = event,
                        subject = subject,
                        tenantId = tenantId,
                        correlationId = correlationId,
                        sequenceId = sequenceId,
                        error = e,
                        attempts = attempt
                    )

                    // Don't rethrow - let event processing continue
                    return
                }

                // Exponential backoff: 1s, 2s, 4s, ... For the current limit of
                // three attempts this yields the same 1s and 2s waits as the
                // previous linear formula.
                delay(BASE_BACKOFF_MS shl (attempt - 1))
            }
        }
    }

    private companion object {
        /** Total number of publish attempts before giving up. */
        const val MAX_ATTEMPTS = 3

        /** Wait after the first failed attempt; doubled after each further failure. */
        const val BASE_BACKOFF_MS = 1000L
    }
}

Event Type Routing

/**
 * Routes user events to one or more NATS subjects depending on the event content.
 */
@Component
@ProcessingGroup("nats-event-router")
class NatsEventRouter(
    private val natsEventPublisher: NatsEventPublisher
) {

    /** Publishes a user-created event to every subject chosen by [determineRoutingTargets]. */
    @EventHandler
    suspend fun on(event: UserCreatedEvent, @MetaData("tenant_id") tenantId: String) {
        for (target in determineRoutingTargets(event, tenantId)) {
            natsEventPublisher.publish(
                subject = target.subject,
                tenantId = tenantId,
                event = event,
                metadata = target.metadata
            )
        }
    }

    /**
     * Publishes a user-suspended event twice: first to the security subject
     * with high-priority metadata, then to the general user events subject.
     */
    @EventHandler
    suspend fun on(event: UserSuspendedEvent, @MetaData("tenant_id") tenantId: String) {
        val securityMetadata = mapOf(
            "priority" to "HIGH",
            "category" to "SECURITY",
            "requires_immediate_attention" to "true"
        )

        natsEventPublisher.publish(
            subject = "security.user.suspended",
            tenantId = tenantId,
            event = event,
            metadata = securityMetadata
        )

        natsEventPublisher.publish(
            subject = "events.UserSuspended",
            tenantId = tenantId,
            event = event
        )
    }

    /**
     * Returns the routing targets for [event], in order: the general subject,
     * then a department subject when a department is present, then the admin
     * subject when the roles contain "ADMIN".
     */
    private fun determineRoutingTargets(
        event: UserCreatedEvent,
        tenantId: String
    ): List<RoutingTarget> {
        val general = RoutingTarget(
            subject = "events.UserCreated",
            metadata = emptyMap()
        )

        val departmentTarget = event.department?.let { department ->
            RoutingTarget(
                subject = "department.$department.user.created",
                metadata = mapOf("department" to department)
            )
        }

        val adminTarget = if ("ADMIN" in event.roles) {
            RoutingTarget(
                subject = "admin.user.created",
                metadata = mapOf("admin_alert" to "true")
            )
        } else {
            null
        }

        return listOfNotNull(general, departmentTarget, adminTarget)
    }
}

/**
 * A NATS subject to publish to, together with the metadata to attach to that
 * publication. The metadata defaults to an empty map.
 */
data class RoutingTarget(
val subject: String,
val metadata: Map<String, Any> = emptyMap()
)

Event Replay and Recovery

/**
 * Replays stored events of a single tenant to NATS under `replay.events.<EventType>`.
 */
@Component
class EventReplayService(
    private val eventStore: EventStore,
    private val natsEventPublisher: NatsEventPublisher,
    private val tenantContextHolder: TenantContextHolder
) {

    // The original snippet logged through an undeclared `logger`.
    private val logger = LoggerFactory.getLogger(EventReplayService::class.java)

    /**
     * Re-publishes the events of [tenantId] starting at [fromSequenceId].
     *
     * @param toSequenceId optional upper bound; when `null` the stream is read without one
     * @param eventTypes simple class names to include; an empty set means "all types"
     * @return counts, collected error messages and duration of the replay;
     *         `success` is true only when no event failed
     *
     * Failures of individual events are collected in the result rather than
     * aborting the replay. Coroutine cancellation is propagated.
     */
    suspend fun replayEventsForTenant(
        tenantId: String,
        fromSequenceId: Long,
        toSequenceId: Long? = null,
        eventTypes: Set<String> = emptySet()
    ): ReplayResult {

        // Remember the caller's tenant so it can be restored afterwards;
        // clearing unconditionally would wipe a context the caller still relies on.
        val previousTenantId = tenantContextHolder.getCurrentTenantId()
        tenantContextHolder.setCurrentTenantId(tenantId)

        try {
            val startTime = System.currentTimeMillis()
            var processedCount = 0
            var errorCount = 0
            val errors = mutableListOf<String>()

            // Create tracking token for replay
            val startToken = GlobalSequenceTrackingToken(fromSequenceId)
            val endToken = toSequenceId?.let { GlobalSequenceTrackingToken(it) }

            // Stream events
            val eventStream = if (endToken != null) {
                eventStore.readEvents(startToken, endToken)
            } else {
                eventStore.readEvents(startToken)
            }

            eventStream.forEach { eventMessage ->
                try {
                    val eventType = eventMessage.payloadType.simpleName

                    // Filter by event types if specified
                    if (eventTypes.isNotEmpty() && eventType !in eventTypes) {
                        return@forEach
                    }

                    // Skip events that belong to other tenants (or carry no tenant at all).
                    val eventTenantId = eventMessage.metaData["tenant_id"] as? String
                    if (eventTenantId != tenantId) {
                        return@forEach
                    }

                    // Republish to NATS
                    natsEventPublisher.publish(
                        subject = "replay.events.$eventType",
                        tenantId = tenantId,
                        event = eventMessage.payload,
                        metadata = mapOf(
                            "replay" to "true",
                            "original_sequence_id" to eventMessage.sequenceNumber.toString(),
                            "replay_timestamp" to Instant.now().toString()
                        )
                    )

                    processedCount++
                } catch (e: kotlin.coroutines.cancellation.CancellationException) {
                    // Cancellation must stop the replay, not be counted as a failed event.
                    throw e
                } catch (e: Exception) {
                    errorCount++
                    errors.add("Event ${eventMessage.identifier}: ${e.message}")
                    logger.error("Failed to replay event {}: {}", eventMessage.identifier, e.message, e)
                }
            }

            val duration = System.currentTimeMillis() - startTime

            return ReplayResult(
                tenantId = tenantId,
                processedCount = processedCount,
                errorCount = errorCount,
                errors = errors,
                durationMs = duration,
                success = errorCount == 0
            )
        } finally {
            // Put back whatever tenant context was active before the replay.
            if (previousTenantId != null) {
                tenantContextHolder.setCurrentTenantId(previousTenantId)
            } else {
                tenantContextHolder.clearContext()
            }
        }
    }
}

/**
 * Outcome of one replay run as produced by
 * EventReplayService.replayEventsForTenant.
 */
data class ReplayResult(
// Tenant the replay was executed for.
val tenantId: String,
// Number of events that were republished.
val processedCount: Int,
// Number of events whose replay raised an exception.
val errorCount: Int,
// One "Event <identifier>: <message>" entry per failed event.
val errors: List<String>,
// Wall-clock duration of the replay in milliseconds.
val durationMs: Long,
// Set to `errorCount == 0` by the replay service.
val success: Boolean
)

🔄 Cross-Service Communication

Service-to-Service Commands

/**
 * Reacts to user creation by requesting a license from the License Service
 * and reporting the outcome back as a local command.
 *
 * Changes from the original snippet:
 *  - `logger` was used but never declared.
 *  - Follow-up commands now carry the `tenant_id` metadata entry, matching the
 *    other handlers in this module that read it via `@MetaData("tenant_id")`.
 *  - Coroutine cancellation is rethrown instead of being reported as a
 *    license allocation failure.
 *
 * NOTE(review): `determineLicenseRequirement` is not defined in this snippet;
 * it must be supplied elsewhere.
 */
@Component
class CrossServiceCommandHandler(
    private val licenseServiceClient: LicenseServiceClient,
    private val commandGateway: CommandGateway
) {

    private val logger = LoggerFactory.getLogger(CrossServiceCommandHandler::class.java)

    /**
     * Allocates a license for the new user when one is required, then sends
     * either a confirmation or a failure command.
     */
    @EventHandler
    suspend fun on(
        event: UserCreatedEvent,
        @MetaData("tenant_id") tenantId: String
    ) {
        // Propagate the tenant so downstream command handlers can enforce isolation.
        val tenantMetadata = MetaData.with("tenant_id", tenantId)

        try {
            // Nothing to do when the user needs no license.
            val licenseRequirement = determineLicenseRequirement(event) ?: return

            // Send command to License Service
            val allocateCommand = AllocateLicenseCommand(
                userId = event.userId,
                tenantId = tenantId,
                licenseType = licenseRequirement.type,
                reason = "New user creation"
            )

            val result = licenseServiceClient.allocateLicense(allocateCommand)

            if (result.isSuccess) {
                // Send confirmation command locally
                commandGateway.send<Any?>(
                    ConfirmUserLicenseAllocationCommand(
                        userId = event.userId,
                        licenseId = result.licenseId,
                        allocatedAt = result.allocatedAt
                    ),
                    tenantMetadata
                )
            } else {
                // Handle license allocation failure
                commandGateway.send<Any?>(
                    HandleLicenseAllocationFailureCommand(
                        userId = event.userId,
                        reason = result.errorMessage
                    ),
                    tenantMetadata
                )
            }
        } catch (e: kotlin.coroutines.cancellation.CancellationException) {
            // Cancellation is not an allocation failure; let it propagate.
            throw e
        } catch (e: Exception) {
            logger.error(
                "Failed to process license allocation for user {} in tenant {}: {}",
                event.userId,
                tenantId,
                e.message,
                e
            )

            // Send failure command
            commandGateway.send<Any?>(
                HandleLicenseAllocationFailureCommand(
                    userId = event.userId,
                    reason = e.message ?: "Unknown error"
                ),
                tenantMetadata
            )
        }
    }
}

Circuit Breaker Integration

/**
 * Notifies an external service about new users through a circuit breaker, so
 * an unavailable dependency does not fail event processing.
 *
 * Changes from the original snippet:
 *  - `logger` and `retryQueueService` were used but never declared; the retry
 *    queue is now injected and the logger is a private field.
 *    NOTE(review): the `RetryQueueService` type name is assumed from the
 *    property name - confirm against the actual retry API.
 *  - Coroutine cancellation is rethrown instead of being logged and dropped.
 */
@Component
class ResilientServiceIntegrationHandler(
    private val externalServiceClient: ExternalServiceClient,
    private val circuitBreakerFactory: CircuitBreakerFactory,
    private val retryQueueService: RetryQueueService
) {

    private val logger = LoggerFactory.getLogger(ResilientServiceIntegrationHandler::class.java)

    private val circuitBreaker = circuitBreakerFactory.create("external-service")

    /**
     * Sends the user-creation notification. When the circuit breaker rejects
     * the call, the event is queued for a later retry; any other failure is
     * logged and not rethrown.
     */
    @EventHandler
    suspend fun on(
        event: UserCreatedEvent,
        @MetaData("tenant_id") tenantId: String
    ) {
        try {
            // Use circuit breaker for external calls
            val result = circuitBreaker.executeSupplier {
                externalServiceClient.notifyUserCreation(
                    tenantId = tenantId,
                    userId = event.userId,
                    email = event.email
                )
            }

            if (result.isSuccess) {
                logger.info("Successfully notified external service of user creation: {}", event.userId)
            } else {
                logger.warn("External service notification failed: {}", result.errorMessage)
            }
        } catch (e: kotlin.coroutines.cancellation.CancellationException) {
            // Cancellation must propagate rather than be logged as a notification failure.
            throw e
        } catch (e: CallNotPermittedException) {
            // Circuit breaker is open
            logger.warn(
                "Circuit breaker open for external service - skipping notification for user {}",
                event.userId
            )

            // Queue the event so the notification can be retried later.
            retryQueueService.queueForRetry(event, "external-service-notification")
        } catch (e: Exception) {
            logger.error(
                "Failed to notify external service of user creation {}: {}",
                event.userId,
                e.message,
                e
            )
        }
    }
}

📊 Monitoring and Observability

Event Processing Metrics

/**
 * Collects Micrometer metrics about Axon command and event processing.
 */
@Component
class AxonMetricsCollector(
    private val meterRegistry: MeterRegistry
) {

    private val commandCounter = Counter.builder("axon.commands.processed")
        .description("Number of commands processed")
        .register(meterRegistry)

    private val projectionTimer = Timer.builder("axon.projections.processing.duration")
        .description("Time taken to process projection updates")
        .register(meterRegistry)

    /**
     * Increments the `axon.events.published` counter for the event's type and tenant.
     *
     * Micrometer counters are identified by name plus tags, and
     * `Counter.increment` takes no tags, so the tagged counter is resolved
     * through the registry on each call. Registering the same name and tags
     * again returns the already registered meter.
     */
    @EventHandler
    fun recordEvent(event: Any, @MetaData("tenant_id") tenantId: String) {
        // simpleName is null for anonymous/local classes; fall back to the JVM class name.
        val eventType = event::class.simpleName ?: event.javaClass.name

        Counter.builder("axon.events.published")
            .description("Number of events published")
            .tag("event_type", eventType)
            .tag("tenant_id", tenantId)
            .register(meterRegistry)
            .increment()
    }

    /**
     * Wraps the incoming message so that reading its metadata stops a timer
     * sample that was started when this method was called.
     *
     * NOTE(review): the sample is stopped on every `getMetaData()` call, not
     * when handling completes, and `@EventInterceptor` does not appear to be a
     * standard Axon annotation - confirm whether `@MessageHandlerInterceptor`
     * with an `InterceptorChain` was intended.
     */
    @EventInterceptor
    fun recordProjectionProcessing(event: Any, @MetaData metadata: MetaData): Any {
        val sample = Timer.start(meterRegistry)

        return object : EventMessage<Any> by event as EventMessage<Any> {
            override fun getMetaData(): MetaData {
                sample.stop(projectionTimer)
                return metadata
            }
        }
    }
}

Health Checks

/**
 * Health indicator that reports on the event processors, the event store and
 * the NATS connection.
 */
@Component
class AxonHealthIndicator(
    private val eventProcessingConfiguration: EventProcessingConfiguration,
    private val eventStore: EventStore,
    private val natsConnection: Connection
) : HealthIndicator {

    /**
     * Reports UP only when every event processor is running, a head token can
     * be created on the event store, and NATS is connected. Any failed check,
     * or an unexpected exception, marks the result DOWN.
     */
    override fun health(): Health {
        val report = Health.Builder()

        try {
            // Event processors: any stopped processor makes the status DOWN.
            val allProcessors = eventProcessingConfiguration.eventProcessors()
            val runningCount = allProcessors.filterValues { it.isRunning }.size
            val stopped = allProcessors.filterValues { !it.isRunning }

            if (stopped.isEmpty()) {
                report.up()
            } else {
                report.down().withDetail("stopped_processors", stopped.keys)
            }

            report.withDetail("total_processors", allProcessors.size)
            report.withDetail("running_processors", runningCount)

            // Event store: creating a head token serves as the connectivity probe.
            try {
                eventStore.createHeadToken()
                report.withDetail("event_store", "UP")
            } catch (e: Exception) {
                report.down().withDetail("event_store", "DOWN: ${e.message}")
            }

            // NATS: anything other than CONNECTED counts as down.
            if (natsConnection.status == Connection.Status.CONNECTED) {
                report.withDetail("nats", "UP")
            } else {
                report.down().withDetail("nats", "DOWN: ${natsConnection.status}")
            }
        } catch (e: Exception) {
            report.down().withException(e)
        }

        return report.build()
    }
}

🎯 Best Practices Summary

✅ Multi-Tenancy Do's

  • Always validate tenant context in commands and events
  • Use tenant-aware repositories and queries
  • Implement proper tenant isolation checks
  • Include tenant ID in all metadata and logging

✅ Security Do's

  • Integrate with EAF IAM for authorization
  • Propagate security context through metadata
  • Audit all security-relevant operations
  • Implement proper error handling for security failures

✅ NATS Integration Do's

  • Use reliable publishing with retries
  • Implement proper dead letter queue handling
  • Route events based on content and tenant
  • Monitor publishing success rates

❌ Common Pitfalls

  • Don't bypass tenant validation
  • Don't ignore security context
  • Don't forget error handling in async operations
  • Don't skip monitoring and metrics

🚀 Next Steps

You've mastered EAF-specific patterns! Next, let's explore advanced coordination:

Next Module: Sagas & Process Managers

Topics covered next:

  • Long-running process management
  • Cross-aggregate coordination
  • Compensation patterns
  • Timeout and deadline handling

💡 Key Takeaway: EAF patterns ensure security, isolation, and reliability across all Axon Framework operations!