Skip to content

Commit

Permalink
(a12) initial directory-tunnel working
Browse files Browse the repository at this point in the history
With this patch the end for tunnel juggling is in sight. This lands
the full path from a client requesting that a source be opened in
tunnel-mode. The server main process splits out a socket pair and sends
to the workers. The workers adds that to the i/o multiplexing and
reserves channel-1 for direct forwarding by piggybacking on bstream-
transfers.

The sink end spawns off a thread that latches into a tunnel pair and
feeds that into a socket that authenticates and maps to a regular
a12-cl-to-shmif connection, with corresponding feed into the tunnel
side of that a12 state machine.

The source end double-forks off (* this does perform some of the illegal
from-fork actions) and performs a similar dance, though no added
threading.

A12_IDENT=test ARCAN_CONNPATH=a12://mydirsrv afsrv_terminal
arcan-net --tunnel mydirsrv "*test"

should be roughly what is needed assuming an up to date and compliant
directory server.
  • Loading branch information
letoram committed Oct 22, 2023
1 parent 923acbe commit 792626a
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 54 deletions.
55 changes: 47 additions & 8 deletions src/a12/a12.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ static void send_hello_packet(struct a12_state* S,
* reordering would then still need to account for rekeying.
*/
void a12int_append_out(struct a12_state* S, uint8_t type,
uint8_t* out, size_t out_sz, uint8_t* prepend, size_t prepend_sz)
const uint8_t* const out, size_t out_sz, uint8_t* prepend, size_t prepend_sz)
{
if (S->state == STATE_BROKEN)
return;
Expand Down Expand Up @@ -500,6 +500,10 @@ static struct a12_state* a12_setup(struct a12_context_options* opt, bool srv)
.server = srv
};

for (size_t i = 0; i <= 255; i++){
res->channels[i].unpack_state.bframe.tmp_fd = -1;
}

size_t len = 0;
res->opts = DYNAMIC_MALLOC(sizeof(struct a12_context_options));
if (!res->opts){
Expand Down Expand Up @@ -535,6 +539,7 @@ static struct a12_state* a12_setup(struct a12_context_options* opt, bool srv)

res->cookie = 0xfeedface;
res->out_stream = 1;
res->notify_dynamic = true;

return res;
}
Expand Down Expand Up @@ -2184,7 +2189,7 @@ static void process_blob(struct a12_state* S)
}

struct arcan_shmif_cont* cont = S->channels[S->in_channel].cont;
if (!cont && !S->binary_handler){
if (!cont && !S->binary_handler && !cbf->tunnel){
a12int_trace(A12_TRACE_SYSTEM, "kind=error:status=EINVAL:"
"ch=%d:message=no segment or bhandler mapped", S->in_channel);
reset_state(S);
Expand Down Expand Up @@ -2288,7 +2293,7 @@ static void process_blob(struct a12_state* S)
if (free_buf)
DYNAMIC_FREE(buf);

if (!S->binary_handler)
if (!S->binary_handler || cbf->tunnel)
return;

/* is it a streaming transfer or a known size? */
Expand Down Expand Up @@ -3448,18 +3453,52 @@ void a12_supply_dynamic_resource(struct a12_state* S, struct a12_dynreq r)
}

bool
a12_write_tunnel(
struct a12_state* S, uint8_t chid, const char* const buf, size_t buf_sz)
a12_write_tunnel(struct a12_state* S,
uint8_t chid, const uint8_t* const buf, size_t buf_sz)
{
return false;
if (!buf_sz)
return false;

if (!S->channels[chid].active){
a12int_trace(A12_TRACE_BTRANSFER, "write_tunnel:bad_channel=%"PRIu8, chid);
return false;
}

/* tunnel packet is a simpler form of binary stream with no rampup,
* multiplexing, checksum, compression, cancellation, ... just straight into
* channel */
uint8_t outb[1 + 4 + 2] = {0};
outb[0] = chid;
pack_u16(buf_sz, &outb[5]);
a12int_append_out(S, STATE_BLOB_PACKET, buf, buf_sz, outb, sizeof(outb));

a12int_trace(
A12_TRACE_BTRANSFER, "write_tunnel:ch=%"PRIu8":nb=%zu", chid, buf_sz);
return true;
}

bool
a12_set_tunnel_sink(struct a12_state* S, uint8_t chid, int fd)
{
if (!S->channels[chid].active)
if (S->channels[chid].active){
a12int_trace(A12_TRACE_DIRECTORY, "swap_sink:chid=%"PRIu8, chid);
if (0 < S->channels[chid].unpack_state.bframe.tmp_fd)
close(S->channels[chid].unpack_state.bframe.tmp_fd);
return false;
}

S->channels[chid].unpack_state.bframe.tmp_fd = fd;
if (-1 == fd){
S->channels[chid].active = false;
S->channels[chid].unpack_state.bframe.tunnel = false;
S->channels[chid].unpack_state.bframe.tmp_fd = -1;
return true;
}

S->channels[chid].active = true;
S->channels[chid].unpack_state.bframe = (struct binary_frame){
.tmp_fd = fd,
.tunnel = true,
.active = true
};
return true;
}
2 changes: 1 addition & 1 deletion src/a12/a12.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ a12_enqueue_blob(
* request_dynamic_resource when there is no direct / usable network path.
* Returns false if the channel isn't mapped for that kind of use. */
bool
a12_write_tunnel(struct a12_state*, uint8_t chid, const char* const, size_t);
a12_write_tunnel(struct a12_state*, uint8_t chid, const uint8_t* const, size_t);

bool
a12_set_tunnel_sink(struct a12_state*, uint8_t chid, int fd);
Expand Down
4 changes: 3 additions & 1 deletion src/a12/a12_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ struct binary_frame {
int tmp_fd;
int type;
bool active;
bool tunnel;
uint64_t size;
uint32_t identifier;
uint8_t checksum[16];
Expand Down Expand Up @@ -354,7 +355,8 @@ void a12int_stream_fail(struct a12_state* S, uint8_t ch, uint32_t id, int fail);
void a12int_stream_ack(struct a12_state* S, uint8_t ch, uint32_t id);

void a12int_append_out(
struct a12_state* S, uint8_t type, uint8_t* out, size_t out_sz,
struct a12_state* S, uint8_t type,
const uint8_t* const out, size_t out_sz,
uint8_t* prepend, size_t prepend_sz);

void a12int_step_vstream(struct a12_state* S, uint32_t id);
Expand Down
58 changes: 57 additions & 1 deletion src/a12/net/dir_cl.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,48 @@ struct appl_runner_state {
int p_stdin;
};

struct tunnel_state {
struct a12_context_options opts;
struct a12_dynreq req;
int fd;
};

static void* tunnel_runner(void* t)
{
struct tunnel_state* ts = t;
char* err = NULL;
struct a12_state* S = a12_client(&ts->opts);

if (anet_authenticate(S, ts->fd, ts->fd, &err)){
a12helper_a12srv_shmifcl(NULL, S, NULL, ts->fd, ts->fd);
}
else {
}

shutdown(ts->fd, SHUT_RDWR);
close(ts->fd);
free(err);
free(ts);

return NULL;
}

static void detach_tunnel_runner(
int fd, struct a12_context_options* aopt, struct a12_dynreq* req)
{
struct tunnel_state* ts = malloc(sizeof(struct tunnel_state));
ts->opts = *aopt;
ts->req = *req;
ts->opts.pk_lookup_tag = &ts->req;
ts->fd = fd;

pthread_t pth;
pthread_attr_t pthattr;
pthread_attr_init(&pthattr);
pthread_attr_setdetachstate(&pthattr, PTHREAD_CREATE_DETACHED);
pthread_create(&pth, &pthattr, tunnel_runner, ts);
}

/*
* The processing here is a bit problematic. One is that we still use a socket
* pair rather than a shmif connection. If this connection is severed or the
Expand Down Expand Up @@ -82,6 +124,8 @@ static struct pk_response key_auth_fixed(uint8_t pk[static 32], void* tag)
*/
static void on_source(struct a12_state* S, struct a12_dynreq req, void* tag)
{
struct ioloop_shared* I = tag;

/* security:
* disable the ephemeral exchange for now, this means the announced identity
* when we connect to the directory server will be the one used for the x25519
Expand All @@ -96,6 +140,7 @@ static void on_source(struct a12_state* S, struct a12_dynreq req, void* tag)

char port[sizeof("65535")];
snprintf(port, sizeof(port), "%"PRIu16, req.port);
snprintf(a12opts.secret, sizeof(a12opts.secret), "%s", req.authk);

struct anet_options anet = {
.retry_count = 10,
Expand All @@ -104,7 +149,18 @@ static void on_source(struct a12_state* S, struct a12_dynreq req, void* tag)
.port = port
};

snprintf(a12opts.secret, sizeof(a12opts.secret), "%s", req.authk);
if (req.proto == 4){
int sv[2];
if (0 != socketpair(AF_UNIX, SOCK_STREAM, 0, sv)){
a12int_trace(A12_TRACE_DIRECTORY, "tunnel_socketpair_fail");
return;
}

a12_set_tunnel_sink(S, 1, sv[0]);
detach_tunnel_runner(sv[1], &a12opts, &req);
return;
}

struct anet_cl_connection con = anet_cl_setup(&anet);
if (con.errmsg || !con.state){
fprintf(stderr, "%s", con.errmsg ? con.errmsg : "broken connection state\n");
Expand Down
34 changes: 28 additions & 6 deletions src/a12/net/dir_srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,39 @@ static void dynopen_to_worker(struct dircl* C, struct arg_arr* entry)
.space = 5
}
};
memcpy(to_src.ext.netstate.name, C->pubk, 32);

/* here is the heuristic spot for setting up NAT hole punching, or allocating a
* tunnel or .. right now just naively forward IP:port, set pubk and secret if
* needed. This could ideally be arranged so that the ordering (listening
* first) delayed locally based on the delta of pings, but then we'd need that
* tunnel or .. */
arcan_event to_sink = cur->endpoint;

/* for now blindly accept tunneling if requested and permitted */
if (arg_lookup(entry, "tunnel", 0, NULL)){
if (!active_clients.opts->allow_tunnel)
goto send_fail;

int sv[2];
if (0 != socketpair(AF_UNIX, SOCK_STREAM, 0, sv))
goto send_fail;
arcan_event ts = {
.category = EVENT_TARGET,
.tgt.kind = TARGET_COMMAND_BCHUNK_IN,
.tgt.message = ".tun"
};
shmifsrv_enqueue_event(cur->C, &ts, sv[0]);
shmifsrv_enqueue_event(C->C, &ts, sv[1]);
close(sv[0]);
close(sv[1]);
}

memcpy(to_src.ext.netstate.name, C->pubk, 32);

/*
* This could ideally be arranged so that the ordering (listening first)
* delayed locally based on the delta of pings, but then we'd need that
* estimate from the state machine as well. It would at least reduce the
* chances of the outbound connection having to retry if it received the
* trigger first. The lazy option is to just delay the outbound connection in
* the dir_cl for the time being. */
arcan_event to_sink = cur->endpoint;

/* Another protocol nuance here is that we're supposed to set an authk secret
* for the outer ephemeral making it possible to match the connection to our
Expand Down Expand Up @@ -360,7 +382,7 @@ static void register_source(struct dircl* C, struct arcan_event ev)
{
if (!a12helper_keystore_accepted(C->pubk, active_clients.opts->allow_src)){
unsigned char* b64 = a12helper_tob64(C->pubk, 32, &(size_t){0});

A12INT_DIRTRACE(
"dirsv:kind=reject_register:title=%s:role=%d:eperm:key=%s",
ev.ext.registr.title,
Expand Down
38 changes: 23 additions & 15 deletions src/a12/net/dir_srv_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
static struct arcan_shmif_cont shmif_parent_process;
static struct a12_state* active_client_state;
static struct appl_meta* pending_index;
static struct ioloop_shared* ioloop_shared;
static bool pending_tunnel;

static void do_event(
struct a12_state* S, struct arcan_shmif_cont* C, struct arcan_event* ev);
Expand Down Expand Up @@ -294,9 +296,8 @@ static void bchunk_event(struct a12_state *S,
* ones are not difficult as such but evaluate the need experimentally first. */
else if (strcmp(ev->tgt.message, ".tun") == 0){
a12int_trace(A12_TRACE_DIRECTORY, "worker:tunnel_acquired:channel=1");
S->channels[1].active = true;
S->channels[1].unpack_state.bframe.tmp_fd =
arcan_shmif_dupfd(ev->tgt.ioevs[0].iv, -1, true);
a12_set_tunnel_sink(S, 1, arcan_shmif_dupfd(ev->tgt.ioevs[0].iv, -1, true));
pending_tunnel = true;
}
}

Expand Down Expand Up @@ -353,10 +354,17 @@ static void do_event(

if (a12_remote_mode(S) == ROLE_SOURCE){
a12int_trace(A12_TRACE_DIRECTORY, "open_to_src");

struct a12_dynreq dynreq = (struct a12_dynreq){0};
snprintf(dynreq.authk, 12, "%s", cbt->secret);
memcpy(dynreq.pubk, ev->ext.netstate.name, 32);

if (pending_tunnel){
a12int_trace(A12_TRACE_DIRECTORY, "diropen:tunnel_src");
dynreq.proto = 4;
pending_tunnel = false;
}

a12_supply_dynamic_resource(S, dynreq);
return;
}
Expand Down Expand Up @@ -567,16 +575,14 @@ static bool dirsrv_req_open(struct a12_state* S,

_Static_assert(sizeof(rq.host) == 46, "wrong host-length");

/* reserved .tun as hostname is to tell that we have set a channel as tunnel,
* then spawn a processing thread that reads from the tunnel and injects into
* the state machine. */
if (strcmp(repev.ext.netstate.name, ".tun") == 0){
a12int_trace(A12_TRACE_DIRECTORY, "diropen:tunnel");
rq.proto = 3;
pthread_t pth;
pthread_attr_t pthattr;
pthread_attr_init(&pthattr);
pthread_attr_setdetachstate(&pthattr, PTHREAD_CREATE_DETACHED);
/* if there is a tunnel pending (would arrive as a bchunkstate during
* block_synch_request) tag the proto accordingly and spawn our feeder with
* the src descriptor already being set in the thread. */
if (pending_tunnel){
a12int_trace(A12_TRACE_DIRECTORY, "diropen:tunnel_sink");
rq.proto = 4;
*out = rq;
pending_tunnel = false;
return rv;
}

Expand All @@ -600,10 +606,9 @@ void anet_directory_srv(

netopts->pk_lookup = key_auth_worker;
struct anet_dirsrv_opts diropts = {};

struct arg_arr* args;

a12int_trace(A12_TRACE_DIRECTORY, "notice:directory-ready:pid=%d", getpid());
setenv("ARCAN_SHMIF_DEBUG", "1", true);

shmif_parent_process =
arcan_shmif_open(
Expand Down Expand Up @@ -672,6 +677,9 @@ void anet_directory_srv(
.cbt = &cbt,
};

ioloop_shared = &ioloop;


/* this will loop until client shutdown */
anet_directory_ioloop(&ioloop);
arcan_shmif_drop(&shmif_parent_process);
Expand Down
Loading

0 comments on commit 792626a

Please sign in to comment.