Commit 6e17260

fixed issues
1 parent 5ded5bd commit 6e17260

File tree: 4 files changed (+67 lines, -20 lines)

ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp

Lines changed: 29 additions & 19 deletions
@@ -152,30 +152,40 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
     // Cutting file

     TMaybe<TString> DecompressFile(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
-        auto dataBuffer = NDB::ReadBufferFromString(data);
-        auto decompressorBuffer = NYql::MakeDecompressor(dataBuffer, *DecompressionFormat_);
-        if (!decompressorBuffer) {
-            auto error = MakeError(
+        try {
+            auto dataBuffer = NDB::ReadBufferFromString(data);
+            auto decompressorBuffer = NYql::MakeDecompressor(dataBuffer, *DecompressionFormat_);
+            if (!decompressorBuffer) {
+                auto error = MakeError(
+                    request.Path,
+                    NFq::TIssuesIds::INTERNAL_ERROR,
+                    TStringBuilder{} << "unknown compression: " << *DecompressionFormat_ << ". Use one of: gzip, zstd, lz4, brotli, bzip2, xz"
+                );
+                SendError(ctx, error);
+                return {};
+            }
+
+            TStringBuilder decompressedData;
+            while (!decompressorBuffer->eof() && decompressedData.size() < 10_MB) {
+                decompressorBuffer->nextIfAtEnd();
+                size_t maxDecompressedChunkSize = std::min(
+                    decompressorBuffer->available(),
+                    10_MB - decompressedData.size()
+                );
+                TString decompressedChunk{maxDecompressedChunkSize, ' '};
+                decompressorBuffer->read(&decompressedChunk.front(), maxDecompressedChunkSize);
+                decompressedData << decompressedChunk;
+            }
+            return std::move(decompressedData);
+        } catch (const yexception& error) {
+            auto errorEv = MakeError(
                 request.Path,
                 NFq::TIssuesIds::INTERNAL_ERROR,
-                TStringBuilder{} << "invalid decompression format: " << *DecompressionFormat_
+                TStringBuilder{} << "couldn't decompress file, check format and compression params: " << error.AsStrBuf()
             );
-            SendError(ctx, error);
+            SendError(ctx, errorEv);
             return {};
         }
-
-        TString decompressedData;
-        while (!decompressorBuffer->eof() && decompressedData.size() < 10_MB) {
-            decompressorBuffer->nextIfAtEnd();
-            size_t maxDecompressedChunkSize = std::min(
-                decompressorBuffer->available(),
-                10_MB - decompressedData.size()
-            );
-            TString decompressedChunk{maxDecompressedChunkSize, ' '};
-            decompressorBuffer->read(&decompressedChunk.front(), maxDecompressedChunkSize);
-            decompressedData += decompressedChunk;
-        }
-        return std::move(decompressedData);
     }

     std::shared_ptr<arrow::io::RandomAccessFile> CleanupCsvFile(const TString& data, const TRequest& request, const arrow::csv::ParseOptions& options, const NActors::TActorContext& ctx) {
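
The loop above reads the decompressed stream chunk by chunk and stops once 10 MB have accumulated, so a malformed or hostile archive cannot balloon memory during schema inference. Below is a minimal self-contained sketch of that capped chunked-read pattern, written against std::istream rather than the ClickHouse ReadBuffer the real code uses; ReadCapped, kMaxDecompressedBytes, and the 64 KiB chunk size are illustrative assumptions, not YDB API:

```cpp
// Sketch only: caps total output the way DecompressFile caps at 10_MB.
#include <algorithm>
#include <cstddef>
#include <istream>
#include <string>

constexpr std::size_t kMaxDecompressedBytes = 10 * 1024 * 1024; // mirrors 10_MB

std::string ReadCapped(std::istream& in) {
    std::string out;
    char chunk[64 * 1024];
    while (in && out.size() < kMaxDecompressedBytes) {
        // Never request more than the remaining budget, mirroring the
        // std::min(available(), 10_MB - size()) in the loop above.
        std::size_t want = std::min(sizeof(chunk), kMaxDecompressedBytes - out.size());
        in.read(chunk, static_cast<std::streamsize>(want));
        out.append(chunk, static_cast<std::size_t>(in.gcount()));
    }
    return out;
}
```

Feeding it a std::istringstream over a decompressed buffer exercises the same stop conditions the diff enforces with decompressorBuffer->eof() and the 10_MB budget.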

ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ class ArrowInferenceTest : public testing::Test {

     NActors::TActorId RegisterInferencinator(TStringBuf formatStr) {
         auto format = NInference::ConvertFileFormat(formatStr);
-        auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format), 1);
+        auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format, {}), 1);
         return ActorSystem.Register(NInference::CreateArrowInferencinator(arrowFetcher, format, {}), 1);
     }


ydb/core/external_sources/object_storage/inference/ya.make

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@ PEERDIR(

    ydb/core/external_sources/object_storage

+    ydb/library/yql/providers/s3/compressors
    ydb/library/yql/udfs/common/clickhouse/client
)

ydb/tests/fq/s3/test_compressions.py

Lines changed: 36 additions & 0 deletions
@@ -204,3 +204,39 @@ def test_invalid_compression(self, kikimr, s3, client, unique_prefix):
         assert (
             "Unknown compression: some_compression. Use one of: gzip, zstd, lz4, brotli, bzip2, xz" in describe_string
         )
+
+    @yq_v2
+    @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+    def test_invalid_compression_inference(self, kikimr, s3, client, unique_prefix):
+        resource = boto3.resource(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        bucket = resource.Bucket("fbucket")
+        bucket.create(ACL='public-read')
+
+        s3_client = boto3.client(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        fruits = R'''[{"name" : "banana", "price" : 3, "weight" : 100}]'''
+        s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.json', ContentType='text/plain')
+        kikimr.control_plane.wait_bootstrap(1)
+
+        storage_connection_name = unique_prefix + "fruitbucket"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
+        sql = fR'''
+            SELECT *
+            FROM `{storage_connection_name}`.`fruits.json`
+            WITH (format=csv_with_names, compression="gzip", with_infer="true");
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        describe_result = client.describe_query(query_id).result
+        logging.debug("Describe result: {}".format(describe_result))
+        describe_string = "{}".format(describe_result)
+        assert (
+            "couldn\\'t decompress file, check format and compression params:" in describe_string
+        )
