33#include " validation_functions.h"
44#include " object_storage/s3_fetcher.h"
55
6+ #include < util/string/join.h>
67#include < ydb/core/external_sources/object_storage/inference/arrow_fetcher.h>
78#include < ydb/core/external_sources/object_storage/inference/arrow_inferencinator.h>
9+ #include < ydb/core/external_sources/object_storage/inference/infer_config.h>
810#include < ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
911#include < ydb/core/protos/external_sources.pb.h>
1012#include < ydb/core/protos/flat_scheme_op.pb.h>
2022#include < ydb/public/sdk/cpp/client/ydb_value/value.h>
2123
2224#include < library/cpp/scheme/scheme.h>
25+ #include < library/cpp/json/json_reader.h>
26+ #include < arrow/buffer_builder.h>
27+ #include < arrow/buffer.h>
28+ #include < arrow/io/memory.h>
2329
2430#include < util/string/builder.h>
2531
@@ -322,21 +328,29 @@ struct TObjectStorageExternalSource : public IExternalSource {
322328 structuredTokenBuilder.SetNoAuth ();
323329 }
324330
325- auto effectiveFilePattern = NYql::NS3::NormalizePath (meta->TableLocation );
326- if (meta->TableLocation .EndsWith (' /' )) {
327- effectiveFilePattern += ' *' ;
328- }
329-
330331 const NYql::TS3Credentials credentials (CredentialsFactory, structuredTokenBuilder.ToJson ());
332+
333+ const TString path = meta->TableLocation ;
334+ const TString filePattern = meta->Attributes .Value (" filepattern" , TString{});
335+ const TVector<TString> partitionedBy = GetPartitionedByConfig (meta);
336+ NYql::NS3Lister::TListingRequest request {
337+ .Url = meta->DataSourceLocation ,
338+ .Credentials = credentials
339+ };
340+
341+ auto error = NYql::NS3::BuildS3FilePattern (path, filePattern, partitionedBy, request);
342+ if (error) {
343+ throw yexception () << *error;
344+ }
345+
346+ auto partByData = std::make_shared<TStringBuilder>();
347+
331348 auto httpGateway = NYql::IHTTPGateway::Make ();
332349 auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy (NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes ()});
333- auto s3Lister = NYql::NS3Lister::MakeS3Lister (httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
334- .Url = meta->DataSourceLocation ,
335- .Credentials = credentials,
336- .Pattern = effectiveFilePattern,
337- }, Nothing (), AllowLocalFiles, ActorSystem);
338- auto afterListing = s3Lister->Next ().Apply ([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
350+ auto s3Lister = NYql::NS3Lister::MakeS3Lister (httpGateway, httpRetryPolicy, request, Nothing (), AllowLocalFiles, ActorSystem);
351+ auto afterListing = s3Lister->Next ().Apply ([partByData, partitionedBy, path = request.Pattern ](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
339352 auto & listRes = listResFut.GetValue ();
353+ auto & partByRef = *partByData;
340354 if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
341355 auto & error = std::get<NYql::NS3Lister::TListError>(listRes);
342356 throw yexception () << error.Issues .ToString ();
@@ -345,6 +359,12 @@ struct TObjectStorageExternalSource : public IExternalSource {
345359 if (entries.Objects .empty ()) {
346360 throw yexception () << " couldn't find files at " << path;
347361 }
362+
363+ partByRef << JoinSeq (" ," , partitionedBy);
364+ for (const auto & entry : entries.Objects ) {
365+ Y_ENSURE (entry.MatchedGlobs .size () == partitionedBy.size ());
366+ partByRef << Endl << JoinSeq (" ," , entry.MatchedGlobs );
367+ }
348368 for (const auto & entry : entries.Objects ) {
349369 if (entry.Size > 0 ) {
350370 return entry;
@@ -362,9 +382,8 @@ struct TObjectStorageExternalSource : public IExternalSource {
362382
363383 meta->Attributes .erase (" withinfer" );
364384
365- auto fileFormat = NObjectStorage::NInference::ConvertFileFormat (*format);
366- auto arrowFetcherId = ActorSystem->Register (NObjectStorage::NInference::CreateArrowFetchingActor (s3FetcherId, fileFormat, meta->Attributes ));
367- auto arrowInferencinatorId = ActorSystem->Register (NObjectStorage::NInference::CreateArrowInferencinator (arrowFetcherId, fileFormat, meta->Attributes ));
385+ auto arrowFetcherId = ActorSystem->Register (NObjectStorage::NInference::CreateArrowFetchingActor (s3FetcherId, meta->Attributes ));
386+ auto arrowInferencinatorId = ActorSystem->Register (NObjectStorage::NInference::CreateArrowInferencinator (arrowFetcherId));
368387
369388 return afterListing.Apply ([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<NYql::NS3Lister::TObjectListEntry>& entryFut) {
370389 auto promise = NThreading::NewPromise<TMetadataResult>();
@@ -387,12 +406,19 @@ struct TObjectStorageExternalSource : public IExternalSource {
387406 auto [path, size, _] = entryFut.GetValue ();
388407 actorSystem->Register (new NKqp::TActorRequestHandler<NObjectStorage::TEvInferFileSchema, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
389408 arrowInferencinatorId,
390- new NObjectStorage::TEvInferFileSchema (TString{ path} , size),
409+ new NObjectStorage::TEvInferFileSchema (std::move ( path) , size),
391410 promise,
392411 std::move (schemaToMetadata)
393412 ));
394413
395414 return promise.GetFuture ();
415+ }).Apply ([arrowInferencinatorId, meta, partByData, partitionedBy, this ](const NThreading::TFuture<TMetadataResult>& result) {
416+ auto & value = result.GetValue ();
417+ if (!value.Success ()) {
418+ return result;
419+ }
420+
421+ return InferPartitionedColumnsTypes (arrowInferencinatorId, partByData, partitionedBy, result);
396422 }).Apply ([](const NThreading::TFuture<TMetadataResult>& result) {
397423 auto & value = result.GetValue ();
398424 if (value.Success ()) {
@@ -407,6 +433,88 @@ struct TObjectStorageExternalSource : public IExternalSource {
407433 }
408434
409435private:
436+ NThreading::TFuture<TMetadataResult> InferPartitionedColumnsTypes (
437+ NActors::TActorId arrowInferencinatorId,
438+ std::shared_ptr<TStringBuilder> partByData,
439+ const TVector<TString>& partitionedBy,
440+ const NThreading::TFuture<TMetadataResult>& result) const {
441+
442+ auto & value = result.GetValue ();
443+ if (partitionedBy.empty ()) {
444+ return result;
445+ }
446+
447+ auto meta = value.Metadata ;
448+ for (const auto & partitionName : partitionedBy) {
449+ auto & destColumn = *meta->Schema .add_column ();
450+ destColumn.mutable_name ()->assign (partitionName);
451+ destColumn.mutable_type ()->set_type_id (Ydb::Type::UTF8);
452+ }
453+
454+ arrow::BufferBuilder builder;
455+ auto partitionBuffer = std::make_shared<arrow::Buffer>(nullptr , 0 );
456+ auto buildStatus = builder.Append (partByData->data (), partByData->size ());
457+ auto finishStatus = builder.Finish (&partitionBuffer);
458+
459+ if (!buildStatus.ok () || !finishStatus.ok ()) {
460+ return result;
461+ }
462+
463+ auto promise = NThreading::NewPromise<TMetadataResult>();
464+ auto partitionsToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response){
465+ if (response.Status .IsSuccess ()) {
466+ THashMap<TString, Ydb::Type> inferredTypes;
467+ for (const auto & column : response.Fields ) {
468+ if (ValidateCommonProjectionType (column.type (), column.name ()).Empty ()) {
469+ inferredTypes[column.name ()] = column.type ();
470+ }
471+ }
472+
473+ for (auto & destColumn : *meta->Schema .mutable_column ()) {
474+ if (auto type = inferredTypes.FindPtr (destColumn.name ()); type) {
475+ destColumn.mutable_type ()->set_type_id (type->type_id ());
476+ }
477+ }
478+ }
479+ TMetadataResult result;
480+ result.SetSuccess ();
481+ result.Metadata = meta;
482+ metaPromise.SetValue (std::move (result));
483+ };
484+
485+ auto bufferReader = std::make_shared<arrow::io::BufferReader>(std::move (partitionBuffer));
486+ auto file = std::dynamic_pointer_cast<arrow::io::RandomAccessFile>(bufferReader);
487+ auto config = NObjectStorage::NInference::MakeFormatConfig ({{ " format" , " csv_with_names" }});
488+ config->ShouldMakeOptional = false ;
489+ ActorSystem->Register (new NKqp::TActorRequestHandler<NObjectStorage::TEvArrowFile, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
490+ arrowInferencinatorId,
491+ new NObjectStorage::TEvArrowFile (config, std::move (file), " " ),
492+ promise,
493+ std::move (partitionsToMetadata)
494+ ));
495+
496+ return promise.GetFuture ();
497+ }
498+
499+ static TVector<TString> GetPartitionedByConfig (std::shared_ptr<TMetadata> meta) {
500+ THashSet<TString> columns;
501+ if (auto partitioned = meta->Attributes .FindPtr (" partitionedby" ); partitioned) {
502+ NJson::TJsonValue values;
503+ Y_ENSURE (NJson::ReadJsonTree (*partitioned, &values));
504+ Y_ENSURE (values.GetType () == NJson::JSON_ARRAY);
505+
506+ for (const auto & value : values.GetArray ()) {
507+ Y_ENSURE (value.GetType () == NJson::JSON_STRING);
508+ if (columns.find (value.GetString ()) != columns.end ()) {
509+ throw yexception () << " invalid partitioned_by parameter, column " << value.GetString () << " mentioned twice" ;
510+ }
511+ columns.insert (value.GetString ());
512+ }
513+ }
514+
515+ return TVector<TString>{columns.begin (), columns.end ()};
516+ }
517+
410518 static bool IsValidIntervalUnit (const TString& unit) {
411519 static constexpr std::array<std::string_view, 7 > IntervalUnits = {
412520 " MICROSECONDS" sv,
0 commit comments