[HUDI-6497] WIP HoodieStorage abstraction #10360

Closed (wants to merge 1 commit)
@@ -18,6 +18,7 @@

package org.apache.hudi.aws.sync;

import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CollectionUtils;
@@ -28,6 +29,9 @@
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;

import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.GlueAsyncClient;
import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest;
@@ -55,9 +59,6 @@
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;
import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.ArrayList;
@@ -81,7 +82,6 @@
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;
import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;

/**
* This class implements all the AWS APIs to enable syncing of a Hudi Table with the
@@ -149,7 +149,7 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
Table table = getTable(awsGlue, databaseName, tableName);
StorageDescriptor sd = table.storageDescriptor();
List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
String fullPartitionPath = FSUtils.getPartitionPathInPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
StorageDescriptor partitionSD = sd.copy(copySd -> copySd.location(fullPartitionPath));
return PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build();
@@ -192,7 +192,7 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
Table table = getTable(awsGlue, databaseName, tableName);
StorageDescriptor sd = table.storageDescriptor();
List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
String fullPartitionPath = FSUtils.getPartitionPathInPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
StorageDescriptor partitionSD = sd.copy(copySd -> copySd.location(fullPartitionPath));
PartitionInput partitionInput = PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build();
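Aside from the import reordering, the functional change in this Glue sync client is the switch from FSUtils.getPartitionPath to FSUtils.getPartitionPathInPath when building the full partition location. Below is a minimal sketch of that call, assuming the String-based signature implied by the diff; the base path and partition values are hypothetical.

```java
// Sketch only: assumes FSUtils.getPartitionPathInPath(basePath, partition) as used in
// this diff; the base path and partition values below are hypothetical.
import org.apache.hudi.common.fs.FSUtils;

public class PartitionLocationExample {
  public static void main(String[] args) {
    String basePath = "s3://my-bucket/warehouse/hudi_table"; // hypothetical table base path
    String partition = "dt=2023-12-01";                      // hypothetical relative partition
    // Resolve the relative partition against the table base path, as addPartitionsToTable does.
    String fullPartitionPath =
        FSUtils.getPartitionPathInPath(basePath, partition).toString();
    System.out.println(fullPartitionPath);
  }
}
```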
@@ -19,7 +19,7 @@
package org.apache.hudi.aws.sync;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;

@@ -62,7 +62,7 @@ public static void main(String[] args) {
}
// HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
TypedProperties props = params.toProps();
Configuration hadoopConf = FSUtils.getFs(props.getString(META_SYNC_BASE_PATH.key()), new Configuration()).getConf();
Configuration hadoopConf = HadoopFSUtils.getFs(props.getString(META_SYNC_BASE_PATH.key()), new Configuration()).getConf();
try (AwsGlueCatalogSyncTool tool = new AwsGlueCatalogSyncTool(props, hadoopConf)) {
tool.syncHoodieTable();
}
14 changes: 9 additions & 5 deletions hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
@@ -22,10 +22,12 @@
import org.apache.hudi.cli.utils.TempViewProvider;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieStorage;
import org.apache.hudi.io.storage.HoodieStorageUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -39,7 +41,7 @@ public class HoodieCLI {

public static Configuration conf;
public static ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
public static FileSystem fs;
public static HoodieStorage storage;
public static CLIState state = CLIState.INIT;
public static String basePath;
protected static HoodieTableMetaClient tableMetadata;
@@ -73,15 +75,17 @@ private static void setLayoutVersion(Integer layoutVersion) {

public static boolean initConf() {
if (HoodieCLI.conf == null) {
HoodieCLI.conf = FSUtils.prepareHadoopConf(new Configuration());
HoodieCLI.conf = HadoopFSUtils.prepareHadoopConf(new Configuration());
return true;
}
return false;
}

public static void initFS(boolean force) throws IOException {
if (fs == null || force) {
fs = (tableMetadata != null) ? tableMetadata.getFs() : FileSystem.get(conf);
if (storage == null || force) {
storage = (tableMetadata != null)
? tableMetadata.getHoodieStorage()
: HoodieStorageUtils.getHoodieStorage(FileSystem.get(conf));
}
}

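The HoodieCLI change above replaces the static Hadoop FileSystem handle with a HoodieStorage handle. Below is a minimal sketch of how calling code might obtain and use that handle, using only the entry points visible in this diff (HadoopFSUtils.prepareHadoopConf, HoodieStorageUtils.getHoodieStorage, HoodieLocation, HoodieStorage.exists); the table path is hypothetical.

```java
// Sketch only: uses the HoodieStorage entry points shown in this PR's diff;
// the base path below is a hypothetical local table.
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieLocation;
import org.apache.hudi.io.storage.HoodieStorage;
import org.apache.hudi.io.storage.HoodieStorageUtils;

public class StorageHandleExample {
  public static void main(String[] args) throws Exception {
    // Prepare a Hadoop configuration the same way HoodieCLI.initConf() now does.
    Configuration conf = HadoopFSUtils.prepareHadoopConf(new Configuration());
    String basePath = "file:///tmp/hudi_table";              // hypothetical table location
    // Obtain the storage abstraction instead of a raw FileSystem.
    HoodieStorage storage = HoodieStorageUtils.getHoodieStorage(basePath, conf);
    HoodieLocation propsFile =
        new HoodieLocation(basePath + "/.hoodie/hoodie.properties");
    // exists() mirrors the FileSystem.exists() calls replaced elsewhere in this PR.
    System.out.println("Table initialized: " + storage.exists(propsFile));
  }
}
```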
@@ -26,7 +26,6 @@
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -38,12 +37,13 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.HoodieFileStatus;
import org.apache.hudi.io.storage.HoodieLocation;
import org.apache.hudi.io.storage.HoodieStorageUtils;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
@@ -106,16 +106,20 @@ public String showArchivedCommits(
throws IOException {
System.out.println("===============> Showing only " + limit + " archived commits <===============");
String basePath = HoodieCLI.getTableMetaClient().getBasePath();
Path archivePath = new Path(HoodieCLI.getTableMetaClient().getArchivePath() + "/.commits_.archive*");
HoodieLocation archivePath = new HoodieLocation(
HoodieCLI.getTableMetaClient().getArchivePath() + "/.commits_.archive*");
if (folder != null && !folder.isEmpty()) {
archivePath = new Path(basePath + "/.hoodie/" + folder);
archivePath = new HoodieLocation(basePath + "/.hoodie/" + folder);
}
FileStatus[] fsStatuses = FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath);
List<HoodieFileStatus> fsStatuses =
HoodieStorageUtils.getHoodieStorage(basePath, HoodieCLI.conf).globEntries(archivePath);
List<Comparable[]> allStats = new ArrayList<>();
for (FileStatus fs : fsStatuses) {
for (HoodieFileStatus fs : fsStatuses) {
// read the archived file
Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(basePath, HoodieCLI.conf),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
Reader reader = HoodieLogFormat.newReader(
HoodieStorageUtils.getHoodieStorage(basePath, HoodieCLI.conf),
new HoodieLogFile(fs.getLocation()),
HoodieArchivedMetaEntry.getClassSchema());

List<IndexedRecord> readRecords = new ArrayList<>();
// read the avro blocks
@@ -182,14 +186,17 @@ public String showCommits(
System.out.println("===============> Showing only " + limit + " archived commits <===============");
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
String basePath = metaClient.getBasePath();
Path archivePath = new Path(metaClient.getArchivePath() + "/.commits_.archive*");
FileStatus[] fsStatuses =
FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath);
HoodieLocation archivePath =
new HoodieLocation(metaClient.getArchivePath() + "/.commits_.archive*");
List<HoodieFileStatus> fsStatuses =
HoodieStorageUtils.getHoodieStorage(basePath, HoodieCLI.conf).globEntries(archivePath);
List<Comparable[]> allCommits = new ArrayList<>();
for (FileStatus fs : fsStatuses) {
for (HoodieFileStatus fs : fsStatuses) {
// read the archived file
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(basePath, HoodieCLI.conf),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(
HoodieStorageUtils.getHoodieStorage(basePath, HoodieCLI.conf),
new HoodieLogFile(fs.getLocation()),
HoodieArchivedMetaEntry.getClassSchema());

List<IndexedRecord> readRecords = new ArrayList<>();
// read the avro blocks
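Both archived-commit commands above follow the same migration: Hadoop's FileSystem.globStatus over a Path becomes HoodieStorage.globEntries over a HoodieLocation, and each returned HoodieFileStatus exposes getLocation() instead of getPath(). Below is a minimal sketch of that pattern, using only the calls visible in the diff; the base path and glob pattern are hypothetical.

```java
// Sketch only: mirrors the globStatus -> globEntries migration in these hunks;
// the base path and glob pattern are hypothetical.
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.io.storage.HoodieFileStatus;
import org.apache.hudi.io.storage.HoodieLocation;
import org.apache.hudi.io.storage.HoodieStorage;
import org.apache.hudi.io.storage.HoodieStorageUtils;

public class ArchiveGlobExample {
  public static void main(String[] args) throws Exception {
    String basePath = "file:///tmp/hudi_table";               // hypothetical
    HoodieStorage storage =
        HoodieStorageUtils.getHoodieStorage(basePath, new Configuration());
    // Previously: FSUtils.getFs(basePath, conf).globStatus(new Path(...)) -> FileStatus[]
    HoodieLocation archiveGlob =
        new HoodieLocation(basePath + "/.hoodie/archived/.commits_.archive*");
    List<HoodieFileStatus> entries = storage.globEntries(archiveGlob);
    for (HoodieFileStatus entry : entries) {
      // getLocation() replaces FileStatus.getPath()
      System.out.println(entry.getLocation());
    }
  }
}
```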
@@ -37,17 +37,16 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieLocation;
import org.apache.hudi.io.storage.HoodieStorage;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.hudi.utilities.UtilHelpers;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
@@ -57,6 +56,7 @@
import org.springframework.shell.standard.ShellOption;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.HashMap;
@@ -432,17 +432,17 @@ private static String getTmpSerializerFile() {
return TMP_DIR + UUID.randomUUID().toString() + ".ser";
}

private <T> T deSerializeOperationResult(String inputP, FileSystem fs) throws Exception {
Path inputPath = new Path(inputP);
FSDataInputStream fsDataInputStream = fs.open(inputPath);
ObjectInputStream in = new ObjectInputStream(fsDataInputStream);
private <T> T deSerializeOperationResult(HoodieLocation inputLocation,
HoodieStorage storage) throws Exception {
InputStream inputStream = storage.open(inputLocation);
ObjectInputStream in = new ObjectInputStream(inputStream);
try {
T result = (T) in.readObject();
LOG.info("Result : " + result);
return result;
} finally {
in.close();
fsDataInputStream.close();
inputStream.close();
}
}

@@ -463,7 +463,7 @@
HoodieCLI.initFS(initialized);

String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
HoodieLocation outputLocation = new HoodieLocation(outputPathStr);
String output;
try {
String sparkPropertiesPath = Utils
@@ -477,7 +477,7 @@
if (exitCode != 0) {
return "Failed to validate compaction for " + compactionInstant;
}
List<ValidationOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
List<ValidationOpResult> res = deSerializeOperationResult(outputLocation, HoodieCLI.storage);
boolean valid = res.stream().map(OperationResult::isSuccess).reduce(Boolean::logicalAnd).orElse(true);
String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
List<Comparable[]> rows = new ArrayList<>();
@@ -502,8 +502,8 @@ public String validateCompaction(
headerOnly, rows);
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
if (HoodieCLI.storage.exists(outputLocation)) {
HoodieCLI.storage.deleteFile(outputLocation);
}
}
return output;
@@ -528,7 +528,7 @@
HoodieCLI.initFS(initialized);

String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
HoodieLocation outputLocation = new HoodieLocation(outputPathStr);
String output;
try {
String sparkPropertiesPath = Utils
@@ -543,13 +543,13 @@
if (exitCode != 0) {
return "Failed to unschedule compaction for " + compactionInstant;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
List<RenameOpResult> res = deSerializeOperationResult(outputLocation, HoodieCLI.storage);
output =
getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "unschedule pending compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
if (HoodieCLI.storage.exists(outputLocation)) {
HoodieCLI.storage.deleteFile(outputLocation);
}
}
return output;
@@ -573,7 +573,7 @@ public String unscheduleCompactFile(
HoodieCLI.initFS(initialized);

String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
HoodieLocation outputLocation = new HoodieLocation(outputPathStr);
String output;
try {
String sparkPropertiesPath = Utils
@@ -588,13 +588,13 @@
if (exitCode != 0) {
return "Failed to unschedule compaction for file " + fileId;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
List<RenameOpResult> res = deSerializeOperationResult(outputLocation, HoodieCLI.storage);
output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly,
"unschedule file from pending compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
if (HoodieCLI.storage.exists(outputLocation)) {
HoodieCLI.storage.deleteFile(outputLocation);
}
}
return output;
@@ -619,7 +619,7 @@ public String repairCompaction(
HoodieCLI.initFS(initialized);

String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
HoodieLocation outputLocation = new HoodieLocation(outputPathStr);
String output;
try {
String sparkPropertiesPath = Utils
@@ -633,12 +633,12 @@
if (exitCode != 0) {
return "Failed to unschedule compaction for " + compactionInstant;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
List<RenameOpResult> res = deSerializeOperationResult(outputLocation, HoodieCLI.storage);
output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "repair compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
if (HoodieCLI.storage.exists(outputLocation)) {
HoodieCLI.storage.deleteFile(outputLocation);
}
}
return output;
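The compaction commands above share one refactor: the temporary result file is addressed as a HoodieLocation, read back through HoodieStorage.open (a plain InputStream rather than Hadoop's FSDataInputStream), and cleaned up with exists/deleteFile. Below is a condensed sketch of that pattern using only the calls shown in this diff; the helper class name and the handling of checked exceptions are assumptions.

```java
// Sketch only: condenses the deSerializeOperationResult + cleanup pattern from this PR;
// the class name is hypothetical and exception handling is simplified.
import java.io.InputStream;
import java.io.ObjectInputStream;

import org.apache.hudi.io.storage.HoodieLocation;
import org.apache.hudi.io.storage.HoodieStorage;

public class SerializedResultExample {

  @SuppressWarnings("unchecked")
  static <T> T readResult(HoodieStorage storage, HoodieLocation location) throws Exception {
    // open() returns a plain InputStream, so try-with-resources closes both streams.
    try (InputStream in = storage.open(location);
         ObjectInputStream objectIn = new ObjectInputStream(in)) {
      return (T) objectIn.readObject();
    }
  }

  static void deleteIfPresent(HoodieStorage storage, HoodieLocation location) throws Exception {
    // deleteFile() replaces FileSystem.delete(path, false) for the single temp file.
    if (storage.exists(location)) {
      storage.deleteFile(location);
    }
  }
}
```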