diff --git a/src/iperf.h b/src/iperf.h index c3ce33383..e1841714e 100644 --- a/src/iperf.h +++ b/src/iperf.h @@ -73,6 +73,8 @@ #include #endif // HAVE_SSL +#include + #if !defined(__IPERF_API_H) typedef uint64_t iperf_size_t; #endif // __IPERF_API_H @@ -175,6 +177,8 @@ struct iperf_stream { struct iperf_test* test; + pthread_t thr; + /* configurable members */ int local_port; int remote_port; diff --git a/src/iperf_client_api.c b/src/iperf_client_api.c index 8971ef15d..7c927a360 100644 --- a/src/iperf_client_api.c +++ b/src/iperf_client_api.c @@ -51,6 +51,19 @@ #endif /* TCP_CA_NAME_MAX */ #endif /* HAVE_TCP_CONGESTION */ +void * +iperf_client_worker_start(void *s) { + struct iperf_stream *sp = (struct iperf_stream *) s; + struct iperf_test *test = sp->test; + + while (1) { + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Thread FD %d\n", sp->socket); + } + sleep(1); + } +} + int iperf_create_streams(struct iperf_test *test, int sender) { @@ -620,6 +633,23 @@ iperf_run_client(struct iperf_test * test) if (startup) { startup = 0; + /* Create and spin up threads */ + pthread_attr_t attr; + pthread_attr_init(&attr); + + SLIST_FOREACH(sp, &test->streams, streams) { + if (pthread_create(&(sp->thr), &attr, &iperf_client_worker_start, sp) != 0) { + perror("pthread_create"); + } + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Thread FD %d created\n", sp->socket); + } + } + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "All threads created\n"); + } + pthread_attr_destroy(&attr); + // Set non-blocking for non-UDP tests if (test->protocol->id != Pudp) { SLIST_FOREACH(sp, &test->streams, streams) { @@ -665,6 +695,22 @@ iperf_run_client(struct iperf_test * test) (test->settings->blocks != 0 && (test->blocks_sent >= test->settings->blocks || test->blocks_received >= test->settings->blocks)))) { + /* Cancel sender threads */ + SLIST_FOREACH(sp, &test->streams, streams) { + if (sp->sender) { + if (pthread_cancel(sp->thr) != 0) { + perror("pthread_cancel"); + } + sp->thr = 0; + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Thread FD %d cancelled\n", sp->socket); + } + } + } + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Sender threads cancelled\n"); + } + // Unset non-blocking for non-UDP tests if (test->protocol->id != Pudp) { SLIST_FOREACH(sp, &test->streams, streams) { @@ -691,6 +737,25 @@ iperf_run_client(struct iperf_test * test) } } + /* Cancel receiver threads */ + SLIST_FOREACH(sp, &test->streams, streams) { + if (!sp->sender) { + if (pthread_cancel(sp->thr) != 0) { + perror("pthread_cancel"); + } + if (pthread_join(sp->thr, NULL) != 0) { + perror("pthread_join"); + } + sp->thr = 0; + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Thread FD %d cancelled\n", sp->socket); + } + } + } + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Receiver threads cancelled\n"); + } + if (test->json_output) { if (iperf_json_finish(test) < 0) return -1; @@ -704,6 +769,22 @@ iperf_run_client(struct iperf_test * test) return 0; cleanup_and_fail: + /* Cancel all threads */ + SLIST_FOREACH(sp, &test->streams, streams) { + if (pthread_cancel(sp->thr) != 0) { + perror("pthread_cancel"); + } + if (pthread_join(sp->thr, NULL) != 0) { + perror("pthread_join"); + } + if (test->debug >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Thread FD %d cancelled\n", sp->socket); + } + } + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "All threads cancelled\n"); + } + iperf_client_end(test); if (test->json_output) { cJSON_AddStringToObject(test->json_top, "error", iperf_strerror(i_errno)); diff --git a/src/iperf_server_api.c b/src/iperf_server_api.c index ae916f586..8ac3fd9e5 100644 --- a/src/iperf_server_api.c +++ b/src/iperf_server_api.c @@ -66,6 +66,19 @@ #endif /* TCP_CA_NAME_MAX */ #endif /* HAVE_TCP_CONGESTION */ +void * +iperf_server_worker_start(void *s) { + struct iperf_stream *sp = (struct iperf_stream *) s; + struct iperf_test *test = sp->test; + + while (1) { + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Thread FD %d\n", sp->socket); + } + sleep(1); + } +} + int iperf_server_listen(struct iperf_test *test) { @@ -388,6 +401,22 @@ cleanup_server(struct iperf_test *test) { struct iperf_stream *sp; + /* Cancel threads */ + SLIST_FOREACH(sp, &test->streams, streams) { + if (pthread_cancel(sp->thr) != 0) { + perror("pthread_cancel"); + } + if (pthread_join(sp->thr, NULL) != 0) { + perror("pthread_join"); + } + if (test->debug >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Thread FD %d cancelled\n", sp->socket); + } + } + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "All threads cancelled\n"); + } + /* Close open streams */ SLIST_FOREACH(sp, &test->streams, streams) { if (sp->socket > -1) { @@ -803,6 +832,23 @@ iperf_run_server(struct iperf_test *test) cleanup_server(test); return -1; } + + /* Create and spin up threads */ + pthread_attr_t attr; + pthread_attr_init(&attr); + + SLIST_FOREACH(sp, &test->streams, streams) { + if (pthread_create(&(sp->thr), &attr, &iperf_server_worker_start, sp) != 0) { + perror("pthread_create"); + } + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Thread FD %d created\n", sp->socket); + } + } + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "All threads created\n"); + } + pthread_attr_destroy(&attr); } }