Browse Source

MT#55283 combine in/out_lock

With selected_sfd being protected by in_lock, we pretty much have to
hold at least in_lock everywhere, and end up requiring both locks in
many places. The distinction has become pointless.

Change-Id: Ic0ad976c2d68d9639b9434da7f0e6e9c0d84c185
(cherry picked from commit e03f814855)
(cherry picked from commit 222fcaa4b0)
mr13.3.1
Richard Fuchs 4 months ago
parent
commit
974f334f3c
13 changed files with 124 additions and 138 deletions
  1. +7
    -10
      daemon/call.c
  2. +3
    -4
      daemon/codec.c
  3. +2
    -8
      daemon/dtls.c
  4. +2
    -4
      daemon/ice.c
  5. +6
    -6
      daemon/media_player.c
  6. +78
    -59
      daemon/media_socket.c
  7. +1
    -7
      daemon/mqtt.c
  8. +1
    -2
      daemon/redis.c
  9. +1
    -3
      daemon/rtcp.c
  10. +1
    -1
      daemon/ssrc.c
  11. +3
    -6
      daemon/t38.c
  12. +19
    -27
      include/call.h
  13. +0
    -1
      include/media_socket.h

+ 7
- 10
daemon/call.c View File

@ -190,7 +190,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
g_autoptr(stream_fd) sfd = NULL; g_autoptr(stream_fd) sfd = NULL;
{ {
LOCK(&ps->in_lock);
LOCK(&ps->lock);
if (ps->selected_sfd) if (ps->selected_sfd)
sfd = obj_get(ps->selected_sfd); sfd = obj_get(ps->selected_sfd);
} }
@ -1025,8 +1025,7 @@ struct packet_stream *__packet_stream_new(call_t *call) {
struct packet_stream *stream; struct packet_stream *stream;
stream = uid_alloc(&call->streams); stream = uid_alloc(&call->streams);
mutex_init(&stream->in_lock);
mutex_init(&stream->out_lock);
mutex_init(&stream->lock);
stream->call = call; stream->call = call;
atomic64_set_na(&stream->last_packet, rtpe_now.tv_sec); atomic64_set_na(&stream->last_packet, rtpe_now.tv_sec);
stream->rtp_stats = rtp_stats_ht_new(); stream->rtp_stats = rtp_stats_ht_new();
@ -1137,21 +1136,19 @@ void call_stream_crypto_reset(struct packet_stream *ps) {
crypto_reset(&ps->crypto); crypto_reset(&ps->crypto);
if (PS_ISSET(ps, RTP)) { if (PS_ISSET(ps, RTP)) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) { for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) {
if (!ps->ssrc_in[u]) // end of list if (!ps->ssrc_in[u]) // end of list
break; break;
atomic_set_na(&ps->ssrc_in[u]->stats->ext_seq, 0); atomic_set_na(&ps->ssrc_in[u]->stats->ext_seq, 0);
} }
mutex_unlock(&ps->in_lock);
mutex_lock(&ps->out_lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) { for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) {
if (!ps->ssrc_out[u]) // end of list if (!ps->ssrc_out[u]) // end of list
break; break;
atomic_set_na(&ps->ssrc_out[u]->stats->ext_seq, 0); atomic_set_na(&ps->ssrc_out[u]->stats->ext_seq, 0);
} }
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->lock);
} }
} }
@ -1178,16 +1175,16 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) {
} }
if (MEDIA_ISSET(media, DTLS)) { if (MEDIA_ISSET(media, DTLS)) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->lock);
struct dtls_connection *d = dtls_ptr(ps->selected_sfd); struct dtls_connection *d = dtls_ptr(ps->selected_sfd);
if (d && d->init && !d->connected) { if (d && d->init && !d->connected) {
int dret = dtls(ps->selected_sfd, NULL, NULL); int dret = dtls(ps->selected_sfd, NULL, NULL);
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
if (dret == 1) if (dret == 1)
call_media_unkernelize(media, "DTLS connected"); call_media_unkernelize(media, "DTLS connected");
return CSS_DTLS; return CSS_DTLS;
} }
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
} }
if (PS_ISSET(ps, PIERCE_NAT) && PS_ISSET(ps, FILLED) && !PS_ISSET(ps, CONFIRMED)) { if (PS_ISSET(ps, PIERCE_NAT) && PS_ISSET(ps, FILLED) && !PS_ISSET(ps, CONFIRMED)) {


+ 3
- 4
daemon/codec.c View File

@ -1320,14 +1320,14 @@ static void __rtcp_timer_run(struct codec_timer *ct) {
struct ssrc_ctx *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,}; struct ssrc_ctx *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,};
if (media->streams.head) { if (media->streams.head) {
struct packet_stream *ps = media->streams.head->data; struct packet_stream *ps = media->streams.head->data;
mutex_lock(&ps->out_lock);
mutex_lock(&ps->lock);
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!ps->ssrc_out[u]) // end of list if (!ps->ssrc_out[u]) // end of list
break; break;
ssrc_out[u] = ps->ssrc_out[u]; ssrc_out[u] = ps->ssrc_out[u];
ssrc_ctx_hold(ssrc_out[u]); ssrc_ctx_hold(ssrc_out[u]);
} }
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->lock);
} }
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
@ -3360,11 +3360,10 @@ static void send_buffered(struct media_packet *mp, unsigned int log_sys) {
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, mp)) if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, mp))
ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media"); ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");
mutex_lock(&sink->out_lock);
LOCK(&sink->lock);
if (media_socket_dequeue(mp, sink)) if (media_socket_dequeue(mp, sink))
ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT, ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink"); "Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
} }
} }


+ 2
- 8
daemon/dtls.c View File

@ -840,7 +840,7 @@ error:
return -1; return -1;
} }
/* called with call locked in W or R with ps->in_lock held */
/* called with call locked in W or R with ps->lock held */
int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) { int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) {
struct packet_stream *ps = sfd->stream; struct packet_stream *ps = sfd->stream;
int ret; int ret;
@ -888,22 +888,16 @@ int dtls(stream_fd *sfd, const str *s, const endpoint_t *fsin) {
else if (ret == 1) { else if (ret == 1) {
/* connected! */ /* connected! */
dret = 1; dret = 1;
mutex_lock(&ps->out_lock); // nested lock!
if (dtls_setup_crypto(ps, d)) if (dtls_setup_crypto(ps, d))
{} /* XXX ?? */ {} /* XXX ?? */
mutex_unlock(&ps->out_lock);
if (PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP) && ps->rtcp_sibling if (PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP) && ps->rtcp_sibling
&& MEDIA_ISSET(ps->media, RTCP_MUX) && MEDIA_ISSET(ps->media, RTCP_MUX)
&& ps->rtcp_sibling != ps) && ps->rtcp_sibling != ps)
{ {
// nested locks!
mutex_lock(&ps->rtcp_sibling->in_lock);
mutex_lock(&ps->rtcp_sibling->out_lock);
LOCK(&ps->rtcp_sibling->lock);
if (dtls_setup_crypto(ps->rtcp_sibling, d)) if (dtls_setup_crypto(ps->rtcp_sibling, d))
{} /* XXX ?? */ {} /* XXX ?? */
mutex_unlock(&ps->rtcp_sibling->out_lock);
mutex_unlock(&ps->rtcp_sibling->in_lock);
} }
} }


+ 2
- 4
daemon/ice.c View File

@ -1141,7 +1141,7 @@ found:
t_queue_clear(&compo1); t_queue_clear(&compo1);
} }
/* call(W) or call(R)+agent must be locked - no in_lock or out_lock must be held */
/* call(W) or call(R)+agent must be locked - no ps->lock must be held */
static int __check_valid(struct ice_agent *ag) { static int __check_valid(struct ice_agent *ag) {
struct call_media *media; struct call_media *media;
struct packet_stream *ps; struct packet_stream *ps;
@ -1185,7 +1185,7 @@ static int __check_valid(struct ice_agent *ag) {
ps = l->data; ps = l->data;
pair = k->data; pair = k->data;
mutex_lock(&ps->out_lock);
LOCK(&ps->lock);
if (memcmp(&ps->endpoint, &pair->remote_candidate->endpoint, sizeof(ps->endpoint))) { if (memcmp(&ps->endpoint, &pair->remote_candidate->endpoint, sizeof(ps->endpoint))) {
ilogs(ice, LOG_INFO, "ICE negotiated: new peer for component %u is %s%s%s", ps->component, ilogs(ice, LOG_INFO, "ICE negotiated: new peer for component %u is %s%s%s", ps->component,
FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint))); FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint)));
@ -1195,9 +1195,7 @@ static int __check_valid(struct ice_agent *ag) {
else else
ilogs(ice, LOG_INFO, "ICE negotiated: peer for component %u is %s%s%s", ps->component, ilogs(ice, LOG_INFO, "ICE negotiated: peer for component %u is %s%s%s", ps->component,
FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint))); FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint)));
mutex_unlock(&ps->out_lock);
LOCK(&ps->in_lock);
for (__auto_type m = ps->sfds.head; m; m = m->next) { for (__auto_type m = ps->sfds.head; m; m = m->next) {
sfd = m->data; sfd = m->data;
if (sfd->local_intf != pair->local_intf) if (sfd->local_intf != pair->local_intf)


+ 6
- 6
daemon/media_player.c View File

@ -436,11 +436,11 @@ static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp)
log_info_call(call); log_info_call(call);
rwlock_lock_r(&call->master_lock); rwlock_lock_r(&call->master_lock);
mutex_lock(&st->sink->out_lock);
mutex_lock(&st->sink->lock);
__send_timer_send_common(st, cp); __send_timer_send_common(st, cp);
mutex_unlock(&st->sink->out_lock);
mutex_unlock(&st->sink->lock);
__send_timer_rtcp(st, ssrc_out); __send_timer_rtcp(st, ssrc_out);
ssrc_ctx_put(&ssrc_out); ssrc_ctx_put(&ssrc_out);
@ -595,10 +595,10 @@ retry:;
media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet); media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet);
mutex_lock(&mp->sink->out_lock);
mutex_lock(&mp->sink->lock);
if (media_socket_dequeue(&packet, mp->sink)) if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink"); ilog(LOG_ERR, "Error sending playback media to RTP sink");
mutex_unlock(&mp->sink->out_lock);
mutex_unlock(&mp->sink->lock);
// schedule our next run // schedule our next run
timeval_add_usec(&mp->next_run, us_dur); timeval_add_usec(&mp->next_run, us_dur);
@ -1076,10 +1076,10 @@ void media_player_add_packet(struct media_player *mp, char *buf, size_t len,
media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet); media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet);
mutex_lock(&mp->sink->out_lock);
mutex_lock(&mp->sink->lock);
if (media_socket_dequeue(&packet, mp->sink)) if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink"); ilog(LOG_ERR, "Error sending playback media to RTP sink");
mutex_unlock(&mp->sink->out_lock);
mutex_unlock(&mp->sink->lock);
timeval_add_usec(&mp->next_run, us_dur); timeval_add_usec(&mp->next_run, us_dur);
timerthread_obj_schedule_abs(&mp->tt_obj, &mp->next_run); timerthread_obj_schedule_abs(&mp->tt_obj, &mp->next_run);


+ 78
- 59
daemon/media_socket.c View File

@ -1548,9 +1548,7 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st
__auto_type reti = &s->reti; __auto_type reti = &s->reti;
if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) { if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
mutex_lock(&stream->out_lock);
__re_address_translate_ep(&reti->expected_src, MEDIA_ISSET(media, ASYMMETRIC) ? &stream->learned_endpoint : &stream->endpoint); __re_address_translate_ep(&reti->expected_src, MEDIA_ISSET(media, ASYMMETRIC) ? &stream->learned_endpoint : &stream->endpoint);
mutex_unlock(&stream->out_lock);
if (PS_ISSET(stream, STRICT_SOURCE)) if (PS_ISSET(stream, STRICT_SOURCE))
reti->src_mismatch = MSM_DROP; reti->src_mismatch = MSM_DROP;
else if (PS_ISSET(stream, MEDIA_HANDOVER)) else if (PS_ISSET(stream, MEDIA_HANDOVER))
@ -1684,6 +1682,12 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st
return NULL; return NULL;
} }
/**
* The linkage between userspace and kernel module is in the kernelize_one().
*
* Called with stream->lock held.
* sink_handler can be NULL.
*/
__attribute__((nonnull(1, 2, 3))) __attribute__((nonnull(1, 2, 3)))
static const char *kernelize_one(kernelize_state *s, static const char *kernelize_one(kernelize_state *s,
struct packet_stream *stream, struct sink_handler *sink_handler) struct packet_stream *stream, struct sink_handler *sink_handler)
@ -1759,7 +1763,12 @@ static const char *kernelize_one(kernelize_state *s,
if (MEDIA_ISSET(media, ECHO) || sink_handler->attrs.transcoding) if (MEDIA_ISSET(media, ECHO) || sink_handler->attrs.transcoding)
redi->output.ssrc_subst = 1; redi->output.ssrc_subst = 1;
mutex_lock(&sink->out_lock);
// XXX nested lock, avoid possible deadlock. should be reworked not to
// require a nested lock
if (sink != stream && mutex_trylock(&sink->lock)) {
g_free(redi);
return ""; // indicate deadlock
}
__re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint); __re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint);
__re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local); __re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local);
@ -1781,7 +1790,8 @@ static const char *kernelize_one(kernelize_state *s,
handler->out->kernel(&redi->output.encrypt, sink); handler->out->kernel(&redi->output.encrypt, sink);
mutex_unlock(&sink->out_lock);
if (sink != stream)
mutex_unlock(&sink->lock);
if (!redi->output.encrypt.cipher || !redi->output.encrypt.hmac) { if (!redi->output.encrypt.cipher || !redi->output.encrypt.hmac) {
g_free(redi); g_free(redi);
@ -1798,31 +1808,39 @@ static const char *kernelize_one(kernelize_state *s,
return NULL; return NULL;
} }
// helper function for kernelize() // helper function for kernelize()
static void kernelize_one_sink_handler(kernelize_state *s,
// called with stream->lock held
static bool kernelize_one_sink_handler(kernelize_state *s,
struct packet_stream *stream, struct sink_handler *sink_handler) struct packet_stream *stream, struct sink_handler *sink_handler)
{ {
struct packet_stream *sink = sink_handler->sink; struct packet_stream *sink = sink_handler->sink;
if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED)) if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED))
return;
return true;
const char *err = kernelize_one(s, stream, sink_handler); const char *err = kernelize_one(s, stream, sink_handler);
if (err)
if (err) {
if (!*err)
return false; // indicate deadlock
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err); ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
}
return true;
} }
/* called with in_lock held */
void kernelize(struct packet_stream *stream) {
/* called with master_lock held */
static void kernelize(struct packet_stream *stream) {
call_t *call = stream->call; call_t *call = stream->call;
const char *nk_warn_msg;
struct call_media *media = stream->media; struct call_media *media = stream->media;
g_auto(kernelize_state) s = {0}; g_auto(kernelize_state) s = {0};
if (PS_ISSET(stream, KERNELIZED))
while (true) {
LOCK(&stream->lock);
// set flag, return if set already
if (PS_SET(stream, KERNELIZED))
return; return;
if (call->recording != NULL && !selected_recording_method->kernel_support) if (call->recording != NULL && !selected_recording_method->kernel_support)
goto no_kernel; goto no_kernel;
if (!kernel.is_wanted) if (!kernel.is_wanted)
goto no_kernel; goto no_kernel;
nk_warn_msg = "interface to kernel module not open";
if (!kernel.is_open) if (!kernel.is_open)
goto no_kernel_warn; goto no_kernel_warn;
if (MEDIA_ISSET(media, GENERATOR)) if (MEDIA_ISSET(media, GENERATOR))
@ -1843,12 +1861,16 @@ void kernelize(struct packet_stream *stream) {
struct sink_handler *sh = l->data; struct sink_handler *sh = l->data;
if (sh->attrs.block_media) if (sh->attrs.block_media)
continue; continue;
kernelize_one_sink_handler(&s, stream, sh);
bool ok = kernelize_one_sink_handler(&s, stream, sh);
if (!ok)
goto restart;
} }
// RTP egress mirrors // RTP egress mirrors
for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) { for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) {
struct sink_handler *sh = l->data; struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&s, stream, sh);
bool ok = kernelize_one_sink_handler(&s, stream, sh);
if (!ok)
goto restart;
} }
// RTP -> RTCP sinks // RTP -> RTCP sinks
// record number of RTP destinations up to now // record number of RTP destinations up to now
@ -1858,7 +1880,9 @@ void kernelize(struct packet_stream *stream) {
s.payload_types = NULL; s.payload_types = NULL;
for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) { for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data; struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&s, stream, sh);
bool ok = kernelize_one_sink_handler(&s, stream, sh);
if (!ok)
goto restart;
} }
// mark the start of RTCP outputs // mark the start of RTCP outputs
s.reti.num_rtcp_destinations = s.reti.num_destinations - num_rtp_dests; s.reti.num_rtcp_destinations = s.reti.num_destinations - num_rtp_dests;
@ -1880,15 +1904,25 @@ void kernelize(struct packet_stream *stream) {
} }
stream->kernel_time = rtpe_now.tv_sec; stream->kernel_time = rtpe_now.tv_sec;
PS_SET(stream, KERNELIZED);
return; return;
no_kernel_warn: no_kernel_warn:
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", nk_warn_msg);
ilog(LOG_WARNING, "No support for kernel packet forwarding available "
"(interface to kernel module not open)");
no_kernel: no_kernel:
PS_SET(stream, KERNELIZED);
stream->kernel_time = rtpe_now.tv_sec; stream->kernel_time = rtpe_now.tv_sec;
PS_SET(stream, NO_KERNEL_SUPPORT); PS_SET(stream, NO_KERNEL_SUPPORT);
return;
restart: // handle detected deadlock
rtp_stats_arr_destroy_ptr(s.payload_types);
while ((redi = t_queue_pop_head(&s.outputs)))
g_free(redi);
// try again, releases stream->lock
}
} }
// must be called with appropriate locks (master lock and/or in/out_lock) // must be called with appropriate locks (master lock and/or in/out_lock)
@ -1917,7 +1951,7 @@ struct ssrc_ctx *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_S
} }
/* must be called with in_lock held or call->master_lock held in W */
/* must be called with ps->lock held or call->master_lock held in W */
void __unkernelize(struct packet_stream *p, const char *reason) { void __unkernelize(struct packet_stream *p, const char *reason) {
if (!p->selected_sfd) if (!p->selected_sfd)
return; return;
@ -1963,9 +1997,8 @@ void __stream_unconfirm(struct packet_stream *ps, const char *reason) {
static void stream_unconfirm(struct packet_stream *ps, const char *reason) { static void stream_unconfirm(struct packet_stream *ps, const char *reason) {
if (!ps) if (!ps)
return; return;
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
__stream_unconfirm(ps, reason); __stream_unconfirm(ps, reason);
mutex_unlock(&ps->in_lock);
} }
static void unconfirm_sinks(sink_handler_q *q, const char *reason) { static void unconfirm_sinks(sink_handler_q *q, const char *reason) {
for (__auto_type l = q->head; l; l = l->next) { for (__auto_type l = q->head; l; l = l->next) {
@ -1976,9 +2009,8 @@ static void unconfirm_sinks(sink_handler_q *q, const char *reason) {
void unkernelize(struct packet_stream *ps, const char *reason) { void unkernelize(struct packet_stream *ps, const char *reason) {
if (!ps) if (!ps)
return; return;
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
__unkernelize(ps, reason); __unkernelize(ps, reason);
mutex_unlock(&ps->in_lock);
} }
@ -2015,7 +2047,7 @@ err:
return &__sh_noop; return &__sh_noop;
} }
/* must be called with call->master_lock held in R, and in->in_lock held */
/* must be called with call->master_lock held in R, and in->lock held */
// `sh` can be null // `sh` can be null
static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *sh) { static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *sh) {
const struct transport_protocol *in_proto, *out_proto; const struct transport_protocol *in_proto, *out_proto;
@ -2124,7 +2156,7 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc,
static const char *__stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs, static const char *__stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs,
struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash) struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash)
{ {
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->in_lock, in_srtp->ssrc_in,
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->lock, in_srtp->ssrc_in,
&in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, SSRC_DIR_INPUT, "ingress"); &in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, SSRC_DIR_INPUT, "ingress");
} }
// check and update output SSRC pointers // check and update output SSRC pointers
@ -2134,12 +2166,12 @@ static const char *__stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ss
bool ssrc_change) bool ssrc_change)
{ {
if (ssrc_change) if (ssrc_change)
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->out_lock,
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->lock,
out_srtp->ssrc_out, out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT, &out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT,
"egress (mapped)"); "egress (mapped)");
return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->out_lock,
return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->lock,
out_srtp->ssrc_out, out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT, &out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT,
"egress (direct)"); "egress (direct)");
@ -2164,14 +2196,13 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) {
} }
} }
mutex_lock(&phc->mp.stream->in_lock);
LOCK(&phc->mp.stream->lock);
int ret = dtls(phc->mp.sfd, &phc->s, &phc->mp.fsin); int ret = dtls(phc->mp.sfd, &phc->s, &phc->mp.fsin);
if (ret == 1) { if (ret == 1) {
phc->unkernelize = "DTLS connected"; phc->unkernelize = "DTLS connected";
phc->unkernelize_subscriptions = true; phc->unkernelize_subscriptions = true;
ret = 0; ret = 0;
} }
mutex_unlock(&phc->mp.stream->in_lock);
if (!ret) if (!ret)
return 0; return 0;
} }
@ -2196,7 +2227,7 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) {
#if RTP_LOOP_PROTECT #if RTP_LOOP_PROTECT
// returns: 0 = ok, proceed; -1 = duplicate detected, drop packet // returns: 0 = ok, proceed; -1 = duplicate detected, drop packet
static int media_loop_detect(struct packet_handler_ctx *phc) { static int media_loop_detect(struct packet_handler_ctx *phc) {
mutex_lock(&phc->mp.stream->in_lock);
mutex_lock(&phc->mp.stream->lock);
for (int i = 0; i < RTP_LOOP_PACKETS; i++) { for (int i = 0; i < RTP_LOOP_PACKETS; i++) {
if (phc->mp.stream->lp_buf[i].len != phc->s.len) if (phc->mp.stream->lp_buf[i].len != phc->s.len)
@ -2210,7 +2241,7 @@ static int media_loop_detect(struct packet_handler_ctx *phc) {
"to avoid potential loop", "to avoid potential loop",
RTP_LOOP_MAX_COUNT, RTP_LOOP_MAX_COUNT,
FMT_M(endpoint_print_buf(&phc->mp.fsin))); FMT_M(endpoint_print_buf(&phc->mp.fsin)));
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return -1; return -1;
} }
@ -2224,7 +2255,7 @@ static int media_loop_detect(struct packet_handler_ctx *phc) {
memcpy(phc->mp.stream->lp_buf[phc->mp.stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT)); memcpy(phc->mp.stream->lp_buf[phc->mp.stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT));
phc->mp.stream->lp_idx = (phc->mp.stream->lp_idx + 1) % RTP_LOOP_PACKETS; phc->mp.stream->lp_idx = (phc->mp.stream->lp_idx + 1) % RTP_LOOP_PACKETS;
loop_ok: loop_ok:
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return 0; return 0;
} }
@ -2337,7 +2368,7 @@ static void media_packet_rtp_out(struct packet_handler_ctx *phc, struct sink_han
static int media_packet_decrypt(struct packet_handler_ctx *phc) static int media_packet_decrypt(struct packet_handler_ctx *phc)
{ {
mutex_lock(&phc->in_srtp->in_lock);
mutex_lock(&phc->in_srtp->lock);
struct sink_handler *first_sh = phc->sinks->length ? phc->sinks->head->data : NULL; struct sink_handler *first_sh = phc->sinks->length ? phc->sinks->head->data : NULL;
const struct streamhandler *sh = __determine_handler(phc->in_srtp, first_sh); const struct streamhandler *sh = __determine_handler(phc->in_srtp, first_sh);
@ -2358,7 +2389,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc)
phc->mp.payload.len -= ori_s.len - phc->s.len; phc->mp.payload.len -= ori_s.len - phc->s.len;
} }
mutex_unlock(&phc->in_srtp->in_lock);
mutex_unlock(&phc->in_srtp->lock);
if (ret == 1) { if (ret == 1) {
phc->update = true; phc->update = true;
@ -2368,7 +2399,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc)
} }
static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink_handler *sh) static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink_handler *sh)
{ {
mutex_lock(&phc->in_srtp->in_lock);
mutex_lock(&phc->in_srtp->lock);
__determine_handler(phc->in_srtp, sh); __determine_handler(phc->in_srtp, sh);
// XXX use an array with index instead of if/else // XXX use an array with index instead of if/else
@ -2378,7 +2409,7 @@ static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink
phc->encrypt_func = sh->handler->out->rtcp_crypt; phc->encrypt_func = sh->handler->out->rtcp_crypt;
phc->rtcp_filter = sh->handler->in->rtcp_filter; phc->rtcp_filter = sh->handler->in->rtcp_filter;
} }
mutex_unlock(&phc->in_srtp->in_lock);
mutex_unlock(&phc->in_srtp->lock);
} }
int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp) { int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp) {
@ -2387,7 +2418,7 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s
if (!encrypt_func) if (!encrypt_func)
return 0x00; return 0x00;
mutex_lock(&out->out_lock);
LOCK(&out->lock);
for (__auto_type l = mp->packets_out.head; l; l = l->next) { for (__auto_type l = mp->packets_out.head; l; l = l->next) {
struct codec_packet *p = l->data; struct codec_packet *p = l->data;
@ -2403,8 +2434,6 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s
ret |= 0x01; ret |= 0x01;
} }
mutex_unlock(&out->out_lock);
return ret; return ret;
} }
@ -2424,7 +2453,7 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
struct endpoint endpoint; struct endpoint endpoint;
bool ret = false; bool ret = false;
mutex_lock(&phc->mp.stream->in_lock);
mutex_lock(&phc->mp.stream->lock);
/* we're OK to (potentially) use the source address of this packet as destination /* we're OK to (potentially) use the source address of this packet as destination
* in the other direction. */ * in the other direction. */
@ -2453,10 +2482,8 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
/* do not pay attention to source addresses of incoming packets for asymmetric streams */ /* do not pay attention to source addresses of incoming packets for asymmetric streams */
if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) || phc->mp.stream->el_flags == EL_OFF) { if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) || phc->mp.stream->el_flags == EL_OFF) {
PS_SET(phc->mp.stream, CONFIRMED); PS_SET(phc->mp.stream, CONFIRMED);
mutex_lock(&phc->mp.stream->out_lock);
if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) && !phc->mp.stream->learned_endpoint.address.family) if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) && !phc->mp.stream->learned_endpoint.address.family)
phc->mp.stream->learned_endpoint = phc->mp.fsin; phc->mp.stream->learned_endpoint = phc->mp.fsin;
mutex_unlock(&phc->mp.stream->out_lock);
} }
/* confirm sinks for unidirectional streams in order to kernelize */ /* confirm sinks for unidirectional streams in order to kernelize */
@ -2472,7 +2499,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
/* see if we need to compare the source address with the known endpoint */ /* see if we need to compare the source address with the known endpoint */
if (PS_ISSET2(phc->mp.stream, STRICT_SOURCE, MEDIA_HANDOVER)) { if (PS_ISSET2(phc->mp.stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
endpoint = phc->mp.fsin; endpoint = phc->mp.fsin;
mutex_lock(&phc->mp.stream->out_lock);
struct endpoint *ps_endpoint = MEDIA_ISSET(phc->mp.media, ASYMMETRIC) ? struct endpoint *ps_endpoint = MEDIA_ISSET(phc->mp.media, ASYMMETRIC) ?
&phc->mp.stream->learned_endpoint : &phc->mp.stream->endpoint; &phc->mp.stream->learned_endpoint : &phc->mp.stream->endpoint;
@ -2488,8 +2514,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
goto update_addr; goto update_addr;
} }
mutex_unlock(&phc->mp.stream->out_lock);
if (tmp && PS_ISSET(phc->mp.stream, STRICT_SOURCE)) { if (tmp && PS_ISSET(phc->mp.stream, STRICT_SOURCE)) {
ilog(LOG_INFO | LOG_FLAG_LIMIT, "Drop due to strict-source attribute; " ilog(LOG_INFO | LOG_FLAG_LIMIT, "Drop due to strict-source attribute; "
"got %s%s:%d%s, " "got %s%s:%d%s, "
@ -2563,7 +2587,6 @@ confirm_now:
PS_SET(phc->mp.stream, CONFIRMED); PS_SET(phc->mp.stream, CONFIRMED);
update_peerinfo: update_peerinfo:
mutex_lock(&phc->mp.stream->out_lock);
// if we're during the wait time, check the received address against the previously // if we're during the wait time, check the received address against the previously
// learned address. if they're the same, ignore this packet for learning purposes // learned address. if they're the same, ignore this packet for learning purposes
if (!wait_time || !phc->mp.stream->learned_endpoint.address.family || if (!wait_time || !phc->mp.stream->learned_endpoint.address.family ||
@ -2582,8 +2605,6 @@ update_peerinfo:
} }
} }
update_addr: update_addr:
mutex_unlock(&phc->mp.stream->out_lock);
/* check the destination address of the received packet against what we think our /* check the destination address of the received packet against what we think our
* local interface to use is */ * local interface to use is */
if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) { if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) {
@ -2606,7 +2627,7 @@ update_addr:
} }
out: out:
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return ret; return ret;
} }
@ -2626,9 +2647,7 @@ static void media_packet_kernel_check(struct packet_handler_ctx *phc) {
if (ML_ISSET(phc->mp.media->monologue, DTMF_INJECTION_ACTIVE)) if (ML_ISSET(phc->mp.media->monologue, DTMF_INJECTION_ACTIVE))
return; return;
mutex_lock(&phc->mp.stream->in_lock);
kernelize(phc->mp.stream); kernelize(phc->mp.stream);
mutex_unlock(&phc->mp.stream->in_lock);
} }
@ -2967,19 +2986,19 @@ static int stream_packet(struct packet_handler_ctx *phc) {
if (ret) if (ret)
goto next_mirror; goto next_mirror;
mutex_lock(&mirror_sink->out_lock);
mutex_lock(&mirror_sink->lock);
if (!mirror_sink->advertised_endpoint.port if (!mirror_sink->advertised_endpoint.port
|| (is_addr_unspecified(&mirror_sink->advertised_endpoint.address) || (is_addr_unspecified(&mirror_sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&mirror_sink->advertised_endpoint))) && !is_trickle_ice_address(&mirror_sink->advertised_endpoint)))
{ {
mutex_unlock(&mirror_sink->out_lock);
mutex_unlock(&mirror_sink->lock);
goto next_mirror; goto next_mirror;
} }
media_socket_dequeue(&mirror_phc.mp, mirror_sink); media_socket_dequeue(&mirror_phc.mp, mirror_sink);
mutex_unlock(&mirror_sink->out_lock);
mutex_unlock(&mirror_sink->lock);
next_mirror: next_mirror:
media_socket_dequeue(&mirror_phc.mp, NULL); // just free if anything left media_socket_dequeue(&mirror_phc.mp, NULL); // just free if anything left
@ -2992,13 +3011,13 @@ next_mirror:
if (ret == -1) if (ret == -1)
goto err_next; goto err_next;
mutex_lock(&sink->out_lock);
mutex_lock(&sink->lock);
if (!sink->advertised_endpoint.port if (!sink->advertised_endpoint.port
|| (is_addr_unspecified(&sink->advertised_endpoint.address) || (is_addr_unspecified(&sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&sink->advertised_endpoint))) && !is_trickle_ice_address(&sink->advertised_endpoint)))
{ {
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
goto next; goto next;
} }
@ -3007,7 +3026,7 @@ next_mirror:
else else
ret = media_socket_dequeue(&phc->mp, NULL); ret = media_socket_dequeue(&phc->mp, NULL);
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
if (ret == 0) if (ret == 0)
goto next; goto next;
@ -3015,10 +3034,10 @@ next_mirror:
err_next: err_next:
ilog(LOG_DEBUG | LOG_FLAG_LIMIT ,"Error when sending message. Error: %s", strerror(errno)); ilog(LOG_DEBUG | LOG_FLAG_LIMIT ,"Error when sending message. Error: %s", strerror(errno));
atomic64_inc_na(&sink->stats_in->errors); atomic64_inc_na(&sink->stats_in->errors);
mutex_lock(&sink->in_lock);
mutex_lock(&sink->lock);
if (sink->selected_sfd) if (sink->selected_sfd)
atomic64_inc_na(&sink->selected_sfd->local_intf->stats->out.errors); atomic64_inc_na(&sink->selected_sfd->local_intf->stats->out.errors);
mutex_unlock(&sink->in_lock);
mutex_unlock(&sink->lock);
RTPE_STATS_INC(errors_user); RTPE_STATS_INC(errors_user);
goto next; goto next;


+ 1
- 7
daemon/mqtt.c View File

@ -340,7 +340,7 @@ static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *jso
static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
stream_fd *sfd = ps->selected_sfd; stream_fd *sfd = ps->selected_sfd;
if (sfd) { if (sfd) {
@ -382,10 +382,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_end_object(json); json_builder_end_object(json);
mutex_unlock(&ps->in_lock);
mutex_lock(&ps->out_lock);
json_builder_set_member_name(json, "egress"); json_builder_set_member_name(json, "egress");
json_builder_begin_object(json); json_builder_begin_object(json);
mqtt_stream_stats_dir(ps->stats_out, json); mqtt_stream_stats_dir(ps->stats_out, json);
@ -402,8 +398,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_end_array(json); json_builder_end_array(json);
json_builder_end_object(json); json_builder_end_object(json);
mutex_unlock(&ps->out_lock);
} }


+ 1
- 2
daemon/redis.c View File

@ -2511,8 +2511,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c, void **to_free) {
for (__auto_type l = c->streams.head; l; l = l->next) { for (__auto_type l = c->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data; struct packet_stream *ps = l->data;
LOCK(&ps->in_lock);
LOCK(&ps->out_lock);
LOCK(&ps->lock);
snprintf(tmp, sizeof(tmp), "stream-%u", ps->unique_id); snprintf(tmp, sizeof(tmp), "stream-%u", ps->unique_id);
inner = parser->dict_add_dict_dup(root, tmp); inner = parser->dict_add_dict_dup(root, tmp);


+ 1
- 3
daemon/rtcp.c View File

@ -1568,7 +1568,7 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
rtcp_ps = ps; rtcp_ps = ps;
} }
LOCK(&ps->in_lock);
LOCK(&ps->lock);
if (!ps->selected_sfd || !rtcp_ps->selected_sfd) if (!ps->selected_sfd || !rtcp_ps->selected_sfd)
return; return;
@ -1597,8 +1597,6 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
str rtcp_packet = STR_GS(sr); str rtcp_packet = STR_GS(sr);
LOCK(&ps->out_lock);
const struct streamhandler *crypt_handler = determine_handler(&transport_protocols[PROTO_RTP_AVP], const struct streamhandler *crypt_handler = determine_handler(&transport_protocols[PROTO_RTP_AVP],
media, true); media, true);


+ 1
- 1
daemon/ssrc.c View File

@ -756,7 +756,7 @@ void ssrc_collect_metrics(struct call_media *media) {
e->jitter = e->jitter * 1000 / rpt->clock_rate; e->jitter = e->jitter * 1000 / rpt->clock_rate;
} }
LOCK(&ps->in_lock);
LOCK(&ps->lock);
RTPE_SAMPLE_SFD(jitter_measured, e->jitter, ps->selected_sfd); RTPE_SAMPLE_SFD(jitter_measured, e->jitter, ps->selected_sfd);
} }
} }

+ 3
- 6
daemon/t38.c View File

@ -218,8 +218,7 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui
stream_fd *sfd = NULL; stream_fd *sfd = NULL;
if (ps) { if (ps) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->out_lock);
mutex_lock(&ps->lock);
sfd = ps->selected_sfd; sfd = ps->selected_sfd;
} }
if (sfd && sfd->socket.fd != -1 && ps->endpoint.address.family != NULL) { if (sfd && sfd->socket.fd != -1 && ps->endpoint.address.family != NULL) {
@ -231,10 +230,8 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui
else else
ilog(LOG_WARN | LOG_FLAG_LIMIT, "Unable to send T.38 UDPTL packet due to lack of " ilog(LOG_WARN | LOG_FLAG_LIMIT, "Unable to send T.38 UDPTL packet due to lack of "
"socket or stream"); "socket or stream");
if (ps) {
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->in_lock);
}
if (ps)
mutex_unlock(&ps->lock);
g_string_free(s, TRUE); g_string_free(s, TRUE);


+ 19
- 27
include/call.h View File

@ -406,18 +406,10 @@ TYPED_GHASHTABLE_PROTO(rtp_stats_ht, void, struct rtp_stats)
* This is done through the various bit flags. * This is done through the various bit flags.
*/ */
struct packet_stream { struct packet_stream {
/* Both locks valid only with call->master_lock held in R.
/* Lock valid only with call->master_lock held in R.
* Preempted by call->master_lock held in W. * Preempted by call->master_lock held in W.
* If both in/out are to be locked, in_lock must be locked first.
*
* The in_lock protects fields relevant to packet reception on that stream,
* meanwhile the out_lock protects fields relevant to packet egress.
*
* This allows packet handling on multiple ports and streams belonging
* to the same call to happen at the same time.
*/ */
mutex_t in_lock,
out_lock;
mutex_t lock;
struct call_media *media; /* RO */ struct call_media *media; /* RO */
call_t *call; /* RO */ call_t *call; /* RO */
@ -426,23 +418,23 @@ struct packet_stream {
struct recording_stream recording; /* LOCK: call->master_lock */ struct recording_stream recording; /* LOCK: call->master_lock */
stream_fd_q sfds; /* LOCK: call->master_lock */ stream_fd_q sfds; /* LOCK: call->master_lock */
stream_fd *selected_sfd; // LOCK: in_lock
stream_fd *selected_sfd; // LOCK: ps->lock
endpoint_t last_local_endpoint; endpoint_t last_local_endpoint;
struct dtls_connection ice_dtls; /* LOCK: in_lock */
sink_handler_q rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
sink_handler_q rtcp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
struct dtls_connection ice_dtls; /* LOCK: ps->lock */
sink_handler_q rtp_sinks; /* LOCK: call->master_lock, ps->lock for streamhandler */
sink_handler_q rtcp_sinks; /* LOCK: call->master_lock, ps->lock for streamhandler */
struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */ struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */
sink_handler_q rtp_mirrors; /* LOCK: call->master_lock, in_lock for streamhandler */
struct endpoint endpoint; /* LOCK: out_lock */
struct endpoint detected_endpoints[4]; /* LOCK: out_lock */
time_t ep_detect_signal; /* LOCK: out_lock */
sink_handler_q rtp_mirrors; /* LOCK: call->master_lock, ps->ock for streamhandler */
struct endpoint endpoint; /* LOCK: ps->lock */
struct endpoint detected_endpoints[4]; /* LOCK: ps->lock */
time_t ep_detect_signal; /* LOCK: ps->lock */
struct endpoint advertised_endpoint; /* RO */ struct endpoint advertised_endpoint; /* RO */
struct endpoint learned_endpoint; /* LOCK: out_lock */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */
unsigned int ssrc_in_idx, /* LOCK: in_lock */
ssrc_out_idx; /* LOCK: out_lock */
struct endpoint learned_endpoint; /* LOCK: ps->lock */
struct crypto_context crypto; /* OUT direction, LOCK: ps->lock */
struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: ps->lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: ps->lock */
unsigned int ssrc_in_idx, /* LOCK: ps->lock */
ssrc_out_idx; /* LOCK: ps->lock */
struct send_timer *send_timer; /* RO */ struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */ struct jitter_buffer *jb; /* RO */
time_t kernel_time; time_t kernel_time;
@ -455,15 +447,15 @@ struct packet_stream {
enum endpoint_learning el_flags; enum endpoint_learning el_flags;
#if RTP_LOOP_PROTECT #if RTP_LOOP_PROTECT
/* LOCK: in_lock: */
/* LOCK: ps->lock: */
unsigned int lp_idx; unsigned int lp_idx;
struct loop_protector lp_buf[RTP_LOOP_PACKETS]; struct loop_protector lp_buf[RTP_LOOP_PACKETS];
unsigned int lp_count; unsigned int lp_count;
#endif #endif
X509 *dtls_cert; /* LOCK: in_lock */
X509 *dtls_cert; /* LOCK: ps->lock */
/* in_lock must be held for SETTING these: */
/* ps->lock must be held for SETTING these: */
atomic64 ps_flags; atomic64 ps_flags;
}; };


+ 0
- 1
include/media_socket.h View File

@ -310,7 +310,6 @@ void free_sfd_intf_list(struct sfd_intf_list *il);
void free_release_sfd_intf_list(struct sfd_intf_list *il); void free_release_sfd_intf_list(struct sfd_intf_list *il);
void free_socket_intf_list(struct socket_intf_list *il); void free_socket_intf_list(struct socket_intf_list *il);
void kernelize(struct packet_stream *);
void __unkernelize(struct packet_stream *, const char *); void __unkernelize(struct packet_stream *, const char *);
void unkernelize(struct packet_stream *, const char *); void unkernelize(struct packet_stream *, const char *);
void __stream_unconfirm(struct packet_stream *, const char *); void __stream_unconfirm(struct packet_stream *, const char *);


Loading…
Cancel
Save