Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Colocate plan][Step1] Colocate join covers more situations #5521

Merged
merged 6 commits into from
Apr 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.HashMap;

/**
* Root of the expr node hierarchy.
Expand Down Expand Up @@ -1334,7 +1334,34 @@ public Type castBinaryOp(Type compatibleType) throws AnalysisException {
@Override
public String toString() {
return MoreObjects.toStringHelper(this.getClass()).add("id", id).add("type", type).add("sel",
selectivity).add("#distinct", numDistinctValues).add("scale", outputScale).toString();
selectivity).add("#distinct", numDistinctValues).add("scale", outputScale).toString();
}

/**
* This method is mainly used to find the original column corresponding to the current expr.
* Find the initial slotRef from the current slot ref.
*
* If the initial expr is not a slotRef, it returns null directly.
* If the current slotRef comes from another expression transformation,
* rather than directly from another slotRef, null will also be returned.
*/
public SlotRef getSrcSlotRef() {
SlotRef unwrapSloRef = this.unwrapSlotRef();
if (unwrapSloRef == null) {
return null;
}
SlotDescriptor slotDescriptor = unwrapSloRef.getDesc();
if (slotDescriptor == null) {
return null;
}
List<Expr> sourceExpr = slotDescriptor.getSourceExprs();
if (sourceExpr == null || sourceExpr.isEmpty()) {
return unwrapSloRef;
}
if (sourceExpr.size() > 1) {
return null;
}
return sourceExpr.get(0).getSrcSlotRef();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.doris.alter.AlterOpType;
import org.apache.doris.catalog.TableProperty;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
Expand Down Expand Up @@ -49,9 +48,6 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
}

if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH)) {
if (Config.disable_colocate_join) {
throw new AnalysisException("Colocate table is disabled by Admin");
}
this.needTableStable = false;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE)) {
if (!properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE).equalsIgnoreCase("column")) {
Expand Down
28 changes: 12 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Replica.ReplicaStatus;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.clone.ColocateTableBalancer;
import org.apache.doris.clone.ColocateTableCheckerAndBalancer;
import org.apache.doris.clone.DynamicPartitionScheduler;
import org.apache.doris.clone.TabletChecker;
import org.apache.doris.clone.TabletScheduler;
Expand Down Expand Up @@ -217,11 +217,6 @@
import org.apache.doris.transaction.PublishVersionDaemon;
import org.apache.doris.transaction.UpdateDbUsedDataQuotaDaemon;

import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand All @@ -232,9 +227,10 @@
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;

import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
Expand Down Expand Up @@ -264,6 +260,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
import org.codehaus.jackson.map.ObjectMapper;

public class Catalog {
private static final Logger LOG = LogManager.getLogger(Catalog.class);
// 0 ~ 9999 used for qe
Expand Down Expand Up @@ -1263,10 +1264,8 @@ private void startMasterOnlyDaemonThreads() {
// Tablet checker and scheduler
tabletChecker.start();
tabletScheduler.start();
// Colocate tables balancer
if (!Config.disable_colocate_join) {
ColocateTableBalancer.getInstance().start();
}
// Colocate tables checker and balancer
ColocateTableCheckerAndBalancer.getInstance().start();
// Publish Version Daemon
publishVersionDaemon.start();
// Start txn cleaner
Expand Down Expand Up @@ -3645,9 +3644,6 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws DdlExcept
try {
String colocateGroup = PropertyAnalyzer.analyzeColocate(properties);
if (colocateGroup != null) {
if (Config.disable_colocate_join) {
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_FEATURE_DISABLED);
}
String fullGroupName = db.getId() + "_" + colocateGroup;
ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullGroupName);
if (groupSchema != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,23 @@
/**
* ColocateTableBalancer is responsible for tablets' repair and balance of colocated tables.
*/
public class ColocateTableBalancer extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(ColocateTableBalancer.class);
public class ColocateTableCheckerAndBalancer extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(ColocateTableCheckerAndBalancer.class);

private static final long CHECK_INTERVAL_MS = 20 * 1000L; // 20 second

private ColocateTableBalancer(long intervalMs) {
private ColocateTableCheckerAndBalancer(long intervalMs) {
super("colocate group clone checker", intervalMs);
}

private static volatile ColocateTableBalancer INSTANCE = null;
public static ColocateTableBalancer getInstance() {
private static volatile ColocateTableCheckerAndBalancer INSTANCE = null;
public static ColocateTableCheckerAndBalancer getInstance() {
if (INSTANCE == null) {
synchronized (ColocateTableBalancer.class) {
synchronized (ColocateTableCheckerAndBalancer.class) {
if (INSTANCE == null) {
INSTANCE = new ColocateTableBalancer(CHECK_INTERVAL_MS);
INSTANCE = new ColocateTableCheckerAndBalancer(CHECK_INTERVAL_MS);
}
}
}
} }
return INSTANCE;
}

Expand Down
20 changes: 9 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -781,10 +781,16 @@ public class Config extends ConfigBase {
public static int query_colocate_join_memory_limit_penalty_factor = 1;

/**
* Deprecated after 0.10
* This configs can set to true to disable the automatic colocate tables's relocate and balance.
* If 'disable_colocate_balance' is set to true,
* ColocateTableBalancer will not relocate and balance colocate tables.
* Attention:
* Under normal circumstances, there is no need to turn off balance at all.
* Because once the balance is turned off, the unstable colocate table may not be restored
* Eventually the colocate plan cannot be used when querying.
*/
@ConfField
public static boolean disable_colocate_join = false;
@ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_balance = false;

/**
* The default user resource publishing timeout.
*/
Expand Down Expand Up @@ -1093,14 +1099,6 @@ public class Config extends ConfigBase {
* Save small files
*/
@ConfField public static String small_file_dir = PaloFe.DORIS_HOME_DIR + "/small_files";

/**
* The following 2 configs can set to true to disable the automatic colocate tables's relocate and balance.
* if 'disable_colocate_relocate' is set to true, ColocateTableBalancer will not relocate colocate tables when Backend unavailable.
* if 'disable_colocate_balance' is set to true, ColocateTableBalancer will not balance colocate tables.
*/
@ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_relocate = false;
@ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_balance = false;

/**
* If set to true, the insert stmt with processing error will still return a label to user.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

public class DistributedPlanColocateRule {
public static final String SESSION_DISABLED = "Session disabled";
public static final String HAS_JOIN_HINT = "Has join hint";
public static final String TRANSFORMED_SRC_COLUMN = "Src column hash been transformed by expr";
public static final String REDISTRIBUTED_SRC_DATA = "The src data has been redistributed";
public static final String SUPPORT_ONLY_OLAP_TABLE = "Only olap table support colocate plan";
public static final String TABLE_NOT_IN_THE_SAME_GROUP = "Tables are not in the same group";
public static final String COLOCATE_GROUP_IS_NOT_STABLE = "Colocate group is not stable";
public static final String INCONSISTENT_DISTRIBUTION_OF_TABLE_AND_QUERY = "Inconsistent distribution of table and querie";
}
Loading