Skip to content

Commit 5ad816f

Browse files
committed
external table content has been decoded for viewer
1 parent 427adf7 commit 5ad816f

File tree

7 files changed

+54
-9
lines changed

7 files changed

+54
-9
lines changed

ydb/core/base/appdata.h

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "event_filter.h"
1212

1313
#include <ydb/core/control/immediate_control_board_impl.h>
14+
#include <ydb/core/external_sources/external_source_factory.h>
1415
#include <ydb/core/grpc_services/grpc_helper.h>
1516
#include <ydb/core/protos/auth.pb.h>
1617
#include <ydb/core/protos/cms.pb.h>

ydb/core/base/appdata_fwd.h

+6
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ namespace NYdb {
102102

103103
namespace NKikimr {
104104

105+
namespace NExternalSource {
106+
struct IExternalSourceFactory;
107+
}
108+
105109
namespace NScheme {
106110
class TTypeRegistry;
107111
}
@@ -148,6 +152,8 @@ struct TAppData {
148152
const NDataShard::IExportFactory *DataShardExportFactory = nullptr;
149153
const TFormatFactory* FormatFactory = nullptr;
150154
const NSQS::IEventsWriterFactory* SqsEventsWriterFactory = nullptr;
155+
TIntrusivePtr<NExternalSource::IExternalSourceFactory> ExternalSourceFactory = nullptr;
156+
151157

152158
NSQS::IAuthFactory* SqsAuthFactory = nullptr;
153159

ydb/core/driver_lib/run/run.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -1004,6 +1004,13 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
10041004
AppData->PersQueueGetReadSessionsInfoWorkerFactory = ModuleFactories ? ModuleFactories->PQReadSessionsInfoWorkerFactory.get() : nullptr;
10051005
AppData->IoContextFactory = ModuleFactories ? ModuleFactories->IoContextFactory.get() : nullptr;
10061006

1007+
std::vector<TString> hostnamePatterns;
1008+
if (runConfig.AppConfig.HasQueryServiceConfig()) {
1009+
const auto& patterns = runConfig.AppConfig.GetQueryServiceConfig().GetHostnamePatterns();
1010+
hostnamePatterns = {patterns.begin(), patterns.end()};
1011+
}
1012+
AppData->ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory(hostnamePatterns);
1013+
10071014
AppData->SqsAuthFactory = ModuleFactories
10081015
? ModuleFactories->SqsAuthFactory.get()
10091016
: nullptr;

ydb/core/kqp/host/kqp_host.cpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -1517,8 +1517,8 @@ class TKqpHost : public IKqpHost {
15171517

15181518
auto queryExecutor = MakeIntrusive<TKqpQueryExecutor>(Gateway, Cluster, SessionCtx, KqpRunner);
15191519
auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, gatewayProxy, SessionCtx,
1520-
ExternalSourceFactory, IsInternalCall);
1521-
auto kikimrDataSink = CreateKikimrDataSink(*FuncRegistry, *TypesCtx, gatewayProxy, SessionCtx, ExternalSourceFactory, queryExecutor);
1520+
AppData()->ExternalSourceFactory, IsInternalCall);
1521+
auto kikimrDataSink = CreateKikimrDataSink(*FuncRegistry, *TypesCtx, gatewayProxy, SessionCtx, AppData()->ExternalSourceFactory, queryExecutor);
15221522

15231523
FillSettings.AllResultsBytesLimit = Nothing();
15241524
FillSettings.RowsLimitPerWrite = SessionCtx->Config()._ResultRowsLimit.Get();
@@ -1649,7 +1649,6 @@ class TKqpHost : public IKqpHost {
16491649

16501650
TIntrusivePtr<TExecuteContext> ExecuteCtx;
16511651
TIntrusivePtr<IKqpRunner> KqpRunner;
1652-
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};
16531652

16541653
TKqpTempTablesState::TConstPtr TempTablesState;
16551654
NActors::TActorSystem* ActorSystem = nullptr;

ydb/core/tx/schemeshard/schemeshard_impl.cpp

+1-5
Original file line numberDiff line numberDiff line change
@@ -4271,6 +4271,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
42714271
EnableStatistics = appData->FeatureFlags.GetEnableStatistics();
42724272
EnableTablePgTypes = appData->FeatureFlags.GetEnableTablePgTypes();
42734273
EnableServerlessExclusiveDynamicNodes = appData->FeatureFlags.GetEnableServerlessExclusiveDynamicNodes();
4274+
ExternalSourceFactory = appData->ExternalSourceFactory;
42744275

42754276
ConfigureCompactionQueues(appData->CompactionConfig, ctx);
42764277
ConfigureStatsBatching(appData->SchemeShardConfig, ctx);
@@ -6719,11 +6720,6 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
67196720
LoadTableProfiles(nullptr, ctx);
67206721
}
67216722

6722-
if (appConfig.HasQueryServiceConfig()) {
6723-
const auto& hostnamePatterns = appConfig.GetQueryServiceConfig().GetHostnamePatterns();
6724-
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory(std::vector<TString>(hostnamePatterns.begin(), hostnamePatterns.end()));
6725-
}
6726-
67276723
if (IsSchemeShardConfigured()) {
67286724
StartStopCompactionQueues();
67296725
}

ydb/core/tx/schemeshard/schemeshard_impl.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ class TSchemeShard
310310
TActorId DelayedInitTenantDestination;
311311
TAutoPtr<TEvSchemeShard::TEvInitTenantSchemeShardResult> DelayedInitTenantReply;
312312

313-
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};
313+
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory;
314314

315315
THolder<TProposeResponse> IgniteOperation(TProposeRequest& request, TOperationContext& context);
316316
THolder<TEvDataShard::TEvProposeTransaction> MakeDataShardProposal(const TPathId& pathId, const TOperationId& opId,

ydb/core/viewer/json_describe.h

+36
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ class TJsonDescribe : public TViewerPipeClient<TJsonDescribe> {
268268
headers = HTTPFORBIDDENJSON;
269269
}
270270
TProtoToJson::ProtoToJson(json, *DescribeResult, JsonSettings);
271+
DecodeExternalTableContent(json);
271272
} else {
272273
json << "null";
273274
}
@@ -276,6 +277,41 @@ class TJsonDescribe : public TViewerPipeClient<TJsonDescribe> {
276277
PassAway();
277278
}
278279

280+
void DecodeExternalTableContent(TStringStream& json) const {
281+
if (!DescribeResult) {
282+
return;
283+
}
284+
285+
if (!DescribeResult->GetPathDescription().HasExternalTableDescription()) {
286+
return;
287+
}
288+
289+
const auto& content = DescribeResult->GetPathDescription().GetExternalTableDescription().GetContent();
290+
if (!content) {
291+
return;
292+
}
293+
294+
NJson::TJsonValue root;
295+
const auto& sourceType = DescribeResult->GetPathDescription().GetExternalTableDescription().GetSourceType();
296+
try {
297+
NJson::ReadJsonTree(json.Str(), &root);
298+
root["PathDescription"]["ExternalTableDescription"].EraseValue("Content");
299+
auto source = AppData()->ExternalSourceFactory->GetOrCreate(sourceType);
300+
auto parameters = source->GetParameters(content);
301+
for (const auto& [key, items]: parameters) {
302+
NJson::TJsonValue array{NJson::EJsonValueType::JSON_ARRAY};
303+
for (const auto& item: items) {
304+
array.AppendValue(item);
305+
}
306+
root["PathDescription"]["ExternalTableDescription"]["Content"][key] = array;
307+
}
308+
} catch (...) {
309+
BLOG_CRIT("Сan't unpack content for external table: " << sourceType << ", error: " << CurrentExceptionMessage());
310+
}
311+
json.Clear();
312+
json << root;
313+
}
314+
279315
void HandleTimeout() {
280316
Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get()), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
281317
PassAway();

0 commit comments

Comments
 (0)