Skip to content

Commit b279217

Browse files
Merge 55276be into 2848978
2 parents 2848978 + 55276be commit b279217

File tree

14 files changed

+465
-153
lines changed

14 files changed

+465
-153
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 111 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "validation_functions.h"
44
#include "object_storage/s3_fetcher.h"
55

6+
#include <util/string/join.h>
67
#include <ydb/core/external_sources/object_storage/inference/arrow_fetcher.h>
78
#include <ydb/core/external_sources/object_storage/inference/arrow_inferencinator.h>
89
#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
@@ -20,6 +21,10 @@
2021
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
2122

2223
#include <library/cpp/scheme/scheme.h>
24+
#include <library/cpp/json/json_reader.h>
25+
#include <arrow/buffer_builder.h>
26+
#include <arrow/buffer.h>
27+
#include <arrow/io/memory.h>
2328

2429
#include <util/string/builder.h>
2530

@@ -322,21 +327,26 @@ struct TObjectStorageExternalSource : public IExternalSource {
322327
structuredTokenBuilder.SetNoAuth();
323328
}
324329

325-
auto effectiveFilePattern = NYql::NS3::NormalizePath(meta->TableLocation);
326-
if (meta->TableLocation.EndsWith('/')) {
327-
effectiveFilePattern += '*';
328-
}
329-
330330
const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson());
331+
332+
const TString path = meta->TableLocation;
333+
const TString filePattern = meta->Attributes.Value("filepattern", TString{});
334+
const TVector<TString> partitionedBy = GetPartitionedByConfig(meta);
335+
NYql::NS3Lister::TListingRequest request {
336+
.Url = meta->DataSourceLocation,
337+
.Credentials = credentials
338+
};
339+
340+
NYql::NS3::BuildS3FilePattern(path, filePattern, partitionedBy, request);
341+
342+
auto partByData = std::make_shared<TStringBuilder>();
343+
331344
auto httpGateway = NYql::IHTTPGateway::Make();
332345
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
333-
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
334-
.Url = meta->DataSourceLocation,
335-
.Credentials = credentials,
336-
.Pattern = effectiveFilePattern,
337-
}, Nothing(), AllowLocalFiles, ActorSystem);
338-
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
346+
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, request, Nothing(), AllowLocalFiles, ActorSystem);
347+
auto afterListing = s3Lister->Next().Apply([partByData, partitionedBy, path = request.Pattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
339348
auto& listRes = listResFut.GetValue();
349+
auto& partByRef = *partByData;
340350
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
341351
auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
342352
throw yexception() << error.Issues.ToString();
@@ -345,6 +355,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
345355
if (entries.Objects.empty()) {
346356
throw yexception() << "couldn't find files at " << path;
347357
}
358+
359+
partByRef << JoinSeq(",", partitionedBy);
360+
for (const auto& entry : entries.Objects) {
361+
if (entry.MatchedGlobs.size() != partitionedBy.size()) {
362+
continue;
363+
}
364+
partByRef << Endl << JoinSeq(",", entry.MatchedGlobs);
365+
}
348366
for (const auto& entry : entries.Objects) {
349367
if (entry.Size > 0) {
350368
return entry;
@@ -387,12 +405,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
387405
auto [path, size, _] = entryFut.GetValue();
388406
actorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvInferFileSchema, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
389407
arrowInferencinatorId,
390-
new NObjectStorage::TEvInferFileSchema(TString{path}, size),
408+
new NObjectStorage::TEvInferFileSchema(std::move(path), size),
391409
promise,
392410
std::move(schemaToMetadata)
393411
));
394412

395413
return promise.GetFuture();
414+
}).Apply([arrowInferencinatorId, meta, partByData, partitionedBy, this](const NThreading::TFuture<TMetadataResult>& result) {
415+
return InferPartitionedColumnsTypes(arrowInferencinatorId, partByData, partitionedBy, result);
396416
}).Apply([](const NThreading::TFuture<TMetadataResult>& result) {
397417
auto& value = result.GetValue();
398418
if (value.Success()) {
@@ -407,6 +427,85 @@ struct TObjectStorageExternalSource : public IExternalSource {
407427
}
408428

409429
private:
430+
// Extends already inferred file metadata with the partitioning columns and asks the
// inferencinator actor to infer their types from the collected partition values.
//
// Parameters:
//   arrowInferencinatorId - actor that receives the TEvInferPartitions request.
//   partByData            - text with the partition column names and the values matched
//                           for the listed objects; its bytes are copied into an Arrow
//                           buffer and handed to the actor as an in-memory file.
//   partitionedBy         - names of the partitioning columns.
//   result                - completed future with the metadata inferred from the file.
//
// Returns `result` unchanged when the previous step failed, when there are no
// partitioning columns, or when the Arrow buffer cannot be built. Otherwise returns a
// future that is fulfilled once the actor replies.
//
// Every partitioning column is first appended to the schema as UTF8. That type is kept
// when inference fails or does not report the column, so a failed inference still
// produces a successful result.
NThreading::TFuture<TMetadataResult> InferPartitionedColumnsTypes(
    NActors::TActorId arrowInferencinatorId,
    std::shared_ptr<TStringBuilder> partByData,
    const TVector<TString>& partitionedBy,
    const NThreading::TFuture<TMetadataResult>& result) const {

    auto& value = result.GetValue();
    if (!value.Success() || partitionedBy.empty()) {
        return result;
    }

    // NOTE(review): `meta` looks like a shared handle to the metadata held by `result`,
    // so the columns added below would also be visible through `result` - confirm.
    auto meta = value.Metadata;
    for (const auto& partitionName : partitionedBy) {
        auto& destColumn = *meta->Schema.add_column();
        destColumn.mutable_name()->assign(partitionName);
        destColumn.mutable_type()->set_type_id(Ydb::Type::UTF8);
    }

    // Copy the collected partition text into an Arrow buffer. Stop at the first failed
    // step instead of finishing a builder whose append has already failed.
    arrow::BufferBuilder builder;
    if (!builder.Append(partByData->data(), partByData->size()).ok()) {
        return result;
    }

    std::shared_ptr<arrow::Buffer> partitionBuffer;
    if (!builder.Finish(&partitionBuffer).ok()) {
        return result;
    }

    auto promise = NThreading::NewPromise<TMetadataResult>();
    auto partitionsToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
        if (response.Status.IsSuccess()) {
            THashMap<TString, Ydb::Type> inferredTypes;
            for (const auto& column : response.Fields) {
                inferredTypes[column.name()] = column.type();
            }

            // Replace the type of every schema column the inference reported,
            // unwrapping Optional<T> to its item type.
            for (auto& destColumn : *meta->Schema.mutable_column()) {
                if (auto type = inferredTypes.FindPtr(destColumn.name()); type) {
                    destColumn.mutable_type()->set_type_id(type->has_optional_type() ?
                        type->optional_type().item().type_id() :
                        type->type_id());
                }
            }
        }

        TMetadataResult enriched;
        enriched.SetSuccess();
        enriched.Metadata = meta;
        metaPromise.SetValue(std::move(enriched));
    };

    // BufferReader is a RandomAccessFile, so the shared_ptr upcast is implicit and needs
    // neither dynamic_pointer_cast nor a std::move around a temporary.
    std::shared_ptr<arrow::io::RandomAccessFile> file =
        std::make_shared<arrow::io::BufferReader>(std::move(partitionBuffer));
    ActorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvInferPartitions, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
        arrowInferencinatorId,
        new NObjectStorage::TEvInferPartitions(std::move(file)),
        promise,
        std::move(partitionsToMetadata)
    ));

    return promise.GetFuture();
}
489+
490+
// Reads the "partitionedby" attribute of the metadata and returns the partitioning
// column names in the order they are declared.
//
// The attribute, when present, must be a JSON array of strings; Y_ENSURE fails
// otherwise. Throws yexception when a column is listed more than once. Returns an
// empty vector when the attribute is absent.
//
// Declaration order is preserved on purpose: the caller joins this list positionally
// with the globs matched for each listed object, and iteration order of a hash set is
// unspecified. The set is therefore used only for duplicate detection.
static TVector<TString> GetPartitionedByConfig(std::shared_ptr<TMetadata> meta) {
    TVector<TString> columns;
    if (auto partitioned = meta->Attributes.FindPtr("partitionedby"); partitioned) {
        NJson::TJsonValue values;
        Y_ENSURE(NJson::ReadJsonTree(*partitioned, &values));
        Y_ENSURE(values.GetType() == NJson::JSON_ARRAY);

        const auto& items = values.GetArray();
        columns.reserve(items.size());

        THashSet<TString> seen;
        for (const auto& value : items) {
            Y_ENSURE(value.GetType() == NJson::JSON_STRING);
            const TString& name = value.GetString();
            if (!seen.insert(name).second) {
                throw yexception() << "invalid partitioned_by parameter, column " << name << " mentioned twice";
            }
            columns.push_back(name);
        }
    }

    return columns;
}
508+
410509
static bool IsValidIntervalUnit(const TString& unit) {
411510
static constexpr std::array<std::string_view, 7> IntervalUnits = {
412511
"MICROSECONDS"sv,

ydb/core/external_sources/object_storage/events.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ enum EEventTypes : ui32 {
2626

2727
// Move somewhere separate?
2828
EvInferFileSchema,
29+
EvInferPartitions,
2930
EvInferredFileSchema,
3031

3132
EvArrowFile,
@@ -128,6 +129,14 @@ struct TEvInferFileSchema : public NActors::TEventLocal<TEvInferFileSchema, EvIn
128129
ui64 Size = 0;
129130
};
130131

132+
// Local event asking for type inference over partition values.
// The values arrive as a random-access Arrow file; the event shares ownership of it.
struct TEvInferPartitions : public NActors::TEventLocal<TEvInferPartitions, EvInferPartitions> {
    // Input to run the inference on.
    std::shared_ptr<arrow::io::RandomAccessFile> File;

    explicit TEvInferPartitions(std::shared_ptr<arrow::io::RandomAccessFile> file)
        : File(std::move(file))
    {
    }
};
139+
131140
struct TEvInferredFileSchema : public NActors::TEventLocal<TEvInferredFileSchema, EvInferredFileSchema> {
132141
TEvInferredFileSchema(TString path, std::vector<Ydb::Column>&& fields)
133142
: Path{std::move(path)}

ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "arrow_fetcher.h"
22
#include "arrow_inferencinator.h"
3+
#include "infer_config.h"
34

45
#include <arrow/buffer.h>
56
#include <arrow/buffer_builder.h>
@@ -30,6 +31,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
3031
TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params)
3132
: S3FetcherId_{s3FetcherId}
3233
, Format_{format}
34+
, Config_{MakeFormatConfig(format, params)}
3335
{
3436
Y_ABORT_UNLESS(IsArrowInferredFormat(Format_));
3537

@@ -72,7 +74,11 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
7274
break;
7375
}
7476
default: {
75-
ctx.Send(localRequest.Requester, MakeError(localRequest.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "unsupported format for inference: " << ConvertFileFormat(Format_)));
77+
ctx.Send(localRequest.Requester, MakeError(
78+
localRequest.Path,
79+
NFq::TIssuesIds::UNSUPPORTED,
80+
TStringBuilder{} << "unsupported format for inference: " << ConvertFileFormat(Format_))
81+
);
7682
return;
7783
}
7884
case EFileFormat::Undefined:
@@ -100,12 +106,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
100106
switch (Format_) {
101107
case EFileFormat::CsvWithNames:
102108
case EFileFormat::TsvWithNames: {
103-
// TODO: obtain from request
104-
arrow::csv::ParseOptions options;
105-
if (Format_ == EFileFormat::TsvWithNames) {
106-
options.delimiter = '\t';
107-
}
108-
file = CleanupCsvFile(data, request, options, ctx);
109+
file = CleanupCsvFile(data, request, std::dynamic_pointer_cast<CsvConfig>(Config_)->ParseOpts, ctx);
109110
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
110111
break;
111112
}
@@ -120,7 +121,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
120121
}
121122
case EFileFormat::JsonEachRow:
122123
case EFileFormat::JsonList: {
123-
file = CleanupJsonFile(data, request, arrow::json::ParseOptions::Defaults(), ctx);
124+
file = CleanupJsonFile(data, request, std::dynamic_pointer_cast<JsonConfig>(Config_)->ParseOpts, ctx);
124125
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
125126
break;
126127
}
@@ -354,6 +355,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
354355
// Fields
355356
NActors::TActorId S3FetcherId_;
356357
EFileFormat Format_;
358+
std::shared_ptr<FormatConfig> Config_;
357359
TMaybe<TString> DecompressionFormat_;
358360
std::unordered_map<TString, TRequest> InflightRequests_; // Path -> Request
359361
};

0 commit comments

Comments
 (0)