Browse Source

MT#55283 combine in/out_lock

With selected_sfd being protected by in_lock, we pretty much have to
hold at least in_lock everywhere, and end up requiring both locks in
many places. The distinction has become pointless.

Change-Id: Ic0ad976c2d68d9639b9434da7f0e6e9c0d84c185
(cherry picked from commit e03f814855)
(cherry picked from commit 222fcaa4b0)
mr12.5
Richard Fuchs 4 months ago
parent
commit
abf1ad73d2
13 changed files with 122 additions and 141 deletions
  1. +7
    -10
      daemon/call.c
  2. +3
    -4
      daemon/codec.c
  3. +2
    -8
      daemon/dtls.c
  4. +2
    -4
      daemon/ice.c
  5. +6
    -6
      daemon/media_player.c
  6. +75
    -60
      daemon/media_socket.c
  7. +1
    -7
      daemon/mqtt.c
  8. +2
    -4
      daemon/redis.c
  9. +1
    -3
      daemon/rtcp.c
  10. +1
    -1
      daemon/ssrc.c
  11. +3
    -6
      daemon/t38.c
  12. +19
    -27
      include/call.h
  13. +0
    -1
      include/media_socket.h

+ 7
- 10
daemon/call.c View File

@ -186,7 +186,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
g_autoptr(stream_fd) sfd = NULL;
{
LOCK(&ps->in_lock);
LOCK(&ps->lock);
if (ps->selected_sfd)
sfd = obj_get(ps->selected_sfd);
}
@ -999,8 +999,7 @@ struct packet_stream *__packet_stream_new(call_t *call) {
struct packet_stream *stream;
stream = uid_slice_alloc0(stream, &call->streams.q);
mutex_init(&stream->in_lock);
mutex_init(&stream->out_lock);
mutex_init(&stream->lock);
stream->call = call;
atomic64_set_na(&stream->last_packet, rtpe_now.tv_sec);
stream->rtp_stats = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __rtp_stats_free);
@ -1111,21 +1110,19 @@ void call_stream_crypto_reset(struct packet_stream *ps) {
crypto_reset(&ps->crypto);
if (PS_ISSET(ps, RTP)) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) {
if (!ps->ssrc_in[u]) // end of list
break;
atomic_set_na(&ps->ssrc_in[u]->stats->ext_seq, 0);
}
mutex_unlock(&ps->in_lock);
mutex_lock(&ps->out_lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) {
if (!ps->ssrc_out[u]) // end of list
break;
atomic_set_na(&ps->ssrc_out[u]->stats->ext_seq, 0);
}
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->lock);
}
}
@ -1152,16 +1149,16 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) {
}
if (MEDIA_ISSET(media, DTLS)) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->lock);
struct dtls_connection *d = dtls_ptr(ps->selected_sfd);
if (d && d->init && !d->connected) {
int dret = dtls(ps->selected_sfd, NULL, NULL);
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
if (dret == 1)
call_media_unkernelize(media, "DTLS connected");
return CSS_DTLS;
}
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
}
if (PS_ISSET(ps, PIERCE_NAT) && PS_ISSET(ps, FILLED) && !PS_ISSET(ps, CONFIRMED)) {


+ 3
- 4
daemon/codec.c View File

@ -1015,14 +1015,14 @@ static void __rtcp_timer_run(struct codec_timer *ct) {
struct ssrc_ctx *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,};
if (media->streams.head) {
struct packet_stream *ps = media->streams.head->data;
mutex_lock(&ps->out_lock);
mutex_lock(&ps->lock);
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!ps->ssrc_out[u]) // end of list
break;
ssrc_out[u] = ps->ssrc_out[u];
ssrc_ctx_hold(ssrc_out[u]);
}
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->lock);
}
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
@ -2973,11 +2973,10 @@ static void send_buffered(struct media_packet *mp, unsigned int log_sys) {
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, mp))
ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");
mutex_lock(&sink->out_lock);
LOCK(&sink->lock);
if (media_socket_dequeue(mp, sink))
ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
}
}


+ 2
- 8
daemon/dtls.c View File

@ -841,7 +841,7 @@ error:
return -1;
}
/* called with call locked in W or R with ps->in_lock held */
/* called with call locked in W or R with ps->lock held */
int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) {
struct packet_stream *ps = sfd->stream;
int ret;
@ -889,22 +889,16 @@ int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) {
else if (ret == 1) {
/* connected! */
dret = 1;
mutex_lock(&ps->out_lock); // nested lock!
if (dtls_setup_crypto(ps, d))
{} /* XXX ?? */
mutex_unlock(&ps->out_lock);
if (PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP) && ps->rtcp_sibling
&& MEDIA_ISSET(ps->media, RTCP_MUX)
&& ps->rtcp_sibling != ps)
{
// nested locks!
mutex_lock(&ps->rtcp_sibling->in_lock);
mutex_lock(&ps->rtcp_sibling->out_lock);
LOCK(&ps->rtcp_sibling->lock);
if (dtls_setup_crypto(ps->rtcp_sibling, d))
{} /* XXX ?? */
mutex_unlock(&ps->rtcp_sibling->out_lock);
mutex_unlock(&ps->rtcp_sibling->in_lock);
}
}


+ 2
- 4
daemon/ice.c View File

@ -1196,7 +1196,7 @@ found:
t_queue_clear(&compo1);
}
/* call(W) or call(R)+agent must be locked - no in_lock or out_lock must be held */
/* call(W) or call(R)+agent must be locked - no ps->lock must be held */
static int __check_valid(struct ice_agent *ag) {
struct call_media *media;
struct packet_stream *ps;
@ -1240,7 +1240,7 @@ static int __check_valid(struct ice_agent *ag) {
ps = l->data;
pair = k->data;
mutex_lock(&ps->out_lock);
LOCK(&ps->lock);
if (memcmp(&ps->endpoint, &pair->remote_candidate->endpoint, sizeof(ps->endpoint))) {
ilogs(ice, LOG_INFO, "ICE negotiated: new peer for component %u is %s%s%s", ps->component,
FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint)));
@ -1250,9 +1250,7 @@ static int __check_valid(struct ice_agent *ag) {
else
ilogs(ice, LOG_INFO, "ICE negotiated: peer for component %u is %s%s%s", ps->component,
FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint)));
mutex_unlock(&ps->out_lock);
LOCK(&ps->in_lock);
for (__auto_type m = ps->sfds.head; m; m = m->next) {
sfd = m->data;
if (sfd->local_intf != pair->local_intf)


+ 6
- 6
daemon/media_player.c View File

@ -375,11 +375,11 @@ static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp)
log_info_call(call);
rwlock_lock_r(&call->master_lock);
mutex_lock(&st->sink->out_lock);
mutex_lock(&st->sink->lock);
__send_timer_send_common(st, cp);
mutex_unlock(&st->sink->out_lock);
mutex_unlock(&st->sink->lock);
__send_timer_rtcp(st, ssrc_out);
ssrc_ctx_put(&ssrc_out);
@ -529,10 +529,10 @@ retry:;
media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet);
mutex_lock(&mp->sink->out_lock);
mutex_lock(&mp->sink->lock);
if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink");
mutex_unlock(&mp->sink->out_lock);
mutex_unlock(&mp->sink->lock);
// schedule our next run
timeval_add_usec(&mp->next_run, us_dur);
@ -994,10 +994,10 @@ void media_player_add_packet(struct media_player *mp, char *buf, size_t len,
media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet);
mutex_lock(&mp->sink->out_lock);
mutex_lock(&mp->sink->lock);
if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink");
mutex_unlock(&mp->sink->out_lock);
mutex_unlock(&mp->sink->lock);
timeval_add_usec(&mp->next_run, us_dur);
timerthread_obj_schedule_abs(&mp->tt_obj, &mp->next_run);


+ 75
- 60
daemon/media_socket.c View File

@ -1479,9 +1479,7 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out
goto output;
if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
mutex_lock(&stream->out_lock);
__re_address_translate_ep(&reti->expected_src, MEDIA_ISSET(media, ASYMMETRIC) ? &stream->learned_endpoint : &stream->endpoint);
mutex_unlock(&stream->out_lock);
if (PS_ISSET(stream, STRICT_SOURCE))
reti->src_mismatch = MSM_DROP;
else if (PS_ISSET(stream, MEDIA_HANDOVER))
@ -1620,7 +1618,12 @@ output:
reti->pt_filter = 1;
}
mutex_lock(&sink->out_lock);
// XXX nested lock, avoid possible deadlock. should be reworked not to
// require a nested lock
if (sink != stream && mutex_trylock(&sink->lock)) {
g_slice_free1(sizeof(*redi), redi);
return ""; // indicate deadlock
}
__re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint);
__re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local);
@ -1642,7 +1645,8 @@ output:
handler->out->kernel(&redi->output.encrypt, sink);
mutex_unlock(&sink->out_lock);
if (sink != stream)
mutex_unlock(&sink->lock);
if (!redi->output.encrypt.cipher || !redi->output.encrypt.hmac) {
g_slice_free1(sizeof(*redi), redi);
@ -1659,32 +1663,39 @@ output:
return NULL;
}
// helper function for kernelize()
static void kernelize_one_sink_handler(struct rtpengine_target_info *reti, GQueue *outputs,
static bool kernelize_one_sink_handler(struct rtpengine_target_info *reti, GQueue *outputs,
struct packet_stream *stream, struct sink_handler *sink_handler, sink_handler_q *sinks,
GList **payload_types)
{
struct packet_stream *sink = sink_handler->sink;
if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED))
return;
return true;
const char *err = kernelize_one(reti, outputs, stream, sink_handler, &stream->rtp_sinks,
payload_types);
if (err)
if (err) {
if (!*err)
return false; // indicate deadlock
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
}
return true;
}
/* called with in_lock held */
void kernelize(struct packet_stream *stream) {
/* called with master_lock held */
static void kernelize(struct packet_stream *stream) {
call_t *call = stream->call;
const char *nk_warn_msg;
struct call_media *media = stream->media;
if (PS_ISSET(stream, KERNELIZED))
while (true) {
LOCK(&stream->lock);
// set flag, return if set already
if (PS_SET(stream, KERNELIZED))
return;
if (call->recording != NULL && !selected_recording_method->kernel_support)
goto no_kernel;
if (!kernel.is_wanted)
goto no_kernel;
nk_warn_msg = "interface to kernel module not open";
if (!kernel.is_open)
goto no_kernel_warn;
if (MEDIA_ISSET(media, GENERATOR))
@ -1706,27 +1717,36 @@ void kernelize(struct packet_stream *stream) {
if (num_sinks == 0) {
// add blackhole kernel rule
const char *err = kernelize_one(&reti, &outputs, stream, NULL, NULL, &payload_types);
if (err)
if (err) {
if (!*err)
goto restart;
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
}
}
else {
for (__auto_type l = stream->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
if (sh->attrs.block_media)
continue;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
bool ok = kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
&payload_types);
if (!ok)
goto restart;
}
for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
bool ok = kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
&payload_types);
if (!ok)
goto restart;
}
// record number of RTP destinations
unsigned int num_rtp_dests = reti.num_destinations;
for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks, NULL);
bool ok = kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks, NULL);
if (!ok)
goto restart;
}
reti.num_rtcp_destinations = reti.num_destinations - num_rtp_dests;
}
@ -1750,15 +1770,25 @@ void kernelize(struct packet_stream *stream) {
}
stream->kernel_time = rtpe_now.tv_sec;
PS_SET(stream, KERNELIZED);
return;
no_kernel_warn:
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", nk_warn_msg);
ilog(LOG_WARNING, "No support for kernel packet forwarding available "
"(interface to kernel module not open)");
no_kernel:
PS_SET(stream, KERNELIZED);
stream->kernel_time = rtpe_now.tv_sec;
PS_SET(stream, NO_KERNEL_SUPPORT);
return;
restart: // handle detected deadlock
g_list_free(payload_types);
while ((redi = g_queue_pop_head(&outputs)))
g_slice_free1(sizeof(*redi), redi);
// try again, releases stream->lock
}
}
// must be called with appropriate locks (master lock and/or in/out_lock)
@ -1787,7 +1817,7 @@ struct ssrc_ctx *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_S
}
/* must be called with in_lock held or call->master_lock held in W */
/* must be called with ps->lock held or call->master_lock held in W */
void __unkernelize(struct packet_stream *p, const char *reason) {
if (!p->selected_sfd)
return;
@ -1833,9 +1863,8 @@ void __stream_unconfirm(struct packet_stream *ps, const char *reason) {
static void stream_unconfirm(struct packet_stream *ps, const char *reason) {
if (!ps)
return;
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
__stream_unconfirm(ps, reason);
mutex_unlock(&ps->in_lock);
}
static void unconfirm_sinks(sink_handler_q *q, const char *reason) {
for (__auto_type l = q->head; l; l = l->next) {
@ -1846,9 +1875,8 @@ static void unconfirm_sinks(sink_handler_q *q, const char *reason) {
void unkernelize(struct packet_stream *ps, const char *reason) {
if (!ps)
return;
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
__unkernelize(ps, reason);
mutex_unlock(&ps->in_lock);
}
@ -1885,7 +1913,7 @@ err:
return &__sh_noop;
}
/* must be called with call->master_lock held in R, and in->in_lock held */
/* must be called with call->master_lock held in R, and in->lock held */
// `sh` can be null
static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *sh) {
const struct transport_protocol *in_proto, *out_proto;
@ -1994,7 +2022,7 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc,
static const char *__stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs,
struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash)
{
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->in_lock, in_srtp->ssrc_in,
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->lock, in_srtp->ssrc_in,
&in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, SSRC_DIR_INPUT, "ingress");
}
// check and update output SSRC pointers
@ -2004,12 +2032,12 @@ static const char *__stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ss
bool ssrc_change)
{
if (ssrc_change)
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->out_lock,
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->lock,
out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT,
"egress (mapped)");
return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->out_lock,
return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->lock,
out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT,
"egress (direct)");
@ -2034,14 +2062,13 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) {
}
}
mutex_lock(&phc->mp.stream->in_lock);
LOCK(&phc->mp.stream->lock);
int ret = dtls(phc->mp.sfd, &phc->s, &phc->mp.fsin);
if (ret == 1) {
phc->unkernelize = "DTLS connected";
phc->unkernelize_subscriptions = true;
ret = 0;
}
mutex_unlock(&phc->mp.stream->in_lock);
if (!ret)
return 0;
}
@ -2066,7 +2093,7 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) {
#if RTP_LOOP_PROTECT
// returns: 0 = ok, proceed; -1 = duplicate detected, drop packet
static int media_loop_detect(struct packet_handler_ctx *phc) {
mutex_lock(&phc->mp.stream->in_lock);
mutex_lock(&phc->mp.stream->lock);
for (int i = 0; i < RTP_LOOP_PACKETS; i++) {
if (phc->mp.stream->lp_buf[i].len != phc->s.len)
@ -2080,7 +2107,7 @@ static int media_loop_detect(struct packet_handler_ctx *phc) {
"to avoid potential loop",
RTP_LOOP_MAX_COUNT,
FMT_M(endpoint_print_buf(&phc->mp.fsin)));
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return -1;
}
@ -2094,7 +2121,7 @@ static int media_loop_detect(struct packet_handler_ctx *phc) {
memcpy(phc->mp.stream->lp_buf[phc->mp.stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT));
phc->mp.stream->lp_idx = (phc->mp.stream->lp_idx + 1) % RTP_LOOP_PACKETS;
loop_ok:
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return 0;
}
@ -2213,7 +2240,7 @@ static void media_packet_rtp_out(struct packet_handler_ctx *phc, struct sink_han
static int media_packet_decrypt(struct packet_handler_ctx *phc)
{
mutex_lock(&phc->in_srtp->in_lock);
mutex_lock(&phc->in_srtp->lock);
struct sink_handler *first_sh = phc->sinks->length ? phc->sinks->head->data : NULL;
const struct streamhandler *sh = __determine_handler(phc->in_srtp, first_sh);
@ -2234,7 +2261,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc)
phc->mp.payload.len -= ori_s.len - phc->s.len;
}
mutex_unlock(&phc->in_srtp->in_lock);
mutex_unlock(&phc->in_srtp->lock);
if (ret == 1) {
phc->update = true;
@ -2244,7 +2271,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc)
}
static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink_handler *sh)
{
mutex_lock(&phc->in_srtp->in_lock);
mutex_lock(&phc->in_srtp->lock);
__determine_handler(phc->in_srtp, sh);
// XXX use an array with index instead of if/else
@ -2254,7 +2281,7 @@ static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink
phc->encrypt_func = sh->handler->out->rtcp_crypt;
phc->rtcp_filter = sh->handler->in->rtcp_filter;
}
mutex_unlock(&phc->in_srtp->in_lock);
mutex_unlock(&phc->in_srtp->lock);
}
int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp) {
@ -2263,7 +2290,7 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s
if (!encrypt_func)
return 0x00;
mutex_lock(&out->out_lock);
LOCK(&out->lock);
for (__auto_type l = mp->packets_out.head; l; l = l->next) {
struct codec_packet *p = l->data;
@ -2280,8 +2307,6 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s
ret |= 0x01;
}
mutex_unlock(&out->out_lock);
return ret;
}
@ -2301,7 +2326,7 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
struct endpoint endpoint;
bool ret = false;
mutex_lock(&phc->mp.stream->in_lock);
mutex_lock(&phc->mp.stream->lock);
/* we're OK to (potentially) use the source address of this packet as destination
* in the other direction. */
@ -2330,10 +2355,8 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
/* do not pay attention to source addresses of incoming packets for asymmetric streams */
if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) || phc->mp.stream->el_flags == EL_OFF) {
PS_SET(phc->mp.stream, CONFIRMED);
mutex_lock(&phc->mp.stream->out_lock);
if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) && !phc->mp.stream->learned_endpoint.address.family)
phc->mp.stream->learned_endpoint = phc->mp.fsin;
mutex_unlock(&phc->mp.stream->out_lock);
}
/* confirm sinks for unidirectional streams in order to kernelize */
@ -2349,7 +2372,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
/* see if we need to compare the source address with the known endpoint */
if (PS_ISSET2(phc->mp.stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
endpoint = phc->mp.fsin;
mutex_lock(&phc->mp.stream->out_lock);
struct endpoint *ps_endpoint = MEDIA_ISSET(phc->mp.media, ASYMMETRIC) ?
&phc->mp.stream->learned_endpoint : &phc->mp.stream->endpoint;
@ -2365,8 +2387,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
goto update_addr;
}
mutex_unlock(&phc->mp.stream->out_lock);
if (tmp && PS_ISSET(phc->mp.stream, STRICT_SOURCE)) {
ilog(LOG_INFO | LOG_FLAG_LIMIT, "Drop due to strict-source attribute; "
"got %s%s:%d%s, "
@ -2440,7 +2460,6 @@ confirm_now:
PS_SET(phc->mp.stream, CONFIRMED);
update_peerinfo:
mutex_lock(&phc->mp.stream->out_lock);
// if we're during the wait time, check the received address against the previously
// learned address. if they're the same, ignore this packet for learning purposes
if (!wait_time || !phc->mp.stream->learned_endpoint.address.family ||
@ -2459,8 +2478,6 @@ update_peerinfo:
}
}
update_addr:
mutex_unlock(&phc->mp.stream->out_lock);
/* check the destination address of the received packet against what we think our
* local interface to use is */
if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) {
@ -2483,7 +2500,7 @@ update_addr:
}
out:
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return ret;
}
@ -2503,9 +2520,7 @@ static void media_packet_kernel_check(struct packet_handler_ctx *phc) {
if (ML_ISSET(phc->mp.media->monologue, DTMF_INJECTION_ACTIVE))
return;
mutex_lock(&phc->mp.stream->in_lock);
kernelize(phc->mp.stream);
mutex_unlock(&phc->mp.stream->in_lock);
}
@ -2842,19 +2857,19 @@ static int stream_packet(struct packet_handler_ctx *phc) {
if (ret)
goto next_mirror;
mutex_lock(&mirror_sink->out_lock);
mutex_lock(&mirror_sink->lock);
if (!mirror_sink->advertised_endpoint.port
|| (is_addr_unspecified(&mirror_sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&mirror_sink->advertised_endpoint)))
{
mutex_unlock(&mirror_sink->out_lock);
mutex_unlock(&mirror_sink->lock);
goto next_mirror;
}
media_socket_dequeue(&mirror_phc.mp, mirror_sink);
mutex_unlock(&mirror_sink->out_lock);
mutex_unlock(&mirror_sink->lock);
next_mirror:
media_socket_dequeue(&mirror_phc.mp, NULL); // just free if anything left
@ -2867,13 +2882,13 @@ next_mirror:
if (ret == -1)
goto err_next;
mutex_lock(&sink->out_lock);
mutex_lock(&sink->lock);
if (!sink->advertised_endpoint.port
|| (is_addr_unspecified(&sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&sink->advertised_endpoint)))
{
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
goto next;
}
@ -2882,7 +2897,7 @@ next_mirror:
else
ret = media_socket_dequeue(&phc->mp, NULL);
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
if (ret == 0)
goto next;
@ -2890,10 +2905,10 @@ next_mirror:
err_next:
ilog(LOG_DEBUG | LOG_FLAG_LIMIT ,"Error when sending message. Error: %s", strerror(errno));
atomic64_inc_na(&sink->stats_in->errors);
mutex_lock(&sink->in_lock);
mutex_lock(&sink->lock);
if (sink->selected_sfd)
atomic64_inc_na(&sink->selected_sfd->local_intf->stats->out.errors);
mutex_unlock(&sink->in_lock);
mutex_unlock(&sink->lock);
RTPE_STATS_INC(errors_user);
goto next;


+ 1
- 7
daemon/mqtt.c View File

@ -340,7 +340,7 @@ static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *jso
static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
stream_fd *sfd = ps->selected_sfd;
if (sfd) {
@ -368,10 +368,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_end_object(json);
mutex_unlock(&ps->in_lock);
mutex_lock(&ps->out_lock);
json_builder_set_member_name(json, "egress");
json_builder_begin_object(json);
mqtt_stream_stats_dir(ps->stats_out, json);
@ -388,8 +384,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_end_array(json);
json_builder_end_object(json);
mutex_unlock(&ps->out_lock);
}


+ 2
- 4
daemon/redis.c View File

@ -2465,8 +2465,7 @@ char* redis_encode_json(call_t *c) {
for (__auto_type l = c->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
LOCK(&ps->in_lock);
LOCK(&ps->out_lock);
LOCK(&ps->lock);
snprintf(tmp, sizeof(tmp), "stream-%u", ps->unique_id);
json_builder_set_member_name(builder, tmp);
@ -2500,8 +2499,7 @@ char* redis_encode_json(call_t *c) {
struct packet_stream *ps = l->data;
// XXX these should all go into the above loop
LOCK(&ps->in_lock);
LOCK(&ps->out_lock);
LOCK(&ps->lock);
snprintf(tmp, sizeof(tmp), "stream_sfds-%u", ps->unique_id);
json_builder_set_member_name(builder, tmp);


+ 1
- 3
daemon/rtcp.c View File

@ -1572,7 +1572,7 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
rtcp_ps = ps;
}
LOCK(&ps->in_lock);
LOCK(&ps->lock);
if (!ps->selected_sfd || !rtcp_ps->selected_sfd)
return;
@ -1601,8 +1601,6 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
str rtcp_packet = STR_INIT_GS(sr);
LOCK(&ps->out_lock);
const struct streamhandler *crypt_handler = determine_handler(&transport_protocols[PROTO_RTP_AVP],
media, true);


+ 1
- 1
daemon/ssrc.c View File

@ -650,7 +650,7 @@ void ssrc_collect_metrics(struct call_media *media) {
e->jitter = e->jitter * 1000 / rpt->clock_rate;
}
LOCK(&ps->in_lock);
LOCK(&ps->lock);
RTPE_SAMPLE_SFD(jitter_measured, e->jitter, ps->selected_sfd);
}
}

+ 3
- 6
daemon/t38.c View File

@ -218,8 +218,7 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui
stream_fd *sfd = NULL;
if (ps) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->out_lock);
mutex_lock(&ps->lock);
sfd = ps->selected_sfd;
}
if (sfd && sfd->socket.fd != -1 && ps->endpoint.address.family != NULL) {
@ -231,10 +230,8 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui
else
ilog(LOG_WARN | LOG_FLAG_LIMIT, "Unable to send T.38 UDPTL packet due to lack of "
"socket or stream");
if (ps) {
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->in_lock);
}
if (ps)
mutex_unlock(&ps->lock);
g_string_free(s, TRUE);


+ 19
- 27
include/call.h View File

@ -418,18 +418,10 @@ struct loop_protector {
* This is done through the various bit flags.
*/
struct packet_stream {
/* Both locks valid only with call->master_lock held in R.
/* Lock valid only with call->master_lock held in R.
* Preempted by call->master_lock held in W.
* If both in/out are to be locked, in_lock must be locked first.
*
* The in_lock protects fields relevant to packet reception on that stream,
* meanwhile the out_lock protects fields relevant to packet egress.
*
* This allows packet handling on multiple ports and streams belonging
* to the same call to happen at the same time.
*/
mutex_t in_lock,
out_lock;
mutex_t lock;
struct call_media *media; /* RO */
call_t *call; /* RO */
@ -438,23 +430,23 @@ struct packet_stream {
struct recording_stream recording; /* LOCK: call->master_lock */
stream_fd_q sfds; /* LOCK: call->master_lock */
stream_fd *selected_sfd; // LOCK: in_lock
stream_fd *selected_sfd; // LOCK: ps->lock
endpoint_t last_local_endpoint;
struct dtls_connection ice_dtls; /* LOCK: in_lock */
sink_handler_q rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
sink_handler_q rtcp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
struct dtls_connection ice_dtls; /* LOCK: ps->lock */
sink_handler_q rtp_sinks; /* LOCK: call->master_lock, ps->lock for streamhandler */
sink_handler_q rtcp_sinks; /* LOCK: call->master_lock, ps->lock for streamhandler */
struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */
sink_handler_q rtp_mirrors; /* LOCK: call->master_lock, in_lock for streamhandler */
struct endpoint endpoint; /* LOCK: out_lock */
struct endpoint detected_endpoints[4]; /* LOCK: out_lock */
time_t ep_detect_signal; /* LOCK: out_lock */
sink_handler_q rtp_mirrors; /* LOCK: call->master_lock, ps->ock for streamhandler */
struct endpoint endpoint; /* LOCK: ps->lock */
struct endpoint detected_endpoints[4]; /* LOCK: ps->lock */
time_t ep_detect_signal; /* LOCK: ps->lock */
struct endpoint advertised_endpoint; /* RO */
struct endpoint learned_endpoint; /* LOCK: out_lock */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */
unsigned int ssrc_in_idx, /* LOCK: in_lock */
ssrc_out_idx; /* LOCK: out_lock */
struct endpoint learned_endpoint; /* LOCK: ps->lock */
struct crypto_context crypto; /* OUT direction, LOCK: ps->lock */
struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: ps->lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: ps->lock */
unsigned int ssrc_in_idx, /* LOCK: ps->lock */
ssrc_out_idx; /* LOCK: ps->lock */
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */
time_t kernel_time;
@ -467,15 +459,15 @@ struct packet_stream {
enum endpoint_learning el_flags;
#if RTP_LOOP_PROTECT
/* LOCK: in_lock: */
/* LOCK: ps->lock: */
unsigned int lp_idx;
struct loop_protector lp_buf[RTP_LOOP_PACKETS];
unsigned int lp_count;
#endif
X509 *dtls_cert; /* LOCK: in_lock */
X509 *dtls_cert; /* LOCK: ps->lock */
/* in_lock must be held for SETTING these: */
/* ps->lock must be held for SETTING these: */
atomic64 ps_flags;
};


+ 0
- 1
include/media_socket.h View File

@ -291,7 +291,6 @@ INLINE int open_intf_socket(socket_t *r, unsigned int port, const struct local_i
return open_socket(r, SOCK_DGRAM, port, &lif->spec->local_address.addr);
}
void kernelize(struct packet_stream *);
void __unkernelize(struct packet_stream *, const char *);
void unkernelize(struct packet_stream *, const char *);
void __stream_unconfirm(struct packet_stream *, const char *);


Loading…
Cancel
Save