Browse Source

TT#91151 implement one-to-many forwarding

Change-Id: I80fd35da680d4ad1f4d3d21f14f11363106b9917
pull/1295/head
Richard Fuchs 5 years ago
parent
commit
92fb330a46
16 changed files with 1133 additions and 581 deletions
  1. +340
    -126
      daemon/call.c
  2. +63
    -47
      daemon/call_interfaces.c
  3. +9
    -4
      daemon/cdr.c
  4. +9
    -4
      daemon/cli.c
  5. +3
    -2
      daemon/codec.c
  6. +66
    -62
      daemon/dtmf.c
  7. +461
    -263
      daemon/media_socket.c
  8. +1
    -1
      daemon/recording.c
  9. +115
    -12
      daemon/redis.c
  10. +14
    -12
      daemon/rtcp.c
  11. +16
    -4
      daemon/ssrc.c
  12. +2
    -20
      daemon/statistics.c
  13. +24
    -19
      include/call.h
  14. +7
    -0
      include/media_socket.h
  15. +2
    -2
      kernel-module/xt_RTPENGINE.c
  16. +1
    -3
      t/test-transcode.c

+ 340
- 126
daemon/call.c View File

@ -10,7 +10,7 @@
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <stdbool.h>
#include <time.h>
#include <xmlrpc_client.h>
#include <sys/wait.h>
@ -378,7 +378,7 @@ fault:
void kill_calls_timer(GSList *list, const char *url) {
struct call *ca;
GList *csl;
struct call_monologue *cm, *cd;
struct call_monologue *cm;
char *url_prefix = NULL, *url_suffix = NULL;
struct xmlrpc_helper *xh = NULL;
char url_buf[128];
@ -445,25 +445,32 @@ void kill_calls_timer(GSList *list, const char *url) {
case XF_KAMAILIO:
for (csl = ca->monologues.head; csl; csl = csl->next) {
cm = csl->data;
cd = cm->active_dialogue;
if (!cm->tag.s || !cm->tag.len || !cd || !cd->tag.s || !cd->tag.len)
if (!cm->tag.s || !cm->tag.len)
continue;
str *from_tag = g_hash_table_lookup(dup_tags, &cd->tag);
if (from_tag && !str_cmp_str(from_tag, &cm->tag))
continue;
for (GList *sub = cm->subscribers.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
struct call_monologue *cd = cs->monologue;
if (!cd->tag.s || !cd->tag.len)
continue;
from_tag = str_dup(&cm->tag);
str *to_tag = str_dup(&cd->tag);
str *from_tag = g_hash_table_lookup(dup_tags, &cd->tag);
if (from_tag && !str_cmp_str(from_tag, &cm->tag))
continue;
g_queue_push_tail(&xh->strings,
strdup(url_buf));
g_queue_push_tail(&xh->strings,
str_dup(&ca->callid));
g_queue_push_tail(&xh->strings, from_tag);
g_queue_push_tail(&xh->strings, to_tag);
from_tag = str_dup(&cm->tag);
str *to_tag = str_dup(&cd->tag);
g_hash_table_insert(dup_tags, from_tag, to_tag);
g_queue_push_tail(&xh->strings,
strdup(url_buf));
g_queue_push_tail(&xh->strings,
str_dup(&ca->callid));
g_queue_push_tail(&xh->strings, from_tag);
g_queue_push_tail(&xh->strings, to_tag);
g_hash_table_insert(dup_tags, from_tag, to_tag);
}
}
break;
}
@ -521,7 +528,7 @@ static void call_timer(void *ptr) {
struct iterator_helper hlp;
GList *i, *l;
struct rtpengine_list_entry *ke;
struct packet_stream *ps, *sink;
struct packet_stream *ps;
struct stats tmpstats;
int j, update;
struct stream_fd *sfd;
@ -639,12 +646,17 @@ static void call_timer(void *ptr) {
if (diff_packets)
sfd->call->foreign_media = 0;
sink = packet_stream_sink(ps);
if (!ke->target.non_forwarding && diff_packets) {
// only check the first
struct rtpengine_output_info *o = &ke->outputs[0];
if (sink && o->src_addr.family) {
for (GList *l = ps->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
if (sh->kernel_output_idx < 0
|| sh->kernel_output_idx >= ke->target.num_destinations)
continue;
struct rtpengine_output_info *o = &ke->outputs[sh->kernel_output_idx];
mutex_lock(&sink->out_lock);
if (sink->crypto.params.crypto_suite && sink->ssrc_out
&& ntohl(ke->target.ssrc) == sink->ssrc_out->parent->h.ssrc
@ -1179,6 +1191,28 @@ void __rtp_stats_update(GHashTable *dst, struct codec_store *cs) {
/* we leave previously added but now removed payload types in place */
}
void free_sink_handler(void *p) {
struct sink_handler *sh = p;
g_slice_free1(sizeof(*sh), sh);
}
void __add_sink_handler(GQueue *q, struct packet_stream *sink) {
struct sink_handler *sh = g_slice_alloc0(sizeof(*sh));
sh->sink = sink;
sh->kernel_output_idx = -1;
g_queue_push_tail(q, sh);
}
// called once before calling __init_streams once for each sink
static void __reset_streams(struct call_media *media) {
for (GList *l = media->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
g_queue_clear_full(&ps->rtp_sinks, free_sink_handler);
g_queue_clear_full(&ps->rtcp_sinks, free_sink_handler);
}
}
// called once on media A for each sink media B
// B can be NULL
// XXX this function seems to do two things - stream init (with B NULL) and sink init - split up?
static int __init_streams(struct call_media *A, struct call_media *B, const struct stream_params *sp,
const struct sdp_ng_flags *flags) {
GList *la, *lb;
@ -1194,11 +1228,13 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru
b = lb->data;
/* RTP */
a->rtp_sink = b;
// reflect media - pretent reflection also for blackhole, as otherwise
// reflect media - pretend reflection also for blackhole, as otherwise
// we get SSRC flip-flops on the opposite side
// XXX still necessary for blackhole?
if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE))
a->rtp_sink = a;
__add_sink_handler(&a->rtp_sinks, a);
else
__add_sink_handler(&a->rtp_sinks, b);
PS_SET(a, RTP); /* XXX technically not correct, could be udptl too */
__rtp_stats_update(a->rtp_stats, &A->codecs);
@ -1227,14 +1263,13 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru
b = lb->data;
}
if (!MEDIA_ISSET(A, RTCP_MUX)) {
a->rtcp_sink = NULL;
if (!MEDIA_ISSET(A, RTCP_MUX))
PS_CLEAR(a, RTCP);
}
else {
a->rtcp_sink = b;
if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE))
a->rtcp_sink = a->rtcp_sibling;
__add_sink_handler(&a->rtcp_sinks, a->rtcp_sibling);
else
__add_sink_handler(&a->rtcp_sinks, b);
PS_SET(a, RTCP);
PS_CLEAR(a, IMPLICIT_RTCP);
}
@ -1247,10 +1282,10 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru
assert(la != NULL);
a = la->data;
a->rtp_sink = NULL;
a->rtcp_sink = b;
if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE))
a->rtcp_sink = a;
__add_sink_handler(&a->rtcp_sinks, a);
else
__add_sink_handler(&a->rtcp_sinks, b);
PS_CLEAR(a, RTP);
PS_SET(a, RTCP);
a->rtcp_sibling = NULL;
@ -2211,16 +2246,68 @@ void codecs_offer_answer(struct call_media *media, struct call_media *other_medi
}
}
/* called with call->master_lock held in W */
int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
static void __update_init_subscribers(struct call_monologue *ml, GQueue *streams, struct sdp_ng_flags *flags) {
GList *sl = streams ? streams->head : NULL;
// create media iterators for all subscribers
GList *sub_medias[ml->subscribers.length];
unsigned int num_subs = 0;
for (GList *l = ml->subscribers.head; l; l = l->next) {
struct call_subscription *cs = l->data;
struct call_monologue *sub_ml = cs->monologue;
sub_medias[num_subs++] = sub_ml->medias.head;
}
// keep num_subs as shortcut to ml->subscribers.length
for (GList *l = ml->medias.head; l; l = l->next) {
struct call_media *media = l->data;
struct stream_params *sp = NULL;
if (sl) {
sp = sl->data;
sl = sl->next;
}
__ice_start(media);
// update all subscribers
__reset_streams(media);
for (unsigned int i = 0; i < num_subs; i++) {
if (!sub_medias[i])
continue;
struct call_media *sub_media = sub_medias[i]->data;
sub_medias[i] = sub_medias[i]->next;
if (__init_streams(media, sub_media, sp, flags))
ilog(LOG_WARN, "Error initialising streams");
}
// we are now ready to fire up ICE if so desired and requested
ice_update(media->ice_agent, sp); // sp == NULL: update in case rtcp-mux changed
recording_setup_media(media);
t38_gateway_start(media->t38_gateway);
if (mqtt_publish_scope() == MPS_MEDIA)
mqtt_timer_start(&media->mqtt_timer, media->call, media);
}
}
/* called with call->master_lock held in W */
int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams,
struct sdp_ng_flags *flags)
{
struct stream_params *sp;
GList *media_iter, *ml_media, *other_ml_media;
struct call_media *media, *other_media;
struct call_monologue *monologue;
struct endpoint_map *em;
struct call *call;
struct call_monologue *other_ml = dialogue[0];
struct call_monologue *monologue = dialogue[1];
/* we must have a complete dialogue, even though the to-tag (monologue->tag)
* may not be known yet */
@ -2229,7 +2316,6 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
return -1;
}
monologue = other_ml->active_dialogue;
call = monologue->call;
call->last_signal = MAX(call->last_signal, rtpe_now.tv_sec);
@ -2432,9 +2518,6 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
/* ICE stuff - must come after interface and address family selection */
__ice_offer(flags, media, other_media);
__ice_start(other_media);
__ice_start(media);
/* we now know what's being advertised by the other side */
@ -2448,7 +2531,7 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
* generate media (or RTCP packets) for that stream. */
__disable_streams(media, sp->num_ports);
__disable_streams(other_media, sp->num_ports);
goto init;
continue;
}
if (is_addr_unspecified(&sp->rtp_endpoint.address) && !MEDIA_ISSET(other_media, TRICKLE_ICE)) {
/* Zero endpoint address, equivalent to setting the media stream
@ -2478,26 +2561,11 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
if (__wildcard_endpoint_map(other_media, sp->num_ports))
goto error_ports;
}
init:
if (__init_streams(media, other_media, NULL, NULL))
return -1;
if (__init_streams(other_media, media, sp, flags))
return -1;
/* we are now ready to fire up ICE if so desired and requested */
ice_update(other_media->ice_agent, sp);
ice_update(media->ice_agent, NULL); /* this is in case rtcp-mux has changed */
recording_setup_media(media);
t38_gateway_start(media->t38_gateway);
if (mqtt_publish_scope() == MPS_MEDIA) {
mqtt_timer_start(&media->mqtt_timer, call, media);
mqtt_timer_start(&other_media->mqtt_timer, call, other_media);
}
}
__update_init_subscribers(other_ml, streams, flags);
__update_init_subscribers(monologue, NULL, NULL);
// set ipv4/ipv6/mixed media stats
if (flags && (flags->opmode == OP_OFFER || flags->opmode == OP_ANSWER)) {
statistics_update_ip46_inc_dec(call, CMC_INCREMENT);
@ -2515,6 +2583,73 @@ error_intf:
}
static void __unsubscribe_one_link(struct call_monologue *which, GList *which_cs_link) {
struct call_subscription *cs = which_cs_link->data;
struct call_subscription *rev_cs = cs->link->data;
struct call_monologue *from = cs->monologue;
ilog(LOG_DEBUG, "Unsubscribing '" STR_FORMAT_M "' from '" STR_FORMAT_M "'",
STR_FMT_M(&which->tag),
STR_FMT_M(&from->tag));
g_queue_delete_link(&from->subscribers, cs->link);
g_queue_delete_link(&which->subscriptions, which_cs_link);
g_slice_free1(sizeof(*cs), cs);
g_slice_free1(sizeof(*rev_cs), rev_cs);
}
//static void __unsubscribe_all(struct call_monologue *which) {
// while (which->subscriptions.head)
// __unsubscribe_one_link(which, which->subscriptions.head);
//}
static bool __unsubscribe_one(struct call_monologue *which, struct call_monologue *from) {
for (GList *l = which->subscriptions.head; l; l = l->next) {
struct call_subscription *cs = l->data;
if (cs->monologue != from)
continue;
__unsubscribe_one_link(which, l);
return true;
}
return false;
}
static void __unsubscribe_all_offer_answer(struct call_monologue *ml) {
for (GList *l = ml->subscriptions.head; l; ) {
struct call_subscription *cs = l->data;
if (!cs->offer_answer) {
l = l->next;
continue;
}
GList *next = l->next;
__unsubscribe_one_link(ml, l);
l = next;
}
}
void __add_subscription(struct call_monologue *which, struct call_monologue *to, bool offer_answer) {
ilog(LOG_DEBUG, "Subscribing '" STR_FORMAT_M "' to '" STR_FORMAT_M "'",
STR_FMT_M(&which->tag),
STR_FMT_M(&to->tag));
struct call_subscription *which_cs = g_slice_alloc0(sizeof(*which_cs));
struct call_subscription *to_rev_cs = g_slice_alloc0(sizeof(*to_rev_cs));
which_cs->monologue = to;
to_rev_cs->monologue = which;
// keep offer-answer subscriptions first in the list
if (!offer_answer) {
g_queue_push_tail(&which->subscriptions, which_cs);
g_queue_push_tail(&to->subscribers, to_rev_cs);
which_cs->link = to->subscribers.tail;
to_rev_cs->link = which->subscriptions.tail;
}
else {
g_queue_push_head(&which->subscriptions, which_cs);
g_queue_push_head(&to->subscribers, to_rev_cs);
which_cs->link = to->subscribers.head;
to_rev_cs->link = which->subscriptions.head;
}
which_cs->offer_answer = offer_answer ? 1 : 0;
to_rev_cs->offer_answer = which_cs->offer_answer;
}
static void __subscribe_only_one_offer_answer(struct call_monologue *which, struct call_monologue *to) {
__unsubscribe_all_offer_answer(which);
__add_subscription(which, to, true);
}
static int __rtp_stats_sort(const void *ap, const void *bp) {
const struct rtp_stats *a = ap, *b = bp;
@ -2602,8 +2737,8 @@ static void __call_cleanup(struct call *c) {
g_queue_clear(&ps->sfds);
crypto_cleanup(&ps->crypto);
ps->rtp_sink = NULL;
ps->rtcp_sink = NULL;
g_queue_clear_full(&ps->rtp_sinks, free_sink_handler);
g_queue_clear_full(&ps->rtcp_sinks, free_sink_handler);
}
for (GList *l = c->medias.head; l; l = l->next) {
@ -2718,18 +2853,21 @@ void call_destroy(struct call *c) {
// stats output only - no cleanups
ilog(LOG_INFO, "--- Tag '" STR_FORMAT_M "'%s"STR_FORMAT"%s, created "
"%u:%02u ago for branch '" STR_FORMAT_M "', in dialogue with '" STR_FORMAT_M "'",
"%u:%02u ago for branch '" STR_FORMAT_M "'",
STR_FMT_M(&ml->tag),
ml->label.s ? " (label '" : "",
STR_FMT(ml->label.s ? &ml->label : &STR_EMPTY),
ml->label.s ? "')" : "",
(unsigned int) (rtpe_now.tv_sec - ml->created) / 60,
(unsigned int) (rtpe_now.tv_sec - ml->created) % 60,
STR_FMT_M(&ml->viabranch),
ml->active_dialogue ? rtpe_common_config_ptr->log_mark_prefix : "",
ml->active_dialogue ? (int) ml->active_dialogue->tag.len : 6,
ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)",
ml->active_dialogue ? rtpe_common_config_ptr->log_mark_suffix : "");
STR_FMT_M(&ml->viabranch));
for (GList *sub = ml->subscriptions.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
struct call_monologue *csm = cs->monologue;
ilog(LOG_INFO, "--- subscribed to '" STR_FORMAT_M "'",
STR_FMT_M(&csm->tag));
}
for (k = ml->medias.head; k; k = k->next) {
md = k->data;
@ -3073,7 +3211,6 @@ void __monologue_tag(struct call_monologue *ml, const str *tag) {
}
void __monologue_viabranch(struct call_monologue *ml, const str *viabranch) {
struct call *call = ml->call;
struct call_monologue *other = ml->active_dialogue;
if (!viabranch || !viabranch->len)
return;
@ -3081,15 +3218,25 @@ void __monologue_viabranch(struct call_monologue *ml, const str *viabranch) {
__C_DBG("tagging monologue with viabranch '"STR_FORMAT"'", STR_FMT(viabranch));
if (ml->viabranch.s) {
g_hash_table_remove(call->viabranches, &ml->viabranch);
if (other)
g_hash_table_remove(other->branches, &ml->viabranch);
for (GList *sub = ml->subscribers.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
g_hash_table_remove(cs->monologue->branches, &ml->viabranch);
}
}
call_str_cpy(call, &ml->viabranch, viabranch);
g_hash_table_insert(call->viabranches, &ml->viabranch, ml);
if (other)
g_hash_table_insert(other->branches, &ml->viabranch, ml);
for (GList *sub = ml->subscribers.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
g_hash_table_insert(cs->monologue->branches, &ml->viabranch, ml);
}
}
static void __unconfirm_sinks(GQueue *q) {
for (GList *l = q->head; l; l = l->next) {
struct sink_handler *sh = l->data;
__stream_unconfirm(sh->sink);
}
}
/* must be called with call->master_lock held in W */
void __monologue_unkernelize(struct call_monologue *monologue) {
GList *l, *m;
@ -3108,14 +3255,30 @@ void __monologue_unkernelize(struct call_monologue *monologue) {
for (m = media->streams.head; m; m = m->next) {
stream = m->data;
__stream_unconfirm(stream);
if (stream->rtp_sink)
__stream_unconfirm(stream->rtp_sink);
if (stream->rtcp_sink)
__stream_unconfirm(stream->rtcp_sink);
__unconfirm_sinks(&stream->rtp_sinks);
__unconfirm_sinks(&stream->rtcp_sinks);
}
}
}
static void __dialogue_unkernelize(struct call_monologue *ml) {
__monologue_unkernelize(ml);
for (GList *sub = ml->subscriptions.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
__monologue_unkernelize(cs->monologue);
}
for (GList *sub = ml->subscribers.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
__monologue_unkernelize(cs->monologue);
}
}
static void __unkernelize_sinks(GQueue *q) {
for (GList *l = q->head; l; l = l->next) {
struct sink_handler *sh = l->data;
unkernelize(sh->sink);
}
}
/* call locked in R */
void call_media_unkernelize(struct call_media *media) {
GList *m;
@ -3124,8 +3287,8 @@ void call_media_unkernelize(struct call_media *media) {
for (m = media->streams.head; m; m = m->next) {
stream = m->data;
unkernelize(stream);
unkernelize(stream->rtp_sink);
unkernelize(stream->rtcp_sink);
__unkernelize_sinks(&stream->rtp_sinks);
__unkernelize_sinks(&stream->rtcp_sinks);
}
}
@ -3199,23 +3362,28 @@ static int monologue_destroy(struct call_monologue *ml) {
/* must be called with call->master_lock held in W */
static void __fix_other_tags(struct call_monologue *one) {
struct call_monologue *two;
if (!one || !one->tag.len)
return;
two = one->active_dialogue;
if (!two || !two->tag.len)
return;
g_hash_table_insert(one->other_tags, &two->tag, two);
g_hash_table_insert(two->other_tags, &one->tag, one);
for (GList *sub = one->subscribers.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
struct call_monologue *two = cs->monologue;
g_hash_table_insert(one->other_tags, &two->tag, two);
g_hash_table_insert(two->other_tags, &one->tag, one);
}
}
/* must be called with call->master_lock held in W */
static struct call_monologue *call_get_monologue(struct call *call, const str *fromtag, const str *totag,
struct call_monologue *call_get_monologue(struct call *call, const str *fromtag) {
return g_hash_table_lookup(call->tags, fromtag);
}
/* must be called with call->master_lock held in W */
static int call_get_monologue_new(struct call_monologue *dialogue[2], struct call *call,
const str *fromtag, const str *totag,
const str *viabranch)
{
struct call_monologue *ret, *os;
struct call_monologue *ret, *os = NULL;
__C_DBG("getting monologue for tag '"STR_FORMAT"' in call '"STR_FORMAT"'",
STR_FMT(fromtag), STR_FMT(&call->callid));
@ -3228,27 +3396,34 @@ static struct call_monologue *call_get_monologue(struct call *call, const str *f
__C_DBG("found existing monologue");
__monologue_unkernelize(ret);
__monologue_unkernelize(ret->active_dialogue);
for (GList *sub = ret->subscriptions.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
__monologue_unkernelize(cs->monologue);
}
if (!viabranch)
goto ok_check_tag;
/* check the viabranch. if it's not known, then this is a branched offer and we need
* to create a new "other side" for this branch. */
if (!ret->active_dialogue->viabranch.s) {
/* previous "other side" hasn't been tagged with the via-branch, so we'll just
* use this one and tag it */
__monologue_viabranch(ret->active_dialogue, viabranch);
goto ok_check_tag;
for (GList *sub = ret->subscribers.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
struct call_monologue *csm = cs->monologue;
/* check the viabranch. if it's not known, then this is a branched offer and we need
* to create a new "other side" for this branch. */
if (!csm->viabranch.s) {
/* previous "other side" hasn't been tagged with the via-branch, so we'll just
* use this one and tag it */
__monologue_viabranch(csm, viabranch);
goto ok_check_tag;
}
if (!str_cmp_str(&csm->viabranch, viabranch))
goto ok_check_tag; /* dialogue still intact */
}
if (!str_cmp_str(&ret->active_dialogue->viabranch, viabranch))
goto ok_check_tag; /* dialogue still intact */
os = g_hash_table_lookup(call->viabranches, viabranch);
if (os) {
/* previously seen branch. use it */
__monologue_unkernelize(os);
os->active_dialogue = ret;
ret->active_dialogue = os;
__subscribe_only_one_offer_answer(ret, os);
__subscribe_only_one_offer_answer(os, ret);
goto ok_check_tag;
}
@ -3257,21 +3432,31 @@ static struct call_monologue *call_get_monologue(struct call *call, const str *f
new_branch:
__C_DBG("create new \"other side\" monologue for viabranch "STR_FORMAT, STR_FMT0(viabranch));
os = __monologue_create(call);
ret->active_dialogue = os;
os->active_dialogue = ret;
__subscribe_only_one_offer_answer(ret, os);
__subscribe_only_one_offer_answer(os, ret);
__monologue_viabranch(os, viabranch);
ok_check_tag:
os = ret->active_dialogue;
if (totag && totag->s && !os->tag.s) {
__monologue_tag(os, totag);
__fix_other_tags(ret);
}
return ret;
for (GList *sub = ret->subscriptions.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
struct call_monologue *csm = cs->monologue;
if (!os)
os = csm;
if (totag && totag->s && !csm->tag.s) {
__monologue_tag(csm, totag);
__fix_other_tags(ret);
}
break; // there should only be one
// XXX check if there's more than a one-to-one mapping here?
}
dialogue[0] = ret;
dialogue[1] = os;
return 0;
}
/* must be called with call->master_lock held in W */
static struct call_monologue *call_get_dialogue(struct call *call, const str *fromtag, const str *totag,
static int call_get_dialogue(struct call_monologue *dialogue[2], struct call *call, const str *fromtag,
const str *totag,
const str *viabranch)
{
struct call_monologue *ft, *tt;
@ -3282,7 +3467,7 @@ static struct call_monologue *call_get_dialogue(struct call *call, const str *fr
/* we start with the to-tag. if it's not known, we treat it as a branched offer */
tt = g_hash_table_lookup(call->tags, totag);
if (!tt)
return call_get_monologue(call, fromtag, totag, viabranch);
return call_get_monologue_new(dialogue, call, fromtag, totag, viabranch);
/* if the from-tag is known already, return that */
ft = g_hash_table_lookup(call->tags, fromtag);
@ -3290,9 +3475,26 @@ static struct call_monologue *call_get_dialogue(struct call *call, const str *fr
__C_DBG("found existing dialogue");
/* make sure that the dialogue is actually intact */
/* fastpath for a common case */
if (!str_cmp_str(totag, &ft->active_dialogue->tag))
goto done;
if (ft->subscriptions.length != 1 || ft->subscribers.length != 1)
goto tag_setup;
if (tt->subscriptions.length != 1 || tt->subscribers.length != 1)
goto tag_setup;
struct call_subscription *cs = ft->subscriptions.head->data;
if (cs->monologue != tt)
goto tag_setup;
cs = ft->subscribers.head->data;
if (cs->monologue != tt)
goto tag_setup;
cs = tt->subscriptions.head->data;
if (cs->monologue != ft)
goto tag_setup;
cs = tt->subscribers.head->data;
if (cs->monologue != ft)
goto tag_setup;
goto done;
}
else {
/* perhaps we can determine the monologue from the viabranch */
@ -3303,37 +3505,44 @@ static struct call_monologue *call_get_dialogue(struct call *call, const str *fr
if (!ft) {
/* if we don't have a fromtag monologue yet, we can use a half-complete dialogue
* from the totag if there is one. otherwise we have to create a new one. */
ft = tt->active_dialogue;
if (ft->tag.s)
if (tt->subscriptions.head) {
struct call_subscription *cs = tt->subscriptions.head->data;
ft = cs->monologue;
}
if (!ft || ft->tag.s)
ft = __monologue_create(call);
}
tag_setup:
/* the fromtag monologue may be newly created, or half-complete from the totag, or
* derived from the viabranch. */
if (!ft->tag.s || str_cmp_str(&ft->tag, fromtag))
__monologue_tag(ft, fromtag);
__monologue_unkernelize(ft->active_dialogue);
__monologue_unkernelize(tt->active_dialogue);
ft->active_dialogue = tt;
tt->active_dialogue = ft;
__dialogue_unkernelize(ft);
__dialogue_unkernelize(tt);
__subscribe_only_one_offer_answer(ft, tt);
__subscribe_only_one_offer_answer(tt, ft);
__fix_other_tags(ft);
done:
__monologue_unkernelize(ft);
__monologue_unkernelize(ft->active_dialogue);
return ft;
__dialogue_unkernelize(ft);
dialogue[0] = ft;
dialogue[1] = tt;
return 0;
}
/* fromtag and totag strictly correspond to the directionality of the message, not to the actual
* SIP headers. IOW, the fromtag corresponds to the monologue sending this message, even if the
* tag is actually from the TO header of the SIP message (as it would be in a 200 OK) */
struct call_monologue *call_get_mono_dialogue(struct call *call, const str *fromtag, const str *totag,
int call_get_mono_dialogue(struct call_monologue *dialogue[2], struct call *call, const str *fromtag,
const str *totag,
const str *viabranch)
{
if (!totag || !totag->s) /* initial offer */
return call_get_monologue(call, fromtag, NULL, viabranch);
return call_get_dialogue(call, fromtag, totag, viabranch);
return call_get_monologue_new(dialogue, call, fromtag, NULL, viabranch);
return call_get_dialogue(dialogue, call, fromtag, totag, viabranch);
}
@ -3403,8 +3612,11 @@ int call_delete_branch(const str *callid, const str *branch,
// if the associated dialogue has an empty tag (unknown)
if (match_tag == totag) {
ml = g_hash_table_lookup(c->tags, fromtag);
if (ml && ml->active_dialogue && ml->active_dialogue->tag.len == 0)
goto do_delete;
if (ml && ml->subscriptions.length == 1) {
struct call_subscription *cs = ml->subscriptions.head->data;
if (cs->monologue->tag.len == 0)
goto do_delete;
}
}
ilog(LOG_INFO, "Tag '"STR_FORMAT"' in delete message not found, ignoring",
@ -3417,8 +3629,10 @@ do_delete:
ng_call_stats(c, fromtag, totag, output, NULL);
monologue_stop(ml);
if (ml->active_dialogue && ml->active_dialogue->active_dialogue == ml)
monologue_stop(ml->active_dialogue);
for (GList *l = ml->subscribers.head; l; l = l->next) {
struct call_subscription *cs = l->data;
monologue_stop(cs->monologue);
}
if (delete_delay > 0) {
ilog(LOG_INFO, "Scheduling deletion of call branch '" STR_FORMAT_M "' "


+ 63
- 47
daemon/call_interfaces.c View File

@ -170,7 +170,7 @@ static str *call_update_lookup_udp(char **out, enum call_opmode opmode, const ch
const endpoint_t *sin)
{
struct call *c;
struct call_monologue *monologue;
struct call_monologue *dialogue[2];
GQueue q = G_QUEUE_INIT;
struct stream_params sp;
str *ret, callid, viabranch, fromtag, totag = STR_NULL;
@ -195,33 +195,32 @@ static str *call_update_lookup_udp(char **out, enum call_opmode opmode, const ch
c->created_from_addr = sin->address;
}
monologue = call_get_mono_dialogue(c, &fromtag, &totag, NULL);
if (!monologue)
if (call_get_mono_dialogue(dialogue, c, &fromtag, &totag, NULL))
goto ml_fail;
if (opmode == OP_OFFER) {
monologue->tagtype = FROM_TAG;
dialogue[0]->tagtype = FROM_TAG;
} else {
monologue->tagtype = TO_TAG;
dialogue[0]->tagtype = TO_TAG;
}
if (addr_parse_udp(&sp, out))
goto addr_fail;
g_queue_push_tail(&q, &sp);
i = monologue_offer_answer(monologue, &q, NULL);
i = monologue_offer_answer(dialogue, &q, NULL);
g_queue_clear(&q);
if (i)
goto unlock_fail;
ret = streams_print(&monologue->active_dialogue->medias,
ret = streams_print(&dialogue[1]->medias,
sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP);
rwlock_unlock_w(&c->master_lock);
redis_update_onekey(c, rtpe_redis_write);
gettimeofday(&(monologue->started), NULL);
gettimeofday(&(dialogue[0]->started), NULL);
ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT(ret));
goto out;
@ -307,7 +306,7 @@ static void streams_parse(const char *s, GQueue *q) {
static str *call_request_lookup_tcp(char **out, enum call_opmode opmode) {
struct call *c;
struct call_monologue *monologue;
struct call_monologue *dialogue[2];
GQueue s = G_QUEUE_INIT;
str *ret = NULL, callid, fromtag, totag = STR_NULL;
GHashTable *infohash;
@ -336,15 +335,14 @@ static str *call_request_lookup_tcp(char **out, enum call_opmode opmode) {
str_swap(&fromtag, &totag);
}
monologue = call_get_mono_dialogue(c, &fromtag, &totag, NULL);
if (!monologue) {
if (call_get_mono_dialogue(dialogue, c, &fromtag, &totag, NULL)) {
ilog(LOG_WARNING, "Invalid dialogue association");
goto out2;
}
if (monologue_offer_answer(monologue, &s, NULL))
if (monologue_offer_answer(dialogue, &s, NULL))
goto out2;
ret = streams_print(&monologue->active_dialogue->medias, 1, s.length, NULL, SAF_TCP);
ret = streams_print(&dialogue[1]->medias, 1, s.length, NULL, SAF_TCP);
out2:
rwlock_unlock_w(&c->master_lock);
@ -1265,11 +1263,11 @@ static void queue_sdp_fragment(struct ng_buffer *ngbuf, GQueue *streams, struct
mutex_unlock(&sdp_fragments_lock);
}
#define MAX_FRAG_AGE 3000000
static void dequeue_sdp_fragments(struct call_monologue *monologue) {
static void dequeue_sdp_fragments(struct call_monologue *dialogue[2]) {
struct fragment_key k;
ZERO(k);
k.call_id = monologue->call->callid;
k.from_tag = monologue->tag;
k.call_id = dialogue[0]->call->callid;
k.from_tag = dialogue[0]->tag;
mutex_lock(&sdp_fragments_lock);
GQueue *frags = g_hash_table_lookup(sdp_fragments, &k);
@ -1290,7 +1288,7 @@ static void dequeue_sdp_fragments(struct call_monologue *monologue) {
ilog(LOG_DEBUG, "Dequeuing SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M,
STR_FMT_M(&k.call_id), STR_FMT_M(&k.from_tag));
monologue_offer_answer(monologue, &frag->streams, &frag->flags);
monologue_offer_answer(dialogue, &frag->streams, &frag->flags);
next:
fragment_free(frag);
@ -1331,7 +1329,7 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t
GQueue parsed = G_QUEUE_INIT;
GQueue streams = G_QUEUE_INIT;
struct call *call;
struct call_monologue *monologue;
struct call_monologue *dialogue[2];
int ret;
struct sdp_ng_flags flags;
struct sdp_chopper *chopper;
@ -1409,19 +1407,18 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t
* need to hold a ref until we're done sending the reply */
call_bencode_hold_ref(call, output);
monologue = call_get_mono_dialogue(call, &flags.from_tag, &flags.to_tag,
flags.via_branch.s ? &flags.via_branch : NULL);
errstr = "Invalid dialogue association";
if (!monologue) {
if (call_get_mono_dialogue(dialogue, call, &flags.from_tag, &flags.to_tag,
flags.via_branch.s ? &flags.via_branch : NULL)) {
rwlock_unlock_w(&call->master_lock);
obj_put(call);
goto out;
}
if (opmode == OP_OFFER) {
monologue->tagtype = FROM_TAG;
dialogue[0]->tagtype = FROM_TAG;
} else {
monologue->tagtype = TO_TAG;
dialogue[0]->tagtype = TO_TAG;
}
chopper = sdp_chopper_new(&flags.sdp);
@ -1443,11 +1440,11 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t
int do_dequeue = 1;
ret = monologue_offer_answer(monologue, &streams, &flags);
ret = monologue_offer_answer(dialogue, &streams, &flags);
if (!ret) {
// SDP fragments for trickle ICE are consumed with no replacement returned
if (!flags.fragment)
ret = sdp_replace(chopper, &parsed, monologue->active_dialogue, &flags);
ret = sdp_replace(chopper, &parsed, dialogue[1], &flags);
}
else if (ret == ERROR_NO_ICE_AGENT && flags.fragment) {
queue_sdp_fragment(ngbuf, &streams, &flags);
@ -1459,15 +1456,15 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t
struct recording *recording = call->recording;
if (recording != NULL) {
meta_write_sdp_before(recording, &flags.sdp, monologue, opmode);
meta_write_sdp_before(recording, &flags.sdp, dialogue[0], opmode);
meta_write_sdp_after(recording, chopper->output,
monologue, opmode);
dialogue[0], opmode);
recording_response(recording, output);
}
if (do_dequeue)
dequeue_sdp_fragments(monologue);
dequeue_sdp_fragments(dialogue);
rwlock_unlock_w(&call->master_lock);
@ -1478,7 +1475,7 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t
}
obj_put(call);
gettimeofday(&(monologue->started), NULL);
gettimeofday(&(dialogue[0]->started), NULL);
errstr = "Error rewriting SDP";
@ -1697,8 +1694,20 @@ static void ng_stats_monologue(bencode_item_t *dict, const struct call_monologue
if (ml->label.s)
bencode_dictionary_add_str(sub, "label", &ml->label);
bencode_dictionary_add_integer(sub, "created", ml->created);
if (ml->active_dialogue)
bencode_dictionary_add_str(sub, "in dialogue with", &ml->active_dialogue->tag);
bencode_item_t *subs = bencode_dictionary_add_list(sub, "subscriptions");
for (GList *l = ml->subscriptions.head; l; l = l->next) {
struct call_subscription *cs = l->data;
bencode_item_t *sub1 = bencode_list_add_dictionary(subs);
bencode_dictionary_add_str(sub1, "tag", &cs->monologue->tag);
bencode_dictionary_add_string(sub1, "type", cs->offer_answer ? "offer/answer" : "pub/sub");
}
subs = bencode_dictionary_add_list(sub, "subscribers");
for (GList *l = ml->subscribers.head; l; l = l->next) {
struct call_subscription *cs = l->data;
bencode_item_t *sub1 = bencode_list_add_dictionary(subs);
bencode_dictionary_add_str(sub1, "tag", &cs->monologue->tag);
bencode_dictionary_add_string(sub1, "type", cs->offer_answer ? "offer/answer" : "pub/sub");
}
ng_stats_ssrc(bencode_dictionary_add_dictionary(sub, "SSRC"), ml->ssrc_hash);
medias = bencode_dictionary_add_list(sub, "medias");
@ -1818,7 +1827,10 @@ stats:
ml = g_hash_table_lookup(call->tags, match_tag);
if (ml) {
ng_stats_monologue(tags, ml, totals);
ng_stats_monologue(tags, ml->active_dialogue, totals);
for (GList *l = ml->subscriptions.head; l; l = l->next) {
struct call_subscription *cs = l->data;
ng_stats_monologue(tags, cs->monologue, totals);
}
}
}
@ -1977,9 +1989,10 @@ found:
;
}
else if (flags->from_tag.s) {
*monologue = call_get_mono_dialogue(*call, &flags->from_tag, NULL, NULL);
*monologue = call_get_monologue(*call, &flags->from_tag);
if (!*monologue)
return "From-tag given, but no such tag exists";
__monologue_unkernelize(*monologue);
}
return NULL;
@ -2382,23 +2395,26 @@ const char *call_play_dtmf_ng(bencode_item_t *input, bencode_item_t *output) {
// XXX fall back to generating a secondary stream
goto out;
found:;
struct call_monologue *dialogue = monologue->active_dialogue;
struct call_media *sink = NULL;
for (GList *l = dialogue->medias.head; l; l = l->next) {
sink = l->data;
if (media->type_id != MT_AUDIO)
continue;
goto found_sink;
}
found:
for (GList *k = monologue->subscribers.head; k; k = k->next) {
struct call_subscription *cs = k->data;
struct call_monologue *dialogue = cs->monologue;
struct call_media *sink = NULL;
for (GList *m = dialogue->medias.head; m; m = m->next) {
sink = m->data;
if (media->type_id != MT_AUDIO)
continue;
goto found_sink;
}
err = "Sink monologue has no media capable of DTMF playback";
goto out;
err = "Sink monologue has no media capable of DTMF playback";
goto out;
found_sink:
err = dtmf_inject(media, code, volume, duration, pause, sink);
if (err)
break;
err = dtmf_inject(media, code, volume, duration, pause, sink);
if (err)
break;
}
}
out:


+ 9
- 4
daemon/cdr.c View File

@ -74,15 +74,20 @@ void cdr_update_entry(struct call* c) {
"ml%i_duration=%ld.%06ld, "
"ml%i_termination=%s, "
"ml%i_local_tag=%s, "
"ml%i_local_tag_type=%s, "
"ml%i_remote_tag=%s, ",
"ml%i_local_tag_type=%s, ",
cdrlinecnt, ml->started.tv_sec, ml->started.tv_usec,
cdrlinecnt, ml->terminated.tv_sec, ml->terminated.tv_usec,
cdrlinecnt, tim_result_duration.tv_sec, tim_result_duration.tv_usec,
cdrlinecnt, get_term_reason_text(ml->term_reason),
cdrlinecnt, ml->tag.s,
cdrlinecnt, get_tag_type_text(ml->tagtype),
cdrlinecnt, ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)");
cdrlinecnt, get_tag_type_text(ml->tagtype));
for (k = ml->subscriptions.head; k; k = k->next) {
struct call_subscription *cs = k->data;
g_string_append_printf(cdr,
"ml%i_remote_tag=%s, ",
cdrlinecnt, cs->monologue->tag.s);
}
}
for (k = ml->medias.head; k; k = k->next) {


+ 9
- 4
daemon/cli.c View File

@ -571,14 +571,19 @@ static void cli_incoming_list_callid(str *instr, struct cli_writer *cw) {
cw->cw_printf(cw, "--- Tag '" STR_FORMAT "', type: %s, label '" STR_FORMAT "', "
"branch '" STR_FORMAT "', "
"callduration "
"%ld.%06ld, in dialogue with '" STR_FORMAT "'\n",
"%ld.%06ld\n",
STR_FMT(&ml->tag), get_tag_type_text(ml->tagtype),
STR_FMT(ml->label.s ? &ml->label : &STR_EMPTY),
STR_FMT(&ml->viabranch),
tim_result_duration.tv_sec,
tim_result_duration.tv_usec,
ml->active_dialogue ? (int) ml->active_dialogue->tag.len : 6,
ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)");
tim_result_duration.tv_usec);
for (GList *sub = ml->subscriptions.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
struct call_monologue *csm = cs->monologue;
cw->cw_printf(cw, "--- subscribed to '" STR_FORMAT_M "'\n",
STR_FMT_M(&csm->tag));
}
for (k = ml->medias.head; k; k = k->next) {
md = k->data;


+ 3
- 2
daemon/codec.c View File

@ -2265,12 +2265,13 @@ static void __dtx_send_later(struct codec_timer *ct) {
__ssrc_unlock_both(&mp_copy);
if (mp_copy.packets_out.length && ret == 0) {
struct packet_stream *sink = ps->rtp_sink;
struct sink_handler *sh = &mp_copy.sink;
struct packet_stream *sink = sh->sink;
if (!sink)
media_socket_dequeue(&mp_copy, NULL); // just free
else {
if (ps->handler && media_packet_encrypt(ps->handler->out->rtp_crypt, sink, &mp_copy))
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, &mp_copy))
ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");
mutex_lock(&sink->out_lock);


+ 66
- 62
daemon/dtmf.c View File

@ -236,69 +236,73 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media *
{
struct call *call = monologue->call;
if (!media->monologue || !media->monologue->active_dialogue)
return "No dialogue association";
struct ssrc_ctx *ssrc_out = get_ssrc_ctx(ssrc_in->ssrc_map_out,
media->monologue->active_dialogue->ssrc_hash, SSRC_DIR_OUTPUT,
monologue);
if (!ssrc_out)
return "No output SSRC context present"; // XXX generate stream
int duration_samples = duration * ch->dest_pt.clock_rate / 1000;
int pause_samples = pause * ch->dest_pt.clock_rate / 1000;
// we generate PCM DTMF by simulating a detected RFC event packet
// XXX this shouldn't require faking an actual RTP packet
struct telephone_event_payload tep = {
.event = code,
.volume = -1 * volume,
.end = 1,
.duration = htons(duration_samples),
};
struct rtp_header rtp = {
.m_pt = 0xff,
.timestamp = 0,
.seq_num = htons(ssrc_in->parent->sequencer.seq),
.ssrc = htonl(ssrc_in->parent->h.ssrc),
};
struct media_packet packet = {
.tv = rtpe_now,
.call = call,
.media = media,
.media_out = sink,
.rtp = &rtp,
.ssrc_in = ssrc_in,
.ssrc_out = ssrc_out,
.raw = { (void *) &tep, sizeof(tep) },
.payload = { (void *) &tep, sizeof(tep) },
};
// keep track of how much PCM we've generated
uint64_t encoder_pts = codec_encoder_pts(csh);
uint64_t skip_pts = codec_decoder_unskip_pts(csh); // reset to zero to take up our new samples
ch->dtmf_injector->func(ch->dtmf_injector, &packet);
// insert pause
tep.event = 0xff;
tep.duration = htons(pause_samples);
rtp.seq_num = htons(ssrc_in->parent->sequencer.seq);
ch->dtmf_injector->func(ch->dtmf_injector, &packet);
// skip generated samples
uint64_t pts_offset = codec_encoder_pts(csh) - encoder_pts;
skip_pts += av_rescale(pts_offset, ch->dest_pt.clock_rate, ch->source_pt.clock_rate);
codec_decoder_skip_pts(csh, skip_pts);
// ready packets for send
// XXX handle encryption?
media_socket_dequeue(&packet, packet_stream_sink(ps));
for (GList *l = ps->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink_ps = sh->sink;
struct call_monologue *sink_ml = sink_ps->media->monologue;
struct ssrc_ctx *ssrc_out = get_ssrc_ctx(ssrc_in->ssrc_map_out,
sink_ml->ssrc_hash, SSRC_DIR_OUTPUT,
monologue);
if (!ssrc_out)
return "No output SSRC context present"; // XXX generate stream
int duration_samples = duration * ch->dest_pt.clock_rate / 1000;
int pause_samples = pause * ch->dest_pt.clock_rate / 1000;
// we generate PCM DTMF by simulating a detected RFC event packet
// XXX this shouldn't require faking an actual RTP packet
struct telephone_event_payload tep = {
.event = code,
.volume = -1 * volume,
.end = 1,
.duration = htons(duration_samples),
};
struct rtp_header rtp = {
.m_pt = 0xff,
.timestamp = 0,
.seq_num = htons(ssrc_in->parent->sequencer.seq),
.ssrc = htonl(ssrc_in->parent->h.ssrc),
};
struct media_packet packet = {
.tv = rtpe_now,
.call = call,
.media = media,
.media_out = sink,
.rtp = &rtp,
.ssrc_in = ssrc_in,
.ssrc_out = ssrc_out,
.raw = { (void *) &tep, sizeof(tep) },
.payload = { (void *) &tep, sizeof(tep) },
};
// keep track of how much PCM we've generated
uint64_t encoder_pts = codec_encoder_pts(csh);
uint64_t skip_pts = codec_decoder_unskip_pts(csh); // reset to zero to take up our new samples
ch->dtmf_injector->func(ch->dtmf_injector, &packet);
// insert pause
tep.event = 0xff;
tep.duration = htons(pause_samples);
rtp.seq_num = htons(ssrc_in->parent->sequencer.seq);
ch->dtmf_injector->func(ch->dtmf_injector, &packet);
// skip generated samples
uint64_t pts_offset = codec_encoder_pts(csh) - encoder_pts;
skip_pts += av_rescale(pts_offset, ch->dest_pt.clock_rate, ch->source_pt.clock_rate);
codec_decoder_skip_pts(csh, skip_pts);
// ready packets for send
// XXX handle encryption?
media_socket_dequeue(&packet, sink_ps);
obj_put_o((struct obj *) csh);
ssrc_ctx_put(&ssrc_out);
}
obj_put_o((struct obj *) csh);
ssrc_ctx_put(&ssrc_out);
return 0;
}


+ 461
- 263
daemon/media_socket.c
File diff suppressed because it is too large
View File


+ 1
- 1
daemon/recording.c View File

@ -302,7 +302,7 @@ void recording_start(struct call *call, const char *prefix, str *metadata) {
struct packet_stream *ps = l->data;
recording_setup_stream(ps);
__unkernelize(ps);
ps->handler = NULL;
__reset_sink_handlers(ps);
}
recording_update_flags(call);


+ 115
- 12
daemon/redis.c View File

@ -1568,7 +1568,32 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_
for (i = 0; i < tags->len; i++) {
ml = tags->ptrs[i];
ml->active_dialogue = redis_list_get_ptr(tags, &tags->rh[i], "active");
if (json_build_list(&q, c, "subscriptions-oa", &c->callid, i, tags, root_reader))
return -1;
for (l = q.head; l; l = l->next) {
other_ml = l->data;
if (!other_ml)
return -1;
__add_subscription(ml, other_ml, true);
}
g_queue_clear(&q);
if (json_build_list(&q, c, "subscriptions-noa", &c->callid, i, tags, root_reader))
return -1;
for (l = q.head; l; l = l->next) {
other_ml = l->data;
if (!other_ml)
return -1;
__add_subscription(ml, other_ml, false);
}
g_queue_clear(&q);
// backwards compatibility
if (!ml->subscriptions.length) {
other_ml = redis_list_get_ptr(tags, &tags->rh[i], "active");
if (other_ml)
__add_subscription(ml, other_ml, true);
}
if (json_build_list(&q, c, "other_tags", &c->callid, i, tags, root_reader))
return -1;
@ -1602,19 +1627,53 @@ static int json_link_streams(struct call *c, struct redis_list *streams,
{
unsigned int i;
struct packet_stream *ps;
GQueue q = G_QUEUE_INIT;
GList *l;
for (i = 0; i < streams->len; i++) {
ps = streams->ptrs[i];
ps->media = redis_list_get_ptr(medias, &streams->rh[i], "media");
ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd");
ps->rtp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink");
ps->rtcp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink");
ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling");
if (json_build_list(&ps->sfds, c, "stream_sfds", &c->callid, i, sfds, root_reader))
return -1;
if (json_build_list(&q, c, "rtp_sinks", &c->callid, i, streams, root_reader))
return -1;
for (l = q.head; l; l = l->next) {
struct packet_stream *sink = l->data;
if (!sink)
return -1;
__add_sink_handler(&ps->rtp_sinks, sink);
}
g_queue_clear(&q);
// backwards compatibility
if (!ps->rtp_sinks.length) {
struct packet_stream *sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink");
if (sink)
__add_sink_handler(&ps->rtp_sinks, sink);
}
if (json_build_list(&q, c, "rtcp_sinks", &c->callid, i, streams, root_reader))
return -1;
for (l = q.head; l; l = l->next) {
struct packet_stream *sink = l->data;
if (!sink)
return -1;
__add_sink_handler(&ps->rtcp_sinks, sink);
}
g_queue_clear(&q);
// backwards compatibility
if (!ps->rtcp_sinks.length) {
struct packet_stream *sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink");
if (sink)
__add_sink_handler(&ps->rtcp_sinks, sink);
}
if (ps->media)
__rtp_stats_update(ps->rtp_stats, &ps->media->codecs);
@ -1646,12 +1705,15 @@ static int json_link_medias(struct call *c, struct redis_list *medias,
// find the pair media
struct call_monologue *ml = med->monologue;
struct call_monologue *other_ml = ml->active_dialogue;
for (GList *l = other_ml->medias.head; l; l = l->next) {
struct call_media *other_m = l->data;
if (other_m->index == med->index) {
codec_handlers_update(med, other_m, NULL, NULL);
break;
for (GList *sub = ml->subscriptions.head; sub; sub = sub->next) {
struct call_subscription *cs = sub->data;
struct call_monologue *other_ml = cs->monologue;
for (GList *l = other_ml->medias.head; l; l = l->next) {
struct call_media *other_m = l->data;
if (other_m->index == med->index) {
codec_handlers_update(med, other_m, NULL, NULL);
break;
}
}
}
}
@ -2170,8 +2232,6 @@ char* redis_encode_json(struct call *c) {
{
JSON_SET_SIMPLE("media","%u",ps->media->unique_id);
JSON_SET_SIMPLE("sfd","%u",ps->selected_sfd ? ps->selected_sfd->unique_id : -1);
JSON_SET_SIMPLE("rtp_sink","%u",ps->rtp_sink ? ps->rtp_sink->unique_id : -1);
JSON_SET_SIMPLE("rtcp_sink","%u",ps->rtcp_sink ? ps->rtcp_sink->unique_id : -1);
JSON_SET_SIMPLE("rtcp_sibling","%u",ps->rtcp_sibling ? ps->rtcp_sibling->unique_id : -1);
JSON_SET_SIMPLE("last_packet",UINT64F,atomic64_get(&ps->last_packet));
JSON_SET_SIMPLE("ps_flags","%u",ps->ps_flags);
@ -2196,6 +2256,7 @@ char* redis_encode_json(struct call *c) {
for (l = c->streams.head; l; l = l->next) {
ps = l->data;
// XXX these should all go into the above loop
mutex_lock(&ps->in_lock);
mutex_lock(&ps->out_lock);
@ -2209,6 +2270,26 @@ char* redis_encode_json(struct call *c) {
}
json_builder_end_array (builder);
snprintf(tmp, sizeof(tmp), "rtp_sinks-%u", ps->unique_id);
json_builder_set_member_name(builder, tmp);
json_builder_begin_array(builder);
for (k = ps->rtp_sinks.head; k; k = k->next) {
struct sink_handler *sh = k->data;
struct packet_stream *sink = sh->sink;
JSON_ADD_STRING("%u", sink->unique_id);
}
json_builder_end_array (builder);
snprintf(tmp, sizeof(tmp), "rtcp_sinks-%u", ps->unique_id);
json_builder_set_member_name(builder, tmp);
json_builder_begin_array(builder);
for (k = ps->rtcp_sinks.head; k; k = k->next) {
struct sink_handler *sh = k->data;
struct packet_stream *sink = sh->sink;
JSON_ADD_STRING("%u", sink->unique_id);
}
json_builder_end_array (builder);
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->out_lock);
}
@ -2224,7 +2305,6 @@ char* redis_encode_json(struct call *c) {
{
JSON_SET_SIMPLE("created","%llu",(long long unsigned) ml->created);
JSON_SET_SIMPLE("active","%u",ml->active_dialogue ? ml->active_dialogue->unique_id : -1);
JSON_SET_SIMPLE("deleted","%llu",(long long unsigned) ml->deleted);
JSON_SET_SIMPLE("block_dtmf","%i",ml->block_dtmf ? 1 : 0);
JSON_SET_SIMPLE("block_media","%i",ml->block_media ? 1 : 0);
@ -2245,6 +2325,7 @@ char* redis_encode_json(struct call *c) {
for (l = c->monologues.head; l; l = l->next) {
ml = l->data;
// -- we do it again here since the jsonbuilder is linear straight forward
// XXX these should all go into the above loop
k = g_hash_table_get_values(ml->other_tags);
snprintf(tmp, sizeof(tmp), "other_tags-%u", ml->unique_id);
json_builder_set_member_name(builder, tmp);
@ -2304,6 +2385,28 @@ char* redis_encode_json(struct call *c) {
g_list_free(k);
rwlock_unlock_r(&ml->ssrc_hash->lock);
snprintf(tmp, sizeof(tmp), "subscriptions-oa-%u", ml->unique_id);
json_builder_set_member_name(builder, tmp);
json_builder_begin_array(builder);
for (k = ml->subscriptions.head; k; k = k->next) {
struct call_subscription *cs = k->data;
if (!cs->offer_answer)
continue;
JSON_ADD_STRING("%u", cs->monologue->unique_id);
}
json_builder_end_array(builder);
snprintf(tmp, sizeof(tmp), "subscriptions-noa-%u", ml->unique_id);
json_builder_set_member_name(builder, tmp);
json_builder_begin_array(builder);
for (k = ml->subscriptions.head; k; k = k->next) {
struct call_subscription *cs = k->data;
if (cs->offer_answer)
continue;
JSON_ADD_STRING("%u", cs->monologue->unique_id);
}
json_builder_end_array(builder);
}


+ 14
- 12
daemon/rtcp.c View File

@ -1567,15 +1567,7 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
ps = next_ps;
}
struct call_media *other_media = NULL;
if (ps->rtp_sink)
other_media = ps->rtp_sink->media;
else if (ps->rtcp_sink)
other_media = ps->rtcp_sink->media;
media_update_stats(media);
if (other_media)
media_update_stats(other_media);
log_info_stream_fd(ps->selected_sfd);
@ -1598,12 +1590,22 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
socket_sendto(&ps->selected_sfd->socket, sr->str, sr->len, &ps->endpoint);
g_string_free(sr, TRUE);
if (other_media)
GQueue *sinks = ps->rtp_sinks.length ? &ps->rtp_sinks : &ps->rtcp_sinks;
for (GList *l = sinks->head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
struct call_media *other_media = sink->media;
media_update_stats(other_media);
ssrc_sender_report(other_media, &ssr, &rtpe_now);
struct ssrc_receiver_report *srr;
while ((srr = g_queue_pop_head(&srrs))) {
if (other_media)
for (GList *k = srrs.head; k; k = k->next) {
struct ssrc_receiver_report *srr = k->data;
ssrc_receiver_report(other_media, srr, &rtpe_now);
}
}
while (srrs.length) {
struct ssrc_receiver_report *srr = g_queue_pop_head(&srrs);
g_slice_free1(sizeof(*srr), srr);
}
}


+ 16
- 4
daemon/ssrc.c View File

@ -88,7 +88,7 @@ static void mos_calc(struct ssrc_stats_block *ssb) {
ssb->mos = intmos;
}
static struct ssrc_entry *find_ssrc(uint32_t ssrc, struct ssrc_hash *ht) {
static void *find_ssrc(uint32_t ssrc, struct ssrc_hash *ht) {
rwlock_lock_r(&ht->lock);
struct ssrc_entry *ret = g_atomic_pointer_get(&ht->cache);
if (!ret || ret->ssrc != ssrc) {
@ -253,6 +253,18 @@ static void *__do_time_report_item(struct call_media *m, size_t struct_size, siz
return sti;
}
// call must be locked in R
static struct ssrc_entry_call *hunt_ssrc(struct call_monologue *ml, uint32_t ssrc) {
for (GList *l = ml->subscriptions.head; l; l = l->next) {
struct call_subscription *cs = l->data;
struct call_monologue *other = cs->monologue;
struct ssrc_entry_call *e = find_ssrc(ssrc, other->ssrc_hash);
if (e)
return e;
}
return NULL;
}
static long long __calc_rtt(struct call_monologue *ml, uint32_t ssrc, uint32_t ntp_middle_bits,
uint32_t delay, size_t reports_queue_offset, const struct timeval *tv, int *pt_p)
{
@ -262,7 +274,7 @@ static long long __calc_rtt(struct call_monologue *ml, uint32_t ssrc, uint32_t n
if (!ntp_middle_bits || !delay)
return 0;
struct ssrc_entry_call *e = get_ssrc(ssrc, ml->ssrc_hash);
struct ssrc_entry_call *e = hunt_ssrc(ml, ssrc);
if (G_UNLIKELY(!e))
return 0;
@ -334,7 +346,7 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor
int pt;
long long rtt = __calc_rtt(m->monologue->active_dialogue, rr->ssrc, rr->lsr, rr->dlsr,
long long rtt = __calc_rtt(m->monologue, rr->ssrc, rr->lsr, rr->dlsr,
G_STRUCT_OFFSET(struct ssrc_entry_call, sender_reports), tv, &pt);
struct ssrc_entry_call *other_e = get_ssrc(rr->from, m->monologue->ssrc_hash);
@ -440,7 +452,7 @@ void ssrc_receiver_dlrr(struct call_media *m, const struct ssrc_xr_dlrr *dlrr,
FMT_M(dlrr->from), FMT_M(dlrr->ssrc),
dlrr->lrr, dlrr->dlrr);
__calc_rtt(m->monologue->active_dialogue, dlrr->ssrc, dlrr->lrr, dlrr->dlrr,
__calc_rtt(m->monologue, dlrr->ssrc, dlrr->lrr, dlrr->dlrr,
G_STRUCT_OFFSET(struct ssrc_entry_call, rr_time_reports), tv, NULL);
}


+ 2
- 20
daemon/statistics.c View File

@ -236,26 +236,8 @@ void statistics_update_oneway(struct call* c) {
}
if (!found)
ps = NULL;
found = 0;
if (ml->active_dialogue) {
// --- go through partner ml and search the RTP
for (k = ml->active_dialogue->medias.head; k; k = k->next) {
md = k->data;
for (o = md->streams.head; o; o = o->next) {
ps2 = o->data;
if (PS_ISSET(ps2, RTP)) {
// --- only RTP is interesting
found = 1;
break;
}
}
if (found) { break; }
}
}
if (!found)
ps2 = NULL;
struct sink_handler *sh = g_queue_peek_head(&ps->rtp_sinks);
ps2 = sh ? sh->sink : NULL;
if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) {
if (atomic64_get(&ps->stats.packets)!=0 && IS_OWN_CALL(c)){


+ 24
- 19
include/call.h View File

@ -14,6 +14,7 @@
#include <pcre.h>
#include <openssl/x509.h>
#include <limits.h>
#include <stdbool.h>
#include "compat.h"
#include "socket.h"
#include "media_socket.h"
@ -200,7 +201,6 @@ struct call;
struct redis;
struct crypto_suite;
struct rtpengine_srtp;
struct streamhandler;
struct sdp_ng_flags;
struct local_interface;
struct call_monologue;
@ -290,18 +290,17 @@ struct packet_stream {
struct recording_stream recording; /* LOCK: call->master_lock */
GQueue sfds; /* LOCK: call->master_lock */
struct stream_fd * volatile selected_sfd;
struct stream_fd * selected_sfd;
struct dtls_connection ice_dtls; /* LOCK: in_lock */
struct packet_stream *rtp_sink; /* LOCK: call->master_lock */
struct packet_stream *rtcp_sink; /* LOCK: call->master_lock */
GQueue rtp_sinks; // LOCK: call->master_lock, in_lock for streamhandler
GQueue rtcp_sinks; // LOCK: call->master_lock, in_lock for streamhandler
struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */
const struct streamhandler *handler; /* LOCK: in_lock */
struct endpoint endpoint; /* LOCK: out_lock */
struct endpoint detected_endpoints[4]; /* LOCK: out_lock */
struct timeval ep_detect_signal; /* LOCK: out_lock */
struct endpoint advertised_endpoint; /* RO */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */ // XXX eliminate these
struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */
*ssrc_out; /* LOCK: out_lock */
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */
@ -374,6 +373,13 @@ struct call_media {
volatile unsigned int media_flags;
};
// link between subscribers and subscriptions
struct call_subscription {
struct call_monologue *monologue;
GList *link; // link into the corresponding opposite list
unsigned int offer_answer:1; // bidirectional, exclusive
};
/* half a dialogue */
/* protected by call->master_lock, except the RO elements */
struct call_monologue {
@ -386,12 +392,13 @@ struct call_monologue {
str label;
time_t created; /* RO */
time_t deleted;
struct timeval started; /* for CDR */
struct timeval terminated; /* for CDR */
enum termination_reason term_reason;
struct timeval started; /* for CDR */
struct timeval terminated; /* for CDR */
enum termination_reason term_reason;
GHashTable *other_tags;
GHashTable *branches;
struct call_monologue *active_dialogue;
GQueue subscriptions; // who am I subscribed to (sources)
GQueue subscribers; // who is subscribed to me (sinks)
GQueue medias;
GHashTable *media_ids;
struct media_player *player;
@ -517,15 +524,20 @@ struct call_monologue *__monologue_create(struct call *call);
void __monologue_tag(struct call_monologue *ml, const str *tag);
void __monologue_viabranch(struct call_monologue *ml, const str *viabranch);
struct packet_stream *__packet_stream_new(struct call *call);
void __add_subscription(struct call_monologue *ml, struct call_monologue *other, bool offer_answer);
void free_sink_handler(void *);
void __add_sink_handler(GQueue *, struct packet_stream *);
struct call *call_get_or_create(const str *callid, int foreign);
struct call *call_get_opmode(const str *callid, enum call_opmode opmode);
void call_make_own_foreign(struct call *c, int foreign);
struct call_monologue *call_get_mono_dialogue(struct call *call, const str *fromtag, const str *totag,
int call_get_mono_dialogue(struct call_monologue *dialogue[2], struct call *call, const str *fromtag,
const str *totag,
const str *viabranch);
struct call_monologue *call_get_monologue(struct call *call, const str *fromtag);
struct call *call_get(const str *callid);
int monologue_offer_answer(struct call_monologue *monologue, GQueue *streams, struct sdp_ng_flags *flags);
int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, struct sdp_ng_flags *flags);
void codecs_offer_answer(struct call_media *media, struct call_media *other_media,
struct stream_params *sp, struct sdp_ng_flags *flags);
int call_delete_branch(const str *callid, const str *branch,
@ -601,13 +613,6 @@ INLINE str *call_str_init_dup(struct call *c, char *s) {
str_init(&t, s);
return call_str_dup(c, &t);
}
INLINE struct packet_stream *packet_stream_sink(struct packet_stream *ps) {
struct packet_stream *ret;
ret = ps->rtp_sink;
if (!ret)
ret = ps->rtcp_sink;
return ret;
}
INLINE void __call_unkernelize(struct call *call) {
for (GList *l = call->monologues.head; l; l = l->next) {
struct call_monologue *ml = l->data;


+ 7
- 0
include/media_socket.h View File

@ -122,6 +122,11 @@ struct stream_fd {
struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */
struct dtls_connection dtls; /* LOCK: stream->in_lock */
};
struct sink_handler {
struct packet_stream *sink;
const struct streamhandler *handler;
int kernel_output_idx;
};
struct media_packet {
str raw;
@ -132,6 +137,7 @@ struct media_packet {
struct packet_stream *stream; // sfd->stream
struct call_media *media; // stream->media
struct call_media *media_out; // output media
struct sink_handler sink;
struct rtp_header *rtp;
struct rtcp_packet *rtcp;
@ -176,6 +182,7 @@ void kernelize(struct packet_stream *);
void __unkernelize(struct packet_stream *);
void unkernelize(struct packet_stream *);
void __stream_unconfirm(struct packet_stream *);
void __reset_sink_handlers(struct packet_stream *);
void media_update_stats(struct call_media *m);


+ 2
- 2
kernel-module/xt_RTPENGINE.c View File

@ -4385,13 +4385,13 @@ not_stun:
goto skip_error;
src_check_ok:
if (g->target.dtls && is_dtls(skb))
goto skip1;
if (g->target.non_forwarding) {
if (g->target.blackhole)
error_nf_action = NF_DROP;
goto skip1;
}
if (g->target.dtls && is_dtls(skb))
goto skip1;
rtp.ok = 0;
if (!g->target.rtp)


+ 1
- 3
t/test-transcode.c View File

@ -87,8 +87,6 @@ static void __start(const char *file, int line) {
str_init(&ml_B.tag, "tag_B");
media_B->monologue = &ml_B;
media_B->protocol = &transport_protocols[PROTO_RTP_AVP];
ml_A.active_dialogue = &ml_B;
ml_B.active_dialogue = &ml_A;
__init();
}
@ -233,7 +231,7 @@ static void __packet_seq_ts(const char *file, int line, struct call_media *media
// from __stream_ssrc()
if (!MEDIA_ISSET(media, TRANSCODE))
mp.ssrc_in->ssrc_map_out = ntohl(ssrc);
mp.ssrc_out = get_ssrc_ctx(mp.ssrc_in->ssrc_map_out, media->monologue->active_dialogue->ssrc_hash, SSRC_DIR_OUTPUT, NULL);
mp.ssrc_out = get_ssrc_ctx(mp.ssrc_in->ssrc_map_out, other_media->monologue->ssrc_hash, SSRC_DIR_OUTPUT, NULL);
payload_tracker_add(&mp.ssrc_in->tracker, pt_in & 0x7f);
int packet_len = sizeof(struct rtp_header) + pl.len;


Loading…
Cancel
Save