Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Fix bug that index name in MaterializedViewMeta is not changed after schema change #3048

Merged
merged 6 commits into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,3 @@ if ! ${CMAKE_CMD} --version; then
exit 1
fi
export CMAKE_CMD

3 changes: 2 additions & 1 deletion fe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ under the License.
<maven.compiler.target>1.8</maven.compiler.target>
<jprotobuf.version>2.2.11</jprotobuf.version>
<skip.plugin>false</skip.plugin>
<fe_ut_parallel>${env.FE_UT_PARALLEL}</fe_ut_parallel>
</properties>

<profiles>
Expand Down Expand Up @@ -608,7 +609,7 @@ under the License.
<version>2.22.2</version>
<configuration>
<!-- set larger, e.g., 3, to reduce the time of running FE unit tests -->
<forkCount>1</forkCount>
<forkCount>${fe_ut_parallel}</forkCount>
<!-- do not reuse forked jvm, so that each unit test will run in a separate jvm, to avoid singleton conflict -->
<reuseForks>false</reuseForks>
<argLine>
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.apache.doris.analysis.SetOperationStmt.Operation;
import org.apache.doris.analysis.SetOperationStmt.SetOperand;
import org.apache.doris.catalog.AccessPrivilege;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
Expand Down Expand Up @@ -1527,6 +1526,7 @@ recover_stmt ::=
;

opt_agg_type ::=
/* not set */
{: RESULT = null; :}
| KW_SUM
{:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public void processBatchAddRollup(List<AlterClause> alterClauses, Database db, O
* @throws AnalysisException
*/
private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexName,
List<Column> mvColumns, Map<String, String> properties, OlapTable
List<Column> mvColumns, Map<String, String> properties, OlapTable
olapTable, Database db, long baseIndexId, KeysType mvKeysType)
throws DdlException, AnalysisException {
if (mvKeysType == null) {
Expand Down Expand Up @@ -343,31 +343,35 @@ private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexNam
* create all rollup indexes. and set state.
* After setting, Tables' state will be ROLLUP
*/
List<Tablet> addedTablets = Lists.newArrayList();
for (Partition partition : olapTable.getPartitions()) {
long partitionId = partition.getId();
TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
// index state is SHADOW
MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId, IndexState.SHADOW);
MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
TabletMeta mvTabletMeta = new TabletMeta(dbId, tableId, partitionId, mvIndexId, mvSchemaHash,
medium);
TabletMeta mvTabletMeta = new TabletMeta(dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium);
short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partitionId);
for (Tablet baseTablet : baseIndex.getTablets()) {
long baseTabletId = baseTablet.getId();
long mvTabletId = catalog.getNextId();

Tablet newTablet = new Tablet(mvTabletId);
mvIndex.addTablet(newTablet, mvTabletMeta);
addedTablets.add(newTablet);

mvJob.addTabletIdMap(partitionId, mvTabletId, baseTabletId);
List<Replica> baseReplicas = baseTablet.getReplicas();

int healthyReplicaNum = 0;
for (Replica baseReplica : baseReplicas) {
long mvReplicaId = catalog.getNextId();
long backendId = baseReplica.getBackendId();
if (baseReplica.getState() == Replica.ReplicaState.CLONE
|| baseReplica.getState() == Replica.ReplicaState.DECOMMISSION
|| baseReplica.getLastFailedVersion() > 0) {
// just skip it.
LOG.info("base replica {} of tablet {} state is {}, and last failed version is {}, skip creating rollup replica",
baseReplica.getId(), baseTabletId, baseReplica.getState(), baseReplica.getLastFailedVersion());
continue;
}
Preconditions.checkState(baseReplica.getState() == Replica.ReplicaState.NORMAL);
Expand All @@ -377,7 +381,24 @@ private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexNam
.PARTITION_INIT_VERSION_HASH,
mvSchemaHash);
newTablet.addReplica(mvReplica);
healthyReplicaNum++;
} // end for baseReplica

if (healthyReplicaNum < replicationNum / 2 + 1) {
/*
* TODO(cmy): This is a bad design.
* Because in the rollup job, we will only send tasks to the rollup replicas that have been created,
* without checking whether the quorum of replica number is satisfied.
* As a result, we would not discover that the quorum is not satisfied
* until the entire job is done, and only then would the job fail.
* So here we check the replica number strictly and do not allow to submit the job
* if the quorum of replica number is not satisfied.
*/
for (Tablet tablet : addedTablets) {
Catalog.getCurrentInvertedIndex().deleteTablet(tablet.getId());
}
throw new DdlException("tablet " + baseTabletId + " has few healthy replica: " + healthyReplicaNum);
}
} // end for baseTablets

mvJob.addMVIndex(partitionId, mvIndex);
Expand All @@ -387,7 +408,6 @@ private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexNam
} // end for partitions

LOG.info("finished to create materialized view job: {}", mvJob.getJobId());

return mvJob;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ private void createJob(long dbId, OlapTable olapTable, Map<Long, LinkedList<Colu
while (currentSchemaHash == newSchemaHash) {
newSchemaHash = Util.generateSchemaHash();
}
String newIndexName = SHADOW_NAME_PRFIX + currentIndexMeta.getIndexName();
String newIndexName = SHADOW_NAME_PRFIX + olapTable.getIndexNameById(originIndexId);
short newShortKeyColumnCount = indexIdToShortKeyColumnCount.get(originIndexId);
long shadowIndexId = catalog.getNextId();

Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/analysis/ColumnDef.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public ColumnDef(String name, TypeDef typeDef, boolean isKey, AggregateType aggr
public String getDefaultValue() { return defaultValue.value; }
public String getName() { return name; }
public AggregateType getAggregateType() { return aggregateType; }
public void setAggregateType(AggregateType aggregateType, boolean xxx) { this.aggregateType = aggregateType; }
public void setAggregateType(AggregateType aggregateType) { this.aggregateType = aggregateType; }
public boolean isKey() { return isKey; }
public void setIsKey(boolean isKey) { this.isKey = isKey; }
public TypeDef getTypeDef() { return typeDef; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public void analyze(Analyzer analyzer) throws UserException {
type = AggregateType.NONE;
}
for (int i = keysDesc.keysColumnSize(); i < columnDefs.size(); ++i) {
columnDefs.get(i).setAggregateType(type, true);
columnDefs.get(i).setAggregateType(type);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -4121,7 +4121,7 @@ public static void getDdlStmt(Table table, List<String> createTableStmt, List<St
}
MaterializedIndexMeta materializedIndexMeta = entry.getValue();
sb = new StringBuilder();
String indexName = materializedIndexMeta.getIndexName();
String indexName = olapTable.getIndexNameById(entry.getKey());
sb.append("ALTER TABLE ").append(table.getName()).append(" ADD ROLLUP ").append(indexName);
sb.append("(");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
public class MaterializedIndexMeta implements Writable {
@SerializedName(value = "indexId")
private long indexId;
@SerializedName(value = "indexName")
private String indexName;
@SerializedName(value = "schema")
private List<Column> schema = Lists.newArrayList();
@SerializedName(value = "schemaVersion")
Expand All @@ -49,11 +47,9 @@ public class MaterializedIndexMeta implements Writable {
@SerializedName(value = "keysType")
private KeysType keysType;

public MaterializedIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion, int
public MaterializedIndexMeta(long indexId, List<Column> schema, int schemaVersion, int
schemaHash, short shortKeyColumnCount, TStorageType storageType, KeysType keysType) {
this.indexId = indexId;
Preconditions.checkState(indexName != null);
this.indexName = indexName;
Preconditions.checkState(schema != null);
Preconditions.checkState(schema.size() != 0);
this.schema = schema;
Expand All @@ -70,10 +66,6 @@ public long getIndexId() {
return indexId;
}

public String getIndexName() {
return indexName;
}

public KeysType getKeysType() {
return keysType;
}
Expand Down Expand Up @@ -111,9 +103,6 @@ public boolean equals(Object obj) {
if (indexMeta.indexId != this.indexId) {
return false;
}
if (!indexMeta.indexName.equals(this.indexName)) {
return false;
}
if (indexMeta.schema.size() != this.schema.size() || !indexMeta.schema.containsAll(this.schema)) {
return false;
}
Expand Down
11 changes: 5 additions & 6 deletions fe/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,10 @@ public boolean hasMaterializedIndex(String indexName) {

public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion, int schemaHash,
short shortKeyColumnCount, TStorageType storageType, KeysType keysType) {
// Nullable when meta comes from schema change
// Nullable when meta comes from schema change log replay.
// The replay log only saves the index id, so we need to get the name by id.
if (indexName == null) {
MaterializedIndexMeta oldIndexMeta = indexIdToMeta.get(indexId);
Preconditions.checkState(oldIndexMeta != null);
indexName = oldIndexMeta.getIndexName();
indexName = getIndexNameById(indexId);
Preconditions.checkState(indexName != null);
}
// Nullable when meta is less than VERSION_74
Expand All @@ -262,7 +261,7 @@ public void setIndexMeta(long indexId, String indexName, List<Column> schema, in
Preconditions.checkState(storageType == TStorageType.COLUMN);
}

MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, indexName, schema, schemaVersion,
MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, schema, schemaVersion,
schemaHash, shortKeyColumnCount, storageType, keysType);
indexIdToMeta.put(indexId, indexMeta);
indexNameToId.put(indexName, indexId);
Expand Down Expand Up @@ -885,7 +884,7 @@ public void readFields(DataInput in) throws IOException {
short shortKeyColumnCount = in.readShort();

// The keys type in here is incorrect
MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, indexName, schema,
MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, schema,
schemaVersion, schemaHash, shortKeyColumnCount, storageType, KeysType.AGG_KEYS);
tmpIndexMetaList.add(indexMeta);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public ProcResult fetchResult() throws AnalysisException {
builder.append(Joiner.on(", ").join(columnNames)).append(")");

result.addRow(Lists.newArrayList(String.valueOf(indexId),
indexMeta.getIndexName(),
olapTable.getIndexNameById(indexId),
String.valueOf(indexMeta.getSchemaVersion()),
String.valueOf(indexMeta.getSchemaHash()),
String.valueOf(indexMeta.getShortKeyColumnCount()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected void executeWithoutPassword(BaseRequest request, BaseResponse response
for (Map.Entry<Long, MaterializedIndexMeta> entry : olapTbl.getIndexIdToMeta().entrySet()) {
MaterializedIndexMeta indexMeta = entry.getValue();
if (indexMeta.getStorageType() == TStorageType.ROW) {
indexObj.put(indexMeta.getIndexName(), indexMeta.getStorageType().name());
indexObj.put(olapTbl.getIndexNameById(entry.getKey()), indexMeta.getStorageType().name());
}
}
root.put(tbl.getName(), indexObj);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public enum JobState {
NEED_SCHEDULE,
RUNNING,
PAUSED,
STOPPED, CANCELLED;
STOPPED,
CANCELLED;

public boolean isFinalState() {
return this == STOPPED || this == CANCELLED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exc
// set BE id to -1 to release the BE slot
routineLoadTaskInfo.setBeId(-1);
routineLoadManager.getJob(routineLoadTaskInfo.getJobId())
.updateState(JobState.STOPPED,
.updateState(JobState.CANCELLED,
new ErrorReason(InternalErrorCode.META_NOT_FOUND_ERR, "meta not found: " + e.getMessage()),
false);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.doris.analysis;

import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
Expand All @@ -34,8 +33,6 @@
import java.util.ArrayList;
import java.util.List;

import javax.validation.constraints.AssertTrue;

import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
Expand Down Expand Up @@ -195,8 +192,6 @@ public void testSelectClauseWithHavingClause(@Injectable SlotRef slotRef,
result = havingClause;
slotRef.getColumnName();
result = "k1";
selectStmt.getGroupByClause();
result = null;
}
};
CreateMaterializedViewStmt createMaterializedViewStmt = new CreateMaterializedViewStmt("test", selectStmt, null);
Expand Down Expand Up @@ -241,8 +236,6 @@ public void testOrderOfColumn(@Injectable SlotRef slotRef1,
result = "k1";
slotRef2.getColumnName();
result = "k2";
selectStmt.getGroupByClause();
result = null;
}
};
CreateMaterializedViewStmt createMaterializedViewStmt = new CreateMaterializedViewStmt("test", selectStmt, null);
Expand Down Expand Up @@ -297,8 +290,6 @@ public void testOrderByAggregateColumn(@Injectable SlotRef slotRef1,
result = Lists.newArrayList(slotRef2);
functionCallExpr.getChild(0);
result = slotRef2;
selectStmt.getGroupByClause();
result = groupByClause;
}
};
CreateMaterializedViewStmt createMaterializedViewStmt = new CreateMaterializedViewStmt("test", selectStmt, null);
Expand Down Expand Up @@ -396,8 +387,6 @@ public void testOrderByColumnsLessThenGroupByColumns(@Injectable SlotRef slotRef
result = Lists.newArrayList(slotRef1);
functionCallExpr.getChild(0);
result = functionChild0;
selectStmt.getGroupByClause();
result = groupByClause;
}
};
CreateMaterializedViewStmt createMaterializedViewStmt = new CreateMaterializedViewStmt("test", selectStmt, null);
Expand Down Expand Up @@ -474,8 +463,6 @@ public void testMVColumnsWithoutOrderby(@Injectable SlotRef slotRef1,
result = columnName4;
functionChild0.getColumnName();
result = columnName5;
selectStmt.getGroupByClause();
result = groupByClause;
}
};

Expand Down Expand Up @@ -569,7 +556,7 @@ public void testMVColumnsWithoutOrderbyWithoutAggregation(@Injectable SlotRef sl
result = 3;
slotRef4.getType().getStorageLayoutBytes();
result = 4;
selectStmt.getGroupByClause();
selectStmt.getAggInfo(); // return null, so that the mv can be a duplicate mv
result = null;
}
};
Expand Down Expand Up @@ -658,8 +645,6 @@ public void testMVColumns(@Injectable SlotRef slotRef1,
result = columnName2;
functionChild0.getColumnName();
result = columnName3;
selectStmt.getGroupByClause();
result = groupByClause;
}
};

Expand Down Expand Up @@ -714,8 +699,6 @@ public void testDeduplicateMV(@Injectable SlotRef slotRef1,
result = null;
slotRef1.getColumnName();
result = columnName1;
selectStmt.getGroupByClause();
result = groupByClause;
selectStmt.getHavingPred();
result = null;
selectStmt.getLimit();
Expand All @@ -736,3 +719,4 @@ public void testDeduplicateMV(@Injectable SlotRef slotRef1,

}
}

Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,22 @@ public void testSerializeMaterializedIndexMeta() throws IOException {
DataOutputStream out = new DataOutputStream(new FileOutputStream(file));

List<Column> schema = Lists.newArrayList();
Column column = new Column("k1", Type.INT, true, null, true, "1", "");
schema.add(column);
schema.add(new Column("k1", Type.TINYINT, true, null, true, "1", "abc"));
schema.add(new Column("k2", Type.SMALLINT, true, null, true, "1", "debug"));
schema.add(new Column("k3", Type.INT, true, null, true, "1", ""));
schema.add(new Column("k4", Type.BIGINT, true, null, true, "1", "**"));
schema.add(new Column("k5", Type.LARGEINT, true, null, true, null, ""));
schema.add(new Column("k6", Type.DOUBLE, true, null, true, "1.1", ""));
schema.add(new Column("k7", Type.FLOAT, true, null, true, "1", ""));
schema.add(new Column("k8", Type.DATE, true, null, true, "1", ""));
schema.add(new Column("k9", Type.DATETIME, true, null, true, "1", ""));
schema.add(new Column("k10", Type.VARCHAR, true, null, true, "1", ""));
schema.add(new Column("k11", Type.DECIMALV2, true, null, true, "1", ""));
schema.add(new Column("k12", Type.INT, true, null, true, "1", ""));
schema.add(new Column("v1", Type.INT, true, AggregateType.SUM, true, "1", ""));
schema.add(new Column("v1", Type.VARCHAR, true, AggregateType.REPLACE, true, "1", ""));
short shortKeyColumnCount = 1;
MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(1, "test", schema, 1, 1, shortKeyColumnCount,
MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(1, schema, 1, 1, shortKeyColumnCount,
TStorageType.COLUMN, KeysType.DUP_KEYS);
indexMeta.write(out);
out.flush();
Expand Down
Loading