Skip to content

Commit 382b138

Browse files
zhijunfurobertnishihara
authored andcommitted
fix code issues in object manager that are reported by scanning tool (#3649)
Fix some code issues found by code scanning tool: **1. Macro compares unsigned to 0(NO_EFFECT)** CWE570: An unsigned value can never be less than 0 This greater-than-or-equal-to-zero comparison of an unsigned value is always true. "this->create_buffer_state_[object_id].num_seals_remaining >= 0UL". ~/ray/src/ray/object_manager/object_buffer_pool.cc: ray::ObjectBufferPool::SealChunk(const ray::UniqueID &, unsigned long) **2. Inferred misuse of enum(MIXED_ENUMS)** CWE398: An integer expression which was inferred to have an enum type is mixed with a different enum type This case, "static_cast(ray::object_manager::protocol::MessageType::PushRequest)", implies the effective type of "message_type" is "ray::object_manager::protocol::MessageType". ~/ray/src/ray/object_manager/object_manager.cc: ray::ObjectManager::ProcessClientMessage(std::shared_ptr> &, long, const unsigned char *)
1 parent 3df1e1c commit 382b138

File tree

10 files changed

+60
-44
lines changed

10 files changed

+60
-44
lines changed

src/ray/common/client_connection.cc

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,10 @@ void ServerConnection<T>::DoAsyncWrites() {
186186
template <class T>
187187
std::shared_ptr<ClientConnection<T>> ClientConnection<T>::Create(
188188
ClientHandler<T> &client_handler, MessageHandler<T> &message_handler,
189-
boost::asio::basic_stream_socket<T> &&socket, const std::string &debug_label) {
190-
std::shared_ptr<ClientConnection<T>> self(
191-
new ClientConnection(message_handler, std::move(socket), debug_label));
189+
boost::asio::basic_stream_socket<T> &&socket, const std::string &debug_label,
190+
int64_t error_message_type) {
191+
std::shared_ptr<ClientConnection<T>> self(new ClientConnection(
192+
message_handler, std::move(socket), debug_label, error_message_type));
192193
// Let our manager process our new connection.
193194
client_handler(*self);
194195
return self;
@@ -197,10 +198,12 @@ std::shared_ptr<ClientConnection<T>> ClientConnection<T>::Create(
197198
template <class T>
198199
ClientConnection<T>::ClientConnection(MessageHandler<T> &message_handler,
199200
boost::asio::basic_stream_socket<T> &&socket,
200-
const std::string &debug_label)
201+
const std::string &debug_label,
202+
int64_t error_message_type)
201203
: ServerConnection<T>(std::move(socket)),
202204
message_handler_(message_handler),
203-
debug_label_(debug_label) {}
205+
debug_label_(debug_label),
206+
error_message_type_(error_message_type) {}
204207

205208
template <class T>
206209
const ClientID &ClientConnection<T>::GetClientId() {
@@ -230,7 +233,7 @@ template <class T>
230233
void ClientConnection<T>::ProcessMessageHeader(const boost::system::error_code &error) {
231234
if (error) {
232235
// If there was an error, disconnect the client.
233-
read_type_ = static_cast<int64_t>(protocol::MessageType::DisconnectClient);
236+
read_type_ = error_message_type_;
234237
read_length_ = 0;
235238
ProcessMessage(error);
236239
return;
@@ -251,7 +254,7 @@ void ClientConnection<T>::ProcessMessageHeader(const boost::system::error_code &
251254
template <class T>
252255
void ClientConnection<T>::ProcessMessage(const boost::system::error_code &error) {
253256
if (error) {
254-
read_type_ = static_cast<int64_t>(protocol::MessageType::DisconnectClient);
257+
read_type_ = error_message_type_;
255258
}
256259

257260
int64_t start_ms = current_time_ms();

src/ray/common/client_connection.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ class ClientConnection : public ServerConnection<T> {
148148
/// \return std::shared_ptr<ClientConnection>.
149149
static std::shared_ptr<ClientConnection<T>> Create(
150150
ClientHandler<T> &new_client_handler, MessageHandler<T> &message_handler,
151-
boost::asio::basic_stream_socket<T> &&socket, const std::string &debug_label);
151+
boost::asio::basic_stream_socket<T> &&socket, const std::string &debug_label,
152+
int64_t error_message_type);
152153

153154
std::shared_ptr<ClientConnection<T>> shared_ClientConnection_from_this() {
154155
return std::static_pointer_cast<ClientConnection<T>>(shared_from_this());
@@ -169,7 +170,7 @@ class ClientConnection : public ServerConnection<T> {
169170
/// A private constructor for a node client connection.
170171
ClientConnection(MessageHandler<T> &message_handler,
171172
boost::asio::basic_stream_socket<T> &&socket,
172-
const std::string &debug_label);
173+
const std::string &debug_label, int64_t error_message_type);
173174
/// Process an error from the last operation, then process the message
174175
/// header from the client.
175176
void ProcessMessageHeader(const boost::system::error_code &error);
@@ -183,6 +184,8 @@ class ClientConnection : public ServerConnection<T> {
183184
MessageHandler<T> message_handler_;
184185
/// A label used for debug messages.
185186
const std::string debug_label_;
187+
/// The value for disconnect client message.
188+
int64_t error_message_type_;
186189
/// Buffers for the current message being read from the client.
187190
int64_t read_version_;
188191
int64_t read_type_;

src/ray/object_manager/format/object_manager.fbs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ table ObjectInfo {
2727

2828
enum MessageType:int {
2929
ConnectClient = 1,
30+
DisconnectClient,
3031
PushRequest,
3132
PullRequest,
3233
FreeRequest

src/ray/object_manager/object_buffer_pool.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk
150150
CreateChunkState::REFERENCED);
151151
create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::SEALED;
152152
create_buffer_state_[object_id].num_seals_remaining--;
153-
RAY_CHECK(create_buffer_state_[object_id].num_seals_remaining >= 0);
154153
RAY_LOG(DEBUG) << "SealChunk" << object_id << " "
155154
<< create_buffer_state_[object_id].num_seals_remaining;
156155
if (create_buffer_state_[object_id].num_seals_remaining == 0) {

src/ray/object_manager/object_manager.cc

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -707,25 +707,26 @@ void ObjectManager::ProcessNewClient(TcpClientConnection &conn) {
707707

708708
void ObjectManager::ProcessClientMessage(std::shared_ptr<TcpClientConnection> &conn,
709709
int64_t message_type, const uint8_t *message) {
710-
switch (message_type) {
711-
case static_cast<int64_t>(object_manager_protocol::MessageType::PushRequest): {
710+
auto message_type_value =
711+
static_cast<object_manager_protocol::MessageType>(message_type);
712+
switch (message_type_value) {
713+
case object_manager_protocol::MessageType::PushRequest: {
712714
ReceivePushRequest(conn, message);
713715
break;
714716
}
715-
case static_cast<int64_t>(object_manager_protocol::MessageType::PullRequest): {
717+
case object_manager_protocol::MessageType::PullRequest: {
716718
ReceivePullRequest(conn, message);
717719
break;
718720
}
719-
case static_cast<int64_t>(object_manager_protocol::MessageType::ConnectClient): {
721+
case object_manager_protocol::MessageType::ConnectClient: {
720722
ConnectClient(conn, message);
721723
break;
722724
}
723-
case static_cast<int64_t>(object_manager_protocol::MessageType::FreeRequest): {
725+
case object_manager_protocol::MessageType::FreeRequest: {
724726
ReceiveFreeRequest(conn, message);
725727
break;
726728
}
727-
case static_cast<int64_t>(protocol::MessageType::DisconnectClient): {
728-
// TODO(hme): Disconnect without depending on the node manager protocol.
729+
case object_manager_protocol::MessageType::DisconnectClient: {
729730
DisconnectClient(conn, message);
730731
break;
731732
}

src/ray/object_manager/test/object_manager_stress_test.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,10 @@ class MockServer {
7474
object_manager_.ProcessClientMessage(client, message_type, message);
7575
};
7676
// Accept a new local client and dispatch it to the node manager.
77-
auto new_connection =
78-
TcpClientConnection::Create(client_handler, message_handler,
79-
std::move(object_manager_socket_), "object manager");
77+
auto new_connection = TcpClientConnection::Create(
78+
client_handler, message_handler, std::move(object_manager_socket_),
79+
"object manager",
80+
static_cast<int64_t>(object_manager::protocol::MessageType::DisconnectClient));
8081
DoAcceptObjectManager();
8182
}
8283

src/ray/object_manager/test/object_manager_test.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,10 @@ class MockServer {
6565
object_manager_.ProcessClientMessage(client, message_type, message);
6666
};
6767
// Accept a new local client and dispatch it to the node manager.
68-
auto new_connection =
69-
TcpClientConnection::Create(client_handler, message_handler,
70-
std::move(object_manager_socket_), "object manager");
68+
auto new_connection = TcpClientConnection::Create(
69+
client_handler, message_handler, std::move(object_manager_socket_),
70+
"object manager",
71+
static_cast<int64_t>(object_manager::protocol::MessageType::DisconnectClient));
7172
DoAcceptObjectManager();
7273
}
7374

src/ray/raylet/client_connection_test.cc

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,16 @@ namespace raylet {
1313

1414
class ClientConnectionTest : public ::testing::Test {
1515
public:
16-
ClientConnectionTest() : io_service_(), in_(io_service_), out_(io_service_) {
16+
ClientConnectionTest()
17+
: io_service_(), in_(io_service_), out_(io_service_), error_message_type_(1) {
1718
boost::asio::local::connect_pair(in_, out_);
1819
}
1920

2021
protected:
2122
boost::asio::io_service io_service_;
2223
boost::asio::local::stream_protocol::socket in_;
2324
boost::asio::local::stream_protocol::socket out_;
25+
int64_t error_message_type_;
2426
};
2527

2628
TEST_F(ClientConnectionTest, SimpleSyncWrite) {
@@ -37,11 +39,11 @@ TEST_F(ClientConnectionTest, SimpleSyncWrite) {
3739
num_messages += 1;
3840
};
3941

40-
auto conn1 = LocalClientConnection::Create(client_handler, message_handler,
41-
std::move(in_), "conn1");
42+
auto conn1 = LocalClientConnection::Create(
43+
client_handler, message_handler, std::move(in_), "conn1", error_message_type_);
4244

43-
auto conn2 = LocalClientConnection::Create(client_handler, message_handler,
44-
std::move(out_), "conn2");
45+
auto conn2 = LocalClientConnection::Create(
46+
client_handler, message_handler, std::move(out_), "conn2", error_message_type_);
4547

4648
RAY_CHECK_OK(conn1->WriteMessage(0, 5, arr));
4749
RAY_CHECK_OK(conn2->WriteMessage(0, 5, arr));
@@ -83,11 +85,11 @@ TEST_F(ClientConnectionTest, SimpleAsyncWrite) {
8385
}
8486
};
8587

86-
auto writer = LocalClientConnection::Create(client_handler, noop_handler,
87-
std::move(in_), "writer");
88+
auto writer = LocalClientConnection::Create(
89+
client_handler, noop_handler, std::move(in_), "writer", error_message_type_);
8890

8991
reader = LocalClientConnection::Create(client_handler, message_handler, std::move(out_),
90-
"reader");
92+
"reader", error_message_type_);
9193

9294
std::function<void(const ray::Status &)> callback = [](const ray::Status &status) {
9395
RAY_CHECK_OK(status);
@@ -111,8 +113,8 @@ TEST_F(ClientConnectionTest, SimpleAsyncError) {
111113
std::shared_ptr<LocalClientConnection> client, int64_t message_type,
112114
const uint8_t *message) {};
113115

114-
auto writer = LocalClientConnection::Create(client_handler, noop_handler,
115-
std::move(in_), "writer");
116+
auto writer = LocalClientConnection::Create(
117+
client_handler, noop_handler, std::move(in_), "writer", error_message_type_);
116118

117119
std::function<void(const ray::Status &)> callback = [](const ray::Status &status) {
118120
ASSERT_TRUE(!status.ok());
@@ -133,8 +135,8 @@ TEST_F(ClientConnectionTest, CallbackWithSharedRefDoesNotLeakConnection) {
133135
std::shared_ptr<LocalClientConnection> client, int64_t message_type,
134136
const uint8_t *message) {};
135137

136-
auto writer = LocalClientConnection::Create(client_handler, noop_handler,
137-
std::move(in_), "writer");
138+
auto writer = LocalClientConnection::Create(
139+
client_handler, noop_handler, std::move(in_), "writer", error_message_type_);
138140

139141
std::function<void(const ray::Status &)> callback =
140142
[writer](const ray::Status &status) {

src/ray/raylet/raylet.cc

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ void Raylet::HandleAcceptNodeManager(const boost::system::error_code &error) {
101101
};
102102
// Accept a new TCP client and dispatch it to the node manager.
103103
auto new_connection = TcpClientConnection::Create(
104-
client_handler, message_handler, std::move(node_manager_socket_), "node manager");
104+
client_handler, message_handler, std::move(node_manager_socket_), "node manager",
105+
static_cast<int64_t>(protocol::MessageType::DisconnectClient));
105106
}
106107
// We're ready to accept another client.
107108
DoAcceptNodeManager();
@@ -122,9 +123,10 @@ void Raylet::HandleAcceptObjectManager(const boost::system::error_code &error) {
122123
object_manager_.ProcessClientMessage(client, message_type, message);
123124
};
124125
// Accept a new TCP client and dispatch it to the node manager.
125-
auto new_connection =
126-
TcpClientConnection::Create(client_handler, message_handler,
127-
std::move(object_manager_socket_), "object manager");
126+
auto new_connection = TcpClientConnection::Create(
127+
client_handler, message_handler, std::move(object_manager_socket_),
128+
"object manager",
129+
static_cast<int64_t>(object_manager::protocol::MessageType::DisconnectClient));
128130
DoAcceptObjectManager();
129131
}
130132

@@ -144,8 +146,9 @@ void Raylet::HandleAccept(const boost::system::error_code &error) {
144146
node_manager_.ProcessClientMessage(client, message_type, message);
145147
};
146148
// Accept a new local client and dispatch it to the node manager.
147-
auto new_connection = LocalClientConnection::Create(client_handler, message_handler,
148-
std::move(socket_), "worker");
149+
auto new_connection = LocalClientConnection::Create(
150+
client_handler, message_handler, std::move(socket_), "worker",
151+
static_cast<int64_t>(protocol::MessageType::DisconnectClient));
149152
}
150153
// We're ready to accept another client.
151154
DoAccept();

src/ray/raylet/worker_pool_test.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class WorkerPoolMock : public WorkerPool {
3434

3535
class WorkerPoolTest : public ::testing::Test {
3636
public:
37-
WorkerPoolTest() : worker_pool_(), io_service_() {}
37+
WorkerPoolTest() : worker_pool_(), io_service_(), error_message_type_(1) {}
3838

3939
std::shared_ptr<Worker> CreateWorker(pid_t pid,
4040
const Language &language = Language::PYTHON) {
@@ -46,14 +46,16 @@ class WorkerPoolTest : public ::testing::Test {
4646
HandleMessage(client, message_type, message);
4747
};
4848
boost::asio::local::stream_protocol::socket socket(io_service_);
49-
auto client = LocalClientConnection::Create(client_handler, message_handler,
50-
std::move(socket), "worker");
49+
auto client =
50+
LocalClientConnection::Create(client_handler, message_handler, std::move(socket),
51+
"worker", error_message_type_);
5152
return std::shared_ptr<Worker>(new Worker(pid, language, client));
5253
}
5354

5455
protected:
5556
WorkerPoolMock worker_pool_;
5657
boost::asio::io_service io_service_;
58+
int64_t error_message_type_;
5759

5860
private:
5961
void HandleNewClient(LocalClientConnection &){};

0 commit comments

Comments
 (0)