Skip to content

Commit dead60d

Browse files
committed
proxy: mcp.internal fixes and tests
- Refcount leak on sets - Move the response elapsed timer back closer to when the response was processed as to not clobber the wrong IO object data - Restores error messages from set/ms - Adds start of unit tests Requests will look like they run a tiiiiny bit faster than they do, but I need to get the elapsed time there for a later change.
1 parent 9e740a9 commit dead60d

File tree

5 files changed

+252
-8
lines changed

5 files changed

+252
-8
lines changed

proto_proxy.c

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -290,14 +290,8 @@ void proxy_return_cb(io_pending_t *pending) {
290290
if (p->is_await) {
291291
mcplib_await_return(p);
292292
} else {
293-
struct timeval end;
294293
lua_State *Lc = p->coro;
295294

296-
// stamp the elapsed time into the response object.
297-
gettimeofday(&end, NULL);
298-
p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 +
299-
(end.tv_usec - p->client_resp->start.tv_usec);
300-
301295
// in order to resume we need to remove the objects that were
302296
// originally returned
303297
// what's currently on the top of the stack is what we want to keep.

proxy_internal.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,13 +347,13 @@ static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *re
347347
if (it == 0) {
348348
//enum store_item_type status;
349349
if (! item_size_ok(nkey, flags, pr->vlen)) {
350-
//out_string(c, "SERVER_ERROR object too large for cache");
350+
pout_string(resp, "SERVER_ERROR object too large for cache");
351351
//status = TOO_LARGE;
352352
pthread_mutex_lock(&t->stats.mutex);
353353
t->stats.store_too_large++;
354354
pthread_mutex_unlock(&t->stats.mutex);
355355
} else {
356-
//out_of_memory(c, "SERVER_ERROR out of memory storing object");
356+
pout_string(resp, "SERVER_ERROR out of memory storing object");
357357
//status = NO_MEMORY;
358358
pthread_mutex_lock(&t->stats.mutex);
359359
t->stats.store_no_memory++;
@@ -407,6 +407,8 @@ static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *re
407407
pout_string(resp, "SERVER_ERROR Unhandled storage type.");
408408
}
409409

410+
// We don't need to hold a reference since the item was fully read.
411+
item_remove(it);
410412
}
411413

412414
static void process_arithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, const bool incr) {
@@ -1214,6 +1216,8 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp
12141216
resp->wbytes = p - resp->wbuf;
12151217
resp_add_iov(resp, resp->wbuf, resp->wbytes);
12161218

1219+
item_remove(it);
1220+
12171221
return;
12181222
error:
12191223
// Note: no errors possible after the item was successfully allocated.

proxy_network.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -966,6 +966,7 @@ static void _stop_timeout_event(mcp_backend_t *be) {
966966
static int proxy_backend_drive_machine(mcp_backend_t *be) {
967967
bool stop = false;
968968
io_pending_proxy_t *p = NULL;
969+
struct timeval end;
969970
int flags = 0;
970971

971972
p = STAILQ_FIRST(&be->io_head);
@@ -1166,6 +1167,12 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
11661167
STAILQ_REMOVE_HEAD(&be->io_head, io_next);
11671168
be->depth--;
11681169
be->pending_read--;
1170+
1171+
// stamp the elapsed time into the response object.
1172+
gettimeofday(&end, NULL);
1173+
p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 +
1174+
(end.tv_usec - p->client_resp->start.tv_usec);
1175+
11691176
// have to do the q->count-- and == 0 and redispatch_conn()
11701177
// stuff here. The moment we call return_io here we
11711178
// don't own *p anymore.

t/proxyinternal.lua

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
function mcp_config_pools(oldss)
2+
mcp.backend_read_timeout(0.5)
3+
mcp.backend_connect_timeout(5)
4+
5+
local srv = mcp.backend
6+
7+
-- Single backend for zones to ease testing.
8+
-- For purposes of this config the proxy is always "zone 1" (z1)
9+
local b1 = srv('b1', '127.0.0.1', 11611)
10+
local b2 = srv('b2', '127.0.0.1', 11612)
11+
local b3 = srv('b3', '127.0.0.1', 11613)
12+
13+
local b1z = {b1}
14+
local b2z = {b2}
15+
local b3z = {b3}
16+
17+
-- convert the backends to pools.
18+
-- as per a normal full config see simple.lua or t/startfile.lua
19+
local zones = {
20+
z1 = mcp.pool(b1z),
21+
z2 = mcp.pool(b2z),
22+
z3 = mcp.pool(b3z),
23+
}
24+
25+
return zones
26+
end
27+
28+
-- WORKER CODE:
29+
30+
-- Using a very simple route handler only to allow testing the three
31+
-- workarounds in the same configuration file.
32+
function prefix_factory(pattern, list, default)
33+
local p = pattern
34+
local l = list
35+
local d = default
36+
return function(r)
37+
local route = l[string.match(r:key(), p)]
38+
if route == nil then
39+
return d(r)
40+
end
41+
return route(r)
42+
end
43+
end
44+
45+
-- just for golfing the code in mcp_config_routes()
46+
function toproute_factory(pfx, label)
47+
local err = "SERVER_ERROR no " .. label .. " route\r\n"
48+
return prefix_factory("^/(%a+)/", pfx, function(r) return err end)
49+
end
50+
51+
-- Do specialized testing based on the key prefix.
52+
function mcp_config_routes(zones)
53+
local pfx_get = {}
54+
local pfx_set = {}
55+
local pfx_touch = {}
56+
local pfx_gets = {}
57+
local pfx_gat = {}
58+
local pfx_gats = {}
59+
local pfx_cas = {}
60+
local pfx_add = {}
61+
local pfx_delete = {}
62+
local pfx_incr = {}
63+
local pfx_decr = {}
64+
local pfx_append = {}
65+
local pfx_prepend = {}
66+
local pfx_mg = {}
67+
local pfx_ms = {}
68+
local pfx_md = {}
69+
local pfx_ma = {}
70+
71+
local basic = function(r)
72+
return mcp.internal(r)
73+
end
74+
75+
pfx_get["b"] = basic
76+
pfx_set["b"] = basic
77+
pfx_touch["b"] = basic
78+
pfx_gets["b"] = basic
79+
pfx_gat["b"] = basic
80+
pfx_gats["b"] = basic
81+
pfx_cas["b"] = basic
82+
pfx_add["b"] = basic
83+
pfx_delete["b"] = basic
84+
pfx_incr["b"] = basic
85+
pfx_decr["b"] = basic
86+
pfx_append["b"] = basic
87+
pfx_prepend["b"] = basic
88+
pfx_mg["b"] = basic
89+
pfx_ms["b"] = basic
90+
pfx_md["b"] = basic
91+
pfx_ma["b"] = basic
92+
93+
mcp.attach(mcp.CMD_GET, toproute_factory(pfx_get, "get"))
94+
mcp.attach(mcp.CMD_SET, toproute_factory(pfx_set, "set"))
95+
mcp.attach(mcp.CMD_TOUCH, toproute_factory(pfx_touch, "touch"))
96+
mcp.attach(mcp.CMD_GETS, toproute_factory(pfx_gets, "gets"))
97+
mcp.attach(mcp.CMD_GAT, toproute_factory(pfx_gat, "gat"))
98+
mcp.attach(mcp.CMD_GATS, toproute_factory(pfx_gats, "gats"))
99+
mcp.attach(mcp.CMD_CAS, toproute_factory(pfx_cas, "cas"))
100+
mcp.attach(mcp.CMD_ADD, toproute_factory(pfx_add, "add"))
101+
mcp.attach(mcp.CMD_DELETE, toproute_factory(pfx_delete, "delete"))
102+
mcp.attach(mcp.CMD_INCR, toproute_factory(pfx_incr, "incr"))
103+
mcp.attach(mcp.CMD_DECR, toproute_factory(pfx_decr, "decr"))
104+
mcp.attach(mcp.CMD_APPEND, toproute_factory(pfx_append, "append"))
105+
mcp.attach(mcp.CMD_PREPEND, toproute_factory(pfx_prepend, "prepend"))
106+
mcp.attach(mcp.CMD_MG, toproute_factory(pfx_mg, "mg"))
107+
mcp.attach(mcp.CMD_MS, toproute_factory(pfx_ms, "ms"))
108+
mcp.attach(mcp.CMD_MD, toproute_factory(pfx_md, "md"))
109+
mcp.attach(mcp.CMD_MA, toproute_factory(pfx_ma, "ma"))
110+
111+
end

t/proxyinternal.t

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
#!/usr/bin/env perl
2+
3+
use strict;
4+
use warnings;
5+
use Test::More;
6+
use FindBin qw($Bin);
7+
use lib "$Bin/lib";
8+
use Carp qw(croak);
9+
use MemcachedTest;
10+
use IO::Socket qw(AF_INET SOCK_STREAM);
11+
use IO::Select;
12+
13+
if (!supports_proxy()) {
14+
plan skip_all => 'proxy not enabled';
15+
exit 0;
16+
}
17+
18+
# Don't want to write two distinct set of tests, and extstore is a default.
19+
if (!supports_extstore()) {
20+
plan skip_all => 'extstore not enabled';
21+
exit 0;
22+
}
23+
24+
my $ext_path = "/tmp/proxyinternal.$$";
25+
26+
# Set up some server sockets.
27+
sub mock_server {
28+
my $port = shift;
29+
my $srv = IO::Socket->new(
30+
Domain => AF_INET,
31+
Type => SOCK_STREAM,
32+
Proto => 'tcp',
33+
LocalHost => '127.0.0.1',
34+
LocalPort => $port,
35+
ReusePort => 1,
36+
Listen => 5) || die "IO::Socket: $@";
37+
return $srv;
38+
}
39+
40+
# Put a version command down the pipe to ensure the socket is clear.
41+
# client version commands skip the proxy code
42+
sub check_version {
43+
my $ps = shift;
44+
print $ps "version\r\n";
45+
like(<$ps>, qr/VERSION /, "version received");
46+
}
47+
48+
my @mocksrvs = ();
49+
#diag "making mock servers";
50+
for my $port (11611, 11612, 11613) {
51+
my $srv = mock_server($port);
52+
ok(defined $srv, "mock server created");
53+
push(@mocksrvs, $srv);
54+
}
55+
56+
my $p_srv = new_memcached("-o proxy_config=./t/proxyinternal.lua,ext_item_size=500,ext_item_age=1,ext_path=$ext_path:64m,ext_max_sleep=100000 -l 127.0.0.1 -U 0", 11510);
57+
my $ps = $p_srv->sock;
58+
$ps->autoflush(1);
59+
60+
# set up server backend sockets.
61+
# uncomment when needed. currently they get thrown out so this can hang.
62+
#my @mbe = ();
63+
#diag "accepting mock backends";
64+
#for my $msrv (@mocksrvs) {
65+
# my $be = $msrv->accept();
66+
# $be->autoflush(1);
67+
# ok(defined $be, "mock backend created");
68+
# push(@mbe, $be);
69+
#}
70+
71+
#diag "validating backends";
72+
#for my $be (@mbe) {
73+
# like(<$be>, qr/version/, "received version command");
74+
# print $be "VERSION 1.0.0-mock\r\n";
75+
#}
76+
77+
#diag "object too large"
78+
{
79+
my $data = 'x' x 2000000;
80+
print $ps "set /b/toolarge 0 0 2000000\r\n$data\r\n";
81+
is(scalar <$ps>, "SERVER_ERROR object too large for cache\r\n", "set too large");
82+
83+
print $ps "ms /b/toolarge 2000000 T30\r\n$data\r\n";
84+
is(scalar <$ps>, "SERVER_ERROR object too large for cache\r\n", "ms too large");
85+
}
86+
87+
#diag "basic tests"
88+
{
89+
print $ps "set /b/foo 0 0 2\r\nhi\r\n";
90+
is(scalar <$ps>, "STORED\r\n", "int set");
91+
print $ps "get /b/foo\r\n";
92+
is(scalar <$ps>, "VALUE /b/foo 0 2\r\n", "get response");
93+
is(scalar <$ps>, "hi\r\n", "get value");
94+
is(scalar <$ps>, "END\r\n", "get END");
95+
}
96+
97+
#diag "fetch from extstore"
98+
{
99+
my $data = 'x' x 1000;
100+
print $ps "set /b/ext 0 0 1000\r\n$data\r\n";
101+
is(scalar <$ps>, "STORED\r\n", "int set for extstore");
102+
sleep 3; # TODO: import wait_for_ext
103+
104+
print $ps "get /b/ext\r\n";
105+
is(scalar <$ps>, "VALUE /b/ext 0 1000\r\n", "get response from extstore");
106+
is(scalar <$ps>, "$data\r\n", "got data from extstore");
107+
is(scalar <$ps>, "END\r\n", "get END");
108+
}
109+
110+
#diag "flood memory"
111+
{
112+
# ensure we don't have a basic reference counter leak
113+
my $data = 'x' x 500000;
114+
for (1 .. 200) {
115+
print $ps "set /b/$_ 0 0 500000\r\n$data\r\n";
116+
is(scalar <$ps>, "STORED\r\n", "flood set");
117+
}
118+
for (1 .. 200) {
119+
print $ps "ms /b/$_ 500000 T30\r\n$data\r\n";
120+
is(scalar <$ps>, "HD\r\n", "flood ms");
121+
}
122+
}
123+
124+
done_testing();
125+
126+
END {
127+
unlink $ext_path if $ext_path;
128+
}

0 commit comments

Comments
 (0)