From 148057400121a19a9891f22c4e3881f6736adbd4 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Mon, 2 Jun 2025 15:56:36 -0400 Subject: [PATCH] MT#55283 overhaul WS locking logic Multiple writers may operate on a single WS connection simultaneously. Make sure they don't get in each other's way while constructing their messages/responses by holding the lock from the beginning of the response until the point when it's fully ready. This fixes a problem of parts of multiple messages getting mixed up with each other. Change-Id: If84224fc06b423cd65c12981a5b09ee99b121df2 --- daemon/janus.c | 10 +-- daemon/websocket.c | 176 ++++++++++++++++++++++++++++---------------- include/websocket.h | 19 ++--- 3 files changed, 126 insertions(+), 79 deletions(-) diff --git a/daemon/janus.c b/daemon/janus.c index 09949fc15..124eefa63 100644 --- a/daemon/janus.c +++ b/daemon/janus.c @@ -132,11 +132,9 @@ static void janus_send_json_sync_response(struct websocket_message *wm, JsonBuil char *result = glib_json_print(builder); if (wm->method == M_WEBSOCKET) - websocket_write_text(wm->wc, result, true); - else { - websocket_http_response(wm->wc, code, "application/json", strlen(result)); - websocket_write_http(wm->wc, result, true); - } + websocket_write_text(wm->wc, result); + else + websocket_http_complete(wm->wc, code, "application/json", strlen(result), result); g_free(result); } @@ -154,7 +152,7 @@ static void janus_send_json_async(struct janus_session *session, JsonBuilder *bu struct websocket_conn *wc; while (t_hash_table_iter_next(&iter, NULL, &wc)) { // lock order constraint: janus_session lock first, websocket_conn lock second - websocket_write_text(wc, result, true); + websocket_write_text(wc, result); } g_free(result); diff --git a/daemon/websocket.c b/daemon/websocket.c index af2170487..c27a0dfc7 100644 --- a/daemon/websocket.c +++ b/daemon/websocket.c @@ -96,7 +96,7 @@ static void websocket_output_free(struct websocket_output *wo) { } -// appends to output buffer without 
triggering a response - unlocked +// appends to output buffer without triggering a response - non-locking static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) { struct websocket_output *wo = t_queue_peek_tail(&wc->output_q); @@ -111,17 +111,8 @@ static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, si } -// appends to output buffer without triggering a response -void websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) { - LOCK(&wc->lock); - __websocket_queue_raw(wc, msg, len); -} - - // num bytes in output buffer -size_t websocket_queue_len(struct websocket_conn *wc) { - LOCK(&wc->lock); - +static size_t __websocket_queue_len(struct websocket_conn *wc) { size_t ret = 0; for (__auto_type l = wc->output_q.head; l; l = l->next) { struct websocket_output *wo = l->data; @@ -132,19 +123,30 @@ size_t websocket_queue_len(struct websocket_conn *wc) { } -// adds data to output buffer (can be null) and optionally triggers specified response -void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len, - enum lws_write_protocol protocol, bool done) +// adds data to output buffer (can be null) and optionally triggers specified response - non-locking +static void __websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len, + enum lws_write_protocol protocol) { - mutex_lock(&wc->lock); __websocket_queue_raw(wc, msg, len); struct websocket_output *wo = t_queue_peek_tail(&wc->output_q); wo->protocol = protocol; t_queue_push_tail(&wc->output_q, websocket_output_new()); +} - mutex_unlock(&wc->lock); +// called after all writes are done - non-locking +// may intermittently release wc->lock +static void __websocket_write_done(struct websocket_conn *wc) { + while (true) { + // If this connection is already closed, don't trigger + // a wakeup of the service thread and don't request a + // writeable callback. 
We can't write anyway if the + // connection is already closed. Prevents a deadlock + // between acquiring the service lock here and the + // websocket_conn_cleanup called from within the service + // loop. + if (!wc->wsi) + return; - if (done) { // Sadly lws_callback_on_writable() doesn't do any internal // locking, therefore we must protect it against a concurrently // running lws_service(), as well as against other threads @@ -172,40 +174,62 @@ void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len, mutex_lock(&websocket_callback_lock); lws_cancel_service(websocket_context); - mutex_lock(&websocket_service_lock); + if (mutex_trylock(&websocket_service_lock)) { + // Wrong lock order: service_lock should be the outer lock + // and wc->lock should be the inner lock. The connection may + // be in the process of being closed (websocket_conn_cleanup). + // Release wc->lock and try again. + mutex_unlock(&websocket_callback_lock); + mutex_unlock(&wc->lock); + sched_yield(); + mutex_lock(&wc->lock); + continue; + } lws_callback_on_writable(wc->wsi); mutex_unlock(&websocket_service_lock); mutex_unlock(&websocket_callback_lock); + + break; } } -// adds data to output buffer (can be null) and triggers specified response: http or binary websocket -void websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len, bool done) { - websocket_write_raw(wc, msg, len, LWS_WRITE_HTTP, done); +// adds data to output buffer (can be null) and triggers specified response: http or binary websocket - non-locking +void __websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len) { + __websocket_write_raw(wc, msg, len, LWS_WRITE_HTTP); } -void websocket_write_http(struct websocket_conn *wc, const char *msg, bool done) { - websocket_write_http_len(wc, msg, msg ? strlen(msg) : 0, done); +void __websocket_write_http(struct websocket_conn *wc, const char *msg) { + __websocket_write_http_len(wc, msg, msg ? 
strlen(msg) : 0); } -void websocket_write_text(struct websocket_conn *wc, const char *msg, bool done) { - websocket_write_raw(wc, msg, strlen(msg), LWS_WRITE_TEXT, done); +void __websocket_write_text(struct websocket_conn *wc, const char *msg) { + __websocket_write_raw(wc, msg, strlen(msg), LWS_WRITE_TEXT); } -void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len, bool done) { - websocket_write_raw(wc, msg, len, LWS_WRITE_BINARY, done); +void __websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len) { + __websocket_write_raw(wc, msg, len, LWS_WRITE_BINARY); } - -void websocket_write_next(struct websocket_conn *wc) { - LOCK(&wc->lock); - t_queue_push_tail(&wc->output_q, websocket_output_new()); +// single shot writes with locking +void websocket_write_text(struct websocket_conn *wc, const char *msg) { + { + LOCK(&wc->lock); + __websocket_write_text(wc, msg); + __websocket_write_done(wc); + } +} +static void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len) { + { + LOCK(&wc->lock); + __websocket_write_binary(wc, msg, len); + __websocket_write_done(wc); + } } static const char *websocket_echo_process(struct websocket_message *wm) { ilogs(http, LOG_DEBUG, "Returning %lu bytes websocket echo from %s", (unsigned long) wm->body->len, endpoint_print_buf(&wm->wc->endpoint)); - websocket_write_binary(wm->wc, wm->body->str, wm->body->len, true); + websocket_write_binary(wm->wc, wm->body->str, wm->body->len); return NULL; } @@ -306,6 +330,7 @@ static int websocket_dequeue(struct websocket_conn *wc) { bool is_http = false; mutex_lock(&wc->lock); + struct websocket_output *wo; struct lws *wsi = wc->wsi; while ((wo = t_queue_pop_head(&wc->output_q))) { @@ -360,11 +385,9 @@ next: return 0; } -void websocket_http_response(struct websocket_conn *wc, int status, const char *content_type, +void __websocket_http_response(struct websocket_conn *wc, int status, const char *content_type, ssize_t
content_length) { - LOCK(&wc->lock); - struct websocket_output *wo = t_queue_peek_tail(&wc->output_q); wo->http_status = status; @@ -374,8 +397,12 @@ void websocket_http_response(struct websocket_conn *wc, int status, const char * void websocket_http_complete(struct websocket_conn *wc, int status, const char *content_type, ssize_t content_length, const char *content) { - websocket_http_response(wc, status, content_type, content_length); - websocket_write_http(wc, content, true); + { + LOCK(&wc->lock); + __websocket_http_response(wc, status, content_type, content_length); + __websocket_write_http(wc, content); + __websocket_write_done(wc); + } } @@ -426,13 +453,13 @@ static const char *websocket_http_metrics(struct websocket_message *wm) { // adds printf string to output buffer without triggering response -static size_t websocket_queue_printf(struct cli_writer *cw, const char *fmt, ...) { +static size_t __websocket_queue_printf(struct cli_writer *cw, const char *fmt, ...) { va_list va; va_start(va, fmt); char *s = g_strdup_vprintf(fmt, va); size_t ret = strlen(s); va_end(va); - websocket_queue_raw(cw->ptr, s, ret); + __websocket_queue_raw(cw->ptr, s, ret); g_free(s); return ret; } @@ -447,14 +474,19 @@ static const char *websocket_http_cli(struct websocket_message *wm) { str uri_cmd = STR(uri); struct cli_writer cw = { - .cw_printf = websocket_queue_printf, + .cw_printf = __websocket_queue_printf, .ptr = wm->wc, }; - cli_handle(&uri_cmd, &cw); - size_t len = websocket_queue_len(wm->wc); + { + LOCK(&wm->wc->lock); + cli_handle(&uri_cmd, &cw); + size_t len = __websocket_queue_len(wm->wc); + __websocket_http_response(wm->wc, 200, "text/plain", len); + __websocket_write_http(wm->wc, NULL); + __websocket_write_done(wm->wc); + } - websocket_http_complete(wm->wc, 200, "text/plain", len, NULL); return NULL; } @@ -463,14 +495,19 @@ static const char *websocket_http_cli_post(struct websocket_message *wm) { ilogs(http, LOG_DEBUG, "Responding to POST /cli"); struct cli_writer 
cw = { - .cw_printf = websocket_queue_printf, + .cw_printf = __websocket_queue_printf, .ptr = wm->wc, }; - cli_handle(&STR_LEN(wm->body->str, wm->body->len), &cw); - size_t len = websocket_queue_len(wm->wc); + { + LOCK(&wm->wc->lock); + cli_handle(&STR_LEN(wm->body->str, wm->body->len), &cw); + size_t len = __websocket_queue_len(wm->wc); + __websocket_http_response(wm->wc, 200, "text/plain", len); + __websocket_write_http(wm->wc, NULL); + __websocket_write_done(wm->wc); + } - websocket_http_complete(wm->wc, 200, "text/plain", len, NULL); return NULL; } @@ -481,12 +518,17 @@ static const char *websocket_cli_process(struct websocket_message *wm) { str uri_cmd = STR_LEN(wm->body->str, wm->body->len); struct cli_writer cw = { - .cw_printf = websocket_queue_printf, + .cw_printf = __websocket_queue_printf, .ptr = wm->wc, }; - cli_handle(&uri_cmd, &cw); - websocket_write_binary(wm->wc, NULL, 0, true); + { + LOCK(&wm->wc->lock); + cli_handle(&uri_cmd, &cw); + __websocket_write_binary(wm->wc, NULL, 0); + __websocket_write_done(wm->wc); + } + return NULL; } @@ -495,25 +537,33 @@ static void websocket_ng_send_ws(str *cookie, str *body, const endpoint_t *sin, void *p1) { struct websocket_conn *wc = p1; - if (cookie) { - websocket_queue_raw(wc, cookie->s, cookie->len); - websocket_queue_raw(wc, " ", 1); + { + LOCK(&wc->lock); + if (cookie) { + __websocket_queue_raw(wc, cookie->s, cookie->len); + __websocket_queue_raw(wc, " ", 1); + } + __websocket_queue_raw(wc, body->s, body->len); + __websocket_write_binary(wc, NULL, 0); + __websocket_write_done(wc); } - websocket_queue_raw(wc, body->s, body->len); - websocket_write_binary(wc, NULL, 0, true); } static void websocket_ng_send_http(str *cookie, str *body, const endpoint_t *sin, const sockaddr_t *from, void *p1) { struct websocket_conn *wc = p1; - websocket_http_response(wc, 200, "application/x-rtpengine-ng", - (cookie ? 
(cookie->len + 1) : 0) + body->len); - if (cookie) { - websocket_queue_raw(wc, cookie->s, cookie->len); - websocket_queue_raw(wc, " ", 1); + { + LOCK(&wc->lock); + __websocket_http_response(wc, 200, "application/x-rtpengine-ng", + (cookie ? (cookie->len + 1) : 0) + body->len); + if (cookie) { + __websocket_queue_raw(wc, cookie->s, cookie->len); + __websocket_queue_raw(wc, " ", 1); + } + __websocket_queue_raw(wc, body->s, body->len); + __websocket_write_http(wc, NULL); + __websocket_write_done(wc); } - websocket_queue_raw(wc, body->s, body->len); - websocket_write_http(wc, NULL, true); } static void __ng_buf_free(struct websocket_ng_buf *buf) { @@ -715,6 +765,8 @@ static void websocket_conn_cleanup(struct websocket_conn *wc) { // wait until all remaining tasks are finished mutex_lock(&wc->lock); + + wc->wsi = NULL; while (wc->jobs) cond_wait(&wc->cond, &wc->lock); diff --git a/include/websocket.h b/include/websocket.h index 0d7572310..057e9293d 100644 --- a/include/websocket.h +++ b/include/websocket.h @@ -39,18 +39,15 @@ int websocket_init(void); void websocket_start(void); void websocket_stop(void); -// appends to output buffer without triggering a response -void websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len); -// adds data to output buffer (can be null) and optionally triggers specified response -void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len, - enum lws_write_protocol protocol, bool done); // adds data to output buffer (can be null) and triggers specified response: http or binary websocket -void websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len, bool done); -void websocket_write_http(struct websocket_conn *wc, const char *msg, bool done); -void websocket_write_text(struct websocket_conn *wc, const char *msg, bool done); -void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len, bool done); -// num bytes in output buffer -size_t 
websocket_queue_len(struct websocket_conn *wc); +//void websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len); +//void websocket_write_http(struct websocket_conn *wc, const char *msg); +void websocket_write_text(struct websocket_conn *wc, const char *msg); +//void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len); + +// single shot HTTP response +void websocket_http_complete(struct websocket_conn *wc, int status, const char *content_type, + ssize_t content_length, const char *content); // write HTTP response headers void websocket_http_response(struct websocket_conn *wc, int status, const char *content_type,