|
126 | 126 | LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \ |
127 | 127 | << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream) |
128 | 128 |
|
129 | | -#define THROW_ARROW_NOT_OK(status) \ |
| 129 | +#define THROW_ARROW_NOT_OK(code, status) \ |
130 | 130 | do \ |
131 | 131 | { \ |
132 | 132 | if (::arrow::Status _s = (status); !_s.ok()) \ |
133 | | - throw yexception() << _s.ToString(); \ |
| 133 | + ythrow TCodeLineException(code) << _s.ToString(); \ |
134 | 134 | } while (false) |
135 | 135 |
|
136 | 136 | namespace NYql::NDq { |
@@ -575,7 +575,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { |
575 | 575 | void HandleEvent(TEvS3Provider::TEvReadResult2::THandle& event) { |
576 | 576 |
|
577 | 577 | if (event.Get()->Failure) { |
578 | | - throw yexception() << event.Get()->Issues.ToOneLineString(); |
| 578 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) << event.Get()->Issues.ToOneLineString(); |
579 | 579 | } |
580 | 580 | auto readyRange = event.Get()->ReadRange; |
581 | 581 | LOG_CORO_D("Download FINISHED [" << readyRange.Offset << "-" << readyRange.Length << "], cookie: " << event.Cookie); |
@@ -773,9 +773,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { |
773 | 773 | if (StopIfConsumedEnough(numRows)) { |
774 | 774 | isCancelled = true; |
775 | 775 | } |
776 | | - if (!status.ok()) { |
777 | | - throw yexception() << status.ToString(); |
778 | | - } |
| 776 | + ThrowParquetNotOk(status); |
779 | 777 | SourceContext->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows()); |
780 | 778 | if (RawInflightSize) { |
781 | 779 | RawInflightSize->Sub(downloadedBytes); |
@@ -862,9 +860,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { |
862 | 860 | if (StopIfConsumedEnough(numRows)) { |
863 | 861 | isCancelled = true; |
864 | 862 | } |
865 | | - if (!status.ok()) { |
866 | | - throw yexception() << status.ToString(); |
867 | | - } |
| 863 | + ThrowParquetNotOk(status); |
868 | 864 | SourceContext->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows()); |
869 | 865 | if (isCancelled) { |
870 | 866 | LOG_CORO_D("RunCoroBlockArrowParserOverFile - STOPPED ON SATURATION"); |
@@ -1177,6 +1173,11 @@ class TS3ReadCoroImpl : public TActorCoroImpl { |
1177 | 1173 | // Stop any activity instantly |
1178 | 1174 | RetryStuff->Cancel(); |
1179 | 1175 | return; |
| 1176 | + } catch (const TCodeLineException& err) { |
| 1177 | + LOG_CORO_E(err.what()); |
| 1178 | + Issues.AddIssue(err.GetRawMessage()); |
| 1179 | + FatalCode = static_cast<NYql::NDqProto::StatusIds::StatusCode>(err.Code); |
| 1180 | + RetryStuff->Cancel(); |
1180 | 1181 | } catch (const std::exception& err) { |
1181 | 1182 | Issues.AddIssue(TIssue(err.what())); |
1182 | 1183 | FatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR; |
@@ -2029,11 +2030,11 @@ NDB::DataTypePtr MetaToClickHouse(const TType* type, NSerialization::TSerializat |
2029 | 2030 | return std::make_shared<const NDB::DataTypeDecimal<NDB::Decimal128>>(precision, scale); |
2030 | 2031 | } |
2031 | 2032 | default: |
2032 | | - throw yexception() << "Unsupported data slot in MetaToClickHouse: " << slot; |
| 2033 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) << "Unsupported data slot in MetaToClickHouse: " << slot; |
2033 | 2034 | } |
2034 | 2035 | } |
2035 | 2036 | default: |
2036 | | - throw yexception() << "Unsupported type kind in MetaToClickHouse: " << type->GetKindAsStr(); |
| 2037 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) << "Unsupported type kind in MetaToClickHouse: " << type->GetKindAsStr(); |
2037 | 2038 | } |
2038 | 2039 | return nullptr; |
2039 | 2040 | } |
@@ -2104,17 +2105,18 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( |
2104 | 2105 | if (hasDirectories) { |
2105 | 2106 | auto pathPatternValue = settings.find("pathpattern"); |
2106 | 2107 | if (pathPatternValue == settings.cend()) { |
2107 | | - ythrow yexception() << "'pathpattern' must be configured for directory listing"; |
| 2108 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) |
| 2109 | + << "'pathpattern' must be configured for directory listing"; |
2108 | 2110 | } |
2109 | 2111 | pathPattern = pathPatternValue->second; |
2110 | 2112 |
|
2111 | 2113 | auto pathPatternVariantValue = settings.find("pathpatternvariant"); |
2112 | 2114 | if (pathPatternVariantValue == settings.cend()) { |
2113 | | - ythrow yexception() |
| 2115 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) |
2114 | 2116 | << "'pathpatternvariant' must be configured for directory listing"; |
2115 | 2117 | } |
2116 | 2118 | if (!TryFromString(pathPatternVariantValue->second, pathPatternVariant)) { |
2117 | | - ythrow yexception() |
| 2119 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) |
2118 | 2120 | << "Unknown 'pathpatternvariant': " << pathPatternVariantValue->second; |
2119 | 2121 | } |
2120 | 2122 | } |
@@ -2197,13 +2199,16 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( |
2197 | 2199 | std::shared_ptr<arrow::DataType> dataType; |
2198 | 2200 |
|
2199 | 2201 | YQL_ENSURE(ConvertArrowType(memberType, dataType, true), "Unsupported arrow type"); |
2200 | | - THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(memberName), dataType, memberType->IsOptional()))); |
| 2202 | + THROW_ARROW_NOT_OK( |
| 2203 | + NYql::NDqProto::StatusIds::INTERNAL_ERROR, |
| 2204 | + builder.AddField(std::make_shared<arrow::Field>(std::string(memberName), dataType, memberType->IsOptional())) |
| 2205 | + ); |
2201 | 2206 | readSpec->ColumnReorder.push_back(i); |
2202 | 2207 | readSpec->RowSpec.emplace(memberName, memberType); |
2203 | 2208 | } |
2204 | 2209 |
|
2205 | 2210 | auto res = builder.Finish(); |
2206 | | - THROW_ARROW_NOT_OK(res.status()); |
| 2211 | + THROW_ARROW_NOT_OK(NYql::NDqProto::StatusIds::INTERNAL_ERROR, res.status()); |
2207 | 2212 | readSpec->ArrowSchema = std::move(res).ValueOrDie(); |
2208 | 2213 | } else { |
2209 | 2214 | readSpec->CHColumns.resize(structType->GetMembersCount()); |
|
0 commit comments