Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions TLS.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,18 @@ Connections
Connection abstraction API is mostly done and seems to hold well for hiding
implementation details between TLS and TCP.

1. Still need to implement the equivalent of AE_BARRIER. Because TLS
socket-level read/write events don't correspond to logical operations, this
should probably be done at the Read/Write handler level.

2. Multi-threading I/O is not supported. The main issue to address is the need
1. Multi-threading I/O is not supported. The main issue to address is the need
to manipulate AE based on OpenSSL return codes. We can either propagate this
out of the thread, or explore ways of further optimizing MT I/O by having
event loops that live inside the thread and borrow connections in/out.

3. Finish cleaning up the implementation. Make sure all error cases are handled
2. Finish cleaning up the implementation. Make sure all error cases are handled
and reflected into connection state, connection state validated before
certain operations, etc.
- Clean (non-errno) interface to report would-block.
- Consistent error reporting.

4. Sync IO for TLS is currently implemented in a hackish way, i.e. making the
3. Sync IO for TLS is currently implemented in a hackish way, i.e. making the
socket blocking and configuring socket-level timeout. This means the timeout
value may not be so accurate, and there would be a lot of syscall overhead.
However I believe that getting rid of syncio completely in favor of pure
Expand Down
4 changes: 4 additions & 0 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ void flushAppendOnlyFile(int force) {
* there is much to do about the whole server stopping for power problems
* or alike */

if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
usleep(server.aof_flush_sleep);
}

latencyStartMonitor(latency);
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -2276,7 +2276,7 @@ void clusterReadHandler(connection *conn) {
* from event handlers that will do stuff with the same link later. */
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
if (sdslen(link->sndbuf) == 0 && msglen != 0)
connSetWriteHandler(link->conn, clusterWriteHandler); /* TODO: Handle AE_BARRIER in conns */
connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);

link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);

Expand Down
32 changes: 29 additions & 3 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,14 @@ static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_hand
/* Register a write handler, to be called when the connection is writable.
* If NULL, the existing handler is removed.
*/
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
if (func == conn->write_handler) return C_OK;

conn->write_handler = func;
if (barrier)
conn->flags |= CONN_FLAG_WRITE_BARRIER;
else
conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
if (!conn->write_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
else
Expand Down Expand Up @@ -247,13 +251,35 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD
conn->conn_handler = NULL;
}

/* Normally we execute the readable event first, and the writable
* event laster. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if WRITE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

int call_write = (mask & AE_WRITABLE) && conn->write_handler;
int call_read = (mask & AE_READABLE) && conn->read_handler;

/* Handle normal I/O flows */
if ((mask & AE_READABLE) && conn->read_handler) {
if (!invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
if ((mask & AE_WRITABLE) && conn->write_handler) {
/* Fire the writable event. */
if (call_write) {
if (!callHandler(conn, conn->write_handler)) return;
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
}

static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) {
Expand Down
14 changes: 12 additions & 2 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ typedef enum {

#define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */
#define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */

typedef void (*ConnectionCallbackFunc)(struct connection *conn);

Expand All @@ -57,7 +58,7 @@ typedef struct ConnectionType {
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
Expand Down Expand Up @@ -144,7 +145,7 @@ static inline int connRead(connection *conn, void *buf, size_t buf_len) {
* If NULL, the existing handler is removed.
*/
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_write_handler(conn, func);
return conn->type->set_write_handler(conn, func, 0);
}

/* Register a read handler, to be called when the connection is readable.
Expand All @@ -154,6 +155,15 @@ static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc fu
return conn->type->set_read_handler(conn, func);
}

/* Set a write handler, and possibly enable a write barrier, this flag is
* cleared when write handler is changed or removed.
* With barroer enabled, we never fire the event if the read handler already
* fired in the same event loop iteration. Useful when you want to persist
* things to disk before sending replies, and want to do that in a group fashion. */
static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCallbackFunc func, int barrier) {
return conn->type->set_write_handler(conn, func, barrier);
}

static inline void connClose(connection *conn) {
conn->type->close(conn);
}
Expand Down
6 changes: 6 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ void debugCommand(client *c) {
"SDSLEN <key> -- Show low level SDS string info representing key and value.",
"SEGFAULT -- Crash the server with sigsegv.",
"SET-ACTIVE-EXPIRE <0|1> -- Setting it to 0 disables expiring keys in background when they are not accessed (otherwise the Redis behavior). Setting it to 1 reenables back the default.",
"AOF-FLUSH-SLEEP <microsec> -- Server will sleep before flushing the AOF, this is used for testing",
"SLEEP <seconds> -- Stop the server for <seconds>. Decimals allowed.",
"STRUCTSIZE -- Return the size of different Redis core C structures.",
"ZIPLIST <key> -- Show low level info about the ziplist encoding.",
Expand Down Expand Up @@ -595,6 +596,11 @@ NULL
{
server.active_expire_enabled = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"aof-flush-sleep") &&
c->argc == 3)
{
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"lua-always-replicate-commands") &&
c->argc == 3)
{
Expand Down
11 changes: 5 additions & 6 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip) {
serverLog(LL_WARNING,
"Error accepting a client connection: %s (conn: %s)",
connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn);
freeClient(connGetPrivateData(conn));
return;
}
}
Expand Down Expand Up @@ -1305,19 +1305,18 @@ int handleClientsWithPendingWrites(void) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
int ae_barrier = 0;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
* actual fsync of AOF to disk. the write barrier ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
ae_barrier = 1;
}
/* TODO: Handle write barriers in connection (also see tlsProcessPendingData) */
if (connSetWriteHandler(c->conn, sendReplyToClient) == C_ERR) {
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
freeClientAsync(c);
}
}
Expand Down
68 changes: 32 additions & 36 deletions src/rio.c
Original file line number Diff line number Diff line change
Expand Up @@ -288,50 +288,46 @@ static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
unsigned char *p = (unsigned char*) buf;
int doflush = (buf == NULL && len == 0);

/* To start we always append to our buffer. If it gets larger than
* a given size, we actually write to the sockets. */
if (len) {
r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
len = 0; /* Prevent entering the while below if we don't flush. */
if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN) doflush = 1;
}

if (doflush) {
/* For small writes, we rather keep the data in user-space buffer, and flush
* it only when it grows. however for larger writes, we prefer to flush
* any pre-existing buffer, and write the new one directly without reallocs
* and memory copying. */
if (len > PROTO_IOBUF_LEN) {
/* First, flush any pre-existing buffered data. */
if (sdslen(r->io.fd.buf)) {
if (rioFdWrite(r, NULL, 0) == 0)
return 0;
}
/* Write the new data, keeping 'p' and 'len' from the input. */
} else {
if (len) {
r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
doflush = 1;
if (!doflush)
return 1;
}
/* Flusing the buffered data. set 'p' and 'len' accordintly. */
p = (unsigned char*) r->io.fd.buf;
len = sdslen(r->io.fd.buf);
}

/* Write in little chunchs so that when there are big writes we
* parallelize while the kernel is sending data in background to
* the TCP socket. */
while(len) {
size_t count = len < 1024 ? len : 1024;

/* Make sure to write 'count' bytes to the socket regardless
* of short writes. */
size_t nwritten = 0;
while(nwritten != count) {
retval = write(r->io.fd.fd,p+nwritten,count-nwritten);
if (retval <= 0) {
/* With blocking sockets, which is the sole user of this
* rio target, EWOULDBLOCK is returned only because of
* the SO_SNDTIMEO socket option, so we translate the error
* into one more recognizable by the user. */
if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
break;
}
nwritten += retval;
}

if (nwritten != count) {
size_t nwritten = 0;
while(nwritten != len) {
retval = write(r->io.fd.fd,p+nwritten,len-nwritten);
if (retval <= 0) {
/* With blocking io, which is the sole user of this
* rio target, EWOULDBLOCK is returned only because of
* the SO_SNDTIMEO socket option, so we translate the error
* into one more recognizable by the user. */
if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
return 0; /* error. */
}
p += count;
len -= count;
r->io.fd.pos += count;
nwritten += retval;
}

if (doflush) sdsclear(r->io.fd.buf);
r->io.fd.pos += len;
sdsclear(r->io.fd.buf);
return 1;
}

Expand Down
11 changes: 6 additions & 5 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2046,6 +2046,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);

/* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
tlsProcessPendingData();
/* If tls still has pending unread data don't sleep at all. */
aeDontWait(server.el, tlsHasPendingData());

/* Call the Redis Cluster before sleep function. Note that this function
* may change the state of Redis Cluster (from ok to fail or vice versa),
* so it's a good idea to call it before serving the unblocked clients
Expand Down Expand Up @@ -2091,11 +2096,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();

/* TODO: How do i handle write barriers flag */
tlsProcessPendingData();
/* If tls already has pending unread data don't sleep at all. */
aeDontWait(server.el, tlsHasPendingData());

/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();

Expand Down Expand Up @@ -2274,6 +2274,7 @@ void initServerConfig(void) {
server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
server.aof_rewrite_base_size = 0;
server.aof_rewrite_scheduled = 0;
server.aof_flush_sleep = 0;
server.aof_last_fsync = time(NULL);
server.aof_rewrite_time_last = -1;
server.aof_rewrite_time_start = -1;
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,7 @@ struct redisServer {
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
off_t aof_current_size; /* AOF current size. */
off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */
int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
pid_t aof_child_pid; /* PID if rewriting process */
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
Expand Down
47 changes: 39 additions & 8 deletions src/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -349,18 +349,47 @@ static void tlsHandleEvent(tls_connection *conn, int mask) {
conn->c.conn_handler = NULL;
break;
case CONN_STATE_CONNECTED:
if ((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ)) {
{
int call_read = ((mask & AE_READABLE) && conn->c.read_handler) ||
((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE));
int call_write = ((mask & AE_WRITABLE) && conn->c.write_handler) ||
((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ));

/* Normally we execute the readable event first, and the writable
* event laster. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if WRITE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
int invert = conn->c.flags & CONN_FLAG_WRITE_BARRIER;

if (!invert && call_read) {
conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE;
if (!callHandler((connection *) conn, conn->c.read_handler)) return;
}

/* Fire the writable event. */
if (call_write) {
conn->flags &= ~TLS_CONN_FLAG_WRITE_WANT_READ;
if (!callHandler((connection *) conn, conn->c.write_handler)) return;
}

if ((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE)) {
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && call_read) {
conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE;
if (!callHandler((connection *) conn, conn->c.read_handler)) return;
}

if ((mask & AE_READABLE) && conn->c.read_handler) {
if (!callHandler((connection *) conn, conn->c.read_handler)) return;
/* If SSL has pending that, already read from the socket, we're at
* risk of not calling the read handler again, make sure to add it
* to a list of pending connection that should be handled anyway. */
if ((mask & AE_READABLE)) {
if (SSL_has_pending(conn->ssl)) {
if (!conn->pending_list_node) {
listAddNodeTail(pending_list, conn);
Expand All @@ -372,10 +401,8 @@ static void tlsHandleEvent(tls_connection *conn, int mask) {
}
}

if ((mask & AE_WRITABLE) && conn->c.write_handler) {
if (!callHandler((connection *) conn, conn->c.write_handler)) return;
}
break;
}
default:
break;
}
Expand Down Expand Up @@ -523,8 +550,12 @@ static const char *connTLSGetLastError(connection *conn_) {
return NULL;
}

int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
conn->write_handler = func;
if (barrier)
conn->flags |= CONN_FLAG_WRITE_BARRIER;
else
conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
updateSSLEvent((tls_connection *) conn);
return C_OK;
}
Expand Down
Loading