hotfix(mashape-analytics) improve ALF buffer under load #757

Merged: 5 commits, Dec 3, 2015
Changes from 1 commit
172 changes: 116 additions & 56 deletions kong/plugins/mashape-analytics/buffer.lua
@@ -1,16 +1,22 @@
-- ALF buffer module
--
-- This module contains a buffered array of ALF objects. When the buffer is full (max number of entries
-- or max payload size), it is converted to a JSON payload and moved a queue of payloads to be
-- sent to the server.
-- or max payload size accepted by the socket server), it is eventually converted to a JSON payload and moved to a
-- queue of payloads to be sent to the server. "Eventually", because to prevent the sending queue from growing
-- too much and crashing the Lua VM, its size is limited (in bytes). If the sending queue is currently bloated
-- and has reached its size limit, then the buffer is NOT added to it and is simply discarded: those ALFs will be lost.
--
-- 1 buffer of ALFs (gets flushed once it reached the mmax size)
-- 1 queue of ready-to-be-sent batches which are JSON payloads
-- To summarize:
-- One buffer of ALFs (gets flushed once it reaches the max size)
-- One queue of pending, ready-to-be-sent batches which are JSON payloads (which also has a max size, in bytes)
--
-- We only remove a payload from the sent queue if it has been correctly received by the socket server.
-- We retry if there is any error during the sending.
-- We run a 'delayed timer' in case no call is received for a while to still flush
-- the buffer and have 'real-time' analytics.
-- 1. The sending queue keeps sending batches one by one, and if batches are acknowledged by the socket server,
-- the batch is considered saved and is discarded.
-- 2. If the batch is invalid (bad ALF formatting) according to the socket server, it is discarded and won't be retried.
-- 3. If the connection to the socket server could not be made, the batch will not be discarded so it can be retried.
-- 4. The sending queue keeps sending batches as long as it has some pending for sending. If the connection failed (3.),
--    the sending queue will use a retry policy timer which is incremented every time the socket server does not answer.
-- 5. We run a 'delayed timer' in case no call is received for a while to still flush the buffer and have 'real-time' analytics.
--
-- @see alf_serializer.lua
-- @see handler.lua
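The two-stage design described above (a buffer of serialized ALFs that flushes into a size-capped queue of JSON payloads) can be sketched as follows. This is an illustrative Python sketch, not the plugin's code; the names `Buffer`, `MAX_ENTRIES` and `MAX_QUEUE_BYTES` are hypothetical stand-ins for the module's Lua fields.

```python
import json

MAX_ENTRIES = 100                    # flush the buffer after this many ALFs
MAX_QUEUE_BYTES = 10 * 1024 * 1024   # cap on the pending-payload queue

class Buffer:
    def __init__(self):
        self.entries = []       # serialized ALFs waiting to be batched
        self.queue = []         # ready-to-send JSON payloads
        self.queue_bytes = 0    # current queue size in bytes

    def add(self, alf):
        self.entries.append(json.dumps(alf))
        if len(self.entries) >= MAX_ENTRIES:
            self.flush()

    def flush(self):
        # Batch the buffered ALFs into one JSON array payload
        payload = "[" + ",".join(self.entries) + "]"
        # If the queue has reached its byte limit, the batch is discarded
        if self.queue_bytes + len(payload) <= MAX_QUEUE_BYTES:
            self.queue.append(payload)
            self.queue_bytes += len(payload)
        self.entries = []
```

The key property is that a bloated queue drops new batches rather than growing without bound, trading data loss for a bounded memory footprint.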
@@ -26,15 +32,27 @@ local ngx_timer_at = ngx.timer.at
local table_insert = table.insert
local table_concat = table.concat
local table_remove = table.remove
local string_sub = string.sub
local string_len = string.len
local string_rep = string.rep
local string_format = string.format
local math_pow = math.pow
local math_min = math.min
local setmetatable = setmetatable

local MB = 1024 * 1024
local MAX_BUFFER_SIZE = 1 * MB
local MAX_BUFFER_SIZE = 500 * MB
Is this change from 1MB to 500MB correct? Seems awfully large. This is the maximum payload size that the socket server will accept in a single request, right? Given that max_queue_size defaults to 10MB, and the queue holds multiple batches, I would expect this value to be < 10MB.

Member Author
This value is correct. This constant will be renamed; its actual purpose is to mirror a constant present in the Galileo collector: the maximum payload size the Galileo collector accepts in a single HTTP request. It used to be 1MB and is now 500MB. I believe this value should mirror the Galileo restrictions for the case where someone really knows what they are doing and wishes to send extremely large batches of ALFs (possibly containing very large request bodies). This value is hard-coded and cannot be bypassed. Any ALF beyond that limit will be dropped as too big by itself, and if a batch reaches that limit, it will be put in the sending_queue directly, whether it has reached the configured batch_size or not.

batch_size (number of ALFs) and sending_queue_size (MB) are the values one should tweak to enforce a low memory footprint. I could see a third value being useful: a batch_size expressed in bytes as well, for the case where one's ALFs are much bigger than intended. But it would make the plugin more complicated to understand and configure (it already is with sending_queue_size), so I am not sure about that yet.

Contributor

Hi @cb372, yes it's very large, but it isn't meant as a way to reject very large batches; it's mostly a failsafe in case a client gets stuck and forgets to close the TCP socket, or something like that. The collector handles even extremely large batches just fine. If a payload reaches a size that big without being a bad socket, it's often due to huge bodies, and we don't want to drop those.

local EMPTY_ARRAY_PLACEHOLDER = "__empty_array_placeholder__"

-- Define an exponential retry policy for all workers.
-- The policy will give a delay that grows every time
-- Galileo fails to respond. As soon as Galileo responds,
-- the delay is reset to its base.
local dict = ngx.shared.locks
local RETRY_INDEX_KEY = "mashape_analytics_retry_index"
local RETRY_BASE_DELAY = 1 -- seconds
local RETRY_MAX_DELAY = 60 -- seconds
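These constants feed a quadratic backoff: the delay is the square of the failure count times the base delay, capped at the maximum. A minimal sketch of that computation (Python, illustrative only, not the plugin's code):

```python
RETRY_BASE_DELAY = 1   # seconds, multiplier for the first retry
RETRY_MAX_DELAY = 60   # seconds, upper bound on any retry delay

def retry_delay(index):
    # index = number of consecutive failed attempts; the delay grows
    # quadratically (1s, 4s, 9s, ...) and is capped at RETRY_MAX_DELAY
    return min(index ** 2 * RETRY_BASE_DELAY, RETRY_MAX_DELAY)
```

So the first retries wait 1s, 4s, 9s, and so on; from the 8th consecutive failure onwards the delay stays pinned at 60s (8² = 64 > 60).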

local buffer_mt = {}
buffer_mt.__index = buffer_mt
buffer_mt.MAX_BUFFER_SIZE = MAX_BUFFER_SIZE
@@ -44,10 +62,10 @@ buffer_mt.MAX_BUFFER_SIZE = MAX_BUFFER_SIZE
-- as possible.
local delayed_send_handler
delayed_send_handler = function(premature, buffer)
if ngx_now() - buffer.latest_call < buffer.AUTO_FLUSH_DELAY then
if ngx_now() - buffer.latest_call < buffer.auto_flush_delay then
-- If the latest call was received during the wait delay, abort the delayed send and
-- report it for X more seconds.
local ok, err = ngx_timer_at(buffer.AUTO_FLUSH_DELAY, delayed_send_handler, buffer)
local ok, err = ngx_timer_at(buffer.auto_flush_delay, delayed_send_handler, buffer)
if not ok then
buffer.lock_delayed = false -- re-enable creation of a delayed-timer for this buffer
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to create delayed batch sending timer: ", err)
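The debounce logic in this handler (re-arm the timer if a request arrived during the wait window, flush otherwise) can be sketched as below. This is a hypothetical Python sketch; `latest_call` and `flush` stand in for the buffer's Lua fields, and "rearmed" models scheduling another `ngx_timer_at`.

```python
import time

AUTO_FLUSH_DELAY = 2  # seconds, mirrors conf.delay

def delayed_send(buffer, now=None):
    """Flush the buffer only if no call arrived during the wait window."""
    now = time.time() if now is None else now
    if now - buffer["latest_call"] < AUTO_FLUSH_DELAY:
        # A call was received while waiting: postpone the flush by
        # scheduling another timer instead of flushing a hot buffer.
        return "rearmed"
    buffer["flush"]()
    return "flushed"
```

This keeps a busy API from flushing tiny batches every few seconds, while an idle API still gets its buffered ALFs sent within one delay window.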
@@ -64,18 +82,20 @@ end
-- Instantiate a new buffer with configuration and properties
function buffer_mt.new(conf)
local buffer = {
MAX_ENTRIES = conf.batch_size,
MAX_SIZE = MAX_BUFFER_SIZE,
AUTO_FLUSH_DELAY = conf.delay,
HOST = conf.host,
PORT = conf.port,
PATH = conf.path,
max_entries = conf.batch_size,
max_entries_size = buffer_mt.MAX_BUFFER_SIZE, -- using the value attached to the buffer_mt allows us to unit test this
auto_flush_delay = conf.delay,
host = conf.host,
port = conf.port,
path = conf.path,
max_queue_size = conf.sending_queue_size * MB,
entries = {}, -- current buffer as an array of strings (serialized ALFs)
entries_size = 0, -- current buffer size in bytes
entries_size = 0, -- current entries size in bytes (total)
sending_queue = {}, -- array of constructed payloads (batches of ALFs) to be sent
sending_queue_size = 0, -- current sending queue size in bytes
lock_sending = false, -- lock if currently sending its data
lock_delayed = false, -- lock if a delayed timer is already set for this buffer
latest_call = nil -- date at which a request was last made to this API (for delayed timer)
latest_call = nil -- date at which a request was last made to this API (for the delayed timer to know if it needs to trigger)
}
return setmetatable(buffer, buffer_mt)
end
@@ -93,18 +113,17 @@ function buffer_mt:add_alf(alf)
str = str:gsub("\""..EMPTY_ARRAY_PLACEHOLDER.."\"", ""):gsub("\\/", "/")

-- Check what would be the size of the buffer
local next_n_entries = #self.entries + 1
local next_n_entries = table_getn(self.entries) + 1
local alf_size = string_len(str)

-- If the alf_size exceeds the payload limit by itself, we have a big problem
if alf_size > self.MAX_SIZE then
ngx_log(ngx_log_ERR, string_format("[mashape-analytics] ALF size exceeded the maximum size (%sMB) accepted by the socket server. Dropping it.",
self.MAX_SIZE / MB))
if alf_size > self.max_entries_size then
ngx_log(ngx_log_ERR, string_format("[mashape-analytics] ALF size exceeded the maximum size (%sMB) accepted by the socket server. Dropping it.", self.max_entries_size / MB))
return
end

-- If size or entries exceed the max limits
local full = next_n_entries > self.MAX_ENTRIES or (self:get_size() + alf_size) > self.MAX_SIZE
local full = next_n_entries > self.max_entries or (self:get_size() + alf_size) > self.max_entries_size
if full then
self:flush()
-- Batch size reached, let's send the data
@@ -116,7 +135,7 @@ function buffer_mt:add_alf(alf)
-- Batch size not yet reached.
-- Set a timer sending the data only in case nothing happens for awhile or if the batch_size is taking
-- too much time to reach the limit and trigger the flush.
local ok, err = ngx_timer_at(self.AUTO_FLUSH_DELAY, delayed_send_handler, self)
local ok, err = ngx_timer_at(self.auto_flush_delay, delayed_send_handler, self)
if ok then
self.lock_delayed = true -- Make sure only one delayed timer is ever pending for a given buffer
else
@@ -137,28 +156,40 @@ end

-- Get the size of the current buffer if it was to be converted to a JSON payload
function buffer_mt:get_size()
local commas = string_rep(",", #self.entries - 1)
local commas = string_rep(",", table_getn(self.entries) - 1)
return string_len(commas.."[]") + self.entries_size
end

-- Flush the buffer
-- 1. Convert the content of it into a JSON payload
-- 2. Add the payload to the queue of payloads to be sent
-- 3. Empty the buffer and reset the current buffer size
-- 1. Make sure the current sending queue doesn't exceed its size limit
--    1a. If it doesn't, convert the buffer's content into a JSON payload
--    1b. Add the payload to the queue of payloads to be sent
-- 2. Empty the buffer and reset the current buffer size
function buffer_mt:flush()
local payload = self:payload_string()
table_insert(self.sending_queue, {
payload = payload,
n_entries = #self.entries,
size = self:get_size()
})
local size = self:get_size()

-- Make sure we don't cross the size limit. The only exception is if the sending_queue is empty,
-- it could happen that the configuration is erroneous (ex: batch_size too big for a too small sending_queue_size)
if self.sending_queue_size + size <= self.max_queue_size or table_getn(self.sending_queue) < 1 then
self.sending_queue_size = self.sending_queue_size + size

table_insert(self.sending_queue, {
payload = self:payload_string(),
n_entries = table_getn(self.entries),
size = size
})
else
ngx_log(ngx.NOTICE, string_format("[mashape-analytics] buffer reached its maximum sending queue size. (%s) ALFs, (%s) bytes dropped.", table_getn(self.entries), size))
end

self.entries = {}
self.entries_size = 0
end

-- Send the oldest payload (batch of ALFs) from the queue to the socket server.
-- The payload will be removed if the socket server acknowledged the batch.
-- If the queue still has payloads to be sent, keep on sending them.
-- If the connection to the socket server fails, use the retry policy.
function buffer_mt.send_batch(premature, self)
if self.lock_sending then return end
self.lock_sending = true -- simple lock
@@ -170,51 +201,80 @@ function buffer_mt.send_batch(premature, self)
-- Let's send the oldest batch in our queue
local batch_to_send = table_remove(self.sending_queue, 1)

local drop_batch = false
local retry
local client = http:new()
client:set_timeout(50000) -- 50 sec

local ok, err = client:connect(self.HOST, self.PORT)
local ok, err = client:connect(self.host, self.port)
if ok then
local res, err = client:request({path = self.PATH, body = batch_to_send.payload})
local res, err = client:request({path = self.path, body = batch_to_send.payload})
if not res then
ngx_log(ngx_log_ERR, string_format("[mashape-analytics] failed to send batch (%s ALFs %s bytes): %s",
batch_to_send.n_entries, batch_to_send.size, err))
elseif res.status == 200 then
drop_batch = true
ngx_log(ngx.DEBUG, string_format("[mashape-analytics] successfully saved the batch. (%s)", res.body))
elseif res.status == 400 then
ngx_log(ngx_log_ERR, string_format("[mashape-analytics] socket server refused the batch (%s ALFs %s bytes). Dropping batch. Status: (%s) Error: (%s)",
batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
drop_batch = true
retry = true
ngx_log(ngx_log_ERR, string_format("[mashape-analytics] failed to send batch (%s ALFs %s bytes): %s", batch_to_send.n_entries, batch_to_send.size, err))
else
ngx_log(ngx_log_ERR, string_format("[mashape-analytics] socket server could not save the batch (%s ALFs %s bytes). Status: (%s) Error: (%s)",
batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
res.body = string_sub(res.body, 1, -2) -- remove trailing newline for logs
if res.status == 200 then
ngx_log(ngx.NOTICE, string_format("[mashape-analytics] successfully saved the batch. (%s)", res.body))
elseif res.status == 207 then
retry = true
ngx_log(ngx_log_ERR, string_format("[mashape-analytics] socket server could not save all ALFs from the batch. (%s)", res.body))
elseif res.status == 400 then
ngx_log(ngx_log_ERR, string_format("[mashape-analytics] socket server refused the batch (%s ALFs %s bytes). Dropping batch. Status: (%s) Error: (%s)", batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
else
retry = true
ngx_log(ngx_log_ERR, string_format("[mashape-analytics] socket server could not save the batch (%s ALFs %s bytes). Status: (%s) Error: (%s)", batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
end
end

-- close connection, or put it into the connection pool
if not res or res.headers["connection"] == "close" then
ok, err = client:close()
if not ok then
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to close socket: "..err)
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to close socket: ", err)
end
else
client:set_keepalive()
end
else
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to connect to the socket server: "..err)
retry = true
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to connect to the socket server: ", err)
end

if not drop_batch then
-- If the batch is not dropped, then add it back to the end of the queue and it will be tried again later
table_insert(self.sending_queue, batch_to_send)
local next_batch_delay = 0 -- default delay for the next batch sending

if retry then
-- could not reach the socket server, need to retry
table_insert(self.sending_queue, 1, batch_to_send)

local ok, err = dict:add(RETRY_INDEX_KEY, 0)
if not ok and err ~= "exists" then
ngx_log(ngx_log_ERR, "[mashape-analytics] cannot prepare retry policy: ", err)
end

local index, err = dict:incr(RETRY_INDEX_KEY, 1)
if err then
ngx_log(ngx_log_ERR, "[mashape-analytics] cannot increment retry policy index: ", err)
elseif index then
next_batch_delay = math_min(math_pow(index, 2) * RETRY_BASE_DELAY, RETRY_MAX_DELAY)
end

ngx_log(ngx.NOTICE, string_format("[mashape-analytics] batch was queued for retry. Next retry in: %s seconds", next_batch_delay))
else
-- batch acknowledged by the socket server
self.sending_queue_size = self.sending_queue_size - batch_to_send.size

-- reset retry policy
local ok, err = dict:set(RETRY_INDEX_KEY, 0)
if not ok then
ngx_log(ngx_log_ERR, "[mashape-analytics] cannot reset retry policy index: ", err)
end
end

self.lock_sending = false

-- Keep sending data while the queue is not yet empty
if #self.sending_queue > 0 then
local ok, err = ngx_timer_at(2, self.send_batch, self)
if table_getn(self.sending_queue) > 0 then
local ok, err = ngx_timer_at(next_batch_delay, self.send_batch, self)
if not ok then
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to create batch retry timer: ", err)
end
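The response handling in `send_batch` maps each socket-server status to one of three dispositions: acknowledge (discard the saved batch), drop (invalid batch, never retried), or retry (re-queue at the front with backoff). A minimal Python sketch of that mapping, with illustrative names:

```python
def disposition(status):
    # Mirrors the dispositions in send_batch: 200 means the batch was
    # saved (acknowledge and discard), 400 means the batch is invalid
    # (dropped, never retried), while 207 (partial save) and any other
    # status leave the batch in the queue for a retry.
    if status == 200:
        return "ack"
    if status == 400:
        return "drop"
    return "retry"
```

A failed connection (no status at all) falls in the retry bucket as well, which is what triggers the exponential retry policy above.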
17 changes: 9 additions & 8 deletions kong/plugins/mashape-analytics/schema.lua
@@ -1,12 +1,13 @@
return {
fields = {
service_token = { type = "string", required = true },
environment = { type = "string" },
batch_size = { type = "number", default = 100 },
log_body = { type = "boolean", default = false },
delay = { type = "number", default = 2 },
host = { required = true, type = "string", default = "socket.analytics.mashape.com" },
port = { required = true, type = "number", default = 80 },
path = { required = true, type = "string", default = "/1.0.0/batch" }
service_token = {type = "string", required = true},
environment = {type = "string"},
batch_size = {type = "number", default = 100},
log_body = {type = "boolean", default = false},
delay = {type = "number", default = 2},
sending_queue_size = {type = "number", default = 10}, -- in mb
host = {required = true, type = "string", default = "socket.analytics.mashape.com"},
port = {required = true, type = "number", default = 80},
path = {required = true, type = "string", default = "/1.0.0/batch"}
}
}