Skip to content

Commit 676a4a9

Browse files
Yeolarwesm
authored andcommitted
ARROW-1255: [Plasma] Fix typo in plasma protocol; add DCHECK for ReadXXX in plasma protocol.
Related to #878, add DCHECK for ReadXXX. Author: Yeolar <yeolar@gmail.com> Closes #887 from Yeolar/fixtypo_plasma_and_add_DCHECK and squashes the following commits: 4df63bc [Yeolar] clang-format for too long lines. 143d254 [Yeolar] Update, compile passed. 09ff103 [Yeolar] Fix conflicts. b951d8d [Yeolar] Merge pull request #1 from apache/master ebae611 [Yeolar] Fix typo in plasma protocol; add DCHECK for ReadXXX in plasma protocol.
1 parent 2eeaa95 commit 676a4a9

File tree

5 files changed

+143
-96
lines changed

5 files changed

+143
-96
lines changed

cpp/src/plasma/client.cc

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
152152
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
153153
ObjectID id;
154154
PlasmaObject object;
155-
RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object));
155+
RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &object));
156156
// If the CreateReply included an error, then the store will not send a file
157157
// descriptor.
158158
int fd = recv_fd(store_conn_);
@@ -227,7 +227,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
227227
std::vector<ObjectID> received_object_ids(num_objects);
228228
std::vector<PlasmaObject> object_data(num_objects);
229229
PlasmaObject* object;
230-
RETURN_NOT_OK(ReadGetReply(buffer.data(), received_object_ids.data(),
230+
RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), received_object_ids.data(),
231231
object_data.data(), num_objects));
232232

233233
for (int i = 0; i < num_objects; ++i) {
@@ -356,7 +356,8 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
356356
std::vector<uint8_t> buffer;
357357
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &buffer));
358358
ObjectID object_id2;
359-
RETURN_NOT_OK(ReadContainsReply(buffer.data(), &object_id2, has_object));
359+
RETURN_NOT_OK(
360+
ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object));
360361
}
361362
return Status::OK();
362363
}
@@ -451,7 +452,7 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
451452
std::vector<uint8_t> buffer;
452453
int64_t type;
453454
RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
454-
return ReadEvictReply(buffer.data(), num_bytes_evicted);
455+
return ReadEvictReply(buffer.data(), buffer.size(), num_bytes_evicted);
455456
}
456457

457458
Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) {
@@ -524,7 +525,7 @@ Status PlasmaClient::Connect(const std::string& store_socket_name,
524525
RETURN_NOT_OK(SendConnectRequest(store_conn_));
525526
std::vector<uint8_t> buffer;
526527
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, &buffer));
527-
RETURN_NOT_OK(ReadConnectReply(buffer.data(), &store_capacity_));
528+
RETURN_NOT_OK(ReadConnectReply(buffer.data(), buffer.size(), &store_capacity_));
528529
return Status::OK();
529530
}
530531

@@ -564,7 +565,7 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
564565
std::vector<uint8_t> buffer;
565566
RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, &buffer));
566567
ObjectID id;
567-
RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1));
568+
RETURN_NOT_OK(ReadStatusReply(buffer.data(), buffer.size(), &id, object_status, 1));
568569
ARROW_CHECK(object_id == id);
569570
return Status::OK();
570571
}
@@ -586,7 +587,8 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req
586587
num_ready_objects, timeout_ms));
587588
std::vector<uint8_t> buffer;
588589
RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer));
589-
RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, &num_ready_objects));
590+
RETURN_NOT_OK(
591+
ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects));
590592

591593
*num_objects_ready = 0;
592594
for (int i = 0; i < num_object_requests; ++i) {

cpp/src/plasma/protocol.cc

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,11 @@ Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
6262
return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &fbb, message);
6363
}
6464

65-
Status ReadCreateRequest(uint8_t* data, ObjectID* object_id, int64_t* data_size,
66-
int64_t* metadata_size) {
65+
Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
66+
int64_t* data_size, int64_t* metadata_size) {
6767
DCHECK(data);
6868
auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
69+
DCHECK(verify_flatbuffer(message, data, size));
6970
*data_size = message->data_size();
7071
*metadata_size = message->metadata_size();
7172
*object_id = ObjectID::from_binary(message->object_id()->str());
@@ -83,9 +84,11 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
8384
return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
8485
}
8586

86-
Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) {
87+
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
88+
PlasmaObject* object) {
8789
DCHECK(data);
8890
auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
91+
DCHECK(verify_flatbuffer(message, data, size));
8992
*object_id = ObjectID::from_binary(message->object_id()->str());
9093
object->handle.store_fd = message->plasma_object()->segment_index();
9194
object->handle.mmap_size = message->plasma_object()->mmap_size();
@@ -106,9 +109,11 @@ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
106109
return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message);
107110
}
108111

109-
Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) {
112+
Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
113+
unsigned char* digest) {
110114
DCHECK(data);
111115
auto message = flatbuffers::GetRoot<PlasmaSealRequest>(data);
116+
DCHECK(verify_flatbuffer(message, data, size));
112117
*object_id = ObjectID::from_binary(message->object_id()->str());
113118
ARROW_CHECK(message->digest()->size() == kDigestSize);
114119
memcpy(digest, message->digest()->data(), kDigestSize);
@@ -122,9 +127,10 @@ Status SendSealReply(int sock, ObjectID object_id, int error) {
122127
return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message);
123128
}
124129

125-
Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
130+
Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) {
126131
DCHECK(data);
127132
auto message = flatbuffers::GetRoot<PlasmaSealReply>(data);
133+
DCHECK(verify_flatbuffer(message, data, size));
128134
*object_id = ObjectID::from_binary(message->object_id()->str());
129135
return plasma_error_status(message->error());
130136
}
@@ -133,13 +139,14 @@ Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
133139

134140
Status SendReleaseRequest(int sock, ObjectID object_id) {
135141
flatbuffers::FlatBufferBuilder fbb;
136-
auto message = CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()));
142+
auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
137143
return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message);
138144
}
139145

140-
Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) {
146+
Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) {
141147
DCHECK(data);
142148
auto message = flatbuffers::GetRoot<PlasmaReleaseRequest>(data);
149+
DCHECK(verify_flatbuffer(message, data, size));
143150
*object_id = ObjectID::from_binary(message->object_id()->str());
144151
return Status::OK();
145152
}
@@ -151,9 +158,10 @@ Status SendReleaseReply(int sock, ObjectID object_id, int error) {
151158
return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message);
152159
}
153160

154-
Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) {
161+
Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
155162
DCHECK(data);
156163
auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
164+
DCHECK(verify_flatbuffer(message, data, size));
157165
*object_id = ObjectID::from_binary(message->object_id()->str());
158166
return plasma_error_status(message->error());
159167
}
@@ -166,9 +174,10 @@ Status SendDeleteRequest(int sock, ObjectID object_id) {
166174
return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, message);
167175
}
168176

169-
Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) {
177+
Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id) {
170178
DCHECK(data);
171179
auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
180+
DCHECK(verify_flatbuffer(message, data, size));
172181
*object_id = ObjectID::from_binary(message->object_id()->str());
173182
return Status::OK();
174183
}
@@ -180,9 +189,10 @@ Status SendDeleteReply(int sock, ObjectID object_id, int error) {
180189
return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message);
181190
}
182191

183-
Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) {
192+
Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id) {
184193
DCHECK(data);
185194
auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
195+
DCHECK(verify_flatbuffer(message, data, size));
186196
*object_id = ObjectID::from_binary(message->object_id()->str());
187197
return plasma_error_status(message->error());
188198
}
@@ -196,9 +206,11 @@ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objec
196206
return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message);
197207
}
198208

199-
Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) {
209+
Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
210+
int64_t num_objects) {
200211
DCHECK(data);
201212
auto message = flatbuffers::GetRoot<PlasmaStatusRequest>(data);
213+
DCHECK(verify_flatbuffer(message, data, size));
202214
for (uoffset_t i = 0; i < num_objects; ++i) {
203215
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
204216
}
@@ -214,16 +226,18 @@ Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
214226
return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message);
215227
}
216228

217-
int64_t ReadStatusReply_num_objects(uint8_t* data) {
229+
int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size) {
218230
DCHECK(data);
219231
auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
232+
DCHECK(verify_flatbuffer(message, data, size));
220233
return message->object_ids()->size();
221234
}
222235

223-
Status ReadStatusReply(uint8_t* data, ObjectID object_ids[], int object_status[],
224-
int64_t num_objects) {
236+
Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
237+
int object_status[], int64_t num_objects) {
225238
DCHECK(data);
226239
auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
240+
DCHECK(verify_flatbuffer(message, data, size));
227241
for (uoffset_t i = 0; i < num_objects; ++i) {
228242
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
229243
}
@@ -241,9 +255,10 @@ Status SendContainsRequest(int sock, ObjectID object_id) {
241255
return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, message);
242256
}
243257

244-
Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) {
258+
Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) {
245259
DCHECK(data);
246260
auto message = flatbuffers::GetRoot<PlasmaContainsRequest>(data);
261+
DCHECK(verify_flatbuffer(message, data, size));
247262
*object_id = ObjectID::from_binary(message->object_id()->str());
248263
return Status::OK();
249264
}
@@ -255,9 +270,11 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
255270
return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message);
256271
}
257272

258-
Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) {
273+
Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
274+
bool* has_object) {
259275
DCHECK(data);
260276
auto message = flatbuffers::GetRoot<PlasmaContainsReply>(data);
277+
DCHECK(verify_flatbuffer(message, data, size));
261278
*object_id = ObjectID::from_binary(message->object_id()->str());
262279
*has_object = message->has_object();
263280
return Status::OK();
@@ -279,9 +296,10 @@ Status SendConnectReply(int sock, int64_t memory_capacity) {
279296
return PlasmaSend(sock, MessageType_PlasmaConnectReply, &fbb, message);
280297
}
281298

282-
Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) {
299+
Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) {
283300
DCHECK(data);
284301
auto message = flatbuffers::GetRoot<PlasmaConnectReply>(data);
302+
DCHECK(verify_flatbuffer(message, data, size));
285303
*memory_capacity = message->memory_capacity();
286304
return Status::OK();
287305
}
@@ -294,9 +312,10 @@ Status SendEvictRequest(int sock, int64_t num_bytes) {
294312
return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &fbb, message);
295313
}
296314

297-
Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) {
315+
Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) {
298316
DCHECK(data);
299317
auto message = flatbuffers::GetRoot<PlasmaEvictRequest>(data);
318+
DCHECK(verify_flatbuffer(message, data, size));
300319
*num_bytes = message->num_bytes();
301320
return Status::OK();
302321
}
@@ -307,9 +326,10 @@ Status SendEvictReply(int sock, int64_t num_bytes) {
307326
return PlasmaSend(sock, MessageType_PlasmaEvictReply, &fbb, message);
308327
}
309328

310-
Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) {
329+
Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) {
311330
DCHECK(data);
312331
auto message = flatbuffers::GetRoot<PlasmaEvictReply>(data);
332+
DCHECK(verify_flatbuffer(message, data, size));
313333
num_bytes = message->num_bytes();
314334
return Status::OK();
315335
}
@@ -324,10 +344,11 @@ Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects,
324344
return PlasmaSend(sock, MessageType_PlasmaGetRequest, &fbb, message);
325345
}
326346

327-
Status ReadGetRequest(uint8_t* data, std::vector<ObjectID>& object_ids,
347+
Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
328348
int64_t* timeout_ms) {
329349
DCHECK(data);
330350
auto message = flatbuffers::GetRoot<PlasmaGetRequest>(data);
351+
DCHECK(verify_flatbuffer(message, data, size));
331352
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
332353
auto object_id = message->object_ids()->Get(i)->str();
333354
object_ids.push_back(ObjectID::from_binary(object_id));
@@ -355,10 +376,11 @@ Status SendGetReply(
355376
return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
356377
}
357378

358-
Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
359-
int64_t num_objects) {
379+
Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
380+
PlasmaObject plasma_objects[], int64_t num_objects) {
360381
DCHECK(data);
361382
auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
383+
DCHECK(verify_flatbuffer(message, data, size));
362384
for (uoffset_t i = 0; i < num_objects; ++i) {
363385
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
364386
}
@@ -383,9 +405,10 @@ Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_object
383405
return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, message);
384406
}
385407

386-
Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) {
408+
Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids) {
387409
DCHECK(data);
388410
auto message = flatbuffers::GetRoot<PlasmaFetchRequest>(data);
411+
DCHECK(verify_flatbuffer(message, data, size));
389412
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
390413
object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str()));
391414
}
@@ -410,10 +433,11 @@ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_re
410433
return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message);
411434
}
412435

413-
Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
436+
Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
414437
int64_t* timeout_ms, int* num_ready_objects) {
415438
DCHECK(data);
416439
auto message = flatbuffers::GetRoot<PlasmaWaitRequest>(data);
440+
DCHECK(verify_flatbuffer(message, data, size));
417441
*num_ready_objects = message->num_ready_objects();
418442
*timeout_ms = message->timeout();
419443

@@ -443,11 +467,12 @@ Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
443467
return PlasmaSend(sock, MessageType_PlasmaWaitReply, &fbb, message);
444468
}
445469

446-
Status ReadWaitReply(uint8_t* data, ObjectRequest object_requests[],
470+
Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[],
447471
int* num_ready_objects) {
448472
DCHECK(data);
449473

450474
auto message = flatbuffers::GetRoot<PlasmaWaitReply>(data);
475+
DCHECK(verify_flatbuffer(message, data, size));
451476
*num_ready_objects = message->num_ready_objects();
452477
for (int i = 0; i < *num_ready_objects; i++) {
453478
object_requests[i].object_id =
@@ -475,9 +500,11 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po
475500
return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message);
476501
}
477502

478-
Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) {
503+
Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address,
504+
int* port) {
479505
DCHECK(data);
480506
auto message = flatbuffers::GetRoot<PlasmaDataRequest>(data);
507+
DCHECK(verify_flatbuffer(message, data, size));
481508
DCHECK(message->object_id()->size() == sizeof(ObjectID));
482509
*object_id = ObjectID::from_binary(message->object_id()->str());
483510
*address = strdup(message->address()->c_str());
@@ -493,10 +520,11 @@ Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
493520
return PlasmaSend(sock, MessageType_PlasmaDataReply, &fbb, message);
494521
}
495522

496-
Status ReadDataReply(uint8_t* data, ObjectID* object_id, int64_t* object_size,
497-
int64_t* metadata_size) {
523+
Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id,
524+
int64_t* object_size, int64_t* metadata_size) {
498525
DCHECK(data);
499526
auto message = flatbuffers::GetRoot<PlasmaDataReply>(data);
527+
DCHECK(verify_flatbuffer(message, data, size));
500528
*object_id = ObjectID::from_binary(message->object_id()->str());
501529
*object_size = (int64_t)message->object_size();
502530
*metadata_size = (int64_t)message->metadata_size();

0 commit comments

Comments
 (0)