@@ -161,6 +161,12 @@ class Client::Impl {
161161
162162 void Insert (const std::string& table_name, const std::string& query_id, const Block& block);
163163
164+ PreparedInsert * PrepareInsert (Query query);
165+
166+ void FinishInsert ();
167+
168+ void SendData (const Block& block);
169+
164170 void Ping ();
165171
166172 void ResetConnection ();
@@ -175,12 +181,11 @@ class Client::Impl {
175181 bool Handshake ();
176182
177183 bool ReceivePacket (uint64_t * server_packet = nullptr );
184+ bool ReceivePreparePackets (uint64_t * server_packet = nullptr );
178185
179186 void SendQuery (const Query& query, bool finalize = true );
180187 void FinalizeQuery ();
181188
182- void SendData (const Block& block);
183-
184189 void SendBlockData (const Block& block);
185190 void SendExternalData (const ExternalTables& external_tables);
186191
@@ -398,8 +403,11 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
398403
399404 // Send data.
400405 SendData (block);
401- // Send empty block as marker of
402- // end of data.
406+ FinishInsert ();
407+ }
408+
409+ void Client::Impl::FinishInsert () {
410+ // Send empty block as marker of end of data.
403411 SendData (Block ());
404412
405413 // Wait for EOS.
@@ -648,6 +656,78 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
648656 }
649657}
650658
659+ bool Client::Impl::ReceivePreparePackets (uint64_t * server_packet) {
660+ uint64_t packet_type = 0 ;
661+
662+ while (true ) {
663+ if (!WireFormat::ReadVarint64 (*input_, &packet_type)) {
664+ throw std::runtime_error (" unexpected package type " +
665+ std::to_string ((int )packet_type) + " for insert query" );
666+ }
667+ if (server_packet) {
668+ *server_packet = packet_type;
669+ }
670+
671+ switch (packet_type) {
672+ case ServerCodes::Data: {
673+ if (!ReceiveData ()) {
674+ throw ProtocolError (" can't read data packet from input stream" );
675+ }
676+ return true ;
677+ }
678+
679+ case ServerCodes::Exception: {
680+ ReceiveException ();
681+ return false ;
682+ }
683+
684+ case ServerCodes::ProfileInfo:
685+ case ServerCodes::Progress:
686+ case ServerCodes::Pong:
687+ case ServerCodes::Hello:
688+ continue ;
689+
690+ case ServerCodes::Log: {
691+ // log tag
692+ if (!WireFormat::SkipString (*input_)) {
693+ return false ;
694+ }
695+ Block block;
696+
697+ // Use uncompressed stream since log blocks usually contain only one row
698+ if (!ReadBlock (*input_, &block)) {
699+ return false ;
700+ }
701+
702+ if (events_) {
703+ events_->OnServerLog (block);
704+ }
705+ continue ;
706+ }
707+
708+ case ServerCodes::TableColumns: {
709+ // external table name
710+ if (!WireFormat::SkipString (*input_)) {
711+ return false ;
712+ }
713+
714+ // columns metadata
715+ if (!WireFormat::SkipString (*input_)) {
716+ return false ;
717+ }
718+ continue ;
719+ }
720+
721+ // No others expected.
722+ case ServerCodes::EndOfStream:
723+ case ServerCodes::ProfileEvents:
724+ default :
725+ throw UnimplementedError (" unimplemented " + std::to_string ((int )packet_type));
726+ break ;
727+ }
728+ }
729+ }
730+
651731bool Client::Impl::ReadBlock (InputStream& input, Block* block) {
652732 // Additional information about block.
653733 if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -923,7 +1003,6 @@ void Client::Impl::FinalizeQuery() {
9231003 output_->Flush ();
9241004}
9251005
926-
9271006void Client::Impl::WriteBlock (const Block& block, OutputStream& output) {
9281007 // Additional information about block.
9291008 if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -1063,7 +1142,7 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
10631142 }
10641143 }
10651144 }
1066- // Connectiong with current_endpoint_ are broken.
1145+ // Connecting with current_endpoint_ are broken.
10671146 // Trying to establish with the another one from the list.
10681147 size_t connection_attempts_count = GetConnectionAttempts ();
10691148 for (size_t i = 0 ; i < connection_attempts_count;)
@@ -1085,6 +1164,41 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
10851164 }
10861165}
10871166
1167+ Client::PreparedInsert * Client::Impl::PrepareInsert (Query query) {
1168+ // Arrange a query callback to extract a block that corresponds to the
1169+ // query columns.
1170+ auto block = new Block ();
1171+ query.OnData ([&block](const Block& b) {
1172+ for (Block::Iterator bi (b); bi.IsValid (); bi.Next ()) {
1173+ // Create the ClickHouse column type.
1174+ clickhouse::ColumnRef col = bi.Column ();
1175+ auto chtype = col->Type ();
1176+ if (chtype->GetCode () == Type::LowCardinality) {
1177+ chtype = col->As <ColumnLowCardinality>()->GetNestedType ();
1178+ }
1179+ block->AppendColumn (bi.Name (), clickhouse::CreateColumnByType (chtype->GetName ()));
1180+ }
1181+
1182+ return true ;
1183+ });
1184+
1185+
1186+ EnsureNull en (static_cast <QueryEvents*>(&query), &events_);
1187+
1188+ if (options_.ping_before_query ) {
1189+ RetryGuard ([this ]() { Ping (); });
1190+ }
1191+
1192+ SendQuery (query.GetText ());
1193+
1194+ // Receive data packet but keep the query/connection open.
1195+ if (!ReceivePreparePackets ()) {
1196+ throw std::runtime_error (" fail to receive data packet" );
1197+ }
1198+
1199+ return new PreparedInsert (this , block);
1200+ }
1201+
10881202Client::Client (const ClientOptions& opts)
10891203 : options_(opts)
10901204 , impl_(new Impl(opts))
@@ -1149,6 +1263,14 @@ void Client::Insert(const std::string& table_name, const std::string& query_id,
11491263 impl_->Insert (table_name, query_id, block);
11501264}
11511265
1266+ Client::PreparedInsert * Client::PrepareInsert (const std::string& query) {
1267+ return impl_->PrepareInsert (Query (query));
1268+ }
1269+
1270+ Client::PreparedInsert * Client::PrepareInsert (const std::string& query, const std::string& query_id) {
1271+ return impl_->PrepareInsert (Query (query, query_id));
1272+ }
1273+
11521274void Client::Ping () {
11531275 impl_->Ping ();
11541276}
@@ -1179,4 +1301,42 @@ Client::Version Client::GetVersion() {
11791301 };
11801302}
11811303
1304+ Client::PreparedInsert::PreparedInsert (void *c, Block *b) {
1305+ client = c;
1306+ block = b;
1307+ }
1308+
1309+ Client::PreparedInsert::~PreparedInsert () {
1310+ Finish ();
1311+ }
1312+
1313+ Block * Client::PreparedInsert::GetBlock () { return block; }
1314+
1315+ void Client::PreparedInsert::Execute () {
1316+ if (!block) {
1317+ throw std::runtime_error (" Cannot execute finished PrepareExecute" );
1318+ }
1319+ auto c = (Client::Impl *)(client);
1320+ block->RefreshRowCount ();
1321+ c->SendData (*block);
1322+ block->Clear ();
1323+ }
1324+
1325+ void Client::PreparedInsert::Finish () {
1326+ if (!block) return ;
1327+
1328+ auto c = (Client::Impl *)(client);
1329+ block->RefreshRowCount ();
1330+ if (block->GetRowCount () > 0 ) {
1331+ c->SendData (*block);
1332+ block->Clear ();
1333+ }
1334+ c->FinishInsert ();
1335+ // Do not delete client as we're effectively its child.
1336+ if (block) {
1337+ delete block;
1338+ block = nullptr ;
1339+ }
1340+ }
1341+
11821342}
0 commit comments