diff --git a/daemon/call.c b/daemon/call.c index 4a9679595..8da3025ef 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -499,55 +499,8 @@ destroy: free(url_suffix); } - -// reverse of count_stream_stats_userspace() -static void count_stream_stats_kernel(struct packet_stream *ps) { - if (!PS_ISSET(ps, RTP)) - return; - if (bf_set(&ps->stats_flags, PS_STATS_KERNEL)) - return; // flag was already set, nothing to do - - if (bf_isset(&ps->stats_flags, PS_STATS_USERSPACE)) { - // mixed stream. count as only mixed stream. - if (bf_clear(&ps->stats_flags, PS_STATS_KERNEL_COUNTED)) - RTPE_GAUGE_DEC(kernel_only_streams); - if (bf_clear(&ps->stats_flags, PS_STATS_USERSPACE_COUNTED)) - RTPE_GAUGE_DEC(userspace_streams); - if (!bf_set(&ps->stats_flags, PS_STATS_MIXED_COUNTED)) - RTPE_GAUGE_INC(kernel_user_streams); - } - else { - // kernel-only (for now). count it. - if (!bf_set(&ps->stats_flags, PS_STATS_KERNEL_COUNTED)) - RTPE_GAUGE_INC(kernel_only_streams); - } -} - - -#define DS_io(x, ps, ke, io) do { \ - uint64_t ks_val; \ - ks_val = atomic64_get(&ps->kernel_stats_ ## io.x); \ - if ((ke)->x < ks_val) \ - diff_ ## x ## _ ## io = 0; \ - else \ - diff_ ## x ## _ ## io = (ke)->x - ks_val; \ - atomic64_add(&ps->stats_ ## io.x, diff_ ## x ## _ ## io); \ - atomic64_add(&ps->selected_sfd->local_intf->stats.io.x, diff_ ## x ## _ ## io); \ - RTPE_STATS_ADD(x ## _kernel, diff_ ## x ## _ ## io); \ - } while (0) - -#define DS(x) DS_io(x, ps, &ke->stats_in, in) -#define DSo(x) DS_io(x, sink, stats_o, out) - void call_timer(void *ptr) { struct iterator_helper hlp; - GList *i; - struct rtpengine_list_entry *ke; - struct packet_stream *ps; - int j; - struct rtp_stats *rs; - unsigned int pt; - endpoint_t ep; struct timeval tv_start; long long run_diff_us; @@ -575,166 +528,6 @@ void call_timer(void *ptr) { // stats derived while iterating calls RTPE_GAUGE_SET(transcoded_media, hlp.transcoded_media); - // TODO: eliminate/split out most of what this single central timer does - i = hlp.count ? kernel_list() : NULL; - while (i) { - ke = i->data; - - kernel2endpoint(&ep, &ke->target.local); - AUTO_CLEANUP(struct stream_fd *sfd, stream_fd_auto_cleanup) = stream_fd_lookup(&ep); - if (!sfd) - goto next; - - log_info_stream_fd(sfd); - - rwlock_lock_r(&sfd->call->master_lock); - - ps = sfd->stream; - if (!ps || ps->selected_sfd != sfd) { - rwlock_unlock_r(&sfd->call->master_lock); - goto next; - } - - uint64_t diff_packets_in, diff_bytes_in, diff_errors_in; - uint64_t diff_packets_out, diff_bytes_out, diff_errors_out; - - DS(packets); - DS(bytes); - DS(errors); - - - if (ke->stats_in.packets != atomic64_get(&ps->kernel_stats_in.packets)) { - atomic64_set(&ps->last_packet, rtpe_now.tv_sec); - count_stream_stats_kernel(ps); - } - - ps->in_tos_tclass = ke->stats_in.tos; - -#if (RE_HAS_MEASUREDELAY) - /* XXX fix atomicity */ - ps->stats_in.delay_min = ke->stats_in.delay_min; - ps->stats_in.delay_avg = ke->stats_in.delay_avg; - ps->stats_in.delay_max = ke->stats_in.delay_max; -#endif - - atomic64_set(&ps->kernel_stats_in.bytes, ke->stats_in.bytes); - atomic64_set(&ps->kernel_stats_in.packets, ke->stats_in.packets); - atomic64_set(&ps->kernel_stats_in.errors, ke->stats_in.errors); - - uint64_t max_diff = 0; - int max_pt = -1; - for (j = 0; j < ke->target.num_payload_types; j++) { - pt = ke->target.pt_input[j].pt_num; - rs = g_hash_table_lookup(ps->rtp_stats, GINT_TO_POINTER(pt)); - if (!rs) - continue; - if (ke->rtp_stats[j].packets > atomic64_get(&rs->packets)) { - uint64_t diff = ke->rtp_stats[j].packets - atomic64_get(&rs->packets); - atomic64_add(&rs->packets, diff); - if (diff > max_diff) { - max_diff = diff; - max_pt = pt; - } - } - if (ke->rtp_stats[j].bytes > atomic64_get(&rs->bytes)) - atomic64_add(&rs->bytes, - ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes)); - atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets); - atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes); - } - - bool update = false; - - if (diff_packets_in) - sfd->call->foreign_media = 0; - - if (!ke->target.non_forwarding && diff_packets_in) { - for (GList *l = ps->rtp_sinks.head; l; l = l->next) { - struct sink_handler *sh = l->data; - struct packet_stream *sink = sh->sink; - - if (sh->kernel_output_idx < 0 - || sh->kernel_output_idx >= ke->target.num_destinations) - continue; - - struct rtpengine_output_info *o = &ke->outputs[sh->kernel_output_idx]; - struct rtpengine_stats *stats_o = &ke->stats_out[sh->kernel_output_idx]; - - DSo(bytes); - DSo(packets); - DSo(errors); - - atomic64_set(&sink->kernel_stats_out.bytes, stats_o->bytes); - atomic64_set(&sink->kernel_stats_out.packets, stats_o->packets); - atomic64_set(&sink->kernel_stats_out.errors, stats_o->errors); - - mutex_lock(&sink->out_lock); - for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) { - if (!ke->target.ssrc[u]) // end of list - break; - uint32_t out_ssrc = o->ssrc_out[u]; - if (!out_ssrc) - out_ssrc = ke->target.ssrc[u]; - struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(out_ssrc), - sink->ssrc_out, 0); - if (!ctx) - continue; - if (max_pt != -1) - payload_tracker_add(&ctx->tracker, max_pt); - if (sink->crypto.params.crypto_suite - && o->encrypt.last_index[u] - ctx->srtp_index > 0x4000) - { - ilog(LOG_DEBUG, "Updating SRTP encryption index from %" PRIu64 - " to %" PRIu64, - ctx->srtp_index, - o->encrypt.last_index[u]); - ctx->srtp_index = o->encrypt.last_index[u]; - update = true; - } - } - mutex_unlock(&sink->out_lock); - } - - mutex_lock(&ps->in_lock); - - for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) { - if (!ke->target.ssrc[u]) // end of list - break; - struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(ke->target.ssrc[u]), - ps->ssrc_in, 0); - if (!ctx) - continue; - // TODO: add in SSRC stats similar to __stream_update_stats - atomic64_set(&ctx->last_seq, ke->target.decrypt.last_index[u]); - - if (max_pt != -1) - payload_tracker_add(&ctx->tracker, max_pt); - - if (sfd->crypto.params.crypto_suite - && ke->target.decrypt.last_index[u] - - ctx->srtp_index > 0x4000) { - ilog(LOG_DEBUG, "Updating SRTP decryption index from %" PRIu64 - " to %" PRIu64, - ctx->srtp_index, - ke->target.decrypt.last_index[u]); - ctx->srtp_index = ke->target.decrypt.last_index[u]; - update = true; - } - } - mutex_unlock(&ps->in_lock); - } - - rwlock_unlock_r(&sfd->call->master_lock); - - if (update) - redis_update_onekey(ps->call, rtpe_redis_write); - -next: - g_slice_free1(sizeof(*ke), ke); - i = g_list_delete_link(i, i); - log_info_pop(); - } - kill_calls_timer(hlp.del_scheduled, NULL); kill_calls_timer(hlp.del_timeout, rtpe_config.b2b_url); diff --git a/daemon/main.c b/daemon/main.c index 3b3cfec52..974c9239c 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1354,6 +1354,10 @@ int main(int argc, char **argv) { thread_create_detach_prio(call_rate_stats_updater, NULL, rtpe_config.idle_scheduling, rtpe_config.idle_priority, "call rate stats"); + /* separate thread for ports iterations (stats update from the kernel) functionality */ + thread_create_detach_prio(kernel_stats_updater_iterator, NULL, rtpe_config.idle_scheduling, + rtpe_config.idle_priority, "kernel stats updater"); + if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && initial_rtpe_config.redis_delete_async) thread_create_detach(redis_delete_async_loop, NULL, "redis async"); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index e46a1b4c3..1300abbae 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -45,6 +45,20 @@ #define MAX_RECV_LOOP_STRIKES 5 #endif +#define DS_io(x, ps, ke, io) do { \ + uint64_t ks_val; \ + ks_val = atomic64_get(&ps->kernel_stats_ ## io.x); \ + if ((ke)->x < ks_val) \ + diff_ ## x ## _ ## io = 0; \ + else \ + diff_ ## x ## _ ## io = (ke)->x - ks_val; \ + atomic64_add(&ps->stats_ ## io.x, diff_ ## x ## _ ## io); \ + atomic64_add(&ps->selected_sfd->local_intf->stats.io.x, diff_ ## x ## _ ## io); \ + RTPE_STATS_ADD(x ## _kernel, diff_ ## x ## _ ## io); \ + } while (0) + +#define DS(x) DS_io(x, ps, &ke->stats_in, in) +#define DSo(x) DS_io(x, sink, stats_o, out) struct intf_rr { @@ -2603,7 +2617,9 @@ static int media_packet_queue_dup(GQueue *q) { return 0; } -// reverse of count_stream_stats_kernel() +/** + * reverse of count_stream_stats_kernel() + */ static void count_stream_stats_userspace(struct packet_stream *ps) { if (!PS_ISSET(ps, RTP)) return; @@ -2625,7 +2641,30 @@ static void count_stream_stats_userspace(struct packet_stream *ps) { RTPE_GAUGE_INC(userspace_streams); } } +/** + * reverse of count_stream_stats_userspace() + */ +static void count_stream_stats_kernel(struct packet_stream *ps) { + if (!PS_ISSET(ps, RTP)) + return; + if (bf_set(&ps->stats_flags, PS_STATS_KERNEL)) + return; // flag was already set, nothing to do + if (bf_isset(&ps->stats_flags, PS_STATS_USERSPACE)) { + // mixed stream. count as only mixed stream. + if (bf_clear(&ps->stats_flags, PS_STATS_KERNEL_COUNTED)) + RTPE_GAUGE_DEC(kernel_only_streams); + if (bf_clear(&ps->stats_flags, PS_STATS_USERSPACE_COUNTED)) + RTPE_GAUGE_DEC(userspace_streams); + if (!bf_set(&ps->stats_flags, PS_STATS_MIXED_COUNTED)) + RTPE_GAUGE_INC(kernel_user_streams); + } + else { + // kernel-only (for now). count it. + if (!bf_set(&ps->stats_flags, PS_STATS_KERNEL_COUNTED)) + RTPE_GAUGE_INC(kernel_only_streams); + } +} /** * Packet handling starts in stream_packet(). @@ -3297,3 +3336,184 @@ struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_ ret->last_run = rtpe_now; return &ret->stats; } + + +/** + * Ports iterations (stats update from the kernel) functionality. + */ +static void kernel_stats_updater(void) { + struct rtpengine_list_entry *ke; + struct packet_stream *ps; + int j; + struct rtp_stats *rs; + unsigned int pt; + endpoint_t ep; + + /* TODO: should we realy check the count of call timers? `call_timer_iterator()` */ + GList * kl = kernel_list(); + while (kl) { + ke = kl->data; + kernel2endpoint(&ep, &ke->target.local); + AUTO_CLEANUP(struct stream_fd *sfd, stream_fd_auto_cleanup) = stream_fd_lookup(&ep); + + if (!sfd) + goto next; + + log_info_stream_fd(sfd); + + rwlock_lock_r(&sfd->call->master_lock); + ps = sfd->stream; + if (!ps || ps->selected_sfd != sfd) { + rwlock_unlock_r(&sfd->call->master_lock); + goto next; + } + + uint64_t diff_packets_in, diff_bytes_in, diff_errors_in; + uint64_t diff_packets_out, diff_bytes_out, diff_errors_out; + + DS(packets); + DS(bytes); + DS(errors); + + if (ke->stats_in.packets != atomic64_get(&ps->kernel_stats_in.packets)) { + atomic64_set(&ps->last_packet, rtpe_now.tv_sec); + count_stream_stats_kernel(ps); + } + + ps->in_tos_tclass = ke->stats_in.tos; + +#if (RE_HAS_MEASUREDELAY) + /* XXX fix atomicity */ + ps->stats_in.delay_min = ke->stats_in.delay_min; + ps->stats_in.delay_avg = ke->stats_in.delay_avg; + ps->stats_in.delay_max = ke->stats_in.delay_max; +#endif + + atomic64_set(&ps->kernel_stats_in.bytes, ke->stats_in.bytes); + atomic64_set(&ps->kernel_stats_in.packets, ke->stats_in.packets); + atomic64_set(&ps->kernel_stats_in.errors, ke->stats_in.errors); + + uint64_t max_diff = 0; + int max_pt = -1; + for (j = 0; j < ke->target.num_payload_types; j++) { + pt = ke->target.pt_input[j].pt_num; + rs = g_hash_table_lookup(ps->rtp_stats, GINT_TO_POINTER(pt)); + if (!rs) + continue; + if (ke->rtp_stats[j].packets > atomic64_get(&rs->packets)) { + uint64_t diff = ke->rtp_stats[j].packets - atomic64_get(&rs->packets); + atomic64_add(&rs->packets, diff); + if (diff > max_diff) { + max_diff = diff; + max_pt = pt; + } + } + if (ke->rtp_stats[j].bytes > atomic64_get(&rs->bytes)) + atomic64_add(&rs->bytes, + ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes)); + atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets); + atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes); + } + + bool update = false; + + if (diff_packets_in) + sfd->call->foreign_media = 0; + + if (!ke->target.non_forwarding && diff_packets_in) { + for (GList *l = ps->rtp_sinks.head; l; l = l->next) { + struct sink_handler *sh = l->data; + struct packet_stream *sink = sh->sink; + + if (sh->kernel_output_idx < 0 + || sh->kernel_output_idx >= ke->target.num_destinations) + continue; + + struct rtpengine_output_info *o = &ke->outputs[sh->kernel_output_idx]; + struct rtpengine_stats *stats_o = &ke->stats_out[sh->kernel_output_idx]; + + DSo(bytes); + DSo(packets); + DSo(errors); + + atomic64_set(&sink->kernel_stats_out.bytes, stats_o->bytes); + atomic64_set(&sink->kernel_stats_out.packets, stats_o->packets); + atomic64_set(&sink->kernel_stats_out.errors, stats_o->errors); + + mutex_lock(&sink->out_lock); + for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) { + if (!ke->target.ssrc[u]) // end of list + break; + uint32_t out_ssrc = o->ssrc_out[u]; + if (!out_ssrc) + out_ssrc = ke->target.ssrc[u]; + struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(out_ssrc), + sink->ssrc_out, 0); + if (!ctx) + continue; + if (max_pt != -1) + payload_tracker_add(&ctx->tracker, max_pt); + if (sink->crypto.params.crypto_suite + && o->encrypt.last_index[u] - ctx->srtp_index > 0x4000) + { + ilog(LOG_DEBUG, "Updating SRTP encryption index from %" PRIu64 + " to %" PRIu64, + ctx->srtp_index, + o->encrypt.last_index[u]); + ctx->srtp_index = o->encrypt.last_index[u]; + update = true; + } + } + mutex_unlock(&sink->out_lock); + } + + mutex_lock(&ps->in_lock); + + for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) { + if (!ke->target.ssrc[u]) // end of list + break; + struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(ke->target.ssrc[u]), + ps->ssrc_in, 0); + if (!ctx) + continue; + // TODO: add in SSRC stats similar to __stream_update_stats + atomic64_set(&ctx->last_seq, ke->target.decrypt.last_index[u]); + + if (max_pt != -1) + payload_tracker_add(&ctx->tracker, max_pt); + + if (sfd->crypto.params.crypto_suite + && ke->target.decrypt.last_index[u] + - ctx->srtp_index > 0x4000) { + ilog(LOG_DEBUG, "Updating SRTP decryption index from %" PRIu64 + " to %" PRIu64, + ctx->srtp_index, + ke->target.decrypt.last_index[u]); + ctx->srtp_index = ke->target.decrypt.last_index[u]; + update = true; + } + } + mutex_unlock(&ps->in_lock); + } + + rwlock_unlock_r(&sfd->call->master_lock); + + if (update) + redis_update_onekey(ps->call, rtpe_redis_write); + +next: + g_slice_free1(sizeof(*ke), ke); + kl = g_list_delete_link(kl, kl); + log_info_pop(); + } + +} +void kernel_stats_updater_iterator(void * dummy) { + while (!rtpe_shutdown) { + kernel_stats_updater(); + + thread_cancel_enable(); + usleep(1000000); /* sleep for 1 second in each iteration */ + thread_cancel_disable(); + } +} diff --git a/include/media_socket.h b/include/media_socket.h index 11e7f776c..21eca9d2e 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -334,6 +334,8 @@ const struct transport_protocol *transport_protocol(const str *s); //void play_buffered(struct packet_stream *sink, struct codec_packet *cp, int buffered); void play_buffered(struct jb_packet *cp); +void kernel_stats_updater_iterator(void * dummy); + INLINE int proto_is_rtp(const struct transport_protocol *protocol) { // known to be RTP? therefore unknown is not RTP if (!protocol)