From 974f334f3cfc043a823321f805ae9d3ac6faa27c Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Mon, 11 Aug 2025 07:53:12 -0400 Subject: [PATCH] MT#55283 combine in/out_lock With selected_sfd being protected by in_lock, we pretty much have to hold at least in_lock everywhere, and end up requiring both locks in many places. The distinction has become pointless. Change-Id: Ic0ad976c2d68d9639b9434da7f0e6e9c0d84c185 (cherry picked from commit e03f81485530231ed07cc626570d7d992bbeaf19) (cherry picked from commit 222fcaa4b0f1ef33c9256088fffaecf019d38b72) --- daemon/call.c | 17 +++-- daemon/codec.c | 7 +-- daemon/dtls.c | 10 +-- daemon/ice.c | 6 +- daemon/media_player.c | 12 ++-- daemon/media_socket.c | 137 +++++++++++++++++++++++------------------ daemon/mqtt.c | 8 +-- daemon/redis.c | 3 +- daemon/rtcp.c | 4 +- daemon/ssrc.c | 2 +- daemon/t38.c | 9 +-- include/call.h | 46 ++++++-------- include/media_socket.h | 1 - 13 files changed, 124 insertions(+), 138 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 00bc98746..3963b3b12 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -190,7 +190,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) { g_autoptr(stream_fd) sfd = NULL; { - LOCK(&ps->in_lock); + LOCK(&ps->lock); if (ps->selected_sfd) sfd = obj_get(ps->selected_sfd); } @@ -1025,8 +1025,7 @@ struct packet_stream *__packet_stream_new(call_t *call) { struct packet_stream *stream; stream = uid_alloc(&call->streams); - mutex_init(&stream->in_lock); - mutex_init(&stream->out_lock); + mutex_init(&stream->lock); stream->call = call; atomic64_set_na(&stream->last_packet, rtpe_now.tv_sec); stream->rtp_stats = rtp_stats_ht_new(); @@ -1137,21 +1136,19 @@ void call_stream_crypto_reset(struct packet_stream *ps) { crypto_reset(&ps->crypto); if (PS_ISSET(ps, RTP)) { - mutex_lock(&ps->in_lock); + mutex_lock(&ps->lock); for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) { if (!ps->ssrc_in[u]) // end of list break; atomic_set_na(&ps->ssrc_in[u]->stats->ext_seq, 0); } - mutex_unlock(&ps->in_lock); - mutex_lock(&ps->out_lock); for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) { if (!ps->ssrc_out[u]) // end of list break; atomic_set_na(&ps->ssrc_out[u]->stats->ext_seq, 0); } - mutex_unlock(&ps->out_lock); + mutex_unlock(&ps->lock); } } @@ -1178,16 +1175,16 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) { } if (MEDIA_ISSET(media, DTLS)) { - mutex_lock(&ps->in_lock); + mutex_lock(&ps->lock); struct dtls_connection *d = dtls_ptr(ps->selected_sfd); if (d && d->init && !d->connected) { int dret = dtls(ps->selected_sfd, NULL, NULL); - mutex_unlock(&ps->in_lock); + mutex_unlock(&ps->lock); if (dret == 1) call_media_unkernelize(media, "DTLS connected"); return CSS_DTLS; } - mutex_unlock(&ps->in_lock); + mutex_unlock(&ps->lock); } if (PS_ISSET(ps, PIERCE_NAT) && PS_ISSET(ps, FILLED) && !PS_ISSET(ps, CONFIRMED)) { diff --git a/daemon/codec.c b/daemon/codec.c index 020e96254..80c4f1d41 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -1320,14 +1320,14 @@ static void __rtcp_timer_run(struct codec_timer *ct) { struct ssrc_ctx *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,}; if (media->streams.head) { struct packet_stream *ps = media->streams.head->data; - mutex_lock(&ps->out_lock); + mutex_lock(&ps->lock); for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { if (!ps->ssrc_out[u]) // end of list break; ssrc_out[u] = ps->ssrc_out[u]; ssrc_ctx_hold(ssrc_out[u]); } - mutex_unlock(&ps->out_lock); + mutex_unlock(&ps->lock); } for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { @@ -3360,11 +3360,10 @@ static void send_buffered(struct media_packet *mp, unsigned int log_sys) { if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, mp)) ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media"); - mutex_lock(&sink->out_lock); + LOCK(&sink->lock); if (media_socket_dequeue(mp, sink)) ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT, "Error sending buffered media to RTP sink"); - mutex_unlock(&sink->out_lock); } } diff --git a/daemon/dtls.c b/daemon/dtls.c index e97e9c1a9..810173b37 100644 --- a/daemon/dtls.c +++ b/daemon/dtls.c @@ -840,7 +840,7 @@ error: return -1; } -/* called with call locked in W or R with ps->in_lock held */ +/* called with call locked in W or R with ps->lock held */ int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) { struct packet_stream *ps = sfd->stream; int ret; @@ -888,22 +888,16 @@ int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) { else if (ret == 1) { /* connected! */ dret = 1; - mutex_lock(&ps->out_lock); // nested lock! if (dtls_setup_crypto(ps, d)) {} /* XXX ?? */ - mutex_unlock(&ps->out_lock); if (PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP) && ps->rtcp_sibling && MEDIA_ISSET(ps->media, RTCP_MUX) && ps->rtcp_sibling != ps) { - // nested locks! - mutex_lock(&ps->rtcp_sibling->in_lock); - mutex_lock(&ps->rtcp_sibling->out_lock); + LOCK(&ps->rtcp_sibling->lock); if (dtls_setup_crypto(ps->rtcp_sibling, d)) {} /* XXX ?? */ - mutex_unlock(&ps->rtcp_sibling->out_lock); - mutex_unlock(&ps->rtcp_sibling->in_lock); } } diff --git a/daemon/ice.c b/daemon/ice.c index 1618c68df..5aa636c95 100644 --- a/daemon/ice.c +++ b/daemon/ice.c @@ -1141,7 +1141,7 @@ found: t_queue_clear(&compo1); } -/* call(W) or call(R)+agent must be locked - no in_lock or out_lock must be held */ +/* call(W) or call(R)+agent must be locked - no ps->lock must be held */ static int __check_valid(struct ice_agent *ag) { struct call_media *media; struct packet_stream *ps; @@ -1185,7 +1185,7 @@ static int __check_valid(struct ice_agent *ag) { ps = l->data; pair = k->data; - mutex_lock(&ps->out_lock); + LOCK(&ps->lock); if (memcmp(&ps->endpoint, &pair->remote_candidate->endpoint, sizeof(ps->endpoint))) { ilogs(ice, LOG_INFO, "ICE negotiated: new peer for component %u is %s%s%s", ps->component, FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint))); @@ -1195,9 +1195,7 @@ static int __check_valid(struct ice_agent *ag) { else ilogs(ice, LOG_INFO, "ICE negotiated: peer for component %u is %s%s%s", ps->component, FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint))); - mutex_unlock(&ps->out_lock); - LOCK(&ps->in_lock); for (__auto_type m = ps->sfds.head; m; m = m->next) { sfd = m->data; if (sfd->local_intf != pair->local_intf) diff --git a/daemon/media_player.c b/daemon/media_player.c index d20b5d43e..7249ef90e 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -436,11 +436,11 @@ static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp) log_info_call(call); rwlock_lock_r(&call->master_lock); - mutex_lock(&st->sink->out_lock); + mutex_lock(&st->sink->lock); __send_timer_send_common(st, cp); - mutex_unlock(&st->sink->out_lock); + mutex_unlock(&st->sink->lock); __send_timer_rtcp(st, ssrc_out); ssrc_ctx_put(&ssrc_out); @@ -595,10 +595,10 @@ retry:; media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet); - mutex_lock(&mp->sink->out_lock); + mutex_lock(&mp->sink->lock); if (media_socket_dequeue(&packet, mp->sink)) ilog(LOG_ERR, "Error sending playback media to RTP sink"); - mutex_unlock(&mp->sink->out_lock); + mutex_unlock(&mp->sink->lock); // schedule our next run timeval_add_usec(&mp->next_run, us_dur); @@ -1076,10 +1076,10 @@ void media_player_add_packet(struct media_player *mp, char *buf, size_t len, media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet); - mutex_lock(&mp->sink->out_lock); + mutex_lock(&mp->sink->lock); if (media_socket_dequeue(&packet, mp->sink)) ilog(LOG_ERR, "Error sending playback media to RTP sink"); - mutex_unlock(&mp->sink->out_lock); + mutex_unlock(&mp->sink->lock); timeval_add_usec(&mp->next_run, us_dur); timerthread_obj_schedule_abs(&mp->tt_obj, &mp->next_run); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 14945e9d3..256a3d26d 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1548,9 +1548,7 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st __auto_type reti = &s->reti; if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) { - mutex_lock(&stream->out_lock); __re_address_translate_ep(&reti->expected_src, MEDIA_ISSET(media, ASYMMETRIC) ? &stream->learned_endpoint : &stream->endpoint); - mutex_unlock(&stream->out_lock); if (PS_ISSET(stream, STRICT_SOURCE)) reti->src_mismatch = MSM_DROP; else if (PS_ISSET(stream, MEDIA_HANDOVER)) @@ -1684,6 +1682,12 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st return NULL; } +/** + * The linkage between userspace and kernel module is in the kernelize_one(). + * + * Called with stream->lock held. + * sink_handler can be NULL. + */ __attribute__((nonnull(1, 2, 3))) static const char *kernelize_one(kernelize_state *s, struct packet_stream *stream, struct sink_handler *sink_handler) @@ -1759,7 +1763,12 @@ static const char *kernelize_one(kernelize_state *s, if (MEDIA_ISSET(media, ECHO) || sink_handler->attrs.transcoding) redi->output.ssrc_subst = 1; - mutex_lock(&sink->out_lock); + // XXX nested lock, avoid possible deadlock. should be reworked not to + // require a nested lock + if (sink != stream && mutex_trylock(&sink->lock)) { + g_free(redi); + return ""; // indicate deadlock + } __re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint); __re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local); @@ -1781,7 +1790,8 @@ static const char *kernelize_one(kernelize_state *s, handler->out->kernel(&redi->output.encrypt, sink); - mutex_unlock(&sink->out_lock); + if (sink != stream) + mutex_unlock(&sink->lock); if (!redi->output.encrypt.cipher || !redi->output.encrypt.hmac) { g_free(redi); @@ -1798,31 +1808,39 @@ static const char *kernelize_one(kernelize_state *s, return NULL; } // helper function for kernelize() -static void kernelize_one_sink_handler(kernelize_state *s, +// called with stream->lock held +static bool kernelize_one_sink_handler(kernelize_state *s, struct packet_stream *stream, struct sink_handler *sink_handler) { struct packet_stream *sink = sink_handler->sink; if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED)) - return; + return true; const char *err = kernelize_one(s, stream, sink_handler); - if (err) + if (err) { + if (!*err) + return false; // indicate deadlock ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err); + } + return true; } -/* called with in_lock held */ -void kernelize(struct packet_stream *stream) { +/* called with master_lock held */ +static void kernelize(struct packet_stream *stream) { call_t *call = stream->call; - const char *nk_warn_msg; struct call_media *media = stream->media; g_auto(kernelize_state) s = {0}; - if (PS_ISSET(stream, KERNELIZED)) + while (true) { + + LOCK(&stream->lock); + + // set flag, return if set already + if (PS_SET(stream, KERNELIZED)) return; if (call->recording != NULL && !selected_recording_method->kernel_support) goto no_kernel; if (!kernel.is_wanted) goto no_kernel; - nk_warn_msg = "interface to kernel module not open"; if (!kernel.is_open) goto no_kernel_warn; if (MEDIA_ISSET(media, GENERATOR)) @@ -1843,12 +1861,16 @@ void kernelize(struct packet_stream *stream) { struct sink_handler *sh = l->data; if (sh->attrs.block_media) continue; - kernelize_one_sink_handler(&s, stream, sh); + bool ok = kernelize_one_sink_handler(&s, stream, sh); + if (!ok) + goto restart; } // RTP egress mirrors for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) { struct sink_handler *sh = l->data; - kernelize_one_sink_handler(&s, stream, sh); + bool ok = kernelize_one_sink_handler(&s, stream, sh); + if (!ok) + goto restart; } // RTP -> RTCP sinks // record number of RTP destinations up to now @@ -1858,7 +1880,9 @@ void kernelize(struct packet_stream *stream) { s.payload_types = NULL; for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) { struct sink_handler *sh = l->data; - kernelize_one_sink_handler(&s, stream, sh); + bool ok = kernelize_one_sink_handler(&s, stream, sh); + if (!ok) + goto restart; } // mark the start of RTCP outputs s.reti.num_rtcp_destinations = s.reti.num_destinations - num_rtp_dests; @@ -1880,15 +1904,25 @@ void kernelize(struct packet_stream *stream) { } stream->kernel_time = rtpe_now.tv_sec; - PS_SET(stream, KERNELIZED); return; no_kernel_warn: - ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", nk_warn_msg); + ilog(LOG_WARNING, "No support for kernel packet forwarding available " + "(interface to kernel module not open)"); no_kernel: - PS_SET(stream, KERNELIZED); stream->kernel_time = rtpe_now.tv_sec; PS_SET(stream, NO_KERNEL_SUPPORT); + return; + +restart: // handle detected deadlock + + rtp_stats_arr_destroy_ptr(s.payload_types); + + while ((redi = t_queue_pop_head(&s.outputs))) + g_free(redi); + + // try again, releases stream->lock + } } // must be called with appropriate locks (master lock and/or in/out_lock) @@ -1917,7 +1951,7 @@ struct ssrc_ctx *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_S } -/* must be called with in_lock held or call->master_lock held in W */ +/* must be called with ps->lock held or call->master_lock held in W */ void __unkernelize(struct packet_stream *p, const char *reason) { if (!p->selected_sfd) return; @@ -1963,9 +1997,8 @@ void __stream_unconfirm(struct packet_stream *ps, const char *reason) { static void stream_unconfirm(struct packet_stream *ps, const char *reason) { if (!ps) return; - mutex_lock(&ps->in_lock); + LOCK(&ps->lock); __stream_unconfirm(ps, reason); - mutex_unlock(&ps->in_lock); } static void unconfirm_sinks(sink_handler_q *q, const char *reason) { for (__auto_type l = q->head; l; l = l->next) { @@ -1976,9 +2009,8 @@ static void unconfirm_sinks(sink_handler_q *q, const char *reason) { void unkernelize(struct packet_stream *ps, const char *reason) { if (!ps) return; - mutex_lock(&ps->in_lock); + LOCK(&ps->lock); __unkernelize(ps, reason); - mutex_unlock(&ps->in_lock); } @@ -2015,7 +2047,7 @@ err: return &__sh_noop; } -/* must be called with call->master_lock held in R, and in->in_lock held */ +/* must be called with call->master_lock held in R, and in->lock held */ // `sh` can be null static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *sh) { const struct transport_protocol *in_proto, *out_proto; @@ -2124,7 +2156,7 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, static const char *__stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs, struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash) { - return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->in_lock, in_srtp->ssrc_in, + return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->lock, in_srtp->ssrc_in, &in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, SSRC_DIR_INPUT, "ingress"); } // check and update output SSRC pointers @@ -2134,12 +2166,12 @@ static const char *__stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ss bool ssrc_change) { if (ssrc_change) - return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->out_lock, + return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->lock, out_srtp->ssrc_out, &out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT, "egress (mapped)"); - return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->out_lock, + return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->lock, out_srtp->ssrc_out, &out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT, "egress (direct)"); @@ -2164,14 +2196,13 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) { } } - mutex_lock(&phc->mp.stream->in_lock); + LOCK(&phc->mp.stream->lock); int ret = dtls(phc->mp.sfd, &phc->s, &phc->mp.fsin); if (ret == 1) { phc->unkernelize = "DTLS connected"; phc->unkernelize_subscriptions = true; ret = 0; } - mutex_unlock(&phc->mp.stream->in_lock); if (!ret) return 0; } @@ -2196,7 +2227,7 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) { #if RTP_LOOP_PROTECT // returns: 0 = ok, proceed; -1 = duplicate detected, drop packet static int media_loop_detect(struct packet_handler_ctx *phc) { - mutex_lock(&phc->mp.stream->in_lock); + mutex_lock(&phc->mp.stream->lock); for (int i = 0; i < RTP_LOOP_PACKETS; i++) { if (phc->mp.stream->lp_buf[i].len != phc->s.len) @@ -2210,7 +2241,7 @@ static int media_loop_detect(struct packet_handler_ctx *phc) { "to avoid potential loop", RTP_LOOP_MAX_COUNT, FMT_M(endpoint_print_buf(&phc->mp.fsin))); - mutex_unlock(&phc->mp.stream->in_lock); + mutex_unlock(&phc->mp.stream->lock); return -1; } @@ -2224,7 +2255,7 @@ static int media_loop_detect(struct packet_handler_ctx *phc) { memcpy(phc->mp.stream->lp_buf[phc->mp.stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT)); phc->mp.stream->lp_idx = (phc->mp.stream->lp_idx + 1) % RTP_LOOP_PACKETS; loop_ok: - mutex_unlock(&phc->mp.stream->in_lock); + mutex_unlock(&phc->mp.stream->lock); return 0; } @@ -2337,7 +2368,7 @@ static void media_packet_rtp_out(struct packet_handler_ctx *phc, struct sink_han static int media_packet_decrypt(struct packet_handler_ctx *phc) { - mutex_lock(&phc->in_srtp->in_lock); + mutex_lock(&phc->in_srtp->lock); struct sink_handler *first_sh = phc->sinks->length ? phc->sinks->head->data : NULL; const struct streamhandler *sh = __determine_handler(phc->in_srtp, first_sh); @@ -2358,7 +2389,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc) phc->mp.payload.len -= ori_s.len - phc->s.len; } - mutex_unlock(&phc->in_srtp->in_lock); + mutex_unlock(&phc->in_srtp->lock); if (ret == 1) { phc->update = true; @@ -2368,7 +2399,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc) } static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink_handler *sh) { - mutex_lock(&phc->in_srtp->in_lock); + mutex_lock(&phc->in_srtp->lock); __determine_handler(phc->in_srtp, sh); // XXX use an array with index instead of if/else @@ -2378,7 +2409,7 @@ static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink phc->encrypt_func = sh->handler->out->rtcp_crypt; phc->rtcp_filter = sh->handler->in->rtcp_filter; } - mutex_unlock(&phc->in_srtp->in_lock); + mutex_unlock(&phc->in_srtp->lock); } int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp) { @@ -2387,7 +2418,7 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s if (!encrypt_func) return 0x00; - mutex_lock(&out->out_lock); + LOCK(&out->lock); for (__auto_type l = mp->packets_out.head; l; l = l->next) { struct codec_packet *p = l->data; @@ -2403,8 +2434,6 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s ret |= 0x01; } - mutex_unlock(&out->out_lock); - return ret; } @@ -2424,7 +2453,7 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc) struct endpoint endpoint; bool ret = false; - mutex_lock(&phc->mp.stream->in_lock); + mutex_lock(&phc->mp.stream->lock); /* we're OK to (potentially) use the source address of this packet as destination * in the other direction. */ @@ -2453,10 +2482,8 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc) /* do not pay attention to source addresses of incoming packets for asymmetric streams */ if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) || phc->mp.stream->el_flags == EL_OFF) { PS_SET(phc->mp.stream, CONFIRMED); - mutex_lock(&phc->mp.stream->out_lock); if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) && !phc->mp.stream->learned_endpoint.address.family) phc->mp.stream->learned_endpoint = phc->mp.fsin; - mutex_unlock(&phc->mp.stream->out_lock); } /* confirm sinks for unidirectional streams in order to kernelize */ @@ -2472,7 +2499,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc) /* see if we need to compare the source address with the known endpoint */ if (PS_ISSET2(phc->mp.stream, STRICT_SOURCE, MEDIA_HANDOVER)) { endpoint = phc->mp.fsin; - mutex_lock(&phc->mp.stream->out_lock); struct endpoint *ps_endpoint = MEDIA_ISSET(phc->mp.media, ASYMMETRIC) ? &phc->mp.stream->learned_endpoint : &phc->mp.stream->endpoint; @@ -2488,8 +2514,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc) goto update_addr; } - mutex_unlock(&phc->mp.stream->out_lock); - if (tmp && PS_ISSET(phc->mp.stream, STRICT_SOURCE)) { ilog(LOG_INFO | LOG_FLAG_LIMIT, "Drop due to strict-source attribute; " "got %s%s:%d%s, " @@ -2563,7 +2587,6 @@ confirm_now: PS_SET(phc->mp.stream, CONFIRMED); update_peerinfo: - mutex_lock(&phc->mp.stream->out_lock); // if we're during the wait time, check the received address against the previously // learned address. if they're the same, ignore this packet for learning purposes if (!wait_time || !phc->mp.stream->learned_endpoint.address.family || @@ -2582,8 +2605,6 @@ update_peerinfo: } } update_addr: - mutex_unlock(&phc->mp.stream->out_lock); - /* check the destination address of the received packet against what we think our * local interface to use is */ if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) { @@ -2606,7 +2627,7 @@ update_addr: } out: - mutex_unlock(&phc->mp.stream->in_lock); + mutex_unlock(&phc->mp.stream->lock); return ret; } @@ -2626,9 +2647,7 @@ static void media_packet_kernel_check(struct packet_handler_ctx *phc) { if (ML_ISSET(phc->mp.media->monologue, DTMF_INJECTION_ACTIVE)) return; - mutex_lock(&phc->mp.stream->in_lock); kernelize(phc->mp.stream); - mutex_unlock(&phc->mp.stream->in_lock); } @@ -2967,19 +2986,19 @@ static int stream_packet(struct packet_handler_ctx *phc) { if (ret) goto next_mirror; - mutex_lock(&mirror_sink->out_lock); + mutex_lock(&mirror_sink->lock); if (!mirror_sink->advertised_endpoint.port || (is_addr_unspecified(&mirror_sink->advertised_endpoint.address) && !is_trickle_ice_address(&mirror_sink->advertised_endpoint))) { - mutex_unlock(&mirror_sink->out_lock); + mutex_unlock(&mirror_sink->lock); goto next_mirror; } media_socket_dequeue(&mirror_phc.mp, mirror_sink); - mutex_unlock(&mirror_sink->out_lock); + mutex_unlock(&mirror_sink->lock); next_mirror: media_socket_dequeue(&mirror_phc.mp, NULL); // just free if anything left @@ -2992,13 +3011,13 @@ next_mirror: if (ret == -1) goto err_next; - mutex_lock(&sink->out_lock); + mutex_lock(&sink->lock); if (!sink->advertised_endpoint.port || (is_addr_unspecified(&sink->advertised_endpoint.address) && !is_trickle_ice_address(&sink->advertised_endpoint))) { - mutex_unlock(&sink->out_lock); + mutex_unlock(&sink->lock); goto next; } @@ -3007,7 +3026,7 @@ next_mirror: else ret = media_socket_dequeue(&phc->mp, NULL); - mutex_unlock(&sink->out_lock); + mutex_unlock(&sink->lock); if (ret == 0) goto next; @@ -3015,10 +3034,10 @@ next_mirror: err_next: ilog(LOG_DEBUG | LOG_FLAG_LIMIT ,"Error when sending message. Error: %s", strerror(errno)); atomic64_inc_na(&sink->stats_in->errors); - mutex_lock(&sink->in_lock); + mutex_lock(&sink->lock); if (sink->selected_sfd) atomic64_inc_na(&sink->selected_sfd->local_intf->stats->out.errors); - mutex_unlock(&sink->in_lock); + mutex_unlock(&sink->lock); RTPE_STATS_INC(errors_user); goto next; diff --git a/daemon/mqtt.c b/daemon/mqtt.c index 22e4619c4..64f32c5b6 100644 --- a/daemon/mqtt.c +++ b/daemon/mqtt.c @@ -340,7 +340,7 @@ static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *jso static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { - mutex_lock(&ps->in_lock); + LOCK(&ps->lock); stream_fd *sfd = ps->selected_sfd; if (sfd) { @@ -382,10 +382,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { json_builder_end_object(json); - mutex_unlock(&ps->in_lock); - - mutex_lock(&ps->out_lock); - json_builder_set_member_name(json, "egress"); json_builder_begin_object(json); mqtt_stream_stats_dir(ps->stats_out, json); @@ -402,8 +398,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { json_builder_end_array(json); json_builder_end_object(json); - - mutex_unlock(&ps->out_lock); } diff --git a/daemon/redis.c b/daemon/redis.c index bced4c40c..758da3fe4 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -2511,8 +2511,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c, void **to_free) { for (__auto_type l = c->streams.head; l; l = l->next) { struct packet_stream *ps = l->data; - LOCK(&ps->in_lock); - LOCK(&ps->out_lock); + LOCK(&ps->lock); snprintf(tmp, sizeof(tmp), "stream-%u", ps->unique_id); inner = parser->dict_add_dict_dup(root, tmp); diff --git a/daemon/rtcp.c b/daemon/rtcp.c index e45416d66..69dc394a9 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -1568,7 +1568,7 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) { rtcp_ps = ps; } - LOCK(&ps->in_lock); + LOCK(&ps->lock); if (!ps->selected_sfd || !rtcp_ps->selected_sfd) return; @@ -1597,8 +1597,6 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) { str rtcp_packet = STR_GS(sr); - LOCK(&ps->out_lock); - const struct streamhandler *crypt_handler = determine_handler(&transport_protocols[PROTO_RTP_AVP], media, true); diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 1ed3f36b2..4d293700f 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -756,7 +756,7 @@ void ssrc_collect_metrics(struct call_media *media) { e->jitter = e->jitter * 1000 / rpt->clock_rate; } - LOCK(&ps->in_lock); + LOCK(&ps->lock); RTPE_SAMPLE_SFD(jitter_measured, e->jitter, ps->selected_sfd); } } diff --git a/daemon/t38.c b/daemon/t38.c index ff30dca6c..ada342a0c 100644 --- a/daemon/t38.c +++ b/daemon/t38.c @@ -218,8 +218,7 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui stream_fd *sfd = NULL; if (ps) { - mutex_lock(&ps->in_lock); - mutex_lock(&ps->out_lock); + mutex_lock(&ps->lock); sfd = ps->selected_sfd; } if (sfd && sfd->socket.fd != -1 && ps->endpoint.address.family != NULL) { @@ -231,10 +230,8 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui else ilog(LOG_WARN | LOG_FLAG_LIMIT, "Unable to send T.38 UDPTL packet due to lack of " "socket or stream"); - if (ps) { - mutex_unlock(&ps->out_lock); - mutex_unlock(&ps->in_lock); - } + if (ps) + mutex_unlock(&ps->lock); g_string_free(s, TRUE); diff --git a/include/call.h b/include/call.h index aec8779bf..ec40c3e8a 100644 --- a/include/call.h +++ b/include/call.h @@ -406,18 +406,10 @@ TYPED_GHASHTABLE_PROTO(rtp_stats_ht, void, struct rtp_stats) * This is done through the various bit flags. */ struct packet_stream { - /* Both locks valid only with call->master_lock held in R. + /* Lock valid only with call->master_lock held in R. * Preempted by call->master_lock held in W. - * If both in/out are to be locked, in_lock must be locked first. - * - * The in_lock protects fields relevant to packet reception on that stream, - * meanwhile the out_lock protects fields relevant to packet egress. - * - * This allows packet handling on multiple ports and streams belonging - * to the same call to happen at the same time. */ - mutex_t in_lock, - out_lock; + mutex_t lock; struct call_media *media; /* RO */ call_t *call; /* RO */ @@ -426,23 +418,23 @@ struct packet_stream { struct recording_stream recording; /* LOCK: call->master_lock */ stream_fd_q sfds; /* LOCK: call->master_lock */ - stream_fd *selected_sfd; // LOCK: in_lock + stream_fd *selected_sfd; // LOCK: ps->lock endpoint_t last_local_endpoint; - struct dtls_connection ice_dtls; /* LOCK: in_lock */ - sink_handler_q rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */ - sink_handler_q rtcp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */ + struct dtls_connection ice_dtls; /* LOCK: ps->lock */ + sink_handler_q rtp_sinks; /* LOCK: call->master_lock, ps->lock for streamhandler */ + sink_handler_q rtcp_sinks; /* LOCK: call->master_lock, ps->lock for streamhandler */ struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */ - sink_handler_q rtp_mirrors; /* LOCK: call->master_lock, in_lock for streamhandler */ - struct endpoint endpoint; /* LOCK: out_lock */ - struct endpoint detected_endpoints[4]; /* LOCK: out_lock */ - time_t ep_detect_signal; /* LOCK: out_lock */ + sink_handler_q rtp_mirrors; /* LOCK: call->master_lock, ps->ock for streamhandler */ + struct endpoint endpoint; /* LOCK: ps->lock */ + struct endpoint detected_endpoints[4]; /* LOCK: ps->lock */ + time_t ep_detect_signal; /* LOCK: ps->lock */ struct endpoint advertised_endpoint; /* RO */ - struct endpoint learned_endpoint; /* LOCK: out_lock */ - struct crypto_context crypto; /* OUT direction, LOCK: out_lock */ - struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */ - *ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */ - unsigned int ssrc_in_idx, /* LOCK: in_lock */ - ssrc_out_idx; /* LOCK: out_lock */ + struct endpoint learned_endpoint; /* LOCK: ps->lock */ + struct crypto_context crypto; /* OUT direction, LOCK: ps->lock */ + struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: ps->lock */ + *ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: ps->lock */ + unsigned int ssrc_in_idx, /* LOCK: ps->lock */ + ssrc_out_idx; /* LOCK: ps->lock */ struct send_timer *send_timer; /* RO */ struct jitter_buffer *jb; /* RO */ time_t kernel_time; @@ -455,15 +447,15 @@ struct packet_stream { enum endpoint_learning el_flags; #if RTP_LOOP_PROTECT - /* LOCK: in_lock: */ + /* LOCK: ps->lock: */ unsigned int lp_idx; struct loop_protector lp_buf[RTP_LOOP_PACKETS]; unsigned int lp_count; #endif - X509 *dtls_cert; /* LOCK: in_lock */ + X509 *dtls_cert; /* LOCK: ps->lock */ - /* in_lock must be held for SETTING these: */ + /* ps->lock must be held for SETTING these: */ atomic64 ps_flags; }; diff --git a/include/media_socket.h b/include/media_socket.h index 063489b24..f6ca65e4c 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -310,7 +310,6 @@ void free_sfd_intf_list(struct sfd_intf_list *il); void free_release_sfd_intf_list(struct sfd_intf_list *il); void free_socket_intf_list(struct socket_intf_list *il); -void kernelize(struct packet_stream *); void __unkernelize(struct packet_stream *, const char *); void unkernelize(struct packet_stream *, const char *); void __stream_unconfirm(struct packet_stream *, const char *);