From dd34574669294d00405448302844a693441316dd Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Thu, 21 Jun 2018 14:33:19 -0400 Subject: [PATCH] TT#38350 keep track of the most used RTP payload type Instead of just remembering the last seen RTP payload type, this adds a tracker that keeps track of the last 32 seen payload types and keep the list in order of how often each type was seen. Change-Id: I062a43b7bfc9413b755dca548d72953ff8245477 --- daemon/codec.c | 6 +-- daemon/media_socket.c | 2 +- daemon/redis.c | 8 +-- daemon/ssrc.c | 104 ++++++++++++++++++++++++++++++++++++++- include/ssrc.h | 16 +++++- t/.gitignore | 1 + t/Makefile | 6 ++- t/payload-tracker-test.c | 96 ++++++++++++++++++++++++++++++++++++ 8 files changed, 226 insertions(+), 13 deletions(-) create mode 100644 t/payload-tracker-test.c diff --git a/daemon/codec.c b/daemon/codec.c index ea7cccd3d..59a282ed8 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -488,8 +488,8 @@ void codec_add_raw_packet(struct media_packet *mp) { struct codec_packet *p = g_slice_alloc(sizeof(*p)); p->s = mp->raw; p->free_func = NULL; - if (mp->rtp) - mp->ssrc_out->payload_type = mp->rtp->m_pt & 0x7f; + if (mp->rtp && mp->ssrc_out) + payload_tracker_add(&mp->ssrc_out->tracker, mp->rtp->m_pt & 0x7f); g_queue_push_tail(&mp->packets_out, p); } static int handler_func_passthrough(struct codec_handler *h, struct media_packet *mp) { @@ -738,7 +738,7 @@ static int __packet_encoded(encoder_t *enc, void *u1, void *u2) { struct codec_packet *p = g_slice_alloc(sizeof(*p)); p->s.s = buf; p->s.len = inout.len + sizeof(struct rtp_header); - mp->ssrc_out->payload_type = ch->handler->dest_pt.payload_type; + payload_tracker_add(&mp->ssrc_out->tracker, ch->handler->dest_pt.payload_type); p->free_func = free; g_queue_push_tail(&mp->packets_out, p); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 5f2df107e..92b4cfcbf 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1329,7 +1329,7 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) // XXX redundant between SSRC handling and codec_handler stuff -> combine phc->payload_type = (phc->mp.rtp->m_pt & 0x7f); if (G_LIKELY(phc->mp.ssrc_in)) - phc->mp.ssrc_in->payload_type = phc->payload_type; + payload_tracker_add(&phc->mp.ssrc_in->tracker, phc->payload_type); // XXX yet another hash table per payload type -> combine struct rtp_stats *rtp_s = g_atomic_pointer_get(&phc->mp.stream->rtp_stats_cache); diff --git a/daemon/redis.c b/daemon/redis.c index 9beee0a04..0d807a01c 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1478,10 +1478,10 @@ static int json_build_ssrc(struct call *c, JsonReader *root_reader) { struct ssrc_entry_call *se = get_ssrc(ssrc, c->ssrc_hash); se->input_ctx.srtp_index = json_reader_get_ll(root_reader, "in_srtp_index"); se->input_ctx.srtcp_index = json_reader_get_ll(root_reader, "in_srtcp_index"); - se->input_ctx.payload_type = json_reader_get_ll(root_reader, "in_payload_type"); + payload_tracker_add(&se->input_ctx.tracker, json_reader_get_ll(root_reader, "in_payload_type")); se->output_ctx.srtp_index = json_reader_get_ll(root_reader, "out_srtp_index"); se->output_ctx.srtcp_index = json_reader_get_ll(root_reader, "out_srtcp_index"); - se->output_ctx.payload_type = json_reader_get_ll(root_reader, "out_payload_type"); + payload_tracker_add(&se->output_ctx.tracker, json_reader_get_ll(root_reader, "out_payload_type")); json_reader_end_element(root_reader); obj_put(&se->h); @@ -2092,10 +2092,10 @@ char* redis_encode_json(struct call *c) { // XXX use function for in/out JSON_SET_SIMPLE("in_srtp_index","%" PRIu64, se->input_ctx.srtp_index); JSON_SET_SIMPLE("in_srtcp_index","%" PRIu64, se->input_ctx.srtcp_index); - JSON_SET_SIMPLE("in_payload_type","%i", se->input_ctx.payload_type); + JSON_SET_SIMPLE("in_payload_type","%i", se->input_ctx.tracker.most[0]); JSON_SET_SIMPLE("out_srtp_index","%" PRIu64, se->output_ctx.srtp_index); JSON_SET_SIMPLE("out_srtcp_index","%" PRIu64, se->output_ctx.srtcp_index); - JSON_SET_SIMPLE("out_payload_type","%i", se->output_ctx.payload_type); + JSON_SET_SIMPLE("out_payload_type","%i", se->output_ctx.tracker.most[0]); // XXX add rest of info json_builder_end_object (builder); diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 2bb108aae..393358bc7 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -11,7 +11,7 @@ static void __free_ssrc_entry_call(void *e); static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) { c->parent = parent; - c->payload_type = -1; + payload_tracker_init(&c->tracker); while (!c->ssrc_map_out) c->ssrc_map_out = random(); } @@ -243,7 +243,7 @@ static long long __calc_rtt(struct call *c, u_int32_t ssrc, u_int32_t ntp_middle return 0; if (pt_p) - *pt_p = e->output_ctx.payload_type; + *pt_p = e->output_ctx.tracker.most[0] == 255 ? -1 : e->output_ctx.tracker.most[0]; struct ssrc_time_item *sti; GQueue *q = (((void *) e) + reports_queue_offset); @@ -432,3 +432,103 @@ void ssrc_voip_metrics(struct call_media *m, const struct ssrc_xr_voip_metrics * e->last_rtt = vm->rnd_trip_delay; obj_put(&e->h); } + +static void __pt_sort(struct payload_tracker *t, int pt) { + // bubble up? + while (t->idx[pt] > 0) { + int this_idx = t->idx[pt]; + int prev_idx = this_idx - 1; + int prev_pt = t->most[prev_idx]; + if (G_LIKELY(t->count[prev_pt] >= t->count[pt])) + break; + // bubble up! + ilog(LOG_DEBUG, "bubble up pt %i from idx %u to %u", pt, this_idx, prev_idx); + // swap entries in "most" list + int prev = t->most[prev_idx]; + t->most[prev_idx] = t->most[this_idx]; + t->most[this_idx] = prev; + // adjust indexes + t->idx[pt]--; + t->idx[prev_pt]++; + } + + // bubble down? + while (t->idx[pt] < t->most_len - 1) { + int this_idx = t->idx[pt]; + int next_idx = this_idx + 1; + int next_pt = t->most[next_idx]; + if (G_LIKELY(t->count[next_pt] <= t->count[pt])) + break; + // bubble down! + ilog(LOG_DEBUG, "bubble down pt %i from idx %u to %u", pt, this_idx, next_idx); + // swap entries in "most" list + int next = t->most[next_idx]; + t->most[next_idx] = t->most[this_idx]; + t->most[this_idx] = next; + // adjust indexes + t->idx[pt]++; + t->idx[next_pt]--; + } + +} + +void payload_tracker_init(struct payload_tracker *t) { + mutex_init(&t->lock); + memset(&t->last, -1, sizeof(t->last)); + memset(&t->count, 0, sizeof(t->count)); + memset(&t->idx, -1, sizeof(t->idx)); + memset(&t->most, -1, sizeof(t->most)); + t->last_idx = 0; + t->most_len = 0; +} +//#define PT_DBG(x...) ilog(LOG_DEBUG, x) +#define PT_DBG(x...) ((void)0) +void payload_tracker_add(struct payload_tracker *t, int pt) { + if (G_UNLIKELY(pt < 0) || G_UNLIKELY(pt >= 128)) + return; + + mutex_lock(&t->lock); + + PT_DBG("new pt: %i", pt); + PT_DBG("last idx: %u", t->last_idx); + int old_pt = t->last[t->last_idx]; + PT_DBG("old pt: %u", old_pt); + + if (G_LIKELY(old_pt != 255)) { + // overwriting old entry. is it the same as the new one? + if (G_LIKELY(old_pt == pt)) { + PT_DBG("old pt == new pt"); + // no change + goto out; + } + PT_DBG("decreasing old pt count from %u", t->count[old_pt]); + // different: decrease old counter + t->count[old_pt]--; + } + + // fill in new entry + t->last[t->last_idx++] = pt; + if (t->last_idx >= G_N_ELEMENTS(t->last)) + t->last_idx = 0; + + // increase new counter + PT_DBG("increasing new pt count from %u", t->count[pt]); + t->count[pt]++; + + // is this a new entry? + if (G_UNLIKELY(t->idx[pt] == 255)) { + // put to the end of the "most" list + PT_DBG("inserting new entry at pos %u", t->most_len); + t->idx[pt] = t->most_len; + t->most[t->most_len] = pt; + t->most_len++; + } + + // now bubble sort both new and old entries + __pt_sort(t, pt); + if (G_LIKELY(old_pt != 255)) + __pt_sort(t, old_pt); + +out: + mutex_unlock(&t->lock); +} diff --git a/include/ssrc.h b/include/ssrc.h index 4f5dff7c7..2d462e66a 100644 --- a/include/ssrc.h +++ b/include/ssrc.h @@ -33,9 +33,19 @@ struct ssrc_hash { volatile struct ssrc_entry *cache; // last used entry volatile struct ssrc_entry *precreat; // next used entry }; +struct payload_tracker { + mutex_t lock; + unsigned char last[32]; // must be <= 255 + unsigned int last_idx; // rolling index into pt_last + unsigned char count[128]; // how many of each pt + unsigned char idx[128]; // each pt's index into most[] + unsigned char most[128]; // sorted list of pts + unsigned int most_len; // idx for new entries +}; struct ssrc_ctx { struct ssrc_entry_call *parent; - int payload_type; // to determine the clock rate for jitter calculations + struct payload_tracker tracker; + // XXX lock this? u_int64_t srtp_index, srtcp_index; @@ -183,5 +193,9 @@ void ssrc_voip_metrics(struct call_media *m, const struct ssrc_xr_voip_metrics * const struct timeval *); +void payload_tracker_init(struct payload_tracker *t); +void payload_tracker_add(struct payload_tracker *, int); + + #endif diff --git a/t/.gitignore b/t/.gitignore index a8d692760..b14d5778d 100644 --- a/t/.gitignore +++ b/t/.gitignore @@ -42,3 +42,4 @@ streambuf.c stun.c transcode-test udp_listener.c +payload-tracker-test diff --git a/t/Makefile b/t/Makefile index d9e08194f..1d70fcb84 100644 --- a/t/Makefile +++ b/t/Makefile @@ -49,7 +49,7 @@ LDLIBS+= $(shell pkg-config xmlrpc_util --libs 2> /dev/null) LDLIBS+= -lhiredis endif -SRCS= bitstr-test.c aes-crypt.c +SRCS= bitstr-test.c aes-crypt.c payload-tracker-test.c ifeq ($(with_transcoding),yes) SRCS+= amr-decode-test.c amr-encode-test.c transcode-test.c endif @@ -73,7 +73,7 @@ include .depend .PHONY: unit-tests -TESTS= bitstr-test aes-crypt +TESTS= bitstr-test aes-crypt payload-tracker-test ifeq ($(with_transcoding),yes) TESTS+= amr-decode-test amr-encode-test transcode-test endif @@ -95,3 +95,5 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr kernel.o media_socket.o stun.o bencode.o socket.o poller.o dtls.o recording.o statistics.o \ rtcp.o redis.o iptables.o graphite.o call_interfaces.o sdp.o rtp.o crypto.o control_ng.o \ streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o + +payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o diff --git a/t/payload-tracker-test.c b/t/payload-tracker-test.c new file mode 100644 index 000000000..59188a44c --- /dev/null +++ b/t/payload-tracker-test.c @@ -0,0 +1,96 @@ +#include +#include +#include +#include "ssrc.h" + + +static void most_cmp(struct payload_tracker *t, const char *cmp, const char *file, int line) { + char buf[1024] = ""; + int len = 0; + + for (int i = 0; i < t->most_len; i++) { + if (i > 0) + len += sprintf(buf+len, ","); + len += sprintf(buf+len, "%u", t->most[i]); + } + + if (strcmp(buf, cmp)) { + printf("test nok: %s:%i\n", file, line); + printf("expected: %s\n", cmp); + printf("got: %s\n", buf); + abort(); + } + + printf("test ok: %s:%i\n", file, line); +} + +#define cmp(s) most_cmp(&t, s, __FILE__, __LINE__) +#define add(p) payload_tracker_add(&t, p) + +int main() { + struct payload_tracker t; + + payload_tracker_init(&t); + + cmp(""); + + add(0); + cmp("0"); + + add(0); + add(0); + cmp("0"); + + add(5); + cmp("0,5"); + + add(5); + add(5); + cmp("0,5"); + + add(5); + cmp("5,0"); + + add(0); + cmp("5,0"); + + add(0); + cmp("0,5"); + + add(120); + cmp("0,5,120"); + + add(120); + add(120); + add(120); + add(120); + cmp("0,120,5"); + + add(120); + cmp("120,0,5"); + + add(120); + add(120); + add(120); + cmp("120,0,5"); + + add(5); + add(5); + cmp("120,5,0"); + + // saturation fill test + for (int i = 0; i < 32; i++) + add(10); + cmp("10,5,120,0"); + + // bubble up all the way + for (int i = 0; i < 32; i++) + add(0); + cmp("0,10,5,120"); + + // filled with 0s, so a single 1 goes in second place + add(1); + cmp("0,1,10,5,120"); + + return 0; +}