From c6d61a3ebf13117859e7f7a01ff03c56357e62e3 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 3 May 2023 09:25:14 -0400 Subject: [PATCH] MT#55283 force sequential socket reads Due to multiple threads polling all sockets for read events, it's possible for one socket to receive a read event in one thread, then immediately receive another read event in another thread, resulting in two threads reading packets from the same socket at the same time. While this is perfectly valid and correctly handled by mutex etc, it can result in packets being processed out of order. In media passthrough scenarios which don't do sequencing this can result in packets being reordered. Using a simple atomic counter we can ensure that only one thread is reading from any one socket at a time. Relevant to #1638 Change-Id: I406491d6ae5e13e618e153ba5463fd9169636016 (cherry picked from commit fdc9b1450913fb236edaa0f2dabe467cc755a08a) --- daemon/media_socket.c | 12 ++++++++++++ include/media_socket.h | 1 + 2 files changed, 13 insertions(+) diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 3e980173b..3a9163dcb 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -2754,6 +2754,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { if (sfd->socket.fd != fd) return; + // +1 to active read events. If it was zero then we handle it. If it was non-zero, + // another thread is already handling this socket and will process our event. + if (g_atomic_int_add(&sfd->active_read_events, 1) != 0) + return; + ca = sfd->call ? : NULL; log_info_stream_fd(sfd); @@ -2767,6 +2772,8 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { return; } +restart: + for (iters = 0; ; iters++) { #if MAX_RECV_ITERS if (iters >= MAX_RECV_ITERS) { @@ -2820,6 +2827,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { update = true; } + // -1 active read events. If it's non-zero, another thread has received a read event, + // and we must handle it here. + if (!g_atomic_int_dec_and_test(&sfd->active_read_events)) + goto restart; + // no strike if (strikes > 0) g_atomic_int_compare_and_exchange(&sfd->error_strikes, strikes, strikes - 1); diff --git a/include/media_socket.h b/include/media_socket.h index 0ba80b7ce..25adb8044 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -232,6 +232,7 @@ struct stream_fd { struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */ struct dtls_connection dtls; /* LOCK: stream->in_lock */ int error_strikes; + int active_read_events; struct poller *poller; };