Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the RPCServicePort constant to the global configuration #660

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions containers/python/rpc_test_container.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import rpc
import os
import sys
import numpy as np
import argparse
import json


Expand Down Expand Up @@ -29,16 +27,18 @@ def predict_doubles(self, inputs):


if __name__ == "__main__":
ip = "127.0.0.1"
port = 7000
input_type = "doubles"
model_version = 1
parser = argparse.ArgumentParser()
parser.add_argument(
'--rpc_service_port',
type=int,
default=7000)
args = parser.parse_args()

rpc_service = rpc.RPCService(collect_metrics=False, read_config=False)
rpc_service.model_name = "rpctest_py"
rpc_service.model_version = 1
rpc_service.host = "127.0.0.1"
rpc_service.port = 7000
rpc_service.port = args.rpc_service_port
rpc_service.input_type = "doubles"

model = RPCTestContainer(rpc_service)
Expand Down
10 changes: 5 additions & 5 deletions containers/test/test_container_rpc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ then
fi

success=false
rpc_service_port=17000 # for test only
RehanSD marked this conversation as resolved.
Show resolved Hide resolved

function clean_up {
# Perform program exit housekeeping
Expand All @@ -48,7 +49,7 @@ cd $DIR

# Start python rpc test container
echo "Starting python RPC test container..."
python ../python/rpc_test_container.py &
python ../python/rpc_test_container.py --rpc_service_port $rpc_service_port &

# Deprecate JVM containers
# cd ../jvm
Expand All @@ -67,22 +68,21 @@ cd $DIR/../../
cd container
make container_rpc_test
container_uptime_seconds=180
./container_rpc_test -t $container_uptime_seconds &
./container_rpc_test -t $container_uptime_seconds -p $rpc_service_port &

sleep 10s

cd $DIR/../../debug/src/benchmarks
make rpctest
echo "Executing RPC test (first iteration)..."
REDIS_PORT=$1
./rpctest --num_containers=2 --timeout_seconds=30 --redis_port $REDIS_PORT
./rpctest --num_containers=2 --timeout_seconds=30 --redis_port $REDIS_PORT --rpc_service_port $rpc_service_port
redis-cli -p $REDIS_PORT "flushall"
echo "Sleeping for 5 seconds..."
sleep 5s
echo "Executing RPC test (second iteration)..."
./rpctest --num_containers=2 --timeout_seconds=30 --redis_port $REDIS_PORT
./rpctest --num_containers=2 --timeout_seconds=30 --redis_port $REDIS_PORT --rpc_service_port $rpc_service_port
redis-cli -p $REDIS_PORT "flushall"
echo "TEST PASSED!"
success=true
exit 0

1 change: 1 addition & 0 deletions dockerfiles/PySparkContainerDockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ FROM ${REGISTRY}/${RPC_VERSION}-rpc:${CODE_VERSION}
LABEL maintainer="Dan Crankshaw <dscrankshaw@gmail.com>"

RUN mkdir -p /usr/share/man/man1 && \
apt-get update -qq -y && \
RehanSD marked this conversation as resolved.
Show resolved Hide resolved
apt-get install openjdk-8-jre openjdk-8-jdk-headless -y

RUN apt-get install -y -qq wget
Expand Down
9 changes: 6 additions & 3 deletions src/benchmarks/src/rpc_protocol_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ class Tester {
}

void start(long timeout_seconds) {
rpc_->start("127.0.0.1", RPC_SERVICE_PORT,
Config &conf = get_config();
rpc_->start("127.0.0.1", conf.get_rpc_service_port(),
[](VersionedModelId /*model*/, int /*container_id*/) {},
[this](rpc::RPCResponse &response) {
on_response_received(std::move(response));
},
[](VersionedModelId, int) {});
Config &conf = get_config();
while (!redis_connection_.connect(conf.get_redis_address(),
conf.get_redis_port())) {
log_error(LOGGING_TAG_RPC_TEST, "RPCTest failed to connect to redis",
Expand Down Expand Up @@ -308,16 +308,19 @@ int main(int argc, char *argv[]) {
("redis_ip", "Redis address",
cxxopts::value<std::string>()->default_value("localhost"))
("redis_port", "Redis port",
cxxopts::value<int>()->default_value("6379"))
cxxopts::value<int>()->default_value(std::to_string(DEFAULT_REDIS_PORT)))
("num_containers", "Number of containers to validate",
cxxopts::value<int>()->default_value("1"))
("rpc_service_port", "RPCService's port",
cxxopts::value<int>()->default_value(std::to_string(DEFAULT_RPC_SERVICE_PORT)))
("timeout_seconds", "Timeout in seconds",
cxxopts::value<long>()->default_value("30"));
// clang-format on
options.parse(argc, argv);

get_config().set_redis_address(options["redis_ip"].as<std::string>());
get_config().set_redis_port(options["redis_port"].as<int>());
get_config().set_rpc_service_port(options["rpc_service_port"].as<int>());
get_config().ready();

Tester tester(options["num_containers"].as<int>());
Expand Down
10 changes: 7 additions & 3 deletions src/benchmarks/src/rpc_service_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ class Benchmarker {
request_(create_request(input_type, message_size)) {}

void start() {
rpc_->start("*", RPC_SERVICE_PORT, [](VersionedModelId, int) {},
Config &conf = get_config();
rpc_->start("*", conf.get_rpc_service_port(),
[](VersionedModelId, int) {},
[this](rpc::RPCResponse &response) {
on_response_recv(std::move(response));
},
Expand All @@ -138,7 +140,6 @@ class Benchmarker {
throughput_meter_ = metrics::MetricsRegistry::get_metrics().create_meter(
"rpc_bench_throughput");

Config &conf = get_config();
while (!redis_connection_.connect(conf.get_redis_address(),
conf.get_redis_port())) {
log_error(LOGGING_TAG_RPC_BENCH, "RPCBench failed to connect to redis",
Expand Down Expand Up @@ -242,11 +243,13 @@ int main(int argc, char *argv[]) {
("redis_ip", "Redis address",
cxxopts::value<std::string>()->default_value("localhost"))
("redis_port", "Redis port",
cxxopts::value<int>()->default_value("6379"))
cxxopts::value<int>()->default_value(std::to_string(DEFAULT_REDIS_PORT)))
("m,num_messages", "Number of messages to send",
cxxopts::value<int>()->default_value("100"))
("s,message_size", "Number of inputs per message",
cxxopts::value<int>()->default_value("500"))
("rpc_service_port", "RPCService's port",
cxxopts::value<int>()->default_value(std::to_string(DEFAULT_RPC_SERVICE_PORT)))
("input_type", "Can be bytes, ints, floats, doubles, or strings",
cxxopts::value<std::string>()->default_value("doubles"));
// clang-format on
Expand All @@ -255,6 +258,7 @@ int main(int argc, char *argv[]) {
clipper::Config &conf = clipper::get_config();
conf.set_redis_address(options["redis_ip"].as<std::string>());
conf.set_redis_port(options["redis_port"].as<int>());
conf.set_rpc_service_port(options["rpc_service_port"].as<int>());
conf.ready();
InputType input_type =
clipper::parse_input_type(options["input_type"].as<std::string>());
Expand Down
6 changes: 4 additions & 2 deletions src/container/src/container_rpc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,21 @@ int main(int argc, char* argv[]) {
// clang-format off
options.add_options()
("t,test_length", "length of the test in seconds",
cxxopts::value<long>()->default_value("10"));
cxxopts::value<long>()->default_value("10"))
("p,rpc_service_port", "RPCService's port",
cxxopts::value<int>()->default_value("7000"));
// clang-format on
options.parse(argc, argv);

long test_length = options["test_length"].as<long>();
int clipper_port = options["rpc_service_port"].as<int>();

RPC container_rpc;
RPCTestModel test_model(container_rpc);
std::string model_name = "cpp_test";
int model_version = 1;

std::string clipper_ip = "localhost";
int clipper_port = 7000;

container_rpc.start(test_model, model_name, model_version, clipper_ip,
clipper_port);
Expand Down
3 changes: 3 additions & 0 deletions src/frontends/src/query_frontend_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ int main(int argc, char* argv[]) {
cxxopts::value<std::string>()->default_value(clipper::DEFAULT_REDIS_ADDRESS))
("redis_port", "Redis port",
cxxopts::value<int>()->default_value(std::to_string(clipper::DEFAULT_REDIS_PORT)))
("rpc_service_port", "RPCService's port",
cxxopts::value<int>()->default_value(std::to_string(clipper::DEFAULT_RPC_SERVICE_PORT)))
("prediction_cache_size", "Size of the prediction cache in bytes, excluding cache metadata",
cxxopts::value<long>()->default_value(std::to_string(clipper::DEFAULT_PREDICTION_CACHE_SIZE_BYTES)));
// clang-format on
Expand All @@ -22,6 +24,7 @@ int main(int argc, char* argv[]) {
clipper::Config& conf = clipper::get_config();
conf.set_redis_address(options["redis_ip"].as<std::string>());
conf.set_redis_port(options["redis_port"].as<int>());
conf.set_rpc_service_port(options["rpc_service_port"].as<int>());
conf.set_prediction_cache_size(options["prediction_cache_size"].as<long>());
conf.ready();

Expand Down
22 changes: 22 additions & 0 deletions src/libclipper/include/clipper/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace clipper {

const std::string DEFAULT_REDIS_ADDRESS("localhost");
constexpr int DEFAULT_REDIS_PORT = 6379;
constexpr int DEFAULT_RPC_SERVICE_PORT = 7000;
constexpr long DEFAULT_PREDICTION_CACHE_SIZE_BYTES = 33554432; // 32 MiB

/**
Expand All @@ -33,6 +34,7 @@ struct Config {
: readable_(false),
redis_address_(DEFAULT_REDIS_ADDRESS),
redis_port_(DEFAULT_REDIS_PORT),
rpc_service_port_(DEFAULT_RPC_SERVICE_PORT),
prediction_cache_size_bytes_(DEFAULT_PREDICTION_CACHE_SIZE_BYTES) {}

/**
Expand All @@ -42,6 +44,7 @@ struct Config {
readable_ = false;
redis_address_ = DEFAULT_REDIS_ADDRESS;
redis_port_ = DEFAULT_REDIS_PORT;
rpc_service_port_ = DEFAULT_RPC_SERVICE_PORT;
prediction_cache_size_bytes_ = DEFAULT_PREDICTION_CACHE_SIZE_BYTES;
}

Expand Down Expand Up @@ -85,6 +88,24 @@ struct Config {
redis_port_ = port;
}

int get_rpc_service_port() const {
if (!readable_) {
// TODO: use a better exception
throw std::logic_error("Cannot read Config until ready");
}
assert(readable_);
return rpc_service_port_;
}

void set_rpc_service_port(int port) {
if (readable_) {
// TODO: use a better exception
throw std::logic_error("Cannot write to Config after ready");
}
assert(!readable_);
rpc_service_port_ = port;
}

long get_prediction_cache_size() const {
if (!readable_) {
// TODO: use a better exception
Expand Down Expand Up @@ -114,6 +135,7 @@ struct Config {
bool readable_;
std::string redis_address_;
int redis_port_;
int rpc_service_port_;
long prediction_cache_size_bytes_;
};

Expand Down
2 changes: 0 additions & 2 deletions src/libclipper/include/clipper/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ enum RedisDBTable {
REDIS_APP_MODEL_LINKS_DB_NUM = 7,
};

constexpr int RPC_SERVICE_PORT = 7000;
RehanSD marked this conversation as resolved.
Show resolved Hide resolved

constexpr int QUERY_FRONTEND_PORT = 1337;
constexpr int MANAGEMENT_FRONTEND_PORT = 1338;

Expand Down
4 changes: 2 additions & 2 deletions src/libclipper/include/clipper/task_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,9 @@ class TaskExecutor {
model_queues_({}),
model_metrics_({}) {
log_info(LOGGING_TAG_TASK_EXECUTOR, "TaskExecutor started");
Config &conf = get_config();
rpc_->start(
"*", RPC_SERVICE_PORT, [ this, task_executor_valid = active_ ](
"*", conf.get_rpc_service_port(), [ this, task_executor_valid = active_ ](
VersionedModelId model, int replica_id) {
if (*task_executor_valid) {
on_container_ready(model, replica_id);
Expand Down Expand Up @@ -281,7 +282,6 @@ class TaskExecutor {
"TaskExecutor has been destroyed.");
}
});
Config &conf = get_config();
while (!redis_connection_.connect(conf.get_redis_address(),
conf.get_redis_port())) {
log_error(LOGGING_TAG_TASK_EXECUTOR,
Expand Down