diff --git a/daemon/call.c b/daemon/call.c index 350f9b5e7..2d19b616e 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -954,10 +954,12 @@ static int __wildcard_endpoint_map(struct call_media *media, unsigned int num_po return 0; } -static void __rtp_stats_free(void *p) { +static void __rtp_stats_free(struct rtp_stats *p) { bufferpool_unref(p); } +TYPED_GHASHTABLE_IMPL(rtp_stats_ht, g_direct_hash, g_direct_equal, NULL, __rtp_stats_free) + struct packet_stream *__packet_stream_new(call_t *call) { struct packet_stream *stream; @@ -966,7 +968,7 @@ struct packet_stream *__packet_stream_new(call_t *call) { mutex_init(&stream->out_lock); stream->call = call; atomic64_set_na(&stream->last_packet, rtpe_now.tv_sec); - stream->rtp_stats = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __rtp_stats_free); + stream->rtp_stats = rtp_stats_ht_new(); recording_init_stream(stream); stream->send_timer = send_timer_new(stream); stream->stats_in = bufferpool_alloc0(shm_bufferpool, sizeof(*stream->stats_in)); @@ -1208,18 +1210,18 @@ int __init_stream(struct packet_stream *ps) { return 0; } -static void rtp_stats_add_pt(GHashTable *dst, const struct rtp_payload_type *pt) { - struct rtp_stats *rs = g_hash_table_lookup(dst, GINT_TO_POINTER(pt->payload_type)); +static void rtp_stats_add_pt(rtp_stats_ht dst, const struct rtp_payload_type *pt) { + struct rtp_stats *rs = t_hash_table_lookup(dst, GINT_TO_POINTER(pt->payload_type)); if (rs) return; rs = bufferpool_alloc0(shm_bufferpool, sizeof(*rs)); rs->payload_type = pt->payload_type; rs->clock_rate = pt->clock_rate; - g_hash_table_insert(dst, GINT_TO_POINTER(rs->payload_type), rs); + t_hash_table_insert(dst, GINT_TO_POINTER(rs->payload_type), rs); } -void __rtp_stats_update(GHashTable *dst, struct codec_store *cs) { +void __rtp_stats_update(rtp_stats_ht dst, struct codec_store *cs) { rtp_payload_type *pt; codecs_ht src = cs->codecs; @@ -3842,9 +3844,7 @@ void dialogue_connect(struct call_monologue *src_ml, struct call_monologue *dst_ -static int __rtp_stats_sort(const void *ap, const void *bp) { - const struct rtp_stats *a = ap, *b = bp; - +static int __rtp_stats_sort(const struct rtp_stats *a, const struct rtp_stats *b) { /* descending order */ if (atomic64_get(&a->packets) > atomic64_get(&b->packets)) return -1; @@ -3855,8 +3855,6 @@ static int __rtp_stats_sort(const void *ap, const void *bp) { const rtp_payload_type *__rtp_stats_codec(struct call_media *m) { struct packet_stream *ps; - GList *values; - struct rtp_stats *rtp_s; const rtp_payload_type *rtp_pt = NULL; /* we only use the primary packet stream for the time being */ @@ -3865,21 +3863,23 @@ const rtp_payload_type *__rtp_stats_codec(struct call_media *m) { ps = m->streams.head->data; - values = g_hash_table_get_values(ps->rtp_stats); - if (!values) + rtp_stats_ht_iter iter; + t_hash_table_iter_init(&iter, ps->rtp_stats); + struct rtp_stats *rs, *top = NULL; + while (t_hash_table_iter_next(&iter, NULL, &rs)) { + if (!top || __rtp_stats_sort(rs, top) < 0) + top = rs; + } + if (!top) return NULL; - values = g_list_sort(values, __rtp_stats_sort); - /* payload type with the most packets */ - rtp_s = values->data; - if (atomic64_get(&rtp_s->packets) == 0) + if (atomic64_get(&top->packets) == 0) goto out; - rtp_pt = get_rtp_payload_type(rtp_s->payload_type, &m->codecs); + rtp_pt = get_rtp_payload_type(top->payload_type, &m->codecs); out: - g_list_free(values); return rtp_pt; /* may be NULL */ } @@ -4301,7 +4301,7 @@ static void __call_free(call_t *c) { ps = t_queue_pop_head(&c->streams); crypto_cleanup(&ps->crypto); t_queue_clear(&ps->sfds); - g_hash_table_destroy(ps->rtp_stats); + t_hash_table_destroy(ps->rtp_stats); for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) ssrc_ctx_put(&ps->ssrc_in[u]); for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) diff --git a/daemon/media_socket.c b/daemon/media_socket.c index bc2d914cb..019b7b4be 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1497,9 +1497,7 @@ INLINE void __re_address_translate_ep(struct re_address *o, const endpoint_t *ep ep->address.family->endpoint2kernel(o, ep); } -static int __rtp_stats_pt_sort(const void *ap, const void *bp) { - const struct rtp_stats *a = ap, *b = bp; - +static int __rtp_stats_pt_sort(const struct rtp_stats *a, const struct rtp_stats *b) { if (a->payload_type < b->payload_type) return -1; if (a->payload_type > b->payload_type) @@ -1516,7 +1514,7 @@ static int __rtp_stats_pt_sort(const void *ap, const void *bp) { */ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *outputs, struct packet_stream *stream, struct sink_handler *sink_handler, sink_handler_q *sinks, - GList **payload_types) + rtp_stats_arr **payload_types) { struct rtpengine_destination_info *redi = NULL; call_t *call = stream->call; @@ -1608,19 +1606,24 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out } if (reti->rtp && sinks && sinks->length && payload_types) { - GList *l; struct rtp_stats *rs; // this code is execute only once: list therefore must be empty assert(*payload_types == NULL); - *payload_types = g_hash_table_get_values(stream->rtp_stats); - *payload_types = g_list_sort(*payload_types, __rtp_stats_pt_sort); - for (l = *payload_types; l; ) { + // create sorted list of payload types + *payload_types = rtp_stats_arr_new_sized(t_hash_table_size(stream->rtp_stats)); + rtp_stats_ht_iter iter; + t_hash_table_iter_init(&iter, stream->rtp_stats); + unsigned int i = 0; + while (t_hash_table_iter_next(&iter, NULL, &rs)) + (*payload_types)->pdata[i++] = rs; + t_ptr_array_sort(*payload_types, __rtp_stats_pt_sort); + for (i = 0; i < (*payload_types)->len; i++) { if (reti->num_payload_types >= G_N_ELEMENTS(reti->pt_stats)) { ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Too many RTP payload types for kernel module"); break; } - rs = l->data; + rs = (*payload_types)->pdata[i]; // only add payload types that are passthrough for all sinks bool can_kernelize = true; for (__auto_type k = sinks->head; k; k = k->next) { @@ -1637,16 +1640,12 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out reti->pt_filter = 1; // ensure that the final list in *payload_types reflects the payload // types populated in reti->payload_types - GList *next = l->next; - *payload_types = g_list_delete_link(*payload_types, l); - l = next; + t_ptr_array_remove_index(*payload_types, i); continue; } reti->pt_stats[reti->num_payload_types] = rs; reti->num_payload_types++; - - l = l->next; } } else { @@ -1673,10 +1672,9 @@ output: || sink_handler->attrs.silence_media; bool manipulate_pt = silenced || ML_ISSET(media->monologue, BLOCK_SHORT); if (manipulate_pt && payload_types) { - int i = 0; - for (GList *l = *payload_types; l; l = l->next) { - struct rtp_stats *rs = l->data; - struct rtpengine_pt_output *rpt = &redi->output.pt_output[i++]; + for (unsigned int i = 0; i < (*payload_types)->len; i++) { + __auto_type rs = (*payload_types)->pdata[i]; + struct rtpengine_pt_output *rpt = &redi->output.pt_output[i]; struct codec_handler *ch = codec_handler_get(media, rs->payload_type, sink->media, sink_handler); @@ -1746,7 +1744,7 @@ output: // helper function for kernelize() static void kernelize_one_sink_handler(struct rtpengine_target_info *reti, GQueue *outputs, struct packet_stream *stream, struct sink_handler *sink_handler, sink_handler_q *sinks, - GList **payload_types) + rtp_stats_arr **payload_types) { struct packet_stream *sink = sink_handler->sink; if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED)) @@ -1761,6 +1759,7 @@ void kernelize(struct packet_stream *stream) { call_t *call = stream->call; const char *nk_warn_msg; struct call_media *media = stream->media; + g_autoptr(rtp_stats_arr) payload_types = NULL; if (PS_ISSET(stream, KERNELIZED)) return; @@ -1784,7 +1783,6 @@ void kernelize(struct packet_stream *stream) { struct rtpengine_target_info reti; ZERO(reti); // reti.local.family determines if anything can be done GQueue outputs = G_QUEUE_INIT; - GList *payload_types = NULL; unsigned int num_sinks = stream->rtp_sinks.length + stream->rtcp_sinks.length; @@ -1816,8 +1814,6 @@ void kernelize(struct packet_stream *stream) { reti.num_rtcp_destinations = reti.num_destinations - num_rtp_dests; } - g_list_free(payload_types); - if (!reti.local.family) goto no_kernel; @@ -2248,7 +2244,7 @@ static void media_packet_rtp_in(struct packet_handler_ctx *phc) // XXX yet another hash table per payload type -> combine struct rtp_stats *rtp_s = g_atomic_pointer_get(&phc->mp.stream->rtp_stats_cache); if (G_UNLIKELY(!rtp_s) || G_UNLIKELY(rtp_s->payload_type != phc->payload_type)) - rtp_s = g_hash_table_lookup(phc->mp.stream->rtp_stats, + rtp_s = t_hash_table_lookup(phc->mp.stream->rtp_stats, GUINT_TO_POINTER(phc->payload_type)); if (!rtp_s) { ilog(LOG_WARNING | LOG_FLAG_LIMIT, diff --git a/include/call.h b/include/call.h index 946c5280e..c165153a3 100644 --- a/include/call.h +++ b/include/call.h @@ -390,6 +390,8 @@ struct loop_protector { }; +TYPED_GPTRARRAY(rtp_stats_arr, struct rtp_stats) +TYPED_GHASHTABLE_PROTO(rtp_stats_ht, void, struct rtp_stats) /** * The packet_stream itself can be marked as: @@ -444,7 +446,7 @@ struct packet_stream { struct stream_stats *stats_in; struct stream_stats *stats_out; atomic64 last_packet; // userspace only - GHashTable *rtp_stats; /* LOCK: call->master_lock */ + rtp_stats_ht rtp_stats; /* LOCK: call->master_lock */ struct rtp_stats *rtp_stats_cache; enum endpoint_learning el_flags; @@ -874,7 +876,7 @@ int call_stream_address(GString *, struct packet_stream *ps, enum stream_address void add_total_calls_duration_in_interval(struct timeval *interval_tv); enum thread_looper_action call_timer(void); -void __rtp_stats_update(GHashTable *dst, struct codec_store *); +void __rtp_stats_update(rtp_stats_ht dst, struct codec_store *); int __init_stream(struct packet_stream *ps); void call_stream_crypto_reset(struct packet_stream *ps);