Skip to content

Commit

Permalink
Implemented PBX integration interface.
Browse files Browse the repository at this point in the history
You may now start/stop "fake" sessions in SM with a pipe interface commands.
  • Loading branch information
smokku committed Jun 4, 2009
1 parent 7713adf commit 67954c9
Show file tree
Hide file tree
Showing 9 changed files with 381 additions and 37 deletions.
2 changes: 1 addition & 1 deletion c2s/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
bin_PROGRAMS = c2s

c2s_SOURCES = authreg.c bind.c c2s.c main.c sm.c
c2s_SOURCES = authreg.c bind.c c2s.c main.c sm.c pbx.c pbx_commands.c
c2s_CPPFLAGS = -DCONFIG_DIR=\"$(sysconfdir)\" -DLIBRARY_DIR=\"$(pkglibdir)\"
c2s_LDFLAGS = -export-dynamic

Expand Down
50 changes: 31 additions & 19 deletions c2s/c2s.c
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ static void _c2s_component_presence(c2s_t c2s, nad_t nad) {
log_debug(ZONE, "killing session %s", jid_user(sess->resources->jid));

sess->active = 0;
sx_close(sess->s);
if(sess->s) sx_close(sess->s);
}
} while(xhash_iter_next(c2s->sessions));

Expand All @@ -680,7 +680,7 @@ int c2s_router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {
sx_error_t *sxe;
nad_t nad;
int len, elem, from, c2sid, smid, action, id, ns, attr, scan, replaced;
char skey[10];
char skey[24];
sess_t sess;
bres_t bres, ires;

Expand Down Expand Up @@ -863,15 +863,19 @@ int c2s_router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {
}

#ifdef HAVE_SSL
if(c2s->server_fd == NULL && c2s->server_ssl_fd == NULL) {
if(c2s->server_fd == NULL && c2s->server_ssl_fd == NULL && c2s->pbx_pipe == NULL) {
log_write(c2s->log, LOG_ERR, "both normal and SSL ports are disabled, nothing to do!");
#else
if(c2s->server_fd == NULL) {
if(c2s->server_fd == NULL && c2s->pbx_pipe == NULL) {
log_write(c2s->log, LOG_ERR, "server port is disabled, nothing to do!");
#endif
exit(1);
}

/* open PBX integration FIFO */
if(c2s->pbx_pipe != NULL)
c2s_pbx_init(c2s);

/* we're online */
c2s->online = c2s->started = 1;
log_write(c2s->log, LOG_NOTICE, "ready for connections", c2s->id);
Expand Down Expand Up @@ -928,13 +932,13 @@ int c2s_router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {
nad_free(nad);
return 0;
}
snprintf(skey, 10, "%.*s", NAD_AVAL_L(nad, c2sid), NAD_AVAL(nad, c2sid));
snprintf(skey, 24, "%.*s", NAD_AVAL_L(nad, c2sid), NAD_AVAL(nad, c2sid));

/* find the session, quietly drop if we don't have it */
sess = xhash_get(c2s->sessions, skey);
if(sess == NULL) {
/* if we get this, the SM probably thinks the session is still active
* so we need to tell SM to free it up */
* so we need to tell SM to free it up */
log_debug(ZONE, "no session for %s", skey);

/* check if it's a started action; otherwise we could end up in an infinite loop
Expand Down Expand Up @@ -985,7 +989,7 @@ int c2s_router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {
}

/* if they're pre-stream, then this is leftovers from a previous session */
if(sess->s->state < state_STREAM) {
if(sess->s && sess->s->state < state_STREAM) {
log_debug(ZONE, "session %s is pre-stream", skey);

nad_free(nad);
Expand Down Expand Up @@ -1014,8 +1018,10 @@ int c2s_router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {
if(nad_find_attr(nad, 0, -1, "error", NULL) >= 0) {
log_debug(ZONE, "routing error");

sx_error(sess->s, stream_err_INTERNAL_SERVER_ERROR, "internal server error");
sx_close(sess->s);
if(sess->s) {
sx_error(sess->s, stream_err_INTERNAL_SERVER_ERROR, "internal server error");
sx_close(sess->s);
}

nad_free(nad);
return 0;
Expand Down Expand Up @@ -1073,7 +1079,7 @@ int c2s_router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {

/* it has to have come from the session manager */
from = nad_find_attr(nad, 0, -1, "from", NULL);
if(!sess->s->req_to || strlen(sess->s->req_to) != NAD_AVAL_L(nad, from) || strncmp(sess->s->req_to, NAD_AVAL(nad, from), NAD_AVAL_L(nad, from)) != 0) {
if(sess->s && (!sess->s->req_to || strlen(sess->s->req_to) != NAD_AVAL_L(nad, from) || strncmp(sess->s->req_to, NAD_AVAL(nad, from), NAD_AVAL_L(nad, from))) != 0) {
log_debug(ZONE, "packet from '%.*s' for %s, but they're not the sm for this sess", NAD_AVAL_L(nad, from), NAD_AVAL(nad, from), skey);
nad_free(nad);
return 0;
Expand Down Expand Up @@ -1107,8 +1113,8 @@ int c2s_router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {
if(replaced)
sx_error(sess->s, stream_err_CONFLICT, NULL);

/* close them */
sx_close(sess->s);
/* close the stream if there is one */
if(sess->s) sx_close(sess->s);

nad_free(nad);
return 0;
Expand Down Expand Up @@ -1210,13 +1216,17 @@ int c2s_router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {
nad_free(nad);

/* bring them online, old-skool */
if(!sess->sasl_authd) {
if(!sess->sasl_authd && sess->s) {
sx_auth(sess->s, "traditional", jid_full(bres->jid));
return 0;
}

/* return the auth result to the client */
if(sess->result) sx_nad_write(sess->s, sess->result);
if(sess->result) {
/* return the auth result to the client */
if(sess->s) sx_nad_write(sess->s, sess->result);
/* or follow-up the session creation with cached presence packet */
else sm_packet(sess, bres, sess->result);
}
sess->result = NULL;

/* we're good to go */
Expand All @@ -1237,17 +1247,19 @@ int c2s_router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {

/* client packets */
if(NAD_NURI_L(nad, NAD_ENS(nad, 1)) == strlen(uri_CLIENT) && strncmp(uri_CLIENT, NAD_NURI(nad, NAD_ENS(nad, 1)), strlen(uri_CLIENT)) == 0) {
if(!sess->active) {
if(!sess->active || !sess->s) {
/* its a strange world .. */
nad_free(nad);
return 0;
}

/* sm is bouncing something */
if(nad_find_attr(nad, 1, ns, "failed", NULL) >= 0) {
/* there's really no graceful way to handle this */
sx_error(s, stream_err_INTERNAL_SERVER_ERROR, "session manager failed control action");
sx_close(s);
if(s) {
/* there's really no graceful way to handle this */
sx_error(s, stream_err_INTERNAL_SERVER_ERROR, "session manager failed control action");
sx_close(s);
}

nad_free(nad);
return 0;
Expand Down
9 changes: 8 additions & 1 deletion c2s/c2s.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct sess_st {

mio_fd_t fd;

char skey[10];
char skey[24];

char *ip;
int port;
Expand Down Expand Up @@ -207,6 +207,11 @@ struct c2s_st {
/** http forwarding URL */
char *http_forward;

/** PBX integration named pipe */
char *pbx_pipe;
int pbx_pipe_fd;
mio_fd_t pbx_pipe_mio_fd;

/** max file descriptors */
int io_max_fds;

Expand Down Expand Up @@ -284,6 +289,8 @@ C2S_API void sm_packet(sess_t sess, bres_t res, nad_t nad);

C2S_API int bind_init(sx_env_t env, sx_plugin_t p, va_list args);

C2S_API void c2s_pbx_init(c2s_t c2s);

struct authreg_st
{
c2s_t c2s;
Expand Down
22 changes: 9 additions & 13 deletions c2s/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ static void _c2s_config_expand(c2s_t c2s)
c2s->io_check_idle = j_atoi(config_get_one(c2s->config, "io.check.idle", 0), 0);
c2s->io_check_keepalive = j_atoi(config_get_one(c2s->config, "io.check.keepalive", 0), 0);

c2s->pbx_pipe = config_get_one(c2s->config, "pbx.pipe", 0);

c2s->ar_module_name = config_get_one(c2s->config, "authreg.module", 0);

if(config_get(c2s->config, "authreg.mechanisms.traditional.plain") != NULL) c2s->ar_mechanisms |= AR_MECH_TRAD_PLAIN;
Expand Down Expand Up @@ -241,8 +243,8 @@ static void _c2s_hosts_expand(c2s_t c2s)

elem = config_get(c2s->config, "local.id");
if(!elem) {
log_write(c2s->log, LOG_ERR, "no local.id configured, aborting");
exit(1);
log_write(c2s->log, LOG_NOTICE, "no local.id configured - skipping local domains configuration");
return;
}
for(i = 0; i < elem->nvalues; i++) {
host_t host = (host_t) pmalloco(xhash_pool(c2s->hosts), sizeof(struct host_st));
Expand Down Expand Up @@ -522,7 +524,7 @@ static void _c2s_time_checks(c2s_t c2s) {
xhv.sess_val = &sess;
xhash_iter_get(c2s->sessions, NULL, xhv.val);

if(c2s->io_check_idle > 0 && now > sess->last_activity + c2s->io_check_idle) {
if(c2s->io_check_idle > 0 && sess->s && now > sess->last_activity + c2s->io_check_idle) {
log_write(c2s->log, LOG_NOTICE, "[%d] [%s, port=%d] timed out", sess->fd->fd, sess->ip, sess->port);

sx_error(sess->s, stream_err_HOST_GONE, "connection timed out");
Expand Down Expand Up @@ -645,21 +647,15 @@ JABBER_MAIN("jabberd2c2s", "Jabber 2 C2S", "Jabber Open Source Server: Client to
_c2s_pidfile(c2s);

if(c2s->ar_module_name == NULL) {
log_write(c2s->log, LOG_ERR, "no authreg module specified in config file");
access_free(c2s->access);
config_free(c2s->config);
log_free(c2s->log);
free(c2s);
exit(1);
log_write(c2s->log, LOG_NOTICE, "no authreg module specified in config file");
}

if((c2s->ar = authreg_init(c2s, c2s->ar_module_name)) == NULL) {
else if((c2s->ar = authreg_init(c2s, c2s->ar_module_name)) == NULL) {
access_free(c2s->access);
config_free(c2s->config);
log_free(c2s->log);
free(c2s);
exit(1);
}
}

c2s->sessions = xhash_new(1023);

Expand Down Expand Up @@ -823,7 +819,7 @@ JABBER_MAIN("jabberd2c2s", "Jabber 2 C2S", "Jabber Open Source Server: Client to
xhv.sess_val = &sess;
xhash_iter_get(c2s->sessions, NULL, xhv.val);

if(sess->active)
if(sess->active && sess->s)
sx_close(sess->s);

} while(xhash_iter_next(c2s->sessions));
Expand Down
130 changes: 130 additions & 0 deletions c2s/pbx.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* jabberd - Jabber Open Source Server
* Copyright (c) 2009 Tomasz Sterna
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA02111-1307USA
*/

/** @file c2s/pbx.c
* @brief PBX integration
* @author Tomasz Sterna
* $Date$
* $Revision$
*/

#include "c2s.h"

#define COMMANDLINE_LENGTH_MAX 2048
static void _pbx_close_pipe(c2s_t c2s);
static void _pbx_open_pipe(c2s_t c2s, int mode);
static void _pbx_read_pipe(c2s_t c2s);
static void _pbx_write_pipe(c2s_t c2s);
int _pbx_process_command(c2s_t c2s, char *cmd);

static void _pbx_read_command(c2s_t c2s) {
char buf[COMMANDLINE_LENGTH_MAX];
char *bufp;

bufp = (char*)&buf;
while (read(c2s->pbx_pipe_fd, bufp, 1) > 0)
if(bufp - ((char*)&buf) < COMMANDLINE_LENGTH_MAX-1) bufp++;
*bufp = '\0';

log_debug(ZONE, "command read: %s", buf);

_pbx_close_pipe(c2s);

if(_pbx_process_command(c2s, buf) == 0)
_pbx_write_pipe(c2s);

_pbx_read_pipe(c2s);
}

static int _pbx_mio_callback(mio_t m, mio_action_t a, mio_fd_t fd, void *data, void *arg) {
c2s_t c2s = (c2s_t) arg;

log_debug(ZONE, "action %s on PBX pipe", a==0?"action_ACCEPT":a==1?"action_READ":a==2?"action_WRITE":a==3?"action_CLOSE":"-unknown-");

switch(a) {
case action_READ:
log_debug(ZONE, "read action on fd %d", fd->fd);
_pbx_read_command(c2s);
return 1; /* want to read again */

case action_WRITE:
/* write buffered lines from jqueue */
_pbx_close_pipe(c2s);
return 0;

case action_CLOSE:
c2s->pbx_pipe_mio_fd = 0;
c2s->pbx_pipe_fd = -1;
return 0;

default:
break;
}

return 0;
}

static void _pbx_close_pipe(c2s_t c2s) {
log_debug(ZONE, "### close_pipe");
if(c2s->pbx_pipe_mio_fd)
mio_close(c2s->mio, c2s->pbx_pipe_mio_fd);
}

static void _pbx_open_pipe(c2s_t c2s, int mode) {
log_debug(ZONE, "### open_pipe");
c2s->pbx_pipe_fd = open(c2s->pbx_pipe, mode | O_NONBLOCK);
if(c2s->pbx_pipe_fd == -1) {
c2s->pbx_pipe_mio_fd = 0;
log_debug(ZONE, "error opening pipe: %d %s", errno, strerror(errno));
log_write(c2s->log, LOG_ERR, "failed to open PBX named pipe %s for %s", c2s->pbx_pipe, mode==O_RDONLY?"reading":"writing");
exit(EXIT_FAILURE);
} else
c2s->pbx_pipe_mio_fd = mio_register(c2s->mio, c2s->pbx_pipe_fd, _pbx_mio_callback, (void *) c2s);
}
/* open pipe for reading */
static void _pbx_read_pipe(c2s_t c2s) {
log_debug(ZONE, "### read_pipe");
_pbx_open_pipe(c2s, O_RDONLY);
mio_read(c2s->mio, c2s->pbx_pipe_mio_fd);
}
/* trigger buffer write */
static void _pbx_write_pipe(c2s_t c2s) {
log_debug(ZONE, "### write_pipe");
_pbx_open_pipe(c2s, O_RDWR);
mio_write(c2s->mio, c2s->pbx_pipe_mio_fd);
}

void c2s_pbx_init(c2s_t c2s) {
struct stat sb;

/* create the FIFO */
if(stat(c2s->pbx_pipe, &sb) == -1) {
if(mkfifo(c2s->pbx_pipe, S_IRUSR | S_IWUSR | S_IRGRP) == -1) {
log_write(c2s->log, LOG_ERR, "failed to create PBX named pipe: %s", c2s->pbx_pipe);
exit(EXIT_FAILURE);
}
}else{
if(!S_ISFIFO(sb.st_mode)) {
log_write(c2s->log, LOG_ERR, "file %s exists but is not a named pipe", c2s->pbx_pipe);
exit(EXIT_FAILURE);
}
}

_pbx_read_pipe(c2s);
}
Loading

0 comments on commit 67954c9

Please sign in to comment.