Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
csun5285 committed Aug 6, 2024
1 parent 16ab9ad commit 219786b
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 34 deletions.
60 changes: 59 additions & 1 deletion be/src/http/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,11 @@ Status HttpClient::init(const std::string& url, bool set_fail_on_error) {
LOG(WARNING) << "fail to set CURLOPT_WRITEDATA, msg=" << _to_errmsg(code);
return Status::InternalError("fail to set CURLOPT_WRITEDATA");
}

std::string escaped_url;
RETURN_IF_ERROR(_escape_url(url, &escaped_url));
// set url
code = curl_easy_setopt(_curl, CURLOPT_URL, url.c_str());
code = curl_easy_setopt(_curl, CURLOPT_URL, escaped_url.c_str());
if (code != CURLE_OK) {
LOG(WARNING) << "failed to set CURLOPT_URL, errmsg=" << _to_errmsg(code);
return Status::InternalError("fail to set CURLOPT_URL");
Expand Down Expand Up @@ -290,4 +293,59 @@ Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
return status;
}

// Percent-encodes the value of every "key=value" parameter in the query component of `url`
// and stores the result in `*escaped_url`. The part of the url up to and including '?', the
// parameter keys, the '&' / '=' delimiters and the fragment are copied through unchanged.
// Parameters without '=' are copied through verbatim as well.
//
// Example:
//   http://example.com/page?param1=value1&param2=%2E#section
//   -> http://example.com/page?param1=value1&param2=%252E#section
//
// Returns Status::OK() on success, or Status::InternalError when curl_easy_escape fails.
// `*escaped_url` is only written on success.
Status HttpClient::_escape_url(const std::string& url, std::string* escaped_url) {
    // RFC 3986: the fragment starts at the first '#', so a '?' located after it is fragment
    // data and the url has no query component to escape.
    const size_t fragment_pos = url.find('#');
    const size_t query_pos = url.find('?');
    if (query_pos == std::string::npos ||
        (fragment_pos != std::string::npos && fragment_pos < query_pos)) {
        *escaped_url = url;
        return Status::OK();
    }

    // The query spans [query_pos + 1, query_end); everything from query_end on is the fragment.
    const size_t query_end = (fragment_pos == std::string::npos) ? url.length() : fragment_pos;

    std::string result = url.substr(0, query_pos + 1);
    size_t param_begin = query_pos + 1;
    while (true) {
        // Clamp the parameter to the query so an '&' inside the fragment is never a delimiter.
        size_t param_end = url.find('&', param_begin);
        if (param_end == std::string::npos || param_end > query_end) {
            param_end = query_end;
        }

        // Only an '=' inside the current parameter splits key from value; an '=' that belongs
        // to a later parameter must not be consumed here.
        const size_t equal_pos = url.find('=', param_begin);
        if (equal_pos != std::string::npos && equal_pos < param_end) {
            const std::string value = url.substr(equal_pos + 1, param_end - equal_pos - 1);
            auto encoded_value = std::unique_ptr<char, decltype(&curl_free)>(
                    curl_easy_escape(_curl, value.c_str(), static_cast<int>(value.length())),
                    &curl_free);
            if (!encoded_value) {
                return Status::InternalError("escape url failed, url={}", url);
            }
            // Copy "key=" untouched, then the escaped value.
            result.append(url, param_begin, equal_pos - param_begin + 1);
            result.append(encoded_value.get());
        } else {
            result.append(url, param_begin, param_end - param_begin);
        }

        if (param_end == query_end) {
            break;
        }
        result += '&';
        param_begin = param_end + 1;
    }

    // Re-attach the fragment (empty when the url has none).
    result.append(url, query_end, std::string::npos);
    *escaped_url = result;
    return Status::OK();
}

} // namespace doris
9 changes: 9 additions & 0 deletions be/src/http/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ class HttpClient {

size_t on_response_data(const void* data, size_t length);

// Percent-encodes the values of the "key=value" query parameters of `url` and writes the
// resulting url to `escaped_url`. A url without a '?' is copied to `escaped_url` unchanged.
// Returns an InternalError status if the underlying curl escape call fails.
//
// Why this is needed:
// The file name of the variant column with the inverted index contains %
// such as: 020000000000003f624c4c322c568271060f9b5b274a4a95_0_10133@properties%2Emessage.idx
// {rowset_id}_{seg_num}_{index_id}_{variant_column_name}{%2E}{extracted_column_name}.idx
// We need to handle %, otherwise it will cause an HTTP 404 error.
// Because the percent ("%") character serves as the indicator for percent-encoded octets,
// it must be percent-encoded as "%25" for that octet to be used as data within a URI.
// https://datatracker.ietf.org/doc/html/rfc3986
Status _escape_url(const std::string& url, std::string* escaped_url);

private:
const char* _to_errmsg(CURLcode code);

Expand Down
15 changes: 1 addition & 14 deletions be/src/olap/single_replica_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,20 +404,7 @@ Status SingleReplicaCompaction::_download_files(DataDir* data_dir,
return Status::InternalError("single compaction init curl failed");
}
for (auto& file_name : file_name_list) {
// The file name of the variant column with the inverted index contains %
// such as: 020000000000003f624c4c322c568271060f9b5b274a4a95_0_10133@properties%2Emessage.idx
// {rowset_id}_{seg_num}_{index_id}_{variant_column_name}{%2E}{extracted_column_name}.idx
// We need to handle %, otherwise it will cause an HTTP 404 error.
// Because the percent ("%") character serves as the indicator for percent-encoded octets,
// it must be percent-encoded as "%25" for that octet to be used as data within a URI.
// https://datatracker.ietf.org/doc/html/rfc3986
auto output = std::unique_ptr<char, decltype(&curl_free)>(
curl_easy_escape(curl.get(), file_name.c_str(), file_name.length()), &curl_free);
if (!output) {
return Status::InternalError("escape file name failed, file name={}", file_name);
}
std::string encoded_filename(output.get());
auto remote_file_url = remote_url_prefix + encoded_filename;
auto remote_file_url = remote_url_prefix + file_name;

// get file length
uint64_t file_size = 0;
Expand Down
20 changes: 1 addition & 19 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,26 +523,8 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re
uint64_t total_file_size = 0;
MonotonicStopWatch watch;
watch.start();
auto curl = std::unique_ptr<CURL, decltype(&curl_easy_cleanup)>(curl_easy_init(),
&curl_easy_cleanup);
if (!curl) {
return Status::InternalError("engine clone task init curl failed");
}
for (auto& file_name : file_name_list) {
// The file name of the variant column with the inverted index contains %
// such as: 020000000000003f624c4c322c568271060f9b5b274a4a95_0_10133@properties%2Emessage.idx
// {rowset_id}_{seg_num}_{index_id}_{variant_column_name}{%2E}{extracted_column_name}.idx
// We need to handle %, otherwise it will cause an HTTP 404 error.
// Because the percent ("%") character serves as the indicator for percent-encoded octets,
// it must be percent-encoded as "%25" for that octet to be used as data within a URI.
// https://datatracker.ietf.org/doc/html/rfc3986
auto output = std::unique_ptr<char, decltype(&curl_free)>(
curl_easy_escape(curl.get(), file_name.c_str(), file_name.length()), &curl_free);
if (!output) {
return Status::InternalError("escape file name failed, file name={}", file_name);
}
std::string encoded_filename(output.get());
auto remote_file_url = remote_url_prefix + encoded_filename;
auto remote_file_url = remote_url_prefix + file_name;

// get file length
uint64_t file_size = 0;
Expand Down
42 changes: 42 additions & 0 deletions be/test/http/http_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,46 @@ TEST_F(HttpClientTest, download_file_md5) {
close(fd);
}

// Checks HttpClient::_escape_url against a table of (input url, expected escaped url) pairs:
// urls without a query, with an empty query, with plain values, and with '%' in a value
// (which must come back as "%25"), with and without a trailing fragment.
TEST_F(HttpClientTest, escape_url) {
    HttpClient client;
    client._curl = curl_easy_init();

    // True when escaping `raw` succeeds and produces exactly `expected`.
    auto escapes_to = [&client](const std::string& raw, const std::string& expected) -> bool {
        std::string actual;
        return client._escape_url(raw, &actual).ok() && actual == expected;
    };

    struct UrlCase {
        std::string input;
        std::string expected;
    };
    const UrlCase cases[] = {
            // Several parameters, nothing that needs escaping.
            {hostname + "/download_file?token=oxof&file_name=02x_0.dat",
             hostname + "/download_file?token=oxof&file_name=02x_0.dat"},
            // '?' with an empty query.
            {hostname + "/download_file?", hostname + "/download_file?"},
            // No query at all.
            {hostname + "/download_file", hostname + "/download_file"},
            // Query made of a lone '&'.
            {hostname + "/download_file?&", hostname + "/download_file?&"},
            // Single parameter whose value is already safe.
            {hostname + "/download_file?key=0x2E", hostname + "/download_file?key=0x2E"},
            // A bare '%' value is encoded as "%25".
            {hostname + "/download_file?key=0x2E&key=%",
             hostname + "/download_file?key=0x2E&key=%25"},
            // '%' inside a value is encoded while the fragment is kept as-is.
            {hostname + "/download_file?key=0x2E&key=%2E#section",
             hostname + "/download_file?key=0x2E&key=%252E#section"},
    };

    for (const auto& url_case : cases) {
        ASSERT_TRUE(escapes_to(url_case.input, url_case.expected));
    }
}

} // namespace doris
67 changes: 67 additions & 0 deletions regression-test/suites/load_p2/test_single_replica_load.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// These cases are copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

suite("test_single_replica_load", "p2") {

    // Stream-loads one line-delimited JSON file into `targetTable` and asserts that the
    // load reported success with at least one loaded row and a non-zero byte count.
    def streamLoadJson = { targetTable, jsonFile ->
        streamLoad {
            table "${targetTable}"

            // http request header params
            set 'read_json_by_line', 'true'
            set 'format', 'json'
            set 'max_filter_ratio', '0.1'
            file jsonFile // json file to import
            time 10000 // limit inflight 10s

            // Declaring a check callback disables the default checks,
            // so every condition has to be verified here.
            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                logger.info("Stream load ${jsonFile} result: ${result}".toString())
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
            }
        }
    }

    def tbl = "test_single_replica_load"

    sql "DROP TABLE IF EXISTS ${tbl}"
    // The statement text is kept flush-left so the SQL string sent to the server is unchanged.
    sql """
CREATE TABLE IF NOT EXISTS ${tbl} (
k bigint,
v variant,
INDEX idx(v) USING INVERTED PROPERTIES("parser"="standard") COMMENT ''
)
DUPLICATE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 1
properties("replication_num" = "2", "disable_auto_compaction" = "true", "inverted_index_storage_format" = "V1");
"""

    // Load the same file three times.
    3.times {
        streamLoadJson.call(tbl, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
    }
}

0 comments on commit 219786b

Please sign in to comment.