Skip to content

Commit

Permalink
[Enhancement](sql-cache) Use update time of hive to avoid cache miss …
Browse files Browse the repository at this point in the history
…through multi fe nodes. (apache#26424) (apache#26762)

backport apache#26424
  • Loading branch information
morningman authored Nov 10, 2023
1 parent 7fd0195 commit 9a798c4
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
Expand Down Expand Up @@ -82,6 +83,9 @@ public class HMSExternalTable extends ExternalTable {

private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
private static final String TBL_PROP_INSERT_ONLY = "insert_only";

private static final String TBL_PROP_TRANSIENT_LAST_DDL_TIME = "transient_lastDdlTime";

private static final String NUM_ROWS = "numRows";

static {
Expand Down Expand Up @@ -112,8 +116,8 @@ public class HMSExternalTable extends ExternalTable {
// No as precise as row count in TableStats, but better than none.
private long estimatedRowCount = -1;

// record the partition update time when enable hms event listener
protected volatile long partitionUpdateTime;
// record the event update time when enable hms event listener
protected volatile long eventUpdateTime;

public enum DLAType {
UNKNOWN, HIVE, HUDI, ICEBERG, DELTALAKE
Expand Down Expand Up @@ -405,6 +409,20 @@ public Set<String> getPartitionNames() {
return new HashSet<>(names);
}

@Override
public List<Column> initSchemaAndUpdateTime() {
org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient()
.getTable(dbName, name);
// try to use transient_lastDdlTime from hms client
schemaUpdateTime = MapUtils.isNotEmpty(table.getParameters())
&& table.getParameters().containsKey(TBL_PROP_TRANSIENT_LAST_DDL_TIME)
? Long.parseLong(table.getParameters().get(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) * 1000
// use current timestamp if lastDdlTime does not exist (hive views don't have this prop)
: System.currentTimeMillis();
return initSchema();
}


@Override
public List<Column> initSchema() {
makeSureInitialized();
Expand Down Expand Up @@ -635,15 +653,15 @@ private void setStatData(Column col, ColumnStatisticsData data, ColumnStatisticB
}
}

public void setPartitionUpdateTime(long updateTime) {
this.partitionUpdateTime = updateTime;
public void setEventUpdateTime(long updateTime) {
this.eventUpdateTime = updateTime;
}

@Override
// get the max value of `schemaUpdateTime` and `partitionUpdateTime`
// partitionUpdateTime will be refreshed after processing partition events with hms event listener enabled
// get the max value of `schemaUpdateTime` and `eventUpdateTime`
// eventUpdateTime will be refreshed after processing events with hms event listener enabled
public long getUpdateTime() {
return Math.max(this.schemaUpdateTime, this.partitionUpdateTime);
return Math.max(this.schemaUpdateTime, this.eventUpdateTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,44 @@ public void replayRefreshExternalDb(ExternalObjectLog log) {
}
}

public void refreshExternalTableFromEvent(String dbName, String tableName, String catalogName,
long updateTime, boolean ignoreIfNotExists) throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
throw new DdlException("No catalog found with name: " + catalogName);
}
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support refresh ExternalCatalog Tables");
}
DatabaseIf db = catalog.getDbNullable(dbName);
if (db == null) {
if (!ignoreIfNotExists) {
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
}
return;
}

TableIf table = db.getTableNullable(tableName);
if (table == null) {
if (!ignoreIfNotExists) {
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
}
return;
}
if (!(table instanceof HMSExternalTable)) {
return;
}
((HMSExternalTable) table).unsetObjectCreated();
((HMSExternalTable) table).setEventUpdateTime(updateTime);
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName);
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setLastUpdateTime(updateTime);
Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
}

public void refreshExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfNotExists)
throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
Expand Down Expand Up @@ -705,6 +743,9 @@ public void replayRefreshExternalTable(ExternalObjectLog log) {
table.unsetObjectCreated();
Env.getCurrentEnv().getExtMetaCacheMgr()
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
if (table instanceof HMSExternalTable && log.getLastUpdateTime() > 0) {
((HMSExternalTable) table).setEventUpdateTime(log.getLastUpdateTime());
}
}

public void dropExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfExists)
Expand Down Expand Up @@ -924,8 +965,8 @@ public void replayCreateExternalDatabase(ExternalObjectLog log) {
}
}

public void addExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames,
boolean ignoreIfNotExists)
public void addExternalPartitions(String catalogName, String dbName, String tableName,
List<String> partitionNames, long updateTime, boolean ignoreIfNotExists)
throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
Expand Down Expand Up @@ -956,14 +997,13 @@ public void addExternalPartitions(String catalogName, String dbName, String tabl

HMSExternalTable hmsTable = (HMSExternalTable) table;
Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames);
long lastPartitionUpdateTime = System.currentTimeMillis();
hmsTable.setPartitionUpdateTime(lastPartitionUpdateTime);
hmsTable.setEventUpdateTime(updateTime);
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
log.setLastUpdateTime(lastPartitionUpdateTime);
log.setLastUpdateTime(updateTime);
Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log);
}

Expand Down Expand Up @@ -994,16 +1034,16 @@ public void replayAddExternalPartitions(ExternalObjectLog log) {
try {
Env.getCurrentEnv().getExtMetaCacheMgr()
.addPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames());
hmsTable.setPartitionUpdateTime(log.getLastUpdateTime());
hmsTable.setEventUpdateTime(log.getLastUpdateTime());
} catch (HMSClientException e) {
LOG.warn("Network problem occurs or hms table has been deleted, fallback to invalidate table cache", e);
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(),
db.getFullName(), table.getName());
}
}

public void dropExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames,
boolean ignoreIfNotExists)
public void dropExternalPartitions(String catalogName, String dbName, String tableName,
List<String> partitionNames, long updateTime, boolean ignoreIfNotExists)
throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
Expand Down Expand Up @@ -1033,7 +1073,7 @@ public void dropExternalPartitions(String catalogName, String dbName, String tab
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
log.setLastUpdateTime(System.currentTimeMillis());
log.setLastUpdateTime(updateTime);
replayDropExternalPartitions(log);
Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log);
}
Expand Down Expand Up @@ -1063,11 +1103,11 @@ public void replayDropExternalPartitions(ExternalObjectLog log) {
HMSExternalTable hmsTable = (HMSExternalTable) table;
Env.getCurrentEnv().getExtMetaCacheMgr()
.dropPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames());
hmsTable.setPartitionUpdateTime(log.getLastUpdateTime());
hmsTable.setEventUpdateTime(log.getLastUpdateTime());
}

public void refreshExternalPartitions(String catalogName, String dbName, String tableName,
List<String> partitionNames, boolean ignoreIfNotExists)
List<String> partitionNames, long updateTime, boolean ignoreIfNotExists)
throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
Expand Down Expand Up @@ -1100,7 +1140,7 @@ public void refreshExternalPartitions(String catalogName, String dbName, String
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
log.setLastUpdateTime(System.currentTimeMillis());
log.setLastUpdateTime(updateTime);
replayRefreshExternalPartitions(log);
Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log);
}
Expand Down Expand Up @@ -1130,7 +1170,7 @@ public void replayRefreshExternalPartitions(ExternalObjectLog log) {
Env.getCurrentEnv().getExtMetaCacheMgr()
.invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(),
log.getPartitionNames());
((HMSExternalTable) table).setPartitionUpdateTime(log.getLastUpdateTime());
((HMSExternalTable) table).setEventUpdateTime(log.getLastUpdateTime());
}

public void registerCatalogRefreshListener(Env env) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected void process() throws MetastoreNotificationException {
return;
}
Env.getCurrentEnv().getCatalogMgr()
.addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, true);
.addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, eventTime, true);
} catch (DdlException e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ protected void process() throws MetastoreNotificationException {
if (isRename) {
Env.getCurrentEnv().getCatalogMgr()
.dropExternalPartitions(catalogName, dbName, tblName,
Lists.newArrayList(partitionNameBefore), true);
Lists.newArrayList(partitionNameBefore), eventTime, true);
Env.getCurrentEnv().getCatalogMgr()
.addExternalPartitions(catalogName, dbName, tblName,
Lists.newArrayList(partitionNameAfter), true);
Lists.newArrayList(partitionNameAfter), eventTime, true);
} else {
Env.getCurrentEnv().getCatalogMgr()
.refreshExternalPartitions(catalogName, dbName, hmsTbl.getTableName(),
Lists.newArrayList(partitionNameAfter), true);
Lists.newArrayList(partitionNameAfter), eventTime, true);
}
} catch (DdlException e) {
throw new MetastoreNotificationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ protected void process() throws MetastoreNotificationException {
}
//The scope of refresh can be narrowed in the future
Env.getCurrentEnv().getCatalogMgr()
.refreshExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true);
.refreshExternalTableFromEvent(tableBefore.getDbName(), tableBefore.getTableName(),
catalogName, eventTime, true);
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ protected void process() throws MetastoreNotificationException {
return;
}
Env.getCurrentEnv().getCatalogMgr()
.dropExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, true);
.dropExternalPartitions(catalogName, dbName, hmsTbl.getTableName(),
partitionNames, eventTime, true);
} catch (DdlException e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ protected void process() throws MetastoreNotificationException {
* the file cache of this table,
* but <a href="https://github.com/apache/doris/pull/17932">this PR</a> has fixed it.
*/
Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(dbName, tblName, catalogName, true);
Env.getCurrentEnv().getCatalogMgr().refreshExternalTableFromEvent(dbName, tblName,
catalogName, eventTime, true);
} catch (DdlException e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ public abstract class MetastoreEvent {
protected final String tblName;

// eventId of the event. Used instead of calling getter on event everytime
private final long eventId;
protected final long eventId;

// eventTime of the event. Used instead of calling getter on event everytime
protected final long eventTime;

// eventType from the NotificationEvent
private final MetastoreEventType eventType;
protected final MetastoreEventType eventType;

// Actual notificationEvent object received from Metastore
protected final NotificationEvent metastoreNotificationEvent;
Expand All @@ -61,6 +64,7 @@ public abstract class MetastoreEvent {
protected MetastoreEvent(long eventId, String catalogName, String dbName,
String tblName, MetastoreEventType eventType) {
this.eventId = eventId;
this.eventTime = -1L;
this.catalogName = catalogName;
this.dbName = dbName;
this.tblName = tblName;
Expand All @@ -74,6 +78,7 @@ protected MetastoreEvent(NotificationEvent event, String catalogName) {
this.dbName = event.getDbName();
this.tblName = event.getTableName();
this.eventId = event.getEventId();
this.eventTime = event.getEventTime() * 1000L;
this.eventType = MetastoreEventType.from(event.getEventType());
this.metastoreNotificationEvent = event;
this.catalogName = catalogName;
Expand Down Expand Up @@ -163,8 +168,8 @@ protected String debugString(String msgFormatString, Object... args) {
*/
private Object[] getLogFormatArgs(Object[] args) {
Object[] formatArgs = new Object[args.length + 2];
formatArgs[0] = getEventId();
formatArgs[1] = getEventType();
formatArgs[0] = eventId;
formatArgs[1] = eventType;
int i = 2;
for (Object arg : args) {
formatArgs[i] = arg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ private void realRun() {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
if (catalog instanceof HMSExternalCatalog) {
HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog;
List<NotificationEvent> events = Collections.emptyList();
try {
events = getNextHMSEvents(hmsExternalCatalog);
List<NotificationEvent> events = getNextHMSEvents(hmsExternalCatalog);
if (!events.isEmpty()) {
LOG.info("Events size are {} on catalog [{}]", events.size(),
hmsExternalCatalog.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ private void init(HMSExternalCatalog hmsCatalog) {

Deencapsulation.setField(tbl, "objectCreated", true);
Deencapsulation.setField(tbl, "rwLock", new ReentrantReadWriteLock(true));
Deencapsulation.setField(tbl, "schemaUpdateTime", NOW);
Deencapsulation.setField(tbl, "eventUpdateTime", 0);
new Expectations(tbl) {
{
tbl.getId();
Expand Down Expand Up @@ -154,15 +156,16 @@ private void init(HMSExternalCatalog hmsCatalog) {
minTimes = 0;
result = DLAType.HIVE;

tbl.getUpdateTime();
// mock initSchemaAndUpdateTime and do nothing
tbl.initSchemaAndUpdateTime();
minTimes = 0;
result = NOW;
}
};

Deencapsulation.setField(tbl2, "objectCreated", true);
Deencapsulation.setField(tbl2, "rwLock", new ReentrantReadWriteLock(true));

Deencapsulation.setField(tbl2, "schemaUpdateTime", NOW);
Deencapsulation.setField(tbl2, "eventUpdateTime", 0);
new Expectations(tbl2) {
{
tbl2.getId();
Expand Down Expand Up @@ -197,8 +200,8 @@ private void init(HMSExternalCatalog hmsCatalog) {
minTimes = 0;
result = DLAType.HIVE;

// mock init schema and do nothing
tbl2.initSchema();
// mock initSchemaAndUpdateTime and do nothing
tbl2.initSchemaAndUpdateTime();
minTimes = 0;
}
};
Expand Down Expand Up @@ -383,7 +386,7 @@ public void testHitSqlCacheAfterPartitionUpdateTimeChanged() throws Exception {
// do nothing
}
long later = System.currentTimeMillis();
tbl2.setPartitionUpdateTime(later);
tbl2.setEventUpdateTime(later);

// check cache mode again
ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
Expand Down Expand Up @@ -431,7 +434,7 @@ public void testHitSqlCacheByNereidsAfterPartitionUpdateTimeChanged() {
// do nothing
}
long later = System.currentTimeMillis();
tbl2.setPartitionUpdateTime(later);
tbl2.setEventUpdateTime(later);

// check cache mode again
ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
Expand Down

0 comments on commit 9a798c4

Please sign in to comment.