Skip to content

Commit

Permalink
Add (nonfunctional) worker thread per stream.
Browse files Browse the repository at this point in the history
The worker threads don't actually do anything, but with this change
we can create and destroy threads roughly right.

Towards IPERF-124.
  • Loading branch information
bmah888 committed Oct 31, 2023
1 parent ca7c875 commit 448ba70
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
#include <openssl/evp.h>
#endif // HAVE_SSL

#include <pthread.h>

#if !defined(__IPERF_API_H)
typedef uint64_t iperf_size_t;
#endif // __IPERF_API_H
Expand Down Expand Up @@ -175,6 +177,8 @@ struct iperf_stream
{
struct iperf_test* test;

pthread_t thr;

/* configurable members */
int local_port;
int remote_port;
Expand Down
81 changes: 81 additions & 0 deletions src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@
#endif /* TCP_CA_NAME_MAX */
#endif /* HAVE_TCP_CONGESTION */

void *
iperf_client_worker_start(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
struct iperf_test *test = sp->test;

while (1) {
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d\n", sp->socket);
}
sleep(1);
}
}

int
iperf_create_streams(struct iperf_test *test, int sender)
{
Expand Down Expand Up @@ -620,6 +633,23 @@ iperf_run_client(struct iperf_test * test)
if (startup) {
startup = 0;

/* Create and spin up threads */
pthread_attr_t attr;
pthread_attr_init(&attr);

SLIST_FOREACH(sp, &test->streams, streams) {
if (pthread_create(&(sp->thr), &attr, &iperf_client_worker_start, sp) != 0) {
perror("pthread_create");
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d created\n", sp->socket);
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All threads created\n");
}
pthread_attr_destroy(&attr);

// Set non-blocking for non-UDP tests
if (test->protocol->id != Pudp) {
SLIST_FOREACH(sp, &test->streams, streams) {
Expand Down Expand Up @@ -665,6 +695,22 @@ iperf_run_client(struct iperf_test * test)
(test->settings->blocks != 0 && (test->blocks_sent >= test->settings->blocks ||
test->blocks_received >= test->settings->blocks)))) {

/* Cancel sender threads */
SLIST_FOREACH(sp, &test->streams, streams) {
if (sp->sender) {
if (pthread_cancel(sp->thr) != 0) {
perror("pthread_cancel");
}
sp->thr = 0;
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d cancelled\n", sp->socket);
}
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Sender threads cancelled\n");
}

// Unset non-blocking for non-UDP tests
if (test->protocol->id != Pudp) {
SLIST_FOREACH(sp, &test->streams, streams) {
Expand All @@ -691,6 +737,25 @@ iperf_run_client(struct iperf_test * test)
}
}

/* Cancel receiver threads */
SLIST_FOREACH(sp, &test->streams, streams) {
if (!sp->sender) {
if (pthread_cancel(sp->thr) != 0) {
perror("pthread_cancel");
}
if (pthread_join(sp->thr, NULL) != 0) {
perror("pthread_join");
}
sp->thr = 0;
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d cancelled\n", sp->socket);
}
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Receiver threads cancelled\n");
}

if (test->json_output) {
if (iperf_json_finish(test) < 0)
return -1;
Expand All @@ -704,6 +769,22 @@ iperf_run_client(struct iperf_test * test)
return 0;

cleanup_and_fail:
/* Cancel all threads */
SLIST_FOREACH(sp, &test->streams, streams) {
if (pthread_cancel(sp->thr) != 0) {
perror("pthread_cancel");
}
if (pthread_join(sp->thr, NULL) != 0) {
perror("pthread_join");
}
if (test->debug >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d cancelled\n", sp->socket);
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All threads cancelled\n");
}

iperf_client_end(test);
if (test->json_output) {
cJSON_AddStringToObject(test->json_top, "error", iperf_strerror(i_errno));
Expand Down
46 changes: 46 additions & 0 deletions src/iperf_server_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@
#endif /* TCP_CA_NAME_MAX */
#endif /* HAVE_TCP_CONGESTION */

void *
iperf_server_worker_start(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
struct iperf_test *test = sp->test;

while (1) {
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d\n", sp->socket);
}
sleep(1);
}
}

int
iperf_server_listen(struct iperf_test *test)
{
Expand Down Expand Up @@ -388,6 +401,22 @@ cleanup_server(struct iperf_test *test)
{
struct iperf_stream *sp;

/* Cancel threads */
SLIST_FOREACH(sp, &test->streams, streams) {
if (pthread_cancel(sp->thr) != 0) {
perror("pthread_cancel");
}
if (pthread_join(sp->thr, NULL) != 0) {
perror("pthread_join");
}
if (test->debug >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d cancelled\n", sp->socket);
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All threads cancelled\n");
}

/* Close open streams */
SLIST_FOREACH(sp, &test->streams, streams) {
if (sp->socket > -1) {
Expand Down Expand Up @@ -803,6 +832,23 @@ iperf_run_server(struct iperf_test *test)
cleanup_server(test);
return -1;
}

/* Create and spin up threads */
pthread_attr_t attr;
pthread_attr_init(&attr);

SLIST_FOREACH(sp, &test->streams, streams) {
if (pthread_create(&(sp->thr), &attr, &iperf_server_worker_start, sp) != 0) {
perror("pthread_create");
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d created\n", sp->socket);
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All threads created\n");
}
pthread_attr_destroy(&attr);
}
}

Expand Down

0 comments on commit 448ba70

Please sign in to comment.