Skip to content

Commit

Permalink
[HUDI-7357] Introduce generic StorageConfiguration (#10586)
Browse files Browse the repository at this point in the history
This commit introduces the generic `StorageConfiguration` to store configuration for I/O with `HoodieStorage`. Given there's overhead of reinitializing Hadoop's `Configuration` instance, the approach is to wrap the instance in the `HadoopStorageConfiguration` implementation.  This change will enable us to remove our dependency on Hadoop's `Configuration` class.  When integrated, places using `Configuration` will be replaced by `StorageConfiguration` and the `StorageConfiguration` will be passed around for instantiating `HoodieStorage` (unless Hadoop-based readers need the `Configuration` instance).
  • Loading branch information
yihua authored Feb 14, 2024
1 parent 83206a2 commit 1f7e0f6
Show file tree
Hide file tree
Showing 6 changed files with 415 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

package org.apache.hudi.hadoop.fs;

import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -49,6 +52,28 @@ public static Configuration prepareHadoopConf(Configuration conf) {
return conf;
}

public static StorageConfiguration<Configuration> getStorageConf(Configuration conf) {
return getStorageConf(conf, false);
}

public static StorageConfiguration<Configuration> getStorageConf(Configuration conf, boolean copy) {
return new HadoopStorageConfiguration(conf, copy);
}

public static <T> FileSystem getFs(String pathStr, StorageConfiguration<T> storageConf) {
return getFs(new Path(pathStr), storageConf);
}

public static <T> FileSystem getFs(Path path, StorageConfiguration<T> storageConf) {
return getFs(path, storageConf, false);
}

public static <T> FileSystem getFs(Path path, StorageConfiguration<T> storageConf, boolean newCopy) {
T conf = newCopy ? storageConf.newCopy() : storageConf.get();
ValidationUtils.checkArgument(conf instanceof Configuration);
return getFs(path, (Configuration) conf);
}

public static FileSystem getFs(String pathStr, Configuration conf) {
return getFs(new Path(pathStr), conf);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.storage.hadoop;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.storage.StorageConfiguration;

import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
* Implementation of {@link StorageConfiguration} providing Hadoop's {@link Configuration}.
*/
public class HadoopStorageConfiguration extends StorageConfiguration<Configuration> {
private static final long serialVersionUID = 1L;

private transient Configuration configuration;

public HadoopStorageConfiguration() {
this(new Configuration());
}

public HadoopStorageConfiguration(Configuration configuration) {
this(configuration, false);
}

public HadoopStorageConfiguration(Configuration configuration, boolean copy) {
if (copy) {
this.configuration = new Configuration(configuration);
} else {
this.configuration = configuration;
}
}

public HadoopStorageConfiguration(HadoopStorageConfiguration configuration) {
this.configuration = configuration.newCopy();
}

@Override
public Configuration get() {
return configuration;
}

@Override
public Configuration newCopy() {
return new Configuration(configuration);
}

@Override
public void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
configuration.write(out);
}

@Override
public void readObject(ObjectInputStream in) throws IOException {
configuration = new Configuration(false);
configuration.readFields(in);
}

@Override
public void set(String key, String value) {
configuration.set(key, value);
}

@Override
public Option<String> getString(String key) {
return Option.ofNullable(configuration.get(key));
}

@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
configuration.iterator().forEachRemaining(
e -> stringBuilder.append(String.format("%s => %s \n", e.getKey(), e.getValue())));
return stringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
* under the License.
*/

package org.apache.hudi.hadoop.storage;
package org.apache.hudi.storage.hadoop;

import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.TestHoodieStorageBase;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.storage.hadoop;

import org.apache.hudi.io.storage.BaseTestStorageConfiguration;
import org.apache.hudi.storage.StorageConfiguration;

import org.apache.hadoop.conf.Configuration;

import java.util.Map;

/**
* Tests {@link HadoopStorageConfiguration}.
*/
public class TestStorageConfigurationHadoopStorageConfiguration extends BaseTestStorageConfiguration<Configuration> {
@Override
protected StorageConfiguration<Configuration> getStorageConfiguration(Configuration conf) {
return new HadoopStorageConfiguration(conf);
}

@Override
protected Configuration getConf(Map<String, String> mapping) {
Configuration conf = new Configuration();
mapping.forEach(conf::set);
return conf;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.storage;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
* Interface providing the storage configuration in type {@link T}.
*
* @param <T> type of storage configuration to provide.
*/
public abstract class StorageConfiguration<T> implements Serializable {
/**
* @return the storage configuration.
*/
public abstract T get();

/**
* @return a new copy of the storage configuration.
*/
public abstract T newCopy();

/**
* Serializes the storage configuration.
* DO NOT change the signature, as required by {@link Serializable}.
*
* @param out stream to write.
* @throws IOException on I/O error.
*/
public abstract void writeObject(ObjectOutputStream out) throws IOException;

/**
* Deserializes the storage configuration.
* DO NOT change the signature, as required by {@link Serializable}.
*
* @param in stream to read.
* @throws IOException on I/O error.
*/
public abstract void readObject(ObjectInputStream in) throws IOException;

/**
* Sets the configuration key-value pair.
*
* @param key in String.
* @param value in String.
*/
public abstract void set(String key, String value);

/**
* Gets the String value of a property key.
*
* @param key property key in String.
* @return the property value if present, or {@code Option.empty()}.
*/
public abstract Option<String> getString(String key);

/**
* Gets the String value of a property key if present, or the default value if not.
*
* @param key property key in String.
* @param defaultValue default value is the property does not exist.
* @return the property value if present, or the default value.
*/
public final String getString(String key, String defaultValue) {
Option<String> value = getString(key);
return value.isPresent() ? value.get() : defaultValue;
}

/**
* Gets the boolean value of a property key if present, or the default value if not.
*
* @param key property key in String.
* @param defaultValue default value is the property does not exist.
* @return the property value if present, or the default value.
*/
public final boolean getBoolean(String key, boolean defaultValue) {
Option<String> value = getString(key);
return value.isPresent()
? (!StringUtils.isNullOrEmpty(value.get()) ? Boolean.parseBoolean(value.get()) : defaultValue)
: defaultValue;
}

/**
* Gets the long value of a property key if present, or the default value if not.
*
* @param key property key in String.
* @param defaultValue default value is the property does not exist.
* @return the property value if present, or the default value.
*/
public final long getLong(String key, long defaultValue) {
Option<String> value = getString(key);
return value.isPresent() ? Long.parseLong(value.get()) : defaultValue;
}

/**
* Gets the Enum value of a property key if present, or the default value if not.
*
* @param key property key in String.
* @param defaultValue default value is the property does not exist.
* @param <T> Enum.
* @return the property value if present, or the default value.
*/
public <T extends Enum<T>> T getEnum(String key, T defaultValue) {
Option<String> value = getString(key);
return value.isPresent()
? Enum.valueOf(defaultValue.getDeclaringClass(), value.get())
: defaultValue;
}
}
Loading

0 comments on commit 1f7e0f6

Please sign in to comment.