|
|
@ -925,8 +925,7 @@ fail: |
|
|
release_port(b); |
|
|
release_port(b); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* caller is responsible for appropriate locking */ |
|
|
static int setup_peer(struct peer *p, struct stream *s, const char *tag) { |
|
|
static int setup_peer(struct peer *p, struct stream *s, const char *tag) { |
|
|
struct streamrelay *a, *b; |
|
|
struct streamrelay *a, *b; |
|
|
struct callstream *cs; |
|
|
struct callstream *cs; |
|
|
@ -971,6 +970,7 @@ static int setup_peer(struct peer *p, struct stream *s, const char *tag) { |
|
|
return 0; |
|
|
return 0; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* caller is responsible for appropriate locking */ |
|
|
static void steal_peer(struct peer *dest, struct peer *src) { |
|
|
static void steal_peer(struct peer *dest, struct peer *src) { |
|
|
struct streamrelay *r; |
|
|
struct streamrelay *r; |
|
|
int i; |
|
|
int i; |
|
|
@ -1118,6 +1118,7 @@ static void callstream_free(void *ptr) { |
|
|
obj_put(s->call); |
|
|
obj_put(s->call); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* called with call->lock held */ |
|
|
static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) { |
|
|
static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) { |
|
|
GQueue *q; |
|
|
GQueue *q; |
|
|
GList *i, *l; |
|
|
GList *i, *l; |
|
|
@ -1141,6 +1142,7 @@ static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) |
|
|
/* look for an existing call stream with identical parameters */ |
|
|
/* look for an existing call stream with identical parameters */ |
|
|
for (l = c->callstreams->head; l; l = l->next) { |
|
|
for (l = c->callstreams->head; l; l = l->next) { |
|
|
cs_o = l->data; |
|
|
cs_o = l->data; |
|
|
|
|
|
mutex_lock(&cs_o->lock); |
|
|
for (x = 0; x < 2; x++) { |
|
|
for (x = 0; x < 2; x++) { |
|
|
r = &cs_o->peers[x].rtps[0]; |
|
|
r = &cs_o->peers[x].rtps[0]; |
|
|
DBG("comparing new ["IP6F"]:%u/%s to old ["IP6F"]:%u/%s", |
|
|
DBG("comparing new ["IP6F"]:%u/%s to old ["IP6F"]:%u/%s", |
|
|
@ -1156,6 +1158,7 @@ static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) |
|
|
DBG("found existing call stream to steal"); |
|
|
DBG("found existing call stream to steal"); |
|
|
goto found; |
|
|
goto found; |
|
|
} |
|
|
} |
|
|
|
|
|
mutex_unlock(&cs_o->lock); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
/* not found */ |
|
|
/* not found */ |
|
|
@ -1164,7 +1167,7 @@ static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) |
|
|
l = NULL; |
|
|
l = NULL; |
|
|
|
|
|
|
|
|
found: |
|
|
found: |
|
|
|
|
|
|
|
|
|
|
|
/* cs_o remains locked if set */ |
|
|
if (!opmode) { /* request */ |
|
|
if (!opmode) { /* request */ |
|
|
DBG("creating new callstream"); |
|
|
DBG("creating new callstream"); |
|
|
|
|
|
|
|
|
@ -1189,9 +1192,10 @@ found: |
|
|
steal_peer(&cs->peers[0], &cs_o->peers[1]); |
|
|
steal_peer(&cs->peers[0], &cs_o->peers[1]); |
|
|
steal_peer(&cs->peers[1], &cs_o->peers[0]); |
|
|
steal_peer(&cs->peers[1], &cs_o->peers[0]); |
|
|
} |
|
|
} |
|
|
|
|
|
mutex_unlock(&cs_o->lock); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
g_queue_push_tail(q, cs); /* hand over the ref */ |
|
|
|
|
|
|
|
|
g_queue_push_tail(q, cs); /* hand over the ref of new cs */ |
|
|
ZERO(c->lookup_done); |
|
|
ZERO(c->lookup_done); |
|
|
continue; |
|
|
continue; |
|
|
} |
|
|
} |
|
|
@ -1199,18 +1203,25 @@ found: |
|
|
/* lookup */ |
|
|
/* lookup */ |
|
|
for (l = c->callstreams->head; l; l = l->next) { |
|
|
for (l = c->callstreams->head; l; l = l->next) { |
|
|
cs = l->data; |
|
|
cs = l->data; |
|
|
|
|
|
if (cs != cs_o) |
|
|
|
|
|
mutex_lock(&cs->lock); |
|
|
DBG("hunting for callstream, %i <> %i", cs->num, t->num); |
|
|
DBG("hunting for callstream, %i <> %i", cs->num, t->num); |
|
|
if (cs->num != t->num) |
|
|
|
|
|
continue; |
|
|
|
|
|
goto got_cs; |
|
|
|
|
|
|
|
|
if (cs->num == t->num) |
|
|
|
|
|
goto got_cs; |
|
|
|
|
|
if (cs != cs_o) |
|
|
|
|
|
mutex_unlock(&cs->lock); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
mylog(LOG_WARNING, LOG_PREFIX_CI "Got LOOKUP, but no usable callstreams found", |
|
|
mylog(LOG_WARNING, LOG_PREFIX_CI "Got LOOKUP, but no usable callstreams found", |
|
|
LOG_PARAMS_CI(c)); |
|
|
LOG_PARAMS_CI(c)); |
|
|
|
|
|
if (cs_o) |
|
|
|
|
|
mutex_unlock(&cs_o->lock); |
|
|
break; |
|
|
break; |
|
|
|
|
|
|
|
|
got_cs: |
|
|
got_cs: |
|
|
g_queue_delete_link(c->callstreams, l); |
|
|
|
|
|
|
|
|
/* cs and cs_o remain locked, and maybe cs == cs_o */ |
|
|
|
|
|
/* r == peer[x].rtp[0] of cs_o */ |
|
|
|
|
|
g_queue_delete_link(c->callstreams, l); /* steal cs ref */ |
|
|
p = &cs->peers[1]; |
|
|
p = &cs->peers[1]; |
|
|
p2 = &cs->peers[0]; |
|
|
p2 = &cs->peers[0]; |
|
|
|
|
|
|
|
|
@ -1248,19 +1259,25 @@ got_cs: |
|
|
/* nothing found to steal and this end is used */ |
|
|
/* nothing found to steal and this end is used */ |
|
|
/* need a new call stream after all */ |
|
|
/* need a new call stream after all */ |
|
|
DBG("case 4"); |
|
|
DBG("case 4"); |
|
|
|
|
|
if (cs_o) |
|
|
|
|
|
mutex_unlock(&cs_o->lock); |
|
|
cs_o = cs; |
|
|
cs_o = cs; |
|
|
cs = obj_alloc0("callstream", sizeof(*cs), callstream_free); |
|
|
cs = obj_alloc0("callstream", sizeof(*cs), callstream_free); |
|
|
callstream_init(cs, c, 0, 0, t->num); |
|
|
callstream_init(cs, c, 0, 0, t->num); |
|
|
|
|
|
mutex_lock(&cs->lock); |
|
|
steal_peer(&cs->peers[0], &cs_o->peers[0]); |
|
|
steal_peer(&cs->peers[0], &cs_o->peers[0]); |
|
|
p = &cs->peers[1]; |
|
|
p = &cs->peers[1]; |
|
|
setup_peer(p, t, tag); |
|
|
setup_peer(p, t, tag); |
|
|
g_queue_push_tail(c->callstreams, cs_o); /* hand over ref XXX? */ |
|
|
|
|
|
|
|
|
g_queue_push_tail(c->callstreams, cs_o); /* hand over ref to original cs */ |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
time(&c->lookup_done); |
|
|
time(&c->lookup_done); |
|
|
|
|
|
|
|
|
skip: |
|
|
skip: |
|
|
g_queue_push_tail(q, p->up); |
|
|
|
|
|
|
|
|
g_queue_push_tail(q, p->up); /* hand over ref to cs */ |
|
|
|
|
|
mutex_unlock(&cs->lock); |
|
|
|
|
|
if (cs_o && cs_o != cs) |
|
|
|
|
|
mutex_unlock(&cs_o->lock); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
ret = ret * q->length; |
|
|
ret = ret * q->length; |
|
|
@ -1304,7 +1321,7 @@ static void unkernelize(struct peer *p) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* called with callstream->lock held */ |
|
|
static void kill_callstream(struct callstream *s) { |
|
|
static void kill_callstream(struct callstream *s) { |
|
|
int i, j; |
|
|
int i, j; |
|
|
struct peer *p; |
|
|
struct peer *p; |
|
|
@ -1342,6 +1359,7 @@ static void call_destroy(struct call *c) { |
|
|
redis_delete(c, m->conf.redis); |
|
|
redis_delete(c, m->conf.redis); |
|
|
|
|
|
|
|
|
mutex_lock(&c->lock); |
|
|
mutex_lock(&c->lock); |
|
|
|
|
|
/* at this point, no more callstreams can be added */ |
|
|
mylog(LOG_INFO, LOG_PREFIX_C "Final packet stats:", c->callid); |
|
|
mylog(LOG_INFO, LOG_PREFIX_C "Final packet stats:", c->callid); |
|
|
while (c->callstreams->head) { |
|
|
while (c->callstreams->head) { |
|
|
s = g_queue_pop_head(c->callstreams); |
|
|
s = g_queue_pop_head(c->callstreams); |
|
|
|