diff --git a/daemon/websocket.c b/daemon/websocket.c index fae26cda2..481dd0b84 100644 --- a/daemon/websocket.c +++ b/daemon/websocket.c @@ -26,6 +26,8 @@ struct websocket_output { TYPED_GHASHTABLE(janus_sessions_ht, struct janus_session, struct janus_session, g_direct_hash, g_direct_equal, NULL, NULL) +TYPED_GQUEUE(websocket_message, struct websocket_message) +TYPED_GQUEUE(websocket_output, struct websocket_output) struct websocket_conn { // used in the single threaded libwebsockets context @@ -37,12 +39,12 @@ struct websocket_conn { // multithreaded message processing mutex_t lock; unsigned int jobs; - GQueue messages; + websocket_message_q messages; cond_t cond; janus_sessions_ht janus_sessions; // output buffer - also protected by lock - GQueue output_q; + websocket_output_q output_q; }; struct websocket_ng_buf { @@ -85,8 +87,7 @@ static struct websocket_output *websocket_output_new(void) { return wo; } -static void websocket_output_free(void *p) { - struct websocket_output *wo = p; +static void websocket_output_free(struct websocket_output *wo) { if (wo->str) g_string_free(wo->str, TRUE); g_slice_free1(sizeof(*wo), wo); @@ -95,7 +96,7 @@ static void websocket_output_free(void *p) { // appends to output buffer without triggering a response - unlocked static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) { - struct websocket_output *wo = g_queue_peek_tail(&wc->output_q); + struct websocket_output *wo = t_queue_peek_tail(&wc->output_q); if (!wo->str) { wo->str = g_string_new(""); @@ -121,7 +122,7 @@ size_t websocket_queue_len(struct websocket_conn *wc) { LOCK(&wc->lock); size_t ret = 0; - for (GList *l = wc->output_q.head; l; l = l->next) { + for (__auto_type l = wc->output_q.head; l; l = l->next) { struct websocket_output *wo = l->data; ret += (wo->str->len - LWS_PRE); } @@ -136,9 +137,9 @@ void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len, { mutex_lock(&wc->lock); __websocket_queue_raw(wc, msg, len); - struct websocket_output *wo = g_queue_peek_tail(&wc->output_q); + struct websocket_output *wo = t_queue_peek_tail(&wc->output_q); wo->protocol = protocol; - g_queue_push_tail(&wc->output_q, websocket_output_new()); + t_queue_push_tail(&wc->output_q, websocket_output_new()); mutex_unlock(&wc->lock); @@ -196,7 +197,7 @@ void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t l void websocket_write_next(struct websocket_conn *wc) { LOCK(&wc->lock); - g_queue_push_tail(&wc->output_q, websocket_output_new()); + t_queue_push_tail(&wc->output_q, websocket_output_new()); } @@ -217,7 +218,7 @@ static void websocket_message_push(struct websocket_conn *wc, websocket_message_ assert(wm != NULL); wm->func = func; - g_queue_push_tail(&wc->messages, wm); + t_queue_push_tail(&wc->messages, wm); wc->jobs++; g_thread_pool_push(websocket_threads, wc, NULL); @@ -229,7 +230,7 @@ static void websocket_process(void *p, void *up) { struct websocket_conn *wc = p; mutex_lock(&wc->lock); - struct websocket_message *wm = g_queue_pop_head(&wc->messages); + struct websocket_message *wm = t_queue_pop_head(&wc->messages); mutex_unlock(&wc->lock); assert(wm != NULL); @@ -304,7 +305,7 @@ static int websocket_dequeue(struct websocket_conn *wc) { mutex_lock(&wc->lock); struct websocket_output *wo; struct lws *wsi = wc->wsi; - while ((wo = g_queue_pop_head(&wc->output_q))) { + while ((wo = t_queue_pop_head(&wc->output_q))) { // used buffer slot? if (!wo->str) goto next; @@ -342,7 +343,7 @@ static int websocket_dequeue(struct websocket_conn *wc) { next: websocket_output_free(wo); } - g_queue_push_tail(&wc->output_q, websocket_output_new()); + t_queue_push_tail(&wc->output_q, websocket_output_new()); mutex_unlock(&wc->lock); @@ -359,7 +360,7 @@ void websocket_http_response(struct websocket_conn *wc, int status, const char * { LOCK(&wc->lock); - struct websocket_output *wo = g_queue_peek_tail(&wc->output_q); + struct websocket_output *wo = t_queue_peek_tail(&wc->output_q); wo->http_status = status; wo->content_type = content_type; @@ -694,7 +695,7 @@ static void websocket_conn_cleanup(struct websocket_conn *wc) { free(wc->wm->uri); g_slice_free1(sizeof(*wc->wm), wc->wm); wc->wm = NULL; - g_queue_clear_full(&wc->output_q, websocket_output_free); + t_queue_clear_full(&wc->output_q, websocket_output_free); if (wc->uri) free(wc->uri); @@ -741,8 +742,8 @@ static int websocket_conn_init(struct lws *wsi, void *p) { wc->wsi = wsi; mutex_init(&wc->lock); cond_init(&wc->cond); - g_queue_init(&wc->messages); - g_queue_push_tail(&wc->output_q, websocket_output_new()); + t_queue_init(&wc->messages); + t_queue_push_tail(&wc->output_q, websocket_output_new()); wc->wm = websocket_message_new(wc); wc->janus_sessions = janus_sessions_ht_new();