@@ -155,6 +155,8 @@ class Client::Impl {
155
155
156
156
void ExecuteQuery (Query query);
157
157
158
+ void SelectWithExternalData (Query query, const ExternalTables& external_tables);
159
+
158
160
void SendCancel ();
159
161
160
162
void Insert (const std::string& table_name, const std::string& query_id, const Block& block);
@@ -174,10 +176,14 @@ class Client::Impl {
174
176
175
177
bool ReceivePacket (uint64_t * server_packet = nullptr );
176
178
177
- void SendQuery (const Query& query);
179
+ void SendQuery (const Query& query, bool finalize = true );
180
+ void FinalizeQuery ();
178
181
179
182
void SendData (const Block& block);
180
183
184
+ void SendBlockData (const Block& block);
185
+ void SendExternalData (const ExternalTables& external_tables);
186
+
181
187
bool SendHello ();
182
188
183
189
bool ReadBlock (InputStream& input, Block* block);
@@ -291,6 +297,51 @@ void Client::Impl::ExecuteQuery(Query query) {
291
297
}
292
298
}
293
299
300
+
301
+ void Client::Impl::SelectWithExternalData (Query query, const ExternalTables& external_tables) {
302
+ if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
303
+ throw UnimplementedError (" This version of ClickHouse server doesn't support temporary tables" );
304
+ }
305
+
306
+ EnsureNull en (static_cast <QueryEvents*>(&query), &events_);
307
+
308
+ if (options_.ping_before_query ) {
309
+ RetryGuard ([this ]() { Ping (); });
310
+ }
311
+
312
+ SendQuery (query, false );
313
+ SendExternalData (external_tables);
314
+ FinalizeQuery ();
315
+
316
+ while (ReceivePacket ()) {
317
+ ;
318
+ }
319
+ }
320
+
321
+ void Client::Impl::SendBlockData (const Block& block) {
322
+ if (compression_ == CompressionState::Enable) {
323
+ std::unique_ptr<OutputStream> compressed_output = std::make_unique<CompressedOutput>(output_.get (), options_.max_compression_chunk_size , options_.compression_method );
324
+ BufferedOutput buffered (std::move (compressed_output), options_.max_compression_chunk_size );
325
+
326
+ WriteBlock (block, buffered);
327
+ } else {
328
+ WriteBlock (block, *output_);
329
+ }
330
+ }
331
+
332
+ void Client::Impl::SendExternalData (const ExternalTables& external_tables) {
333
+ for (const auto & table: external_tables) {
334
+ if (!table.data .GetRowCount ()) {
335
+ // skip empty blocks to keep the connection in the consistent state as the current request would be marked as finished by such an empty block
336
+ continue ;
337
+ }
338
+ WireFormat::WriteFixed<uint8_t >(*output_, ClientCodes::Data);
339
+ WireFormat::WriteString (*output_, table.name );
340
+ SendBlockData (table.data );
341
+ }
342
+ }
343
+
344
+
294
345
std::string NameToQueryString (const std::string &input)
295
346
{
296
347
std::string output;
@@ -753,7 +804,7 @@ void Client::Impl::SendCancel() {
753
804
output_->Flush ();
754
805
}
755
806
756
- void Client::Impl::SendQuery (const Query& query) {
807
+ void Client::Impl::SendQuery (const Query& query, bool finalize ) {
757
808
WireFormat::WriteUInt64 (*output_, ClientCodes::Query);
758
809
WireFormat::WriteString (*output_, query.GetQueryID ());
759
810
@@ -858,7 +909,13 @@ void Client::Impl::SendQuery(const Query& query) {
858
909
}
859
910
WireFormat::WriteString (*output_, std::string ()); // empty string after last param
860
911
}
912
+
913
+ if (finalize) {
914
+ FinalizeQuery ();
915
+ }
916
+ }
861
917
918
+ void Client::Impl::FinalizeQuery () {
862
919
// Send empty block as marker of
863
920
// end of data
864
921
SendData (Block ());
@@ -905,16 +962,7 @@ void Client::Impl::SendData(const Block& block) {
905
962
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
906
963
WireFormat::WriteString (*output_, std::string ());
907
964
}
908
-
909
- if (compression_ == CompressionState::Enable) {
910
-
911
- std::unique_ptr<OutputStream> compressed_output = std::make_unique<CompressedOutput>(output_.get (), options_.max_compression_chunk_size , options_.compression_method );
912
- BufferedOutput buffered (std::move (compressed_output), options_.max_compression_chunk_size );
913
-
914
- WriteBlock (block, buffered);
915
- } else {
916
- WriteBlock (block, *output_);
917
- }
965
+ SendBlockData (block);
918
966
919
967
output_->Flush ();
920
968
}
@@ -1077,6 +1125,22 @@ void Client::Select(const Query& query) {
1077
1125
Execute (query);
1078
1126
}
1079
1127
1128
+ void Client::SelectWithExternalData (const std::string& query, const ExternalTables& external_tables, SelectCallback cb) {
1129
+ impl_->SelectWithExternalData (Query (query).OnData (std::move (cb)), external_tables);
1130
+ }
1131
+
1132
+ void Client::SelectWithExternalData (const std::string& query, const std::string& query_id, const ExternalTables& external_tables, SelectCallback cb) {
1133
+ impl_->SelectWithExternalData (Query (query, query_id).OnData (std::move (cb)), external_tables);
1134
+ }
1135
+
1136
+ void Client::SelectWithExternalDataCancelable (const std::string& query, const ExternalTables& external_tables, SelectCancelableCallback cb) {
1137
+ impl_->SelectWithExternalData (Query (query).OnDataCancelable (std::move (cb)), external_tables);
1138
+ }
1139
+
1140
+ void Client::SelectWithExternalDataCancelable (const std::string& query, const std::string& query_id, const ExternalTables& external_tables, SelectCancelableCallback cb) {
1141
+ impl_->SelectWithExternalData (Query (query, query_id).OnDataCancelable (std::move (cb)), external_tables);
1142
+ }
1143
+
1080
1144
void Client::Insert (const std::string& table_name, const Block& block) {
1081
1145
impl_->Insert (table_name, Query::default_query_id, block);
1082
1146
}
0 commit comments