From 523007a7cb7575d478c42d48a1dd4988fd348f97 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Thu, 17 Jul 2025 14:38:24 -0400 Subject: [PATCH] MT#55283 locking updates Protect selected_sfd with in_lock. Protect RTCP sending with in_lock and out_lock as appropriate. Has the odd side effect of RTCP reports expected in tests to be sent one packet later than before. Closes #1966 Probably fixes #1927 Change-Id: I225b43dff8e8fbb938d3be6aad50249997615d77 (cherry picked from commit ffc539c0d81d497d46c5f73a800c17e1f06c0e4e) --- daemon/call.c | 13 ++++++++++--- daemon/ice.c | 1 + daemon/media_player.c | 32 ++++++++++++++++++++++---------- daemon/media_socket.c | 2 ++ daemon/rtcp.c | 5 +++++ daemon/ssrc.c | 4 +++- daemon/t38.c | 12 ++++++++---- include/call.h | 2 +- t/auto-daemon-tests.pl | 10 ++++++---- 9 files changed, 58 insertions(+), 23 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index b2d9a3bae..6d646a3d7 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -130,7 +130,6 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) { bool do_update = false; bool has_srtp = false; struct packet_stream *ps; - stream_fd *sfd; int tmp_t_reason = UNKNOWN; enum call_stream_state css; int64_t timestamp; @@ -185,8 +184,16 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) { timestamp = packet_stream_last_packet(ps); if (!ps->media) - goto next; - sfd = ps->selected_sfd; + continue; + + g_autoptr(stream_fd) sfd = NULL; + + { + LOCK(&ps->in_lock); + if (ps->selected_sfd) + sfd = obj_get(ps->selected_sfd); + } + if (!sfd) goto no_sfd; diff --git a/daemon/ice.c b/daemon/ice.c index 7bdf39c08..125780e13 100644 --- a/daemon/ice.c +++ b/daemon/ice.c @@ -1197,6 +1197,7 @@ static int __check_valid(struct ice_agent *ag) { FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint))); mutex_unlock(&ps->out_lock); + LOCK(&ps->in_lock); for (__auto_type m = ps->sfds.head; m; m = m->next) { sfd = m->data; if (sfd->local_intf != pair->local_intf) diff --git a/daemon/media_player.c b/daemon/media_player.c index 476cff696..e3160be2b 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -305,6 +305,7 @@ struct send_timer *send_timer_new(struct packet_stream *ps) { } // call is locked in R +// ssrc_out is locked static void send_timer_rtcp(struct send_timer *st, struct ssrc_entry_call *ssrc_out) { struct call_media *media = st->sink ? st->sink->media : NULL; if (!media) @@ -402,26 +403,33 @@ static void __send_timer_send_common(struct send_timer *st, struct codec_packet payload_tracker_add(&cp->ssrc_out->tracker, cp->rtp->m_pt & 0x7f); } - // do we send RTCP? - struct ssrc_entry_call *ssrc_out = cp->ssrc_out; - if (ssrc_out && ssrc_out->next_rtcp) { - mutex_lock(&ssrc_out->h.lock); - int64_t diff = ssrc_out->next_rtcp - rtpe_now; - mutex_unlock(&ssrc_out->h.lock); - if (diff < 0) - send_timer_rtcp(st, ssrc_out); - } - out: codec_packet_free(cp); log_info_pop(); } +static void __send_timer_rtcp(struct send_timer *st, struct ssrc_entry_call *ssrc_out) { + // do we send RTCP? + if (!ssrc_out) + return; + if (!ssrc_out->next_rtcp) + return; + + LOCK(&ssrc_out->h.lock); + int64_t diff = ssrc_out->next_rtcp - rtpe_now; + if (diff < 0) + send_timer_rtcp(st, ssrc_out); +} + static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp) { call_t *call = st->call; if (!call) return; + struct ssrc_entry_call *ssrc_out = cp->ssrc_out; + if (ssrc_out) + ssrc_entry_hold(ssrc_out); + log_info_call(call); rwlock_lock_r(&call->master_lock); mutex_lock(&st->sink->out_lock); @@ -429,6 +437,10 @@ static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp) __send_timer_send_common(st, cp); mutex_unlock(&st->sink->out_lock); + + __send_timer_rtcp(st, ssrc_out); + ssrc_entry_release(ssrc_out); + rwlock_unlock_r(&call->master_lock); log_info_pop(); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 1beff5541..1581cfca7 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -3021,8 +3021,10 @@ next_mirror: err_next: ilog(LOG_DEBUG | LOG_FLAG_LIMIT ,"Error when sending message. Error: %s", strerror(errno)); atomic64_inc_na(&sink->stats_in->errors); + mutex_lock(&sink->in_lock); if (sink->selected_sfd) atomic64_inc_na(&sink->selected_sfd->local_intf->stats->out.errors); + mutex_unlock(&sink->in_lock); RTPE_STATS_INC(errors_user); goto next; diff --git a/daemon/rtcp.c b/daemon/rtcp.c index 7a873310e..4479de83a 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -1550,6 +1550,7 @@ static void rtcp_receiver_reports(ssrc_q *out, struct ssrc_hash *hash) { // call must be locked in R +// no in_lock or out_lock must be held void rtcp_send_report(struct call_media *media, struct ssrc_entry_call *ssrc_out) { // figure out where to send it struct packet_stream *ps = media->streams.head->data; @@ -1565,6 +1566,8 @@ void rtcp_send_report(struct call_media *media, struct ssrc_entry_call *ssrc_out rtcp_ps = ps; } + LOCK(&ps->in_lock); + if (!ps->selected_sfd || !rtcp_ps->selected_sfd) return; if (ps->selected_sfd->socket.fd == -1 || ps->endpoint.address.family == NULL) @@ -1592,6 +1595,8 @@ void rtcp_send_report(struct call_media *media, struct ssrc_entry_call *ssrc_out str rtcp_packet = STR_GS(sr); + LOCK(&ps->out_lock); + const struct streamhandler *crypt_handler = determine_handler(&transport_protocols[PROTO_RTP_AVP], media, true); diff --git a/daemon/ssrc.c b/daemon/ssrc.c index f3e1d1134..2076c6e72 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -720,7 +720,9 @@ void ssrc_collect_metrics(struct call_media *media) { s->jitter = s->jitter * 1000 / rpt->clock_rate; } - if (media->streams.head) + if (media->streams.head) { + LOCK(&media->streams.head->data->in_lock); RTPE_SAMPLE_SFD(jitter_measured, s->jitter, media->streams.head->data->selected_sfd); + } } } diff --git a/daemon/t38.c b/daemon/t38.c index 24b136520..fe96c2028 100644 --- a/daemon/t38.c +++ b/daemon/t38.c @@ -215,11 +215,13 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui struct packet_stream *ps = NULL; if (tg->t38_media && tg->t38_media->streams.head) ps = tg->t38_media->streams.head->data; - if (ps) - mutex_lock(&ps->out_lock); + stream_fd *sfd = NULL; - if (ps) + if (ps) { + mutex_lock(&ps->in_lock); + mutex_lock(&ps->out_lock); sfd = ps->selected_sfd; + } if (sfd && sfd->socket.fd != -1 && ps->endpoint.address.family != NULL) { for (int i = 0; i < count; i++) { ilog(LOG_DEBUG, "Sending %u UDPTL bytes", (unsigned int) s->len); @@ -229,8 +231,10 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui else ilog(LOG_WARN | LOG_FLAG_LIMIT, "Unable to send T.38 UDPTL packet due to lack of " "socket or stream"); - if (ps) + if (ps) { mutex_unlock(&ps->out_lock); + mutex_unlock(&ps->in_lock); + } g_string_free(s, TRUE); diff --git a/include/call.h b/include/call.h index 19d0b49d6..a2a9a3c41 100644 --- a/include/call.h +++ b/include/call.h @@ -428,7 +428,7 @@ struct packet_stream { struct recording_stream recording; /* LOCK: call->master_lock */ stream_fd_q sfds; /* LOCK: call->master_lock */ - stream_fd * selected_sfd; + stream_fd *selected_sfd; // LOCK: in_lock endpoint_t last_local_endpoint; struct dtls_connection ice_dtls; /* LOCK: in_lock */ sink_handler_q rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */ diff --git a/t/auto-daemon-tests.pl b/t/auto-daemon-tests.pl index 35b7251f3..9c911505e 100755 --- a/t/auto-daemon-tests.pl +++ b/t/auto-daemon-tests.pl @@ -17824,10 +17824,11 @@ $resp = rtpe_req('play media', 'media player', { 'from-tag' => ft(), blob => $wa is $resp->{duration}, 100, 'media duration'; ($seq, $ts, $ssrc) = rcv($sock_a, $port_b, rtpm(8 | 0x80, -1, -1, -1, $pcma_1)); +rcv($sock_a, $port_b, rtpm(8, $seq + 1, $ts + 160, $ssrc, $pcma_2)); # SR LEN SSRC NTP1 NTP2 RTP PACKETS OCTETS SSRC LOST SEQ JITTER LAST SR DLSR CNAME -@ret1 = rcv($sock_ax, $port_bx, qr/^\x81\xc8\x00\x0c(.{4})(.{4})(.{4})(.{4})\x00\x00\x00\x01\x00\x00\x00\xac\x00\x00\x12\x34\x00\x00\x00\x00\x00\x00\x03\xe8\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x81\xca\x00\x05(.{4})\x01\x0c([0-9a-f]{12})\x00\x00$/s); +@ret1 = rcv($sock_ax, $port_bx, qr/^\x81\xc8\x00\x0c(.{4})(.{4})(.{4})(.{4})\x00\x00\x00\x02\x00\x00\x01\x58\x00\x00\x12\x34\x00\x00\x00\x00\x00\x00\x03\xe8\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x81\xca\x00\x05(.{4})\x01\x0c([0-9a-f]{12})\x00\x00$/s); is $ret1[0], $ssrc, 'SSRC matches'; -is $ret1[3], $ts, 'TS matches'; +is $ret1[3], $ts + 160, 'TS matches'; is $ret1[4], $ssrc, 'SSRC matches'; rtpe_req('delete', "delete", { 'from-tag' => ft() }); @@ -17899,10 +17900,11 @@ $resp = rtpe_req('play media', 'media player', { 'from-tag' => ft(), blob => $wa is $resp->{duration}, 100, 'media duration'; ($seq, $ts, $ssrc) = rcv($sock_a, $port_b, rtpm(8 | 0x80, -1, -1, -1, $pcma_1)); +rcv($sock_a, $port_b, rtpm(8, $seq + 1, $ts + 160, $ssrc, $pcma_2)); # SR LEN SSRC NTP1 NTP2 RTP PACKETS OCTETS SSRC LOST SEQ JITTER LAST SR DLSR CNAME -@ret1 = rcv($sock_ax, $port_bx, qr/^\x81\xc8\x00\x0c(.{4})(.{4})(.{4})(.{4})\x00\x00\x00\x01\x00\x00\x00\xac\x00\x00\x12\x34\x00\x00\x00\x00\x00\x00\x03\xe8\x00\x00\x00\x00\x56\x78\x9a\xbc(.{4})\x81\xca\x00\x05(.{4})\x01\x0c([0-9a-f]{12})\x00\x00$/s); +@ret1 = rcv($sock_ax, $port_bx, qr/^\x81\xc8\x00\x0c(.{4})(.{4})(.{4})(.{4})\x00\x00\x00\x02\x00\x00\x01\x58\x00\x00\x12\x34\x00\x00\x00\x00\x00\x00\x03\xe8\x00\x00\x00\x00\x56\x78\x9a\xbc(.{4})\x81\xca\x00\x05(.{4})\x01\x0c([0-9a-f]{12})\x00\x00$/s); is $ret1[0], $ssrc, 'SSRC matches'; -is $ret1[3], $ts, 'TS matches'; +is $ret1[3], $ts + 160, 'TS matches'; cmp_ok $ret1[4], '<', 6553, 'DSLR ok'; is $ret1[5], $ssrc, 'SSRC matches';