11#include " restore_import_data.h"
22
33#include < ydb/public/lib/ydb_cli/common/retry_func.h>
4+ #include < ydb/public/lib/ydb_cli/dump/util/log.h>
45#include < ydb/public/lib/ydb_cli/dump/util/util.h>
56
67#include < library/cpp/string_utils/quote/quote.h>
@@ -662,29 +663,29 @@ class TTableRows {
662663 return false ;
663664 }
664665
665- TString GetData (ui64 memLimit, ui64 batchSize, bool force = false ) {
666+ TString GetData (ui64 memLimit, ui64 batchSize, bool force = false ) {
666667 Y_ENSURE (HasData (memLimit, batchSize, force));
667668 Y_ENSURE (!ByMemSize.empty ());
668669 Y_ENSURE (!ByRecordsSize.empty ());
669670 Y_ENSURE (!ByMemSize.begin ()->second .empty ());
670671 Y_ENSURE (!ByRecordsSize.begin ()->second .empty ());
671672
672673 auto get = [this , batchSize](TRowsBy& from) {
673- auto it = from.begin ()->second .begin ();
674- auto & rows = (*it) ->second ;
674+ auto it = * from.begin ()->second .begin ();
675+ auto & rows = it ->second ;
675676
676- RemoveFromSizeTracker (ByMemSize, rows.MemSize (), * it);
677- RemoveFromSizeTracker (ByRecordsSize, rows.RecordsSize (), * it);
677+ RemoveFromSizeTracker (ByMemSize, rows.MemSize (), it);
678+ RemoveFromSizeTracker (ByRecordsSize, rows.RecordsSize (), it);
678679
679680 MemSize -= rows.MemSize ();
680681 auto ret = rows.Serialize (batchSize);
681682 MemSize += rows.MemSize ();
682683
683684 if (rows.MemSize ()) {
684- Y_ENSURE (ByMemSize[rows.MemSize ()].insert (* it).second );
685+ Y_ENSURE (ByMemSize[rows.MemSize ()].insert (it).second );
685686 }
686687 if (rows.RecordsSize ()) {
687- Y_ENSURE (ByRecordsSize[rows.RecordsSize ()].insert (* it).second );
688+ Y_ENSURE (ByRecordsSize[rows.RecordsSize ()].insert (it).second );
688689 }
689690
690691 return ret;
@@ -739,9 +740,16 @@ class TDataAccumulator: public NPrivate::IDataAccumulator {
739740 return Rows.GetData (MemLimit, BatchSize, force);
740741 }
741742
742- void Reshard (const TVector<TKeyRange>& keyRanges) {
743+ void Reshard (const TVector<TKeyRange>& keyRanges, const TString& data ) {
743744 TGuard<TMutex> lock (Mutex);
744745 Rows.Reshard (keyRanges);
746+
747+ TStringInput input (data);
748+ TString line;
749+
750+ while (input.ReadLine (line)) {
751+ Rows.Add (KeyBuilder.Build (line), std::move (line));
752+ }
745753 }
746754
747755private:
@@ -792,6 +800,7 @@ class TDataWriter: public NPrivate::IDataWriter {
792800 }
793801
794802 if (retryNumber == maxRetries) {
803+ LOG_E (" There is no retries left, last result: " << importResult.GetIssues ().ToOneLineString ());
795804 return false ;
796805 }
797806
@@ -801,19 +810,12 @@ class TDataWriter: public NPrivate::IDataWriter {
801810 TMaybe<TTableDescription> desc;
802811 auto descResult = DescribeTable (TableClient, Path, desc);
803812 if (!descResult.IsSuccess ()) {
813+ LOG_E (" Describe table " << Path.Quote () << " failed: " << descResult.GetIssues ().ToOneLineString ());
804814 return false ;
805815 }
806816
807- Accumulator->Reshard (desc->GetKeyRanges ());
808-
809- TStringInput input (data);
810- TString line;
811-
812- while (input.ReadLine (line)) {
813- Accumulator->Feed (std::move (line));
814- }
815-
816- break ;
817+ Accumulator->Reshard (desc->GetKeyRanges (), data);
818+ return true ;
817819 }
818820
819821 case EStatus::ABORTED:
@@ -855,12 +857,14 @@ class TDataWriter: public NPrivate::IDataWriter {
855857 const TRestoreSettings& settings,
856858 TImportClient& importClient,
857859 TTableClient& tableClient,
858- NPrivate::IDataAccumulator* accumulator)
860+ NPrivate::IDataAccumulator* accumulator,
861+ const std::shared_ptr<TLog>& log)
859862 : Path(path)
860863 , Settings(MakeSettings(settings, desc))
861864 , ImportClient(importClient)
862865 , TableClient(tableClient)
863866 , Accumulator(dynamic_cast <TDataAccumulator*>(accumulator))
867+ , Log(log)
864868 , RateLimiterSettings(settings.RateLimiterSettings_)
865869 , RequestLimiter(RateLimiterSettings.GetRps(), RateLimiterSettings.GetRps())
866870 {
@@ -871,7 +875,10 @@ class TDataWriter: public NPrivate::IDataWriter {
871875 }
872876
873877 bool Push (TString&& data) override {
874- Y_ENSURE (data.size () < TRestoreSettings::MaxBytesPerRequest, " Data is too long" );
878+ if (data.size () >= TRestoreSettings::MaxBytesPerRequest) {
879+ LOG_E (" Data is too long" );
880+ return false ;
881+ }
875882
876883 if (IsStopped ()) {
877884 return false ;
@@ -896,6 +903,7 @@ class TDataWriter: public NPrivate::IDataWriter {
896903 TImportClient& ImportClient;
897904 TTableClient& TableClient;
898905 TDataAccumulator* Accumulator;
906+ const std::shared_ptr<TLog> Log;
899907
900908 const TRateLimiterSettings RateLimiterSettings;
901909
@@ -922,8 +930,9 @@ NPrivate::IDataWriter* CreateImportDataWriter(
922930 TImportClient& importClient,
923931 TTableClient& tableClient,
924932 NPrivate::IDataAccumulator* accumulator,
925- const TRestoreSettings& settings) {
926- return new TDataWriter (path, desc, settings, importClient, tableClient, accumulator);
933+ const TRestoreSettings& settings,
934+ const std::shared_ptr<TLog>& log) {
935+ return new TDataWriter (path, desc, settings, importClient, tableClient, accumulator, log);
927936}
928937
929938} // NDump
0 commit comments