Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](s3) fix invalid s3 properties checking logic #35762

Merged
merged 11 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions be/src/util/s3_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ bool is_s3_conf_valid(const S3ClientConf& conf) {
// Return true is convert `str` to int successfully
bool to_int(std::string_view str, int& res) {
auto [_, ec] = std::from_chars(str.data(), str.data() + str.size(), res);
return ec != std::errc {};
return ec == std::errc {};
}

const std::string USE_PATH_STYLE = "use_path_style";
Expand Down Expand Up @@ -258,18 +258,19 @@ Status S3ClientFactory::convert_properties_to_s3_conf(
}
if (auto it = properties.find(S3_MAX_CONN_SIZE); it != properties.end()) {
if (!to_int(it->second, s3_conf->client_conf.max_connections)) {
return Status::InvalidArgument("invalid {} value {}", S3_MAX_CONN_SIZE, it->second);
return Status::InvalidArgument("invalid {} value \"{}\"", S3_MAX_CONN_SIZE, it->second);
}
}
if (auto it = properties.find(S3_REQUEST_TIMEOUT_MS); it != properties.end()) {
if (!to_int(it->second, s3_conf->client_conf.request_timeout_ms)) {
return Status::InvalidArgument("invalid {} value {}", S3_REQUEST_TIMEOUT_MS,
return Status::InvalidArgument("invalid {} value \"{}\"", S3_REQUEST_TIMEOUT_MS,
it->second);
}
}
if (auto it = properties.find(S3_CONN_TIMEOUT_MS); it != properties.end()) {
if (!to_int(it->second, s3_conf->client_conf.connect_timeout_ms)) {
return Status::InvalidArgument("invalid {} value {}", S3_CONN_TIMEOUT_MS, it->second);
return Status::InvalidArgument("invalid {} value \"{}\"", S3_CONN_TIMEOUT_MS,
it->second);
}
}

Expand Down
15 changes: 3 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.remote.S3FileSystem;

Expand Down Expand Up @@ -121,15 +120,6 @@ protected void setProperties(Map<String, String> properties) throws DdlException
private static void pingS3(CloudCredentialWithEndpoint credential, String bucketName, String rootPath,
Map<String, String> properties) throws DdlException {
String bucket = "s3://" + bucketName + "/";
Map<String, String> propertiesPing = new HashMap<>();
propertiesPing.put(S3Properties.Env.ACCESS_KEY, credential.getAccessKey());
propertiesPing.put(S3Properties.Env.SECRET_KEY, credential.getSecretKey());
propertiesPing.put(S3Properties.Env.TOKEN, credential.getSessionToken());
propertiesPing.put(S3Properties.Env.ENDPOINT, credential.getEndpoint());
propertiesPing.put(S3Properties.Env.REGION, credential.getRegion());
propertiesPing.put(PropertyConverter.USE_PATH_STYLE,
properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false"));
properties.putAll(propertiesPing);
S3FileSystem fileSystem = new S3FileSystem(properties);
String testFile = bucket + rootPath + "/test-object-valid.txt";
String content = "doris will be better";
Expand All @@ -142,14 +132,14 @@ private static void pingS3(CloudCredentialWithEndpoint credential, String bucket
if (status != Status.OK) {
throw new DdlException(
"ping s3 failed(upload), status: " + status + ", properties: " + new PrintableMap<>(
propertiesPing, "=", true, false, true, false));
properties, "=", true, false, true, false));
}
} finally {
if (status.ok()) {
Status delete = fileSystem.delete(testFile);
if (delete != Status.OK) {
LOG.warn("delete test file failed, status: {}, properties: {}", delete, new PrintableMap<>(
propertiesPing, "=", true, false, true, false));
properties, "=", true, false, true, false));
}
}
}
Expand Down Expand Up @@ -250,3 +240,4 @@ protected void getProcNodeData(BaseProcResult result) {
readUnlock();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException {
if (dfsFileSystem == null) {
Configuration conf = new Configuration();
System.setProperty("com.amazonaws.services.s3.enableV4", "true");
PropertyConverter.convertToHadoopFSProperties(properties).forEach(conf::set);
// the entry value in properties may be null, and
PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
.filter(entry -> entry.getKey() != null && entry.getValue() != null)
.forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
try {
dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ protected TableValuedFunctionIf toCatalogFunction() {
Map<String, String> arguments = getTVFProperties().getMap();
return new HdfsTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build HdfsTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
throw new AnalysisException("Can not build hdfs(): " + t.getMessage(), t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ protected TableValuedFunctionIf toCatalogFunction() {
Map<String, String> arguments = getTVFProperties().getMap();
return new LocalTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build LocalTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
throw new AnalysisException("Can not build local(): " + t.getMessage(), t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ protected TableValuedFunctionIf toCatalogFunction() {
Map<String, String> arguments = getTVFProperties().getMap();
return new S3TableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build S3TableValuedFunction by "
+ this + ": " + t.getMessage(), t);
throw new AnalysisException("Can not build s3(): " + t.getMessage(), t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ public S3TableValuedFunction(Map<String, String> properties) throws AnalysisExce

S3URI s3uri = getS3Uri(uriStr, Boolean.parseBoolean(usePathStyle.toLowerCase()),
Boolean.parseBoolean(forceParsingByStandardUri.toLowerCase()));
String endpoint = getOrDefaultAndRemove(otherProps, S3Properties.ENDPOINT, s3uri.getEndpoint().orElseThrow(() ->
new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ENDPOINT))));

// get endpoint first from properties, if not present, get it from s3 uri.
// If endpoint is missing, exception will be thrown.
String endpoint = getOrDefaultAndRemove(otherProps, S3Properties.ENDPOINT, s3uri.getEndpoint().orElse(""));
if (Strings.isNullOrEmpty(endpoint)) {
throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ENDPOINT));
}
if (!otherProps.containsKey(S3Properties.REGION)) {
String region = s3uri.getRegion().orElseThrow(() ->
new AnalysisException(String.format("Properties '%s' is required.", S3Properties.REGION)));
Expand Down Expand Up @@ -151,3 +156,4 @@ public String getTableName() {
return "S3TableValuedFunction";
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ suite("test_s3_tvf_with_resource", "p0") {

// test outfile to s3
def outfile_url = outfile_to_S3()
// outfile_url like: s3://doris-build-hk-1308700295/est_s3_tvf/export_test/exp_f2cb650bbb94431a-ab0bc3e6f3e89f04_*

// 1. normal
try {
Expand All @@ -112,7 +113,7 @@ suite("test_s3_tvf_with_resource", "p0") {
// 2. test endpoint property
try {
order_qt_select_2 """ SELECT * FROM S3 (
"uri" = "http://${outfile_url.substring(5)}0.orc",
"uri" = "${outfile_url}0.orc",
"format" = "orc",
"resource" = "${resource_name}"
);
Expand Down
Loading