Commit 6632d6e: [HUDI-6497] WIP HoodieStorage abstraction
yihua committed Jan 23, 2024 (1 parent: e85f8f4)
Showing 701 changed files with 10,850 additions and 7,064 deletions.
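The hunks below repeat one mechanical migration: Hadoop's FileSystem, Path, and FileStatus are replaced by the new HoodieStorage, HoodieLocation, and HoodieFileStatus types in org.apache.hudi.io.storage. A minimal sketch of the before/after shape, assembled only from API names and signatures visible in this diff; the target file name in it is hypothetical:

import org.apache.hudi.io.storage.HoodieLocation;
import org.apache.hudi.io.storage.HoodieStorage;
import org.apache.hudi.io.storage.HoodieStorageUtils;

import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.io.InputStream;

public class StorageMigrationSketch {
  public static void readIfPresent(String basePath, Configuration conf) throws IOException {
    // Before (Hadoop-coupled):
    //   FileSystem fs = FSUtils.getFs(basePath, conf);
    //   if (fs.exists(path)) { FSDataInputStream in = fs.open(path); ... }
    // After (storage-neutral), per the signatures used in this commit:
    HoodieStorage storage = HoodieStorageUtils.getHoodieStorage(basePath, conf);
    HoodieLocation location = new HoodieLocation(basePath + "/.hoodie/hoodie.properties"); // hypothetical target
    if (storage.exists(location)) {
      try (InputStream in = storage.open(location)) {
        // consume the stream ...
      }
    }
  }
}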
File: AWSGlueCatalogSyncClient.java (name inferred from the code below)
@@ -18,6 +18,7 @@

package org.apache.hudi.aws.sync;

+import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CollectionUtils;
@@ -28,6 +29,9 @@
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;

+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.GlueAsyncClient;
import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest;
@@ -55,9 +59,6 @@
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;
import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
-import org.apache.parquet.schema.MessageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.ArrayList;
@@ -81,7 +82,6 @@
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;
-import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;

/**
* This class implements all the AWS APIs to enable syncing of a Hudi Table with the
@@ -149,7 +149,7 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
Table table = getTable(awsGlue, databaseName, tableName);
StorageDescriptor sd = table.storageDescriptor();
List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
-String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
+String fullPartitionPath = FSUtils.getPartitionPathInPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
StorageDescriptor partitionSD = sd.copy(copySd -> copySd.location(fullPartitionPath));
return PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build();
@@ -192,7 +192,7 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartitions)
Table table = getTable(awsGlue, databaseName, tableName);
StorageDescriptor sd = table.storageDescriptor();
List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
-String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
+String fullPartitionPath = FSUtils.getPartitionPathInPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
StorageDescriptor partitionSD = sd.copy(copySd -> copySd.location(fullPartitionPath));
PartitionInput partitionInput = PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build();
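Beyond the import reshuffle, the functional change in this file is the rename of FSUtils.getPartitionPath to FSUtils.getPartitionPathInPath when composing Glue partition locations. A sketch of the assumed contract (joining a base path with a relative partition path); the argument values are hypothetical and nothing beyond the two-argument call and toString() conversion is confirmed by the hunk:

import org.apache.hudi.common.fs.FSUtils;

public class PartitionPathSketch {
  public static void main(String[] args) {
    String basePath = "s3://bucket/warehouse/my_table"; // hypothetical
    String partition = "year=2024/month=01";            // hypothetical
    // Assumed to behave like the old getPartitionPath: base + "/" + partition.
    String fullPartitionPath = FSUtils.getPartitionPathInPath(basePath, partition).toString();
    System.out.println(fullPartitionPath); // expected: s3://bucket/warehouse/my_table/year=2024/month=01
  }
}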
File: AwsGlueCatalogSyncTool.java (name inferred from the code below)
@@ -19,7 +19,7 @@
package org.apache.hudi.aws.sync;

import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;

@@ -62,7 +62,7 @@ public static void main(String[] args) {
}
// HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
TypedProperties props = params.toProps();
-Configuration hadoopConf = FSUtils.getFs(props.getString(META_SYNC_BASE_PATH.key()), new Configuration()).getConf();
+Configuration hadoopConf = HadoopFSUtils.getFs(props.getString(META_SYNC_BASE_PATH.key()), new Configuration()).getConf();
try (AwsGlueCatalogSyncTool tool = new AwsGlueCatalogSyncTool(props, hadoopConf)) {
tool.syncHoodieTable();
}
File: hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java (14 changes: 9 additions & 5 deletions)
@@ -22,10 +22,12 @@
import org.apache.hudi.cli.utils.TempViewProvider;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieStorage;
+import org.apache.hudi.io.storage.HoodieStorageUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -39,7 +41,7 @@ public class HoodieCLI {

public static Configuration conf;
public static ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
-public static FileSystem fs;
+public static HoodieStorage storage;
public static CLIState state = CLIState.INIT;
public static String basePath;
protected static HoodieTableMetaClient tableMetadata;
@@ -73,15 +75,17 @@ private static void setLayoutVersion(Integer layoutVersion) {

public static boolean initConf() {
if (HoodieCLI.conf == null) {
-HoodieCLI.conf = FSUtils.prepareHadoopConf(new Configuration());
+HoodieCLI.conf = HadoopFSUtils.prepareHadoopConf(new Configuration());
return true;
}
return false;
}

public static void initFS(boolean force) throws IOException {
-if (fs == null || force) {
-fs = (tableMetadata != null) ? tableMetadata.getFs() : FileSystem.get(conf);
+if (storage == null || force) {
+storage = (tableMetadata != null)
+? tableMetadata.getHoodieStorage()
+: HoodieStorageUtils.getHoodieStorage(FileSystem.get(conf));
}
}

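With fs replaced by storage, CLI commands consume the new field through the same lazy init path. A minimal usage sketch, assuming exists() takes a HoodieLocation as shown elsewhere in this commit; the class and method names here are illustrative:

package org.apache.hudi.cli;

import org.apache.hudi.io.storage.HoodieLocation;

import java.io.IOException;

public class CliStorageUsageSketch {
  // Sketch: callers now go through HoodieCLI.storage rather than HoodieCLI.fs.
  public static boolean basePathExists() throws IOException {
    HoodieCLI.initFS(false); // sets HoodieCLI.storage on first use
    return HoodieCLI.storage.exists(new HoodieLocation(HoodieCLI.basePath));
  }
}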
File: ArchivedCommitsCommand.java (name inferred from the commands below)
@@ -26,7 +26,6 @@
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -38,12 +37,13 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieFileStatus;
+import org.apache.hudi.io.storage.HoodieLocation;
+import org.apache.hudi.io.storage.HoodieStorageUtils;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
@@ -106,16 +106,20 @@ public String showArchivedCommits(
throws IOException {
System.out.println("===============> Showing only " + limit + " archived commits <===============");
String basePath = HoodieCLI.getTableMetaClient().getBasePath();
-Path archivePath = new Path(HoodieCLI.getTableMetaClient().getArchivePath() + "/.commits_.archive*");
+HoodieLocation archivePath = new HoodieLocation(
+HoodieCLI.getTableMetaClient().getArchivePath() + "/.commits_.archive*");
if (folder != null && !folder.isEmpty()) {
-archivePath = new Path(basePath + "/.hoodie/" + folder);
+archivePath = new HoodieLocation(basePath + "/.hoodie/" + folder);
}
-FileStatus[] fsStatuses = FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath);
+List<HoodieFileStatus> fsStatuses =
+HoodieStorageUtils.getHoodieStorage(basePath, HoodieCLI.conf).globEntries(archivePath);
List<Comparable[]> allStats = new ArrayList<>();
-for (FileStatus fs : fsStatuses) {
+for (HoodieFileStatus fs : fsStatuses) {
// read the archived file
-Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(basePath, HoodieCLI.conf),
-new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
+Reader reader = HoodieLogFormat.newReader(
+HoodieStorageUtils.getHoodieStorage(basePath, HoodieCLI.conf),
+new HoodieLogFile(fs.getLocation()),
+HoodieArchivedMetaEntry.getClassSchema());

List<IndexedRecord> readRecords = new ArrayList<>();
// read the avro blocks
@@ -182,14 +186,17 @@ public String showCommits(
System.out.println("===============> Showing only " + limit + " archived commits <===============");
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
String basePath = metaClient.getBasePath();
-Path archivePath = new Path(metaClient.getArchivePath() + "/.commits_.archive*");
-FileStatus[] fsStatuses =
-FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath);
+HoodieLocation archivePath =
+new HoodieLocation(metaClient.getArchivePath() + "/.commits_.archive*");
+List<HoodieFileStatus> fsStatuses =
+HoodieStorageUtils.getHoodieStorage(basePath, HoodieCLI.conf).globEntries(archivePath);
List<Comparable[]> allCommits = new ArrayList<>();
-for (FileStatus fs : fsStatuses) {
+for (HoodieFileStatus fs : fsStatuses) {
// read the archived file
-HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(basePath, HoodieCLI.conf),
-new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
+HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(
+HoodieStorageUtils.getHoodieStorage(basePath, HoodieCLI.conf),
+new HoodieLogFile(fs.getLocation()),
+HoodieArchivedMetaEntry.getClassSchema());

List<IndexedRecord> readRecords = new ArrayList<>();
// read the avro blocks
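Both commands above now share the same read path. A condensed sketch of it, reusing only the calls and signatures visible in the hunks; the archive directory literal is an assumption:

import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.io.storage.HoodieFileStatus;
import org.apache.hudi.io.storage.HoodieLocation;
import org.apache.hudi.io.storage.HoodieStorage;
import org.apache.hudi.io.storage.HoodieStorageUtils;

import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.util.List;

public class ArchiveScanSketch {
  public static void scanArchive(String basePath, Configuration conf) throws IOException {
    HoodieStorage storage = HoodieStorageUtils.getHoodieStorage(basePath, conf);
    // Same glob the commands above build from getArchivePath(); archive folder assumed.
    HoodieLocation glob = new HoodieLocation(basePath + "/.hoodie/archived/.commits_.archive*");
    List<HoodieFileStatus> entries = storage.globEntries(glob);
    for (HoodieFileStatus entry : entries) {
      HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(
          storage, new HoodieLogFile(entry.getLocation()), HoodieArchivedMetaEntry.getClassSchema());
      try {
        // iterate avro blocks and records, as showArchivedCommits does ...
      } finally {
        reader.close();
      }
    }
  }
}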
File: CompactionCommand.java (name inferred from the commands below)
@@ -37,17 +37,16 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieLocation;
+import org.apache.hudi.io.storage.HoodieStorage;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.hudi.utilities.UtilHelpers;

-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
@@ -57,6 +56,7 @@
import org.springframework.shell.standard.ShellOption;

import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.HashMap;
@@ -432,17 +432,17 @@ private static String getTmpSerializerFile() {
return TMP_DIR + UUID.randomUUID().toString() + ".ser";
}

-private <T> T deSerializeOperationResult(String inputP, FileSystem fs) throws Exception {
-Path inputPath = new Path(inputP);
-FSDataInputStream fsDataInputStream = fs.open(inputPath);
-ObjectInputStream in = new ObjectInputStream(fsDataInputStream);
+private <T> T deSerializeOperationResult(HoodieLocation inputLocation,
+HoodieStorage storage) throws Exception {
+InputStream inputStream = storage.open(inputLocation);
+ObjectInputStream in = new ObjectInputStream(inputStream);
try {
T result = (T) in.readObject();
LOG.info("Result : " + result);
return result;
} finally {
in.close();
-fsDataInputStream.close();
+inputStream.close();
}
}
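Since the new method receives a plain InputStream from storage.open, the two explicit close() calls can collapse into one try-with-resources. A sketch of an equivalent variant, not part of the commit:

import org.apache.hudi.io.storage.HoodieLocation;
import org.apache.hudi.io.storage.HoodieStorage;

import java.io.ObjectInputStream;

public class DeserializeSketch {
  @SuppressWarnings("unchecked")
  static <T> T deserializeOperationResult(HoodieStorage storage, HoodieLocation location) throws Exception {
    // ObjectInputStream.close() also closes the wrapped stream from storage.open().
    try (ObjectInputStream in = new ObjectInputStream(storage.open(location))) {
      return (T) in.readObject();
    }
  }
}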

@@ -463,7 +463,7 @@ public String validateCompaction(
HoodieCLI.initFS(initialized);

String outputPathStr = getTmpSerializerFile();
-Path outputPath = new Path(outputPathStr);
+HoodieLocation outputLocation = new HoodieLocation(outputPathStr);
String output;
try {
String sparkPropertiesPath = Utils
@@ -477,7 +477,7 @@
if (exitCode != 0) {
return "Failed to validate compaction for " + compactionInstant;
}
-List<ValidationOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
+List<ValidationOpResult> res = deSerializeOperationResult(outputLocation, HoodieCLI.storage);
boolean valid = res.stream().map(OperationResult::isSuccess).reduce(Boolean::logicalAnd).orElse(true);
String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
List<Comparable[]> rows = new ArrayList<>();
@@ -502,8 +502,8 @@ public String validateCompaction(
headerOnly, rows);
} finally {
// Delete tmp file used to serialize result
-if (HoodieCLI.fs.exists(outputPath)) {
-HoodieCLI.fs.delete(outputPath, false);
+if (HoodieCLI.storage.exists(outputLocation)) {
+HoodieCLI.storage.deleteFile(outputLocation);
}
}
return output;
@@ -528,7 +528,7 @@ public String unscheduleCompaction(
HoodieCLI.initFS(initialized);

String outputPathStr = getTmpSerializerFile();
-Path outputPath = new Path(outputPathStr);
+HoodieLocation outputLocation = new HoodieLocation(outputPathStr);
String output;
try {
String sparkPropertiesPath = Utils
@@ -543,13 +543,13 @@
if (exitCode != 0) {
return "Failed to unschedule compaction for " + compactionInstant;
}
-List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
+List<RenameOpResult> res = deSerializeOperationResult(outputLocation, HoodieCLI.storage);
output =
getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "unschedule pending compaction");
} finally {
// Delete tmp file used to serialize result
-if (HoodieCLI.fs.exists(outputPath)) {
-HoodieCLI.fs.delete(outputPath, false);
+if (HoodieCLI.storage.exists(outputLocation)) {
+HoodieCLI.storage.deleteFile(outputLocation);
}
}
return output;
@@ -573,7 +573,7 @@ public String unscheduleCompactFile(
HoodieCLI.initFS(initialized);

String outputPathStr = getTmpSerializerFile();
-Path outputPath = new Path(outputPathStr);
+HoodieLocation outputLocation = new HoodieLocation(outputPathStr);
String output;
try {
String sparkPropertiesPath = Utils
@@ -588,13 +588,13 @@
if (exitCode != 0) {
return "Failed to unschedule compaction for file " + fileId;
}
-List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
+List<RenameOpResult> res = deSerializeOperationResult(outputLocation, HoodieCLI.storage);
output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly,
"unschedule file from pending compaction");
} finally {
// Delete tmp file used to serialize result
-if (HoodieCLI.fs.exists(outputPath)) {
-HoodieCLI.fs.delete(outputPath, false);
+if (HoodieCLI.storage.exists(outputLocation)) {
+HoodieCLI.storage.deleteFile(outputLocation);
}
}
return output;
@@ -619,7 +619,7 @@ public String repairCompaction(
HoodieCLI.initFS(initialized);

String outputPathStr = getTmpSerializerFile();
-Path outputPath = new Path(outputPathStr);
+HoodieLocation outputLocation = new HoodieLocation(outputPathStr);
String output;
try {
String sparkPropertiesPath = Utils
Expand All @@ -633,12 +633,12 @@ public String repairCompaction(
if (exitCode != 0) {
return "Failed to unschedule compaction for " + compactionInstant;
}
-List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
+List<RenameOpResult> res = deSerializeOperationResult(outputLocation, HoodieCLI.storage);
output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "repair compaction");
} finally {
// Delete tmp file used to serialize result
-if (HoodieCLI.fs.exists(outputPath)) {
-HoodieCLI.fs.delete(outputPath, false);
+if (HoodieCLI.storage.exists(outputLocation)) {
+HoodieCLI.storage.deleteFile(outputLocation);
}
}
return output;
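The exists-then-deleteFile cleanup recurs verbatim in validateCompaction, unscheduleCompaction, unscheduleCompactFile, and repairCompaction. A hypothetical helper that would consolidate it; the names here are illustrative, not from the commit:

import org.apache.hudi.io.storage.HoodieLocation;
import org.apache.hudi.io.storage.HoodieStorage;

import java.io.IOException;

public class TmpSerializerCleanupSketch {
  // Hypothetical consolidation; fs.delete(path, false) maps to storage.deleteFile(location),
  // the single-file delete method this commit introduces.
  static void deleteIfExists(HoodieStorage storage, HoodieLocation location) throws IOException {
    if (storage.exists(location)) {
      storage.deleteFile(location);
    }
  }
}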
[Diff truncated: the remaining changed files in this commit are not shown.]