diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 3e980173b..3a9163dcb 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -2754,6 +2754,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { if (sfd->socket.fd != fd) return; + // +1 to active read events. If it was zero then we handle it. If it was non-zero, + // another thread is already handling this socket and will process our event. + if (g_atomic_int_add(&sfd->active_read_events, 1) != 0) + return; + ca = sfd->call ? : NULL; log_info_stream_fd(sfd); @@ -2767,6 +2772,8 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { return; } +restart: + for (iters = 0; ; iters++) { #if MAX_RECV_ITERS if (iters >= MAX_RECV_ITERS) { @@ -2820,6 +2827,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { update = true; } + // -1 active read events. If it's non-zero, another thread has received a read event, + // and we must handle it here. + if (!g_atomic_int_dec_and_test(&sfd->active_read_events)) + goto restart; + // no strike if (strikes > 0) g_atomic_int_compare_and_exchange(&sfd->error_strikes, strikes, strikes - 1); diff --git a/include/media_socket.h b/include/media_socket.h index 0ba80b7ce..25adb8044 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -232,6 +232,7 @@ struct stream_fd { struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */ struct dtls_connection dtls; /* LOCK: stream->in_lock */ int error_strikes; + int active_read_events; struct poller *poller; };