@ -52,6 +52,8 @@ struct websocket_ng_buf {
static GQueue websocket_vhost_configs ;
static GQueue websocket_vhost_configs ;
static struct lws_context * websocket_context ;
static struct lws_context * websocket_context ;
static GThreadPool * websocket_threads ;
static GThreadPool * websocket_threads ;
static mutex_t websocket_callback_lock = MUTEX_STATIC_INIT ;
static mutex_t websocket_service_lock = MUTEX_STATIC_INIT ;
static struct websocket_message * websocket_message_new ( struct websocket_conn * wc ) {
static struct websocket_message * websocket_message_new ( struct websocket_conn * wc ) {
@ -133,12 +135,42 @@ void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
wo - > protocol = protocol ;
wo - > protocol = protocol ;
g_queue_push_tail ( & wc - > output_q , websocket_output_new ( ) ) ;
g_queue_push_tail ( & wc - > output_q , websocket_output_new ( ) ) ;
mutex_unlock ( & wc - > lock ) ;
if ( done ) {
if ( done ) {
lws_callback_on_writable ( wc - > wsi ) ;
/ / Sadly lws_callback_on_writable ( ) doesn ' t do any internal
/ / locking , therefore we must protect it against a concurrently
/ / running lws_service ( ) , as well as against other threads
/ / invoking lws_callback_on_writable ( ) .
/ /
/ / Acquire the callback lock first , which is normally unlocked ,
/ / then wake up the service thread and try to break out of
/ / lws_service ( ) . The service thread holds the service lock
/ / while lws_service ( ) is executing and releases it as soon as
/ / lws_service ( ) is done . We therefore try to acquire the
/ / service lock here , which blocks us until lws_service ( ) is
/ / actually done . At this point the service thread will try to
/ / acquire the callback lock , which is still held by us here ,
/ / and so the service thread will block until we are done
/ / calling lws_callback_on_writable ( ) . Finally we release both
/ / locks , which allows the service thread to resume
/ / lws_service ( ) .
/ /
/ / The suggested approach of using
/ / LWS_CALLBACK_EVENT_WAIT_CANCELLED together with a queue and
/ / then calling lws_callback_on_writable ( ) from the service
/ / thread is not usable as libwebsockets 2.0 doesn ' t support
/ / LWS_CALLBACK_EVENT_WAIT_CANCELLED .
mutex_lock ( & websocket_callback_lock ) ;
lws_cancel_service ( websocket_context ) ;
lws_cancel_service ( websocket_context ) ;
}
mutex_unlock ( & wc - > lock ) ;
mutex_lock ( & websocket_service_lock ) ;
lws_callback_on_writable ( wc - > wsi ) ;
mutex_unlock ( & websocket_service_lock ) ;
mutex_unlock ( & websocket_callback_lock ) ;
}
}
}
@ -966,8 +998,16 @@ err:
static void websocket_loop ( void * p ) {
static void websocket_loop ( void * p ) {
ilogs ( http , LOG_INFO , " Websocket listener thread running " ) ;
ilogs ( http , LOG_INFO , " Websocket listener thread running " ) ;
while ( ! rtpe_shutdown )
while ( ! rtpe_shutdown ) {
/ / see websocket_write_raw ( ) for locking logic
mutex_lock ( & websocket_service_lock ) ;
lws_service ( websocket_context , 100 ) ;
lws_service ( websocket_context , 100 ) ;
mutex_unlock ( & websocket_service_lock ) ;
mutex_lock ( & websocket_callback_lock ) ;
mutex_unlock ( & websocket_callback_lock ) ;
}
websocket_cleanup ( ) ;
websocket_cleanup ( ) ;
}
}