@@ -207,24 +207,23 @@ NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
207207 return ReorderImpl (incoming, columnNames);
208208}
209209namespace {
210- template <class TDataContainer , class TSchemaImpl >
211- TConclusion<TSchemaSubset> BuildSequentialSubsetImpl (const std::shared_ptr<TDataContainer>& srcBatch,
212- const std::shared_ptr<TSchemaImpl>& dstSchema, const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) {
210+ template <class TDataContainer >
211+ TConclusion<TSchemaSubset> BuildSequentialSubsetImpl (const std::shared_ptr<TDataContainer>& srcBatch, const TSchemaLiteView& dstSchema,
212+ const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) {
213213 AFL_VERIFY (srcBatch);
214- AFL_VERIFY (dstSchema);
215- if (dstSchema->num_fields () < srcBatch->schema ()->num_fields ()) {
214+ if (dstSchema.num_fields () < srcBatch->schema ()->num_fields ()) {
216215 AFL_ERROR (NKikimrServices::ARROW_HELPER)(" event" , " incorrect columns set: destination must been wider than source" )(
217- " source" , srcBatch->schema ()->ToString ())(" destination" , dstSchema-> ToString ());
216+ " source" , srcBatch->schema ()->ToString ())(" destination" , dstSchema. ToString ());
218217 return TConclusionStatus::Fail (" incorrect columns set: destination must been wider than source" );
219218 }
220219 std::set<ui32> fieldIdx;
221220 auto itSrc = srcBatch->schema ()->fields ().begin ();
222- auto itDst = dstSchema-> fields ().begin ();
223- while (itSrc != srcBatch->schema ()->fields ().end () && itDst != dstSchema-> fields ().end ()) {
221+ auto itDst = dstSchema. fields ().begin ();
222+ while (itSrc != srcBatch->schema ()->fields ().end () && itDst != dstSchema. fields ().end ()) {
224223 if ((*itSrc)->name () != (*itDst)->name ()) {
225224 ++itDst;
226225 } else {
227- fieldIdx.emplace (itDst - dstSchema-> fields ().begin ());
226+ fieldIdx.emplace (itDst - dstSchema. fields ().begin ());
228227 if (checkFieldTypesPolicy != TColumnOperator::ECheckFieldTypesPolicy::Ignore && (*itDst)->Equals (*itSrc)) {
229228 switch (checkFieldTypesPolicy) {
230229 case TColumnOperator::ECheckFieldTypesPolicy::Error: {
@@ -245,25 +244,24 @@ TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TData
245244 ++itSrc;
246245 }
247246 }
248- if (itDst == dstSchema-> fields ().end () && itSrc != srcBatch->schema ()->fields ().end ()) {
247+ if (itDst == dstSchema. fields ().end () && itSrc != srcBatch->schema ()->fields ().end ()) {
249248 AFL_ERROR (NKikimrServices::ARROW_HELPER)(" event" , " incorrect columns order in source set" )(" source" , srcBatch->schema ()->ToString ())(
250- " destination" , dstSchema-> ToString ());
249+ " destination" , dstSchema. ToString ());
251250 return TConclusionStatus::Fail (" incorrect columns order in source set" );
252251 }
253- return TSchemaSubset (fieldIdx, dstSchema-> num_fields ());
252+ return TSchemaSubset (fieldIdx, dstSchema. num_fields ());
254253}
255254} // namespace
256255
257256TConclusion<TSchemaSubset> TColumnOperator::BuildSequentialSubset (
258- const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr< NArrow::TSchemaLite> & dstSchema) {
257+ const std::shared_ptr<arrow::RecordBatch>& incoming, const NArrow::TSchemaLiteView & dstSchema) {
259258 return BuildSequentialSubsetImpl (incoming, dstSchema, DifferentColumnTypesPolicy);
260259}
261260namespace {
262261template <class TDataContainer >
263262TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl (const std::shared_ptr<TDataContainer>& incoming,
264- const std::shared_ptr<TSchemaLite>& dstSchema, const std::function<TConclusionStatus(const ui32, const i32 )>& checker,
265- const std::function<i32(const std::string&)>& nameResolver,
266- const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy,
263+ const TSchemaLiteView& dstSchema, const std::function<TConclusionStatus(const ui32, const i32 )>& checker,
264+ const std::function<i32(const std::string&)>& nameResolver, const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy,
267265 const TColumnOperator::EAbsentFieldPolicy absentColumnPolicy) {
268266 struct TFieldData {
269267 ui32 Index;
@@ -273,14 +271,13 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(c
273271 }
274272 };
275273 AFL_VERIFY (incoming);
276- AFL_VERIFY (dstSchema);
277274 std::vector<TFieldData> resultColumns;
278275 resultColumns.reserve (incoming->num_columns ());
279276 ui32 idx = 0 ;
280277 for (auto & srcField : incoming->schema ()->fields ()) {
281278 const int dstIndex = nameResolver (srcField->name ());
282279 if (dstIndex > -1 ) {
283- const auto & dstField = dstSchema-> GetFieldByIndexVerified (dstIndex);
280+ const auto & dstField = dstSchema. GetFieldByIndexVerified (dstIndex);
284281 switch (differentColumnTypesPolicy) {
285282 case TColumnOperator::ECheckFieldTypesPolicy::Verify:
286283 AFL_VERIFY (dstField->type ()->Equals (srcField->type ()))(" event" , " cannot_use_incoming_batch" )(" reason" , " invalid_column_type" )(
@@ -322,14 +319,14 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(c
322319 columns.reserve (resultColumns.size ());
323320 fields.reserve (resultColumns.size ());
324321 for (auto && i : resultColumns) {
325- fields.emplace_back (dstSchema-> field (i.Index ));
322+ fields.emplace_back (dstSchema. field (i.Index ));
326323 columns.emplace_back (i.Column );
327324 }
328325 return NAdapter::TDataBuilderPolicy<TDataContainer>::Build (std::make_shared<arrow::Schema>(fields), std::move (columns), incoming->num_rows ());
329326}
330327} // namespace
331328TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::AdaptIncomingToDestinationExt (
332- const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<TSchemaLite> & dstSchema,
329+ const std::shared_ptr<arrow::RecordBatch>& incoming, const TSchemaLiteView & dstSchema,
333330 const std::function<TConclusionStatus(const ui32, const i32 )>& checker, const std::function<i32(const std::string&)>& nameResolver) const {
334331 return AdaptIncomingToDestinationExtImpl (incoming, dstSchema, checker, nameResolver, DifferentColumnTypesPolicy, AbsentColumnPolicy);
335332}
0 commit comments