Browse Source

MT#55283 overhaul WS locking logic

Multiple writers may operate on a single WS connection simultaneously.
Make sure they don't get in each other's way while constructing their
messages/responses by holding the lock from the beginning of the
response until the point when it's fully ready. This fixes a problem of
parts of multiple messages getting mixed up with each other.

Change-Id: If84224fc06b423cd65c12981a5b09ee99b121df2
pull/1964/head
Richard Fuchs 6 months ago
parent
commit
1480574001
3 changed files with 126 additions and 79 deletions
  1. +4
    -6
      daemon/janus.c
  2. +114
    -62
      daemon/websocket.c
  3. +8
    -11
      include/websocket.h

+ 4
- 6
daemon/janus.c View File

@ -132,11 +132,9 @@ static void janus_send_json_sync_response(struct websocket_message *wm, JsonBuil
char *result = glib_json_print(builder);
if (wm->method == M_WEBSOCKET)
websocket_write_text(wm->wc, result, true);
else {
websocket_http_response(wm->wc, code, "application/json", strlen(result));
websocket_write_http(wm->wc, result, true);
}
websocket_write_text(wm->wc, result);
else
websocket_http_complete(wm->wc, code, "application/json", strlen(result), result);
g_free(result);
}
@ -154,7 +152,7 @@ static void janus_send_json_async(struct janus_session *session, JsonBuilder *bu
struct websocket_conn *wc;
while (t_hash_table_iter_next(&iter, NULL, &wc)) {
// lock order constraint: janus_session lock first, websocket_conn lock second
websocket_write_text(wc, result, true);
websocket_write_text(wc, result);
}
g_free(result);


+ 114
- 62
daemon/websocket.c View File

@ -96,7 +96,7 @@ static void websocket_output_free(struct websocket_output *wo) {
}
// appends to output buffer without triggering a response - unlocked
// appends to output buffer without triggering a response - non-locking
static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) {
struct websocket_output *wo = t_queue_peek_tail(&wc->output_q);
@ -111,17 +111,8 @@ static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, si
}
// appends to output buffer without triggering a response
void websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) {
LOCK(&wc->lock);
__websocket_queue_raw(wc, msg, len);
}
// num bytes in output buffer
size_t websocket_queue_len(struct websocket_conn *wc) {
LOCK(&wc->lock);
static size_t __websocket_queue_len(struct websocket_conn *wc) {
size_t ret = 0;
for (__auto_type l = wc->output_q.head; l; l = l->next) {
struct websocket_output *wo = l->data;
@ -132,19 +123,30 @@ size_t websocket_queue_len(struct websocket_conn *wc) {
}
// adds data to output buffer (can be null) and optionally triggers specified response
void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
enum lws_write_protocol protocol, bool done)
// adds data to output buffer (can be null) and optionally triggers specified response - non-locking
static void __websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
enum lws_write_protocol protocol)
{
mutex_lock(&wc->lock);
__websocket_queue_raw(wc, msg, len);
struct websocket_output *wo = t_queue_peek_tail(&wc->output_q);
wo->protocol = protocol;
t_queue_push_tail(&wc->output_q, websocket_output_new());
}
mutex_unlock(&wc->lock);
// called after all writes are done - non-locking
// may intermittently release wc->lock
static void __websocket_write_done(struct websocket_conn *wc) {
while (true) {
// If this connection is already closed, don't trigger
// a wakeup of the service thread and don't request a
// writeable callback. We can't write anyway if the
// connection is already closed. Prevents a deadlock
// between acquiring the service lock here and the
// websocket_conn_cleanup called from within the service
// loop.
if (!wc->wsi)
return;
if (done) {
// Sadly lws_callback_on_writable() doesn't do any internal
// locking, therefore we must protect it against a concurrently
// running lws_service(), as well as against other threads
@ -172,40 +174,62 @@ void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
mutex_lock(&websocket_callback_lock);
lws_cancel_service(websocket_context);
mutex_lock(&websocket_service_lock);
if (mutex_trylock(&websocket_service_lock)) {
// Wrong lock order: service_lock should be the outer lock
// and wc->lock should be the inner lock. The connection may
// be in the process of being closed (websocket_conn_cleanup).
// Release wc->lock and try again.
mutex_unlock(&websocket_callback_lock);
mutex_unlock(&wc->lock);
sched_yield();
mutex_lock(&wc->lock);
continue;
}
lws_callback_on_writable(wc->wsi);
mutex_unlock(&websocket_service_lock);
mutex_unlock(&websocket_callback_lock);
break;
}
}
// adds data to output buffer (can be null) and triggers specified response: http or binary websocket
void websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len, bool done) {
websocket_write_raw(wc, msg, len, LWS_WRITE_HTTP, done);
// adds data to output buffer (can be null) and triggers specified response: http or binary websocket - non-locking
void __websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len) {
__websocket_write_raw(wc, msg, len, LWS_WRITE_HTTP);
}
void websocket_write_http(struct websocket_conn *wc, const char *msg, bool done) {
websocket_write_http_len(wc, msg, msg ? strlen(msg) : 0, done);
void __websocket_write_http(struct websocket_conn *wc, const char *msg) {
__websocket_write_http_len(wc, msg, msg ? strlen(msg) : 0);
}
void websocket_write_text(struct websocket_conn *wc, const char *msg, bool done) {
websocket_write_raw(wc, msg, strlen(msg), LWS_WRITE_TEXT, done);
void __websocket_write_text(struct websocket_conn *wc, const char *msg) {
__websocket_write_raw(wc, msg, strlen(msg), LWS_WRITE_TEXT);
}
void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len, bool done) {
websocket_write_raw(wc, msg, len, LWS_WRITE_BINARY, done);
void __websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len) {
__websocket_write_raw(wc, msg, len, LWS_WRITE_BINARY);
}
void websocket_write_next(struct websocket_conn *wc) {
LOCK(&wc->lock);
t_queue_push_tail(&wc->output_q, websocket_output_new());
// singe shot writes with locking
void websocket_write_text(struct websocket_conn *wc, const char *msg) {
{
LOCK(&wc->lock);
__websocket_write_text(wc, msg);
__websocket_write_done(wc);
}
}
static void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len) {
{
LOCK(&wc->lock);
__websocket_write_binary(wc, msg, len);
__websocket_write_done(wc);
}
}
static const char *websocket_echo_process(struct websocket_message *wm) {
ilogs(http, LOG_DEBUG, "Returning %lu bytes websocket echo from %s", (unsigned long) wm->body->len,
endpoint_print_buf(&wm->wc->endpoint));
websocket_write_binary(wm->wc, wm->body->str, wm->body->len, true);
websocket_write_binary(wm->wc, wm->body->str, wm->body->len);
return NULL;
}
@ -306,6 +330,7 @@ static int websocket_dequeue(struct websocket_conn *wc) {
bool is_http = false;
mutex_lock(&wc->lock);
struct websocket_output *wo;
struct lws *wsi = wc->wsi;
while ((wo = t_queue_pop_head(&wc->output_q))) {
@ -360,11 +385,9 @@ next:
return 0;
}
void websocket_http_response(struct websocket_conn *wc, int status, const char *content_type,
void __websocket_http_response(struct websocket_conn *wc, int status, const char *content_type,
ssize_t content_length)
{
LOCK(&wc->lock);
struct websocket_output *wo = t_queue_peek_tail(&wc->output_q);
wo->http_status = status;
@ -374,8 +397,12 @@ void websocket_http_response(struct websocket_conn *wc, int status, const char *
void websocket_http_complete(struct websocket_conn *wc, int status, const char *content_type,
ssize_t content_length, const char *content)
{
websocket_http_response(wc, status, content_type, content_length);
websocket_write_http(wc, content, true);
{
LOCK(&wc->lock);
__websocket_http_response(wc, status, content_type, content_length);
__websocket_write_http(wc, content);
__websocket_write_done(wc);
}
}
@ -426,13 +453,13 @@ static const char *websocket_http_metrics(struct websocket_message *wm) {
// adds printf string to output buffer without triggering response
static size_t websocket_queue_printf(struct cli_writer *cw, const char *fmt, ...) {
static size_t __websocket_queue_printf(struct cli_writer *cw, const char *fmt, ...) {
va_list va;
va_start(va, fmt);
char *s = g_strdup_vprintf(fmt, va);
size_t ret = strlen(s);
va_end(va);
websocket_queue_raw(cw->ptr, s, ret);
__websocket_queue_raw(cw->ptr, s, ret);
g_free(s);
return ret;
}
@ -447,14 +474,19 @@ static const char *websocket_http_cli(struct websocket_message *wm) {
str uri_cmd = STR(uri);
struct cli_writer cw = {
.cw_printf = websocket_queue_printf,
.cw_printf = __websocket_queue_printf,
.ptr = wm->wc,
};
cli_handle(&uri_cmd, &cw);
size_t len = websocket_queue_len(wm->wc);
{
LOCK(&wm->wc->lock);
cli_handle(&uri_cmd, &cw);
size_t len = __websocket_queue_len(wm->wc);
__websocket_http_response(wm->wc, 200, "text/plain", len);
__websocket_write_http(wm->wc, NULL);
__websocket_write_done(wm->wc);
}
websocket_http_complete(wm->wc, 200, "text/plain", len, NULL);
return NULL;
}
@ -463,14 +495,19 @@ static const char *websocket_http_cli_post(struct websocket_message *wm) {
ilogs(http, LOG_DEBUG, "Responding to POST /cli");
struct cli_writer cw = {
.cw_printf = websocket_queue_printf,
.cw_printf = __websocket_queue_printf,
.ptr = wm->wc,
};
cli_handle(&STR_LEN(wm->body->str, wm->body->len), &cw);
size_t len = websocket_queue_len(wm->wc);
{
LOCK(&wm->wc->lock);
cli_handle(&STR_LEN(wm->body->str, wm->body->len), &cw);
size_t len = __websocket_queue_len(wm->wc);
__websocket_http_response(wm->wc, 200, "text/plain", len);
__websocket_write_http(wm->wc, NULL);
__websocket_write_done(wm->wc);
}
websocket_http_complete(wm->wc, 200, "text/plain", len, NULL);
return NULL;
}
@ -481,12 +518,17 @@ static const char *websocket_cli_process(struct websocket_message *wm) {
str uri_cmd = STR_LEN(wm->body->str, wm->body->len);
struct cli_writer cw = {
.cw_printf = websocket_queue_printf,
.cw_printf = __websocket_queue_printf,
.ptr = wm->wc,
};
cli_handle(&uri_cmd, &cw);
websocket_write_binary(wm->wc, NULL, 0, true);
{
LOCK(&wm->wc->lock);
cli_handle(&uri_cmd, &cw);
__websocket_write_binary(wm->wc, NULL, 0);
__websocket_write_done(wm->wc);
}
return NULL;
}
@ -495,25 +537,33 @@ static void websocket_ng_send_ws(str *cookie, str *body, const endpoint_t *sin,
void *p1)
{
struct websocket_conn *wc = p1;
if (cookie) {
websocket_queue_raw(wc, cookie->s, cookie->len);
websocket_queue_raw(wc, " ", 1);
{
LOCK(&wc->lock);
if (cookie) {
__websocket_queue_raw(wc, cookie->s, cookie->len);
__websocket_queue_raw(wc, " ", 1);
}
__websocket_queue_raw(wc, body->s, body->len);
__websocket_write_binary(wc, NULL, 0);
__websocket_write_done(wc);
}
websocket_queue_raw(wc, body->s, body->len);
websocket_write_binary(wc, NULL, 0, true);
}
static void websocket_ng_send_http(str *cookie, str *body, const endpoint_t *sin, const sockaddr_t *from,
void *p1)
{
struct websocket_conn *wc = p1;
websocket_http_response(wc, 200, "application/x-rtpengine-ng",
(cookie ? (cookie->len + 1) : 0) + body->len);
if (cookie) {
websocket_queue_raw(wc, cookie->s, cookie->len);
websocket_queue_raw(wc, " ", 1);
{
LOCK(&wc->lock);
__websocket_http_response(wc, 200, "application/x-rtpengine-ng",
(cookie ? (cookie->len + 1) : 0) + body->len);
if (cookie) {
__websocket_queue_raw(wc, cookie->s, cookie->len);
__websocket_queue_raw(wc, " ", 1);
}
__websocket_queue_raw(wc, body->s, body->len);
__websocket_write_http(wc, NULL);
__websocket_write_done(wc);
}
websocket_queue_raw(wc, body->s, body->len);
websocket_write_http(wc, NULL, true);
}
static void __ng_buf_free(struct websocket_ng_buf *buf) {
@ -715,6 +765,8 @@ static void websocket_conn_cleanup(struct websocket_conn *wc) {
// wait until all remaining tasks are finished
mutex_lock(&wc->lock);
wc->wsi = NULL;
while (wc->jobs)
cond_wait(&wc->cond, &wc->lock);


+ 8
- 11
include/websocket.h View File

@ -39,18 +39,15 @@ int websocket_init(void);
void websocket_start(void);
void websocket_stop(void);
// appends to output buffer without triggering a response
void websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len);
// adds data to output buffer (can be null) and optionally triggers specified response
void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
enum lws_write_protocol protocol, bool done);
// adds data to output buffer (can be null) and triggers specified response: http or binary websocket
void websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len, bool done);
void websocket_write_http(struct websocket_conn *wc, const char *msg, bool done);
void websocket_write_text(struct websocket_conn *wc, const char *msg, bool done);
void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len, bool done);
// num bytes in output buffer
size_t websocket_queue_len(struct websocket_conn *wc);
//void websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len);
//void websocket_write_http(struct websocket_conn *wc, const char *msg);
void websocket_write_text(struct websocket_conn *wc, const char *msg);
//void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len);
// single shot HTTP response
void websocket_http_complete(struct websocket_conn *wc, int status, const char *content_type,
ssize_t content_length, const char *content);
// write HTTP response headers
void websocket_http_response(struct websocket_conn *wc, int status, const char *content_type,


Loading…
Cancel
Save