Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,12 @@ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
return this;
}

@Override
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
setProperty("subscription_enabled", String.valueOf(subscriptionEnabled));
return this;
}

@Override
public CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
setProperty("default_storage_group_level", String.valueOf(defaultStorageGroupLevel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,13 @@ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
return this;
}

@Override
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
dnConfig.setSubscriptionEnabled(subscriptionEnabled);
cnConfig.setSubscriptionEnabled(subscriptionEnabled);
return this;
}

@Override
public CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
dnConfig.setDefaultStorageGroupLevel(defaultStorageGroupLevel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,4 +380,9 @@ public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
return this;
}

@Override
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ CommonConfig setPipeConnectorRequestSliceThresholdBytes(

CommonConfig setQueryMemoryProportion(String queryMemoryProportion);

CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled);

default CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class IoTDBDatabaseIT {

@Before
public void setUp() throws Exception {
// enable subscription
EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);
EnvFactory.getEnv().initClusterEnvironment();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public class IoTDBSubscriptionRestartIT extends AbstractSubscriptionIT {
public void setUp() throws Exception {
super.setUp();

// enable subscription
EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);

// set cluster env
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public void setUp() throws Exception {
}

protected void setUpConfig() {
// enable subscription
senderEnv.getConfig().getCommonConfig().setSubscriptionEnabled(true);
receiverEnv.getConfig().getCommonConfig().setSubscriptionEnabled(true);

// enable auto create schema
senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public abstract class AbstractSubscriptionLocalIT extends AbstractSubscriptionIT
public void setUp() throws Exception {
super.setUp();

// enable subscription
EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);
EnvFactory.getEnv().initClusterEnvironment();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public void setUp() throws Exception {
}

protected void setUpConfig() {
// enable subscription
sender.getConfig().getCommonConfig().setSubscriptionEnabled(true);
receiver1.getConfig().getCommonConfig().setSubscriptionEnabled(true);
receiver2.getConfig().getCommonConfig().setSubscriptionEnabled(true);

// enable auto create schema
sender.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
receiver1.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ public class ExportTsFileTestIT extends AbstractScriptIT {

@BeforeClass
public static void setUp() throws Exception {
// enable subscription
EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);
EnvFactory.getEnv().initClusterEnvironment();

ip = EnvFactory.getEnv().getIP();
port = EnvFactory.getEnv().getPort();
toolsPath = EnvFactory.getEnv().getToolsPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ public enum TSStatusCode {
SUBSCRIPTION_MISSING_CUSTOMER(1909),
SHOW_SUBSCRIPTION_ERROR(1910),
SUBSCRIPTION_PIPE_TIMEOUT_ERROR(1911),
SUBSCRIPTION_NOT_ENABLED_ERROR(1912),

// Topic
CREATE_TOPIC_ERROR(2000),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ private static void verifyPipeSubscribeSuccess(final TSStatus status)
case 1900: // SUBSCRIPTION_VERSION_ERROR
case 1901: // SUBSCRIPTION_TYPE_ERROR
case 1909: // SUBSCRIPTION_MISSING_CUSTOMER
case 1912: // SUBSCRIPTION_NOT_ENABLED_ERROR
default:
{
final String errorMessage =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.agent.task.execution;

import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.consensus.PipeConsensusSubtaskExecutor;
import org.apache.iotdb.db.subscription.task.execution.SubscriptionSubtaskExecutor;

Expand Down Expand Up @@ -53,7 +54,10 @@ public PipeConsensusSubtaskExecutor getConsensusExecutor() {
private PipeSubtaskExecutorManager() {
processorExecutor = new PipeProcessorSubtaskExecutor();
connectorExecutor = new PipeConnectorSubtaskExecutor();
subscriptionExecutor = new SubscriptionSubtaskExecutor();
subscriptionExecutor =
SubscriptionConfig.getInstance().getSubscriptionEnabled()
? new SubscriptionSubtaskExecutor()
: null;
consensusExecutor = new PipeConsensusSubtaskExecutor();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.trigger.TriggerInformation;
Expand Down Expand Up @@ -1239,6 +1240,11 @@ public TPushPipeMetaResp pushMultiPipeMeta(TPushMultiPipeMetaReq req) {

@Override
public TPushTopicMetaResp pushTopicMeta(TPushTopicMetaReq req) {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}

final List<TopicMeta> topicMetas = new ArrayList<>();
for (ByteBuffer byteBuffer : req.getTopicMetas()) {
topicMetas.add(TopicMeta.deserialize(byteBuffer));
Expand All @@ -1262,6 +1268,11 @@ public TPushTopicMetaResp pushTopicMeta(TPushTopicMetaReq req) {

@Override
public TPushTopicMetaResp pushSingleTopicMeta(TPushSingleTopicMetaReq req) {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}

try {
final TPushTopicMetaRespExceptionMessage exceptionMessage;
if (req.isSetTopicNameToDrop()) {
Expand Down Expand Up @@ -1290,6 +1301,11 @@ public TPushTopicMetaResp pushSingleTopicMeta(TPushSingleTopicMetaReq req) {

@Override
public TPushTopicMetaResp pushMultiTopicMeta(TPushMultiTopicMetaReq req) {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}

boolean hasException = false;
// If there is any exception, we use the size of exceptionMessages to record the fail index
List<TPushTopicMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();
Expand Down Expand Up @@ -1337,6 +1353,11 @@ public TPushTopicMetaResp pushMultiTopicMeta(TPushMultiTopicMetaReq req) {

@Override
public TPushConsumerGroupMetaResp pushConsumerGroupMeta(TPushConsumerGroupMetaReq req) {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return new TPushConsumerGroupMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}

final List<ConsumerGroupMeta> consumerGroupMetas = new ArrayList<>();
for (ByteBuffer byteBuffer : req.getConsumerGroupMetas()) {
consumerGroupMetas.add(ConsumerGroupMeta.deserialize(byteBuffer));
Expand All @@ -1361,6 +1382,11 @@ public TPushConsumerGroupMetaResp pushConsumerGroupMeta(TPushConsumerGroupMetaRe
@Override
public TPushConsumerGroupMetaResp pushSingleConsumerGroupMeta(
TPushSingleConsumerGroupMetaReq req) {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return new TPushConsumerGroupMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}

try {
final TPushConsumerGroupMetaRespExceptionMessage exceptionMessage;
if (req.isSetConsumerGroupNameToDrop()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFClassLoader;
Expand Down Expand Up @@ -375,6 +376,16 @@ private static final class ClusterConfigTaskExecutorHolder {
private ClusterConfigTaskExecutorHolder() {}
}

private static final SettableFuture<ConfigTaskResult> SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;

static {
SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE = SettableFuture.create();
SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE.setException(
new IoTDBException(
"Subscription not enabled, please set config `subscription_enabled` to true.",
TSStatusCode.SUBSCRIPTION_NOT_ENABLED_ERROR.getStatusCode()));
}

public static ClusterConfigTaskExecutor getInstance() {
return ClusterConfigTaskExecutor.ClusterConfigTaskExecutorHolder.INSTANCE;
}
Expand Down Expand Up @@ -2391,6 +2402,10 @@ public SettableFuture<ConfigTaskResult> showPipes(final ShowPipesStatement showP
@Override
public SettableFuture<ConfigTaskResult> showSubscriptions(
final ShowSubscriptionsStatement showSubscriptionsStatement) {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
}

final SettableFuture<ConfigTaskResult> future = SettableFuture.create();

try (final ConfigNodeClient configNodeClient =
Expand Down Expand Up @@ -2426,6 +2441,10 @@ public SettableFuture<ConfigTaskResult> showSubscriptions(
@Override
public SettableFuture<ConfigTaskResult> dropSubscription(
final DropSubscriptionStatement dropSubscriptionStatement) {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
}

final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
Expand All @@ -2449,6 +2468,10 @@ public SettableFuture<ConfigTaskResult> dropSubscription(
@Override
public SettableFuture<ConfigTaskResult> createTopic(
final CreateTopicStatement createTopicStatement) {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
}

final SettableFuture<ConfigTaskResult> future = SettableFuture.create();

final String topicName = createTopicStatement.getTopicName();
Expand Down Expand Up @@ -2515,6 +2538,10 @@ public SettableFuture<ConfigTaskResult> createTopic(

@Override
public SettableFuture<ConfigTaskResult> dropTopic(final DropTopicStatement dropTopicStatement) {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
}

final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
Expand All @@ -2538,6 +2565,10 @@ public SettableFuture<ConfigTaskResult> dropTopic(final DropTopicStatement dropT
@Override
public SettableFuture<ConfigTaskResult> showTopics(
final ShowTopicsStatement showTopicsStatement) {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
}

final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,29 @@ public class SubscriptionReceiverAgent {

private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionReceiverAgent.class);

private final ThreadLocal<SubscriptionReceiver> receiverThreadLocal = new ThreadLocal<>();

private static final Map<Byte, Supplier<SubscriptionReceiver>> RECEIVER_CONSTRUCTORS =
new HashMap<>();

private static final TPipeSubscribeResp SUBSCRIPTION_NOT_ENABLED_ERROR_RESP =
new TPipeSubscribeResp(
RpcUtils.getStatus(
TSStatusCode.SUBSCRIPTION_NOT_ENABLED_ERROR,
"Subscription not enabled, please set config `subscription_enabled` to true."),
PipeSubscribeResponseVersion.VERSION_1.getVersion(),
PipeSubscribeResponseType.ACK.getType());

private final ThreadLocal<SubscriptionReceiver> receiverThreadLocal = new ThreadLocal<>();

SubscriptionReceiverAgent() {
RECEIVER_CONSTRUCTORS.put(
PipeSubscribeRequestVersion.VERSION_1.getVersion(), SubscriptionReceiverV1::new);
}

public TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return SUBSCRIPTION_NOT_ENABLED_ERROR_RESP;
}

final byte reqVersion = req.getVersion();
if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
return getReceiver(reqVersion).handle(req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ SubscriptionTopicAgent topic() {

@Override
public void start() throws StartupException {
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return;
}

SubscriptionConfig.getInstance().printAllConfigs();

SubscriptionAgentLauncher.launchSubscriptionTopicAgent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ public class CommonConfig {
private boolean pipeEventReferenceTrackingEnabled = true;
private long pipeEventReferenceEliminateIntervalSeconds = 10;

private boolean subscriptionEnabled = false;

private float subscriptionCacheMemoryUsagePercentage = 0.2F;
private int subscriptionSubtaskExecutorMaxThreadNum = 2;

Expand Down Expand Up @@ -2156,6 +2158,14 @@ public void setPipeEventReferenceEliminateIntervalSeconds(
pipeEventReferenceEliminateIntervalSeconds);
}

public boolean getSubscriptionEnabled() {
return subscriptionEnabled;
}

public void setSubscriptionEnabled(boolean subscriptionEnabled) {
this.subscriptionEnabled = subscriptionEnabled;
}

public float getSubscriptionCacheMemoryUsagePercentage() {
return subscriptionCacheMemoryUsagePercentage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ public void loadCommonProps(TrimProperties properties) throws IOException {
}

private void loadSubscriptionProps(TrimProperties properties) {
config.setSubscriptionEnabled(
Boolean.parseBoolean(
properties.getProperty(
"subscription_enabled", String.valueOf(config.getSubscriptionEnabled()))));

config.setSubscriptionCacheMemoryUsagePercentage(
Float.parseFloat(
properties.getProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
Expand Down Expand Up @@ -167,6 +168,13 @@ protected TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(
private void executeSinglePipeMetaChanges(final PipeMeta metaFromCoordinator)
throws IllegalPathException {
final String pipeName = metaFromCoordinator.getStaticMeta().getPipeName();

// Do nothing with the subscription pipe if disable subscription
if (PipeStaticMeta.isSubscriptionPipe(pipeName)
&& !SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
return;
}

final PipeMeta metaInAgent = pipeMetaKeeper.getPipeMeta(pipeName);

// If pipe meta does not exist on local agent, create a new pipe
Expand Down
Loading
Loading