Skip to content

format setting supported for date type (#11608) #11853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
for (const auto& column: json.GetArray()) {
*objectStorage.add_partitioned_by() = column;
}
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "csv_delimiter"sv}, lowerKey)) {
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "data.date.format"sv, "csv_delimiter"sv}, lowerKey)) {
objectStorage.mutable_format_setting()->insert({lowerKey, value});
} else {
ythrow TExternalSourceException() << "Unknown attribute " << key;
Expand Down Expand Up @@ -196,7 +196,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
continue;
}

if (IsIn({ "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv}, key)) {
if (IsIn({ "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "data.date.format"sv}, key)) {
continue;
}

Expand Down Expand Up @@ -257,6 +257,10 @@ struct TObjectStorageExternalSource : public IExternalSource {
continue;
}

if (key == "data.date.format"sv) {
continue;
}

if (matchAllSettings) {
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "unknown format setting " + key));
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class TPqDataSourceProvider : public TDataProviderBase {
settings.Add(ctx.NewList(read.Pos(), std::move(pair)));
}

if (topicKeyParser.GetDateFormat()) {
settings.Add(topicKeyParser.GetDateFormat());
}

auto builder = Build<TPqReadTopic>(ctx, read.Pos())
.World(read.World())
.DataSource(read.DataSource())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

TStatus HandleReadTopic(TExprBase input, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(input.Ref(), 6, 8, ctx)) {
if (!EnsureMinMaxArgsCount(input.Ref(), 6, 9, ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ bool TTopicKeyParser::Parse(const TExprNode& expr, TExprNode::TPtr readSettings,
TimestampFormat = readSettings->Child(i);
continue;
}
if (readSettings->Child(i)->Head().IsAtom("data.date.format")) {
DateFormat = readSettings->Child(i);
continue;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class TTopicKeyParser {
return TimestampFormat;
}

// Returns the node stored in DateFormat (the read setting recorded for
// "data.date.format"). Callers test the result for truthiness before using it,
// so it is presumably null when no such setting was parsed.
TExprNode::TPtr GetDateFormat() {
return DateFormat;
}

bool Parse(const TExprNode& expr, TExprNode::TPtr readSettings, TExprContext& ctx);

private:
Expand All @@ -60,6 +64,7 @@ class TTopicKeyParser {
TExprNode::TPtr DateTimeFormat;
TExprNode::TPtr TimestampFormatName;
TExprNode::TPtr TimestampFormat;
TExprNode::TPtr DateFormat;
TExprNode::TPtr UserSchema;
TExprNode::TPtr ColumnOrder;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@ std::shared_ptr<arrow::Array> ArrowStringAsYqlTimestamp(const std::shared_ptr<ar
return builder.Build(true).make_array();
}

template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
template <bool isOptional, typename TArrowType>
std::shared_ptr<arrow::Array> ArrowTypeAsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> reader;
::NYql::NUdf::TFixedSizeBlockReader<TArrowType, isOptional> reader;
for (i64 i = 0; i < value->length(); ++i) {
const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
if constexpr (isOptional) {
Expand All @@ -253,7 +253,7 @@ std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::
throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
}

const i32 v = item.As<i32>();
const TArrowType v = item.As<TArrowType>();
if (v < 0 || v > ::NYql::NUdf::MAX_DATE) {
throw parquet::ParquetException(TStringBuilder() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "]: " << v);
}
Expand All @@ -262,6 +262,31 @@ std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::
return builder.Build(true).make_array();
}

// Converts an Arrow string/binary column into a YQL Date column.
//
// Every non-null element is read as a string ref and parsed with
// parseImpl<NDB::DataTypeDate> using the supplied formatSettings; the parsed
// 16-bit value is appended to the output builder.
//
// Template parameter:
//   isOptional - when true, null elements are passed through to the output;
//                when false, a null element raises parquet::ParquetException.
// Parameters:
//   targetType     - Arrow data type handed to the output array builder.
//   value          - source Arrow array holding the textual dates.
//   formatSettings - parsing settings forwarded to parseImpl.
// Returns the array produced by the builder.
template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowStringAsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
    // The builder element type must be ui16 to match both the items added below
    // and the other Date converter in this file; a ui32 builder would lay out
    // 4-byte elements under a 16-bit target type.
    ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
    ::NYql::NUdf::TStringBlockReader<arrow::BinaryType, isOptional, NKikimr::NUdf::EDataSlot::String> reader;
    for (i64 i = 0; i < value->length(); ++i) {
        NUdf::TBlockItem item = reader.GetItem(*value->data(), i);

        if constexpr (isOptional) {
            if (!item) {
                // Propagate the null as-is into the optional output column.
                builder.Add(item);
                continue;
            }
        } else if (!item) {
            throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
        }

        auto ref = item.AsStringRef();
        NDB::ReadBufferFromMemory rb{ref.Data(), ref.Size()};
        uint16_t result = 0;
        parseImpl<NDB::DataTypeDate>(result, rb, nullptr, formatSettings);
        builder.Add(NUdf::TBlockItem(static_cast<ui16>(result)));
    }
    return builder.Build(true).make_array();
}

TColumnConverter ArrowUInt32AsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
return isOptional
Expand Down Expand Up @@ -430,14 +455,62 @@ TColumnConverter ArrowDate32AsYqlString(const std::shared_ptr<arrow::DataType>&
};
}

// Creates a converter that feeds an Arrow array through ArrowTypeAsYqlDate with
// ui16 as the source element type. The optional or non-optional instantiation
// is selected on each call from the captured isOptional flag.
TColumnConverter ArrowUInt16AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) -> std::shared_ptr<arrow::Array> {
        if (isOptional) {
            return ArrowTypeAsYqlDate<true, ui16>(targetType, value);
        }
        return ArrowTypeAsYqlDate<false, ui16>(targetType, value);
    };
}

// Creates a converter that feeds an Arrow array through ArrowTypeAsYqlDate with
// i32 as the source element type. The optional or non-optional instantiation
// is selected on each call from the captured isOptional flag.
TColumnConverter ArrowInt32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) -> std::shared_ptr<arrow::Array> {
        if (isOptional) {
            return ArrowTypeAsYqlDate<true, i32>(targetType, value);
        }
        return ArrowTypeAsYqlDate<false, i32>(targetType, value);
    };
}

// Creates a converter that feeds an Arrow array through ArrowTypeAsYqlDate with
// ui32 as the source element type. The optional or non-optional instantiation
// is selected on each call from the captured isOptional flag.
TColumnConverter ArrowUInt32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) -> std::shared_ptr<arrow::Array> {
        if (isOptional) {
            return ArrowTypeAsYqlDate<true, ui32>(targetType, value);
        }
        return ArrowTypeAsYqlDate<false, ui32>(targetType, value);
    };
}

// Creates a converter that feeds an Arrow array through ArrowTypeAsYqlDate with
// i64 as the source element type. The optional or non-optional instantiation
// is selected on each call from the captured isOptional flag.
TColumnConverter ArrowInt64AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) -> std::shared_ptr<arrow::Array> {
        if (isOptional) {
            return ArrowTypeAsYqlDate<true, i64>(targetType, value);
        }
        return ArrowTypeAsYqlDate<false, i64>(targetType, value);
    };
}

// Creates a converter that feeds an Arrow array through ArrowTypeAsYqlDate with
// ui64 as the source element type. The optional or non-optional instantiation
// is selected on each call from the captured isOptional flag.
TColumnConverter ArrowUInt64AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) -> std::shared_ptr<arrow::Array> {
        if (isOptional) {
            return ArrowTypeAsYqlDate<true, ui64>(targetType, value);
        }
        return ArrowTypeAsYqlDate<false, ui64>(targetType, value);
    };
}

TColumnConverter ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit unit) {
if (unit == arrow::DateUnit::MILLI) {
throw parquet::ParquetException(TStringBuilder() << "millisecond accuracy does not fit into the date");
}
return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
return isOptional
? ArrowDate32AsYqlDate<true>(targetType, value)
: ArrowDate32AsYqlDate<false>(targetType, value);
? ArrowTypeAsYqlDate<true, i32>(targetType, value)
: ArrowTypeAsYqlDate<false, i32>(targetType, value);
};
}

// Creates a converter that parses an Arrow array into YQL dates through the
// templated ArrowStringAsYqlDate helper. The format settings are copied into
// the closure, and the optional or non-optional instantiation is selected on
// each call from the captured isOptional flag.
TColumnConverter ArrowStringAsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
    return [targetType, isOptional, formatSettings](const std::shared_ptr<arrow::Array>& value) -> std::shared_ptr<arrow::Array> {
        if (isOptional) {
            return ArrowStringAsYqlDate<true>(targetType, value, formatSettings);
        }
        return ArrowStringAsYqlDate<false>(targetType, value, formatSettings);
    };
}

Expand All @@ -457,6 +530,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
switch (originalType->id()) {
case arrow::Type::UINT16: {
switch (slotItem) {
case NUdf::EDataSlot::Date:
return ArrowUInt16AsYqlDate(targetType, isOptional);
case NUdf::EDataSlot::Datetime:
return ArrowUInt16AsYqlDatetime(targetType, isOptional);
case NUdf::EDataSlot::Timestamp:
Expand All @@ -467,6 +542,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
}
case arrow::Type::INT32: {
switch (slotItem) {
case NUdf::EDataSlot::Date:
return ArrowInt32AsYqlDate(targetType, isOptional);
case NUdf::EDataSlot::Datetime:
return ArrowInt32AsYqlDatetime(targetType, isOptional);
case NUdf::EDataSlot::Timestamp:
Expand All @@ -477,6 +554,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
}
case arrow::Type::UINT32: {
switch (slotItem) {
case NUdf::EDataSlot::Date:
return ArrowUInt32AsYqlDate(targetType, isOptional);
case NUdf::EDataSlot::Datetime:
return ArrowUInt32AsYqlDatetime(targetType, isOptional);
case NUdf::EDataSlot::Timestamp:
Expand All @@ -487,6 +566,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
}
case arrow::Type::INT64: {
switch (slotItem) {
case NUdf::EDataSlot::Date:
return ArrowInt64AsYqlDate(targetType, isOptional);
case NUdf::EDataSlot::Datetime:
return ArrowInt64AsYqlDatetime(targetType, isOptional);
case NUdf::EDataSlot::Timestamp:
Expand All @@ -497,6 +578,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
}
case arrow::Type::UINT64: {
switch (slotItem) {
case NUdf::EDataSlot::Date:
return ArrowUInt64AsYqlDate(targetType, isOptional);
case NUdf::EDataSlot::Datetime:
return ArrowUInt64AsYqlDatetime(targetType, isOptional);
case NUdf::EDataSlot::Timestamp:
Expand Down Expand Up @@ -553,6 +636,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
case arrow::Type::STRING:
case arrow::Type::BINARY: {
switch (slotItem) {
case NUdf::EDataSlot::Date:
return ArrowStringAsYqlDate(targetType, isOptional, formatSettings);
case NUdf::EDataSlot::Datetime:
return ArrowStringAsYqlDateTime(targetType, isOptional, formatSettings);
case NUdf::EDataSlot::Timestamp:
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,10 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
readSpec->Settings.timestamp_format = it->second;
}

if (const auto it = settings.find("data.date.format"); settings.cend() != it) {
readSpec->Settings.date_format = it->second;
}

if (readSpec->Settings.date_time_format_name == NDB::FormatSettings::DateTimeFormat::Unspecified && readSpec->Settings.date_time_format.empty()) {
readSpec->Settings.date_time_format_name = NDB::FormatSettings::DateTimeFormat::POSIX;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
return true;
}

if (name == "data.date.format") {
const auto& value = setting.Tail();
if (!EnsureAtom(value, ctx)) {
return false;
}

return true;
}

if (name == "csvdelimiter") {
const auto& value = setting.Tail();
if (!EnsureAtom(value, ctx)) {
Expand All @@ -292,7 +301,7 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
return true;
};

if (!EnsureValidSettings(*input->Child(TS3Target::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "data.datetime.formatname", "data.datetime.format", "data.timestamp.formatname", "data.timestamp.format", "csvdelimiter", "filepattern"}, validator, ctx)) {
if (!EnsureValidSettings(*input->Child(TS3Target::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "data.datetime.formatname", "data.datetime.format", "data.timestamp.formatname", "data.timestamp.format", "data.date.format", "csvdelimiter", "filepattern"}, validator, ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,14 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
return true;
}

if (name == "data.date.format"sv) {
TStringBuf unused;
if (!ExtractSettingValue(setting.Tail(), "data.date.format"sv, format, {}, ctx, unused)) {
return false;
}
return true;
}

if (name == "readmaxbytes"sv) {
TStringBuf unused;
if (!ExtractSettingValue(setting.Tail(), "read_max_bytes"sv, format, "raw"sv, ctx, unused)) {
Expand Down Expand Up @@ -785,7 +793,7 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
};
if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings),
{ "compression"sv, "partitionedby"sv, "projection"sv, "data.interval.unit"sv, "constraints"sv,
"data.datetime.formatname"sv, "data.datetime.format"sv, "data.timestamp.formatname"sv, "data.timestamp.format"sv,
"data.datetime.formatname"sv, "data.datetime.format"sv, "data.timestamp.formatname"sv, "data.timestamp.format"sv, "data.date.format"sv,
"readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv, "pathpattern"sv, "pathpatternvariant"sv }, validator, ctx))
{
return TStatus::Error;
Expand Down
8 changes: 8 additions & 0 deletions ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ TExprNode::TPtr GetTimestampFormat(const TExprNode& settings) {
return GetSetting(settings, "data.timestamp.format"sv);
}

// Looks up the "data.date.format" entry in the given settings node by
// delegating to GetSetting and returns whatever GetSetting yields.
// NOTE(review): the caller tests the result before use, so this presumably
// returns null when the setting is absent; confirm against GetSetting.
TExprNode::TPtr GetDateFormat(const TExprNode& settings) {
return GetSetting(settings, "data.date.format"sv);
}

TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
if (partBy) {
auto children = partBy->ChildrenList();
Expand Down Expand Up @@ -171,6 +175,10 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
sinkOutputSettingsBuilder.Add(ctx.NewList(target.Pos(), std::move(pair)));
}

if (auto dateFormat = GetDateFormat(settings)) {
sinkOutputSettingsBuilder.Add(std::move(dateFormat));
}

const TStringBuf format = target.Format();
if (format != "raw" && format != "json_list") { // multipart
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,11 @@ NDB::FormatSettings GetFormatSettings(const std::string_view& view) {
auto format = json["data.timestamp.format"].getString();
settings.timestamp_format = format;
}

if (json.has("data.date.format")) {
auto format = json["data.date.format"].getString();
settings.date_format = format;
}
}
return settings;
}
Expand Down
Loading
Loading