Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] refactor connector specific interfaces in mv #34681

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor Traits.maxPartitionRefreshTs
Signed-off-by: Murphy <mofei@starrocks.com>
  • Loading branch information
murphyatwork committed Nov 9, 2023
commit d703cb54c22d9bf8d35534dcb2c89433f588b32d
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.starrocks.common.util.DateUtils;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.connector.ConnectorPartitionTraits;
import com.starrocks.connector.ConnectorTableInfo;
import com.starrocks.connector.PartitionUtil;
import com.starrocks.connector.iceberg.IcebergPartitionUtils;
Expand Down Expand Up @@ -663,41 +664,20 @@ public Optional<Long> maxBaseTableRefreshTimestamp() {
long maxRefreshTimestamp = -1;
for (BaseTableInfo baseTableInfo : baseTableInfos) {
Table baseTable = baseTableInfo.getTable();
if (baseTable instanceof MaterializedView) {

if (baseTable instanceof View) {
continue;
} else if (baseTable instanceof MaterializedView) {
MaterializedView mv = (MaterializedView) baseTable;
if (!mv.isStalenessSatisfied()) {
return Optional.empty();
}
Optional<Long> maxPartitionRefreshTimestamp =
mv.getPartitions().stream().map(Partition::getVisibleVersionTime).max(Long::compareTo);
if (!maxPartitionRefreshTimestamp.isPresent()) {
return Optional.empty();
}
maxRefreshTimestamp = Math.max(maxPartitionRefreshTimestamp.get(), maxRefreshTimestamp);
} else if (baseTable instanceof OlapTable) {
OlapTable olapTable = (OlapTable) baseTable;
Optional<Long> maxPartitionRefreshTimestamp =
olapTable.getPartitions().stream().map(Partition::getVisibleVersionTime).max(Long::compareTo);
if (!maxPartitionRefreshTimestamp.isPresent()) {
return Optional.empty();
}
maxRefreshTimestamp = Math.max(maxPartitionRefreshTimestamp.get(), maxRefreshTimestamp);
} else if (baseTable instanceof HiveTable) {
HiveTable hiveTable = (HiveTable) baseTable;
Map<String, com.starrocks.connector.PartitionInfo> partitionNameWithPartition =
PartitionUtil.getPartitionNameWithPartitionInfo(hiveTable);
Optional<Long> maxPartitionRefreshTimestamp =
partitionNameWithPartition.values().stream().map(com.starrocks.connector.PartitionInfo::getModifiedTime)
.max(Long::compareTo);
if (!maxPartitionRefreshTimestamp.isPresent()) {
return Optional.empty();
}
maxRefreshTimestamp = Math.max(maxPartitionRefreshTimestamp.get(), maxRefreshTimestamp);
} else if (baseTable instanceof View) {
// continue
} else {
}
Optional<Long> baseTableTs = ConnectorPartitionTraits.build(baseTable).maxPartitionRefreshTs();
if (!baseTableTs.isPresent()) {
return Optional.empty();
}
maxRefreshTimestamp = Math.max(maxRefreshTimestamp, baseTableTs.get());
}
return Optional.of(maxRefreshTimestamp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PaimonPartitionKey;
import com.starrocks.catalog.PaimonTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
Expand All @@ -48,6 +49,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -119,12 +121,18 @@ abstract Map<String, Range<PartitionKey>> getPartitionKeyRange(Column partitionC

/**
* Get the list-map with specified partition column and expression
*
* @apiNote it must be a list-partitioned table
*/
abstract Map<String, List<List<String>>> getPartitionList(Column partitionColumn) throws AnalysisException;

abstract Map<String, PartitionInfo> getPartitionNameWithPartitionInfo();

/**
* The max of refresh ts for all partitions
*/
public abstract Optional<Long> maxPartitionRefreshTs();

// ========================================= Implementations ==============================================

abstract static class DefaultTraits extends ConnectorPartitionTraits {
Expand Down Expand Up @@ -202,6 +210,11 @@ protected List<PartitionInfo> getPartitions(List<String> names) {
throw new NotImplementedException("Only support hive/jdbc");
}

@Override
public Optional<Long> maxPartitionRefreshTs() {
throw new NotImplementedException("Not support maxPartitionRefreshTs");
}

}

// ========================================= Specific Implementations ======================================
Expand Down Expand Up @@ -230,6 +243,11 @@ public Map<String, List<List<String>>> getPartitionList(Column partitionColumn)
return ((OlapTable) table).getListPartitionMap();
}

@Override
public Optional<Long> maxPartitionRefreshTs() {
OlapTable olapTable = (OlapTable) table;
return olapTable.getPartitions().stream().map(Partition::getVisibleVersionTime).max(Long::compareTo);
}
}

static class HivePartitionTraits extends DefaultTraits {
Expand All @@ -250,6 +268,16 @@ public List<PartitionInfo> getPartitions(List<String> partitionNames) {
return GlobalStateMgr.getCurrentState().getMetadataMgr().
getPartitions(hiveTable.getCatalogName(), table, partitionNames);
}

@Override
public Optional<Long> maxPartitionRefreshTs() {
Map<String, com.starrocks.connector.PartitionInfo> partitionNameWithPartition =
getPartitionNameWithPartitionInfo();
return
partitionNameWithPartition.values().stream()
.map(com.starrocks.connector.PartitionInfo::getModifiedTime)
.max(Long::compareTo);
}
}

static class HudiPartitionTraits extends DefaultTraits {
Expand Down Expand Up @@ -281,6 +309,12 @@ public String getTableName() {
PartitionKey createEmptyKey() {
return new IcebergPartitionKey();
}

@Override
public Optional<Long> maxPartitionRefreshTs() {
IcebergTable icebergTable = (IcebergTable) table;
return Optional.of(icebergTable.getRefreshSnapshotTime());
}
}

static class PaimonPartitionTraits extends DefaultTraits {
Expand All @@ -294,6 +328,7 @@ public String getDbName() {
PartitionKey createEmptyKey() {
return new PaimonPartitionKey();
}

}

static class JDBCPartitionTraits extends DefaultTraits {
Expand All @@ -318,6 +353,15 @@ public List<PartitionInfo> getPartitions(List<String> partitionNames) {
PartitionKey createEmptyKey() {
return new JDBCPartitionKey();
}

@Override
public Optional<Long> maxPartitionRefreshTs() {
Map<String, com.starrocks.connector.PartitionInfo> partitionNameWithPartition =
getPartitionNameWithPartitionInfo();
return partitionNameWithPartition.values().stream()
.map(com.starrocks.connector.PartitionInfo::getModifiedTime)
.max(Long::compareTo);
}
}

static class DeltaLakePartitionTraits extends DefaultTraits {
Expand Down