|
5 | 5 |
|
6 | 6 | #include <library/cpp/string_utils/csv/csv.h> |
7 | 7 |
|
| 8 | +#include <regex> |
| 9 | + |
8 | 10 | namespace NYdb { |
9 | 11 | namespace NConsoleClient { |
10 | 12 | namespace { |
@@ -289,6 +291,120 @@ class TCsvToYdbConverter { |
289 | 291 | } |
290 | 292 | } |
291 | 293 |
|
| 294 | + template <class T> |
| 295 | + bool TryParseArithmetic(const TString& token) const { |
| 296 | + size_t cnt; |
| 297 | + try { |
| 298 | + auto value = StringToArithmetic<T>(token, cnt); |
| 299 | + if (cnt != token.size() || value < std::numeric_limits<T>::lowest() || value > std::numeric_limits<T>::max()) { |
| 300 | + return false; |
| 301 | + } |
| 302 | + } catch (std::exception& e) { |
| 303 | + return false; |
| 304 | + } |
| 305 | + return true; |
| 306 | + } |
| 307 | + |
| 308 | + bool TryParseBool(const TString& token) const { |
| 309 | + TString tokenLowerCase = to_lower(token); |
| 310 | + return tokenLowerCase == "true" || tokenLowerCase == "false"; |
| 311 | + } |
| 312 | + |
| 313 | + bool TryParsePrimitive(const TString& token) { |
| 314 | + switch (Parser.GetPrimitive()) { |
| 315 | + case EPrimitiveType::Uint8: |
| 316 | + return TryParseArithmetic<ui8>(token) && !token.StartsWith('-'); |
| 317 | + case EPrimitiveType::Uint16: |
| 318 | + return TryParseArithmetic<ui16>(token) && !token.StartsWith('-');; |
| 319 | + case EPrimitiveType::Uint32: |
| 320 | + return TryParseArithmetic<ui32>(token) && !token.StartsWith('-');; |
| 321 | + case EPrimitiveType::Uint64: |
| 322 | + return TryParseArithmetic<ui64>(token) && !token.StartsWith('-');; |
| 323 | + case EPrimitiveType::Int8: |
| 324 | + return TryParseArithmetic<i8>(token); |
| 325 | + case EPrimitiveType::Int16: |
| 326 | + return TryParseArithmetic<i16>(token); |
| 327 | + case EPrimitiveType::Int32: |
| 328 | + return TryParseArithmetic<i32>(token); |
| 329 | + case EPrimitiveType::Int64: |
| 330 | + return TryParseArithmetic<i64>(token); |
| 331 | + case EPrimitiveType::Bool: |
| 332 | + return TryParseBool(token); |
| 333 | + case EPrimitiveType::Json: |
| 334 | + return token.StartsWith('{') && token.EndsWith('}'); |
| 335 | + break; |
| 336 | + case EPrimitiveType::JsonDocument: |
| 337 | + break; |
| 338 | + case EPrimitiveType::Yson: |
| 339 | + break; |
| 340 | + case EPrimitiveType::Uuid: |
| 341 | + static std::regex uuidRegexTemplate("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"); |
| 342 | + return std::regex_match(token.c_str(), uuidRegexTemplate); |
| 343 | + case EPrimitiveType::Float: |
| 344 | + return TryParseArithmetic<float>(token); |
| 345 | + case EPrimitiveType::Double: |
| 346 | + return TryParseArithmetic<double>(token); |
| 347 | + case EPrimitiveType::DyNumber: |
| 348 | + break; |
| 349 | + case EPrimitiveType::Date: { |
| 350 | + TInstant date; |
| 351 | + return TInstant::TryParseIso8601(token, date) && token.length() <= 10; |
| 352 | + } |
| 353 | + case EPrimitiveType::Datetime: { |
| 354 | + TInstant datetime; |
| 355 | + return TInstant::TryParseIso8601(token, datetime) && token.length() <= 19; |
| 356 | + } |
| 357 | + case EPrimitiveType::Timestamp: { |
| 358 | + TInstant timestamp; |
| 359 | + return TInstant::TryParseIso8601(token, timestamp) || TryParseArithmetic<ui64>(token); |
| 360 | + } |
| 361 | + case EPrimitiveType::Interval: |
| 362 | + break; |
| 363 | + case EPrimitiveType::Date32: { |
| 364 | + TInstant date; |
| 365 | + return TInstant::TryParseIso8601(token, date) || TryParseArithmetic<i32>(token); |
| 366 | + } |
| 367 | + case EPrimitiveType::Datetime64: { |
| 368 | + TInstant date; |
| 369 | + return TInstant::TryParseIso8601(token, date) || TryParseArithmetic<i64>(token); |
| 370 | + } |
| 371 | + case EPrimitiveType::Timestamp64: { |
| 372 | + TInstant date; |
| 373 | + return TInstant::TryParseIso8601(token, date) || TryParseArithmetic<i64>(token); |
| 374 | + } |
| 375 | + case EPrimitiveType::Interval64: |
| 376 | + return TryParseArithmetic<i64>(token); |
| 377 | + case EPrimitiveType::TzDate: |
| 378 | + break; |
| 379 | + case EPrimitiveType::TzDatetime: |
| 380 | + break; |
| 381 | + case EPrimitiveType::TzTimestamp: |
| 382 | + break; |
| 383 | + default: |
| 384 | + throw TCsvParseException() << "Unsupported primitive type: " << Parser.GetPrimitive(); |
| 385 | + } |
| 386 | + return false; |
| 387 | + } |
| 388 | + |
| 389 | + bool TryParseValue(const TStringBuf& token, TPossibleType& possibleType) { |
| 390 | + if (NullValue && token == NullValue) { |
| 391 | + possibleType.SetHasNulls(true); |
| 392 | + return true; |
| 393 | + } |
| 394 | + possibleType.SetHasNonNulls(true); |
| 395 | + switch (Parser.GetKind()) { |
| 396 | + case TTypeParser::ETypeKind::Primitive: { |
| 397 | + return TryParsePrimitive(TString(token)); |
| 398 | + } |
| 399 | + case TTypeParser::ETypeKind::Decimal: { |
| 400 | + break; |
| 401 | + } |
| 402 | + default: |
| 403 | + throw TCsvParseException() << "Unsupported type kind: " << Parser.GetKind(); |
| 404 | + } |
| 405 | + return false; |
| 406 | + } |
| 407 | + |
292 | 408 | TValue Convert(const TStringBuf& token) { |
293 | 409 | BuildValue(token); |
294 | 410 | return Builder.Build(); |
@@ -330,6 +446,16 @@ TValue FieldToValue(TTypeParser& parser, |
330 | 446 | } |
331 | 447 | } |
332 | 448 |
|
| 449 | +bool TryParse(TTypeParser& parser, const TStringBuf& token, const std::optional<TString>& nullValue, TPossibleType& possibleType) { |
| 450 | + try { |
| 451 | + TCsvToYdbConverter converter(parser, nullValue); |
| 452 | + return converter.TryParseValue(token, possibleType); |
| 453 | + } catch (std::exception& e) { |
| 454 | + Cerr << "UNEXPECTED EXCEPTION: " << e.what() << Endl; |
| 455 | + return false; |
| 456 | + } |
| 457 | +} |
| 458 | + |
333 | 459 | TStringBuf Consume(NCsvFormat::CsvSplitter& splitter, |
334 | 460 | const TCsvParser::TParseMetadata& meta, |
335 | 461 | const TString& columnName) { |
@@ -489,6 +615,151 @@ void TCsvParser::BuildLineType() { |
489 | 615 | ResultLineType = builder.Build(); |
490 | 616 | ResultListType = TTypeBuilder().List(ResultLineType.value()).Build(); |
491 | 617 | } |
| 618 | +namespace { |
| 619 | +static const std::vector<TType> availableTypes = { |
| 620 | + TTypeBuilder().Primitive(EPrimitiveType::Bool).Build(), |
| 621 | + TTypeBuilder().Primitive(EPrimitiveType::Uint64).Build(), |
| 622 | + TTypeBuilder().Primitive(EPrimitiveType::Int64).Build(), |
| 623 | + TTypeBuilder().Primitive(EPrimitiveType::Double).Build(), |
| 624 | + TTypeBuilder().Primitive(EPrimitiveType::Date).Build(), |
| 625 | + TTypeBuilder().Primitive(EPrimitiveType::Datetime).Build(), |
| 626 | + TTypeBuilder().Primitive(EPrimitiveType::Timestamp).Build(), |
| 627 | + TTypeBuilder().Primitive(EPrimitiveType::Json).Build(), |
| 628 | + TTypeBuilder().Primitive(EPrimitiveType::Uuid).Build(), |
| 629 | +}; |
| 630 | + |
| 631 | +static const auto availableTypesEnd = availableTypes.end(); |
| 632 | + |
| 633 | +} // namespace |
| 634 | + |
| 635 | +TPossibleType::TPossibleType() { |
| 636 | + CurrentType = availableTypes.begin(); |
| 637 | +} |
| 638 | + |
| 639 | +TPossibleType::TPossibleType(std::vector<TType>::const_iterator currentType) |
| 640 | +: CurrentType(currentType) |
| 641 | +{ |
| 642 | +} |
| 643 | + |
| 644 | +void TPossibleType::SetIterator(const std::vector<TType>::const_iterator& newIterator) { |
| 645 | + CurrentType = newIterator; |
| 646 | +} |
| 647 | + |
| 648 | +std::vector<TType>::const_iterator& TPossibleType::GetIterator() { |
| 649 | + return CurrentType; |
| 650 | +} |
| 651 | + |
| 652 | +const std::vector<TType>::const_iterator& TPossibleType::GetAvailableTypesEnd() { |
| 653 | + return availableTypesEnd; |
| 654 | +} |
| 655 | + |
| 656 | +void TPossibleType::SetHasNulls(bool hasNulls) { |
| 657 | + HasNulls = hasNulls; |
| 658 | +} |
| 659 | + |
| 660 | +bool TPossibleType::GetHasNulls() const { |
| 661 | + return HasNulls; |
| 662 | +} |
| 663 | + |
| 664 | +void TPossibleType::SetHasNonNulls(bool hasNonNulls) { |
| 665 | + HasNonNulls = hasNonNulls; |
| 666 | +} |
| 667 | + |
| 668 | +bool TPossibleType::GetHasNonNulls() const { |
| 669 | + return HasNonNulls; |
| 670 | +} |
| 671 | + |
| 672 | +TPossibleTypes::TPossibleTypes(size_t size) { |
| 673 | + ColumnPossibleTypes.resize(size); |
| 674 | +} |
| 675 | + |
| 676 | +TPossibleTypes::TPossibleTypes(std::vector<TPossibleType>& currentColumnTypes) |
| 677 | +: ColumnPossibleTypes(currentColumnTypes) |
| 678 | +{ |
| 679 | +} |
| 680 | + |
| 681 | +// Pass this copy to a worker to parse his chunk of data with it to merge it later back into this main chunk |
| 682 | +TPossibleTypes TPossibleTypes::GetCopy() { |
| 683 | + std::shared_lock<std::shared_mutex> ReadLock(Lock); |
| 684 | + return TPossibleTypes(ColumnPossibleTypes); |
| 685 | +} |
| 686 | + |
| 687 | +// Merge this main chunk with another chunk that parsed a CSV batch and maybe dismissed some types |
| 688 | +void TPossibleTypes::MergeWith(TPossibleTypes& newTypes) { |
| 689 | + auto newTypesVec = newTypes.GetColumnPossibleTypes(); |
| 690 | + { |
| 691 | + std::shared_lock<std::shared_mutex> ReadLock(Lock); |
| 692 | + bool changed = false; |
| 693 | + for (size_t i = 0; i < ColumnPossibleTypes.size(); ++i) { |
| 694 | + auto& currentPossibleType = ColumnPossibleTypes[i]; |
| 695 | + auto& newPossibleType = newTypesVec[i]; |
| 696 | + auto& currentIt = currentPossibleType.GetIterator(); |
| 697 | + const auto& newIt = newPossibleType.GetIterator(); |
| 698 | + if (newIt > currentIt) { |
| 699 | + changed = true; |
| 700 | + break; |
| 701 | + } |
| 702 | + if (currentPossibleType.GetHasNulls() != newPossibleType.GetHasNulls() |
| 703 | + || currentPossibleType.GetHasNonNulls() != newPossibleType.GetHasNonNulls()) { |
| 704 | + changed = true; |
| 705 | + break; |
| 706 | + } |
| 707 | + } |
| 708 | + if (!changed) { |
| 709 | + return; |
| 710 | + } |
| 711 | + } |
| 712 | + std::unique_lock<std::shared_mutex> WriteLock(Lock); |
| 713 | + for (size_t i = 0; i < ColumnPossibleTypes.size(); ++i) { |
| 714 | + auto& currentPossibleType = ColumnPossibleTypes[i]; |
| 715 | + auto& newPossibleType = newTypesVec[i]; |
| 716 | + const auto& newIt = newPossibleType.GetIterator(); |
| 717 | + if (newIt > currentPossibleType.GetIterator()) { |
| 718 | + currentPossibleType.SetIterator(newIt); |
| 719 | + } |
| 720 | + if (newPossibleType.GetHasNulls()) { |
| 721 | + currentPossibleType.SetHasNulls(true); |
| 722 | + } |
| 723 | + if (newPossibleType.GetHasNonNulls()) { |
| 724 | + currentPossibleType.SetHasNonNulls(true); |
| 725 | + } |
| 726 | + } |
| 727 | +} |
| 728 | + |
| 729 | +std::vector<TPossibleType>& TPossibleTypes::GetColumnPossibleTypes() { |
| 730 | + return ColumnPossibleTypes; |
| 731 | +} |
| 732 | + |
| 733 | +void TCsvParser::ParseLineTypes(TString& line, TPossibleTypes& possibleTypes, const TParseMetadata& meta) { |
| 734 | + NCsvFormat::CsvSplitter splitter(line, Delimeter); |
| 735 | + auto headerIt = Header.cbegin(); |
| 736 | + auto typesIt = possibleTypes.GetColumnPossibleTypes().begin(); |
| 737 | + do { |
| 738 | + if (headerIt == Header.cend()) { |
| 739 | + throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta); |
| 740 | + } |
| 741 | + TStringBuf token = Consume(splitter, meta, *headerIt); |
| 742 | + TPossibleType& possibleType = *typesIt; |
| 743 | + auto& typeIt = possibleType.GetIterator(); |
| 744 | + while (typeIt != availableTypesEnd) { |
| 745 | + TTypeParser typeParser(*typeIt); |
| 746 | + if (TryParse(typeParser, token, NullValue, possibleType)) { |
| 747 | + break; |
| 748 | + } |
| 749 | + ++typeIt; |
| 750 | + } |
| 751 | + ++headerIt; |
| 752 | + ++typesIt; |
| 753 | + } while (splitter.Step()); |
| 754 | + |
| 755 | + if (headerIt != Header.cend()) { |
| 756 | + throw FormatError(yexception() << "Header contains more fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta); |
| 757 | + } |
| 758 | +} |
| 759 | + |
| 760 | +const TVector<TString>& TCsvParser::GetHeader() { |
| 761 | + return Header; |
| 762 | +} |
492 | 763 |
|
493 | 764 | } |
494 | 765 | } |
0 commit comments