Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
362 changes: 183 additions & 179 deletions plugins/experimental/parent_select/consistenthash.cc

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions plugins/experimental/parent_select/consistenthash.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ class PLNextHopConsistentHash : public PLNextHopSelectionStrategy
std::vector<std::shared_ptr<ATSConsistentHash>> rings;
uint64_t getHashKey(uint64_t sm_id, TSMBuffer reqp, TSMLoc url, TSMLoc parent_selection_url, ATSHash64 *h);

std::shared_ptr<PLHostRecord> chashLookup(const std::shared_ptr<ATSConsistentHash> &ring, uint32_t cur_ring,
PLNextHopConsistentHashTxn *state, bool *wrapped, uint64_t sm_id, TSMBuffer reqp,
TSMLoc url, TSMLoc parent_selection_url);

public:
const uint32_t LineNumberPlaceholder = 99999;

Expand Down
15 changes: 8 additions & 7 deletions plugins/experimental/parent_select/parent_select.cc
Original file line number Diff line number Diff line change
Expand Up @@ -442,20 +442,21 @@ TSRemapDoRemap(void *ih, TSHttpTxn txnp, TSRemapRequestInfo *rri)
strategy->next(txnp, strategyTxn->txn, exclude_host, exclude_host_len, exclude_port, &ra.hostname, &ra.hostname_len, &ra.port,
&ra.is_retry, &ra.no_cache);

if (ra.hostname == nullptr) {
ra.nextHopExists = ra.hostname != nullptr;
ra.fail = !ra.nextHopExists;
// The action here is used for the very first connection, not any retry. So of course we should try it.
ra.responseIsRetryable = true;
ra.goDirect = strategy->goDirect();
ra.parentIsProxy = strategy->parentIsProxy();

if (ra.fail && !ra.goDirect) {
// TODO make configurable
TSDebug(PLUGIN_NAME, "TSRemapDoRemap strategy '%s' next returned nil, returning 502!", strategy->name());
TSHttpTxnStatusSet(txnp, TS_HTTP_STATUS_BAD_GATEWAY);
// TODO verify TS_EVENT_HTTP_TXN_CLOSE fires, and if not, free the cont here.
return TSREMAP_DID_REMAP;
}

ra.fail = false;
ra.nextHopExists = true;
ra.responseIsRetryable =
true; // The action here is used for the very first connection, not any retry. So of course we should try it.
ra.goDirect = strategy->goDirect();
ra.parentIsProxy = strategy->parentIsProxy();
TSDebug(PLUGIN_NAME, "TSRemapDoRemap setting response_action hostname '%.*s' port %d direct %d proxy %d", int(ra.hostname_len),
ra.hostname, ra.port, ra.goDirect, ra.parentIsProxy);
TSHttpTxnResponseActionSet(txnp, &ra);
Expand Down
39 changes: 37 additions & 2 deletions plugins/experimental/parent_select/strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ bool
PLNextHopSelectionStrategy::Init(const YAML::Node &n)
{
PL_NH_Debug(PL_NH_DEBUG_TAG, "calling Init()");
std::string self_host;
bool self_host_used = false;

try {
if (n["scheme"]) {
Expand Down Expand Up @@ -112,7 +114,12 @@ PLNextHopSelectionStrategy::Init(const YAML::Node &n)
} else if (ring_mode_val == exhaust_rings) {
ring_mode = PL_NH_EXHAUST_RING;
} else if (ring_mode_val == peering_rings) {
ring_mode = PL_NH_PEERING_RING;
ring_mode = PL_NH_PEERING_RING;
YAML::Node self_node = failover_node["self"];
if (self_node) {
self_host = self_node.Scalar();
PL_NH_Debug(PL_NH_DEBUG_TAG, "%s is self", self_host.c_str());
}
} else {
ring_mode = PL_NH_ALTERNATE_RING;
PL_NH_Note("Invalid 'ring_mode' value, '%s', for the strategy named '%s', using default '%s'.", ring_mode_val.c_str(),
Expand Down Expand Up @@ -219,9 +226,16 @@ PLNextHopSelectionStrategy::Init(const YAML::Node &n)
std::shared_ptr<PLHostRecord> host_rec = std::make_shared<PLHostRecord>(hosts_list[hst].as<PLHostRecord>());
host_rec->group_index = grp;
host_rec->host_index = hst;
if (TSHostnameIsSelf(host_rec->hostname.c_str(), host_rec->hostname.size()) == TS_SUCCESS) {
if (self_host == host_rec->hostname ||
TSHostnameIsSelf(host_rec->hostname.c_str(), host_rec->hostname.size()) == TS_SUCCESS) {
if (ring_mode == PL_NH_PEERING_RING && grp != 0) {
throw std::invalid_argument("self host (" + self_host +
") can only appear in first host group for peering ring mode");
}
TSHostStatusSet(host_rec->hostname.c_str(), host_rec->hostname.size(), TSHostStatus::TS_HOST_STATUS_DOWN, 0,
static_cast<unsigned int>(TS_HOST_STATUS_SELF_DETECT));
host_rec->self = true;
self_host_used = true;
}
hosts_inner.push_back(std::move(host_rec));
num_parents++;
Expand All @@ -232,12 +246,33 @@ PLNextHopSelectionStrategy::Init(const YAML::Node &n)
}
}
}
if (!self_host.empty() && !self_host_used) {
throw std::invalid_argument("self host (" + self_host + ") does not appear in the first (peer) group");
}
} catch (std::exception &ex) {
PL_NH_Note("Error parsing the strategy named '%s' due to '%s', this strategy will be ignored.", strategy_name.c_str(),
ex.what());
return false;
}

if (ring_mode == PL_NH_PEERING_RING) {
if (groups == 1) {
if (!go_direct) {
PL_NH_Error("when ring mode is '%s', go_direct must be true when there is only one host group.", peering_rings.data());
return false;
}
} else if (groups != 2) {
PL_NH_Error("when ring mode is '%s', requires two host groups (peering group and an upstream group),"
" or just a single peering group with go_direct.",
peering_rings.data());
return false;
}
// if (policy_type != PL_NH_CONSISTENT_HASH) {
// PL_NH_Error("ring mode '%s', is only implemented for a 'consistent_hash' policy.", peering_rings.data());
// return false;
// }
}

return true;
}

Expand Down
26 changes: 15 additions & 11 deletions plugins/experimental/parent_select/strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,14 @@ struct PLNHProtocol {
struct PLHostRecord : ATSConsistentHashNode {
std::mutex _mutex;
std::string hostname;
time_t failedAt;
uint32_t failCount;
time_t upAt;
std::atomic<time_t> failedAt;
std::atomic<uint32_t> failCount;
std::atomic<time_t> upAt;
float weight;
std::string hash_string;
int host_index;
int group_index;
bool self = false;
std::vector<std::shared_ptr<PLNHProtocol>> protocols;

// construct without locking the _mutex.
Expand All @@ -132,30 +133,33 @@ struct PLHostRecord : ATSConsistentHashNode {
PLHostRecord(const PLHostRecord &o)
{
hostname = o.hostname;
failedAt = o.failedAt;
failCount = o.failCount;
upAt = o.upAt;
failedAt = o.failedAt.load();
failCount = o.failCount.load();
upAt = o.upAt.load();
weight = o.weight;
hash_string = o.hash_string;
host_index = -1;
group_index = -1;
available = true;
host_index = o.host_index;
group_index = o.group_index;
available = o.available.load();
protocols = o.protocols;
self = o.self;
}

// assign without copying the _mutex.
PLHostRecord &
operator=(const PLHostRecord &o)
{
hostname = o.hostname;
failedAt = o.failedAt;
upAt = o.upAt;
failedAt = o.failedAt.load();
failCount = o.failCount.load();
upAt = o.upAt.load();
weight = o.weight;
hash_string = o.hash_string;
host_index = o.host_index;
group_index = o.group_index;
available = o.available.load();
protocols = o.protocols;
self = o.self;
return *this;
}

Expand Down
1 change: 1 addition & 0 deletions tests/gold_tests/pluginTest/parent_select/body.gold
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This is the body.
147 changes: 147 additions & 0 deletions tests/gold_tests/pluginTest/parent_select/parent_select.test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
'''
'''
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Test.Summary = '''
Basic parent_select plugin test
'''

Test.SkipUnless(
Condition.PluginExists('parent_select.so'),
)
Test.ContinueOnFail = False

# Define and populate MicroServer.
#
server = Test.MakeOriginServer("server")
response_header = {
"headers":
"HTTP/1.1 200 OK\r\n"
"Connection: close\r\n"
"Cache-control: max-age=85000\r\n"
"\r\n",
"timestamp": "1469733493.993",
"body": "This is the body.\n"
}
num_objects = 32
for i in range(num_objects):
request_header = {
"headers":
f"GET /obj{i} HTTP/1.1\r\n"
"Host: does.not.matter\r\n" # But cannot be omitted.
"\r\n",
"timestamp": "1469733493.993",
"body": ""
}
server.addResponse("sessionlog.json", request_header, response_header)

dns = Test.MakeDNServer("dns")

# Define next hop trafficserver instances.
#
num_nh = 8
ts_nh = []
for i in range(num_nh):
ts = Test.MakeATSProcess(f"ts_nh{i}", command=f"traffic_server 2>nh_trace{i}.log")
ts.Disk.records_config.update({
'proxy.config.diags.debug.enabled': 1,
'proxy.config.diags.debug.tags': 'http|dns',
'proxy.config.dns.nameservers': f"127.0.0.1:{dns.Variables.Port}",
'proxy.config.dns.resolv_conf': "NULL",
})
ts.Disk.remap_config.AddLine(
f"map / http://127.0.0.1:{server.Variables.Port}"
)
ts_nh.append(ts)

ts = Test.MakeATSProcess("ts")

ts.Disk.records_config.update({
'proxy.config.diags.debug.enabled': 1,
'proxy.config.diags.debug.tags': 'http|dns|parent|next_hop|host_statuses|hostdb',
'proxy.config.dns.nameservers': f"127.0.0.1:{dns.Variables.Port}", # Only nameservers if resolv_conf NULL.
'proxy.config.dns.resolv_conf': "NULL", # This defaults to /etc/resvolv.conf (OS namesevers) if not NULL.
'proxy.config.http.cache.http': 0,
'proxy.config.http.uncacheable_requests_bypass_parent': 0,
'proxy.config.http.no_dns_just_forward_to_parent': 1,
'proxy.config.http.parent_proxy.mark_down_hostdb': 0,
'proxy.config.http.parent_proxy.self_detect': 0,
})

ts.Disk.File(ts.Variables.CONFIGDIR + "/strategies.yaml", id="strategies", typename="ats:config")
s = ts.Disk.strategies
s.AddLine("groups:")
s.AddLine(" - &g1")
for i in range(num_nh):
dns.addRecords(records={f"next_hop{i}": ["127.0.0.1"]})
s.AddLine(f" - host: next_hop{i}")
s.AddLine(f" protocol:")
s.AddLine(f" - scheme: http")
s.AddLine(f" port: {ts_nh[i].Variables.port}")
# The health check URL does not seem to be used currently.
# s.AddLine(f" health_check_url: http://next_hop{i}:{ts_nh[i].Variables.port}")
s.AddLine(f" weight: 1.0")
s.AddLines([
"strategies:",
" - strategy: the-strategy",
" policy: consistent_hash",
" hash_key: path",
" go_direct: false",
" parent_is_proxy: true",
" ignore_self_detect: true",
" groups:",
" - *g1",
" scheme: http"])

# Fallover not currently tested.
#
# s.AddLines([
# " fallover:",
# " max_simple_retries: 2",
# " ring_mode: exhaust_ring",
# " response_codes:",
# " - 404",
# " health_check:",
# " - passive"])

ts.Disk.remap_config.AddLine(
"map http://dummy.com http://not_used @plugin=parent_select.so @pparam=" +
ts.Variables.CONFIGDIR +
"/strategies.yaml @pparam=the-strategy")

tr = Test.AddTestRun()
tr.Processes.Default.StartBefore(server)
tr.Processes.Default.StartBefore(dns)
for i in range(num_nh):
tr.Processes.Default.StartBefore(ts_nh[i])
tr.Processes.Default.StartBefore(Test.Processes.ts)
tr.Processes.Default.Command = 'echo start TS, HTTP server, DNS server and next hop TSes'
tr.Processes.Default.ReturnCode = 0

for i in range(num_objects):
tr = Test.AddTestRun()
tr.Processes.Default.Command = (
f'curl --verbose --proxy 127.0.0.1:{ts.Variables.port} http://dummy.com/obj{i}'
)
tr.Processes.Default.Streams.stdout = "body.gold"
tr.Processes.Default.ReturnCode = 0

tr = Test.AddTestRun()
# For some reason, the * won't be expanded when the command is executed, if stdout is not piped through "cat".
tr.Processes.Default.Command = "grep -F '200 OK' nh_trace*.log | cat"
tr.Processes.Default.Streams.stdout = "trace.gold"
tr.Processes.Default.ReturnCode = 0
Loading