Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
return result;
}

//Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread
// Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread
class TCallLookupActor: public TActorBootstrapped<TCallLookupActor> {
public:
TCallLookupActor(
Expand Down Expand Up @@ -135,7 +135,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
.AddResponse(
MakeRecordBatch(
MakeArray<arrow::UInt64Builder, uint64_t>("id", {0, 1, 2}, arrow::uint64()),
MakeArray<arrow::UInt64Builder, uint64_t>("optional_id", {100, 101, 103}, arrow::uint64()), //the last value is intentially wrong
MakeArray<arrow::UInt64Builder, uint64_t>("optional_id", {100, 101, 103}, arrow::uint64()), // the last value is intentially wrong
MakeArray<arrow::StringBuilder, std::string>("string_value", {"a", "b", "c"}, arrow::utf8())
),
NewSuccess()
Expand Down Expand Up @@ -184,7 +184,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
request.emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
}

guard.Release(); //let actors use alloc
guard.Release(); // let actors use alloc

auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(request));
runtime.Register(callLookupActor);
Expand Down Expand Up @@ -213,4 +213,4 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
}
}

} //Y_UNIT_TEST_SUITE(GenericProviderLookupActor)
} // Y_UNIT_TEST_SUITE(GenericProviderLookupActor)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace NYql::NDq {

template <typename TDerived>
class TGenericBaseActor: public NActors::TActorBootstrapped<TDerived> {
protected: //Events
protected: // Events
// Event ids
enum EEventIds: ui32 {
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
Expand Down Expand Up @@ -89,7 +89,7 @@ namespace NYql::NDq {
NConnector::NApi::TError Error;
};

protected: //TODO move common logic here
protected: // TODO move common logic here
};

} // namespace NYql::NDq
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ namespace NYql::NDq {

template <typename T>
T ExtractFromConstFuture(const NThreading::TFuture<T>& f) {
//We want to avoid making a copy of data stored in a future.
//But there is no direct way to extract data from a const future5
//So, we make a copy of the future, that is cheap. Then, extract the value from this copy.
//It destructs the value in the original future, but this trick is legal and documented here:
//https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency
// We want to avoid making a copy of data stored in a future.
// But there is no direct way to extract data from a const future5
// So, we make a copy of the future, that is cheap. Then, extract the value from this copy.
// It destructs the value in the original future, but this trick is legal and documented here:
// https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency
return NThreading::TFuture<T>(f).ExtractValueSync();
}

Expand Down Expand Up @@ -112,7 +112,7 @@ namespace NYql::NDq {

static constexpr char ActorName[] = "GENERIC_PROVIDER_LOOKUP_ACTOR";

private: //IDqAsyncLookupSource
private: // IDqAsyncLookupSource
size_t GetMaxSupportedKeysInRequest() const override {
return MaxKeysInRequest;
}
Expand All @@ -121,7 +121,7 @@ namespace NYql::NDq {
CreateRequest(std::move(request));
}

private: //events
private: // events
STRICT_STFUNC(StateFunc,
hFunc(TEvListSplitsIterator, Handle);
hFunc(TEvListSplitsPart, Handle);
Expand Down Expand Up @@ -238,7 +238,7 @@ namespace NYql::NDq {
void ProcessReceivedData(const NConnector::NApi::TReadSplitsResponse& resp) {
Y_ABORT_UNLESS(resp.payload_case() == NConnector::NApi::TReadSplitsResponse::PayloadCase::kArrowIpcStreaming);
auto guard = Guard(*Alloc);
NKikimr::NArrow::NSerialization::TSerializerContainer deser = NKikimr::NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); //todo move to class' member
NKikimr::NArrow::NSerialization::TSerializerContainer deser = NKikimr::NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); // todo move to class' member
const auto& data = deser->Deserialize(resp.arrow_ipc_streaming());
Y_ABORT_UNLESS(data.ok());
const auto& value = data.ValueOrDie();
Expand All @@ -259,7 +259,7 @@ namespace NYql::NDq {
(ColumnDestinations[j].first == EColumnDestination::Key ? keyItems : outputItems)[ColumnDestinations[j].second] = columns[j][i];
}
if (auto* v = Request.FindPtr(key)) {
*v = std::move(output); //duplicates will be overwritten
*v = std::move(output); // duplicates will be overwritten
}
}
}
Expand Down Expand Up @@ -316,8 +316,8 @@ namespace NYql::NDq {

NYql::NConnector::NApi::TDataSourceInstance GetDataSourceInstanceWithToken() const {
auto dsi = LookupSource.data_source_instance();
//Note: returned token may be stale and we have no way to check or recover here
//Consider to redesign ICredentialsProvider
// Note: returned token may be stale and we have no way to check or recover here
// Consider to redesign ICredentialsProvider
TokenProvider->MaybeFillToken(dsi);
return dsi;
}
Expand Down Expand Up @@ -361,13 +361,13 @@ namespace NYql::NDq {
const NYql::Generic::TLookupSource LookupSource;
const NKikimr::NMiniKQL::TStructType* const KeyType;
const NKikimr::NMiniKQL::TStructType* const PayloadType;
const NKikimr::NMiniKQL::TStructType* const SelectResultType; //columns from KeyType + PayloadType
const NKikimr::NMiniKQL::TStructType* const SelectResultType; // columns from KeyType + PayloadType
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
const std::vector<std::pair<EColumnDestination, size_t>> ColumnDestinations;
const size_t MaxKeysInRequest;
std::atomic_bool InProgress;
IDqAsyncLookupSource::TUnboxedValueMap Request;
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; //TODO move me to TEvReadSplitsPart
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ namespace NYql::NDq {

template <typename T>
T ExtractFromConstFuture(const NThreading::TFuture<T>& f) {
//We want to avoid making a copy of data stored in a future.
//But there is no direct way to extract data from a const future
//So, we make a copy of the future, that is cheap. Then, extract the value from this copy.
//It destructs the value in the original future, but this trick is legal and documented here:
//https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency
// We want to avoid making a copy of data stored in a future.
// But there is no direct way to extract data from a const future
// So, we make a copy of the future, that is cheap. Then, extract the value from this copy.
// It destructs the value in the original future, but this trick is legal and documented here:
// https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency
return NThreading::TFuture<T>(f).ExtractValueSync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ namespace NYql::NDq {
}
return std::make_unique<TGenericTokenProvider>();
}
} //namespace NYql::NDq
} // namespace NYql::NDq
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace NYql::NDq {
class TGenericTokenProvider {
public:
using TPtr = std::unique_ptr<TGenericTokenProvider>;
TGenericTokenProvider() = default; //No auth required
TGenericTokenProvider() = default; // No auth required
TGenericTokenProvider(const TString& staticIamToken);
TGenericTokenProvider(
const TString& serviceAccountId,
Expand All @@ -31,4 +31,4 @@ namespace NYql::NDq {
const TString& staticIamToken,
const TString& serviceAccountId, const TString& ServiceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);
} //namespace NYql::NDq
} // namespace NYql::NDq
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ namespace NYql::NTestCreds {
}
};

} //namespace NYql::NTestCreds
} // namespace NYql::NTestCreds
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ namespace NYql {
THashMap<TString, TGenericClusterConfig> ClusterNamesToClusterConfigs; // cluster name -> cluster config
THashMap<TString, TVector<TString>> DatabaseIdsToClusterNames; // database id -> cluster name
};
} //namespace NYql
} // namespace NYql