2
2
3
3
#include " ray/gcs/redis_context.h"
4
4
5
+ static void GetRedisShards (redisContext *context, std::vector<std::string> &addresses,
6
+ std::vector<int > &ports) {
7
+ // Get the total number of Redis shards in the system.
8
+ int num_attempts = 0 ;
9
+ redisReply *reply = nullptr ;
10
+ while (num_attempts < RayConfig::instance ().redis_db_connect_retries ()) {
11
+ // Try to read the number of Redis shards from the primary shard. If the
12
+ // entry is present, exit.
13
+ reply = reinterpret_cast <redisReply *>(redisCommand (context, " GET NumRedisShards" ));
14
+ if (reply->type != REDIS_REPLY_NIL) {
15
+ break ;
16
+ }
17
+
18
+ // Sleep for a little, and try again if the entry isn't there yet. */
19
+ freeReplyObject (reply);
20
+ usleep (RayConfig::instance ().redis_db_connect_wait_milliseconds () * 1000 );
21
+ num_attempts++;
22
+ }
23
+ RAY_CHECK (num_attempts < RayConfig::instance ().redis_db_connect_retries ())
24
+ << " No entry found for NumRedisShards" ;
25
+ RAY_CHECK (reply->type == REDIS_REPLY_STRING) << " Expected string, found Redis type "
26
+ << reply->type << " for NumRedisShards" ;
27
+ int num_redis_shards = atoi (reply->str );
28
+ RAY_CHECK (num_redis_shards >= 1 ) << " Expected at least one Redis shard, "
29
+ << " found " << num_redis_shards;
30
+ freeReplyObject (reply);
31
+
32
+ // Get the addresses of all of the Redis shards.
33
+ num_attempts = 0 ;
34
+ while (num_attempts < RayConfig::instance ().redis_db_connect_retries ()) {
35
+ // Try to read the Redis shard locations from the primary shard. If we find
36
+ // that all of them are present, exit.
37
+ reply =
38
+ reinterpret_cast <redisReply *>(redisCommand (context, " LRANGE RedisShards 0 -1" ));
39
+ if (static_cast <int >(reply->elements ) == num_redis_shards) {
40
+ break ;
41
+ }
42
+
43
+ // Sleep for a little, and try again if not all Redis shard addresses have
44
+ // been added yet.
45
+ freeReplyObject (reply);
46
+ usleep (RayConfig::instance ().redis_db_connect_wait_milliseconds () * 1000 );
47
+ num_attempts++;
48
+ }
49
+ RAY_CHECK (num_attempts < RayConfig::instance ().redis_db_connect_retries ())
50
+ << " Expected " << num_redis_shards << " Redis shard addresses, found "
51
+ << reply->elements ;
52
+
53
+ // Parse the Redis shard addresses.
54
+ for (size_t i = 0 ; i < reply->elements ; ++i) {
55
+ // Parse the shard addresses and ports.
56
+ RAY_CHECK (reply->element [i]->type == REDIS_REPLY_STRING);
57
+ std::string addr;
58
+ std::stringstream ss (reply->element [i]->str );
59
+ getline (ss, addr, ' :' );
60
+ addresses.push_back (addr);
61
+ int port;
62
+ ss >> port;
63
+ ports.push_back (port);
64
+ }
65
+ freeReplyObject (reply);
66
+ }
67
+
5
68
namespace ray {
6
69
7
70
namespace gcs {
8
71
9
- AsyncGcsClient::AsyncGcsClient (const ClientID &client_id, CommandType command_type) {
10
- context_ = std::make_shared<RedisContext>();
72
+ AsyncGcsClient::AsyncGcsClient (const std::string &address, int port,
73
+ const ClientID &client_id, CommandType command_type,
74
+ bool is_test_client = false ) {
11
75
primary_context_ = std::make_shared<RedisContext>();
12
- client_table_.reset (new ClientTable (primary_context_, this , client_id));
13
- object_table_.reset (new ObjectTable (context_, this , command_type));
14
- actor_table_.reset (new ActorTable (context_, this ));
15
- task_table_.reset (new TaskTable (context_, this , command_type));
16
- raylet_task_table_.reset (new raylet::TaskTable (context_, this , command_type));
17
- task_reconstruction_log_.reset (new TaskReconstructionLog (context_, this ));
18
- task_lease_table_.reset (new TaskLeaseTable (context_, this ));
19
- heartbeat_table_.reset (new HeartbeatTable (context_, this ));
20
- driver_table_.reset (new DriverTable (primary_context_, this ));
21
- error_table_.reset (new ErrorTable (primary_context_, this ));
22
- profile_table_.reset (new ProfileTable (context_, this ));
76
+
77
+ RAY_CHECK_OK (primary_context_->Connect (address, port, /* sharding=*/ true ));
78
+
79
+ if (!is_test_client) {
80
+ // Moving sharding into constructor defaultly means that sharding = true.
81
+ // This design decision may worth a look.
82
+ std::vector<std::string> addresses;
83
+ std::vector<int > ports;
84
+ GetRedisShards (primary_context_->sync_context (), addresses, ports);
85
+ if (addresses.size () == 0 || ports.size () == 0 ) {
86
+ addresses.push_back (address);
87
+ ports.push_back (port);
88
+ }
89
+
90
+ // Populate shard_contexts.
91
+ for (size_t i = 0 ; i < addresses.size (); ++i) {
92
+ shard_contexts_.push_back (std::make_shared<RedisContext>());
93
+ }
94
+
95
+ RAY_CHECK (shard_contexts_.size () == addresses.size ());
96
+ for (size_t i = 0 ; i < addresses.size (); ++i) {
97
+ RAY_CHECK_OK (
98
+ shard_contexts_[i]->Connect (addresses[i], ports[i], /* sharding=*/ true ));
99
+ }
100
+ } else {
101
+ shard_contexts_.push_back (std::make_shared<RedisContext>());
102
+ RAY_CHECK_OK (shard_contexts_[0 ]->Connect (address, port, /* sharding=*/ true ));
103
+ }
104
+
105
+ client_table_.reset (new ClientTable ({primary_context_}, this , client_id));
106
+ error_table_.reset (new ErrorTable ({primary_context_}, this ));
107
+ driver_table_.reset (new DriverTable ({primary_context_}, this ));
108
+ // Tables below would be sharded.
109
+ object_table_.reset (new ObjectTable (shard_contexts_, this , command_type));
110
+ actor_table_.reset (new ActorTable (shard_contexts_, this ));
111
+ task_table_.reset (new TaskTable (shard_contexts_, this , command_type));
112
+ raylet_task_table_.reset (new raylet::TaskTable (shard_contexts_, this , command_type));
113
+ task_reconstruction_log_.reset (new TaskReconstructionLog (shard_contexts_, this ));
114
+ task_lease_table_.reset (new TaskLeaseTable (shard_contexts_, this ));
115
+ heartbeat_table_.reset (new HeartbeatTable (shard_contexts_, this ));
116
+ profile_table_.reset (new ProfileTable (shard_contexts_, this ));
23
117
command_type_ = command_type;
118
+
119
+ // TODO(swang): Call the client table's Connect() method here. To do this,
120
+ // we need to make sure that we are attached to an event loop first. This
121
+ // currently isn't possible because the aeEventLoop, which we use for
122
+ // testing, requires us to connect to Redis first.
24
123
}
25
124
26
125
#if RAY_USE_NEW_GCS
27
126
// Use of kChain currently only applies to Table::Add which affects only the
28
127
// task table, and when RAY_USE_NEW_GCS is set at compile time.
29
- AsyncGcsClient::AsyncGcsClient (const ClientID &client_id)
30
- : AsyncGcsClient(client_id, CommandType::kChain ) {}
128
+ AsyncGcsClient::AsyncGcsClient (const std::string &address, int port,
129
+ const ClientID &client_id, bool is_test_client = false )
130
+ : AsyncGcsClient(address, port, client_id, CommandType::kChain , is_test_client) {}
31
131
#else
32
- AsyncGcsClient::AsyncGcsClient (const ClientID &client_id)
33
- : AsyncGcsClient(client_id, CommandType::kRegular ) {}
132
+ AsyncGcsClient::AsyncGcsClient (const std::string &address, int port,
133
+ const ClientID &client_id, bool is_test_client = false )
134
+ : AsyncGcsClient(address, port, client_id, CommandType::kRegular , is_test_client) {}
34
135
#endif // RAY_USE_NEW_GCS
35
136
36
- AsyncGcsClient::AsyncGcsClient (CommandType command_type)
37
- : AsyncGcsClient(ClientID::from_random(), command_type) {}
137
+ AsyncGcsClient::AsyncGcsClient (const std::string &address, int port,
138
+ CommandType command_type)
139
+ : AsyncGcsClient(address, port, ClientID::from_random(), command_type) {}
38
140
39
- AsyncGcsClient::AsyncGcsClient () : AsyncGcsClient(ClientID::from_random()) {}
141
+ AsyncGcsClient::AsyncGcsClient (const std::string &address, int port,
142
+ CommandType command_type, bool is_test_client)
143
+ : AsyncGcsClient(address, port, ClientID::from_random(), command_type,
144
+ is_test_client) {}
40
145
41
- Status AsyncGcsClient::Connect (const std::string &address, int port, bool sharding) {
42
- RAY_RETURN_NOT_OK (context_->Connect (address, port, sharding));
43
- RAY_RETURN_NOT_OK (primary_context_->Connect (address, port, /* sharding=*/ false ));
44
- // TODO(swang): Call the client table's Connect() method here. To do this,
45
- // we need to make sure that we are attached to an event loop first. This
46
- // currently isn't possible because the aeEventLoop, which we use for
47
- // testing, requires us to connect to Redis first.
48
- return Status::OK ();
49
- }
146
+ AsyncGcsClient::AsyncGcsClient (const std::string &address, int port)
147
+ : AsyncGcsClient(address, port, ClientID::from_random()) {}
148
+
149
+ AsyncGcsClient::AsyncGcsClient (const std::string &address, int port, bool is_test_client)
150
+ : AsyncGcsClient(address, port, ClientID::from_random(), is_test_client) {}
50
151
51
152
Status Attach (plasma::EventLoop &event_loop) {
52
153
// TODO(pcm): Implement this via
@@ -55,9 +156,14 @@ Status Attach(plasma::EventLoop &event_loop) {
55
156
}
56
157
57
158
Status AsyncGcsClient::Attach (boost::asio::io_service &io_service) {
58
- asio_async_client_.reset (new RedisAsioClient (io_service, context_->async_context ()));
59
- asio_subscribe_client_.reset (
60
- new RedisAsioClient (io_service, context_->subscribe_context ()));
159
+ // Take care of sharding contexts.
160
+ RAY_CHECK (shard_asio_async_clients_.empty ()) << " Attach shall be called only once" ;
161
+ for (std::shared_ptr<RedisContext> context : shard_contexts_) {
162
+ shard_asio_async_clients_.emplace_back (
163
+ new RedisAsioClient (io_service, context->async_context ()));
164
+ shard_asio_subscribe_clients_.emplace_back (
165
+ new RedisAsioClient (io_service, context->subscribe_context ()));
166
+ }
61
167
asio_async_auxiliary_client_.reset (
62
168
new RedisAsioClient (io_service, primary_context_->async_context ()));
63
169
asio_subscribe_auxiliary_client_.reset (
0 commit comments