Skip to content

Commit 7f55a3d

Browse files
authored
Merge 491cc27 into 262bf50
2 parents 262bf50 + 491cc27 commit 7f55a3d

16 files changed

+463
-339
lines changed

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
#include <ydb/library/yql/providers/generic/proto/source.pb.h>
1717
#include <ydb/library/yql/providers/generic/connector/libcpp/error.h>
1818
#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h>
19-
#include <ydb/library/yql/providers/generic/proto/range.pb.h>
2019
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
2120
#include <yql/essentials/public/udf/arrow/util.h>
2221
#include <yql/essentials/utils/log/log.h>

ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,19 @@ namespace NYql::NDq {
1313
auto readActorFactory = [credentialsFactory, genericClient](
1414
Generic::TSource&& settings,
1515
IDqAsyncIoFactory::TSourceArguments&& args) {
16-
return CreateGenericReadActor(genericClient, std::move(settings), args.InputIndex, args.StatsLevel,
17-
args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory);
16+
return CreateGenericReadActor(
17+
genericClient,
18+
std::move(settings),
19+
args.InputIndex,
20+
args.StatsLevel,
21+
args.SecureParams,
22+
args.TaskId,
23+
args.TaskParams,
24+
args.ReadRanges,
25+
args.ComputeActorId,
26+
credentialsFactory,
27+
args.HolderFactory
28+
);
1829
};
1930

2031
auto lookupActorFactory = [credentialsFactory, genericClient](NYql::Generic::TLookupSource&& lookupSource, IDqAsyncIoFactory::TLookupSourceArguments&& args) {

ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp

Lines changed: 46 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "yql_generic_read_actor.h"
33
#include "yql_generic_token_provider.h"
44

5+
#include <util/string/join.h>
56
#include <ydb/library/actors/core/actor_bootstrapped.h>
67
#include <ydb/library/actors/core/actorsystem.h>
78
#include <ydb/library/actors/core/event_local.h>
@@ -46,11 +47,13 @@ namespace NYql::NDq {
4647
TGenericTokenProvider::TPtr tokenProvider,
4748
Generic::TSource&& source,
4849
const NActors::TActorId& computeActorId,
49-
const NKikimr::NMiniKQL::THolderFactory& holderFactory)
50+
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
51+
TVector<TString>&& splitDescriptions)
5052
: InputIndex_(inputIndex)
5153
, ComputeActorId_(computeActorId)
5254
, Client_(std::move(client))
5355
, TokenProvider_(std::move(tokenProvider))
56+
, SplitDescriptions_(std::move(splitDescriptions))
5457
, HolderFactory_(holderFactory)
5558
, Source_(source)
5659
{
@@ -59,7 +62,7 @@ namespace NYql::NDq {
5962

6063
void Bootstrap() {
6164
Become(&TGenericReadActor::StateFunc);
62-
auto issue = InitSplitsListing();
65+
auto issue = InitSplitsReading();
6366
if (issue) {
6467
return NotifyComputeActorWithIssue(
6568
TActivationContext::ActorSystem(),
@@ -72,145 +75,43 @@ namespace NYql::NDq {
7275
static constexpr char ActorName[] = "GENERIC_READ_ACTOR";
7376

7477
private:
75-
// TODO: make two different states
7678
// clang-format off
7779
STRICT_STFUNC(StateFunc,
78-
hFunc(TEvListSplitsIterator, Handle);
79-
hFunc(TEvListSplitsPart, Handle);
80-
hFunc(TEvListSplitsFinished, Handle);
8180
hFunc(TEvReadSplitsIterator, Handle);
8281
hFunc(TEvReadSplitsPart, Handle);
8382
hFunc(TEvReadSplitsFinished, Handle);
8483
)
8584
// clang-format on
8685

87-
// ListSplits
88-
89-
TMaybe<TIssue> InitSplitsListing() {
90-
YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits listing";
91-
92-
// Prepare request
93-
NConnector::NApi::TListSplitsRequest request;
94-
NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source
95-
96-
auto error = TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance());
97-
if (error) {
98-
return TIssue(error);
99-
}
100-
101-
*request.mutable_selects()->Add() = std::move(select);
102-
103-
// Initialize stream
104-
Client_->ListSplits(request).Subscribe(
105-
[actorSystem = TActivationContext::ActorSystem(),
106-
selfId = SelfId(),
107-
computeActorId = ComputeActorId_,
108-
inputIndex = InputIndex_](
109-
const NConnector::TListSplitsStreamIteratorAsyncResult& future) {
110-
AwaitIterator<
111-
NConnector::TListSplitsStreamIteratorAsyncResult,
112-
TEvListSplitsIterator>(
113-
actorSystem, selfId, computeActorId, inputIndex, future);
114-
});
115-
116-
return Nothing();
117-
}
118-
119-
void Handle(TEvListSplitsIterator::TPtr& ev) {
120-
ListSplitsIterator_ = std::move(ev->Get()->Iterator);
121-
122-
AwaitNextStreamItem<NConnector::IListSplitsStreamIterator,
123-
TEvListSplitsPart,
124-
TEvListSplitsFinished>(ListSplitsIterator_);
125-
}
126-
127-
void Handle(TEvListSplitsPart::TPtr& ev) {
128-
auto& response = ev->Get()->Response;
129-
YQL_CLOG(TRACE, ProviderGeneric) << "Handle :: EvListSplitsPart :: event handling started"
130-
<< ": splits_size=" << response.splits().size();
131-
132-
if (!NConnector::IsSuccess(response)) {
133-
return NotifyComputeActorWithError(
134-
TActivationContext::ActorSystem(),
135-
ComputeActorId_,
136-
InputIndex_,
137-
response.error());
138-
}
139-
140-
// Save splits for the further usage
141-
Splits_.insert(
142-
Splits_.end(),
143-
std::move_iterator(response.mutable_splits()->begin()),
144-
std::move_iterator(response.mutable_splits()->end()));
145-
146-
// ask for next stream message
147-
AwaitNextStreamItem<NConnector::IListSplitsStreamIterator,
148-
TEvListSplitsPart,
149-
TEvListSplitsFinished>(ListSplitsIterator_);
150-
151-
YQL_CLOG(TRACE, ProviderGeneric) << "Handle :: EvListSplitsPart :: event handling finished";
152-
}
153-
154-
void Handle(TEvListSplitsFinished::TPtr& ev) {
155-
const auto& status = ev->Get()->Status;
156-
157-
YQL_CLOG(TRACE, ProviderGeneric) << "Handle :: EvListSplitsFinished :: event handling started: ";
158-
159-
// Server sent EOF, now we are ready to start splits reading
160-
if (NConnector::GrpcStatusEndOfStream(status)) {
161-
YQL_CLOG(DEBUG, ProviderGeneric) << "Handle :: EvListSplitsFinished :: last message was reached, start data reading";
162-
auto issue = InitSplitsReading();
163-
if (issue) {
164-
return NotifyComputeActorWithIssue(
165-
TActivationContext::ActorSystem(),
166-
ComputeActorId_,
167-
InputIndex_,
168-
std::move(*issue));
169-
}
170-
171-
return;
172-
}
173-
174-
// Server temporary failure
175-
if (NConnector::GrpcStatusNeedsRetry(status)) {
176-
YQL_CLOG(WARN, ProviderGeneric) << "Handle :: EvListSplitsFinished :: you should retry your operation due to '"
177-
<< status.ToDebugString() << "' error";
178-
// TODO: retry
179-
}
180-
181-
return NotifyComputeActorWithError(
182-
TActivationContext::ActorSystem(),
183-
ComputeActorId_,
184-
InputIndex_,
185-
NConnector::ErrorFromGRPCStatus(status));
186-
}
187-
18886
// ReadSplits
18987
TMaybe<TIssue> InitSplitsReading() {
19088
YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits reading";
19189

192-
if (Splits_.empty()) {
90+
if (SplitDescriptions_.empty()) {
19391
YQL_CLOG(WARN, ProviderGeneric) << "Accumulated empty list of splits";
19492
ReadSplitsFinished_ = true;
19593
NotifyComputeActorWithData();
19694
return Nothing();
19795
}
19896

199-
// Prepare request
97+
// Prepare ReadSplits request. For the sake of simplicity,
98+
// all the splits will be packed into a single ReadSplits call.
20099
NConnector::NApi::TReadSplitsRequest request;
201100
request.set_format(NConnector::NApi::TReadSplitsRequest::ARROW_IPC_STREAMING);
202101
request.set_filtering(NConnector::NApi::TReadSplitsRequest::FILTERING_OPTIONAL);
203-
request.mutable_splits()->Reserve(Splits_.size());
102+
request.mutable_splits()->Reserve(SplitDescriptions_.size());
204103

205-
for (const auto& split : Splits_) {
206-
NConnector::NApi::TSplit splitCopy = split;
104+
for (const auto& splitDescription : SplitDescriptions_) {
105+
NConnector::NApi::TSplit split;
106+
split.mutable_select()->CopyFrom(Source_.select());
107+
split.set_description(splitDescription);
207108

208-
auto error = TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance());
109+
auto error = TokenProvider_->MaybeFillToken(*split.mutable_select()->mutable_data_source_instance());
209110
if (error) {
210111
return TIssue(std::move(error));
211112
}
212113

213-
*request.mutable_splits()->Add() = std::move(splitCopy);
114+
*request.mutable_splits()->Add() = std::move(split);
214115
}
215116

216117
// Start streaming
@@ -471,8 +372,9 @@ namespace NYql::NDq {
471372

472373
NConnector::IClient::TPtr Client_;
473374
TGenericTokenProvider::TPtr TokenProvider_;
474-
NConnector::IListSplitsStreamIterator::TPtr ListSplitsIterator_;
475-
TVector<NConnector::NApi::TSplit> Splits_; // accumulated list of table splits
375+
376+
const TVector<TString> SplitDescriptions_;
377+
476378
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator_;
477379
std::optional<NConnector::NApi::TReadSplitsResponse> LastReadSplitsResponse_;
478380
bool ReadSplitsFinished_ = false;
@@ -482,24 +384,48 @@ namespace NYql::NDq {
482384
Generic::TSource Source_;
483385
};
484386

387+
void ExtractSplitDescriptions(
388+
TVector<TString>& splitDescriptions,
389+
const THashMap<TString, TString>& taskParams, // ranges are here in v1
390+
const TVector<TString>& srcReadRanges // ranges are here in v2
391+
) {
392+
if (srcReadRanges.size() > 0) {
393+
splitDescriptions = srcReadRanges;
394+
} else {
395+
const auto& range = taskParams.find(GenericProviderName);
396+
if (range != taskParams.end()) {
397+
splitDescriptions.push_back(range->second);
398+
}
399+
}
400+
401+
Y_ENSURE(splitDescriptions.size() > 0, "read ranges must not be empty");
402+
}
403+
485404
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*>
486405
CreateGenericReadActor(NConnector::IClient::TPtr genericClient,
487406
Generic::TSource&& source,
488407
ui64 inputIndex,
489408
TCollectStatsLevel statsLevel,
490409
const THashMap<TString, TString>& /*secureParams*/,
491-
const THashMap<TString, TString>& /*taskParams*/,
410+
const ui64 taskId,
411+
const THashMap<TString, TString>& taskParams,
412+
const TVector<TString>& readRanges,
492413
const NActors::TActorId& computeActorId,
493414
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
494415
const NKikimr::NMiniKQL::THolderFactory& holderFactory)
495416
{
417+
TVector<TString> splitDescriptions;
418+
ExtractSplitDescriptions(splitDescriptions, taskParams, readRanges);
419+
496420
const auto dsi = source.select().data_source_instance();
497421
YQL_CLOG(INFO, ProviderGeneric) << "Creating read actor with params:"
498422
<< " kind=" << NYql::EGenericDataSourceKind_Name(dsi.kind())
499423
<< ", endpoint=" << dsi.endpoint().ShortDebugString()
500424
<< ", database=" << dsi.database()
501425
<< ", use_tls=" << ToString(dsi.use_tls())
502-
<< ", protocol=" << NYql::EGenericProtocol_Name(dsi.protocol());
426+
<< ", protocol=" << NYql::EGenericProtocol_Name(dsi.protocol())
427+
<< ", taskId=" << taskId
428+
<< ", splitDescriptions=" << JoinSeq(",", splitDescriptions);
503429

504430
// FIXME: strange piece of logic - authToken is created but not used:
505431
// https://a.yandex-team.ru/arcadia/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp?rev=r11550199#L140
@@ -512,20 +438,6 @@ namespace NYql::NDq {
512438
YQL_ENSURE(one != TString::npos && two != TString::npos && one < two, "Bad token format:" << token);
513439
*/
514440

515-
// Obtain token to access remote data source if necessary
516-
// TODO: partitioning is not implemented now, but this code will be useful for the further research:
517-
/*
518-
TStringBuilder part;
519-
if (const auto taskParamsIt = taskParams.find(GenericProviderName); taskParamsIt != taskParams.cend()) {
520-
Generic::TRange range;
521-
TStringInput input(taskParamsIt->second);
522-
range.Load(&input);
523-
if (const auto& r = range.GetRange(); !r.empty())
524-
part << ' ' << r;
525-
}
526-
part << ';';
527-
*/
528-
529441
auto tokenProvider = CreateGenericTokenProvider(
530442
source.GetToken(),
531443
source.GetServiceAccountId(),
@@ -539,7 +451,8 @@ namespace NYql::NDq {
539451
std::move(tokenProvider),
540452
std::move(source),
541453
computeActorId,
542-
holderFactory);
454+
holderFactory,
455+
std::move(splitDescriptions));
543456

544457
return {actor, actor};
545458
}

ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,17 @@
99
namespace NYql::NDq {
1010

1111
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*>
12-
CreateGenericReadActor(NConnector::IClient::TPtr genericClient, Generic::TSource&& params, ui64 inputIndex,
13-
TCollectStatsLevel statsLevel, const THashMap<TString, TString>& secureParams,
14-
const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId,
15-
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
16-
const NKikimr::NMiniKQL::THolderFactory& holderFactory);
12+
CreateGenericReadActor(
13+
NConnector::IClient::TPtr genericClient,
14+
Generic::TSource&& params,
15+
ui64 inputIndex,
16+
TCollectStatsLevel statsLevel,
17+
const THashMap<TString, TString>& secureParams,
18+
const ui64 taskId,
19+
const THashMap<TString, TString>& taskParams,
20+
const TVector<TString>& readRanges,
21+
const NActors::TActorId& computeActorId,
22+
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
23+
const NKikimr::NMiniKQL::THolderFactory& holderFactory);
1724

1825
} // namespace NYql::NDq

ydb/library/yql/providers/generic/connector/libcpp/client.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ namespace NYql::NConnector {
2020
std::shared_ptr<TStreamer<TResponse>> Streamer_;
2121
};
2222

23+
TListSplitsStreamIteratorDrainer::TPtr MakeListSplitsStreamIteratorDrainer(IListSplitsStreamIterator::TPtr&& iterator) {
24+
return std::make_shared<TListSplitsStreamIteratorDrainer>(std::move(iterator));
25+
}
26+
27+
TReadSplitsStreamIteratorDrainer::TPtr MakeReadSplitsStreamIteratorDrainer(IReadSplitsStreamIterator::TPtr&& iterator) {
28+
return std::make_shared<TReadSplitsStreamIteratorDrainer>(std::move(iterator));
29+
}
30+
2331
class TClientGRPC: public IClient {
2432
public:
2533
TClientGRPC() = delete;
@@ -83,7 +91,7 @@ namespace NYql::NConnector {
8391
typename NYdbGrpc::TSimpleRequestProcessor<NApi::Connector::Stub, TRequest, TResponse>::TAsyncRequest rpc, TDuration timeout = {}) {
8492
auto context = GrpcClient_->CreateContext();
8593
if (!context) {
86-
throw yexception() << "Client is being shutted down";
94+
throw yexception() << "Client is being shutdown";
8795
}
8896

8997
auto promise = NThreading::NewPromise<TResult<TResponse>>();
@@ -113,7 +121,7 @@ namespace NYql::NConnector {
113121

114122
auto context = GrpcClient_->CreateContext();
115123
if (!context) {
116-
throw yexception() << "Client is being shutted down";
124+
throw yexception() << "Client is being shutdown";
117125
}
118126

119127
GrpcConnection_->DoStreamRequest<TRequest, TResponse>(

0 commit comments

Comments
 (0)