11#include " container.h"
22#include < ydb/library/actors/core/log.h>
3+ #include < ydb/core/formats/arrow/arrow_helpers.h>
34#include < ydb/core/formats/arrow/simple_arrays_cache.h>
45
56namespace NKikimr ::NArrow {
67
7- NKikimr::TConclusionStatus TGeneralContainer::MergeColumnsStrictly (const TGeneralContainer& container) {
8- if (RecordsCount != container.RecordsCount ) {
8+ TConclusionStatus TGeneralContainer::MergeColumnsStrictly (const TGeneralContainer& container) {
9+ if (!container.RecordsCount ) {
10+ return TConclusionStatus::Success ();
11+ }
12+ if (!RecordsCount) {
13+ RecordsCount = container.RecordsCount ;
14+ }
15+ if (*RecordsCount != *container.RecordsCount ) {
916 return TConclusionStatus::Fail (TStringBuilder () << " inconsistency records count in additional container: " <<
1017 container.GetSchema ()->ToString () << " . expected: " << RecordsCount << " , reality: " << container.GetRecordsCount ());
1118 }
1219 for (i32 i = 0 ; i < container.Schema ->num_fields (); ++i) {
1320 auto addFieldResult = AddField (container.Schema ->field (i), container.Columns [i]);
14- if (! addFieldResult) {
21+ if (addFieldResult. IsFail () ) {
1522 return addFieldResult;
1623 }
1724 }
1825 return TConclusionStatus::Success ();
1926}
2027
21- NKikimr:: TConclusionStatus TGeneralContainer::AddField (const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<NAccessor::IChunkedArray>& data) {
28+ TConclusionStatus TGeneralContainer::AddField (const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<NAccessor::IChunkedArray>& data) {
2229 AFL_VERIFY (f);
2330 AFL_VERIFY (data);
24- if (data->GetRecordsCount () != RecordsCount) {
31+ if (RecordsCount && data->GetRecordsCount () != * RecordsCount) {
2532 return TConclusionStatus::Fail (TStringBuilder () << " inconsistency records count in new column: " <<
2633 f->name () << " . expected: " << RecordsCount << " , reality: " << data->GetRecordsCount ());
2734 }
2835 if (!data->GetDataType ()->Equals (f->type ())) {
2936 return TConclusionStatus::Fail (" schema and data type are not equals: " + data->GetDataType ()->ToString () + " vs " + f->type ()->ToString ());
3037 }
31- if (Schema->GetFieldByName (f->name ())) {
32- return TConclusionStatus::Fail (" field name duplication: " + f->name ());
33- }
34- auto resultAdd = Schema->AddField (Schema->num_fields (), f);
35- if (!resultAdd.ok ()) {
36- return TConclusionStatus::Fail (" internal schema error on add field: " + resultAdd.status ().ToString ());
38+ {
39+ auto conclusion = Schema->AddField (f);
40+ if (conclusion.IsFail ()) {
41+ return conclusion;
42+ }
3743 }
38- Schema = *resultAdd ;
44+ RecordsCount = data-> GetRecordsCount () ;
3945 Columns.emplace_back (data);
4046 return TConclusionStatus::Success ();
4147}
4248
43- TGeneralContainer::TGeneralContainer (const std::shared_ptr<arrow::Schema>& schema, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
44- : Schema(schema)
45- , Columns(std::move(columns))
46- {
47- AFL_VERIFY (schema);
49+ TConclusionStatus TGeneralContainer::AddField (const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<arrow::ChunkedArray>& data) {
50+ return AddField (f, std::make_shared<NAccessor::TTrivialChunkedArray>(data));
51+ }
52+
53+ TConclusionStatus TGeneralContainer::AddField (const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<arrow::Array>& data) {
54+ return AddField (f, std::make_shared<NAccessor::TTrivialArray>(data));
55+ }
56+
57+ void TGeneralContainer::Initialize () {
4858 std::optional<ui64> recordsCount;
4959 AFL_VERIFY (Schema->num_fields () == (i32 )Columns.size ())(" schema" , Schema->num_fields ())(" columns" , Columns.size ());
5060 for (i32 i = 0 ; i < Schema->num_fields (); ++i) {
@@ -58,12 +68,34 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Schema>& schem
5868 }
5969 }
6070 AFL_VERIFY (recordsCount);
71+ AFL_VERIFY (!RecordsCount || *RecordsCount == *recordsCount);
6172 RecordsCount = *recordsCount;
6273}
6374
75+ TGeneralContainer::TGeneralContainer (const std::vector<std::shared_ptr<arrow::Field>>& fields, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
76+ : Schema(std::make_shared<NModifier::TSchema>(fields))
77+ , Columns(std::move(columns))
78+ {
79+ Initialize ();
80+ }
81+
82+ TGeneralContainer::TGeneralContainer (const std::shared_ptr<NModifier::TSchema>& schema, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
83+ : Schema(std::make_shared<NModifier::TSchema>(schema))
84+ , Columns(std::move(columns))
85+ {
86+ Initialize ();
87+ }
88+
89+ TGeneralContainer::TGeneralContainer (const std::shared_ptr<arrow::Schema>& schema, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns)
90+ : Schema(std::make_shared<NModifier::TSchema>(schema))
91+ , Columns(std::move(columns))
92+ {
93+ Initialize ();
94+ }
95+
6496TGeneralContainer::TGeneralContainer (const std::shared_ptr<arrow::Table>& table) {
6597 AFL_VERIFY (table);
66- Schema = table->schema ();
98+ Schema = std::make_shared<NModifier::TSchema>( table->schema () );
6799 RecordsCount = table->num_rows ();
68100 for (auto && i : table->columns ()) {
69101 if (i->num_chunks () == 1 ) {
@@ -72,15 +104,17 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Table>& table)
72104 Columns.emplace_back (std::make_shared<NAccessor::TTrivialChunkedArray>(i));
73105 }
74106 }
107+ Initialize ();
75108}
76109
77110TGeneralContainer::TGeneralContainer (const std::shared_ptr<arrow::RecordBatch>& table) {
78111 AFL_VERIFY (table);
79- Schema = table->schema ();
112+ Schema = std::make_shared<NModifier::TSchema>( table->schema () );
80113 RecordsCount = table->num_rows ();
81114 for (auto && i : table->columns ()) {
82115 Columns.emplace_back (std::make_shared<NAccessor::TTrivialArray>(i));
83116 }
117+ Initialize ();
84118}
85119
86120std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray> TGeneralContainer::GetAccessorByNameVerified (const std::string& fieldId) const {
@@ -110,14 +144,78 @@ std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const std::o
110144 if (fields.empty ()) {
111145 return nullptr ;
112146 }
113- return arrow::Table::Make (std::make_shared<arrow::Schema>(fields), columns, RecordsCount);
147+ AFL_VERIFY (RecordsCount);
148+ return arrow::Table::Make (std::make_shared<arrow::Schema>(fields), columns, *RecordsCount);
114149}
115150
116- std::shared_ptr<arrow::Table> TGeneralContainer::BuildTable (const std::optional<std::set<std::string>>& columnNames /* = {}*/ ) const {
151+ std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified (const std::optional<std::set<std::string>>& columnNames /* = {}*/ ) const {
117152 auto result = BuildTableOptional (columnNames);
118153 AFL_VERIFY (result);
119154 AFL_VERIFY (!columnNames || result->schema ()->num_fields () == (i32 )columnNames->size ());
120155 return result;
121156}
122157
158+ std::shared_ptr<NArrow::NAccessor::IChunkedArray> TGeneralContainer::GetAccessorByNameOptional (const std::string& fieldId) const {
159+ int idx = Schema->GetFieldIndex (fieldId);
160+ if (idx == -1 ) {
161+ return nullptr ;
162+ }
163+ AFL_VERIFY ((ui32)idx < Columns.size ())(" idx" , idx)(" count" , Columns.size ());
164+ return Columns[idx];
165+ }
166+
167+ TConclusionStatus TGeneralContainer::SyncSchemaTo (const std::shared_ptr<arrow::Schema>& schema, const IFieldsConstructor* defaultFieldsConstructor, const bool forceDefaults) {
168+ std::shared_ptr<NModifier::TSchema> schemaNew = std::make_shared<NModifier::TSchema>();
169+ std::vector<std::shared_ptr<NAccessor::IChunkedArray>> columnsNew;
170+ if (!RecordsCount) {
171+ return TConclusionStatus::Fail (" original container has not data" );
172+ }
173+ for (auto && i : schema->fields ()) {
174+ const int idx = Schema->GetFieldIndex (i->name ());
175+ if (idx == -1 ) {
176+ if (!defaultFieldsConstructor) {
177+ return TConclusionStatus::Fail (" haven't field for sync: '" + i->name () + " '" );
178+ } else {
179+ schemaNew->AddField (i).Validate ();
180+ auto defConclusion = defaultFieldsConstructor->GetDefaultColumnElementValue (i, forceDefaults);
181+ if (defConclusion.IsFail ()) {
182+ return defConclusion;
183+ }
184+ columnsNew.emplace_back (std::make_shared<NAccessor::TTrivialArray>(NArrow::TThreadSimpleArraysCache::Get (i->type (), *defConclusion, *RecordsCount)));
185+ }
186+ } else {
187+ const auto & fOwned = Schema->GetFieldVerified (idx);
188+ if (!fOwned ->type ()->Equals (i->type ())) {
189+ return TConclusionStatus::Fail (" different field types for '" + i->name () + " '. Have " + fOwned ->type ()->ToString () + " , need " + i->type ()->ToString ());
190+ }
191+ schemaNew->AddField (fOwned ).Validate ();
192+ columnsNew.emplace_back (Columns[idx]);
193+ }
194+ }
195+ std::swap (Schema, schemaNew);
196+ std::swap (columnsNew, Columns);
197+ return TConclusionStatus::Success ();
198+ }
199+
200+ TString TGeneralContainer::DebugString () const {
201+ TStringBuilder result;
202+ if (RecordsCount) {
203+ result << " records_count=" << *RecordsCount << " ;" ;
204+ }
205+ result << " schema=" << Schema->ToString () << " ;" ;
206+ return result;
207+ }
208+
209+ TConclusion<std::shared_ptr<arrow::Scalar>> IFieldsConstructor::GetDefaultColumnElementValue (const std::shared_ptr<arrow::Field>& field, const bool force) const {
210+ AFL_VERIFY (field);
211+ auto result = DoGetDefaultColumnElementValue (field->name ());
212+ if (result) {
213+ return result;
214+ }
215+ if (force) {
216+ return NArrow::DefaultScalar (field->type ());
217+ }
218+ return TConclusionStatus::Fail (" have not default value for column " + field->name ());
219+ }
220+
123221}
0 commit comments