@@ -330,7 +330,7 @@ class RPCEndpoint::EventHandler : public dmlc::Stream {
330
330
}
331
331
332
332
/* !
333
- * \brief Recive incoming packed seq from the stream.
333
+ * \brief Receive incoming packed seq from the stream.
334
334
* \return The received arguments.
335
335
* \note The TVMArgs is available until we switch state.
336
336
*/
@@ -369,7 +369,6 @@ class RPCEndpoint::EventHandler : public dmlc::Stream {
369
369
*/
370
370
void HandleReturn (RPCCode code, RPCSession::FEncodeReturn setreturn) {
371
371
TVMArgs args = RecvPackedSeq ();
372
-
373
372
if (code == RPCCode::kException ) {
374
373
// switch to the state before sending exception.
375
374
this ->SwitchToState (kRecvPacketNumBytes );
@@ -802,14 +801,13 @@ void RPCEndpoint::CopyToRemote(void* from_bytes, DLTensor* to, uint64_t nbytes)
802
801
std::lock_guard<std::mutex> lock (mutex_);
803
802
RPCCode code = RPCCode::kCopyToRemote ;
804
803
805
- uint64_t num_data_bytes = static_cast <uint64_t >(GetDataSize (*to));
806
- ICHECK_EQ (nbytes, num_data_bytes);
804
+ uint64_t tensor_total_size_bytes = static_cast <uint64_t >(GetDataSize (*to));
805
+ ICHECK_LE (to->byte_offset + nbytes, tensor_total_size_bytes)
806
+ << " CopyToRemote: overflow in tensor size: (byte_offset=" << to->byte_offset
807
+ << " , nbytes=" << nbytes << " , tensor_total_size=" << tensor_total_size_bytes << " )" ;
807
808
808
- uint64_t to_data = reinterpret_cast <uint64_t >(to->data );
809
- uint64_t shape_bytes = to->ndim * sizeof (int64_t );
810
- uint64_t packet_nbytes = sizeof (code) + sizeof (to_data) + sizeof (to->device ) + sizeof (to->ndim ) +
811
- sizeof (to->dtype ) + sizeof (to->byte_offset ) + shape_bytes +
812
- sizeof (nbytes) + num_data_bytes;
809
+ uint64_t overhead = RemoteCopyCalculatePacketOverheadSize (to, code, nbytes);
810
+ uint64_t packet_nbytes = overhead + nbytes;
813
811
814
812
handler_->Write (packet_nbytes);
815
813
handler_->Write (code);
@@ -823,14 +821,13 @@ void RPCEndpoint::CopyFromRemote(DLTensor* from, void* to_bytes, uint64_t nbytes
823
821
std::lock_guard<std::mutex> lock (mutex_);
824
822
RPCCode code = RPCCode::kCopyFromRemote ;
825
823
826
- uint64_t num_data_bytes = static_cast <uint64_t >(GetDataSize (*from));
827
- CHECK_EQ (nbytes, num_data_bytes);
824
+ uint64_t tensor_total_size_bytes = static_cast <uint64_t >(GetDataSize (*from));
825
+ ICHECK_LE (from->byte_offset + nbytes, tensor_total_size_bytes)
826
+ << " CopyFromRemote: overflow in tensor size: (byte_offset=" << from->byte_offset
827
+ << " , nbytes=" << nbytes << " , tensor_total_size=" << tensor_total_size_bytes << " )" ;
828
828
829
- uint64_t from_data = reinterpret_cast <uint64_t >(from->data );
830
- uint64_t shape_bytes = from->ndim * sizeof (int64_t );
831
- uint64_t packet_nbytes = sizeof (code) + sizeof (from_data) + sizeof (from->device ) +
832
- sizeof (from->ndim ) + sizeof (from->dtype ) + sizeof (from->byte_offset ) +
833
- shape_bytes + sizeof (nbytes);
829
+ uint64_t overhead = RemoteCopyCalculatePacketOverheadSize (from, code, nbytes);
830
+ uint64_t packet_nbytes = overhead;
834
831
835
832
handler_->Write (packet_nbytes);
836
833
handler_->Write (code);
@@ -1009,11 +1006,55 @@ class RPCClientSession : public RPCSession, public DeviceAPI {
1009
1006
}
1010
1007
1011
1008
void CopyToRemote (void * local_from_bytes, DLTensor* remote_to, uint64_t nbytes) final {
1012
- endpoint_->CopyToRemote (local_from_bytes, remote_to, nbytes);
1009
+ RPCCode code = RPCCode::kCopyToRemote ;
1010
+ uint64_t overhead = RemoteCopyCalculatePacketOverheadSize (remote_to, code, nbytes);
1011
+ uint64_t rpc_max_size = GetRPCMaxTransferSize ();
1012
+ ICHECK_GT (rpc_max_size, overhead) << " CopyToRemote: Invalid block size!" ;
1013
+ const uint64_t block_size = rpc_max_size - overhead;
1014
+ uint64_t block_count = 0 ;
1015
+ const uint64_t num_blocks = nbytes / block_size;
1016
+ void * from_bytes;
1017
+
1018
+ for (block_count = 0 ; block_count < num_blocks; block_count++) {
1019
+ remote_to->byte_offset = block_count * block_size;
1020
+ from_bytes = reinterpret_cast <void *>(
1021
+ (reinterpret_cast <uint8_t *>(local_from_bytes) + block_count * block_size));
1022
+ endpoint_->CopyToRemote (from_bytes, remote_to, block_size);
1023
+ }
1024
+
1025
+ const uint64_t remainder_bytes = nbytes % block_size;
1026
+ if (remainder_bytes != 0 ) {
1027
+ remote_to->byte_offset = block_count * block_size;
1028
+ from_bytes = reinterpret_cast <void *>(
1029
+ (reinterpret_cast <uint8_t *>(local_from_bytes) + block_count * block_size));
1030
+ endpoint_->CopyToRemote (from_bytes, remote_to, remainder_bytes);
1031
+ }
1013
1032
}
1014
1033
1015
1034
/*!
 * \brief Copy nbytes from a remote tensor into local memory, one RPC-sized chunk at a time.
 *
 * The size of each chunk is limited to the maximum RPC transfer size minus the
 * per-packet overhead; every chunk is fetched through endpoint_->CopyFromRemote.
 *
 * \param remote_from Remote source tensor. Its byte_offset field is rewritten with
 *        the position of the chunk being fetched.
 * \param local_to_bytes Start of the local destination buffer.
 * \param nbytes Total number of bytes to transfer.
 *
 * \note NOTE(review): byte_offset is overwritten with the chunk position and is not
 *       restored afterwards, so an offset set by the caller is discarded -- confirm
 *       that this is intended.
 */
void CopyFromRemote(DLTensor* remote_from, void* local_to_bytes, uint64_t nbytes) final {
  const uint64_t header_bytes =
      RemoteCopyCalculatePacketOverheadSize(remote_from, RPCCode::kCopyFromRemote, nbytes);
  const uint64_t max_packet_bytes = GetRPCMaxTransferSize();
  ICHECK_GT(max_packet_bytes, header_bytes) << " CopyFromRemote: Invalid block size!";
  // Largest chunk size derived from the packet limit and the header size.
  const uint64_t max_payload_bytes = max_packet_bytes - header_bytes;

  uint8_t* const destination = reinterpret_cast<uint8_t*>(local_to_bytes);
  uint64_t received = 0;
  while (received < nbytes) {
    // Full-sized chunks first; the final chunk carries whatever is left over.
    const uint64_t pending = nbytes - received;
    const uint64_t chunk_bytes = pending < max_payload_bytes ? pending : max_payload_bytes;
    remote_from->byte_offset = received;
    endpoint_->CopyFromRemote(remote_from, reinterpret_cast<void*>(destination + received),
                              chunk_bytes);
    received += chunk_bytes;
  }
}
1018
1059
1019
1060
void FreeHandle (void * handle, int type_code) final {
@@ -1082,12 +1123,43 @@ class RPCClientSession : public RPCSession, public DeviceAPI {
1082
1123
bool IsLocalSession () const final { return false ; }
1083
1124
1084
1125
private:
1126
+ uint64_t GetRPCMaxTransferSize () {
1127
+ if (rpc_chunk_max_size_bytes_ > 0 ) {
1128
+ return (uint64_t )rpc_chunk_max_size_bytes_;
1129
+ }
1130
+
1131
+ PackedFuncHandle rpc_func = GetFunction (" tvm.rpc.server.GetCRTMaxPacketSize" );
1132
+ if (rpc_func == nullptr ) {
1133
+ rpc_chunk_max_size_bytes_ = (int64_t )kRPCMaxTransferSizeBytesDefault ;
1134
+ } else {
1135
+ CallFunc (rpc_func, nullptr , nullptr , 0 , [this ](TVMArgs args) {
1136
+ // Use args[1] as return value, args[0] is tcode
1137
+ // Look at RPCWrappedFunc in src/runtime/rpc/rpc_module.cc
1138
+ rpc_chunk_max_size_bytes_ = (int64_t )args[1 ];
1139
+ ICHECK_GT (rpc_chunk_max_size_bytes_, 0 )
1140
+ << " RPC max transfer size is <= 0! (remote value = " << rpc_chunk_max_size_bytes_
1141
+ << " )" ;
1142
+ });
1143
+ }
1144
+ return (uint64_t )rpc_chunk_max_size_bytes_;
1145
+ }
1146
+
1085
1147
std::shared_ptr<RPCEndpoint> endpoint_;
1148
+ int64_t rpc_chunk_max_size_bytes_ = -1 ;
1086
1149
};
1087
1150
1088
1151
std::shared_ptr<RPCSession> CreateClientSession (std::shared_ptr<RPCEndpoint> endpoint) {
1089
1152
return std::make_shared<RPCClientSession>(endpoint);
1090
1153
}
1091
1154
1155
+ uint64_t RemoteCopyCalculatePacketOverheadSize (DLTensor* tensor, RPCCode code, uint64_t nbytes) {
1156
+ uint64_t shape_bytes = tensor->ndim * sizeof (int64_t );
1157
+ uint64_t to_data = reinterpret_cast <uint64_t >(static_cast <uint8_t *>(tensor->data ));
1158
+ uint64_t overhead = sizeof (code) + sizeof (to_data) + sizeof (tensor->device ) +
1159
+ sizeof (tensor->ndim ) + sizeof (tensor->dtype ) + sizeof (tensor->byte_offset ) +
1160
+ shape_bytes + sizeof (nbytes);
1161
+ return overhead;
1162
+ }
1163
+
1092
1164
} // namespace runtime
1093
1165
} // namespace tvm
0 commit comments