-
Notifications
You must be signed in to change notification settings - Fork 694
Show create table (Column-oriented tables) #16509
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -612,10 +612,11 @@ bool TCreateTableFormatter::Format(const TFamilyDescription& familyDesc) { | |
compression = "off"; | ||
break; | ||
case NKikimrSchemeOp::ColumnCodecLZ4: | ||
compression = "lz4"; | ||
compression = "lz4"; | ||
break; | ||
case NKikimrSchemeOp::ColumnCodecZSTD: | ||
ythrow TFormatFail(Ydb::StatusIds::UNSUPPORTED, "ZSTD COMPRESSION codec is not supported"); | ||
compression = "zstd"; | ||
break; | ||
} | ||
} else if (familyDesc.HasCodec()) { | ||
if (familyDesc.GetCodec() == 1) { | ||
|
@@ -643,7 +644,7 @@ bool TCreateTableFormatter::Format(const TFamilyDescription& familyDesc) { | |
del = ", "; | ||
} | ||
|
||
if (dataName) { | ||
if (compression) { | ||
Stream << del << "COMPRESSION = " << "\"" << compression << "\""; | ||
} | ||
|
||
|
@@ -884,5 +885,312 @@ bool TCreateTableFormatter::Format(const Ydb::Table::TtlSettings& ttlSettings, T | |
return true; | ||
} | ||
|
||
TCreateTableFormatter::TResult TCreateTableFormatter::Format(const TString& tablePath, const TColumnTableDescription& tableDesc, bool temporary) { | ||
Stream.Clear(); | ||
|
||
TStringStreamWrapper wrapper(Stream); | ||
|
||
Ydb::Table::CreateTableRequest createRequest; | ||
if (temporary) { | ||
Stream << "CREATE TEMPORARY TABLE "; | ||
} else { | ||
Stream << "CREATE TABLE "; | ||
} | ||
EscapeName(tablePath); | ||
Stream << " (\n"; | ||
|
||
const auto& schema = tableDesc.GetSchema(); | ||
|
||
std::map<ui32, const TOlapColumnDescription*> columns; | ||
for (const auto& column : schema.GetColumns()) { | ||
columns[column.GetId()] = &column; | ||
} | ||
|
||
try { | ||
auto it = columns.cbegin(); | ||
Format(*it->second); | ||
std::advance(it, 1); | ||
for (; it != columns.end(); ++it) { | ||
Stream << ",\n"; | ||
Format(*it->second); | ||
} | ||
} catch (const TFormatFail& ex) { | ||
return TResult(ex.Status, ex.Error); | ||
} catch (const yexception& e) { | ||
return TResult(Ydb::StatusIds::UNSUPPORTED, e.what()); | ||
} | ||
Stream << ",\n"; | ||
|
||
if (!schema.GetIndexes().empty()) { | ||
return TResult(Ydb::StatusIds::UNSUPPORTED, "Indexes are not supported yet for column tables."); | ||
} | ||
|
||
bool isFamilyPrinted = false; | ||
if (!schema.GetColumnFamilies().empty()) { | ||
try { | ||
isFamilyPrinted = Format(schema.GetColumnFamilies(0)); | ||
for (int i = 1; i < schema.GetColumnFamilies().size(); i++) { | ||
if (isFamilyPrinted) { | ||
Stream << ",\n"; | ||
} | ||
isFamilyPrinted = Format(schema.GetColumnFamilies(i)); | ||
} | ||
} catch (const TFormatFail& ex) { | ||
return TResult(ex.Status, ex.Error); | ||
} catch (const yexception& e) { | ||
return TResult(Ydb::StatusIds::UNSUPPORTED, e.what()); | ||
} | ||
} | ||
|
||
Y_ENSURE(!schema.GetKeyColumnNames().empty()); | ||
if (isFamilyPrinted) { | ||
Stream << ",\n"; | ||
} | ||
Stream << "\tPRIMARY KEY ("; | ||
EscapeName(schema.GetKeyColumnNames(0)); | ||
for (int i = 1; i < schema.GetKeyColumnNames().size(); i++) { | ||
Stream << ", "; | ||
EscapeName(schema.GetKeyColumnNames(i)); | ||
} | ||
Stream << ")\n"; | ||
Stream << ") "; | ||
|
||
if (schema.HasOptions()) { | ||
const auto& options = schema.GetOptions(); | ||
if (options.GetSchemeNeedActualization()) { | ||
return TResult(Ydb::StatusIds::UNSUPPORTED, "Unsupported setting: SCHEME_NEED_ACTUALIZATION"); | ||
} | ||
if (options.HasScanReaderPolicyName() && !options.GetScanReaderPolicyName().empty()) { | ||
return TResult(Ydb::StatusIds::UNSUPPORTED, "Unsupported setting: SCAN_READER_POLICY_NAME"); | ||
} | ||
if (options.HasCompactionPlannerConstructor()) { | ||
return TResult(Ydb::StatusIds::UNSUPPORTED, "Unsupported setting: COMPACTION_PLANNER"); | ||
} | ||
if (options.HasMetadataManagerConstructor()) { | ||
return TResult(Ydb::StatusIds::UNSUPPORTED, "Unsupported setting: METADATA_MEMORY_MANAGER"); | ||
} | ||
} | ||
|
||
if (tableDesc.HasSharding()) { | ||
Format(tableDesc.GetSharding()); | ||
} | ||
|
||
Stream << "WITH (\n"; | ||
Stream << "\tSTORE = COLUMN"; | ||
|
||
if (tableDesc.HasColumnShardCount()) { | ||
Stream << ",\n"; | ||
Stream << "\tAUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << tableDesc.GetColumnShardCount(); | ||
} | ||
|
||
if (tableDesc.HasTtlSettings()) { | ||
Format(tableDesc.GetTtlSettings()); | ||
} | ||
|
||
Stream << "\n);"; | ||
|
||
TString statement = Stream.Str(); | ||
TString formattedStatement; | ||
NYql::TIssues issues; | ||
if (!NYdb::NDump::Format(statement, formattedStatement, issues)) { | ||
return TResult(Ydb::StatusIds::INTERNAL_ERROR, issues.ToString()); | ||
} | ||
|
||
auto result = TResult(std::move(formattedStatement)); | ||
|
||
return result; | ||
} | ||
|
||
void TCreateTableFormatter::Format(const TOlapColumnDescription& olapColumnDesc) { | ||
Stream << "\t"; | ||
EscapeName(olapColumnDesc.GetName()); | ||
Stream << " " << olapColumnDesc.GetType(); | ||
|
||
if (olapColumnDesc.HasColumnFamilyName()) { | ||
Stream << " FAMILY "; | ||
EscapeName(olapColumnDesc.GetColumnFamilyName()); | ||
} | ||
if (olapColumnDesc.GetNotNull()) { | ||
Stream << " NOT NULL"; | ||
} | ||
if (olapColumnDesc.HasDefaultValue()) { | ||
Format(olapColumnDesc.GetDefaultValue()); | ||
} | ||
|
||
if (olapColumnDesc.HasStorageId() && !olapColumnDesc.GetStorageId().empty()) { | ||
ythrow TFormatFail(Ydb::StatusIds::UNSUPPORTED, "Unsupported setting: STORAGE_ID"); | ||
} | ||
|
||
if (olapColumnDesc.HasDataAccessorConstructor()) { | ||
ythrow TFormatFail(Ydb::StatusIds::UNSUPPORTED, "Unsupported setting: DATA_ACCESSOR_CONSTRUCTOR"); | ||
} | ||
|
||
if (olapColumnDesc.HasDictionaryEncoding()) { | ||
ythrow TFormatFail(Ydb::StatusIds::UNSUPPORTED, "Unsupported setting: ENCODING.DICTIONARY"); | ||
} | ||
} | ||
|
||
void TCreateTableFormatter::Format(const NKikimrColumnShardColumnDefaults::TColumnDefault& defaultValue) { | ||
if (!defaultValue.HasScalar()) { | ||
return; | ||
} | ||
|
||
Stream << " DEFAULT "; | ||
|
||
TGuard<NMiniKQL::TScopedAlloc> guard(Alloc); | ||
const auto& scalar = defaultValue.GetScalar(); | ||
if (scalar.HasBool()) { | ||
if (scalar.GetBool() == true) { | ||
Stream << "true"; | ||
} else { | ||
Stream << "false"; | ||
} | ||
} else if (scalar.HasUint8()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. А точно нету готовых ToString scalar? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Мне кажется нет, во всяком случае не находил, кажется что этот протобуф только для дефолтов используется |
||
const NUdf::TUnboxedValue str = NMiniKQL::ValueToString(NUdf::EDataSlot::Uint8, NUdf::TUnboxedValuePod(scalar.GetUint8())); | ||
Y_ENSURE(str.HasValue()); | ||
Stream << TString(str.AsStringRef()); | ||
} else if (scalar.HasUint16()) { | ||
const NUdf::TUnboxedValue str = NMiniKQL::ValueToString(NUdf::EDataSlot::Uint16, NUdf::TUnboxedValuePod(scalar.GetUint16())); | ||
Y_ENSURE(str.HasValue()); | ||
Stream << TString(str.AsStringRef()); | ||
} else if (scalar.HasUint32()) { | ||
const NUdf::TUnboxedValue str = NMiniKQL::ValueToString(NUdf::EDataSlot::Uint32, NUdf::TUnboxedValuePod(scalar.GetUint32())); | ||
Y_ENSURE(str.HasValue()); | ||
Stream << TString(str.AsStringRef()); | ||
} else if (scalar.HasUint64()) { | ||
const NUdf::TUnboxedValue str = NMiniKQL::ValueToString(NUdf::EDataSlot::Uint64, NUdf::TUnboxedValuePod(static_cast<ui64>(scalar.GetUint64()))); | ||
Y_ENSURE(str.HasValue()); | ||
Stream << TString(str.AsStringRef()); | ||
} else if (scalar.HasInt8()) { | ||
const NUdf::TUnboxedValue str = NMiniKQL::ValueToString(NUdf::EDataSlot::Int8, NUdf::TUnboxedValuePod(scalar.GetInt8())); | ||
Y_ENSURE(str.HasValue()); | ||
Stream << TString(str.AsStringRef()); | ||
} else if (scalar.HasInt16()) { | ||
const NUdf::TUnboxedValue str = NMiniKQL::ValueToString(NUdf::EDataSlot::Int16, NUdf::TUnboxedValuePod(scalar.GetInt16())); | ||
Y_ENSURE(str.HasValue()); | ||
Stream << TString(str.AsStringRef()); | ||
} else if (scalar.HasInt32()) { | ||
const NUdf::TUnboxedValue str = NMiniKQL::ValueToString(NUdf::EDataSlot::Int32, NUdf::TUnboxedValuePod(scalar.GetInt32())); | ||
Y_ENSURE(str.HasValue()); | ||
Stream << TString(str.AsStringRef()); | ||
} else if (scalar.HasInt64()) { | ||
const NUdf::TUnboxedValue str = NMiniKQL::ValueToString(NUdf::EDataSlot::Int64, NUdf::TUnboxedValuePod(static_cast<i64>(scalar.GetInt64()))); | ||
Y_ENSURE(str.HasValue()); | ||
Stream << TString(str.AsStringRef()); | ||
} else if (scalar.HasDouble()) { | ||
const NUdf::TUnboxedValue str = NMiniKQL::ValueToString(NUdf::EDataSlot::Double, NUdf::TUnboxedValuePod(scalar.GetDouble())); | ||
Y_ENSURE(str.HasValue()); | ||
Stream << TString(str.AsStringRef()); | ||
} else if (scalar.HasFloat()) { | ||
const NUdf::TUnboxedValue str = NMiniKQL::ValueToString(NUdf::EDataSlot::Float, NUdf::TUnboxedValuePod(scalar.GetFloat())); | ||
Y_ENSURE(str.HasValue()); | ||
Stream << TString(str.AsStringRef()); | ||
} else if (scalar.HasTimestamp()) { | ||
ui64 value = scalar.GetTimestamp().GetValue(); | ||
arrow::TimeUnit::type unit = arrow::TimeUnit::type(scalar.GetTimestamp().GetUnit()); | ||
switch (unit) { | ||
case arrow::TimeUnit::SECOND: | ||
value *= 1000000; | ||
break; | ||
case arrow::TimeUnit::MILLI: | ||
value *= 1000; | ||
break; | ||
case arrow::TimeUnit::MICRO: | ||
break; | ||
case arrow::TimeUnit::NANO: | ||
value /= 1000; | ||
break; | ||
} | ||
Stream << "TIMESTAMP("; | ||
const NUdf::TUnboxedValue str = NMiniKQL::ValueToString(NUdf::EDataSlot::Timestamp, NUdf::TUnboxedValuePod(value)); | ||
Y_ENSURE(str.HasValue()); | ||
EscapeString(TString(str.AsStringRef())); | ||
Stream << ")"; | ||
} else if (scalar.HasString()) { | ||
EscapeString(TString(scalar.GetString())); | ||
} else { | ||
ythrow TFormatFail(Ydb::StatusIds::UNSUPPORTED, "Unsupported type for default value"); | ||
} | ||
} | ||
|
||
void TCreateTableFormatter::Format(const NKikimrSchemeOp::TColumnTableSharding& sharding) { | ||
switch (sharding.GetMethodCase()) { | ||
case NKikimrSchemeOp::TColumnTableSharding::kHashSharding: { | ||
const auto& hashSharding = sharding.GetHashSharding(); | ||
Y_ENSURE(!hashSharding.GetColumns().empty()); | ||
Stream << "PARTITION BY HASH("; | ||
EscapeName(hashSharding.GetColumns(0)); | ||
for (int i = 1; i < hashSharding.GetColumns().size(); i++) { | ||
Stream << ", "; | ||
EscapeName(hashSharding.GetColumns(i)); | ||
} | ||
Stream << ")\n"; | ||
break; | ||
} | ||
case NKikimrSchemeOp::TColumnTableSharding::kRandomSharding: | ||
ythrow TFormatFail(Ydb::StatusIds::UNSUPPORTED, "Random sharding is not supported yet."); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Боюсь что когда его добавят RandomSharding, то точно сломаю SHOW CREATE TABLE. Может тест добавить на этот случай чтобы пошли чинить его после реализации? |
||
default: | ||
ythrow TFormatFail(Ydb::StatusIds::INTERNAL_ERROR, "Unsupported unit"); | ||
} | ||
} | ||
|
||
void TCreateTableFormatter::Format(const NKikimrSchemeOp::TColumnDataLifeCycle& ttlSettings) { | ||
if (!ttlSettings.HasEnabled()) { | ||
return; | ||
} | ||
|
||
const auto& enabled = ttlSettings.GetEnabled(); | ||
|
||
if (enabled.HasExpireAfterBytes()) { | ||
ythrow TFormatFail(Ydb::StatusIds::UNSUPPORTED, "TTL by size is not supported."); | ||
} | ||
|
||
Stream << ",\n"; | ||
Stream << "\tTTL =\n\t "; | ||
bool first = true; | ||
|
||
if (!enabled.TiersSize()) { | ||
Y_ENSURE(enabled.HasExpireAfterSeconds()); | ||
Format(enabled.GetExpireAfterSeconds()); | ||
} else { | ||
for (const auto& tier : enabled.GetTiers()) { | ||
if (!first) { | ||
Stream << ", "; | ||
} | ||
switch (tier.GetActionCase()) { | ||
case NKikimrSchemeOp::TTTLSettings::TTier::ActionCase::kDelete: | ||
Format(tier.GetApplyAfterSeconds()); | ||
break; | ||
case NKikimrSchemeOp::TTTLSettings::TTier::ActionCase::kEvictToExternalStorage: | ||
Format(tier.GetApplyAfterSeconds(), tier.GetEvictToExternalStorage().GetStorage()); | ||
break; | ||
case NKikimrSchemeOp::TTTLSettings::TTier::ActionCase::ACTION_NOT_SET: | ||
ythrow TFormatFail(Ydb::StatusIds::UNSUPPORTED, "Undefined tier action"); | ||
} | ||
first = false; | ||
} | ||
} | ||
|
||
Stream << "\n\t ON " << enabled.GetColumnName(); | ||
switch (enabled.GetColumnUnit()) { | ||
case NKikimrSchemeOp::TTTLSettings::UNIT_AUTO: | ||
break; | ||
case NKikimrSchemeOp::TTTLSettings::UNIT_SECONDS: | ||
Stream << " AS SECONDS"; | ||
break; | ||
case NKikimrSchemeOp::TTTLSettings::UNIT_MILLISECONDS: | ||
Stream << " AS MILLISECONDS"; | ||
break; | ||
case NKikimrSchemeOp::TTTLSettings::UNIT_MICROSECONDS: | ||
Stream << " AS MICROSECONDS"; | ||
break; | ||
case NKikimrSchemeOp::TTTLSettings::UNIT_NANOSECONDS: | ||
Stream << " AS NANOSECONDS"; | ||
break; | ||
default: | ||
ythrow TFormatFail(Ydb::StatusIds::INTERNAL_ERROR, "Unsupported unit"); | ||
} | ||
} | ||
|
||
} // NSysView | ||
} // NKikimr |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
У YQL уже есть форматер, возможно ли его применить?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Уже применяется)