Skip to content

Commit

Permalink
Merge pull request #206 from AxonFramework/feature/streams
Browse files Browse the repository at this point in the history
Persistent streams
  • Loading branch information
smcvb authored Jul 23, 2024
2 parents a381b8b + 0ffd726 commit 4b7d521
Show file tree
Hide file tree
Showing 14 changed files with 490 additions and 32 deletions.
6 changes: 3 additions & 3 deletions multitenancy-spring-boot-3-integrationtests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-messaging</artifactId>
<version>4.9.3</version>
<version>${axon.version}</version>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-autoconfigure</artifactId>
<version>4.9.3</version>
<version>${axon.version}</version>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-test</artifactId>
<version>4.9.3</version>
<version>${axon.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,15 @@
import org.axonframework.extensions.multitenancy.components.queryhandeling.TenantQueryUpdateEmitterSegmentFactory;
import org.axonframework.extensions.multitenancy.components.scheduling.TenantEventSchedulerSegmentFactory;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryInvocationErrorHandler;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.SimpleQueryBus;
import org.axonframework.queryhandling.SimpleQueryUpdateEmitter;
import org.axonframework.queryhandling.*;
import org.axonframework.serialization.Serializer;
import org.axonframework.spring.config.SpringAxonConfiguration;
import org.axonframework.springboot.autoconfig.AxonServerAutoConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand Down Expand Up @@ -260,4 +256,5 @@ public EventProcessorInfoConfiguration processorInfoConfiguration(
return controlService;
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2010-2024. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.extensions.multitenancy.autoconfig;

import org.axonframework.axonserver.connector.event.axon.PersistentStreamMessageSource;
import org.axonframework.axonserver.connector.event.axon.PersistentStreamMessageSourceFactory;
import org.axonframework.common.StringUtils;
import org.axonframework.extensions.multitenancy.components.TenantProvider;
import org.axonframework.springboot.autoconfig.AxonServerAutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;

/**
* Auto-configuration class for multi-tenant persistent stream support in Axon Framework.
* This configuration is enabled when Axon Server and multi-tenancy are both enabled.
*
* @author Stefan Dragisic
* @since 4.10.0
*/
@AutoConfiguration
@ConditionalOnProperty(value = {"axon.axonserver.enabled", "axon.multi-tenancy.enabled"}, matchIfMissing = true)
@AutoConfigureBefore(AxonServerAutoConfiguration.class)
public class MultiTenantPersistentStreamAutoConfiguration {

/**
* Creates a PersistentStreamMessageSourceFactory for multi-tenant environments.
*
* @param tenantProvider The TenantProvider for managing tenants.
* @param tenantPersistentStreamMessageSourceFactory The factory for creating tenant-specific PersistentStreamMessageSources.
* @return A PersistentStreamMessageSourceFactory that supports multi-tenancy.
*/
@Bean
@ConditionalOnMissingBean
public PersistentStreamMessageSourceFactory persistentStreamMessageSourceFactory(
TenantProvider tenantProvider,
TenantPersistentStreamMessageSourceFactory tenantPersistentStreamMessageSourceFactory
) {
return (name, persistentStreamProperties, scheduler, batchSize, context, configuration) -> {
MultiTenantPersistentStreamMessageSource component = new MultiTenantPersistentStreamMessageSource(name, persistentStreamProperties, scheduler, batchSize, context, configuration,
tenantPersistentStreamMessageSourceFactory);
tenantProvider.subscribe(component);
return component;
};
}

/**
* Creates a TenantPersistentStreamMessageSourceFactory for creating tenant-specific PersistentStreamMessageSources.
* @return A TenantPersistentStreamMessageSourceFactory.
*/
@Bean
@ConditionalOnMissingBean
public TenantPersistentStreamMessageSourceFactory tenantPersistentStreamMessageSourceFactory(
) {
return ( name,
persistentStreamProperties,
scheduler,
batchSize,
context,
configuration,
tenantDescriptor) ->
new PersistentStreamMessageSource(name + "@" + tenantDescriptor.tenantId(),
configuration,
persistentStreamProperties,
scheduler,
batchSize,
StringUtils.emptyOrNull(context) ? tenantDescriptor.tenantId() : context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (c) 2010-2024. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.extensions.multitenancy.autoconfig;


import io.axoniq.axonserver.connector.event.PersistentStreamProperties;
import org.axonframework.axonserver.connector.event.axon.PersistentStreamMessageSource;
import org.axonframework.config.Configuration;
import org.axonframework.extensions.multitenancy.components.MultiTenantAwareComponent;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;
import org.axonframework.common.Registration;
import org.axonframework.extensions.multitenancy.components.eventstore.MultiTenantSubscribableMessageSource;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;

/**
* A multi-tenant persistent stream message source that extends PersistentStreamMessageSource
* and implements MultiTenantAwareComponent and MultiTenantSubscribableMessageSource interfaces.
* <p>
* This class provides functionality to manage message sources for multiple tenants,
* allowing registration and management of tenant-specific persistent stream message sources.
* It maintains a concurrent map of tenant descriptors to their corresponding message sources.
* </p>
* <p>
* The class supports operations such as registering new tenants, starting tenants,
* and retrieving all tenant segments. It uses a factory to create tenant-specific
* message sources, ensuring proper initialization and configuration for each tenant.
* </p>
* @author Stefan Dragisic
* @since 4.10.0
*/
public class MultiTenantPersistentStreamMessageSource extends PersistentStreamMessageSource
implements MultiTenantAwareComponent, MultiTenantSubscribableMessageSource<PersistentStreamMessageSource> {

private final String name;
private final Configuration configuration;
private final TenantPersistentStreamMessageSourceFactory tenantPersistentStreamMessageSourceFactory;
private final Map<TenantDescriptor, PersistentStreamMessageSource> tenantSegments = new ConcurrentHashMap<>();
private final PersistentStreamProperties persistentStreamProperties;
private final ScheduledExecutorService scheduler;
private final int batchSize;
private final String context;

/**
* Constructs a new MultiTenantPersistentStreamMessageSource.
*
* @param name The name of the message source.
* @param persistentStreamProperties Properties for the persistent stream.
* @param scheduler The scheduled executor service for managing tasks.
* @param batchSize The size of each batch of messages to process.
* @param context The context in which this message source operates.
* @param configuration The configuration settings for the message source.
* @param tenantPersistentStreamMessageSourceFactory The factory for creating tenant-specific message sources.
*/
public MultiTenantPersistentStreamMessageSource(String name, PersistentStreamProperties
persistentStreamProperties, ScheduledExecutorService scheduler, int batchSize, String context, Configuration configuration,
TenantPersistentStreamMessageSourceFactory tenantPersistentStreamMessageSourceFactory) {

super(name, configuration, persistentStreamProperties, scheduler, batchSize, context);
this.tenantPersistentStreamMessageSourceFactory = tenantPersistentStreamMessageSourceFactory;
this.name = name;
this.configuration = configuration;
this.persistentStreamProperties = persistentStreamProperties;
this.scheduler = scheduler;
this.batchSize = batchSize;
this.context = context;
}

/**
* Registers a new tenant with the message source.
*
* @param tenantDescriptor The descriptor of the tenant to register.
* @return A Registration object that can be used to unregister the tenant.
*/
@Override
public Registration registerTenant(TenantDescriptor tenantDescriptor) {
PersistentStreamMessageSource tenantSegment = tenantPersistentStreamMessageSourceFactory.build(name,
persistentStreamProperties, scheduler, batchSize, context, configuration, tenantDescriptor);
tenantSegments.putIfAbsent(tenantDescriptor, tenantSegment);

return () -> {
PersistentStreamMessageSource removed = tenantSegments.remove(tenantDescriptor);
return removed != null;
};
}

/**
* Registers and starts a new tenant with the message source.
* In this implementation, it's equivalent to just registering the tenant.
* This component doesn't require any additional steps to start a tenant.
*
* @param tenantDescriptor The descriptor of the tenant to register and start.
* @return A Registration object that can be used to unregister the tenant.
*/
@Override
public Registration registerAndStartTenant(TenantDescriptor tenantDescriptor) {
return registerTenant(tenantDescriptor);
}

/**
* Returns a map of all registered tenant segments.
*
* @return An unmodifiable map where keys are TenantDescriptors and values are PersistentStreamMessageSources.
*/
@Override
public Map<TenantDescriptor, PersistentStreamMessageSource> tenantSegments() {
return Collections.unmodifiableMap(tenantSegments);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2010-2024. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.extensions.multitenancy.autoconfig;

import io.axoniq.axonserver.connector.event.PersistentStreamProperties;
import org.axonframework.axonserver.connector.event.axon.PersistentStreamMessageSource;
import org.axonframework.config.Configuration;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;

import java.util.concurrent.ScheduledExecutorService;


/**
* Factory interface for creating a {@link PersistentStreamMessageSource} for a specific tenant.
* The created PersistentStreamMessageSource can be used to read a stream of events from an Axon Server for a specific processor and tenant.
* The PersistentStreamMessageSource is configured with the provided processor name, settings, tenant descriptor, and Axon configuration.
*
* This interface is used to create a {@link PersistentStreamMessageSource} for a given tenant,
* @author Stefan Dragisic
* @since 4.10.0
*/
@FunctionalInterface
public interface TenantPersistentStreamMessageSourceFactory {


/**
* Builds a new instance of {@link PersistentStreamMessageSource} with the specified parameters.
*
* @param name The name of the persistent stream. This is used to identify the stream.
* @param persistentStreamProperties The properties of the persistent stream, containing configuration details.
* @param scheduler The {@link ScheduledExecutorService} to be used for scheduling tasks related to the message source.
* @param batchSize The number of events to be fetched in a single batch from the stream.
* @param context The context in which the persistent stream operates. This can be used to differentiate streams in different environments or applications.
* @param configuration The Axon {@link Configuration} object, which provides access to the framework's configuration settings.
* @param tenantDescriptor The descriptor of the tenant for which the PersistentStreamMessageSource is created.
* @return A new instance of {@link PersistentStreamMessageSource} configured with the provided parameters.
* @throws IllegalArgumentException if any of the required parameters are null or invalid.
* @throws org.axonframework.axonserver.connector.AxonServerException if there's an issue connecting to or configuring the Axon Server.
*/
PersistentStreamMessageSource build(
String name,
PersistentStreamProperties persistentStreamProperties,
ScheduledExecutorService scheduler,
int batchSize,
String context,
Configuration configuration,
TenantDescriptor tenantDescriptor);
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.axonframework.extensions.multitenancy.autoconfig.MultiTenantPersistentStreamAutoConfiguration,\
org.axonframework.extensions.multitenancy.autoconfig.MultiTenancyAutoConfiguration,\
org.axonframework.extensions.multitenancy.autoconfig.MultiTenancyAxonServerAutoConfiguration

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.axonframework.extensions.multitenancy.autoconfig;

import org.axonframework.axonserver.connector.event.axon.PersistentStreamMessageSourceFactory;
import org.axonframework.extensions.multitenancy.components.TargetTenantResolver;
import org.axonframework.extensions.multitenancy.components.TenantConnectPredicate;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;
Expand All @@ -31,16 +32,7 @@
import org.axonframework.extensions.multitenancy.configuration.MultiTenantEventProcessingModule;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.springboot.autoconfig.AxonAutoConfiguration;
import org.axonframework.springboot.autoconfig.AxonServerAutoConfiguration;
import org.axonframework.springboot.autoconfig.AxonServerBusAutoConfiguration;
import org.axonframework.springboot.autoconfig.AxonTracingAutoConfiguration;
import org.axonframework.springboot.autoconfig.EventProcessingAutoConfiguration;
import org.axonframework.springboot.autoconfig.InfraConfiguration;
import org.axonframework.springboot.autoconfig.NoOpTransactionAutoConfiguration;
import org.axonframework.springboot.autoconfig.ObjectMapperAutoConfiguration;
import org.axonframework.springboot.autoconfig.TransactionAutoConfiguration;
import org.axonframework.springboot.autoconfig.XStreamAutoConfiguration;
import org.axonframework.springboot.autoconfig.*;
import org.junit.jupiter.api.*;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
Expand Down Expand Up @@ -96,6 +88,10 @@ void multiTenancyAutoConfiguration() {
.isExactlyInstanceOf(MultiTenantEventScheduler.class);
assertThat(context).getBean("multiTenantQueryUpdateEmitter")
.isInstanceOf(MultiTenantQueryUpdateEmitter.class);
assertThat(context).getBean("persistentStreamMessageSourceFactory")
.isInstanceOf(PersistentStreamMessageSourceFactory.class);
assertThat(context).getBean("tenantPersistentStreamMessageSourceFactory")
.isInstanceOf(TenantPersistentStreamMessageSourceFactory.class);
});
}

Expand All @@ -121,6 +117,7 @@ void multiTenancyDisabled() {
assertThat(context).doesNotHaveBean(MultiTenantDeadLetterQueueFactory.class);
assertThat(context).doesNotHaveBean(MultiTenantEventScheduler.class);
assertThat(context).doesNotHaveBean(MultiTenantQueryUpdateEmitter.class);
assertThat(context).doesNotHaveBean(TenantPersistentStreamMessageSourceFactory.class);
});
}

Expand Down
Loading

0 comments on commit 4b7d521

Please sign in to comment.