Skip to content

Commit

Permalink
[Minor] Throw exceptions when cleaner/compactor fail (#10108)
Browse files Browse the repository at this point in the history
Co-authored-by: Shawn Chang <yxchang@amazon.com>
  • Loading branch information
CTTY and CTTY authored Nov 16, 2023
1 parent bada5d9 commit 35af64d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.exception.HoodieException;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -103,28 +104,20 @@ public static void main(String[] args) {
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
throw new HoodieException("Failed to run cleaning for " + cfg.basePath);
}

String dirName = new Path(cfg.basePath).getName();
JavaSparkContext jssc = UtilHelpers.buildSparkContext("hoodie-cleaner-" + dirName, cfg.sparkMaster);
boolean success = true;

try {
new HoodieCleaner(cfg, jssc).run();
} catch (Throwable throwable) {
success = false;
LOG.error("Failed to run cleaning for " + cfg.basePath, throwable);
throw new HoodieException("Failed to run cleaning for " + cfg.basePath, throwable);
} finally {
jssc.stop();
}

if (!success) {
// Return a non-zero exit code to properly notify any resource manager
// that cleaning was not successful
System.exit(1);
}

LOG.info("Cleaner ran successfully");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;

Expand Down Expand Up @@ -168,18 +169,20 @@ public static void main(String[] args) {
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
throw new HoodieException("Fail to run compaction for " + cfg.tableName + ", return code: " + 1);
}
final JavaSparkContext jsc = UtilHelpers.buildSparkContext("compactor-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
int ret = 0;
try {
HoodieCompactor compactor = new HoodieCompactor(jsc, cfg);
ret = compactor.compact(cfg.retry);
ret = new HoodieCompactor(jsc, cfg).compact(cfg.retry);
} catch (Throwable throwable) {
LOG.error("Fail to run compaction for " + cfg.tableName, throwable);
throw new HoodieException("Fail to run compaction for " + cfg.tableName + ", return code: " + ret, throwable);
} finally {
jsc.stop();
System.exit(ret);
}

if (ret != 0) {
throw new HoodieException("Fail to run compaction for " + cfg.tableName + ", return code: " + ret);
}
}

Expand Down

0 comments on commit 35af64d

Please sign in to comment.