Browse Source

MT#55283 use allocated memory for stream stats

Allocate memory from bufferpool for per-stream stats. No functional
change, but it allows sharing these between kernel and user space.

Change-Id: I370a49e1d94bb91c7fd0a2bc7d00ba65f99c4f6a
pull/1826/head
Richard Fuchs 2 years ago
parent
commit
8e3e9fdb5a
11 changed files with 52 additions and 46 deletions
  1. +13
    -8
      daemon/call.c
  2. +5
    -5
      daemon/call_interfaces.c
  3. +6
    -6
      daemon/cdr.c
  4. +4
    -3
      daemon/cli.c
  5. +2
    -2
      daemon/dtls.c
  6. +2
    -2
      daemon/media_player.c
  7. +7
    -7
      daemon/media_socket.c
  8. +5
    -5
      daemon/mqtt.c
  9. +4
    -4
      daemon/redis.c
  10. +2
    -2
      daemon/statistics.c
  11. +2
    -2
      include/call.h

+ 13
- 8
daemon/call.c View File

@ -49,6 +49,7 @@
#include "dtmf.h"
#include "audio_player.h"
#include "sdp.h"
#include "bufferpool.h"
#include "xt_RTPENGINE.h"
@ -901,6 +902,8 @@ struct packet_stream *__packet_stream_new(call_t *call) {
stream->rtp_stats = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __rtp_stats_free);
recording_init_stream(stream);
stream->send_timer = send_timer_new(stream);
stream->stats_in = bufferpool_alloc0(shm_bufferpool, sizeof(*stream->stats_in));
stream->stats_out = bufferpool_alloc0(shm_bufferpool, sizeof(*stream->stats_out));
if (rtpe_config.jb_length && !CALL_ISSET(call, DISABLE_JB))
stream->jb = jitter_buffer_new(call);
@ -1063,8 +1066,8 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) {
if (sfd->socket.fd == -1 || ps->endpoint.address.family == NULL)
continue;
socket_sendto(&sfd->socket, fake_rtp.s, fake_rtp.len, &ps->endpoint);
atomic64_inc(&ps->stats_out.packets);
atomic64_add(&ps->stats_out.bytes, fake_rtp.len);
atomic64_inc_na(&ps->stats_out->packets);
atomic64_add_na(&ps->stats_out->bytes, fake_rtp.len);
}
ret = CSS_PIERCE_NAT;
}
@ -3741,13 +3744,13 @@ void call_destroy(call_t *c) {
FMT_M(addr, ps->endpoint.port),
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
FMT_M(ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0),
atomic64_get(&ps->stats_in.packets),
atomic64_get(&ps->stats_in.bytes),
atomic64_get(&ps->stats_in.errors),
atomic64_get_na(&ps->stats_in->packets),
atomic64_get_na(&ps->stats_in->bytes),
atomic64_get_na(&ps->stats_in->errors),
rtpe_now.tv_sec - atomic64_get(&ps->last_packet),
atomic64_get(&ps->stats_out.packets),
atomic64_get(&ps->stats_out.bytes),
atomic64_get(&ps->stats_out.errors));
atomic64_get_na(&ps->stats_out->packets),
atomic64_get_na(&ps->stats_out->bytes),
atomic64_get_na(&ps->stats_out->errors));
}
}
@ -3930,6 +3933,8 @@ static void __call_free(void *p) {
ssrc_ctx_put(&ps->ssrc_in[u]);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++)
ssrc_ctx_put(&ps->ssrc_out[u]);
bufferpool_unref(ps->stats_in);
bufferpool_unref(ps->stats_out);
g_slice_free1(sizeof(*ps), ps);
}


+ 5
- 5
daemon/call_interfaces.c View File

@ -2333,9 +2333,9 @@ err:
}
static void ng_stats(bencode_item_t *d, const struct stream_stats *s, struct stream_stats *totals) {
bencode_dictionary_add_integer(d, "packets", atomic64_get(&s->packets));
bencode_dictionary_add_integer(d, "bytes", atomic64_get(&s->bytes));
bencode_dictionary_add_integer(d, "errors", atomic64_get(&s->errors));
bencode_dictionary_add_integer(d, "packets", atomic64_get_na(&s->packets));
bencode_dictionary_add_integer(d, "bytes", atomic64_get_na(&s->bytes));
bencode_dictionary_add_integer(d, "errors", atomic64_get_na(&s->errors));
if (!totals)
return;
atomic64_add_na(&totals->packets, atomic64_get(&s->packets));
@ -2423,8 +2423,8 @@ stats:
s = &totals->totals[0];
if (!PS_ISSET(ps, RTP))
s = &totals->totals[1];
ng_stats(bencode_dictionary_add_dictionary(dict, "stats"), &ps->stats_in, s);
ng_stats(bencode_dictionary_add_dictionary(dict, "stats_out"), &ps->stats_out, NULL);
ng_stats(bencode_dictionary_add_dictionary(dict, "stats"), ps->stats_in, s);
ng_stats(bencode_dictionary_add_dictionary(dict, "stats_out"), ps->stats_out, NULL);
}
#define BF_M(k, f) if (MEDIA_ISSET(m, f)) bencode_list_add_string(flags, k)


+ 6
- 6
daemon/cdr.c View File

@ -142,11 +142,11 @@ void cdr_update_entry(call_t * c) {
cdrlinecnt, md->index, protocol, local_addr,
cdrlinecnt, md->index, protocol, ps->last_local_endpoint.port,
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats_in.packets),
atomic64_get_na(&ps->stats_in->packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats_in.bytes),
atomic64_get_na(&ps->stats_in->bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats_in.errors),
atomic64_get_na(&ps->stats_in->errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol,
@ -167,11 +167,11 @@ void cdr_update_entry(call_t * c) {
cdrlinecnt, md->index, protocol, local_addr,
cdrlinecnt, md->index, protocol, ps->last_local_endpoint.port,
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats_in.packets),
atomic64_get_na(&ps->stats_in->packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats_in.bytes),
atomic64_get_na(&ps->stats_in->bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats_in.errors),
atomic64_get_na(&ps->stats_in->errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol,


+ 4
- 3
daemon/cli.c View File

@ -714,9 +714,10 @@ static void cli_list_tag_info(struct cli_writer *cw, struct call_monologue *ml)
ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0,
atomic64_get(&ps->stats_in.packets),
atomic64_get(&ps->stats_in.bytes), atomic64_get(&ps->stats_in.errors),
atomic64_get(&ps->last_packet));
atomic64_get_na(&ps->stats_in->packets),
atomic64_get_na(&ps->stats_in->bytes),
atomic64_get_na(&ps->stats_in->errors),
atomic64_get_na(&ps->last_packet));
cw->cw_printf(cw, "\n");
}
}


+ 2
- 2
daemon/dtls.c View File

@ -884,8 +884,8 @@ int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) {
if (fsin) {
ilogs(srtp, LOG_DEBUG, "Sending DTLS packet");
socket_sendto(&sfd->socket, buf, ret, fsin);
atomic64_inc(&ps->stats_out.packets);
atomic64_add(&ps->stats_out.bytes, ret);
atomic64_inc_na(&ps->stats_out->packets);
atomic64_add_na(&ps->stats_out->bytes, ret);
}
}


+ 2
- 2
daemon/media_player.c View File

@ -303,8 +303,8 @@ static bool __send_timer_send_1(struct rtp_header *rh, struct packet_stream *sin
dump_packet(&mp, cp->plain.s ? &cp->plain : &cp->s);
}
atomic64_inc(&sink->stats_out.packets);
atomic64_add(&sink->stats_out.bytes, cp->s.len);
atomic64_inc_na(&sink->stats_out->packets);
atomic64_add_na(&sink->stats_out->bytes, cp->s.len);
atomic64_inc_na(&sink_fd->local_intf->stats->out.packets);
atomic64_add_na(&sink_fd->local_intf->stats->out.bytes, cp->s.len);


+ 7
- 7
daemon/media_socket.c View File

@ -52,7 +52,7 @@
diff_ ## x ## _ ## io = 0; \
else \
diff_ ## x ## _ ## io = (ke)->x - ks_val; \
atomic64_add(&ps->stats_ ## io.x, diff_ ## x ## _ ## io); \
atomic64_add_na(&ps->stats_ ## io->x, diff_ ## x ## _ ## io); \
RTPE_STATS_ADD(x ## _kernel, diff_ ## x ## _ ## io); \
} while (0)
@ -2280,7 +2280,7 @@ static void media_packet_rtp_in(struct packet_handler_ctx *phc)
"RTP packet with unknown payload type %u received from %s%s%s",
phc->payload_type,
FMT_M(endpoint_print_buf(&phc->mp.fsin)));
atomic64_inc(&phc->mp.stream->stats_in.errors);
atomic64_inc_na(&phc->mp.stream->stats_in->errors);
atomic64_inc_na(&phc->mp.sfd->local_intf->stats->in.errors);
RTPE_STATS_INC(errors_user);
}
@ -2491,7 +2491,7 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
FMT_M(sockaddr_print_buf(&endpoint.address), endpoint.port),
FMT_M(sockaddr_print_buf(&ps_endpoint->address),
ps_endpoint->port));
atomic64_inc(&phc->mp.stream->stats_in.errors);
atomic64_inc_na(&phc->mp.stream->stats_in->errors);
atomic64_inc_na(&phc->mp.sfd->local_intf->stats->in.errors);
ret = true;
}
@ -2889,14 +2889,14 @@ static int stream_packet(struct packet_handler_ctx *phc) {
phc->mp.raw = phc->s;
if (atomic64_inc(&phc->mp.stream->stats_in.packets) == 0) {
if (atomic64_inc_na(&phc->mp.stream->stats_in->packets) == 0) {
if (phc->mp.stream->component == 1) {
if (phc->mp.media->index == 1)
janus_rtc_up(phc->mp.media->monologue);
janus_media_up(phc->mp.media);
}
}
atomic64_add(&phc->mp.stream->stats_in.bytes, phc->s.len);
atomic64_add_na(&phc->mp.stream->stats_in->bytes, phc->s.len);
atomic64_inc_na(&phc->mp.sfd->local_intf->stats->in.packets);
atomic64_add_na(&phc->mp.sfd->local_intf->stats->in.bytes, phc->s.len);
atomic64_set(&phc->mp.stream->last_packet, rtpe_now.tv_sec);
@ -3070,7 +3070,7 @@ next_mirror:
err_next:
ilog(LOG_DEBUG | LOG_FLAG_LIMIT ,"Error when sending message. Error: %s", strerror(errno));
atomic64_inc(&sink->stats_in.errors);
atomic64_inc_na(&sink->stats_in->errors);
if (sink->selected_sfd)
atomic64_inc_na(&sink->selected_sfd->local_intf->stats->out.errors);
RTPE_STATS_INC(errors_user);
@ -3122,7 +3122,7 @@ out:
}
if (handler_ret < 0) {
atomic64_inc(&phc->mp.stream->stats_in.errors);
atomic64_inc_na(&phc->mp.stream->stats_in->errors);
atomic64_inc_na(&phc->mp.sfd->local_intf->stats->in.errors);
RTPE_STATS_INC(errors_user);
}


+ 5
- 5
daemon/mqtt.c View File

@ -329,11 +329,11 @@ static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct cal
static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *json) {
json_builder_set_member_name(json, "bytes");
json_builder_add_int_value(json, atomic64_get(&s->bytes));
json_builder_add_int_value(json, atomic64_get_na(&s->bytes));
json_builder_set_member_name(json, "packets");
json_builder_add_int_value(json, atomic64_get(&s->packets));
json_builder_add_int_value(json, atomic64_get_na(&s->packets));
json_builder_set_member_name(json, "errors");
json_builder_add_int_value(json, atomic64_get(&s->errors));
json_builder_add_int_value(json, atomic64_get_na(&s->errors));
}
@ -351,7 +351,7 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_set_member_name(json, "ingress");
json_builder_begin_object(json);
mqtt_stream_stats_dir(&ps->stats_in, json);
mqtt_stream_stats_dir(ps->stats_in, json);
json_builder_set_member_name(json, "SSRC");
json_builder_begin_array(json);
@ -372,7 +372,7 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_set_member_name(json, "egress");
json_builder_begin_object(json);
mqtt_stream_stats_dir(&ps->stats_out, json);
mqtt_stream_stats_dir(ps->stats_out, json);
json_builder_set_member_name(json, "SSRC");
json_builder_begin_array(json);


+ 4
- 4
daemon/redis.c View File

@ -1467,7 +1467,7 @@ static int redis_streams(call_t *c, struct redis_list *streams) {
return -1;
if (redis_hash_get_endpoint(&ps->advertised_endpoint, rh, "advertised_endpoint"))
return -1;
if (redis_hash_get_stats(&ps->stats_in, rh, "stats"))
if (redis_hash_get_stats(ps->stats_in, rh, "stats"))
return -1;
if (redis_hash_get_sdes_params1(&ps->crypto.params, rh, "") == -1)
return -1;
@ -2431,9 +2431,9 @@ char* redis_encode_json(call_t *c) {
JSON_SET_SIMPLE("component","%u",ps->component);
JSON_SET_SIMPLE_CSTR("endpoint",endpoint_print_buf(&ps->endpoint));
JSON_SET_SIMPLE_CSTR("advertised_endpoint",endpoint_print_buf(&ps->advertised_endpoint));
JSON_SET_SIMPLE("stats-packets","%" PRIu64, atomic64_get(&ps->stats_in.packets));
JSON_SET_SIMPLE("stats-bytes","%" PRIu64, atomic64_get(&ps->stats_in.bytes));
JSON_SET_SIMPLE("stats-errors","%" PRIu64, atomic64_get(&ps->stats_in.errors));
JSON_SET_SIMPLE("stats-packets","%" PRIu64, atomic64_get_na(&ps->stats_in->packets));
JSON_SET_SIMPLE("stats-bytes","%" PRIu64, atomic64_get_na(&ps->stats_in->bytes));
JSON_SET_SIMPLE("stats-errors","%" PRIu64, atomic64_get_na(&ps->stats_in->errors));
json_update_crypto_params(builder, "", &ps->crypto.params);
}


+ 2
- 2
daemon/statistics.c View File

@ -153,8 +153,8 @@ found:;
if (!ps2)
continue;
if (atomic64_get(&ps2->stats_in.packets)==0) {
if (atomic64_get(&ps->stats_in.packets)!=0)
if (atomic64_get_na(&ps2->stats_in->packets)==0) {
if (atomic64_get_na(&ps->stats_in->packets)!=0)
RTPE_STATS_INC(oneway_stream_sess);
else
total_nopacket_relayed_sess++;


+ 2
- 2
include/call.h View File

@ -465,8 +465,8 @@ struct packet_stream {
struct jitter_buffer *jb; /* RO */
time_t kernel_time;
struct stream_stats stats_in;
struct stream_stats stats_out;
struct stream_stats *stats_in;
struct stream_stats *stats_out;
struct stream_stats kernel_stats_in;
struct stream_stats kernel_stats_out;
unsigned char in_tos_tclass;


Loading…
Cancel
Save