diff --git a/daemon/Makefile b/daemon/Makefile index 5901d974f..18982bfd9 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -3,7 +3,7 @@ CFLAGS= -g -Wall `pkg-config --cflags glib-2.0` `pcre-config --cflags` -I/lib/m #CFLAGS+= -O2 LDFLAGS= `pkg-config --libs glib-2.0` `pcre-config --libs` -SRCS= main.c kernel.c poller.c aux.c control.c streambuf.c call.c +SRCS= main.c kernel.c poller.c aux.c control.c streambuf.c call.c control_udp.c OBJS= $(SRCS:.c=.o) diff --git a/daemon/call.c b/daemon/call.c index 5d4989154..1df213b53 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -317,6 +317,7 @@ static int streams_parse_func(char **a, void **ret, void *p) { return 0; fail: + free(st->mediatype); free(st); return -1; } @@ -556,7 +557,7 @@ fail: -static int setup_peer(struct peer *p, struct stream *s, char *tag) { +static int setup_peer(struct peer *p, struct stream *s, const char *tag) { struct streamrelay *a, *b; a = &p->rtps[0]; @@ -683,7 +684,7 @@ static void callstream_init(struct callstream *s, struct call *ca) { -static unsigned int call_streams(struct call *c, GQueue *s, char *tag, int opmode) { +static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) { GQueue *q; GList *i, *l; struct stream *t; @@ -845,7 +846,7 @@ static void call_destroy(struct call *c) { -static char *streams_print(GQueue *s, unsigned int num, unsigned int off) { +static char *streams_print(GQueue *s, unsigned int num, unsigned int off, const char *prefix) { GString *o; int i; GList *l; @@ -853,6 +854,8 @@ static char *streams_print(GQueue *s, unsigned int num, unsigned int off) { struct streamrelay *x; o = g_string_new(""); + if (prefix) + g_string_append_printf(o, "%s ", prefix); if (!s->head) goto out; @@ -874,30 +877,97 @@ out: -char *call_request(const char **o, struct callmaster *m) { +static struct call *call_get_or_create(const char *callid, struct callmaster *m) { struct call *c; - GQueue *s; - unsigned int num; - c = g_hash_table_lookup(m->callhash, o[2]); + c = g_hash_table_lookup(m->callhash, callid); if (!c) { - mylog(LOG_NOTICE, "[%s] Creating new call", o[2]); + mylog(LOG_NOTICE, "[%s] Creating new call", callid); c = malloc(sizeof(*c)); ZERO(*c); c->callmaster = m; - c->callid = strdup(o[2]); + c->callid = strdup(callid); c->callstreams = g_queue_new(); c->created = m->poller->now; g_hash_table_insert(m->callhash, c->callid, c); } + return c; +} + +char *call_update_udp(const char **o, struct callmaster *m) { + struct call *c; + GQueue q = G_QUEUE_INIT; + struct stream st; + int num; + + c = call_get_or_create(o[4], m); + strdupfree(&c->calling_agent, "UNKNOWN(udp)"); + + ZERO(st); + st.ip = inet_addr(o[5]); + st.port = atoi(o[6]); + st.mediatype = "unknown"; + if (st.ip == -1 || !st.port) + goto fail; + + g_queue_push_tail(&q, &st); + num = call_streams(c, &q, o[7], 0); + + g_queue_clear(&q); + + return streams_print(c->callstreams, 1, 0, o[1]); + +fail: + return NULL; +} + +char *call_lookup_udp(const char **o, struct callmaster *m) { + struct call *c; + GQueue q = G_QUEUE_INIT; + struct stream st; + int num; + + c = g_hash_table_lookup(m->callhash, o[4]); + if (!c) { + mylog(LOG_WARNING, "[%s] Got UDP LOOKUP for unknown call-id", o[4]); + return NULL; + } + + strdupfree(&c->called_agent, "UNKNOWN(udp)"); + + ZERO(st); + st.ip = inet_addr(o[5]); + st.port = atoi(o[6]); + st.mediatype = "unknown"; + if (st.ip == -1 || !st.port) + goto fail; + + g_queue_push_tail(&q, &st); + num = call_streams(c, &q, o[8], 1); + + g_queue_clear(&q); + + return streams_print(c->callstreams, 1, 1, o[1]); + +fail: + return NULL; +} + +char *call_request(const char **o, struct callmaster *m) { + struct call *c; + GQueue *s; + unsigned int num; + + c = call_get_or_create(o[2], m); + strdupfree(&c->calling_agent, o[9] ? : "UNKNOWN"); info_parse(o[10], &c->infohash); s = streams_parse(o[3]); num = call_streams(c, s, g_hash_table_lookup(c->infohash, "fromtag"), 0); streams_free(s); - return streams_print(c->callstreams, num, 0); + return streams_print(c->callstreams, num, 0, NULL); } char *call_lookup(const char **o, struct callmaster *m) { @@ -917,7 +987,7 @@ char *call_lookup(const char **o, struct callmaster *m) { num = call_streams(c, s, g_hash_table_lookup(c->infohash, "totag"), 1); streams_free(s); - return streams_print(c->callstreams, num, 1); + return streams_print(c->callstreams, num, 1, NULL); } void call_delete(const char **o, struct callmaster *m) { diff --git a/daemon/call.h b/daemon/call.h index 341c16bc9..799b67f88 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -88,7 +88,9 @@ struct callmaster *callmaster_new(struct poller *); char *call_request(const char **, struct callmaster *); +char *call_update_udp(const char **, struct callmaster *); char *call_lookup(const char **, struct callmaster *); +char *call_lookup_udp(const char **, struct callmaster *); void call_delete(const char **, struct callmaster *); void calls_status(struct callmaster *, struct control_stream *); diff --git a/daemon/control_udp.c b/daemon/control_udp.c new file mode 100644 index 000000000..1b043683f --- /dev/null +++ b/daemon/control_udp.c @@ -0,0 +1,140 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "control_udp.h" +#include "poller.h" +#include "aux.h" +#include "log.h" +#include "call.h" + + + + + +static pcre *parse_re; +static pcre_extra *parse_ree; +static GHashTable *cookies; + + + + +static void control_udp_closed(int fd, void *p) { + abort(); +} + +static void control_udp_incoming(int fd, void *p) { + struct control_udp *u = p; + int ret; + char buf[8192]; + struct sockaddr_in sin; + socklen_t sin_len; + int ovec[60]; + const char *errptr; + int erroff; + const char **out; + char *reply; + + sin_len = sizeof(sin); + ret = recvfrom(fd, buf, sizeof(buf) - 1, 0, (struct sockaddr *) &sin, &sin_len); + if (ret <= 0) { + mylog(LOG_WARNING, "Error reading from UDP socket"); + return; + } + + buf[ret] = '\0'; + + if (!parse_re) { + parse_re = pcre_compile( + /* cookie cmd flags callid addr port from_tag to_tag */ + "^(\\S+)\\s+(?:([ul])(\\S*)\\s+(\\S+)\\s+([\\d.]+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+))", + PCRE_DOLLAR_ENDONLY | PCRE_DOTALL | PCRE_CASELESS, &errptr, &erroff, NULL); + parse_ree = pcre_study(parse_re, 0, &errptr); + } + + ret = pcre_exec(parse_re, parse_ree, buf, ret, 0, 0, ovec, ARRAY_SIZE(ovec)); + if (ret <= 0) { + mylog(LOG_WARNING, "Unable to parse command line from " DF ": %s", DP(sin), buf); + return; + } + + mylog(LOG_INFO, "Got valid command from udp:" DF ": %s", DP(sin), buf); + + pcre_get_substring_list(buf, ovec, ret, &out); + + if (!cookies) + cookies = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); + + /* XXX better hashing */ + reply = g_hash_table_lookup(cookies, out[1]); + if (reply) { + sendto(fd, reply, strlen(reply), 0, (struct sockaddr *) &sin, sin_len); + goto out; + } + + if (out[1][0] == 'u' || out[1][0] == 'U') + reply = call_update_udp(out, u->callmaster); + else if (out[1][0] == 'l' || out[1][0] == 'L') + reply = call_lookup_udp(out, u->callmaster); + + if (reply) { + sendto(fd, reply, strlen(reply), 0, (struct sockaddr *) &sin, sin_len); + g_hash_table_insert(cookies, strdup(out[1]), reply); /* XXX timeout entries */ + } + +out: + pcre_free(out); +} + +struct control_udp *control_udp_new(struct poller *p, u_int32_t ip, u_int16_t port, struct callmaster *m) { + int fd; + struct control_udp *c; + struct poller_item i; + struct sockaddr_in sin; + + if (!p || !m) + return NULL; + + fd = socket(AF_INET, SOCK_DGRAM, 0); + if (fd == -1) + return NULL; + + NONBLOCK(fd); + REUSEADDR(fd); + + ZERO(sin); + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = ip; + sin.sin_port = htons(port); + if (bind(fd, (struct sockaddr *) &sin, sizeof(sin))) + goto fail; + + + c = malloc(sizeof(*c)); + ZERO(*c); + + c->fd = fd; + c->poller = p; + c->callmaster = m; + + ZERO(i); + i.fd = fd; + i.closed = control_udp_closed; + i.readable = control_udp_incoming; + i.ptr = c; + if (poller_add_item(p, &i)) + goto fail2; + + return c; + +fail2: + free(c); +fail: + close(fd); + return NULL; + +} diff --git a/daemon/control_udp.h b/daemon/control_udp.h new file mode 100644 index 000000000..b2c91e9d2 --- /dev/null +++ b/daemon/control_udp.h @@ -0,0 +1,29 @@ +#ifndef __CONTROL_UDP_H__ +#define __CONTROL_UDP_H__ + + + + +struct poller; +struct callmaster; + + + + + +struct control_udp { + int fd; + + struct poller *poller; + struct callmaster *callmaster; +}; + + + + + +struct control_udp *control_udp_new(struct poller *, u_int32_t, u_int16_t, struct callmaster *); + + + +#endif diff --git a/daemon/main.c b/daemon/main.c index e0276945c..14b7af0c5 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -9,6 +9,7 @@ #include "poller.h" #include "control.h" +#include "control_udp.h" #include "aux.h" #include "log.h" #include "call.h" @@ -28,6 +29,8 @@ static gboolean foreground; static u_int32_t ip; static u_int32_t listenp; static u_int16_t listenport; +static u_int32_t udp_listenp; +static u_int16_t udp_listenport; static int tos; static int table; static int timeout; @@ -62,13 +65,46 @@ static void resources(void) { +static int parse_ip_port(u_int32_t *ip, u_int16_t *port, char *s) { + char *p = NULL; + int ret = -1; + + p = strchr(s, ':'); + if (p) { + *p++ = 0; + *ip = inet_addr(s); + if (*ip == -1) + goto out; + *port = atoi(p); + } + else { + *ip = 0; + if (strchr(s, '.')) + goto out; + *port = atoi(s); + } + if (!*port) + goto out; + + ret = 0; + +out: + if (p) + *--p = ':'; + return ret; +} + + + static void options(int *argc, char ***argv) { static char *ips; static char *listenps; + static char *listenudps; static GOptionEntry e[] = { { "table", 't', 0, G_OPTION_ARG_INT, &table, "Kernel table to use", "INT" }, { "ip", 'i', 0, G_OPTION_ARG_STRING, &ips, "Local IP address", "IP" }, - { "listen", 'l', 0, G_OPTION_ARG_STRING, &listenps, "Port to listen on", "[IP:]PORT" }, + { "listen", 'l', 0, G_OPTION_ARG_STRING, &listenps, "TCP port to listen on", "[IP:]PORT" }, + { "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP:]PORT" }, { "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "TOS value to set on streams", "INT" }, { "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" }, { "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" }, @@ -79,7 +115,6 @@ static void options(int *argc, char ***argv) { GOptionContext *c; GError *er = NULL; - char *p; c = g_option_context_new(" - next-generation media proxy"); g_option_context_add_main_entries(c, e, NULL); @@ -88,30 +123,21 @@ static void options(int *argc, char ***argv) { if (!ips) die("Missing option IP\n"); - if (!listenps) - die("Missing option LISTEN\n"); + if (!listenps || !listenudps) + die("Missing option LISTEN or LISTEN-UDP\n"); ip = inet_addr(ips); if (ip == -1) die("Invalid IP\n"); - p = strchr(listenps, ':'); - if (p) { - *p++ = 0; - listenp = inet_addr(listenps); - if (listenp == -1) - die("Invalid IP\n"); - listenport = atoi(p); + if (listenps) { + if (parse_ip_port(&listenp, &listenport, listenps)) + die("Invalid IP or port"); } - else { - if (strchr(listenps, '.')) - die("Invalid port\n"); - listenport = atoi(listenps); + if (listenudps) { + if (parse_ip_port(&udp_listenp, &udp_listenport, listenudps)) + die("Invalid IP or port"); } - if (!listenport) - die("Invalid port\n"); - if (p) - *--p = ':'; if (tos < 0 || tos > 255) die("Invalid TOS value"); @@ -151,6 +177,7 @@ int main(int argc, char **argv) { struct poller *p; struct callmaster *m; struct control *c; + struct control_udp *cu; int kfd; int ret; @@ -179,9 +206,19 @@ int main(int argc, char **argv) { m->silent_timeout = silent_timeout; m->tos = tos; - c = control_new(p, listenp, listenport, m); - if (!c) - die("Failed to open control connection port\n"); + c = NULL; + if (listenport) { + c = control_new(p, listenp, listenport, m); + if (!c) + die("Failed to open TCP control connection port\n"); + } + + cu = NULL; + if (udp_listenport) { + cu = control_udp_new(p, udp_listenp, udp_listenport, m); + if (!cu) + die("Failed to open UDP control connection port\n"); + } mylog(LOG_INFO, "Startup complete");