[HUDI-7069] Optimize metaclient construction and include table config… #10048

Merged: 7 commits, Nov 15, 2023
@@ -56,17 +56,17 @@ public class HoodieClusteringJob {
private HoodieTableMetaClient metaClient;

public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
- this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs));
+ this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs),
+     UtilHelpers.createMetaClient(jsc, cfg.basePath, true));
}

- public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties props) {
+ public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties props, HoodieTableMetaClient metaClient) {
this.cfg = cfg;
this.jsc = jsc;
this.props = props;
- this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ this.metaClient = metaClient;
// Disable async cleaning, will trigger synchronous cleaning manually.
this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
- this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
// add default lock config options if MDT is enabled.
UtilHelpers.addLockOptions(cfg.basePath, this.props);
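For reference, here is a minimal sketch (not part of the PR) of how a caller might use the reworked constructors: the two-argument convenience constructor still builds a metaClient from cfg.basePath, while a caller that already holds a metaClient can now pass it in directly. The wrapper class and method below are illustrative only; all Hudi calls are the ones visible in the diff.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.UtilHelpers;

import org.apache.spark.api.java.JavaSparkContext;

public class ClusteringJobUsageSketch {
  public static void runClustering(JavaSparkContext jsc, HoodieClusteringJob.Config cfg,
                                   TypedProperties props, int retry) {
    // Build the meta client once (or reuse one created elsewhere) ...
    HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
    // ... and hand it to the job instead of letting the job construct its own.
    new HoodieClusteringJob(jsc, cfg, props, metaClient).cluster(retry);
  }
}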
@@ -56,17 +56,18 @@ public class HoodieCompactor {
private transient FileSystem fs;
private TypedProperties props;
private final JavaSparkContext jsc;
- private final HoodieTableMetaClient metaClient;
+ private HoodieTableMetaClient metaClient;

public HoodieCompactor(JavaSparkContext jsc, Config cfg) {
- this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs));
+ this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs),
+     UtilHelpers.createMetaClient(jsc, cfg.basePath, true));
}

- public HoodieCompactor(JavaSparkContext jsc, Config cfg, TypedProperties props) {
+ public HoodieCompactor(JavaSparkContext jsc, Config cfg, TypedProperties props, HoodieTableMetaClient metaClient) {
this.cfg = cfg;
this.jsc = jsc;
this.props = props;
- this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ this.metaClient = metaClient;
// Disable async cleaning, will trigger synchronous cleaning manually.
this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
@@ -256,7 +257,7 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
// If no compaction instant is provided by --instant-time, find the earliest scheduled compaction
// instant from the active timeline
if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
- HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
Option<HoodieInstant> firstCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
if (firstCompactionInstant.isPresent()) {
cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
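The doCompact change above replaces the construction of a second metaClient with a reload of the injected one before the timeline is read, which lines up with the reviewer's note further down about refreshing the timeline when the client is reused. A self-contained sketch of that lookup, using only calls that appear in the diff (the wrapper class and method names are illustrative):

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

final class PendingCompactionLookupSketch {
  // Returns the timestamp of the earliest scheduled compaction, if one exists.
  static Option<String> earliestPendingCompaction(HoodieTableMetaClient metaClient) {
    // Refresh the reused client so its active timeline reflects recently added instants.
    HoodieTableMetaClient reloaded = HoodieTableMetaClient.reload(metaClient);
    Option<HoodieInstant> first =
        reloaded.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
    return first.isPresent() ? Option.of(first.get().getTimestamp()) : Option.empty();
  }
}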
@@ -20,6 +20,7 @@
package org.apache.hudi.utilities.multitable;

import org.apache.hudi.common.config.TypedProperties;
+ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.utilities.HoodieClusteringJob;

import org.apache.spark.api.java.JavaSparkContext;
@@ -43,13 +44,18 @@ class ClusteringTask extends TableServiceTask {
*/
private String clusteringMode;

+ /**
+  * Meta Client.
+  */
+ private HoodieTableMetaClient metaClient;
+
@Override
void run() {
HoodieClusteringJob.Config clusteringConfig = new HoodieClusteringJob.Config();
clusteringConfig.basePath = basePath;
clusteringConfig.parallelism = parallelism;
clusteringConfig.runningMode = clusteringMode;
- new HoodieClusteringJob(jsc, clusteringConfig, props).cluster(retry);
+ new HoodieClusteringJob(jsc, clusteringConfig, props, metaClient).cluster(retry);
Contributor:
Be cautious: the timeline should be refreshed for each compaction run when the metaClient is reused.

Contributor Author:
That's right. In the existing code, the metaClient is reloaded during clustering, but a new metaClient is created during compaction. A better implementation would give compaction the same behavior: reload the reused metaClient where the new one is currently created.

private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
  LOG.info("Step 1: Do schedule");
  metaClient = HoodieTableMetaClient.reload(metaClient);
  ...
}

private int doCompact(JavaSparkContext jsc) throws Exception {
  ...
  if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
    HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
    ...
  }
}

I will make modifications here.

}

/**
@@ -98,6 +104,11 @@ public static final class Builder {
*/
private int retry;

+ /**
+  * Meta Client.
+  */
+ private HoodieTableMetaClient metaClient;
+
private Builder() {
}

@@ -131,6 +142,11 @@ public Builder withRetry(int retry) {
return this;
}

+ public Builder withMetaclient(HoodieTableMetaClient metaClient) {
+   this.metaClient = metaClient;
+   return this;
+ }
+
public ClusteringTask build() {
ClusteringTask clusteringTask = new ClusteringTask();
clusteringTask.jsc = this.jsc;
@@ -139,6 +155,7 @@ public ClusteringTask build() {
clusteringTask.retry = this.retry;
clusteringTask.basePath = this.basePath;
clusteringTask.props = this.props;
+ clusteringTask.metaClient = this.metaClient;
return clusteringTask;
}
}
@@ -20,6 +20,7 @@
package org.apache.hudi.utilities.multitable;

import org.apache.hudi.common.config.TypedProperties;
+ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.utilities.HoodieCompactor;

import org.apache.spark.api.java.JavaSparkContext;
@@ -48,6 +49,11 @@ class CompactionTask extends TableServiceTask {
*/
private int parallelism;

+ /**
+  * Meta Client.
+  */
+ private HoodieTableMetaClient metaClient;
+
@Override
void run() {
HoodieCompactor.Config compactionCfg = new HoodieCompactor.Config();
@@ -56,7 +62,7 @@ void run() {
compactionCfg.runningMode = compactionRunningMode;
compactionCfg.parallelism = parallelism;
compactionCfg.retry = retry;
- new HoodieCompactor(jsc, compactionCfg, props).compact(retry);
+ new HoodieCompactor(jsc, compactionCfg, props, metaClient).compact(retry);
}

/**
@@ -109,6 +115,11 @@ public static final class Builder {
*/
private JavaSparkContext jsc;

+ /**
+  * Meta Client.
+  */
+ private HoodieTableMetaClient metaClient;
+
public Builder withProps(TypedProperties props) {
this.props = props;
return this;
@@ -144,6 +155,11 @@ public Builder withJsc(JavaSparkContext jsc) {
return this;
}

+ public Builder withMetaclient(HoodieTableMetaClient metaClient) {
+   this.metaClient = metaClient;
+   return this;
+ }
+
public CompactionTask build() {
CompactionTask compactionTask = new CompactionTask();
compactionTask.basePath = this.basePath;
@@ -153,6 +169,7 @@ public CompactionTask build() {
compactionTask.compactionStrategyName = this.compactionStrategyName;
compactionTask.retry = this.retry;
compactionTask.props = this.props;
+ compactionTask.metaClient = this.metaClient;
return compactionTask;
}
}
@@ -22,9 +22,11 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
+ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
+ import org.apache.hudi.utilities.UtilHelpers;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -166,6 +168,9 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js
HoodieMultiTableServicesMain.Config cfg,
TypedProperties props) {
TableServicePipeline pipeline = new TableServicePipeline();
+ HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, basePath, true);
+ // Add the table config to the write config.
+ props.putAll(metaClient.getTableConfig().getProps());
Contributor:
Should we use the table config to overwrite configs the user has already set in the write configs? I'm not sure which one has higher priority here. @danny0405 What do you think?

Contributor Author:
This can indeed lead to a priority issue. A simple solution is to use addNecessaryTableConfigToWriteConfig to add the necessary parameters.

Contributor:
Are you referring to the per-table options? Write config options should always take higher priority.
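A minimal sketch of the direction discussed in this thread: copy table config entries into the write properties only when the user has not already set them, so user-supplied write configs keep the higher priority. The helper name mirrors the suggested addNecessaryTableConfigToWriteConfig but is illustrative; the real helper may also restrict which table config keys are copied.

import java.util.Map;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;

final class TableConfigMergeSketch {
  static void addNecessaryTableConfigToWriteConfig(HoodieTableMetaClient metaClient, TypedProperties props) {
    Map<Object, Object> tableProps = metaClient.getTableConfig().getProps();
    for (Map.Entry<Object, Object> entry : tableProps.entrySet()) {
      // Keys already set by the user win; only fill in what is missing.
      props.putIfAbsent(entry.getKey(), entry.getValue());
    }
  }
}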

if (cfg.enableCompaction) {
pipeline.add(CompactionTask.newBuilder()
.withJsc(jsc)
@@ -175,6 +180,7 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js
.withCompactionStrategyName(cfg.compactionStrategyClassName)
.withProps(props)
.withRetry(cfg.retry)
+ .withMetaclient(metaClient)
.build());
}
if (cfg.enableClustering) {
@@ -185,6 +191,7 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js
.withClusteringRunningMode(cfg.clusteringRunningMode)
.withProps(props)
.withRetry(cfg.retry)
+ .withMetaclient(metaClient)
.build());
}
if (cfg.enableClean) {