-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integrate credis with Ray & route task table entries into credis. #1841
Merged
robertnishihara
merged 18 commits into
ray-project:master
from
concretevitamin:integrate-credis
May 25, 2018
Merged
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
6e2c2e8
Integrate credis with Ray: route task table entries into credis.
concretevitamin aadee29
Fix client_test: qualify chain tests under flag; start correct server
concretevitamin cd1d28a
Refactor so that gcs::CommandType is propagated per Client -> Table
concretevitamin 0380191
Address all other comments
concretevitamin f951b6e
Address comments, fix tests
concretevitamin 0ebe563
Fix credis_test.py
concretevitamin bc5df2c
A bunch of script changes, load credis when on
concretevitamin f73cc73
Fix tests
concretevitamin b308290
Skip sharded test
concretevitamin 12f8b34
Fix stuff that went bad during rebase
concretevitamin 88714c1
Address comments
concretevitamin 6460024
Cleanups to address comments
concretevitamin 13f0d3a
Fix test
concretevitamin 0a79cca
Address comments
concretevitamin 0ce22b5
Fixes
concretevitamin a27e235
Fix retried port not being actually used
concretevitamin 321c9f5
Seems passing now.
concretevitamin c5e69f6
Use clang-format-3.8
concretevitamin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,27 +3,28 @@ | |
from __future__ import print_function | ||
|
||
import binascii | ||
from collections import namedtuple, OrderedDict | ||
from datetime import datetime | ||
import json | ||
import os | ||
import psutil | ||
import pyarrow | ||
import random | ||
import redis | ||
import resource | ||
import shutil | ||
import signal | ||
import socket | ||
import subprocess | ||
import sys | ||
import time | ||
import threading | ||
import time | ||
from collections import OrderedDict, namedtuple | ||
from datetime import datetime | ||
|
||
import psutil | ||
import redis | ||
|
||
import pyarrow | ||
# Ray modules | ||
import ray.global_scheduler as global_scheduler | ||
import ray.local_scheduler | ||
import ray.plasma | ||
import ray.global_scheduler as global_scheduler | ||
|
||
PROCESS_TYPE_MONITOR = "monitor" | ||
PROCESS_TYPE_LOG_MONITOR = "log_monitor" | ||
|
@@ -63,6 +64,7 @@ | |
"core/src/common/redis_module/libray_redis_module.so") | ||
|
||
# Location of the credis server and modules. | ||
# credis will be enabled if the environment variable RAY_USE_NEW_GCS is set. | ||
CREDIS_EXECUTABLE = os.path.join( | ||
os.path.abspath(os.path.dirname(__file__)), | ||
"core/src/credis/redis/src/redis-server") | ||
|
@@ -393,81 +395,15 @@ def check_version_info(redis_client): | |
print(error_message) | ||
|
||
|
||
def start_credis(node_ip_address, | ||
redis_address, | ||
port=None, | ||
redirect_output=False, | ||
cleanup=True): | ||
"""Start the credis global state store. | ||
|
||
Credis is a chain replicated reliable redis store. It consists | ||
of one master process that acts as a controller and a number of | ||
chain members (currently two, the head and the tail). | ||
|
||
Args: | ||
node_ip_address: The IP address of the current node. This is only used | ||
for recording the log filenames in Redis. | ||
redis_address (str): The IP address and port of the primary redis | ||
server. | ||
port (int): If provided, the primary Redis shard will be started on | ||
this port. | ||
redirect_output (bool): True if output should be redirected to a file | ||
and false otherwise. | ||
cleanup (bool): True if using Ray in local mode. If cleanup is true, | ||
then all Redis processes started by this method will be killed by | ||
services.cleanup() when the Python process that imported services | ||
exits. | ||
|
||
Returns: | ||
The address (ip_address:port) of the credis master process. | ||
""" | ||
|
||
components = ["credis_master", "credis_head", "credis_tail"] | ||
modules = [ | ||
CREDIS_MASTER_MODULE, CREDIS_MEMBER_MODULE, CREDIS_MEMBER_MODULE | ||
] | ||
ports = [] | ||
|
||
for i, component in enumerate(components): | ||
stdout_file, stderr_file = new_log_files(component, redirect_output) | ||
|
||
new_port, _ = start_redis_instance( | ||
node_ip_address=node_ip_address, | ||
port=port, | ||
stdout_file=stdout_file, | ||
stderr_file=stderr_file, | ||
cleanup=cleanup, | ||
module=modules[i], | ||
executable=CREDIS_EXECUTABLE) | ||
|
||
ports.append(new_port) | ||
|
||
[master_port, head_port, tail_port] = ports | ||
|
||
# Connect the members to the master | ||
|
||
master_client = redis.StrictRedis(host=node_ip_address, port=master_port) | ||
master_client.execute_command("MASTER.ADD", node_ip_address, head_port) | ||
master_client.execute_command("MASTER.ADD", node_ip_address, tail_port) | ||
|
||
credis_address = address(node_ip_address, master_port) | ||
|
||
# Register credis master in redis | ||
redis_ip_address, redis_port = redis_address.split(":") | ||
redis_client = redis.StrictRedis(host=redis_ip_address, port=redis_port) | ||
redis_client.set("credis_address", credis_address) | ||
|
||
return credis_address | ||
|
||
|
||
def start_redis(node_ip_address, | ||
port=None, | ||
redis_shard_ports=None, | ||
num_redis_shards=1, | ||
redis_max_clients=None, | ||
redirect_output=False, | ||
redirect_worker_output=False, | ||
cleanup=True): | ||
cleanup=True, | ||
use_credis=None): | ||
"""Start the Redis global state store. | ||
|
||
Args: | ||
|
@@ -491,6 +427,9 @@ def start_redis(node_ip_address, | |
then all Redis processes started by this method will be killed by | ||
services.cleanup() when the Python process that imported services | ||
exits. | ||
use_credis: If True, additionally load the chain-replicated libraries | ||
into the redis servers. Defaults to None, which means its value is | ||
set by the presence of "RAY_USE_NEW_GCS" in os.environ. | ||
|
||
Returns: | ||
A tuple of the address for the primary Redis shard and a list of | ||
|
@@ -505,62 +444,106 @@ def start_redis(node_ip_address, | |
raise Exception("The number of Redis shard ports does not match the " | ||
"number of Redis shards.") | ||
|
||
assigned_port, _ = start_redis_instance( | ||
node_ip_address=node_ip_address, | ||
port=port, | ||
redis_max_clients=redis_max_clients, | ||
stdout_file=redis_stdout_file, | ||
stderr_file=redis_stderr_file, | ||
cleanup=cleanup) | ||
if use_credis is None: | ||
use_credis = ("RAY_USE_NEW_GCS" in os.environ) | ||
if not use_credis: | ||
assigned_port, _ = _start_redis_instance( | ||
node_ip_address=node_ip_address, | ||
port=port, | ||
redis_max_clients=redis_max_clients, | ||
stdout_file=redis_stdout_file, | ||
stderr_file=redis_stderr_file, | ||
cleanup=cleanup) | ||
else: | ||
assigned_port, _ = _start_redis_instance( | ||
node_ip_address=node_ip_address, | ||
port=port, | ||
redis_max_clients=redis_max_clients, | ||
stdout_file=redis_stdout_file, | ||
stderr_file=redis_stderr_file, | ||
cleanup=cleanup, | ||
executable=CREDIS_EXECUTABLE, | ||
# It is important to load the credis module BEFORE the ray module, | ||
# as the latter contains an extern declaration that the former | ||
# supplies. | ||
modules=[CREDIS_MASTER_MODULE, REDIS_MODULE]) | ||
if port is not None: | ||
assert assigned_port == port | ||
port = assigned_port | ||
redis_address = address(node_ip_address, port) | ||
|
||
# Register the number of Redis shards in the primary shard, so that clients | ||
# know how many redis shards to expect under RedisShards. | ||
redis_client = redis.StrictRedis(host=node_ip_address, port=port) | ||
redis_client.set("NumRedisShards", str(num_redis_shards)) | ||
primary_redis_client = redis.StrictRedis(host=node_ip_address, port=port) | ||
primary_redis_client.set("NumRedisShards", str(num_redis_shards)) | ||
|
||
# Put the redirect_worker_output bool in the Redis shard so that workers | ||
# can access it and know whether or not to redirect their output. | ||
redis_client.set("RedirectOutput", 1 if redirect_worker_output else 0) | ||
primary_redis_client.set("RedirectOutput", 1 | ||
if redirect_worker_output else 0) | ||
|
||
# Store version information in the primary Redis shard. | ||
_put_version_info_in_redis(redis_client) | ||
_put_version_info_in_redis(primary_redis_client) | ||
|
||
# Start other Redis shards. Each Redis shard logs to a separate file, | ||
# prefixed by "redis-<shard number>". | ||
redis_shards = [] | ||
for i in range(num_redis_shards): | ||
redis_stdout_file, redis_stderr_file = new_log_files( | ||
"redis-{}".format(i), redirect_output) | ||
redis_shard_port, _ = start_redis_instance( | ||
node_ip_address=node_ip_address, | ||
port=redis_shard_ports[i], | ||
redis_max_clients=redis_max_clients, | ||
stdout_file=redis_stdout_file, | ||
stderr_file=redis_stderr_file, | ||
cleanup=cleanup) | ||
if not use_credis: | ||
redis_shard_port, _ = _start_redis_instance( | ||
node_ip_address=node_ip_address, | ||
port=redis_shard_ports[i], | ||
redis_max_clients=redis_max_clients, | ||
stdout_file=redis_stdout_file, | ||
stderr_file=redis_stderr_file, | ||
cleanup=cleanup) | ||
else: | ||
assert num_redis_shards == 1, \ | ||
"For now, RAY_USE_NEW_GCS supports 1 shard, and credis "\ | ||
"supports 1-node chain for that shard only." | ||
redis_shard_port, _ = _start_redis_instance( | ||
node_ip_address=node_ip_address, | ||
port=redis_shard_ports[i], | ||
redis_max_clients=redis_max_clients, | ||
stdout_file=redis_stdout_file, | ||
stderr_file=redis_stderr_file, | ||
cleanup=cleanup, | ||
executable=CREDIS_EXECUTABLE, | ||
# It is important to load the credis module BEFORE the ray | ||
# module, as the latter contains an extern declaration that the | ||
# former supplies. | ||
modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE]) | ||
|
||
if redis_shard_ports[i] is not None: | ||
assert redis_shard_port == redis_shard_ports[i] | ||
shard_address = address(node_ip_address, redis_shard_port) | ||
redis_shards.append(shard_address) | ||
# Store redis shard information in the primary redis shard. | ||
redis_client.rpush("RedisShards", shard_address) | ||
primary_redis_client.rpush("RedisShards", shard_address) | ||
|
||
if use_credis: | ||
# Configure the chain state. | ||
primary_redis_client.execute_command("MASTER.ADD", node_ip_address, | ||
redis_shard_port) | ||
shard_client = redis.StrictRedis( | ||
host=node_ip_address, port=redis_shard_port) | ||
shard_client.execute_command("MEMBER.CONNECT_TO_MASTER", | ||
node_ip_address, port) | ||
|
||
return redis_address, redis_shards | ||
|
||
|
||
def start_redis_instance(node_ip_address="127.0.0.1", | ||
port=None, | ||
redis_max_clients=None, | ||
num_retries=20, | ||
stdout_file=None, | ||
stderr_file=None, | ||
cleanup=True, | ||
executable=REDIS_EXECUTABLE, | ||
module=REDIS_MODULE): | ||
def _start_redis_instance(node_ip_address="127.0.0.1", | ||
port=None, | ||
redis_max_clients=None, | ||
num_retries=20, | ||
stdout_file=None, | ||
stderr_file=None, | ||
cleanup=True, | ||
executable=REDIS_EXECUTABLE, | ||
modules=None): | ||
"""Start a single Redis server. | ||
|
||
Args: | ||
|
@@ -579,8 +562,9 @@ def start_redis_instance(node_ip_address="127.0.0.1", | |
then this process will be killed by serices.cleanup() when the | ||
Python process that imported services exits. | ||
executable (str): Full path tho the redis-server executable. | ||
module (str): Full path to the redis module that will be loaded in this | ||
redis server. | ||
modules (list of str): A list of pathnames, pointing to the redis | ||
module(s) that will be loaded in this redis server. If None, load | ||
the default Ray redis module. | ||
|
||
Returns: | ||
A tuple of the port used by Redis and a handle to the process that was | ||
|
@@ -591,23 +575,27 @@ def start_redis_instance(node_ip_address="127.0.0.1", | |
Exception: An exception is raised if Redis could not be started. | ||
""" | ||
assert os.path.isfile(executable) | ||
assert os.path.isfile(module) | ||
if modules is None: | ||
modules = [REDIS_MODULE] | ||
for module in modules: | ||
assert os.path.isfile(module) | ||
counter = 0 | ||
if port is not None: | ||
# If a port is specified, then try only once to connect. | ||
num_retries = 1 | ||
else: | ||
port = new_port() | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you look at the above code block, retries should be happening only when the port is not specified, and if the port is specified, then a different random port should be tried each time. This is inconsistent with the error message I'm seeing
Any idea about this? |
||
load_module_args = [] | ||
for module in modules: | ||
load_module_args += ["--loadmodule", module] | ||
|
||
while counter < num_retries: | ||
if counter > 0: | ||
print("Redis failed to start, retrying now.") | ||
p = subprocess.Popen( | ||
[ | ||
executable, "--port", | ||
str(port), "--loglevel", "warning", "--loadmodule", module | ||
], | ||
stdout=stdout_file, | ||
stderr=stderr_file) | ||
command = [executable, "--port", | ||
str(port), "--loglevel", "warning"] + load_module_args | ||
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) | ||
time.sleep(0.1) | ||
# Check if Redis successfully started (or at least if it the executable | ||
# did not exit within 0.1 seconds). | ||
|
@@ -618,7 +606,8 @@ def start_redis_instance(node_ip_address="127.0.0.1", | |
port = new_port() | ||
counter += 1 | ||
if counter == num_retries: | ||
raise Exception("Couldn't start Redis.") | ||
raise Exception( | ||
"Couldn't start Redis. Check stdout file {}".format(stdout_file)) | ||
|
||
# Create a Redis client just for configuring Redis. | ||
redis_client = redis.StrictRedis(host="127.0.0.1", port=port) | ||
|
@@ -1329,10 +1318,6 @@ def start_ray_processes(address_info=None, | |
redirect_worker_output=redirect_worker_output, | ||
cleanup=cleanup) | ||
address_info["redis_address"] = redis_address | ||
if "RAY_USE_NEW_GCS" in os.environ: | ||
credis_address = start_credis( | ||
node_ip_address, redis_address, cleanup=cleanup) | ||
address_info["credis_address"] = credis_address | ||
time.sleep(0.1) | ||
|
||
# Start monitoring the processes. | ||
|
@@ -1351,7 +1336,6 @@ def start_ray_processes(address_info=None, | |
stdout_file=monitor_stdout_file, | ||
stderr_file=monitor_stderr_file, | ||
cleanup=cleanup) | ||
|
||
if redis_shards == []: | ||
# Get redis shards from primary redis instance. | ||
redis_ip_address, redis_port = redis_address.split(":") | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How difficult will it be to remove this restriction? (in a subsequent PR)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make this work we need both of the following
Both items need non-negligible amount of engineering work, I think.