19 changes: 16 additions & 3 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -94,7 +94,6 @@
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata.ReplicationType;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -342,8 +341,10 @@ else if (work.isBootstrap()) {
} else {
LOG.info("Previous Dump is not yet loaded. Skipping this iteration.");
}
ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, null, conf,
work.dbNameOrPattern, work.isBootstrap() ? ReplicationType.BOOTSTRAP: ReplicationType.INCREMENTAL);
// Save the execution ID of the last successful dump so it can be surfaced in the UI
DumpMetaData lastWrittenDmd = new DumpMetaData(previousValidHiveDumpPath, conf);
ReplUtils.reportStatusInReplicationMetricsWithLastExecutionId(
getName(), Status.SKIPPED, lastWrittenDmd.getDumpExecutionId(), null, conf);
}
}
} catch (RuntimeException e) {
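As a reading aid rather than additional patch code, the skipped-dump branch above reduces to the sketch below. previousValidHiveDumpPath is the directory of the last completed dump, and the execution ID read from it is the one persisted by dmd.setDump(..., executionId, ...) in the incrementalDump/bootStrapDump hunks further down.

// Sketch only: mirrors the skipped-dump reporting added above.
DumpMetaData lastWrittenDmd = new DumpMetaData(previousValidHiveDumpPath, conf);
Long lastExecutionId = lastWrittenDmd.getDumpExecutionId();  // ID written by the previous DUMP cycle
ReplUtils.reportStatusInReplicationMetricsWithLastExecutionId(
    getName(), Status.SKIPPED, lastExecutionId, /* errorLogPath */ null, conf);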
@@ -940,6 +941,12 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
long estimatedNumEvents = evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
maxEventLimit);
try {
// Persisting the dump metadata with the execution ID early allows concurrent LOAD operations to associate
// with this DUMP cycle. This file will be overwritten with the final metadata in the finally block.
long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
dmd.setDump(DumpType.INCREMENTAL, null, null, cmRoot, executionId, false);
dmd.write(true);

IncrementalDumpLogger replLogger =
new IncrementalDumpLogger(dbName, dumpRoot.toString(), estimatedNumEvents, work.eventFrom, work.eventTo,
maxEventLimit);
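A hedged sketch of the consumer side of this early write: a concurrently scheduled REPL LOAD can read the execution ID back from the dump metadata before the dump finishes, which is what lets it associate with this DUMP cycle. The names below (latestDumpPath, REPL_HIVE_BASE_DIR) are borrowed from the analyzeReplLoad hunk later in this diff.

// Sketch: a concurrent REPL LOAD associating itself with the in-flight DUMP cycle.
Path metadataPath = new Path(latestDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
DumpMetaData inFlightDmd = new DumpMetaData(metadataPath, conf);
Long dumpCycleExecutionId = inFlightDmd.getDumpExecutionId();  // value persisted by dmd.write(true) above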
@@ -1364,6 +1371,12 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, conf);
FileList snapPathFileList = isSnapshotEnabled ? createTableFileList(
SnapshotUtils.getSnapshotFileListPath(dumpRoot), EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT, conf) : null) {
// Persisting the dump metadata with the execution ID early allows concurrent LOAD operations to associate
// with this DUMP cycle. This file will be overwritten with the final metadata in the finally block.
long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
dmd.setDump(DumpType.BOOTSTRAP, null, null, cmRoot, executionId, false);
dmd.write(true);

ExportService exportService = new ExportService(conf);
for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
LOG.debug("Dumping db: " + dbName);
@@ -566,6 +566,16 @@ public static void reportStatusInReplicationMetrics(String stageName, Status sta
metricCollector.reportStageEnd(stageName, status, errorLogPath);
}

public static void reportStatusInReplicationMetricsWithLastExecutionId(String stageName, Status status,
long lastDumpId, String errorLogPath,
HiveConf conf)
throws SemanticException {
ReplicationMetricCollector metricCollector =
new ReplicationMetricCollector(null, null, null, 0, conf) {};
metricCollector.reportStageStart(stageName, new HashMap<>());
metricCollector.reportStageEndWithLastExecutionId(stageName, status, errorLogPath, lastDumpId);
}

public static boolean isErrorRecoverable(Throwable e) {
int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
return errorCode > ErrorMsg.GENERIC_ERROR.getErrorCode();
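Two small notes on the new helper: ReplicationMetricCollector is abstract, so the empty anonymous subclass ({}) exists only to allow a one-off stage report, and the new HashMap<>() call assumes java.util.HashMap is imported in ReplUtils. A hedged usage sketch with an illustrative dump ID follows.

// Sketch: report a SKIPPED stage that still carries the last successful dump's execution ID.
long lastDumpExecutionId = 42L;  // illustrative value; normally DumpMetaData#getDumpExecutionId()
ReplUtils.reportStatusInReplicationMetricsWithLastExecutionId(
    "REPL_DUMP", Status.SKIPPED, lastDumpExecutionId, /* errorLogPath */ null, conf);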
@@ -370,15 +370,60 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
dmd.setOptimizedBootstrapToDumpMetadataFile(conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L));
}
} else {
ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED, null, conf, sourceDbNameOrPattern, null);
LOG.warn("No dump to load or the previous dump already loaded");
handleSkippedLoad(latestDumpPath);
}
} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
throw new SemanticException(e.getMessage(), e);
}
}

private void handleSkippedLoad(Path latestDumpPath) throws SemanticException {
Long executionId = extractExecutionIdFromDump(latestDumpPath);
reportSkippedLoadMetrics(executionId);
LOG.warn("No dump to load or the previous dump already loaded");
}

private Long extractExecutionIdFromDump(Path latestDumpPath) {
if (latestDumpPath == null) {
return null;
}

try {
Path metadataPath = new Path(latestDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
FileSystem fs = metadataPath.getFileSystem(conf);

if (fs.exists(metadataPath) && !fs.exists(new Path(metadataPath, LOAD_ACKNOWLEDGEMENT.toString()))) {
DumpMetaData lastWrittenDmd = new DumpMetaData(metadataPath, conf);
Long executionId = lastWrittenDmd.getDumpExecutionId();
LOG.debug("Retrieved execution ID {} from latest dump path", executionId);
return executionId;
} else {
LOG.debug("Metadata path does not exist: {}", metadataPath);
}
} catch (Exception e) {
LOG.warn("Unable to retrieve execution ID from dump metadata at {}: {}",
latestDumpPath, e.getMessage());
LOG.debug("Full exception:", e);
}

return null;
}

private void reportSkippedLoadMetrics(Long executionId) throws SemanticException {
try {
if (executionId != null) {
ReplUtils.reportStatusInReplicationMetricsWithLastExecutionId(
"REPL_LOAD", Status.SKIPPED, executionId, null, conf);
} else {
ReplUtils.reportStatusInReplicationMetrics(
"REPL_LOAD", Status.SKIPPED, null, conf, sourceDbNameOrPattern, null);
}
} catch (SemanticException e) {
throw new SemanticException(e.getMessage(), e);
}
}
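Condensed view (illustration only) of the decision the three methods above implement: the execution ID is taken from the newest dump directory only when that directory exists and has not yet been acknowledged as loaded; otherwise the pre-existing reporting call is used unchanged.

// Sketch: skipped-load reporting with the execution-id fallback.
Path metadataPath = new Path(latestDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
FileSystem fs = metadataPath.getFileSystem(conf);
boolean unloadedDumpPresent = fs.exists(metadataPath)
    && !fs.exists(new Path(metadataPath, LOAD_ACKNOWLEDGEMENT.toString()));
if (unloadedDumpPresent) {
  Long executionId = new DumpMetaData(metadataPath, conf).getDumpExecutionId();
  ReplUtils.reportStatusInReplicationMetricsWithLastExecutionId("REPL_LOAD", Status.SKIPPED, executionId, null, conf);
} else {
  ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED, null, conf, sourceDbNameOrPattern, null);
}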

private ReplicationMetricCollector initReplicationLoadMetricCollector(String dumpDirectory, String dbNameToLoadIn,
DumpMetaData dmd) throws SemanticException {
ReplicationMetricCollector collector;
@@ -52,6 +52,7 @@ public abstract class ReplicationMetricCollector {
private static boolean enableForTests;
private static long scheduledExecutionIdForTests = 0L;
private HiveConf conf;
private static final String STAGE_ENDED_LOG_FORMAT = "Stage Ended {}, {}";

public void setMetricsMBean(ObjectName metricsMBean) {
this.metricsMBean = metricsMBean;
@@ -156,7 +157,7 @@ public void reportStageEnd(String stageName, Status status, long lastReplId,
SnapshotUtils.ReplSnapshotCount replSnapshotCount, ReplStatsTracker replStatsTracker) throws SemanticException {
unRegisterMBeanSafe();
if (isEnabled) {
LOG.debug("Stage ended {}, {}, {}", stageName, status, lastReplId );
LOG.debug("Stage ended {}, {}, {}", stageName, status, lastReplId);
Progress progress = replicationMetric.getProgress();
Stage stage = progress.getStageByName(stageName);
if(stage == null){
@@ -183,10 +184,35 @@
}
}

public void reportStageEndWithLastExecutionId(String stageName, Status status, String errorLogPath, long lastDumpId)
throws SemanticException {
unRegisterMBeanSafe();
if (isEnabled) {
LOG.debug(STAGE_ENDED_LOG_FORMAT, stageName, status);
Progress progress = replicationMetric.getProgress();
Stage stage = progress.getStageByName(stageName);
if(stage == null){
stage = new Stage(stageName, status, -1L);
}
stage.setStatus(status);
stage.setEndTime(getCurrentTimeInMillis());
stage.setLastSuccessfulDumpId(lastDumpId);
if (errorLogPath != null) {
stage.setErrorLogPath(errorLogPath);
}
progress.addStage(stage);
replicationMetric.setProgress(progress);
metricCollector.addMetric(replicationMetric);
if (Status.FAILED == status || Status.FAILED_ADMIN == status || Status.SKIPPED == status) {
reportEnd(status);
}
}
}
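For orientation, the sketch below shows the state the new method records on a stage, using the Stage accessors added later in this diff. The lastSuccessfulDumpId it sets is the field that shows up in the serialized progress JSON in the replication_metrics .q.out change at the bottom; time handling is simplified here, whereas the real method uses getCurrentTimeInMillis().

// Sketch: what reportStageEndWithLastExecutionId records for a stage.
Stage stage = new Stage("REPL_LOAD", Status.SKIPPED, -1L);
stage.setStatus(Status.SKIPPED);
stage.setEndTime(System.currentTimeMillis());  // simplified; the collector uses getCurrentTimeInMillis()
stage.setLastSuccessfulDumpId(lastDumpId);     // new field carried into the progress JSON and the UI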

public void reportStageEnd(String stageName, Status status, String errorLogPath) throws SemanticException {
unRegisterMBeanSafe();
if (isEnabled) {
LOG.debug("Stage Ended {}, {}", stageName, status );
LOG.debug(STAGE_ENDED_LOG_FORMAT, stageName, status);
Progress progress = replicationMetric.getProgress();
Stage stage = progress.getStageByName(stageName);
if(stage == null){
@@ -209,7 +235,7 @@ public void reportStageEnd(String stageName, Status status, String errorLogPath)
public void reportStageEnd(String stageName, Status status) throws SemanticException {
unRegisterMBeanSafe();
if (isEnabled) {
LOG.debug("Stage Ended {}, {}", stageName, status );
LOG.debug(STAGE_ENDED_LOG_FORMAT, stageName, status);
Progress progress = replicationMetric.getProgress();
Stage stage = progress.getStageByName(stageName);
if(stage == null){
@@ -34,6 +34,7 @@ public class Stage {
private long endTime;
private long endTimeOnSrc;
private long endTimeOnTgt;
private long lastSuccessfulDumpId;
private Map<String, Metric> metrics = new HashMap<>();
private String errorLogPath;
private SnapshotUtils.ReplSnapshotCount replSnapshotCount = new SnapshotUtils.ReplSnapshotCount();
@@ -62,6 +63,7 @@ public Stage(Stage stage) {
this.replStats = stage.replStats;
this.endTimeOnSrc = stage.endTimeOnSrc;
this.endTimeOnTgt = stage.endTimeOnTgt;
this.lastSuccessfulDumpId = stage.lastSuccessfulDumpId;
}

public String getName() {
@@ -112,6 +114,14 @@ public void setEndTimeOnTgt(long endTimeOnTgt) {
this.endTimeOnTgt = endTimeOnTgt;
}

public long getLastSuccessfulDumpId() {
return lastSuccessfulDumpId;
}

public void setLastSuccessfulDumpId(long lastSuccessfulDumpId) {
this.lastSuccessfulDumpId = lastSuccessfulDumpId;
}

public void addMetric(Metric metric) {
this.metrics.put(metric.getName(), metric);
}
@@ -37,6 +37,12 @@ public class StageMapper {

private long endTime = 0;

private long endTimeOnSrc = 0;

private long endTimeOnTgt = 0;

private long lastSuccessfulDumpId;

private List<Metric> metrics = new ArrayList<>();

private String errorLogPath;
@@ -64,6 +70,30 @@ public long getEndTime() {
return endTime;
}

public long getEndTimeOnSrc() {
return endTimeOnSrc;
}

public void setEndTimeOnSrc(long endTimeOnSrc) {
this.endTimeOnSrc = endTimeOnSrc;
}

public long getEndTimeOnTgt() {
return endTimeOnTgt;
}

public void setEndTimeOnTgt(long endTimeOnTgt) {
this.endTimeOnTgt = endTimeOnTgt;
}

public long getLastSuccessfulDumpId() {
return lastSuccessfulDumpId;
}

public void setLastSuccessfulDumpId(long lastSuccessfulDumpId) {
this.lastSuccessfulDumpId = lastSuccessfulDumpId;
}

public List<Metric> getMetrics() {
return metrics;
}
@@ -92,5 +92,5 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: sys@replication_metrics
POSTHOOK: Input: sys@replication_metrics_orig
#### A masked pattern was here ####
repl1 1 {"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22PsQ6CMBRF/6Uzg6xsWjExQSC2TIaYBhsgKS15fZ1I/90iwYi6tefe0/c6EYsCnSUJYRWlKWMkmlErA7pNRItBhuyaltn9WF3KJf0jAPJ+ru4iIvXj+1xoBs0W8BZfYJAIfbOZdqpyys9FPj/dOACpkRqnlz4aFGq9+ugt8f0hS3+NeGvEvg47ABjITFsK7EiinVIRATkqpsVoO7OqH0H4sl2Ar/0T5NGeBTQBAAA= {"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"endTimeOnSrc":0,"endTimeOnTgt":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":null,"replStats":null}]} gzip(json-2.0)
repl2 1 {"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22QQQuDMAyF/0vPHubV21YdDERl1dOQUVzQQW0lpifpf1/VOXDbLe8l30vIxEaSZEcWMVFxngjBgtlqwVu3iWnZg+9dkyK9p/kxXrt/AKTyOY8eAgb68V3nWmCzN8qWFqMHwmez23auMl5e8myObiwiaOLG6nWeDEm1SRd8oPJ4SpNfItwToav9DYgGU9MWkjoWaatUwBAGJbQcxs5sqI+2PUeQBI9ltZcxKFilezP+G+Ma4mr3Atju2TJPAQAA {"status":"SUCCESS","stages":[{"name":"REPL_LOAD","status":"SUCCESS","startTime":0,"endTime":0,"endTimeOnSrc":0,"endTimeOnTgt":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":{"numCreated":0,"numDeleted":0},"replStats":null}]} gzip(json-2.0)
repl1 1 {"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAAA22Qyw6CMBBF/6VrFrJlpwUTEhRiYWWIaUoFktKS6XRF+Hd5BCPqbnpuT3szA7HI0VkSEFZQGjFGvBnVckL3gWjeySm7RVnyCItLtqZ/BMC8na8ePCJ19T2nmoHYg7zGBShukTkhpLVPp0LX9XG1BJ1EaMWuxrm40jxOr/OfwgFIjdQ4vT6EBrnajqP3lvLjKYl+DX9v+GM5lQMwkJg649iQQDulPAKyV0zz3jZmUz+CaRd2BWM5vgCmU7uCTQEAAA== {"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"endTimeOnSrc":0,"endTimeOnTgt":0,"lastSuccessfulDumpId":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":null,"replStats":null}]} gzip(json-2.0)
repl2 1 {"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAAA22QMQvCMBSE/0vmDrq6aVpBKLaYdpIiIT6rkCbl5WWS/HdTo0LVLXeX73LkzhxJ8o6tmGg5L4Rg2WT1EK3jnRk5QMwORV2eymqdp/QPgNTcpquLjIE5f58rI1DNjaanp6GlI+GVAucuXud+GHfnZzAA4U3NZmzbPW921X56U3lEMMStN6mILEn9liH7QM16Uxa/xHJOLEMXxyFaLG1fS7qylfFaZwxh1MLI0V3tG43VfuAIkiBNjTIHDUmGFxO/yaWS0IUHJuXY4GgBAAA= {"status":"SUCCESS","stages":[{"name":"REPL_LOAD","status":"SUCCESS","startTime":0,"endTime":0,"endTimeOnSrc":0,"endTimeOnTgt":0,"lastSuccessfulDumpId":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":{"numCreated":0,"numDeleted":0},"replStats":null}]} gzip(json-2.0)