Browse Source

MT#55283 force sequential socket reads

Due to multiple threads polling all sockets for read events, it's
possible for one socket to receive a read event in one thread, then
immediately receive another read event in another thread, resulting in
two threads reading packets from the same socket at the same time.

While this is perfectly valid and correctly handled by mutex etc, it can
result in packets being processed out of order. In media passthrough
scenarios which don't do sequencing this can result in packets being
reordered.

Using a simple atomic counter we can ensure that only one thread is
reading from any one socket at a time.

Relevant to #1638

Change-Id: I406491d6ae5e13e618e153ba5463fd9169636016
pull/1665/head
Richard Fuchs 3 years ago
parent
commit
fdc9b14509
2 changed files with 13 additions and 0 deletions
  1. +12
    -0
      daemon/media_socket.c
  2. +1
    -0
      include/media_socket.h

+ 12
- 0
daemon/media_socket.c View File

@ -3009,6 +3009,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
if (sfd->socket.fd != fd)
return;
// +1 to active read events. If it was zero then we handle it. If it was non-zero,
// another thread is already handling this socket and will process our event.
if (g_atomic_int_add(&sfd->active_read_events, 1) != 0)
return;
ca = sfd->call ? : NULL;
log_info_stream_fd(sfd);
@ -3022,6 +3027,8 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
return;
}
restart:
for (iters = 0; ; iters++) {
#if MAX_RECV_ITERS
if (iters >= MAX_RECV_ITERS) {
@ -3075,6 +3082,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
update = true;
}
// -1 active read events. If it's non-zero, another thread has received a read event,
// and we must handle it here.
if (!g_atomic_int_dec_and_test(&sfd->active_read_events))
goto restart;
// no strike
if (strikes > 0)
g_atomic_int_compare_and_exchange(&sfd->error_strikes, strikes, strikes - 1);


+ 1
- 0
include/media_socket.h View File

@ -229,6 +229,7 @@ struct stream_fd {
struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */
struct dtls_connection dtls; /* LOCK: stream->in_lock */
int error_strikes;
int active_read_events;
struct poller *poller;
};


Loading…
Cancel
Save