Skip to content

Commit

Permalink
Merge pull request #200 from AxonFramework/feature/ep-control-segments
Browse files Browse the repository at this point in the history
Tenant Segments for Event Processor Control
  • Loading branch information
schananas authored May 24, 2024
2 parents d7347ea + 3cf555f commit b7fc607
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.Configuration;
import org.axonframework.extensions.multitenancy.components.TenantConnectPredicate;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;
import org.axonframework.extensions.multitenancy.components.TenantEventProcessorControlSegmentFactory;
import org.axonframework.extensions.multitenancy.components.TenantProvider;
import org.axonframework.extensions.multitenancy.components.commandhandeling.TenantCommandSegmentFactory;
import org.axonframework.extensions.multitenancy.components.eventstore.TenantEventSegmentFactory;
Expand All @@ -54,10 +56,12 @@
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;

/**
Expand Down Expand Up @@ -232,16 +236,27 @@ public TenantEventSchedulerSegmentFactory tenantEventSchedulerSegmentFactory(
};
}

@Bean
@Primary
@ConditionalOnMissingBean
public TenantEventProcessorControlSegmentFactory tenantEventProcessorControlSegmentFactory(

) {
return TenantDescriptor::tenantId;
}

@Bean
public EventProcessorInfoConfiguration processorInfoConfiguration(
TenantProvider tenantProvider,
AxonServerConnectionManager connectionManager
AxonServerConnectionManager connectionManager,
TenantEventProcessorControlSegmentFactory tenantEventProcessorControlSegmentFactory
) {
return new EventProcessorInfoConfiguration(c -> {
MultiTenantEventProcessorControlService controlService = new MultiTenantEventProcessorControlService(
connectionManager,
c.eventProcessingConfiguration(),
c.getComponent(AxonServerConfiguration.class)
c.getComponent(AxonServerConfiguration.class),
tenantEventProcessorControlSegmentFactory
);
tenantProvider.subscribe(controlService);
return controlService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.extensions.multitenancy.components.MultiTenantAwareComponent;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;
import org.axonframework.extensions.multitenancy.components.TenantEventProcessorControlSegmentFactory;
import org.axonframework.extensions.multitenancy.components.eventhandeling.MultiTenantEventProcessor;
import org.axonframework.lifecycle.Phase;
import org.axonframework.lifecycle.StartHandler;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class MultiTenantEventProcessorControlService
implements MultiTenantAwareComponent {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final TenantEventProcessorControlSegmentFactory tenantEventProcessorControlSegmentFactory;

/**
* Initialize a {@link MultiTenantEventProcessorControlService}.
Expand All @@ -70,40 +72,18 @@ public class MultiTenantEventProcessorControlService
* @param axonServerConfiguration The {@link AxonServerConfiguration} used to retrieve the
* {@link AxonServerConnectionManager#getDefaultContext() default context}
* from.
* @param tenantEventProcessorControlSegmentFactory The {@link TenantEventProcessorControlSegmentFactory} used to
* retrieve the context name for the given tenant.
*/
public MultiTenantEventProcessorControlService(AxonServerConnectionManager axonServerConnectionManager,
EventProcessingConfiguration eventProcessingConfiguration,
AxonServerConfiguration axonServerConfiguration) {
this(axonServerConnectionManager,
AxonServerConfiguration axonServerConfiguration,
TenantEventProcessorControlSegmentFactory tenantEventProcessorControlSegmentFactory) {
super(axonServerConnectionManager,
eventProcessingConfiguration,
axonServerConfiguration.getContext(),
axonServerConfiguration.getEventhandling().getProcessors());
}

/**
* Initialize a {@link MultiTenantEventProcessorControlService}.
* <p>
* This service adds processor instruction handlers to the {@link ControlChannel} of the given {@code context}, for
* every tenant. Doing so ensures operation like the {@link EventProcessor#start() start} and
* {@link EventProcessor#shutDown() shutdown} can be triggered through Axon Server. Furthermore, it sets the
* configured load balancing strategies through the {@link AdminChannel} of the {@code context}.
*
* @param axonServerConnectionManager A {@link AxonServerConnectionManager} from which to retrieve the
* {@link ControlChannel} and {@link AdminChannel}.
* @param eventProcessingConfiguration The {@link EventProcessor} configuration of this application, used to
* retrieve the registered event processors from.
* @param context The context of this application instance to retrieve the
* {@link ControlChannel} and {@link AdminChannel} for.
* @param processorConfig The processor configuration from the {@link AxonServerConfiguration}, used to
* (for example) retrieve the load balancing strategies from.
*/
public MultiTenantEventProcessorControlService(
AxonServerConnectionManager axonServerConnectionManager,
EventProcessingConfiguration eventProcessingConfiguration,
String context,
Map<String, AxonServerConfiguration.Eventhandling.ProcessorSettings> processorConfig
) {
super(axonServerConnectionManager, eventProcessingConfiguration, context, processorConfig);
this.tenantEventProcessorControlSegmentFactory = tenantEventProcessorControlSegmentFactory;
}

@StartHandler(phase = Phase.INSTRUCTION_COMPONENTS)
Expand Down Expand Up @@ -142,7 +122,7 @@ private Map<String, String> strategiesPerProcessor(Map<String, EventProcessor> e
// Filter out MultiTenantEventProcessors as those aren't registered with Axon Server anyhow.
.filter(entry -> !(entry.getValue() instanceof MultiTenantEventProcessor))
.map(Map.Entry::getKey)
.map(MultiTenantEventProcessorControlService::processorNameFromCombination)
.map(this::processorNameFromCombination)
.collect(Collectors.toList());
return processorConfig.entrySet()
.stream()
Expand Down Expand Up @@ -206,15 +186,19 @@ private Optional<String> tokenStoreIdentifierFor(String processorName) {
.retrieveStorageIdentifier();
}

private static String processorNameFromCombination(String processorAndContext) {
int index = processorAndContext.indexOf("@");
return index == -1 ? processorAndContext : processorAndContext.substring(0, index);
private String processorNameFromCombination(String processorAndTenantId) {
int index = processorAndTenantId.indexOf("@");
return index == -1 ? processorAndTenantId : processorAndTenantId.substring(0, index);
}

private static String contextFromCombination(String processorAndContext) {
int index = processorAndContext.indexOf("@");
private String contextFromCombination(String processorAndTenantId) {
int index = processorAndTenantId.indexOf("@");
//if there is no context name in the processorAndContext, return the _admin as default
return index == -1 ? "_admin" : processorAndContext.substring(index + 1);
String tenantId = index == -1 ? "_admin" : processorAndTenantId.substring(index + 1);
if ("_admin".equals(tenantId)) {
return "_admin";
}
return tenantEventProcessorControlSegmentFactory.apply(TenantDescriptor.tenantWithId(tenantId));
}

@Override
Expand All @@ -233,7 +217,8 @@ public Registration registerAndStartTenant(TenantDescriptor tenantDescriptor) {
if (processor instanceof MultiTenantEventProcessor || !name.contains(tenantDescriptor.tenantId())) {
return;
}
ControlChannel controlChannel = axonServerConnectionManager.getConnection(tenantDescriptor.tenantId())
String context = tenantEventProcessorControlSegmentFactory.apply(tenantDescriptor);
ControlChannel controlChannel = axonServerConnectionManager.getConnection(context)
.controlChannel();
AxonProcessorInstructionHandler instructionHandler = new AxonProcessorInstructionHandler(processor, name);
controlChannel.registerEventProcessor(name, infoSupplier(processor), instructionHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;
import org.axonframework.extensions.multitenancy.components.TenantEventProcessorControlSegmentFactory;
import org.axonframework.extensions.multitenancy.components.eventhandeling.MultiTenantEventProcessor;
import org.junit.jupiter.api.*;
import org.mockito.*;
Expand Down Expand Up @@ -75,9 +76,12 @@ void setUp() {
AxonServerConfiguration axonServerConfig = mock(AxonServerConfiguration.class);
mockAxonServerConfig(axonServerConfig);

TenantEventProcessorControlSegmentFactory tenantEventProcessorControlSegmentFactory = TenantDescriptor::tenantId;

testSubject = new MultiTenantEventProcessorControlService(axonServerConnectionManager,
eventProcessingConfiguration,
axonServerConfig);
axonServerConfig,
tenantEventProcessorControlSegmentFactory);
}

private void mockConnectionManager() {
Expand Down Expand Up @@ -112,7 +116,7 @@ private void mockConnectionManager() {
when(axonServerConnectionManager.getConnection(contextCapture.capture()))
.thenAnswer(a -> {
String capturedValue = contextCapture.getValue();
if (capturedValue.equals("tenant-1")) {
if (capturedValue.equals("tenant-1") || capturedValue.equals("tenant-1-context")) {
return connectionTenant1;
} else if (capturedValue.equals("tenant-2")) {
return connectionTenant2;
Expand Down Expand Up @@ -218,4 +222,29 @@ void willSetLoadBalancingStrategyForProcessorsWithPropertiesOnStart() {
verify(adminTenant1, never()).setAutoLoadBalanceStrategy(eq(processorNameWithoutSettings), any(), any());
verify(adminTenant2, never()).setAutoLoadBalanceStrategy(eq(processorNameWithoutSettings), any(), any());
}

@Test
void testNonDefaultTenantEventProcessorControlSegmentFactory() {
AxonServerConfiguration axonServerConfig = mock(AxonServerConfiguration.class);
mockAxonServerConfig(axonServerConfig);
// Arrange
TenantEventProcessorControlSegmentFactory tenantEventProcessorControlSegmentFactory = tenantId -> tenantId.tenantId() + "-context";
MultiTenantEventProcessorControlService testSubject = new MultiTenantEventProcessorControlService(
axonServerConnectionManager,
eventProcessingConfiguration,
axonServerConfig,
tenantEventProcessorControlSegmentFactory
);

when(eventProcessingConfiguration.eventProcessors()).thenReturn(ImmutableMap.of(
PROCESSOR_NAME + "@tenant-1", mock(EventProcessor.class)
));

// Act
testSubject.start();

// Assert
verify(axonServerConnectionManager).getConnection("tenant-1-context");
verify(controlTenant1).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-1"), any(), any());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.axonframework.extensions.multitenancy.components;

import java.util.function.Function;

/**
* Maps a tenant id to the context name for the EventProcessorControlService.
* <p>
* This interface is used to create a mapping between a given {@link TenantDescriptor} and a context name.
* After a mapping is created, it will be used by EventProcessorControlService
* to associate event processor control with the given context.
*
* @author Stefan Dragisic
* @since 4.9.3
*/
public interface TenantEventProcessorControlSegmentFactory extends Function<TenantDescriptor, String> {

}

0 comments on commit b7fc607

Please sign in to comment.