Skip to content

Commit

Permalink
Merge pull request #221 from AxonFramework/bugfix/simple_command_bus_…
Browse files Browse the repository at this point in the history
…reused

Isolate SimpleCommandBus Instances per Tenant
  • Loading branch information
schananas authored Sep 4, 2024
2 parents c8ce38e + 00c012b commit 1611ed5
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 11 deletions.
26 changes: 26 additions & 0 deletions docs/extension-guide/modules/ROOT/pages/configuration.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,29 @@ public MultiTenantEventProcessorPredicate multiTenantEventProcessorPredicate() {
----

This bean should return `true` for each processor that you want to be multi-tenant, and `false` for each processor that you want to be single tenant.

=== Tenant Segment Factories

This extension provides several factory interfaces that are used to create tenant-specific segments for various Axon components, such as Command Bus, Query Bus, Event Store, and Event Scheduler. These factories allow you to configure and customize the behavior of these components for each tenant.

The following tenant segment factories are available:

==== TenantCommandSegmentFactory

This factory is responsible for creating a `CommandBus` instance for each tenant. By default, it creates an `AxonServerCommandBus` that uses a `SimpleCommandBus` as the local segment and connects to Axon Server. You can override this factory to provide a custom implementation of the `CommandBus` for specific tenants.

==== TenantQuerySegmentFactory

This factory creates a `QueryBus` instance for each tenant. By default, it creates an `AxonServerQueryBus` that uses a `SimpleQueryBus` as lhe local segment and connects to Axon Server. You can override this factory to provide a custom implementation of the `QueryBus` for specific tenants.

==== TenantEventSegmentFactory

This factory is responsible for creating an `EventStore` instance for each tenant. By default, it creates an `AxonServerEventStore` that connects to Axon Server. You can override this factory to provide a custom implementation of the `EventStore` for specific tenants.

==== TenantEventSchedulerSegmentFactory

This factory creates an `EventScheduler` instance for each tenant. By default, it creates an `AxonServerEventScheduler` that connects to Axon Server. You can override this factory to provide a custom implementation of the `EventScheduler` for specific tenants.

==== TenantEventProcessorControlSegmentFactory

This factory creates a `TenantDescriptor` for each event processor, which is used to identify the tenant associated with the event processor. By default, it uses the tenant identifier as the `TenantDescriptor`. You can override this factory to provide a custom implementation of the `TenantDescriptor` for specific event processors.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
import org.axonframework.axonserver.connector.event.axon.EventProcessorInfoConfiguration;
import org.axonframework.axonserver.connector.query.AxonServerQueryBus;
import org.axonframework.axonserver.connector.query.QueryPriorityCalculator;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.*;
import org.axonframework.commandhandling.distributed.RoutingStrategy;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.Configuration;
Expand All @@ -42,12 +41,7 @@
import org.axonframework.extensions.multitenancy.components.queryhandeling.TenantQueryUpdateEmitterSegmentFactory;
import org.axonframework.extensions.multitenancy.components.scheduling.TenantEventSchedulerSegmentFactory;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryInvocationErrorHandler;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.SimpleQueryBus;
import org.axonframework.queryhandling.SimpleQueryUpdateEmitter;
import org.axonframework.queryhandling.*;
import org.axonframework.serialization.Serializer;
import org.axonframework.spring.config.SpringAxonConfiguration;
import org.axonframework.springboot.autoconfig.AxonServerAutoConfiguration;
Expand Down Expand Up @@ -100,21 +94,38 @@ public TenantProvider tenantProvider(Environment env,
axonServerConnectionManager);
}

private SimpleCommandBus localCommandBus(TransactionManager txManager, Configuration axonConfiguration,
DuplicateCommandHandlerResolver duplicateCommandHandlerResolver) {
SimpleCommandBus commandBus =
SimpleCommandBus.builder()
.transactionManager(txManager)
.duplicateCommandHandlerResolver(duplicateCommandHandlerResolver)
.spanFactory(axonConfiguration.getComponent(CommandBusSpanFactory.class))
.messageMonitor(axonConfiguration.messageMonitor(CommandBus.class, "commandBus"))
.build();
commandBus.registerHandlerInterceptor(
new CorrelationDataInterceptor<>(axonConfiguration.correlationDataProviders())
);
return commandBus;
}

@Bean
@ConditionalOnClass(name = "org.axonframework.axonserver.connector.command.AxonServerCommandBus")
public TenantCommandSegmentFactory tenantAxonServerCommandSegmentFactory(
@Qualifier("messageSerializer") Serializer messageSerializer,
@Qualifier("localSegment") CommandBus localSegment,
RoutingStrategy routingStrategy,
CommandPriorityCalculator priorityCalculator,
CommandLoadFactorProvider loadFactorProvider,
TargetContextResolver<? super CommandMessage<?>> targetContextResolver,
AxonServerConfiguration axonServerConfig,
AxonServerConnectionManager connectionManager
AxonServerConnectionManager connectionManager,
TransactionManager txManager, Configuration axonConfiguration,
DuplicateCommandHandlerResolver duplicateCommandHandlerResolver
) {
return tenantDescriptor -> {
SimpleCommandBus localCommandBus = localCommandBus(txManager, axonConfiguration, duplicateCommandHandlerResolver);
AxonServerCommandBus commandBus = AxonServerCommandBus.builder()
.localSegment(localSegment)
.localSegment(localCommandBus)
.serializer(messageSerializer)
.routingStrategy(routingStrategy)
.priorityCalculator(priorityCalculator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

package org.axonframework.extensions.multitenancy.autoconfig;

import org.axonframework.axonserver.connector.command.AxonServerCommandBus;
import org.axonframework.axonserver.connector.event.axon.EventProcessorInfoConfiguration;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;
import org.axonframework.extensions.multitenancy.components.commandhandeling.TenantCommandSegmentFactory;
import org.axonframework.extensions.multitenancy.components.eventstore.TenantEventSegmentFactory;
import org.axonframework.extensions.multitenancy.components.queryhandeling.TenantQuerySegmentFactory;
Expand All @@ -33,10 +37,13 @@
import org.axonframework.springboot.autoconfig.TransactionAutoConfiguration;
import org.axonframework.springboot.autoconfig.XStreamAutoConfiguration;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

/**
* Test class validating the autoconfiguration of Axon Server-specific multi-tenancy components.
Expand Down Expand Up @@ -86,6 +93,32 @@ void axonServerAutoConfiguration() {
});
}

@Test
void tenantCommandSegmentFactoryUsesOwnSimpleCommandBus() {
contextRunner.withConfiguration(AutoConfigurations.of(MultiTenancyAxonServerAutoConfiguration.class))
.withConfiguration(AutoConfigurations.of(MultiTenancyAutoConfiguration.class))
.withUserConfiguration(SharedCommandBus.class)
.withPropertyValues("axon.axonserver.contexts=tenant-1,tenant-2")
.run(context -> {
TenantCommandSegmentFactory factory = context.getBean(TenantCommandSegmentFactory.class);
SimpleCommandBus sharedCommandBus = context.getBean("sharedSimpleCommandBus", SimpleCommandBus.class);

AxonServerCommandBus commandBus = (AxonServerCommandBus) factory.apply(new TenantDescriptor("test-tenant"));
CommandBus localSegment = commandBus.localSegment();

assertThat(localSegment).isNotSameAs(sharedCommandBus);
});
}


static class SharedCommandBus {
@Qualifier("localSegment")
@Bean
public SimpleCommandBus sharedSimpleCommandBus() {
return mock(SimpleCommandBus.class);
}
}

@Test
void axonServerDisabled() {
contextRunner.withPropertyValues("axon.axonserver.enabled:false", "axon.axonserver.contexts=tenant-1,tenant-2")
Expand Down

0 comments on commit 1611ed5

Please sign in to comment.