[feature][dingo-executor] Resolve oom of delRegion
guojn1 authored and githubgxll committed Dec 9, 2024
1 parent 4a92802 commit 2918fe8
Showing 13 changed files with 224 additions and 98 deletions.
3 changes: 3 additions & 0 deletions dingo-common/src/main/java/io/dingodb/common/ddl/DdlUtil.java
@@ -31,7 +31,9 @@
import io.dingodb.common.type.scalar.TimestampType;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

public final class DdlUtil {
@@ -64,6 +66,7 @@ public final class DdlUtil {
public static int errorCountLimit = 5;

public static final String ddlId = String.format("%s:%d", DingoConfiguration.host(), DingoConfiguration.port());
public static BlockingQueue<GcDeleteRegion> gcDelRegionQueue = new LinkedBlockingDeque<>(10000);

private DdlUtil() {
}
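The bounded queue above is the heart of this fix: delete-region work is no longer accumulated without limit but handed to a LinkedBlockingDeque capped at 10,000 entries, and the DDL owner drains it into mysql.gc_delete_range (see delRegion() further down). A minimal producer-side sketch, with the wrapper class and call site assumed rather than taken from this commit:

import io.dingodb.common.ddl.DdlUtil;
import io.dingodb.common.ddl.GcDeleteRegion;
import io.dingodb.common.util.Utils;

// Illustrative producer only; the real enqueue sites live elsewhere in the executor.
public final class GcDelRegionEnqueueSketch {
    public static void enqueue(long jobId, long regionId, String startKey, String endKey,
                               long startTs, String eleId, String eleType) {
        GcDeleteRegion region = GcDeleteRegion.builder()
            .jobId(jobId).regionId(regionId)
            .startKey(startKey).endKey(endKey)
            .startTs(startTs)
            .eleId(eleId).eleType(eleType)
            .build();
        // The queue holds at most 10000 entries; Utils.put (added below in Utils.java) blocks
        // when it is full, so pending GC work stays bounded in memory instead of growing until OOM.
        Utils.put(DdlUtil.gcDelRegionQueue, region);
    }
}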
45 changes: 45 additions & 0 deletions dingo-common/src/main/java/io/dingodb/common/ddl/GcDeleteRegion.java
@@ -0,0 +1,45 @@
/*
* Copyright 2021 DataCanvas
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.dingodb.common.ddl;

import lombok.Builder;
import lombok.Data;

@Builder
@Data
public class GcDeleteRegion {
long jobId;
long regionId;
String startKey;
String endKey;
long startTs;
String eleId;
String eleType;

public GcDeleteRegion(
long jobId, long regionId, String startKey, String endKey, long startTs,
String eleId, String eleType
) {
this.jobId = jobId;
this.regionId = regionId;
this.startKey = startKey;
this.endKey = endKey;
this.startTs = startTs;
this.eleId = eleId;
this.eleType = eleType;
}
}
StmtSummaryMap.java
@@ -50,7 +50,7 @@ public static Iterator<Object[]> iterator() {
.iterator();
}

static {
stmtSummaryMap = CacheBuilder.newBuilder()
.maximumSize(4096)
.build(new CacheLoader<String, StmtSummary>() {
@@ -63,17 +63,18 @@ public static Iterator<Object[]> iterator() {
Executors.execute("stmtSummary", StmtSummaryMap::handleProfile);
analyzeQueue = new LinkedBlockingDeque<>(2000);
DingoMetrics.metricRegistry.register("profileQueue", new CachedGauge<Integer>(1, TimeUnit.MINUTES) {
@Override
protected Integer loadValue() {
return profileQueue.size();
}
});

DingoMetrics.metricRegistry.register("analyzeTaskQueue", new CachedGauge<Integer>(1, TimeUnit.MINUTES) {
@Override
protected Integer loadValue() {
return analyzeQueue.size();
}
});
}

private static void handleProfile() {
11 changes: 11 additions & 0 deletions dingo-common/src/main/java/io/dingodb/common/util/Utils.java
@@ -16,7 +16,9 @@

package io.dingodb.common.util;

import io.dingodb.common.log.LogUtils;
import io.dingodb.common.type.TupleMapping;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;

import java.lang.reflect.InvocationTargetException;
@@ -37,6 +39,7 @@

import static io.dingodb.common.util.Parameters.cleanNull;

@Slf4j
public final class Utils {
private Utils() {
}
@@ -298,5 +301,13 @@ public static void sleep(long waitTime) {
}
}

public static <T> void put(@NonNull BlockingQueue<T> queue, T element) {
try {
queue.put(element);
} catch (InterruptedException e) {
LogUtils.error(log, e.getMessage(), e);
}
}

public static final int INTEGER_LEN_IN_BYTES = 4;
}
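Utils.put is the producer half of this hand-off. The delRegion() consumer further down calls Utils.forceTake, which is not among the hunks shown here; a plausible counterpart, offered purely as an assumption about its behavior (a blocking take that retries on interruption) and sitting next to put in Utils, might look like:

// Assumed sketch only -- Utils.forceTake is referenced by delRegion() but not shown in this diff.
public static <T> T forceTake(@NonNull BlockingQueue<T> queue) {
    while (true) {
        try {
            return queue.take();   // blocks until an element is available
        } catch (InterruptedException e) {
            LogUtils.error(log, e.getMessage(), e);
        }
    }
}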
@@ -104,7 +104,6 @@
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.type.BasicSqlType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidator;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

@@ -16,13 +16,15 @@

package io.dingodb.server.executor.ddl;

import com.codahale.metrics.CachedGauge;
import com.codahale.metrics.Timer;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.ddl.DdlJob;
import io.dingodb.common.ddl.DdlJobEvent;
import io.dingodb.common.ddl.DdlJobEventSource;
import io.dingodb.common.ddl.DdlJobListenerImpl;
import io.dingodb.common.ddl.DdlUtil;
import io.dingodb.common.ddl.GcDeleteRegion;
import io.dingodb.common.ddl.JobState;
import io.dingodb.common.environment.ExecutionEnvironment;
import io.dingodb.common.log.LogUtils;
@@ -61,6 +63,19 @@ public static void watchDdlJob() {
if (DdlUtil.delDiff) {
delVerSchemaDiff();
}
delRegion();
DingoMetrics.metricRegistry.register("delRegionQueue", new CachedGauge<Integer>(1, TimeUnit.MINUTES) {
@Override
protected Integer loadValue() {
return DdlUtil.gcDelRegionQueue.size();
}
});
DingoMetrics.metricRegistry.register("verDelQueue", new CachedGauge<Integer>(1, TimeUnit.MINUTES) {
@Override
protected Integer loadValue() {
return verDelQueue.size();
}
});
}

public static void delVerSchemaDiff() {
@@ -85,6 +100,20 @@ public static void delVerSchemaDiff() {
}).start();
}

public static void delRegion() {
new Thread(() -> {
while (true) {
if (!ExecutionEnvironment.INSTANCE.ddlOwner.get()) {
Utils.sleep(5000);
continue;
}
GcDeleteRegion gcDeleteRegion = Utils.forceTake(DdlUtil.gcDelRegionQueue);
JobTableUtil.insertGcDeleteRange(gcDeleteRegion);
DingoMetrics.counter("insertGcRegionCnt").inc();
}
}).start();
}

public static void watchDdlKey() {
WatchService watchService = new WatchService(Configuration.coordinators());
Kv kv = Kv.builder().kv(KeyValue.builder()
JobTableUtil.java
@@ -21,6 +21,7 @@
import io.dingodb.common.ddl.ActionType;
import io.dingodb.common.ddl.DdlJob;
import io.dingodb.common.ddl.DdlUtil;
import io.dingodb.common.ddl.GcDeleteRegion;
import io.dingodb.common.log.LogUtils;
import io.dingodb.common.metrics.DingoMetrics;
import io.dingodb.common.session.Session;
@@ -270,13 +271,16 @@ public static boolean gcDeleteDone(
Object ts,
long regionId,
Object startKey,
Object endKey
Object endKey,
String eleId,
String eleType
) {
LogUtils.info(log, "gcDeleteDone start, jobId:{}, regionId:{}", jobId, regionId);
String sql = "insert into mysql.gc_delete_range_done(job_id, region_id, ts, start_key, end_key)"
+ " values(%d, %d, %d, %s, %s)";
String sql = "insert into mysql.gc_delete_range_done(job_id, region_id, ts, start_key, end_key, "
+ " element_id, element_type)"
+ " values(%d, %d, %d, %s, %s, %s, %s)";
sql = String.format(sql, jobId, regionId, ts, Utils.quoteForSql(startKey.toString()),
Utils.quoteForSql(endKey.toString()));
Utils.quoteForSql(endKey.toString()), Utils.quoteForSql(eleId), Utils.quoteForSql(eleType));
Session session = SessionUtil.INSTANCE.getSession();
session.setAutoCommit(false);
session.executeUpdate(sql);
@@ -288,4 +292,22 @@ public static boolean gcDeleteDone(
return true;
}

public static void insertGcDeleteRange(
GcDeleteRegion gcDeleteRegion
) {
String sql = "insert into mysql.gc_delete_range"
+ "(job_id, region_id, start_key, end_key, ts, element_id, element_type) "
+ "values";
String conditionValue = "(%d, %d, %s, %s, %d, %s, %s)";
conditionValue = String.format(conditionValue, gcDeleteRegion.getJobId(),
gcDeleteRegion.getRegionId(), Utils.quoteForSql(gcDeleteRegion.getStartKey()),
Utils.quoteForSql(gcDeleteRegion.getEndKey()), gcDeleteRegion.getStartTs(),
Utils.quoteForSql(gcDeleteRegion.getEleId()), Utils.quoteForSql(gcDeleteRegion.getEleType()));
sql = sql + conditionValue;
String error = SessionUtil.INSTANCE.exeUpdateInTxn(sql);
if (error != null) {
LogUtils.error(log, "insert into gc delete region error,reason:{}", error);
}
}

}
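For concreteness, a hypothetical call to insertGcDeleteRange (all values below are invented, and Utils.quoteForSql is assumed to wrap its argument in single quotes) would build a statement along these lines:

// Hypothetical values, for illustration only.
JobTableUtil.insertGcDeleteRange(GcDeleteRegion.builder()
    .jobId(101L).regionId(88001L)
    .startKey("740000000001").endKey("740000000002")
    .startTs(450000000001L)
    .eleId("12345-67890").eleType("auto")
    .build());
// Expected statement (assuming quoteForSql single-quotes the value):
//   insert into mysql.gc_delete_range(job_id, region_id, start_key, end_key, ts, element_id, element_type)
//   values(101, 88001, '740000000001', '740000000002', 450000000001, '12345-67890', 'auto')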
@@ -183,7 +183,7 @@ public static void recoverTable(
List<Object> indexList
) {
// remove gc_delete_range to gc_delete_range_done
String sql = "select region_id,start_key,end_key,job_id,ts from mysql.gc_delete_range where ts<"
String sql = "select region_id,start_key,end_key,job_id,ts, element_id, element_type from mysql.gc_delete_range where ts<"
+ recoverInfo.getDropJobId();
Session session = SessionUtil.INSTANCE.getSession();
try {
@@ -197,7 +197,9 @@
long ts = (long) objects[4];
String startKey = objects[1].toString();
String endKey = objects[2].toString();
if (!JobTableUtil.gcDeleteDone(jobId, ts, regionId, startKey, endKey)) {
String eleId = objects[5].toString();
String eleType = objects[6].toString();
if (!JobTableUtil.gcDeleteDone(jobId, ts, regionId, startKey, endKey, eleId, eleType)) {
LogUtils.error(log, "remove gcDeleteTask failed");
}
} catch (Exception e) {
@@ -284,7 +284,7 @@ public static List<Object[]> getGlobalVariablesList() {
values.add(new Object[]{"txn_retry", "off"});
values.add(new Object[]{"txn_retry_cnt", "0"});
values.add(new Object[]{"enable_safe_point_update", "1"});
values.add(new Object[]{"txn_history_duration", String.valueOf(60 * 10)});
values.add(new Object[]{"txn_history_duration", String.valueOf(60 * 3)});
values.add(new Object[]{"slow_query_enable", "on"});
values.add(new Object[]{"slow_query_threshold", "5000"});
values.add(new Object[]{"sql_profile_enable", "on"});
SafePointUpdateTask.java
@@ -20,13 +20,13 @@
import io.dingodb.cluster.ClusterService;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.config.DingoConfiguration;
import io.dingodb.common.environment.ExecutionEnvironment;
import io.dingodb.common.log.LogUtils;
import io.dingodb.common.session.Session;
import io.dingodb.common.session.SessionUtil;
import io.dingodb.common.tenant.TenantConstant;
import io.dingodb.common.util.Optional;
import io.dingodb.exec.transaction.impl.TransactionManager;
import io.dingodb.meta.InfoSchemaService;
import io.dingodb.net.api.ApiRegistry;
import io.dingodb.sdk.service.CoordinatorService;
import io.dingodb.sdk.service.DocumentService;
@@ -115,16 +115,20 @@ public static void run() {
isLeader = true;
LogUtils.info(log, "Start safe point update task.");
ScheduledFuture<?> future = Executors.scheduleWithFixedDelay(
lockKeyStr, SafePointUpdateTask::safePointUpdate, 1, 600, TimeUnit.SECONDS
lockKeyStr, SafePointUpdateTask::safePointUpdate, 1, 300, TimeUnit.SECONDS
);
ScheduledFuture<?> regionDelFuture = Executors.scheduleWithFixedDelay(
lockKeyStr, SafePointUpdateTask::gcDeleteRegion, 60, 60, TimeUnit.SECONDS
);
lock.watchDestroy().thenRun(() -> {
future.cancel(true);
regionDelFuture.cancel(true);
isLeader = false;
lockService.cancel();
run();
});
} catch (Exception e) {
lockService.cancel();

run();
}
});
@@ -190,11 +194,6 @@ private static void safePointUpdate() {
Services.coordinatorService(coordinators).updateGCSafePoint(
reqTs, request
);
if (TenantConstant.TENANT_ID == 0) {
gcDeleteRange(request.getSafePoint());
} else {
gcDeleteRange(request.getTenantSafePoints().get(TenantConstant.TENANT_ID));
}
} else {
LogUtils.info(log, "Safe point update task disabled, skip call coordinator.");
}
@@ -206,6 +205,16 @@ private static void safePointUpdate() {
}
}

private static void gcDeleteRegion() {
long currentTime = System.currentTimeMillis();
String gcLifeTimeStr = InfoSchemaService.root().getGlobalVariables().get("txn_history_duration");
long gcLifeTime = Long.parseLong(gcLifeTimeStr);
long safePointTs = currentTime - (gcLifeTime * 1000);
long tso = TsoService.getDefault().tso(safePointTs);
LogUtils.info(log, "gcDeleteRegion tso:{}", tso);
gcDeleteRange(tso);
}
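gcDeleteRegion() anchors the scan on txn_history_duration, so rows older than that window become eligible for GC on each 60-second tick of the new scheduled task. A worked example of the lookback, assuming the 180-second default (60 * 3) from getGlobalVariablesList and mirroring the method above:

// txn_history_duration = 180 seconds, so the lookback is 180 * 1000 = 180,000 ms (3 minutes).
long gcLifeTime = 180;
long safePointTs = System.currentTimeMillis() - gcLifeTime * 1000;   // wall-clock millis
long tso = TsoService.getDefault().tso(safePointTs);                  // same conversion as above
gcDeleteRange(tso);   // gcDeleteRange then selects mysql.gc_delete_range rows with ts < tso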

private static void gcDeleteRange(long startTs) {
String sql = "select region_id,start_key,end_key,job_id,ts, element_id, element_type"
+ " from mysql.gc_delete_range where ts<" + startTs;
@@ -230,14 +239,14 @@ private static void gcDeleteRange(long startTs) {
long ts = (long) objects[4];
String startKey = objects[1].toString();
String endKey = objects[2].toString();
if (!JobTableUtil.gcDeleteDone(jobId, ts, regionId, startKey, endKey)) {
String eleId = (String) objects[5];
String eleType = (String) objects[6];
if (!JobTableUtil.gcDeleteDone(jobId, ts, regionId, startKey, endKey, eleId, eleType)) {
LogUtils.error(log, "remove gcDeleteTask failed");
} else {
delDone.incrementAndGet();
}
String eleType = (String) objects[6];
if (eleType.endsWith("auto")) {
String eleId = (String) objects[5];
String[] eleIds = eleId.split("-");
DingoCommonId tableId = DingoCommonId.builder()
.parentEntityId(Long.parseLong(eleIds[0]))