diff --git a/daemon/call.c b/daemon/call.c index 47e7c37e5..4c4cac201 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -49,6 +49,7 @@ #include "dtmf.h" #include "audio_player.h" #include "sdp.h" +#include "bufferpool.h" #include "xt_RTPENGINE.h" @@ -901,6 +902,8 @@ struct packet_stream *__packet_stream_new(call_t *call) { stream->rtp_stats = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __rtp_stats_free); recording_init_stream(stream); stream->send_timer = send_timer_new(stream); + stream->stats_in = bufferpool_alloc0(shm_bufferpool, sizeof(*stream->stats_in)); + stream->stats_out = bufferpool_alloc0(shm_bufferpool, sizeof(*stream->stats_out)); if (rtpe_config.jb_length && !CALL_ISSET(call, DISABLE_JB)) stream->jb = jitter_buffer_new(call); @@ -1063,8 +1066,8 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) { if (sfd->socket.fd == -1 || ps->endpoint.address.family == NULL) continue; socket_sendto(&sfd->socket, fake_rtp.s, fake_rtp.len, &ps->endpoint); - atomic64_inc(&ps->stats_out.packets); - atomic64_add(&ps->stats_out.bytes, fake_rtp.len); + atomic64_inc_na(&ps->stats_out->packets); + atomic64_add_na(&ps->stats_out->bytes, fake_rtp.len); } ret = CSS_PIERCE_NAT; } @@ -3741,13 +3744,13 @@ void call_destroy(call_t *c) { FMT_M(addr, ps->endpoint.port), (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", FMT_M(ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0), - atomic64_get(&ps->stats_in.packets), - atomic64_get(&ps->stats_in.bytes), - atomic64_get(&ps->stats_in.errors), + atomic64_get_na(&ps->stats_in->packets), + atomic64_get_na(&ps->stats_in->bytes), + atomic64_get_na(&ps->stats_in->errors), rtpe_now.tv_sec - atomic64_get(&ps->last_packet), - atomic64_get(&ps->stats_out.packets), - atomic64_get(&ps->stats_out.bytes), - atomic64_get(&ps->stats_out.errors)); + atomic64_get_na(&ps->stats_out->packets), + atomic64_get_na(&ps->stats_out->bytes), + atomic64_get_na(&ps->stats_out->errors)); } } @@ -3930,6 +3933,8 @@ static void __call_free(void *p) { ssrc_ctx_put(&ps->ssrc_in[u]); for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) ssrc_ctx_put(&ps->ssrc_out[u]); + bufferpool_unref(ps->stats_in); + bufferpool_unref(ps->stats_out); g_slice_free1(sizeof(*ps), ps); } diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 26d81ea31..6728348f9 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -2333,9 +2333,9 @@ err: } static void ng_stats(bencode_item_t *d, const struct stream_stats *s, struct stream_stats *totals) { - bencode_dictionary_add_integer(d, "packets", atomic64_get(&s->packets)); - bencode_dictionary_add_integer(d, "bytes", atomic64_get(&s->bytes)); - bencode_dictionary_add_integer(d, "errors", atomic64_get(&s->errors)); + bencode_dictionary_add_integer(d, "packets", atomic64_get_na(&s->packets)); + bencode_dictionary_add_integer(d, "bytes", atomic64_get_na(&s->bytes)); + bencode_dictionary_add_integer(d, "errors", atomic64_get_na(&s->errors)); if (!totals) return; atomic64_add_na(&totals->packets, atomic64_get(&s->packets)); @@ -2423,8 +2423,8 @@ stats: s = &totals->totals[0]; if (!PS_ISSET(ps, RTP)) s = &totals->totals[1]; - ng_stats(bencode_dictionary_add_dictionary(dict, "stats"), &ps->stats_in, s); - ng_stats(bencode_dictionary_add_dictionary(dict, "stats_out"), &ps->stats_out, NULL); + ng_stats(bencode_dictionary_add_dictionary(dict, "stats"), ps->stats_in, s); + ng_stats(bencode_dictionary_add_dictionary(dict, "stats_out"), ps->stats_out, NULL); } #define BF_M(k, f) if (MEDIA_ISSET(m, f)) bencode_list_add_string(flags, k) diff --git a/daemon/cdr.c b/daemon/cdr.c index 0ee2f959d..7aff36962 100644 --- a/daemon/cdr.c +++ b/daemon/cdr.c @@ -142,11 +142,11 @@ void cdr_update_entry(call_t * c) { cdrlinecnt, md->index, protocol, local_addr, cdrlinecnt, md->index, protocol, ps->last_local_endpoint.port, cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats_in.packets), + atomic64_get_na(&ps->stats_in->packets), cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats_in.bytes), + atomic64_get_na(&ps->stats_in->bytes), cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats_in.errors), + atomic64_get_na(&ps->stats_in->errors), cdrlinecnt, md->index, protocol, atomic64_get(&ps->last_packet), cdrlinecnt, md->index, protocol, @@ -167,11 +167,11 @@ void cdr_update_entry(call_t * c) { cdrlinecnt, md->index, protocol, local_addr, cdrlinecnt, md->index, protocol, ps->last_local_endpoint.port, cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats_in.packets), + atomic64_get_na(&ps->stats_in->packets), cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats_in.bytes), + atomic64_get_na(&ps->stats_in->bytes), cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats_in.errors), + atomic64_get_na(&ps->stats_in->errors), cdrlinecnt, md->index, protocol, atomic64_get(&ps->last_packet), cdrlinecnt, md->index, protocol, diff --git a/daemon/cli.c b/daemon/cli.c index 0dac17fbe..5145232b4 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -714,9 +714,10 @@ static void cli_list_tag_info(struct cli_writer *cw, struct call_monologue *ml) ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0, - atomic64_get(&ps->stats_in.packets), - atomic64_get(&ps->stats_in.bytes), atomic64_get(&ps->stats_in.errors), - atomic64_get(&ps->last_packet)); + atomic64_get_na(&ps->stats_in->packets), + atomic64_get_na(&ps->stats_in->bytes), + atomic64_get_na(&ps->stats_in->errors), + atomic64_get_na(&ps->last_packet)); cw->cw_printf(cw, "\n"); } } diff --git a/daemon/dtls.c b/daemon/dtls.c index 070d46b88..d6447beb6 100644 --- a/daemon/dtls.c +++ b/daemon/dtls.c @@ -884,8 +884,8 @@ int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) { if (fsin) { ilogs(srtp, LOG_DEBUG, "Sending DTLS packet"); socket_sendto(&sfd->socket, buf, ret, fsin); - atomic64_inc(&ps->stats_out.packets); - atomic64_add(&ps->stats_out.bytes, ret); + atomic64_inc_na(&ps->stats_out->packets); + atomic64_add_na(&ps->stats_out->bytes, ret); } } diff --git a/daemon/media_player.c b/daemon/media_player.c index a1a0b8bdd..4444f0745 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -303,8 +303,8 @@ static bool __send_timer_send_1(struct rtp_header *rh, struct packet_stream *sin dump_packet(&mp, cp->plain.s ? &cp->plain : &cp->s); } - atomic64_inc(&sink->stats_out.packets); - atomic64_add(&sink->stats_out.bytes, cp->s.len); + atomic64_inc_na(&sink->stats_out->packets); + atomic64_add_na(&sink->stats_out->bytes, cp->s.len); atomic64_inc_na(&sink_fd->local_intf->stats->out.packets); atomic64_add_na(&sink_fd->local_intf->stats->out.bytes, cp->s.len); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index cde5ec55d..351754d34 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -52,7 +52,7 @@ diff_ ## x ## _ ## io = 0; \ else \ diff_ ## x ## _ ## io = (ke)->x - ks_val; \ - atomic64_add(&ps->stats_ ## io.x, diff_ ## x ## _ ## io); \ + atomic64_add_na(&ps->stats_ ## io->x, diff_ ## x ## _ ## io); \ RTPE_STATS_ADD(x ## _kernel, diff_ ## x ## _ ## io); \ } while (0) @@ -2280,7 +2280,7 @@ static void media_packet_rtp_in(struct packet_handler_ctx *phc) "RTP packet with unknown payload type %u received from %s%s%s", phc->payload_type, FMT_M(endpoint_print_buf(&phc->mp.fsin))); - atomic64_inc(&phc->mp.stream->stats_in.errors); + atomic64_inc_na(&phc->mp.stream->stats_in->errors); atomic64_inc_na(&phc->mp.sfd->local_intf->stats->in.errors); RTPE_STATS_INC(errors_user); } @@ -2491,7 +2491,7 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc) FMT_M(sockaddr_print_buf(&endpoint.address), endpoint.port), FMT_M(sockaddr_print_buf(&ps_endpoint->address), ps_endpoint->port)); - atomic64_inc(&phc->mp.stream->stats_in.errors); + atomic64_inc_na(&phc->mp.stream->stats_in->errors); atomic64_inc_na(&phc->mp.sfd->local_intf->stats->in.errors); ret = true; } @@ -2889,14 +2889,14 @@ static int stream_packet(struct packet_handler_ctx *phc) { phc->mp.raw = phc->s; - if (atomic64_inc(&phc->mp.stream->stats_in.packets) == 0) { + if (atomic64_inc_na(&phc->mp.stream->stats_in->packets) == 0) { if (phc->mp.stream->component == 1) { if (phc->mp.media->index == 1) janus_rtc_up(phc->mp.media->monologue); janus_media_up(phc->mp.media); } } - atomic64_add(&phc->mp.stream->stats_in.bytes, phc->s.len); + atomic64_add_na(&phc->mp.stream->stats_in->bytes, phc->s.len); atomic64_inc_na(&phc->mp.sfd->local_intf->stats->in.packets); atomic64_add_na(&phc->mp.sfd->local_intf->stats->in.bytes, phc->s.len); atomic64_set(&phc->mp.stream->last_packet, rtpe_now.tv_sec); @@ -3070,7 +3070,7 @@ next_mirror: err_next: ilog(LOG_DEBUG | LOG_FLAG_LIMIT ,"Error when sending message. Error: %s", strerror(errno)); - atomic64_inc(&sink->stats_in.errors); + atomic64_inc_na(&sink->stats_in->errors); if (sink->selected_sfd) atomic64_inc_na(&sink->selected_sfd->local_intf->stats->out.errors); RTPE_STATS_INC(errors_user); @@ -3122,7 +3122,7 @@ out: } if (handler_ret < 0) { - atomic64_inc(&phc->mp.stream->stats_in.errors); + atomic64_inc_na(&phc->mp.stream->stats_in->errors); atomic64_inc_na(&phc->mp.sfd->local_intf->stats->in.errors); RTPE_STATS_INC(errors_user); } diff --git a/daemon/mqtt.c b/daemon/mqtt.c index 50d34feab..c1e4ca00f 100644 --- a/daemon/mqtt.c +++ b/daemon/mqtt.c @@ -329,11 +329,11 @@ static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct cal static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *json) { json_builder_set_member_name(json, "bytes"); - json_builder_add_int_value(json, atomic64_get(&s->bytes)); + json_builder_add_int_value(json, atomic64_get_na(&s->bytes)); json_builder_set_member_name(json, "packets"); - json_builder_add_int_value(json, atomic64_get(&s->packets)); + json_builder_add_int_value(json, atomic64_get_na(&s->packets)); json_builder_set_member_name(json, "errors"); - json_builder_add_int_value(json, atomic64_get(&s->errors)); + json_builder_add_int_value(json, atomic64_get_na(&s->errors)); } @@ -351,7 +351,7 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { json_builder_set_member_name(json, "ingress"); json_builder_begin_object(json); - mqtt_stream_stats_dir(&ps->stats_in, json); + mqtt_stream_stats_dir(ps->stats_in, json); json_builder_set_member_name(json, "SSRC"); json_builder_begin_array(json); @@ -372,7 +372,7 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { json_builder_set_member_name(json, "egress"); json_builder_begin_object(json); - mqtt_stream_stats_dir(&ps->stats_out, json); + mqtt_stream_stats_dir(ps->stats_out, json); json_builder_set_member_name(json, "SSRC"); json_builder_begin_array(json); diff --git a/daemon/redis.c b/daemon/redis.c index 3558bf3cc..ae0e2a1cf 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1467,7 +1467,7 @@ static int redis_streams(call_t *c, struct redis_list *streams) { return -1; if (redis_hash_get_endpoint(&ps->advertised_endpoint, rh, "advertised_endpoint")) return -1; - if (redis_hash_get_stats(&ps->stats_in, rh, "stats")) + if (redis_hash_get_stats(ps->stats_in, rh, "stats")) return -1; if (redis_hash_get_sdes_params1(&ps->crypto.params, rh, "") == -1) return -1; @@ -2431,9 +2431,9 @@ char* redis_encode_json(call_t *c) { JSON_SET_SIMPLE("component","%u",ps->component); JSON_SET_SIMPLE_CSTR("endpoint",endpoint_print_buf(&ps->endpoint)); JSON_SET_SIMPLE_CSTR("advertised_endpoint",endpoint_print_buf(&ps->advertised_endpoint)); - JSON_SET_SIMPLE("stats-packets","%" PRIu64, atomic64_get(&ps->stats_in.packets)); - JSON_SET_SIMPLE("stats-bytes","%" PRIu64, atomic64_get(&ps->stats_in.bytes)); - JSON_SET_SIMPLE("stats-errors","%" PRIu64, atomic64_get(&ps->stats_in.errors)); + JSON_SET_SIMPLE("stats-packets","%" PRIu64, atomic64_get_na(&ps->stats_in->packets)); + JSON_SET_SIMPLE("stats-bytes","%" PRIu64, atomic64_get_na(&ps->stats_in->bytes)); + JSON_SET_SIMPLE("stats-errors","%" PRIu64, atomic64_get_na(&ps->stats_in->errors)); json_update_crypto_params(builder, "", &ps->crypto.params); } diff --git a/daemon/statistics.c b/daemon/statistics.c index 105e7d257..698176caf 100644 --- a/daemon/statistics.c +++ b/daemon/statistics.c @@ -153,8 +153,8 @@ found:; if (!ps2) continue; - if (atomic64_get(&ps2->stats_in.packets)==0) { - if (atomic64_get(&ps->stats_in.packets)!=0) + if (atomic64_get_na(&ps2->stats_in->packets)==0) { + if (atomic64_get_na(&ps->stats_in->packets)!=0) RTPE_STATS_INC(oneway_stream_sess); else total_nopacket_relayed_sess++; diff --git a/include/call.h b/include/call.h index af2376a6a..58f0e9e41 100644 --- a/include/call.h +++ b/include/call.h @@ -465,8 +465,8 @@ struct packet_stream { struct jitter_buffer *jb; /* RO */ time_t kernel_time; - struct stream_stats stats_in; - struct stream_stats stats_out; + struct stream_stats *stats_in; + struct stream_stats *stats_out; struct stream_stats kernel_stats_in; struct stream_stats kernel_stats_out; unsigned char in_tos_tclass;