Commit 981d141

Merge 768a82e into 4d73159 (2 parents: 4d73159 + 768a82e)

File tree: 2 files changed (+50, -12 lines)

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

Lines changed: 19 additions & 12 deletions
@@ -132,6 +132,13 @@
             throw yexception() << _s.ToString(); \
     } while (false)
 
+#define THROW_PARQUET_NOT_OK(status) \
+    do \
+    { \
+        if (::arrow::Status _s = (status); !_s.ok()) \
+            throw parquet::ParquetException(_s.ToString()); \
+    } while (false)
+
 namespace NYql::NDq {
 
 using namespace ::NActors;
@@ -637,8 +644,8 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
 
 // init the 1st reader, get meta/rg count
 readers.resize(1);
-THROW_ARROW_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit)));
-THROW_ARROW_NOT_OK(builder.Build(&readers[0]));
+THROW_PARQUET_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit)));
+THROW_PARQUET_NOT_OK(builder.Build(&readers[0]));
 auto fileMetadata = readers[0]->parquet_reader()->metadata();
 
 bool hasPredicate = ReadSpec->Predicate.payload_case() != NYql::NConnector::NApi::TPredicate::PayloadCase::PAYLOAD_NOT_SET;
@@ -647,7 +654,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
 
 if (numGroups) {
 std::shared_ptr<arrow::Schema> schema;
-THROW_ARROW_NOT_OK(readers[0]->GetSchema(&schema));
+THROW_PARQUET_NOT_OK(readers[0]->GetSchema(&schema));
 std::vector<int> columnIndices;
 std::vector<TColumnConverter> columnConverters;
 
@@ -684,17 +691,17 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
 // init other readers if any
 readers.resize(readerCount);
 for (ui64 i = 1; i < readerCount; i++) {
-THROW_ARROW_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit),
+THROW_PARQUET_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit),
 parquet::default_reader_properties(),
 fileMetadata));
-THROW_ARROW_NOT_OK(builder.Build(&readers[i]));
+THROW_PARQUET_NOT_OK(builder.Build(&readers[i]));
 }
 }
 
 for (ui64 i = 0; i < readerCount; i++) {
 if (!columnIndices.empty()) {
 CurrentRowGroupIndex = i;
-THROW_ARROW_NOT_OK(readers[i]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[i]) : static_cast<int>(i) }, columnIndices));
+THROW_PARQUET_NOT_OK(readers[i]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[i]) : static_cast<int>(i) }, columnIndices));
 SourceContext->IncChunkCount();
 }
 RowGroupReaderIndex[i] = i;
@@ -736,7 +743,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
 std::shared_ptr<arrow::Table> table;
 
 LOG_CORO_D("Decode RowGroup " << readyGroupIndex << " of " << numGroups << " from reader " << readyReaderIndex);
-THROW_ARROW_NOT_OK(readers[readyReaderIndex]->DecodeRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[readyGroupIndex]) : static_cast<int>(readyGroupIndex) }, columnIndices, &table));
+THROW_PARQUET_NOT_OK(readers[readyReaderIndex]->DecodeRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[readyGroupIndex]) : static_cast<int>(readyGroupIndex) }, columnIndices, &table));
 readyGroupCount++;
 
 auto downloadedBytes = ReadInflightSize[readyGroupIndex];
@@ -772,7 +779,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
 if (nextGroup < numGroups) {
 if (!columnIndices.empty()) {
 CurrentRowGroupIndex = nextGroup;
-THROW_ARROW_NOT_OK(readers[readyReaderIndex]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(nextGroup) : static_cast<int>(nextGroup) }, columnIndices));
+THROW_PARQUET_NOT_OK(readers[readyReaderIndex]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(nextGroup) : static_cast<int>(nextGroup) }, columnIndices));
 SourceContext->IncChunkCount();
 }
 RowGroupReaderIndex[nextGroup] = readyReaderIndex;
@@ -806,11 +813,11 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
 properties.set_cache_options(arrow::io::CacheOptions::LazyDefaults());
 properties.set_pre_buffer(true);
 builder.properties(properties);
-THROW_ARROW_NOT_OK(builder.Open(arrowFile));
-THROW_ARROW_NOT_OK(builder.Build(&fileReader));
+THROW_PARQUET_NOT_OK(builder.Open(arrowFile));
+THROW_PARQUET_NOT_OK(builder.Build(&fileReader));
 
 std::shared_ptr<arrow::Schema> schema;
-THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema));
+THROW_PARQUET_NOT_OK(fileReader->GetSchema(&schema));
 std::vector<int> columnIndices;
 std::vector<TColumnConverter> columnConverters;
 
@@ -829,7 +836,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
 
 std::shared_ptr<arrow::Table> table;
 ui64 ingressBytes = IngressBytes;
-THROW_ARROW_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table));
+THROW_PARQUET_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table));
 ui64 downloadedBytes = IngressBytes - ingressBytes;
 auto reader = std::make_unique<arrow::TableBatchReader>(*table);
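
The new THROW_PARQUET_NOT_OK macro mirrors the existing THROW_ARROW_NOT_OK, but rethrows a failed arrow::Status as parquet::ParquetException instead of a generic yexception, so a broken or non-Parquet object surfaces as a Parquet-level error. Below is a minimal, self-contained sketch of the intended effect, assuming (as the new test below suggests) that a caller higher up the stack catches parquet::ParquetException and reports it as a BAD_REQUEST issue; the handler and the failing status here are illustrative, not the actor's actual code.

    #include <arrow/status.h>
    #include <parquet/exception.h>
    #include <iostream>

    // Same shape as the macro added in this commit: convert a failed
    // arrow::Status into parquet::ParquetException.
    #define THROW_PARQUET_NOT_OK(status) \
        do \
        { \
            if (::arrow::Status _s = (status); !_s.ok()) \
                throw parquet::ParquetException(_s.ToString()); \
        } while (false)

    int main() {
        try {
            // Stand-in for builder.Open()/Build() failing on a non-Parquet
            // object, e.g. the plain-text file uploaded by the new test.
            THROW_PARQUET_NOT_OK(arrow::Status::IOError("Parquet magic bytes not found"));
        } catch (const parquet::ParquetException& e) {
            // Hypothetical handler: this category of failure can now be
            // classified as a user error (BAD_REQUEST) rather than an
            // internal one.
            std::cerr << "BAD_REQUEST: " << e.what() << '\n';
        }
    }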

ydb/tests/fq/s3/test_s3_0.py

Lines changed: 31 additions & 0 deletions
@@ -762,6 +762,37 @@ def test_bad_format(self, kikimr, s3, client, runtime_listing, unique_prefix):
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.FAILED)
 
+    @yq_v2
+    @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+    def test_bad_request(self, kikimr, s3, client, unique_prefix):
+        resource = boto3.resource(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        bucket = resource.Bucket("bbucket")
+        bucket.create(ACL='public-read')
+        bucket.objects.all().delete()
+
+        s3_client = boto3.client(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        s3_client.put_object(Body="blah blah blah", Bucket='bbucket', Key='file.json', ContentType='text/plain')
+
+        kikimr.control_plane.wait_bootstrap(1)
+        storage_connection_name = unique_prefix + "badbucket"
+        client.create_storage_connection(storage_connection_name, "bbucket")
+
+        sql = f'''
+            select * from `{storage_connection_name}`.`file.json` with (format=parquet, schema (data string));
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        assert "Query failed with code BAD_REQUEST" in str(
+            client.describe_query(query_id).result
+        )
+
 
     @yq_v1
     @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
     @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True)
