Browse Source

TT#12800 store and process RTCP SR and RR for statistics

Change-Id: Ia78c37ae9d24df6783b664da6d395263f9f39e91
changes/90/12490/17
Richard Fuchs 9 years ago
parent
commit
70b7112617
4 changed files with 204 additions and 30 deletions
  1. +3
    -0
      daemon/aux.h
  2. +99
    -28
      daemon/rtcp.c
  3. +60
    -1
      daemon/ssrc.c
  4. +42
    -1
      daemon/ssrc.h

+ 3
- 0
daemon/aux.h View File

@ -604,6 +604,9 @@ INLINE void timeval_lowest(struct timeval *l, const struct timeval *n) {
if (!l->tv_sec || timeval_cmp(l, n) == 1)
*l = *n;
}
INLINE double ntp_ts_to_double(u_int32_t whole, u_int32_t frac) {
return (double) whole + (double) frac / 4294967296.0;
}


+ 99
- 28
daemon/rtcp.c View File

@ -15,6 +15,7 @@
#include "homer.h"
#include "media_socket.h"
#include "rtcplib.h"
#include "ssrc.h"
@ -133,7 +134,17 @@ struct rtcp_chain_element {
// struct defs
// context to hold state variables
struct rtcp_process_ctx {
u_int32_t scratch;
// input
struct call *call;
const struct timeval *received;
// handler vars
union {
struct ssrc_receiver_report rr;
struct ssrc_sender_report sr;
} scratch;
u_int32_t scratch_common_ssrc;
GString *log;
GString *json;
};
@ -159,6 +170,7 @@ struct rtcp_handler {
struct rtcp_handlers {
const struct rtcp_handler
*scratch,
*mos,
*logging,
*homer;
};
@ -167,8 +179,14 @@ struct rtcp_handlers {
static void dummy_handler();
// scratch area (prepare/parse packet)
static void scratch_common(struct rtcp_process_ctx *, const struct rtcp_packet *);
static void scratch_sr(struct rtcp_process_ctx *, const struct sender_report_packet *);
static void scratch_rr(struct rtcp_process_ctx *, const struct report_block *);
// MOS calculation / stats
static void mos_sr(struct rtcp_process_ctx *, const struct sender_report_packet *);
static void mos_rr(struct rtcp_process_ctx *, const struct report_block *);
// homer functions
static void homer_init(struct rtcp_process_ctx *);
static void homer_sr(struct rtcp_process_ctx *, const struct sender_report_packet *);
@ -197,7 +215,13 @@ static void logging_destroy(struct rtcp_process_ctx *);
// structs for each handler type
static struct rtcp_handler dummy_handlers;
static struct rtcp_handler scratch_handlers = {
.common = scratch_common,
.rr = scratch_rr,
.sr = scratch_sr,
};
static struct rtcp_handler mos_handlers = {
.rr = mos_rr,
.sr = mos_sr,
};
static struct rtcp_handler log_handlers = {
.init = logging_init,
@ -225,12 +249,15 @@ static struct rtcp_handler homer_handlers = {
// main var to hold references
static struct rtcp_handlers rtcp_handlers = {
.scratch = &scratch_handlers,
.mos = &mos_handlers,
// remainder is variable
};
// list of all handlers
static struct rtcp_handler *all_handlers[] = {
&dummy_handlers,
&scratch_handlers,
&mos_handlers,
&log_handlers,
&homer_handlers,
};
@ -238,6 +265,7 @@ static struct rtcp_handler *all_handlers[] = {
// macro to call all function handlers in one go
#define CAH(func, ...) do { \
rtcp_handlers.scratch->func(log_ctx, ##__VA_ARGS__); \
rtcp_handlers.mos->func(log_ctx, ##__VA_ARGS__); \
rtcp_handlers.logging->func(log_ctx, ##__VA_ARGS__); \
rtcp_handlers.homer->func(log_ctx, ##__VA_ARGS__); \
} while (0)
@ -423,7 +451,11 @@ static int __rtcp_parse(GQueue *q, const str *_s, struct stream_fd *sfd, const e
int min_packet_size;
ZERO(log_ctx_s);
log_ctx_s.call = c;
log_ctx_s.received = tv;
log_ctx = &log_ctx_s;
CAH(init);
CAH(start, c);
@ -675,25 +707,52 @@ static void dummy_handler() {
return;
}
static void scratch_common(struct rtcp_process_ctx *ctx, const struct rtcp_packet *common) {
ctx->scratch_common_ssrc = htonl(common->ssrc);
}
static void scratch_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) {
ctx->scratch = (rr->number_lost[0] << 16) | (rr->number_lost[1] << 8) | rr->number_lost[2];
ctx->scratch.rr = (struct ssrc_receiver_report) {
.from = ctx->scratch_common_ssrc,
.ssrc = htonl(rr->ssrc),
.high_seq_received = ntohl(rr->high_seq_received),
.fraction_lost = rr->fraction_lost,
.jitter = ntohl(rr->jitter),
.lsr = ntohl(rr->lsr),
.dlsr = ntohl(rr->dlsr),
};
ctx->scratch.rr.packets_lost = (rr->number_lost[0] << 16) | (rr->number_lost[1] << 8) | rr->number_lost[2];
}
static void scratch_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) {
ctx->scratch.sr = (struct ssrc_sender_report) {
.ssrc = ctx->scratch_common_ssrc,
.ntp_msw = ntohl(sr->ntp_msw),
.ntp_lsw = ntohl(sr->ntp_lsw),
.octet_count = ntohl(sr->octet_count),
.timestamp = ntohl(sr->timestamp),
.packet_count = ntohl(sr->packet_count),
};
ctx->scratch.sr.ntp_ts = ntp_ts_to_double(ctx->scratch.sr.ntp_msw, ctx->scratch.sr.ntp_lsw);
}
static void homer_init(struct rtcp_process_ctx *ctx) {
ctx->json = g_string_new("{ ");
}
static void homer_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) {
g_string_append_printf(ctx->json, "\"sender_information\":{\"ntp_timestamp_sec\":%u,"
"\"ntp_timestamp_usec\":%u,\"octets\":%u,\"rtp_timestamp\":%u, \"packets\":%u},",
ntohl(sr->ntp_msw),
ntohl(sr->ntp_lsw),
ntohl(sr->octet_count),
ntohl(sr->timestamp),
ntohl(sr->packet_count));
ctx->scratch.sr.ntp_msw,
ctx->scratch.sr.ntp_lsw,
ctx->scratch.sr.octet_count,
ctx->scratch.sr.timestamp,
ctx->scratch.sr.packet_count);
}
static void homer_rr_list_start(struct rtcp_process_ctx *ctx, const struct rtcp_packet *common) {
g_string_append_printf(ctx->json, "\"ssrc\":%u,\"type\":%u,\"report_count\":%u,\"report_blocks\":[",
ntohl(common->ssrc),
ctx->scratch_common_ssrc,
common->header.pt,
common->header.count);
}
@ -701,13 +760,13 @@ static void homer_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr
g_string_append_printf(ctx->json, "{\"source_ssrc\":%u,"
"\"highest_seq_no\":%u,\"fraction_lost\":%u,\"ia_jitter\":%u,"
"\"packets_lost\":%u,\"lsr\":%u,\"dlsr\":%u},",
ntohl(rr->ssrc),
ntohl(rr->high_seq_received),
rr->fraction_lost,
ntohl(rr->jitter),
ctx->scratch,
ntohl(rr->lsr),
ntohl(rr->dlsr));
ctx->scratch.rr.ssrc,
ctx->scratch.rr.high_seq_received,
ctx->scratch.rr.fraction_lost,
ctx->scratch.rr.jitter,
ctx->scratch.rr.packets_lost,
ctx->scratch.rr.lsr,
ctx->scratch.rr.dlsr);
}
static void homer_rr_list_end(struct rtcp_process_ctx *ctx) {
str_sanitize(ctx->json);
@ -783,7 +842,7 @@ static void logging_common(struct rtcp_process_ctx *ctx, const struct rtcp_packe
common->header.count,
common->header.pt,
ntohs(common->header.length),
ntohl(common->ssrc));
ctx->scratch_common_ssrc);
}
static void logging_sdes_list_start(struct rtcp_process_ctx *ctx, const struct source_description_packet *sdes) {
g_string_append_printf(ctx->log,"version=%u, padding=%u, count=%u, payloadtype=%u, length=%u, ",
@ -794,22 +853,23 @@ static void logging_sdes_list_start(struct rtcp_process_ctx *ctx, const struct s
ntohs(sdes->header.length));
}
static void logging_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) {
g_string_append_printf(ctx->log,"ntp_sec=%u, ntp_fractions=%u, rtp_ts=%u, sender_packets=%u, sender_bytes=%u, ",
ntohl(sr->ntp_msw),
ntohl(sr->ntp_lsw),
ntohl(sr->timestamp),
ntohl(sr->packet_count),
ntohl(sr->octet_count));
g_string_append_printf(ctx->log,"ntp_sec=%u, ntp_fractions=%u, rtp_ts=%u, sender_packets=%u, " \
"sender_bytes=%u, ",
ctx->scratch.sr.ntp_msw,
ctx->scratch.sr.ntp_lsw,
ctx->scratch.sr.timestamp,
ctx->scratch.sr.packet_count,
ctx->scratch.sr.octet_count);
}
static void logging_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) {
g_string_append_printf(ctx->log,"ssrc=%u, fraction_lost=%u, packet_loss=%u, last_seq=%u, jitter=%u, last_sr=%u, delay_since_last_sr=%u, ",
ntohl(rr->ssrc),
ctx->scratch.rr.ssrc,
rr->fraction_lost,
ctx->scratch,
ntohl(rr->high_seq_received),
ntohl(rr->jitter),
ntohl(rr->lsr),
ntohl(rr->dlsr));
ctx->scratch.rr.packets_lost,
ctx->scratch.rr.high_seq_received,
ctx->scratch.rr.jitter,
ctx->scratch.rr.lsr,
ctx->scratch.rr.dlsr);
}
static void logging_xr(struct rtcp_process_ctx *ctx, const struct rtcp_packet *common, str *comp_s) {
pjmedia_rtcp_xr_rx_rtcp_xr(ctx->log, common, comp_s);
@ -826,6 +886,17 @@ static void logging_destroy(struct rtcp_process_ctx *ctx) {
static void mos_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) {
ssrc_sender_report(ctx->call, &ctx->scratch.sr, ctx->received);
}
static void mos_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) {
ssrc_receiver_report(ctx->call, &ctx->scratch.rr, ctx->received);
}
void rtcp_init() {
rtcp_handlers.logging = _log_facility_rtcp ? &log_handlers : &dummy_handlers;
rtcp_handlers.homer = has_homer() ? &homer_handlers : &dummy_handlers;


+ 60
- 1
daemon/ssrc.c View File

@ -1,6 +1,7 @@
#include "ssrc.h"
#include <glib.h>
#include "aux.h"
#include "call.h"
@ -8,13 +9,20 @@ static struct ssrc_entry *create_ssrc_entry(u_int32_t ssrc) {
struct ssrc_entry *ent;
ent = g_slice_alloc0(sizeof(struct ssrc_entry));
ent->ssrc = ssrc;
mutex_init(&ent->lock);
return ent;
}
static void add_ssrc_entry(struct ssrc_entry *ent, struct ssrc_hash *ht) {
g_hash_table_replace(ht->ht, &ent->ssrc, ent);
}
static void free_sender_report(void *p) {
struct ssrc_sender_report_item *i = p;
g_slice_free1(sizeof(*i), i);
}
static void free_ssrc_entry(void *p) {
g_slice_free1(sizeof(struct ssrc_entry), p);
struct ssrc_entry *e = p;
g_queue_clear_full(&e->sender_reports, free_sender_report);
g_slice_free1(sizeof(*e), e);
}
@ -72,3 +80,54 @@ struct ssrc_ctx *get_ssrc_ctx(u_int32_t ssrc, struct ssrc_hash *ht, enum ssrc_di
return ((void *) s) + dir;
}
void ssrc_sender_report(struct call *c, const struct ssrc_sender_report *sr,
const struct timeval *tv)
{
struct ssrc_entry *e;
struct ssrc_sender_report_item *seri;
seri = g_slice_alloc(sizeof(*seri));
seri->received = *tv;
seri->report = *sr;
seri->ntp_middle_bits = sr->ntp_msw << 16 | sr->ntp_lsw >> 16;
ilog(LOG_DEBUG, "SR from %u: RTP TS %u PC %u OC %u NTP TS %u/%u=%f",
sr->ssrc, sr->timestamp, sr->packet_count, sr->octet_count,
sr->ntp_msw, sr->ntp_lsw, sr->ntp_ts);
e = get_ssrc(sr->ssrc, c->ssrc_hash);
mutex_lock(&e->lock);
g_queue_push_tail(&e->sender_reports, seri);
mutex_unlock(&e->lock);
}
void ssrc_receiver_report(struct call *c, const struct ssrc_receiver_report *rr,
const struct timeval *tv)
{
ilog(LOG_DEBUG, "RR from %u about %u: FL %u TL %u HSR %u J %u LSR %u DLSR %u",
rr->from, rr->ssrc, rr->fraction_lost, rr->packets_lost,
rr->high_seq_received, rr->jitter, rr->lsr, rr->dlsr);
if (!rr->lsr || !rr->dlsr)
return; // no delay to be known
struct ssrc_entry *e = get_ssrc(rr->ssrc, c->ssrc_hash);
mutex_lock(&e->lock);
// go through the list backwards until we find the SR referenced, up to 10 steps
int i = 0;
for (GList *l = e->sender_reports.tail;
l && i < 10;
l = l->prev, i++)
{
struct ssrc_sender_report_item *seri = l->data;
if (seri->ntp_middle_bits != rr->lsr)
continue;
ilog(LOG_DEBUG, "RR from %u reports delay %u from %u", rr->from, rr->dlsr, rr->ssrc);
break;
}
mutex_unlock(&e->lock);
}

+ 42
- 1
daemon/ssrc.h View File

@ -10,6 +10,11 @@
struct call;
struct timeval;
struct ssrc_hash {
GHashTable *ht;
rwlock_t lock;
@ -20,16 +25,47 @@ struct ssrc_ctx {
// XXX move entire crypto context in here?
};
struct ssrc_entry {
// XXX lock this?
mutex_t lock;
u_int32_t ssrc;
struct ssrc_ctx input_ctx,
output_ctx;
GQueue sender_reports;
};
enum ssrc_dir {
SSRC_DIR_INPUT = G_STRUCT_OFFSET(struct ssrc_entry, input_ctx),
SSRC_DIR_OUTPUT = G_STRUCT_OFFSET(struct ssrc_entry, output_ctx),
};
struct ssrc_sender_report {
u_int32_t ssrc;
u_int32_t ntp_msw;
u_int32_t ntp_lsw;
u_int32_t timestamp;
u_int32_t packet_count;
u_int32_t octet_count;
double ntp_ts;
};
struct ssrc_sender_report_item {
struct timeval received;
u_int32_t ntp_middle_bits; // to match up with rr->lsr
struct ssrc_sender_report report;
};
struct ssrc_receiver_report {
u_int32_t from;
u_int32_t ssrc;
unsigned char fraction_lost;
u_int32_t packets_lost;
u_int32_t high_seq_received;
u_int32_t jitter;
u_int32_t lsr;
u_int32_t dlsr;
};
struct ssrc_receiver_report_item {
struct timeval received;
struct ssrc_receiver_report report;
};
@ -43,5 +79,10 @@ struct ssrc_entry *get_ssrc(u_int32_t, struct ssrc_hash * /* , int *created */);
struct ssrc_ctx *get_ssrc_ctx(u_int32_t, struct ssrc_hash *, enum ssrc_dir); // creates new entry if not found
void ssrc_sender_report(struct call *, const struct ssrc_sender_report *, const struct timeval *);
void ssrc_receiver_report(struct call *, const struct ssrc_receiver_report *,
const struct timeval *);
#endif

Loading…
Cancel
Save