Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.sql.engine;

import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
* Integration tests to verify SQL query execution during concurrent schema updates.
*/
public class ItSqlConcurrentSchemaModificationTest extends BaseSqlIntegrationTest {
@Override
protected int initialNodes() {
return 1;
}

@Override
protected String getNodeBootstrapConfigTemplate() {
return "ignite.sql.execution.threadCount: 64";
}

@AfterEach
void dropTables() {
System.clearProperty("FAST_QUERY_OPTIMIZATION_ENABLED");
Commons.resetFastQueryOptimizationFlag();
unwrapIgniteImpl(CLUSTER.aliveNode()).queryEngine().invalidatePlannerCache(Set.of());

dropAllTables();
}

@ParameterizedTest(name = "FastQueryOptimization={0}")
@ValueSource(booleans = {true, false})
void dmlWithConcurrentDdl(Boolean fastPlan) throws InterruptedException {
System.setProperty("FAST_QUERY_OPTIMIZATION_ENABLED", fastPlan.toString());

IgniteSql sql = CLUSTER.aliveNode().sql();

sql("CREATE TABLE t(id INT PRIMARY KEY)");

int iterations = 20;

for (int i = 0; i < iterations; i++) {
log.info("iteration #" + i);

String ddlQuery = i % 2 == 0
? "ALTER TABLE t ADD COLUMN val VARCHAR DEFAULT 'abc'"
: "ALTER TABLE t DROP COLUMN val";

CompletableFuture<AsyncResultSet<SqlRow>> ddlFut = sql.executeAsync(null, ddlQuery);

Thread.sleep(ThreadLocalRandom.current().nextInt(15) * 10);

assertQuery("INSERT INTO t (id) VALUES (?)")
.withParam(i)
.returns(1L)
.check();

await(await(ddlFut).closeAsync());

assertEquals(0, txManager().pending());
}
}

@ParameterizedTest(name = "FastQueryOptimization={0}")
@ValueSource(booleans = {true, false})
void selectWithConcurrentDdl(boolean fastPlan) throws InterruptedException {
System.setProperty("FAST_QUERY_OPTIMIZATION_ENABLED", String.valueOf(fastPlan));

IgniteSql sql = CLUSTER.aliveNode().sql();

sql("CREATE TABLE t(id INT PRIMARY KEY, id2 INT)");
sql("INSERT INTO t VALUES (0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9)");

int iterations = 20;

for (int i = 0; i < iterations; i++) {
log.info("iteration #" + i);

String ddlQuery = i % 2 == 0
? "ALTER TABLE t ADD COLUMN val VARCHAR DEFAULT 'abc'"
: "ALTER TABLE t DROP COLUMN val";

CompletableFuture<AsyncResultSet<SqlRow>> ddlFut = sql.executeAsync(null, ddlQuery);
Thread.sleep(ThreadLocalRandom.current().nextInt(15) * 10);

int id = i % 10;

assertQuery(format("SELECT id2 FROM t WHERE id={}", id))
.returns(id)
.check();

await(await(ddlFut).closeAsync());

assertEquals(0, txManager().pending());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.ignite.internal.hlc.HybridTimestamp;
Expand Down Expand Up @@ -52,6 +53,7 @@ public final class SqlOperationContext {
private final @Nullable Consumer<Throwable> errorListener;
private final @Nullable String userName;
private final @Nullable Long topologyVersion;
private final @Nullable AtomicReference<QueryTransactionWrapper> retryTxHolder;

/**
* Private constructor, used by a builder.
Expand All @@ -67,7 +69,8 @@ private SqlOperationContext(
@Nullable Consumer<QueryTransactionWrapper> txUsedListener,
@Nullable Consumer<Throwable> errorListener,
@Nullable String userName,
@Nullable Long topologyVersion
@Nullable Long topologyVersion,
@Nullable QueryTransactionWrapper retryTx
) {
this.queryId = queryId;
this.timeZoneId = timeZoneId;
Expand All @@ -80,12 +83,36 @@ private SqlOperationContext(
this.errorListener = errorListener;
this.userName = userName;
this.topologyVersion = topologyVersion;
this.retryTxHolder = new AtomicReference<>(retryTx);
}

public static Builder builder() {
return new Builder();
}

/**
* Copies an existing context preserving the existing transaction.
*
* <p>Used in case of a retry. If the operation is repeated while preserving the running transaction,
* the operation time is taken from this transaction.
*/
public SqlOperationContext withTransactionForRetry(QueryTransactionWrapper tx) {
return new SqlOperationContext(
queryId,
timeZoneId,
parameters,
tx.unwrap().schemaTimestamp(),
txContext,
cancel,
defaultSchemaName,
txUsedListener,
errorListener,
userName,
topologyVersion,
tx
);
}

/** Returns unique identifier of the query. */
public UUID queryId() {
return queryId;
Expand Down Expand Up @@ -182,6 +209,11 @@ public void excludeNode(String nodeName) {
return excludedNodes.isEmpty() ? null : excludedNodes::contains;
}

/** Returns transaction used for retry operation or {@code null}. */
public @Nullable QueryTransactionWrapper retryTx() {
return retryTxHolder.getAndSet(null);
}

/**
* Query context builder.
*/
Expand Down Expand Up @@ -268,7 +300,8 @@ public SqlOperationContext build() {
txUsedListener,
errorListener,
userName,
topologyVersion
topologyVersion,
null
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.sql.engine;

import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.sql.engine.exec.SqlPlanOutdatedException;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.jetbrains.annotations.TestOnly;

/**
* SQL query execution plan validator.
*
* <p>Performs validation of the catalog version from the {@link MultiStepPlan multi-step plan} relative to the started transaction.
*/
public class SqlPlanToTxSchemaVersionValidator {
@TestOnly
public static final SqlPlanToTxSchemaVersionValidator NOOP = new NoopSqlPlanToTxSchemaVersionValidator();

private final SchemaSyncService schemaSyncService;
private final CatalogService catalogService;

/**
* Compares the catalog version from the plan with the version of the active catalog
* at the {@link InternalTransaction#schemaTimestamp() schema time} of the transaction.
*
* @param plan {@link MultiStepPlan multi-step} execution plan.
* @param tx Query transaction wrapper.
* @return Successfully completed future if the provided transaction is explicit or the catalog versions match.
* Otherwise returns a future completed with an exception {@link SqlPlanOutdatedException}.
*/
public CompletableFuture<Void> validate(MultiStepPlan plan, QueryTransactionWrapper tx) {
if (!tx.implicit()) {
return nullCompletedFuture();
}

HybridTimestamp ts = tx.unwrap().schemaTimestamp();

return schemaSyncService.waitForMetadataCompleteness(ts)
.thenRun(() -> {
// TODO https://issues.apache.org/jira/browse/IGNITE-27491 Avoid re-planning in case of unrelated catalog changes
int requiredCatalog = catalogService.activeCatalogVersion(ts.longValue());

if (requiredCatalog != plan.catalogVersion()) {
throw new SqlPlanOutdatedException();
}
});
}

private SqlPlanToTxSchemaVersionValidator(SchemaSyncService schemaSyncService, CatalogService catalogService) {
this.schemaSyncService = schemaSyncService;
this.catalogService = catalogService;
}

public static SqlPlanToTxSchemaVersionValidator create(SchemaSyncService schemaSyncService, CatalogService catalogService) {
return new SqlPlanToTxSchemaVersionValidator(schemaSyncService, catalogService);
}

private static class NoopSqlPlanToTxSchemaVersionValidator extends SqlPlanToTxSchemaVersionValidator {
@SuppressWarnings("DataFlowIssue")
private NoopSqlPlanToTxSchemaVersionValidator() {
super(null, null);
}

@Override
public CompletableFuture<Void> validate(MultiStepPlan plan, QueryTransactionWrapper tx) {
return nullCompletedFuture();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ public synchronized CompletableFuture<Void> startAsync(ComponentContext componen
clockService,
killCommandHandler,
expressionFactory,
EXECUTION_SERVICE_SHUTDOWN_TIMEOUT
EXECUTION_SERVICE_SHUTDOWN_TIMEOUT,
SqlPlanToTxSchemaVersionValidator.create(schemaSyncService, catalogManager)
));

queryExecutor = registerService(new QueryExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ public CompletableFuture<Void> sendError(String nodeName, ExecutionId executionI
Throwable traceableErr = ExceptionUtils.unwrapCause(error);

if (!(traceableErr instanceof TraceableException)) {
traceableErr = error = new IgniteInternalException(INTERNAL_ERR, error);
traceableErr = new IgniteInternalException(INTERNAL_ERR, error);
}

if (((TraceableException) traceableErr).code() == INTERNAL_ERR) {
LOG.info(format("Failed to execute query fragment: traceId={}, executionId={}, fragmentId={}",
((TraceableException) traceableErr).traceId(), executionId, fragmentId), error);
} else if (LOG.isDebugEnabled()) {
Expand Down
Loading