Skip to content

Conversation

concretevitamin
Copy link
Contributor

@concretevitamin concretevitamin commented Jul 2, 2018

This is a followup to #2266 -- that PR laid down the flushing policy framework + made task table flushable. This PR in addition makes xray's task table flushable.

Testing program: submitting N noops in a loop, monitor memory usage of the redis data shard. Built and ran with both RAY_USE_NEW_GCS and RAY_USE_XRAY.

import subprocess
import time

import redis

import ray

# Pass this flag to avoid annoying output on program exit.
info = ray.init(redirect_worker_output=True)

# Turn on flushing with the next two lines.
pol = ray.experimental.SimpleGcsFlushPolicy(
    flush_when_at_least_bytes=0,
    flush_period_secs=1e-4,
    flush_num_entries_each_time=10000)
ray.experimental.set_flushing_policy(pol)

primary_port = info['redis_address'].split(':')[-1]
primary = redis.StrictRedis('127.0.0.1', primary_port)
splits = primary.lrange('RedisShards', 0, -1)[0].split(b':')
shard_port = splits[-1].decode()
print('ports: primary {} shard {}'.format(primary_port, shard_port))

measure_shard = True

port = primary_port
if measure_shard:
    port = shard_port

pid = subprocess.run(
    ['pgrep', '-f', 'redis-server.*{}'.format(port)],
    stdout=subprocess.PIPE).stdout[:-1].decode()
logfile = 'psrecord-{}.log'.format(port)


@ray.remote
def noop():
    return


time.sleep(1)
args = [
    'psrecord', pid, '--interval', '1', '--duration', '60', '--log', logfile
    # 'psrecord', pid, '--interval', '1', '--duration', '100000', '--log', logfile
]
print(' '.join(args))
subprocess.Popen(args)

# Assuming each noop generates ~3 writes to the GCS, we need a flush throughput
# of ~2.1k/s to break even.  <- Use this a sanity check.
num_noops = 200000
i = 0
start = time.time()
while i < num_noops:
    noop.remote()
    i += 1
print('Time to execute {} no-ops: {:.2f} seconds'.format(
    num_noops,
    time.time() - start))

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6426/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6427/
Test FAILed.

@concretevitamin concretevitamin changed the title [WIP, DO NOT REVIEW YET] ResultTableAdd & ObjectTableAdd: add credis-managed versions Make xray object table credis-managed and hence flushable. Jul 2, 2018
@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6430/
Test FAILed.

if self.issue_gcs_flushes:
# For now, we take the primary redis server to issue flushes,
# because task table entries are stored there under this flag.
# Data is stored under the first data shard, so we issue flushes to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a note or todo here that this needs to change when we implement sharding

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, added a preventive check

# Data is stored under the first data shard, so we issue flushes to
# that redis server.
addr_port = self.redis.lrange("RedisShards", 0, -1)[0]
addr_port = addr_port.split(b":")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addr, port = addr_port.split(b":") would be slightly cleaner but up to you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

return context_->RunAsync("RAY.TABLE_APPEND", id, fbb.GetBufferPointer(), fbb.GetSize(),
prefix_, pubsub_channel_, std::move(callback));

std::string command;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have this code at enough places now that it warrants a little helper function I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return context_->RunAsync("RAY.TABLE_APPEND", id, fbb.GetBufferPointer(), fbb.GetSize(),
prefix_, pubsub_channel_, std::move(callback), log_length);

std::string command;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to the above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

const char *reply = "ERR entry exists";
return RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply));
static const char *reply = "ERR entry exists";
RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use RedisModule_ReplyWithError here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also returns OK (0),

 * The function always returns REDISMODULE_OK.
 */
int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) {
    return replyWithStatus(ctx,err,"-");
}

So I'd like to not do this & keep this PR as close to the original behavior as possible.

Copy link
Contributor

@pcmoritz pcmoritz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good modulo small fixes (see comments)

Copy link
Contributor

@stephanie-wang stephanie-wang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few comments. Overall, looks great!

host=addr_port[0], port=addr_port[1])
try:
self.redis.execute_command("HEAD.FLUSH 0")
self.redis_shard.execute_command("HEAD.FLUSH 0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this change necessary? Is self.redis pointing to the wrong server?

Also, is this code run every time a flush is required? If so, we should try to avoid running the self.redis.lrange command every time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. It's necessary because before, in RAY_USE_NEW_GCS, the task table is placed in primary and the object table in data shard. I implemented flushing for task table first, hence this code here is issuing flushes to primary. @pcmoritz's PR on Saturday fixed that issue.

  2. No, this code is only run once at Monitor's start. It's used to poke whether the loaded modules contain support for flushing.

} else {
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to reply with a Redis error using RedisModule_ReplyWithError here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where? If you mean RAY_CHECK(flags == REDISMODULE_ZADD_ADDED) << "Appended a duplicate entry"; then I think no since (1) it will crash (2) the existing behavior is like so too?

RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply));
return REDISMODULE_ERR;
}
return REDISMODULE_OK;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this into the if block above? Since it's only reachable by that branch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, good call

#include "ray/gcs/client.h"

namespace {
static const std::string kTableAppendCommand = "RAY.TABLE_APPEND";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also move the TABLE_ADD commands here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really since that code is not duplicated. Also I don't see a good way to abstract that piece across append and add, without doing costly / unwieldy string concatenation. Here the strings are compile-time constants.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I just meant to also define static const std::strings for the TABLE_ADD commands.

I'm fine with not trying to abstract it out right now, but in the future we could still define the strings statically while doing the concatenation in the definition. As we add more commands, it would be nice to have a static table of commands that we could lookup as part of the helper function that you added in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6431/
Test FAILed.

@concretevitamin
Copy link
Contributor Author

Addressed comments, PTAL, thanks!

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6437/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6441/
Test PASSed.

#include "ray/gcs/client.h"

namespace {
static const std::string kTableAppendCommand = "RAY.TABLE_APPEND";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I just meant to also define static const std::strings for the TABLE_ADD commands.

I'm fine with not trying to abstract it out right now, but in the future we could still define the strings statically while doing the concatenation in the definition. As we add more commands, it would be nice to have a static table of commands that we could lookup as part of the helper function that you added in this PR.

} else {
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
return REDISMODULE_OK;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to be more careful about which functions (node vs. tail) should return a Redis reply and when, to make sure that every Redis command returns exactly one reply. I think it should be:

  1. If node func errors, then it should return an error reply.
  2. If node func does not error, then it should not return anything.
  3. Tail func should always return a Redis reply.

I think this means that for the master commit on credis, we are incorrectly handling the case where the node func does not error and the command is executed on a non-tail node. In this case, the node func does not return a reply and the tail func isn't run, so we won't be returning a Redis reply.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephanie-wang yes, bullets 1-3 are exactly what I have in mind.

For singleton chain case (default deployment), both ActAsTail() and ActAsHead() will be true. So if node func does not error, the tail func will be run, and hence we will exactly get 1 reply.

For distributed, >1-chain case, the ChainReplicate() interface specifically designed for ray's redis module lacks this distributed support (i.e., propagating the update). (credis' native MemberPut_RedisCommand()/Put() do handle propagation, of course.) . Since it's not handled right now we should make sure 1-node case is correct at the moment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, just something to keep in mind once we move to distributed.

@concretevitamin
Copy link
Contributor Author

Travis LINUX_WHEELS=1 build kept failing with no obvious errors. Mind taking a look @pcmoritz ?

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6444/
Test PASSed.

@pcmoritz
Copy link
Contributor

pcmoritz commented Jul 3, 2018

@concretevitamin This also happens on the current master so is unrelated to this PR. I'm investigating it.

@pcmoritz pcmoritz merged commit ba28ddd into ray-project:master Jul 4, 2018
heyucongtom pushed a commit to heyucongtom/ray that referenced this pull request Jul 4, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants