From 1b93bdb9b74714f038a72dbf34c83d5b25ee0ea7 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 21 Dec 2011 16:52:45 +0000 Subject: [PATCH] fix handling of calls with multiple concurrent media streams --- daemon/call.c | 44 +++++++++++++++++++++++++++++++------------- daemon/call.h | 2 ++ daemon/control_udp.c | 16 ++++++++-------- 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index bc70c6353..1e85bb5ec 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -375,7 +375,9 @@ static GHashTable *info_parse(const char *s, GHashTable **h) { static int streams_parse_func(char **a, void **ret, void *p) { struct stream *st; u_int32_t ip; + int *i; + i = p; st = g_slice_alloc0(sizeof(*st)); ip = inet_addr(a[0]); @@ -385,6 +387,7 @@ static int streams_parse_func(char **a, void **ret, void *p) { in4_to_6(&st->ip46, ip); st->port = atoi(a[1]); st->mediatype = strdup(a[2] ? : ""); + st->num = ++(*i); if (!st->port) goto fail; @@ -401,7 +404,9 @@ fail: static GQueue *streams_parse(const char *s) { - return pcre_multi_match(&streams_re, &streams_ree, "^([\\d.]+):(\\d+)(?::(.*?))?(?:$|,)", s, 3, streams_parse_func, NULL); + int i; + i = 0; + return pcre_multi_match(&streams_re, &streams_ree, "^([\\d.]+):(\\d+)(?::(.*?))?(?:$|,)", s, 3, streams_parse_func, &i); } static void streams_free(GQueue *q) { @@ -815,7 +820,7 @@ static void steal_peer(struct peer *dest, struct peer *src) { } -static void callstream_init(struct callstream *s, struct call *ca, int port1, int port2) { +static void callstream_init(struct callstream *s, struct call *ca, int port1, int port2, int num) { int i, j, tport; struct peer *p; struct streamrelay *r; @@ -828,6 +833,8 @@ static void callstream_init(struct callstream *s, struct call *ca, int port1, in ZERO(pi); s->call = ca; + DBG("setting new callstream num to %i", num); + s->num = num; for (i = 0; i < 2; i++) { p = &s->peers[i]; @@ -927,13 +934,13 @@ found: if (!r) { /* nothing found to re-use, open new ports */ - callstream_init(cs, c, 0, 0); + callstream_init(cs, c, 0, 0, t->num); p = &cs->peers[0]; setup_peer(p, t, tag); } else { /* re-use, so don't open new ports */ - callstream_init(cs, c, -1, -1); + callstream_init(cs, c, -1, -1, t->num); if (r->up->idx == 0) { /* request/lookup came in the same order as before */ steal_peer(&cs->peers[0], &cs_o->peers[0]); @@ -952,12 +959,18 @@ found: } /* lookup */ - l = c->callstreams->head; - if (!l) { - mylog(LOG_WARNING, "[%s] Got LOOKUP, but no callstreams found", c->callid); - break; + for (l = c->callstreams->head; l; l = l->next) { + cs = l->data; + DBG("hunting for callstream, %i <> %i", cs->num, t->num); + if (cs->num != t->num) + continue; + goto got_cs; } - cs = l->data; + + mylog(LOG_WARNING, "[%s] Got LOOKUP, but no usable callstreams found", c->callid); + break; + +got_cs: g_queue_delete_link(c->callstreams, l); p = &cs->peers[1]; p2 = &cs->peers[0]; @@ -998,7 +1011,7 @@ found: DBG("case 4"); cs_o = cs; cs = g_slice_alloc(sizeof(*cs)); - callstream_init(cs, c, 0, 0); + callstream_init(cs, c, 0, 0, t->num); steal_peer(&cs->peers[0], &cs_o->peers[0]); p = &cs->peers[1]; setup_peer(p, t, tag); @@ -1229,6 +1242,11 @@ static int addr_parse_udp(struct stream *st, const char **o) { } } + if (o[9]) + st->num = atoi(o[9]); + if (!st->num) + st->num = 1; + return 0; fail: return -1; @@ -1286,7 +1304,7 @@ char *call_lookup_udp(const char **o, struct callmaster *m) { goto fail; g_queue_push_tail(&q, &st); - num = call_streams(c, &q, o[9], 1); + num = call_streams(c, &q, o[10], 1); g_queue_clear(&q); @@ -1358,7 +1376,7 @@ char *call_delete_udp(const char **o, struct callmaster *m) { struct call *c; char *ret; - c = g_hash_table_lookup(m->callhash, o[12]); + c = g_hash_table_lookup(m->callhash, o[13]); if (!c) goto err; @@ -1473,7 +1491,7 @@ void call_restore(struct callmaster *m, char *uuid, redisReply **hash, GList *st rps[1] = streams->data; cs = g_slice_alloc(sizeof(*cs)); - callstream_init(cs, c, atoi(rps[0]->element[2]->str), atoi(rps[1]->element[2]->str)); + callstream_init(cs, c, atoi(rps[0]->element[2]->str), atoi(rps[1]->element[2]->str), -1); /* XXX */ kernel = 0; for (i = 0; i < 2; i++) { diff --git a/daemon/call.h b/daemon/call.h index 5f9399cc6..a168bc60c 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -39,6 +39,7 @@ struct stream { DIR_INTERNAL, DIR_EXTERNAL, } direction[2]; + int num; }; struct streamrelay { int fd; @@ -67,6 +68,7 @@ struct peer { struct callstream { struct peer peers[2]; struct call *call; + int num; }; struct call { diff --git a/daemon/control_udp.c b/daemon/control_udp.c index 93e4bbd4d..891aa64ce 100644 --- a/daemon/control_udp.c +++ b/daemon/control_udp.c @@ -116,9 +116,9 @@ static void control_udp_incoming(int fd, void *p) { reply = call_update_udp(out, u->callmaster); else if (chrtoupper(out[2][0]) == 'L') reply = call_lookup_udp(out, u->callmaster); - else if (chrtoupper(out[10][0]) == 'D') + else if (chrtoupper(out[11][0]) == 'D') reply = call_delete_udp(out, u->callmaster); - else if (chrtoupper(out[13][0]) == 'V') { + else if (chrtoupper(out[14][0]) == 'V') { ZERO(mh); mh.msg_name = &sin; mh.msg_namelen = sizeof(sin); @@ -130,13 +130,13 @@ static void control_udp_incoming(int fd, void *p) { iov[1].iov_base = " "; iov[1].iov_len = 1; - if (chrtoupper(out[14][0]) == 'F') { + if (chrtoupper(out[15][0]) == 'F') { ret = 0; - if (!strcmp(out[15], "20040107")) + if (!strcmp(out[16], "20040107")) ret = 1; - else if (!strcmp(out[15], "20050322")) + else if (!strcmp(out[16], "20050322")) ret = 1; - else if (!strcmp(out[15], "20060704")) + else if (!strcmp(out[16], "20060704")) ret = 1; iov[2].iov_base = ret ? "1\n" : "0\n"; iov[2].iov_len = 2; @@ -200,8 +200,8 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1 c->stale_chunks = g_string_chunk_new(4 * 1024); c->oven_time = p->now; c->parse_re = pcre_compile( - /* cookie:1 cmd:2 flags:3 callid:4 addr4:5 addr6:6 port:7 from_tag:8 to_tag:9 d:10 flags:11 callid:12 v:13 flags:14 parms:15 */ - "^(\\S+)\\s+(?:([ul])(\\S*)\\s+(\\S+)\\s+(?:([\\d.]+)|([\\da-f:]+(?::ffff:[\\d.]+)?))\\s+(\\d+)\\s+(\\S+?)(?:;\\S+)?(?:\\s+(\\S+?)(?:;\\S+)?(?:\\s+.*)?)?\r?\n?$|(d)(\\S*)\\s+(\\S+)|(v)(\\S*)(?:\\s+(\\S+))?)", + /* cookie:1 cmd:2 flags:3 callid:4 addr4:5 addr6:6 port:7 from_tag:8 num:9 to_tag:10 d:11 flags:12 callid:13 v:14 flags:15 parms:16 */ + "^(\\S+)\\s+(?:([ul])(\\S*)\\s+(\\S+)\\s+(?:([\\d.]+)|([\\da-f:]+(?::ffff:[\\d.]+)?))\\s+(\\d+)\\s+(\\S+?);(\\d+)(?:\\s+(\\S+?);\\d+(?:\\s+.*)?)?\r?\n?$|(d)(\\S*)\\s+(\\S+)|(v)(\\S*)(?:\\s+(\\S+))?)", PCRE_DOLLAR_ENDONLY | PCRE_DOTALL | PCRE_CASELESS, &errptr, &erroff, NULL); c->parse_ree = pcre_study(c->parse_re, 0, &errptr); /* cookie cmd flags callid addr port */