From 2918fe859349fa659ba92c11b99f71bcd6ce4473 Mon Sep 17 00:00:00 2001 From: guojn1 Date: Fri, 6 Dec 2024 15:27:54 +0800 Subject: [PATCH] [feature][dingo-executor] Resolve oom of delRegion --- .../java/io/dingodb/common/ddl/DdlUtil.java | 3 + .../io/dingodb/common/ddl/GcDeleteRegion.java | 45 ++++++++++ .../common/profile/StmtSummaryMap.java | 23 ++--- .../java/io/dingodb/common/util/Utils.java | 11 +++ .../io/dingodb/driver/DingoDriverParser.java | 1 - .../server/executor/ddl/DdlServer.java | 29 +++++++ .../server/executor/ddl/JobTableUtil.java | 30 ++++++- .../server/executor/ddl/TableUtil.java | 6 +- .../server/executor/prepare/PrepareMeta.java | 2 +- .../schedule/SafePointUpdateTask.java | 31 ++++--- .../main/resources/mysql-gcDeleteRange.json | 20 ++--- .../resources/mysql-gcDeleteRangeDone.json | 34 ++++++-- .../dingodb/store/proxy/meta/MetaService.java | 87 ++++++++----------- 13 files changed, 224 insertions(+), 98 deletions(-) create mode 100644 dingo-common/src/main/java/io/dingodb/common/ddl/GcDeleteRegion.java diff --git a/dingo-common/src/main/java/io/dingodb/common/ddl/DdlUtil.java b/dingo-common/src/main/java/io/dingodb/common/ddl/DdlUtil.java index 4b4eeb63db..fb45bcf6d4 100644 --- a/dingo-common/src/main/java/io/dingodb/common/ddl/DdlUtil.java +++ b/dingo-common/src/main/java/io/dingodb/common/ddl/DdlUtil.java @@ -31,7 +31,9 @@ import io.dingodb.common.type.scalar.TimestampType; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; public final class DdlUtil { @@ -64,6 +66,7 @@ public final class DdlUtil { public static int errorCountLimit = 5; public static final String ddlId = String.format("%s:%d", DingoConfiguration.host(), DingoConfiguration.port()); + public static BlockingQueue gcDelRegionQueue = new LinkedBlockingDeque<>(10000); private DdlUtil() { } diff --git a/dingo-common/src/main/java/io/dingodb/common/ddl/GcDeleteRegion.java b/dingo-common/src/main/java/io/dingodb/common/ddl/GcDeleteRegion.java new file mode 100644 index 0000000000..dba5514e63 --- /dev/null +++ b/dingo-common/src/main/java/io/dingodb/common/ddl/GcDeleteRegion.java @@ -0,0 +1,45 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.common.ddl; + +import lombok.Builder; +import lombok.Data; + +@Builder +@Data +public class GcDeleteRegion { + long jobId; + long regionId; + String startKey; + String endKey; + long startTs; + String eleId; + String eleType; + + public GcDeleteRegion( + long jobId, long regionId, String startKey, String endKey, long startTs, + String eleId, String eleType + ) { + this.jobId = jobId; + this.regionId = regionId; + this.startKey = startKey; + this.endKey = endKey; + this.startTs = startTs; + this.eleId = eleId; + this.eleType = eleType; + } +} diff --git a/dingo-common/src/main/java/io/dingodb/common/profile/StmtSummaryMap.java b/dingo-common/src/main/java/io/dingodb/common/profile/StmtSummaryMap.java index b8309e0061..69ab5d6d6d 100644 --- a/dingo-common/src/main/java/io/dingodb/common/profile/StmtSummaryMap.java +++ b/dingo-common/src/main/java/io/dingodb/common/profile/StmtSummaryMap.java @@ -50,7 +50,7 @@ public static Iterator iterator() { .iterator(); } - static { + static { stmtSummaryMap = CacheBuilder.newBuilder() .maximumSize(4096) .build(new CacheLoader() { @@ -63,17 +63,18 @@ public static Iterator iterator() { Executors.execute("stmtSummary", StmtSummaryMap::handleProfile); analyzeQueue = new LinkedBlockingDeque<>(2000); DingoMetrics.metricRegistry.register("profileQueue", new CachedGauge(1, TimeUnit.MINUTES) { - @Override - protected Integer loadValue() { - return profileQueue.size(); - } + @Override + protected Integer loadValue() { + return profileQueue.size(); + } + }); + + DingoMetrics.metricRegistry.register("analyzeTaskQueue", new CachedGauge(1, TimeUnit.MINUTES) { + @Override + protected Integer loadValue() { + return analyzeQueue.size(); + } }); - DingoMetrics.metricRegistry.register("analyzeTaskQueue", new CachedGauge(1, TimeUnit.MINUTES) { - @Override - protected Integer loadValue() { - return analyzeQueue.size(); - } - }); } private static void handleProfile() { diff --git a/dingo-common/src/main/java/io/dingodb/common/util/Utils.java b/dingo-common/src/main/java/io/dingodb/common/util/Utils.java index ed916462e9..43ad6cee91 100644 --- a/dingo-common/src/main/java/io/dingodb/common/util/Utils.java +++ b/dingo-common/src/main/java/io/dingodb/common/util/Utils.java @@ -16,7 +16,9 @@ package io.dingodb.common.util; +import io.dingodb.common.log.LogUtils; import io.dingodb.common.type.TupleMapping; +import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.NonNull; import java.lang.reflect.InvocationTargetException; @@ -37,6 +39,7 @@ import static io.dingodb.common.util.Parameters.cleanNull; +@Slf4j public final class Utils { private Utils() { } @@ -298,5 +301,13 @@ public static void sleep(long waitTime) { } } + public static void put(@NonNull BlockingQueue queue, T element) { + try { + queue.put(element); + } catch (InterruptedException e) { + LogUtils.error(log, e.getMessage(), e); + } + } + public static final int INTEGER_LEN_IN_BYTES = 4; } diff --git a/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java b/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java index 8468a4cfc7..4042a5e106 100644 --- a/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java +++ b/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java @@ -104,7 +104,6 @@ import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.type.BasicSqlType; import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.validate.SqlValidator; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/DdlServer.java b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/DdlServer.java index eb4788132c..5679c5ea3e 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/DdlServer.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/DdlServer.java @@ -16,6 +16,7 @@ package io.dingodb.server.executor.ddl; +import com.codahale.metrics.CachedGauge; import com.codahale.metrics.Timer; import io.dingodb.common.concurrent.Executors; import io.dingodb.common.ddl.DdlJob; @@ -23,6 +24,7 @@ import io.dingodb.common.ddl.DdlJobEventSource; import io.dingodb.common.ddl.DdlJobListenerImpl; import io.dingodb.common.ddl.DdlUtil; +import io.dingodb.common.ddl.GcDeleteRegion; import io.dingodb.common.ddl.JobState; import io.dingodb.common.environment.ExecutionEnvironment; import io.dingodb.common.log.LogUtils; @@ -61,6 +63,19 @@ public static void watchDdlJob() { if (DdlUtil.delDiff) { delVerSchemaDiff(); } + delRegion(); + DingoMetrics.metricRegistry.register("delRegionQueue", new CachedGauge(1, TimeUnit.MINUTES) { + @Override + protected Integer loadValue() { + return DdlUtil.gcDelRegionQueue.size(); + } + }); + DingoMetrics.metricRegistry.register("verDelQueue", new CachedGauge(1, TimeUnit.MINUTES) { + @Override + protected Integer loadValue() { + return verDelQueue.size(); + } + }); } public static void delVerSchemaDiff() { @@ -85,6 +100,20 @@ public static void delVerSchemaDiff() { }).start(); } + public static void delRegion() { + new Thread(() -> { + while (true) { + if (!ExecutionEnvironment.INSTANCE.ddlOwner.get()) { + Utils.sleep(5000); + continue; + } + GcDeleteRegion gcDeleteRegion = Utils.forceTake(DdlUtil.gcDelRegionQueue); + JobTableUtil.insertGcDeleteRange(gcDeleteRegion); + DingoMetrics.counter("insertGcRegionCnt").inc(); + } + }).start(); + } + public static void watchDdlKey() { WatchService watchService = new WatchService(Configuration.coordinators()); Kv kv = Kv.builder().kv(KeyValue.builder() diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/JobTableUtil.java b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/JobTableUtil.java index e5c2373b06..d9bf34588c 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/JobTableUtil.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/JobTableUtil.java @@ -21,6 +21,7 @@ import io.dingodb.common.ddl.ActionType; import io.dingodb.common.ddl.DdlJob; import io.dingodb.common.ddl.DdlUtil; +import io.dingodb.common.ddl.GcDeleteRegion; import io.dingodb.common.log.LogUtils; import io.dingodb.common.metrics.DingoMetrics; import io.dingodb.common.session.Session; @@ -270,13 +271,16 @@ public static boolean gcDeleteDone( Object ts, long regionId, Object startKey, - Object endKey + Object endKey, + String eleId, + String eleType ) { LogUtils.info(log, "gcDeleteDone start, jobId:{}, regionId:{}", jobId, regionId); - String sql = "insert into mysql.gc_delete_range_done(job_id, region_id, ts, start_key, end_key)" - + " values(%d, %d, %d, %s, %s)"; + String sql = "insert into mysql.gc_delete_range_done(job_id, region_id, ts, start_key, end_key, " + + " element_id, element_type)" + + " values(%d, %d, %d, %s, %s, %s, %s)"; sql = String.format(sql, jobId, regionId, ts, Utils.quoteForSql(startKey.toString()), - Utils.quoteForSql(endKey.toString())); + Utils.quoteForSql(endKey.toString()), Utils.quoteForSql(eleId), Utils.quoteForSql(eleType)); Session session = SessionUtil.INSTANCE.getSession(); session.setAutoCommit(false); session.executeUpdate(sql); @@ -288,4 +292,22 @@ public static boolean gcDeleteDone( return true; } + public static void insertGcDeleteRange( + GcDeleteRegion gcDeleteRegion + ) { + String sql = "insert into mysql.gc_delete_range" + + "(job_id, region_id, start_key, end_key, ts, element_id, element_type) " + + "values"; + String conditionValue = "(%d, %d, %s, %s, %d, %s, %s)"; + conditionValue = String.format(conditionValue, gcDeleteRegion.getJobId(), + gcDeleteRegion.getRegionId(), Utils.quoteForSql(gcDeleteRegion.getStartKey()), + Utils.quoteForSql(gcDeleteRegion.getEndKey()), gcDeleteRegion.getStartTs(), + Utils.quoteForSql(gcDeleteRegion.getEleId()), Utils.quoteForSql(gcDeleteRegion.getEleType())); + sql = sql + conditionValue; + String error = SessionUtil.INSTANCE.exeUpdateInTxn(sql); + if (error != null) { + LogUtils.error(log, "insert into gc delete region error,reason:{}", error); + } + } + } diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/TableUtil.java b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/TableUtil.java index cc64682c7b..b5d6199cc1 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/TableUtil.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/TableUtil.java @@ -183,7 +183,7 @@ public static void recoverTable( List indexList ) { // remove gc_delete_range to gc_delete_range_done - String sql = "select region_id,start_key,end_key,job_id,ts from mysql.gc_delete_range where ts<" + String sql = "select region_id,start_key,end_key,job_id,ts, element_id, element_type from mysql.gc_delete_range where ts<" + recoverInfo.getDropJobId(); Session session = SessionUtil.INSTANCE.getSession(); try { @@ -197,7 +197,9 @@ public static void recoverTable( long ts = (long) objects[4]; String startKey = objects[1].toString(); String endKey = objects[2].toString(); - if (!JobTableUtil.gcDeleteDone(jobId, ts, regionId, startKey, endKey)) { + String eleId = objects[5].toString(); + String eleType = objects[6].toString(); + if (!JobTableUtil.gcDeleteDone(jobId, ts, regionId, startKey, endKey, eleId, eleType)) { LogUtils.error(log, "remove gcDeleteTask failed"); } } catch (Exception e) { diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/prepare/PrepareMeta.java b/dingo-executor/src/main/java/io/dingodb/server/executor/prepare/PrepareMeta.java index a8ddb1cd4f..f0c07d1a4c 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/prepare/PrepareMeta.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/prepare/PrepareMeta.java @@ -284,7 +284,7 @@ public static List getGlobalVariablesList() { values.add(new Object[]{"txn_retry", "off"}); values.add(new Object[]{"txn_retry_cnt", "0"}); values.add(new Object[]{"enable_safe_point_update", "1"}); - values.add(new Object[]{"txn_history_duration", String.valueOf(60 * 10)}); + values.add(new Object[]{"txn_history_duration", String.valueOf(60 * 3)}); values.add(new Object[]{"slow_query_enable", "on"}); values.add(new Object[]{"slow_query_threshold", "5000"}); values.add(new Object[]{"sql_profile_enable", "on"}); diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/schedule/SafePointUpdateTask.java b/dingo-executor/src/main/java/io/dingodb/server/executor/schedule/SafePointUpdateTask.java index df6911ddd8..2900666713 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/schedule/SafePointUpdateTask.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/schedule/SafePointUpdateTask.java @@ -20,13 +20,13 @@ import io.dingodb.cluster.ClusterService; import io.dingodb.common.concurrent.Executors; import io.dingodb.common.config.DingoConfiguration; -import io.dingodb.common.environment.ExecutionEnvironment; import io.dingodb.common.log.LogUtils; import io.dingodb.common.session.Session; import io.dingodb.common.session.SessionUtil; import io.dingodb.common.tenant.TenantConstant; import io.dingodb.common.util.Optional; import io.dingodb.exec.transaction.impl.TransactionManager; +import io.dingodb.meta.InfoSchemaService; import io.dingodb.net.api.ApiRegistry; import io.dingodb.sdk.service.CoordinatorService; import io.dingodb.sdk.service.DocumentService; @@ -115,16 +115,20 @@ public static void run() { isLeader = true; LogUtils.info(log, "Start safe point update task."); ScheduledFuture future = Executors.scheduleWithFixedDelay( - lockKeyStr, SafePointUpdateTask::safePointUpdate, 1, 600, TimeUnit.SECONDS + lockKeyStr, SafePointUpdateTask::safePointUpdate, 1, 300, TimeUnit.SECONDS + ); + ScheduledFuture regionDelFuture = Executors.scheduleWithFixedDelay( + lockKeyStr, SafePointUpdateTask::gcDeleteRegion, 60, 60, TimeUnit.SECONDS ); lock.watchDestroy().thenRun(() -> { future.cancel(true); + regionDelFuture.cancel(true); isLeader = false; lockService.cancel(); run(); }); } catch (Exception e) { - lockService.cancel(); + run(); } }); @@ -190,11 +194,6 @@ private static void safePointUpdate() { Services.coordinatorService(coordinators).updateGCSafePoint( reqTs, request ); - if (TenantConstant.TENANT_ID == 0) { - gcDeleteRange(request.getSafePoint()); - } else { - gcDeleteRange(request.getTenantSafePoints().get(TenantConstant.TENANT_ID)); - } } else { LogUtils.info(log, "Safe point update task disabled, skip call coordinator."); } @@ -206,6 +205,16 @@ private static void safePointUpdate() { } } + private static void gcDeleteRegion() { + long currentTime = System.currentTimeMillis(); + String gcLifeTimeStr = InfoSchemaService.root().getGlobalVariables().get("txn_history_duration"); + long gcLifeTime = Long.parseLong(gcLifeTimeStr); + long safePointTs = currentTime - (gcLifeTime * 1000); + long tso = TsoService.getDefault().tso(safePointTs); + LogUtils.info(log, "gcDeleteRegion tso:{}", tso); + gcDeleteRange(tso); + } + private static void gcDeleteRange(long startTs) { String sql = "select region_id,start_key,end_key,job_id,ts, element_id, element_type" + " from mysql.gc_delete_range where ts<" + startTs; @@ -230,14 +239,14 @@ private static void gcDeleteRange(long startTs) { long ts = (long) objects[4]; String startKey = objects[1].toString(); String endKey = objects[2].toString(); - if (!JobTableUtil.gcDeleteDone(jobId, ts, regionId, startKey, endKey)) { + String eleId = (String) objects[5]; + String eleType = (String) objects[6]; + if (!JobTableUtil.gcDeleteDone(jobId, ts, regionId, startKey, endKey, eleId, eleType)) { LogUtils.error(log, "remove gcDeleteTask failed"); } else { delDone.incrementAndGet(); } - String eleType = (String) objects[6]; if (eleType.endsWith("auto")) { - String eleId = (String) objects[5]; String[] eleIds = eleId.split("-"); DingoCommonId tableId = DingoCommonId.builder() .parentEntityId(Long.parseLong(eleIds[0])) diff --git a/dingo-executor/src/main/resources/mysql-gcDeleteRange.json b/dingo-executor/src/main/resources/mysql-gcDeleteRange.json index 4130b8d580..6c5496c270 100644 --- a/dingo-executor/src/main/resources/mysql-gcDeleteRange.json +++ b/dingo-executor/src/main/resources/mysql-gcDeleteRange.json @@ -1,10 +1,19 @@ [ + { + "name": "region_id", + "type": "bigint", + "scale": -2147483648, + "precision": -1, + "default": 0, + "primary": 0, + "nullable": false + }, { "name": "job_id", "type": "bigint", "scale": -2147483648, "precision": -1, - "primary": 0, + "primary": -1, "nullable": false }, { @@ -25,15 +34,6 @@ "primary": -1, "nullable": true }, - { - "name": "region_id", - "type": "bigint", - "scale": -2147483648, - "precision": -1, - "default": 0, - "primary": -1, - "nullable": false - }, { "name": "start_key", "type": "varchar", diff --git a/dingo-executor/src/main/resources/mysql-gcDeleteRangeDone.json b/dingo-executor/src/main/resources/mysql-gcDeleteRangeDone.json index ecdd4803d3..83bbeba7ae 100644 --- a/dingo-executor/src/main/resources/mysql-gcDeleteRangeDone.json +++ b/dingo-executor/src/main/resources/mysql-gcDeleteRangeDone.json @@ -1,20 +1,38 @@ [ + { + "name": "region_id", + "type": "bigint", + "scale": -2147483648, + "precision": -1, + "default": 0, + "primary": 0, + "nullable": false + }, { "name": "job_id", "type": "bigint", "scale": -2147483648, "precision": -1, - "primary": 0, + "primary": -1, "nullable": false }, { - "name": "region_id", - "type": "bigint", - "scale": -2147483648, - "precision": -1, - "default": 0, - "primary": -1, - "nullable": false + "name": "element_id", + "type": "varchar", + "scale": -2147483648, + "precision": -1, + "default": 0, + "primary": -1, + "nullable": true + }, + { + "name": "element_type", + "type": "varchar", + "scale": -2147483648, + "precision": -1, + "default": 0, + "primary": -1, + "nullable": true }, { "name": "start_key", diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaService.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaService.java index 4d173fdf8d..862d80f6e2 100644 --- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaService.java +++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaService.java @@ -20,6 +20,8 @@ import com.google.auto.service.AutoService; import io.dingodb.common.CommonId; import io.dingodb.common.concurrent.Executors; +import io.dingodb.common.ddl.DdlUtil; +import io.dingodb.common.ddl.GcDeleteRegion; import io.dingodb.common.log.LogUtils; import io.dingodb.common.meta.SchemaInfo; import io.dingodb.common.meta.SchemaState; @@ -29,8 +31,6 @@ import io.dingodb.common.partition.PartitionDefinition; import io.dingodb.common.partition.PartitionDetailDefinition; import io.dingodb.common.partition.RangeDistribution; -import io.dingodb.common.session.Session; -import io.dingodb.common.session.SessionUtil; import io.dingodb.common.table.ColumnDefinition; import io.dingodb.common.table.IndexDefinition; import io.dingodb.common.table.TableDefinition; @@ -50,6 +50,7 @@ import io.dingodb.partition.DingoPartitionServiceProvider; import io.dingodb.partition.PartitionService; import io.dingodb.sdk.common.serial.RecordEncoder; +import io.dingodb.sdk.common.utils.ByteArrayUtils; import io.dingodb.sdk.service.CoordinatorService; import io.dingodb.sdk.service.Services; import io.dingodb.sdk.service.entity.common.Engine; @@ -90,10 +91,7 @@ import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.NonNull; -import java.sql.PreparedStatement; -import java.sql.SQLException; import java.util.ArrayList; -import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -993,7 +991,7 @@ public void dropRegionByTable( .values(); if (ScopeVariables.getNeedGc() && jobId >= 0) { Timer.Context context = DingoMetrics.getTimeContext("insertGcDeleteRange"); - insertGcDeleteRange(rangeDistributions, jobId, ts, tableId, autoInc); + gcDeleteRegion(rangeDistributions, jobId, ts, tableId, autoInc); context.stop(); } else { if (autoInc) { @@ -1296,57 +1294,46 @@ public void delAutoInc(DingoCommonId tableId) { } } - public static void insertGcDeleteRange( + public static void gcDeleteRegion( Collection rangeDistributions, long jobId, long ts, CommonId id, boolean autoInc ) { - String sql = "insert into mysql.gc_delete_range" - + "(job_id, region_id, start_key, end_key, ts, element_id, element_type) " - + "values(?, ?, ?, ?, ?, ?, ?)"; - Session session = SessionUtil.INSTANCE.getSession(); - PreparedStatement ps = null; - try { - String eleType = id.type.name(); - if (autoInc) { - eleType = id.type + "_auto"; - } - String eleId = id.domain + "-" + id.seq; - session.setAutoCommit(false); - ps = session.getPrepareStatement(sql); - int i = 0; - for (RangeDistribution rangeDistribution : rangeDistributions) { - ps.setLong(1, jobId); - ps.setLong(2, rangeDistribution.getId().seq); - String startKey = Base64.getEncoder().encodeToString(rangeDistribution.getStartKey()); - String endKey = Base64.getEncoder().encodeToString(rangeDistribution.getEndKey()); - ps.setString(3, startKey); - ps.setString(4, endKey); - ps.setLong(5, ts); - ps.setString(6, eleId); - if (i == 0) { - ps.setString(7, eleType); - } else { - ps.setString(7, id.type.name()); - } - i ++; - ps.addBatch(); - } - ps.executeBatch(); - session.commit(); - } catch (Exception e) { - LogUtils.error(log, e.getMessage(), e); - } finally { - if (ps != null) { - try { - ps.close(); - } catch (SQLException e) { - LogUtils.error(log, e.getMessage(), e); - } + String eleType = id.type.name(); + if (autoInc) { + eleType = id.type + "_auto"; + } + String eleId = id.domain + "-" + id.seq; + int i = 0; + for (RangeDistribution rangeDistribution : rangeDistributions) { + CommonId partitionId = new CommonId(CommonId.CommonType.PARTITION, id.seq, rangeDistribution.id().domain); + byte[] startKey = rangeDistribution.getStartKey(); + byte[] endKey = rangeDistribution.getEndKey(); + startKey = io.dingodb.codec.CodecService.getDefault() + .setId(startKey, partitionId); + endKey = io.dingodb.codec.CodecService.getDefault() + .setId(endKey, partitionId); + GcDeleteRegion gcDeleteRegion = GcDeleteRegion + .builder() + .jobId(jobId) + .regionId(rangeDistribution.getId().seq) + .startTs(ts) + .startKey(ByteArrayUtils.toHex(startKey)) + .endKey(ByteArrayUtils.toHex(endKey)) + .eleId(eleId) + .build(); + if (i == 0) { + gcDeleteRegion.setEleType(eleType); + } else { + gcDeleteRegion.setEleType(id.type.name()); } - session.destroy(); + Utils.put(DdlUtil.gcDelRegionQueue, gcDeleteRegion); + LogUtils.info(log, "gcDelete put queue tableId:{}, regionId:{}, startKey:{}, endKey:{}", + id, gcDeleteRegion.getRegionId(), gcDeleteRegion.getStartTs(), gcDeleteRegion.getEndKey()); + i ++; } } + }