Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ struct TS3ReadError : public yexception {
using yexception::yexception;
};

void ThrowParquetNotOk(arrow::Status status) {
if (!status.ok()) {
throw parquet::ParquetException(status.ToString());
}
}

using namespace NKikimr::NMiniKQL;

ui64 SubtractSaturating(ui64 lhs, ui64 rhs) {
Expand Down Expand Up @@ -637,8 +643,8 @@ class TS3ReadCoroImpl : public TActorCoroImpl {

// init the 1st reader, get meta/rg count
readers.resize(1);
THROW_ARROW_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit)));
THROW_ARROW_NOT_OK(builder.Build(&readers[0]));
ThrowParquetNotOk(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit)));
ThrowParquetNotOk(builder.Build(&readers[0]));
auto fileMetadata = readers[0]->parquet_reader()->metadata();

bool hasPredicate = ReadSpec->Predicate.payload_case() != NYql::NConnector::NApi::TPredicate::PayloadCase::PAYLOAD_NOT_SET;
Expand All @@ -647,7 +653,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {

if (numGroups) {
std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(readers[0]->GetSchema(&schema));
ThrowParquetNotOk(readers[0]->GetSchema(&schema));
std::vector<int> columnIndices;
std::vector<TColumnConverter> columnConverters;

Expand Down Expand Up @@ -684,17 +690,17 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
// init other readers if any
readers.resize(readerCount);
for (ui64 i = 1; i < readerCount; i++) {
THROW_ARROW_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit),
ThrowParquetNotOk(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit),
parquet::default_reader_properties(),
fileMetadata));
THROW_ARROW_NOT_OK(builder.Build(&readers[i]));
ThrowParquetNotOk(builder.Build(&readers[i]));
}
}

for (ui64 i = 0; i < readerCount; i++) {
if (!columnIndices.empty()) {
CurrentRowGroupIndex = i;
THROW_ARROW_NOT_OK(readers[i]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[i]) : static_cast<int>(i) }, columnIndices));
ThrowParquetNotOk(readers[i]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[i]) : static_cast<int>(i) }, columnIndices));
SourceContext->IncChunkCount();
}
RowGroupReaderIndex[i] = i;
Expand Down Expand Up @@ -736,7 +742,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
std::shared_ptr<arrow::Table> table;

LOG_CORO_D("Decode RowGroup " << readyGroupIndex << " of " << numGroups << " from reader " << readyReaderIndex);
THROW_ARROW_NOT_OK(readers[readyReaderIndex]->DecodeRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[readyGroupIndex]) : static_cast<int>(readyGroupIndex) }, columnIndices, &table));
ThrowParquetNotOk(readers[readyReaderIndex]->DecodeRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[readyGroupIndex]) : static_cast<int>(readyGroupIndex) }, columnIndices, &table));
readyGroupCount++;

auto downloadedBytes = ReadInflightSize[readyGroupIndex];
Expand Down Expand Up @@ -772,7 +778,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
if (nextGroup < numGroups) {
if (!columnIndices.empty()) {
CurrentRowGroupIndex = nextGroup;
THROW_ARROW_NOT_OK(readers[readyReaderIndex]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(nextGroup) : static_cast<int>(nextGroup) }, columnIndices));
ThrowParquetNotOk(readers[readyReaderIndex]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(nextGroup) : static_cast<int>(nextGroup) }, columnIndices));
SourceContext->IncChunkCount();
}
RowGroupReaderIndex[nextGroup] = readyReaderIndex;
Expand Down Expand Up @@ -806,11 +812,11 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
properties.set_cache_options(arrow::io::CacheOptions::LazyDefaults());
properties.set_pre_buffer(true);
builder.properties(properties);
THROW_ARROW_NOT_OK(builder.Open(arrowFile));
THROW_ARROW_NOT_OK(builder.Build(&fileReader));
ThrowParquetNotOk(builder.Open(arrowFile));
ThrowParquetNotOk(builder.Build(&fileReader));

std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema));
ThrowParquetNotOk(fileReader->GetSchema(&schema));
std::vector<int> columnIndices;
std::vector<TColumnConverter> columnConverters;

Expand All @@ -829,7 +835,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {

std::shared_ptr<arrow::Table> table;
ui64 ingressBytes = IngressBytes;
THROW_ARROW_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table));
ThrowParquetNotOk(fileReader->ReadRowGroup(group, columnIndices, &table));
ui64 downloadedBytes = IngressBytes - ingressBytes;
auto reader = std::make_unique<arrow::TableBatchReader>(*table);

Expand Down
31 changes: 31 additions & 0 deletions ydb/tests/fq/s3/test_s3_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,37 @@ def test_bad_format(self, kikimr, s3, client, runtime_listing, unique_prefix):
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.FAILED)

@yq_v2
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_bad_request_on_invalid_parquet(self, kikimr, s3, client, unique_prefix):
    """A non-parquet object read with format=parquet must fail the query with
    BAD_REQUEST and surface the parquet footer error to the user."""
    # Prepare a bucket containing a single plain-text object.
    s3_resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )
    target_bucket = s3_resource.Bucket("bbucket")
    target_bucket.create(ACL='public-read')
    target_bucket.objects.all().delete()

    s3_api = boto3.client(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )
    s3_api.put_object(Body='not a parquet file', Bucket='bbucket', Key='file.txt', ContentType='text/plain')

    # Point a storage connection at the bucket and query the object as parquet.
    kikimr.control_plane.wait_bootstrap(1)
    storage_connection_name = unique_prefix + "badbucket"
    client.create_storage_connection(storage_connection_name, "bbucket")

    query_text = f'''
select * from `{storage_connection_name}`.`file.txt` with (format=parquet, schema (data string));
'''
    qid = client.create_query("simple", query_text, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    client.wait_query_status(qid, fq.QueryMeta.FAILED)

    # The failure must be classified as BAD_REQUEST and carry the parquet error text.
    described = str(client.describe_query(qid).result)
    assert "Query failed with code BAD_REQUEST" in described
    assert "Parquet magic bytes not found in footer." in described

@yq_v1
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True)
Expand Down