diff --git a/daemon/websocket.c b/daemon/websocket.c index a965d2ea3..35165b7df 100644 --- a/daemon/websocket.c +++ b/daemon/websocket.c @@ -52,6 +52,8 @@ struct websocket_ng_buf { static GQueue websocket_vhost_configs; static struct lws_context *websocket_context; static GThreadPool *websocket_threads; +static mutex_t websocket_callback_lock = MUTEX_STATIC_INIT; +static mutex_t websocket_service_lock = MUTEX_STATIC_INIT; static struct websocket_message *websocket_message_new(struct websocket_conn *wc) { @@ -133,12 +135,42 @@ void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len, wo->protocol = protocol; g_queue_push_tail(&wc->output_q, websocket_output_new()); + mutex_unlock(&wc->lock); + if (done) { - lws_callback_on_writable(wc->wsi); + // Sadly lws_callback_on_writable() doesn't do any internal + // locking, therefore we must protect it against a concurrently + // running lws_service(), as well as against other threads + // invoking lws_callback_on_writable(). + // + // Acquire the callback lock first, which is normally unlocked, + // then wake up the service thread and try to break out of + // lws_service(). The service thread holds the service lock + // while lws_service() is executing and releases it as soon as + // lws_service() is done. We therefore try to acquire the + // service lock here, which blocks us until lws_service() is + // actually done. At this point the service thread will try to + // acquire the callback lock, which is still held by us here, + // and so the service thread will block until we are done + // calling lws_callback_on_writable(). Finally we release both + // locks, which allows the service thread to resume + // lws_service(). + // + // The suggested approach of using + // LWS_CALLBACK_EVENT_WAIT_CANCELLED together with a queue and + // then calling lws_callback_on_writable() from the service + // thread is not usable as libwebsockets 2.0 doesn't support + // LWS_CALLBACK_EVENT_WAIT_CANCELLED. + + mutex_lock(&websocket_callback_lock); lws_cancel_service(websocket_context); - } - mutex_unlock(&wc->lock); + mutex_lock(&websocket_service_lock); + lws_callback_on_writable(wc->wsi); + + mutex_unlock(&websocket_service_lock); + mutex_unlock(&websocket_callback_lock); + } } @@ -966,8 +998,16 @@ err: static void websocket_loop(void *p) { ilogs(http, LOG_INFO, "Websocket listener thread running"); - while (!rtpe_shutdown) + while (!rtpe_shutdown) { + // see websocket_write_raw() for locking logic + + mutex_lock(&websocket_service_lock); lws_service(websocket_context, 100); + mutex_unlock(&websocket_service_lock); + + mutex_lock(&websocket_callback_lock); + mutex_unlock(&websocket_callback_lock); + } websocket_cleanup(); }