[fix](s3) fix invalid s3 properties checking logic (#35762)
## Proposed changes
The issues below were introduced by #35515.

1. Fix the inverted success check in `to_int()`: `std::from_chars` reports success with a value-initialized `std::errc`, so the result must be compared with `==`, not `!=`.
2. Remove unnecessary properties when creating an S3 resource.
   Previously, recreating an S3 resource added extra entries to the resource properties, such as AWS_ACCESS_KEY. Those keys are only needed by the S3 client on the BE side, not for pinging S3, and merging them could also introduce invalid entries such as `AWS_TOKEN=null` (see the sketch after this list).
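
As a rough illustration of point 2 (a minimal sketch, not the actual Doris code, which goes through `CloudCredentialWithEndpoint` and `S3FileSystem`): merging every credential field back into the user-supplied property map copies `null` fields verbatim, and a `HashMap` will happily store them, which is where entries like `AWS_TOKEN=null` came from. The `AWS_*` key names follow the description above; the class and values are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class PingPropertiesSketch {
    public static void main(String[] args) {
        // User-supplied resource properties; no session token was configured.
        Map<String, String> properties = new HashMap<>();
        properties.put("AWS_ACCESS_KEY", "my-access-key");
        properties.put("AWS_SECRET_KEY", "my-secret-key");

        // Old behaviour (simplified): every credential field was merged back into
        // the properties before pinging, even fields that are null.
        String sessionToken = null; // no temporary credential in this example
        Map<String, String> propertiesPing = new HashMap<>();
        propertiesPing.put("AWS_TOKEN", sessionToken); // stores a literal null value
        properties.putAll(propertiesPing);
        System.out.println(properties); // ... AWS_TOKEN=null ... <- the invalid entry

        // New behaviour (simplified): ping with the properties exactly as provided;
        // the BE-only credential keys are never merged in, so no null entries appear.
    }
}
```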
morningman authored and dataroaring committed Jun 4, 2024
1 parent 49418b8 commit 26f720f
Showing 8 changed files with 25 additions and 26 deletions.
9 changes: 5 additions & 4 deletions be/src/util/s3_util.cpp
@@ -66,7 +66,7 @@ bool is_s3_conf_valid(const S3ClientConf& conf) {
// Return true is convert `str` to int successfully
bool to_int(std::string_view str, int& res) {
auto [_, ec] = std::from_chars(str.data(), str.data() + str.size(), res);
- return ec != std::errc {};
+ return ec == std::errc {};
}

const std::string USE_PATH_STYLE = "use_path_style";
@@ -258,18 +258,19 @@ Status S3ClientFactory::convert_properties_to_s3_conf(
}
if (auto it = properties.find(S3_MAX_CONN_SIZE); it != properties.end()) {
if (!to_int(it->second, s3_conf->client_conf.max_connections)) {
- return Status::InvalidArgument("invalid {} value {}", S3_MAX_CONN_SIZE, it->second);
+ return Status::InvalidArgument("invalid {} value \"{}\"", S3_MAX_CONN_SIZE, it->second);
}
}
if (auto it = properties.find(S3_REQUEST_TIMEOUT_MS); it != properties.end()) {
if (!to_int(it->second, s3_conf->client_conf.request_timeout_ms)) {
- return Status::InvalidArgument("invalid {} value {}", S3_REQUEST_TIMEOUT_MS,
+ return Status::InvalidArgument("invalid {} value \"{}\"", S3_REQUEST_TIMEOUT_MS,
it->second);
}
}
if (auto it = properties.find(S3_CONN_TIMEOUT_MS); it != properties.end()) {
if (!to_int(it->second, s3_conf->client_conf.connect_timeout_ms)) {
- return Status::InvalidArgument("invalid {} value {}", S3_CONN_TIMEOUT_MS, it->second);
+ return Status::InvalidArgument("invalid {} value \"{}\"", S3_CONN_TIMEOUT_MS,
+         it->second);
}
}

15 changes: 3 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
@@ -23,7 +23,6 @@
import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.util.PrintableMap;
- import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.remote.S3FileSystem;

@@ -121,15 +120,6 @@ protected void setProperties(Map<String, String> properties) throws DdlException
private static void pingS3(CloudCredentialWithEndpoint credential, String bucketName, String rootPath,
Map<String, String> properties) throws DdlException {
String bucket = "s3://" + bucketName + "/";
- Map<String, String> propertiesPing = new HashMap<>();
- propertiesPing.put(S3Properties.Env.ACCESS_KEY, credential.getAccessKey());
- propertiesPing.put(S3Properties.Env.SECRET_KEY, credential.getSecretKey());
- propertiesPing.put(S3Properties.Env.TOKEN, credential.getSessionToken());
- propertiesPing.put(S3Properties.Env.ENDPOINT, credential.getEndpoint());
- propertiesPing.put(S3Properties.Env.REGION, credential.getRegion());
- propertiesPing.put(PropertyConverter.USE_PATH_STYLE,
-         properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false"));
- properties.putAll(propertiesPing);
S3FileSystem fileSystem = new S3FileSystem(properties);
String testFile = bucket + rootPath + "/test-object-valid.txt";
String content = "doris will be better";
@@ -142,14 +132,14 @@ private static void pingS3(CloudCredentialWithEndpoint credential, String bucket
if (status != Status.OK) {
throw new DdlException(
"ping s3 failed(upload), status: " + status + ", properties: " + new PrintableMap<>(
- propertiesPing, "=", true, false, true, false));
+ properties, "=", true, false, true, false));
}
} finally {
if (status.ok()) {
Status delete = fileSystem.delete(testFile);
if (delete != Status.OK) {
LOG.warn("delete test file failed, status: {}, properties: {}", delete, new PrintableMap<>(
- propertiesPing, "=", true, false, true, false));
+ properties, "=", true, false, true, false));
}
}
}
@@ -250,3 +240,4 @@ protected void getProcNodeData(BaseProcResult result) {
readUnlock();
}
}

@@ -60,7 +60,10 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException {
if (dfsFileSystem == null) {
Configuration conf = new Configuration();
System.setProperty("com.amazonaws.services.s3.enableV4", "true");
- PropertyConverter.convertToHadoopFSProperties(properties).forEach(conf::set);
+ // the entry value in properties may be null, and conf.set() does not accept null values, so filter them out
+ PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
+         .filter(entry -> entry.getKey() != null && entry.getValue() != null)
+         .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
try {
dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
} catch (Exception e) {
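For context on the filter added above: Hadoop's `Configuration.set(String name, String value)` rejects `null` values (it throws an `IllegalArgumentException`), so copying a converted property map that may contain `null` entries straight into the configuration via `forEach(conf::set)` can fail. A minimal sketch of the same filter-before-copy pattern, using placeholder `fs.s3a.*` keys instead of the output of Doris's `PropertyConverter`:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class HadoopConfCopySketch {
    public static void main(String[] args) {
        // A converted property map where one entry carries a null value
        // (for example, a session token that was never set).
        Map<String, String> properties = new HashMap<>();
        properties.put("fs.s3a.endpoint", "http://s3.example.com");
        properties.put("fs.s3a.session.token", null);

        Configuration conf = new Configuration();

        // Naive copy -- would throw IllegalArgumentException on the null value:
        // properties.forEach(conf::set);

        // Filtered copy, mirroring the change above: skip null keys and null values.
        properties.entrySet().stream()
                .filter(e -> e.getKey() != null && e.getValue() != null)
                .forEach(e -> conf.set(e.getKey(), e.getValue()));

        System.out.println(conf.get("fs.s3a.endpoint")); // http://s3.example.com
    }
}
```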
@@ -45,8 +45,7 @@ protected TableValuedFunctionIf toCatalogFunction() {
Map<String, String> arguments = getTVFProperties().getMap();
return new HdfsTableValuedFunction(arguments);
} catch (Throwable t) {
- throw new AnalysisException("Can not build HdfsTableValuedFunction by "
-         + this + ": " + t.getMessage(), t);
+ throw new AnalysisException("Can not build hdfs(): " + t.getMessage(), t);
}
}

@@ -46,8 +46,7 @@ protected TableValuedFunctionIf toCatalogFunction() {
Map<String, String> arguments = getTVFProperties().getMap();
return new LocalTableValuedFunction(arguments);
} catch (Throwable t) {
- throw new AnalysisException("Can not build LocalTableValuedFunction by "
-         + this + ": " + t.getMessage(), t);
+ throw new AnalysisException("Can not build local(): " + t.getMessage(), t);
}
}

@@ -44,8 +44,7 @@ protected TableValuedFunctionIf toCatalogFunction() {
Map<String, String> arguments = getTVFProperties().getMap();
return new S3TableValuedFunction(arguments);
} catch (Throwable t) {
- throw new AnalysisException("Can not build S3TableValuedFunction by "
-         + this + ": " + t.getMessage(), t);
+ throw new AnalysisException("Can not build s3(): " + t.getMessage(), t);
}
}

@@ -73,8 +73,13 @@ public S3TableValuedFunction(Map<String, String> properties) throws AnalysisException

S3URI s3uri = getS3Uri(uriStr, Boolean.parseBoolean(usePathStyle.toLowerCase()),
Boolean.parseBoolean(forceParsingByStandardUri.toLowerCase()));
- String endpoint = getOrDefaultAndRemove(otherProps, S3Properties.ENDPOINT, s3uri.getEndpoint().orElseThrow(() ->
-         new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ENDPOINT))));
+
+ // get endpoint first from properties, if not present, get it from s3 uri.
+ // If endpoint is missing, exception will be thrown.
+ String endpoint = getOrDefaultAndRemove(otherProps, S3Properties.ENDPOINT, s3uri.getEndpoint().orElse(""));
+ if (Strings.isNullOrEmpty(endpoint)) {
+     throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ENDPOINT));
+ }
if (!otherProps.containsKey(S3Properties.REGION)) {
String region = s3uri.getRegion().orElseThrow(() ->
new AnalysisException(String.format("Properties '%s' is required.", S3Properties.REGION)));
@@ -151,3 +156,4 @@ public String getTableName() {
return "S3TableValuedFunction";
}
}
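
The endpoint handling rewritten above boils down to a simple precedence: an explicitly supplied endpoint property wins, otherwise the endpoint parsed from the S3 URI is used, and only when both are absent is an `AnalysisException` raised. A standalone sketch of that resolution order, with made-up property and exception names (the real code uses `getOrDefaultAndRemove`, `S3Properties.ENDPOINT`, and `Strings.isNullOrEmpty`):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class EndpointResolutionSketch {
    // Resolve the endpoint: explicit property first, then the URI, otherwise fail.
    static String resolveEndpoint(Map<String, String> props, Optional<String> uriEndpoint) {
        String endpoint = props.getOrDefault("s3.endpoint", uriEndpoint.orElse(""));
        if (endpoint == null || endpoint.isEmpty()) {
            throw new IllegalArgumentException("Properties 's3.endpoint' is required.");
        }
        return endpoint;
    }

    public static void main(String[] args) {
        // 1. No property set: the endpoint parsed from the URI is used.
        System.out.println(resolveEndpoint(new HashMap<>(), Optional.of("s3.us-east-1.amazonaws.com")));

        // 2. An explicit property overrides whatever the URI says.
        Map<String, String> props = new HashMap<>();
        props.put("s3.endpoint", "http://minio.internal:9000");
        System.out.println(resolveEndpoint(props, Optional.of("s3.us-east-1.amazonaws.com")));

        // 3. Neither present: fail, matching the new explicit emptiness check.
        try {
            resolveEndpoint(new HashMap<>(), Optional.empty());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```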

@@ -95,6 +95,7 @@ suite("test_s3_tvf_with_resource", "p0") {

// test outfile to s3
def outfile_url = outfile_to_S3()
+ // outfile_url like: s3://doris-build-hk-1308700295/est_s3_tvf/export_test/exp_f2cb650bbb94431a-ab0bc3e6f3e89f04_*

// 1. normal
try {
@@ -112,7 +113,7 @@
// 2. test endpoint property
try {
order_qt_select_2 """ SELECT * FROM S3 (
- "uri" = "http://${outfile_url.substring(5)}0.orc",
+ "uri" = "${outfile_url}0.orc",
"format" = "orc",
"resource" = "${resource_name}"
);