EAF Integration Overview
This module covers how Axon Framework integrates with EAF infrastructure, leveraging existing components while adding event sourcing capabilities.
📚 Learning Objectives
By the end of this module, you will understand:
- How EAF's custom EventStorageEngine works with PostgreSQL
- Multi-tenant event handling with TenantContextHolder
- NATS integration for event distribution
- Security context propagation in commands and events
- Global sequence tracking for event ordering
🏗️ EAF Integration Architecture
EAF's Axon integration bridges Axon Framework with existing EAF infrastructure:
🔧 Custom EventStorageEngine Implementation
EAF uses a custom EventStorageEngine
that bridges Axon with the existing EAF Event Store:
EventStorageEngine Adapter
@Component
class EafPostgresEventStorageEngine(
private val eafEventStore: EafEventStoreSdk,
private val eventMessageMapper: AxonEventMessageMapper,
private val tenantContextHolder: TenantContextHolder,
private val persistenceExceptionResolver: PersistenceExceptionResolver
) : EventStorageEngine {
private val logger = LoggerFactory.getLogger(EafPostgresEventStorageEngine::class.java)
override fun appendEvents(events: List<DomainEventMessage<*>>): List<DomainEventMessage<*>> {
val tenantId = tenantContextHolder.getCurrentTenantId()
?: throw IllegalStateException("No tenant context available")
try {
val persistedEvents = events.map { event ->
val persistedEvent = eventMessageMapper.mapToPersistedEvent(event, tenantId)
eafEventStore.appendEvent(persistedEvent)
event
}
logger.debug(
"Appended {} events for tenant {} to aggregate {}",
events.size,
tenantId,
events.first().aggregateIdentifier
)
return persistedEvents
} catch (e: Exception) {
logger.error(
"Failed to append events for tenant {}: {}",
tenantId,
e.message,
e
)
throw persistenceExceptionResolver.resolve(e)
}
}
override fun readEvents(aggregateIdentifier: String): Stream<DomainEventMessage<*>> {
val tenantId = tenantContextHolder.getCurrentTenantId()
?: throw IllegalStateException("No tenant context available")
return try {
val streamId = PersistedEvent.createStreamId("UnknownAggregate", aggregateIdentifier)
eafEventStore.readEventStream(tenantId, streamId)
.map { persistedEvent -> eventMessageMapper.mapToDomainEvent(persistedEvent) }
.stream()
} catch (e: Exception) {
logger.error(
"Failed to read events for aggregate {} in tenant {}: {}",
aggregateIdentifier,
tenantId,
e.message,
e
)
throw persistenceExceptionResolver.resolve(e)
}
}
override fun createTailToken(): TrackingToken {
return GlobalSequenceTrackingToken.createTailToken()
}
override fun createHeadToken(): TrackingToken {
return GlobalSequenceTrackingToken.createHeadToken(eafEventStore.getMaxGlobalSequence())
}
override fun createTokenAt(dateTime: Instant): TrackingToken {
val globalSequence = eafEventStore.findGlobalSequenceAtTimestamp(dateTime)
return GlobalSequenceTrackingToken(globalSequence)
}
}
Event Message Mapping
The AxonEventMessageMapper
handles bidirectional conversion between Axon and EAF events:
@Component
class AxonEventMessageMapper(
private val objectMapper: ObjectMapper
) {
fun mapToPersistedEvent(
event: DomainEventMessage<*>,
tenantId: String
): PersistedEvent {
return PersistedEvent(
eventId = UUID.fromString(event.identifier),
streamId = PersistedEvent.createStreamId(event.type, event.aggregateIdentifier),
aggregateId = event.aggregateIdentifier,
aggregateType = event.type,
sequenceNumber = event.sequenceNumber,
tenantId = tenantId,
eventType = event.payloadType.name,
payload = serializePayload(event.payload),
metadata = serializeMetadata(event.metaData, tenantId, event),
timestampUtc = event.timestamp
)
}
fun mapToDomainEvent(persistedEvent: PersistedEvent): DomainEventMessage<*> {
val payload = deserializePayload(persistedEvent.payload, persistedEvent.eventType)
val metadata = deserializeMetadata(persistedEvent.metadata)
.and("global_sequence_id", persistedEvent.globalSequenceId.toString())
.and("tenant_id", persistedEvent.tenantId)
return GenericDomainEventMessage(
persistedEvent.aggregateType,
persistedEvent.aggregateId,
persistedEvent.sequenceNumber,
payload,
metadata
)
}
}
🏢 Multi-Tenant Support
TenantContextHolder Integration
EAF's multi-tenancy integrates seamlessly with Axon through context propagation:
@Component
class TenantContextHolder {
private val tenantContext = ThreadLocal<String>()
fun setCurrentTenantId(tenantId: String) {
tenantContext.set(tenantId)
MDC.put("tenant_id", tenantId)
}
fun getCurrentTenantId(): String? = tenantContext.get()
fun clearContext() {
tenantContext.remove()
MDC.remove("tenant_id")
}
}
// Tenant-aware Command Gateway
@Component
class TenantAwareCommandGateway(
private val commandGateway: CommandGateway,
private val tenantContextHolder: TenantContextHolder
) {
fun <R> send(command: Any): CompletableFuture<R> {
val tenantId = tenantContextHolder.getCurrentTenantId()
?: throw IllegalStateException("No tenant context")
// Add tenant metadata to command
val metaData = MetaData.with("tenant_id", tenantId)
return commandGateway.send(command, metaData)
}
}
Tenant Isolation in Event Processing
@Component
@ProcessingGroup("tenant-aware-projections")
class TenantAwareProjectionHandler(
private val userProjectionRepository: UserProjectionRepository
) {
@EventHandler
fun on(event: UserCreatedEvent, @MetaData("tenant_id") tenantId: String) {
// Tenant ID is automatically available from metadata
val projection = UserProjection(
userId = event.userId,
tenantId = tenantId, // Ensures proper tenant isolation
email = event.email,
// ... other fields
)
userProjectionRepository.save(projection)
}
}
📡 NATS Integration for Event Distribution
EAF publishes Axon events to NATS for external system integration:
NATS Event Publisher
@Component
@ProcessingGroup("nats-event-publisher")
class NatsEventRelayProcessor(
private val natsEventPublisher: NatsEventPublisher
) {
private val logger = LoggerFactory.getLogger(NatsEventRelayProcessor::class.java)
@EventHandler
suspend fun on(
event: Any,
@MetaData("tenant_id") tenantId: String,
@MetaData("correlation_id") correlationId: String?
) {
try {
val subject = "events.${event.javaClass.simpleName}"
natsEventPublisher.publish(
subject = subject,
tenantId = tenantId,
event = event,
metadata = mapOf(
"correlation_id" to (correlationId ?: UUID.randomUUID().toString()),
"source" to "axon-framework"
)
)
logger.debug(
"Published event {} to NATS subject {} for tenant {}",
event.javaClass.simpleName,
subject,
tenantId
)
} catch (e: Exception) {
logger.error(
"Failed to publish event {} to NATS for tenant {}: {}",
event.javaClass.simpleName,
tenantId,
e.message,
e
)
// Don't rethrow - event processing should continue
}
}
}
NATS Configuration for Axon
@Configuration
class NatsAxonConfiguration {
@Bean
fun natsEventProcessingConfigurer(): EventProcessingConfigurer {
return { configurer ->
configurer.registerTrackingEventProcessor("nats-event-publisher") { config ->
TrackingEventProcessorConfiguration
.forSingleThreadedProcessing()
.andInitialTrackingToken { config.eventStore().createHeadToken() }
.andBatchSize(10) // Process events in small batches
}
}
}
}
🔒 Security Context Integration
EAF's security context propagates through Axon commands and events:
Security Context Provider
@Component
class SecurityContextCorrelationDataProvider : CorrelationDataProvider {
private val logger = LoggerFactory.getLogger(SecurityContextCorrelationDataProvider::class.java)
override fun correlationDataFor(message: Message<*>): Map<String, *> {
val securityContext = SecurityContextHolder.getContext()
val authentication = securityContext.authentication
val correlationData = mutableMapOf<String, Any>()
if (authentication != null && authentication.isAuthenticated) {
correlationData["user_id"] = authentication.name
correlationData["authorities"] = authentication.authorities
.map { it.authority }
.joinToString(",")
}
// Add tenant context if available
val tenantId = MDC.get("tenant_id")
if (tenantId != null) {
correlationData["tenant_id"] = tenantId
}
// Add correlation ID for distributed tracing
correlationData["correlation_id"] = UUID.randomUUID().toString()
correlationData["timestamp"] = Instant.now().toString()
logger.debug("Added correlation data: {}", correlationData.keys)
return correlationData
}
}
Command Authorization
@Component
class UserCommandHandler(
private val repository: Repository<User>
) {
@CommandHandler
@PreAuthorize("hasRole('USER_MANAGEMENT') and #command.tenantId == principal.tenantId")
fun handle(command: CreateUserCommand) {
// Security context automatically enforced via @PreAuthorize
val user = User(command)
repository.save(user)
}
@CommandHandler
@PreAuthorize("hasRole('USER_MANAGEMENT')")
fun handle(command: UpdateUserEmailCommand, @MetaData("tenant_id") tenantId: String) {
// Tenant ID available in metadata for additional validation
val currentTenantId = SecurityContextHolder.getContext()
.authentication
.details as? String
require(tenantId == currentTenantId) { "Cross-tenant operation not allowed" }
val user = repository.load(command.userId)
user.updateEmail(command.newEmail, command.reason)
repository.save(user)
}
}
📈 Global Sequence Tracking
EAF implements GlobalSequenceTrackingToken
for ordered event processing:
GlobalSequenceTrackingToken Implementation
data class GlobalSequenceTrackingToken(
val globalSequence: Long
) : TrackingToken {
companion object {
fun createTailToken(): GlobalSequenceTrackingToken = GlobalSequenceTrackingToken(0L)
fun createHeadToken(maxSequence: Long): GlobalSequenceTrackingToken =
GlobalSequenceTrackingToken(maxSequence)
}
override fun covers(other: TrackingToken): Boolean {
return when (other) {
is GlobalSequenceTrackingToken -> this.globalSequence >= other.globalSequence
else -> false
}
}
override fun unwrap(): Optional<TrackingToken> = Optional.empty()
fun advance(): GlobalSequenceTrackingToken = copy(globalSequence = globalSequence + 1)
}
Token Store Integration
@Component
class PostgresTokenStore(
private val dataSource: DataSource
) : TokenStore {
override fun storeToken(token: TrackingToken, processorName: String, segment: Int) {
val globalToken = token as GlobalSequenceTrackingToken
dataSource.connection.use { connection ->
val sql = """
INSERT INTO tracking_tokens (processor_name, segment, global_sequence, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT (processor_name, segment)
DO UPDATE SET global_sequence = EXCLUDED.global_sequence,
updated_at = EXCLUDED.updated_at
""".trimIndent()
connection.prepareStatement(sql).use { stmt ->
stmt.setString(1, processorName)
stmt.setInt(2, segment)
stmt.setLong(3, globalToken.globalSequence)
stmt.setTimestamp(4, Timestamp.from(Instant.now()))
stmt.executeUpdate()
}
}
}
override fun fetchToken(processorName: String, segment: Int): TrackingToken? {
dataSource.connection.use { connection ->
val sql = "SELECT global_sequence FROM tracking_tokens WHERE processor_name = ? AND segment = ?"
connection.prepareStatement(sql).use { stmt ->
stmt.setString(1, processorName)
stmt.setInt(2, segment)
stmt.executeQuery().use { rs ->
return if (rs.next()) {
GlobalSequenceTrackingToken(rs.getLong("global_sequence"))
} else {
null
}
}
}
}
}
}
🔧 Configuration Example
Complete EAF Axon configuration:
@Configuration
@EnableAxon
class EafAxonConfiguration {
@Bean
fun configurer(
eventStorageEngine: EafPostgresEventStorageEngine,
correlationProviders: List<CorrelationDataProvider>,
tokenStore: TokenStore
): Configurer {
return DefaultConfigurer.defaultConfiguration()
.configureEventStore { eventStorageEngine }
.configureCorrelationDataProviders { correlationProviders }
.configureTokenStore { tokenStore }
.configureEventProcessing { config ->
config.registerTrackingEventProcessor("default") {
TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
}
}
}
@Bean
@Primary
fun commandGateway(configurer: Configurer): CommandGateway {
return configurer.buildConfiguration().commandGateway()
}
@Bean
@Primary
fun queryGateway(configurer: Configurer): QueryGateway {
return configurer.buildConfiguration().queryGateway()
}
}
🎯 Key Integration Benefits
- Seamless Multi-tenancy: Automatic tenant isolation at all levels
- Event Distribution: Automatic NATS publishing for external systems
- Security Integration: Built-in authorization and audit trails
- Performance: Optimized PostgreSQL storage with global sequencing
- Observability: Comprehensive logging and correlation tracking
🚀 Next Steps
Now that you understand EAF integration, let's dive into practical implementation:
Next Module: Aggregates & Commands →
Topics covered next:
- Building EAF-compatible aggregates
- Command validation patterns
- Multi-tenant aggregate design
- Event sourcing best practices
💡 Key Takeaway: EAF's Axon integration preserves all existing infrastructure benefits while adding powerful event sourcing capabilities!