From e03f81485530231ed07cc626570d7d992bbeaf19 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Mon, 11 Aug 2025 07:53:12 -0400 Subject: [PATCH] MT#55283 combine in/out_lock With selected_sfd being protected by in_lock, we pretty much have to hold at least in_lock everywhere, and end up requiring both locks in many places. The distinction has become pointless. Change-Id: Ic0ad976c2d68d9639b9434da7f0e6e9c0d84c185 --- daemon/call.c | 11 ++-- daemon/codec.c | 3 +- daemon/dtls.c | 10 +--- daemon/ice.c | 6 +-- daemon/media_player.c | 12 ++--- daemon/media_socket.c | 119 ++++++++++++++++++++++++----------------- daemon/mqtt.c | 8 +-- daemon/redis.c | 3 +- daemon/rtcp.c | 4 +- daemon/ssrc.c | 2 +- daemon/t38.c | 9 ++-- include/call.h | 38 ++++++------- include/media_socket.h | 1 - 13 files changed, 108 insertions(+), 118 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 798157f1f..4959fdbdb 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -189,7 +189,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) { g_autoptr(stream_fd) sfd = NULL; { - LOCK(&ps->in_lock); + LOCK(&ps->lock); if (ps->selected_sfd) sfd = obj_get(ps->selected_sfd); } @@ -976,8 +976,7 @@ struct packet_stream *__packet_stream_new(call_t *call) { struct packet_stream *stream; stream = uid_alloc(&call->streams); - mutex_init(&stream->in_lock); - mutex_init(&stream->out_lock); + mutex_init(&stream->lock); stream->call = call; atomic64_set_na(&stream->last_packet_us, rtpe_now); stream->rtp_stats = rtp_stats_ht_new(); @@ -1130,16 +1129,16 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) { } if (MEDIA_ISSET(media, DTLS)) { - mutex_lock(&ps->in_lock); + mutex_lock(&ps->lock); struct dtls_connection *d = dtls_ptr(ps->selected_sfd); if (d && d->init && !d->connected) { int dret = dtls(ps->selected_sfd, NULL, NULL); - mutex_unlock(&ps->in_lock); + mutex_unlock(&ps->lock); if (dret == 1) call_media_unkernelize(media, "DTLS connected"); return CSS_DTLS; } - mutex_unlock(&ps->in_lock); + mutex_unlock(&ps->lock); } if (PS_ISSET(ps, PIERCE_NAT) && PS_ISSET(ps, FILLED) && !PS_ISSET(ps, CONFIRMED)) { diff --git a/daemon/codec.c b/daemon/codec.c index 9b9b0c920..480c627e1 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -3440,11 +3440,10 @@ static void send_buffered(struct media_packet *mp, unsigned int log_sys) { if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, mp)) ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media"); - mutex_lock(&sink->out_lock); + LOCK(&sink->lock); if (media_socket_dequeue(mp, sink)) ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT, "Error sending buffered media to RTP sink"); - mutex_unlock(&sink->out_lock); } } diff --git a/daemon/dtls.c b/daemon/dtls.c index 1725736b2..76b6e312e 100644 --- a/daemon/dtls.c +++ b/daemon/dtls.c @@ -882,7 +882,7 @@ error: return -1; } -/* called with call locked in W or R with ps->in_lock held */ +/* called with call locked in W or R with ps->lock held */ int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) { struct packet_stream *ps = sfd->stream; int ret; @@ -941,22 +941,16 @@ int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) { else if (ret == 1) { /* connected! */ dret = 1; - mutex_lock(&ps->out_lock); // nested lock! if (dtls_setup_crypto(ps, d)) {} /* XXX ?? */ - mutex_unlock(&ps->out_lock); if (PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP) && ps->rtcp_sibling && MEDIA_ISSET(ps->media, RTCP_MUX) && ps->rtcp_sibling != ps) { - // nested locks! - mutex_lock(&ps->rtcp_sibling->in_lock); - mutex_lock(&ps->rtcp_sibling->out_lock); + LOCK(&ps->rtcp_sibling->lock); if (dtls_setup_crypto(ps->rtcp_sibling, d)) {} /* XXX ?? */ - mutex_unlock(&ps->rtcp_sibling->out_lock); - mutex_unlock(&ps->rtcp_sibling->in_lock); } } diff --git a/daemon/ice.c b/daemon/ice.c index 7352ede17..82e1a56dd 100644 --- a/daemon/ice.c +++ b/daemon/ice.c @@ -1141,7 +1141,7 @@ found: t_queue_clear(&compo1); } -/* call(W) or call(R)+agent must be locked - no in_lock or out_lock must be held */ +/* call(W) or call(R)+agent must be locked - no ps->lock must be held */ static int __check_valid(struct ice_agent *ag) { struct call_media *media; struct packet_stream *ps; @@ -1185,7 +1185,7 @@ static int __check_valid(struct ice_agent *ag) { ps = l->data; pair = k->data; - mutex_lock(&ps->out_lock); + LOCK(&ps->lock); if (memcmp(&ps->endpoint, &pair->remote_candidate->endpoint, sizeof(ps->endpoint))) { ilogs(ice, LOG_INFO, "ICE negotiated: new peer for component %u is %s%s%s", ps->component, FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint))); @@ -1195,9 +1195,7 @@ static int __check_valid(struct ice_agent *ag) { else ilogs(ice, LOG_INFO, "ICE negotiated: peer for component %u is %s%s%s", ps->component, FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint))); - mutex_unlock(&ps->out_lock); - LOCK(&ps->in_lock); for (__auto_type m = ps->sfds.head; m; m = m->next) { sfd = m->data; if (sfd->local_intf != pair->local_intf) diff --git a/daemon/media_player.c b/daemon/media_player.c index 2d3dead18..adb7f9e58 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -432,11 +432,11 @@ static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp) log_info_call(call); rwlock_lock_r(&call->master_lock); - mutex_lock(&st->sink->out_lock); + mutex_lock(&st->sink->lock); __send_timer_send_common(st, cp); - mutex_unlock(&st->sink->out_lock); + mutex_unlock(&st->sink->lock); __send_timer_rtcp(st, ssrc_out); ssrc_entry_release(ssrc_out); @@ -591,10 +591,10 @@ retry:; media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet); - mutex_lock(&mp->sink->out_lock); + mutex_lock(&mp->sink->lock); if (media_socket_dequeue(&packet, mp->sink)) ilog(LOG_ERR, "Error sending playback media to RTP sink"); - mutex_unlock(&mp->sink->out_lock); + mutex_unlock(&mp->sink->lock); // schedule our next run mp->next_run += us_dur; @@ -1074,10 +1074,10 @@ void media_player_add_packet(struct media_player *mp, char *buf, size_t len, media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet); - mutex_lock(&mp->sink->out_lock); + mutex_lock(&mp->sink->lock); if (media_socket_dequeue(&packet, mp->sink)) ilog(LOG_ERR, "Error sending playback media to RTP sink"); - mutex_unlock(&mp->sink->out_lock); + mutex_unlock(&mp->sink->lock); mp->next_run += us_dur; timerthread_obj_schedule_abs(&mp->tt_obj, mp->next_run); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index f15fed672..be10cc316 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1574,9 +1574,7 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st __auto_type reti = &s->reti; if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) { - mutex_lock(&stream->out_lock); __re_address_translate_ep(&reti->expected_src, MEDIA_ISSET(media, ASYMMETRIC) ? &stream->learned_endpoint : &stream->endpoint); - mutex_unlock(&stream->out_lock); if (PS_ISSET(stream, STRICT_SOURCE)) reti->src_mismatch = MSM_DROP; else if (PS_ISSET(stream, MEDIA_HANDOVER)) @@ -1714,6 +1712,12 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st return NULL; } +/** + * The linkage between userspace and kernel module is in the kernelize_one(). + * + * Called with stream->lock held. + * sink_handler can be NULL. + */ __attribute__((nonnull(1, 2, 3))) static const char *kernelize_one(kernelize_state *s, struct packet_stream *stream, struct sink_handler *sink_handler) @@ -1789,7 +1793,12 @@ static const char *kernelize_one(kernelize_state *s, if (MEDIA_ISSET(media, ECHO) || sink_handler->attrs.transcoding) redi->output.ssrc_subst = 1; - mutex_lock(&sink->out_lock); + // XXX nested lock, avoid possible deadlock. should be reworked not to + // require a nested lock + if (sink != stream && mutex_trylock(&sink->lock)) { + g_free(redi); + return ""; // indicate deadlock + } __re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint); __re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local); @@ -1816,7 +1825,8 @@ static const char *kernelize_one(kernelize_state *s, handler->out->kernel(&redi->output.encrypt, sink); - mutex_unlock(&sink->out_lock); + if (sink != stream) + mutex_unlock(&sink->lock); if (!redi->output.encrypt.cipher || !redi->output.encrypt.hmac) { g_free(redi); @@ -1833,31 +1843,39 @@ static const char *kernelize_one(kernelize_state *s, return NULL; } // helper function for kernelize() -static void kernelize_one_sink_handler(kernelize_state *s, +// called with stream->lock held +static bool kernelize_one_sink_handler(kernelize_state *s, struct packet_stream *stream, struct sink_handler *sink_handler) { struct packet_stream *sink = sink_handler->sink; if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED)) - return; + return true; const char *err = kernelize_one(s, stream, sink_handler); - if (err) + if (err) { + if (!*err) + return false; // indicate deadlock ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err); + } + return true; } -/* called with in_lock held */ -void kernelize(struct packet_stream *stream) { +/* called with master_lock held */ +static void kernelize(struct packet_stream *stream) { call_t *call = stream->call; - const char *nk_warn_msg; struct call_media *media = stream->media; g_auto(kernelize_state) s = {0}; - if (PS_ISSET(stream, KERNELIZED)) + while (true) { + + LOCK(&stream->lock); + + // set flag, return if set already + if (PS_SET(stream, KERNELIZED)) return; if (call->recording != NULL && !selected_recording_method->kernel_support) goto no_kernel; if (!kernel.is_wanted) goto no_kernel; - nk_warn_msg = "interface to kernel module not open"; if (!kernel.is_open) goto no_kernel_warn; if (MEDIA_ISSET(media, GENERATOR)) @@ -1878,12 +1896,16 @@ void kernelize(struct packet_stream *stream) { struct sink_handler *sh = l->data; if (sh->attrs.block_media) continue; - kernelize_one_sink_handler(&s, stream, sh); + bool ok = kernelize_one_sink_handler(&s, stream, sh); + if (!ok) + goto restart; } // RTP egress mirrors for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) { struct sink_handler *sh = l->data; - kernelize_one_sink_handler(&s, stream, sh); + bool ok = kernelize_one_sink_handler(&s, stream, sh); + if (!ok) + goto restart; } // RTP -> RTCP sinks // record number of RTP destinations up to now @@ -1893,7 +1915,9 @@ void kernelize(struct packet_stream *stream) { s.payload_types = NULL; for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) { struct sink_handler *sh = l->data; - kernelize_one_sink_handler(&s, stream, sh); + bool ok = kernelize_one_sink_handler(&s, stream, sh); + if (!ok) + goto restart; } // mark the start of RTCP outputs s.reti.num_rtcp_destinations = s.reti.num_destinations - num_rtp_dests; @@ -1915,15 +1939,25 @@ void kernelize(struct packet_stream *stream) { } stream->kernel_time_us = rtpe_now; - PS_SET(stream, KERNELIZED); return; no_kernel_warn: - ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", nk_warn_msg); + ilog(LOG_WARNING, "No support for kernel packet forwarding available " + "(interface to kernel module not open)"); no_kernel: - PS_SET(stream, KERNELIZED); stream->kernel_time_us = rtpe_now; PS_SET(stream, NO_KERNEL_SUPPORT); + return; + +restart: // handle detected deadlock + + rtp_stats_arr_destroy_ptr(s.payload_types); + + while ((redi = t_queue_pop_head(&s.outputs))) + g_free(redi); + + // try again, releases stream->lock + } } // must be called with appropriate locks (master lock and/or in/out_lock) @@ -1952,7 +1986,7 @@ struct ssrc_entry_call *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_entry_call *l } -/* must be called with in_lock held or call->master_lock held in W */ +/* must be called with ps->lock held or call->master_lock held in W */ void __unkernelize(struct packet_stream *p, const char *reason) { if (!p->selected_sfd) return; @@ -1998,9 +2032,8 @@ void __stream_unconfirm(struct packet_stream *ps, const char *reason) { static void stream_unconfirm(struct packet_stream *ps, const char *reason) { if (!ps) return; - mutex_lock(&ps->in_lock); + LOCK(&ps->lock); __stream_unconfirm(ps, reason); - mutex_unlock(&ps->in_lock); } static void unconfirm_sinks(sink_handler_q *q, const char *reason) { for (__auto_type l = q->head; l; l = l->next) { @@ -2011,9 +2044,8 @@ static void unconfirm_sinks(sink_handler_q *q, const char *reason) { void unkernelize(struct packet_stream *ps, const char *reason) { if (!ps) return; - mutex_lock(&ps->in_lock); + LOCK(&ps->lock); __unkernelize(ps, reason); - mutex_unlock(&ps->in_lock); } @@ -2050,7 +2082,7 @@ err: return &__sh_noop; } -/* must be called with call->master_lock held in R, and in->in_lock held */ +/* must be called with call->master_lock held in R, and in->lock held */ // `sh` can be null static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *sh) { const struct transport_protocol *in_proto, *out_proto; @@ -2184,14 +2216,13 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) { } } - mutex_lock(&phc->mp.stream->in_lock); + LOCK(&phc->mp.stream->lock); int ret = dtls(phc->mp.sfd, &phc->s, &phc->mp.fsin); if (ret == 1) { phc->unkernelize = "DTLS connected"; phc->unkernelize_subscriptions = true; ret = 0; } - mutex_unlock(&phc->mp.stream->in_lock); if (!ret) return 0; } @@ -2216,7 +2247,7 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) { #if RTP_LOOP_PROTECT // returns: 0 = ok, proceed; -1 = duplicate detected, drop packet static int media_loop_detect(struct packet_handler_ctx *phc) { - LOCK(&phc->mp.stream->in_lock); + LOCK(&phc->mp.stream->lock); for (int i = 0; i < RTP_LOOP_PACKETS; i++) { if (phc->mp.stream->lp_buf[i].len != phc->s.len) @@ -2354,7 +2385,7 @@ static void media_packet_rtp_out(struct packet_handler_ctx *phc, struct sink_han static int media_packet_decrypt(struct packet_handler_ctx *phc) { - LOCK(&phc->in_srtp->in_lock); + LOCK(&phc->in_srtp->lock); struct sink_handler *first_sh = phc->sinks->length ? phc->sinks->head->data : NULL; const struct streamhandler *sh = __determine_handler(phc->in_srtp, first_sh); @@ -2383,7 +2414,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc) } static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink_handler *sh) { - LOCK(&phc->in_srtp->in_lock); + LOCK(&phc->in_srtp->lock); __determine_handler(phc->in_srtp, sh); // XXX use an array with index instead of if/else @@ -2401,7 +2432,7 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s if (!encrypt_func) return 0x00; - mutex_lock(&out->out_lock); + LOCK(&out->lock); for (__auto_type l = mp->packets_out.head; l; l = l->next) { struct codec_packet *p = l->data; @@ -2417,8 +2448,6 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s ret |= 0x01; } - mutex_unlock(&out->out_lock); - return ret; } @@ -2437,7 +2466,7 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc) { bool ret = false; - LOCK(&phc->mp.stream->in_lock); + LOCK(&phc->mp.stream->lock); /* we're OK to (potentially) use the source address of this packet as destination * in the other direction. */ @@ -2488,7 +2517,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc) /* see if we need to compare the source address with the known endpoint */ if (PS_ISSET2(phc->mp.stream, STRICT_SOURCE, MEDIA_HANDOVER)) { endpoint_t endpoint = phc->mp.fsin; - mutex_lock(&phc->mp.stream->out_lock); int tmp = memcmp(&endpoint, update_endpoint, sizeof(endpoint)); if (tmp && PS_ISSET(phc->mp.stream, MEDIA_HANDOVER)) { @@ -2502,8 +2530,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc) goto update_addr; } - mutex_unlock(&phc->mp.stream->out_lock); - if (tmp && PS_ISSET(phc->mp.stream, STRICT_SOURCE)) { ilog(LOG_INFO | LOG_FLAG_LIMIT, "Drop due to strict-source attribute; " "got %s%s%s, " @@ -2593,7 +2619,6 @@ confirm_now: PS_SET(phc->mp.stream, CONFIRMED); update_peerinfo: - mutex_lock(&phc->mp.stream->out_lock); // if we're during the wait time, check the received address against the previously // learned address. if they're the same, ignore this packet for learning purposes if (!wait_time || !phc->mp.stream->learned_endpoint.address.family || @@ -2613,8 +2638,6 @@ update_peerinfo: } } update_addr: - mutex_unlock(&phc->mp.stream->out_lock); - /* check the destination address of the received packet against what we think our * local interface to use is */ if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) { @@ -2654,9 +2677,7 @@ static void media_packet_kernel_check(struct packet_handler_ctx *phc) { if (ML_ISSET(phc->mp.media->monologue, DTMF_INJECTION_ACTIVE)) return; - mutex_lock(&phc->mp.stream->in_lock); kernelize(phc->mp.stream); - mutex_unlock(&phc->mp.stream->in_lock); } @@ -2993,19 +3014,19 @@ static int stream_packet(struct packet_handler_ctx *phc) { if (ret) goto next_mirror; - mutex_lock(&mirror_sink->out_lock); + mutex_lock(&mirror_sink->lock); if (!mirror_sink->advertised_endpoint.port || (is_addr_unspecified(&mirror_sink->advertised_endpoint.address) && !is_trickle_ice_address(&mirror_sink->advertised_endpoint))) { - mutex_unlock(&mirror_sink->out_lock); + mutex_unlock(&mirror_sink->lock); goto next_mirror; } media_socket_dequeue(&mirror_phc.mp, mirror_sink); - mutex_unlock(&mirror_sink->out_lock); + mutex_unlock(&mirror_sink->lock); next_mirror: media_socket_dequeue(&mirror_phc.mp, NULL); // just free if anything left @@ -3018,13 +3039,13 @@ next_mirror: if (ret == -1) goto err_next; - mutex_lock(&sink->out_lock); + mutex_lock(&sink->lock); if (!sink->advertised_endpoint.port || (is_addr_unspecified(&sink->advertised_endpoint.address) && !is_trickle_ice_address(&sink->advertised_endpoint))) { - mutex_unlock(&sink->out_lock); + mutex_unlock(&sink->lock); goto next; } @@ -3033,7 +3054,7 @@ next_mirror: else ret = media_socket_dequeue(&phc->mp, NULL); - mutex_unlock(&sink->out_lock); + mutex_unlock(&sink->lock); if (ret == 0) goto next; @@ -3041,10 +3062,10 @@ next_mirror: err_next: ilog(LOG_DEBUG | LOG_FLAG_LIMIT ,"Error when sending message. Error: %s", strerror(errno)); atomic64_inc_na(&sink->stats_in->errors); - mutex_lock(&sink->in_lock); + mutex_lock(&sink->lock); if (sink->selected_sfd) atomic64_inc_na(&sink->selected_sfd->local_intf->stats->out.errors); - mutex_unlock(&sink->in_lock); + mutex_unlock(&sink->lock); RTPE_STATS_INC(errors_user); goto next; diff --git a/daemon/mqtt.c b/daemon/mqtt.c index bcfbd0aba..68edb673f 100644 --- a/daemon/mqtt.c +++ b/daemon/mqtt.c @@ -338,7 +338,7 @@ static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *jso static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { - mutex_lock(&ps->in_lock); + LOCK(&ps->lock); stream_fd *sfd = ps->selected_sfd; if (sfd) { @@ -369,17 +369,11 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { json_builder_end_object(json); - mutex_unlock(&ps->in_lock); - - mutex_lock(&ps->out_lock); - json_builder_set_member_name(json, "egress"); json_builder_begin_object(json); mqtt_stream_stats_dir(ps->stats_out, json); json_builder_end_object(json); - - mutex_unlock(&ps->out_lock); } diff --git a/daemon/redis.c b/daemon/redis.c index 44680d192..c082eca86 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -2555,8 +2555,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c, void **to_free) { for (__auto_type l = c->streams.head; l; l = l->next) { struct packet_stream *ps = l->data; - LOCK(&ps->in_lock); - LOCK(&ps->out_lock); + LOCK(&ps->lock); snprintf(tmp, sizeof(tmp), "stream-%u", ps->unique_id); inner = parser->dict_add_dict_dup(root, tmp); diff --git a/daemon/rtcp.c b/daemon/rtcp.c index 4479de83a..67d4a9a7f 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -1566,7 +1566,7 @@ void rtcp_send_report(struct call_media *media, struct ssrc_entry_call *ssrc_out rtcp_ps = ps; } - LOCK(&ps->in_lock); + LOCK(&ps->lock); if (!ps->selected_sfd || !rtcp_ps->selected_sfd) return; @@ -1595,8 +1595,6 @@ void rtcp_send_report(struct call_media *media, struct ssrc_entry_call *ssrc_out str rtcp_packet = STR_GS(sr); - LOCK(&ps->out_lock); - const struct streamhandler *crypt_handler = determine_handler(&transport_protocols[PROTO_RTP_AVP], media, true); diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 2076c6e72..e109387e3 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -721,7 +721,7 @@ void ssrc_collect_metrics(struct call_media *media) { } if (media->streams.head) { - LOCK(&media->streams.head->data->in_lock); + LOCK(&media->streams.head->data->lock); RTPE_SAMPLE_SFD(jitter_measured, s->jitter, media->streams.head->data->selected_sfd); } } diff --git a/daemon/t38.c b/daemon/t38.c index fe96c2028..35e2f0fa9 100644 --- a/daemon/t38.c +++ b/daemon/t38.c @@ -218,8 +218,7 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui stream_fd *sfd = NULL; if (ps) { - mutex_lock(&ps->in_lock); - mutex_lock(&ps->out_lock); + mutex_lock(&ps->lock); sfd = ps->selected_sfd; } if (sfd && sfd->socket.fd != -1 && ps->endpoint.address.family != NULL) { @@ -231,10 +230,8 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui else ilog(LOG_WARN | LOG_FLAG_LIMIT, "Unable to send T.38 UDPTL packet due to lack of " "socket or stream"); - if (ps) { - mutex_unlock(&ps->out_lock); - mutex_unlock(&ps->in_lock); - } + if (ps) + mutex_unlock(&ps->lock); g_string_free(s, TRUE); diff --git a/include/call.h b/include/call.h index c1cefafd5..683110cb6 100644 --- a/include/call.h +++ b/include/call.h @@ -408,18 +408,10 @@ TYPED_GHASHTABLE_PROTO(rtp_stats_ht, void, struct rtp_stats) * This is done through the various bit flags. */ struct packet_stream { - /* Both locks valid only with call->master_lock held in R. + /* Lock valid only with call->master_lock held in R. * Preempted by call->master_lock held in W. - * If both in/out are to be locked, in_lock must be locked first. - * - * The in_lock protects fields relevant to packet reception on that stream, - * meanwhile the out_lock protects fields relevant to packet egress. - * - * This allows packet handling on multiple ports and streams belonging - * to the same call to happen at the same time. */ - mutex_t in_lock, - out_lock; + mutex_t lock; struct call_media *media; /* RO */ call_t *call; /* RO */ @@ -428,19 +420,19 @@ struct packet_stream { struct recording_stream recording; /* LOCK: call->master_lock */ stream_fd_q sfds; /* LOCK: call->master_lock */ - stream_fd *selected_sfd; // LOCK: in_lock + stream_fd *selected_sfd; // LOCK: ps->lock endpoint_t last_local_endpoint; - struct dtls_connection ice_dtls; /* LOCK: in_lock */ - sink_handler_q rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */ - sink_handler_q rtcp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */ + struct dtls_connection ice_dtls; /* LOCK: ps->lock */ + sink_handler_q rtp_sinks; /* LOCK: call->master_lock, ps->lock for streamhandler */ + sink_handler_q rtcp_sinks; /* LOCK: call->master_lock, ps->lock for streamhandler */ struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */ - sink_handler_q rtp_mirrors; /* LOCK: call->master_lock, in_lock for streamhandler */ - struct endpoint endpoint; /* LOCK: out_lock */ - struct endpoint detected_endpoints[4]; /* LOCK: out_lock */ - int64_t ep_detect_signal; /* LOCK: out_lock */ + sink_handler_q rtp_mirrors; /* LOCK: call->master_lock, ps->lock for streamhandler */ + struct endpoint endpoint; /* LOCK: ps->lock */ + struct endpoint detected_endpoints[4]; /* LOCK: ps->lock */ + int64_t ep_detect_signal; /* LOCK: ps->lock */ struct endpoint advertised_endpoint; /* RO */ - struct endpoint learned_endpoint; /* LOCK: out_lock */ - struct crypto_context crypto; /* OUT direction, LOCK: out_lock */ + struct endpoint learned_endpoint; /* LOCK: ps->lock */ + struct crypto_context crypto; /* OUT direction, LOCK: ps->lock */ struct send_timer *send_timer; /* RO */ struct jitter_buffer *jb; /* RO */ int64_t kernel_time_us; @@ -453,15 +445,15 @@ struct packet_stream { enum endpoint_learning el_flags; #if RTP_LOOP_PROTECT - /* LOCK: in_lock: */ + /* LOCK: ps->lock: */ unsigned int lp_idx; struct loop_protector lp_buf[RTP_LOOP_PACKETS]; unsigned int lp_count; #endif - X509 *dtls_cert; /* LOCK: in_lock */ + X509 *dtls_cert; /* LOCK: ps->lock */ - /* in_lock must be held for SETTING these: */ + /* ps->lock must be held for SETTING these: */ atomic64 ps_flags; }; diff --git a/include/media_socket.h b/include/media_socket.h index c2bb00c9f..22ed40b85 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -310,7 +310,6 @@ void free_sfd_intf_list(struct sfd_intf_list *il); void free_release_sfd_intf_list(struct sfd_intf_list *il); void free_socket_intf_list(struct socket_intf_list *il); -void kernelize(struct packet_stream *); void __unkernelize(struct packet_stream *, const char *); void unkernelize(struct packet_stream *, const char *); void __stream_unconfirm(struct packet_stream *, const char *);