diff --git a/daemon/aux.h b/daemon/aux.h index d0d0af3e5..1b15dfe4c 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -604,6 +604,9 @@ INLINE void timeval_lowest(struct timeval *l, const struct timeval *n) { if (!l->tv_sec || timeval_cmp(l, n) == 1) *l = *n; } +INLINE double ntp_ts_to_double(u_int32_t whole, u_int32_t frac) { + return (double) whole + (double) frac / 4294967296.0; +} diff --git a/daemon/rtcp.c b/daemon/rtcp.c index a903fe053..239c3a79a 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -15,6 +15,7 @@ #include "homer.h" #include "media_socket.h" #include "rtcplib.h" +#include "ssrc.h" @@ -133,7 +134,17 @@ struct rtcp_chain_element { // struct defs // context to hold state variables struct rtcp_process_ctx { - u_int32_t scratch; + // input + struct call *call; + const struct timeval *received; + + // handler vars + union { + struct ssrc_receiver_report rr; + struct ssrc_sender_report sr; + } scratch; + u_int32_t scratch_common_ssrc; + GString *log; GString *json; }; @@ -159,6 +170,7 @@ struct rtcp_handler { struct rtcp_handlers { const struct rtcp_handler *scratch, + *mos, *logging, *homer; }; @@ -167,8 +179,14 @@ struct rtcp_handlers { static void dummy_handler(); // scratch area (prepare/parse packet) +static void scratch_common(struct rtcp_process_ctx *, const struct rtcp_packet *); +static void scratch_sr(struct rtcp_process_ctx *, const struct sender_report_packet *); static void scratch_rr(struct rtcp_process_ctx *, const struct report_block *); +// MOS calculation / stats +static void mos_sr(struct rtcp_process_ctx *, const struct sender_report_packet *); +static void mos_rr(struct rtcp_process_ctx *, const struct report_block *); + // homer functions static void homer_init(struct rtcp_process_ctx *); static void homer_sr(struct rtcp_process_ctx *, const struct sender_report_packet *); @@ -197,7 +215,13 @@ static void logging_destroy(struct rtcp_process_ctx *); // structs for each handler type static struct rtcp_handler dummy_handlers; static struct rtcp_handler scratch_handlers = { + .common = scratch_common, .rr = scratch_rr, + .sr = scratch_sr, +}; +static struct rtcp_handler mos_handlers = { + .rr = mos_rr, + .sr = mos_sr, }; static struct rtcp_handler log_handlers = { .init = logging_init, @@ -225,12 +249,15 @@ static struct rtcp_handler homer_handlers = { // main var to hold references static struct rtcp_handlers rtcp_handlers = { .scratch = &scratch_handlers, + .mos = &mos_handlers, + // remainder is variable }; // list of all handlers static struct rtcp_handler *all_handlers[] = { &dummy_handlers, &scratch_handlers, + &mos_handlers, &log_handlers, &homer_handlers, }; @@ -238,6 +265,7 @@ static struct rtcp_handler *all_handlers[] = { // macro to call all function handlers in one go #define CAH(func, ...) do { \ rtcp_handlers.scratch->func(log_ctx, ##__VA_ARGS__); \ + rtcp_handlers.mos->func(log_ctx, ##__VA_ARGS__); \ rtcp_handlers.logging->func(log_ctx, ##__VA_ARGS__); \ rtcp_handlers.homer->func(log_ctx, ##__VA_ARGS__); \ } while (0) @@ -423,7 +451,11 @@ static int __rtcp_parse(GQueue *q, const str *_s, struct stream_fd *sfd, const e int min_packet_size; ZERO(log_ctx_s); + log_ctx_s.call = c; + log_ctx_s.received = tv; + log_ctx = &log_ctx_s; + CAH(init); CAH(start, c); @@ -675,25 +707,52 @@ static void dummy_handler() { return; } + + +static void scratch_common(struct rtcp_process_ctx *ctx, const struct rtcp_packet *common) { + ctx->scratch_common_ssrc = htonl(common->ssrc); +} static void scratch_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) { - ctx->scratch = (rr->number_lost[0] << 16) | (rr->number_lost[1] << 8) | rr->number_lost[2]; + ctx->scratch.rr = (struct ssrc_receiver_report) { + .from = ctx->scratch_common_ssrc, + .ssrc = htonl(rr->ssrc), + .high_seq_received = ntohl(rr->high_seq_received), + .fraction_lost = rr->fraction_lost, + .jitter = ntohl(rr->jitter), + .lsr = ntohl(rr->lsr), + .dlsr = ntohl(rr->dlsr), + }; + ctx->scratch.rr.packets_lost = (rr->number_lost[0] << 16) | (rr->number_lost[1] << 8) | rr->number_lost[2]; +} +static void scratch_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) { + ctx->scratch.sr = (struct ssrc_sender_report) { + .ssrc = ctx->scratch_common_ssrc, + .ntp_msw = ntohl(sr->ntp_msw), + .ntp_lsw = ntohl(sr->ntp_lsw), + .octet_count = ntohl(sr->octet_count), + .timestamp = ntohl(sr->timestamp), + .packet_count = ntohl(sr->packet_count), + }; + ctx->scratch.sr.ntp_ts = ntp_ts_to_double(ctx->scratch.sr.ntp_msw, ctx->scratch.sr.ntp_lsw); } + + static void homer_init(struct rtcp_process_ctx *ctx) { ctx->json = g_string_new("{ "); } static void homer_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) { g_string_append_printf(ctx->json, "\"sender_information\":{\"ntp_timestamp_sec\":%u," "\"ntp_timestamp_usec\":%u,\"octets\":%u,\"rtp_timestamp\":%u, \"packets\":%u},", - ntohl(sr->ntp_msw), - ntohl(sr->ntp_lsw), - ntohl(sr->octet_count), - ntohl(sr->timestamp), - ntohl(sr->packet_count)); + ctx->scratch.sr.ntp_msw, + ctx->scratch.sr.ntp_lsw, + ctx->scratch.sr.octet_count, + ctx->scratch.sr.timestamp, + ctx->scratch.sr.packet_count); } static void homer_rr_list_start(struct rtcp_process_ctx *ctx, const struct rtcp_packet *common) { g_string_append_printf(ctx->json, "\"ssrc\":%u,\"type\":%u,\"report_count\":%u,\"report_blocks\":[", - ntohl(common->ssrc), + ctx->scratch_common_ssrc, common->header.pt, common->header.count); } @@ -701,13 +760,13 @@ static void homer_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr g_string_append_printf(ctx->json, "{\"source_ssrc\":%u," "\"highest_seq_no\":%u,\"fraction_lost\":%u,\"ia_jitter\":%u," "\"packets_lost\":%u,\"lsr\":%u,\"dlsr\":%u},", - ntohl(rr->ssrc), - ntohl(rr->high_seq_received), - rr->fraction_lost, - ntohl(rr->jitter), - ctx->scratch, - ntohl(rr->lsr), - ntohl(rr->dlsr)); + ctx->scratch.rr.ssrc, + ctx->scratch.rr.high_seq_received, + ctx->scratch.rr.fraction_lost, + ctx->scratch.rr.jitter, + ctx->scratch.rr.packets_lost, + ctx->scratch.rr.lsr, + ctx->scratch.rr.dlsr); } static void homer_rr_list_end(struct rtcp_process_ctx *ctx) { str_sanitize(ctx->json); @@ -783,7 +842,7 @@ static void logging_common(struct rtcp_process_ctx *ctx, const struct rtcp_packe common->header.count, common->header.pt, ntohs(common->header.length), - ntohl(common->ssrc)); + ctx->scratch_common_ssrc); } static void logging_sdes_list_start(struct rtcp_process_ctx *ctx, const struct source_description_packet *sdes) { g_string_append_printf(ctx->log,"version=%u, padding=%u, count=%u, payloadtype=%u, length=%u, ", @@ -794,22 +853,23 @@ static void logging_sdes_list_start(struct rtcp_process_ctx *ctx, const struct s ntohs(sdes->header.length)); } static void logging_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) { - g_string_append_printf(ctx->log,"ntp_sec=%u, ntp_fractions=%u, rtp_ts=%u, sender_packets=%u, sender_bytes=%u, ", - ntohl(sr->ntp_msw), - ntohl(sr->ntp_lsw), - ntohl(sr->timestamp), - ntohl(sr->packet_count), - ntohl(sr->octet_count)); + g_string_append_printf(ctx->log,"ntp_sec=%u, ntp_fractions=%u, rtp_ts=%u, sender_packets=%u, " \ + "sender_bytes=%u, ", + ctx->scratch.sr.ntp_msw, + ctx->scratch.sr.ntp_lsw, + ctx->scratch.sr.timestamp, + ctx->scratch.sr.packet_count, + ctx->scratch.sr.octet_count); } static void logging_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) { g_string_append_printf(ctx->log,"ssrc=%u, fraction_lost=%u, packet_loss=%u, last_seq=%u, jitter=%u, last_sr=%u, delay_since_last_sr=%u, ", - ntohl(rr->ssrc), + ctx->scratch.rr.ssrc, rr->fraction_lost, - ctx->scratch, - ntohl(rr->high_seq_received), - ntohl(rr->jitter), - ntohl(rr->lsr), - ntohl(rr->dlsr)); + ctx->scratch.rr.packets_lost, + ctx->scratch.rr.high_seq_received, + ctx->scratch.rr.jitter, + ctx->scratch.rr.lsr, + ctx->scratch.rr.dlsr); } static void logging_xr(struct rtcp_process_ctx *ctx, const struct rtcp_packet *common, str *comp_s) { pjmedia_rtcp_xr_rx_rtcp_xr(ctx->log, common, comp_s); @@ -826,6 +886,17 @@ static void logging_destroy(struct rtcp_process_ctx *ctx) { + +static void mos_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) { + ssrc_sender_report(ctx->call, &ctx->scratch.sr, ctx->received); +} +static void mos_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) { + ssrc_receiver_report(ctx->call, &ctx->scratch.rr, ctx->received); +} + + + + void rtcp_init() { rtcp_handlers.logging = _log_facility_rtcp ? &log_handlers : &dummy_handlers; rtcp_handlers.homer = has_homer() ? &homer_handlers : &dummy_handlers; diff --git a/daemon/ssrc.c b/daemon/ssrc.c index e9e144e3f..4a7cc4be3 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -1,6 +1,7 @@ #include "ssrc.h" #include #include "aux.h" +#include "call.h" @@ -8,13 +9,20 @@ static struct ssrc_entry *create_ssrc_entry(u_int32_t ssrc) { struct ssrc_entry *ent; ent = g_slice_alloc0(sizeof(struct ssrc_entry)); ent->ssrc = ssrc; + mutex_init(&ent->lock); return ent; } static void add_ssrc_entry(struct ssrc_entry *ent, struct ssrc_hash *ht) { g_hash_table_replace(ht->ht, &ent->ssrc, ent); } +static void free_sender_report(void *p) { + struct ssrc_sender_report_item *i = p; + g_slice_free1(sizeof(*i), i); +} static void free_ssrc_entry(void *p) { - g_slice_free1(sizeof(struct ssrc_entry), p); + struct ssrc_entry *e = p; + g_queue_clear_full(&e->sender_reports, free_sender_report); + g_slice_free1(sizeof(*e), e); } @@ -72,3 +80,54 @@ struct ssrc_ctx *get_ssrc_ctx(u_int32_t ssrc, struct ssrc_hash *ht, enum ssrc_di return ((void *) s) + dir; } + + +void ssrc_sender_report(struct call *c, const struct ssrc_sender_report *sr, + const struct timeval *tv) +{ + struct ssrc_entry *e; + struct ssrc_sender_report_item *seri; + + seri = g_slice_alloc(sizeof(*seri)); + seri->received = *tv; + seri->report = *sr; + seri->ntp_middle_bits = sr->ntp_msw << 16 | sr->ntp_lsw >> 16; + + ilog(LOG_DEBUG, "SR from %u: RTP TS %u PC %u OC %u NTP TS %u/%u=%f", + sr->ssrc, sr->timestamp, sr->packet_count, sr->octet_count, + sr->ntp_msw, sr->ntp_lsw, sr->ntp_ts); + + e = get_ssrc(sr->ssrc, c->ssrc_hash); + + mutex_lock(&e->lock); + + g_queue_push_tail(&e->sender_reports, seri); + + mutex_unlock(&e->lock); +} +void ssrc_receiver_report(struct call *c, const struct ssrc_receiver_report *rr, + const struct timeval *tv) +{ + ilog(LOG_DEBUG, "RR from %u about %u: FL %u TL %u HSR %u J %u LSR %u DLSR %u", + rr->from, rr->ssrc, rr->fraction_lost, rr->packets_lost, + rr->high_seq_received, rr->jitter, rr->lsr, rr->dlsr); + + if (!rr->lsr || !rr->dlsr) + return; // no delay to be known + + struct ssrc_entry *e = get_ssrc(rr->ssrc, c->ssrc_hash); + mutex_lock(&e->lock); + // go through the list backwards until we find the SR referenced, up to 10 steps + int i = 0; + for (GList *l = e->sender_reports.tail; + l && i < 10; + l = l->prev, i++) + { + struct ssrc_sender_report_item *seri = l->data; + if (seri->ntp_middle_bits != rr->lsr) + continue; + ilog(LOG_DEBUG, "RR from %u reports delay %u from %u", rr->from, rr->dlsr, rr->ssrc); + break; + } + mutex_unlock(&e->lock); +} diff --git a/daemon/ssrc.h b/daemon/ssrc.h index ac88ee487..10977c387 100644 --- a/daemon/ssrc.h +++ b/daemon/ssrc.h @@ -10,6 +10,11 @@ +struct call; +struct timeval; + + + struct ssrc_hash { GHashTable *ht; rwlock_t lock; @@ -20,16 +25,47 @@ struct ssrc_ctx { // XXX move entire crypto context in here? }; struct ssrc_entry { - // XXX lock this? + mutex_t lock; u_int32_t ssrc; struct ssrc_ctx input_ctx, output_ctx; + GQueue sender_reports; }; enum ssrc_dir { SSRC_DIR_INPUT = G_STRUCT_OFFSET(struct ssrc_entry, input_ctx), SSRC_DIR_OUTPUT = G_STRUCT_OFFSET(struct ssrc_entry, output_ctx), }; +struct ssrc_sender_report { + u_int32_t ssrc; + u_int32_t ntp_msw; + u_int32_t ntp_lsw; + u_int32_t timestamp; + u_int32_t packet_count; + u_int32_t octet_count; + double ntp_ts; +}; +struct ssrc_sender_report_item { + struct timeval received; + u_int32_t ntp_middle_bits; // to match up with rr->lsr + struct ssrc_sender_report report; +}; + +struct ssrc_receiver_report { + u_int32_t from; + u_int32_t ssrc; + unsigned char fraction_lost; + u_int32_t packets_lost; + u_int32_t high_seq_received; + u_int32_t jitter; + u_int32_t lsr; + u_int32_t dlsr; +}; +struct ssrc_receiver_report_item { + struct timeval received; + struct ssrc_receiver_report report; +}; + @@ -43,5 +79,10 @@ struct ssrc_entry *get_ssrc(u_int32_t, struct ssrc_hash * /* , int *created */); struct ssrc_ctx *get_ssrc_ctx(u_int32_t, struct ssrc_hash *, enum ssrc_dir); // creates new entry if not found +void ssrc_sender_report(struct call *, const struct ssrc_sender_report *, const struct timeval *); +void ssrc_receiver_report(struct call *, const struct ssrc_receiver_report *, + const struct timeval *); + + #endif