Skip to content

Commit

Permalink
[Feature] Support list partition (StarRocks#17037)
Browse files Browse the repository at this point in the history
* [Feature] Support list partition

Signed-off-by: Astralidea <astralidea@163.com>
  • Loading branch information
Astralidea authored Jan 31, 2023
1 parent 7c42127 commit 5e4bd2b
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 86 deletions.
30 changes: 26 additions & 4 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,14 @@ Status OlapTablePartitionParam::init(RuntimeState* state) {
}

if (t_part.__isset.end_keys) {
RETURN_IF_ERROR_WITH_WARN(_create_partition_keys(t_part.end_keys, &part->end_key), "end keys");
RETURN_IF_ERROR_WITH_WARN(_create_partition_keys(t_part.end_keys, &part->end_key), "end_keys");
}

if (t_part.__isset.in_keys) {
part->in_keys.resize(t_part.in_keys.size());
for (int i = 0; i < t_part.in_keys.size(); i++) {
RETURN_IF_ERROR_WITH_WARN(_create_partition_keys(t_part.in_keys[i], &part->in_keys[i]), "in_keys");
}
}

part->num_buckets = t_part.num_buckets;
Expand Down Expand Up @@ -192,7 +199,13 @@ Status OlapTablePartitionParam::init(RuntimeState* state) {
}
}
_partitions.emplace_back(part);
_partitions_map.emplace(&part->end_key, part);
if (t_part.__isset.in_keys) {
for (auto& in_key : part->in_keys) {
_partitions_map.emplace(&in_key, part);
}
} else {
_partitions_map.emplace(&part->end_key, part);
}
}

return Status::OK();
Expand Down Expand Up @@ -277,6 +290,14 @@ Status OlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNo
column->get_data().emplace_back(val);
break;
}
case TYPE_VARCHAR: {
int len = t_expr.string_literal.value.size();
const char* str_val = t_expr.string_literal.value.c_str();
Slice value(str_val, len);
auto* column = down_cast<BinaryColumn*>(_partition_columns[i].get());
column->append(value);
break;
}
default: {
std::stringstream ss;
ss << "unsupported partition column node type, type=" << t_expr.node_type;
Expand Down Expand Up @@ -315,17 +336,18 @@ Status OlapTablePartitionParam::find_tablets(Chunk* chunk, std::vector<OlapTable
ChunkRow row;
row.columns = &partition_columns;
row.index = 0;
bool is_list_partition = _t_param.partitions[0].__isset.in_keys;
for (size_t i = 0; i < num_rows; ++i) {
if ((*selection)[i]) {
row.index = i;
auto it = _partitions_map.upper_bound(&row);
auto it = is_list_partition ? _partitions_map.find(&row) : _partitions_map.upper_bound(&row);
if (UNLIKELY(it == _partitions_map.end())) {
(*partitions)[i] = nullptr;
(*selection)[i] = 0;
if (invalid_row_index != nullptr) {
*invalid_row_index = i;
}
} else if (LIKELY(_part_contains(it->second, &row))) {
} else if (LIKELY(is_list_partition || _part_contains(it->second, &row))) {
(*partitions)[i] = it->second;
(*indexes)[i] = (*indexes)[i] % it->second->num_buckets;
} else {
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ struct OlapTablePartition {
int64_t id = 0;
ChunkRow start_key;
ChunkRow end_key;
std::vector<ChunkRow> in_keys;
int64_t num_buckets = 0;
std::vector<OlapTableIndexTablets> indexes;
};
Expand Down
80 changes: 77 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/catalog/CatalogUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -96,14 +97,83 @@ public static void checkIsLakeTable(String dbName, String tableName) throws Anal
}
}

public static void checkPartitionValuesExistForAddListPartition(OlapTable olapTable, PartitionDesc partitionDesc)
public static void checkPartitionValuesExistForReplaceListPartition(ListPartitionInfo listPartitionInfo,
Partition partition)
throws DdlException {
try {
List<Long> partitionIds = listPartitionInfo.getPartitionIds(false);
Map<Long, List<LiteralExpr>> literalExprValues = listPartitionInfo.getLiteralExprValues();
Map<Long, List<List<LiteralExpr>>> multiLiteralExprValues = listPartitionInfo.getMultiLiteralExprValues();

List<LiteralExpr> literalExprs = listPartitionInfo.getLiteralExprValues().get(partition.getId());
List<List<LiteralExpr>> multiLiteral = listPartitionInfo.getMultiLiteralExprValues().get(partition.getId());
if (!literalExprValues.isEmpty()) {
listPartitionInfo.setBatchLiteralExprValues(listPartitionInfo.getIdToValues());
List<LiteralExpr> allLiteralExprValues = Lists.newArrayList();
literalExprValues.forEach((k, v) -> {
if (partitionIds.contains(k)) {
allLiteralExprValues.addAll(v);
}
});
if (literalExprs != null) {
for (LiteralExpr item : literalExprs) {
for (LiteralExpr value : allLiteralExprValues) {
if (item.getStringValue().equals(value.getStringValue())) {
throw new DdlException("Duplicate partition value %s");
}
}
}
}
} else if (!multiLiteralExprValues.isEmpty()) {
listPartitionInfo.setBatchMultiLiteralExprValues(listPartitionInfo.getIdToMultiValues());
List<List<LiteralExpr>> allMultiLiteralExprValues = Lists.newArrayList();
listPartitionInfo.getMultiLiteralExprValues().forEach((k, v) -> {
if (partitionIds.contains(k)) {
allMultiLiteralExprValues.addAll(v);
}
});

int partitionColSize = listPartitionInfo.getPartitionColumns().size();
for (List<LiteralExpr> itemExpr : multiLiteral) {
for (List<LiteralExpr> valueExpr : allMultiLiteralExprValues) {
int duplicatedSize = 0;
for (int i = 0; i < itemExpr.size(); i++) {
String itemValue = itemExpr.get(i).getStringValue();
String value = valueExpr.get(i).getStringValue();
if (value.equals(itemValue)) {
duplicatedSize++;
}
}
if (duplicatedSize == partitionColSize) {
List<String> msg = itemExpr.stream()
.map(value -> ("\"" + value.getStringValue() + "\""))
.collect(Collectors.toList());
throw new DdlException("Duplicate values " +
"(" + String.join(",", msg) + ") ");
}
}
}
}
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
}

public static void checkPartitionValuesExistForAddListPartition(OlapTable olapTable, PartitionDesc partitionDesc,
boolean isTemp)
throws DdlException {
try {
ListPartitionInfo listPartitionInfo = (ListPartitionInfo) olapTable.getPartitionInfo();
List<Long> partitionIds = listPartitionInfo.getPartitionIds(isTemp);

if (partitionDesc instanceof SingleItemListPartitionDesc) {
listPartitionInfo.setBatchLiteralExprValues(listPartitionInfo.getIdToValues());
List<LiteralExpr> allLiteralExprValues = Lists.newArrayList();
listPartitionInfo.getLiteralExprValues().forEach((k, v) -> allLiteralExprValues.addAll(v));
listPartitionInfo.getLiteralExprValues().forEach((k, v) -> {
if (partitionIds.contains(k)) {
allLiteralExprValues.addAll(v);
}
});

SingleItemListPartitionDesc singleItemListPartitionDesc = (SingleItemListPartitionDesc) partitionDesc;
for (LiteralExpr item : singleItemListPartitionDesc.getLiteralExprValues()) {
Expand All @@ -116,7 +186,11 @@ public static void checkPartitionValuesExistForAddListPartition(OlapTable olapTa
} else if (partitionDesc instanceof MultiItemListPartitionDesc) {
listPartitionInfo.setBatchMultiLiteralExprValues(listPartitionInfo.getIdToMultiValues());
List<List<LiteralExpr>> allMultiLiteralExprValues = Lists.newArrayList();
listPartitionInfo.getMultiLiteralExprValues().forEach((k, v) -> allMultiLiteralExprValues.addAll(v));
listPartitionInfo.getMultiLiteralExprValues().forEach((k, v) -> {
if (partitionIds.contains(k)) {
allMultiLiteralExprValues.addAll(v);
}
});

int partitionColSize = listPartitionInfo.getPartitionColumns().size();
MultiItemListPartitionDesc multiItemListPartitionDesc = (MultiItemListPartitionDesc) partitionDesc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.starrocks.catalog;

import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import com.starrocks.analysis.LiteralExpr;
import com.starrocks.common.AnalysisException;
Expand Down Expand Up @@ -88,6 +89,10 @@ public void setValues(long partitionId, List<String> values) {
this.idToValues.put(partitionId, values);
}

public void setIdToIsTempPartition(long partitionId, boolean isTemp) {
this.idToIsTempPartition.put(partitionId, isTemp);
}

public void setLiteralExprValues(long partitionId, List<String> values) throws AnalysisException {
List<LiteralExpr> partitionValues = new ArrayList<>(values.size());
for (String value : values) {
Expand All @@ -99,6 +104,16 @@ public void setLiteralExprValues(long partitionId, List<String> values) throws A
this.idToLiteralExprValues.put(partitionId, partitionValues);
}

public List<Long> getPartitionIds(boolean isTemp) {
List<Long> partitionIds = Lists.newArrayList();
idToIsTempPartition.forEach((k, v) -> {
if (v.equals(isTemp)) {
partitionIds.add(k);
}
});
return partitionIds;
}

public void setBatchLiteralExprValues(Map<Long, List<String>> batchValues) throws AnalysisException {
for (Map.Entry<Long, List<String>> entry : batchValues.entrySet()) {
long partitionId = entry.getKey();
Expand Down Expand Up @@ -346,4 +361,20 @@ public void unprotectHandleNewPartitionDesc(ListPartitionPersistInfo partitionPe
this.setLiteralExprValues(partitionId, values);
}
}

@Override
public void dropPartition(long partitionId) {
super.dropPartition(partitionId);
idToValues.remove(partitionId);
idToLiteralExprValues.remove(partitionId);
idToMultiValues.remove(partitionId);
idToMultiLiteralExprValues.remove(partitionId);
idToIsTempPartition.remove(partitionId);
}

@Override
public void moveRangeFromTempToFormal(long tempPartitionId) {
super.moveRangeFromTempToFormal(tempPartitionId);
idToIsTempPartition.computeIfPresent(tempPartitionId, (k, v) -> false);
}
}
Loading

0 comments on commit 5e4bd2b

Please sign in to comment.