Skip to content

Commit

Permalink
tcp cosocket: shutdown support. closes openresty#2 (openresty#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
dndx authored Oct 6, 2017
1 parent d308670 commit ff64e06
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 18 deletions.
55 changes: 54 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ documentation of `ngx_http_lua_module` for more details about their usage and be
The [send_timeout](http://nginx.org/r/send_timeout) directive in the Nginx "http" subsystem is missing in the "stream" subsystem.
So `ngx_stream_lua_module` uses the `lua_socket_send_timeout` for this purpose.

**Note:** the lingering close directive that used to exist in older version of stream_lua_nginx_module has been removed and can
now be simulated with the newly added [tcpsock:shutdown](#tcpsockshutdown) method if necessary.

[Back to TOC](#table-of-contents)

Nginx API for Lua
Expand Down Expand Up @@ -170,6 +173,57 @@ other stream modules.
is ignored and the raw request socket is always returned. Unlike `ngx_http_lua_module`,
you can still call output API functions like `ngx.say`, `ngx.print`, and `ngx.flush`
after acquiring the raw request socket via this function.

Raw request socket returned by this module will contain the following extra method:

tcpsock:shutdown
----------------
**syntax:** *ok, err = tcpsock:shutdown("send")*

**context:** *content_by_lua**

Shuts down the write part of the request socket, prevents all further writing to the client
and sends TCP FIN, while keeping the reading half open.

Currently only the `"send"` direction is supported. Using any parameters other than "send" will return
an error.

If you called any output functions (like [ngx.say](https://github.com/openresty/lua-nginx-module#ngxsay))
before calling this method, consider use `ngx.flush(true)` to make sure all busy buffers are complely
flushed before shutting down the socket. If any busy buffers were detected, this method will return `nil`
will error message `"socket busy writing"`.

This feature is particularly useful for protocols that generates response before actually
finishes consuming all incoming data. Normally Kernel will send out RST to the client when
[tcpsock:close](https://github.com/openresty/lua-nginx-module#tcpsockclose) is called without
emptying the receiving buffer first. Calling this method will allow you to keep reading from
the receiving buffer and prevents RST from being sent.

You can also use this method to simulate lingering close similar to that
[provided by the ngx_http_core_module](https://nginx.org/en/docs/http/ngx_http_core_module.html#lingering_close)
for protocols that needs such behavior. Here is an example:

```lua
local LINGERING_TIME = 30 -- 30 seconds
local LINGERING_TIMEOUT = 5000 -- 5 seconds

local ok, err = sock:shutdown("send")
if not ok then
ngx.log(ngx.ERR, "failed to shutdown: ", err)
return
end

local deadline = ngx.time() + LINGERING_TIME

sock:settimeouts(nil, nil, LINGERING_TIMEOUT)

repeat
local data, _, partial = sock:receive(1024)
until (not data and not partial) or ngx.time() >= deadline
```

[Back to TOC](#directives)

* [ngx.print](https://github.com/openresty/lua-nginx-module#ngxprint)
* [ngx.say](https://github.com/openresty/lua-nginx-module#ngxsay)
* [ngx.log](https://github.com/openresty/lua-nginx-module#ngxlog)
Expand Down Expand Up @@ -244,7 +298,6 @@ TODO
* Add new directives `log_by_lua_block` and `log_by_lua_file`.
* Add new directives `balancer_by_lua_block` and `balancer_by_lua_file`.
* Add new directives `ssl_certificate_by_lua_block` and `ssl_certificate_by_lua_file`.
* Port `lua_lingering_close`, `lua_lingering_time` and `lua_lingering_timeout` directives over.
* Add `ngx.semaphore` API.
* Add `ngx_meta_lua_module` to share as much code as possible between this module and `ngx_http_lua_module` and allow sharing
of `lua_shared_dict`.
Expand Down
6 changes: 6 additions & 0 deletions src/ngx_stream_lua_request.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@

/*
* Copyright (C) OpenResty Inc.
*/


#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
Expand Down
6 changes: 6 additions & 0 deletions src/ngx_stream_lua_request.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@

/*
* Copyright (C) OpenResty Inc.
*/


#ifndef _NGX_STREAM_LUA_REQUEST_H_INCLUDED_
#define _NGX_STREAM_LUA_REQUEST_H_INCLUDED_

Expand Down
132 changes: 122 additions & 10 deletions src/ngx_stream_lua_socket_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static int ngx_stream_lua_socket_tcp_sslhandshake(lua_State *L);
static int ngx_stream_lua_socket_tcp_receive(lua_State *L);
static int ngx_stream_lua_socket_tcp_send(lua_State *L);
static int ngx_stream_lua_socket_tcp_close(lua_State *L);
static int ngx_stream_lua_socket_tcp_shutdown(lua_State *L);
static int ngx_stream_lua_socket_tcp_setoption(lua_State *L);
static int ngx_stream_lua_socket_tcp_settimeout(lua_State *L);
static int ngx_stream_lua_socket_tcp_settimeouts(lua_State *L);
Expand All @@ -49,7 +50,7 @@ static void ngx_stream_lua_socket_tcp_finalize(ngx_stream_lua_request_t *r,
static void ngx_stream_lua_socket_tcp_finalize_read_part(ngx_stream_lua_request_t *r,
ngx_stream_lua_socket_tcp_upstream_t *u);
static void ngx_stream_lua_socket_tcp_finalize_write_part(ngx_stream_lua_request_t *r,
ngx_stream_lua_socket_tcp_upstream_t *u);
ngx_stream_lua_socket_tcp_upstream_t *u, int do_shutdown);
static ngx_int_t ngx_stream_lua_socket_send(ngx_stream_lua_request_t *r,
ngx_stream_lua_socket_tcp_upstream_t *u);
static ngx_int_t ngx_stream_lua_socket_test_connect(ngx_stream_lua_request_t *r,
Expand Down Expand Up @@ -179,7 +180,7 @@ enum {
return 2; \
} \
if ((u)->raw_downstream \
&& ((r)->connection->buffered)) \
&& ((r)->connection->buffered)) \
{ \
lua_pushnil(L); \
lua_pushliteral(L, "socket busy writing"); \
Expand Down Expand Up @@ -253,6 +254,9 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
lua_pushcfunction(L, ngx_stream_lua_socket_tcp_settimeouts);
lua_setfield(L, -2, "settimeouts"); /* ngx socket mt */

lua_pushcfunction(L, ngx_stream_lua_socket_tcp_shutdown);
lua_setfield(L, -2, "shutdown");

lua_pushvalue(L, -1);
lua_setfield(L, -2, "__index");

Expand Down Expand Up @@ -285,6 +289,9 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
lua_pushcfunction(L, ngx_stream_lua_socket_tcp_close);
lua_setfield(L, -2, "close");

lua_pushcfunction(L, ngx_stream_lua_socket_tcp_shutdown);
lua_setfield(L, -2, "shutdown");

lua_pushcfunction(L, ngx_stream_lua_socket_tcp_setoption);
lua_setfield(L, -2, "setoption");

Expand Down Expand Up @@ -1642,7 +1649,7 @@ ngx_stream_lua_socket_write_error_retval_handler(ngx_stream_lua_request_t *r,
u->write_co_ctx->cleanup = NULL;
}

ngx_stream_lua_socket_tcp_finalize_write_part(r, u);
ngx_stream_lua_socket_tcp_finalize_write_part(r, u, 0);

ft_type = u->ft_type;
u->ft_type = 0;
Expand Down Expand Up @@ -2633,6 +2640,102 @@ ngx_stream_lua_socket_tcp_close(lua_State *L)
}


static int
ngx_stream_lua_socket_tcp_shutdown(lua_State *L)
{
ngx_stream_lua_request_t *r;
ngx_stream_lua_socket_tcp_upstream_t *u;
ngx_str_t direction;
char *p;
ngx_stream_lua_ctx_t *ctx;

if (lua_gettop(L) != 2) {
return luaL_error(L, "expecting 2 arguments "
"(including the object) but seen %d", lua_gettop(L));
}

luaL_checktype(L, 1, LUA_TTABLE);

lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
u = lua_touserdata(L, -1);
lua_pop(L, 1);

r = ngx_stream_lua_get_req(L);
if (r == NULL) {
return luaL_error(L, "no request found");
}

ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
if (ctx == NULL) {
ngx_stream_lua_socket_handle_write_error(r, u,
NGX_STREAM_LUA_SOCKET_FT_ERROR);
return NGX_ERROR;
}

/*
* only allow shutdown on raw request in stream module in content phase.
* in http module, lingering close will take care of the shutdown.
* in stream module, it is unsafe to shutdown prior on reaching content phase
* as later phases may still need to write to the socket.
*/

if (u->raw_downstream) {
ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_CONTENT);

if (ctx->eof) {
lua_pushnil(L);
lua_pushliteral(L, "seen eof");
return 2;
}

/* prevent all further output attempt */
ctx->eof = 1;
}

if (u == NULL
|| u->peer.connection == NULL
|| (u->read_closed && u->write_closed))
{
lua_pushnil(L);
lua_pushliteral(L, "closed");
return 2;
}

if (u->write_closed) {
lua_pushnil(L);
lua_pushliteral(L, "already shutdown");
return 2;
}

if (u->request != r) {
return luaL_error(L, "bad request");
}

ngx_stream_lua_socket_check_busy_connecting(r, u, L);
ngx_stream_lua_socket_check_busy_writing(r, u, L);

/* shutdown */
direction.data = (u_char *) luaL_checklstring(L, 2, &direction.len);
if (direction.len == 0) {
lua_pushnil(L);
lua_pushliteral(L, "pattern is empty");
return 2;
}

if (direction.len != 4 || ngx_strcmp(direction.data, "send") != 0) {
p = (char *) lua_pushfstring(L, "bad shutdown argument: %s",
(char *) direction.data);

return luaL_argerror(L, 2, p);
}

ngx_stream_lua_socket_tcp_finalize_write_part(r, u, 1);

lua_pushinteger(L, 1);
return 1;
}


static int
ngx_stream_lua_socket_tcp_setoption(lua_State *L)
{
Expand Down Expand Up @@ -3330,11 +3433,13 @@ ngx_stream_lua_socket_tcp_finalize_read_part(ngx_stream_lua_request_t *r,

static void
ngx_stream_lua_socket_tcp_finalize_write_part(ngx_stream_lua_request_t *r,
ngx_stream_lua_socket_tcp_upstream_t *u)
ngx_stream_lua_socket_tcp_upstream_t *u, int do_shutdown)
{
ngx_connection_t *c;
ngx_connection_t *c;
ngx_stream_lua_ctx_t *ctx;

c = u->peer.connection;

if (u->write_closed) {
return;
}
Expand All @@ -3343,6 +3448,17 @@ ngx_stream_lua_socket_tcp_finalize_write_part(ngx_stream_lua_request_t *r,

ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);

if (c && do_shutdown) {
if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) {
ngx_connection_error(c, ngx_socket_errno,
ngx_shutdown_socket_n " failed");
return;
}

ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
"stream lua shutdown socket write direction");
}

if (u->raw_downstream || u->body_downstream) {
if (ctx && ctx->writing_raw_req_socket) {
ctx->writing_raw_req_socket = 0;
Expand All @@ -3355,8 +3471,6 @@ ngx_stream_lua_socket_tcp_finalize_write_part(ngx_stream_lua_request_t *r,
return;
}

c = u->peer.connection;

if (c) {
if (c->write->timer_set) {
ngx_del_timer(c->write);
Expand All @@ -3375,8 +3489,6 @@ ngx_stream_lua_socket_tcp_finalize_write_part(ngx_stream_lua_request_t *r,
}

c->write->closed = 1;

/* TODO: shutdown the writing part of the connection */
}
}

Expand All @@ -3400,7 +3512,7 @@ ngx_stream_lua_socket_tcp_finalize(ngx_stream_lua_request_t *r,
}

ngx_stream_lua_socket_tcp_finalize_read_part(r, u);
ngx_stream_lua_socket_tcp_finalize_write_part(r, u);
ngx_stream_lua_socket_tcp_finalize_write_part(r, u, 0);

if (u->raw_downstream || u->body_downstream) {
u->peer.connection = NULL;
Expand Down
38 changes: 37 additions & 1 deletion t/058-tcp-socket.t
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use Test::Nginx::Socket::Lua::Stream;

repeat_each(2);

plan tests => repeat_each() * 179;
plan tests => repeat_each() * 183;

our $HtmlDir = html_dir;

Expand Down Expand Up @@ -3035,3 +3035,39 @@ lua stream cleanup reuse
total_send_bytes: 114
--- no_error_log
[error]



=== TEST 56: setkeepalive on socket already shutdown
--- stream_server_config
lua_socket_connect_timeout 1s;
resolver $TEST_NGINX_RESOLVER ipv6=off;
resolver_timeout 3s;

content_by_lua_block {
local sock = ngx.socket.tcp()
local ok, err = sock:connect("openresty.org", 443)
if not ok then
ngx.say("failed to connect: ", err)
return
end

ngx.say("connected: ", ok)

local ok, err = sock:shutdown('send')
if not ok then
ngx.log(ngx.ERR, 'failed to shutdown socket: ', err)
return
end

local ok, err = sock:setkeepalive()
if not ok then
ngx.log(ngx.ERR, "failed to setkeepalive: ", err)
end
}

--- stream_response
connected: 1
--- error_log
stream lua shutdown socket write direction
failed to setkeepalive: closed
4 changes: 2 additions & 2 deletions t/062-count.t
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ n = 10
assert(ngx.say("n = ", n))
}
--- stream_response
n = 6
n = 7
--- no_error_log
[error]

Expand Down Expand Up @@ -276,6 +276,6 @@ n = 6
end
}
--- stream_response
n = 6
n = 7
--- no_error_log
[error]
Loading

0 comments on commit ff64e06

Please sign in to comment.