diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/logical/LogicalIndexFullScan.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/logical/LogicalIndexFullScan.java index e1867bfee3..4ade31af94 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/logical/LogicalIndexFullScan.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/logical/LogicalIndexFullScan.java @@ -20,8 +20,8 @@ import io.dingodb.calcite.rel.LogicalDingoTableScan; import io.dingodb.calcite.stats.StatsCache; import io.dingodb.common.CommonId; -import io.dingodb.meta.entity.Table; import io.dingodb.common.type.TupleMapping; +import io.dingodb.meta.entity.Table; import lombok.Getter; import lombok.Setter; import org.apache.calcite.plan.RelOptCluster; @@ -41,9 +41,9 @@ import static io.dingodb.calcite.meta.DingoCostModelV1.getAvgRowSize; import static io.dingodb.calcite.meta.DingoCostModelV1.getScanAvgRowSize; import static io.dingodb.calcite.meta.DingoCostModelV1.getScanCost; -import static io.dingodb.calcite.meta.DingoCostModelV1.scanFactor; import static io.dingodb.calcite.meta.DingoCostModelV1.netFactor; import static io.dingodb.calcite.meta.DingoCostModelV1.scanConcurrency; +import static io.dingodb.calcite.meta.DingoCostModelV1.scanFactor; public class LogicalIndexFullScan extends LogicalDingoTableScan { diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/logical/LogicalIndexScanWithRelOp.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/logical/LogicalIndexScanWithRelOp.java index 33ee9b09db..5d9146f31d 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/logical/LogicalIndexScanWithRelOp.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/logical/LogicalIndexScanWithRelOp.java @@ -52,7 +52,7 @@ public class LogicalIndexScanWithRelOp extends TableScan { @Getter protected boolean rangeScan; - public LogicalIndexScanWithRelOp ( + public LogicalIndexScanWithRelOp( RelOptCluster cluster, RelTraitSet traitSet, List hints, diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoAggTransformRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoAggTransformRule.java index 785f07a307..a96f9f6c4c 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoAggTransformRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoAggTransformRule.java @@ -131,7 +131,9 @@ public static void matchAggCount(DingoAggTransformRule rule, RelOptRuleCall call int ix = (int) val1.getValue(); Column column = table.getColumns().get(ix); int indexIx = indexTable.getColumns().indexOf(column); - RexInputRef rexInputRef = new RexInputRef(indexIx, scan.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + RexInputRef rexInputRef = new RexInputRef( + indexIx, scan.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER) + ); return RexConverter.convert(rexInputRef); }).toArray(Expr[]::new); relOp = RelOpBuilder.builder() @@ -198,7 +200,8 @@ public interface Config extends RelRule.Config { return new DingoAggTransformRule(this); } - /** Forwards a call to {@link #onMatch(RelOptRuleCall)}. */ + /** + * Forwards a call to {@link #onMatch(RelOptRuleCall)}. */ MatchHandler matchHandler(); } diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoAggregateScanRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoAggregateScanRule.java index 0af11b42be..87699c45ea 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoAggregateScanRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoAggregateScanRule.java @@ -66,7 +66,8 @@ public void onMatch(@NonNull RelOptRuleCall call) { if (scan.getSelection() != null) { schema = schema.select(scan.getSelection()); } else if (isCountNoArgListAgg) { - // Optimization scenario similar to this SQL: select count(*) from t1 where sal > 1 and id = 1 and name ='a' + // Optimization scenario similar to this SQL: + // select count(*) from t1 where sal > 1 and id = 1 and name ='a' final List selectedColumns = new ArrayList<>(); final RexVisitorImpl visitor = new RexVisitorImpl(true) { @Override diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoDocumentIndexRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoDocumentIndexRule.java index 347269787b..a4a899e1b8 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoDocumentIndexRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoDocumentIndexRule.java @@ -18,14 +18,14 @@ import com.google.common.collect.ImmutableList; import io.dingodb.calcite.DingoTable; +import io.dingodb.calcite.rel.DingoDocument; import io.dingodb.calcite.rel.DingoGetByIndex; import io.dingodb.calcite.rel.DingoGetByIndexMerge; import io.dingodb.calcite.rel.DingoGetByKeys; import io.dingodb.calcite.rel.DingoGetDocumentPreFilter; import io.dingodb.calcite.rel.DingoTableScan; -import io.dingodb.calcite.rel.DingoDocument; -import io.dingodb.calcite.rel.LogicalDingoDocument; import io.dingodb.calcite.rel.DocumentStreamConvertor; +import io.dingodb.calcite.rel.LogicalDingoDocument; import io.dingodb.calcite.rel.dingo.DingoStreamingConverter; import io.dingodb.calcite.traits.DingoConvention; import io.dingodb.calcite.traits.DingoRelStreaming; @@ -82,7 +82,9 @@ public void onMatch(RelOptRuleCall call) { call.transformTo(relNode); } - public static RelNode getDingoGetDocumentPreFilter(RexNode condition, LogicalDingoDocument document, boolean forJoin) { + public static RelNode getDingoGetDocumentPreFilter( + RexNode condition, LogicalDingoDocument document, boolean forJoin + ) { DingoTable dingoTable = document.getTable().unwrap(DingoTable.class); assert dingoTable != null; TupleMapping selection = getDefaultSelection(dingoTable); @@ -117,7 +119,7 @@ public static RelNode getDingoGetDocumentPreFilter(RexNode condition, LogicalDin DingoTableScan dingoTableScan = new DingoTableScan(document.getCluster(), traitSet, ImmutableList.of(), - document.getTable(), + document.getTable(), condition, selection, null, @@ -128,12 +130,12 @@ public static RelNode getDingoGetDocumentPreFilter(RexNode condition, LogicalDin ); DocumentStreamConvertor documentStreamConvertor = new DocumentStreamConvertor( - document.getCluster(), - document.getTraitSet(), + document.getCluster(), + document.getTraitSet(), dingoTableScan, - document.getIndexTableId(), + document.getIndexTableId(), textIdPair.getKey(), - document.getIndexTable(), + document.getIndexTable(), false); return new DingoGetDocumentPreFilter( document.getCluster(), @@ -210,6 +212,7 @@ default DingoDocumentIndexRule toRule() { return new DingoDocumentIndexRule(this); } } + private static RelNode prePrimaryOrScalarPlan( RexNode condition, LogicalDingoDocument document, @@ -276,6 +279,7 @@ private static RelNode prePrimaryOrScalarPlan( return new DingoStreamingConverter(document.getCluster(), traits, dingoGetDocumentPreFilter); } + private static Pair getTextIdIndex(DingoTable dingoTable) { List indexes = dingoTable.getTable().getIndexes(); for (IndexTable index : indexes) { diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoDocumentJoinRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoDocumentJoinRule.java index 309a2ff272..f423fd3e88 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoDocumentJoinRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoDocumentJoinRule.java @@ -36,7 +36,7 @@ public class DingoDocumentJoinRule extends RelRule /** * Creates a RelRule. * - * @param config + * @param config config */ public DingoDocumentJoinRule(Config config) { super(config); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoFullScanProjectRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoFullScanProjectRule.java index 41a1a84a5a..f02d7a0f8a 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoFullScanProjectRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoFullScanProjectRule.java @@ -61,7 +61,9 @@ public void onMatch(RelOptRuleCall call) { return; } LogicalIndexScanWithRelOp logicalIndexScanWithRelOp = getLogicalIndexScanWithRelOp(scan); - if (logicalIndexScanWithRelOp == null) return; + if (logicalIndexScanWithRelOp == null) { + return; + } call.transformTo( logicalIndexScanWithRelOp ); @@ -106,7 +108,9 @@ public static LogicalIndexScanWithRelOp getLogicalIndexScanWithRelOp(LogicalScan int ix = (int) val1.getValue(); Column column = table.getColumns().get(ix); int indexIx = indexTable.getColumns().indexOf(column); - RexInputRef rexInputRef = new RexInputRef(indexIx, scan.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + RexInputRef rexInputRef = new RexInputRef( + indexIx, scan.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER) + ); return RexConverter.convert(rexInputRef); }).toArray(Expr[]::new); RelOp relOp = RelOpBuilder.builder() @@ -129,7 +133,8 @@ public static LogicalIndexScanWithRelOp getLogicalIndexScanWithRelOp(LogicalScan @Value.Immutable public interface Config extends RelRule.Config { - DingoFullScanProjectRule.Config DINGO_FULL_SCAN_PROJECT_RULE = ImmutableDingoFullScanProjectRule.Config.builder() + DingoFullScanProjectRule.Config DINGO_FULL_SCAN_PROJECT_RULE + = ImmutableDingoFullScanProjectRule.Config.builder() .description("DingoFullScanProjectRule") .operandSupplier(b0 -> b0.operand(LogicalScanWithRelOp.class).trait(Convention.NONE) diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoFunctionScanRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoFunctionScanRule.java index 27f53f8b7d..95be0453a3 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoFunctionScanRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoFunctionScanRule.java @@ -100,8 +100,7 @@ public DingoFunctionScanRule(Config config) { hybridSearch.getFilter(), hybridSearch.hints ); - } - else if (rel instanceof DingoFunctionScan) { + } else if (rel instanceof DingoFunctionScan) { DingoFunctionScan scan = (DingoFunctionScan) rel; return new DingoFunctionScan( scan.getCluster(), diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoIndexScanMatchRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoIndexScanMatchRule.java index ef7ef2a0ca..45355269cc 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoIndexScanMatchRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoIndexScanMatchRule.java @@ -31,7 +31,6 @@ import io.dingodb.meta.entity.Column; import io.dingodb.meta.entity.Table; import org.apache.calcite.plan.RelOptRuleCall; - import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.RelCollation; @@ -113,8 +112,12 @@ private static void matchProjectSortOrder(DingoIndexScanMatchRule rule, RelOptRu new DingoRelCollationImpl(ImmutableList.of(), true, keepSerialOrder))); return; } - } else return; - if (RuleUtils.matchTablePrimary(logicalSort)) return; + } else { + return; + } + if (RuleUtils.matchTablePrimary(logicalSort)) { + return; + } int[] ixs = scan.getSelection().getMappings(); List ixList = new ArrayList<>(); @@ -127,7 +130,9 @@ private static void matchProjectSortOrder(DingoIndexScanMatchRule rule, RelOptRu return; } - if (validateNotRemoveSort(result.matchIndexTable, orderCol)) return; + if (validateNotRemoveSort(result.matchIndexTable, orderCol)) { + return; + } RelCollation relCollation = RelCollationImpl.of(new ArrayList<>()); @@ -203,9 +208,13 @@ private static void matchSort(DingoIndexScanMatchRule rule, RelOptRuleCall call) new DingoRelCollationImpl(ImmutableList.of(), true, keepSerialOrder))); return; } - } else return; + } else { + return; + } - if (RuleUtils.matchTablePrimary(logicalSort)) return; + if (RuleUtils.matchTablePrimary(logicalSort)) { + return; + } int[] ixs = scan.getSelection().getMappings(); List ixList = new ArrayList<>(); @@ -260,7 +269,9 @@ public static void indexRange(DingoIndexScanMatchRule rule, RelOptRuleCall call) return; } LogicalProject logicalProject1 = getLogicalProject(scan, logicalProject); - if (logicalProject1 == null) return; + if (logicalProject1 == null) { + return; + } call.transformTo(logicalProject1); } @@ -329,7 +340,9 @@ public static void indexRangeAsc(DingoIndexScanMatchRule rule, RelOptRuleCall ca if (disableIndex) { return; } - if (RuleUtils.matchTablePrimary(logicalSort)) return; + if (RuleUtils.matchTablePrimary(logicalSort)) { + return; + } List relFieldCollationList = logicalSort.getCollation().getFieldCollations(); if (relFieldCollationList.size() != 1) { @@ -349,7 +362,9 @@ public static void indexRangeAsc(DingoIndexScanMatchRule rule, RelOptRuleCall ca if (orderCol == null) { return; } - if (validateNotRemoveSort(logicalIndexRangeScan.getIndexTable(), orderCol)) return; + if (validateNotRemoveSort(logicalIndexRangeScan.getIndexTable(), orderCol)) { + return; + } int keepSerialOrder = RuleUtils.getSerialOrder(relFieldCollation); if (RuleUtils.preventRemoveOrder(keepSerialOrder)) { return; @@ -375,7 +390,9 @@ public static void nonLeftmostMatching(DingoIndexScanMatchRule rule, RelOptRuleC LogicalDingoTableScan scan = call.rel(1); RelNode relNode = getIndexFullScanRelNode(project, scan); - if (relNode == null) return; + if (relNode == null) { + return; + } call.transformTo(relNode); } @@ -389,7 +406,9 @@ public static void nonLeftmostMatchingOrder(DingoIndexScanMatchRule rule, RelOpt return; } - if (RuleUtils.matchTablePrimary(logicalSort)) return; + if (RuleUtils.matchTablePrimary(logicalSort)) { + return; + } List relFieldCollationList = logicalSort.getCollation().getFieldCollations(); if (relFieldCollationList.size() != 1) { @@ -406,7 +425,9 @@ public static void nonLeftmostMatchingOrder(DingoIndexScanMatchRule rule, RelOpt } else { return; } - if (validateNotRemoveSort(logicalIndexFullScan.getIndexTable(), orderCol)) return; + if (validateNotRemoveSort(logicalIndexFullScan.getIndexTable(), orderCol)) { + return; + } int keepSerialOrder = RuleUtils.getSerialOrder(relFieldCollation); if (RuleUtils.preventRemoveOrder(keepSerialOrder)) { return; @@ -651,7 +672,8 @@ public interface Config extends RelRule.Config { b0.operand(LogicalSort.class).oneInput(b1 -> b1.operand(LogicalProject.class).oneInput(b2 -> b2.operand(LogicalDingoTableScan.class) - .predicate(scan -> scan.getFilter() == null && !(scan instanceof LogicalIndexFullScan)).noInputs() + .predicate(scan -> scan.getFilter() == null + && !(scan instanceof LogicalIndexFullScan)).noInputs() ) ) ) diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoLikeRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoLikeRule.java index 4cbf353b46..fadbe7e669 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoLikeRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoLikeRule.java @@ -106,7 +106,9 @@ public void onMatch(@NonNull RelOptRuleCall call) { return; } - KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec(td.version, td.tupleType(), td.keyMapping()); + KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( + td.version, td.tupleType(), td.keyMapping() + ); Object[] tuple = new Object[td.getColumns().size()]; byte[] prefixBytes; diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoModifyIndexRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoModifyIndexRule.java index 0094c3ed13..c300a9ade5 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoModifyIndexRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoModifyIndexRule.java @@ -33,7 +33,7 @@ public class DingoModifyIndexRule extends RelRule implements Sub /** * Creates a temporary RelRule for update index col. * - * @param config + * @param config config */ protected DingoModifyIndexRule(Config config) { super(config); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoProjectRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoProjectRule.java index 68124b77f1..267763ea36 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoProjectRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoProjectRule.java @@ -80,44 +80,44 @@ private static void dispatchDistance(List projects, LogicalProject logi RelNode input = logicalProject.getInput(); for (RexNode rexNode : projects) { - if (rexNode instanceof RexCall && ((RexCall)rexNode).op.getName().equalsIgnoreCase("distance")) { - RexCall rexCall = (RexCall) rexNode; - RexNode ref = rexCall.getOperands().get(0); - RexInputRef rexInputRef = (RexInputRef) ref; + if (rexNode instanceof RexCall && ((RexCall)rexNode).op.getName().equalsIgnoreCase("distance")) { + RexCall rexCall = (RexCall) rexNode; + RexNode ref = rexCall.getOperands().get(0); + RexInputRef rexInputRef = (RexInputRef) ref; - DingoTable dingoTable = null; - TupleMapping tupleMapping = null; - if (input instanceof RelSubset) { - RelSubset relSubset = (RelSubset) input; - List relList = relSubset.getRelList(); - for (RelNode rel : relList) { - if (rel instanceof LogicalDingoTableScan) { - tupleMapping = ((LogicalDingoTableScan) rel).getSelection(); - dingoTable = Objects.requireNonNull(rel.getTable()).unwrap(DingoTable.class); - break; - } - } - } else if (input instanceof LogicalDingoTableScan) { - dingoTable = Objects.requireNonNull(input.getTable()).unwrap(DingoTable.class); - tupleMapping = ((LogicalDingoTableScan) input).getSelection(); - } + DingoTable dingoTable = null; + TupleMapping tupleMapping = null; + if (input instanceof RelSubset) { + RelSubset relSubset = (RelSubset) input; + List relList = relSubset.getRelList(); + for (RelNode rel : relList) { + if (rel instanceof LogicalDingoTableScan) { + tupleMapping = ((LogicalDingoTableScan) rel).getSelection(); + dingoTable = Objects.requireNonNull(rel.getTable()).unwrap(DingoTable.class); + break; + } + } + } else if (input instanceof LogicalDingoTableScan) { + dingoTable = Objects.requireNonNull(input.getTable()).unwrap(DingoTable.class); + tupleMapping = ((LogicalDingoTableScan) input).getSelection(); + } - int colIndex = tupleMapping.get(rexInputRef.getIndex()); + int colIndex = tupleMapping.get(rexInputRef.getIndex()); - assert dingoTable != null; - Column column = dingoTable.getTable().getColumns().get(colIndex); - String metricType = getIndexMetricType(dingoTable, column.getName()); - SqlOperator sqlOperator1 = findSqlOperator(metricType); + assert dingoTable != null; + Column column = dingoTable.getTable().getColumns().get(colIndex); + String metricType = getIndexMetricType(dingoTable, column.getName()); + SqlOperator sqlOperator1 = findSqlOperator(metricType); - try { - Field field = RexCall.class.getDeclaredField("op"); - field.setAccessible(true); - field.set(rexCall, sqlOperator1); - field.setAccessible(false); - } catch (Exception e) { - throw new RuntimeException(e); - } - } + try { + Field field = RexCall.class.getDeclaredField("op"); + field.setAccessible(true); + field.set(rexCall, sqlOperator1); + field.setAccessible(false); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } } diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRangeDeleteRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRangeDeleteRule.java index 92c226b25a..e64489681c 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRangeDeleteRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRangeDeleteRule.java @@ -55,7 +55,9 @@ public void onMatch(@NonNull RelOptRuleCall call) { if (td.getEngine() != null && td.getEngine().contains("TXN")) { return; } - KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec(td.version, td.tupleType(), td.keyMapping()); + KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( + td.version, td.tupleType(), td.keyMapping() + ); RangeDistribution range; if (rel.getFilter() == null && (rel.getSelection().size() == rel.getTable().getRowType().getFieldCount())) { range = RangeDistribution.builder() @@ -111,9 +113,9 @@ public interface Config extends RelRule.Config { default: return false; } - } else if(selection != null) { + } else if (selection != null) { // Optimize delete of full table data: delete from t1 - if(selection.size() == r.getTable().getRowType().getFieldCount()) { + if (selection.size() == r.getTable().getRowType().getFieldCount()) { DingoTable dingoTable = r.getTable().unwrap(DingoTable.class); List indexDefinitions = dingoTable.getTable().getIndexes(); return indexDefinitions.size() == 0; diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoScanProjectRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoScanProjectRule.java index 886a2f003f..a115e77a05 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoScanProjectRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoScanProjectRule.java @@ -124,17 +124,17 @@ public Void visitCall(RexCall call) { // select l2Distance(feature, array[1,2]) as store from table [ order by store limit 10] // feature is vector index // transform to post filtering -// if (vectorSelected.size() > 0) { -// LogicalProject logicalProject = getPostVectorFiltering( -// project, -// scan, -// vectorSelected, -// newProjectRexNodes); -// if (logicalProject != null) { -// call.transformTo(logicalProject); -// } -// return; -// } + //if (vectorSelected.size() > 0) { + // LogicalProject logicalProject = getPostVectorFiltering( + // project, + // scan, + // vectorSelected, + // newProjectRexNodes); + // if (logicalProject != null) { + // call.transformTo(logicalProject); + // } + // return; + //} call.transformTo( new LogicalProject( project.getCluster(), diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableCollationRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableCollationRule.java index 7a4023e2d9..69a8e272b6 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableCollationRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableCollationRule.java @@ -114,7 +114,9 @@ private static void logicalScanTandemSortRemove( call.transformTo(logicalSort1); } - private static void logicalScanProjectSortRemove(RelOptRuleCall call, LogicalScanWithRelOp logicalScanWithRelOp, LogicalSort logicalSort) { + private static void logicalScanProjectSortRemove( + RelOptRuleCall call, LogicalScanWithRelOp logicalScanWithRelOp, LogicalSort logicalSort + ) { ProjectOp projectOp = (ProjectOp) logicalScanWithRelOp.getRelOp(); if (!validateProjectOp(projectOp)) { return; @@ -232,7 +234,9 @@ private static void removeSortByOrder( call.transformTo(logicalSort1); } - private static void logicalScanFilterSortRemove(RelOptRuleCall call, LogicalSort logicalSort, LogicalScanWithRelOp logicalScanWithRelOp) { + private static void logicalScanFilterSortRemove( + RelOptRuleCall call, LogicalSort logicalSort, LogicalScanWithRelOp logicalScanWithRelOp + ) { List relFieldCollationList = logicalSort.getCollation().getFieldCollations(); if (relFieldCollationList.size() != 1) { return; diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoVectorIndexRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoVectorIndexRule.java index 0738ff962f..bcc10a31fc 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoVectorIndexRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoVectorIndexRule.java @@ -22,11 +22,11 @@ import io.dingodb.calcite.rel.DingoGetByIndexMerge; import io.dingodb.calcite.rel.DingoGetByKeys; import io.dingodb.calcite.rel.DingoGetVectorByDistance; -import io.dingodb.calcite.rel.dingo.DingoStreamingConverter; import io.dingodb.calcite.rel.DingoTableScan; import io.dingodb.calcite.rel.DingoVector; import io.dingodb.calcite.rel.LogicalDingoVector; import io.dingodb.calcite.rel.VectorStreamConvertor; +import io.dingodb.calcite.rel.dingo.DingoStreamingConverter; import io.dingodb.calcite.traits.DingoConvention; import io.dingodb.calcite.traits.DingoRelStreaming; import io.dingodb.calcite.utils.IndexValueMapSet; diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoVectorJoinRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoVectorJoinRule.java index 3f61d639ee..c97792372c 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoVectorJoinRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoVectorJoinRule.java @@ -16,8 +16,8 @@ package io.dingodb.calcite.rule; -import io.dingodb.calcite.rule.dingo.DingoHashJoinRule; import io.dingodb.calcite.rel.LogicalDingoVector; +import io.dingodb.calcite.rule.dingo.DingoHashJoinRule; import io.dingodb.calcite.traits.DingoConvention; import io.dingodb.calcite.traits.DingoRelStreaming; import io.dingodb.calcite.type.DingoSqlTypeFactory; @@ -37,10 +37,11 @@ @Value.Enclosing public class DingoVectorJoinRule extends RelRule { + /** * Creates a RelRule. * - * @param config + * @param config config */ public DingoVectorJoinRule(Config config) { super(config); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexCompareFilterAggrRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexCompareFilterAggrRule.java index 739680325a..ebc1b2bac9 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexCompareFilterAggrRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexCompareFilterAggrRule.java @@ -70,7 +70,7 @@ public void onMatch(RelOptRuleCall call) { } DingoTable dingoTable = dingoScanWithRelOp.getTable().unwrap(DingoTable.class); Table table = Objects.requireNonNull(dingoTable).getTable(); - if (!(dingoScanWithRelOp.getRelOp() instanceof TandemPipeCacheOp)){ + if (!(dingoScanWithRelOp.getRelOp() instanceof TandemPipeCacheOp)) { return; } TandemPipeCacheOp tandemPipeCacheOp = (TandemPipeCacheOp) dingoScanWithRelOp.getRelOp(); @@ -178,7 +178,9 @@ public void onMatch(RelOptRuleCall call) { if (!rangeScan.get()) { rangeScan.set(ixCol.primaryKeyIndex == 0); } - RexInputRef rexInputRef = new RexInputRef(indexIx, dingoScanWithRelOp.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + RexInputRef rexInputRef = new RexInputRef( + indexIx, dingoScanWithRelOp.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER) + ); Expr indexOpExpr = RexConverter.convert(rexInputRef); return new BinaryOpExpr(binaryOpExpr.getOp(), indexOpExpr, binaryOpExpr.getOperand1()); }).toArray(Expr[]::new); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexCompareMergeOpRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexCompareMergeOpRule.java index a736d21b4a..a1744dff6d 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexCompareMergeOpRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexCompareMergeOpRule.java @@ -96,7 +96,9 @@ public void onMatch(RelOptRuleCall call) { int ix = (int) val1.getValue(); Column column = table.getColumns().get(indexRangeScan.getSelection().get(ix)); int indexIx = indexTable.getColumns().indexOf(column); - RexInputRef rexInputRef = new RexInputRef(indexIx, indexRangeScan.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + RexInputRef rexInputRef = new RexInputRef( + indexIx, indexRangeScan.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER) + ); return RexConverter.convert(rexInputRef); }).toArray(Expr[]::new); relOp = RelOpBuilder.builder() @@ -111,7 +113,8 @@ public void onMatch(RelOptRuleCall call) { RelOp filterRelOp = null; RexNode rexFilter = indexRangeScan.getFilter(); List columnNames = indexTable.getColumns(); - List indexSelectionList = columnNames.stream().map(table.columns::indexOf).collect(Collectors.toList()); + List indexSelectionList = columnNames.stream() + .map(table.columns::indexOf).collect(Collectors.toList()); Mapping mapping = Mappings.target(indexSelectionList, table.getColumns().size()); if (indexRangeScan.getFilter() != null) { rexFilter = RexUtil.apply(mapping, rexFilter); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexFullScanWithRelOpRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexFullScanWithRelOpRule.java index 2607b3a04f..4fc5082840 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexFullScanWithRelOpRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/IndexFullScanWithRelOpRule.java @@ -65,7 +65,7 @@ public void onMatch(RelOptRuleCall call) { RexNode rexFilter = indexFullScan.getFilter(); List columnNames = indexFullScan.getIndexTable().getColumns(); List indexSelectionList = columnNames - .stream().map(table.columns::indexOf).collect(Collectors.toList()); + .stream().map(table.columns::indexOf).collect(Collectors.toList()); Mapping mapping = Mappings.target(indexSelectionList, table.getColumns().size()); if (indexFullScan.getFilter() != null) { rexFilter = RexUtil.apply(mapping, rexFilter); @@ -84,7 +84,7 @@ public void onMatch(RelOptRuleCall call) { Column column = table.getColumns().get(mappings[i]); int indexIx = indexFullScan.getIndexTable().getColumns().indexOf(column); RexInputRef rexInputRef = new RexInputRef(indexIx, - indexFullScan.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + indexFullScan.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER)); exprs[i] = RexConverter.convert(rexInputRef); } projectRelOp = RelOpBuilder.builder() diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/SubQueryRemoveRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/SubQueryRemoveRule.java index 6265c37056..15bfe43897 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/SubQueryRemoveRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/SubQueryRemoveRule.java @@ -1,12 +1,11 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Copyright 2021 DataCanvas * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -96,27 +95,27 @@ public boolean autoPruneOld() { return true; } - protected RexNode apply(RexSubQuery e, Set variablesSet, + protected RexNode apply(RexSubQuery rexSubQuery, Set variablesSet, RelOptUtil.Logic logic, RelBuilder builder, int inputCount, int offset) { - switch (e.getKind()) { + switch (rexSubQuery.getKind()) { case SCALAR_QUERY: - return rewriteScalarQuery(e, variablesSet, builder, inputCount, offset); + return rewriteScalarQuery(rexSubQuery, variablesSet, builder, inputCount, offset); case ARRAY_QUERY_CONSTRUCTOR: case MAP_QUERY_CONSTRUCTOR: case MULTISET_QUERY_CONSTRUCTOR: - return rewriteCollection(e, variablesSet, builder, + return rewriteCollection(rexSubQuery, variablesSet, builder, inputCount, offset); case SOME: - return rewriteSome(e, variablesSet, builder); + return rewriteSome(rexSubQuery, variablesSet, builder); case IN: - return rewriteIn(e, variablesSet, logic, builder, offset); + return rewriteIn(rexSubQuery, variablesSet, logic, builder, offset); case EXISTS: - return rewriteExists(e, variablesSet, logic, builder); + return rewriteExists(rexSubQuery, variablesSet, logic, builder); case UNIQUE: - return rewriteUnique(e, builder); + return rewriteUnique(rexSubQuery, builder); default: - throw new AssertionError(e.getKind()); + throw new AssertionError(rexSubQuery.getKind()); } } @@ -124,17 +123,17 @@ protected RexNode apply(RexSubQuery e, Set variablesSet, * Rewrites a scalar sub-query into an * {@link org.apache.calcite.rel.core.Aggregate}. * - * @param e Scalar sub-query to rewrite + * @param rexSubQuery Scalar sub-query to rewrite * @param variablesSet A set of variables used by a relational * expression of the specified RexSubQuery * @param builder Builder * @param offset Offset to shift {@link RexInputRef} * @return Expression that may be used to replace the RexSubQuery */ - private static RexNode rewriteScalarQuery(RexSubQuery e, Set variablesSet, + private static RexNode rewriteScalarQuery(RexSubQuery rexSubQuery, Set variablesSet, RelBuilder builder, int inputCount, int offset) { - builder.push(e.rel); - final RelMetadataQuery mq = e.rel.getCluster().getMetadataQuery(); + builder.push(rexSubQuery.rel); + final RelMetadataQuery mq = rexSubQuery.rel.getCluster().getMetadataQuery(); final Boolean unique = mq.areColumnsUnique(builder.peek(), ImmutableBitSet.of()); if (unique == null || !unique) { @@ -150,19 +149,19 @@ private static RexNode rewriteScalarQuery(RexSubQuery e, Set vari * Rewrites a sub-query into a * {@link Collect}. * - * @param e Sub-query to rewrite + * @param rexSubQuery Sub-query to rewrite * @param variablesSet A set of variables used by a relational * expression of the specified RexSubQuery * @param builder Builder * @param offset Offset to shift {@link RexInputRef} * @return Expression that may be used to replace the RexSubQuery */ - private static RexNode rewriteCollection(RexSubQuery e, + private static RexNode rewriteCollection(RexSubQuery rexSubQuery, Set variablesSet, RelBuilder builder, int inputCount, int offset) { - builder.push(e.rel); + builder.push(rexSubQuery.rel); builder.push( - Collect.create(builder.build(), e.getKind(), "x")); + Collect.create(builder.build(), rexSubQuery.getKind(), "x")); builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); return field(builder, inputCount, offset); } @@ -170,11 +169,11 @@ private static RexNode rewriteCollection(RexSubQuery e, /** * Rewrites a SOME sub-query into a {@link Join}. * - * @param e SOME sub-query to rewrite + * @param rexSubQuery SOME sub-query to rewrite * @param builder Builder * @return Expression that may be used to replace the RexSubQuery */ - private static RexNode rewriteSome(RexSubQuery e, Set variablesSet, + private static RexNode rewriteSome(RexSubQuery rexSubQuery, Set variablesSet, RelBuilder builder) { // Most general case, where the left and right keys might have nulls, and // caller requires 3-valued logic return. @@ -196,7 +195,7 @@ private static RexNode rewriteSome(RexSubQuery e, Set variablesSe // select max(deptno) as m, count(*) as c, count(deptno) as d // from emp) as q // - final SqlQuantifyOperator op = (SqlQuantifyOperator) e.op; + final SqlQuantifyOperator op = (SqlQuantifyOperator) rexSubQuery.op; switch (op.comparisonKind) { case GREATER_THAN_OR_EQUAL: case LESS_THAN_OR_EQUAL: @@ -244,7 +243,7 @@ private static RexNode rewriteSome(RexSubQuery e, Set variablesSe // cross join ( // select max(deptno) as m, count(*) as c, count(deptno) as d // from emp) as q - builder.push(e.rel) + builder.push(rexSubQuery.rel) .aggregate(builder.groupKey(), builder.aggregateCall(minMax, builder.field(0)).as("m"), builder.count(false, "c"), @@ -256,13 +255,13 @@ private static RexNode rewriteSome(RexSubQuery e, Set variablesSe literalFalse, builder.call(SqlStdOperatorTable.IS_TRUE, builder.call(RexUtil.op(op.comparisonKind), - e.operands.get(0), builder.field("q", "m"))), + rexSubQuery.operands.get(0), builder.field("q", "m"))), literalTrue, builder.greaterThan(builder.field("q", "c"), builder.field("q", "d")), literalUnknown, builder.call(RexUtil.op(op.comparisonKind), - e.operands.get(0), builder.field("q", "m"))); + rexSubQuery.operands.get(0), builder.field("q", "m"))); break; case NOT_EQUALS: @@ -285,7 +284,7 @@ private static RexNode rewriteSome(RexSubQuery e, Set variablesSe // cross join ( // select count(*) as c, count(deptno) as d, max(deptno) as m // from (select distinct deptno from emp)) as q - builder.push(e.rel); + builder.push(rexSubQuery.rel); builder.distinct() .aggregate(builder.groupKey(), builder.count(false, "c"), @@ -296,17 +295,17 @@ private static RexNode rewriteSome(RexSubQuery e, Set variablesSe caseRexNode = builder.call(SqlStdOperatorTable.CASE, builder.equals(builder.field("c"), builder.literal(0)), literalFalse, - builder.isNull(e.getOperands().get(0)), + builder.isNull(rexSubQuery.getOperands().get(0)), literalUnknown, builder.and( builder.notEquals(builder.field("d"), builder.field("c")), builder.lessThanOrEqual(builder.field("d"), builder.literal(1))), builder.or( - builder.notEquals(e.operands.get(0), builder.field("q", "m")), + builder.notEquals(rexSubQuery.operands.get(0), builder.field("q", "m")), literalUnknown), builder.equals(builder.field("d"), builder.literal(1)), - builder.notEquals(e.operands.get(0), builder.field("q", "m")), + builder.notEquals(rexSubQuery.operands.get(0), builder.field("q", "m")), literalTrue); break; @@ -342,7 +341,7 @@ private static RexNode rewriteSome(RexSubQuery e, Set variablesSe // select name, max(deptno) as m, count(*) as c, count(deptno) as d, // "alwaysTrue" as indicator // from emp group by name) as q on e.name = q.name - builder.push(e.rel) + builder.push(rexSubQuery.rel) .aggregate(builder.groupKey(), builder.aggregateCall(minMax, builder.field(0)).as("m"), builder.count(false, "c"), @@ -359,13 +358,13 @@ private static RexNode rewriteSome(RexSubQuery e, Set variablesSe literalFalse, builder.call(SqlStdOperatorTable.IS_TRUE, builder.call(RexUtil.op(op.comparisonKind), - e.operands.get(0), builder.field("q", "m"))), + rexSubQuery.operands.get(0), builder.field("q", "m"))), literalTrue, builder.greaterThan(builder.field("q", "c"), builder.field("q", "d")), literalUnknown, builder.call(RexUtil.op(op.comparisonKind), - e.operands.get(0), builder.field("q", "m"))); + rexSubQuery.operands.get(0), builder.field("q", "m"))); break; case NOT_EQUALS: @@ -394,7 +393,7 @@ private static RexNode rewriteSome(RexSubQuery e, Set variablesSe // select name, count(distinct *) as c, count(distinct deptno) as d, // max(deptno) as m, "alwaysTrue" as indicator // from emp group by name) as q on e.name = q.name - builder.push(e.rel) + builder.push(rexSubQuery.rel) .aggregate(builder.groupKey(), builder.count(true, "c"), builder.count(true, "d", builder.field(0)), @@ -409,17 +408,17 @@ private static RexNode rewriteSome(RexSubQuery e, Set variablesSe literalFalse, builder.equals(builder.field("c"), builder.literal(0)), literalFalse, - builder.isNull(e.getOperands().get(0)), + builder.isNull(rexSubQuery.getOperands().get(0)), literalUnknown, builder.and( builder.notEquals(builder.field("d"), builder.field("c")), builder.lessThanOrEqual(builder.field("d"), builder.literal(1))), builder.or( - builder.notEquals(e.operands.get(0), builder.field("q", "m")), + builder.notEquals(rexSubQuery.operands.get(0), builder.field("q", "m")), literalUnknown), builder.equals(builder.field("d"), builder.literal(1)), - builder.notEquals(e.operands.get(0), builder.field("q", "m")), + builder.notEquals(rexSubQuery.operands.get(0), builder.field("q", "m")), literalTrue); break; @@ -433,8 +432,8 @@ private static RexNode rewriteSome(RexSubQuery e, Set variablesSe // is guaranteed for case statement to not produce NULLs. Therefore to avoid // planner complaining we need to add cast. Note that nullable type is // created due to the MIN aggregate call, since there is no GROUP BY. - if (!e.getType().isNullable()) { - return builder.cast(caseRexNode, e.getType().getSqlTypeName()); + if (!rexSubQuery.getType().isNullable()) { + return builder.cast(caseRexNode, rexSubQuery.getType().getSqlTypeName()); } return caseRexNode; } @@ -442,27 +441,27 @@ private static RexNode rewriteSome(RexSubQuery e, Set variablesSe /** * Rewrites an EXISTS RexSubQuery into a {@link Join}. * - * @param e EXISTS sub-query to rewrite + * @param rexSubQuery EXISTS sub-query to rewrite * @param variablesSet A set of variables used by a relational * expression of the specified RexSubQuery * @param logic Logic for evaluating * @param builder Builder * @return Expression that may be used to replace the RexSubQuery */ - private static RexNode rewriteExists(RexSubQuery e, Set variablesSet, + private static RexNode rewriteExists(RexSubQuery rexSubQuery, Set variablesSet, RelOptUtil.Logic logic, RelBuilder builder) { // If the sub-query is guaranteed to produce at least one row, just return // TRUE. - final RelMetadataQuery mq = e.rel.getCluster().getMetadataQuery(); - final Double minRowCount = mq.getMinRowCount(e.rel); + final RelMetadataQuery mq = rexSubQuery.rel.getCluster().getMetadataQuery(); + final Double minRowCount = mq.getMinRowCount(rexSubQuery.rel); if (minRowCount != null && minRowCount >= 1D) { return builder.literal(true); } - final Double maxRowCount = mq.getMaxRowCount(e.rel); + final Double maxRowCount = mq.getMaxRowCount(rexSubQuery.rel); if (maxRowCount != null && maxRowCount < 1D) { return builder.literal(false); } - builder.push(e.rel); + builder.push(rexSubQuery.rel); builder.project(builder.alias(builder.literal(true), "i")); switch (logic) { case TRUE: @@ -510,18 +509,18 @@ private static RexNode rewriteExists(RexSubQuery e, Set variables * ) * } * - * @param e UNIQUE sub-query to rewrite + * @param rexSubQuery UNIQUE sub-query to rewrite * @param builder Builder * @return Expression that may be used to replace the RexSubQuery */ - private static RexNode rewriteUnique(RexSubQuery e, RelBuilder builder) { + private static RexNode rewriteUnique(RexSubQuery rexSubQuery, RelBuilder builder) { // if sub-query always return unique value. - final RelMetadataQuery mq = e.rel.getCluster().getMetadataQuery(); - Boolean isUnique = mq.areRowsUnique(e.rel, true); + final RelMetadataQuery mq = rexSubQuery.rel.getCluster().getMetadataQuery(); + Boolean isUnique = mq.areRowsUnique(rexSubQuery.rel, true); if (isUnique != null && isUnique) { return builder.getRexBuilder().makeLiteral(true); } - builder.push(e.rel); + builder.push(rexSubQuery.rel); List notNullCondition = builder.fields().stream() .map(builder::isNotNull) @@ -538,7 +537,7 @@ private static RexNode rewriteUnique(RexSubQuery e, RelBuilder builder) { /** * Rewrites an IN RexSubQuery into a {@link Join}. * - * @param e IN sub-query to rewrite + * @param rexSubQuery IN sub-query to rewrite * @param variablesSet A set of variables used by a relational * expression of the specified RexSubQuery * @param logic Logic for evaluating @@ -546,7 +545,7 @@ private static RexNode rewriteUnique(RexSubQuery e, RelBuilder builder) { * @param offset Offset to shift {@link RexInputRef} * @return Expression that may be used to replace the RexSubQuery */ - private static RexNode rewriteIn(RexSubQuery e, Set variablesSet, + private static RexNode rewriteIn(RexSubQuery rexSubQuery, Set variablesSet, RelOptUtil.Logic logic, RelBuilder builder, int offset) { // Most general case, where the left and right keys might have nulls, and // caller requires 3-valued logic return. @@ -602,7 +601,7 @@ private static RexNode rewriteIn(RexSubQuery e, Set variablesSet, // on e.deptno = dt.deptno // - builder.push(e.rel); + builder.push(rexSubQuery.rel); final List fields = new ArrayList<>(builder.fields()); // for the case when IN has only literal operands, it may be handled @@ -629,10 +628,10 @@ private static RexNode rewriteIn(RexSubQuery e, Set variablesSet, // order by cs desc limit 1) as dt // - boolean allLiterals = RexUtil.allLiterals(e.getOperands()); - final List expressionOperands = new ArrayList<>(e.getOperands()); + boolean allLiterals = RexUtil.allLiterals(rexSubQuery.getOperands()); + final List expressionOperands = new ArrayList<>(rexSubQuery.getOperands()); - final List keyIsNulls = e.getOperands().stream() + final List keyIsNulls = rexSubQuery.getOperands().stream() .filter(operand -> operand.getType().isNullable()) .map(builder::isNull) .collect(Collectors.toList()); @@ -705,7 +704,7 @@ private static RexNode rewriteIn(RexSubQuery e, Set variablesSet, builder.join(JoinRelType.INNER, trueLiteral, variablesSet); } offset += 2; - builder.push(e.rel); + builder.push(rexSubQuery.rel); // fall through default: fields.add(builder.alias(trueLiteral, "i")); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexMergeVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexMergeVisitFun.java index 0b068e89c0..060e98aec3 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexMergeVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexMergeVisitFun.java @@ -92,8 +92,9 @@ public static Collection visit( NavigableMap indexRanges = metaService.getRangeDistribution(idxId); List keyTuples = TableUtils.getTuplesForKeyMapping(indexValSet.getValue(), indexTd); - KeyValueCodec codec = - CodecService.getDefault().createKeyValueCodec(indexTd.version, indexTd.tupleType(), indexTd.keyMapping()); + KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( + indexTd.version, indexTd.tupleType(), indexTd.keyMapping() + ); List keyList = new ArrayList<>(); for (Object[] keyTuple : keyTuples) { byte[] keys = codec.encodeKeyPrefix(keyTuple, calculatePrefixCount(keyTuple)); @@ -177,7 +178,9 @@ public static Collection visit( } List inputs = DingoCoalesce.coalesce(idGenerator, outputs); - return DingoBridge.bridge(idGenerator, inputs, new DingoGetByIndexMergeVisitFun.OperatorSupplier(rel, lookupKeyMapping)); + return DingoBridge.bridge( + idGenerator, inputs, new DingoGetByIndexMergeVisitFun.OperatorSupplier(rel, lookupKeyMapping) + ); } @AllArgsConstructor diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetVectorByDistanceVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetVectorByDistanceVisitFun.java index e27f1bb6cd..8fb6b9a6d9 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetVectorByDistanceVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetVectorByDistanceVisitFun.java @@ -40,8 +40,8 @@ import java.util.function.Supplier; import static io.dingodb.calcite.rel.DingoRel.dingo; -import static io.dingodb.calcite.visitor.function.DingoVectorVisitFun.getVectorFloats; import static io.dingodb.calcite.visitor.function.DingoVectorVisitFun.getTopkParam; +import static io.dingodb.calcite.visitor.function.DingoVectorVisitFun.getVectorFloats; import static io.dingodb.exec.utils.OperatorCodeUtils.VECTOR_POINT_DISTANCE; public final class DingoGetVectorByDistanceVisitFun { diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoVectorVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoVectorVisitFun.java index 831ae32b63..4a356ebe23 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoVectorVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoVectorVisitFun.java @@ -115,12 +115,8 @@ public static Collection visit( DingoRelOptTable relTable = rel.getTable(); DingoTable dingoTable = relTable.unwrap(DingoTable.class); - MetaService metaService = MetaService.root(); assert dingoTable != null; - CommonId tableId = dingoTable.getTableId(); - Table td = dingoTable.getTable(); - NavigableMap ranges = metaService.getRangeDistribution(tableId); List operandsList = rel.getOperands(); SqlIdentifier vectorColNmIdf = (SqlIdentifier) operandsList.get(1); @@ -128,16 +124,11 @@ public static Collection visit( if (vectorColNmIdf != null) { vectorColNm = vectorColNmIdf.getSimple(); } - Float[] floatArray = getVectorFloats(operandsList); if (!(operandsList.get(3) instanceof SqlNumericLiteral)) { throw new IllegalArgumentException("Top n not number."); } - int topN = ((Number) Objects.requireNonNull(((SqlNumericLiteral) operandsList.get(3)).getValue())).intValue(); - - List outputs = new ArrayList<>(); - IndexTable indexTable = (IndexTable) rel.getIndexTable(); boolean pushDown = pushDown(rel.getFilter(), dingoTable.getTable(), indexTable); RexNode rexFilter = rel.getFilter(); @@ -197,8 +188,13 @@ public static Collection visit( Map parameterMap = getParameterMap(operandsList); // Get all index table distributions NavigableMap indexRanges = - metaService.getRangeDistribution(rel.getIndexTableId()); - + MetaService.root().getRangeDistribution(rel.getIndexTableId()); + Table td = dingoTable.getTable(); + CommonId tableId = dingoTable.getTableId(); + NavigableMap ranges = MetaService.root().getRangeDistribution(tableId); + Float[] floatArray = getVectorFloats(operandsList); + int topN = ((Number) Objects.requireNonNull(((SqlNumericLiteral) operandsList.get(3)).getValue())).intValue(); + List outputs = new ArrayList<>(); // Create tasks based on partitions for (RangeDistribution rangeDistribution : indexRanges.values()) { Vertex vertex; @@ -341,8 +337,7 @@ public static Float[] getVectorFloats(List operandsList) { } public static Integer getTopkParam(List operandsList) { - Integer topK = ((Number) Objects.requireNonNull(((SqlNumericLiteral) operandsList.get(3)).getValue())).intValue(); - return topK; + return ((Number) Objects.requireNonNull(((SqlNumericLiteral) operandsList.get(3)).getValue())).intValue(); } private static Map getParameterMap(List operandsList) { diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartRangeScanOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartRangeScanOperator.java index 64dcd1208c..00a6e7b346 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartRangeScanOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartRangeScanOperator.java @@ -77,13 +77,19 @@ private TxnPartRangeScanOperator() { localKVIterator = Iterators.transform( localStore.scan(new StoreInstance.Range(encodeStart, encodeEnd, includeStart, includeEnd)), wrap(ByteUtils::mapping)::apply); - kvKVIterator = kvStore.txnScan(param.getScanTs(), new StoreInstance.Range(startKey, endKey, includeStart, includeEnd), param.getTimeOut()); + kvKVIterator = kvStore.txnScan( + param.getScanTs(), + new StoreInstance.Range(startKey, endKey, includeStart, includeEnd), param.getTimeOut() + ); profile.setTaskType("executor"); } else { localKVIterator = Iterators.transform( localStore.scan(new StoreInstance.Range(encodeStart, encodeEnd, includeStart, includeEnd), coprocessor), wrap(ByteUtils::mapping)::apply); - kvKVIterator = kvStore.txnScan(param.getScanTs(), new StoreInstance.Range(startKey, endKey, includeStart, includeEnd), param.getTimeOut()); + kvKVIterator = kvStore.txnScan( + param.getScanTs(), + new StoreInstance.Range(startKey, endKey, includeStart, includeEnd), param.getTimeOut() + ); profile.setTaskType("corp"); } if (kvKVIterator instanceof ProfileScanIterator) { diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/TransactionStoreInstance.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/TransactionStoreInstance.java index a003a41e2b..8243021354 100644 --- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/TransactionStoreInstance.java +++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/TransactionStoreInstance.java @@ -30,8 +30,8 @@ import io.dingodb.exec.transaction.impl.TransactionManager; import io.dingodb.exec.transaction.util.TransactionUtil; import io.dingodb.meta.entity.Table; -import io.dingodb.sdk.common.utils.Optional; import io.dingodb.sdk.common.DingoClientException.RequestErrorException; +import io.dingodb.sdk.common.utils.Optional; import io.dingodb.sdk.service.DocumentService; import io.dingodb.sdk.service.IndexService; import io.dingodb.sdk.service.Services; @@ -74,8 +74,8 @@ import io.dingodb.store.api.transaction.exception.CommitTsExpiredException; import io.dingodb.store.api.transaction.exception.DuplicateEntryException; import io.dingodb.store.api.transaction.exception.LockWaitException; -import io.dingodb.store.api.transaction.exception.OnePcNeedTwoPcCommit; import io.dingodb.store.api.transaction.exception.OnePcMaxSizeExceedException; +import io.dingodb.store.api.transaction.exception.OnePcNeedTwoPcCommit; import io.dingodb.store.api.transaction.exception.PrimaryMismatchException; import io.dingodb.store.api.transaction.exception.WriteConflictException; import io.dingodb.store.proxy.Configuration; @@ -106,13 +106,18 @@ public class TransactionStoreInstance { private final CommonId partitionId; private final DocumentService documentService; - private final static int VectorKeyLen = 17; + private static final int VectorKeyLen = 17; public TransactionStoreInstance(StoreService storeService, IndexService indexService, CommonId partitionId) { this(storeService, indexService, null, partitionId); } - public TransactionStoreInstance(StoreService storeService, IndexService indexService, DocumentService documentService, CommonId partitionId) { + public TransactionStoreInstance( + StoreService storeService, + IndexService indexService, + DocumentService documentService, + CommonId partitionId + ) { this.storeService = storeService; this.partitionId = partitionId; this.indexService = indexService; @@ -168,8 +173,9 @@ public boolean txnPreWriteRealKey(TxnPreWrite txnPreWrite, long timeOut) { TxnPrewriteRequest request = MAPPER.preWriteTo(txnPreWrite); TxnPrewriteResponse response; - if(request.isTryOnePc() && request.sizeOf() > TransactionUtil.maxRpcDataSize) { - throw new OnePcMaxSizeExceedException("one pc phase Data size exceed in 1pc, max:" + TransactionUtil.maxRpcDataSize + " cur:" +request.sizeOf()); + if (request.isTryOnePc() && request.sizeOf() > TransactionUtil.maxRpcDataSize) { + throw new OnePcMaxSizeExceedException("one pc phase Data size exceed in 1pc, " + + "max:" + TransactionUtil.maxRpcDataSize + " cur:" + request.sizeOf()); } long start1 = System.currentTimeMillis(); @@ -189,7 +195,8 @@ public boolean txnPreWriteRealKey(TxnPreWrite txnPreWrite, long timeOut) { if (response.getTxnResult() == null || response.getTxnResult().isEmpty()) { if (request.isTryOnePc() && response.getOnePcCommitTs() == 0) { //1pc failed, Need 2pc commit, but not 2pc pre-write. - throw new OnePcNeedTwoPcCommit("one pc phase 1pc commit ts is 0 in response, so need 2pc commit, ts:" + response.getOnePcCommitTs()); + throw new OnePcNeedTwoPcCommit("one pc phase 1pc commit ts is 0 in response, " + + "so need 2pc commit, ts:" + response.getOnePcCommitTs()); } return true; } @@ -318,7 +325,10 @@ public boolean txnCommitRealKey(TxnCommit txnCommit) { } } - public Future txnPessimisticLockPrimaryKey(TxnPessimisticLock txnPessimisticLock, long timeOut, boolean ignoreLockWait, List kvRet) { + public Future txnPessimisticLockPrimaryKey( + TxnPessimisticLock txnPessimisticLock, long timeOut, boolean ignoreLockWait, + List kvRet + ) { if (txnPessimisticLock(txnPessimisticLock, timeOut, ignoreLockWait, kvRet)) { LogUtils.info(log, "txn pessimistic heartbeat, startTs:{}, primaryKey is {}", txnPessimisticLock.getStartTs(), Arrays.toString(txnPessimisticLock.getPrimaryLock())); @@ -333,10 +343,14 @@ public Future txnPessimisticLockPrimaryKey(TxnPessimisticLock txnPessimisticLock throw new WriteConflictException(); } - public boolean txnPessimisticLock(TxnPessimisticLock txnPessimisticLock, long timeOut, boolean ignoreLockWait, List kvRet) { + public boolean txnPessimisticLock( + TxnPessimisticLock txnPessimisticLock, long timeOut, boolean ignoreLockWait, + List kvRet + ) { long start = System.currentTimeMillis(); try { - txnPessimisticLock.getMutations().stream().peek($ -> $.setKey(setId($.getKey()))).forEach($ -> $.getKey()[0] = 't'); + txnPessimisticLock.getMutations().stream() + .peek($ -> $.setKey(setId($.getKey()))).forEach($ -> $.getKey()[0] = 't'); IsolationLevel isolationLevel = txnPessimisticLock.getIsolationLevel(); int n = 1; List resolvedLocks = new ArrayList<>(); @@ -345,12 +359,18 @@ public boolean txnPessimisticLock(TxnPessimisticLock txnPessimisticLock, long ti TxnPessimisticLockResponse response; if (indexService != null) { txnPessimisticLock.getMutations().forEach($ -> $.setKey(Arrays.copyOf($.getKey(), VectorKeyLen))); - response = indexService.txnPessimisticLock(txnPessimisticLock.getStartTs(), MAPPER.pessimisticLockTo(txnPessimisticLock)); + response = indexService.txnPessimisticLock( + txnPessimisticLock.getStartTs(), MAPPER.pessimisticLockTo(txnPessimisticLock) + ); } else if (documentService != null) { txnPessimisticLock.getMutations().forEach($ -> $.setKey(Arrays.copyOf($.getKey(), VectorKeyLen))); - response = documentService.txnPessimisticLock(txnPessimisticLock.getStartTs(), MAPPER.pessimisticLockTo(txnPessimisticLock)); + response = documentService.txnPessimisticLock( + txnPessimisticLock.getStartTs(), MAPPER.pessimisticLockTo(txnPessimisticLock) + ); } else { - response = storeService.txnPessimisticLock(txnPessimisticLock.getStartTs(), MAPPER.pessimisticLockTo(txnPessimisticLock)); + response = storeService.txnPessimisticLock( + txnPessimisticLock.getStartTs(), MAPPER.pessimisticLockTo(txnPessimisticLock) + ); } if (response.getTxnResult() == null || response.getTxnResult().isEmpty()) { if (resolveLockFlag == ResolveLockStatus.LOCK_TTL && ignoreLockWait) { @@ -358,19 +378,21 @@ public boolean txnPessimisticLock(TxnPessimisticLock txnPessimisticLock, long ti throw new LockWaitException("Lock wait"); } - if(response.getKvs() != null) { + if (response.getKvs() != null) { kvRet.addAll(response.getKvs().stream().map(MAPPER::kvFrom).collect(Collectors.toList())); - } else if(response.getVector() != null) { + } else if (response.getVector() != null) { kvRet.addAll(response.getVector().stream() - .map(vectorWithId -> vectorWithId != null ? - new io.dingodb.common.store.KeyValue(vectorWithId.getTableData().getTableKey(), + .map(vectorWithId -> vectorWithId != null + ? new io.dingodb.common.store.KeyValue(vectorWithId.getTableData().getTableKey(), vectorWithId.getTableData().getTableValue()) : null) .collect(Collectors.toList())); - } else if(response.getDocuments() != null) { + } else if (response.getDocuments() != null) { kvRet.addAll(response.getDocuments().stream() - .map(documentWithId -> documentWithId != null ? - new io.dingodb.common.store.KeyValue(documentWithId.getDocument().getTableData().getTableKey(), - documentWithId.getDocument().getTableData().getTableValue()) : null) + .map(documentWithId -> documentWithId != null + ? new io.dingodb.common.store.KeyValue( + documentWithId.getDocument().getTableData().getTableKey(), + documentWithId.getDocument().getTableData().getTableValue() + ) : null) .collect(Collectors.toList())); } return true; @@ -422,16 +444,22 @@ public boolean txnPessimisticLockRollback(TxnPessimisticRollBack txnPessimisticR .map(key -> Arrays.copyOf(key, VectorKeyLen)) .collect(Collectors.toList()); txnPessimisticRollBack.setKeys(newKeys); - response = indexService.txnPessimisticRollback(startTs, MAPPER.pessimisticRollBackTo(txnPessimisticRollBack)); + response = indexService.txnPessimisticRollback( + startTs, MAPPER.pessimisticRollBackTo(txnPessimisticRollBack) + ); } else if (documentService != null) { List keys = txnPessimisticRollBack.getKeys(); List newKeys = keys.stream() .map(key -> Arrays.copyOf(key, VectorKeyLen)) .collect(Collectors.toList()); txnPessimisticRollBack.setKeys(newKeys); - response = documentService.txnPessimisticRollback(startTs, MAPPER.pessimisticRollBackTo(txnPessimisticRollBack)); + response = documentService.txnPessimisticRollback( + startTs, MAPPER.pessimisticRollBackTo(txnPessimisticRollBack) + ); } else { - response = storeService.txnPessimisticRollback(startTs, MAPPER.pessimisticRollBackTo(txnPessimisticRollBack)); + response = storeService.txnPessimisticRollback( + startTs, MAPPER.pessimisticRollBackTo(txnPessimisticRollBack) + ); } if (response.getTxnResult() != null && !response.getTxnResult().isEmpty()) { LogUtils.error(log, "txnPessimisticLockRollback txnResult:{}", response.getTxnResult().toString()); @@ -439,8 +467,8 @@ public boolean txnPessimisticLockRollback(TxnPessimisticRollBack txnPessimisticR LockInfo lockInfo = txnResultInfo.getLocked(); if (lockInfo != null && lockInfo.getLockTs() == startTs && lockInfo.getLockType() != Op.Lock) { LogUtils.info(log, "txnPessimisticLockRollback lockInfo:{}", lockInfo.toString()); - TxnBatchRollBack rollBackRequest = TxnBatchRollBack.builder(). - isolationLevel(txnPessimisticRollBack.getIsolationLevel()) + TxnBatchRollBack rollBackRequest = TxnBatchRollBack.builder() + .isolationLevel(txnPessimisticRollBack.getIsolationLevel()) .startTs(startTs) .keys(singletonList(lockInfo.getKey())) .build(); @@ -465,14 +493,6 @@ public Iterator txnScan(long ts, StoreInstance return txnScan(ts, range, timeOut, null); } - public Iterator txnScanWithoutStream( - long ts, StoreInstance.Range range, long timeOut - ) { - Stream.of(range.start).peek(this::setId).forEach($ -> $[0] = 't'); - Stream.of(range.end).peek(this::setId).forEach($ -> $[0] = 't'); - return getScanIterator(ts, range, timeOut, null); - } - public Iterator txnScan( long ts, StoreInstance.Range range, @@ -482,20 +502,30 @@ public Iterator txnScan( Stream.of(range.start).peek(this::setId).forEach($ -> $[0] = 't'); Stream.of(range.end).peek(this::setId).forEach($ -> $[0] = 't'); - if(ScopeVariables.txnScanByStream()) { + if (ScopeVariables.txnScanByStream()) { return getScanStreamIterator(ts, range, timeOut, coprocessor); } else { return getScanIterator(ts, range, timeOut, coprocessor); } } + public Iterator txnScanWithoutStream( + long ts, StoreInstance.Range range, long timeOut + ) { + Stream.of(range.start).peek(this::setId).forEach($ -> $[0] = 't'); + Stream.of(range.end).peek(this::setId).forEach($ -> $[0] = 't'); + return getScanIterator(ts, range, timeOut, null); + } + @NonNull public ScanIterator getScanIterator(long ts, StoreInstance.Range range, long timeOut, CoprocessorV2 coprocessor) { return new ScanIterator(ts, range, timeOut, coprocessor); } @NonNull - public ScanStreamIterator getScanStreamIterator(long ts, StoreInstance.Range range, long timeOut, CoprocessorV2 coprocessor) { + public ScanStreamIterator getScanStreamIterator( + long ts, StoreInstance.Range range, long timeOut, CoprocessorV2 coprocessor + ) { return new ScanStreamIterator(ts, range, timeOut, coprocessor); } @@ -511,7 +541,9 @@ public List getKeyValues(long startTs, List resolvedLocks = new ArrayList<>(); while (true) { - TxnBatchGetRequest txnBatchGetRequest = MAPPER.batchGetTo(startTs, IsolationLevel.SnapshotIsolation, keys); + TxnBatchGetRequest txnBatchGetRequest = MAPPER.batchGetTo( + startTs, IsolationLevel.SnapshotIsolation, keys + ); txnBatchGetRequest.setResolveLocks(resolvedLocks); TxnBatchGetResponse response; if (indexService != null) { @@ -519,19 +551,22 @@ public List getKeyValues(long startTs, List vectorWithId != null ? - new io.dingodb.common.store.KeyValue(vectorWithId.getTableData().getTableKey(), + .map(vectorWithId -> vectorWithId != null + ? new io.dingodb.common.store.KeyValue(vectorWithId.getTableData().getTableKey(), vectorWithId.getTableData().getTableValue()) : null) .collect(Collectors.toList()); } - } else if(documentService != null){ + } else if (documentService != null) { txnBatchGetRequest.getKeys().forEach($ -> Arrays.copyOf($, VectorKeyLen)); response = documentService.txnBatchGet(startTs, txnBatchGetRequest); if (response.getTxnResult() == null) { return response.getDocuments().stream() - .map(documentWithId -> documentWithId != null ? - new io.dingodb.common.store.KeyValue(documentWithId.getDocument().getTableData().getTableKey(), - documentWithId.getDocument().getTableData().getTableValue()): null) + .map(documentWithId -> documentWithId != null + ? new io.dingodb.common.store.KeyValue( + documentWithId.getDocument().getTableData().getTableKey(), + documentWithId.getDocument().getTableData().getTableValue() + ) : null + ) .collect(Collectors.toList()); } } else { @@ -578,12 +613,18 @@ public boolean txnBatchRollback(TxnBatchRollBack txnBatchRollBack) { TxnBatchRollbackResponse response; if (indexService != null) { txnBatchRollBack.getKeys().forEach($ -> Arrays.copyOf($, VectorKeyLen)); - response = indexService.txnBatchRollback(txnBatchRollBack.getStartTs(), MAPPER.rollbackTo(txnBatchRollBack)); + response = indexService.txnBatchRollback( + txnBatchRollBack.getStartTs(), MAPPER.rollbackTo(txnBatchRollBack) + ); } else if (documentService != null) { txnBatchRollBack.getKeys().forEach($ -> Arrays.copyOf($, VectorKeyLen)); - response = documentService.txnBatchRollback(txnBatchRollBack.getStartTs(), MAPPER.rollbackTo(txnBatchRollBack)); + response = documentService.txnBatchRollback( + txnBatchRollBack.getStartTs(), MAPPER.rollbackTo(txnBatchRollBack) + ); } else { - response = storeService.txnBatchRollback(txnBatchRollBack.getStartTs(), MAPPER.rollbackTo(txnBatchRollBack)); + response = storeService.txnBatchRollback( + txnBatchRollBack.getStartTs(), MAPPER.rollbackTo(txnBatchRollBack) + ); } if (response.getTxnResult() != null) { LogUtils.error(log, "txnBatchRollback txnResult:{}", response.getTxnResult().toString()); @@ -631,13 +672,13 @@ public ResolveLockStatus writeResolveConflict(List txnResult, int // CheckTxnStatus LogUtils.debug(log, "startTs:{}, {} lockInfo : {}", startTs, funName, lockInfo); long currentTs = TsoService.INSTANCE.tso(); - TxnCheckStatus txnCheckStatus = TxnCheckStatus.builder(). - isolationLevel(IsolationLevel.of(isolationLevel)). - primaryKey(lockInfo.getPrimaryLock()). - lockTs(lockInfo.getLockTs()). - callerStartTs(startTs). - currentTs(currentTs). - build(); + TxnCheckStatus txnCheckStatus = TxnCheckStatus.builder() + .isolationLevel(IsolationLevel.of(isolationLevel)) + .primaryKey(lockInfo.getPrimaryLock()) + .lockTs(lockInfo.getLockTs()) + .callerStartTs(startTs) + .currentTs(currentTs) + .build(); TxnCheckTxnStatusResponse statusResponse = txnCheckTxnStatus(txnCheckStatus); LogUtils.info(log, "startTs:{}, {} txnCheckStatus : {}", startTs, funName, statusResponse); TxnResultInfo resultInfo = statusResponse.getTxnResult(); @@ -651,8 +692,8 @@ public ResolveLockStatus writeResolveConflict(List txnResult, int || action == Action.TTLExpirePessimisticRollback || action == Action.TTLExpireRollback)) { // pessimistic lock - TxnPessimisticRollBack pessimisticRollBack = TxnPessimisticRollBack.builder(). - isolationLevel(IsolationLevel.of(isolationLevel)) + TxnPessimisticRollBack pessimisticRollBack = TxnPessimisticRollBack.builder() + .isolationLevel(IsolationLevel.of(isolationLevel)) .startTs(lockInfo.getLockTs()) .forUpdateTs(lockInfo.getForUpdateTs()) .keys(Collections.singletonList(lockInfo.getKey())) @@ -664,25 +705,27 @@ public ResolveLockStatus writeResolveConflict(List txnResult, int resolveLockStatus = ResolveLockStatus.LOCK_TTL; } else if (commitTs > 0) { // resolveLock store commit - TxnResolveLock resolveLockRequest = TxnResolveLock.builder(). - isolationLevel(IsolationLevel.of(isolationLevel)). - startTs(lockInfo.getLockTs()). - commitTs(commitTs). - keys(singletonList(lockInfo.getKey())). - build(); + TxnResolveLock resolveLockRequest = TxnResolveLock.builder() + .isolationLevel(IsolationLevel.of(isolationLevel)) + .startTs(lockInfo.getLockTs()) + .commitTs(commitTs) + .keys(singletonList(lockInfo.getKey())) + .build(); TxnResolveLockResponse txnResolveLockRes = txnResolveLock(resolveLockRequest); - LogUtils.info(log, "startTs:{}, {} txnResolveLockResponse: {}", startTs, funName, txnResolveLockRes); + LogUtils.info(log, + "startTs:{}, {} txnResolveLockResponse: {}", startTs, funName, txnResolveLockRes); resolveLockStatus = ResolveLockStatus.COMMIT; } else if (lockTtl == 0 && commitTs == 0) { // resolveLock store rollback - TxnResolveLock resolveLockRequest = TxnResolveLock.builder(). - isolationLevel(IsolationLevel.of(isolationLevel)). - startTs(lockInfo.getLockTs()). - commitTs(commitTs). - keys(singletonList(lockInfo.getKey())). - build(); + TxnResolveLock resolveLockRequest = TxnResolveLock.builder() + .isolationLevel(IsolationLevel.of(isolationLevel)) + .startTs(lockInfo.getLockTs()) + .commitTs(commitTs) + .keys(singletonList(lockInfo.getKey())) + .build(); TxnResolveLockResponse txnResolveLockRes = txnResolveLock(resolveLockRequest); - LogUtils.info(log, "startTs:{}, {} txnResolveLockResponse: {}", startTs, funName, txnResolveLockRes); + LogUtils.info(log, + "startTs:{}, {} txnResolveLockResponse: {}", startTs, funName, txnResolveLockRes); resolveLockStatus = ResolveLockStatus.ROLLBACK; } } else { @@ -693,8 +736,8 @@ public ResolveLockStatus writeResolveConflict(List txnResult, int if (action == Action.LockNotExistRollback || action == Action.TTLExpirePessimisticRollback || action == Action.TTLExpireRollback) { - TxnPessimisticRollBack pessimisticRollBack = TxnPessimisticRollBack.builder(). - isolationLevel(IsolationLevel.of(isolationLevel)) + TxnPessimisticRollBack pessimisticRollBack = TxnPessimisticRollBack.builder() + .isolationLevel(IsolationLevel.of(isolationLevel)) .startTs(lockInfo.getLockTs()) .forUpdateTs(lockInfo.getForUpdateTs()) .keys(Collections.singletonList(lockInfo.getKey())) @@ -711,7 +754,8 @@ public ResolveLockStatus writeResolveConflict(List txnResult, int if (resultInfo.getPrimaryMismatch() != null) { throw new PrimaryMismatchException(resultInfo.getPrimaryMismatch().toString()); } else if (resultInfo.getTxnNotFound() != null) { - LogUtils.warn(log, "startTs:{}, {} txnNotFound : {}", startTs, funName, resultInfo.getTxnNotFound().toString()); + LogUtils.warn(log, "startTs:{}, {} txnNotFound : {}", startTs, funName, + resultInfo.getTxnNotFound().toString()); resolveLockStatus = ResolveLockStatus.TXN_NOT_FOUND; } else if (resultInfo.getLocked() != null) { throw new RuntimeException(resultInfo.getLocked().toString()); @@ -745,13 +789,13 @@ private ResolveLockStatus readResolveConflict(List txnResult, int // CheckTxnStatus LogUtils.debug(log, "startTs:{}, {} lockInfo : {}", startTs, funName, lockInfo); long currentTs = TsoService.INSTANCE.tso(); - TxnCheckStatus txnCheckStatus = TxnCheckStatus.builder(). - isolationLevel(IsolationLevel.of(isolationLevel)). - primaryKey(lockInfo.getPrimaryLock()). - lockTs(lockInfo.getLockTs()). - callerStartTs(startTs). - currentTs(currentTs). - build(); + TxnCheckStatus txnCheckStatus = TxnCheckStatus.builder() + .isolationLevel(IsolationLevel.of(isolationLevel)) + .primaryKey(lockInfo.getPrimaryLock()) + .lockTs(lockInfo.getLockTs()) + .callerStartTs(startTs) + .currentTs(currentTs) + .build(); TxnCheckTxnStatusResponse statusResponse = txnCheckTxnStatus(txnCheckStatus); LogUtils.info(log, "startTs: {}, {} txnCheckStatus : {}", startTs, funName, statusResponse); TxnResultInfo resultInfo = statusResponse.getTxnResult(); @@ -764,8 +808,8 @@ private ResolveLockStatus readResolveConflict(List txnResult, int || action == Action.TTLExpirePessimisticRollback || action == Action.TTLExpireRollback)) { // pessimistic lock - TxnPessimisticRollBack pessimisticRollBack = TxnPessimisticRollBack.builder(). - isolationLevel(IsolationLevel.of(isolationLevel)) + TxnPessimisticRollBack pessimisticRollBack = TxnPessimisticRollBack.builder() + .isolationLevel(IsolationLevel.of(isolationLevel)) .startTs(lockInfo.getLockTs()) .forUpdateTs(lockInfo.getForUpdateTs()) .keys(Collections.singletonList(lockInfo.getKey())) @@ -789,25 +833,27 @@ private ResolveLockStatus readResolveConflict(List txnResult, int } } else if (commitTs > 0) { // resolveLock store commit - TxnResolveLock resolveLockRequest = TxnResolveLock.builder(). - isolationLevel(IsolationLevel.of(isolationLevel)). - startTs(lockInfo.getLockTs()). - commitTs(commitTs). - keys(singletonList(lockInfo.getKey())). - build(); + TxnResolveLock resolveLockRequest = TxnResolveLock.builder() + .isolationLevel(IsolationLevel.of(isolationLevel)) + .startTs(lockInfo.getLockTs()) + .commitTs(commitTs) + .keys(singletonList(lockInfo.getKey())) + .build(); TxnResolveLockResponse txnResolveLockRes = txnResolveLock(resolveLockRequest); - LogUtils.info(log, "startTs:{}, {} txnResolveLockResponse: {}", startTs, funName, txnResolveLockRes); + LogUtils.info(log, "startTs:{}, {} txnResolveLockResponse: {}", + startTs, funName, txnResolveLockRes); resolveLockStatus = ResolveLockStatus.COMMIT; } else if (lockTtl == 0 && commitTs == 0) { // resolveLock store rollback - TxnResolveLock resolveLockRequest = TxnResolveLock.builder(). - isolationLevel(IsolationLevel.of(isolationLevel)). - startTs(lockInfo.getLockTs()). - commitTs(commitTs). - keys(singletonList(lockInfo.getKey())). - build(); + TxnResolveLock resolveLockRequest = TxnResolveLock.builder() + .isolationLevel(IsolationLevel.of(isolationLevel)) + .startTs(lockInfo.getLockTs()) + .commitTs(commitTs) + .keys(singletonList(lockInfo.getKey())) + .build(); TxnResolveLockResponse txnResolveLockRes = txnResolveLock(resolveLockRequest); - LogUtils.info(log, "startTs:{}, {} txnResolveLockResponse: {}", startTs, funName, txnResolveLockRes); + LogUtils.info(log, "startTs:{}, {} txnResolveLockResponse: {}", startTs, + funName, txnResolveLockRes); resolveLockStatus = ResolveLockStatus.ROLLBACK; } } else { @@ -825,8 +871,8 @@ private ResolveLockStatus readResolveConflict(List txnResult, int && (action == Action.LockNotExistRollback || action == Action.TTLExpirePessimisticRollback || action == Action.TTLExpireRollback)) { - TxnPessimisticRollBack pessimisticRollBack = TxnPessimisticRollBack.builder(). - isolationLevel(IsolationLevel.of(isolationLevel)) + TxnPessimisticRollBack pessimisticRollBack = TxnPessimisticRollBack.builder() + .isolationLevel(IsolationLevel.of(isolationLevel)) .startTs(lockInfo.getLockTs()) .forUpdateTs(lockInfo.getForUpdateTs()) .keys(Collections.singletonList(lockInfo.getKey())) @@ -845,7 +891,8 @@ private ResolveLockStatus readResolveConflict(List txnResult, int if (resultInfo.getPrimaryMismatch() != null) { throw new PrimaryMismatchException(resultInfo.getPrimaryMismatch().toString()); } else if (resultInfo.getTxnNotFound() != null) { - LogUtils.warn(log, "startTs:{}, {} txnNotFound : {}", startTs, funName, resultInfo.getTxnNotFound().toString()); + LogUtils.warn(log, "startTs:{}, {} txnNotFound : {}", startTs, funName, + resultInfo.getTxnNotFound().toString()); resolveLockStatus = ResolveLockStatus.TXN_NOT_FOUND; } else if (resultInfo.getLocked() != null) { throw new RuntimeException(resultInfo.getLocked().toString()); @@ -941,7 +988,8 @@ private synchronized void fetch() { || resolveLockStatus == ResolveLockStatus.TXN_NOT_FOUND) { if (scanTimeOut < 0) { LogUtils.info(log, "scanTimeOut < 0, scanTs:{}", txnScanRequest.getStartTs()); - throw new RuntimeException("startTs:" + txnScanRequest.getStartTs() + " resolve lock timeout"); + throw new RuntimeException("startTs:" + txnScanRequest.getStartTs() + + " resolve lock timeout"); } try { long lockTtl = TxnVariables.WaitFixTime; @@ -951,14 +999,16 @@ private synchronized void fetch() { Thread.sleep(lockTtl); n++; scanTimeOut -= lockTtl; - LogUtils.info(log, "scanTs:{}, txnScan lockInfo wait {} ms end.", txnScanRequest.getStartTs(), lockTtl); + LogUtils.info(log, "scanTs:{}, txnScan lockInfo wait {} ms end.", + txnScanRequest.getStartTs(), lockTtl); } catch (InterruptedException e) { throw new RuntimeException(e); } } continue; } - keyValues = Optional.ofNullable(txnScanResponse.getKvs()).map(List::iterator).orElseGet(Collections::emptyIterator); + keyValues = Optional.ofNullable(txnScanResponse.getKvs()) + .map(List::iterator).orElseGet(Collections::emptyIterator); hasMore = txnScanResponse.isHasMore(); if (hasMore) { withStart = false; @@ -1062,7 +1112,7 @@ private synchronized void fetch() { TxnScanRequest txnScanRequest = MAPPER.scanTo(startTs, IsolationLevel.SnapshotIsolation, this.range); txnScanRequest.setLimit(limit); txnScanRequest.setCoprocessor(coprocessor); - if(txnScanRequest.getStreamMeta() == null) { + if (txnScanRequest.getStreamMeta() == null) { txnScanRequest.setStreamMeta(new StreamRequestMeta()); } TxnScanResponse txnScanResponse; @@ -1093,7 +1143,8 @@ private synchronized void fetch() { if (resolveLockStatus == ResolveLockStatus.LOCK_TTL || resolveLockStatus == ResolveLockStatus.TXN_NOT_FOUND) { if (scanTimeOut < 0) { - throw new RuntimeException("startTs:" + txnScanRequest.getStartTs() + " resolve lock timeout"); + throw new RuntimeException("startTs:" + txnScanRequest.getStartTs() + + " resolve lock timeout"); } try { long lockTtl = TxnVariables.WaitFixTime; @@ -1111,24 +1162,29 @@ private synchronized void fetch() { continue; } - if(txnScanResponse.getError() == null) { + if (txnScanResponse.getError() == null) { //get and set stream id for next request. - if(txnScanResponse.getStreamMeta() != null) { + if (txnScanResponse.getStreamMeta() != null) { this.streamId = txnScanResponse.getStreamMeta().getStreamId(); - keyValues = Optional.ofNullable(txnScanResponse.getKvs()).map(List::iterator).orElseGet(Collections::emptyIterator); + keyValues = Optional.ofNullable( + txnScanResponse.getKvs()).map(List::iterator).orElseGet(Collections::emptyIterator + ); hasMore = txnScanResponse.getStreamMeta().isHasMore(); if (hasMore) { withStart = false; - range = new StoreInstance.Range(txnScanResponse.getEndKey(), range.end, withStart, range.withEnd); + range = new StoreInstance.Range( + txnScanResponse.getEndKey(), range.end, withStart, range.withEnd + ); } } else { - keyValues = Optional.ofNullable(txnScanResponse.getKvs()).map(List::iterator).orElseGet(Collections::emptyIterator); + keyValues = Optional.ofNullable(txnScanResponse.getKvs()) + .map(List::iterator).orElseGet(Collections::emptyIterator); hasMore = false; break; } } } catch (RequestErrorException e) { - if(e.getErrorCode() == 10118) { + if (e.getErrorCode() == 10118) { //ESTREAM_EXPIRED: stream id is expired. this.streamId = null; LogUtils.info(log, "Stream id expired, info:{}", e.getMessage());