From 0f6d89817148713ba8aa9232436c9129a047c48c Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Sun, 5 Aug 2012 16:38:21 +0000 Subject: [PATCH] add proper locking to callstream setup procedure --- daemon/call.c | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index e64cf501e..94f6279cc 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -925,8 +925,7 @@ fail: release_port(b); } - - +/* caller is responsible for appropriate locking */ static int setup_peer(struct peer *p, struct stream *s, const char *tag) { struct streamrelay *a, *b; struct callstream *cs; @@ -971,6 +970,7 @@ static int setup_peer(struct peer *p, struct stream *s, const char *tag) { return 0; } +/* caller is responsible for appropriate locking */ static void steal_peer(struct peer *dest, struct peer *src) { struct streamrelay *r; int i; @@ -1118,6 +1118,7 @@ static void callstream_free(void *ptr) { obj_put(s->call); } +/* called with call->lock held */ static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) { GQueue *q; GList *i, *l; @@ -1141,6 +1142,7 @@ static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) /* look for an existing call stream with identical parameters */ for (l = c->callstreams->head; l; l = l->next) { cs_o = l->data; + mutex_lock(&cs_o->lock); for (x = 0; x < 2; x++) { r = &cs_o->peers[x].rtps[0]; DBG("comparing new ["IP6F"]:%u/%s to old ["IP6F"]:%u/%s", @@ -1156,6 +1158,7 @@ static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) DBG("found existing call stream to steal"); goto found; } + mutex_unlock(&cs_o->lock); } /* not found */ @@ -1164,7 +1167,7 @@ static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) l = NULL; found: - + /* cs_o remains locked if set */ if (!opmode) { /* request */ DBG("creating new callstream"); @@ -1189,9 +1192,10 @@ found: steal_peer(&cs->peers[0], &cs_o->peers[1]); steal_peer(&cs->peers[1], &cs_o->peers[0]); } + mutex_unlock(&cs_o->lock); } - g_queue_push_tail(q, cs); /* hand over the ref */ + g_queue_push_tail(q, cs); /* hand over the ref of new cs */ ZERO(c->lookup_done); continue; } @@ -1199,18 +1203,25 @@ found: /* lookup */ for (l = c->callstreams->head; l; l = l->next) { cs = l->data; + if (cs != cs_o) + mutex_lock(&cs->lock); DBG("hunting for callstream, %i <> %i", cs->num, t->num); - if (cs->num != t->num) - continue; - goto got_cs; + if (cs->num == t->num) + goto got_cs; + if (cs != cs_o) + mutex_unlock(&cs->lock); } mylog(LOG_WARNING, LOG_PREFIX_CI "Got LOOKUP, but no usable callstreams found", LOG_PARAMS_CI(c)); + if (cs_o) + mutex_unlock(&cs_o->lock); break; got_cs: - g_queue_delete_link(c->callstreams, l); + /* cs and cs_o remain locked, and maybe cs == cs_o */ + /* r == peer[x].rtp[0] of cs_o */ + g_queue_delete_link(c->callstreams, l); /* steal cs ref */ p = &cs->peers[1]; p2 = &cs->peers[0]; @@ -1248,19 +1259,25 @@ got_cs: /* nothing found to steal and this end is used */ /* need a new call stream after all */ DBG("case 4"); + if (cs_o) + mutex_unlock(&cs_o->lock); cs_o = cs; cs = obj_alloc0("callstream", sizeof(*cs), callstream_free); callstream_init(cs, c, 0, 0, t->num); + mutex_lock(&cs->lock); steal_peer(&cs->peers[0], &cs_o->peers[0]); p = &cs->peers[1]; setup_peer(p, t, tag); - g_queue_push_tail(c->callstreams, cs_o); /* hand over ref XXX? */ + g_queue_push_tail(c->callstreams, cs_o); /* hand over ref to original cs */ } time(&c->lookup_done); skip: - g_queue_push_tail(q, p->up); + g_queue_push_tail(q, p->up); /* hand over ref to cs */ + mutex_unlock(&cs->lock); + if (cs_o && cs_o != cs) + mutex_unlock(&cs_o->lock); } ret = ret * q->length; @@ -1304,7 +1321,7 @@ static void unkernelize(struct peer *p) { } - +/* called with callstream->lock held */ static void kill_callstream(struct callstream *s) { int i, j; struct peer *p; @@ -1342,6 +1359,7 @@ static void call_destroy(struct call *c) { redis_delete(c, m->conf.redis); mutex_lock(&c->lock); + /* at this point, no more callstreams can be added */ mylog(LOG_INFO, LOG_PREFIX_C "Final packet stats:", c->callid); while (c->callstreams->head) { s = g_queue_pop_head(c->callstreams);