Skip to content

Remove connection from application connection list before tcp_destroy. #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ INCDIR ?= $(PREFIX)/include

RTE_SDK ?= /usr/

CFLAGS += -std=gnu99 -O3 -g -Wall -Werror -I. -Iinclude/ -march=native -fno-omit-frame-pointer
CFLAGS += -std=gnu99 -O3 -g -Wall -Wno-address-of-packed-member -Werror -I. -Iinclude/ -march=native -fno-omit-frame-pointer
LDFLAGS += -pthread -g

RTE_SDK ?= $(HOME)/dpdk/x86_64-native-linuxapp-gcc
DPDK_PMDS ?= ixgbe i40e tap virtio

CFLAGS+= -I$(RTE_SDK)/include -I$(RTE_SDK)/include/dpdk
CFLAGS+= -I$(RTE_SDK)/include/x86_64-linux-gnu/dpdk/
CFLAGS+= -DCONNECTION_STATS -DQUEUE_STATS -DPROFILING
LDFLAGS+= -L$(RTE_SDK)/lib/

LIBS_DPDK= -Wl,--whole-archive
Expand Down
14 changes: 8 additions & 6 deletions include/kernel_appif.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ struct kernel_appout_req_scale {

/** Common struct for events on kernel -> app queue */
struct kernel_appout {
uint64_t ts;
union {
struct kernel_appout_conn_open conn_open;
struct kernel_appout_conn_close conn_close;
Expand All @@ -140,7 +141,7 @@ struct kernel_appout {

struct kernel_appout_req_scale req_scale;

uint8_t raw[63];
uint8_t raw[64 - sizeof(uint64_t) - sizeof(uint8_t)];
} __attribute__((packed)) data;
uint8_t type;
} __attribute__((packed));
Expand Down Expand Up @@ -172,8 +173,8 @@ struct kernel_appin_status {
/** New connection opened */
struct kernel_appin_conn_opened {
uint64_t opaque;
uint64_t rx_off;
uint64_t tx_off;
uint32_t rx_off;
uint32_t tx_off;
uint32_t rx_len;
uint32_t tx_len;
int32_t status;
Expand All @@ -195,8 +196,8 @@ struct kernel_appin_listen_newconn {
/** Accepted connection on listener */
struct kernel_appin_accept_conn {
uint64_t opaque;
uint64_t rx_off;
uint64_t tx_off;
uint32_t rx_off;
uint32_t tx_off;
uint32_t rx_len;
uint32_t tx_len;
int32_t status;
Expand All @@ -211,12 +212,13 @@ struct kernel_appin_accept_conn {

/** Common struct for events on app -> kernel queue */
struct kernel_appin {
uint64_t ts;
union {
struct kernel_appin_status status;
struct kernel_appin_conn_opened conn_opened;
struct kernel_appin_listen_newconn listen_newconn;
struct kernel_appin_accept_conn accept_connection;
uint8_t raw[63];
uint8_t raw[64 - sizeof(uint64_t) - sizeof(uint8_t)];
} __attribute__((packed)) data;
uint8_t type;
} __attribute__((packed));
Expand Down
6 changes: 4 additions & 2 deletions include/tas_memif.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ struct flexnic_info {
/** Kernel RX queue entry */
struct flextcp_pl_krx {
uint64_t addr;
uint64_t ts;
union {
struct {
uint16_t len;
uint16_t fn_core;
uint16_t flow_group;
} packet;
uint8_t raw[55];
uint8_t raw[64 - sizeof(uint8_t) - sizeof(uint64_t) - sizeof(uint64_t)];
} __attribute__((packed)) msg;
volatile uint8_t type;
} __attribute__((packed));
Expand All @@ -106,8 +107,9 @@ struct flextcp_pl_ktx {
struct {
uint32_t flow_id;
} connretran;
uint8_t raw[63];
uint8_t raw[64 - sizeof(uint8_t) - sizeof(uint64_t)];
} __attribute__((packed)) msg;
uint64_t ts;
volatile uint8_t type;
} __attribute__((packed));

Expand Down
95 changes: 95 additions & 0 deletions include/utils_log.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#ifndef UTILS_LOG_H_
#define UTILS_LOG_H_

#include <rte_log.h>
#include <rte_cycles.h>

#include <stdio.h>
#include <stdarg.h>

/*
* DPDK supports 8 user-defined log types.
* TAS modules are mapped into user-defined types for:
* (a) Helps identify the module from the log
* (b) Module-wise filtering of logs may be implemented in the future
*/

#define _LOG_USER_START RTE_LOGTYPE_USER1
#define _LOG_USER_END RTE_LOGTYPE_USER8

enum
{
LOG_MAIN,
LOG_FAST_TX,
LOG_FAST_RX,
LOG_FAST_QMAN,
LOG_FAST_QMAN_FWD,
LOG_FAST_APPIF,
LOG_SLOW,
// Add more if required !
LOG_END
};

/* Useful macros */
#define BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)]))

#define TIMESTAMP_FMTSTR "[%4.4u:%2.2u:%2.2u.%6.6u]"
#define FILE_LINENUM_FMTSTR "[%15.15s:%4.4d]"
#define PAD_FMTSTR " "
#define __FILENAME__ (__builtin_strrchr(__FILE__, '/') ? __builtin_strrchr(__FILE__, '/') + 1 : __FILE__)

static inline const char*
gettimestamp()
{
static __thread char buf[20];

// Prepare the timestamp
uint64_t cyc = rte_get_tsc_cycles();
static __thread uint64_t freq = 0;

if (freq == 0)
freq = rte_get_tsc_hz();

uint64_t elapsed_time_secs = cyc / freq;
cyc = cyc % freq; // for sub-second values
unsigned int hrs, mins, secs, microsecs;
hrs = (unsigned) (elapsed_time_secs / (60 * 60));
elapsed_time_secs -= (hrs * 60 * 60);
mins = (unsigned) (elapsed_time_secs / 60);
elapsed_time_secs -= (mins * 60);
secs = (unsigned) elapsed_time_secs;
microsecs = (unsigned) ((cyc * 1000000ULL) / freq);

snprintf(buf, sizeof(buf), TIMESTAMP_FMTSTR, hrs, mins, secs, microsecs);

return buf;
}

static inline int
tas_log(uint32_t level,
uint32_t logtype,
const char* format, ...)
{
BUILD_BUG_ON((LOG_END + _LOG_USER_START) > _LOG_USER_END);

va_list ap;
int ret;

va_start(ap, format);
ret = rte_vlog(level, logtype, format, ap);
va_end(ap);

return ret;
}

#define TAS_LOG(l, t, fmtstr, ...) \
(void)((RTE_LOG_ ## l <= RTE_LOG_DP_LEVEL) ? \
tas_log(RTE_LOG_ ## l, \
_LOG_USER_START + LOG_ ## t, \
"%s" FILE_LINENUM_FMTSTR PAD_FMTSTR # t ": " fmtstr, \
gettimestamp(), \
__FILENAME__, __LINE__, \
__VA_ARGS__) : \
0)

#endif /* UTILS_LOG_H_ */
42 changes: 42 additions & 0 deletions lib/sockets/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
#include "internal.h"
#include "../tas/internal.h"

#ifdef QUEUE_STATS
extern void appqueue_stats_dump();
#endif

static inline void ev_listen_open(struct flextcp_context *ctx,
struct flextcp_event *ev);
static inline void ev_listen_newconn(struct flextcp_context *ctx,
Expand Down Expand Up @@ -145,6 +149,27 @@ int flextcp_sockctx_poll(struct flextcp_context *ctx)
}
}

#ifdef CONNECTION_STATS
/* Report the stats roughly every 1s on a 2GHz processor*/
uint64_t now = util_rdtsc();
if (now - ctx->stats_last_ts > 2000000000ull)
{
fprintf(stderr, "[STATS] connect() cycles=%lu count=%lu\n",
ctx->connect_cycles, ctx->connect_count);
fprintf(stderr, "[STATS] listen() cycles=%lu count=%lu\n",
ctx->listen_cycles, ctx->listen_count);
fprintf(stderr, "[STATS] accept() cycles=%lu count=%lu\n",
ctx->accept_cycles, ctx->accept_count);
fprintf(stderr, "[STATS] close() cycles=%lu count=%lu\n",
ctx->close_cycles, ctx->close_count);
ctx->stats_last_ts = now;

#ifdef QUEUE_STATS
appqueue_stats_dump();
#endif
}
#endif

return num;
}

Expand Down Expand Up @@ -174,6 +199,10 @@ static inline void ev_listen_open(struct flextcp_context *ctx,
assert(s->data.listener.status == SOL_OPENING);
if (ev->ev.listen_open.status == 0) {
s->data.listener.status = SOL_OPEN;
#ifdef CONNECTION_STATS
ctx->listen_cycles += util_rdtsc() - s->ts;
ctx->listen_count += 1;
#endif
} else {
s->data.listener.status = SOL_FAILED;
}
Expand Down Expand Up @@ -214,6 +243,10 @@ static inline void ev_listen_accept(struct flextcp_context *ctx,
if (ev->ev.listen_accept.status == 0) {
s->data.connection.status = SOC_CONNECTED;
flextcp_epoll_set(s, EPOLLOUT);
#ifdef CONNECTION_STATS
ctx->accept_cycles += (util_rdtsc() - s->ts);
ctx->accept_count += 1;
#endif
} else {
s->data.connection.status = SOC_FAILED;
flextcp_epoll_set(s, EPOLLERR);
Expand All @@ -237,6 +270,10 @@ static inline void ev_conn_open(struct flextcp_context *ctx,
if (ev->ev.conn_open.status == 0) {
s->data.connection.status = SOC_CONNECTED;
flextcp_epoll_set(s, EPOLLOUT);
#ifdef CONNECTION_STATS
ctx->connect_cycles += util_rdtsc() - s->ts;
ctx->connect_count += 1;
#endif
} else {
s->data.connection.status = SOC_FAILED;
flextcp_epoll_set(s, EPOLLERR);
Expand Down Expand Up @@ -427,5 +464,10 @@ static inline void ev_conn_closed(struct flextcp_context *ctx,
assert(s->type == SOCK_CONNECTION);
assert(s->data.connection.status == SOC_CLOSED);

#ifdef CONNECTION_STATS
ctx->close_cycles += (util_rdtsc() - s->ts);
ctx->close_count += 1;
#endif

free(s);
}
3 changes: 3 additions & 0 deletions lib/sockets/control.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ int tas_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen)

/* open flextcp connection */
ctx = flextcp_sockctx_get();
s->ts = util_rdtsc();
if (flextcp_connection_open(ctx, &s->data.connection.c,
ntohl(sin->sin_addr.s_addr), ntohs(sin->sin_port)))
{
Expand Down Expand Up @@ -350,6 +351,7 @@ int tas_listen(int sockfd, int backlog)

/* open flextcp listener */
ctx = flextcp_sockctx_get();
s->ts = util_rdtsc();
if (flextcp_listen_open(ctx, &s->data.listener.l, ntohs(s->addr.sin_port),
backlog, flags))
{
Expand Down Expand Up @@ -452,6 +454,7 @@ int tas_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen,
sp->next = NULL;

/* send accept request to kernel */
ns->ts = util_rdtsc();
if (flextcp_listen_accept(ctx, &s->data.listener.l,
&ns->data.connection.c) != 0)
{
Expand Down
1 change: 1 addition & 0 deletions lib/sockets/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ struct socket {
struct socket_conn connection;
struct socket_listen listener;
} data;
uint64_t ts;
struct sockaddr_in addr;
uint8_t flags;
uint8_t type;
Expand Down
5 changes: 5 additions & 0 deletions lib/tas/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ int flextcp_listen_open(struct flextcp_context *ctx,
kin->data.listen_open.flags = f;
MEM_BARRIER();
kin->type = KERNEL_APPOUT_LISTEN_OPEN;
kin->ts = util_rdtsc();
flextcp_kernel_kick();

pos = pos + 1;
Expand Down Expand Up @@ -109,6 +110,7 @@ int flextcp_listen_accept(struct flextcp_context *ctx,
kin->data.accept_conn.local_port = lst->local_port;
MEM_BARRIER();
kin->type = KERNEL_APPOUT_ACCEPT_CONN;
kin->ts = util_rdtsc();
flextcp_kernel_kick();

pos = pos + 1;
Expand Down Expand Up @@ -145,6 +147,7 @@ int flextcp_connection_open(struct flextcp_context *ctx,
kin->data.conn_open.flags = f;
MEM_BARRIER();
kin->type = KERNEL_APPOUT_CONN_OPEN;
kin->ts = util_rdtsc();
flextcp_kernel_kick();

pos = pos + 1;
Expand Down Expand Up @@ -209,6 +212,7 @@ int flextcp_connection_close(struct flextcp_context *ctx,
kin->data.conn_close.flags = f;
MEM_BARRIER();
kin->type = KERNEL_APPOUT_CONN_CLOSE;
kin->ts = util_rdtsc();
flextcp_kernel_kick();

pos = pos + 1;
Expand Down Expand Up @@ -415,6 +419,7 @@ int flextcp_connection_move(struct flextcp_context *ctx,
kin->data.conn_move.opaque = OPAQUE(conn);
MEM_BARRIER();
kin->type = KERNEL_APPOUT_CONN_MOVE;
kin->ts = util_rdtsc();
flextcp_kernel_kick();

pos = pos + 1;
Expand Down
15 changes: 15 additions & 0 deletions lib/tas/include/tas_ll.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@
* - notification queue pair to flexnic
*/
struct flextcp_context {
#ifdef CONNECTION_STATS
uint64_t connect_cycles;
uint64_t connect_count;

uint64_t listen_cycles;
uint64_t listen_count;

uint64_t accept_cycles;
uint64_t accept_count;

uint64_t close_cycles;
uint64_t close_count;

uint64_t stats_last_ts;
#endif
/* incoming queue from the kernel */
void *kin_base;
uint32_t kin_len;
Expand Down
Loading