Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate credis with Ray & route task table entries into credis. #1841

Merged
merged 18 commits into from
May 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions python/ray/common/redis_module/runtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import division
from __future__ import print_function

import os
import redis
import sys
import time
Expand Down Expand Up @@ -51,8 +52,10 @@ def get_next_message(pubsub_client, timeout_seconds=10):

class TestGlobalStateStore(unittest.TestCase):
def setUp(self):
redis_port, _ = ray.services.start_redis_instance()
self.redis = redis.StrictRedis(host="localhost", port=redis_port, db=0)
unused_primary_redis_addr, redis_shards = ray.services.start_redis(
"localhost", use_credis="RAY_USE_NEW_GCS" in os.environ)
self.redis = redis.StrictRedis(
host="localhost", port=redis_shards[0].split(":")[-1], db=0)

def tearDown(self):
ray.services.cleanup()
Expand Down
222 changes: 103 additions & 119 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,28 @@
from __future__ import print_function

import binascii
from collections import namedtuple, OrderedDict
from datetime import datetime
import json
import os
import psutil
import pyarrow
import random
import redis
import resource
import shutil
import signal
import socket
import subprocess
import sys
import time
import threading
import time
from collections import OrderedDict, namedtuple
from datetime import datetime

import psutil
import redis

import pyarrow
# Ray modules
import ray.global_scheduler as global_scheduler
import ray.local_scheduler
import ray.plasma
import ray.global_scheduler as global_scheduler

PROCESS_TYPE_MONITOR = "monitor"
PROCESS_TYPE_LOG_MONITOR = "log_monitor"
Expand Down Expand Up @@ -63,6 +64,7 @@
"core/src/common/redis_module/libray_redis_module.so")

# Location of the credis server and modules.
# credis will be enabled if the environment variable RAY_USE_NEW_GCS is set.
CREDIS_EXECUTABLE = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"core/src/credis/redis/src/redis-server")
Expand Down Expand Up @@ -393,81 +395,15 @@ def check_version_info(redis_client):
print(error_message)


def start_credis(node_ip_address,
redis_address,
port=None,
redirect_output=False,
cleanup=True):
"""Start the credis global state store.

Credis is a chain replicated reliable redis store. It consists
of one master process that acts as a controller and a number of
chain members (currently two, the head and the tail).

Args:
node_ip_address: The IP address of the current node. This is only used
for recording the log filenames in Redis.
redis_address (str): The IP address and port of the primary redis
server.
port (int): If provided, the primary Redis shard will be started on
this port.
redirect_output (bool): True if output should be redirected to a file
and false otherwise.
cleanup (bool): True if using Ray in local mode. If cleanup is true,
then all Redis processes started by this method will be killed by
services.cleanup() when the Python process that imported services
exits.

Returns:
The address (ip_address:port) of the credis master process.
"""

components = ["credis_master", "credis_head", "credis_tail"]
modules = [
CREDIS_MASTER_MODULE, CREDIS_MEMBER_MODULE, CREDIS_MEMBER_MODULE
]
ports = []

for i, component in enumerate(components):
stdout_file, stderr_file = new_log_files(component, redirect_output)

new_port, _ = start_redis_instance(
node_ip_address=node_ip_address,
port=port,
stdout_file=stdout_file,
stderr_file=stderr_file,
cleanup=cleanup,
module=modules[i],
executable=CREDIS_EXECUTABLE)

ports.append(new_port)

[master_port, head_port, tail_port] = ports

# Connect the members to the master

master_client = redis.StrictRedis(host=node_ip_address, port=master_port)
master_client.execute_command("MASTER.ADD", node_ip_address, head_port)
master_client.execute_command("MASTER.ADD", node_ip_address, tail_port)

credis_address = address(node_ip_address, master_port)

# Register credis master in redis
redis_ip_address, redis_port = redis_address.split(":")
redis_client = redis.StrictRedis(host=redis_ip_address, port=redis_port)
redis_client.set("credis_address", credis_address)

return credis_address


def start_redis(node_ip_address,
port=None,
redis_shard_ports=None,
num_redis_shards=1,
redis_max_clients=None,
redirect_output=False,
redirect_worker_output=False,
cleanup=True):
cleanup=True,
use_credis=None):
"""Start the Redis global state store.

Args:
Expand All @@ -491,6 +427,9 @@ def start_redis(node_ip_address,
then all Redis processes started by this method will be killed by
services.cleanup() when the Python process that imported services
exits.
use_credis: If True, additionally load the chain-replicated libraries
into the redis servers. Defaults to None, which means its value is
set by the presence of "RAY_USE_NEW_GCS" in os.environ.

Returns:
A tuple of the address for the primary Redis shard and a list of
Expand All @@ -505,62 +444,106 @@ def start_redis(node_ip_address,
raise Exception("The number of Redis shard ports does not match the "
"number of Redis shards.")

assigned_port, _ = start_redis_instance(
node_ip_address=node_ip_address,
port=port,
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup)
if use_credis is None:
use_credis = ("RAY_USE_NEW_GCS" in os.environ)
if not use_credis:
assigned_port, _ = _start_redis_instance(
node_ip_address=node_ip_address,
port=port,
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup)
else:
assigned_port, _ = _start_redis_instance(
node_ip_address=node_ip_address,
port=port,
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup,
executable=CREDIS_EXECUTABLE,
# It is important to load the credis module BEFORE the ray module,
# as the latter contains an extern declaration that the former
# supplies.
modules=[CREDIS_MASTER_MODULE, REDIS_MODULE])
if port is not None:
assert assigned_port == port
port = assigned_port
redis_address = address(node_ip_address, port)

# Register the number of Redis shards in the primary shard, so that clients
# know how many redis shards to expect under RedisShards.
redis_client = redis.StrictRedis(host=node_ip_address, port=port)
redis_client.set("NumRedisShards", str(num_redis_shards))
primary_redis_client = redis.StrictRedis(host=node_ip_address, port=port)
primary_redis_client.set("NumRedisShards", str(num_redis_shards))

# Put the redirect_worker_output bool in the Redis shard so that workers
# can access it and know whether or not to redirect their output.
redis_client.set("RedirectOutput", 1 if redirect_worker_output else 0)
primary_redis_client.set("RedirectOutput", 1
if redirect_worker_output else 0)

# Store version information in the primary Redis shard.
_put_version_info_in_redis(redis_client)
_put_version_info_in_redis(primary_redis_client)

# Start other Redis shards. Each Redis shard logs to a separate file,
# prefixed by "redis-<shard number>".
redis_shards = []
for i in range(num_redis_shards):
redis_stdout_file, redis_stderr_file = new_log_files(
"redis-{}".format(i), redirect_output)
redis_shard_port, _ = start_redis_instance(
node_ip_address=node_ip_address,
port=redis_shard_ports[i],
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup)
if not use_credis:
redis_shard_port, _ = _start_redis_instance(
node_ip_address=node_ip_address,
port=redis_shard_ports[i],
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup)
else:
assert num_redis_shards == 1, \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How difficult will it be to remove this restriction? (in a subsequent PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make this work we need both of the following

  • Vanilla support for sharding for RAY_USE_NEW_GCS=on (related to new GCS API; not related to chain)
  • (related to chain) code changes to the master, to make it support handling multiple chains; currently the master module assumes 1 master/chain.

Both items need non-negligible amount of engineering work, I think.

"For now, RAY_USE_NEW_GCS supports 1 shard, and credis "\
"supports 1-node chain for that shard only."
redis_shard_port, _ = _start_redis_instance(
node_ip_address=node_ip_address,
port=redis_shard_ports[i],
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup,
executable=CREDIS_EXECUTABLE,
# It is important to load the credis module BEFORE the ray
# module, as the latter contains an extern declaration that the
# former supplies.
modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE])

if redis_shard_ports[i] is not None:
assert redis_shard_port == redis_shard_ports[i]
shard_address = address(node_ip_address, redis_shard_port)
redis_shards.append(shard_address)
# Store redis shard information in the primary redis shard.
redis_client.rpush("RedisShards", shard_address)
primary_redis_client.rpush("RedisShards", shard_address)

if use_credis:
# Configure the chain state.
primary_redis_client.execute_command("MASTER.ADD", node_ip_address,
redis_shard_port)
shard_client = redis.StrictRedis(
host=node_ip_address, port=redis_shard_port)
shard_client.execute_command("MEMBER.CONNECT_TO_MASTER",
node_ip_address, port)

return redis_address, redis_shards


def start_redis_instance(node_ip_address="127.0.0.1",
port=None,
redis_max_clients=None,
num_retries=20,
stdout_file=None,
stderr_file=None,
cleanup=True,
executable=REDIS_EXECUTABLE,
module=REDIS_MODULE):
def _start_redis_instance(node_ip_address="127.0.0.1",
port=None,
redis_max_clients=None,
num_retries=20,
stdout_file=None,
stderr_file=None,
cleanup=True,
executable=REDIS_EXECUTABLE,
modules=None):
"""Start a single Redis server.

Args:
Expand All @@ -579,8 +562,9 @@ def start_redis_instance(node_ip_address="127.0.0.1",
then this process will be killed by serices.cleanup() when the
Python process that imported services exits.
executable (str): Full path tho the redis-server executable.
module (str): Full path to the redis module that will be loaded in this
redis server.
modules (list of str): A list of pathnames, pointing to the redis
module(s) that will be loaded in this redis server. If None, load
the default Ray redis module.

Returns:
A tuple of the port used by Redis and a handle to the process that was
Expand All @@ -591,23 +575,27 @@ def start_redis_instance(node_ip_address="127.0.0.1",
Exception: An exception is raised if Redis could not be started.
"""
assert os.path.isfile(executable)
assert os.path.isfile(module)
if modules is None:
modules = [REDIS_MODULE]
for module in modules:
assert os.path.isfile(module)
counter = 0
if port is not None:
# If a port is specified, then try only once to connect.
num_retries = 1
else:
port = new_port()

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at the above code block, retries should be happening only when the port is not specified, and if the port is specified, then a different random port should be tried each time. This is inconsistent with the error message I'm seeing

testInvalidTaskTableAdd (__main__.TestGlobalStateStore) ... 14397:M 24 May 03:52:08.898 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
14397:M 24 May 03:52:08.898 # Server started, Redis version 3.9.102
14397:M 24 May 03:52:08.898 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
14397:M 24 May 03:52:08.898 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
Waiting for redis server at 127.0.0.1:51098 to respond...
14401:M 24 May 03:52:09.004 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14402:M 24 May 03:52:09.108 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14403:M 24 May 03:52:09.212 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14404:M 24 May 03:52:09.316 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14405:M 24 May 03:52:09.420 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14406:M 24 May 03:52:09.524 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14407:M 24 May 03:52:09.628 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14408:M 24 May 03:52:09.732 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14409:M 24 May 03:52:09.836 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14410:M 24 May 03:52:09.940 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14411:M 24 May 03:52:10.044 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14412:M 24 May 03:52:10.147 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14413:M 24 May 03:52:10.251 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14414:M 24 May 03:52:10.355 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14415:M 24 May 03:52:10.459 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14416:M 24 May 03:52:10.563 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14417:M 24 May 03:52:10.667 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14418:M 24 May 03:52:10.771 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14419:M 24 May 03:52:10.875 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14420:M 24 May 03:52:10.979 # Creating Server TCP listening socket *:59530: bind: Address already in use
ERROR

Any idea about this?

load_module_args = []
for module in modules:
load_module_args += ["--loadmodule", module]

while counter < num_retries:
if counter > 0:
print("Redis failed to start, retrying now.")
p = subprocess.Popen(
[
executable, "--port",
str(port), "--loglevel", "warning", "--loadmodule", module
],
stdout=stdout_file,
stderr=stderr_file)
command = [executable, "--port",
str(port), "--loglevel", "warning"] + load_module_args
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
time.sleep(0.1)
# Check if Redis successfully started (or at least if it the executable
# did not exit within 0.1 seconds).
Expand All @@ -618,7 +606,8 @@ def start_redis_instance(node_ip_address="127.0.0.1",
port = new_port()
counter += 1
if counter == num_retries:
raise Exception("Couldn't start Redis.")
raise Exception(
"Couldn't start Redis. Check stdout file {}".format(stdout_file))

# Create a Redis client just for configuring Redis.
redis_client = redis.StrictRedis(host="127.0.0.1", port=port)
Expand Down Expand Up @@ -1329,10 +1318,6 @@ def start_ray_processes(address_info=None,
redirect_worker_output=redirect_worker_output,
cleanup=cleanup)
address_info["redis_address"] = redis_address
if "RAY_USE_NEW_GCS" in os.environ:
credis_address = start_credis(
node_ip_address, redis_address, cleanup=cleanup)
address_info["credis_address"] = credis_address
time.sleep(0.1)

# Start monitoring the processes.
Expand All @@ -1351,7 +1336,6 @@ def start_ray_processes(address_info=None,
stdout_file=monitor_stdout_file,
stderr_file=monitor_stderr_file,
cleanup=cleanup)

if redis_shards == []:
# Get redis shards from primary redis instance.
redis_ip_address, redis_port = redis_address.split(":")
Expand Down
Loading