Browse Source

First step in implementation of UDP control protocol - untested/unfinished

git.mgm/mediaproxy-ng/2.0
Richard Fuchs 15 years ago
parent
commit
b6370bf556
6 changed files with 312 additions and 34 deletions
  1. +1
    -1
      daemon/Makefile
  2. +81
    -11
      daemon/call.c
  3. +2
    -0
      daemon/call.h
  4. +140
    -0
      daemon/control_udp.c
  5. +29
    -0
      daemon/control_udp.h
  6. +59
    -22
      daemon/main.c

+ 1
- 1
daemon/Makefile View File

@ -3,7 +3,7 @@ CFLAGS= -g -Wall `pkg-config --cflags glib-2.0` `pcre-config --cflags` -I/lib/m
#CFLAGS+= -O2 #CFLAGS+= -O2
LDFLAGS= `pkg-config --libs glib-2.0` `pcre-config --libs` LDFLAGS= `pkg-config --libs glib-2.0` `pcre-config --libs`
SRCS= main.c kernel.c poller.c aux.c control.c streambuf.c call.c
SRCS= main.c kernel.c poller.c aux.c control.c streambuf.c call.c control_udp.c
OBJS= $(SRCS:.c=.o) OBJS= $(SRCS:.c=.o)


+ 81
- 11
daemon/call.c View File

@ -317,6 +317,7 @@ static int streams_parse_func(char **a, void **ret, void *p) {
return 0; return 0;
fail: fail:
free(st->mediatype);
free(st); free(st);
return -1; return -1;
} }
@ -556,7 +557,7 @@ fail:
static int setup_peer(struct peer *p, struct stream *s, char *tag) {
static int setup_peer(struct peer *p, struct stream *s, const char *tag) {
struct streamrelay *a, *b; struct streamrelay *a, *b;
a = &p->rtps[0]; a = &p->rtps[0];
@ -683,7 +684,7 @@ static void callstream_init(struct callstream *s, struct call *ca) {
static unsigned int call_streams(struct call *c, GQueue *s, char *tag, int opmode) {
static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) {
GQueue *q; GQueue *q;
GList *i, *l; GList *i, *l;
struct stream *t; struct stream *t;
@ -845,7 +846,7 @@ static void call_destroy(struct call *c) {
static char *streams_print(GQueue *s, unsigned int num, unsigned int off) {
static char *streams_print(GQueue *s, unsigned int num, unsigned int off, const char *prefix) {
GString *o; GString *o;
int i; int i;
GList *l; GList *l;
@ -853,6 +854,8 @@ static char *streams_print(GQueue *s, unsigned int num, unsigned int off) {
struct streamrelay *x; struct streamrelay *x;
o = g_string_new(""); o = g_string_new("");
if (prefix)
g_string_append_printf(o, "%s ", prefix);
if (!s->head) if (!s->head)
goto out; goto out;
@ -874,30 +877,97 @@ out:
char *call_request(const char **o, struct callmaster *m) {
static struct call *call_get_or_create(const char *callid, struct callmaster *m) {
struct call *c; struct call *c;
GQueue *s;
unsigned int num;
c = g_hash_table_lookup(m->callhash, o[2]);
c = g_hash_table_lookup(m->callhash, callid);
if (!c) { if (!c) {
mylog(LOG_NOTICE, "[%s] Creating new call", o[2]);
mylog(LOG_NOTICE, "[%s] Creating new call", callid);
c = malloc(sizeof(*c)); c = malloc(sizeof(*c));
ZERO(*c); ZERO(*c);
c->callmaster = m; c->callmaster = m;
c->callid = strdup(o[2]);
c->callid = strdup(callid);
c->callstreams = g_queue_new(); c->callstreams = g_queue_new();
c->created = m->poller->now; c->created = m->poller->now;
g_hash_table_insert(m->callhash, c->callid, c); g_hash_table_insert(m->callhash, c->callid, c);
} }
return c;
}
char *call_update_udp(const char **o, struct callmaster *m) {
struct call *c;
GQueue q = G_QUEUE_INIT;
struct stream st;
int num;
c = call_get_or_create(o[4], m);
strdupfree(&c->calling_agent, "UNKNOWN(udp)");
ZERO(st);
st.ip = inet_addr(o[5]);
st.port = atoi(o[6]);
st.mediatype = "unknown";
if (st.ip == -1 || !st.port)
goto fail;
g_queue_push_tail(&q, &st);
num = call_streams(c, &q, o[7], 0);
g_queue_clear(&q);
return streams_print(c->callstreams, 1, 0, o[1]);
fail:
return NULL;
}
char *call_lookup_udp(const char **o, struct callmaster *m) {
struct call *c;
GQueue q = G_QUEUE_INIT;
struct stream st;
int num;
c = g_hash_table_lookup(m->callhash, o[4]);
if (!c) {
mylog(LOG_WARNING, "[%s] Got UDP LOOKUP for unknown call-id", o[4]);
return NULL;
}
strdupfree(&c->called_agent, "UNKNOWN(udp)");
ZERO(st);
st.ip = inet_addr(o[5]);
st.port = atoi(o[6]);
st.mediatype = "unknown";
if (st.ip == -1 || !st.port)
goto fail;
g_queue_push_tail(&q, &st);
num = call_streams(c, &q, o[8], 1);
g_queue_clear(&q);
return streams_print(c->callstreams, 1, 1, o[1]);
fail:
return NULL;
}
char *call_request(const char **o, struct callmaster *m) {
struct call *c;
GQueue *s;
unsigned int num;
c = call_get_or_create(o[2], m);
strdupfree(&c->calling_agent, o[9] ? : "UNKNOWN"); strdupfree(&c->calling_agent, o[9] ? : "UNKNOWN");
info_parse(o[10], &c->infohash); info_parse(o[10], &c->infohash);
s = streams_parse(o[3]); s = streams_parse(o[3]);
num = call_streams(c, s, g_hash_table_lookup(c->infohash, "fromtag"), 0); num = call_streams(c, s, g_hash_table_lookup(c->infohash, "fromtag"), 0);
streams_free(s); streams_free(s);
return streams_print(c->callstreams, num, 0);
return streams_print(c->callstreams, num, 0, NULL);
} }
char *call_lookup(const char **o, struct callmaster *m) { char *call_lookup(const char **o, struct callmaster *m) {
@ -917,7 +987,7 @@ char *call_lookup(const char **o, struct callmaster *m) {
num = call_streams(c, s, g_hash_table_lookup(c->infohash, "totag"), 1); num = call_streams(c, s, g_hash_table_lookup(c->infohash, "totag"), 1);
streams_free(s); streams_free(s);
return streams_print(c->callstreams, num, 1);
return streams_print(c->callstreams, num, 1, NULL);
} }
void call_delete(const char **o, struct callmaster *m) { void call_delete(const char **o, struct callmaster *m) {


+ 2
- 0
daemon/call.h View File

@ -88,7 +88,9 @@ struct callmaster *callmaster_new(struct poller *);
char *call_request(const char **, struct callmaster *); char *call_request(const char **, struct callmaster *);
char *call_update_udp(const char **, struct callmaster *);
char *call_lookup(const char **, struct callmaster *); char *call_lookup(const char **, struct callmaster *);
char *call_lookup_udp(const char **, struct callmaster *);
void call_delete(const char **, struct callmaster *); void call_delete(const char **, struct callmaster *);
void calls_status(struct callmaster *, struct control_stream *); void calls_status(struct callmaster *, struct control_stream *);


+ 140
- 0
daemon/control_udp.c View File

@ -0,0 +1,140 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pcre.h>
#include <glib.h>
#include "control_udp.h"
#include "poller.h"
#include "aux.h"
#include "log.h"
#include "call.h"
static pcre *parse_re;
static pcre_extra *parse_ree;
static GHashTable *cookies;
static void control_udp_closed(int fd, void *p) {
abort();
}
static void control_udp_incoming(int fd, void *p) {
struct control_udp *u = p;
int ret;
char buf[8192];
struct sockaddr_in sin;
socklen_t sin_len;
int ovec[60];
const char *errptr;
int erroff;
const char **out;
char *reply;
sin_len = sizeof(sin);
ret = recvfrom(fd, buf, sizeof(buf) - 1, 0, (struct sockaddr *) &sin, &sin_len);
if (ret <= 0) {
mylog(LOG_WARNING, "Error reading from UDP socket");
return;
}
buf[ret] = '\0';
if (!parse_re) {
parse_re = pcre_compile(
/* cookie cmd flags callid addr port from_tag to_tag */
"^(\\S+)\\s+(?:([ul])(\\S*)\\s+(\\S+)\\s+([\\d.]+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+))",
PCRE_DOLLAR_ENDONLY | PCRE_DOTALL | PCRE_CASELESS, &errptr, &erroff, NULL);
parse_ree = pcre_study(parse_re, 0, &errptr);
}
ret = pcre_exec(parse_re, parse_ree, buf, ret, 0, 0, ovec, ARRAY_SIZE(ovec));
if (ret <= 0) {
mylog(LOG_WARNING, "Unable to parse command line from " DF ": %s", DP(sin), buf);
return;
}
mylog(LOG_INFO, "Got valid command from udp:" DF ": %s", DP(sin), buf);
pcre_get_substring_list(buf, ovec, ret, &out);
if (!cookies)
cookies = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
/* XXX better hashing */
reply = g_hash_table_lookup(cookies, out[1]);
if (reply) {
sendto(fd, reply, strlen(reply), 0, (struct sockaddr *) &sin, sin_len);
goto out;
}
if (out[1][0] == 'u' || out[1][0] == 'U')
reply = call_update_udp(out, u->callmaster);
else if (out[1][0] == 'l' || out[1][0] == 'L')
reply = call_lookup_udp(out, u->callmaster);
if (reply) {
sendto(fd, reply, strlen(reply), 0, (struct sockaddr *) &sin, sin_len);
g_hash_table_insert(cookies, strdup(out[1]), reply); /* XXX timeout entries */
}
out:
pcre_free(out);
}
struct control_udp *control_udp_new(struct poller *p, u_int32_t ip, u_int16_t port, struct callmaster *m) {
int fd;
struct control_udp *c;
struct poller_item i;
struct sockaddr_in sin;
if (!p || !m)
return NULL;
fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd == -1)
return NULL;
NONBLOCK(fd);
REUSEADDR(fd);
ZERO(sin);
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = ip;
sin.sin_port = htons(port);
if (bind(fd, (struct sockaddr *) &sin, sizeof(sin)))
goto fail;
c = malloc(sizeof(*c));
ZERO(*c);
c->fd = fd;
c->poller = p;
c->callmaster = m;
ZERO(i);
i.fd = fd;
i.closed = control_udp_closed;
i.readable = control_udp_incoming;
i.ptr = c;
if (poller_add_item(p, &i))
goto fail2;
return c;
fail2:
free(c);
fail:
close(fd);
return NULL;
}

+ 29
- 0
daemon/control_udp.h View File

@ -0,0 +1,29 @@
#ifndef __CONTROL_UDP_H__
#define __CONTROL_UDP_H__
struct poller;
struct callmaster;
struct control_udp {
int fd;
struct poller *poller;
struct callmaster *callmaster;
};
struct control_udp *control_udp_new(struct poller *, u_int32_t, u_int16_t, struct callmaster *);
#endif

+ 59
- 22
daemon/main.c View File

@ -9,6 +9,7 @@
#include "poller.h" #include "poller.h"
#include "control.h" #include "control.h"
#include "control_udp.h"
#include "aux.h" #include "aux.h"
#include "log.h" #include "log.h"
#include "call.h" #include "call.h"
@ -28,6 +29,8 @@ static gboolean foreground;
static u_int32_t ip; static u_int32_t ip;
static u_int32_t listenp; static u_int32_t listenp;
static u_int16_t listenport; static u_int16_t listenport;
static u_int32_t udp_listenp;
static u_int16_t udp_listenport;
static int tos; static int tos;
static int table; static int table;
static int timeout; static int timeout;
@ -62,13 +65,46 @@ static void resources(void) {
static int parse_ip_port(u_int32_t *ip, u_int16_t *port, char *s) {
char *p = NULL;
int ret = -1;
p = strchr(s, ':');
if (p) {
*p++ = 0;
*ip = inet_addr(s);
if (*ip == -1)
goto out;
*port = atoi(p);
}
else {
*ip = 0;
if (strchr(s, '.'))
goto out;
*port = atoi(s);
}
if (!*port)
goto out;
ret = 0;
out:
if (p)
*--p = ':';
return ret;
}
static void options(int *argc, char ***argv) { static void options(int *argc, char ***argv) {
static char *ips; static char *ips;
static char *listenps; static char *listenps;
static char *listenudps;
static GOptionEntry e[] = { static GOptionEntry e[] = {
{ "table", 't', 0, G_OPTION_ARG_INT, &table, "Kernel table to use", "INT" }, { "table", 't', 0, G_OPTION_ARG_INT, &table, "Kernel table to use", "INT" },
{ "ip", 'i', 0, G_OPTION_ARG_STRING, &ips, "Local IP address", "IP" }, { "ip", 'i', 0, G_OPTION_ARG_STRING, &ips, "Local IP address", "IP" },
{ "listen", 'l', 0, G_OPTION_ARG_STRING, &listenps, "Port to listen on", "[IP:]PORT" },
{ "listen", 'l', 0, G_OPTION_ARG_STRING, &listenps, "TCP port to listen on", "[IP:]PORT" },
{ "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP:]PORT" },
{ "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "TOS value to set on streams", "INT" }, { "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "TOS value to set on streams", "INT" },
{ "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" }, { "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" },
{ "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" }, { "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" },
@ -79,7 +115,6 @@ static void options(int *argc, char ***argv) {
GOptionContext *c; GOptionContext *c;
GError *er = NULL; GError *er = NULL;
char *p;
c = g_option_context_new(" - next-generation media proxy"); c = g_option_context_new(" - next-generation media proxy");
g_option_context_add_main_entries(c, e, NULL); g_option_context_add_main_entries(c, e, NULL);
@ -88,30 +123,21 @@ static void options(int *argc, char ***argv) {
if (!ips) if (!ips)
die("Missing option IP\n"); die("Missing option IP\n");
if (!listenps)
die("Missing option LISTEN\n");
if (!listenps || !listenudps)
die("Missing option LISTEN or LISTEN-UDP\n");
ip = inet_addr(ips); ip = inet_addr(ips);
if (ip == -1) if (ip == -1)
die("Invalid IP\n"); die("Invalid IP\n");
p = strchr(listenps, ':');
if (p) {
*p++ = 0;
listenp = inet_addr(listenps);
if (listenp == -1)
die("Invalid IP\n");
listenport = atoi(p);
if (listenps) {
if (parse_ip_port(&listenp, &listenport, listenps))
die("Invalid IP or port");
} }
else {
if (strchr(listenps, '.'))
die("Invalid port\n");
listenport = atoi(listenps);
if (listenudps) {
if (parse_ip_port(&udp_listenp, &udp_listenport, listenudps))
die("Invalid IP or port");
} }
if (!listenport)
die("Invalid port\n");
if (p)
*--p = ':';
if (tos < 0 || tos > 255) if (tos < 0 || tos > 255)
die("Invalid TOS value"); die("Invalid TOS value");
@ -151,6 +177,7 @@ int main(int argc, char **argv) {
struct poller *p; struct poller *p;
struct callmaster *m; struct callmaster *m;
struct control *c; struct control *c;
struct control_udp *cu;
int kfd; int kfd;
int ret; int ret;
@ -179,9 +206,19 @@ int main(int argc, char **argv) {
m->silent_timeout = silent_timeout; m->silent_timeout = silent_timeout;
m->tos = tos; m->tos = tos;
c = control_new(p, listenp, listenport, m);
if (!c)
die("Failed to open control connection port\n");
c = NULL;
if (listenport) {
c = control_new(p, listenp, listenport, m);
if (!c)
die("Failed to open TCP control connection port\n");
}
cu = NULL;
if (udp_listenport) {
cu = control_udp_new(p, udp_listenp, udp_listenport, m);
if (!cu)
die("Failed to open UDP control connection port\n");
}
mylog(LOG_INFO, "Startup complete"); mylog(LOG_INFO, "Startup complete");


Loading…
Cancel
Save