Browse Source

MT#55283 used typed hash table for rtp_stats

Change-Id: I015a931d7e726815d2a7d06fa4d74a1418dfec0b
pull/1910/head
Richard Fuchs 10 months ago
parent
commit
4c9343b8c6
3 changed files with 43 additions and 45 deletions
  1. +20
    -20
      daemon/call.c
  2. +19
    -23
      daemon/media_socket.c
  3. +4
    -2
      include/call.h

+ 20
- 20
daemon/call.c View File

@ -954,10 +954,12 @@ static int __wildcard_endpoint_map(struct call_media *media, unsigned int num_po
return 0;
}
static void __rtp_stats_free(void *p) {
static void __rtp_stats_free(struct rtp_stats *p) {
bufferpool_unref(p);
}
TYPED_GHASHTABLE_IMPL(rtp_stats_ht, g_direct_hash, g_direct_equal, NULL, __rtp_stats_free)
struct packet_stream *__packet_stream_new(call_t *call) {
struct packet_stream *stream;
@ -966,7 +968,7 @@ struct packet_stream *__packet_stream_new(call_t *call) {
mutex_init(&stream->out_lock);
stream->call = call;
atomic64_set_na(&stream->last_packet, rtpe_now.tv_sec);
stream->rtp_stats = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __rtp_stats_free);
stream->rtp_stats = rtp_stats_ht_new();
recording_init_stream(stream);
stream->send_timer = send_timer_new(stream);
stream->stats_in = bufferpool_alloc0(shm_bufferpool, sizeof(*stream->stats_in));
@ -1208,18 +1210,18 @@ int __init_stream(struct packet_stream *ps) {
return 0;
}
static void rtp_stats_add_pt(GHashTable *dst, const struct rtp_payload_type *pt) {
struct rtp_stats *rs = g_hash_table_lookup(dst, GINT_TO_POINTER(pt->payload_type));
static void rtp_stats_add_pt(rtp_stats_ht dst, const struct rtp_payload_type *pt) {
struct rtp_stats *rs = t_hash_table_lookup(dst, GINT_TO_POINTER(pt->payload_type));
if (rs)
return;
rs = bufferpool_alloc0(shm_bufferpool, sizeof(*rs));
rs->payload_type = pt->payload_type;
rs->clock_rate = pt->clock_rate;
g_hash_table_insert(dst, GINT_TO_POINTER(rs->payload_type), rs);
t_hash_table_insert(dst, GINT_TO_POINTER(rs->payload_type), rs);
}
void __rtp_stats_update(GHashTable *dst, struct codec_store *cs) {
void __rtp_stats_update(rtp_stats_ht dst, struct codec_store *cs) {
rtp_payload_type *pt;
codecs_ht src = cs->codecs;
@ -3842,9 +3844,7 @@ void dialogue_connect(struct call_monologue *src_ml, struct call_monologue *dst_
static int __rtp_stats_sort(const void *ap, const void *bp) {
const struct rtp_stats *a = ap, *b = bp;
static int __rtp_stats_sort(const struct rtp_stats *a, const struct rtp_stats *b) {
/* descending order */
if (atomic64_get(&a->packets) > atomic64_get(&b->packets))
return -1;
@ -3855,8 +3855,6 @@ static int __rtp_stats_sort(const void *ap, const void *bp) {
const rtp_payload_type *__rtp_stats_codec(struct call_media *m) {
struct packet_stream *ps;
GList *values;
struct rtp_stats *rtp_s;
const rtp_payload_type *rtp_pt = NULL;
/* we only use the primary packet stream for the time being */
@ -3865,21 +3863,23 @@ const rtp_payload_type *__rtp_stats_codec(struct call_media *m) {
ps = m->streams.head->data;
values = g_hash_table_get_values(ps->rtp_stats);
if (!values)
rtp_stats_ht_iter iter;
t_hash_table_iter_init(&iter, ps->rtp_stats);
struct rtp_stats *rs, *top = NULL;
while (t_hash_table_iter_next(&iter, NULL, &rs)) {
if (!top || __rtp_stats_sort(rs, top) < 0)
top = rs;
}
if (!top)
return NULL;
values = g_list_sort(values, __rtp_stats_sort);
/* payload type with the most packets */
rtp_s = values->data;
if (atomic64_get(&rtp_s->packets) == 0)
if (atomic64_get(&top->packets) == 0)
goto out;
rtp_pt = get_rtp_payload_type(rtp_s->payload_type, &m->codecs);
rtp_pt = get_rtp_payload_type(top->payload_type, &m->codecs);
out:
g_list_free(values);
return rtp_pt; /* may be NULL */
}
@ -4301,7 +4301,7 @@ static void __call_free(call_t *c) {
ps = t_queue_pop_head(&c->streams);
crypto_cleanup(&ps->crypto);
t_queue_clear(&ps->sfds);
g_hash_table_destroy(ps->rtp_stats);
t_hash_table_destroy(ps->rtp_stats);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++)
ssrc_ctx_put(&ps->ssrc_in[u]);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++)


+ 19
- 23
daemon/media_socket.c View File

@ -1497,9 +1497,7 @@ INLINE void __re_address_translate_ep(struct re_address *o, const endpoint_t *ep
ep->address.family->endpoint2kernel(o, ep);
}
static int __rtp_stats_pt_sort(const void *ap, const void *bp) {
const struct rtp_stats *a = ap, *b = bp;
static int __rtp_stats_pt_sort(const struct rtp_stats *a, const struct rtp_stats *b) {
if (a->payload_type < b->payload_type)
return -1;
if (a->payload_type > b->payload_type)
@ -1516,7 +1514,7 @@ static int __rtp_stats_pt_sort(const void *ap, const void *bp) {
*/
static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *outputs,
struct packet_stream *stream, struct sink_handler *sink_handler, sink_handler_q *sinks,
GList **payload_types)
rtp_stats_arr **payload_types)
{
struct rtpengine_destination_info *redi = NULL;
call_t *call = stream->call;
@ -1608,19 +1606,24 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out
}
if (reti->rtp && sinks && sinks->length && payload_types) {
GList *l;
struct rtp_stats *rs;
// this code is execute only once: list therefore must be empty
assert(*payload_types == NULL);
*payload_types = g_hash_table_get_values(stream->rtp_stats);
*payload_types = g_list_sort(*payload_types, __rtp_stats_pt_sort);
for (l = *payload_types; l; ) {
// create sorted list of payload types
*payload_types = rtp_stats_arr_new_sized(t_hash_table_size(stream->rtp_stats));
rtp_stats_ht_iter iter;
t_hash_table_iter_init(&iter, stream->rtp_stats);
unsigned int i = 0;
while (t_hash_table_iter_next(&iter, NULL, &rs))
(*payload_types)->pdata[i++] = rs;
t_ptr_array_sort(*payload_types, __rtp_stats_pt_sort);
for (i = 0; i < (*payload_types)->len; i++) {
if (reti->num_payload_types >= G_N_ELEMENTS(reti->pt_stats)) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Too many RTP payload types for kernel module");
break;
}
rs = l->data;
rs = (*payload_types)->pdata[i];
// only add payload types that are passthrough for all sinks
bool can_kernelize = true;
for (__auto_type k = sinks->head; k; k = k->next) {
@ -1637,16 +1640,12 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out
reti->pt_filter = 1;
// ensure that the final list in *payload_types reflects the payload
// types populated in reti->payload_types
GList *next = l->next;
*payload_types = g_list_delete_link(*payload_types, l);
l = next;
t_ptr_array_remove_index(*payload_types, i);
continue;
}
reti->pt_stats[reti->num_payload_types] = rs;
reti->num_payload_types++;
l = l->next;
}
}
else {
@ -1673,10 +1672,9 @@ output:
|| sink_handler->attrs.silence_media;
bool manipulate_pt = silenced || ML_ISSET(media->monologue, BLOCK_SHORT);
if (manipulate_pt && payload_types) {
int i = 0;
for (GList *l = *payload_types; l; l = l->next) {
struct rtp_stats *rs = l->data;
struct rtpengine_pt_output *rpt = &redi->output.pt_output[i++];
for (unsigned int i = 0; i < (*payload_types)->len; i++) {
__auto_type rs = (*payload_types)->pdata[i];
struct rtpengine_pt_output *rpt = &redi->output.pt_output[i];
struct codec_handler *ch = codec_handler_get(media, rs->payload_type,
sink->media, sink_handler);
@ -1746,7 +1744,7 @@ output:
// helper function for kernelize()
static void kernelize_one_sink_handler(struct rtpengine_target_info *reti, GQueue *outputs,
struct packet_stream *stream, struct sink_handler *sink_handler, sink_handler_q *sinks,
GList **payload_types)
rtp_stats_arr **payload_types)
{
struct packet_stream *sink = sink_handler->sink;
if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED))
@ -1761,6 +1759,7 @@ void kernelize(struct packet_stream *stream) {
call_t *call = stream->call;
const char *nk_warn_msg;
struct call_media *media = stream->media;
g_autoptr(rtp_stats_arr) payload_types = NULL;
if (PS_ISSET(stream, KERNELIZED))
return;
@ -1784,7 +1783,6 @@ void kernelize(struct packet_stream *stream) {
struct rtpengine_target_info reti;
ZERO(reti); // reti.local.family determines if anything can be done
GQueue outputs = G_QUEUE_INIT;
GList *payload_types = NULL;
unsigned int num_sinks = stream->rtp_sinks.length + stream->rtcp_sinks.length;
@ -1816,8 +1814,6 @@ void kernelize(struct packet_stream *stream) {
reti.num_rtcp_destinations = reti.num_destinations - num_rtp_dests;
}
g_list_free(payload_types);
if (!reti.local.family)
goto no_kernel;
@ -2248,7 +2244,7 @@ static void media_packet_rtp_in(struct packet_handler_ctx *phc)
// XXX yet another hash table per payload type -> combine
struct rtp_stats *rtp_s = g_atomic_pointer_get(&phc->mp.stream->rtp_stats_cache);
if (G_UNLIKELY(!rtp_s) || G_UNLIKELY(rtp_s->payload_type != phc->payload_type))
rtp_s = g_hash_table_lookup(phc->mp.stream->rtp_stats,
rtp_s = t_hash_table_lookup(phc->mp.stream->rtp_stats,
GUINT_TO_POINTER(phc->payload_type));
if (!rtp_s) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT,


+ 4
- 2
include/call.h View File

@ -390,6 +390,8 @@ struct loop_protector {
};
TYPED_GPTRARRAY(rtp_stats_arr, struct rtp_stats)
TYPED_GHASHTABLE_PROTO(rtp_stats_ht, void, struct rtp_stats)
/**
* The packet_stream itself can be marked as:
@ -444,7 +446,7 @@ struct packet_stream {
struct stream_stats *stats_in;
struct stream_stats *stats_out;
atomic64 last_packet; // userspace only
GHashTable *rtp_stats; /* LOCK: call->master_lock */
rtp_stats_ht rtp_stats; /* LOCK: call->master_lock */
struct rtp_stats *rtp_stats_cache;
enum endpoint_learning el_flags;
@ -874,7 +876,7 @@ int call_stream_address(GString *, struct packet_stream *ps, enum stream_address
void add_total_calls_duration_in_interval(struct timeval *interval_tv);
enum thread_looper_action call_timer(void);
void __rtp_stats_update(GHashTable *dst, struct codec_store *);
void __rtp_stats_update(rtp_stats_ht dst, struct codec_store *);
int __init_stream(struct packet_stream *ps);
void call_stream_crypto_reset(struct packet_stream *ps);


Loading…
Cancel
Save