|
| 1 | +#include "schemeshard__operation_common_external_table.h" |
| 2 | + |
| 3 | +#include <utility> |
| 4 | + |
| 5 | +namespace NKikimr::NSchemeShard::NExternalTable { |
| 6 | + |
| 7 | +constexpr uint32_t MAX_FIELD_SIZE = 1000; |
| 8 | +constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB |
| 9 | + |
| 10 | +bool ValidateSourceType(const TString& sourceType, TString& errStr) { |
| 11 | + // Only object storage supported today |
| 12 | + if (sourceType != "ObjectStorage") { |
| 13 | + errStr = "Only ObjectStorage source type supported but got " + sourceType; |
| 14 | + return false; |
| 15 | + } |
| 16 | + return true; |
| 17 | +} |
| 18 | + |
| 19 | +bool ValidateLocation(const TString& location, TString& errStr) { |
| 20 | + if (!location) { |
| 21 | + errStr = "Location must not be empty"; |
| 22 | + return false; |
| 23 | + } |
| 24 | + if (location.Size() > MAX_FIELD_SIZE) { |
| 25 | + errStr = Sprintf("Maximum length of location must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, location.Size()); |
| 26 | + return false; |
| 27 | + } |
| 28 | + return true; |
| 29 | +} |
| 30 | + |
| 31 | +bool ValidateContent(const TString& content, TString& errStr) { |
| 32 | + if (content.Size() > MAX_PROTOBUF_SIZE) { |
| 33 | + errStr = Sprintf("Maximum size of content must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, content.Size()); |
| 34 | + return false; |
| 35 | + } |
| 36 | + return true; |
| 37 | +} |
| 38 | + |
| 39 | +bool ValidateDataSourcePath(const TString& dataSourcePath, TString& errStr) { |
| 40 | + if (!dataSourcePath) { |
| 41 | + errStr = "Data source path must not be empty"; |
| 42 | + return false; |
| 43 | + } |
| 44 | + return true; |
| 45 | +} |
| 46 | + |
| 47 | +bool Validate(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, TString& errStr) { |
| 48 | + return ValidateSourceType(sourceType, errStr) |
| 49 | + && ValidateLocation(desc.GetLocation(), errStr) |
| 50 | + && ValidateContent(desc.GetContent(), errStr) |
| 51 | + && ValidateDataSourcePath(desc.GetDataSourcePath(), errStr); |
| 52 | +} |
| 53 | + |
| 54 | +Ydb::Type CreateYdbType(const NScheme::TTypeInfo& typeInfo, bool notNull) { |
| 55 | + Ydb::Type ydbType; |
| 56 | + if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) { |
| 57 | + auto* typeDesc = typeInfo.GetTypeDesc(); |
| 58 | + auto* pg = ydbType.mutable_pg_type(); |
| 59 | + pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc)); |
| 60 | + pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc)); |
| 61 | + } else { |
| 62 | + auto& item = notNull |
| 63 | + ? ydbType |
| 64 | + : *ydbType.mutable_optional_type()->mutable_item(); |
| 65 | + item.set_type_id(static_cast<Ydb::Type::PrimitiveTypeId>(typeInfo.GetTypeId())); |
| 66 | + } |
| 67 | + return ydbType; |
| 68 | +} |
| 69 | + |
| 70 | +std::pair<TExternalTableInfo::TPtr, TMaybe<TString>> CreateExternalTable( |
| 71 | + const TString& sourceType, |
| 72 | + const NKikimrSchemeOp::TExternalTableDescription& desc, |
| 73 | + const NExternalSource::IExternalSourceFactory::TPtr& factory, |
| 74 | + ui64 alterVersion) { |
| 75 | + TString errStr; |
| 76 | + |
| 77 | + if (!desc.ColumnsSize()) { |
| 78 | + errStr = "The schema must have at least one column"; |
| 79 | + return std::make_pair(nullptr, errStr); |
| 80 | + } |
| 81 | + |
| 82 | + TExternalTableInfo::TPtr externalTableInfo = new TExternalTableInfo; |
| 83 | + const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; |
| 84 | + |
| 85 | + if (desc.GetSourceType() != "General") { |
| 86 | + errStr = "Only general data source has been supported as request"; |
| 87 | + return std::make_pair(nullptr, errStr); |
| 88 | + } |
| 89 | + |
| 90 | + externalTableInfo->DataSourcePath = desc.GetDataSourcePath(); |
| 91 | + externalTableInfo->Location = desc.GetLocation(); |
| 92 | + externalTableInfo->AlterVersion = alterVersion; |
| 93 | + externalTableInfo->SourceType = sourceType; |
| 94 | + |
| 95 | + NKikimrExternalSources::TSchema schema; |
| 96 | + uint64_t nextColumnId = 1; |
| 97 | + for (const auto& col : desc.GetColumns()) { |
| 98 | + TString colName = col.GetName(); |
| 99 | + |
| 100 | + if (!colName) { |
| 101 | + errStr = "Columns cannot have an empty name"; |
| 102 | + return std::make_pair(nullptr, errStr); |
| 103 | + } |
| 104 | + |
| 105 | + if (col.HasTypeId()) { |
| 106 | + errStr = TStringBuilder() << "Cannot set TypeId for column '" << colName << "', use Type"; |
| 107 | + return std::make_pair(nullptr, errStr); |
| 108 | + } |
| 109 | + |
| 110 | + if (!col.HasType()) { |
| 111 | + errStr = TStringBuilder() << "Missing Type for column '" << colName << "'"; |
| 112 | + return std::make_pair(nullptr, errStr); |
| 113 | + } |
| 114 | + |
| 115 | + auto typeName = NMiniKQL::AdaptLegacyYqlType(col.GetType()); |
| 116 | + const NScheme::IType* type = typeRegistry->GetType(typeName); |
| 117 | + |
| 118 | + NScheme::TTypeInfo typeInfo; |
| 119 | + if (type) { |
| 120 | + // Only allow YQL types |
| 121 | + if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { |
| 122 | + errStr = Sprintf("Type '%s' specified for column '%s' is no longer supported", col.GetType().data(), colName.data()); |
| 123 | + return std::make_pair(nullptr, errStr); |
| 124 | + } |
| 125 | + typeInfo = NScheme::TTypeInfo(type->GetTypeId()); |
| 126 | + } else { |
| 127 | + auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName); |
| 128 | + if (!typeDesc) { |
| 129 | + errStr = Sprintf("Type '%s' specified for column '%s' is not supported by storage", col.GetType().data(), colName.data()); |
| 130 | + return std::make_pair(nullptr, errStr); |
| 131 | + } |
| 132 | + typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc); |
| 133 | + } |
| 134 | + |
| 135 | + ui32 colId = col.HasId() ? col.GetId() : nextColumnId; |
| 136 | + if (externalTableInfo->Columns.contains(colId)) { |
| 137 | + errStr = Sprintf("Duplicate column id: %" PRIu32, colId); |
| 138 | + return std::make_pair(nullptr, errStr); |
| 139 | + } |
| 140 | + |
| 141 | + nextColumnId = colId + 1 > nextColumnId ? colId + 1 : nextColumnId; |
| 142 | + |
| 143 | + TTableInfo::TColumn& column = externalTableInfo->Columns[colId]; |
| 144 | + column = TTableInfo::TColumn(colName, colId, typeInfo, "", col.GetNotNull()); // TODO: do we need typeMod here? |
| 145 | + |
| 146 | + auto& schemaColumn= *schema.add_column(); |
| 147 | + schemaColumn.set_name(colName); |
| 148 | + *schemaColumn.mutable_type() = CreateYdbType(typeInfo, col.GetNotNull()); |
| 149 | + } |
| 150 | + |
| 151 | + try { |
| 152 | + NKikimrExternalSources::TGeneral general; |
| 153 | + general.ParseFromStringOrThrow(desc.GetContent()); |
| 154 | + const auto source = factory->GetOrCreate(sourceType); |
| 155 | + if (!source->HasExternalTable()) { |
| 156 | + errStr = TStringBuilder{} << "External table isn't supported for " << sourceType; |
| 157 | + return std::make_pair(nullptr, errStr); |
| 158 | + } |
| 159 | + externalTableInfo->Content = source->Pack(schema, general); |
| 160 | + } catch (...) { |
| 161 | + errStr = CurrentExceptionMessage(); |
| 162 | + return std::make_pair(nullptr, errStr); |
| 163 | + } |
| 164 | + |
| 165 | + return std::make_pair(externalTableInfo, Nothing()); |
| 166 | +} |
| 167 | + |
| 168 | + |
| 169 | +} // namespace NKikimr::NSchemeShard::NExternalDataSource |
0 commit comments