Browse Source

MT#55283 combine in/out_lock

With selected_sfd being protected by in_lock, we pretty much have to
hold at least in_lock everywhere, and end up requiring both locks in
many places. The distinction has become pointless.

Change-Id: Ic0ad976c2d68d9639b9434da7f0e6e9c0d84c185
pull/1998/head
Richard Fuchs 4 months ago
parent
commit
e03f814855
13 changed files with 108 additions and 118 deletions
  1. +5
    -6
      daemon/call.c
  2. +1
    -2
      daemon/codec.c
  3. +2
    -8
      daemon/dtls.c
  4. +2
    -4
      daemon/ice.c
  5. +6
    -6
      daemon/media_player.c
  6. +70
    -49
      daemon/media_socket.c
  7. +1
    -7
      daemon/mqtt.c
  8. +1
    -2
      daemon/redis.c
  9. +1
    -3
      daemon/rtcp.c
  10. +1
    -1
      daemon/ssrc.c
  11. +3
    -6
      daemon/t38.c
  12. +15
    -23
      include/call.h
  13. +0
    -1
      include/media_socket.h

+ 5
- 6
daemon/call.c View File

@ -189,7 +189,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
g_autoptr(stream_fd) sfd = NULL;
{
LOCK(&ps->in_lock);
LOCK(&ps->lock);
if (ps->selected_sfd)
sfd = obj_get(ps->selected_sfd);
}
@ -976,8 +976,7 @@ struct packet_stream *__packet_stream_new(call_t *call) {
struct packet_stream *stream;
stream = uid_alloc(&call->streams);
mutex_init(&stream->in_lock);
mutex_init(&stream->out_lock);
mutex_init(&stream->lock);
stream->call = call;
atomic64_set_na(&stream->last_packet_us, rtpe_now);
stream->rtp_stats = rtp_stats_ht_new();
@ -1130,16 +1129,16 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) {
}
if (MEDIA_ISSET(media, DTLS)) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->lock);
struct dtls_connection *d = dtls_ptr(ps->selected_sfd);
if (d && d->init && !d->connected) {
int dret = dtls(ps->selected_sfd, NULL, NULL);
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
if (dret == 1)
call_media_unkernelize(media, "DTLS connected");
return CSS_DTLS;
}
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
}
if (PS_ISSET(ps, PIERCE_NAT) && PS_ISSET(ps, FILLED) && !PS_ISSET(ps, CONFIRMED)) {


+ 1
- 2
daemon/codec.c View File

@ -3440,11 +3440,10 @@ static void send_buffered(struct media_packet *mp, unsigned int log_sys) {
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, mp))
ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");
mutex_lock(&sink->out_lock);
LOCK(&sink->lock);
if (media_socket_dequeue(mp, sink))
ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
}
}


+ 2
- 8
daemon/dtls.c View File

@ -882,7 +882,7 @@ error:
return -1;
}
/* called with call locked in W or R with ps->in_lock held */
/* called with call locked in W or R with ps->lock held */
int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) {
struct packet_stream *ps = sfd->stream;
int ret;
@ -941,22 +941,16 @@ int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) {
else if (ret == 1) {
/* connected! */
dret = 1;
mutex_lock(&ps->out_lock); // nested lock!
if (dtls_setup_crypto(ps, d))
{} /* XXX ?? */
mutex_unlock(&ps->out_lock);
if (PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP) && ps->rtcp_sibling
&& MEDIA_ISSET(ps->media, RTCP_MUX)
&& ps->rtcp_sibling != ps)
{
// nested locks!
mutex_lock(&ps->rtcp_sibling->in_lock);
mutex_lock(&ps->rtcp_sibling->out_lock);
LOCK(&ps->rtcp_sibling->lock);
if (dtls_setup_crypto(ps->rtcp_sibling, d))
{} /* XXX ?? */
mutex_unlock(&ps->rtcp_sibling->out_lock);
mutex_unlock(&ps->rtcp_sibling->in_lock);
}
}


+ 2
- 4
daemon/ice.c View File

@ -1141,7 +1141,7 @@ found:
t_queue_clear(&compo1);
}
/* call(W) or call(R)+agent must be locked - no in_lock or out_lock must be held */
/* call(W) or call(R)+agent must be locked - no ps->lock must be held */
static int __check_valid(struct ice_agent *ag) {
struct call_media *media;
struct packet_stream *ps;
@ -1185,7 +1185,7 @@ static int __check_valid(struct ice_agent *ag) {
ps = l->data;
pair = k->data;
mutex_lock(&ps->out_lock);
LOCK(&ps->lock);
if (memcmp(&ps->endpoint, &pair->remote_candidate->endpoint, sizeof(ps->endpoint))) {
ilogs(ice, LOG_INFO, "ICE negotiated: new peer for component %u is %s%s%s", ps->component,
FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint)));
@ -1195,9 +1195,7 @@ static int __check_valid(struct ice_agent *ag) {
else
ilogs(ice, LOG_INFO, "ICE negotiated: peer for component %u is %s%s%s", ps->component,
FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint)));
mutex_unlock(&ps->out_lock);
LOCK(&ps->in_lock);
for (__auto_type m = ps->sfds.head; m; m = m->next) {
sfd = m->data;
if (sfd->local_intf != pair->local_intf)


+ 6
- 6
daemon/media_player.c View File

@ -432,11 +432,11 @@ static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp)
log_info_call(call);
rwlock_lock_r(&call->master_lock);
mutex_lock(&st->sink->out_lock);
mutex_lock(&st->sink->lock);
__send_timer_send_common(st, cp);
mutex_unlock(&st->sink->out_lock);
mutex_unlock(&st->sink->lock);
__send_timer_rtcp(st, ssrc_out);
ssrc_entry_release(ssrc_out);
@ -591,10 +591,10 @@ retry:;
media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet);
mutex_lock(&mp->sink->out_lock);
mutex_lock(&mp->sink->lock);
if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink");
mutex_unlock(&mp->sink->out_lock);
mutex_unlock(&mp->sink->lock);
// schedule our next run
mp->next_run += us_dur;
@ -1074,10 +1074,10 @@ void media_player_add_packet(struct media_player *mp, char *buf, size_t len,
media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet);
mutex_lock(&mp->sink->out_lock);
mutex_lock(&mp->sink->lock);
if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink");
mutex_unlock(&mp->sink->out_lock);
mutex_unlock(&mp->sink->lock);
mp->next_run += us_dur;
timerthread_obj_schedule_abs(&mp->tt_obj, mp->next_run);


+ 70
- 49
daemon/media_socket.c View File

@ -1574,9 +1574,7 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st
__auto_type reti = &s->reti;
if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
mutex_lock(&stream->out_lock);
__re_address_translate_ep(&reti->expected_src, MEDIA_ISSET(media, ASYMMETRIC) ? &stream->learned_endpoint : &stream->endpoint);
mutex_unlock(&stream->out_lock);
if (PS_ISSET(stream, STRICT_SOURCE))
reti->src_mismatch = MSM_DROP;
else if (PS_ISSET(stream, MEDIA_HANDOVER))
@ -1714,6 +1712,12 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st
return NULL;
}
/**
* The linkage between userspace and kernel module is in the kernelize_one().
*
* Called with stream->lock held.
* sink_handler can be NULL.
*/
__attribute__((nonnull(1, 2, 3)))
static const char *kernelize_one(kernelize_state *s,
struct packet_stream *stream, struct sink_handler *sink_handler)
@ -1789,7 +1793,12 @@ static const char *kernelize_one(kernelize_state *s,
if (MEDIA_ISSET(media, ECHO) || sink_handler->attrs.transcoding)
redi->output.ssrc_subst = 1;
mutex_lock(&sink->out_lock);
// XXX nested lock, avoid possible deadlock. should be reworked not to
// require a nested lock
if (sink != stream && mutex_trylock(&sink->lock)) {
g_free(redi);
return ""; // indicate deadlock
}
__re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint);
__re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local);
@ -1816,7 +1825,8 @@ static const char *kernelize_one(kernelize_state *s,
handler->out->kernel(&redi->output.encrypt, sink);
mutex_unlock(&sink->out_lock);
if (sink != stream)
mutex_unlock(&sink->lock);
if (!redi->output.encrypt.cipher || !redi->output.encrypt.hmac) {
g_free(redi);
@ -1833,31 +1843,39 @@ static const char *kernelize_one(kernelize_state *s,
return NULL;
}
// helper function for kernelize()
static void kernelize_one_sink_handler(kernelize_state *s,
// called with stream->lock held
static bool kernelize_one_sink_handler(kernelize_state *s,
struct packet_stream *stream, struct sink_handler *sink_handler)
{
struct packet_stream *sink = sink_handler->sink;
if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED))
return;
return true;
const char *err = kernelize_one(s, stream, sink_handler);
if (err)
if (err) {
if (!*err)
return false; // indicate deadlock
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
}
return true;
}
/* called with in_lock held */
void kernelize(struct packet_stream *stream) {
/* called with master_lock held */
static void kernelize(struct packet_stream *stream) {
call_t *call = stream->call;
const char *nk_warn_msg;
struct call_media *media = stream->media;
g_auto(kernelize_state) s = {0};
if (PS_ISSET(stream, KERNELIZED))
while (true) {
LOCK(&stream->lock);
// set flag, return if set already
if (PS_SET(stream, KERNELIZED))
return;
if (call->recording != NULL && !selected_recording_method->kernel_support)
goto no_kernel;
if (!kernel.is_wanted)
goto no_kernel;
nk_warn_msg = "interface to kernel module not open";
if (!kernel.is_open)
goto no_kernel_warn;
if (MEDIA_ISSET(media, GENERATOR))
@ -1878,12 +1896,16 @@ void kernelize(struct packet_stream *stream) {
struct sink_handler *sh = l->data;
if (sh->attrs.block_media)
continue;
kernelize_one_sink_handler(&s, stream, sh);
bool ok = kernelize_one_sink_handler(&s, stream, sh);
if (!ok)
goto restart;
}
// RTP egress mirrors
for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&s, stream, sh);
bool ok = kernelize_one_sink_handler(&s, stream, sh);
if (!ok)
goto restart;
}
// RTP -> RTCP sinks
// record number of RTP destinations up to now
@ -1893,7 +1915,9 @@ void kernelize(struct packet_stream *stream) {
s.payload_types = NULL;
for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&s, stream, sh);
bool ok = kernelize_one_sink_handler(&s, stream, sh);
if (!ok)
goto restart;
}
// mark the start of RTCP outputs
s.reti.num_rtcp_destinations = s.reti.num_destinations - num_rtp_dests;
@ -1915,15 +1939,25 @@ void kernelize(struct packet_stream *stream) {
}
stream->kernel_time_us = rtpe_now;
PS_SET(stream, KERNELIZED);
return;
no_kernel_warn:
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", nk_warn_msg);
ilog(LOG_WARNING, "No support for kernel packet forwarding available "
"(interface to kernel module not open)");
no_kernel:
PS_SET(stream, KERNELIZED);
stream->kernel_time_us = rtpe_now;
PS_SET(stream, NO_KERNEL_SUPPORT);
return;
restart: // handle detected deadlock
rtp_stats_arr_destroy_ptr(s.payload_types);
while ((redi = t_queue_pop_head(&s.outputs)))
g_free(redi);
// try again, releases stream->lock
}
}
// must be called with appropriate locks (master lock and/or in/out_lock)
@ -1952,7 +1986,7 @@ struct ssrc_entry_call *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_entry_call *l
}
/* must be called with in_lock held or call->master_lock held in W */
/* must be called with ps->lock held or call->master_lock held in W */
void __unkernelize(struct packet_stream *p, const char *reason) {
if (!p->selected_sfd)
return;
@ -1998,9 +2032,8 @@ void __stream_unconfirm(struct packet_stream *ps, const char *reason) {
static void stream_unconfirm(struct packet_stream *ps, const char *reason) {
if (!ps)
return;
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
__stream_unconfirm(ps, reason);
mutex_unlock(&ps->in_lock);
}
static void unconfirm_sinks(sink_handler_q *q, const char *reason) {
for (__auto_type l = q->head; l; l = l->next) {
@ -2011,9 +2044,8 @@ static void unconfirm_sinks(sink_handler_q *q, const char *reason) {
void unkernelize(struct packet_stream *ps, const char *reason) {
if (!ps)
return;
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
__unkernelize(ps, reason);
mutex_unlock(&ps->in_lock);
}
@ -2050,7 +2082,7 @@ err:
return &__sh_noop;
}
/* must be called with call->master_lock held in R, and in->in_lock held */
/* must be called with call->master_lock held in R, and in->lock held */
// `sh` can be null
static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *sh) {
const struct transport_protocol *in_proto, *out_proto;
@ -2184,14 +2216,13 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) {
}
}
mutex_lock(&phc->mp.stream->in_lock);
LOCK(&phc->mp.stream->lock);
int ret = dtls(phc->mp.sfd, &phc->s, &phc->mp.fsin);
if (ret == 1) {
phc->unkernelize = "DTLS connected";
phc->unkernelize_subscriptions = true;
ret = 0;
}
mutex_unlock(&phc->mp.stream->in_lock);
if (!ret)
return 0;
}
@ -2216,7 +2247,7 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) {
#if RTP_LOOP_PROTECT
// returns: 0 = ok, proceed; -1 = duplicate detected, drop packet
static int media_loop_detect(struct packet_handler_ctx *phc) {
LOCK(&phc->mp.stream->in_lock);
LOCK(&phc->mp.stream->lock);
for (int i = 0; i < RTP_LOOP_PACKETS; i++) {
if (phc->mp.stream->lp_buf[i].len != phc->s.len)
@ -2354,7 +2385,7 @@ static void media_packet_rtp_out(struct packet_handler_ctx *phc, struct sink_han
static int media_packet_decrypt(struct packet_handler_ctx *phc)
{
LOCK(&phc->in_srtp->in_lock);
LOCK(&phc->in_srtp->lock);
struct sink_handler *first_sh = phc->sinks->length ? phc->sinks->head->data : NULL;
const struct streamhandler *sh = __determine_handler(phc->in_srtp, first_sh);
@ -2383,7 +2414,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc)
}
static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink_handler *sh)
{
LOCK(&phc->in_srtp->in_lock);
LOCK(&phc->in_srtp->lock);
__determine_handler(phc->in_srtp, sh);
// XXX use an array with index instead of if/else
@ -2401,7 +2432,7 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s
if (!encrypt_func)
return 0x00;
mutex_lock(&out->out_lock);
LOCK(&out->lock);
for (__auto_type l = mp->packets_out.head; l; l = l->next) {
struct codec_packet *p = l->data;
@ -2417,8 +2448,6 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s
ret |= 0x01;
}
mutex_unlock(&out->out_lock);
return ret;
}
@ -2437,7 +2466,7 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
{
bool ret = false;
LOCK(&phc->mp.stream->in_lock);
LOCK(&phc->mp.stream->lock);
/* we're OK to (potentially) use the source address of this packet as destination
* in the other direction. */
@ -2488,7 +2517,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
/* see if we need to compare the source address with the known endpoint */
if (PS_ISSET2(phc->mp.stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
endpoint_t endpoint = phc->mp.fsin;
mutex_lock(&phc->mp.stream->out_lock);
int tmp = memcmp(&endpoint, update_endpoint, sizeof(endpoint));
if (tmp && PS_ISSET(phc->mp.stream, MEDIA_HANDOVER)) {
@ -2502,8 +2530,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
goto update_addr;
}
mutex_unlock(&phc->mp.stream->out_lock);
if (tmp && PS_ISSET(phc->mp.stream, STRICT_SOURCE)) {
ilog(LOG_INFO | LOG_FLAG_LIMIT, "Drop due to strict-source attribute; "
"got %s%s%s, "
@ -2593,7 +2619,6 @@ confirm_now:
PS_SET(phc->mp.stream, CONFIRMED);
update_peerinfo:
mutex_lock(&phc->mp.stream->out_lock);
// if we're during the wait time, check the received address against the previously
// learned address. if they're the same, ignore this packet for learning purposes
if (!wait_time || !phc->mp.stream->learned_endpoint.address.family ||
@ -2613,8 +2638,6 @@ update_peerinfo:
}
}
update_addr:
mutex_unlock(&phc->mp.stream->out_lock);
/* check the destination address of the received packet against what we think our
* local interface to use is */
if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) {
@ -2654,9 +2677,7 @@ static void media_packet_kernel_check(struct packet_handler_ctx *phc) {
if (ML_ISSET(phc->mp.media->monologue, DTMF_INJECTION_ACTIVE))
return;
mutex_lock(&phc->mp.stream->in_lock);
kernelize(phc->mp.stream);
mutex_unlock(&phc->mp.stream->in_lock);
}
@ -2993,19 +3014,19 @@ static int stream_packet(struct packet_handler_ctx *phc) {
if (ret)
goto next_mirror;
mutex_lock(&mirror_sink->out_lock);
mutex_lock(&mirror_sink->lock);
if (!mirror_sink->advertised_endpoint.port
|| (is_addr_unspecified(&mirror_sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&mirror_sink->advertised_endpoint)))
{
mutex_unlock(&mirror_sink->out_lock);
mutex_unlock(&mirror_sink->lock);
goto next_mirror;
}
media_socket_dequeue(&mirror_phc.mp, mirror_sink);
mutex_unlock(&mirror_sink->out_lock);
mutex_unlock(&mirror_sink->lock);
next_mirror:
media_socket_dequeue(&mirror_phc.mp, NULL); // just free if anything left
@ -3018,13 +3039,13 @@ next_mirror:
if (ret == -1)
goto err_next;
mutex_lock(&sink->out_lock);
mutex_lock(&sink->lock);
if (!sink->advertised_endpoint.port
|| (is_addr_unspecified(&sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&sink->advertised_endpoint)))
{
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
goto next;
}
@ -3033,7 +3054,7 @@ next_mirror:
else
ret = media_socket_dequeue(&phc->mp, NULL);
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
if (ret == 0)
goto next;
@ -3041,10 +3062,10 @@ next_mirror:
err_next:
ilog(LOG_DEBUG | LOG_FLAG_LIMIT ,"Error when sending message. Error: %s", strerror(errno));
atomic64_inc_na(&sink->stats_in->errors);
mutex_lock(&sink->in_lock);
mutex_lock(&sink->lock);
if (sink->selected_sfd)
atomic64_inc_na(&sink->selected_sfd->local_intf->stats->out.errors);
mutex_unlock(&sink->in_lock);
mutex_unlock(&sink->lock);
RTPE_STATS_INC(errors_user);
goto next;


+ 1
- 7
daemon/mqtt.c View File

@ -338,7 +338,7 @@ static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *jso
static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
stream_fd *sfd = ps->selected_sfd;
if (sfd) {
@ -369,17 +369,11 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_end_object(json);
mutex_unlock(&ps->in_lock);
mutex_lock(&ps->out_lock);
json_builder_set_member_name(json, "egress");
json_builder_begin_object(json);
mqtt_stream_stats_dir(ps->stats_out, json);
json_builder_end_object(json);
mutex_unlock(&ps->out_lock);
}


+ 1
- 2
daemon/redis.c View File

@ -2555,8 +2555,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c, void **to_free) {
for (__auto_type l = c->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
LOCK(&ps->in_lock);
LOCK(&ps->out_lock);
LOCK(&ps->lock);
snprintf(tmp, sizeof(tmp), "stream-%u", ps->unique_id);
inner = parser->dict_add_dict_dup(root, tmp);


+ 1
- 3
daemon/rtcp.c View File

@ -1566,7 +1566,7 @@ void rtcp_send_report(struct call_media *media, struct ssrc_entry_call *ssrc_out
rtcp_ps = ps;
}
LOCK(&ps->in_lock);
LOCK(&ps->lock);
if (!ps->selected_sfd || !rtcp_ps->selected_sfd)
return;
@ -1595,8 +1595,6 @@ void rtcp_send_report(struct call_media *media, struct ssrc_entry_call *ssrc_out
str rtcp_packet = STR_GS(sr);
LOCK(&ps->out_lock);
const struct streamhandler *crypt_handler = determine_handler(&transport_protocols[PROTO_RTP_AVP],
media, true);


+ 1
- 1
daemon/ssrc.c View File

@ -721,7 +721,7 @@ void ssrc_collect_metrics(struct call_media *media) {
}
if (media->streams.head) {
LOCK(&media->streams.head->data->in_lock);
LOCK(&media->streams.head->data->lock);
RTPE_SAMPLE_SFD(jitter_measured, s->jitter, media->streams.head->data->selected_sfd);
}
}


+ 3
- 6
daemon/t38.c View File

@ -218,8 +218,7 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui
stream_fd *sfd = NULL;
if (ps) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->out_lock);
mutex_lock(&ps->lock);
sfd = ps->selected_sfd;
}
if (sfd && sfd->socket.fd != -1 && ps->endpoint.address.family != NULL) {
@ -231,10 +230,8 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui
else
ilog(LOG_WARN | LOG_FLAG_LIMIT, "Unable to send T.38 UDPTL packet due to lack of "
"socket or stream");
if (ps) {
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->in_lock);
}
if (ps)
mutex_unlock(&ps->lock);
g_string_free(s, TRUE);


+ 15
- 23
include/call.h View File

@ -408,18 +408,10 @@ TYPED_GHASHTABLE_PROTO(rtp_stats_ht, void, struct rtp_stats)
* This is done through the various bit flags.
*/
struct packet_stream {
/* Both locks valid only with call->master_lock held in R.
/* Lock valid only with call->master_lock held in R.
* Preempted by call->master_lock held in W.
* If both in/out are to be locked, in_lock must be locked first.
*
* The in_lock protects fields relevant to packet reception on that stream,
* meanwhile the out_lock protects fields relevant to packet egress.
*
* This allows packet handling on multiple ports and streams belonging
* to the same call to happen at the same time.
*/
mutex_t in_lock,
out_lock;
mutex_t lock;
struct call_media *media; /* RO */
call_t *call; /* RO */
@ -428,19 +420,19 @@ struct packet_stream {
struct recording_stream recording; /* LOCK: call->master_lock */
stream_fd_q sfds; /* LOCK: call->master_lock */
stream_fd *selected_sfd; // LOCK: in_lock
stream_fd *selected_sfd; // LOCK: ps->lock
endpoint_t last_local_endpoint;
struct dtls_connection ice_dtls; /* LOCK: in_lock */
sink_handler_q rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
sink_handler_q rtcp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
struct dtls_connection ice_dtls; /* LOCK: ps->lock */
sink_handler_q rtp_sinks; /* LOCK: call->master_lock, ps->lock for streamhandler */
sink_handler_q rtcp_sinks; /* LOCK: call->master_lock, ps->lock for streamhandler */
struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */
sink_handler_q rtp_mirrors; /* LOCK: call->master_lock, in_lock for streamhandler */
struct endpoint endpoint; /* LOCK: out_lock */
struct endpoint detected_endpoints[4]; /* LOCK: out_lock */
int64_t ep_detect_signal; /* LOCK: out_lock */
sink_handler_q rtp_mirrors; /* LOCK: call->master_lock, ps->lock for streamhandler */
struct endpoint endpoint; /* LOCK: ps->lock */
struct endpoint detected_endpoints[4]; /* LOCK: ps->lock */
int64_t ep_detect_signal; /* LOCK: ps->lock */
struct endpoint advertised_endpoint; /* RO */
struct endpoint learned_endpoint; /* LOCK: out_lock */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct endpoint learned_endpoint; /* LOCK: ps->lock */
struct crypto_context crypto; /* OUT direction, LOCK: ps->lock */
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */
int64_t kernel_time_us;
@ -453,15 +445,15 @@ struct packet_stream {
enum endpoint_learning el_flags;
#if RTP_LOOP_PROTECT
/* LOCK: in_lock: */
/* LOCK: ps->lock: */
unsigned int lp_idx;
struct loop_protector lp_buf[RTP_LOOP_PACKETS];
unsigned int lp_count;
#endif
X509 *dtls_cert; /* LOCK: in_lock */
X509 *dtls_cert; /* LOCK: ps->lock */
/* in_lock must be held for SETTING these: */
/* ps->lock must be held for SETTING these: */
atomic64 ps_flags;
};


+ 0
- 1
include/media_socket.h View File

@ -310,7 +310,6 @@ void free_sfd_intf_list(struct sfd_intf_list *il);
void free_release_sfd_intf_list(struct sfd_intf_list *il);
void free_socket_intf_list(struct socket_intf_list *il);
void kernelize(struct packet_stream *);
void __unkernelize(struct packet_stream *, const char *);
void unkernelize(struct packet_stream *, const char *);
void __stream_unconfirm(struct packet_stream *, const char *);


Loading…
Cancel
Save