Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ewma balancer #2001

Merged
merged 20 commits into from
Aug 29, 2020
4 changes: 2 additions & 2 deletions apisix/balancer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ local module_name = "balancer"
local pickers = {
roundrobin = require("apisix.balancer.roundrobin"),
chash = require("apisix.balancer.chash"),
ewma = require("apisix.balancer.ewma")
}


local lrucache_server_picker = core.lrucache.new({
ttl = 300, count = 256
})
Expand Down Expand Up @@ -245,7 +245,7 @@ local function pick_server(route, ctx)
core.log.error("failed to parse server addr: ", server, " err: ", err)
return core.response.exit(502)
end

ctx.server_picker = server_picker
return res
end

Expand Down
240 changes: 240 additions & 0 deletions apisix/balancer/ewma.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
--
redynasc marked this conversation as resolved.
Show resolved Hide resolved
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local resty_lock = require("resty.lock")
local core = require("apisix.core")
local ngx = ngx
local ngx_shared = ngx.shared
local ngx_now = ngx.now
local tostring = tostring
local math = math
local pairs = pairs
local next = next
local tonumber = tonumber

local _M = {}
local DECAY_TIME = 10 -- this value is in seconds
local LOCK_KEY = ":ewma_key"
local PICK_SET_SIZE = 2
local ewma_lock, ewma_lock_err
local shm_ewma = ngx_shared.balancer_ewma
local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at


local function lock(upstream)
membphis marked this conversation as resolved.
Show resolved Hide resolved
local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
redynasc marked this conversation as resolved.
Show resolved Hide resolved
if err then
if err ~= "timeout" then
core.log.error("EWMA Balancer failed to lock:", err)
end
return false, err
end

return true
membphis marked this conversation as resolved.
Show resolved Hide resolved
end


local function unlock()
local ok, err = ewma_lock:unlock()
if not ok then
core.log.error("EWMA Balancer failed to unlock:", err)
return false, err
end

return true
end

membphis marked this conversation as resolved.
Show resolved Hide resolved

local function decay_ewma(ewma, last_touched_at, rtt, now)
local td = now - last_touched_at
td = math.max(td, 0)
redynasc marked this conversation as resolved.
Show resolved Hide resolved
local weight = math.exp(-td / DECAY_TIME)

ewma = ewma * weight + rtt * (1.0 - weight)
return ewma
end


local function store_stats(upstream, ewma, now)
local success, err, forcible = shm_last_touched_at:set(upstream, now)
if not success then
core.log.error("balancer_ewma_last_touched_at:set failed ", err)
end
if forcible then
core.log.warn("balancer_ewma_last_touched_at:set valid items forcibly overwritten")
end

success, err, forcible = shm_ewma:set(upstream, ewma)
if not success then
core.log.error("balancer_ewma:set failed ", err)
end
if forcible then
core.log.warn("balancer_ewma:set valid items forcibly overwritten")
end
end


local function get_or_update_ewma(upstream, rtt, update)
local lock_ok, err = nil
if update then
lock_ok, err = lock(upstream)
end
local ewma = shm_ewma:get(upstream) or 0
if not lock_ok then
return ewma, err
end

local now = ngx_now()
local last_touched_at = shm_last_touched_at:get(upstream) or 0
ewma = decay_ewma(ewma, last_touched_at, rtt, now)

if not update then
return ewma
redynasc marked this conversation as resolved.
Show resolved Hide resolved
end

store_stats(upstream, ewma, now)

unlock()

return ewma
end


local function score(upstream)
-- Original implementation used names
-- Endpoints don't have names, so passing in IP:Port as key instead
local upstream_name = upstream.address .. ":" .. upstream.port
return get_or_update_ewma(upstream_name, 0, false)
end


-- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
-- or https://en.wikipedia.org/wiki/Random_permutation
-- loop from 1 .. k
-- pick a random value r from the remaining set of unpicked values (i .. n)
-- swap the value at position i with the value at position r
local function shuffle_peers(peers, k)
for i = 1, k do
local rand_index = math.random(i, #peers)
peers[i], peers[rand_index] = peers[rand_index], peers[i]
end
end


local function pick_and_score(peers, k)
redynasc marked this conversation as resolved.
Show resolved Hide resolved
shuffle_peers(peers, k)
local lowest_score_index = 1
local lowest_score = score(peers[lowest_score_index])
redynasc marked this conversation as resolved.
Show resolved Hide resolved
for i = 2, k do
local new_score = score(peers[i])
if new_score < lowest_score then
lowest_score_index, lowest_score = i, new_score
end
end

return peers[lowest_score_index], lowest_score
end


local function _trans_format(t1)
-- trans
--{"1.2.3.4:80":100,"5.6.7.8:8080":100}
-- into
-- [{"address":"1.2.3.4","port":"80"},{"address":"5.6.7.8","port":"8080"}]
local t2 = {}
redynasc marked this conversation as resolved.
Show resolved Hide resolved
local addr, port, err

for k,_ in pairs(t1) do
redynasc marked this conversation as resolved.
Show resolved Hide resolved
addr, port, err = core.utils.parse_addr(k)
if not err then
core.table.insert(t2, {address = addr, port = tostring(port)})
else
core.log.error('parse_addr error: ', k, err)
end
end

return next(t2) and t2 or nil
end


local function _ewma_find(up_nodes)
local peers
local endpoint
local err

if not ewma_lock then
ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks",
{timeout = 0, exptime = 0.1})
end
if not ewma_lock then
return nil, ewma_lock_err
end
peers = _trans_format(up_nodes)
redynasc marked this conversation as resolved.
Show resolved Hide resolved
if not peers then
err = 'up_nodes error'
return nil, err
end
endpoint = peers[1]
redynasc marked this conversation as resolved.
Show resolved Hide resolved
if #peers > 1 then
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
endpoint = pick_and_score(peers, k)
end

return endpoint.address .. ":" .. endpoint.port
end


local function _ewma_after_balance(ctx)
local response_time = tonumber(ctx.var.upstream_response_time) or 0
local connect_time = tonumber(ctx.var.upstream_connect_time) or 0
local rtt = connect_time + response_time
local upstream = ctx.var.upstream_addr

if not ewma_lock then
ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks",
{timeout = 0, exptime = 0.1})
redynasc marked this conversation as resolved.
Show resolved Hide resolved
end
if not ewma_lock then
return nil, ewma_lock
end
if not upstream then
return nil, "no upstream addr found"
end

return get_or_update_ewma(upstream, rtt, true)
end


function _M.new(up_nodes, upstream)
if not shm_ewma
or not shm_last_touched_at then
return nil, "dictionary not find"
end

return {
upstream = upstream,
get = function ()
return _ewma_find(up_nodes)
end,
after_balance = function(ctx)
redynasc marked this conversation as resolved.
Show resolved Hide resolved
return _ewma_after_balance(ctx)
end
}
end


return _M
4 changes: 4 additions & 0 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,10 @@ function _M.http_log_phase()
local api_ctx = common_phase("log")
healcheck_passive(api_ctx)

if api_ctx.server_picker and api_ctx.server_picker.after_balance then
api_ctx.server_picker.after_balance(api_ctx)
end

if api_ctx.uri_parse_param then
core.tablepool.release("uri_parse_param", api_ctx.uri_parse_param)
end
Expand Down
2 changes: 1 addition & 1 deletion apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ local upstream_schema = {
type = {
description = "algorithms of load balancing",
type = "string",
enum = {"chash", "roundrobin"}
enum = {"chash", "roundrobin", "ewma"}
},
checks = health_checker,
hash_on = {
Expand Down
3 changes: 3 additions & 0 deletions bin/apisix
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ http {
lua_shared_dict worker-events 10m;
lua_shared_dict lrucache-lock 10m;
lua_shared_dict skywalking-tracing-buffer 100m;
lua_shared_dict balancer_ewma 10m;
lua_shared_dict balancer_ewma_locks 10m;
lua_shared_dict balancer_ewma_last_touched_at 10m;

# for openid-connect plugin
lua_shared_dict discovery 1m; # cache for discovery metadata documents
Expand Down
2 changes: 1 addition & 1 deletion doc/admin-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ In addition to the basic complex equalization algorithm selection, APISIX's Upst

|Name |Optional|Description|
|------- |-----|------|
|type |required|`roundrobin` supports the weight of the load, `chash` consistency hash, pick one of them.|
|type |required|`roundrobin` supports the weight of the load, `chash` consistency hash,`ewma` minimum latency ,pick one of them.|
redynasc marked this conversation as resolved.
Show resolved Hide resolved
|nodes |required if `k8s_deployment_info` not configured|Hash table, the key of the internal element is the upstream machine address list, the format is `Address + Port`, where the address part can be IP or domain name, such as `192.168.1.100:80`, `foo.com:80`, etc. Value is the weight of the node. In particular, when the weight value is `0`, it has a special meaning, which usually means that the upstream node is invalid and never wants to be selected.|
|k8s_deployment_info|required if `nodes` not configured|fields: `namespace`、`deploy_name`、`service_name`、`port`、`backend_type`, `port` is number, `backend_type` is `pod` or `service`, others is string. |
|hash_on |optional|This option is only valid if the `type` is `chash`. Supported types `vars`(Nginx variables), `header`(custom header), `cookie`, `consumer`, the default value is `vars`.|
Expand Down
90 changes: 90 additions & 0 deletions t/node/ewma.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX 'no_plan';

repeat_each(3);
#no_long_string();
no_root_location();
log_level('info');
run_tests;

__DATA__

=== TEST 1: add upstream
--- http_config
lua_shared_dict balancer_ewma 1m;
lua_shared_dict balancer_ewma_last_touched_at 1m;
lua_shared_dict balancer_ewma_locks 1m;
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"upstream": {
"nodes": {
"10.12.7.103:28002": 100,
membphis marked this conversation as resolved.
Show resolved Hide resolved
"10.12.7.103:29002": 100
},
"type": "ewma"
},
"uri": "/delay"
membphis marked this conversation as resolved.
Show resolved Hide resolved
}]],
[[{
"node": {
"value": {
"upstream": {
"nodes": {
"10.12.7.103:28002": 100,
"10.12.7.103:29002": 100
},
"type": "ewma"
},
"uri": "/delay"
},
"key": "/apisix/routes/1"
},
"action": "set"
}]]
)

if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 2: access
--- http_config
lua_shared_dict balancer_ewma 1m;
redynasc marked this conversation as resolved.
Show resolved Hide resolved
lua_shared_dict balancer_ewma_last_touched_at 1m;
lua_shared_dict balancer_ewma_locks 1m;
--- request
GET /delay
--- error_code: 200
--- no_error_log
[error]